Large payload azure blob externalization support#124

Open
andystaples wants to merge 3 commits into microsoft:main from andystaples:andystaples/add-large-payload-support

Conversation

@andystaples (Contributor)

Large Payload Externalization Support

Summary

Adds support for automatically offloading oversized orchestration payloads to Azure Blob Storage (or a custom external store). When a payload exceeds a configurable threshold (default 900 KB), it is compressed, uploaded to blob storage, and replaced with a compact reference token in the gRPC message. On the receiving side, tokens are detected and the original data is transparently downloaded and decompressed. No changes are needed in orchestrator or activity code.
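Roughly, the send-side logic works as follows. This is an illustrative sketch, not the SDK's actual internals; the helper and parameter names are stand-ins:

```python
import gzip

DEFAULT_THRESHOLD_BYTES = 900 * 1024  # default externalization threshold (~900 KB)

def maybe_externalize(payload: str, upload, threshold: int = DEFAULT_THRESHOLD_BYTES) -> str:
    """If the payload exceeds the threshold, compress it, hand it to the
    store's upload function, and return the reference token it produced;
    otherwise return the payload unchanged (it stays inline in the message)."""
    raw = payload.encode("utf-8")
    if len(raw) <= threshold:
        return payload
    return upload(gzip.compress(raw))
```

The receive side mirrors this: any field whose value is a recognized token is downloaded and decompressed before the orchestrator or activity sees it.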

This mirrors the large payload externalization feature in the .NET Durable Task SDK (Microsoft.DurableTask.Extensions.AzureBlobPayloads) and uses a cross-SDK compatible blob token format (blob:v1:<container>:<blobName>).
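A hypothetical parser for that token format (the SDKs' real parsing code may differ in details):

```python
_TOKEN_PREFIX = "blob:v1:"

def parse_blob_token(token: str) -> tuple[str, str]:
    """Split "blob:v1:<container>:<blobName>" into (container, blob_name).

    Only the first ':' after the prefix separates the container, so blob
    names containing '/' or further ':' characters survive intact.
    """
    if not token.startswith(_TOKEN_PREFIX):
        raise ValueError(f"not a blob payload token: {token!r}")
    container, sep, blob_name = token[len(_TOKEN_PREFIX):].partition(":")
    if not sep or not container or not blob_name:
        raise ValueError(f"malformed blob payload token: {token!r}")
    return container, blob_name
```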

Install

pip install durabletask[azure-blob-payloads]

Usage

from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions

store = BlobPayloadStore(BlobPayloadStoreOptions(
    connection_string="DefaultEndpointsProtocol=https;...",
))

# Pass payload_store to both worker and client
worker = TaskHubGrpcWorker(payload_store=store)
client = TaskHubGrpcClient(payload_store=store)

Changes

New packages

  • durabletask/payload/ — Public API: PayloadStore ABC, LargePayloadStorageOptions, and helper functions for externalizing/de-externalizing protobuf messages
  • durabletask/extensions/azure_blob_payloads/ — Azure Blob Storage implementation (BlobPayloadStore, BlobPayloadStoreOptions) with GZip compression, sync + async support, and container auto-creation

Core SDK changes

  • worker.py — Added payload_store parameter; sends WORKER_CAPABILITY_LARGE_PAYLOADS capability flag; hooks externalize/deexternalize into orchestrator, activity, and entity execution paths
  • client.py — Added payload_store parameter to both TaskHubGrpcClient and AsyncTaskHubGrpcClient; hooks into all 10 client methods (schedule, get state, wait, raise event, terminate, signal entity, get entity, list orchestrations, list entities)
  • __init__.py — Exports PayloadStore and LargePayloadStorageOptions
  • pyproject.toml — Added azure-blob-payloads optional extra; added azurite pytest marker
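The capability handshake in worker.py follows this shape; the constant below is a stand-in for the generated protobuf value so the sketch is self-contained:

```python
# Stand-in for the generated protobuf constant pb.WORKER_CAPABILITY_LARGE_PAYLOADS.
WORKER_CAPABILITY_LARGE_PAYLOADS = "WORKER_CAPABILITY_LARGE_PAYLOADS"

def build_capabilities(payload_store) -> list:
    """Advertise the large-payload capability only when a store is configured,
    so backends without large-payload support see no behavior change."""
    capabilities = []
    if payload_store is not None:
        capabilities.append(WORKER_CAPABILITY_LARGE_PAYLOADS)
    return capabilities
```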

Tests

  • 28 unit tests (test_large_payload.py) — Covers externalization round-trips for orchestration inputs/outputs, activity inputs/results, sub-orchestration payloads, event data, entity operations, nested/repeated message fields, threshold behavior, token parsing, and all client method hooks
  • 8 end-to-end tests (test_large_payload_e2e.py) — Tests against Azurite with real blob upload/download: large input round-trip, activity results, combined input+output, event data, terminate, fan-out/fan-in, blob verification, and small-payload inline behavior

CI

  • .github/workflows/durabletask.yml — Added Node.js 20 setup, Azurite install + background start, azure-blob-payloads extra install, aiohttp dependency, and azurite marker for e2e tests

Documentation

  • docs/features.md — New "Large payload externalization" section with configuration, usage, options table, cross-SDK compatibility notes, and custom payload store example
  • docs/supported-patterns.md — New "Large payload externalization" pattern with a complete code example
  • examples/large_payload/ — End-to-end example (app.py + README.md) demonstrating the feature with the DTS emulator and Azurite
  • examples/README.md — Added note about the large payload example
  • .github/copilot-instructions.md — Updated Project Structure with new subpackages
  • CHANGELOG.md / README.md — Updated with feature entry and install instructions

Design documents (for review context)

  • docs/large-payload-feature-comparison.md — Gap analysis between .NET and Python SDKs
  • docs/large-payload-implementation-proposals.md — Implementation approach proposals

Design decisions

  • Optional extras over separate package — The blob store is delivered as pip install durabletask[azure-blob-payloads] rather than a separate durabletask-azureblobpayloads package, keeping installation simple while avoiding mandatory azure-storage-blob dependency
  • Module structure mirrors .NET — durabletask.extensions.azure_blob_payloads maps to Microsoft.DurableTask.Extensions.AzureBlobPayloads
  • Public PayloadStore ABC — Exposed at durabletask.payload to support custom store implementations
  • Protobuf message walking — Rather than a gRPC interceptor, payloads are externalized/de-externalized by recursively walking protobuf StringValue fields, avoiding the complexity of interceptor-based message type mapping
  • Cross-SDK token format — Uses blob:v1:<container>:<blobName> matching the .NET SDK for interoperability
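Because PayloadStore is a public ABC, custom backends can be plugged in. Here is a minimal in-memory sketch, assuming the abstract surface is roughly upload/download plus token recognition; the real ABC's exact method set (especially the async variants) may differ:

```python
from abc import ABC, abstractmethod

class PayloadStore(ABC):
    """Simplified stand-in for durabletask.payload.PayloadStore."""
    @abstractmethod
    def upload(self, data: bytes) -> str: ...
    @abstractmethod
    def download(self, token: str) -> bytes: ...
    @abstractmethod
    def is_known_token(self, value: str) -> bool: ...

class InMemoryPayloadStore(PayloadStore):
    """Toy backend: payloads live in a dict keyed by a locally minted token."""
    PREFIX = "mem:v1:"

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def upload(self, data: bytes) -> str:
        token = f"{self.PREFIX}{len(self._blobs)}"
        self._blobs[token] = data
        return token

    def download(self, token: str) -> bytes:
        return self._blobs[token]

    def is_known_token(self, value: str) -> bool:
        return value.startswith(self.PREFIX)
```

Passing such a store as payload_store to the worker and client would route oversized payloads through it instead of Azure Blob Storage.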


Copilot AI left a comment


Pull request overview

Adds opt-in large payload externalization to the Durable Task Python SDK by introducing a PayloadStore abstraction, wiring payload externalization/de-externalization into the worker/client message paths, and providing an Azure Blob Storage-backed implementation (plus tests, docs, examples, and CI support).

Changes:

  • Introduces durabletask.payload (public PayloadStore + options + protobuf walkers for externalize/deexternalize).
  • Adds durabletask.extensions.azure_blob_payloads with BlobPayloadStore and configuration options, installable via durabletask[azure-blob-payloads].
  • Integrates payload store hooks into TaskHubGrpcWorker/TaskHubGrpcClient (+ async client), adds unit + Azurite e2e tests, docs/examples, and CI wiring.

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 11 comments.

Summary per file:

  • tests/durabletask/test_large_payload_e2e.py — New Azurite-backed end-to-end tests for upload/download round-trips.
  • tests/durabletask/test_large_payload.py — New unit tests for message walking, token parsing, capability behavior, and client hook coverage.
  • pyproject.toml — Adds azure-blob-payloads optional extra and azurite pytest marker.
  • examples/large_payload/app.py — New end-to-end example app demonstrating blob externalization with DTS + Azurite.
  • examples/large_payload/README.md — README for the new large-payload example and setup steps.
  • examples/README.md — Notes the new example and its prerequisites.
  • durabletask/worker.py — Adds payload_store support, capability advertising, and externalize/deexternalize hooks.
  • durabletask/payload/store.py — New public PayloadStore ABC and LargePayloadStorageOptions dataclass.
  • durabletask/payload/helpers.py — New recursive protobuf walkers for externalizing/de-externalizing StringValue payloads (sync + async).
  • durabletask/payload/__init__.py — Exposes payload store types and helper functions as a public API.
  • durabletask/extensions/azure_blob_payloads/options.py — Adds BlobPayloadStoreOptions extending shared large payload options.
  • durabletask/extensions/azure_blob_payloads/blob_payload_store.py — Implements Azure Blob payload store with token format + optional gzip + sync/async APIs.
  • durabletask/extensions/azure_blob_payloads/__init__.py — Extension package entry point with dependency guard + exports.
  • durabletask/extensions/__init__.py — New package marker for extensions.
  • durabletask/client.py — Adds payload_store to sync/async clients and hooks externalize/deexternalize into client methods.
  • durabletask/__init__.py — Exports PayloadStore and LargePayloadStorageOptions at package root.
  • docs/supported-patterns.md — Documents the new "Large payload externalization" pattern with example code.
  • docs/large-payload-implementation-proposals.md — Adds design proposal doc for implementation approaches (context).
  • docs/large-payload-feature-comparison.md — Adds comparison doc vs the .NET implementation (context).
  • docs/features.md — Adds a full "Large payload externalization" feature section, configuration, and custom store example.
  • README.md — Documents the optional extra and links to docs/examples.
  • CHANGELOG.md — Adds changelog entries for the new feature and APIs.
  • .vscode/mcp.json — Adds VS Code MCP configuration.
  • .github/workflows/durabletask.yml — Adds Azurite setup and installs extras to run new e2e coverage in CI.
  • .github/copilot-instructions.md — Updates project structure docs to include the new subpackages.

