Fan-out: One Message → Many Tasks → One Aggregate
A common pipeline shape: one incoming message describes work that must be split into many subprocess invocations, and the useful output is a single aggregated record over all those subprocess outcomes — not one output per task.
Drakkar supports this natively via the on_message_complete hook and
the MessageGroup model.
When to use this page
If your pipeline is strictly 1-in / 1-out (one input → one subprocess
→ one output), you don’t need any of this — use on_task_complete
and ignore the rest of the page. This page is for pipelines where
one input deliberately produces several subprocess tasks.
The shape
flowchart LR
M["SourceMessage<br/>(e.g. 3 patterns × 2 files)"] --> A[arrange]
A --> T1[Task 1]
A --> T2[Task 2]
A --> T3[Task 3]
A --> T4[Task 4]
A --> T5[Task 5]
A --> T6[Task 6]
T1 --> C["on_task_complete<br/>(per-task detail)"]
T2 --> C
T3 --> C
T4 --> C
T5 --> C
T6 --> C
T1 & T2 & T3 & T4 & T5 & T6 -. all terminal .-> MC["on_message_complete<br/>(one aggregate)"]
C --> S1[per-task sinks]
MC --> S2[aggregate sinks]
- arrange() expands the input into N tasks (the fan-out).
- on_task_complete(result) fires once per task; this is the usual place to emit per-task output.
- on_message_complete(group) fires once per source message, after every task derived from it has reached a terminal state.
The two hooks are independent. You can use both, either, or neither.
The MessageGroup passed to on_message_complete
class MessageGroup(BaseModel):
    source_message: SourceMessage  # the original Kafka message
    tasks: list[ExecutorTask]      # full history (see below)
    results: list[ExecutorResult]  # terminal successes
    errors: list[ExecutorError]    # terminal failures (SKIP / retries exhausted)
    started_at: float              # monotonic, when arrange() scheduled first task
    finished_at: float             # monotonic, when last task terminal'd

    # Convenience properties
    succeeded: int           # len(results)
    failed: int              # len(errors)
    total: int               # len(tasks); includes REPLACED tasks (see below)
    replaced: int            # total - succeeded - failed
    all_succeeded: bool      # True iff total > 0 and failed == 0
    any_failed: bool         # failed > 0
    is_empty: bool           # total == 0 (arrange() returned nothing)
    duration_seconds: float
What counts as “terminal”?
A task is terminal when its outcome is decided:
| Outcome | Contributes to |
|---|---|
| Subprocess exit 0, on_task_complete ran | results |
| on_error returned SKIP | errors |
| on_error returned RETRY, retries exhausted | errors |
| on_error returned list[ExecutorTask] (replaced) | neither; the replacements take its place |
| Unexpected exception in on_task_complete | errors (synthesised) |
Replaced-original tasks are kept in tasks (full history for debugging)
but not counted in results or errors. The REPLACEMENTS eventually
land in results or errors as their own terminal outcomes.
You can always recover the replacement count as group.replaced, which is defined as group.total - group.succeeded - group.failed.
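A minimal sketch of that arithmetic follows. It uses a stand-in class rather than Drakkar's real MessageGroup; the name GroupSketch and the string placeholders for tasks are assumptions made purely for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class GroupSketch:
    """Hypothetical stand-in for MessageGroup: just the three lists."""
    tasks: list = field(default_factory=list)
    results: list = field(default_factory=list)
    errors: list = field(default_factory=list)

    @property
    def total(self) -> int:
        return len(self.tasks)

    @property
    def succeeded(self) -> int:
        return len(self.results)

    @property
    def failed(self) -> int:
        return len(self.errors)

    @property
    def replaced(self) -> int:
        # Whatever is in the history but not in results or errors.
        return self.total - self.succeeded - self.failed


# One original task was replaced by two halves: one succeeded, one was skipped.
group = GroupSketch(
    tasks=["original", "half1", "half2"],
    results=["half1"],
    errors=["half2"],
)
replaced_count = group.total - group.succeeded - group.failed
```

Here the history holds three tasks, one success and one failure, so exactly one task (the original) is accounted for as replaced.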
Tracing the replacement chain
Every task the framework schedules in response to an on_error list-return
has its parent_task_id auto-populated with the failing task’s task_id
(unless the handler explicitly set it). In on_message_complete you can
walk the chain upward to find the arrange()-produced root:
async def on_message_complete(self, group):
    by_id = {t.task_id: t for t in group.tasks}
    for task in group.tasks:
        chain = [task]
        while chain[-1].parent_task_id:
            chain.append(by_id[chain[-1].parent_task_id])
        # chain[0] is this task; chain[-1] is the original arrange() task
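The same walk can be tried outside the framework. The sketch below uses a stand-in TaskSketch dataclass (an assumption, not Drakkar's ExecutorTask) with only the two fields the walk needs. It also stops when a parent id is not in the group, which could happen if a handler set parent_task_id explicitly to something else.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskSketch:
    """Hypothetical stand-in carrying only the ids used by the walk."""
    task_id: str
    parent_task_id: Optional[str] = None


def chain_to_root(task, by_id):
    """Return [task, parent, grandparent, ...] ending at the root task."""
    chain = [task]
    # Stop at a task with no parent, or with a parent unknown to this group.
    while chain[-1].parent_task_id and chain[-1].parent_task_id in by_id:
        chain.append(by_id[chain[-1].parent_task_id])
    return chain


tasks = [
    TaskSketch("search-1"),                # produced by arrange()
    TaskSketch("half1-a", "search-1"),     # replacement of search-1
    TaskSketch("quarter-x", "half1-a"),    # replacement of half1-a
]
by_id = {t.task_id: t for t in tasks}
root_ids = [chain_to_root(t, by_id)[-1].task_id for t in tasks]
```

Every task in this example resolves to the same root, `search-1`, which is one way to group outcomes by the original unit of work.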
A minimal example
from pydantic import BaseModel, Field

import drakkar as dk


class SearchRequest(BaseModel):
    request_id: str
    patterns: list[str] = Field(min_length=1)
    file_paths: list[str] = Field(min_length=1)


class PerTaskResult(BaseModel):
    request_id: str
    pattern: str
    file_path: str
    match_count: int


class RequestSummary(BaseModel):
    request_id: str
    total_tasks: int
    succeeded: int
    failed: int
    total_matches: int


class MyHandler(dk.BaseDrakkarHandler[SearchRequest, PerTaskResult]):
    async def arrange(self, messages, pending):
        # One message → patterns × files tasks. Every produced task
        # shares the message's source_offsets; this is what binds them
        # into a MessageGroup.
        tasks = []
        for msg in messages:
            req = msg.payload
            for pattern in req.patterns:
                for file_path in req.file_paths:
                    tasks.append(
                        dk.ExecutorTask(
                            task_id=dk.make_task_id('search'),
                            args=[pattern, file_path],
                            metadata={
                                'request_id': req.request_id,
                                'pattern': pattern,
                                'file_path': file_path,
                            },
                            source_offsets=[msg.offset],
                        )
                    )
        return tasks

    async def on_task_complete(self, result):
        """Fine-grained per-task output (optional)."""
        meta = result.task.metadata
        matches = sum(1 for line in result.stdout.split('\n') if line)
        per_task = PerTaskResult(
            request_id=meta['request_id'],
            pattern=meta['pattern'],
            file_path=meta['file_path'],
            match_count=matches,
        )
        return dk.CollectResult(
            kafka=[dk.KafkaPayload(data=per_task, key=meta['request_id'].encode())],
        )

    async def on_message_complete(self, group):
        """Called ONCE per SearchRequest, after all its tasks finished."""
        req: SearchRequest = group.source_message.payload
        if req is None or group.is_empty:
            return None
        total_matches = sum(
            sum(1 for line in r.stdout.split('\n') if line)
            for r in group.results
        )
        summary = RequestSummary(
            request_id=req.request_id,
            total_tasks=group.total,
            succeeded=group.succeeded,
            failed=group.failed,
            total_matches=total_matches,
        )
        # One aggregate record per request to a "summaries" topic.
        return dk.CollectResult(
            kafka=[
                dk.KafkaPayload(
                    data=summary,
                    key=req.request_id.encode(),
                    sink='summaries',
                ),
            ],
        )
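The fan-out in arrange() is a Cartesian product of patterns and file paths, and both hooks count matches the same way. The framework-free sketch below shows both pieces in isolation; the sample patterns and file names and the helper name count_matches are illustrative assumptions, not part of Drakkar.

```python
from itertools import product

patterns = ["TODO", "FIXME", "XXX"]
file_paths = ["a.py", "b.py"]

# Same expansion as the nested loops in arrange(): one args list per
# (pattern, file_path) pair, in pattern-major order.
task_args = [
    [pattern, file_path]
    for pattern, file_path in product(patterns, file_paths)
]


def count_matches(stdout: str) -> int:
    """Count non-empty output lines, as both hooks in the example do."""
    return sum(1 for line in stdout.split("\n") if line)
```

With 3 patterns and 2 files this yields 6 argument lists, matching the diagram at the top of the page. Factoring the line count into one helper keeps the per-task and aggregate numbers consistent.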
Offset commit semantics
Offsets are committed per source message, after on_message_complete
returns. This means:
- A fast message whose tasks finish quickly commits its offset immediately, even if another (slow) message in the same arrange() window is still in flight.
- If on_message_complete raises, the exception is logged and offsets commit anyway; the raise doesn’t stall the partition.
- On crash / revoke before on_message_complete completes, the offset is NOT committed. The message replays on restart (at-least-once). Any partial side effects already delivered via on_task_complete are duplicated on replay; design downstream sinks to be idempotent (use request_id as a primary/dedup key).
At-least-once, not exactly-once
Replays can cause duplicates in the per-task sinks. The message-level
aggregate in on_message_complete is at-least-once too: a crash
between “aggregate delivered to Kafka” and “offset committed to Kafka”
produces a duplicate aggregate on replay. If this matters, dedupe
downstream by (request_id, partition, offset).
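One possible shape for that downstream dedupe is sketched below. The record layout (plain dicts with request_id, partition and offset keys) is an assumption for illustration, and the in-memory set stands in for whatever durable mechanism a real consumer would use, such as a unique constraint in a database.

```python
def dedupe_aggregates(records):
    """Keep the first record seen for each (request_id, partition, offset)."""
    seen = set()
    unique = []
    for rec in records:
        key = (rec["request_id"], rec["partition"], rec["offset"])
        if key in seen:
            continue  # duplicate produced by a replay
        seen.add(key)
        unique.append(rec)
    return unique


# The second record is a replayed duplicate of the first.
incoming = [
    {"request_id": "r1", "partition": 0, "offset": 41, "total_matches": 7},
    {"request_id": "r1", "partition": 0, "offset": 41, "total_matches": 7},
    {"request_id": "r2", "partition": 0, "offset": 42, "total_matches": 0},
]
deduped = dedupe_aggregates(incoming)
```

Keying on the full triple rather than request_id alone means a genuinely new message that reuses a request_id is still kept.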
Choosing which hook(s) to implement
| Your shape | Use |
|---|---|
| 1 message → 1 task → 1 output | just on_task_complete |
| 1 message → N tasks → N outputs | just on_task_complete |
| 1 message → N tasks → 1 aggregate output | on_message_complete only (return None from on_task_complete) |
| 1 message → N tasks → N detail + 1 aggregate | BOTH on_task_complete and on_message_complete |
| Multi-message batch metrics | on_window_complete (coarser than message-level) |
All three hooks coexist — they fire on the same underlying data but at different granularities. Which to use is a choice about what you want downstream consumers to see, not a framework constraint.
on_message_complete vs on_window_complete
| | on_message_complete | on_window_complete |
|---|---|---|
| Fires for | one source message | one arrange() batch |
| Receives | MessageGroup | list[ExecutorResult], list[SourceMessage] |
| Granularity | per-message | per-window (may span many messages) |
| Offset commit order | before commit | after some offsets may already be committed |
| Typical use | request-level aggregation | dashboard metrics, window-level logs |
Error handling across the group
The group doesn’t need every task to succeed to fire. The hook ALWAYS fires when all tasks reach a terminal state, whether that’s success or failure. Decide what to emit based on the group’s shape:
async def on_message_complete(self, group):
    if group.is_empty:
        # arrange() returned nothing for this message; you may still
        # want to emit an audit record so the request isn't invisible.
        return self._emit_skipped(group.source_message)
    if group.all_succeeded:
        return self._emit_success_aggregate(group)
    if group.any_failed and group.succeeded == 0:
        # Every task failed: emit a dead-letter-style summary.
        return self._emit_total_failure(group)
    # Partial failure: some succeeded, some didn't.
    return self._emit_partial_aggregate(group)
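The branching above depends only on three counts, so it can be checked in isolation. The sketch below is a hypothetical pure function (classify_group is not a Drakkar API) that mirrors the same four branches, using the property definitions given earlier: is_empty is total == 0 and all_succeeded is total > 0 and failed == 0.

```python
def classify_group(total: int, succeeded: int, failed: int) -> str:
    """Mirror the four branches of the handler using plain counts."""
    if total == 0:
        return "empty"            # arrange() returned nothing
    if failed == 0:
        return "all_succeeded"    # replaced originals do not count as failures
    if succeeded == 0:
        return "total_failure"    # at least one failure and no success
    return "partial"
```

Note that a group with one replaced original and two successful replacements (total 3, succeeded 2, failed 0) still classifies as all_succeeded, because replaced tasks land in neither results nor errors.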
Replacement chains
on_error returning a replacement list is a common pattern for
“subdivide a failed task into smaller work.” The replacements become
part of the same MessageGroup automatically — the group doesn’t
complete until every replacement (and any further replacements from
their failures) settles.
async def on_error(self, task, error):
    if 'memory' in (error.exception or '').lower():
        # Split the file in half and try each part as a smaller task.
        return [
            dk.ExecutorTask(
                task_id=dk.make_task_id('half1'),
                args=[...],
                source_offsets=task.source_offsets,  # REQUIRED: inherit
            ),
            dk.ExecutorTask(
                task_id=dk.make_task_id('half2'),
                args=[...],
                source_offsets=task.source_offsets,
            ),
        ]
    return dk.ErrorAction.SKIP
Replacements must inherit source_offsets
A replacement task with empty or different source_offsets will not
be tracked by the original message’s MessageGroup. Copy
task.source_offsets onto every replacement unless you deliberately
want to detach it.
The framework auto-populates parent_task_id so you can trace the
chain later; source_offsets is the handler’s responsibility.
Replacement accounting: Window vs MessageGroup
The framework tracks tasks in two overlapping accounting structures and it is worth knowing how each treats a replaced original.
MessageGroup (delivered to on_message_complete):
- tasks: the full history, original plus every replacement. len(tasks) == total.
- results: terminal successes only. An original that was replaced is NOT here; the successor’s success lands here.
- errors: terminal failures only (SKIP or retries exhausted). A replaced original is also NOT here; it wasn’t a terminal failure, it was replaced.
- replaced: the delta, total - succeeded - failed. This is where replaced originals show up.
So MessageGroup cleanly separates the three outcome classes, and
group.total == group.succeeded + group.failed + group.replaced always.
window.results (delivered to on_window_complete as the first arg):
This is a slightly different beast. It contains the ExecutorResult
of every task invocation that reached a terminal outcome — both
successes AND subprocess-level failures (non-zero exit code that ended
in SKIP or retries-exhausted). Replaced originals do NOT contribute an
entry.
Concrete example:
| Scenario | total_tasks | len(results) delivered to on_window_complete |
|---|---|---|
| 1 task, success | 1 | 1 (the success) |
| 1 task, fails → SKIP | 1 | 1 (the failure result) |
| 1 task, fails → RETRY exhausted | 1 | 1 (the final retry’s failure result) |
| 1 task, fails → replaced by 2, both succeed | 3 | 2 (just the replacements — original omitted) |
| 1 task, fails → replaced by 1, replacement also fails → SKIP | 2 | 1 (replacement’s failure; original omitted) |
| 2 tasks, one succeeds, one fails → replaced by 1 that succeeds | 3 | 2 (first success + replacement success) |
The rule of thumb: window.results has one entry per actually-executed
task invocation that did not hand off to a replacement. total_tasks
counts the full schedule. The gap between them equals the number of
replaced originals.
async def on_window_complete(self, results, source_messages):
    # `len(results)` is the count of outcomes, NOT necessarily the
    # count of originally-scheduled tasks. If you need the total,
    # prefer summing `group.total` across MessageGroups in
    # `on_message_complete`, which is always consistent.
    for r in results:
        # Each `r` is an ExecutorResult from a real subprocess run.
        ...
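The rule of thumb can be expressed as a small calculation and checked against the accounting table. This is only a model of the bookkeeping described here, not framework code; the outcome labels and the function name window_accounting are assumptions.

```python
def window_accounting(outcomes):
    """Return (total_tasks, len(window.results)) for a list of outcomes.

    `outcomes` has one label per scheduled task, originals and
    replacements alike: "success", "skip", "retry_exhausted" or "replaced".
    """
    total_tasks = len(outcomes)
    replaced_originals = sum(1 for o in outcomes if o == "replaced")
    # Every task except a replaced original contributes one result entry.
    window_results = total_tasks - replaced_originals
    return total_tasks, window_results
```

For example, `window_accounting(["replaced", "success", "success"])` models the row "1 task, fails → replaced by 2, both succeed" and gives 3 scheduled tasks with 2 delivered results.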
Why omit the replaced original?
Semantically, an on_error list-return says “this invocation
didn’t count — route around it with these instead.” Appending a
placeholder result for the replaced original would double-count
the work in on_window_complete (users typically iterate
results for summary aggregation). The trade-off: when
replacements occur, len(window.results) < total_tasks, which is
what the accounting table above captures.
Multi-message tasks (fan-IN)
If a task has source_offsets = [a, b, c] (one task represents work for
three source messages), it participates in THREE MessageGroups. Its
terminal outcome is reported to all of them. Each group only completes
when every task it has a stake in has reached a terminal state.
This is uncommon but legitimate — deduplication, batched external API
calls covering multiple messages at once, or cross-message aggregation
work that makes one subprocess cheaper than N. The tracking is
transparent; no new field is needed beyond the existing list-typed
source_offsets.
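To make the completion rule concrete, here is a toy tracker that keys pending task ids by offset. It is a sketch of the behaviour described above, not Drakkar's internal implementation; the class name FanInTrackerSketch and its methods are hypothetical.

```python
class FanInTrackerSketch:
    """Toy model: a group completes when all tasks tied to its offset finish."""

    def __init__(self):
        self.pending = {}  # offset -> set of task ids not yet terminal

    def schedule(self, task_id, source_offsets):
        for offset in source_offsets:
            self.pending.setdefault(offset, set()).add(task_id)

    def mark_terminal(self, task_id, source_offsets):
        """Report a terminal task; return offsets whose groups just completed."""
        completed = []
        for offset in source_offsets:
            if offset not in self.pending:
                continue  # untracked offset: nothing to report to
            self.pending[offset].discard(task_id)
            if not self.pending[offset]:
                del self.pending[offset]
                completed.append(offset)
        return completed


tracker = FanInTrackerSketch()
tracker.schedule("shared", [10, 11])   # one task, two source messages
tracker.schedule("proc-10", [10])
tracker.schedule("proc-11", [11])

first = tracker.mark_terminal("proc-10", [10])    # group 10 still waits on "shared"
second = tracker.mark_terminal("shared", [10, 11])  # completes group 10 only
third = tracker.mark_terminal("proc-11", [11])    # completes group 11
```

The shared task is reported to both offsets, but each group fires only once its own last task has settled.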
Example: dedupe identical queries across a window
A window of messages sometimes contains duplicate (pattern, file_path) combinations. Instead of running the same search subprocess many times, combine them into one task and let its result feed every message that asked the same question:
async def arrange(self, messages, pending):
    # Group messages by the actual search key.
    groups: dict[tuple[str, str], list[SourceMessage]] = {}
    for msg in messages:
        key = (msg.payload.pattern, msg.payload.file_path)
        groups.setdefault(key, []).append(msg)
    tasks = []
    for (pattern, file_path), msgs in groups.items():
        tasks.append(
            dk.ExecutorTask(
                task_id=dk.make_task_id('rg'),
                args=[pattern, file_path],
                # ONE task for every message that asked the same question.
                # When the task completes, its result lands in EACH of those
                # messages' MessageGroups, so every request_id is answered.
                source_offsets=[m.offset for m in msgs],
                metadata={
                    'pattern': pattern,
                    'file_path': file_path,
                    'request_ids': [m.payload.request_id for m in msgs],
                },
            )
        )
    return tasks

async def on_message_complete(self, group):
    # Even though a shared task produced group.results[0], each group
    # fires independently so the handler gets one callback per message.
    # The request_id comes from the source_message, not the task.
    req = group.source_message.payload
    return dk.CollectResult(
        kafka=[
            dk.KafkaPayload(
                data=Aggregate(request_id=req.request_id, ...),
                key=req.request_id.encode(),
                sink='results',
            ),
        ],
    )
Example: single validation task shared across the window
Some pipelines want to run a quick validation pass once per window (e.g. a schema check, a rate-limit probe) whose outcome applies to every message that arrives in that batch. Express this as a single task tied to every offset:
async def arrange(self, messages, pending):
    tasks = [
        dk.ExecutorTask(
            task_id='validate',
            args=['--check-health'],
            source_offsets=[m.offset for m in messages],
        ),
    ]
    for msg in messages:
        tasks.append(
            dk.ExecutorTask(
                task_id=dk.make_task_id('proc'),
                args=['--process', msg.payload.data],
                source_offsets=[msg.offset],
            )
        )
    return tasks
Each message’s MessageGroup will have total = 2: the shared
validation result plus that message’s own processing task. The shared
task’s ExecutorResult appears in all groups’ results lists — the
same instance, so treat it as read-only.
Gotchas
- Same partition only. All offsets in source_offsets must be from the current partition (a PartitionProcessor only knows about its own partition’s messages). Cross-partition offsets are silently skipped because no tracker exists for them. If you need cross-partition fan-in, run a downstream worker that consumes the output of this one.
- Replacements must inherit the multi-offset list. If your on_error returns replacement tasks for a fan-in task, each replacement should carry the same source_offsets (or an intentional subset); otherwise some groups will hang waiting on a task that no longer exists:
async def on_error(self, task, error):
    return [
        dk.ExecutorTask(
            task_id=dk.make_task_id('retry'),
            args=[...],
            source_offsets=task.source_offsets,  # INHERIT
        ),
    ]
- Shared results are shared objects. group.results may hold the same ExecutorResult instance for multiple message groups when a fan-in task contributed to each. Treat results as immutable; if you need to annotate, build a new model in on_message_complete.
- Silent skip for bogus offsets. If source_offsets includes an offset that isn’t currently tracked (e.g. from a previous window or a handler bug), the framework silently ignores it rather than crashing. The task still runs; its outcome just isn’t reported to a non-existent group.
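The shared-object gotcha is easiest to see with a small stand-in. The sketch below does not use Drakkar's ExecutorResult; ResultSketch and AnnotatedResult are hypothetical frozen dataclasses showing the "build a new model instead of mutating" approach.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical stand-in for a result shared between groups."""
    task_id: str
    stdout: str


@dataclass(frozen=True)
class AnnotatedResult:
    """Per-message wrapper: adds context without touching the shared result."""
    result: ResultSketch
    request_id: str


shared = ResultSketch(task_id="validate", stdout="ok")

# Two message groups both reference the same instance.
group_a_results = [shared]
group_b_results = [shared]

# Annotate per message by wrapping, never by mutating `shared`.
annotated_a = AnnotatedResult(result=group_a_results[0], request_id="req-a")
annotated_b = AnnotatedResult(result=group_b_results[0], request_id="req-b")
```

Because both lists point at one object, any in-place change made while handling one message would be visible when handling the other; wrapping avoids that.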
What’s NOT in scope (yet)
Grouping that spans multiple source messages (e.g. “wait for 5 related messages with the same business key, then aggregate across them”) is a different problem — it’s stateful aggregation, and it interacts with Kafka partitioning in ways that can’t be hidden behind a handler hook without serious trade-offs.
Drakkar’s current stance: do that downstream. Emit your
on_message_complete aggregates to a Kafka topic, and run a second
Drakkar worker (or Kafka Streams / Flink / your own consumer) that
groups those aggregates by business key. That worker owns the group
state and the termination condition.
A future get_source_group_id(msg) + on_source_group_complete(group)
hook for same-partition grouping may land once the termination semantics
are worked out.
Precomputed tasks in a fan-out group
Any task in the fan-out — shared or per-message — can supply a
PrecomputedResult
instead of running a subprocess. Mixing precomputed and real tasks in
the same MessageGroup works transparently: group.results may
contain results from both sources. Tell them apart via
result.pid is None (precomputed) versus a numeric pid (real subprocess).
Common shape: cache-hit per-message task plus a real shared validation
task. The fan-in lookup runs once (cheap), and each message’s
on_message_complete sees both its own cached result and the shared
one.
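A short sketch of telling the two kinds apart inside on_message_complete follows. It relies only on the pid convention stated above; ResultSketch is a hypothetical stand-in for the real result type and split_by_source is an illustrative helper name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in: only the fields needed to tell sources apart."""
    task_id: str
    pid: Optional[int]


def split_by_source(results):
    """Partition results into (precomputed, executed) using pid."""
    precomputed = [r for r in results if r.pid is None]
    executed = [r for r in results if r.pid is not None]
    return precomputed, executed


results = [
    ResultSketch("cache-hit-1", pid=None),   # precomputed, no subprocess ran
    ResultSketch("validate", pid=4321),      # real subprocess
]
precomputed, executed = split_by_source(results)
```

This kind of split can be useful for reporting a cache-hit ratio per request in the aggregate record.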
Events emitted by the recorder
Each hook completion produces a distinct event in the flight recorder, so the debug UI, Prometheus queries, and downstream tooling can tell the three stages apart:
| Event | Fires after | Grain |
|---|---|---|
| task_complete | on_task_complete() returns | per task |
| message_complete | on_message_complete() returns | per source message |
| window_complete | on_window_complete() returns | per arrange() window |
Each event carries duration, output_message_count, and stage-specific
fields (task_id on task_complete; offset + task_count +
succeeded + failed + replaced on message_complete; window_id
+ task_count on window_complete). Use these to build per-stage
dashboards (“how often do my requests fail entirely?”) or to hunt slow
hooks.
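As an illustration of the "how often do my requests fail entirely?" question, the sketch below computes that rate from a list of recorded events. The dict layout and the "event" key are assumptions about how exported events might look; only the succeeded and failed field names come from the description above.

```python
def total_failure_rate(events):
    """Fraction of message_complete events where every task failed."""
    completes = [e for e in events if e["event"] == "message_complete"]
    if not completes:
        return 0.0
    total_failures = sum(
        1 for e in completes if e["failed"] > 0 and e["succeeded"] == 0
    )
    return total_failures / len(completes)


# Hypothetical exported events; other event types are ignored.
events = [
    {"event": "task_complete", "task_id": "search-1"},
    {"event": "message_complete", "offset": 10, "succeeded": 6, "failed": 0},
    {"event": "message_complete", "offset": 11, "succeeded": 0, "failed": 6},
    {"event": "message_complete", "offset": 12, "succeeded": 4, "failed": 2},
]
rate = total_failure_rate(events)
```

In this sample one of the three completed messages failed entirely, so the rate is one third.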
vs task_completed
task_completed (past tense, with a -d) is a SEPARATE event that
marks the moment a subprocess exits cleanly — emitted by the
executor, not the hook. The pipeline sees task_completed first,
then (if on_task_complete is overridden) task_complete once the
hook finishes. The naming is subtle but the distinction is what
lets you debug “my subprocess is fast but my hook is slow” without
guessing.
Integration demo
A full end-to-end example with real Kafka, Postgres, Mongo, and Redis
is in integration/worker/:
- models.py: SearchRequest (with patterns, file_paths lists) and SearchAggregate
- handler.py: arrange() fans out patterns × files; on_task_complete emits per-task to Kafka/Mongo/Redis/Postgres archive; on_message_complete emits one aggregate per request to Kafka priority topic, and conditionally to the hot Postgres DB and a webhook
Run with:
Then watch the debug UIs at :8081–:8083 — each request fans out to
several subprocess tasks, and exactly one aggregate row per request
lands in the hot_recent_matches Postgres table.