Observability¶
Drakkar provides four integrated observability layers: Prometheus metrics for alerting and dashboards, structured logging for event streams, a browser-based debug UI for real-time inspection, and a SQLite flight recorder for post-mortem analysis. All four are enabled by default and work together – the debug UI reads from the flight recorder and links out to Prometheus graphs. See Configuration for the full YAML reference.
Prometheus Metrics¶
Metrics are exposed via prometheus_client and served on a dedicated HTTP endpoint. The server starts automatically when metrics.enabled is True (the default).
Metrics Reference¶
Worker Identity¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_worker | Info | worker_id, version, consumer_group | Worker instance identity, published once at startup |
Messages¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_messages_consumed_total | Counter | partition | Total messages consumed from the source Kafka topic |
Executor¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_executor_tasks_total | Counter | status (started, completed, failed) | Total executor tasks by outcome |
| drakkar_executor_duration_seconds | Histogram | – | Task execution duration. Buckets: 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300 |
| drakkar_executor_pool_active | Gauge | – | Number of tasks currently running in the executor pool |
| drakkar_executor_timeouts_total | Counter | – | Total tasks that exceeded task_timeout_seconds and were killed |
| drakkar_task_retries_total | Counter | – | Total tasks retried after failure (via on_error returning RETRY) |
Batches¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_batch_duration_seconds | Histogram | – | Window/batch processing duration (arrange through last task completion). Buckets: 0.5, 1, 2, 5, 10, 30, 60, 120, 300, 600 |
Partitions¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_partition_queue_size | Gauge | partition | Messages waiting in the partition’s internal queue |
| drakkar_offset_lag | Gauge | partition | Pending (uncommitted) offsets per partition |
| drakkar_backpressure_active | Gauge | – | Whether the consumer is paused due to backpressure. 1 = paused, 0 = flowing |
| drakkar_total_queued | Gauge | – | Total messages buffered in all partition queues plus in-flight tasks |
| drakkar_assigned_partitions | Gauge | – | Number of partitions currently assigned to this worker |
| drakkar_executor_idle_slot_seconds_total | Counter | – | Accumulated slot-seconds of executor idle time while messages are waiting in queues. If 2 slots sit idle for 3 seconds while messages are queued, this increments by 6. Only counts when messages are in queues (not yet dispatched), not when the worker has nothing to do. |
| drakkar_consumer_idle_seconds_total | Counter | – | Accumulated seconds with no messages available from Kafka and nothing in queues. Measures time the worker has genuinely nothing to do (consumer lag is zero). Does not count time when the consumer is paused due to backpressure. |
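The slot-seconds arithmetic described for drakkar_executor_idle_slot_seconds_total can be sketched as below. This is an illustration only: the function name and its arguments are hypothetical and are not part of Drakkar's API.

```python
def idle_slot_seconds(idle_slots: int, elapsed_seconds: float, queued_messages: int) -> float:
    """Return the counter increment for one measurement interval.

    Hypothetical helper: idle time only counts while messages are waiting
    in partition queues; with empty queues the worker simply has no work.
    """
    if queued_messages <= 0:
        return 0.0
    return idle_slots * elapsed_seconds


# Example from the table: 2 idle slots for 3 seconds with messages queued.
increment = idle_slot_seconds(idle_slots=2, elapsed_seconds=3.0, queued_messages=5)
```

A rising rate on this counter therefore points at dispatch stalls rather than at a lack of input.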
Consumer¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_consumer_errors_total | Counter | – | Total Kafka consumer poll errors |
Offsets¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_offsets_committed_total | Counter | partition | Total offset commit operations |
Rebalancing¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_rebalance_events_total | Counter | type (assign, revoke) | Total Kafka rebalance events |
Sinks¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_sink_deliver_duration_seconds | Histogram | sink_type, sink_name | Duration of sink delivery operations. Buckets: 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5 |
| drakkar_sink_deliver_errors_total | Counter | sink_type, sink_name | Total sink delivery failures |
| drakkar_sink_payloads_delivered_total | Counter | sink_type, sink_name | Total payloads delivered to sinks |
| drakkar_sink_delivery_retries_total | Counter | sink_type, sink_name | Total sink delivery retry attempts |
| drakkar_sink_deliveries_skipped_total | Counter | sink_type, sink_name | Deliveries skipped via on_delivery_error returning SKIP |
| drakkar_tasks_precomputed_total | Counter | – | Tasks whose result was supplied by the handler via ExecutorTask.precomputed, bypassing the subprocess. Framework is agnostic to the reason (cache hit, lookup, deterministic shortcut). Compare to drakkar_executor_tasks_total{status="completed"} for the short-circuit rate. |
| drakkar_sink_dlq_messages_total | Counter | – | Total messages sent to the dead letter queue |
| drakkar_dlq_send_failures_total | Counter | – | Total failed attempts to send messages to the DLQ. When both the primary sink and DLQ fail, the payload is lost – alert on this counter. |
| drakkar_sink_circuit_open | Gauge | sink_type, sink_name | Per-sink circuit breaker state: 0.0 closed, 0.5 half-open, 1.0 open. Gauges are zero-initialized at sink registration so a never-tripped sink still appears in scrape output. Sustained 1.0 on a sink means its downstream has been down longer than the cooldown can recover from. |
| drakkar_sink_circuit_trips_total | Counter | sink_type, sink_name | Transitions into the open state per sink – both the initial failure-threshold trip and every half-open probe failure. A flapping circuit surfaces as a rising rate on this counter, not a single trip plus silent reopens. Alert on rate(...[5m]) > 0 paired with non-zero drakkar_sink_circuit_open. |
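The relationship between the circuit gauge values (0.0, 0.5, 1.0) and the trips counter can be illustrated with a small state machine. This is a minimal sketch under stated assumptions: the class name, the failure_threshold and cooldown_seconds parameters, and the method names are hypothetical and do not reflect Drakkar's actual implementation.

```python
import time


class SinkCircuit:
    """Hypothetical per-sink circuit breaker; names and defaults are assumptions."""

    CLOSED, HALF_OPEN, OPEN = 0.0, 0.5, 1.0  # values a state gauge would report

    def __init__(self, failure_threshold=3, cooldown_seconds=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self.state = self.CLOSED  # zero-initialised, like the gauge
        self.trips_total = 0      # mirrors the trips counter
        self._failures = 0
        self._opened_at = 0.0

    def _trip(self) -> None:
        self.state = self.OPEN
        self.trips_total += 1
        self._opened_at = self.clock()

    def allow(self) -> bool:
        """Return True if a delivery attempt may proceed."""
        if self.state == self.OPEN and self.clock() - self._opened_at >= self.cooldown_seconds:
            self.state = self.HALF_OPEN  # cooldown elapsed: let a probe through
        return self.state != self.OPEN

    def record_success(self) -> None:
        self.state = self.CLOSED
        self._failures = 0

    def record_failure(self) -> None:
        if self.state == self.OPEN:
            return  # nothing was attempted while open
        if self.state == self.HALF_OPEN:
            self._trip()  # a failed probe counts as another trip
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()
```

Because every failed probe increments trips_total, a flapping downstream shows up as a steadily rising counter even though the gauge keeps returning to 1.0.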
Handler Hooks¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_handler_duration_seconds | Histogram | hook (arrange, on_task_complete, on_message_complete, on_error, on_window_complete, etc.) | Duration of user handler hook execution. Buckets: 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 30 |
Recorder¶
Emitted only when debug.enabled=true. Track flight-recorder health – buffer depth, drops under load, and flush latency. Alert on sustained non-zero dropped_events_total or p99 flush_duration_seconds exceeding flush_interval_seconds.
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_recorder_buffer_size | Gauge | – | Current depth of the in-memory event buffer. Updated on every _record() call and after each flush. A sustained value near max_buffer signals the flush loop can’t keep up with incoming events – lower flush_interval_seconds (flush more often), raise max_buffer, or investigate disk latency. |
| drakkar_recorder_dropped_events_total | Counter | – | Events evicted from the ring buffer because it was full at record time. Non-zero means event history has gaps; scale the buffer up or speed up flushes. This was previously a silent failure mode – the counter makes it alertable. |
| drakkar_recorder_flush_duration_seconds | Histogram | – | Duration of the full flush body (executemany + commit). Exposes disk-I/O latency tail. Buckets: 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1. Alert on p99 exceeding flush_interval_seconds – flushes overlapping the next scheduled flush starve the buffer. |
| drakkar_recorder_flush_retries_total | Counter | – | Flush attempts that failed with aiosqlite.OperationalError and re-queued their batch at the front of the buffer. A non-zero rate(...[5m]) signals a transient DB-health issue (WAL lock, disk pressure); if it climbs without stabilising, the recorder is about to start dropping batches. Pair with drakkar_recorder_flush_batches_dropped_total to distinguish “retrying” from “giving up”. |
| drakkar_recorder_flush_batches_dropped_total | Counter | – | Batches discarded after max_flush_retries consecutive failures. This is silent recorder data loss – alert on any non-zero rate. Investigate disk space, filesystem health, and WAL contention. |
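How the recorder counters relate to each other can be shown with a toy buffer. This is a sketch under assumptions, not the recorder's code: the class and attribute names are hypothetical, sqlite3.OperationalError stands in for aiosqlite.OperationalError, and the exact point at which a batch is given up on is a guess based on the descriptions above.

```python
import sqlite3
from collections import deque


class RecorderBuffer:
    """Hypothetical stand-in for the recorder's in-memory event buffer."""

    def __init__(self, max_buffer: int, max_flush_retries: int):
        self.events = deque()
        self.max_buffer = max_buffer
        self.max_flush_retries = max_flush_retries
        self.dropped_events = 0         # ~ dropped_events_total
        self.flush_retries = 0          # ~ flush_retries_total
        self.flush_batches_dropped = 0  # ~ flush_batches_dropped_total
        self._consecutive_failures = 0

    def record(self, event) -> None:
        if len(self.events) >= self.max_buffer:
            self.events.popleft()  # buffer full: evict the oldest event
            self.dropped_events += 1
        self.events.append(event)

    def flush(self, write_batch) -> None:
        """Drain the buffer through write_batch(list); retry or drop on failure."""
        batch = list(self.events)
        self.events.clear()
        if not batch:
            return
        try:
            write_batch(batch)
            self._consecutive_failures = 0
        except sqlite3.OperationalError:
            self._consecutive_failures += 1
            if self._consecutive_failures >= self.max_flush_retries:
                self.flush_batches_dropped += 1  # giving up: history is lost
                self._consecutive_failures = 0
            else:
                self.flush_retries += 1
                self.events.extendleft(reversed(batch))  # re-queue at the front
```

In this model a retry keeps the data and only costs buffer space, while a dropped batch is permanent, which is why the two counters warrant different alert severities.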
Cache¶
Emitted only when cache.enabled=true. Memory gauges are maintained as running sums — Prometheus scrape reads a single int, never walks the in-memory dict. DB gauges are refreshed by the cache.cleanup loop (default every 60s).
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_cache_hits_total | Counter | source (memory, db) | Cache reads served from memory or from the DB fallback. peek() is not counted – only get() increments this. |
| drakkar_cache_misses_total | Counter | – | Cache reads that returned None (key absent or expired in both memory and DB). |
| drakkar_cache_writes_total | Counter | scope (local, cluster, global) | Total Cache.set() calls, labelled by the entry’s scope. |
| drakkar_cache_deletes_total | Counter | – | Total Cache.delete() calls regardless of whether the key was present. Note: delete is local-only – see sharp edge. |
| drakkar_cache_evictions_total | Counter | – | Entries popped from the in-memory dict due to the LRU cap (max_memory_entries). A sustained high rate signals that the cap is undersized. |
| drakkar_cache_flush_entries_total | Counter | op (set, delete) | Entries drained from the dirty map to SQLite per flush cycle, by op type. Measures flush throughput, not rows actually modified (LWW may reject a SET). |
| drakkar_cache_cleanup_removed_total | Counter | – | Rows removed from the SQLite DB by the cleanup loop (entries whose TTL elapsed). A sudden spike means many entries expiring together. |
| drakkar_cache_sync_entries_fetched_total | Counter | peer | Rows pulled from a peer worker’s cache DB by the sync loop, per peer. |
| drakkar_cache_sync_entries_upserted_total | Counter | peer | Rows the sync loop attempted to UPSERT into the local DB. Less than or equal to sync_entries_fetched_total; lower when LWW rejects some rows. |
| drakkar_cache_sync_errors_total | Counter | peer | Per-peer failures during the sync cycle (connection refused, corrupt DB, timeout). One increment per failed cycle; the sync loop keeps running. |
| drakkar_cache_peer_sync_timeouts_total | Counter | – | Worker-level count of peer-sync cycles that exceeded cache.peer_sync.cycle_deadline_seconds (or the derived interval_seconds * 0.9 default). One tick per cycle that was cut short by the deadline. A sustained non-zero rate signals a slow peer or a peer-count-vs-interval imbalance – raise the deadline, stagger peers, or reduce the number of peers pulled per cycle. |
| drakkar_cache_entries_in_memory | Gauge | – | Entries currently in the in-memory dict. Running sum, adjusted per set/delete/evict/cleanup. |
| drakkar_cache_bytes_in_memory | Gauge | – | Sum of size_bytes across in-memory entries. Running sum (see above). |
| drakkar_cache_entries_in_db | Gauge | – | Rows in the local <worker>-cache.db. Refreshed by the cleanup loop. |
| drakkar_cache_bytes_in_db | Gauge | – | Sum of size_bytes across DB rows. Refreshed by the cleanup loop. |
Cache flush/sync/cleanup loop durations are captured by the existing
periodic_task_duration{name="cache.flush|cache.sync|cache.cleanup"}
histogram — no dedicated cache-only timing histograms.
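The "running sum" approach to the memory gauges can be sketched as follows: counts are adjusted on every mutation, so reading them never iterates the dict. The MemoryCache class and its attribute names are hypothetical, and len(value) is used as a stand-in for size_bytes.

```python
from collections import OrderedDict


class MemoryCache:
    """Hypothetical LRU cache that keeps gauge values as running sums."""

    def __init__(self, max_memory_entries: int):
        self.max_memory_entries = max_memory_entries
        self._data = OrderedDict()
        self.entries_in_memory = 0  # ~ drakkar_cache_entries_in_memory
        self.bytes_in_memory = 0    # ~ drakkar_cache_bytes_in_memory
        self.evictions = 0          # ~ drakkar_cache_evictions_total

    def _forget(self, key) -> None:
        value = self._data.pop(key)
        self.entries_in_memory -= 1
        self.bytes_in_memory -= len(value)

    def set(self, key: str, value: bytes) -> None:
        if key in self._data:
            self._forget(key)  # replacing: subtract the old size first
        self._data[key] = value
        self.entries_in_memory += 1
        self.bytes_in_memory += len(value)
        while self.entries_in_memory > self.max_memory_entries:
            oldest = next(iter(self._data))  # least recently used key
            self._forget(oldest)
            self.evictions += 1

    def get(self, key: str):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def delete(self, key: str) -> None:
        if key in self._data:
            self._forget(key)
```

A scrape then reads two integers in constant time regardless of how many entries the cache holds.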
Webapp¶
Emitted only when webapp.enabled=true. Track per-client request volume, latency, capacity headroom, and drop rates. Status labels are drawn from a closed set documented in Webapp → Status codes: ok | timeout | error | rate_limited | auth_failed | shutdown | not_ready | capacity. The client label is bounded by the configured client list plus the fixed unauthenticated sentinel for auth-failed / pre-auth gate hits, so cardinality stays small.
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_webapp_requests_total | Counter | client, status | One increment per HTTP request, labelled by the matched client name and the terminal outcome. Replaces a separate webapp_rate_limited_total – query drakkar_webapp_requests_total{status='rate_limited'} instead. |
| drakkar_webapp_request_duration_seconds | Histogram | client, status | Server-side wall-clock duration of HTTP requests. Buckets: 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10, 30 – covers sub-second work and the default 30s request_timeout_seconds budget. The duration starts at route entry and ends at response emission, so it includes auth + rate-limit + body parsing as well as the runner. |
| drakkar_webapp_inflight | Gauge | – | Number of HTTP requests currently being processed by the runner on T1. Incremented on runner entry, decremented in finally. Alert on drakkar_webapp_inflight > webapp.max_concurrent to spot semaphore-leak bugs – a permit accidentally held past response would surface here. |
| drakkar_webapp_dropped_after_timeout_total | Counter | client | Requests whose pipeline result was dropped on T1 after T2 had already returned a 504 to the caller. Incremented at each cancellation gate (post-execute and pre-on_http_request_complete). A high rate signals either a too-tight request_timeout_seconds or a slow subprocess pipeline – correlate with drakkar_executor_duration_seconds. |
| drakkar_webapp_rpm_limit | Gauge | client | Configured rpm cap per client. Informational gauge set once at webapp startup and never updated thereafter (the cap is a config field; operators edit YAML and restart). Read alongside drakkar_webapp_requests_total{status='rate_limited'} to confirm deployed limits match the workload’s expectations. |
Shutdown¶
| Metric | Type | Labels | Description |
|---|---|---|---|
| drakkar_uncommitted_offsets_at_stop | Gauge | – | Snapshot at _shutdown start: count of Kafka offsets that were registered in-flight but not yet committed when shutdown began. Summed across all assigned partitions via each OffsetTracker.pending_count. Always set (even to 0) so the gauge reflects the most recent shutdown rather than a stale prior value. |
| drakkar_inflight_at_stop | Gauge | – | Snapshot at _shutdown start: number of in-flight executor subprocesses (ExecutorPool.active_count) running user code when shutdown began. Always set, including to 0. |
| drakkar_drain_timeout_hit_total | Counter | – | Incremented each time _drain_all_processors exceeded executor.drain_timeout_seconds before all partition processors finished draining. A nonzero rate signals workers being killed mid-flight – either the timeout is too tight or handlers are stuck. |
| drakkar_suspected_oom_kills_total | Counter | – | Incremented at startup when the previous run left a watchdog file at {debug.db_dir}/{worker_id}.watchdog whose body lacked the CLEAN_EXIT marker. That signature means the prior process was killed before reaching the normal shutdown path – typically OOM-killer SIGKILL, kubelet pod-pressure eviction, or kernel panic. See OOM / SIGKILL detection below. |
OOM / SIGKILL detection¶
Drakkar writes a small watchdog file at {debug.db_dir}/{worker_id}.watchdog
during startup and removes it as the last step of a clean shutdown. The
file’s presence and contents at the next startup let the framework
distinguish three termination modes:
| Watchdog state at startup | Previous run | Action |
|---|---|---|
| File absent | First run, or prior shutdown completed cleanly and unlinked the file. | No log, no metric. |
| File present, body == CLEAN_EXIT | Prior shutdown wrote the marker but a process death between the write and the unlink left the file behind. Still a clean exit. | No log, no metric. |
| File present, body empty (or anything else) | Prior process was killed before _shutdown reached mark_clean – almost always SIGKILL from the OOM-killer, a kubelet pod-pressure eviction, or a kernel panic. | Structured warning previous_run_ended_unexpectedly (category watchdog) with worker_id and watchdog_path fields, plus drakkar_suspected_oom_kills_total increment. |
The watchdog file is created in AppLifecycle._async_run once the
worker is committed to running (the actual write() happens just before
subscribe() returns and the poll loop starts) and deleted as soon as
the drain phase has been accounted for in _shutdown. A
drain_timeout is treated as a clean (if slow) shutdown for the
watchdog’s purposes — its dedicated counter
drakkar_drain_timeout_hit_total already covers that case. Conflating
“drain was slow” with “process was SIGKILLed” would muddle dashboards,
so the OOM counter drakkar_suspected_oom_kills_total is reserved for
the genuinely-empty-body signature: process killed before reaching
mark_clean.
The file lives in debug.db_dir (default /tmp), the same directory
used by the recorder and cache engine. If debug.db_dir is empty —
fully disk-less deployment — the watchdog is disabled for that run
(no file written, no metric increment), since the alternative of
falling back to the worker’s CWD risks both breaking the “no on-disk
state” promise and writing into a read-only container volume. The
disable is logged at startup as watchdog_disabled_no_db_dir so
operators can see the OOM signal is off.
If the watchdog write itself fails (read-only mount, permission denied,
no space), the worker logs a structured warning
watchdog_write_failed with path and error, then disables the
watchdog for the rest of the run and continues startup. The watchdog
is observability-only — losing the OOM signal is preferable to
crashing the worker over a misconfigured volume mount.
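The watchdog lifecycle described above (arm at startup, classify the leftover file on the next start, tolerate write failures) can be sketched with plain file operations. The function names below are hypothetical helpers for illustration; only the CLEAN_EXIT marker and the three startup states come from this page.

```python
from pathlib import Path

CLEAN_EXIT = "CLEAN_EXIT"


def previous_run_was_killed(watchdog_path: Path) -> bool:
    """Classify the watchdog file left behind by the previous run."""
    if not watchdog_path.exists():
        return False  # first run, or the prior shutdown unlinked the file
    body = watchdog_path.read_text()
    return body != CLEAN_EXIT  # empty or anything else: suspected kill


def arm_watchdog(watchdog_path: Path) -> bool:
    """Write an empty watchdog file; return False if the write fails."""
    try:
        watchdog_path.write_text("")
        return True
    except OSError:
        return False  # observability only: the caller keeps starting up


def mark_clean_and_remove(watchdog_path: Path) -> None:
    """Record a clean exit, then remove the file."""
    watchdog_path.write_text(CLEAN_EXIT)
    watchdog_path.unlink()
```

Writing the marker before unlinking is what makes the middle row of the table possible: a death between the two steps leaves a file that still reads as a clean exit.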
Operator interpretation: any nonzero rate on
drakkar_suspected_oom_kills_total should be correlated with
container-runtime pod-restart events. A spike usually maps to one of:
container memory limit too low, a memory leak in handler code, a
sibling pod evicting this worker, or a node-level OOM. Pair the metric
with drakkar_uncommitted_offsets_at_stop and
drakkar_inflight_at_stop from the previous run (Prometheus retains
the last-set value across restarts) to gauge how much in-flight work
was lost.
User-Defined Metrics¶
You can declare Prometheus metrics as class attributes on your handler – see Custom Prometheus Metrics for the handler-side setup. The framework auto-discovers them via discover_handler_metrics(), which scans the handler’s class hierarchy (MRO) for any prometheus_client metric instances (Counter, Gauge, Histogram, Summary, Info).
from prometheus_client import Counter, Histogram
from drakkar.handler import BaseDrakkarHandler


# MyInput, MyOutput, and parse_output are defined by your application.
class MyHandler(BaseDrakkarHandler[MyInput, MyOutput]):
    # These are automatically discovered and shown in the debug UI
    items_parsed = Counter(
        'myapp_items_parsed_total',
        'Total items parsed from executor output',
    )
    parse_duration = Histogram(
        'myapp_parse_duration_seconds',
        'Time spent parsing executor output',
    )

    async def on_task_complete(self, result):
        with self.parse_duration.time():
            items = parse_output(result.stdout)
        self.items_parsed.inc(len(items))
        ...
User metrics are displayed alongside framework metrics on the debug UI’s metrics page (/debug), tagged with source user to distinguish them from the built-in drakkar_* metrics.
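The idea of scanning a handler's class hierarchy for metric attributes can be sketched without any dependencies. This is not Drakkar's discover_handler_metrics(): the Metric, Counter, and Histogram classes below are hypothetical stand-ins for the prometheus_client types, and the rule that subclass attributes override base-class attributes of the same name is an assumption.

```python
class Metric:
    """Stand-in for a prometheus_client metric type (hypothetical)."""

    def __init__(self, name: str):
        self.name = name


class Counter(Metric):
    pass


class Histogram(Metric):
    pass


def discover_metrics(handler_cls: type) -> dict:
    """Collect metric class attributes along the MRO."""
    found = {}
    for klass in reversed(handler_cls.__mro__):  # base classes first
        for attr, value in vars(klass).items():
            if isinstance(value, Metric):
                found[attr] = value  # later (more derived) classes win
    return found


class BaseHandler:
    requests = Counter("myapp_requests_total")


class MyHandler(BaseHandler):
    parse_duration = Histogram("myapp_parse_duration_seconds")


metrics = discover_metrics(MyHandler)
```

Walking the MRO rather than only the concrete class is what lets metrics declared on a shared base handler appear for every subclass.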
Scrape Configuration¶
Metrics are exposed at :9090/metrics by default (configurable via metrics.port).
Structured Logging¶
Drakkar uses structlog for structured, context-rich logging. By default, log output goes to stderr (standard Unix convention), but you can redirect to stdout or a file.
Configuration¶
logging:
  level: INFO       # DEBUG, INFO, WARNING, ERROR, CRITICAL
  format: json      # json | console
  output: stderr    # stderr | stdout | file path
- json (default) – produces one JSON object per line, suitable for ingestion by Elasticsearch, Loki, Datadog, or any log aggregator.
- console – human-readable colored output for local development.
Output Destination¶
The output field controls where logs are written:
| Value | Behavior |
|---|---|
| stderr (default) | Standard error stream |
| stdout | Standard output stream |
| File path | Append to the specified file (created if missing, parent directories created automatically) |
File paths support template variables:
| Variable | Substitution |
|---|---|
| {worker_id} | Worker name (from WORKER_ID env var or config) |
| {cluster_name} | Cluster name (from config or env var) |
# Examples (alternatives; a config file contains a single logging block)
logging:
  output: stdout

logging:
  output: /var/log/drakkar/{worker_id}.log

logging:
  output: /var/log/{cluster_name}/{worker_id}.jsonl
Environment variable override: DK_LOGGING__OUTPUT=/var/log/drakkar/worker.log
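The resolution rules for logging.output (two reserved stream names, otherwise a templated file path opened in append mode with parent directories created) can be sketched as below. The open_log_output function is a hypothetical helper, and the use of str.format for template substitution is an assumption about how the variables are expanded.

```python
import sys
from pathlib import Path


def open_log_output(output: str, worker_id: str, cluster_name: str):
    """Resolve a logging.output value to a writable text stream."""
    if output == "stderr":
        return sys.stderr
    if output == "stdout":
        return sys.stdout
    # Anything else is treated as a file path with template variables.
    path = Path(output.format(worker_id=worker_id, cluster_name=cluster_name))
    path.parent.mkdir(parents=True, exist_ok=True)  # create missing parents
    return path.open("a", encoding="utf-8")  # append; created if missing
```

For example, calling it with /var/log/{cluster_name}/{worker_id}.jsonl, worker-3, and prod would append to /var/log/prod/worker-3.jsonl.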
Automatic Context Fields¶
Every log line includes these fields, bound once at startup:
| Field | Source | Example |
|---|---|---|
| timestamp | ISO 8601 timestamp | 2026-04-06T12:34:56.789Z |
| level | Log level | info |
| service_name | Hardcoded | drakkar |
| worker_id | worker_name_env env var | worker-3 |
| consumer_group | kafka.consumer_group config | drakkar-workers |
| module | Python module name | drakkar.partition |
Per-Context Binding¶
The framework binds additional fields for specific operations using structlog.contextvars. These appear on all log lines within that context:
- partition – bound during partition processing
- hook – bound during handler hook execution (e.g., arrange, on_task_complete, on_message_complete)
- task_id – bound during task execution and result handling
Usage in Handler Code¶
import structlog

from drakkar.handler import BaseDrakkarHandler

logger = structlog.get_logger()


class MyHandler(BaseDrakkarHandler[MyInput, MyOutput]):
    async def arrange(self, messages, pending):
        await logger.ainfo(
            "Arranging batch",
            category='handler',
            message_count=len(messages),
            pending_count=len(pending.pending_task_ids),
        )
        ...
Output (JSON format):
{
  "timestamp": "2026-04-06T12:34:56.789Z",
  "level": "info",
  "event": "Arranging batch",
  "service_name": "drakkar",
  "worker_id": "worker-3",
  "consumer_group": "drakkar-workers",
  "module": "myapp.handler",
  "category": "handler",
  "message_count": 50,
  "pending_count": 12
}
Debug UI¶
The debug web UI is a FastAPI application served in a separate thread so that CPU-intensive executor tasks on the main event loop do not block the interface. Enabled by default on port 8080.
Pages¶
/ – Dashboard¶
The landing page with a high-level overview of worker health:
- Partition tiles – one tile per assigned partition showing queue depth and status.
- Pool utilization – active tasks vs. max workers.
- Event counters – consumed, completed, failed, produced, committed totals.
- Consumer lag – total lag across all assigned partitions.
- Custom links – configurable via debug.custom_links with template variables ({worker_id}, {cluster_name}, {metrics_port}, {debug_port}).
- Prometheus graph links – when debug.prometheus_url is configured, each stat card links directly to a Prometheus graph filtered to this worker. Worker-scoped links are grouped by category (Throughput, Latency, Health, Errors). Cluster-wide links appear when debug.prometheus_cluster_label is set.
/partitions – Partition Overview¶
Per-partition stats table:
- Queue size (messages waiting)
- Pending offsets (uncommitted)
- Consumer lag (from Kafka high watermark)
- Committed offset and high watermark
- Consumed, completed, and failed event counts
- Live/dead status indicator
/partitions/{id} – Partition Detail¶
Paginated event browser for a single partition. Shows all recorded events (consumed, arranged, task_started, task_completed, task_failed, committed) in reverse chronological order with 50 events per page.
/sinks – Sink Delivery Stats¶
Overview of all configured sinks with live delivery statistics:
- Delivered count and payload count
- Error count and retry count
- Last delivery timestamp and duration
- Last error message and timestamp
- Links to external sink UIs (when ui_url is configured on a sink)
/live – Live Pipeline View¶
Real-time view of the processing pipeline, fed by WebSocket (/ws). Organized in tabbed sections:
- Arrange – active arrange() calls with partition, duration, and message labels.
- Executors Timeline – visual timeline of running, pending, and recently finished tasks. Slot-based layout matching max_executors. Tasks shorter than ws_min_duration_ms are hidden (except failures, which are always shown). See Duration Thresholds for threshold tuning.
- Collect – recently completed tasks with exit codes and durations.

Pool utilization bar shows active, waiting, and available slot counts.
/history – Event Browser¶
Filterable, paginated event browser across all partitions:
- Partition filter dropdown
- Event type toggles (consumed, arranged, task_started, task_completed, task_failed, committed, etc.)
- 100 events per page with forward/backward pagination
/debug – Debug Tools¶
Multi-purpose debug page with:
- Metrics viewer – live snapshot of all registered Prometheus metrics (framework and user), accessible via the /api/debug/metrics JSON endpoint.
- Periodic tasks – status of all @periodic handler methods: last run time, duration, success/error status, total counts, and a sparkline of recent runs.
- Cross-worker message trace – two search modes:
  - By partition:offset – enter partition:offset (e.g., 5:42) to trace a message through the full pipeline.
  - By label – search inputs are auto-generated for each label key found in the database (e.g., request_id, pattern). Enter a value to find all tasks and events matching that label. Useful for tracing a specific request across partitions and workers.

  Both modes search the current worker first, then other live workers in the cluster, then rotated historical DB files.
- Database management – list all debug database files with event counts and sizes, download individual files, merge multiple files into one, and view per-file breakdown by event type.
- Message Probe – replay a single pasted message through the handler pipeline (arrange → executor → on_task_complete → on_message_complete → on_window_complete) with zero footprint: no sink writes, no offset commits, no event-recorder rows, no cache writes, no peer sync. The report shows every task’s stdout/stderr/exit code, each hook’s returned CollectResult, the payloads that would have been written to each sink, every cache call, a timeline waterfall, and any exceptions raised at any stage. Posts to /api/debug/probe with a JSON body containing the message’s value (and optional key, partition, offset, topic, timestamp, use_cache).
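A probe request can also be assembled from a script. The sketch below only builds the HTTP request and does not send it; build_probe_request is a hypothetical helper, the base URL assumes a local worker on the default debug port, and treating value as a string is an assumption, since the field types are not specified here.

```python
import json
import urllib.request


def build_probe_request(base_url: str, value: str, **optional) -> urllib.request.Request:
    """Build (but do not send) a POST to /api/debug/probe."""
    allowed = {"key", "partition", "offset", "topic", "timestamp", "use_cache"}
    unknown = set(optional) - allowed
    if unknown:
        raise ValueError(f"unsupported probe fields: {sorted(unknown)}")
    body = {"value": value, **optional}
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/debug/probe",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical local worker on the default debug port.
request = build_probe_request("http://localhost:8080", '{"id": 1}', partition=5, offset=42)
# To send it against a running worker: urllib.request.urlopen(request)
```

Rejecting unknown field names up front keeps typos from being silently ignored by whatever parses the body on the server side.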
/task/{id} – Task Detail¶
Detailed view of a single task’s lifecycle:
- Status – running, completed, or failed
- Labels – user-defined labels from ExecutorTask.labels
- Duration – wall-clock execution time
- PID – OS process ID
- CLI – reconstructed command line (binary_path + args)
- stdout/stderr – captured process output (subject to output_min_duration_ms threshold)
- Source offsets – which Kafka offsets this task covers
- Event timeline – chronological list of all events for this task_id (started, completed/failed, task_complete)
Flight Recorder¶
The flight recorder is a SQLite-based event log that captures the full processing lifecycle. It provides the data backing for the debug UI and survives worker restarts (within retention limits).
Configuration¶
debug:
  db_dir: /tmp                    # Directory for SQLite files; '' disables disk persistence
  store_events: true              # Write processing events to the events table
  store_config: true              # Write worker config (enables autodiscovery)
  store_state: true               # Periodic state snapshots
  flush_interval_seconds: 5       # Buffer flush interval
  max_buffer: 50000               # In-memory event buffer size
  rotation_interval_minutes: 60   # Rotate to a new DB file every N minutes
  retention_hours: 24             # Delete DB files older than this
  retention_max_events: 100000    # Cap total events across DB files
  store_output: true              # Include stdout/stderr in event records
Database Schema¶
Each database file can contain up to three tables, controlled by the store_events, store_config, and store_state flags:
events – Processing Events¶
Stores processing events across the full pipeline lifecycle.
Key columns: ts, dt, event, partition, offset, task_id, args, stdout, stderr, exit_code, duration, output_topic, metadata (JSON), pid, labels (JSON), origin ('kafka' or 'http', default 'kafka'), client_name (webapp client name; NULL for Kafka-origin rows), request_id (webapp framework id; NULL for Kafka-origin rows).
Indexed on (partition, offset), ts, dt, task_id, event, labels (partial, where not null), origin, and request_id (partial, where not null).
Event types:
| Event | When recorded | Key fields |
|---|---|---|
| consumed | Message polled from Kafka | partition, offset |
| arranged | arrange() completes for a window | partition, metadata (message_count, task_count, offsets, message_labels) |
| task_started | Subprocess launched (after semaphore acquired) | task_id, partition, args, pid, labels, metadata (source_offsets, slot) |
| task_completed | Subprocess finished with exit 0 | task_id, duration, exit_code, stdout, stderr, pid, labels |
| task_failed | Subprocess failed (non-zero exit, timeout, crash) | task_id, duration, exit_code, pid, labels, metadata (exception) |
| task_complete | on_task_complete() hook finishes for one task | task_id, partition, duration, metadata (output_message_count) |
| message_complete | on_message_complete() hook finishes for one source message | partition, offset, duration, metadata (task_count, succeeded, failed, replaced, output_message_count) |
| window_complete | on_window_complete() hook finishes for one arrange() window | partition, duration, metadata (window_id, task_count, output_message_count) |
| produced | Kafka payload delivered to output topic | partition, offset, output_topic |
| sink_delivered | Sink delivery succeeds | metadata (sink_type, sink_name, payload_count, duration) |
| sink_error | Sink delivery fails | metadata (sink_type, sink_name, error, attempt) |
| committed | Kafka offset committed | partition, offset |
| assigned | Partition assigned during rebalance | partition |
| revoked | Partition revoked during rebalance | partition |
| periodic_run | Periodic task execution completes | task_id (task name), duration, exit_code (0=ok, 1=error), metadata (status, error) |
| webapp_request_received | One HTTP request passed auth + rate-limit + body parsing and is about to dispatch to T1 | origin='http', client_name, request_id, metadata (started_at, body_bytes) |
| webapp_request_completed | An HTTP request returned 200 to the caller | origin='http', client_name, request_id, duration, metadata (status='ok', duration_ms) |
| webapp_request_timeout | T2’s asyncio.wait_for tripped its request_timeout_seconds budget; client received a 504 | origin='http', client_name, request_id, duration, metadata (duration_ms) |
| webapp_request_rate_limited | Per-client rpm window full; client received a 429 | origin='http', client_name, metadata (rpm_limit, requests_in_window) |
| webapp_request_auth_failed | Authorization header missing or naming a non-configured token; client received a 401 | origin='http', metadata (token_prefix – redacted to first 4 chars) |
| webapp_request_dropped_after_timeout | T1 reached a cancellation gate after T2 had already 504’d. Marks pipeline work the framework managed to skip thanks to cooperative cancellation. | origin='http', client_name, request_id |
For HTTP-origin rows, partition=-1 is used as a synthetic partition value so they remain distinct from any real Kafka partition without requiring a separate table. Filter by origin='http' to see all webapp activity, or by request_id=... to walk the full lifecycle of one HTTP request (received → task_started → task_completed → completed/timeout/dropped).
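Walking one HTTP request's lifecycle by request_id can be tried against an in-memory database. The table below is a reduced, hypothetical subset of the events columns, and the rows are invented sample data; only the column names, the origin values, and the partition=-1 convention come from this page.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced subset of the events columns, for illustration only.
conn.execute(
    'CREATE TABLE events (ts REAL, event TEXT, "partition" INTEGER, task_id TEXT, '
    "origin TEXT DEFAULT 'kafka', client_name TEXT, request_id TEXT)"
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        (1.0, "consumed", 3, None, "kafka", None, None),
        (2.0, "webapp_request_received", -1, None, "http", "billing", "req-1"),
        (2.1, "task_started", -1, "t-1", "http", "billing", "req-1"),
        (2.9, "task_completed", -1, "t-1", "http", "billing", "req-1"),
        (3.0, "webapp_request_completed", -1, None, "http", "billing", "req-1"),
    ],
)


def request_lifecycle(conn: sqlite3.Connection, request_id: str) -> list:
    """Return the event names for one HTTP request in chronological order."""
    rows = conn.execute(
        "SELECT event FROM events WHERE request_id = ? ORDER BY ts", (request_id,)
    )
    return [row[0] for row in rows]


lifecycle = request_lifecycle(conn, "req-1")
http_rows = conn.execute("SELECT COUNT(*) FROM events WHERE origin = 'http'").fetchone()[0]
```

The same two filters (request_id for one request, origin for all webapp activity) apply unchanged to a downloaded recorder DB file.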
Fields subject to duration thresholds: args, stdout, stderr are omitted for fast tasks below output_min_duration_ms. Events below event_min_duration_ms are not stored at all.
Recorder upgrade story (delete pre-webapp DBs)
The webapp release added three columns to the events table –
origin, client_name, request_id – without a migration
framework. Recorder DBs are observability-only, already rotated and
disposable, so a one-shot delete on upgrade is preferable to a
migration runner that exists only to add three optional columns.
On upgrade, operators delete pre-existing per-worker recorder
DBs in debug.db_dir before restarting workers. New rotation-cycle
DBs include the new columns automatically.
The framework detects the schema mismatch at startup: the recorder
runs PRAGMA table_info(events) immediately after opening each DB
and raises RecorderSchemaError when origin / client_name /
request_id are missing. The exception propagates through
AppLifecycle._async_run and aborts startup with the actionable
next step in the message: delete the offending DB(s) under db_dir
and restart. Without this fail-fast check, pre-webapp DBs would
surface a confusing OperationalError: no such column mid-request
on the first HTTP event the recorder tried to write.
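The fail-fast check described above can be approximated in a few lines. This is a minimal sketch, not the framework's code: RecorderSchemaError is redefined locally as a stand-in, and check_events_schema is a hypothetical helper name.

```python
import sqlite3

REQUIRED_COLUMNS = {"origin", "client_name", "request_id"}


class RecorderSchemaError(RuntimeError):
    """Local stand-in for the framework's exception of the same name."""


def check_events_schema(conn, db_path):
    """Raise if the events table predates the webapp columns."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    present = {row[1] for row in conn.execute("PRAGMA table_info(events)")}
    missing = REQUIRED_COLUMNS - present
    if missing:
        raise RecorderSchemaError(
            f"{db_path}: events table is missing {sorted(missing)}; "
            "delete the DB under db_dir and restart"
        )
```

Running the check right after opening the file is what turns a mid-request OperationalError into a startup error with a clear remedy.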
worker_config – Autodiscovery¶
Single-row table written at startup (and after each rotation). Contains the full worker identity and configuration: worker_name, cluster_name, ip_address, debug_port, debug_url, kafka_brokers, source_topic, consumer_group, binary_path, max_executors, task_timeout_seconds, max_retries, window_size, sinks_json, env_vars_json.
This table is what enables the worker autodiscovery feature – other workers scan for it in shared db_dir.
Secrets are redacted before they reach disk
The recorder SQLite file is downloadable via the debug UI. To avoid publishing credentials through that path, three redactions are applied before any env data is written:
- kafka_brokers – embedded credentials are stripped from SASL URIs. SASL_SSL://alice:s3cret@host:9094 becomes SASL_SSL://***:***@host:9094. Host and port remain visible.
- worker_config.env_vars_json – values for env var names matching secret patterns (*PASSWORD*, *SECRET*, *TOKEN*, *_KEY, *API_KEY*, *CREDENTIAL*, *_DSN) are replaced with ***. For other names, any URL-shaped value has its embedded user:pass@ credentials stripped.
- per-task env metadata – task.env values written by your handler in arrange() are sanitized with the same patterns before the task_started event is recorded. The original task object is not mutated; only the recorded copy is redacted (the subprocess still receives the real values). This closes the last env-related leak into the debug UI.
The contract is “aggressive redact, accept false positives”:
PASSWORD_RESET_URL is redacted because it matches *PASSWORD* even
though a reset URL isn’t a credential. Rename the var if you need the
value visible. The ExecutorConfig.env (framework-level) values are
never written to the recorder at all — they only reach the
subprocess environment.
This protects operators who add secret-named vars to expose_env_vars
without thinking, DSNs passed through otherwise-innocuous var names
(UPSTREAM_URL, CACHE_URL, etc.), and per-task env values that
handlers assemble from inbound message payloads.
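To make the redaction contract concrete, here is a rough sketch of name-pattern matching plus URL credential stripping. The helper names, the regular expression, and the case-insensitive matching are assumptions made for illustration; only the pattern list and the before/after example come from the text above.

```python
import re
from fnmatch import fnmatchcase

SECRET_NAME_PATTERNS = (
    "*PASSWORD*", "*SECRET*", "*TOKEN*", "*_KEY",
    "*API_KEY*", "*CREDENTIAL*", "*_DSN",
)

# Matches scheme://user:pass@ so that only the credentials are replaced.
_URL_CREDENTIALS = re.compile(
    r"(?P<scheme>[A-Za-z][A-Za-z0-9+.\-_]*://)[^/@\s:]+:[^/@\s]+@"
)


def strip_url_credentials(value):
    """Replace user:pass in URL-shaped values; host and port stay visible."""
    return _URL_CREDENTIALS.sub(r"\g<scheme>***:***@", value)


def redact_env(env):
    """Return a redacted copy of env; the input mapping is left untouched."""
    redacted = {}
    for name, value in env.items():
        # Assumption: names are compared in upper case.
        if any(fnmatchcase(name.upper(), p) for p in SECRET_NAME_PATTERNS):
            redacted[name] = "***"
        else:
            redacted[name] = strip_url_credentials(value)
    return redacted
```

Because matching is by name only, PASSWORD_RESET_URL is masked in this sketch too, which mirrors the "accept false positives" stance.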
worker_state – Periodic Snapshots¶
Appended every state_sync_interval_seconds (default: 10). Captures a point-in-time snapshot: uptime_seconds, assigned_partitions, partition_count, pool_active, pool_max, total_queued, cumulative counters (consumed_count, completed_count, failed_count, produced_count, committed_count), and paused flag.
File Layout and Rotation¶
Database file names combine the worker name with a timestamp, for example: /tmp/worker-1-2026-04-06__14_55_00.db
A symlink {worker_name}-live.db always points to the currently active database file while the worker is running.
The symlink is removed on graceful shutdown.
Rotation: every rotation_interval_minutes (default: 60), the recorder opens a new timestamped DB file before closing the old one (no query gap). The worker config is re-written to the new file and the live symlink is updated.
Retention: after rotation, files older than retention_hours (default: 24) are deleted. An additional cap limits the total number of files based on retention_max_events / 10,000.
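The age-based part of retention can be illustrated by parsing the timestamp embedded in each file name. This sketch assumes the cutoff is derived from the file name (the recorder may equally use file modification times) and leaves out the file-count cap. Both function names are hypothetical.

```python
from datetime import datetime, timedelta

TS_FORMAT = "%Y-%m-%d__%H_%M_%S"
TS_LENGTH = len("2026-04-06__14_55_00")


def file_timestamp(filename):
    """Parse the trailing timestamp of '<worker_name>-<timestamp>.db'."""
    stem = filename.removesuffix(".db")
    return datetime.strptime(stem[-TS_LENGTH:], TS_FORMAT)


def expired_files(filenames, now, retention_hours=24):
    """Return the files whose embedded timestamp is older than the cutoff."""
    cutoff = now - timedelta(hours=retention_hours)
    expired = []
    for name in filenames:
        try:
            created = file_timestamp(name)
        except ValueError:
            continue  # not a timestamped file, e.g. the *-live.db symlink
        if created < cutoff:
            expired.append(name)
    return expired
```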
Buffer Mechanics¶
Events are not written to SQLite immediately. They accumulate in an
in-memory ring buffer (collections.deque with maxlen=max_buffer,
default 50,000). A background task flushes the buffer to disk every
flush_interval_seconds (default: 5) as a batch INSERT. This design:
- Avoids per-event disk I/O (thousands of events/sec would bottleneck SQLite)
- Keeps the main event loop fast (recording an event is a deque append, ~1us)
- Provides a short window of recent events even when db_dir is empty (in-memory only mode)
If the buffer fills before the next flush (e.g., during a burst),
oldest events are evicted (ring buffer behavior). Drops are no longer
silent — the drakkar_recorder_dropped_events_total counter
(see Recorder metrics) increments on every evicted
event, and the drakkar_recorder_buffer_size gauge shows the current
depth. Alert on sustained non-zero drops and raise max_buffer or
shorten flush_interval_seconds when they appear.
On shutdown, the buffer is flushed one final time before closing the database.
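The buffer behavior can be sketched with a bounded deque and a batched insert. EventBuffer and its two-column events table are illustrative stand-ins, and the dropped attribute plays the role of the drakkar_recorder_dropped_events_total counter.

```python
import sqlite3
from collections import deque


class EventBuffer:
    """In-memory ring buffer that is flushed to SQLite in batches."""

    def __init__(self, max_buffer=50_000):
        self._events = deque(maxlen=max_buffer)
        self.dropped = 0  # stand-in for the dropped-events counter

    def record(self, ts, event):
        # A full deque evicts its oldest item on append, so count the drop first.
        if len(self._events) == self._events.maxlen:
            self.dropped += 1
        self._events.append((ts, event))

    def flush(self, conn):
        """Write everything buffered so far as one batch; return the row count."""
        batch = list(self._events)
        self._events.clear()
        if batch:
            conn.executemany("INSERT INTO events (ts, event) VALUES (?, ?)", batch)
            conn.commit()
        return len(batch)
```

In the real recorder a background task would call the flush on a timer; here the caller decides when to flush, which keeps the sketch easy to test.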
Merging Databases¶
The debug UI’s database section (/debug) allows selecting multiple
database files and merging them into a single file. This is useful for:
- Post-mortem analysis – combine files from multiple workers and time periods into one queryable database
- Cross-worker correlation – merged files contain events from all selected workers, with worker_name preserved in the worker_config table
- Archival – download a merged file for offline analysis with any SQLite tool
The merge process copies all events from selected files into a new
timestamped file (merged-{timestamp}.db), deduplicates by
(ts, event, partition, offset, task_id), and creates a combined
worker_config table listing all source workers. The merged file
appears in the database list and can be downloaded.
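The deduplication rule can be shown on plain Python rows. This sketch works on lists of dicts rather than SQLite files, and the final sort by ts is an assumption. merge_events is a hypothetical name.

```python
KEY_FIELDS = ("ts", "event", "partition", "offset", "task_id")


def merge_events(*sources):
    """Merge event rows from several sources, keeping one row per dedup key."""
    seen = set()
    merged = []
    for rows in sources:
        for row in rows:
            key = tuple(row.get(field) for field in KEY_FIELDS)
            if key not in seen:
                seen.add(key)
                merged.append(row)
    # Assumption: the merged output is ordered by timestamp.
    merged.sort(key=lambda row: row["ts"])
    return merged
```

Selecting overlapping files, such as a rotated file and a copy of it, therefore does not double-count events.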
Duration Thresholds¶
Drakkar provides four independent duration thresholds that control what gets logged, broadcast, and stored. These are essential for high-throughput workers where fast tasks would otherwise flood every observability channel.
debug:
  log_min_duration_ms: 500
  ws_min_duration_ms: 500
  event_min_duration_ms: 0
  output_min_duration_ms: 500
log_min_duration_ms (default: 500)¶
Controls which completed/failed tasks produce a structlog entry. Tasks finishing faster than this threshold are silent in the log stream. Keeps logs focused on slow or problematic tasks.
ws_min_duration_ms (default: 500)¶
Controls which tasks appear in the live UI (/live page). When a task starts, its WebSocket broadcast is deferred. If the task completes before the threshold, neither the start nor the completion event is sent to connected browsers – the task is invisible in the live view.
Exception: failed tasks are always sent to WebSocket regardless of duration. This ensures failures are never hidden.
event_min_duration_ms (default: 0)¶
Controls which tasks are persisted to the SQLite flight recorder. When set to a value greater than 0, tasks completing faster than the threshold are not written to the events table. Set to 0 to store all events (the default).
This is useful for workers that process millions of fast tasks where full event recording would consume excessive disk space.
output_min_duration_ms (default: 500)¶
Controls whether args, stdout, and stderr are stored for a task. Tasks completing faster than this threshold have their output omitted from the event record – the event itself is still recorded (subject to event_min_duration_ms), but without the potentially large output fields.
Reduces storage for fast tasks where the output is uninteresting.
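How the four thresholds combine for a single finished task can be summarized in one function. This is a sketch of the rules as described in this section: the names DurationThresholds and channels_for are hypothetical, and treating a duration exactly equal to a threshold as passing is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DurationThresholds:
    log_min_duration_ms: int = 500
    ws_min_duration_ms: int = 500
    event_min_duration_ms: int = 0
    output_min_duration_ms: int = 500


def channels_for(duration_ms, failed, t=DurationThresholds()):
    """Decide which observability channels see a finished task."""
    store_event = duration_ms >= t.event_min_duration_ms
    return {
        "log": duration_ms >= t.log_min_duration_ms,
        # Failed tasks always reach the WebSocket, regardless of duration.
        "websocket": failed or duration_ms >= t.ws_min_duration_ms,
        "store_event": store_event,
        # Output fields only matter if the event itself is stored.
        "store_output": store_event and duration_ms >= t.output_min_duration_ms,
    }
```

With the defaults, a 120 ms successful task is stored without args, stdout, or stderr and stays out of the log and the live view, while a 120 ms failed task still appears in the live view.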
Worker Autodiscovery¶
A shared db_dir between workers is not required for Drakkar to
function. Each worker operates independently – its own Kafka consumer,
executor pool, sinks, and flight recorder work fine with a local
directory.
You can also disable the flight recorder database entirely by setting
db_dir: "" (empty string). The debug web UI and WebSocket live
streaming still work (events are held in memory only), but these
features are disabled: event history (/history), message trace,
label search, database download/merge, periodic task history, and
worker autodiscovery. The live pipeline view (/live) and dashboard
counters continue working since they read from in-memory state.
When workers share the same db_dir (e.g., a shared NFS mount,
Kubernetes PVC, or Docker volume), several additional debug features
become available:
- Worker switcher – jump between worker UIs from a dropdown in the nav bar, without remembering ports or IP addresses
- Cross-worker message trace – trace a message across all workers in the cluster by partition:offset or by label value
- Database merge – combine flight recorder files from multiple workers into a single SQLite file for post-mortem analysis
- Cluster-wide database browser – see all workers’ DB files in one view, sorted and filtered
Without a shared db_dir, each worker’s debug UI only sees its own
data. All other functionality (processing, sinks, metrics, logging)
works identically.
How It Works¶
- Each worker creates a timestamped SQLite file in db_dir (e.g., worker-1-2026-04-08__14_30_00.db) and maintains a {worker_name}-live.db symlink pointing to the current file.
- Workers with store_config: true (default) write their configuration – worker name, IP address, debug port, cluster name, Kafka settings – to the worker_config table at startup and after each DB rotation.
- The debug UI scans db_dir for *-live.db symlinks belonging to other workers.
- For each live symlink, the UI reads worker_config to get the worker’s name, address, and cluster membership.
- Workers are grouped by cluster_name – only workers in the same cluster appear together. Unclustered workers show in a separate group.
- On graceful shutdown, the *-live.db symlink is removed so stopped workers don’t appear in the dropdown.
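The scan-and-group steps above can be sketched with pathlib and sqlite3. discover_workers is a hypothetical helper; it assumes worker_config exposes worker_name, cluster_name, ip_address, and debug_port columns as listed earlier, and it opens each file read-only so that a stale symlink is never recreated as an empty database.

```python
import sqlite3
from collections import defaultdict
from pathlib import Path


def discover_workers(db_dir, own_name):
    """Group other live workers found in db_dir by cluster_name."""
    clusters = defaultdict(list)
    for link in sorted(Path(db_dir).glob("*-live.db")):
        if not link.is_symlink() or link.name == f"{own_name}-live.db":
            continue
        try:
            # mode=ro: never create or modify another worker's file.
            conn = sqlite3.connect(link.resolve().as_uri() + "?mode=ro", uri=True)
        except sqlite3.Error:
            continue  # dangling symlink or unreadable file
        try:
            row = conn.execute(
                "SELECT worker_name, cluster_name, ip_address, debug_port "
                "FROM worker_config"
            ).fetchone()
        except sqlite3.Error:
            row = None  # table not written yet
        finally:
            conn.close()
        if row:
            name, cluster, ip, port = row
            # Unclustered workers are collected under the empty string.
            clusters[cluster or ""].append(
                {"name": name, "url": f"http://{ip}:{port}/"}
            )
    return dict(clusters)
```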
Worker Switcher¶
The debug UI navigation bar includes a worker dropdown that lists all
discovered workers, grouped by cluster. Clicking a worker navigates
to its debug UI (using debug_url if configured, otherwise
http://{ip_address}:{debug_port}/). The dropdown refreshes every
10 seconds.
Setup¶
The minimal config for autodiscovery:
# all workers must share the same db_dir path
debug:
  db_dir: "/shared" # mount the same volume on all workers
  store_config: true # default, writes worker_config table
# optional: set cluster_name to group workers
cluster_name: "my-cluster"
In Kubernetes, use a shared PVC:
volumes:
  - name: debug-shared
    persistentVolumeClaim:
      claimName: drakkar-debug
# mount on all worker pods at the same path
volumeMounts:
  - name: debug-shared
    mountPath: /shared
In Docker Compose, use a named volume:
volumes:
  shared: {}
services:
  worker-1:
    volumes:
      - shared:/shared
  worker-2:
    volumes:
      - shared:/shared
Cross-Worker Trace¶
The trace feature on the /debug page searches for a message’s lifecycle
across all workers in the cluster. Two search modes are available:
By partition:offset – given a partition and offset, finds the consumed event and all related events (task_started, task_completed, task_failed, task_complete, message_complete, produced, committed).
By label – given a label key and value (e.g., request_id=abc-123),
finds all tasks whose labels match and returns
their full event lifecycle. This is useful when you know a business
identifier but not which partition or offset it landed on. Label search
inputs appear automatically on the /debug page for each label key
found in the database.
Both modes search in the same order:
- The current worker’s live database.
- Other workers’ live databases (same cluster).
- Rotated (historical) database files in db_dir, newest first.
Each returned event carries a worker_name field identifying which
worker processed it. Labels are stored as JSON in the labels column
with a partial index (WHERE labels IS NOT NULL) for efficient
filtering.
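A label search over several databases might look like the sketch below. The four-column events table, the index name, and both function names are hypothetical, and the JSON is decoded in Python rather than in SQL to keep the example portable. Whether the real trace stops at the first hit or collects every match is not stated here; this sketch collects every match.

```python
import json
import sqlite3


def create_events_table(conn):
    """Hypothetical minimal events table with a partial index on labels."""
    conn.execute("CREATE TABLE events (ts REAL, event TEXT, task_id TEXT, labels TEXT)")
    conn.execute(
        "CREATE INDEX idx_events_labels ON events(labels) WHERE labels IS NOT NULL"
    )


def find_by_label(conn, key, value):
    """Return (task_id, event) rows whose JSON labels contain key == value."""
    rows = conn.execute(
        "SELECT task_id, event, labels FROM events "
        "WHERE labels IS NOT NULL ORDER BY ts"
    )
    return [
        (task_id, event)
        for task_id, event, labels in rows
        if json.loads(labels).get(key) == value
    ]


def trace_by_label(databases, key, value):
    """Search (worker_name, connection) pairs in the order they are given."""
    hits = []
    for worker_name, conn in databases:
        hits += [(worker_name, *hit) for hit in find_by_label(conn, key, value)]
    return hits
```

Passing the databases as own live DB first, then peers, then rotated files newest first reproduces the search order listed above.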
Periodic Tasks¶
The @periodic decorator schedules handler methods to run at fixed intervals alongside the main processing loop. See also Periodic Tasks in the handler reference.
Declaration¶
from drakkar.periodic import periodic


class MyHandler(BaseDrakkarHandler[MyInput, MyOutput]):
    @periodic(seconds=60)
    async def refresh_cache(self):
        """Reload lookup table from database every minute."""
        self.lookup = await load_lookup_table()

    @periodic(seconds=300, on_error='stop')
    async def health_check(self):
        """Check downstream service health. Stop if it fails."""
        await ping_downstream()
Behavior¶
- Discovery: at startup, the framework inspects the handler instance for methods decorated with @periodic. Only async functions are accepted – decorating a sync function raises TypeError.
- Scheduling: each periodic task runs in its own asyncio.Task within the main event loop. The loop is: sleep(seconds) then await coro_fn(). The next interval starts only after the current invocation finishes, preventing overlapping runs.
- Lifecycle: periodic tasks are started after on_ready() completes and cancelled during graceful shutdown (before partition processors are drained).
- Error handling:
  - on_error='continue' (default) – the exception is logged, and the task continues scheduling on the next interval.
  - on_error='stop' – the exception is logged, and this specific periodic task is permanently cancelled. Other periodic tasks and the main processing loop are unaffected.
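The scheduling and error-handling rules can be reproduced with plain asyncio. run_periodic is a hypothetical stand-in for whatever the framework does internally; it only illustrates the sleep-then-run loop and the two on_error modes.

```python
import asyncio
import logging

logger = logging.getLogger("periodic-sketch")


async def run_periodic(coro_fn, seconds, on_error="continue"):
    """Sleep, run, repeat; the next sleep starts only after the call returns."""
    while True:
        await asyncio.sleep(seconds)
        try:
            await coro_fn()
        except Exception:
            logger.exception("periodic task %s failed", coro_fn.__name__)
            if on_error == "stop":
                return  # only this task ends; other tasks keep running


async def demo():
    calls = {"ok": 0, "bad": 0}

    async def ok():
        calls["ok"] += 1

    async def bad():
        calls["bad"] += 1
        raise RuntimeError("boom")

    tasks = [
        asyncio.create_task(run_periodic(ok, 0.01)),
        asyncio.create_task(run_periodic(bad, 0.01, on_error="stop")),
    ]
    await asyncio.sleep(0.1)
    stopped = tasks[1].done()
    for task in tasks:
        task.cancel()  # mirrors cancellation during graceful shutdown
    await asyncio.gather(*tasks, return_exceptions=True)
    return calls, stopped
```

Calling asyncio.run(demo()) shows the failing task running once and then stopping while the healthy one keeps ticking until it is cancelled.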