Drakkar Data Flow: Complete Processing Story¶
Architecture Overview¶
Drakkar is a Python 3.13+ framework that consumes messages from a Kafka topic, fans them out to per-partition async processors, runs user-defined logic as subprocesses, and delivers results to one or more output sinks (Kafka, PostgreSQL, MongoDB, HTTP, Redis, filesystem). The architecture follows a pipeline model: poll -> partition -> window -> arrange -> execute -> collect -> deliver -> commit. A single Drakkar worker runs one async event loop (asyncio.run) that hosts all partition processors, a shared subprocess executor pool with semaphore-based concurrency control, a sink manager, a dead-letter queue (DLQ), optional Prometheus metrics, and an optional debug flight recorder with a web UI. The user implements a handler class with hooks (arrange, collect, on_error, on_window_complete, on_delivery_error) that define the application-specific logic; everything else – Kafka consumption, offset tracking, backpressure, retries, subprocess lifecycle, sink connections, and graceful shutdown – is managed by the framework.
Configuration is loaded from a YAML file (path via config_path argument or DRAKKAR_CONFIG env var) with environment variable overrides using the DRAKKAR_ prefix and __ as a nesting delimiter (e.g., DRAKKAR_KAFKA__BROKERS). Environment variables are deep-merged on top of YAML values. The root configuration object is DrakkarConfig, a Pydantic BaseSettings model. All config fields referenced below are part of this hierarchy.
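The nesting-delimiter convention can be sketched as follows. Note that `env_overrides` is a hypothetical helper written only to illustrate how `__` maps to nested keys — in the real framework, Pydantic's settings machinery performs this merge:

```python
import os

def env_overrides(environ: dict[str, str], prefix: str = "DRAKKAR_") -> dict:
    """Illustrative sketch: turn DRAKKAR_-prefixed env vars into a nested dict.

    '__' is the nesting delimiter, so DRAKKAR_KAFKA__BROKERS becomes
    {'kafka': {'brokers': ...}}, ready to be deep-merged over YAML values.
    """
    result: dict = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue
        parts = name[len(prefix):].lower().split("__")
        node = result
        for part in parts[:-1]:
            node = node.setdefault(part, {})  # descend, creating levels as needed
        node[parts[-1]] = value
    return result
```

So `DRAKKAR_KAFKA__BROKERS=broker-1:9092` would override `kafka.brokers` from the YAML file.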
Phase 0: Worker Startup¶
0.1 Initialization¶
When DrakkarApp.run() is called, the framework:
- Resolves the worker identity:
    - Reads the environment variable named by `worker_name_env` (default: `'WORKER_ID'`).
    - If empty, falls back to a hex-encoded Python `id()` like `drakkar-7f3a2b`.
- Resolves the cluster name:
    - If `cluster_name_env` is set and the corresponding env var is non-empty, uses that.
    - Otherwise falls back to `cluster_name` (default: `''`).
- Configures structured logging via structlog using `logging.level` (default: `'INFO'`) and `logging.format` (default: `'json'`, also accepts `'console'`). The worker_id, consumer_group, and framework version are bound as context variables to every log line.
- Calls the handler’s `on_startup(config)` hook, which receives the full `DrakkarConfig` and may return a modified copy. This is the only point where the user can mutate configuration before the framework wires everything up.
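A minimal startup hook might look like the sketch below. It uses a plain dict as a stand-in for the `DrakkarConfig` Pydantic model, and the field it touches is illustrative:

```python
class MyHandler:
    """Hypothetical handler showing the on_startup hook from the text."""

    async def on_startup(self, config):
        # Last chance to adjust configuration before the framework wires
        # components up; here we shrink the window size (illustrative field).
        config["executor"]["window_size"] = 50
        return config  # the framework uses the returned (possibly modified) copy
```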
0.2 Component Construction¶
After on_startup, the framework builds all components in this order:
1. Validates that sinks are configured – if `sinks.is_empty` is True (no sinks of any type defined), raises `SinkNotConfiguredError` immediately. At least one sink must be configured.
2. Creates the ExecutorPool with:
    - `executor.binary_path` (default: `None` – each task must then provide its own `binary_path`)
    - `executor.max_executors` (default: 4, min: 1) – controls the `asyncio.Semaphore` size
    - `executor.task_timeout_seconds` (default: 120, min: 1) – per-subprocess wall-clock timeout
3. Starts the Prometheus metrics server on `metrics.port` (default: 9090) if `metrics.enabled` (default: True). Publishes `worker_info` with worker_id, version, and consumer_group labels.
4. If `debug.enabled` (default: True):
    - Creates an `EventRecorder` with the debug config, worker name, and cluster name.
    - Sets up a state provider callback so the recorder can periodically snapshot worker state (uptime, partitions, pool utilization, queue depth) every `debug.state_sync_interval_seconds` (default: 10) seconds.
    - Starts the recorder (creates/opens the SQLite database, starts flush/retention/state-sync background tasks).
    - Writes the full worker configuration to the `worker_config` table (if `debug.store_config` is True, default: True), enabling autodiscovery by other workers in the same cluster.
    - Creates and starts a `DebugServer` (FastAPI) on `debug.port` (default: 8080), providing a web UI, JSON API, WebSocket event streaming, and database download/merge endpoints.
5. Builds all sinks from config by iterating each type’s dict:
    - `sinks.kafka` – creates `KafkaSink(name, config, brokers_fallback=kafka.brokers)` for each named entry
    - `sinks.postgres` – creates `PostgresSink(name, config)` for each
    - `sinks.mongo` – creates `MongoSink(name, config)` for each
    - `sinks.http` – creates `HttpSink(name, config)` for each
    - `sinks.redis` – creates `RedisSink(name, config)` for each
    - `sinks.filesystem` – creates `FileSink(name, config)` for each
    Each is registered with the `SinkManager` under a `(sink_type, name)` key.
6. Connects all sinks by calling `connect()` on each in registration order. If any `connect()` raises, the worker crashes immediately (fail-fast design).
7. Builds and connects the DLQ sink:
    - Topic: `dlq.topic` if non-empty, otherwise `'{kafka.source_topic}_dlq'` (default source topic: `'input-events'`).
    - Brokers: `dlq.brokers` if non-empty, otherwise `kafka.brokers` (default: `'localhost:9092'`).
    - Connects a dedicated Kafka producer for the DLQ.
8. Creates the Kafka consumer (`KafkaConsumer` wrapping `confluent_kafka.aio.AIOConsumer`) with:
    - `bootstrap.servers`: `kafka.brokers` (default: `'localhost:9092'`)
    - `group.id`: `kafka.consumer_group` (default: `'drakkar-workers'`)
    - `enable.auto.commit`: `False` (offsets are committed manually by the framework)
    - `auto.offset.reset`: `'earliest'`
    - `partition.assignment.strategy`: `'cooperative-sticky'`
    - `max.poll.interval.ms`: `kafka.max_poll_interval_ms` (default: 300000 = 5 minutes)
    - `session.timeout.ms`: `kafka.session_timeout_ms` (default: 45000 = 45 seconds)
    - `heartbeat.interval.ms`: `kafka.heartbeat_interval_ms` (default: 3000 = 3 seconds)
    - Assign/revoke callbacks are wired to `_on_assign` and `_on_revoke`.
9. Exposes the PostgreSQL connection pool for the handler: if any postgres sink exists, its `asyncpg.Pool` is extracted and passed to the next hook.
10. Calls the handler’s `on_ready(config, pg_pool)` hook. The user can use this for database migrations, loading lookup tables, initializing caches, or any async setup that requires live connections.
11. Discovers and starts periodic tasks by inspecting the handler for methods decorated with `@periodic(seconds=N)`. For each discovered method:
    - Creates an `asyncio.Task` that loops: sleep `seconds`, then `await coro_fn()`.
    - Overlap prevention: the next interval begins only after the current invocation finishes (no concurrent runs of the same periodic task).
    - If the coroutine raises an exception:
        - `on_error='continue'` (default): logs the error and continues looping.
        - `on_error='stop'`: logs the error and exits the task permanently.
12. Subscribes to the Kafka topic and enters the main poll loop.
13. Registers signal handlers for `SIGINT` and `SIGTERM` that set `_running = False`, triggering graceful shutdown.
Phase 1: Polling Messages from Kafka¶
The main poll loop runs continuously while _running is True. Each iteration:
1.1 Check Backpressure¶
The framework calculates the total queued count as the sum of queue_size + inflight_count across all active partition processors.
Two watermarks control backpressure:
- High watermark = `executor.max_executors` (default: 4) × `executor.backpressure_high_multiplier` (default: 32) = 128 by default.
- Low watermark = max(1, `executor.max_executors` × `executor.backpressure_low_multiplier` (default: 4)) = 16 by default.
Pause condition: If not currently paused AND total queued >= high watermark:
- Calls consumer.pause(all_assigned_partition_ids) – Kafka stops delivering messages from all partitions.
- Sets backpressure_active Prometheus gauge to 1.
- Sets _paused = True.
Resume condition: If currently paused AND total queued <= low watermark:
- Calls consumer.resume(all_assigned_partition_ids) – Kafka resumes delivery.
- Sets backpressure_active gauge to 0.
- Sets _paused = False.
The hysteresis between high and low watermarks prevents rapid pause/resume oscillation. Pause and resume always operate on ALL assigned partitions, not individual ones.
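The hysteresis described above can be condensed into a small state function. This is a sketch of the documented logic, not the framework's actual code; defaults mirror the documented config defaults (high = 128, low = 16):

```python
def backpressure_transition(paused: bool, queued: int,
                            max_executors: int = 4,
                            high_mult: int = 32, low_mult: int = 4) -> bool:
    """Return the new paused state given the total queued count."""
    high = max_executors * high_mult          # pause at or above this
    low = max(1, max_executors * low_mult)    # resume at or below this
    if not paused and queued >= high:
        return True   # pause all assigned partitions
    if paused and queued <= low:
        return False  # resume all assigned partitions
    return paused     # inside the hysteresis band: no state change
```

Because the band between 16 and 128 changes nothing, brief fluctuations in queue depth cannot cause pause/resume flapping.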
1.2 Poll a Batch¶
The consumer calls consume(num_messages=count, timeout=1.0) on the underlying confluent_kafka.aio.AIOConsumer, where count defaults to kafka.max_poll_records (default: 100).
- If messages are returned: each message is wrapped in a `SourceMessage` object containing `topic`, `partition`, `offset`, `key` (bytes or None), `value` (bytes), and `timestamp` (milliseconds, Kafka-provided).
- If a `PARTITION_EOF` error is received: silently ignored (a normal condition when the consumer reaches the end of a partition).
- If any other Kafka error occurs: increments the `consumer_errors` Prometheus counter and logs a warning. The message is skipped.
1.3 Dispatch to Partition Processors¶
Each SourceMessage is routed to its corresponding PartitionProcessor by `msg.partition`.
The enqueue() call is non-blocking (queue.put_nowait()). It also:
- Records a consumed event in the flight recorder (if debug enabled).
- Increments the messages_consumed Prometheus counter (labeled by partition).
- Updates the partition_queue_size gauge.
If no processor exists for the partition (shouldn’t happen under normal operation), the message is silently dropped.
1.4 Idle Backoff¶
If no messages were returned by the poll, the loop sleeps for 50ms (asyncio.sleep(0.05)) to avoid busy-spinning.
Phase 2: Partition Assignment and Revocation¶
2.1 On Assign (New Partitions)¶
When Kafka’s cooperative-sticky rebalancer assigns new partitions to this worker:
- Records an `assigned` event per partition in the flight recorder.
- For each newly assigned partition ID (skipping already-known ones):
    - Creates a new `PartitionProcessor` with:
        - The partition ID
        - The user’s handler instance
        - The shared `ExecutorPool`
        - `executor.window_size` (default: 100, min: 1) – max messages per window
        - `executor.max_retries` (default: 3, min: 0) – retry limit per failed task
        - Callbacks for sink delivery (`_handle_collect`) and offset commit (`_handle_commit`)
        - The flight recorder (if debug enabled)
    - Starts the processor’s background `asyncio.Task` immediately.
- Updates the `assigned_partitions` Prometheus gauge.
- Calls the handler’s `on_assign(partition_ids)` hook asynchronously (fire-and-forget with exception logging).
2.2 On Revoke (Lost Partitions)¶
When partitions are revoked (rebalance, scaling event):
- Records a `revoked` event per partition.
- For each revoked partition:
    - Removes the processor from `_processors`.
    - Initiates an async stop sequence:
        - Sets `processor._running = False`.
        - Waits up to `executor.drain_timeout_seconds` (default: 5, min: 1) for in-flight work to complete.
        - Attempts a final offset commit for any completed work.
        - Calls `processor.stop()`.
- Updates the `assigned_partitions` gauge.
- Calls the handler’s `on_revoke(partition_ids)` hook asynchronously.
Phase 3: Window Collection and Arrangement¶
Each PartitionProcessor runs its own async loop independently. It does not wait for one window to fully complete before starting the next – multiple windows can be in-flight concurrently.
3.1 Collecting a Window¶
The processor waits for messages from its queue:
- Blocking wait for the first message: `await asyncio.wait_for(queue.get(), timeout=1.0)`.
    - If the 1-second timeout expires with no messages: returns an empty list. The processor then attempts an offset commit for any previously completed work and loops back.
    - If a message arrives: it becomes the first message in this window.
- Non-blocking drain: calls `queue.get_nowait()` in a tight loop until the queue is empty OR the window reaches `executor.window_size` (default: 100) messages.
- Updates the `partition_queue_size` gauge after collection.
The resulting list of 1 to window_size messages forms a window.
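The collection loop above can be sketched directly in asyncio terms. `collect_window` is an illustrative stand-alone function, not the framework's actual method:

```python
import asyncio

async def collect_window(queue: asyncio.Queue, window_size: int = 100) -> list:
    """Block up to 1s for the first message, then drain non-blockingly."""
    try:
        first = await asyncio.wait_for(queue.get(), timeout=1.0)
    except asyncio.TimeoutError:
        return []  # idle: the caller attempts an offset commit and loops back
    window = [first]
    while len(window) < window_size:
        try:
            window.append(queue.get_nowait())  # drain without yielding
        except asyncio.QueueEmpty:
            break
    return window
```

The blocking-then-drain shape means a busy queue produces full windows while a trickle of messages still gets processed within about a second.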
3.2 Processing a Window¶
For each window:
1. Increment the window counter (unique per partition, monotonically increasing).
2. Register all message offsets as PENDING in the `OffsetTracker`. The tracker uses `bisect.insort()` to maintain a sorted list of offsets for efficient watermark calculation.
3. Deserialize each message: calls `handler.deserialize_message(msg)`. If the handler class was declared with a generic `InputT` type parameter (e.g., `class MyHandler(BaseDrakkarHandler[MyInput, MyOutput])`), the framework automatically calls `InputT.model_validate_json(msg.value)` and sets `msg.payload` to the parsed Pydantic model. If parsing fails, `msg.payload` is set to `None`. If no `InputT` is declared, the raw bytes remain in `msg.value`.
4. Build PendingContext: creates a snapshot of currently in-flight tasks for this partition:
    - `pending_tasks`: list of `ExecutorTask` objects currently being run
    - `pending_task_ids`: set of their task IDs for O(1) deduplication checks
    This allows `arrange()` to avoid creating duplicate tasks for messages that map to already-running work.
5. Call `handler.arrange(messages, pending_ctx)`: the core user hook that maps a window of messages to a list of `ExecutorTask` objects. The arrange hook receives:
    - `messages`: the window of `SourceMessage` objects (with `.payload` already parsed)
    - `pending`: the `PendingContext` for deduplication
    The hook returns `list[ExecutorTask]`, where each task has:
    - `task_id` (str): unique identifier (typically from `make_task_id()`)
    - `args` (list[str]): command-line arguments for the subprocess
    - `metadata` (dict): arbitrary data carried through the pipeline
    - `source_offsets` (list[int]): which Kafka offsets this task covers
    - `binary_path` (str | None): optional per-task binary override
    - `stdin` (str | None): optional string written to process stdin
    The framework records the arrange duration in the `handler_duration` Prometheus histogram and in the flight recorder.
6. If arrange returns an empty list: all message offsets are immediately marked COMPLETED, an offset commit is attempted, and the processor moves to the next window. No tasks are run.
7. If arrange returns tasks, for each task:
    - Checks for a duplicate `task_id` in pending tasks; logs a warning if found.
    - Adds it to the `_pending_tasks` dict.
    - Increments `_inflight_count`.
    - Creates an `asyncio.Task` wrapping `_execute_and_track(task, window)`.
    - Tracks the async task handle in `_active_tasks` for cleanup.
All tasks within a window are launched concurrently. The processor immediately returns to collecting the next window – it does not wait for these tasks to finish.
Phase 4: Subprocess Execution¶
4.1 Acquiring an Executor Slot¶
The ExecutorPool uses an asyncio.Semaphore(max_executors) to limit concurrent subprocess runs across all partitions.
- The task enters the waiting state: `waiting_count` is incremented.
- `async with self._semaphore:` blocks until a slot is available.
- On acquiring the semaphore: `waiting_count` is decremented, `active_count` is incremented, and a slot ID (0 to max_executors−1) is popped from the available slots list.
- If the flight recorder is enabled, a `task_started` event is recorded with the current `pool_active` and `pool_waiting` counts and the allocated slot number.
4.2 Launching the Subprocess¶
1. Binary resolution: the framework uses `task.binary_path` if set; otherwise falls back to `executor.binary_path` from config. If neither is set, the task fails immediately with `exit_code=-1` and a descriptive error.
2. Process creation via `asyncio.create_subprocess_exec()`:
    - First argument: the resolved binary path
    - Remaining arguments: `task.args` (passed as individual arguments, not via a shell – prevents shell injection)
    - `stdin`: `asyncio.subprocess.PIPE` if `task.stdin is not None`, else `None` (not connected)
    - `stdout`: `asyncio.subprocess.PIPE` (always captured)
    - `stderr`: `asyncio.subprocess.PIPE` (always captured)
3. Communication: `proc.communicate(input=stdin_bytes)` writes stdin (if any) and waits for the process to exit, capturing all stdout and stderr.
4. Timeout enforcement: the entire `communicate()` call is wrapped in `asyncio.wait_for(timeout=executor.task_timeout_seconds)` (default: 120 seconds).
4.3 Possible Outcomes¶
Outcome A – Success (exit_code == 0):
- An ExecutorResult is created with:
- exit_code: 0
- stdout: process stdout decoded as UTF-8 (with errors='replace' for invalid bytes)
- stderr: process stderr decoded as UTF-8
- duration_seconds: wall-clock time rounded to 3 decimal places (measured with time.monotonic())
- task: the original ExecutorTask (carried through for context)
- pid: the OS process ID
- The result is returned normally to the caller.
Outcome B – Non-zero exit code:
- An ExecutorResult is created as above but with the actual non-zero exit_code.
- An ExecutorTaskError is raised, wrapping both:
- ExecutorError(task, exit_code, stderr, pid=pid) – error context for the handler
- ExecutorResult – full result including stdout/stderr for inspection
Outcome C – Timeout:
- The process is killed (proc.kill() + await proc.wait()) in the finally block.
- A synthetic ExecutorResult is created with exit_code=-1, stdout='', stderr='task timed out'.
- An ExecutorTaskError is raised with ExecutorError(task, stderr='task timed out', exception='Timeout after {N}s', pid=...).
- The executor_timeouts Prometheus counter is incremented.
Outcome D – OS error (binary not found, permission denied, etc.):
- A synthetic ExecutorResult is created with exit_code=-1, stdout='', stderr=str(e).
- An ExecutorTaskError is raised with ExecutorError(task, exception=str(e)) – no exit_code or pid since the process never started.
4.4 Releasing the Executor Slot¶
In the finally block (guaranteed to run):
1. The slot ID is returned to the available slots list (re-sorted to maintain ascending order).
2. active_count is decremented.
3. The semaphore is released, allowing the next waiting task to proceed.
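The acquire/release bookkeeping from 4.1 and 4.4 together form a pool that hands out the lowest free slot and always returns it, even on failure. `SlotPool` is a minimal sketch of that pattern, not the framework's `ExecutorPool`:

```python
import asyncio

class SlotPool:
    """Slot bookkeeping around a semaphore: lowest slot first, sorted return."""

    def __init__(self, max_executors: int = 4):
        self._semaphore = asyncio.Semaphore(max_executors)
        self._slots = list(range(max_executors))
        self.active_count = 0
        self.waiting_count = 0

    async def run(self, coro_fn):
        self.waiting_count += 1
        async with self._semaphore:          # blocks until a slot frees up
            self.waiting_count -= 1
            self.active_count += 1
            slot = self._slots.pop(0)        # lowest available slot ID
            try:
                return await coro_fn(slot)
            finally:                         # guaranteed: slot always returns
                self._slots.append(slot)
                self._slots.sort()           # keep ascending order
                self.active_count -= 1
```

The semaphore size equals the slot count, so `pop(0)` can never find the list empty.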
Phase 5: Post-Execution – Success Path¶
When a task succeeds (exit_code == 0):
5.1 Metrics and Recording¶
- `executor_tasks` counter incremented with `status='completed'`.
- `executor_duration` histogram observes the `duration_seconds`.
- Flight recorder records a `task_completed` event with pool utilization stats.
5.2 Collect Hook¶
The framework calls handler.collect(result):
- Receives the full ExecutorResult including stdout, stderr, exit_code, duration, and the original ExecutorTask (with its metadata dict).
- Returns CollectResult: a container with typed payload lists for each sink type:
- kafka: list of KafkaPayload(sink='', key=b'...', data=MyModel(...))
- postgres: list of PostgresPayload(sink='', table='results', data=MyModel(...))
- mongo: list of MongoPayload(sink='', collection='events', data=MyModel(...))
- http: list of HttpPayload(sink='', data=MyModel(...))
- redis: list of RedisPayload(sink='', key='cache:123', data=MyModel(...), ttl=3600)
- files: list of FilePayload(sink='', path='output/results.jsonl', data=MyModel(...))
- Returns None: no sink delivery for this result. The result is still tracked in the window.
The collect duration is recorded in the handler_duration histogram.
5.3 Sink Delivery (via on_collect callback)¶
If collect() returned a non-None CollectResult with has_outputs == True:
1. Validation: `SinkManager.validate_collect(result)` checks that every payload’s `sink` field resolves to a configured sink instance:
    - If `sink` is empty and exactly one sink of that type exists: resolves to the default.
    - If `sink` is empty and multiple sinks of that type exist: raises `AmbiguousSinkError`.
    - If `sink` names a non-existent instance: raises `SinkNotConfiguredError`.
    These errors crash the worker (fail-fast at validation time, not at delivery time).
2. Grouping: payloads are grouped by `(sink_type, sink_name)` for batched delivery.
3. Delivery per sink (see Phase 7 for full delivery details).
The result is appended to the window’s results list.
Phase 6: Post-Execution – Failure Path¶
When a task fails (ExecutorTaskError raised):
6.1 Metrics and Recording¶
- `executor_tasks` counter incremented with `status='failed'`.
- If the error contains “Timeout” in the exception message: `executor_timeouts` counter incremented.
- Flight recorder records a `task_failed` event with error details.
6.2 on_error Hook¶
The framework calls handler.on_error(task, error):
- task: the failed ExecutorTask
- error: an ExecutorError with:
- exit_code (int | None): the process exit code, or None if it never started / timed out
- stderr (str): process stderr or error description
- exception (str | None): exception message for timeout/launch failures, None for normal non-zero exits
- pid (int | None): process ID, None if never started
The hook must return one of three types of responses:
Response A – ErrorAction.RETRY (retry the same task):
- If retry_count < executor.max_retries (default: 3, meaning up to 3 retries = 4 total attempts):
- task_retries Prometheus counter incremented.
- A new asyncio.Task is created wrapping _execute_and_track(task, window, retry_count + 1).
- The function returns immediately without decrementing _inflight_count or incrementing window.completed_count – the retry reuses the same slot.
- There is no sleep or backoff between retries; the task is immediately re-submitted to the executor pool.
- If retry_count >= executor.max_retries (retries exhausted):
- Logs a max_retries_exceeded warning with the task_id and retry count.
- The failed result is appended to the window’s results list.
- Falls through to the finally block (task counted as completed).
Response B – ErrorAction.SKIP (skip and continue):
- The default behavior from BaseDrakkarHandler.on_error().
- The failed result is appended to the window’s results list.
- Falls through to the finally block.
Response C – list[ExecutorTask] (replacement tasks):
- The handler returns new tasks to run instead of retrying the original.
- For each new task:
- Added to _pending_tasks.
- window.tasks list extended.
- window.total_tasks incremented (the window grows dynamically).
- _inflight_count incremented.
- A new asyncio.Task is created.
- The original task is counted as completed in the finally block.
- This allows patterns like: split a large failed task into smaller ones, or substitute a fallback binary.
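All three responses can be illustrated in one hook. The `ErrorAction` enum here is a local stand-in mirroring the documented names, and the task/error dicts, field values, and the out-of-memory splitting heuristic are all hypothetical:

```python
from enum import Enum

class ErrorAction(Enum):
    """Stand-in mirroring the documented ErrorAction values."""
    RETRY = "retry"
    SKIP = "skip"

def on_error(task: dict, error: dict):
    """Hypothetical on_error(): one branch per documented response type."""
    if error["exit_code"] is None:
        # Response A: never started / timed out -> retry the same task
        return ErrorAction.RETRY
    if "out of memory" in error["stderr"]:
        # Response C: replace the failed task with two smaller halves
        half = len(task["args"]) // 2
        return [
            {"task_id": task["task_id"] + "-a", "args": task["args"][:half]},
            {"task_id": task["task_id"] + "-b", "args": task["args"][half:]},
        ]
    # Response B: permanent failure -> record the result and move on
    return ErrorAction.SKIP
```

Replacement tasks grow `window.total_tasks` dynamically, so the window only completes once they finish too.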
6.3 Unexpected Exceptions¶
If any non-ExecutorTaskError exception occurs during task processing or hook calls:
- A synthetic ExecutorResult is created with exit_code=-1, stderr=str(e), duration_seconds=0.
- Appended to the window’s results list.
- The error is logged with full traceback.
- The task is counted as completed so the window can still progress.
6.4 Finally Block (All Paths)¶
After every task run (success, failure, or unexpected error), except when returning early for a retry:
- Removes the task from `_pending_tasks`. If not found, logs a warning (indicates a potential race or duplicate cleanup).
- Decrements `_inflight_count`.
- Updates the `executor_pool_active` gauge.
- Increments `window.completed_count`.
Phase 7: Window Completion and Sink Delivery¶
7.1 Window Completion Check¶
After each task’s finally block, the framework checks `window.is_complete`.
Note: total_tasks can grow dynamically if on_error returns replacement tasks, so the window only completes when ALL tasks (including dynamically added ones) have finished.
7.2 on_window_complete Hook¶
When the window is complete:
- The `batch_duration` histogram observes the total window duration (from creation to last task completion).
- Calls `handler.on_window_complete(results, source_messages)`:
    - `results`: all `ExecutorResult` objects from this window (successes and failures).
    - `source_messages`: the original messages that triggered this window.
    - Returns `CollectResult`: additional sink payloads (e.g., aggregated summaries).
    - Returns `None`: no additional delivery.
If a CollectResult is returned, it goes through the same sink delivery pipeline as collect() results.
7.3 Mark Offsets Complete¶
All source message offsets in the window are marked as COMPLETED in the OffsetTracker. The offset_lag gauge is updated with the remaining pending offset count.
7.4 Sink Delivery Details¶
The SinkManager.deliver_all() method handles delivery to all sinks:
Grouping: payloads from the CollectResult are grouped by (sink_type, sink_name). The mapping from CollectResult fields to sink types is:
- result.kafka -> 'kafka' sinks
- result.postgres -> 'postgres' sinks
- result.mongo -> 'mongo' sinks
- result.http -> 'http' sinks
- result.redis -> 'redis' sinks
- result.files -> 'filesystem' sinks
Per-sink delivery (for each group):
A retry loop with `attempt` starting at 0:

1. Call `sink.deliver(payloads)`. The serialization is sink-specific:
    - KafkaSink: each payload’s `data` is serialized via `model_dump_json().encode()` as the Kafka message value; `key` is passed through. All payloads are produced in one batch, then the producer is flushed. The method raises `RuntimeError` if flush is incomplete, any future is None, or any result contains an error.
    - PostgresSink: acquires a single connection from the `asyncpg.Pool` (pool_min default: 2, pool_max default: 10). For each payload, `data` is serialized via `model_dump()` to a column→value dict, and an `INSERT INTO {table} ({columns}) VALUES ($1, $2, ...)` query is run. Table and column names are validated against SQL injection via a `^[a-zA-Z_][a-zA-Z0-9_]*$` regex.
    - MongoSink: for each payload, `data` is serialized via `model_dump()` to a document dict, then `collection.insert_one(document)` is called.
    - HttpSink: for each payload individually, `data` is serialized via `model_dump_json()` as the request body. An HTTP request is sent using the configured `method` (default: `'POST'`) to the configured `url` with `Content-Type: application/json` plus any custom `headers`. Timeout is `timeout_seconds` (default: 30). Non-2xx responses raise `httpx.HTTPStatusError` via `raise_for_status()`.
    - RedisSink: for each payload, `data` is serialized via `model_dump_json()` as the string value. The full key is `{config.key_prefix}{payload.key}`. If `payload.ttl` is set, `SET key value EX ttl`; otherwise, `SET key value` (no expiry).
    - FileSink: for each payload, `data` is serialized via `model_dump_json() + '\n'` (JSONL format). The file at `payload.path` is opened in append mode. The parent directory must exist.
2. On success:
    - SinkManager stats updated: `delivered_count += 1`, `delivered_payloads += len(payloads)`, timestamps recorded.
    - Flight recorder records a `sink_delivered` event.
    - `sink_payloads_delivered` and `sink_deliver_duration` Prometheus metrics updated.
    - Breaks from the retry loop.
3. On failure (exception from `deliver()`):
    - Stats updated: `error_count += 1`, `last_error` set.
    - `sink_deliver_errors` Prometheus counter incremented.
    - Flight recorder records a `sink_error` event.
    - A `DeliveryError` is created with `sink_name`, `sink_type`, `error` message, and the failed `payloads`.
    - The handler’s `on_delivery_error(error)` hook is called.
on_delivery_error returns one of three actions:
- `DeliveryAction.RETRY` (and `attempt < max_retries`, default max_retries: 3):
    - `sink_delivery_retries` counter incremented.
    - Logs a warning.
    - Continues the retry loop (immediate retry, no backoff).
- `DeliveryAction.SKIP`:
    - `sink_deliveries_skipped` counter incremented.
    - Logs a warning.
    - Breaks from the retry loop. Payloads are dropped.
- `DeliveryAction.DLQ` (default from `BaseDrakkarHandler.on_delivery_error()`):
    - The framework calls `dlq_sink.send(error, partition_id)`.
    - The DLQ sink serializes the failed payloads into a JSON envelope:

      ```json
      {
        "original_payloads": ["<json_string_1>", "<json_string_2>"],
        "sink_name": "results",
        "sink_type": "kafka",
        "error": "Connection refused",
        "timestamp": 1743580800.123,
        "partition": 5,
        "attempt_count": 1
      }
      ```

      Each payload is serialized via `model_dump_json()`; if that fails (e.g., a non-serializable model), it falls back to `str()`.
    - The envelope is produced to the DLQ Kafka topic.
    - `sink_dlq_messages` Prometheus counter incremented.
    - Breaks from the retry loop.
- Retries exhausted (RETRY action but `attempt >= max_retries`):
    - Falls through to the DLQ/break path. Logs a warning.
    - Breaks from the retry loop.
After successful delivery, the framework also records produced events in the flight recorder for Kafka payloads (counting output messages for observability).
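A hook choosing between the three actions might look like this sketch. The `DeliveryAction` enum is a local stand-in mirroring the documented names, the error dict stands in for `DeliveryError`, and the per-sink-type policy is purely illustrative:

```python
from enum import Enum

class DeliveryAction(Enum):
    """Stand-in mirroring the documented DeliveryAction values."""
    RETRY = "retry"
    SKIP = "skip"
    DLQ = "dlq"

def on_delivery_error(error: dict) -> DeliveryAction:
    """Hypothetical on_delivery_error(): route by sink type."""
    if error["sink_type"] == "http":
        return DeliveryAction.RETRY   # HTTP endpoints are often transiently down
    if error["sink_type"] == "redis":
        return DeliveryAction.SKIP    # cache entries can be regenerated later
    return DeliveryAction.DLQ         # everything else: preserve for replay
```

DLQ is the framework default, so doing nothing (inheriting `BaseDrakkarHandler.on_delivery_error()`) already preserves failed payloads for later replay.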
Phase 8: Offset Commit¶
8.1 Watermark Calculation¶
The OffsetTracker maintains a sorted list of registered offsets, each in state PENDING or COMPLETED. The committable() method returns the highest consecutive completed offset + 1 from the beginning of the sorted list:
- Example: offsets [10, 11, 12, 13, 14] with states [COMPLETED, COMPLETED, COMPLETED, PENDING, COMPLETED]
- Committable = 13 (offsets 10, 11, 12 are consecutive and completed; 13 is pending, blocking 14)
- Example: offsets [10, 11, 12] all COMPLETED
- Committable = 13 (all completed, commit up to 13)
- Example: offset [10] PENDING
- Committable = None (nothing to commit)
This watermark design ensures that Kafka offsets are only committed when ALL preceding messages have been fully processed and their results delivered to sinks.
8.2 Commit Triggers¶
Offset commits are attempted at these points:
- After every window completes (all tasks in window finished).
- On idle iterations (no messages received for 1 second) – catches any lagging commits.
- When arrange returns no tasks – offsets are immediately committable.
- During shutdown – final commit for each partition.
- During partition revocation – commit before releasing the partition.
8.3 Commit Flow¶
When committable() returns a non-None offset:
- The framework calls `consumer.commit({partition_id: offset})`.
- The consumer creates a `TopicPartition(topic, partition, offset)` and calls `commit(asynchronous=False)`.
- The `offsets_committed` Prometheus counter is incremented (labeled by partition).
- If the commit succeeds:
    - The flight recorder records a `committed` event.
    - `offset_tracker.acknowledge_commit(committed_offset)` removes all offsets below the committed value from tracking, bounding memory usage.
- If the commit fails (exception):
    - A warning is logged with the partition, offset, and error.
    - The function returns early without acknowledging – the next commit attempt will retry with the same or a higher offset.
Phase 9: Graceful Shutdown¶
When _running is set to False (via SIGINT, SIGTERM, or programmatic shutdown):
Step 1: Cancel Periodic Tasks¶
- All periodic task `asyncio.Task` objects are cancelled.
- `asyncio.gather(*tasks, return_exceptions=True)` waits for them to finish.
- The list is cleared.
Step 2: Signal Partition Processors to Stop¶
Sets `processor._running = False` for all processors. This causes each processor’s main loop to exit after its current window collection.
Step 3: Drain All Processors¶
- Waits up to `executor.drain_timeout_seconds` (default: 5) for all processors with:
    - non-empty queues, OR
    - pending offset commits, OR
    - in-flight tasks
    to finish their work.
- Each processor drains by: processing remaining queued messages into windows, waiting for in-flight tasks to complete, then doing a final commit.
- If the timeout expires, logs a warning but continues shutdown.
Step 4: Final Offset Commits¶
- For each processor, checks `committable()` and commits any remaining offsets.
- If a commit fails, logs a warning. These offsets will be re-processed on the next startup (at-least-once semantics).
Step 5: Stop All Processors¶
- Calls `processor.stop()` on each, which:
    - Waits up to 10 seconds for the processor’s async task to exit naturally.
    - If it doesn’t exit in 10 seconds, force-cancels the task.
- Clears the processors dict.
Step 6: Stop the Flight Recorder¶
- If debug is enabled: flushes any buffered events to SQLite, cancels background tasks (flush, retention, state sync), and removes the `-live.db` symlink.
Step 7: Stop the Debug Server¶
- If debug is enabled: stops the FastAPI server.
Step 8: Close All Sinks and DLQ¶
- `sink_manager.close_all()`: calls `close()` on each sink. Exceptions are caught and logged as warnings (never raised during shutdown).
    - KafkaSink: closes the Kafka producer, sets it to None.
    - PostgresSink: closes the asyncpg pool, sets it to None.
    - MongoSink: closes the motor client, sets it to None.
    - HttpSink: calls `client.aclose()`, sets it to None.
    - RedisSink: calls `client.aclose()`, sets it to None.
    - FileSink: no-op (no persistent connection).
- The DLQ sink is closed separately (same pattern).
Step 9: Close the Kafka Consumer¶
- Closes the confluent_kafka consumer, which triggers a final leave-group request to the Kafka broker.
Concurrency Model Summary¶
- One async event loop per worker process.
- One poll loop fetches messages and dispatches to partition processors.
- One processing loop per partition collects windows and launches tasks.
- Multiple windows per partition can be in-flight concurrently – the processor does not wait for window N to complete before starting window N+1.
- All partitions share a single ExecutorPool with a semaphore of size `executor.max_executors`. Tasks from any partition compete for the same slots.
- Backpressure operates globally – pause/resume applies to all partitions simultaneously based on the total queued count across all processors.
- Periodic tasks run as independent async tasks in the same event loop.
- Sink delivery happens inline within the task’s async context (not batched across partitions).
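The shared-semaphore design above can be sketched in a few lines (a minimal illustration, not Drakkar's actual ExecutorPool API):

```python
import asyncio

class ExecutorPool:
    """Sketch of a pool bounded by one shared semaphore (illustrative)."""

    def __init__(self, max_executors: int = 4):
        # One semaphore shared by every partition: all tasks compete
        # for the same max_executors slots.
        self._slots = asyncio.Semaphore(max_executors)

    async def run(self, coro_fn, *args):
        async with self._slots:  # blocks while max_executors tasks are in flight
            return await coro_fn(*args)
```

Because the semaphore lives in the pool rather than in each partition processor, a busy partition can starve others of slots; the global backpressure mechanism described above is what keeps the total queue bounded.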
Delivery Guarantees¶
- At-least-once processing: offsets are committed only after all tasks in a window complete AND their results are delivered to sinks (or handled by on_delivery_error). If the worker crashes before committing, messages will be re-consumed on restart.
- No exactly-once: the framework does not use Kafka transactions. In failure scenarios (crash between sink delivery and offset commit), messages may be processed and delivered more than once.
- Ordering within a partition: messages are processed in offset order within windows. However, tasks within a window run concurrently, so per-message ordering is not guaranteed within a window.
- Cross-partition ordering: no ordering guarantees between partitions.
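The at-least-once ordering can be summarized in a few lines (names like `deliver` and `commit` are illustrative stand-ins, not Drakkar's API):

```python
async def finish_window(results, sink, consumer, offsets):
    """Deliver first, commit second (illustrative names)."""
    await sink.deliver(results)   # 1. results reach the sink
    consumer.commit(offsets)      # 2. offsets committed only after delivery
    # A crash between steps 1 and 2 means the window is re-consumed on
    # restart: duplicates are possible, but no message is lost.
```

Reversing the two steps would flip the guarantee to at-most-once: a crash after commit but before delivery would silently drop the window.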
Configuration Reference¶
kafka – Kafka Consumer Settings¶
| Field | Type | Default | Description |
|---|---|---|---|
| `brokers` | str | `'localhost:9092'` | Kafka bootstrap servers |
| `source_topic` | str | `'input-events'` | Topic to consume from |
| `consumer_group` | str | `'drakkar-workers'` | Consumer group ID |
| `max_poll_records` | int | `100` | Max messages per poll batch |
| `max_poll_interval_ms` | int | `300000` | Max time between polls before Kafka considers the consumer dead |
| `session_timeout_ms` | int | `45000` | Session timeout for group membership |
| `heartbeat_interval_ms` | int | `3000` | Heartbeat interval to the broker |
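These settings live under the `kafka` block of the YAML config. A hypothetical fragment (broker addresses are illustrative):

```yaml
kafka:
  brokers: "kafka-1:9092,kafka-2:9092"
  source_topic: "input-events"
  consumer_group: "drakkar-workers"
  max_poll_records: 100
```

Any of these fields can also be overridden at startup through the environment, using the `DRAKKAR_` prefix and `__` as the nesting delimiter, e.g. `DRAKKAR_KAFKA__BROKERS=kafka-3:9092`.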
executor – Subprocess Executor Pool¶
| Field | Type | Default | Min | Description |
|---|---|---|---|---|
| `binary_path` | str or None | `None` | 1 char | Default subprocess binary; None requires per-task override |
| `max_executors` | int | `4` | 1 | Concurrent subprocess limit (semaphore size) |
| `task_timeout_seconds` | int | `120` | 1 | Per-subprocess wall-clock timeout |
| `window_size` | int | `100` | 1 | Max messages per arrange() window |
| `max_retries` | int | `3` | 0 | Max retries per failed task (0 = no retries) |
| `drain_timeout_seconds` | int | `5` | 1 | Max wait for in-flight tasks during shutdown |
| `backpressure_high_multiplier` | int | `32` | 1 | Pause threshold = max_executors × this |
| `backpressure_low_multiplier` | int | `4` | 1 | Resume threshold = max(1, max_executors × this) |
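With the defaults above, the backpressure thresholds work out as follows. The formulas match the table; the helper function itself is hypothetical, and the exact comparison semantics (pause at or above the high mark, resume at or below the low mark) are an assumption:

```python
def backpressure_thresholds(max_executors: int = 4,
                            high_multiplier: int = 32,
                            low_multiplier: int = 4) -> tuple[int, int]:
    """Return (pause_at, resume_at) queue depths (illustrative helper)."""
    pause_at = max_executors * high_multiplier          # default: 4 * 32 = 128
    resume_at = max(1, max_executors * low_multiplier)  # default: max(1, 4 * 4) = 16
    return pause_at, resume_at
```

So with the default executor settings, polling pauses once roughly 128 messages are queued across all processors and resumes once the backlog drains to about 16.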
sinks – Output Sink Instances¶
Each sink type is a dict mapping instance names to their config:
`sinks.kafka.<name>`:

| Field | Type | Default |
|---|---|---|
| `topic` | str | (required) |
| `brokers` | str | `''` (inherits `kafka.brokers`) |
| `ui_url` | str | `''` |
`sinks.postgres.<name>`:

| Field | Type | Default | Min |
|---|---|---|---|
| `dsn` | str | (required) | |
| `pool_min` | int | `2` | 1 |
| `pool_max` | int | `10` | 1 |
| `ui_url` | str | `''` | |
`sinks.mongo.<name>`:

| Field | Type | Default |
|---|---|---|
| `uri` | str | (required) |
| `database` | str | (required) |
| `ui_url` | str | `''` |
`sinks.http.<name>`:

| Field | Type | Default | Min |
|---|---|---|---|
| `url` | str | (required) | |
| `method` | str | `'POST'` | |
| `timeout_seconds` | int | `30` | 1 |
| `headers` | dict | `{}` | |
| `max_retries` | int | `3` | 0 |
| `ui_url` | str | `''` | |
`sinks.redis.<name>`:

| Field | Type | Default |
|---|---|---|
| `url` | str | `'redis://localhost:6379/0'` |
| `key_prefix` | str | `''` |
| `ui_url` | str | `''` |
`sinks.filesystem.<name>`:

| Field | Type | Default |
|---|---|---|
| `base_path` | str | `''` |
| `ui_url` | str | `''` |
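Putting the instance-name mapping together, a hypothetical `sinks` block combining several of the types above (all instance names and values are illustrative):

```yaml
sinks:
  kafka:
    enriched:                # <name> is chosen by the user
      topic: "enriched-events"
  postgres:
    audit:
      dsn: "postgresql://drakkar:secret@db:5432/audit"
      pool_max: 10
  filesystem:
    archive:
      base_path: "/var/lib/drakkar/archive"
```

Each `<name>` becomes an independent sink instance that handler results can be routed to.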
dlq – Dead Letter Queue¶
| Field | Type | Default | Description |
|---|---|---|---|
| `topic` | str | `''` | DLQ topic; empty = `'{source_topic}_dlq'` |
| `brokers` | str | `''` | DLQ brokers; empty = inherits `kafka.brokers` |
metrics – Prometheus¶
| Field | Type | Default |
|---|---|---|
| `enabled` | bool | `True` |
| `port` | int | `9090` |
logging – Structured Logging¶
| Field | Type | Default |
|---|---|---|
| `level` | str | `'INFO'` |
| `format` | str | `'json'` (also: `'console'`) |
debug – Flight Recorder and Web UI¶
| Field | Type | Default | Description |
|---|---|---|---|
| `enabled` | bool | `True` | Enable/disable entire debug feature |
| `port` | int | `8080` | Debug web UI port |
| `debug_url` | str | `''` | External URL for the debug UI |
| `db_dir` | str | `'/tmp'` | SQLite database directory; `''` = no disk persistence |
| `store_events` | bool | `True` | Write processing events to the events table |
| `store_config` | bool | `True` | Write worker config (enables autodiscovery) |
| `store_state` | bool | `True` | Periodic state snapshots |
| `state_sync_interval_seconds` | int | `10` | Seconds between state snapshots |
| `rotation_interval_minutes` | int | `60` | When to roll over DB files |
| `retention_hours` | int | `24` | Delete DBs older than this |
| `retention_max_events` | int | `100000` | Max total events across DB files |
| `store_output` | bool | `True` | Include stdout/stderr in event records |
| `flush_interval_seconds` | int | `5` | Buffer flush interval |
| `max_buffer` | int | `50000` | In-memory event buffer size |
| `max_ui_rows` | int | `5000` | Max rows returned to UI queries |
| `log_min_duration_ms` | int | `500` | Min duration to log slow tasks |
| `ws_min_duration_ms` | int | `500` | Min duration to broadcast via WebSocket |
| `event_min_duration_ms` | int | `0` | Min duration to persist to DB |
| `output_min_duration_ms` | int | `500` | Min duration to include stdout/stderr |
| `prometheus_url` | str | `''` | Prometheus server URL for dashboard links |
| `prometheus_rate_interval` | str | `'5m'` | Rate interval for Prometheus queries |
| `prometheus_worker_label` | str | `''` | Worker label name in Prometheus |
| `prometheus_cluster_label` | str | `''` | Cluster label name in Prometheus |
| `custom_links` | list[dict] | `[]` | Custom links for debug UI |
Root-Level Settings¶
| Field | Type | Default | Description |
|---|---|---|---|
| `worker_name_env` | str | `'WORKER_ID'` | Env var holding the worker name |
| `cluster_name` | str | `''` | Logical cluster name |
| `cluster_name_env` | str | `''` | Env var overriding cluster_name |