Webapp
An optional synchronous-HTTP entry point that exposes the same handler
pipeline used for Kafka. When webapp.enabled=true, Drakkar runs a
FastAPI server on a dedicated thread accepting POST requests, parses the
body into a user-defined Pydantic model, dispatches the request to the
main pipeline loop, and returns a JSON response built from the executor
fan-out.
The webapp is opt-in (webapp.enabled=false by default). It coexists
with the Kafka source pipeline rather than replacing it – the same
worker can consume from Kafka and serve HTTP requests at the same time.
When to use the webapp vs the Kafka source path
| Question | Choose… |
|---|---|
| “Does my caller need a synchronous response in one round-trip?” | Webapp. Kafka’s at-least-once delivery is one-way; a synchronous reply needs HTTP (or a request/reply broker the framework does not provide). |
| “Is my workload high-throughput, restart-resilient, with persistent retries?” | Kafka source. Webapp requests live for one HTTP round-trip and are dropped if the worker restarts mid-flight; Kafka offsets re-deliver after a crash. |
| “Do I need per-tenant authentication and rate limits at the worker?” | Webapp. Auth + rpm is built in. The Kafka path has no per-producer admission control. |
| “Is the pipeline logic identical for HTTP and Kafka inputs?” | Both, on the same worker. Reuse arrange() / arrange_http_request() and share the executor pool, sinks, cache, and recorder. |
| “Will my caller send 1000s of requests per second from many concurrent clients?” | Kafka source. A single worker’s webapp is capped by max_concurrent (default 64) and rate-limited per client; horizontal scale-out plus the Kafka-fallback pattern below is the path to higher throughput. |
The framework defaults the webapp to a small dev deployment (one anonymous client, rpm=4). Production deployments should configure named clients with non-empty tokens and per-client rpm caps – see Config reference.
Enabling
Set webapp.enabled: true in the worker config and declare the HTTP
request and response types as the third and fourth generic parameters
to BaseDrakkarHandler:
```python
import drakkar as dk
from pydantic import BaseModel


class KafkaIn(BaseModel):
    request_id: str
    pattern: str


class KafkaOut(BaseModel):
    request_id: str
    matches: int


class RankRequest(BaseModel):
    """Body of POST /process."""
    request_id: str
    query: str
    top_k: int = 10


class RankResponse(BaseModel):
    """Body of the JSON response under the ``result`` key."""
    request_id: str
    items: list[str]
    score: float


class RankHandler(dk.BaseDrakkarHandler[KafkaIn, KafkaOut, RankRequest, RankResponse]):
    async def arrange(self, messages, pending):
        # Kafka path -- unchanged.
        ...

    async def arrange_http_request(self, req: RankRequest, pending) -> list[dk.ExecutorTask]:
        # Translate a single HTTP request into one or more executor tasks.
        return [
            dk.ExecutorTask(
                task_id=dk.make_task_id('rank'),
                args=['--query', req.query, '--top-k', str(req.top_k)],
                metadata={'request_id': req.request_id},
            )
        ]

    async def on_http_request_complete(self, group: dk.MessageGroup) -> RankResponse:
        # Build the user-facing response from completed task results.
        items = [r.stdout.strip() for r in group.results]
        return RankResponse(
            request_id=group.request_id or '',
            items=items,
            score=float(group.succeeded),
        )
```
```yaml
webapp:
  enabled: true
  host: 0.0.0.0
  port: 8090
  path: /process
  clients:
    - name: tenant-a
      token: "secret-tenant-a-token"
      rpm: 60
```
That is the minimum. The handler keeps its existing arrange() /
on_task_complete() Kafka path; the webapp pipeline reuses the same
executor pool, the same self.cache, and the same recorder.
Webapp users must declare four type parameters
BaseDrakkarHandler is generic over four type parameters. Two-param
handlers (BaseDrakkarHandler[InputT, OutputT]) keep working
unchanged thanks to PEP 696 default TypeVars; the HTTP slots
default to None and the webapp hooks are never called. When
webapp.enabled=true, the framework reads HttpRequestT and
HttpResponseT off the handler subclass at startup and raises
ConfigurationError if either is None. See Handler hooks below.
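A startup check like this can be written with nothing beyond the standard typing module. The sketch below shows one way to do it; Handler, ConfigurationError, and http_types are hypothetical stand-ins, not Drakkar's implementation. A subclass that passes None explicitly stands in for the PEP 696 defaults, so the sketch also runs on Python versions without TypeVar defaults.

```python
from typing import Generic, TypeVar, get_args, get_origin

InputT = TypeVar('InputT')
OutputT = TypeVar('OutputT')
HttpRequestT = TypeVar('HttpRequestT')
HttpResponseT = TypeVar('HttpResponseT')


class ConfigurationError(Exception):
    """Hypothetical stand-in for the framework's configuration error."""


class Handler(Generic[InputT, OutputT, HttpRequestT, HttpResponseT]):
    """Hypothetical four-parameter base standing in for BaseDrakkarHandler."""


def http_types(handler_cls: type) -> tuple:
    """Return (HttpRequestT, HttpResponseT) as declared on a Handler subclass."""
    # Walk the MRO so a subclass of a parameterised handler still resolves.
    for klass in handler_cls.__mro__:
        for base in klass.__dict__.get('__orig_bases__', ()):
            if get_origin(base) is Handler:
                _in, _out, req_t, resp_t = get_args(base)
                # typing stores a ``None`` argument as ``type(None)``.
                if req_t is type(None) or resp_t is type(None):
                    raise ConfigurationError(
                        f'{handler_cls.__name__} leaves the HTTP request/response types as None'
                    )
                return req_t, resp_t
    raise ConfigurationError(f'{handler_cls.__name__} does not parameterise Handler')
```

Running the check once per class rather than once per request means a handler that is missing its HTTP types fails at startup, not on the first request.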
Config reference
Top-level webapp: block. Every field has a safe default so the example
in Enabling is enough to get a working endpoint; the table
below documents every knob for production deployments.
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Master switch. When false, no FastAPI server runs and the handler's HTTP hooks are never invoked. |
| host | str | '0.0.0.0' | Interface uvicorn binds. Use '127.0.0.1' to keep the endpoint inside the host (private deployments). |
| port | int | 8090 | Port uvicorn binds. The webapp does not share a port with the debug UI or the metrics server. |
| path | str | '/process' | Single POST route the framework registers. Must start with '/' and contain at least one character after it. |
| sinks_enabled | bool | false | When true, calls on_message_complete(group) after the executor fan-out and routes returned CollectResult payloads through the SinkManager. When false, sinks are skipped entirely and the response carries sinks: null. |
| request_timeout_seconds | float | 30.0 | Per-request budget enforced via asyncio.wait_for on the webapp loop. On timeout the client receives a 504 with status='timeout' and the runner's post-execute hook is cooperatively cancelled. Must be > 0. |
| max_concurrent | int | 64 | Per-worker semaphore capacity for in-flight HTTP requests. The 65th concurrent request returns 503 status='capacity' immediately rather than queuing. Must be > 0. |
| clients | list[WebClientConfig] | one anonymous client with rpm=4 | Configured tenants. See the table below. At least one client is required; explicit clients: [] fails at config load. |
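Plain asyncio is enough to illustrate the two admission knobs, max_concurrent and request_timeout_seconds. This is a sketch under assumptions, not the framework's route handler. admit_and_run and its (http_status, status) return value are hypothetical. The sketch rejects immediately when the semaphore has no free slot instead of queuing, and it bounds the work with asyncio.wait_for.

```python
import asyncio
from typing import Awaitable, Callable


async def admit_and_run(
    sem: asyncio.Semaphore,
    work: Callable[[], Awaitable[object]],
    timeout_s: float,
) -> tuple:
    """Hypothetical helper: reject when full, otherwise run ``work`` under a timeout."""
    # locked() is True when acquire() would have to wait. Nothing awaits
    # between this check and the acquire below, so on a single event loop
    # a full semaphore always means an immediate 503 instead of a queue.
    if sem.locked():
        return 503, 'capacity'
    async with sem:
        try:
            await asyncio.wait_for(work(), timeout=timeout_s)
        except asyncio.TimeoutError:
            # wait_for has already cancelled the inner work at this point.
            return 504, 'timeout'
    return 200, 'ok'
```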
webapp.clients[] – WebClientConfig
Each entry defines one tenant with its bearer token and rpm cap.
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | required | Tenant name. Used in metric labels (drakkar_webapp_requests_total{client=...}), recorder rows, and the response body. Must be non-empty. |
| token | str | '' | Bearer token presented in the Authorization: Bearer <token> header. Empty token = anonymous slot (matches requests with no Authorization header). At most one client may have an empty token; non-empty tokens must be unique across clients. |
| rpm | int | 4 | Per-client requests-per-minute cap, enforced on a 60-second sliding window. Must be > 0. |
```yaml
webapp:
  enabled: true
  host: 0.0.0.0
  port: 8090
  path: /process
  sinks_enabled: false
  request_timeout_seconds: 30.0
  max_concurrent: 64
  clients:
    - name: anonymous
      token: ""
      rpm: 4
    - name: tenant-a
      token: "secret-tenant-a-token"
      rpm: 60
    - name: tenant-b
      token: "secret-tenant-b-token"
      rpm: 600
```
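One way to implement a per-client 60-second sliding window is to keep a deque of admission timestamps for each client. The sketch below assumes that design; the framework's internal data structure is not documented here. SlidingWindowLimiter is hypothetical and takes an injectable clock so it can be tested deterministically.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Hypothetical per-client limiter: at most ``rpm`` admissions in any trailing 60 s."""

    WINDOW_SECONDS = 60.0

    def __init__(self, rpm_by_client: Dict[str, int], clock: Callable[[], float] = time.monotonic) -> None:
        if any(rpm <= 0 for rpm in rpm_by_client.values()):
            raise ValueError('rpm must be > 0')
        self._rpm = dict(rpm_by_client)
        self._clock = clock
        self._admitted: Dict[str, Deque[float]] = {name: deque() for name in rpm_by_client}

    def try_acquire(self, client: str) -> bool:
        """Record and admit one request, or return False (the 429 rate_limited case)."""
        now = self._clock()
        window = self._admitted[client]
        # Evict admissions that have aged out of the trailing window.
        while window and now - window[0] >= self.WINDOW_SECONDS:
            window.popleft()
        if len(window) >= self._rpm[client]:
            return False
        window.append(now)
        return True
```

The sketch records only admitted requests, so a client that keeps retrying while it is rejected does not extend its own lockout. Whether the framework also counts rejected attempts is not specified on this page.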
Validation runs at config load time. Misconfigurations
(request_timeout_seconds <= 0, duplicate non-empty tokens, two clients
with empty tokens, clients: [], etc.) raise ValueError at startup
with a message naming the offending field.
When every configured client has an empty token, the worker logs a
webapp_unauthenticated_warning at startup so private-network
deployments that should have had a token configured surface in log
aggregation.
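The load-time rules and the matching of the Authorization header can be sketched with a small dataclass. Client, validate_clients, and match_client are hypothetical names. The real config model is the Pydantic WebClientConfig, and the framework's exact header parsing may differ from this one.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Client:
    """Hypothetical stand-in for WebClientConfig."""
    name: str
    token: str = ''
    rpm: int = 4


def validate_clients(clients: list[Client]) -> bool:
    """Apply the load-time rules; return True when every client is anonymous."""
    if not clients:
        raise ValueError('webapp.clients: at least one client is required')
    for c in clients:
        if not c.name:
            raise ValueError('webapp.clients[].name must be non-empty')
        if c.rpm <= 0:
            raise ValueError(f'webapp.clients[{c.name}].rpm must be > 0')
    tokens = [c.token for c in clients]
    if tokens.count('') > 1:
        raise ValueError('webapp.clients: at most one client may have an empty token')
    non_empty = [t for t in tokens if t]
    if len(non_empty) != len(set(non_empty)):
        raise ValueError('webapp.clients: non-empty tokens must be unique')
    # True is the case that should surface a webapp_unauthenticated_warning.
    return all(t == '' for t in tokens)


def match_client(clients: list[Client], authorization: str | None) -> Client | None:
    """Resolve the Authorization header to a client; None means 401 auth_failed."""
    if authorization is None:
        # No header: only the anonymous (empty-token) slot can match.
        return next((c for c in clients if c.token == ''), None)
    scheme, _, token = authorization.partition(' ')
    if scheme.lower() != 'bearer' or not token:
        return None  # malformed header
    return next((c for c in clients if c.token and c.token == token), None)
```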
Handler hooks
BaseDrakkarHandler exposes four HTTP-specific hooks alongside the
Kafka-path hooks documented in Handler. They are invoked
only when webapp.enabled=true and only for HTTP requests; Kafka
messages never trigger them.
| Hook | When called | Frequency | Returns |
|---|---|---|---|
| arrange_http_request(req, pending) | One HTTP request passed auth, rate-limit, and body parsing; about to enter the executor pool | Once per HTTP request | list[ExecutorTask] |
| on_http_request_complete(group) | All tasks from one HTTP request reached a terminal state | Once per HTTP request | HttpResponseT (a Pydantic model) |
| http_request_id(req, headers) | After body parsing, before task fan-out | Once per HTTP request | str (validated to ASCII, no whitespace, ≤64 chars) |
| http_request_label(req, request_id) | Before structured-log lines and debug-UI rendering | Once per HTTP request | str |
Required overrides: arrange_http_request and on_http_request_complete.
The defaults raise NotImplementedError so a misconfigured deployment
fails fast at the first request rather than silently misbehaving.
http_request_id and http_request_label are optional – the framework
generates a fresh id from make_request_id('req') and labels the
request with that id when not overridden.
arrange_http_request(req, pending) -> list[ExecutorTask]
Mirrors arrange for the webapp path.
Receives the parsed request body (typed as HttpRequestT) and an empty
PendingContext – the synthetic per-request task group has no
sibling-message dedup work to do.
The framework auto-stamps every returned task with origin='http',
client_name=<matched client>, and request_id (whatever
http_request_id returned). Operators see those columns in the recorder
and the debug UI – no need to set them by hand.
```python
async def arrange_http_request(self, req: RankRequest, pending) -> list[dk.ExecutorTask]:
    return [
        dk.ExecutorTask(
            task_id=dk.make_task_id('rank'),
            args=['--query', req.query, '--top-k', str(req.top_k)],
            metadata={'request_id': req.request_id},
            labels={'kind': 'http', 'query': req.query[:32]},
        )
    ]
```
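The auto-stamping step amounts to a pass over the returned tasks that fills in the three envelope fields. The frozen dataclass below is a hypothetical, trimmed-down stand-in for ExecutorTask, whose real field set is larger. stamp_http_tasks illustrates the idea and is not the framework's function.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class Task:
    """Hypothetical, trimmed-down stand-in for dk.ExecutorTask."""
    task_id: str
    args: list[str] = field(default_factory=list)
    origin: str | None = None
    client_name: str | None = None
    request_id: str | None = None


def stamp_http_tasks(tasks: list[Task], client_name: str, request_id: str) -> list[Task]:
    """Return copies of ``tasks`` tagged as belonging to one HTTP request."""
    return [
        replace(t, origin='http', client_name=client_name, request_id=request_id)
        for t in tasks
    ]
```

Because the stamping happens after arrange_http_request returns, the hook cannot accidentally set a request_id that disagrees with the one http_request_id produced.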
on_http_request_complete(group) -> HttpResponseT
Mirrors on_message_complete for the
webapp path but is required and returns the user’s
HttpResponseT model instead of an optional CollectResult. The
framework places the returned model under "result" in the JSON
response body (see Response shape).
The MessageGroup carries a synthetic SourceMessage with partition=-1
and a key set to the matched client name; group.origin == 'http',
group.client_name, and group.request_id are populated from the
request envelope.
```python
async def on_http_request_complete(self, group: dk.MessageGroup) -> RankResponse:
    if group.is_empty:
        return RankResponse(request_id=group.request_id or '', items=[], score=0.0)
    items = [line.strip() for r in group.results for line in r.stdout.splitlines() if line.strip()]
    return RankResponse(
        request_id=group.request_id or '',
        items=items,
        score=float(group.succeeded) / max(group.total, 1),
    )
```
When webapp.sinks_enabled=true, the framework calls
on_message_complete(group) (the same hook used by the Kafka path)
before on_http_request_complete, routes any returned payloads
through the SinkManager, and records per-sink outcomes into
the response under "sinks". See Request / response examples.
http_request_id(req, headers) -> str
Override to promote an upstream tracing header into the framework
request id. The default returns a fresh framework-generated id from
make_request_id('req'); overrides typically pull X-Request-ID or
X-Correlation-ID from the headers so a single id flows from caller →
worker → downstream services.
```python
def http_request_id(self, req: RankRequest, headers) -> str:
    upstream = headers.get('x-request-id')
    if upstream:
        return upstream  # validated against the request_id contract before use
    return dk.make_request_id('req')
```
The framework calls validate_request_id(...) on the return value (≤64
chars, ASCII, no whitespace). A buggy override that returns a malformed
id surfaces immediately as a 500 – it never reaches downstream logs.
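An override can check the documented contract (≤64 characters, ASCII, no whitespace) before it returns, so that a bad upstream header never becomes a 500. is_valid_request_id and pick_request_id below are hypothetical helpers that mirror only the rules stated above. The framework's own validate_request_id(...) may be stricter. The sketch takes any Mapping of lower-cased header names.

```python
from collections.abc import Mapping


def is_valid_request_id(value: str) -> bool:
    """Check the documented contract: at most 64 chars, ASCII only, no whitespace."""
    return (
        len(value) <= 64
        and value.isascii()
        and not any(ch.isspace() for ch in value)
    )


def pick_request_id(headers: Mapping[str, str], fallback: str) -> str:
    """Promote X-Request-ID when it satisfies the contract, else use ``fallback``."""
    upstream = headers.get('x-request-id')
    return upstream if upstream and is_valid_request_id(upstream) else fallback
```

Inside the handler, fallback would be the value of dk.make_request_id('req'). Falling back rather than returning the raw header trades strict propagation of the upstream id for never failing the request over it.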
http_request_label(req, request_id) -> str
Override to embed a business-readable identifier into structured log
lines and the debug UI. The default returns the framework request_id
unchanged.
```python
def http_request_label(self, req: RankRequest, request_id: str) -> str:
    return f'{req.query[:16]}/{request_id}'
```
Request / response examples
Once the worker is running with webapp.enabled=true, send a POST to
the configured path with a JSON body matching HttpRequestT. The
examples below assume the Enabling handler.
Without sinks (sinks_enabled: false, the default)
```shell
curl -sS -X POST http://localhost:8090/process \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer secret-tenant-a-token' \
  -d '{"request_id": "rid-1", "query": "vikings", "top_k": 5}'
```
```python
import httpx

resp = httpx.post(
    'http://localhost:8090/process',
    headers={'Authorization': 'Bearer secret-tenant-a-token'},
    json={'request_id': 'rid-1', 'query': 'vikings', 'top_k': 5},
    # httpx defaults to a 5 s timeout; leave room for request_timeout_seconds (30 s).
    timeout=35.0,
)
resp.raise_for_status()
print(resp.json())
```
Successful response body (HTTP 200):
```json
{
  "request_id": "rid-1",
  "client": "tenant-a",
  "started_at": "2026-05-06T09:01:00.000000+00:00",
  "finished_at": "2026-05-06T09:01:00.084000+00:00",
  "duration_ms": 84.0,
  "status": "ok",
  "result": {
    "request_id": "rid-1",
    "items": ["match-1", "match-2"],
    "score": 1.0
  },
  "tasks": [
    {"task_id": "rank-...", "exit_code": 0, "duration_ms": 41.5, "retries": 0}
  ],
  "task_summary": {"total": 1, "success": 1, "failed": 0},
  "cache": {"hits": 0, "misses": 0},
  "sinks": null,
  "timeline": [
    {"stage": "arrange", "duration_ms": 0.6},
    {"stage": "execute", "duration_ms": 41.7},
    {"stage": "on_http_request_complete", "duration_ms": 0.4}
  ],
  "error": null
}
```
| Field | Meaning |
|---|---|
| request_id | The id resolved by http_request_id(...). Same value stamped onto every executor task and recorder row for the request. |
| client | The matched WebClientConfig.name from auth. Used as a metric label and recorder column. |
| started_at / finished_at | UTC timestamps marking the request's wall-clock entry/exit at the route handler. |
| duration_ms | Server-side wall clock between started_at and finished_at. Prefer this over external timers when comparing requests across clients. |
| status | One of ok, error, timeout, rate_limited, capacity, auth_failed, shutdown, not_ready. Mirrors the drakkar_webapp_requests_total label. |
| result | The Pydantic model returned by on_http_request_complete(group), dumped to JSON. null on error paths. |
| tasks | One compact entry per successful executor task (task_id, exit_code, duration_ms, retries). Subprocess stdout / stderr are deliberately excluded from the response body – read those via the recorder or the debug UI. |
| task_summary | Aggregate total / success / failed counts for the per-request fan-out. |
| cache | Per-request cache hit/miss counts. Reserved for future use; the runner currently emits zeros. |
| sinks | null when sinks_enabled=false. With sinks enabled, an aggregated per-sink-type delivery summary – see below. |
| timeline | One {stage, duration_ms} entry per pipeline phase (arrange, execute, optional sinks, on_http_request_complete). Lets operators see where time was spent without correlating timestamps across log lines. |
| error | Human-readable error message on the error/timeout paths; null on the success path. |
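A client can consume this envelope with nothing more than the standard library. summarize is a hypothetical helper. It assumes the field names shown in the sample body above and reports which timeline stage took the longest.

```python
import json


def summarize(body: str) -> dict:
    """Pull the user result and the slowest pipeline stage out of a response body."""
    doc = json.loads(body)
    if doc.get('status') != 'ok':
        raise RuntimeError(f"request {doc.get('request_id')} failed: {doc.get('error')}")
    # max(..., default=None) copes with an empty or missing timeline.
    slowest = max(doc.get('timeline') or [], key=lambda s: s['duration_ms'], default=None)
    return {
        'request_id': doc['request_id'],
        'result': doc['result'],
        'failed_tasks': doc['task_summary']['failed'],
        'slowest_stage': slowest['stage'] if slowest else None,
    }
```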
With sinks (sinks_enabled: true)
The same request as above with webapp.sinks_enabled: true in config
and an on_message_complete hook returning a CollectResult:
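```python
async def on_message_complete(self, group: dk.MessageGroup) -> dk.CollectResult | None:
    if not group.results:
        return None  # nothing to publish for this request
    return dk.CollectResult(
        # request_id can be None, so guard before encoding the key.
        kafka=[dk.KafkaPayload(data=group.results[0], key=(group.request_id or '').encode())],
    )
```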
Successful response body (HTTP 200) – the sinks field is now populated:
```json
{
  "request_id": "rid-1",
  "client": "tenant-a",
  "status": "ok",
  "result": {"request_id": "rid-1", "items": ["match-1"], "score": 1.0},
  "tasks": [{"task_id": "rank-...", "exit_code": 0, "duration_ms": 41.5, "retries": 0}],
  "task_summary": {"total": 1, "success": 1, "failed": 0},
  "cache": {"hits": 0, "misses": 0},
  "sinks": {
    "by_type": {
      "kafka": {"attempted": 1, "delivered": 1, "dlq": 0, "errors": []}
    }
  },
  "timeline": [
    {"stage": "arrange", "duration_ms": 0.6},
    {"stage": "execute", "duration_ms": 41.7},
    {"stage": "sinks", "duration_ms": 5.4},
    {"stage": "on_http_request_complete", "duration_ms": 0.4}
  ]
}
```
The sinks.by_type map is keyed by sink type (kafka, postgres,
mongo, …) and emits one entry per type that the request actually
touched. Each entry carries attempted, delivered, dlq, and an
errors list for delivery failures routed through
on_delivery_error. Sinks that the
request did not touch are omitted – the response mirrors what was
attempted, not the full configured sink set.
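A caller that needs to know whether every sink delivery landed can reduce sinks.by_type to a single verdict. sinks_fully_delivered is a hypothetical helper that relies only on the attempted, delivered, dlq, and errors fields described above. It treats sinks: null (sinks disabled) as vacuously delivered, which is a design choice of the sketch.

```python
from __future__ import annotations

from typing import Any


def sinks_fully_delivered(sinks: dict[str, Any] | None) -> bool:
    """True when sinks were skipped (null) or every touched sink type delivered everything."""
    if sinks is None:
        return True  # sinks_enabled=false: nothing was attempted
    for summary in sinks.get('by_type', {}).values():
        if summary['delivered'] != summary['attempted'] or summary['dlq'] or summary['errors']:
            return False
    return True
```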
Status codes
Every error response shares a flat envelope shape: status (string),
error (human-readable), request_id (server-side id where one was
allocated), and a per-status set of extra fields. The 429 / 503 / 504 bodies
include a hint field telling the client to switch to the Kafka source
topic for higher throughput and worker-restart resilience.
| HTTP | status | When | Sample body |
|---|---|---|---|
| 200 | ok | Successful end-to-end execution | See Request / response examples. |
| 401 | auth_failed | The Authorization header is missing (and no anonymous slot is configured), malformed, or names a token no client is configured with | {"error": "unauthorized"} |
| 422 | n/a (legacy error: invalid_request) | Body is missing or fails Pydantic validation against HttpRequestT | {"error": "invalid_request", "request_id": "req-...", "details": [{"loc": ["query"], "msg": "Field required", "type": "missing"}]} |
| 429 | rate_limited | Per-client rpm window is full | {"status": "rate_limited", "error": "client 'tenant-a' exceeded rpm cap (60); try again later", "request_id": "req-...", "rpm_limit": 60, "hint": "route this workload through the Kafka source topic for higher throughput and worker-restart resilience"} |
| 503 | capacity | max_concurrent semaphore is full at request time | {"status": "capacity", "error": "webapp is over capacity; request rejected", "request_id": "req-...", "max_concurrent": 64, "hint": "route this workload through the Kafka source topic for higher throughput and worker-restart resilience"} |
| 503 | shutdown | The worker is in the drain phase and refusing new requests | {"status": "shutdown", "error": "webapp is shutting down; request rejected", "request_id": "req-...", "hint": "route this workload through the Kafka source topic for higher throughput and worker-restart resilience"} |
| 503 | not_ready | The webapp accepted the connection but the main pipeline has not yet completed startup | {"status": "not_ready", "error": "webapp is starting; main pipeline is not yet ready", "request_id": "req-...", "hint": "route this workload through the Kafka source topic for higher throughput and worker-restart resilience"} |
| 504 | timeout | request_timeout_seconds elapsed while the runner was processing | {"status": "timeout", "error": "request exceeded 30.0 seconds", "request_id": "req-...", "duration_ms": 30001.4, "hint": "route this workload through the Kafka source topic for higher throughput and worker-restart resilience"} |
| 500 | error | A user hook (arrange_http_request, on_http_request_complete, http_request_id, etc.) raised an unhandled exception | {"status": "error", "request_id": "req-...", "error": "internal error"} |
The 422 response intentionally uses error: invalid_request rather than
a status field – it sits at the body-parse boundary, before the
runner allocates the framework status taxonomy. The other rejection
paths mirror drakkar_webapp_requests_total{status=...} exactly so
operators can cross-reference response bodies with Prometheus
dashboards.
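Because the taxonomy is stable, a client can branch on what to do next instead of scattering HTTP-code checks through its code. Action and classify below are hypothetical, not part of Drakkar. They encode the table above: the responses that carry the Kafka-fallback hint map to a fallback action, and everything else is either success or a problem on the caller's or handler author's side.

```python
from enum import Enum


class Action(Enum):
    DONE = 'done'
    FALLBACK_TO_KAFKA = 'fallback_to_kafka'
    FIX_CREDENTIALS = 'fix_credentials'
    FIX_PAYLOAD = 'fix_payload'
    REPORT_BUG = 'report_bug'


def classify(http_status: int, body: dict) -> Action:
    """Map a webapp response onto what the caller should do next."""
    if http_status == 200:
        return Action.DONE
    if http_status == 401:
        return Action.FIX_CREDENTIALS
    if http_status == 422:
        return Action.FIX_PAYLOAD  # body carries error: invalid_request, not status
    # 429 rate_limited, 503 capacity/shutdown/not_ready and 504 timeout all
    # carry the Kafka-fallback hint.
    if http_status in (429, 503, 504) or 'hint' in body:
        return Action.FALLBACK_TO_KAFKA
    return Action.REPORT_BUG  # 500 status='error': a user hook raised
```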
Tracebacks from user-hook exceptions are NEVER included in the response
body. The full traceback lands in the structured log
(webapp_request_handler_error / webapp_arrange_http_request_failed
/ webapp_on_http_request_complete_failed) so operators can debug
without the framework leaking internals to clients.
Graceful shutdown semantics
The webapp participates in the worker shutdown sequence to guarantee that callers see a clean rejection rather than a connection reset mid-flight.
- Drain signal. AppLifecycle._shutdown flips the webapp's shutdown_event before the drain phase begins. The route handler checks shutdown_event.is_set() at the top of every request and returns a 503 status='shutdown' immediately for any new request that lands during drain. This is the 1-second guarantee for new-request rejection: from the moment shutdown begins, every subsequent request gets a 503 within one event-loop tick.
- In-flight requests complete. Already-dispatched requests continue running on the main pipeline loop. They are bounded by request_timeout_seconds – a slow handler cannot hold shutdown open indefinitely.
- Uvicorn stop. Once the drain completes, the framework calls WebApp.stop(drain_timeout), which signals uvicorn to exit and joins the daemon thread within drain_timeout. A stuck request handler logs webapp_stop_thread_join_timeout and the worker proceeds with teardown – the daemon thread is collected when the process exits.
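Because the drain flag is flipped on one thread and read on another, the entry gate and the bounded join map naturally onto threading primitives. gate and stop_server_thread are hypothetical names. The real framework signals uvicorn's own exit rather than a plain Event, so this is a sketch of the sequencing only.

```python
from __future__ import annotations

import logging
import threading

log = logging.getLogger('webapp-sketch')


def gate(shutdown: threading.Event, ready: threading.Event) -> tuple[int, str] | None:
    """Checked at the top of every request; None means 'admit'."""
    if shutdown.is_set():
        return 503, 'shutdown'
    if not ready.is_set():
        return 503, 'not_ready'
    return None


def stop_server_thread(thread: threading.Thread, stop: threading.Event, drain_timeout: float) -> bool:
    """Signal the serving thread to exit and wait at most ``drain_timeout`` seconds."""
    stop.set()
    thread.join(timeout=drain_timeout)
    if thread.is_alive():
        # Analogous to webapp_stop_thread_join_timeout: teardown continues,
        # and a daemon thread dies with the process.
        log.warning('server thread did not exit within %.2fs', drain_timeout)
        return False
    return True
```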
The not_ready 503 covers the other boundary: between the webapp
thread coming up and the main pipeline completing its first poll. New
requests during that startup window also receive 503 with the
Kafka-fallback hint.
Load balancer caveat
Webapp state is per-worker, not per-cluster:
- The rpm sliding window (WebClientConfig.rpm) is enforced independently on every worker that exposes the webapp. A 60 rpm cap on three workers tolerates 180 rpm cluster-wide if requests load-balance evenly across them.
- The max_concurrent semaphore is per-worker. A 64-slot cap on three workers admits up to 192 concurrent requests cluster-wide.
- The drakkar_webapp_* metrics are per-worker; aggregate via Prometheus sum by (...) for cluster-wide views.
Choose your load-balancer policy with this in mind:
| Scenario | Recommended LB policy |
|---|---|
| Per-tenant rate limits should match the configured rpm regardless of which worker handled the request | Sticky routing by Authorization token (or X-Tenant-ID). The same client always lands on the same worker; per-worker rpm equals cluster rpm. |
| Workload is uniform and you want even utilisation | Stateless distribution (round-robin, random, least-conn). Effective cluster rpm is rpm * worker_count; size the configured rpm down accordingly. |
| Workers run different config (e.g. only worker-1 has webapp.enabled=true) | Pin webapp traffic to the enabled workers; the unconfigured workers do not bind the webapp port. The integration scenario uses this layout deliberately to exercise mixed-config deployments. |
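Under stateless distribution the sizing arithmetic is small enough to keep next to the deployment config. per_worker_rpm and effective_cluster_rpm are hypothetical helpers. They assume requests spread evenly across the workers that expose the webapp, which real load balancers only approximate.

```python
def effective_cluster_rpm(rpm: int, webapp_workers: int) -> int:
    """Upper bound on admitted requests per minute across all webapp workers."""
    return rpm * webapp_workers


def per_worker_rpm(target_cluster_rpm: int, webapp_workers: int) -> int:
    """Per-worker rpm so that rpm * worker_count stays at or under the cluster target."""
    if target_cluster_rpm <= 0 or webapp_workers <= 0:
        raise ValueError('both arguments must be > 0')
    # Round down so the cluster ceiling is not exceeded. rpm must stay >= 1, so
    # with more workers than target rpm the target cannot be met exactly.
    return max(1, target_cluster_rpm // webapp_workers)
```

For example, a 180 rpm cluster budget over three webapp workers gives a configured rpm of 60 per client, and a 100 rpm budget rounds down to 33 (99 rpm cluster-wide).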
The framework does not coordinate rate-limit state across workers. If you need true cluster-wide rate limits, terminate them at the load-balancer or an upstream API gateway.
Cross-thread caveat for advanced users
The webapp lives on a separate event loop from the main pipeline:
- T1 (main loop) – runs the executor pool, the recorder, sinks, Kafka consumer, and @periodic tasks.
- T2 (webapp loop) – runs uvicorn, the FastAPI route handler, the per-request semaphore, and asyncio.wait_for for the request timeout.
The framework hops requests between the two loops via
drakkar.concurrency.dispatch_to_loop. User code never needs to think
about threading on the request path – arrange_http_request and
on_http_request_complete always run on T1.
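The standard-library primitive for moving a coroutine onto another thread's event loop is asyncio.run_coroutine_threadsafe. The sketch below shows that technique on its own. run_on and start_loop_thread are hypothetical helpers, and nothing here claims to be how drakkar.concurrency.dispatch_to_loop is actually implemented.

```python
import asyncio
import threading
from typing import Any, Coroutine, Tuple, TypeVar

T = TypeVar('T')


def start_loop_thread(name: str) -> Tuple[asyncio.AbstractEventLoop, threading.Thread]:
    """Start a daemon thread running its own event loop (a stand-in for T2)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, name=name, daemon=True)
    thread.start()
    return loop, thread


async def run_on(coro: Coroutine[Any, Any, T], target_loop: asyncio.AbstractEventLoop) -> T:
    """Run ``coro`` on ``target_loop`` and await its result from the calling loop."""
    if target_loop is asyncio.get_running_loop():
        return await coro  # already on the target loop: no hop needed
    future = asyncio.run_coroutine_threadsafe(coro, target_loop)
    # wrap_future exposes the concurrent.futures.Future as an awaitable on the
    # calling loop, so the caller yields instead of blocking while T2 works.
    return await asyncio.wrap_future(future)
```

Awaiting the wrapped future, rather than calling future.result() on the calling loop, keeps T1 free to run the executor pool and Kafka consumer while the coroutine runs on T2.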
The exception is a @periodic task that needs to reach the webapp
loop. This is rare; the typical case is a periodic worker action
(refresh a lookup table, write a heartbeat) that should also broadcast
state into the webapp’s per-request structures. When that comes up,
dispatch_to_loop works in either direction – pass the webapp loop as
the target:
```python
import drakkar as dk
from drakkar.concurrency import dispatch_to_loop

# KafkaIn, KafkaOut, RankRequest, RankResponse as defined in Enabling.


class MyHandler(dk.BaseDrakkarHandler[KafkaIn, KafkaOut, RankRequest, RankResponse]):
    @dk.periodic(seconds=30)
    async def refresh_webapp_state(self):
        # ``self._app`` and the webapp loop are framework-private; this
        # pattern is intentionally low-level. Most handlers will never
        # need it.
        webapp = self._app._webapp  # type: ignore[attr-defined]
        if webapp is None or webapp._loop is None:
            return  # webapp not enabled or not yet ready
        await dispatch_to_loop(self._refresh_on_t2(), target_loop=webapp._loop)

    async def _refresh_on_t2(self) -> None:
        # Runs on T2. Touch only objects bound to the webapp loop.
        ...
```
The far more common case – handler hooks reading and writing
self.cache, calling await self._app.something() – runs on T1
naturally; dispatch_to_loop is not needed.
Priority-scheduling pattern
HTTP requests typically have tighter latency requirements than Kafka
messages (a synchronous client is waiting for the response). The
task_priority hook overrides the executor
pool’s priority key so HTTP-origin tasks dequeue ahead of Kafka-origin
tasks:
```python
class WebappFirstHandler(dk.BaseDrakkarHandler[KafkaIn, KafkaOut, RankRequest, RankResponse]):
    def task_priority(self, task):
        # Smaller key dequeues first. ``origin == 'http'`` tasks get a
        # priority class of 0; Kafka tasks fall back to the default
        # offset-based key with a leading 1.
        if task.origin == 'http':
            return (0, task.request_id or '')
        return (1, min(task.source_offsets) if task.source_offsets else 0)
```
The default priority key is min(task.source_offsets) – for
Kafka-origin tasks that drains older messages first. HTTP-origin tasks
have an empty source_offsets list, so without an override they
tiebreak FIFO via the gate’s internal sequence counter. The override
above gives them an explicit priority class.
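The effect of that override on dequeue order can be reproduced with heapq. Task and PriorityGate below are hypothetical stand-ins for ExecutorTask and the executor gate. The sequence counter plays the role of the FIFO tiebreak mentioned above, and it also prevents the heap from ever comparing two task objects directly.

```python
from __future__ import annotations

import heapq
import itertools
from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical stand-in for ExecutorTask with the fields the key reads."""
    task_id: str
    origin: str = 'kafka'
    request_id: str | None = None
    source_offsets: list[int] = field(default_factory=list)


def task_priority(task: Task) -> tuple:
    # Same shape as the override above: class 0 for HTTP, class 1 for Kafka.
    if task.origin == 'http':
        return (0, task.request_id or '')
    return (1, min(task.source_offsets) if task.source_offsets else 0)


class PriorityGate:
    def __init__(self) -> None:
        self._heap: list = []
        self._seq = itertools.count()  # FIFO tiebreak for equal keys

    def submit(self, task: Task) -> None:
        heapq.heappush(self._heap, (task_priority(task), next(self._seq), task))

    def next_task(self) -> Task:
        return heapq.heappop(self._heap)[2]
```

An HTTP task submitted after two Kafka tasks still comes out first, and the Kafka tasks then drain in offset order.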
task_priority is also called for retries (with the same task object),
and the framework auto-stamps origin / client_name / request_id
before submission, so the override sees the same values on the first
attempt and on every retry.
Kafka-fallback pattern
The webapp is the right tool for synchronous, low-throughput,
per-request workloads. For high-throughput or restart-resilient
workloads, route through the Kafka source topic instead – that is what
the hint field on every 429 / 503 / 504 response steers clients
toward.
A typical client wrapper:
```python
import httpx


async def produce_to_kafka(topic: str, payload: dict) -> None:
    """Hypothetical helper: publish ``payload`` to ``topic`` with your Kafka producer."""
    raise NotImplementedError


async def submit_with_fallback(client: httpx.AsyncClient, payload: dict) -> dict:
    """Try the webapp first; on rate limit / capacity / shutdown / timeout, fall back to Kafka."""
    try:
        resp = await client.post('http://worker-1:8090/process', json=payload, timeout=5.0)
    except httpx.TimeoutException:
        await produce_to_kafka('input-events', payload)
        return {'status': 'enqueued', 'channel': 'kafka'}
    if resp.status_code in (429, 503, 504):
        await produce_to_kafka('input-events', payload)
        return {'status': 'enqueued', 'channel': 'kafka'}
    resp.raise_for_status()
    return resp.json()
```
The framework’s contract: a successful 200 means the request was
processed end-to-end on this worker; a 429/503/504 means the caller
should retry via Kafka rather than hammering the webapp. Clients that
ignore the hint and tight-loop the webapp will see sustained
drakkar_webapp_requests_total{status='rate_limited'} /
{status='capacity'} – correlate those metrics with the relevant
client label and surface the load-balancing problem to the deployer.
Integration scenario
The repo ships with a runnable end-to-end scenario under integration/
that wires the webapp pipeline into the same docker-compose stack used
for Kafka demos. It demonstrates:
- Per-worker enablement – only worker-1 flips webapp.enabled=true (via the DK_WEBAPP__ENABLED=true env var); the other two workers leave the shared yaml default of false so they never bind port 8090. This matches the third row of the Load balancer caveat table.
- Anonymous client at rpm=4 – the default load_generator service polls POST /process every 10 seconds with no Authorization header. Steady state is 200 OK; lower INTERVAL_SECONDS to ~2 seconds and watch 429s appear in the generator's stdout.
- Tenant-A client at rpm=60 – the optional load_generator_tenant_a service (gated behind a compose profile) presents a bearer token and runs at a tighter interval to exercise the higher rpm tier.
- Priority scheduling – the integration handler's task_priority(task) override returns (0, 0) for origin='http' tasks and (1, min_offset) for Kafka tasks, so HTTP-origin tasks jump the executor gate while a slow Kafka task is in flight.
Running it
```shell
cd integration
docker compose up --build   # default: anonymous load_generator only
# or, to also drive the tenant-A path:
docker compose --profile tenant up --build
```
What to expect within ~30 seconds of the stack being healthy:
- worker-1 logs webapp_request_received and webapp_request_completed events.
- The load_generator container prints one line per iteration with the status code and a body preview ("result", "sinks", "errors", "hint").
- http://localhost:8081/live highlights HTTP-origin tasks with the origin column populated; the same tasks dequeue ahead of pending Kafka tasks at the gate.
- drakkar_webapp_requests_total{worker="worker-1",status="ok"} climbs in Prometheus (http://localhost:9099).
File map
| Path | Purpose |
|---|---|
| integration/load_generator/loop.py | Standalone Python loop that POSTs RankRequest payloads on a configurable interval. |
| integration/load_generator/Dockerfile | Thin python:3.13-slim image; the only dependency is a pinned httpx. |
| integration/load_generator/requirements.txt | httpx==0.27.2 (pinned). |
| integration/worker/handler.py | 4-param BaseDrakkarHandler; defines arrange_http_request, on_http_request_complete, and the task_priority override. |
| integration/worker/models.py | Adds RankRequest / RankResponse. |
| integration/worker/drakkar.yaml | webapp: block with two clients; enabled=false by default and flipped per-worker via env. |
| integration/docker-compose.yml | Defines load_generator and load_generator_tenant_a; exposes port 8090 and a /healthz healthcheck on worker-1. |
Related pages
- Handler – the full hook surface, including the four HTTP-specific hooks
- Configuration – the deep config table for the webapp: block
- Observability – Prometheus metrics, the six new recorder event types, and the recorder upgrade story
- Sinks – the same CollectResult / sink routing the webapp pipeline reuses when sinks_enabled=true