
Sinks

Sinks are pluggable output destinations for processed results. After your on_task_complete(), on_message_complete(), or on_window_complete() hook returns a CollectResult, the framework routes each payload to the correct sink, serializes the data, and delivers it.

Drakkar ships with six sink types. You can configure any combination of them, and each type supports multiple named instances (e.g., two separate Kafka topics or three Postgres databases).

Delivery Lifecycle

| Event | When | What happens |
|---|---|---|
| connect() | Worker startup, after on_startup() | Every configured sink’s connect() runs concurrently (asyncio.gather), so cold-start latency is bounded by the slowest sink, not the sum. If any fails, the worker crashes immediately. |
| deliver(payloads) | After on_task_complete(), on_message_complete(), or on_window_complete() returns payloads | The framework groups payloads by (sink_type, sink_name), then dispatches one deliver() call per group in parallel via asyncio.gather. One slow or failing sink does not block others. Each group owns its own retry / DLQ / circuit-breaker state. |
| Circuit breaker | Before each deliver() call | If the sink’s breaker is open, deliver() is skipped and the batch routes straight to the DLQ. |
| on_delivery_error() | When deliver() raises an exception | Your handler decides: DLQ (default), RETRY, or SKIP. Retries re-call deliver() up to executor.max_retries times. |
| Offset commit | After all sinks confirm delivery for a window | Kafka offsets are committed only when every payload from the window has been successfully delivered (or routed to DLQ/skipped). No partial commits. |
| close() | Worker shutdown | Each sink closes its connection gracefully. Errors are logged but don’t block shutdown. |
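
The grouping and fan-out described in the deliver(payloads) row can be modeled with plain asyncio. The sketch below is not Drakkar's SinkManager: Payload, RecordingSink, and deliver_all are hypothetical names, and each payload is reduced to its sink type and sink name.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Payload:
    # Hypothetical stand-in for KafkaPayload, PostgresPayload, etc.
    sink_type: str
    sink: str
    data: dict = field(default_factory=dict)


class RecordingSink:
    """Hypothetical sink that records each batch, or fails on demand."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.batches = []

    async def deliver(self, payloads):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        if self.fail:
            raise ConnectionError("downstream unavailable")
        self.batches.append(payloads)


async def deliver_all(payloads, sinks):
    # One group per (sink_type, sink_name), so one deliver() call per group.
    groups = defaultdict(list)
    for p in payloads:
        groups[(p.sink_type, p.sink)].append(p)
    keys = list(groups)
    # return_exceptions=True collects every group's outcome instead of raising
    # the first error, so each group's failure can be handled on its own.
    results = await asyncio.gather(
        *(sinks[key].deliver(groups[key]) for key in keys),
        return_exceptions=True,
    )
    return dict(zip(keys, results))


sinks = {
    ("kafka", "results"): RecordingSink(),
    ("postgres", "main"): RecordingSink(fail=True),
    ("redis", "cache"): RecordingSink(),
}
batch = [
    Payload("kafka", "results"),
    Payload("kafka", "results"),
    Payload("postgres", "main"),
    Payload("redis", "cache"),
]
outcome = asyncio.run(deliver_all(batch, sinks))
# The Kafka and Redis groups succeed (None) even though Postgres raised.
```

Because each group's result comes back separately, a failing group can go through on_delivery_error and the DLQ without holding up the others.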

Delivery frequency. on_task_complete() is called once for each successful task. If it returns payloads for N sink groups (e.g., 1 Kafka + 1 Postgres + 1 Redis = 3 groups), the framework makes N independent deliver() calls. With on_window_complete(), one additional delivery round happens per window.

The Postgres pool exposed in on_ready() is the same pool the Postgres sink uses, so you can query it directly for lookups, migrations, or health checks.

A configuration with one Kafka, one Postgres, and one HTTP sink looks like this:

sinks:
  kafka:
    results:
      topic: "search-results"
  postgres:
    main:
      dsn: "postgresql://user:pass@db:5432/app"
  http:
    webhook:
      url: "https://hooks.example.com/notify"

Payload types

Every sink type has a corresponding payload model. You create payload instances inside on_task_complete(), on_message_complete(), or on_window_complete() and return them in a CollectResult. The data field is always a Pydantic BaseModel; the framework handles serialization differently for each sink type.

KafkaPayload

Produces a message to a Kafka topic.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| key | bytes \| None | Kafka message key, passed through as-is |
| data | BaseModel | Payload model |

Serialization: data.model_dump_json().encode() becomes the Kafka message value. The key is passed through unchanged to the producer.

from drakkar import KafkaPayload

KafkaPayload(
    data=search_result,
    key=b"request-abc",
)

PostgresPayload

Inserts a row into a PostgreSQL table.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| table | str | Target table name |
| data | BaseModel | Payload model |

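Serialization: data.model_dump() produces a {column: value} dict. The framework builds an INSERT INTO <table> (<columns>) VALUES (<placeholders>) query and passes the values as query parameters. Table and column names cannot be sent as parameters, so they are validated before being interpolated into the query, which prevents SQL injection.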
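
The INSERT construction can be sketched as follows. The sketch assumes asyncpg-style $1, $2, ... placeholders and a simple identifier whitelist. build_insert and the regular expression are hypothetical, and Drakkar's actual validation rules may be stricter or different.

```python
import re

# Hypothetical identifier rule: letters, digits and underscores, no leading digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_insert(table, row):
    """Build a parameterized INSERT from a {column: value} dict.

    Only identifiers are interpolated into the SQL text, so they are checked
    first; values travel separately as $1, $2, ... bind parameters.
    """
    if not row:
        raise ValueError("row has no columns")
    for name in (table, *row):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"invalid SQL identifier: {name!r}")
    columns = ", ".join(row)
    placeholders = ", ".join(f"${i}" for i in range(1, len(row) + 1))
    return f"INSERT INTO {table} ({columns}) VALUES ({placeholders})", list(row.values())


query, args = build_insert(
    "search_results",
    {"request_id": "abc", "pattern": "error", "match_count": 12},
)
# query: INSERT INTO search_results (request_id, pattern, match_count) VALUES ($1, $2, $3)

try:
    build_insert("search_results; DROP TABLE users", {"request_id": "abc"})
    injection_rejected = False
except ValueError:
    injection_rejected = True
```

With asyncpg, a query/argument pair like this would typically be executed as await pool.execute(query, *args).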

from drakkar import PostgresPayload

PostgresPayload(
    table='search_results',
    data=search_summary,
)

MongoPayload

Inserts a document into a MongoDB collection.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| collection | str | Target MongoDB collection name |
| data | BaseModel | Payload model |

Serialization: data.model_dump() produces a dict, inserted via insert_one.

from drakkar import MongoPayload

MongoPayload(
    collection='search_archive',
    data=search_result,
)

HttpPayload

Sends a JSON request to an HTTP endpoint.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| data | BaseModel | Payload model |

Serialization: data.model_dump_json() becomes the request body with Content-Type: application/json. Non-2xx responses raise an error routed through on_delivery_error().

from drakkar import HttpPayload

HttpPayload(data=notification)

RedisPayload

Sets a key-value pair in Redis.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| key | str | Redis key suffix |
| data | BaseModel | Payload model |
| ttl | int \| None | Optional expiry in seconds |

Serialization: data.model_dump_json() becomes the Redis string value. The full Redis key is {config.key_prefix}{payload.key}. When ttl is set, the key expires after that many seconds (SET key value EX ttl).
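
The following sketch shows how key_prefix, key, and ttl combine by building the Redis command a payload would turn into. redis_set_command is a hypothetical helper. json.dumps with compact separators stands in for model_dump_json(), which in Pydantic v2 emits compact JSON by default.

```python
from __future__ import annotations

import json


def redis_set_command(key_prefix: str, key: str, value: dict, ttl: int | None) -> list:
    """Return the SET command a RedisPayload maps to (hypothetical helper)."""
    full_key = f"{key_prefix}{key}"  # {config.key_prefix}{payload.key}
    # Stand-in for data.model_dump_json(): compact JSON text.
    body = json.dumps(value, separators=(",", ":"))
    command = ["SET", full_key, body]
    if ttl is not None:
        command += ["EX", ttl]  # key expires after `ttl` seconds
    return command


cmd = redis_set_command("drakkar:", "search:abc", {"match_count": 12}, ttl=3600)
# ['SET', 'drakkar:search:abc', '{"match_count":12}', 'EX', 3600]
```

With redis.asyncio, the same write corresponds to await client.set(key, value, ex=ttl).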

from drakkar import RedisPayload

RedisPayload(
    key=f'search:{request_id}',
    data=search_summary,
    ttl=3600,  # 1 hour
)

FilePayload

Appends a JSON line to a file on the local filesystem.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| path | str | File path (relative to the sink’s base_path config) |
| data | BaseModel | Payload model |

Serialization: data.model_dump_json() + '\n' is appended to the file in JSONL format. The file is created if it does not exist. The parent directory must already exist.

Path containment: base_path is required in the filesystem sink config. All payload paths are resolved relative to base_path and canonicalized — the framework raises ValueError if the resolved path escapes the base directory (prevents path traversal).
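
The containment check can be sketched with pathlib. The payload path is joined onto base_path and canonicalized with resolve(), and anything that is not still inside the base is rejected. resolve_contained and is_contained are hypothetical names, and this is not Drakkar's implementation. Path.is_relative_to requires Python 3.9 or later.

```python
import tempfile
from pathlib import Path


def resolve_contained(base_path: str, relative: str) -> Path:
    """Resolve `relative` under `base_path`; raise ValueError if it escapes."""
    base = Path(base_path).resolve()
    # Joining an absolute path discards `base`, and '..' climbs out of it;
    # resolve() canonicalizes both cases so the check below catches them.
    candidate = (base / relative).resolve()
    if not candidate.is_relative_to(base):
        raise ValueError(f"path {relative!r} escapes base_path {str(base)!r}")
    return candidate


def is_contained(base_path: str, relative: str) -> bool:
    try:
        resolve_contained(base_path, relative)
    except ValueError:
        return False
    return True


base_dir = tempfile.mkdtemp()  # stands in for the sink's base_path
inside = resolve_contained(base_dir, "high-match-results.jsonl")
```

Under this check, an absolute payload path is rejected unless it happens to resolve to a location inside base_path.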

from drakkar import FilePayload

FilePayload(
    path='high-match-results.jsonl',  # resolved relative to sink's base_path
    data=search_result,
)

Routing

The sink field on every payload controls which configured sink instance receives it.

Single sink per type

When only one sink of a given type is configured, leave sink as the empty string (the default). The framework routes automatically:

sinks:
  kafka:
    results:          # only one kafka sink
      topic: "output"
# sink='' (default) routes to "results" automatically
KafkaPayload(data=output, key=b"abc")

Multiple named sinks

When you have multiple sinks of the same type, set sink to the instance name:

sinks:
  kafka:
    results:
      topic: "search-results"
    alerts:
      topic: "search-alerts"
# Route to the "results" kafka sink
KafkaPayload(sink='results', data=full_result, key=b"abc")

# Route to the "alerts" kafka sink
KafkaPayload(sink='alerts', data=alert_data, key=b"abc")

AmbiguousSinkError

If you have multiple sinks of the same type but leave sink empty, the framework raises AmbiguousSinkError when it validates the CollectResult:

AmbiguousSinkError: 2 'kafka' sinks configured (['results', 'alerts']),
but payload has empty sink name -- specify which one

Validation runs before any delivery attempt, so misconfiguration surfaces immediately.
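
The routing rules in this section reduce to a small lookup. The sketch below is a hypothetical model. The exception classes are local stand-ins. Raising SinkNotConfiguredError for an unknown or missing built-in sink is an assumption, because this page documents that error only for plugin sinks.

```python
class AmbiguousSinkError(Exception):
    """Stand-in for Drakkar's exception of the same name."""


class SinkNotConfiguredError(Exception):
    """Stand-in for Drakkar's exception of the same name."""


def resolve_sink(configured, sink_type, sink):
    """Pick the configured instance name for a payload's (sink_type, sink)."""
    names = configured.get(sink_type, [])
    if sink:  # explicit name: it must exist
        if sink not in names:
            raise SinkNotConfiguredError(f"no {sink_type!r} sink named {sink!r}")
        return sink
    if len(names) == 1:  # empty name + single instance: route automatically
        return names[0]
    if not names:
        raise SinkNotConfiguredError(f"no {sink_type!r} sink configured")
    raise AmbiguousSinkError(
        f"{len(names)} {sink_type!r} sinks configured ({names}), "
        "but payload has empty sink name -- specify which one"
    )


configured = {"kafka": ["results", "alerts"], "postgres": ["main"]}
postgres_target = resolve_sink(configured, "postgres", "")  # single instance
alerts_target = resolve_sink(configured, "kafka", "alerts")  # explicit name
try:
    resolve_sink(configured, "kafka", "")
except AmbiguousSinkError as exc:
    ambiguous_message = str(exc)
```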


Delivery and error handling

Delivery flow

The SinkManager groups all payloads from a CollectResult by (sink_type, sink_name), then calls deliver() on each group. A single on_task_complete() call can route payloads to any number of sinks in one shot.

Error handling

When a sink’s deliver() raises an exception, the framework calls your on_delivery_error() hook with a DeliveryError containing the sink name, type, error message, and the failed payloads:

from pydantic import BaseModel

class DeliveryError(BaseModel):
    sink_name: str    # e.g. "results"
    sink_type: str    # e.g. "kafka", "postgres", "http"
    error: str        # human-readable error message
    payloads: list[BaseModel]  # the payloads that failed

Your hook returns a DeliveryAction:

| Action | Behavior |
|---|---|
| DeliveryAction.DLQ | Write the failed payloads to the dead letter queue (default) |
| DeliveryAction.RETRY | Retry delivery, up to executor.max_retries attempts |
| DeliveryAction.SKIP | Drop the payloads and continue processing |

If RETRY is returned but retries are exhausted, the framework falls through to DLQ.
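
The interaction between the three actions and executor.max_retries can be modeled as a loop. This is a simplified sketch. deliver_with_policy is a hypothetical name, the hook receives the raw exception instead of a DeliveryError, and circuit-breaker and fast-retry handling are left out.

```python
import asyncio
from enum import Enum


class DeliveryAction(Enum):
    DLQ = "dlq"
    RETRY = "retry"
    SKIP = "skip"


async def deliver_with_policy(deliver, payloads, on_error, max_retries):
    """Return the terminal outcome: 'delivered', 'skipped', or 'dlq'."""
    attempts = 0
    while True:
        attempts += 1
        try:
            await deliver(payloads)
            return "delivered"
        except Exception as exc:
            action = await on_error(exc)  # the real hook receives a DeliveryError
        if action is DeliveryAction.SKIP:
            return "skipped"
        if action is DeliveryAction.RETRY and attempts <= max_retries:
            continue  # call deliver() again
        return "dlq"  # DLQ chosen, or RETRY with no retries left


calls = []


async def always_fails(payloads):
    calls.append(payloads)
    raise ConnectionError("webhook timed out")


async def retry_everything(exc):
    return DeliveryAction.RETRY


async def skip_everything(exc):
    return DeliveryAction.SKIP


outcome = asyncio.run(deliver_with_policy(always_fails, ["p1"], retry_everything, max_retries=2))
calls_after_retry = len(calls)  # 1 initial call + 2 retries
skipped = asyncio.run(deliver_with_policy(always_fails, ["p2"], skip_everything, max_retries=2))
```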

Circuit Breaker

Every sink has a per-instance circuit breaker that protects the worker from hammering a persistently-failing downstream. The breaker operates at the terminal-outcome level (retries already exhausted, operator returned DLQ), so transient blips don’t trip it.

States and transitions:

| State | Behavior | Transition |
|---|---|---|
| closed | Every deliver() call goes through. Terminal failures increment _consecutive_failures; any success resets it to zero. | After failure_threshold consecutive failures → open. |
| open | deliver() is skipped for the cooldown window, and the batch routes directly to the DLQ. The handler’s on_delivery_error is bypassed on this path: SKIP would silently drop data and RETRY would hammer the recovering downstream, so neither is appropriate. | Once cooldown_seconds have elapsed, the next incoming batch promotes the breaker to half_open. |
| half_open | A single probe delivery is allowed through. Parallel delivery attempts against the same sink see the in-flight probe and skip; the _probe_inflight gate enforces single-flight semantics. | Probe success → closed with counters reset. Probe failure → open with a fresh cooldown. |
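
The state table can be modeled as a small class. This is a simplified, hypothetical model of the behavior described above, not Drakkar's implementation. The attribute names only loosely follow the _consecutive_failures and _probe_inflight fields, and the clock is injectable so the transitions can be exercised without sleeping.

```python
import time
from enum import Enum


class BreakerState(Enum):
    # Values match the drakkar_sink_circuit_open gauge encoding.
    CLOSED = 0.0
    HALF_OPEN = 0.5
    OPEN = 1.0


class CircuitBreaker:
    """Simplified per-sink breaker; counts terminal outcomes only."""

    def __init__(self, failure_threshold=5, cooldown_seconds=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.state = BreakerState.CLOSED
        self.consecutive_failures = 0
        self.opened_at = 0.0
        self.probe_inflight = False
        self.trips = 0  # analogous to drakkar_sink_circuit_trips_total

    def allow(self):
        """True if this batch may call deliver(); False means deliver() is skipped."""
        if self.state is BreakerState.OPEN:
            if self.clock() - self.opened_at < self.cooldown_seconds:
                return False
            self.state = BreakerState.HALF_OPEN  # cooldown over: this batch probes
        if self.state is BreakerState.HALF_OPEN:
            if self.probe_inflight:
                return False  # single-flight: another probe is already out
            self.probe_inflight = True
        return True

    def record_success(self):
        self.state = BreakerState.CLOSED
        self.consecutive_failures = 0
        self.probe_inflight = False

    def record_failure(self):
        if self.state is BreakerState.HALF_OPEN:
            self._trip()  # failed probe: reopen with a fresh cooldown
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self._trip()

    def _trip(self):
        self.state = BreakerState.OPEN
        self.opened_at = self.clock()
        self.probe_inflight = False
        self.trips += 1  # every transition into open counts, including failed probes


now = [0.0]  # fake clock so the example does not sleep
breaker = CircuitBreaker(failure_threshold=2, cooldown_seconds=30.0, clock=lambda: now[0])
for _ in range(2):  # two terminal failures in a row trip the breaker
    breaker.allow()
    breaker.record_failure()
blocked_while_open = not breaker.allow()  # still inside the cooldown
now[0] = 31.0  # cooldown elapsed
first_probe = breaker.allow()  # promotes to half_open and lets one probe through
second_probe = breaker.allow()  # a parallel attempt sees the in-flight probe
breaker.record_success()  # probe succeeded: back to closed
```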

Operator-visible signals:

  • drakkar_sink_circuit_open{sink_type, sink_name} — gauge with states encoded as 0.0 (closed), 0.5 (half-open), 1.0 (open). Zero-initialized at registration so a never-tripped sink still appears in scrape output.
  • drakkar_sink_circuit_trips_total{sink_type, sink_name} — counter ticking on every transition into open (initial failure-threshold trip AND half-open probe failures). A flapping circuit shows up as a rising rate on this counter, not a single trip plus silent reopens.
  • SinkStats.last_error == 'circuit open' — the debug UI renders this sentinel so operators can distinguish breaker-routed DLQ entries from actual delivery failures.

The breaker defaults (failure_threshold=5, cooldown_seconds=30.0) are tuned for typical throughput. Pipelines with stricter latency budgets can lower the threshold so one persistent outage trips the breaker sooner; long-recovery downstreams may want a longer cooldown. See Configuration → Circuit Breaker for the full knobs.

Retry contract

Every BaseSink subclass declares an idempotent: bool class attribute that tells SinkManager whether duplicate delivery is safe. The flag drives a small fast-retry loop on transient errors before the failure reaches your on_delivery_error handler:

| idempotent value | SinkManager behavior on transient error |
|---|---|
| True | Retry deliver() up to 3 times with exponential backoff (50ms / 100ms / 200ms). If still failing, surface the error to on_delivery_error. |
| False (default) | Single attempt. Any error, transient or not, goes straight to on_delivery_error. |

The transient-error classification covers ConnectionError, TimeoutError, and asyncio.TimeoutError. Every other exception (including ValueError and RuntimeError) is treated as non-transient and is not retried even when the sink is idempotent — those signal a bug in the payload or sink, and a quick re-attempt won’t change the outcome.

All fast-retries inside a single deliver() call count as one delivery attempt from the circuit breaker’s perspective — whether the sink retried internally 1 or 3 times, the breaker sees exactly one terminal outcome (success or failure) per batch.
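
The following sketches the fast-retry loop. It reads "up to 3 times" as three retries after the initial attempt, with 50 ms, 100 ms, and 200 ms pauses between them. deliver_with_fast_retry, FlakySink, and the delays parameter are hypothetical. The example passes zero delays only so that it runs quickly.

```python
import asyncio

# Error types treated as transient, per the classification above.
TRANSIENT = (ConnectionError, TimeoutError, asyncio.TimeoutError)


async def deliver_with_fast_retry(sink, payloads, delays=(0.05, 0.1, 0.2)):
    """Fast-retry transient errors on idempotent sinks; all else propagates at once.

    The whole call yields ONE outcome (return or raise) for the circuit breaker.
    """
    if not getattr(sink, "idempotent", False):
        return await sink.deliver(payloads)  # non-idempotent: single attempt
    for delay in delays:
        try:
            return await sink.deliver(payloads)
        except TRANSIENT:
            await asyncio.sleep(delay)  # back off, then try again
    return await sink.deliver(payloads)  # last try; its error reaches on_delivery_error


class FlakySink:
    """Hypothetical sink that raises `exc` for its first `failures` calls."""

    idempotent = True

    def __init__(self, failures, exc=ConnectionError):
        self.failures = failures
        self.exc = exc
        self.calls = 0

    async def deliver(self, payloads):
        self.calls += 1
        if self.calls <= self.failures:
            raise self.exc("blip")
        return "ok"


recovered = FlakySink(failures=2)
result = asyncio.run(deliver_with_fast_retry(recovered, ["p"], delays=(0, 0, 0)))

down = FlakySink(failures=10)  # transient errors that never clear
try:
    asyncio.run(deliver_with_fast_retry(down, ["p"], delays=(0, 0, 0)))
except ConnectionError:
    pass  # surfaced after 1 initial call + 3 retries

broken = FlakySink(failures=5, exc=ValueError)
try:
    asyncio.run(deliver_with_fast_retry(broken, ["p"], delays=(0, 0, 0)))
except ValueError:
    pass  # non-transient: surfaced after a single call
```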

When to set idempotent = True

Opt in when duplicate delivery is provably safe:

  • Broker-side dedup: Kafka producer with enable.idempotence=true + stable message key (the default KafkaSink does NOT enable this and therefore keeps idempotent = False).
  • Write-replace semantics: Redis SET of a fixed key, filesystem atomic-rename overwrite, S3 PutObject on a stable path.
  • Idempotency keys: HTTP POST/PUT with an Idempotency-Key header the receiver honors (Stripe, Shopify, custom APIs).
  • Upsert semantics: Postgres INSERT ... ON CONFLICT DO UPDATE, Mongo update-with-upsert on a deterministic _id.

When to keep idempotent = False

Keep the default when duplicate delivery has observable side effects:

  • Plain HTTP POST without an idempotency key.
  • Append-only file writes without a dedup key.
  • Plain INSERT without a unique constraint.
  • Any non-idempotent network call (payment API, notification dispatch, etc.).

Built-in sink defaults

| Sink | idempotent | Reason |
|---|---|---|
| KafkaSink | False | Default producer config does not enable broker dedup. |
| HttpSink | False | HTTP POST may have side effects without an idempotency key. |
| FileSink | False | Append mode duplicates records on retry. |
| DLQSink | False | Same as KafkaSink; DLQ also uses its own send() path. |
| PostgresSink | False | Plain INSERT duplicates on retry. |
| MongoSink | False | insert_* without a stable _id duplicates on retry. |
| RedisSink | True | SET on a fixed key is write-replace. |

Custom sinks inherit the safe default (False). Set the attribute explicitly on your subclass when you know duplicates are safe:

class StripeWebhookSink(HttpSink):
    """Webhook sink that attaches a Stripe idempotency key."""

    idempotent = True  # Stripe dedups by Idempotency-Key header

Example

import drakkar as dk


class MyHandler(dk.BaseDrakkarHandler):
    async def on_delivery_error(self, error: dk.DeliveryError) -> dk.DeliveryAction:
        # Retry HTTP and Redis failures; once executor.max_retries is
        # exhausted, the framework falls through to DLQ
        if error.sink_type in ('http', 'redis'):
            return dk.DeliveryAction.RETRY

        # Skip filesystem errors (non-critical logging)
        if error.sink_type == 'filesystem':
            return dk.DeliveryAction.SKIP

        # Everything else goes to DLQ for investigation
        return dk.DeliveryAction.DLQ

Dead letter queue

When delivery fails and your on_delivery_error hook returns DeliveryAction.DLQ (or retries are exhausted), the framework writes the failed payloads to a Kafka-based dead letter queue.

Topic derivation

The DLQ topic is configured under the dlq key:

dlq:
  topic: ""       # empty = auto-derived from source topic
  brokers: ""     # empty = inherits from kafka.brokers

When topic is empty, the framework derives it as {source_topic}_dlq. For example, if kafka.source_topic is search-requests, the DLQ topic becomes search-requests_dlq.

Message format

Each DLQ message is a JSON document containing:

| Field | Description |
|---|---|
| original_payloads | The failed payloads, each serialized as a JSON string |
| sink_name | Name of the sink that failed |
| sink_type | Type of the sink that failed |
| error | Error message from the failed delivery |
| timestamp | Unix epoch timestamp of the failure |
| partition | Source Kafka partition the message came from |
| attempt_count | Number of delivery attempts before writing to DLQ |
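
One DLQ entry with the fields listed above could be encoded as follows. The exact layout Drakkar writes (key order, timestamp precision, any extra fields) is not specified here, so build_dlq_message is a hypothetical helper.

```python
from __future__ import annotations

import json
import time


def build_dlq_message(
    payloads_json: list[str],
    *,
    sink_name: str,
    sink_type: str,
    error: str,
    partition: int,
    attempt_count: int,
    timestamp: float | None = None,
) -> bytes:
    """Encode one DLQ entry with the documented fields (hypothetical layout)."""
    message = {
        "original_payloads": payloads_json,  # each item is already a JSON string
        "sink_name": sink_name,
        "sink_type": sink_type,
        "error": error,
        "timestamp": time.time() if timestamp is None else timestamp,
        "partition": partition,
        "attempt_count": attempt_count,
    }
    return json.dumps(message).encode()


encoded = build_dlq_message(
    ['{"request_id":"abc","match_count":12}'],
    sink_name="webhook",
    sink_type="http",
    error="HTTP 503 Service Unavailable",
    partition=3,
    attempt_count=4,
    timestamp=1700000000.0,
)
decoded = json.loads(encoded)
first_payload = json.loads(decoded["original_payloads"][0])  # nested JSON string
```

Each element of original_payloads is itself a JSON string, so a consumer has to decode twice: once for the envelope and once per payload.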

Broker inheritance

When dlq.brokers is empty, the DLQ producer connects to the same brokers as the main Kafka consumer (kafka.brokers). Set dlq.brokers explicitly to write DLQ messages to a different Kafka cluster.
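
The two empty-string defaults amount to a fallback on each field. resolve_dlq is a hypothetical helper that only makes the rules concrete.

```python
def resolve_dlq(dlq_topic: str, dlq_brokers: str, source_topic: str, kafka_brokers: str):
    """Apply the empty-string defaults described for the dlq config block."""
    topic = dlq_topic or f"{source_topic}_dlq"  # "" -> {source_topic}_dlq
    brokers = dlq_brokers or kafka_brokers  # "" -> inherit kafka.brokers
    return topic, brokers


default_target = resolve_dlq("", "", "search-requests", "kafka-1:9092")
separate_cluster = resolve_dlq("", "dlq-kafka:9092", "search-requests", "kafka-1:9092")
```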

DLQ replay

scripts/replay_dlq.py is a reference operator tool that reads DLQ entries and republishes their preserved original_payloads to a target Kafka topic. Use it after fixing a bug that caused messages to land in the DLQ — the script drives those messages back through Kafka so the normal consumer picks them up again.

When to use it

  • A downstream sink regressed, the bug has been fixed, and you want to reprocess the dead-lettered batch without editing offsets by hand.
  • You want to inspect and filter the DLQ contents before replaying (pair --dry-run with --filter to count matches).
  • You want to divert DLQ content to a shadow topic for offline analysis (set --target-topic to a scratch topic).

Usage

# Dry run, count matches only
uv run python scripts/replay_dlq.py \
    --dlq-config=config.yaml \
    --target-topic=search-requests \
    --dry-run \
    --filter='customer_id:42'

# Real replay, bounded
uv run python scripts/replay_dlq.py \
    --dlq-config=config.yaml \
    --target-topic=search-requests \
    --limit=1000

# Different broker for the target
uv run python scripts/replay_dlq.py \
    --dlq-config=config.yaml \
    --target-topic=search-requests \
    --target-brokers=prod-kafka:9092

The script prints replayed N/M to stderr every 1000 records and a final summary line (summary: read=… published=… filtered_out=… errors=…) when it finishes. Ctrl+C is handled gracefully — the in-flight produce completes and the producer is flushed before exit.

Caveats

  • Duplicate processing: the script does NOT deduplicate against the source topic. If your sink is not idempotent (see BaseSink.idempotent) and your pipeline has already partially processed the DLQ contents, replaying will cause duplicate work. Verify the sink’s idempotency contract before replaying.
  • Filter is substring-only: --filter='customer_id:42' is a plain in check against each payload’s JSON string. It does not parse JSON or support path expressions. For structural filtering, pipe the DLQ topic through kcat | jq in advance and feed a pre-filtered topic to the script.
  • --limit does not resume: --limit=1000 stops after 1000 entries have been READ (filter-rejected entries still count). Running with --limit twice replays the first N entries both times — the consumer group is throwaway (unique per invocation), so offsets are not preserved between runs.
  • Original Kafka envelope is NOT fully preserved: the replay script publishes only the original_payloads bytes to the target topic. The original message’s key, partition, and headers are NOT currently captured by DLQMessage.serialize() and therefore cannot be restored by the replay script. If downstream consumers depend on those fields (e.g. the source partition for a keyed-routing consumer, or headers for tracing), point --target-topic at a retry topic whose producer re-derives them from the payload body. Operators who need a lossless replay should post-process the DLQ output before publishing.
  • Partial-entry failure is per-record: each DLQ entry may expand to multiple target-topic messages (one per preserved payload). If a produce fails midway through a multi-payload entry, earlier sub-payloads from that entry have already been sent. The stderr summary reports the last successful DLQ offset so operators can resume, but inspection of the specific entry may be necessary to avoid duplicates.
  • Exit code 1 with partial success: if the producer fails mid-run after some records were published, the script exits 1 and logs the last successful DLQ offset to stderr. Operators can re-run with an adjusted filter (e.g. filtering for records newer than that offset’s timestamp) to resume.
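
The --filter and --limit caveats are easiest to see in a model of the replay loop. This is not the code of scripts/replay_dlq.py. The replay function, its counters, and the per-payload filtering are assumptions based on the caveats above.

```python
def replay(entries, publish, *, substring=None, limit=None, dry_run=False):
    """Republish each entry's original_payloads; return counters for a summary."""
    stats = {"read": 0, "matched": 0, "published": 0, "filtered_out": 0}
    for entry in entries:
        if limit is not None and stats["read"] >= limit:
            break
        stats["read"] += 1  # counted before filtering: rejected entries use up the limit
        for payload in entry["original_payloads"]:
            # Plain substring test on the raw JSON text; nothing is parsed.
            if substring is not None and substring not in payload:
                stats["filtered_out"] += 1
                continue
            stats["matched"] += 1
            if not dry_run:
                publish(payload.encode())  # payload bytes only: no key, partition or headers
                stats["published"] += 1
    return stats


dlq_entries = [
    {"original_payloads": ['{"customer_id":42,"q":"a"}', '{"customer_id":7,"q":"b"}']},
    {"original_payloads": ['{"customer_id":42,"q":"c"}']},
    {"original_payloads": ['{"customer_id":9,"q":"d"}']},
]
sent = []
bounded = replay(dlq_entries, sent.append, substring='"customer_id":42', limit=2)
# Compact JSON puts a quote between the key and the colon, so this never matches:
loose = replay(dlq_entries, sent.append, substring="customer_id:42", dry_run=True)
```

Under this model, a filter written as customer_id:42 never matches compact JSON such as {"customer_id":42}, because a closing quote sits between the key and the colon. If your payloads are serialized as compact JSON, copy the exact text from a real DLQ entry before choosing a --filter value.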

Sink connections

Each sink follows a lifecycle of connect() at startup and close() at shutdown.

Startup

connect() is called once during worker startup for every configured sink. If any sink fails to connect, the worker crashes immediately with a clear error – this prevents silent failures where the pipeline runs but cannot deliver results.

Shutdown

close() is called for every sink during worker shutdown. Close errors are logged but never raised, so shutdown proceeds cleanly even if a connection is already lost.

Connection-state flag (custom sinks)

BaseSink maintains an is_connected: bool read-only property backed by the mark_connected() / mark_disconnected() methods. You do NOT call these from your subclass — the SinkManager drives the flag: connect_all() calls mark_connected() after a successful connect(), and close_all() calls mark_disconnected() after close() runs (even when close() raises, so the flag faithfully reflects the manager’s view of the connection). Readiness probes (/readyz) read is_connected to decide whether to report the sink as ready. Your custom sink only needs to implement connect() / close(); the bookkeeping is handled for you.
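
The bookkeeping split between BaseSink and SinkManager can be sketched as follows. SketchSink, connect_all, close_all, and ready follow the names and behavior described above, but they are simplified stand-ins, not Drakkar's code. In particular, the readiness logic shown is an assumption.

```python
import asyncio
import logging

log = logging.getLogger("sink-manager-sketch")


class SketchSink:
    """Stand-in for BaseSink's connection bookkeeping (not Drakkar's class)."""

    def __init__(self, name, fail_on_close=False):
        self.name = name
        self.fail_on_close = fail_on_close
        self._connected = False

    @property
    def is_connected(self):
        return self._connected

    def mark_connected(self):
        self._connected = True

    def mark_disconnected(self):
        self._connected = False

    async def connect(self):
        await asyncio.sleep(0)  # a real sink would open its client here

    async def close(self):
        if self.fail_on_close:
            raise ConnectionError("connection already lost")


async def _connect_one(sink):
    await sink.connect()
    sink.mark_connected()  # only after connect() succeeded


async def connect_all(sinks):
    # Concurrent startup; the first connect() error propagates to the caller.
    await asyncio.gather(*(_connect_one(s) for s in sinks))


async def close_all(sinks):
    for sink in sinks:
        try:
            await sink.close()
        except Exception as exc:
            log.warning("close failed for sink %s: %s", sink.name, exc)  # never re-raised
        finally:
            sink.mark_disconnected()  # flips even when close() raised


def ready(sinks):
    """What a /readyz-style probe could compute from the flags."""
    return all(s.is_connected for s in sinks)


fleet = [SketchSink("results"), SketchSink("webhook", fail_on_close=True)]
asyncio.run(connect_all(fleet))
ready_after_connect = ready(fleet)
asyncio.run(close_all(fleet))
any_connected_after_close = any(s.is_connected for s in fleet)
```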

Sink-specific details

PostgresSink exposes the asyncpg connection pool via its pool property. This is available in on_ready() for running migrations, loading lookup tables, or any direct database access needed at startup.

HttpSink uses httpx.AsyncClient with configurable timeout and headers:

sinks:
  http:
    webhook:
      url: "https://hooks.example.com/notify"
      method: "POST"
      timeout_seconds: 10
      headers:
        Authorization: "Bearer ${WEBHOOK_TOKEN}"

RedisSink uses redis.asyncio (via from_url), connecting to the URL specified in config:

sinks:
  redis:
    cache:
      url: "redis://redis:6379/0"
      key_prefix: "drakkar:"

KafkaSink inherits kafka.brokers when its own brokers field is empty, so you only need to specify brokers once for sinks on the same cluster.

MongoSink uses motor’s AsyncIOMotorClient for native asyncio support.

FileSink requires base_path (non-empty) and validates it exists at connect time. All payload paths are contained within base_path — traversal attempts raise ValueError. Individual payload paths must have existing parent directories.


Custom sinks (plugin API)

Drakkar discovers third-party sink types through Python’s standard importlib.metadata entry points. A plugin package registers a BaseSink subclass under the drakkar.sinks group, and SinkRegistry.discover() (called once when SinkManager is constructed) loads it automatically — no monkey-patches, no fork of Drakkar.

The same registry holds the built-in sinks. SinkRegistry.get(name) returns the class registered under that name, whether it shipped with Drakkar or arrived through a plugin. A plugin may override a built-in by re-registering the same name (for example, a custom kafka sink with extra metrics).

BaseSink contract

Subclass drakkar.sinks.base.BaseSink and implement three async methods:

from drakkar.sinks.base import BaseSink
from pydantic import BaseModel

class MyCustomPayload(BaseModel):
    """Whatever shape your sink consumes."""
    value: str

class MyCustomSink(BaseSink[MyCustomPayload]):
    sink_type = 'my_custom'           # canonical type name (used in config)
    idempotent = False                # see "Retry contract" above

    def __init__(self, name: str, config) -> None:
        super().__init__(name)
        self._config = config

    async def connect(self) -> None:
        """Open the underlying client. Raise on failure — the worker
        crashes fast with a clear startup error.
        """

    async def deliver(self, payloads: list[MyCustomPayload]) -> None:
        """Send a batch. Raise on failure — the framework handles
        retries, DLQ routing, and circuit-breaker bookkeeping.
        """

    async def close(self) -> None:
        """Release resources. Should not raise — log and continue."""

The idempotent class attribute and the circuit-breaker / retry behavior described under Retry contract apply to custom sinks identically. You do not call mark_connected / mark_disconnected or any of the record_* circuit-breaker methods yourself; SinkManager drives those.

Entry-point declaration

In your plugin’s pyproject.toml:

[project.entry-points."drakkar.sinks"]
my_custom = "my_package.sinks:MyCustomSink"

The left side (my_custom) is the registry key — it is what SinkRegistry.get() returns your class for, and it is the type name operators use in their Drakkar config. The right side is a standard module:attribute reference resolvable by EntryPoint.load().

After pip install my-package (or uv sync in a workspace that declares the dependency), Drakkar picks up the new sink at startup without any further wiring.

Configuring instances

Plugin sinks are declared under sinks.custom.<type>.<instance> in the worker config. The <type> segment must match the entry-point key; <instance> is whatever name your handler uses to route payloads. The leaf dict is passed verbatim as the second argument to your sink’s constructor (MyCustomSink(name, config)), so you can shape it however your sink expects:

sinks:
  kafka:
    results:
      topic: "search-results"
  custom:
    my_custom:
      primary:
        endpoint: "https://api.example.com/v1/ingest"
        api_key: "xxx"
      secondary:
        endpoint: "https://api.example.com/v1/audit"
        api_key: "xxx"

If the <type> is not registered (plugin missing or entry-point typo), DrakkarApp._build_sinks raises a ValueError at startup listing the known types — failing loud beats a silently-dropped sink.
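
The following sketches how the sinks.custom tree could map onto constructor calls, including the fail-loud check for an unregistered <type>. build_custom_sinks and EchoSink are hypothetical, and DrakkarApp._build_sinks may differ in detail.

```python
def build_custom_sinks(custom_config, registry):
    """Instantiate sinks.custom.<type>.<instance> entries (hypothetical helper)."""
    instances = {}
    for sink_type, per_instance in custom_config.items():
        cls = registry.get(sink_type)
        if cls is None:  # plugin missing or entry-point typo: fail loud at startup
            raise ValueError(f"unknown sink type {sink_type!r}; known types: {sorted(registry)}")
        for name, leaf in per_instance.items():
            instances[(sink_type, name)] = cls(name, leaf)  # leaf dict passed verbatim
    return instances


class EchoSink:
    """Hypothetical plugin sink that just keeps its config."""

    def __init__(self, name, config):
        self.name = name
        self.config = config


registry = {"my_custom": EchoSink}
custom_config = {
    "my_custom": {
        "primary": {"endpoint": "https://api.example.com/v1/ingest"},
        "secondary": {"endpoint": "https://api.example.com/v1/audit"},
    }
}
built = build_custom_sinks(custom_config, registry)
try:
    build_custom_sinks({"my_custm": {"primary": {}}}, registry)  # typo in <type>
except ValueError as exc:
    typo_error = str(exc)
```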

Inspecting registered sinks

from drakkar.sinks import SinkRegistry

SinkRegistry.discover()                      # idempotent — safe to call repeatedly
print(SinkRegistry.all_names())              # ['filesystem', 'http', 'kafka', 'my_custom', ...]
print(SinkRegistry.get('my_custom'))         # <class 'my_package.sinks.MyCustomSink'>
print(SinkRegistry.get('not_installed'))     # None — get() never raises

Failure modes

If a plugin entry point fails to load (missing dependency, ImportError, non-BaseSink target), SinkRegistry.discover() logs a structured sink_registry_entry_point_load_failed (or ..._invalid) warning and skips that entry. One broken plugin never crashes the worker — operators see the warning in their log aggregator and remove or fix the offending package.

Routing payloads to a plugin sink

Plugin sinks receive payloads through the custom field on CollectResult. Each CustomPayload.sink names a configured plugin sink instance (one of the keys you put under sinks.custom.<type> in your config). The framework looks the instance up by name across all plugin sinks, validates it at startup and again for each result, and hands your data field to the sink’s deliver() method as-is.

import drakkar as dk
from pydantic import BaseModel

class MyOutput(BaseModel):
    request_id: str
    payload: dict

class SearchHandler(dk.BaseDrakkarHandler):
    async def on_task_complete(self, result):
        out = MyOutput(request_id='abc', payload={'matches': 12})
        return dk.CollectResult(
            custom=[
                dk.CustomPayload(sink='primary', data=out),
                dk.CustomPayload(sink='secondary', data=out),
            ],
        )

The corresponding config:

sinks:
  custom:
    my_custom:
      primary:
        endpoint: 'https://api.example.com/v1/ingest'
      secondary:
        endpoint: 'https://api.example.com/v1/audit'

my_custom here is the entry-point key your pyproject.toml declared (MyCustomSink.sink_type); primary / secondary are the instance names the handler routes to via CustomPayload.sink.

If the handler returns a CustomPayload with a sink name that no configured plugin instance owns, SinkManager.validate_collect raises SinkNotConfiguredError before any payload is delivered — same fail-loud guarantee as for the built-in sinks. If the same instance name is shared across two plugin sink types, AmbiguousSinkError is raised so operators rename one to disambiguate.
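
The instance lookup across plugin types can be modeled as a scan over the sinks.custom tree. route_custom, the audit_log plugin type, and the local exception classes are hypothetical. The real SinkManager.validate_collect may index instances once at startup instead of scanning for each payload.

```python
class AmbiguousSinkError(Exception):
    """Stand-in for Drakkar's exception of the same name."""


class SinkNotConfiguredError(Exception):
    """Stand-in for Drakkar's exception of the same name."""


def route_custom(custom_config, sink):
    """Return the plugin type that owns instance `sink` under sinks.custom."""
    owners = [sink_type for sink_type, instances in custom_config.items() if sink in instances]
    if not owners:
        raise SinkNotConfiguredError(f"no plugin sink instance named {sink!r}")
    if len(owners) > 1:
        raise AmbiguousSinkError(f"instance {sink!r} is defined under {owners}; rename one")
    return owners[0]


custom_config = {
    "my_custom": {"primary": {}, "secondary": {}},
    "audit_log": {"secondary": {}},  # hypothetical second plugin reusing a name
}
primary_type = route_custom(custom_config, "primary")
failures = {}
for name in ("secondary", "tertiary"):
    try:
        route_custom(custom_config, name)
    except (AmbiguousSinkError, SinkNotConfiguredError) as exc:
        failures[name] = type(exc).__name__
```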


CollectResult

CollectResult is the return type of on_task_complete(), on_message_complete(), and on_window_complete(). It has one list field per built-in sink type, plus a custom list for plugin sinks. Populate whichever fields match your configured sinks.

Complete example

This example routes a single executor result to all six sink types, with conditional routing for HTTP (webhook only for high-match results) and filesystem (JSONL log only for very high-match results):

async def on_task_complete(self, result: dk.ExecutorResult) -> dk.CollectResult | None:
    matches = [line for line in result.stdout.strip().split('\n') if line]
    meta = result.task.metadata

    full_result = SearchResult(
        request_id=meta['request_id'],
        pattern=meta['pattern'],
        match_count=len(matches),
        duration_seconds=result.duration_seconds,
        matches=matches[:50],
    )

    summary = SearchSummary(
        request_id=meta['request_id'],
        pattern=meta['pattern'],
        match_count=len(matches),
        duration_seconds=result.duration_seconds,
    )

    sinks = dk.CollectResult(
        kafka=[dk.KafkaPayload(data=full_result, key=meta['request_id'].encode())],
        postgres=[dk.PostgresPayload(table='search_results', data=summary)],
        mongo=[dk.MongoPayload(collection='search_archive', data=full_result)],
        redis=[dk.RedisPayload(key=f'search:{meta["request_id"]}', data=summary, ttl=3600)],
    )

    # Conditional: HTTP webhook for high-match results only
    if len(matches) > 20:
        notification = SearchNotification(
            request_id=meta['request_id'],
            pattern=meta['pattern'],
            match_count=len(matches),
            message=f"High match count: {len(matches)} matches for '{meta['pattern']}'",
        )
        sinks.http.append(dk.HttpPayload(data=notification))

    # Conditional: JSONL file log for very high-match results
    if len(matches) > 50:
        sinks.files.append(dk.FilePayload(path='high-match-results.jsonl', data=full_result))  # relative to base_path

    return sinks

Returning None from on_task_complete() skips delivery entirely for that result. The framework only commits Kafka offsets after all sinks confirm delivery (or delivery errors are handled through on_delivery_error()). See Offset Commit Logic for details on watermark-based commit tracking.