
Handler System

The handler is the user-facing entry point into Drakkar. You subclass BaseDrakkarHandler, override hooks, and the framework calls them at the right time during the message-processing pipeline. See Configuration for the full YAML reference.

Hook Reference

| Hook | When called | Frequency | Returns |
|---|---|---|---|
| on_startup(config) | Before any components are created | Once per worker lifetime | Modified DrakkarConfig |
| on_ready(config, db_pool) | After sinks are connected, before polling | Once per worker lifetime | None |
| arrange(messages, pending) | A window of messages is ready to process | Once per window per partition | list[ExecutorTask] |
| on_task_complete(result) | A single task completes successfully (exit 0) | Once per successful task | CollectResult \| None |
| on_message_complete(group) | All tasks derived from a single source message reached a terminal state | Once per source message | CollectResult \| None |
| on_window_complete(results, messages) | All tasks in a window finished | Once per window per partition | CollectResult \| None |
| on_error(task, error) | A single task fails (non-zero exit, timeout, crash) | Once per failed task | ErrorAction \| list[ExecutorTask] |
| on_delivery_error(error) | A sink's deliver() raises an exception | Once per failed sink delivery batch | DeliveryAction |
| on_assign(partitions) | Kafka assigns partitions during a rebalance | Once per rebalance event | None |
| on_revoke(partitions) | Kafka revokes partitions during a rebalance | Once per rebalance event | None |
| message_label(msg) | Before logging each message in arrange | Once per message | str |
| task_priority(task) | Before each task waits on the executor pool | Once per task, including retries (not called for precomputed tasks) | Sortable key (any value comparable with <) |
| arrange_http_request(req, pending) | One HTTP request passed auth, rate limiting, and body parsing | Once per HTTP request (webapp only) | list[ExecutorTask] |
| on_http_request_complete(group) | All tasks from one HTTP request reached a terminal state | Once per HTTP request (webapp only) | HttpResponseT (a Pydantic model) |
| http_request_id(req, headers) | After body parsing, before task fan-out | Once per HTTP request (webapp only) | str (validated: ≤64 chars, ASCII, no whitespace) |
| http_request_label(req, request_id) | Before logging each HTTP request | Once per HTTP request (webapp only) | str |

Only arrange() is required; all other hooks have safe defaults. The four HTTP hooks at the bottom of the table are invoked only when webapp.enabled=true and the handler subclass declares concrete types in the HttpRequestT / HttpResponseT slots; see Generic type parameters below for the four-parameter form.


BaseDrakkarHandler

from drakkar import BaseDrakkarHandler

Generic type parameters

BaseDrakkarHandler accepts up to four optional type parameters. The first two control Kafka-path message (de)serialization; the third and fourth opt the handler into the webapp pipeline:

# Two-param form (Kafka only). HttpRequestT / HttpResponseT default to None.
class MyHandler(BaseDrakkarHandler[MyInput, MyOutput]):
    ...

# Four-param form (Kafka + webapp). Required when webapp.enabled=true.
class MyHandler(BaseDrakkarHandler[KafkaIn, KafkaOut, HttpReq, HttpResp]):
    ...

The HTTP slots use PEP 696 default TypeVars, so existing two-param subclasses keep working unchanged – the third and fourth slots materialise to None and the framework never invokes the HTTP hooks. Four-param subclasses opt into the webapp pipeline by declaring concrete Pydantic models in the new slots; the framework reads those at startup and raises ConfigurationError when webapp.enabled=true but a slot is left at the default. See Webapp → Enabling for the full setup.

All declared type arguments must be Pydantic BaseModel subclasses. At class creation time (__init_subclass__), the framework inspects the generic arguments and sets four class attributes:

| Attribute | Value |
|---|---|
| input_model | The InputT class, or None |
| output_model | The OutputT class, or None |
| http_request_model | The HttpRequestT class, or None |
| http_response_model | The HttpResponseT class, or None |
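
How __init_subclass__ can recover those classes is easiest to see in a reduced form. The sketch below is a hypothetical stand-in, not Drakkar's source: SketchHandler has only the two Kafka slots, it reads the subscripted base from `__orig_bases__`, and any slot left as a bare TypeVar maps to None.

```python
from typing import Generic, Optional, TypeVar, get_args, get_origin

InputT = TypeVar('InputT')
OutputT = TypeVar('OutputT')


class SketchHandler(Generic[InputT, OutputT]):
    # Defaults for subclasses that never subscript the base (non-generic usage).
    input_model: Optional[type] = None
    output_model: Optional[type] = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # __orig_bases__ holds the subscripted form, e.g. SketchHandler[In, Out];
        # it only exists when the class statement used a subscripted base.
        for base in cls.__dict__.get('__orig_bases__', ()):
            if get_origin(base) is SketchHandler:
                # Concrete classes become the model; unfilled TypeVars become None.
                in_arg, out_arg = (a if isinstance(a, type) else None for a in get_args(base))
                cls.input_model, cls.output_model = in_arg, out_arg
```

A further subclass such as `class Sub(Typed): ...` has no `__orig_bases__` of its own, so it simply inherits the attributes resolved for its parent.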

When input_model is set, every consumed message is automatically deserialized before arrange() is called:

# Framework calls this internally:
msg.payload = MyInput.model_validate_json(msg.value)

If deserialization fails, msg.payload is set to None and the raw bytes remain available in msg.value.

When http_request_model is set, every HTTP request body is parsed into the model before arrange_http_request() is called; parse failures return a flat 422 response without invoking the runner.

Non-generic usage

If you do not need typed deserialization, omit the type parameters:

class RawHandler(BaseDrakkarHandler):
    async def arrange(self, messages, pending):
        for msg in messages:
            raw: bytes = msg.value  # no auto-deserialization
            ...

input_model, output_model, http_request_model, and http_response_model will all be None.


Handler Hooks

Hooks are listed roughly in lifecycle order. Every hook except arrange() has a default implementation (a no-op or a documented fallback such as ErrorAction.SKIP), so you only override what you need.

on_startup

async def on_startup(self, config: DrakkarConfig) -> DrakkarConfig

Called once, before any components (sinks, executor, consumer) are created. Receives the loaded config and must return a (possibly modified) DrakkarConfig. This is the only point where config can be changed at runtime.

import os

import drakkar as dk

async def on_startup(self, config: dk.DrakkarConfig) -> dk.DrakkarConfig:
    cpu_count = os.cpu_count() or 4
    config.executor.max_executors = cpu_count * 2
    return config

on_ready

async def on_ready(self, config: DrakkarConfig, db_pool: object) -> None

Called after all sinks are connected and the executor pool is created, but before the consumer starts polling. Use it to initialize handler state, run database migrations, or load lookup tables.

The db_pool argument is an asyncpg.Pool if at least one postgres sink is configured; otherwise it is None.

async def on_ready(self, config: dk.DrakkarConfig, db_pool) -> None:
    self.lookup_table = {}
    if db_pool is not None:  # None when no postgres sink is configured
        rows = await db_pool.fetch('SELECT id, name FROM categories')
        self.lookup_table = {r['id']: r['name'] for r in rows}

arrange (required)

async def arrange(
    self,
    messages: list[SourceMessage],
    pending: PendingContext,
) -> list[ExecutorTask]

The only required hook. Transforms source messages into subprocess tasks. See ExecutorTask for the full task model reference.

Partition isolation. Each call receives messages from exactly one Kafka partition. Drakkar runs an independent pipeline per partition, so arrange() never mixes messages from different partitions in a single call. The maximum number of concurrent arrange() invocations equals the number of partitions assigned to this worker – one per partition at a time.

Windowing. The framework collects up to executor.window_size messages from the partition queue before calling arrange(). A window may contain fewer messages if the queue drains before reaching the limit. While the tasks from one window are executing, the next window can already be collected and arrange() called again for the same partition – windows are processed concurrently within a partition.

When it is called. The partition processor polls messages into a queue. Once enough messages accumulate (or a short timeout passes), they are batched into a window, deserialized (if input_model is set), and passed to arrange(). This happens continuously while the worker is running and the partition is assigned.
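
A window collector along these lines can be sketched with asyncio.Queue. This is an illustration under stated assumptions, not the partition processor's code: collect_window is a hypothetical name, and treating the "short timeout" as a deadline measured from the first message is a guess.

```python
import asyncio


async def collect_window(queue: asyncio.Queue, window_size: int, timeout: float) -> list:
    """Return up to window_size items from queue.

    Blocks for the first item, then keeps taking items until the window is full
    or `timeout` seconds have passed since that first item arrived.
    """
    loop = asyncio.get_running_loop()
    window = [await queue.get()]
    deadline = loop.time() + timeout
    while len(window) < window_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            window.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break  # queue drained before the window filled up
    return window
```

With five queued messages and window_size=3, the first call returns three messages and the second returns the remaining two once the deadline passes.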

pending.pending_task_ids is a set[str] of task IDs currently in-flight for this partition. Use it for O(1) deduplication:

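async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        req = msg.payload
        if req is None:
            continue  # deserialization failed; raw bytes are still in msg.value

        # Derive the ID from the request itself. dk.make_task_id() generates a
        # new ID on each call, so it would never match pending.pending_task_ids.
        task_id = f'rg-{req.request_id}'
        if task_id in pending.pending_task_ids:
            continue  # an identical task is already in flight

        tasks.append(dk.ExecutorTask(
            task_id=task_id,
            args=[req.pattern, req.file_path],
            metadata={'request_id': req.request_id},
            source_offsets=[msg.offset],
        ))
    return tasks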

If arrange() returns an empty list, all message offsets are immediately marked complete and committed.

Precomputed task results (skip the subprocess)

Sometimes the handler already knows what a task would output — from a cache, a lookup table, deterministic logic, or any other shortcut. Attach a PrecomputedResult to the task via the precomputed field and the framework will skip the subprocess entirely:

async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        req = msg.payload
        if req is None:
            continue  # deserialization failed; raw bytes are still in msg.value

        # self.cache.get() is a coroutine, so it must be awaited.
        cached = await self.cache.get(req.request_id)
        if cached is not None:
            # Short-circuit: handler supplies the outcome directly.
            tasks.append(dk.ExecutorTask(
                task_id=dk.make_task_id('direct'),
                source_offsets=[msg.offset],
                metadata={'request_id': req.request_id},
                precomputed=dk.PrecomputedResult(stdout=cached),
            ))
        else:
            tasks.append(dk.ExecutorTask(
                task_id=dk.make_task_id('run'),
                args=['--process', req.request_id],
                source_offsets=[msg.offset],
            ))
    return tasks

Semantics:

  • The framework synthesises an ExecutorResult from the precomputed stdout / stderr / exit_code / duration_seconds and invokes on_task_complete immediately. Downstream hooks (on_message_complete, on_window_complete) see the result indistinguishably from a real subprocess outcome — except result.pid is None and result.task.precomputed is not None if they want to distinguish.
  • No pool slot is consumed. Precomputed tasks bypass the executor semaphore entirely; a cache-hit-heavy workload is not capped by max_executors. A separate counter (drakkar_tasks_precomputed_total) tracks the volume.
  • Non-zero exit_code routes through on_error exactly like a real subprocess failure — RETRY, SKIP, and replacement-list return values all work. This lets the handler cache error outcomes too (uncommon but valid).
  • args is not required when precomputed is set (it defaults to [] and the subprocess never runs).
  • Framework-agnostic about the source. precomputed says “a result was supplied in arrange()”; the framework does NOT interpret it as “cache hit” specifically. The debug-UI event metadata is marked precomputed=true, not cached=true.
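
The routing rule can be modeled in a few lines. Everything here is a hypothetical stand-in: Precomputed mirrors the PrecomputedResult fields documented further down, while SynthResult and route_precomputed are invented names. It only shows that the synthesized result carries pid=None and that exit_code alone selects the hook.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Precomputed:
    # Same fields and defaults as PrecomputedResult.
    stdout: str = ''
    stderr: str = ''
    exit_code: int = 0
    duration_seconds: float = 0.0


@dataclass
class SynthResult:
    task_id: str
    stdout: str
    stderr: str
    exit_code: int
    duration_seconds: float
    pid: Optional[int] = None  # no process was ever launched


def route_precomputed(task_id: str, pre: Precomputed) -> Tuple[str, SynthResult]:
    """Build the synthetic result and name the hook it would be routed to."""
    result = SynthResult(task_id, pre.stdout, pre.stderr, pre.exit_code, pre.duration_seconds)
    hook = 'on_task_complete' if pre.exit_code == 0 else 'on_error'
    return hook, result
```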

Observability:

  • task_started and task_completed events still fire (with metadata.precomputed=true) so the flight recorder timeline stays coherent.
  • executor_duration histogram is not observed for precomputed tasks — their duration is artificial and would skew the distribution.
  • drakkar_tasks_precomputed_total increments once per precomputed task; operators can chart it against drakkar_executor_tasks_total{status="completed"} to see the short-circuit rate.

When to use it:

| Situation | Fit |
|---|---|
| Previously computed result in Redis / an in-memory LRU | Direct fit: attach to tasks in arrange() |
| Deterministic input → deterministic output (no binary needed) | Direct fit: compute in Python, wrap in PrecomputedResult |
| Lookup table / enrichment where the subprocess would add nothing | Direct fit |
| Expensive subprocess that's rarely worth running | Direct fit: decide in arrange() |
| Async I/O you don't want to do in arrange() | Weaker fit: it moves the lookup into arrange()'s hot path; consider prefetching in on_ready() instead |

on_task_complete

async def on_task_complete(self, result: ExecutorResult) -> CollectResult | None

Called after each task completes successfully (exit code 0). Runs in the context of the same partition that produced the task via arrange(). Multiple on_task_complete() calls from the same window may run concurrently as tasks finish in any order.

Process the ExecutorResult and return a CollectResult with payloads for one or more sinks, or None to skip per-task delivery (for example when you aggregate in on_message_complete instead). See Sinks for all available payload types.

The result.task field carries the original ExecutorTask, including its metadata dict.

async def on_task_complete(self, result: dk.ExecutorResult) -> dk.CollectResult | None:
    meta = result.task.metadata
    matches = result.stdout.strip().splitlines()

    output = SearchResult(
        request_id=meta['request_id'],
        match_count=len(matches),
        duration_seconds=result.duration_seconds,
    )

    summary = SearchSummary(
        request_id=meta['request_id'],
        match_count=len(matches),
    )

    sinks = dk.CollectResult(
        kafka=[dk.KafkaPayload(data=output, key=meta['request_id'].encode())],
        postgres=[dk.PostgresPayload(table='results', data=summary)],
        redis=[dk.RedisPayload(key=f'search:{meta["request_id"]}', data=summary, ttl=3600)],
    )

    # conditional routing based on business logic
    if len(matches) > 20:
        notification = AlertPayload(request_id=meta['request_id'], count=len(matches))
        sinks.http.append(dk.HttpPayload(data=notification))

    return sinks

on_message_complete

async def on_message_complete(self, group: MessageGroup) -> CollectResult | None

Called once per source message, after every task derived from that message has reached a terminal state. Receives a MessageGroup summarising the whole fan-out — use for N-in → 1-out aggregation where one message produces many subprocess tasks but you want a single aggregated record as output.

Offsets are committed after this hook fires — any sink emissions here are guaranteed delivered-or-failed before the consumer offset advances for this message.

async def on_message_complete(self, group: dk.MessageGroup) -> dk.CollectResult | None:
    req: SearchRequest = group.source_message.payload
    if req is None or group.is_empty:
        return None

    total_matches = sum(
        sum(1 for line in r.stdout.split('\n') if line)
        for r in group.results
    )

    aggregate = RequestSummary(
        request_id=req.request_id,
        total_tasks=group.total,
        succeeded=group.succeeded,
        failed=group.failed,
        total_matches=total_matches,
    )

    return dk.CollectResult(
        kafka=[dk.KafkaPayload(data=aggregate, key=req.request_id.encode(), sink='summaries')],
    )

See the Fan-out page for a complete walkthrough, the full MessageGroup schema with properties, error-path semantics, replacement-chain tracing via parent_task_id, and offset commit ordering.

MessageGroup

Passed to on_message_complete. Contains every task scheduled for a single source message, the terminal outcomes, and the wall-clock timing.

| Field | Type | Meaning |
|---|---|---|
| source_message | SourceMessage | The original Kafka input |
| tasks | list[ExecutorTask] | Full history, including replaced tasks |
| results | list[ExecutorResult] | Terminal successes |
| errors | list[ExecutorError] | Terminal failures (SKIP / retries exhausted) |
| started_at, finished_at | float | Monotonic timestamps |

Convenience properties: succeeded, failed, total, replaced, all_succeeded, any_failed, is_empty, duration_seconds.

Full details and examples on the Fan-out page.

PrecomputedResult

Attached to an ExecutorTask via precomputed= when the handler already knows what the subprocess would have produced. See Precomputed task results above for the full story.

| Field | Type | Default | Meaning |
|---|---|---|---|
| stdout | str | '' | stdout the framework would have captured |
| stderr | str | '' | stderr the framework would have captured |
| exit_code | int | 0 | Non-zero routes through on_error like a real failure |
| duration_seconds | float | 0.0 | Set it if you want the UI / recorder to show a non-zero duration (e.g. to reflect cache lookup time) |

The cache (self.cache)

When cache.enabled=true in config, every handler instance gains a self.cache attribute — a framework-provided key/value cache, memory-backed and periodically flushed to a per-worker SQLite file. It pairs naturally with PrecomputedResult for the classic memoization pattern:

async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        cache_key = f'search|{msg.payload.pattern}'

        # Fast path: synchronous memory peek (zero I/O)
        cached = self.cache.peek(cache_key)
        if cached is None:
            # Slower path: memory miss, check SQLite (local + peer-synced)
            cached = await self.cache.get(cache_key)

        if cached is not None:
            tasks.append(dk.ExecutorTask(
                task_id=dk.make_task_id('search'),
                source_offsets=[msg.offset],
                precomputed=dk.PrecomputedResult(stdout=cached),
            ))
        else:
            tasks.append(dk.ExecutorTask(
                task_id=dk.make_task_id('search'),
                args=[msg.payload.pattern],
                source_offsets=[msg.offset],
            ))
    return tasks

async def on_task_complete(self, result):
    if result.pid is not None:   # skip precomputed — already cached
        cache_key = f'search|{result.task.args[0]}'
        self.cache.set(cache_key, result.stdout, ttl=3600)
    # ... return CollectResult as usual

When cache.enabled=false (the default), self.cache is a no-op stub — peek/get return None, set/delete silently discard. Handler code can call the cache unconditionally.
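
The disabled stub is an instance of the null-object pattern. A minimal sketch, assuming the method shapes used in the example above (peek() synchronous, get() awaited, set() called without await and with a ttl keyword); the class name NullCache and the delete() signature are hypothetical.

```python
from typing import Any, Optional


class NullCache:
    """Cache stand-in for cache.enabled=false: reads always miss, writes are dropped."""

    def peek(self, key: str) -> Optional[Any]:
        return None  # synchronous memory peek: always a miss

    async def get(self, key: str) -> Optional[Any]:
        return None  # awaitable like the real get(), still a miss

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        pass  # silently discarded

    def delete(self, key: str) -> None:
        pass  # silently discarded
```

Because the stub exposes the same shape as the real cache, the memoization code above runs unchanged whether or not caching is enabled; every lookup simply falls through to the subprocess path.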

Full API, cross-worker sync behavior, scope rules (LOCAL / CLUSTER / GLOBAL), and the documented “delete is local-only” sharp edge are covered on the dedicated Cache page.

on_window_complete

async def on_window_complete(
    self,
    results: list[ExecutorResult],
    source_messages: list[SourceMessage],
) -> CollectResult | None

Called after all tasks in a window have finished (successes and failures). The results and source_messages belong to the same partition and the same window that was passed to arrange(). Use for cross-task aggregation or batch-level outputs. Returns a CollectResult or None.

async def on_window_complete(self, results, source_messages):
    succeeded = [r for r in results if r.exit_code == 0]
    if not succeeded:
        return None

    summary = WindowSummary(
        window_size=len(results),
        success_count=len(succeeded),
        avg_duration=sum(r.duration_seconds for r in succeeded) / len(succeeded),
    )
    return dk.CollectResult(
        postgres=[dk.PostgresPayload(table='window_stats', data=summary)],
    )

on_error

async def on_error(
    self,
    task: ExecutorTask,
    error: ExecutorError,
) -> ErrorAction | list[ExecutorTask]

Called when a subprocess task fails (non-zero exit, timeout, or launch error). Runs in the context of the partition that owns the task. Replacement tasks returned here are added to the same window and partition. Return one of:

| Return value | Behavior |
|---|---|
| ErrorAction.RETRY | Re-run the same task (up to max_retries) |
| ErrorAction.SKIP | Drop the task and continue processing (default) |
| list[ExecutorTask] | Spawn replacement tasks instead |

The ExecutorError fields:

| Field | Type | Description |
|---|---|---|
| task | ExecutorTask | The task that failed |
| exit_code | int or None | Process exit code; None if the process never started or timed out |
| stderr | str | Process stderr or error description |
| exception | str or None | Exception message for timeout/launch failures |
| pid | int or None | Process ID; None if the process never started |

async def on_error(self, task, error):
    if error.exit_code == 1 and 'TRANSIENT' in error.stderr:
        return dk.ErrorAction.RETRY
    return dk.ErrorAction.SKIP

on_delivery_error

async def on_delivery_error(self, error: DeliveryError) -> DeliveryAction

Called when a sink’s deliver() raises an exception. See Delivery Lifecycle for the full delivery flow. The DeliveryError contains the sink name, sink type, error message, and the payloads that failed.

| Return value | Behavior |
|---|---|
| DeliveryAction.DLQ | Write to the dead letter queue (default) |
| DeliveryAction.RETRY | Retry delivery (up to max_retries from config) |
| DeliveryAction.SKIP | Drop the payloads and continue |

async def on_delivery_error(self, error):
    if error.sink_type in ('http', 'redis'):
        return dk.DeliveryAction.RETRY
    return dk.DeliveryAction.DLQ

on_assign

async def on_assign(self, partitions: list[int]) -> None

Called when new Kafka partitions are assigned to this worker during a rebalance. Use for per-partition initialization (loading state, resetting caches).

async def on_assign(self, partitions):
    for p in partitions:
        self.partition_cache[p] = {}

on_revoke

async def on_revoke(self, partitions: list[int]) -> None

Called when partitions are revoked from this worker during a rebalance. Use for cleanup (flushing buffers, saving state).

async def on_revoke(self, partitions):
    for p in partitions:
        self.partition_cache.pop(p, None)

task_priority

def task_priority(self, task: ExecutorTask) -> Any

Returns a sortable priority key used to order tasks waiting on the executor pool. Smaller keys are scheduled first. It is called once per task, right before the task waits on the ExecutorPool's priority gate. Retries call it again; with the default key the result is identical, because task.source_offsets does not change between retries.

Default:

def task_priority(self, task):
    return min(task.source_offsets) if task.source_offsets else 0

This drains older Kafka messages first, which keeps MessageTracker / OffsetTracker state in front of the watermark small — the slowest task in a fan-out no longer anchors the whole message in memory while later messages pile up behind it.

Why this matters. With a plain FIFO semaphore (the pre-priority behaviour), one slow message in the middle of a window could keep its MessageTracker alive long after later messages had already finished, blocking offset-commit progress and inflating _message_trackers. Priority ordering by oldest-offset turns that into “oldest first → fastest commit watermark progress”.

Override patterns:

class PartitionAwareHandler(BaseDrakkarHandler):
    """Rough partition fairness, preferring older messages within each partition."""
    def task_priority(self, task):
        offset = min(task.source_offsets) if task.source_offsets else 0
        # Bucket offsets into tiers of 1000 so partitions at a similar offset
        # magnitude take turns; partitions with much lower offsets still go first.
        return (offset // 1000, offset)


class TieredHandler(BaseDrakkarHandler):
    """Read a business-priority field stamped on the task by ``arrange()``."""
    def task_priority(self, task):
        # 0 = highest, 9 = lowest. Equal-tier tasks tiebreak by offset.
        tier = task.metadata.get('tier', 5)
        return (tier, min(task.source_offsets))

The return value can be any heapq-comparable object — int, tuple, or a class with __lt__. Equal-priority tasks tiebreak FIFO via the gate’s internal sequence counter.
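
To make the ordering concrete, here is a small priority-ordered semaphore built on heapq and asyncio. It is a sketch of the idea, not the ExecutorPool gate: PriorityGate is a hypothetical class, a freed slot is handed directly to the waiter with the smallest (priority, sequence) pair, and it ignores the edge case of a waiter being cancelled after its slot was granted.

```python
import asyncio
import heapq
import itertools


class PriorityGate:
    """Concurrency limiter that admits waiters in (priority, arrival) order."""

    def __init__(self, slots: int) -> None:
        self._free = slots
        self._waiters: list = []       # heap of (priority, seq, future)
        self._seq = itertools.count()  # FIFO tiebreak for equal priorities

    async def acquire(self, priority) -> None:
        if self._free > 0 and not self._waiters:
            self._free -= 1
            return
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (priority, next(self._seq), fut))
        await fut  # release() hands the slot over directly

    def release(self) -> None:
        while self._waiters:
            _, _, fut = heapq.heappop(self._waiters)
            if not fut.done():  # skip waiters that were cancelled
                fut.set_result(None)
                return
        self._free += 1
```

With its single slot occupied, waiters that queue with priorities 5, 1, 1 are admitted as the two priority-1 waiters in arrival order, then the priority-5 one. The sequence number also keeps heapq from ever comparing two futures.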

Tasks with precomputed set bypass the gate entirely; task_priority is not called for them.

Error handling. If task_priority raises, the framework logs a priority_fn_failed warning, ticks drakkar_executor_priority_fn_errors_total, and falls back to the default key. A buggy override never stalls a task.
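
The fallback amounts to a try/except around the user hook. In this sketch, safe_priority and default_priority are hypothetical names, the drakkar_executor_priority_fn_errors_total increment is omitted, and logging uses the standard library logger.

```python
import logging
from typing import Any, Callable

log = logging.getLogger(__name__)


def default_priority(task: Any) -> Any:
    # The documented default: oldest source offset first, 0 when there are none.
    return min(task.source_offsets) if task.source_offsets else 0


def safe_priority(priority_fn: Callable[[Any], Any], task: Any) -> Any:
    try:
        return priority_fn(task)
    except Exception:
        # A buggy override must not stall the task: log it and use the default key.
        log.warning('priority_fn_failed', exc_info=True)
        return default_priority(task)
```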


Offset Commit Logic

Drakkar uses watermark-based offset tracking to guarantee at-least-once delivery. Understanding this is important for designing arrange() and source_offsets.

How it works

Each partition has an OffsetTracker that maintains a sorted list of offsets and their state (PENDING or COMPLETED):

  1. When messages enter arrange(), their offsets are registered as PENDING.
  2. When a task finishes and all its sink payloads are delivered (or routed to DLQ/skipped), the offsets from task.source_offsets are marked COMPLETED.
  3. The framework asks: what is the highest consecutive completed offset starting from the lowest tracked offset? That value + 1 is committed to Kafka.
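
The three steps fit in a small class. OffsetTrackerSketch is a hypothetical, simplified model rather than the framework's OffsetTracker: it keeps a dict instead of a sorted list, never prunes committed offsets, and treats "consecutive" as consecutive among tracked offsets.

```python
from typing import Dict, Iterable, Optional

PENDING, COMPLETED = 'PENDING', 'COMPLETED'


class OffsetTrackerSketch:
    """Commit only past the run of completed offsets that starts at the lowest tracked one."""

    def __init__(self) -> None:
        self._state: Dict[int, str] = {}

    def register(self, offsets: Iterable[int]) -> None:
        for off in offsets:
            self._state[off] = PENDING

    def complete(self, offsets: Iterable[int]) -> None:
        for off in offsets:
            if off in self._state:
                self._state[off] = COMPLETED

    def committable(self) -> Optional[int]:
        last_done = None
        for off in sorted(self._state):
            if self._state[off] != COMPLETED:
                break  # the first pending offset caps the watermark
            last_done = off
        return None if last_done is None else last_done + 1
```

Fed the sequence from the example below, it returns None, None, 102, and 105.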

Example

A window of 5 messages from partition 3 with offsets [100, 101, 102, 103, 104]:

Step 1: All registered as PENDING
  100=PENDING  101=PENDING  102=PENDING  103=PENDING  104=PENDING
  committable() → None (100 is not completed)

Step 2: Offset 104 finishes first (fastest subprocess)
  100=PENDING  101=PENDING  102=PENDING  103=PENDING  104=COMPLETED
  committable() → None (100 is still pending)

Step 3: Offsets 100 and 101 finish
  100=COMPLETED  101=COMPLETED  102=PENDING  103=PENDING  104=COMPLETED
  committable() → 102 (consecutive run: 100, 101 → commit 101+1)

Step 4: Offsets 102 and 103 finish
  100=COMPLETED  101=COMPLETED  102=COMPLETED  103=COMPLETED  104=COMPLETED
  committable() → 105 (all consecutive → commit 104+1)

The key property: a fast task finishing before earlier tasks does not advance the commit position. Drakkar will never commit offset 105 while offset 100 is still in flight. This prevents message loss on worker crash — Kafka will redeliver from the last committed offset.

What this means for your handler

  • source_offsets must be accurate. Every offset in source_offsets blocks the commit watermark until its task completes and all payloads are delivered. If you include an offset you don’t actually process, it will stall commits for the entire partition.

  • Fewer tasks per window = lower commit latency. A window of 100 messages producing 100 tasks won’t commit until the slowest task finishes. If commit latency matters, use a smaller window_size.

  • One task can cover multiple offsets. If your arrange() batches several messages into one task, list all their offsets in source_offsets. They’ll all be marked complete together when the task finishes.

  • Empty arrange = instant commit. If arrange() returns [], all offsets in the window are marked complete immediately and committed.

When commits happen

Commits are attempted at three points:

  1. After each window completes — when the last task in a window finishes and all sink deliveries succeed.
  2. On partition revocation — pending offsets are drained (up to executor.drain_timeout_seconds). Only if drain completes cleanly is the highest committable offset committed before the partition is released. If drain times out, the final commit is skipped — in-flight tasks may still be running, and committing past them would silently skip their messages on reassign. Those messages will replay instead (at-least-once).
  3. On shutdown — same drain-then-commit-if-clean behavior as revocation.

Commits are per-partition and asynchronous. They do not block the processing loop.
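
The revocation and shutdown rule (drain, then commit only if the drain was clean) can be sketched with asyncio.wait. drain_then_commit and its callback parameters are hypothetical; the framework's internal signatures are not documented here.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Optional


async def drain_then_commit(
    in_flight: Iterable[asyncio.Task],
    drain_timeout: float,
    committable: Callable[[], Optional[int]],
    commit: Callable[[int], Awaitable[None]],
) -> bool:
    """Return True if the drain finished cleanly and the final commit (if any) ran."""
    pending = set(in_flight)
    if pending:  # asyncio.wait() rejects an empty set
        _, still_running = await asyncio.wait(pending, timeout=drain_timeout)
        if still_running:
            # Committing now could skip messages that are still being processed;
            # leave the offset alone so they replay on reassign (at-least-once).
            return False
    offset = committable()
    if offset is not None:
        await commit(offset)
    return True
```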


Typed Messages

Define Pydantic models for your input and output schemas, then pass them as type parameters to BaseDrakkarHandler:

import drakkar as dk
from pydantic import BaseModel

class SearchRequest(BaseModel):
    request_id: str
    pattern: str
    file_path: str

class SearchResult(BaseModel):
    request_id: str
    match_count: int
    duration_seconds: float

class MyHandler(dk.BaseDrakkarHandler[SearchRequest, SearchResult]):
    async def arrange(self, messages, pending):
        for msg in messages:
            req: SearchRequest = msg.payload  # auto-deserialized
            ...

The framework calls SearchRequest.model_validate_json(msg.value) before arrange(). If parsing fails, msg.payload is None and the raw bytes remain in msg.value.

Access either form in your hooks:

req = msg.payload        # SearchRequest instance (or None on parse error)
raw_bytes = msg.value    # always available

message_label()

def message_label(self, msg: SourceMessage) -> str

Override to provide a human-readable label used in structured log lines and the debug web UI. The default returns partition:offset.

def message_label(self, msg):
    if msg.payload:
        return f'{msg.partition}:{msg.offset} [{msg.payload.request_id[:8]}]'
    return f'{msg.partition}:{msg.offset}'

Task Labels

ExecutorTask.labels is a dict[str, str] of user-defined key-value pairs displayed alongside task details in the debug UI. Set them in arrange():

dk.ExecutorTask(
    task_id=dk.make_task_id('rg'),
    args=[req.pattern, req.file_path],
    metadata={'request_id': req.request_id},
    labels={
        'request_id': req.request_id,
        'pattern': req.pattern,
    },
    source_offsets=[msg.offset],
)

Labels appear on:

  • The live timeline view
  • The task detail page
  • Running and finished task tables
  • The debug trace view

Labels are purely for display. Use metadata for data you need in on_task_complete(), on_message_complete(), or on_error().


Periodic Tasks

Decorate async handler methods with @periodic(seconds=N) to run them on a recurring interval.

import os

import drakkar as dk
from drakkar import periodic

class MyHandler(dk.BaseDrakkarHandler[MyInput, MyOutput]):

    @periodic(seconds=60)
    async def refresh_cache(self):
        self.lookup_table = await fetch_lookup_data()

    @periodic(seconds=30, on_error='stop')
    async def health_check(self):
        if not os.path.isdir('/data/corpus'):
            raise RuntimeError('Corpus directory missing')

Behavior

  • Periodic tasks start after on_ready() completes.
  • They run in the same async event loop as the rest of the worker.
  • Overlapping runs are prevented – the next interval starts only after the current invocation finishes.
  • All periodic tasks are cancelled during shutdown.

Error handling

| on_error value | Behavior on exception |
|---|---|
| 'continue' (default) | Log the error and keep running on schedule |
| 'stop' | Log the error and cancel this task permanently |
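
A loop with these properties can be sketched as follows. run_periodic is a hypothetical helper, not the @periodic implementation, and waiting one interval before the first call is an assumption the text above does not settle.

```python
import asyncio
import logging
from typing import Awaitable, Callable

log = logging.getLogger(__name__)


async def run_periodic(
    fn: Callable[[], Awaitable[None]],
    seconds: float,
    on_error: str = 'continue',
) -> None:
    """Call fn every `seconds`; a new run never starts before the previous one returns."""
    while True:
        await asyncio.sleep(seconds)  # assumption: first call happens one interval in
        try:
            await fn()  # awaited inline, so invocations cannot overlap
        except Exception:
            # CancelledError derives from BaseException (Python 3.8+), so shutdown
            # cancellation is not swallowed here and propagates out of the loop.
            log.exception('periodic task failed')
            if on_error == 'stop':
                return  # stop this task permanently
```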

Custom Prometheus Metrics

Declare prometheus_client metrics as class attributes on your handler. The framework auto-discovers them and exposes them on the debug UI metrics page alongside built-in Drakkar metrics.

import drakkar as dk
from prometheus_client import Counter, Gauge, Histogram

class MyHandler(dk.BaseDrakkarHandler[MyInput, MyOutput]):
    items_processed = Counter(
        'app_items_processed_total',
        'Total items processed',
    )
    active_sessions = Gauge(
        'app_active_sessions',
        'Currently active sessions',
    )
    match_count = Histogram(
        'app_match_count',
        'Matches per request',
        buckets=(0, 1, 5, 10, 50, 100, 500),
    )

    async def on_task_complete(self, result):
        self.items_processed.inc()
        self.match_count.observe(len(result.stdout.splitlines()))
        ...

The auto-discovery scans the handler’s class hierarchy (MRO) for attributes that are instances of MetricWrapperBase (Counter, Gauge, Histogram, Summary, Info). Use an app_ prefix to distinguish your metrics from Drakkar’s built-in drakkar_ metrics in dashboards and queries.
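
The MRO scan can be sketched without any dependencies. discover_metrics is a hypothetical helper, and MetricStandIn replaces prometheus_client's MetricWrapperBase only so the sketch runs on its own; passing `prometheus_client.metrics.MetricWrapperBase` as `base` would make it match real metric objects. Names already seen lower in the MRO are skipped, so a subclass override hides the parent's attribute just as normal attribute lookup does.

```python
from typing import Dict


class MetricStandIn:
    """Placeholder for prometheus_client's MetricWrapperBase."""

    def __init__(self, name: str) -> None:
        self.name = name


def discover_metrics(cls: type, base: type = MetricStandIn) -> Dict[str, object]:
    """Collect class attributes that are instances of `base`, walking the MRO."""
    found: Dict[str, object] = {}
    seen = set()
    for klass in cls.__mro__:  # most-derived class first
        for attr, value in vars(klass).items():
            if attr in seen:
                continue  # already defined (and possibly overridden) lower in the MRO
            seen.add(attr)
            if isinstance(value, base):
                found[attr] = value
    return found
```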