Sinks
Sinks are pluggable output destinations for processed results. After your on_task_complete(),
on_message_complete(), or on_window_complete() hook
returns a CollectResult, the framework routes each payload to the correct sink, serializes the data, and delivers it.
Drakkar ships with six sink types. You can configure any combination of them, and each type supports multiple named instances (e.g., two separate Kafka topics or three Postgres databases).
Delivery Lifecycle

| Event | When | What happens |
|---|---|---|
| connect() | Worker startup, after on_startup() | Every configured sink’s connect() runs concurrently (asyncio.gather), so cold-start latency is bounded by the slowest sink, not the sum. If any connect() fails, the worker crashes immediately. |
| deliver(payloads) | After each on_task_complete(), on_message_complete(), or on_window_complete() call returns payloads | The framework groups payloads by (sink_type, sink_name), then dispatches one deliver() call per group in parallel via asyncio.gather. One slow or failing sink does not block the others. Each group owns its own retry, DLQ, and circuit-breaker state. |
| Circuit breaker | Before each deliver() call | If the sink’s breaker is open, deliver() is skipped and the batch routes straight to the DLQ. |
| on_delivery_error() | When deliver() raises an exception | Your handler decides: DLQ (default), RETRY, or SKIP. Retries re-call deliver() up to executor.max_retries times. |
| Offset commit | After all sinks confirm delivery for a window | Kafka offsets are committed only when every payload from the window has been successfully delivered (or routed to DLQ/skipped). No partial commits. |
| close() | Worker shutdown | Each sink closes its connection gracefully. Errors are logged but don’t block shutdown. |
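The concurrent connect() step in the table can be sketched with plain asyncio. FakeSink and connect_all are hypothetical names used only for illustration, not Drakkar’s own classes; the sketch shows why startup time tracks the slowest sink and how asyncio.gather propagates the first connection failure so the worker can exit.

```python
import asyncio
import time


class FakeSink:
    """Hypothetical stand-in for a configured sink; only connect() matters here."""

    def __init__(self, name: str, delay: float, fail: bool = False) -> None:
        self.name = name
        self.delay = delay
        self.fail = fail
        self.connected = False

    async def connect(self) -> None:
        await asyncio.sleep(self.delay)  # simulate a network handshake
        if self.fail:
            raise ConnectionError(f"{self.name}: connect failed")
        self.connected = True


async def connect_all(sinks: list[FakeSink]) -> None:
    # Every connect() runs at once, so total time is roughly the slowest
    # delay rather than the sum. Without return_exceptions, gather() re-raises
    # the first failure to the caller, which is what makes startup fail fast.
    await asyncio.gather(*(sink.connect() for sink in sinks))


if __name__ == "__main__":
    sinks = [FakeSink("kafka", 0.2), FakeSink("postgres", 0.3), FakeSink("http", 0.1)]
    start = time.perf_counter()
    asyncio.run(connect_all(sinks))
    print(f"connected in {time.perf_counter() - start:.2f}s")  # about 0.3s, not 0.6s
```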
Delivery frequency. For each successful task, on_task_complete() is called once. If it returns
payloads for N sink groups (e.g., 1 Kafka + 1 Postgres + 1 Redis = 3 groups), the framework
makes N independent deliver() calls. With on_window_complete(), one additional delivery
round happens per window.

The Postgres pool exposed in on_ready() is the same pool used by the Postgres sink, so you
can query it directly for lookups, migrations, or health checks.
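A configuration with three sink groups (one Kafka, one Postgres, one HTTP) looks like this:

```yaml
sinks:
  kafka:
    results:
      topic: "search-results"
  postgres:
    main:
      dsn: "postgresql://user:pass@db:5432/app"
  http:
    webhook:
      url: "https://hooks.example.com/notify"
```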
Payload types
Every sink type has a corresponding payload model. You create payload instances inside
on_task_complete() and return them in a CollectResult. The data field is always a Pydantic
BaseModel – the framework handles serialization differently for each sink type.
KafkaPayload
Produces a message to a Kafka topic.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| key | bytes \| None | Kafka message key, passed through as-is |
| data | BaseModel | Payload model |
Serialization: data.model_dump_json().encode() becomes the Kafka message value.
The key is passed through unchanged to the producer.
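To make the two serialization paths concrete, here is a small sketch assuming Pydantic v2, where model_dump_json() and model_dump() are the standard methods. Summary is a hypothetical model; the comments show the bytes a Kafka sink would send and the dict a Postgres or Mongo sink would receive.

```python
from pydantic import BaseModel


class Summary(BaseModel):
    """Hypothetical payload model for illustration."""

    request_id: str
    match_count: int


summary = Summary(request_id="abc", match_count=3)

# Kafka (and HTTP, Redis, file) sinks work from the JSON form.
kafka_value: bytes = summary.model_dump_json().encode()
# Postgres and Mongo sinks work from the plain-dict form.
row: dict = summary.model_dump()

print(kafka_value)  # b'{"request_id":"abc","match_count":3}'
print(row)  # {'request_id': 'abc', 'match_count': 3}
```

The message key is separate from this value: since it is passed through unchanged, your code must already supply it as bytes.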
PostgresPayload
Inserts a row into a PostgreSQL table.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| table | str | Target table name |
| data | BaseModel | Payload model |
Serialization: data.model_dump() produces a {column: value} dict. The framework
builds an INSERT INTO <table> (<columns>) VALUES (<placeholders>) query. Column and
table names are validated against SQL injection.
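A minimal sketch of how such an INSERT can be built safely, assuming asyncpg-style $1, $2 placeholders (PostgresSink’s pool is an asyncpg pool). build_insert and the identifier regex are hypothetical; Drakkar’s actual validation rules may differ. Only validated identifiers are interpolated into the SQL text; values always travel as bind parameters.

```python
import re

# Hypothetical rule: unquoted SQL identifiers only (letters, digits, underscore,
# not starting with a digit). Anything else is rejected before reaching SQL.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _check(name: str, kind: str) -> str:
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe {kind} name: {name!r}")
    return name


def build_insert(table: str, row: dict) -> tuple[str, list]:
    """Return (sql, args) for one row produced by data.model_dump()."""
    if not row:
        raise ValueError("row has no columns")
    columns = [_check(col, "column") for col in row]
    placeholders = ", ".join(f"${i}" for i in range(1, len(columns) + 1))
    sql = (
        f"INSERT INTO {_check(table, 'table')} ({', '.join(columns)}) "
        f"VALUES ({placeholders})"
    )
    return sql, [row[col] for col in columns]


if __name__ == "__main__":
    print(build_insert("search_results", {"request_id": "abc", "match_count": 3}))
```

With an asyncpg pool, the resulting pair would be executed as await pool.execute(sql, *args).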
MongoPayload
Inserts a document into a MongoDB collection.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| collection | str | Target MongoDB collection name |
| data | BaseModel | Payload model |
Serialization: data.model_dump() produces a dict, inserted via insert_one.
HttpPayload
Sends a JSON request to an HTTP endpoint.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| data | BaseModel | Payload model |
Serialization: data.model_dump_json() becomes the request body with
Content-Type: application/json. Non-2xx responses raise an error routed through
on_delivery_error().
RedisPayload
Sets a key-value pair in Redis.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| key | str | Redis key suffix |
| data | BaseModel | Payload model |
| ttl | int \| None | Optional expiry in seconds |
Serialization: data.model_dump_json() becomes the Redis string value. The full
Redis key is {config.key_prefix}{payload.key}. When ttl is set, the key expires
after that many seconds (SET key value EX ttl).
```python
from drakkar import RedisPayload

RedisPayload(
    key=f'search:{request_id}',
    data=search_summary,
    ttl=3600,  # 1 hour
)
```
FilePayload
Appends a JSON line to a file on the local filesystem.

| Field | Type | Description |
|---|---|---|
| sink | str | Target sink instance name (empty string for default) |
| path | str | File path (relative to the sink’s base_path config) |
| data | BaseModel | Payload model |
Serialization: data.model_dump_json() + '\n' is appended to the file in JSONL
format. The file is created if it does not exist. The parent directory must already exist.
Path containment: base_path is required in the filesystem sink config. All payload
paths are resolved relative to base_path and canonicalized — the framework raises
ValueError if the resolved path escapes the base directory (prevents path traversal).
```python
from drakkar import FilePayload

FilePayload(
    path='high-match-results.jsonl',  # resolved relative to sink's base_path
    data=search_result,
)
```
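The containment and append rules above can be sketched with pathlib. append_jsonl is a hypothetical helper, not FileSink’s internals, and json.dumps stands in for data.model_dump_json(). Path.is_relative_to requires Python 3.9 or later.

```python
import json
import tempfile
from pathlib import Path


def append_jsonl(base_path: str, rel_path: str, record: dict) -> Path:
    """Append one JSON line under base_path, refusing paths that escape it."""
    base = Path(base_path).resolve()
    # An absolute rel_path replaces base entirely when joined, and ".." segments
    # or symlinks can climb out; resolve() canonicalizes all of these cases.
    target = (base / rel_path).resolve()
    if not target.is_relative_to(base):
        raise ValueError(f"path escapes base_path: {rel_path!r}")
    # Mode "a" creates the file if needed; a missing parent dir still raises.
    with target.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")
    return target


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as base:
        append_jsonl(base, "high-match-results.jsonl", {"request_id": "abc"})
        for bad in ("../escape.jsonl", "/tmp/high-match-results.jsonl"):
            try:
                append_jsonl(base, bad, {})
            except ValueError as exc:
                print(exc)
```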
Routing
The sink field on every payload controls which configured sink instance receives it.
Single sink per type
When only one sink of a given type is configured, leave sink as the empty string
(the default); the framework routes the payload to that sink automatically.
Multiple named sinks
When you have multiple sinks of the same type, set sink to the instance name:

```python
# Route to the "results" kafka sink
KafkaPayload(sink='results', data=full_result, key=b"abc")
# Route to the "alerts" kafka sink
KafkaPayload(sink='alerts', data=alert_data, key=b"abc")
```
AmbiguousSinkError
If you have multiple sinks of the same type but leave sink empty, the framework
raises AmbiguousSinkError:

```text
AmbiguousSinkError: 2 'kafka' sinks configured (['results', 'alerts']),
but payload has empty sink name -- specify which one
```

The check runs when the CollectResult is validated, before any delivery attempt, so misconfiguration surfaces immediately.
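The routing rule for built-in sinks fits in a few lines. resolve_sink and the two exception classes below are local stand-ins (Drakkar defines its own AmbiguousSinkError and SinkNotConfiguredError); raising SinkNotConfiguredError for an unknown built-in name is an assumption extrapolated from the plugin-sink behavior described later on this page.

```python
class AmbiguousSinkError(Exception):
    """Local stand-in for Drakkar's exception of the same name."""


class SinkNotConfiguredError(Exception):
    """Local stand-in; assumed behavior for unknown built-in sink names."""


def resolve_sink(configured: dict[str, list[str]], sink_type: str, sink: str) -> str:
    """Return the instance name a payload of sink_type should be routed to.

    configured maps a sink type (e.g. "kafka") to its configured instance names.
    """
    names = configured.get(sink_type, [])
    if sink:  # explicit name: it must exist
        if sink not in names:
            raise SinkNotConfiguredError(f"no {sink_type!r} sink named {sink!r}")
        return sink
    if len(names) == 1:  # empty name, single instance: route automatically
        return names[0]
    if not names:
        raise SinkNotConfiguredError(f"no {sink_type!r} sink configured")
    raise AmbiguousSinkError(
        f"{len(names)} {sink_type!r} sinks configured ({names}), "
        "but payload has empty sink name -- specify which one"
    )
```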
Delivery and error handling
Delivery flow
The SinkManager groups all payloads from a CollectResult by (sink_type, sink_name),
then calls deliver() on each group. A single on_task_complete() call can route payloads to
any number of sinks in one shot.
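A sketch of that group-then-dispatch step, using a hypothetical Payload dataclass that carries its routing key explicitly (in Drakkar the type follows from the payload class and CollectResult field). Each group gets one deliver call, and gather(return_exceptions=True) returns every group’s outcome, success or exception, so each failure can be handled independently.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Payload:
    """Hypothetical payload carrying its own routing key."""

    sink_type: str
    sink_name: str
    body: str


def group_payloads(payloads: list[Payload]) -> dict[tuple[str, str], list[Payload]]:
    groups: dict[tuple[str, str], list[Payload]] = defaultdict(list)
    for p in payloads:
        groups[(p.sink_type, p.sink_name)].append(p)
    return dict(groups)  # insertion order follows first appearance


async def deliver_all(groups, deliver) -> dict:
    """Run deliver(key, batch) once per group, concurrently.

    return_exceptions=True turns a failing group's exception into its result
    instead of propagating it, so every group's outcome is reported.
    """
    keys = list(groups)
    results = await asyncio.gather(
        *(deliver(key, groups[key]) for key in keys), return_exceptions=True
    )
    return dict(zip(keys, results))
```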
Error handling
When a sink’s deliver() raises an exception, the framework calls your
on_delivery_error() hook with a DeliveryError containing the sink name, type,
error message, and the failed payloads:
```python
from pydantic import BaseModel


class DeliveryError(BaseModel):
    sink_name: str  # e.g. "results"
    sink_type: str  # e.g. "kafka", "postgres", "http"
    error: str  # human-readable error message
    payloads: list[BaseModel]  # the payloads that failed
```
Your hook returns a DeliveryAction:
| Action | Behavior |
|---|---|
| DeliveryAction.DLQ | Write the failed payloads to the dead letter queue (default) |
| DeliveryAction.RETRY | Re-call deliver(), up to executor.max_retries times |
| DeliveryAction.SKIP | Drop the payloads and continue processing |
If RETRY is returned but retries are exhausted, the framework falls through to DLQ.
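The decision loop can be sketched as follows. DeliveryAction here is a local enum mirroring the table, and deliver_with_policy with its string outcomes is hypothetical; the sketch assumes, as the lifecycle table states, that max_retries counts re-calls of deliver() after the first attempt.

```python
import asyncio
from enum import Enum


class DeliveryAction(Enum):
    """Local mirror of the three actions in the table above."""

    DLQ = "dlq"
    RETRY = "retry"
    SKIP = "skip"


async def deliver_with_policy(deliver, on_error, max_retries: int) -> str:
    """Return "delivered", "skipped", or "dlq" for one batch."""
    retries = 0
    while True:
        try:
            await deliver()
            return "delivered"
        except Exception as exc:
            action = on_error(exc)
        if action is DeliveryAction.SKIP:
            return "skipped"
        if action is DeliveryAction.RETRY and retries < max_retries:
            retries += 1
            continue
        # DLQ, or RETRY once retries are exhausted, ends in the dead letter queue.
        return "dlq"


if __name__ == "__main__":
    async def always_down() -> None:
        raise ConnectionError("503 from webhook")

    print(asyncio.run(deliver_with_policy(always_down, lambda e: DeliveryAction.RETRY, 2)))  # dlq
```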
Circuit Breaker
Every sink has a per-instance circuit breaker that protects the
worker from hammering a persistently-failing downstream. The breaker
operates at the terminal-outcome level (retries already
exhausted, operator returned DLQ), so transient blips don’t trip it.
States and transitions:
| State | Behavior | Transition |
|---|---|---|
| closed | Every deliver() call goes through. Terminal failures increment _consecutive_failures; any success resets it to zero. | After failure_threshold consecutive failures → open. |
| open | deliver() is skipped for the cooldown window. The batch routes directly to the DLQ, and the handler’s on_delivery_error is bypassed on this path (SKIP would silently drop data, RETRY would hammer the recovering downstream, and neither is appropriate). | After cooldown_seconds have elapsed, the next incoming batch promotes the breaker to half_open. |
| half_open | A single probe delivery is allowed through. Parallel delivery attempts against the same sink observe the in-flight probe and skip; the _probe_inflight gate enforces single-flight semantics. | Probe success → closed with counters reset. Probe failure → open with a fresh cooldown. |
Operator-visible signals:

- drakkar_sink_circuit_open{sink_type, sink_name}: gauge with states encoded as 0.0 (closed), 0.5 (half-open), 1.0 (open). Zero-initialized at registration, so a never-tripped sink still appears in scrape output.
- drakkar_sink_circuit_trips_total{sink_type, sink_name}: counter ticking on every transition into open (the initial failure-threshold trip AND half-open probe failures). A flapping circuit shows up as a rising rate on this counter, not as a single trip plus silent reopens.
- SinkStats.last_error == 'circuit open': the debug UI renders this sentinel so operators can distinguish breaker-routed DLQ entries from actual delivery failures.
The breaker defaults (failure_threshold=5, cooldown_seconds=30.0)
are tuned for typical throughput. Pipelines with stricter latency
budgets can lower the threshold so one persistent outage trips the
breaker sooner; long-recovery downstreams may want a longer cooldown.
See Configuration → Circuit Breaker
for the full knobs.
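The state table translates into a small state machine. The sketch below is a hypothetical, single-threaded illustration: its attribute names echo the text (failure_threshold, cooldown_seconds, a probe-in-flight gate, a trip counter analogous to drakkar_sink_circuit_trips_total), but it is not Drakkar’s implementation. The clock is injectable so the transitions can be exercised without waiting.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Sketch of the closed -> open -> half_open cycle described above."""

    def __init__(self, failure_threshold: int = 5, cooldown_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.state = "closed"
        self.consecutive_failures = 0
        self.opened_at = 0.0
        self.probe_inflight = False
        self.trips = 0  # analogous to drakkar_sink_circuit_trips_total

    def allow(self) -> bool:
        """Call before deliver(); False means send the batch to the DLQ."""
        if self.state == "open":
            if self.clock() - self.opened_at < self.cooldown_seconds:
                return False
            self.state = "half_open"  # cooldown over: this batch may probe
        if self.state == "half_open":
            if self.probe_inflight:
                return False  # single-flight: only one probe at a time
            self.probe_inflight = True
        return True

    def record_success(self) -> None:
        self.state = "closed"
        self.consecutive_failures = 0
        self.probe_inflight = False

    def record_failure(self) -> None:
        """Record one terminal failure (any fast retries already spent)."""
        if self.state == "open":
            return  # late result from a batch that started before the trip
        if self.state == "half_open":
            self._trip()  # failed probe: reopen with a fresh cooldown
            return
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = "open"
        self.opened_at = self.clock()
        self.probe_inflight = False
        self.trips += 1
```

In a manager, allow() would gate each group’s deliver() call, and record_success() or record_failure() would receive the single terminal outcome of that call.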
Retry contract
Every BaseSink subclass declares an idempotent: bool class attribute that
tells SinkManager whether duplicate delivery is safe. The flag drives a small
fast-retry loop on transient errors before the failure reaches your
on_delivery_error handler:
| idempotent value | SinkManager behavior on transient error |
|---|---|
| True | Retry deliver() up to 3 times with exponential backoff (50ms / 100ms / 200ms). If still failing, surface the error to on_delivery_error. |
| False (default) | Single attempt. Any error, transient or not, goes straight to on_delivery_error. |
The transient-error classification covers ConnectionError, TimeoutError,
and asyncio.TimeoutError. Every other exception (including ValueError and
RuntimeError) is treated as non-transient and is not retried even when the
sink is idempotent — those signal a bug in the payload or sink, and a quick
re-attempt won’t change the outcome.
All fast-retries inside a single deliver() call count as one delivery
attempt from the circuit breaker’s perspective — whether the sink retried
internally 1 or 3 times, the breaker sees exactly one terminal outcome
(success or failure) per batch.
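The fast-retry rule can be sketched as a wrapper around a single deliver() call. deliver_once and FlakySink are hypothetical names; the sketch assumes "up to 3 times" means three retries after the initial call, with the 50/100/200 ms delays before each retry, and takes the sleep function as a parameter so tests need not wait. However many internal retries happen, the caller sees one outcome, which is what the circuit breaker records.

```python
import asyncio

# Transient classes named in the text. On Python 3.11+ asyncio.TimeoutError is
# an alias of the built-in TimeoutError, so listing both is harmless.
TRANSIENT = (ConnectionError, TimeoutError, asyncio.TimeoutError)
BACKOFF = (0.05, 0.1, 0.2)  # seconds to wait before retry 1, 2, 3


async def deliver_once(sink, payloads, sleep=asyncio.sleep) -> None:
    """One breaker-visible delivery attempt with an internal fast-retry loop."""
    delays = BACKOFF if sink.idempotent else ()
    for attempt in range(len(delays) + 1):
        try:
            # Non-transient errors (ValueError, RuntimeError, ...) are not
            # caught here, so they propagate immediately without a retry.
            await sink.deliver(payloads)
            return
        except TRANSIENT:
            if attempt == len(delays):
                raise  # out of retries: hand the error to on_delivery_error
            await sleep(delays[attempt])


class FlakySink:
    """Hypothetical test double: fails `failures` times, then succeeds."""

    def __init__(self, idempotent: bool, failures: int, exc: type = ConnectionError) -> None:
        self.idempotent = idempotent
        self.failures = failures
        self.exc = exc
        self.calls = 0

    async def deliver(self, payloads) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise self.exc("boom")
```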
When to set idempotent = True
Opt in when duplicate delivery is provably safe:

- Broker-side dedup: Kafka producer with enable.idempotence=true + a stable message key (the default KafkaSink does NOT enable this and therefore keeps idempotent = False).
- Write-replace semantics: Redis SET of a fixed key, filesystem atomic-rename overwrite, S3 PutObject on a stable path.
- Idempotency keys: HTTP POST/PUT with an Idempotency-Key header the receiver honors (Stripe, Shopify, custom APIs).
- Upsert semantics: Postgres INSERT ... ON CONFLICT DO UPDATE, Mongo update-with-upsert on a deterministic _id.
When to keep idempotent = False
Keep the default when duplicate delivery has observable side effects:
- Plain HTTP POST without an idempotency key.
- Append-only file writes without a dedup key.
- Plain INSERT without a unique constraint.
- Any non-idempotent network call (payment API, notification dispatch, etc.).
Built-in sink defaults

| Sink | idempotent | Reason |
|---|---|---|
| KafkaSink | False | Default producer config does not enable broker dedup. |
| HttpSink | False | HTTP POST may have side effects without an idempotency key. |
| FileSink | False | Append mode duplicates records on retry. |
| DLQSink | False | Same as KafkaSink; DLQ also uses its own send() path. |
| PostgresSink | False | Plain INSERT duplicates on retry. |
| MongoSink | False | insert_* without a stable _id duplicates on retry. |
| RedisSink | True | SET on a fixed key is write-replace. |
Custom sinks inherit the safe default (False). Set the attribute explicitly
on your subclass when you know duplicates are safe:
```python
class StripeWebhookSink(HttpSink):
    """Webhook sink that attaches a Stripe idempotency key."""

    idempotent = True  # Stripe dedups by Idempotency-Key header
```
Example

```python
# A method on your handler class; assumes import drakkar as dk
async def on_delivery_error(self, error: dk.DeliveryError) -> dk.DeliveryAction:
    # Retry HTTP and Redis failures; once executor.max_retries is
    # exhausted, the framework falls through to the DLQ
    if error.sink_type in ('http', 'redis'):
        return dk.DeliveryAction.RETRY
    # Skip filesystem errors (non-critical logging)
    if error.sink_type == 'filesystem':
        return dk.DeliveryAction.SKIP
    # Everything else goes to DLQ for investigation
    return dk.DeliveryAction.DLQ
```
Dead letter queue
When delivery fails and your on_delivery_error hook returns DeliveryAction.DLQ
(or retries are exhausted), the framework writes the failed payloads to a Kafka-based
dead letter queue.
Topic derivation
The DLQ topic is configured under the dlq key:

```yaml
dlq:
  topic: ""    # empty = auto-derived from source topic
  brokers: ""  # empty = inherits from kafka.brokers
```
When topic is empty, the framework derives it as {source_topic}_dlq. For example,
if kafka.source_topic is search-requests, the DLQ topic becomes
search-requests_dlq.
Message format
Each DLQ message is a JSON document containing:

| Field | Description |
|---|---|
| original_payloads | The failed payloads, each serialized as a JSON string |
| sink_name | Name of the sink that failed |
| sink_type | Type of the sink that failed |
| error | Error message from the failed delivery |
| timestamp | Unix epoch timestamp of the failure |
| partition | Source Kafka partition the message came from |
| attempt_count | Number of delivery attempts before writing to DLQ |
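For orientation, a record with these fields could be assembled as below. build_dlq_message is a hypothetical helper, and the key order and encoding are assumptions (DLQMessage.serialize() is the real serializer). Note the double encoding: original_payloads holds JSON strings inside the outer JSON document, so a consumer must decode twice to reach a payload’s fields.

```python
import json
import time


def build_dlq_message(payload_jsons: list[str], sink_name: str, sink_type: str,
                      error: str, partition: int, attempt_count: int) -> bytes:
    """Assemble one DLQ record with the fields listed in the table."""
    return json.dumps({
        "original_payloads": payload_jsons,  # each already a JSON string
        "sink_name": sink_name,
        "sink_type": sink_type,
        "error": error,
        "timestamp": time.time(),  # Unix epoch seconds
        "partition": partition,
        "attempt_count": attempt_count,
    }).encode()


if __name__ == "__main__":
    raw = build_dlq_message(['{"request_id":"abc"}'], "webhook", "http",
                            "HTTP 503", partition=3, attempt_count=4)
    record = json.loads(raw)
    first = json.loads(record["original_payloads"][0])  # second decode
    print(record["sink_type"], first["request_id"])  # http abc
```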
Broker inheritance
When dlq.brokers is empty, the DLQ producer connects to the same brokers as the
main Kafka consumer (kafka.brokers). Set dlq.brokers explicitly to write DLQ
messages to a different Kafka cluster.
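Both fallbacks (topic and brokers) reduce to "use the explicit value, else derive it". resolve_dlq_target is a hypothetical helper illustrating the two rules; the broker string is whatever format your kafka.brokers setting uses.

```python
def resolve_dlq_target(dlq_topic: str, dlq_brokers: str,
                       source_topic: str, kafka_brokers: str) -> tuple[str, str]:
    """Apply the dlq.topic / dlq.brokers defaults described above."""
    topic = dlq_topic or f"{source_topic}_dlq"  # empty -> {source_topic}_dlq
    brokers = dlq_brokers or kafka_brokers  # empty -> inherit kafka.brokers
    return topic, brokers


print(resolve_dlq_target("", "", "search-requests", "kafka:9092"))
# ('search-requests_dlq', 'kafka:9092')
print(resolve_dlq_target("", "dlq-kafka:9092", "search-requests", "kafka:9092"))
# ('search-requests_dlq', 'dlq-kafka:9092')
```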
DLQ replay
scripts/replay_dlq.py is a reference operator tool that reads DLQ entries
and republishes their preserved original_payloads to a target Kafka topic.
Use it after fixing a bug that caused messages to land in the DLQ — the script
drives those messages back through Kafka so the normal consumer picks them up
again.
When to use it

- A downstream sink regressed, the bug has been fixed, and you want to reprocess the dead-lettered batch without editing offsets by hand.
- You want to inspect and filter the DLQ contents before replaying (pair --dry-run with --filter to count matches).
- You want to divert DLQ content to a shadow topic for offline analysis (set --target-topic to a scratch topic).
Usage

```bash
# Dry run, count matches only
uv run python scripts/replay_dlq.py \
  --dlq-config=config.yaml \
  --target-topic=search-requests \
  --dry-run \
  --filter='customer_id:42'

# Real replay, bounded
uv run python scripts/replay_dlq.py \
  --dlq-config=config.yaml \
  --target-topic=search-requests \
  --limit=1000

# Different broker for the target
uv run python scripts/replay_dlq.py \
  --dlq-config=config.yaml \
  --target-topic=search-requests \
  --target-brokers=prod-kafka:9092
```
The script prints replayed N/M to stderr every 1000 records and a final
summary line (summary: read=… published=… filtered_out=… errors=…) when it
finishes. Ctrl+C is handled gracefully — the in-flight produce completes and
the producer is flushed before exit.
Caveats

- Duplicate processing: the script does NOT deduplicate against the source topic. If your sink is not idempotent (see BaseSink.idempotent) and your pipeline has already partially processed the DLQ contents, replaying will cause duplicate work. Verify the sink’s idempotency contract before replaying.
- Filter is substring-only: --filter='customer_id:42' is a plain substring (in) check against each payload’s JSON string. It does not parse JSON or support path expressions. For structural filtering, pipe the DLQ topic through kcat | jq in advance and feed a pre-filtered topic to the script.
- --limit does not resume: --limit=1000 stops after 1000 entries have been READ (filter-rejected entries still count). Running with --limit twice replays the first N entries both times; the consumer group is throwaway (unique per invocation), so offsets are not preserved between runs.
- Original Kafka envelope is NOT fully preserved: the replay script publishes only the original_payloads bytes to the target topic. The original message’s key, partition, and headers are NOT currently captured by DLQMessage.serialize() and therefore cannot be restored by the replay script. If downstream consumers depend on those fields (e.g. the source partition for a keyed-routing consumer, or headers for tracing), point --target-topic at a retry topic whose producer re-derives them from the payload body. Operators who need a lossless replay should post-process the DLQ output before publishing.
- Partial-entry failure is per-record: each DLQ entry may expand to multiple target-topic messages (one per preserved payload). If a produce fails midway through a multi-payload entry, earlier sub-payloads from that entry have already been sent. The stderr summary reports the last successful DLQ offset so operators can resume, but the specific entry may need inspection to avoid duplicates.
- Exit code 1 with partial success: if the producer fails mid-run after some records were published, the script exits 1 and logs the last successful DLQ offset to stderr. Operators can re-run with an adjusted filter (e.g. filtering for records newer than that offset’s timestamp) to resume.
Sink connections
Each sink follows a lifecycle of connect() at startup and close() at shutdown.
Startup
connect() is called once during worker startup for every configured sink. If any
sink fails to connect, the worker crashes immediately with a clear error – this
prevents silent failures where the pipeline runs but cannot deliver results.
Shutdown
close() is called for every sink during worker shutdown. Close errors are logged
but never raised, so shutdown proceeds cleanly even if a connection is already lost.
Connection-state flag (custom sinks)
BaseSink maintains an is_connected: bool read-only property backed by the
mark_connected() / mark_disconnected() methods. You do NOT call these from
your subclass — the SinkManager drives the flag: connect_all() calls
mark_connected() after a successful connect(), and close_all() calls
mark_disconnected() after close() runs (even when close() raises, so the
flag faithfully reflects the manager’s view of the connection). Readiness
probes (/readyz) read is_connected to decide whether to report the sink
as ready. Your custom sink only needs to implement connect() / close();
the bookkeeping is handled for you.
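The division of responsibilities (the sink implements connect() and close(), the manager flips the flag) can be sketched like this. SketchSink, connect_all, and close_all are hypothetical stand-ins for BaseSink and SinkManager, and closing sinks sequentially is an assumption. The key detail is the finally clause: the flag is cleared even when close() raises, and the error is only logged.

```python
import asyncio
import logging

log = logging.getLogger("sink_sketch")


class SketchSink:
    """Hypothetical sink exposing a manager-owned is_connected flag."""

    def __init__(self, name: str, fail_close: bool = False) -> None:
        self.name = name
        self.fail_close = fail_close
        self._connected = False

    @property
    def is_connected(self) -> bool:  # read-only; a /readyz probe would read this
        return self._connected

    def mark_connected(self) -> None:
        self._connected = True

    def mark_disconnected(self) -> None:
        self._connected = False

    async def connect(self) -> None:
        pass  # a real sink would open its client here

    async def close(self) -> None:
        if self.fail_close:
            raise ConnectionResetError("connection already lost")


async def connect_all(sinks: list[SketchSink]) -> None:
    await asyncio.gather(*(s.connect() for s in sinks))  # raises on any failure
    for s in sinks:
        s.mark_connected()  # only reached when every connect() succeeded


async def close_all(sinks: list[SketchSink]) -> None:
    for s in sinks:
        try:
            await s.close()
        except Exception:
            log.exception("close failed for sink %s", s.name)  # logged, not raised
        finally:
            s.mark_disconnected()  # flag cleared even if close() raised
```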
Sink-specific details
PostgresSink exposes the asyncpg connection pool via its pool property. This
is available in on_ready() for running migrations, loading lookup tables, or any
direct database access needed at startup.
HttpSink uses httpx.AsyncClient with configurable timeout and headers:
```yaml
sinks:
  http:
    webhook:
      url: "https://hooks.example.com/notify"
      method: "POST"
      timeout_seconds: 10
      headers:
        Authorization: "Bearer ${WEBHOOK_TOKEN}"
```
RedisSink uses redis.asyncio (via from_url), connecting to the URL specified
in its config.
KafkaSink inherits kafka.brokers when its own brokers field is empty, so you
only need to specify brokers once for sinks on the same cluster.
MongoSink uses motor’s AsyncIOMotorClient for native asyncio support.
FileSink requires base_path (non-empty) and validates it exists at connect time. All payload paths are contained within base_path — traversal attempts raise ValueError.
Individual payload paths must have existing parent directories.
Custom sinks (plugin API)
Drakkar discovers third-party sink types through Python’s standard
importlib.metadata entry points.
A plugin package registers a BaseSink subclass under the
drakkar.sinks group, and SinkRegistry.discover() (called once when
SinkManager is constructed) loads it automatically — no monkey-patches,
no fork of Drakkar.
The same registry holds the built-in sinks. SinkRegistry.get(name)
returns the class registered under that name, whether it shipped with
Drakkar or arrived through a plugin. A plugin may override a built-in by
re-registering the same name (for example, a custom kafka sink with
extra metrics).
BaseSink contract
Subclass drakkar.sinks.base.BaseSink and implement three async methods:

```python
from drakkar.sinks.base import BaseSink
from pydantic import BaseModel


class MyCustomPayload(BaseModel):
    """Whatever shape your sink consumes."""

    value: str


class MyCustomSink(BaseSink[MyCustomPayload]):
    sink_type = 'my_custom'  # canonical type name (used in config)
    idempotent = False  # see "Retry contract" above

    def __init__(self, name: str, config) -> None:
        super().__init__(name)
        self._config = config

    async def connect(self) -> None:
        """Open the underlying client. Raise on failure: the worker
        crashes fast with a clear startup error.
        """

    async def deliver(self, payloads: list[MyCustomPayload]) -> None:
        """Send a batch. Raise on failure: the framework handles
        retries, DLQ routing, and circuit-breaker bookkeeping.
        """

    async def close(self) -> None:
        """Release resources. Should not raise; log and continue."""
```
The idempotent class attribute and the circuit-breaker / retry
behaviour described under Retry contract apply to
custom sinks identically. You do not call mark_connected /
mark_disconnected or any of the record_* circuit-breaker methods
yourself — SinkManager drives those.
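Entry-point declaration
In your plugin’s pyproject.toml, register the class under the drakkar.sinks entry-point group, i.e. a [project.entry-points."drakkar.sinks"] table with a line such as my_custom = "my_package.sinks:MyCustomSink".
The left side (my_custom) is the registry key: SinkRegistry.get('my_custom') returns your class,
and it is the type name operators use in their Drakkar config. The right side is a standard
module:attribute reference resolvable by EntryPoint.load().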
After pip install my-package (or uv sync in a workspace that
declares the dependency), Drakkar picks up the new sink at startup
without any further wiring.
Configuring instances
Plugin sinks are declared under sinks.custom.<type>.<instance> in
the worker config. The <type> segment must match the entry-point
key; <instance> is whatever name your handler uses to route
payloads. The leaf dict is passed verbatim as the second argument to
your sink’s constructor (MyCustomSink(name, config)), so you can
shape it however your sink expects:
```yaml
sinks:
  kafka:
    results:
      topic: "search-results"
  custom:
    my_custom:
      primary:
        endpoint: "https://api.example.com/v1/ingest"
        api_key: "xxx"
      secondary:
        endpoint: "https://api.example.com/v1/audit"
        api_key: "xxx"
```
If the <type> is not registered (plugin missing or entry-point
typo), DrakkarApp._build_sinks raises a ValueError at startup
listing the known types — failing loud beats a silently-dropped
sink.
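The config-to-instance mapping can be sketched as a nested loop. build_custom_sinks, EchoSink, and the plain dict registry (standing in for SinkRegistry) are hypothetical; keying instances by (type, instance) is a choice made here so that duplicate instance names across types stay visible instead of silently overwriting each other.

```python
from typing import Any


def build_custom_sinks(custom_cfg: dict[str, dict[str, dict[str, Any]]],
                       registry: dict[str, type]) -> dict[tuple[str, str], Any]:
    """Instantiate every sinks.custom.<type>.<instance> entry."""
    instances: dict[tuple[str, str], Any] = {}
    for sink_type, named in custom_cfg.items():
        cls = registry.get(sink_type)
        if cls is None:  # plugin missing or entry-point typo: fail loud
            known = ", ".join(sorted(registry))
            raise ValueError(f"unknown sink type {sink_type!r}; known types: {known}")
        for instance, leaf in named.items():
            instances[(sink_type, instance)] = cls(instance, leaf)  # leaf passed verbatim
    return instances


class EchoSink:
    """Hypothetical plugin class with the (name, config) constructor shape."""

    def __init__(self, name: str, config: dict) -> None:
        self.name = name
        self.config = config
```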
Inspecting registered sinks

```python
from drakkar.sinks import SinkRegistry

SinkRegistry.discover()  # idempotent: safe to call repeatedly
print(SinkRegistry.all_names())  # ['filesystem', 'http', 'kafka', 'my_custom', ...]
print(SinkRegistry.get('my_custom'))  # <class 'my_package.sinks.MyCustomSink'>
print(SinkRegistry.get('not_installed'))  # None; get() never raises
```
Failure modes
If a plugin entry point fails to load (missing dependency, ImportError,
non-BaseSink target), SinkRegistry.discover() logs a structured
sink_registry_entry_point_load_failed (or ..._invalid) warning and
skips that entry. One broken plugin never crashes the worker — operators
see the warning in their log aggregator and remove or fix the offending
package.
Routing payloads to a plugin sink
Plugin sinks receive payloads through the custom field on
CollectResult. Each CustomPayload.sink names the configured plugin
sink instance (one of the keys you put under sinks.custom.<type> in
your config); the framework looks the instance up by name across all
plugin sinks, validates it at startup and for each result, and hands
your data field to the sink’s deliver() method as-is.
```python
import drakkar as dk
from pydantic import BaseModel


class MyOutput(BaseModel):
    request_id: str
    payload: dict


class SearchHandler(dk.BaseDrakkarHandler):
    async def on_task_complete(self, result):
        out = MyOutput(request_id='abc', payload={'matches': 12})
        return dk.CollectResult(
            custom=[
                dk.CustomPayload(sink='primary', data=out),
                dk.CustomPayload(sink='secondary', data=out),
            ],
        )
```
The corresponding config:
```yaml
sinks:
  custom:
    my_custom:
      primary:
        endpoint: 'https://api.example.com/v1/ingest'
      secondary:
        endpoint: 'https://api.example.com/v1/audit'
```
my_custom here is the entry-point key your pyproject.toml declared
(MyCustomSink.sink_type); primary / secondary are the instance
names the handler routes to via CustomPayload.sink.
If the handler returns a CustomPayload with a sink name that no
configured plugin instance owns, SinkManager.validate_collect raises
SinkNotConfiguredError before any payload is delivered — same
fail-loud guarantee as for the built-in sinks. If the same instance
name is shared across two plugin sink types, AmbiguousSinkError is
raised so operators rename one to disambiguate.
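Because plugin instances are looked up by name across every plugin type, the check differs from the per-type rule for built-in sinks. find_custom_instance and the local exception classes are hypothetical stand-ins sketching what SinkManager.validate_collect is described as doing.

```python
class AmbiguousSinkError(Exception):
    """Local stand-in for Drakkar's exception of the same name."""


class SinkNotConfiguredError(Exception):
    """Local stand-in for Drakkar's exception of the same name."""


def find_custom_instance(configured: dict[str, list[str]], sink: str) -> tuple[str, str]:
    """Return (plugin_type, instance) for CustomPayload.sink.

    configured maps each plugin type to the instance names under
    sinks.custom.<type> in the worker config.
    """
    owners = sorted(t for t, names in configured.items() if sink in names)
    if not owners:
        raise SinkNotConfiguredError(f"no plugin sink instance named {sink!r}")
    if len(owners) > 1:
        raise AmbiguousSinkError(f"instance {sink!r} exists under plugin types {owners}; rename one")
    return owners[0], sink
```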
CollectResult
CollectResult is the return type of on_task_complete(), on_message_complete(), and on_window_complete(). It has
one list field per sink type. Populate whichever fields match your configured sinks.
Complete example
This example routes a single executor result to all six sink types, with conditional routing for HTTP (webhook only for high-match results) and filesystem (JSONL log only for very high-match results):

```python
# A method on your handler class. Assumes import drakkar as dk, and that
# SearchResult, SearchSummary, and SearchNotification are your own Pydantic models.
async def on_task_complete(self, result: dk.ExecutorResult) -> dk.CollectResult | None:
    matches = [line for line in result.stdout.strip().split('\n') if line]
    meta = result.task.metadata
    full_result = SearchResult(
        request_id=meta['request_id'],
        pattern=meta['pattern'],
        match_count=len(matches),
        duration_seconds=result.duration_seconds,
        matches=matches[:50],
    )
    summary = SearchSummary(
        request_id=meta['request_id'],
        pattern=meta['pattern'],
        match_count=len(matches),
        duration_seconds=result.duration_seconds,
    )
    sinks = dk.CollectResult(
        kafka=[dk.KafkaPayload(data=full_result, key=meta['request_id'].encode())],
        postgres=[dk.PostgresPayload(table='search_results', data=summary)],
        mongo=[dk.MongoPayload(collection='search_archive', data=full_result)],
        redis=[dk.RedisPayload(key=f'search:{meta["request_id"]}', data=summary, ttl=3600)],
    )
    # Conditional: HTTP webhook for high-match results only
    if len(matches) > 20:
        notification = SearchNotification(
            request_id=meta['request_id'],
            pattern=meta['pattern'],
            match_count=len(matches),
            message=f"High match count: {len(matches)} matches for '{meta['pattern']}'",
        )
        sinks.http.append(dk.HttpPayload(data=notification))
    # Conditional: JSONL file log for very high-match results
    # (path is relative to the filesystem sink's base_path)
    if len(matches) > 50:
        sinks.files.append(dk.FilePayload(path='high-match-results.jsonl', data=full_result))
    return sinks
```
Returning None from on_task_complete() skips delivery entirely for that result. The framework
only commits Kafka offsets after all sinks confirm delivery (or delivery errors are
handled through on_delivery_error()). See Offset Commit Logic for details on watermark-based commit tracking.