Rapidflare Blog

Building a Scalable Ingestion Pipeline with Temporal (Part 2)

Heartbeats, cancellation, approval gates, scheduling, observability, and the operational patterns that made our Temporal ingestion pipeline easier to run.

Contents
  1. Part 2: Operating at scale
  2. Heartbeats: keeping long activities alive
  3. Error handling
  4. Document children return status
  5. Non-retryable vs retryable
  6. Ordered exception handling
  7. Cancellation
  8. Checkpoint-based cancellation
  9. Cancelling a running activity
  10. Cascading to child workflows
  11. The swallowed-cancel race condition
  12. Cleanup after cancellation: asyncio.shield()
  13. Human approval and data promotion
  14. The approval gate
  15. Auto-approval policies
  16. The data swap
  17. Recurring ingestion with Temporal Schedules
  18. Observability: tracking progress across distributed workers
  19. Dedicated status worker
  20. Structured logging with correlation IDs
  21. Stage tracking
  22. Developer experience: isolated testing
  23. Developer-namespaced task queues
  24. Preview environments for pull requests
  25. Preview mode for fast iteration
  26. What we would build next
  27. Recommendations

Part 2: Operating at scale

Part 2 of two. Part 1 covers cloud-storage offload, the sliding window, fan-out patterns, and continue-as-new.


In Part 1, we covered the architecture behind our document ingestion pipeline: sliding-window fan-out, cloud storage as a data bus, and workflow state management with continue-as-new.

Architecture diagrams don’t crash at 3 AM. Running systems do. This post is about the part the diagram skips: keeping long activities alive, cancelling gracefully mid-flight, tracking progress across distributed workers, and cleaning up when a run fails halfway through.


Heartbeats: keeping long activities alive

The staging activity can run for hours when crawling large document libraries. Temporal uses heartbeats to distinguish “still working” from “worker crashed.” If the activity does not heartbeat within the configured interval, Temporal assumes the worker died and reschedules on a different worker.

What that means in practice: if Temporal treats a missed heartbeat as a failure, the crawler starts over from the beginning unless it has explicit resume state. Hours of work, gone.

So we pass a heartbeat callback to every long-running activity. The callback fires at processing milestones, not on a fixed timer:

"Starting crawl: component_datasheets"
"component_datasheets: 0/N documents fetched"
"component_datasheets: fetched 500 datasheets"
"component_datasheets: all documents fetched ✓"
"Starting next folder: compliance_certificates"

We also heartbeat during pre-processing steps. A single PDF can run to hundreds of pages, and we need to scan each one to decide whether it needs splitting before we offload to storage:

"Pre-analyzing 1/M PDFs for splitting"
"Pre-analyzing 50/M PDFs for splitting"
"Pre-analysis complete: 12 large documents flagged for splitting"

Tip: Heartbeat at processing milestones, not on a timer. "Analyzed document TPS65235 for splitting" is useful for debugging. "heartbeat #294" tells you nothing when you are debugging on a weekend.
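A minimal sketch of the milestone pattern, assuming the crawler receives the heartbeat as a plain callable. The names crawl_folder, Heartbeat, and the every parameter are illustrative, not taken from our codebase. Inside a real Temporal activity the callable could be temporalio.activity.heartbeat.

```python
from typing import Callable, Iterable, List

# Hypothetical type: any callable that accepts a progress message.
Heartbeat = Callable[[str], None]


def crawl_folder(
    folder: str,
    doc_ids: Iterable[str],
    heartbeat: Heartbeat,
    every: int = 500,
) -> List[str]:
    """Fetch documents and heartbeat at milestones rather than on a timer."""
    heartbeat(f"Starting crawl: {folder}")
    fetched: List[str] = []
    for doc_id in doc_ids:
        fetched.append(doc_id)  # stand-in for the real fetch call
        if len(fetched) % every == 0:
            # Milestone reached: say what was done, not just "alive".
            heartbeat(f"{folder}: fetched {len(fetched)} documents")
    heartbeat(f"{folder}: all documents fetched")
    return fetched
```

Because the callback is injected, the crawler can be exercised in a unit test by passing list.append and asserting on the recorded messages.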


Error handling

Document children return status

For per-document failures, child workflows do not raise exceptions to the parent. They catch the failure and return a result with a status flag:

for each document:
    try:
        extract → enrich → index
        return success(doc_id)
    except any error:
        return failure(doc_id, error_message)   # never raise to parent

If every document failure raised, the parent would have to inspect each exception and decide what to do next. Was the PDF malformed? An LLM timeout? A database connection drop? Each needs different handling, and putting that logic in the parent makes it brittle. With status returns, the parent treats every document the same way: it sees success or failure, increments a counter, and moves on. The specifics live in the document’s failure record, where they belong for debugging.

Infrastructure failures can still raise, and the base workflow handles those separately. The rule is narrower: document-level failures should stay in the document result. Every failure gets logged with doc ID, error type, and retry count. When you’re debugging one failure among thousands, you need that granularity.
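The status-return idea can be sketched in plain Python. DocumentResult, process_document, and tally are hypothetical names for illustration; the pipeline argument stands in for the extract, enrich, and index steps.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional, Tuple


@dataclass(frozen=True)
class DocumentResult:
    doc_id: str
    ok: bool
    error_type: Optional[str] = None
    error_message: Optional[str] = None


def process_document(doc_id: str, pipeline: Callable[[str], None]) -> DocumentResult:
    """Run the per-document pipeline and report the outcome as data."""
    try:
        pipeline(doc_id)
        return DocumentResult(doc_id, ok=True)
    except Exception as exc:
        # Document-level failure: record the details, never raise to the parent.
        return DocumentResult(
            doc_id,
            ok=False,
            error_type=type(exc).__name__,
            error_message=str(exc),
        )


def tally(results: Iterable[DocumentResult]) -> Tuple[int, int]:
    """Parent-side view: count outcomes without inspecting causes."""
    succeeded = failed = 0
    for result in results:
        if result.ok:
            succeeded += 1
        else:
            failed += 1
    return succeeded, failed
```

Note that except Exception does not catch asyncio.CancelledError on current Python versions, so a cancellation still propagates instead of being recorded as a document failure.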

Non-retryable vs retryable

Deterministic failures (invalid configuration, unsupported file format, missing credentials) are marked as non-retryable (ApplicationError(non_retryable=True)). Retrying won’t help.

Everything else (network timeouts, 429 rate limits, 503 service unavailable) is retryable by default.

The staging activity sets maximum_attempts=1, which tells Temporal not to retry it. That sounds backwards at first: isn’t the whole point of Temporal to retry failures? In this case, our crawlers already handle their own retries internally for rate limits and transient network errors. If Temporal retried on top of that, it would throw away whatever progress the crawler had made and start the entire source again from scratch. For a multi-hour crawl, that’s hours of work gone. The rule of thumb: retry at the layer that’s cheapest to retry, which is inside the crawler, not at the workflow boundary.
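As an illustration of retrying at the cheapest layer, here is a sketch of a per-request retry loop that a crawler might run internally. TransientError, PermanentError, and fetch_with_retry are hypothetical names, and the backoff values are arbitrary.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical: timeouts, 429s, 503s. Worth retrying."""


class PermanentError(Exception):
    """Hypothetical: bad config, unsupported format, missing credentials."""


def fetch_with_retry(
    fetch: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry a single request with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except PermanentError:
            raise  # deterministic failure: retrying will not help
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("loop always returns or raises")
```

Only the failed request is repeated, so documents already fetched are kept. That is the property a workflow-level retry of the whole activity would lose.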

Ordered exception handling

The base workflow catches exceptions from most specific to most general:

  1. Domain errors: application-level errors with clear messages
  2. Activity failures: these wrap the real error as a nested cause that needs unwrapping
  3. Child workflow failures: propagated from infrastructure paths, not ordinary document failures
  4. Cancellation: workflow cancelled externally, caught explicitly for clean shutdown
  5. Termination: workflow terminated by the platform
  6. Catch-all: ensures no error goes unhandled

On any unrecoverable failure, the workflow runs cleanup in rollback mode before returning a failed result.
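Why the order matters can be shown with a small sketch. Python runs the first except clause that matches, so a general handler placed first would hide the specific ones. The exception classes here are stand-ins defined for the example, not the Temporal SDK types, and termination is omitted for brevity.

```python
import asyncio


class DomainError(Exception):
    """Hypothetical application-level error with a clear message."""


class ActivityFailure(Exception):
    """Stand-in for an activity failure that wraps the real error as its cause."""


class ChildWorkflowFailure(Exception):
    """Stand-in for a child workflow failure."""


def classify(exc: BaseException) -> str:
    """Route an exception through handlers ordered from specific to general."""
    try:
        raise exc
    except DomainError as err:
        return f"domain: {err}"
    except ActivityFailure as err:
        root = err.__cause__ or err  # unwrap the nested cause if present
        return f"activity: {type(root).__name__}: {root}"
    except ChildWorkflowFailure as err:
        return f"child: {err}"
    except asyncio.CancelledError:
        return "cancelled"  # caught explicitly for clean shutdown
    except Exception as err:
        return f"unhandled: {type(err).__name__}"  # catch-all goes last
```

In this sketch each branch only returns a label. In a workflow, each branch would record the failure and then run cleanup in rollback mode.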


Cancellation

Cancellation sounds simple. Set a flag, check it at checkpoints, clean up. In practice, it’s one of the hardest things we built.

Skip any of the hard parts and the failures get ugly fast. A user clicks cancel, and the workflow doesn’t notice for forty minutes because the staging activity isn’t watching for it. A cancel arrives mid-indexing, the parent stops, but child workflows keep writing documents into a namespace that’s about to be torn down. Cleanup starts running, then immediately re-cancels itself on its first await, leaving staged data orphaned in cloud storage. Worst case: the SDK silently swallows the cancel signal, and the workflow runs to completion as if nothing happened.

Those failure modes map to four problems: interrupting long-running activities, cascading to in-flight children, a race condition in the Python SDK, and running cleanup code after the workflow is already in a cancelled state.

Checkpoint-based cancellation

We check the cancellation flag at natural boundaries in the workflow lifecycle. At each checkpoint, the workflow either continues or hands off to the cancellation handler:

flowchart LR
    A["Start"] --> B["✓ Pre-staging"]
    B --> C["Staging activity"]
    C --> D["✓ Post-staging"]
    D --> E["Indexing (sliding window)"]
    E --> F["✓ Post-indexing"]
    F --> G["Approval wait"]
    G --> H["✓ Post-approval"]
    H --> I["Cleanup & complete"]

    classDef stage fill:#eef2ff,stroke:#2563eb,color:#111827,stroke-width:1.4px
    classDef checkpoint fill:#fff7cc,stroke:#b45309,color:#111827,stroke-width:1.6px
    classDef terminal fill:#ecfeff,stroke:#06b6d4,color:#111827,stroke-width:1.4px
    class A,I terminal
    class C,E,G stage
    class B,D,F,H checkpoint

Each checkpoint is a simple guard: if self.cancel_requested: return await self.handle_cancellation(trigger). No hot loop polling.

Cancelling a running activity

The hard case is mid-staging. Staging can run for hours on a large source. You can’t set the flag and wait for the activity to notice.

The pattern: start_activity() returns a non-blocking handle. Then wait_condition() resolves on whichever comes first, activity completion or the cancellation signal. If cancel wins the race, we cancel the activity handle, and Temporal delivers the cancellation to the running worker on its next heartbeat.

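from datetime import timedelta

from temporalio import workflow
from temporalio.workflow import ActivityCancellationType

# Inside the workflow's run method:
# start the activity without awaiting it, so we get a handle back immediately.
handle = workflow.start_activity(
    staging_activity,
    trigger,
    cancellation_type=ActivityCancellationType.TRY_CANCEL,
    start_to_close_timeout=timedelta(hours=5),
    heartbeat_timeout=timedelta(minutes=10),
)

# Wake up on whichever happens first: cancel flag set or activity finished.
await workflow.wait_condition(lambda: self.cancel_requested or handle.done())

if self.cancel_requested and not handle.done():
    handle.cancel()  # request cancellation of the running activity

# Returns the result, or raises if the activity failed or was cancelled.
return await handle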

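ActivityCancellationType.TRY_CANCEL tells Temporal to request cancellation and report the activity as cancelled to the workflow right away, without waiting for the activity to acknowledge it. The worker learns about the cancellation through its next heartbeat. The activity catches asyncio.CancelledError, cleans up local state, and re-raises. The workflow doesn’t wait for a multi-hour crawl to finish naturally.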
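The activity side of that contract, sketched with plain asyncio so it runs without a Temporal server. staging_crawl and demo are hypothetical names; the sleep stands in for a batch of work followed by a heartbeat.

```python
import asyncio
from typing import List


async def staging_crawl(log: List[str]) -> None:
    """Stand-in for a long crawl that cleans up when cancelled."""
    try:
        while True:
            log.append("fetching batch")
            await asyncio.sleep(0.01)  # a real activity would heartbeat here
    except asyncio.CancelledError:
        log.append("cleaning up local state")
        raise  # re-raise so the caller sees the cancellation


async def demo() -> List[str]:
    log: List[str] = []
    task = asyncio.create_task(staging_crawl(log))
    await asyncio.sleep(0.03)  # let the crawl make some progress
    task.cancel()              # analogous to cancelling the activity handle
    try:
        await task
    except asyncio.CancelledError:
        log.append("activity reported cancelled")
    return log
```

Swallowing the CancelledError instead of re-raising would make the crawl look like it completed normally, which is exactly what the workflow must not see.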

Cascading to child workflows

During the indexing phase, dozens of child workflows may be processing individual documents. Before cleanup runs, we cancel all active children so no new data is written while staged documents are being deleted:

async def cancel_in_flight_children(self) -> None:
    for child_id in self.active_child_ids():
        try:
            handle = workflow.get_external_workflow_handle(child_id)
            await handle.cancel()
        except Exception as exc:
            workflow.logger.warning(f"Could not cancel child {child_id!r}: {exc}")

Per-child errors are logged and swallowed. A child may have already completed between the cancel request and this call.

The swallowed-cancel race condition

This was the hardest bug. When a child completion signal and a cancel event arrive in the same Temporal Workflow Task (WFT), the Python SDK processes them in a specific order: signals first (job_sets[1]), then cancellation (job_sets[2]). The sequence:

  1. The child completion signal arrives and resolves the wait condition.
  2. In the same WFT, the cancel event fires. The SDK injects a cancellation into the coroutine that was waiting.
  3. But the wait already completed in step 1. The waiting helper catches the cancellation internally and returns normally, as if it had finished cleanly.
  4. The SDK’s internal “cancellation pending” counter gets decremented during the handoff.
  5. No later checkpoint re-raises the cancellation. The workflow continues as if cancel never happened.

The fix: we built wait_condition_cancellable(), which reads the SDK’s internal _cancel_requested flag after every return path. This flag is set unconditionally when a cancel arrives and is never cleared, making it a reliable source of truth even after the race.

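import asyncio

from temporalio import workflow


def _is_cancel_requested() -> bool:
    """The workflow instance IS asyncio's running loop in Temporal's Python SDK."""
    # _cancel_requested is an SDK internal; fall back to False if it is missing.
    return getattr(asyncio.get_running_loop(), "_cancel_requested", False)


async def wait_condition_cancellable(fn, *, timeout=None):
    try:
        await workflow.wait_condition(fn, timeout=timeout)
    except asyncio.TimeoutError:
        # A timeout can also hide a cancel that arrived in the same task.
        if _is_cancel_requested():
            raise asyncio.CancelledError()
        raise
    # Normal return path: re-check in case the cancel was swallowed.
    if _is_cancel_requested():
        raise asyncio.CancelledError()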

Tip: If you use workflow.wait_condition() and CancelledError in the same Temporal workflow, test the scenario where a signal and a cancel arrive in the same Workflow Task. The SDK may silently consume the cancel.

This is intentionally isolated to one helper. _cancel_requested is an SDK internal, so we keep it wrapped, covered by tests, and easy to revisit when upgrading the Temporal Python SDK.

Cleanup after cancellation: asyncio.shield()

After catching CancelledError, the workflow needs to run cleanup: update status, delete staged documents, commit the result. Each of those await calls is a Temporal checkpoint, and subsequent checkpoints may re-raise CancelledError since the workflow is in a cancelled state.

Without protection, the first cleanup await raises again, and none of the cleanup completes. We wrap every cleanup call with asyncio.shield():

except (asyncio.CancelledError, CancelledError):
    await asyncio.shield(update_status(WorkflowStatus.CANCELLING))
    await asyncio.shield(cancel_and_cleanup(trigger))
    await asyncio.shield(update_status(WorkflowStatus.CANCELLED))
    return failure_result

Human approval and data promotion

The approval gate

Part 1 introduced the approval gate. Operationally, this is where the workflow either waits for a human or follows the source’s auto-approval policy before making the new data live. We once auto-approved a crawl that indexed thousands of 404 error pages from a broken document portal. That’s when we added a human checkpoint.

The workflow calls workflow.wait_condition() and blocks until it receives an approval or rejection signal. Temporal handles the durability: the workflow can sit waiting for days, and if the worker restarts, it picks right back up.

sequenceDiagram
    participant Admin as Admin UI
    participant API as API Server
    participant WF as Ingestion Workflow

    WF->>WF: wait_condition(approval_received)
    Admin->>API: POST /approve/{workflow_id}
    API->>WF: signal("approve")
    WF->>WF: Resume → swap data

If nobody acts within the timeout period, the workflow auto-rejects and cleans up. No stale ingestions hanging around forever.
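The wait-with-timeout behaviour can be approximated outside Temporal with an asyncio.Event. This is a stand-in, not workflow code: ApprovalGate and its methods are hypothetical, and in a workflow the same shape would come from workflow.wait_condition() with a timeout plus signal handlers.

```python
import asyncio
from typing import Optional


class ApprovalGate:
    """Asyncio stand-in for a workflow waiting on an approve/reject signal."""

    def __init__(self) -> None:
        self._decision: Optional[str] = None
        self._event = asyncio.Event()

    def signal(self, decision: str) -> None:
        if decision not in ("approve", "reject"):
            raise ValueError(f"unknown decision: {decision!r}")
        self._decision = decision
        self._event.set()

    async def wait(self, timeout: float) -> str:
        try:
            await asyncio.wait_for(self._event.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            return "reject"  # nobody acted in time: auto-reject
        return self._decision or "reject"
```

Unlike the workflow version, this sketch is not durable: the pending wait is lost if the process restarts, which is the part Temporal provides.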

Auto-approval policies

Manual approval doesn’t scale. Fifty sources on weekly schedules means fifty approval clicks per week. Most are routine re-crawls. We added a per-source approval policy:

Policy        Behavior
MANUAL        Always requires human review
AUTO_APPROVE  Auto-approve after basic sanity checks (for trusted, stable sources)
STRICT        Auto-approve only if 100% of documents succeed; any failures trigger manual review

One edge case: if the crawler returns zero documents (broken credentials, API outage, bad config), we always reject regardless of policy. A source that silently empties its index is worse than a failed ingestion.
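The policy rules, including the zero-document override, fit in one small decision function. This is a sketch: the Decision enum and the sanity_ok flag are illustrative, and sending a failed sanity check to manual review is an assumption the post does not spell out.

```python
from enum import Enum


class ApprovalPolicy(Enum):
    MANUAL = "MANUAL"
    AUTO_APPROVE = "AUTO_APPROVE"
    STRICT = "STRICT"


class Decision(Enum):
    APPROVE = "approve"
    REJECT = "reject"
    NEEDS_REVIEW = "needs_review"


def decide(
    policy: ApprovalPolicy,
    total: int,
    failed: int,
    sanity_ok: bool = True,
) -> Decision:
    """Map a finished run to an approval decision."""
    if total == 0:
        # Zero documents is always rejected, regardless of policy.
        return Decision.REJECT
    if policy is ApprovalPolicy.MANUAL:
        return Decision.NEEDS_REVIEW
    if policy is ApprovalPolicy.AUTO_APPROVE:
        # Assumption: a failed sanity check falls back to human review.
        return Decision.APPROVE if sanity_ok else Decision.NEEDS_REVIEW
    # STRICT: only a run with no failures is approved automatically.
    return Decision.APPROVE if failed == 0 else Decision.NEEDS_REVIEW
```

Checking total == 0 before looking at the policy is what keeps a trusted source from silently emptying its own index.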

The data swap

We maintain two versions during ingestion: the old (live) data and the new (staged) data, each tagged by a unique run ID.

flowchart LR
    A["Ingestion complete"] --> B{Decision}
    B -->|"Approve"| C["Switch active run ID"]
    C --> E["Delete OLD docs (by previous run ID)"]
    B -->|"Reject / Timeout"| D["Delete NEW docs (by current run ID)"]
    E --> F["New data is live"]
    D --> G["Old data remains live"]

    classDef start fill:#f5f3ff,stroke:#7c3aed,color:#111827,stroke-width:1.4px
    classDef decision fill:#fff7cc,stroke:#b45309,color:#111827,stroke-width:1.6px
    classDef approve fill:#ecfdf5,stroke:#10b981,color:#111827,stroke-width:1.4px
    classDef reject fill:#fef2f2,stroke:#ef4444,color:#111827,stroke-width:1.4px
    class A start
    class B decision
    class C,E,F approve
    class D,G reject

On approval, switch the active run ID to the new version, then retire the previous version. On rejection or cancellation, delete the new version. The old data stays live until the pointer flips, so this is a zero-downtime swap.
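A toy in-memory model of the swap, assuming documents are grouped by run ID and a single pointer marks the live version. DocumentStore and its methods are hypothetical; a real index would delete by a run ID filter rather than popping a dictionary key.

```python
from typing import Dict, List, Optional


class DocumentStore:
    """In-memory stand-in for an index whose documents are tagged by run ID."""

    def __init__(self) -> None:
        self.docs: Dict[str, List[str]] = {}  # run_id -> documents
        self.active_run_id: Optional[str] = None

    def stage(self, run_id: str, docs: List[str]) -> None:
        self.docs[run_id] = list(docs)

    def live_docs(self) -> List[str]:
        if self.active_run_id is None:
            return []
        return list(self.docs.get(self.active_run_id, []))

    def promote(self, new_run_id: str) -> None:
        """Approve: flip the pointer first, then retire the old version."""
        if new_run_id not in self.docs:
            raise KeyError(f"run {new_run_id!r} was never staged")
        previous = self.active_run_id
        self.active_run_id = new_run_id
        if previous is not None and previous != new_run_id:
            self.docs.pop(previous, None)

    def reject(self, new_run_id: str) -> None:
        """Reject, time out, or cancel: drop the staged version only."""
        if new_run_id == self.active_run_id:
            raise ValueError("refusing to delete the live run")
        self.docs.pop(new_run_id, None)
```

Readers only ever follow active_run_id, so they see either the complete old version or the complete new one, never a mix.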


Recurring ingestion with Temporal Schedules

Customer data isn’t static. A product datasheet portal updates weekly. A support knowledge base gains new articles every few days. Pricing pages get rewritten whenever the sales team feels like rewriting them. Ingest once and walk away, and within a week the answers our agents give start drifting from reality.

So most sources are configured to re-ingest on a schedule: nightly for fast-moving content, weekly for stable documentation, monthly for things that barely change. Fifty active sources on mixed cadences add up to hundreds of ingestion runs every month, all triggering themselves without anyone clicking a button. That’s the workload we needed scheduling to handle reliably.

The obvious answer is a cron job that calls the ingest API. We didn’t do that.

Temporal has a native Schedules API. The schedule lives inside Temporal, not in an external scheduler. You get durability, pause/unpause, and backfill out of the box.

The schedule fires a lightweight wrapper workflow, not the real ingestion workflow directly. Two reasons.

First, fresh config. A schedule created three months ago shouldn’t use three-month-old credentials or folder lists. The wrapper workflow loads fresh source config from the database before dispatching. Every scheduled run uses current settings.

Second, overlap guard. If Monday’s ingestion is still running when Tuesday’s schedule fires, you don’t want two concurrent ingestions writing to the same namespace. The wrapper checks whether an ingestion is already running. If one is, it exits with skipped: already_running. We also set ScheduleOverlapPolicy.SKIP at the Temporal level so the scheduler won’t queue a backlog.
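Both jobs of the wrapper can be sketched as one function with its dependencies injected. scheduled_wrapper, WrapperResult, and the three callables are hypothetical; in a workflow they would be activities or child workflow starts.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass(frozen=True)
class WrapperResult:
    status: str
    reason: str = ""


def scheduled_wrapper(
    source_id: str,
    load_config: Callable[[str], Dict[str, Any]],
    is_running: Callable[[str], bool],
    start_ingestion: Callable[[str, Dict[str, Any]], None],
) -> WrapperResult:
    """Entry point fired by the schedule for one source."""
    # Overlap guard: never run two ingestions for the same source.
    if is_running(source_id):
        return WrapperResult("skipped", "already_running")
    # Fresh config: read current settings at fire time, not at schedule creation.
    config = load_config(source_id)
    start_ingestion(source_id, config)
    return WrapperResult("started")
```

The application-level check and ScheduleOverlapPolicy.SKIP cover different cases: the policy stops the scheduler from stacking its own runs, while the check also catches an ingestion that was started manually.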

When a source is being migrated or the data is expected to be stale, we pause the schedule rather than deleting and recreating it. Unpausing brings it back with history intact.


Observability: tracking progress across distributed workers

A running workflow has the most current orchestration state, rebuilt from Temporal history on replay. Your admin UI still needs a durable read model for real-time progress. Without it, support spends the day answering “how long until it’s done?”

Dedicated status worker

We run a dedicated status worker on its own task queue. It periodically queries running workflows via Temporal’s query API and writes progress to the database. The admin UI reads from the database rather than querying Temporal directly.

sequenceDiagram
    participant UI as Admin UI
    participant DB as Database
    participant SW as Status Worker
    participant WF as Ingestion Workflow

    SW->>WF: query("get_progress")
    WF-->>SW: {stage, total, successful, failed, ...}
    SW->>DB: UPDATE workflow_status SET ...
    UI->>DB: SELECT * FROM workflow_status
    DB-->>UI: Progress data for rendering

The read model is intentionally eventually consistent. Keeping the UI decoupled from Temporal means a saturated ingestion worker doesn’t block progress updates, and the product surface isn’t directly coupled to the workflow engine.
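One polling pass of such a worker might look like the sketch below, using SQLite as a stand-in database. sync_progress and the column layout are assumptions for illustration; query_progress represents whatever issues the get_progress query, for example a Temporal workflow handle's query call.

```python
import sqlite3
from typing import Any, Callable, Dict, Iterable

SCHEMA = """
CREATE TABLE IF NOT EXISTS workflow_status (
    workflow_id TEXT PRIMARY KEY,
    stage       TEXT    NOT NULL,
    total       INTEGER NOT NULL,
    successful  INTEGER NOT NULL,
    failed      INTEGER NOT NULL
)
"""


def sync_progress(
    db: sqlite3.Connection,
    workflow_ids: Iterable[str],
    query_progress: Callable[[str], Dict[str, Any]],
) -> int:
    """Query each running workflow once and upsert its progress row."""
    db.execute(SCHEMA)
    written = 0
    for workflow_id in workflow_ids:
        try:
            progress = query_progress(workflow_id)
        except Exception:
            # The workflow may have just finished; keep the last snapshot.
            continue
        db.execute(
            "INSERT OR REPLACE INTO workflow_status VALUES (?, ?, ?, ?, ?)",
            (
                workflow_id,
                progress["stage"],
                progress["total"],
                progress["successful"],
                progress["failed"],
            ),
        )
        written += 1
    db.commit()
    return written
```

A failed query leaves the previous row in place, which is what eventual consistency means here: the UI may show a slightly old snapshot, never an empty one.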

Structured logging with correlation IDs

Processing a single document can span multiple worker instances and restarts. To make failures debuggable, every log entry carries correlation IDs at three levels:

Workflow-level: workflow ID (stable across continue-as-new), run ID (changes on restart), execution ID (derived, stable)

Document-level: document ID, deterministic trace ID derived from document ID, parent workflow ID

Instance-level: worker identity (encodes service name, deployment revision, instance ID)

When debugging a document failure, you filter by trace ID and get every log entry for that document chronologically, even across multiple worker instances. Without this, distributed debugging is guesswork.
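A sketch of the deterministic trace ID and a logger that stamps every record with the correlation fields. Deriving the ID with uuid5 is one possible choice, not necessarily the scheme we use; TRACE_NAMESPACE, trace_id_for, and document_logger are hypothetical names.

```python
import logging
import uuid

# Hypothetical namespace; any fixed UUID works as long as it never changes.
TRACE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "ingestion.example")


def trace_id_for(document_id: str) -> str:
    """Same document ID in, same trace ID out, on any worker."""
    return uuid.uuid5(TRACE_NAMESPACE, document_id).hex


def document_logger(
    base: logging.Logger,
    *,
    workflow_id: str,
    document_id: str,
    worker_identity: str,
) -> logging.LoggerAdapter:
    """Wrap a logger so every record carries the correlation IDs."""
    return logging.LoggerAdapter(
        base,
        {
            "workflow_id": workflow_id,
            "document_id": document_id,
            "trace_id": trace_id_for(document_id),
            "worker_identity": worker_identity,
        },
    )
```

Because the trace ID is a pure function of the document ID, a retry on a different worker logs under the same ID without any coordination.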

Stage tracking

The workflow advances through stages: initializing → staging → indexing → awaiting approval → finalizing → completed. Each transition is persisted with document counts, timing, and a human-readable summary. The admin UI renders this as a progress stepper.
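The stage sequence is a small linear state machine. A sketch, assuming stages only move forward one step at a time; Stage and StageTracker are illustrative names, and failure or cancellation states are left out.

```python
from enum import IntEnum
from typing import List, Tuple


class Stage(IntEnum):
    INITIALIZING = 0
    STAGING = 1
    INDEXING = 2
    AWAITING_APPROVAL = 3
    FINALIZING = 4
    COMPLETED = 5


class StageTracker:
    """Records each transition with a human-readable summary."""

    def __init__(self) -> None:
        self.current = Stage.INITIALIZING
        self.history: List[Tuple[Stage, str]] = [(self.current, "Run created")]

    def advance(self, summary: str) -> Stage:
        if self.current is Stage.COMPLETED:
            raise ValueError("workflow already completed")
        self.current = Stage(self.current + 1)  # move to the next stage in order
        self.history.append((self.current, summary))
        return self.current
```

The history list is what a progress stepper would render; document counts and timings would be extra fields on each entry.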


Developer experience: isolated testing

Workflows that run for hours and touch external systems are miserable to test without isolation. Two developers pointing at the same Temporal cluster will pick up each other’s workflows. We solved this at two levels.

Developer-namespaced task queues

In local development, the system appends the developer’s OS username to all task queue names:

Production:  ingestion-task-queue
Developer:   ingestion-task-queue-johndoe

Two developers run their local workers against the same Temporal cluster without conflicts. The same namespacing applies to status and post-ingestion queues.

Preview environments for pull requests

For integration testing, we deploy preview workers tied to specific PRs. CI deploys a full worker instance with PR-specific task queues:

Preview Worker: preview-pr-123-ingestion-workers
Task Queue:     ingestion-task-queue-preview-pr-123

The preview worker runs the code from that PR branch. Developers trigger ingestions against the preview endpoint, and workflows route to the preview worker. When the PR is merged or closed, cleanup tears down the preview services automatically.
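Both naming schemes can come from one helper. A sketch, assuming an env value of "production", "development", or "preview"; the function name, the env values, and the username sanitizing are illustrative choices.

```python
import getpass
import re
from typing import Optional


def task_queue_name(
    base: str,
    env: str,
    pr_number: Optional[int] = None,
    username: Optional[str] = None,
) -> str:
    """Derive an isolated task queue name for the given environment."""
    if env == "production":
        return base
    if env == "preview":
        if pr_number is None:
            raise ValueError("preview queues need a PR number")
        return f"{base}-preview-pr-{pr_number}"
    if env == "development":
        user = username or getpass.getuser()  # OS username by default
        safe = re.sub(r"[^a-z0-9]+", "", user.lower())
        return f"{base}-{safe}"
    raise ValueError(f"unknown environment: {env!r}")
```

Workers and the code that starts workflows need to call the same helper, otherwise a workflow is queued where no worker is polling.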

Preview mode for fast iteration

You don’t always need to ingest an entire source to test a change. A preview_mode flag limits document retrieval to the first few documents. Combined with developer-specific task queues, you get results in minutes instead of hours.
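One way to implement such a flag is to truncate the document stream lazily, so a crawler that yields documents one at a time stops early. A sketch: limit_documents and the PREVIEW_LIMIT value are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator

PREVIEW_LIMIT = 5  # hypothetical default for "the first few documents"


def limit_documents(
    docs: Iterable[str],
    preview_mode: bool,
    limit: int = PREVIEW_LIMIT,
) -> Iterator[str]:
    """Pass documents through, stopping after `limit` items in preview mode."""
    if preview_mode:
        return islice(docs, limit)
    return iter(docs)
```

Because islice pulls from the source only on demand, a generator-based crawler does no fetching beyond the limit.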


What we would build next

Cost optimization. We’re exploring scaling workers based on Temporal task queue depth rather than running them continuously.

Cross-region failover. Everything’s in one GCP region today. Temporal supports multi-region, and we plan to use it.

Adaptive backpressure. Turn slow-down signals from the database or providers into automatic window-size reduction instead of relying on individual retries.

Automated scale testing. Testing the full pipeline at scale is currently manual. We want integration tests wired into CI that exercise the complete workflow.


Recommendations

  • Heartbeat at milestones. Log what you’re doing, not just “still alive.”
  • Document children return status. The parent shouldn’t need exception archaeology for ordinary document failures.
  • Dedicated status worker. Decouple progress tracking from ingestion so it stays responsive under load.
  • Checkpoint-based cancellation. Check the flag at lifecycle boundaries. For long activities, race the handle against a wait_condition. Cascade to in-flight children before cleanup.
  • Watch for swallowed cancels. Test cancel + signal arriving in the same Workflow Task. The Python SDK may silently consume the cancel. If you read _cancel_requested, wrap it in one helper and test it around SDK upgrades.
  • Shield cleanup work after cancellation. Once a workflow is cancelled, the next checkpoint will re-cancel the task. Wrap every cleanup await with asyncio.shield() so status updates and deletions can finish.
  • Policy-based auto-approval. Manual-only doesn’t scale. But always reject zero-document results regardless of policy.
  • Temporal Schedules over external cron. Use a thin wrapper to load fresh config and guard overlap with ScheduleOverlapPolicy.SKIP.
  • Namespace task queues for isolation. By username in dev, by PR number in preview.

About the author

Founding Engineer at Rapidflare. Previously a backend engineer at fabric on the loyalty team, and earlier built ML systems and a pre-LLM AI research assistant at RAx (acquired by Enago) and Fero.Ai.
