
Handler System

The handler is the user-facing entry point into Drakkar. You subclass BaseDrakkarHandler, override hooks, and the framework calls them at the right time during the message-processing pipeline. See Configuration for the full YAML reference.

Hook Reference

| Hook | When called | Frequency | Returns |
| --- | --- | --- | --- |
| on_startup(config) | Before any components are created | Once per worker lifetime | Modified DrakkarConfig |
| on_ready(config, db_pool) | After sinks connected, before polling | Once per worker lifetime | None |
| arrange(messages, pending) | A window of messages is ready to process | Once per window per partition | list[ExecutorTask] |
| collect(result) | A single task completes successfully (exit 0) | Once per successful task | CollectResult \| None |
| on_window_complete(results, messages) | All tasks in a window finished | Once per window per partition | CollectResult \| None |
| on_error(task, error) | A single task fails (non-zero exit, timeout, crash) | Once per failed task | ErrorAction \| list[ExecutorTask] |
| on_delivery_error(error) | A sink's deliver() raises an exception | Once per failed sink delivery batch | DeliveryAction |
| on_assign(partitions) | Kafka assigns partitions during rebalance | Once per rebalance event | None |
| on_revoke(partitions) | Kafka revokes partitions during rebalance | Once per rebalance event | None |
| message_label(msg) | Before logging each message in arrange | Once per message | str |

Only arrange() is required. All other hooks have safe defaults.


BaseDrakkarHandler

from drakkar import BaseDrakkarHandler

Generic type parameters

BaseDrakkarHandler accepts two optional type parameters that control automatic (de)serialization of messages:

class MyHandler(BaseDrakkarHandler[MyInput, MyOutput]):
    ...

Both type arguments must be Pydantic BaseModel subclasses. At class creation time (__init_subclass__), the framework inspects the generic arguments and sets two class attributes:

| Attribute | Value |
| --- | --- |
| input_model | The InputT class, or None |
| output_model | The OutputT class, or None |

When input_model is set, every consumed message is automatically deserialized before arrange() is called:

# Framework calls this internally:
msg.payload = MyInput.model_validate_json(msg.value)

If deserialization fails, msg.payload is set to None and the raw bytes remain available in msg.value.
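
This fallback behavior can be pictured with a small stand-in. SourceMessage and deserialize below are illustrative sketches, not Drakkar's actual classes; the framework parses with the Pydantic input_model rather than a plain callable:

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class SourceMessage:  # stand-in for Drakkar's message type
    value: bytes          # raw bytes, always available
    payload: Any = None   # parsed object, or None on failure


def deserialize(msg: SourceMessage, parse) -> SourceMessage:
    # Mirrors the documented behavior: payload set on success,
    # None on failure, raw bytes untouched either way.
    try:
        msg.payload = parse(msg.value)
    except Exception:
        msg.payload = None
    return msg
```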

Non-generic usage

If you do not need typed deserialization, omit the type parameters:

class RawHandler(BaseDrakkarHandler):
    async def arrange(self, messages, pending):
        for msg in messages:
            raw: bytes = msg.value  # no auto-deserialization
            ...

Both input_model and output_model will be None.


Handler Hooks

Hooks are listed in lifecycle order. All hooks except arrange() have no-op defaults, so you only override what you need.

on_startup

async def on_startup(self, config: DrakkarConfig) -> DrakkarConfig

Called once, before any components (sinks, executor, consumer) are created. Receives the loaded config and must return a (possibly modified) DrakkarConfig. This is the only point where config can be changed at runtime.

import os

async def on_startup(self, config: dk.DrakkarConfig) -> dk.DrakkarConfig:
    cpu_count = os.cpu_count() or 4
    config.executor.max_executors = cpu_count * 2
    return config

on_ready

async def on_ready(self, config: DrakkarConfig, db_pool: object) -> None

Called after all sinks are connected and the executor pool is created, but before the consumer starts polling. Use it to initialize handler state, run database migrations, or load lookup tables.

The db_pool argument is an asyncpg.Pool if at least one postgres sink is configured; otherwise it is None.

async def on_ready(self, config: dk.DrakkarConfig, db_pool) -> None:
    self.lookup_table = {}
    if db_pool:
        rows = await db_pool.fetch('SELECT id, name FROM categories')
        self.lookup_table = {r['id']: r['name'] for r in rows}

arrange (required)

async def arrange(
    self,
    messages: list[SourceMessage],
    pending: PendingContext,
) -> list[ExecutorTask]

The only required hook. Transforms source messages into subprocess tasks. See ExecutorTask for the full task model reference.

Partition isolation. Each call receives messages from exactly one Kafka partition. Drakkar runs an independent pipeline per partition, so arrange() never mixes messages from different partitions in a single call. The maximum number of concurrent arrange() invocations equals the number of partitions assigned to this worker – one per partition at a time.

Windowing. The framework collects up to executor.window_size messages from the partition queue before calling arrange(). A window may contain fewer messages if the queue drains before reaching the limit. While the tasks from one window are executing, the next window can already be collected and arrange() called again for the same partition – windows are processed concurrently within a partition.

When it is called. The partition processor polls messages into a queue. Once enough messages accumulate (or a short timeout passes), they are batched into a window, deserialized (if input_model is set), and passed to arrange(). This happens continuously while the worker is running and the partition is assigned.
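
As a mental model, the window-collection step can be sketched like this. It is a simplification (the timeout here applies per message, and the real polling loop is more involved), not Drakkar's actual code:

```python
import asyncio


async def collect_window(queue: asyncio.Queue, window_size: int, timeout: float) -> list:
    """Gather up to window_size messages, or whatever arrives
    before the timeout expires."""
    window = []
    try:
        while len(window) < window_size:
            msg = await asyncio.wait_for(queue.get(), timeout)
            window.append(msg)
    except asyncio.TimeoutError:
        pass  # queue drained before the window filled
    return window
```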

pending.pending_task_ids is a set[str] of task IDs currently in-flight for this partition. Use it for O(1) deduplication:

async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        req = msg.payload
        if req is None:
            continue

        task_id = dk.make_task_id('rg')
        if task_id in pending.pending_task_ids:
            continue

        tasks.append(dk.ExecutorTask(
            task_id=task_id,
            args=[req.pattern, req.file_path],
            metadata={'request_id': req.request_id},
            source_offsets=[msg.offset],
        ))
    return tasks

If arrange() returns an empty list, all message offsets are immediately marked complete and committed.

collect

async def collect(self, result: ExecutorResult) -> CollectResult | None

Called after each task completes successfully (exit code 0). Runs in the context of the same partition that produced the task via arrange(). Multiple collect() calls from the same window may run concurrently as tasks finish in any order.

Process the ExecutorResult and return a CollectResult with payloads for one or more sinks, or None to skip delivery. See Sinks for all available payload types.

The result.task field carries the original ExecutorTask, including its metadata dict.

async def collect(self, result: dk.ExecutorResult) -> dk.CollectResult | None:
    meta = result.task.metadata
    matches = result.stdout.strip().splitlines()

    output = SearchResult(
        request_id=meta['request_id'],
        match_count=len(matches),
        duration_seconds=result.duration_seconds,
    )

    summary = SearchSummary(
        request_id=meta['request_id'],
        match_count=len(matches),
    )

    sinks = dk.CollectResult(
        kafka=[dk.KafkaPayload(data=output, key=meta['request_id'].encode())],
        postgres=[dk.PostgresPayload(table='results', data=summary)],
        redis=[dk.RedisPayload(key=f'search:{meta["request_id"]}', data=summary, ttl=3600)],
    )

    # conditional routing based on business logic
    if len(matches) > 20:
        notification = AlertPayload(request_id=meta['request_id'], count=len(matches))
        sinks.http.append(dk.HttpPayload(data=notification))

    return sinks

on_window_complete

async def on_window_complete(
    self,
    results: list[ExecutorResult],
    source_messages: list[SourceMessage],
) -> CollectResult | None

Called after all tasks in a window have finished (successes and failures). The results and source_messages belong to the same partition and the same window that was passed to arrange(). Use for cross-task aggregation or batch-level outputs. Returns a CollectResult or None.

async def on_window_complete(self, results, source_messages):
    succeeded = [r for r in results if r.exit_code == 0]
    if not succeeded:
        return None

    summary = WindowSummary(
        window_size=len(results),
        success_count=len(succeeded),
        avg_duration=sum(r.duration_seconds for r in succeeded) / len(succeeded),
    )
    return dk.CollectResult(
        postgres=[dk.PostgresPayload(table='window_stats', data=summary)],
    )

on_error

async def on_error(
    self,
    task: ExecutorTask,
    error: ExecutorError,
) -> ErrorAction | list[ExecutorTask]

Called when a subprocess task fails (non-zero exit, timeout, or launch error). Runs in the context of the partition that owns the task. Replacement tasks returned here are added to the same window and partition. Return one of:

| Return value | Behavior |
| --- | --- |
| ErrorAction.RETRY | Re-run the same task (up to max_retries) |
| ErrorAction.SKIP | Drop the task, continue processing (default) |
| list[ExecutorTask] | Spawn replacement tasks instead |

The ExecutorError fields:

| Field | Type | Description |
| --- | --- | --- |
| task | ExecutorTask | The task that failed |
| exit_code | int or None | Process exit code; None if never started/timeout |
| stderr | str | Process stderr or error description |
| exception | str or None | Exception message for timeout/launch failures |
| pid | int or None | Process ID; None if never started |

async def on_error(self, task, error):
    if error.exit_code == 1 and 'TRANSIENT' in error.stderr:
        return dk.ErrorAction.RETRY
    return dk.ErrorAction.SKIP

on_delivery_error

async def on_delivery_error(self, error: DeliveryError) -> DeliveryAction

Called when a sink’s deliver() raises an exception. See Delivery Lifecycle for the full delivery flow. The DeliveryError contains the sink name, sink type, error message, and the payloads that failed.

| Return value | Behavior |
| --- | --- |
| DeliveryAction.DLQ | Write to dead letter queue (default) |
| DeliveryAction.RETRY | Retry delivery (up to max_retries from config) |
| DeliveryAction.SKIP | Drop the payloads, continue |

async def on_delivery_error(self, error):
    if error.sink_type in ('http', 'redis'):
        return dk.DeliveryAction.RETRY
    return dk.DeliveryAction.DLQ

on_assign

async def on_assign(self, partitions: list[int]) -> None

Called when new Kafka partitions are assigned to this worker during a rebalance. Use for per-partition initialization (loading state, resetting caches).

async def on_assign(self, partitions):
    for p in partitions:
        self.partition_cache[p] = {}

on_revoke

async def on_revoke(self, partitions: list[int]) -> None

Called when partitions are revoked from this worker during a rebalance. Use for cleanup (flushing buffers, saving state).

async def on_revoke(self, partitions):
    for p in partitions:
        self.partition_cache.pop(p, None)

Offset Commit Logic

Drakkar uses watermark-based offset tracking to guarantee at-least-once delivery. Understanding this is important for designing arrange() and source_offsets.

How it works

Each partition has an OffsetTracker that maintains a sorted list of offsets and their state (PENDING or COMPLETED):

  1. When messages enter arrange(), their offsets are registered as PENDING.
  2. When a task finishes and all its sink payloads are delivered (or routed to DLQ/skipped), the offsets from task.source_offsets are marked COMPLETED.
  3. The framework asks: what is the highest consecutive completed offset starting from the lowest tracked offset? That value + 1 is committed to Kafka.

Example

A window of 5 messages from partition 3 with offsets [100, 101, 102, 103, 104]:

Step 1: All registered as PENDING
  100=PENDING  101=PENDING  102=PENDING  103=PENDING  104=PENDING
  committable() → None (100 is not completed)

Step 2: Offset 104 finishes first (fastest subprocess)
  100=PENDING  101=PENDING  102=PENDING  103=PENDING  104=COMPLETED
  committable() → None (100 is still pending)

Step 3: Offsets 100 and 101 finish
  100=COMPLETED  101=COMPLETED  102=PENDING  103=PENDING  104=COMPLETED
  committable() → 102 (consecutive run: 100, 101 → commit 101+1)

Step 4: Offsets 102 and 103 finish
  100=COMPLETED  101=COMPLETED  102=COMPLETED  103=COMPLETED  104=COMPLETED
  committable() → 105 (all consecutive → commit 104+1)

The key property: a fast task finishing before earlier tasks does not advance the commit position. Drakkar will never commit offset 105 while offset 100 is still in flight. This prevents message loss on worker crash — Kafka will redeliver from the last committed offset.
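
The watermark logic above can be reproduced in a few lines. This is an illustrative sketch that follows the documented semantics; the real OffsetTracker may differ in detail:

```python
class OffsetTracker:
    """Watermark-based offset tracking: commit only the highest
    consecutive run of completed offsets."""

    def __init__(self):
        self._state: dict[int, bool] = {}  # offset -> completed?

    def register(self, offsets):
        for off in offsets:
            self._state.setdefault(off, False)  # PENDING

    def complete(self, offsets):
        for off in offsets:
            self._state[off] = True  # COMPLETED

    def committable(self):
        """Highest consecutive completed offset + 1, or None if the
        lowest tracked offset is still pending."""
        commit = None
        for off in sorted(self._state):
            if not self._state[off]:
                break
            commit = off + 1
        return commit
```

Running the five-message example through this sketch reproduces the steps above: committable() stays None while offset 100 is pending, even after 104 completes early.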

What this means for your handler

  • source_offsets must be accurate. Every offset in source_offsets blocks the commit watermark until its task completes and all payloads are delivered. If you include an offset you don’t actually process, it will stall commits for the entire partition.

  • Fewer tasks per window = lower commit latency. A window of 100 messages producing 100 tasks won’t commit until the slowest task finishes. If commit latency matters, use a smaller window_size.

  • One task can cover multiple offsets. If your arrange() batches several messages into one task, list all their offsets in source_offsets. They’ll all be marked complete together when the task finishes.

  • Empty arrange = instant commit. If arrange() returns [], all offsets in the window are marked complete immediately and committed.
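
A batching arrange() along the lines of the third bullet might look like this. ExecutorTask here is a reduced stand-in for dk.ExecutorTask, and messages are plain dicts, purely for illustration:

```python
from dataclasses import dataclass


@dataclass
class ExecutorTask:  # reduced stand-in for dk.ExecutorTask
    task_id: str
    args: list
    source_offsets: list


def arrange_batched(messages: list[dict], chunk_size: int = 10) -> list[ExecutorTask]:
    """One task per chunk; each task lists every offset it covers,
    so they are all marked complete together when it finishes."""
    tasks = []
    for i in range(0, len(messages), chunk_size):
        chunk = messages[i:i + chunk_size]
        tasks.append(ExecutorTask(
            task_id=f"batch-{i // chunk_size}",
            args=[m["path"] for m in chunk],
            source_offsets=[m["offset"] for m in chunk],
        ))
    return tasks
```

Fewer, larger tasks mean the watermark advances in bigger jumps but waits on each task's full batch; tune chunk_size against your commit-latency needs.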

When commits happen

Commits are attempted at three points:

  1. After each window completes — when the last task in a window finishes and all sink deliveries succeed.
  2. On partition revocation — pending offsets are drained (up to executor.drain_timeout_seconds), then the highest committable offset is committed before the partition is released.
  3. On shutdown — same drain + commit as revocation.

Commits are per-partition and asynchronous. They do not block the processing loop.


Typed Messages

Define Pydantic models for your input and output schemas, then pass them as type parameters to BaseDrakkarHandler:

from pydantic import BaseModel

class SearchRequest(BaseModel):
    request_id: str
    pattern: str
    file_path: str

class SearchResult(BaseModel):
    request_id: str
    match_count: int
    duration_seconds: float

class MyHandler(dk.BaseDrakkarHandler[SearchRequest, SearchResult]):
    async def arrange(self, messages, pending):
        for msg in messages:
            req: SearchRequest = msg.payload  # auto-deserialized
            ...

The framework calls SearchRequest.model_validate_json(msg.value) before arrange(). If parsing fails, msg.payload is None and the raw bytes remain in msg.value.

Access either form in your hooks:

req = msg.payload        # SearchRequest instance (or None on parse error)
raw_bytes = msg.value    # always available

message_label()

def message_label(self, msg: SourceMessage) -> str

Override to provide a human-readable label used in structured log lines and the debug web UI. The default returns partition:offset.

def message_label(self, msg):
    if msg.payload:
        return f'{msg.partition}:{msg.offset} [{msg.payload.request_id[:8]}]'
    return f'{msg.partition}:{msg.offset}'

Task Labels

ExecutorTask.labels is a dict[str, str] of user-defined key-value pairs displayed alongside task details in the debug UI. Set them in arrange():

dk.ExecutorTask(
    task_id=dk.make_task_id('rg'),
    args=[req.pattern, req.file_path],
    metadata={'request_id': req.request_id},
    labels={
        'request_id': req.request_id,
        'pattern': req.pattern,
    },
    source_offsets=[msg.offset],
)

Labels appear on:

  • The live timeline view
  • The task detail page
  • Running and finished task tables
  • The debug trace view

Labels are purely for display. Use metadata for data you need in collect() or on_error().


Periodic Tasks

Decorate async handler methods with @periodic(seconds=N) to run them on a recurring interval.

import os

from drakkar import periodic

class MyHandler(dk.BaseDrakkarHandler[MyInput, MyOutput]):

    @periodic(seconds=60)
    async def refresh_cache(self):
        self.lookup_table = await fetch_lookup_data()

    @periodic(seconds=30, on_error='stop')
    async def health_check(self):
        if not os.path.isdir('/data/corpus'):
            raise RuntimeError('Corpus directory missing')

Behavior

  • Periodic tasks start after on_ready() completes.
  • They run in the same async event loop as the rest of the worker.
  • Overlapping runs are prevented – the next interval starts only after the current invocation finishes.
  • All periodic tasks are cancelled during shutdown.
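
The non-overlap guarantee follows naturally if the scheduler awaits each invocation before sleeping. The run_periodic sketch below is illustrative, not the framework's actual implementation:

```python
import asyncio


async def run_periodic(fn, seconds: float, on_error: str = "continue"):
    """Await fn, then sleep: the next run can never start before the
    current one finishes, so invocations cannot overlap."""
    while True:
        try:
            await fn()
        except Exception:
            if on_error == "stop":
                return  # cancel this task permanently
            # "continue": a real implementation would log here
        await asyncio.sleep(seconds)
```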

Error handling

| on_error value | Behavior on exception |
| --- | --- |
| 'continue' (default) | Log the error, keep running on schedule |
| 'stop' | Log the error, cancel this task permanently |

Custom Prometheus Metrics

Declare prometheus_client metrics as class attributes on your handler. The framework auto-discovers them and exposes them on the debug UI metrics page alongside built-in Drakkar metrics.

from prometheus_client import Counter, Gauge, Histogram

class MyHandler(dk.BaseDrakkarHandler[MyInput, MyOutput]):
    items_processed = Counter(
        'app_items_processed_total',
        'Total items processed',
    )
    active_sessions = Gauge(
        'app_active_sessions',
        'Currently active sessions',
    )
    match_count = Histogram(
        'app_match_count',
        'Matches per request',
        buckets=(0, 1, 5, 10, 50, 100, 500),
    )

    async def collect(self, result):
        self.items_processed.inc()
        self.match_count.observe(len(result.stdout.splitlines()))
        ...

The auto-discovery scans the handler’s class hierarchy (MRO) for attributes that are instances of MetricWrapperBase (Counter, Gauge, Histogram, Summary, Info). Use an app_ prefix to distinguish your metrics from Drakkar’s built-in drakkar_ metrics in dashboards and queries.
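
The discovery pass can be sketched as a walk over the MRO. MetricWrapperBase below is a local stand-in for prometheus_client.metrics.MetricWrapperBase so the sketch stays self-contained; the real scan checks against the actual library class:

```python
class MetricWrapperBase:  # local stand-in for prometheus_client's base class
    def __init__(self, name: str, doc: str = ""):
        self.name = name


def discover_metrics(handler_cls: type) -> dict:
    """Walk the class hierarchy; a subclass attribute shadows a base-class
    one because the MRO is scanned from most- to least-derived."""
    found = {}
    for klass in handler_cls.__mro__:
        for attr, value in vars(klass).items():
            if isinstance(value, MetricWrapperBase) and attr not in found:
                found[attr] = value
    return found
```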