Executor System¶
Drakkar runs external binaries as subprocesses – not through a shell, but directly via asyncio.create_subprocess_exec. Arguments are passed as a list, making the execution safe from shell injection by design. Concurrency is controlled by an asyncio.Semaphore sized to executor.max_executors, and each task is bounded by a wall-clock timeout via asyncio.wait_for.
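The combination described here can be sketched in a few lines. This is a minimal illustration of the pattern, not Drakkar's actual internals; `run_safely` is a hypothetical name:

```python
import asyncio

async def run_safely(binary: str, args: list[str], timeout: float):
    # Args go straight to exec as a list -- no shell ever parses them,
    # so metacharacters in user input cannot inject commands.
    proc = await asyncio.create_subprocess_exec(
        binary, *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        # Bound the whole run by a wall-clock timeout.
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        proc.kill()  # don't leave the subprocess running past the deadline
        await proc.wait()
        raise
    return proc.returncode, stdout, stderr
```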
ExecutorTask¶
Every subprocess execution starts with an ExecutorTask created in your arrange() hook. The task carries everything the framework needs to launch the process and track its lifecycle.
| Field | Type | Default | Description |
|---|---|---|---|
| `task_id` | `str` | (required) | Unique identifier. Use `make_task_id(prefix)` to generate a time-sortable hex ID (e.g., `t-0194a3b2c1d4e5f6-a7c2f1e3`). |
| `args` | `list[str]` | (required) | CLI arguments appended to the binary path when launching the process. |
| `source_offsets` | `list[int]` | (required) | Kafka offsets of the source messages that produced this task. Used for offset watermark tracking – offsets are committed only after all sinks confirm delivery. |
| `metadata` | `dict` | `{}` | Arbitrary key-value data carried through the pipeline. Accessible in `collect()` via `result.task.metadata`. |
| `labels` | `dict[str, str]` | `{}` | User-defined key-value labels shown in the debug UI (live timeline, task detail page, trace view). Useful for `request_id`, `user_id`, or other domain identifiers. |
| `env` | `dict[str, str]` | `{}` | Per-task environment variables. Merged on top of `executor.env` from config (task overrides config on key conflict). See Environment Variables. |
| `binary_path` | `str \| None` | `None` | Per-task binary override. Takes precedence over `executor.binary_path` from config. |
| `stdin` | `str \| None` | `None` | Optional string piped to the process stdin after launch. When `None`, stdin is not connected. |
Generating Task IDs¶
The make_task_id() helper produces short, time-sortable, unique IDs:
```python
from drakkar import make_task_id

task_id = make_task_id('search')
# "search-0194a3b2c1d4e5f6-a7c2f1e3"
```
The format is {prefix}-{timestamp_hex}-{random_hex}. Lexicographic order matches creation order, and the nanosecond timestamp combined with a 32-bit random suffix ensures uniqueness.
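A plausible implementation of this format looks like the sketch below. This is illustrative only, not Drakkar's actual source:

```python
import secrets
import time

def make_task_id(prefix: str) -> str:
    # 16 hex digits of nanosecond timestamp: fixed width, so
    # lexicographic order matches creation order.
    ts = f"{time.time_ns():016x}"
    # 32-bit random suffix (8 hex digits) guards against collisions
    # for IDs created in the same nanosecond.
    rand = secrets.token_hex(4)
    return f"{prefix}-{ts}-{rand}"
```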
Binary Path Resolution¶
The binary to execute is resolved per-task using this priority:
1. `task.binary_path` – set on the individual `ExecutorTask`
2. `executor.binary_path` – from the YAML/env config
If neither is set, the task fails immediately with exit_code=-1 and a descriptive error message. No process is launched.
This two-level resolution lets you run different binaries for different task types within the same handler:
```python
async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        binary = '/usr/bin/fast-search' if msg.payload.priority == 'high' else '/usr/bin/deep-search'
        tasks.append(ExecutorTask(
            task_id=make_task_id('search'),
            args=['--query', msg.payload.query],
            source_offsets=[msg.offset],
            binary_path=binary,
        ))
    return tasks
```
If all tasks use the same binary, set executor.binary_path in your config and omit binary_path from individual tasks.
Execution Flow¶
Each task goes through these steps:
1. Enter Semaphore Queue¶
The task enters the waiting state (`waiting_count` incremented). It blocks on `async with semaphore` until a slot is available.
2. Acquire Slot¶
When the semaphore is acquired, waiting_count is decremented, active_count is incremented, and a slot ID (0 to max_executors - 1) is assigned from the available pool.
3. Launch Process¶
The framework calls asyncio.create_subprocess_exec with:
- First argument: the resolved binary path
- Remaining arguments: `task.args` (passed as individual arguments, not via shell)
- `stdin`: `PIPE` if `task.stdin` is set, otherwise not connected
- `stdout`: `PIPE` (always captured)
- `stderr`: `PIPE` (always captured)
4. Communicate¶
proc.communicate(input=stdin_bytes) writes stdin (if any) and waits for the process to exit. All stdout and stderr are captured in memory.
5. Enforce Timeout¶
The entire communicate() call is wrapped in asyncio.wait_for(timeout=executor.task_timeout_seconds) (default: 120 seconds).
6. Handle Outcome¶
Four outcomes are possible:
Success (exit code 0) – Returns an ExecutorResult with captured output. The collect() hook is called next.
Non-zero exit – Raises ExecutorTaskError. The on_error() hook decides what to do.
Timeout – The process is killed (proc.kill()), and ExecutorTaskError is raised with stderr='task timed out'.
OSError (binary not found) – ExecutorTaskError is raised immediately. No process was ever started.
7. Release Slot¶
In the finally block (guaranteed to run), the slot ID is returned to the pool, active_count is decremented, and the semaphore is released for the next waiting task. If the process is still running (e.g., after a timeout), it is killed before release.
ExecutorResult¶
Returned on successful execution (exit code 0). Also attached to ExecutorTaskError on failures for inspection.
| Field | Type | Description |
|---|---|---|
| `exit_code` | `int` | Process exit code. `0` on success, non-zero on failure, `-1` for timeout/launch errors. |
| `stdout` | `str` | Captured process stdout, decoded as UTF-8 with `errors='replace'`. |
| `stderr` | `str` | Captured process stderr, decoded as UTF-8 with `errors='replace'`. |
| `duration_seconds` | `float` | Wall-clock time from process start to completion, rounded to 3 decimal places. |
| `task` | `ExecutorTask` | The original task that produced this result. Carries metadata and labels through the pipeline. |
| `pid` | `int \| None` | OS process ID. `None` if the process never started. |
Error Handling (on_error Hook)¶
When a task fails, the framework calls your on_error() hook. The error argument is an ExecutorError:
| Field | Type | Description |
|---|---|---|
| `task` | `ExecutorTask` | The task that failed. |
| `exit_code` | `int \| None` | Process exit code. `None` if the process failed to start or timed out. |
| `stderr` | `str` | Process stderr or a short error description (e.g., `'task timed out'`). |
| `exception` | `str \| None` | Exception message for timeout/launch failures. `None` for normal non-zero exits. |
| `pid` | `int \| None` | Process ID. `None` if the process never started. |
Return Values¶
The hook must return one of:
ErrorAction.RETRY – Re-execute the same task. The framework retries up to executor.max_retries (default: 3) times. Retries are immediate (no backoff). If retries are exhausted, the task is marked as failed and the window continues.
ErrorAction.SKIP – Mark the task as failed, append its result to the window, and continue. This is the default behavior.
list[ExecutorTask] – Replace the failed task with new tasks. The new tasks are added to the current window and execute immediately. The window’s total_tasks grows dynamically to include them. Use this to split a large failing task into smaller ones, or to substitute a fallback binary.
Example¶
```python
from drakkar import (
    BaseDrakkarHandler,
    ErrorAction,
    ExecutorError,
    ExecutorTask,
    make_task_id,
)

class MyHandler(BaseDrakkarHandler):
    async def on_error(self, task, error):
        # Retry transient failures (simulated by exit code 75)
        if error.exit_code == 75:
            return ErrorAction.RETRY
        # Timeout: try a lighter binary as fallback
        if error.exception and 'Timeout' in error.exception:
            return [ExecutorTask(
                task_id=make_task_id('fallback'),
                args=task.args + ['--fast-mode'],
                source_offsets=task.source_offsets,
                metadata=task.metadata,
                binary_path='/usr/bin/lightweight-search',
            )]
        # Everything else: skip
        return ErrorAction.SKIP
```
Concurrency and Backpressure¶
Semaphore-based Concurrency¶
executor.max_executors (default: 4) controls how many subprocesses run in parallel across all partitions. The ExecutorPool maintains an asyncio.Semaphore of this size. Tasks from any partition compete for the same pool.
When all slots are occupied, additional tasks wait in the semaphore queue. The waiting_count property tracks how many tasks are queued, while active_count tracks how many are running.
Kafka Consumer Backpressure¶
The framework pauses and resumes the Kafka consumer based on total queued work across all partition processors; see Performance Tuning for recommendations. Two watermarks control the behavior:
| Watermark | Formula | Default (`max_executors=4`) |
|---|---|---|
| High (pause) | `max_executors × backpressure_high_multiplier` | 4 × 32 = 128 |
| Low (resume) | `max(1, max_executors × backpressure_low_multiplier)` | max(1, 4 × 4) = 16 |
The total queued count is the sum of queue_size + inflight_count across all active partition processors.
- When total queued >= high watermark: the consumer is paused (all assigned partitions stop receiving messages).
- When total queued <= low watermark: the consumer is resumed.
The gap between high and low watermarks (hysteresis) prevents rapid pause/resume oscillation. Pause and resume always operate on all assigned partitions simultaneously.
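In code, the two thresholds reduce to a pair of comparisons. The helper names below are hypothetical, mirroring the formulas above:

```python
def should_pause(queued: int, max_executors: int, high_mult: int) -> bool:
    # Pause the consumer once queued work reaches the high watermark.
    return queued >= max_executors * high_mult

def should_resume(queued: int, max_executors: int, low_mult: int) -> bool:
    # Resume only after draining below the low watermark (never below 1),
    # leaving a hysteresis gap that prevents pause/resume oscillation.
    return queued <= max(1, max_executors * low_mult)
```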
Configuration¶
```yaml
executor:
  max_executors: 8
  backpressure_high_multiplier: 32   # pause at 8 * 32 = 256 queued
  backpressure_low_multiplier: 4     # resume at 8 * 4 = 32 queued
```
Windowing¶
Messages are collected into windows before being passed to arrange(). A window contains 1 to executor.window_size (default: 100) messages from a single partition.
Window Lifecycle¶
1. The partition processor waits for the first message (blocks up to 1 second).
2. It drains any additional messages from the queue without blocking, up to `window_size`.
3. The window of messages is passed to `arrange()`, which returns `ExecutorTask` objects.
4. All tasks in the window are launched concurrently.
5. The window tracks `completed_count` vs `total_tasks` to know when it is done.
6. On completion, `on_window_complete()` is called, offsets are marked complete, and a commit is attempted.
Concurrent Windows¶
Multiple windows can be in-flight concurrently for the same partition. The processor does not wait for window N to complete before starting window N+1. This keeps throughput high when tasks have variable execution times.
Each window’s tasks execute independently. Offset commits follow watermark semantics – offsets are committed only when all preceding messages have been fully processed and delivered. A slow task in window 1 blocks offset advancement even if window 2 has already finished.
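The watermark rule can be illustrated with a toy helper (illustrative only, not part of the Drakkar API): the committed offset advances only through contiguously completed offsets, so a gap left by a slow task blocks everything behind it.

```python
def watermark(committed: int, completed: set[int]) -> int:
    # Advance past contiguously completed offsets only. A gap (a still-running
    # task from an earlier window) stops the advance, even if later offsets
    # are already done.
    next_offset = committed
    while next_offset + 1 in completed:
        next_offset += 1
    return next_offset
```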
Dynamic Window Growth¶
If on_error() returns replacement tasks, the window’s total_tasks count grows. The window only completes when all tasks – including dynamically added ones – have finished.
Stdin Support¶
The stdin field on ExecutorTask lets you pipe data to a subprocess without passing it as CLI arguments. When set, the framework connects a pipe to the process stdin and writes the string immediately after launch.
This is useful for passing structured input (JSON, configuration) that would be awkward or unsafe as command-line arguments.
Example¶
```python
import json

from drakkar import ExecutorTask, make_task_id

async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        payload = {
            'query': msg.payload.query,
            'filters': msg.payload.filters,
            'options': {'max_results': 100, 'timeout_ms': 5000},
        }
        tasks.append(ExecutorTask(
            task_id=make_task_id('search'),
            args=['--mode', 'stdin'],
            source_offsets=[msg.offset],
            metadata={'request_id': msg.payload.request_id},
            stdin=json.dumps(payload),
        ))
    return tasks
```
The subprocess reads the JSON from stdin:
```python
#!/usr/bin/env python3
import json
import sys

request = json.load(sys.stdin)
# process request...
print(json.dumps(result))
```
When stdin is None (the default), the process stdin is not connected at all.
Environment Variables¶
Custom environment variables can be passed to executor subprocesses at
two levels: globally via config and per-task via ExecutorTask.env.
Merge order¶
The subprocess environment is built by merging three layers, where later layers override earlier ones on key conflict:
1. Parent process env – the worker's own environment (inherited)
2. Config env (`executor.env`) – applied to all tasks
3. Task env (`ExecutorTask.env`) – applied to one task, overrides config
When neither config nor task defines custom env vars, the subprocess inherits the parent environment directly (no copy overhead).
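The merge can be sketched as follows (a minimal illustration of the described layering; `build_env` is a hypothetical name, not Drakkar's internal function):

```python
import os

def build_env(config_env: dict[str, str], task_env: dict[str, str]):
    # No custom vars: return None so the subprocess inherits the parent
    # environment directly, with no copy overhead.
    if not config_env and not task_env:
        return None
    # Later layers win on key conflict: parent < config < task.
    return {**os.environ, **config_env, **task_env}
```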
Config-level env¶
Set executor.env in your YAML config to pass variables to every task:
```yaml
executor:
  binary_path: "/usr/local/bin/my-tool"
  env:
    DATABASE_URL: "postgresql://localhost/mydb"
    LOG_LEVEL: "warn"
    CORPUS_PATH: "/data/corpus"
```
These are useful for connection strings, feature flags, or paths that are the same for every task but shouldn’t be hardcoded in the binary.
Environment variable override: `DRAKKAR_EXECUTOR__ENV='{"KEY": "value"}'`
Per-task env¶
Set env on ExecutorTask in arrange() for task-specific values:
```python
async def arrange(self, messages, pending):
    tasks = []
    for msg in messages:
        tasks.append(ExecutorTask(
            task_id=make_task_id('proc'),
            args=['--process'],
            source_offsets=[msg.offset],
            env={
                'REQUEST_ID': msg.payload.request_id,
                'PRIORITY': str(msg.payload.priority),
            },
        ))
    return tasks
```
Per-task env is useful when the binary reads configuration from environment variables and different messages need different settings.
Override example¶
Config sets a default, task overrides it for specific messages:
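For the override to have something to override, assume the config defines defaults like these (values taken from the comments in the handler snippet; the exact config is an assumption for this example):

```yaml
# config.yaml (assumed for this example)
executor:
  env:
    MODE: "standard"
    TIMEOUT: "30"
```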
```python
# handler.py — arrange()
ExecutorTask(
    task_id=make_task_id('proc'),
    args=['--run'],
    source_offsets=[msg.offset],
    env={'MODE': 'turbo'},  # overrides config's "standard"
    # TIMEOUT: "30" is still inherited from config
)
```
The subprocess sees MODE=turbo and TIMEOUT=30.
Configuration Reference¶
All executor settings live under the executor key in your YAML config:
```yaml
executor:
  binary_path: /usr/local/bin/my-worker   # default binary (optional)
  max_executors: 4                        # concurrent subprocess limit
  task_timeout_seconds: 120               # per-task wall-clock timeout
  window_size: 100                        # max messages per arrange() call
  max_retries: 3                          # retry limit per failed task
  drain_timeout_seconds: 5                # max wait for in-flight tasks on shutdown
  backpressure_high_multiplier: 32        # pause consumer at max_executors * this
  backpressure_low_multiplier: 4          # resume consumer at max_executors * this
```
| Field | Type | Default | Min | Description |
|---|---|---|---|---|
| `binary_path` | `str \| None` | `None` | 1 char | Default subprocess binary. `None` requires per-task override via `ExecutorTask.binary_path`. |
| `max_executors` | `int` | `4` | 1 | Concurrent subprocess limit (semaphore size). |
| `task_timeout_seconds` | `int` | `120` | 1 | Per-subprocess wall-clock timeout in seconds. |
| `window_size` | `int` | `100` | 1 | Max messages per `arrange()` window. |
| `max_retries` | `int` | `3` | 0 | Max retries per failed task. `0` disables retries. |
| `drain_timeout_seconds` | `int` | `5` | 1 | Max wait for in-flight tasks during graceful shutdown. |
| `backpressure_high_multiplier` | `int` | `32` | 1 | Pause threshold = `max_executors` × this. |
| `backpressure_low_multiplier` | `int` | `4` | 1 | Resume threshold = `max(1, max_executors × this)`. |
Environment variable overrides use the DRAKKAR_EXECUTOR__ prefix: