
Integration Tests

The integration/ directory in the repository contains a full Docker Compose environment with all six sink types, multiple worker clusters, realistic load patterns, and a chaos test scenario. It is intended for development and testing only – not as a production deployment reference. Credentials are hardcoded, services run without authentication, and all data is ephemeral.


Quick Start

cd integration
docker-compose up --build -d

This starts 14 services. First run pulls images and builds workers (~2 minutes). Subsequent runs start in seconds.

To stop everything and delete the volumes (all data is lost):

docker-compose down -v

Services & Ports

Infrastructure

Service      Port    Description
Kafka        9092    Single-node KRaft broker (no Zookeeper)
PostgreSQL   5432    drakkar database with search_results table
MongoDB      27017   Document archive
Redis        6379    Cached search summaries (1h TTL)
Webhook      9000    HTTP echo server for the webhook sink
Prometheus   9099    Scrapes all worker metrics (UI at localhost:9099)

Web UIs

URL              Service
localhost:8088   Kafka UI – topics, messages, consumer groups
localhost:8089   MongoDB Express – collections, documents
localhost:8087   Redis Commander – keys, TTLs

Main Cluster (ripgrep search)

3 workers, consumer group drakkar-integration, 50-partition topic.

Worker     Debug UI         Metrics          Webapp           Config
worker-1   localhost:8081   localhost:9090   localhost:8091   4 executors, all 6 sinks, webapp enabled
worker-2   localhost:8082   localhost:9091   -                4 executors, all 6 sinks
worker-3   localhost:8083   localhost:9093   -                4 executors, all 6 sinks

The webapp port follows the integration convention of debug-UI port + 10 (worker-1: debug 8081 → webapp 8091). Outside the integration cluster the framework default is 8090; the integration drakkar.yaml overrides it to keep the port plan consistent across services.

Only worker-1 sets DK_WEBAPP__ENABLED=true; worker-2 and worker-3 are Kafka-only. This is intentional: it demonstrates the load-balancer caveat for the synchronous HTTP endpoint. A request is served by whichever worker receives it; the framework does not redistribute HTTP requests across workers.

Fast Cluster (symbol counting)

2 workers, consumer group drakkar-fast, same source topic.

Worker          Debug UI         Metrics          Config
fast-worker-1   localhost:8084   localhost:9094   2 executors, Kafka-only sink
fast-worker-2   localhost:8085   localhost:9095   2 executors, Kafka-only sink

What It Does

Message Producer

The producer container sends 10,000 messages to search-requests in phases that exercise different load patterns:

Phase          Messages    Rate          Purpose
0. Slow drip   10          0.2/sec       Worker startup, first messages
1. Warm-up     500         15/sec        Steady ramp
2. Pause       -           15s silence   Consumer idle behavior
3. Burst       200         max speed     Burst handling, backpressure
4. Steady      2,000       20/sec        Normal throughput
5. Pause       -           10s silence   Second idle period
6. Burst       300         max speed     Second burst
7. Cool-down   remaining   10/sec        Drain
8. Flood       5,000       max speed     Massive consumer lag
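The table leaves the cool-down size implicit. A quick model of the schedule, assuming every phase (including the flood) counts toward the 10,000 total, gives 1,990 cool-down messages and roughly 6.8 minutes of rate-limited sending and pauses; the bursts and the flood add whatever time max speed takes. Phase, build_phases, and paced_seconds are hypothetical names, and this is not the code in integration/infra/producer.py.

```python
# Hypothetical model of the producer phase table; NOT the code in producer.py.
from dataclasses import dataclass
from typing import List, Optional

TOTAL_MESSAGES = 10_000


@dataclass
class Phase:
    name: str
    messages: int            # 0 for pauses
    rate: Optional[float]    # messages/sec; None means "max speed"
    pause_s: float = 0.0     # silence before the next phase


def build_phases(total: int = TOTAL_MESSAGES) -> List[Phase]:
    before = [
        Phase("slow drip", 10, 0.2),
        Phase("warm-up", 500, 15),
        Phase("pause", 0, None, pause_s=15),
        Phase("burst", 200, None),
        Phase("steady", 2_000, 20),
        Phase("pause", 0, None, pause_s=10),
        Phase("burst", 300, None),
    ]
    flood = Phase("flood", 5_000, None)
    # Assumption: cool-down takes whatever is left so that all phases sum to `total`.
    remaining = total - sum(p.messages for p in before) - flood.messages
    return before + [Phase("cool-down", remaining, 10), flood]


def paced_seconds(phases: List[Phase]) -> float:
    """Time spent in pauses and rate-limited phases; max-speed phases count as ~0 s."""
    seconds = 0.0
    for p in phases:
        seconds += p.pause_s
        if p.rate is not None:
            seconds += p.messages / p.rate
    return seconds


phases = build_phases()
cool_down = next(p for p in phases if p.name == "cool-down")
print(cool_down.messages)                    # 1990
print(round(paced_seconds(phases) / 60, 1))  # 6.8 (minutes)
```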

3% of messages are “slow outliers” with repeat: 150-250 (the ripgrep binary runs the search that many times), creating tasks that take minutes instead of seconds. This exercises timeout handling and mixed-duration workloads.
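On average a 3% outlier rate means about 10,000 × 0.03 = 300 slow tasks per run (1,500 at 15%). A minimal sketch of that per-message choice; make_repeat is a hypothetical helper, and returning 1 for ordinary messages is an assumption, since the text only specifies the outlier range.

```python
import random

SLOW_FRACTION = 0.03  # the default 3% outlier rate


def make_repeat(rng: random.Random, slow_fraction: float = SLOW_FRACTION) -> int:
    """Pick a repeat count for one message (hypothetical helper, not producer.py)."""
    if rng.random() < slow_fraction:
        return rng.randint(150, 250)  # slow outlier: ripgrep reruns the search 150-250 times
    return 1  # assumption: ordinary messages run the search once


rng = random.Random(42)
repeats = [make_repeat(rng) for _ in range(10_000)]
slow = [r for r in repeats if r > 1]
print(len(slow))  # close to 10,000 * 0.03 = 300
```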

Main Cluster Processing

Each search message flows through:

  1. arrange() – creates a ripgrep task with CLI args
  2. Executor runs /usr/local/bin/run-rg (ripgrep wrapper)
  3. on_task_complete() routes results based on match count:
       - Always: Kafka (full result), Postgres (metrics row), MongoDB (archive), Redis (cached summary)
       - match_count > 20: HTTP webhook notification
       - match_count > 50: JSONL file log
  4. 5% simulated executor failures with retry via on_error()
  5. Failed deliveries route to DLQ or retry based on sink type
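The routing rule in step 3 reduces to two thresholds on top of four unconditional sinks. A minimal sketch of that decision; the sink labels are hypothetical strings, and this is not the code in integration/worker/handler.py.

```python
from typing import List

ALWAYS = ["kafka", "postgres", "mongo", "redis"]  # hypothetical sink labels


def route_sinks(match_count: int) -> List[str]:
    """Sinks a search result goes to, following the match_count thresholds."""
    sinks = list(ALWAYS)  # copy so the module-level list is never mutated
    if match_count > 20:
        sinks.append("http")  # webhook notification
    if match_count > 50:
        sinks.append("file")  # JSONL file log
    return sinks


for n in (5, 20, 21, 51):
    print(n, route_sinks(n))
```

Note the strict comparisons: a result with exactly 20 matches skips the webhook, and one with exactly 50 skips the file log.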

Fast Cluster Processing

Same source topic, different consumer group. Each message:

  1. arrange() – creates a symbol-count task
  2. Executor runs count-symbols.sh (character counter)
  3. on_task_complete() – sends count to symbol-counts Kafka topic

Fast tasks finish in milliseconds, demonstrating duration threshold filtering and high-throughput behavior.

Webapp Pipeline (worker-1 only)

Worker-1 also runs the synchronous HTTP webapp. The endpoint accepts a RankRequest body and returns a framework-built WebReport JSON containing the user’s RankResponse plus per-task and sink stats. See integration/worker/handler.py for the arrange_http_request / on_http_request_complete implementation and the task_priority override that pushes HTTP-origin tasks ahead of Kafka tasks at the executor gate.

Two clients are configured (in integration/worker/drakkar.yaml):

Client      Token                                           RPM cap
anonymous   (empty)                                         4
tenant-A    integration-tenant-a-token-do-not-use-in-prod   60

A load_generator container sends a request every 10 seconds using the anonymous client. That is 6 requests per minute against a 4-rpm cap, so you can watch the rate limit kick in. A second load_generator_tenant_a service is gated behind the tenant Compose profile and uses the higher-rpm tenant-A token.


Testing the Webapp

Hands-on with curl

While the stack is running, send a request as tenant-A (the 60-rpm cap leaves plenty of headroom for manual testing; the anonymous client is typically saturated by the load_generator):

curl -sS -X POST http://localhost:8091/process \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer integration-tenant-a-token-do-not-use-in-prod' \
  -d '{"request_id": "manual-1", "score": 42}' | jq .

You should see a 200 response with the framework-built WebReport envelope (request_id, client, started_at, duration_ms, status, result, tasks, task_summary, cache, sinks, timeline).
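The same call from Python is handy for scripting more than a few requests. A minimal standard-library sketch: the URL, token, and body come from the curl example, while build_process_request and send are hypothetical helpers. Omitting the Authorization header for the anonymous client is an assumption.

```python
import json
import urllib.error
import urllib.request
from typing import Tuple

WEBAPP_URL = "http://localhost:8091/process"  # worker-1 webapp port
TENANT_A_TOKEN = "integration-tenant-a-token-do-not-use-in-prod"


def build_process_request(body: dict, token: str = "") -> urllib.request.Request:
    headers = {"Content-Type": "application/json"}
    if token:  # assumption: the anonymous client sends no Authorization header
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        WEBAPP_URL, data=json.dumps(body).encode(), headers=headers, method="POST"
    )


def send(req: urllib.request.Request, timeout: float = 30.0) -> Tuple[int, dict]:
    """Return (HTTP status, decoded JSON body); 4xx/5xx responses are returned, not raised."""
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.status, json.load(resp)
    except urllib.error.HTTPError as err:  # e.g. 429 from the rate limiter
        raw = err.read()
        try:
            return err.code, json.loads(raw) if raw else {}
        except ValueError:
            return err.code, {"raw": raw.decode(errors="replace")}
```

With the stack running, status, report = send(build_process_request({"request_id": "manual-2", "score": 7}, TENANT_A_TOKEN)) returns the status code and the WebReport fields listed above as a dict.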

To provoke a 429 deliberately on tenant-A, send requests at 1 req/sec (or slightly faster) for about 60 seconds. Once the 60-rpm window is full, the 61st request inside the window gets a 429 with a Retry-After header and the hint field documented in webapp.md.
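To reason about when 429s appear, here is a minimal sliding-window limiter. It is an illustration, not drakkar's limiter. It assumes a 60-second window, that only accepted requests count toward the cap, and that Retry-After is the time until the oldest accepted request leaves the window.

```python
from collections import deque
from typing import Deque, Tuple


class SlidingWindowLimiter:
    """Per-client requests-per-minute cap over a sliding window (illustrative only)."""

    def __init__(self, rpm: int, window_s: float = 60.0) -> None:
        self.rpm = rpm
        self.window_s = window_s
        self.accepted: Deque[float] = deque()  # timestamps of accepted requests

    def try_acquire(self, now: float) -> Tuple[bool, float]:
        """Return (allowed, retry_after_seconds) for a request arriving at `now`."""
        # Forget accepted requests that have slid out of the window.
        while self.accepted and self.accepted[0] <= now - self.window_s:
            self.accepted.popleft()
        if len(self.accepted) < self.rpm:
            self.accepted.append(now)
            return True, 0.0
        # The next slot opens when the oldest accepted request leaves the window.
        return False, self.accepted[0] + self.window_s - now


# Anonymous client: cap 4, load_generator fires every 10 s.
anon = SlidingWindowLimiter(rpm=4)
for t in range(0, 70, 10):
    print(t, anon.try_acquire(float(t)))
```

In this model the anonymous generator gets four 200s, then 429s at t=40 s and t=50 s (retry hints of 20 s and 10 s), and is allowed again at t=60 s. Pacing tenant-A at exactly 1 req/sec lets the 61st request through here, because the first one has just left the window; pacing slightly faster (every 0.9 s) rejects it with a 6-second hint. The framework's boundary handling may differ.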

Watching the load_generator

# Default profile -- anonymous traffic only
docker-compose logs -f load_generator

# With the tenant-A profile
docker-compose --profile tenant up -d load_generator_tenant_a
docker-compose logs -f load_generator_tenant_a

Each iteration logs the request body and the response status. You’ll see 200 responses up to the rpm cap, then 429s once the sliding window fills.

What to check in the debug UI

Open worker-1’s debug UI:

  • Dashboard / – a “WebApp” tile appears alongside the partition tiles, showing in-flight requests, per-client rpm caps, and 60s success/error counts.
  • Live /live – HTTP-origin tasks render with a distinct purple highlight and the identifier <client>:<request_id[:8]> instead of the Kafka p<partition>:o<offset> form.
  • History /history – the Origin filter (top right) lets you view kafka-only / http-only / all events. Webapp-specific event types (webapp_request_received, webapp_request_completed, webapp_request_timeout, webapp_request_rate_limited, webapp_request_auth_failed, webapp_request_dropped_after_timeout) are visible here.
  • Task detail /task/{id} – clicking an HTTP-origin task shows Client / Request ID (instead of Partition / Offset), the truncated request body, and the final response.
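When searching /live or /history for a specific task, it helps to know the two identifier formats exactly. A tiny sketch that reproduces them. It is not the debug UI's code, and treating every non-http origin as Kafka is an assumption.

```python
def task_identifier(origin: str, *, client: str = "", request_id: str = "",
                    partition: int = -1, offset: int = -1) -> str:
    """Reproduce the /live identifier format (illustrative, not the debug UI code)."""
    if origin == "http":
        return f"{client}:{request_id[:8]}"  # first 8 characters of the request_id
    return f"p{partition}:o{offset}"


print(task_identifier("http", client="tenant-A", request_id="manual-1"))  # tenant-A:manual-1
print(task_identifier("kafka", partition=7, offset=1234))                 # p7:o1234
```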

What to check in metrics

The webapp metrics live at localhost:9090/metrics alongside the existing executor and sink counters. Filter for drakkar_webapp_:

Metric                                       Type        Labels
drakkar_webapp_requests_total                Counter     {client, status}
drakkar_webapp_request_duration_seconds      Histogram   {client, status}
drakkar_webapp_inflight                      Gauge       -
drakkar_webapp_dropped_after_timeout_total   Counter     {client}
drakkar_webapp_rpm_limit                     Gauge       {client} (informational)

A useful Prometheus query while the load_generator runs:

sum by (client, status) (rate(drakkar_webapp_requests_total[1m]))
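The same query can be run against Prometheus (host port 9099) through its standard instant-query endpoint, GET /api/v1/query. A minimal sketch that also parses the vector response. The helper names are hypothetical, and the status label values depend on what the framework emits.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, Tuple

PROMETHEUS = "http://localhost:9099"
QUERY = "sum by (client, status) (rate(drakkar_webapp_requests_total[1m]))"


def instant_query_url(query: str, base: str = PROMETHEUS) -> str:
    # Prometheus instant-query endpoint: GET /api/v1/query?query=<PromQL>
    return f"{base}/api/v1/query?" + urllib.parse.urlencode({"query": query})


def parse_vector(payload: dict) -> Dict[Tuple[str, str], float]:
    # Each vector sample looks like {"metric": {labels...}, "value": [<ts>, "<number>"]}
    return {
        (s["metric"].get("client", ""), s["metric"].get("status", "")): float(s["value"][1])
        for s in payload["data"]["result"]
    }


def fetch_rates(base: str = PROMETHEUS) -> Dict[Tuple[str, str], float]:
    """Per-(client, status) request rate in requests/second."""
    with urllib.request.urlopen(instant_query_url(QUERY, base), timeout=10) as resp:
        return parse_vector(json.load(resp))
```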

Demonstrating priority scheduling

The integration handler’s task_priority override returns a priority key of (0, ...) for task.origin == 'http' tasks and (1, source_offset) for Kafka tasks. Under sustained Kafka load, a manual curl request lands in the executor gate ahead of the queued Kafka tasks. The effect is visible in /live – HTTP tasks dequeue near-instantly even when the Kafka partition queues are full.

To exercise this end-to-end:

  1. Start the stack and the producer (default).
  2. Wait until pool utilization on worker-1 is near 100% from Kafka traffic.
  3. Hit /process with curl (or just leave the load_generator running).
  4. Watch /live on worker-1: HTTP-origin tasks (purple) are scheduled immediately; Kafka tasks remain queued behind them.
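Why a (0, ...) key beats (1, source_offset): tuples compare element by element, so any HTTP key sorts before every Kafka key, whatever the offset. A minimal sketch with a min-heap standing in for the executor gate. It assumes the gate schedules the smallest key first. The handler's real second element for HTTP tasks is elided in the text, so this sketch uses a hypothetical arrival counter.

```python
import heapq
import itertools
from typing import List, Tuple

_http_seq = itertools.count()  # hypothetical tie-breaker; the docs elide the real one


def priority_key(origin: str, source_offset: int = 0) -> Tuple[int, int]:
    """Lower keys are scheduled first, mirroring the (0, ...) / (1, offset) scheme."""
    if origin == "http":
        return (0, next(_http_seq))
    return (1, source_offset)


gate: List[Tuple[Tuple[int, int], str]] = []
for offset in range(100, 105):  # Kafka backlog queued first
    heapq.heappush(gate, (priority_key("kafka", offset), f"kafka@{offset}"))
heapq.heappush(gate, (priority_key("http"), "http:manual-1"))  # curl arrives later

order = [heapq.heappop(gate)[1] for _ in range(len(gate))]
print(order)  # ['http:manual-1', 'kafka@100', 'kafka@101', 'kafka@102', 'kafka@103', 'kafka@104']
```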

Toggling sinks_enabled

The webapp’s sinks-enabled mode is OFF by default in the integration config. To exercise it:

# In integration/docker-compose.yml -> worker-1 -> environment
DK_WEBAPP__SINKS_ENABLED: "true"

Recreate worker-1 so it picks up the new environment (docker-compose up -d worker-1; a plain docker-compose restart does not re-read docker-compose.yml). Now each HTTP request also flows through the handler’s on_message_complete and writes to Kafka / Postgres / MongoDB / Redis just like a Kafka-sourced message. The framework adds a sinks: { ... } summary to the response body with per-sink attempted/delivered/dlq counts.


Debug UI Pages

Open any worker’s debug UI to explore:

Dashboard (/)

Partition tiles with queue depth and lag, an executor pool utilization bar, event counters, and Prometheus graph links (click any metric card). Custom links can be added via custom_links in the config.

Partitions (/partitions)

Per-partition breakdown: queue size, pending offsets, committed offset, consumer lag. Click a partition for its event history.

Sinks (/sinks)

Per-sink delivery stats: delivered count, error count, retry count, last delivery time and duration.

Live Pipeline (/live)

Three tabs, all WebSocket-powered:

  • Arrange – recent arrange() calls with message labels, task counts, durations
  • Executors – scrollable timeline with task bars (green=completed, yellow=running, red=failed). Hover for task detail. Zoom in/out with ± buttons.
  • Collect – recent on_task_complete() completions with output counts

The pool utilization bar and running/finished task tables update in real time. Task labels (request_id, pattern) appear in the Labels column and hover detail.

History (/history)

Filterable event browser. Toggle event types (consumed, arranged, task_started, task_completed, task_failed, etc.) and filter by partition. Paginated.

Debug (/debug)

Three collapsible sections:

  • Metrics – all Prometheus metrics (framework + user), filterable by source
  • Message Trace – enter partition:offset to trace a message through the full pipeline, including cross-worker search
  • Databases – SQLite flight recorder files, sortable/filterable, downloadable. Select multiple and merge into one file.

Task Detail (/task/{id})

Full task lifecycle: status, labels, partition, duration, PID, CLI command, source offsets (linked to trace), stdout/stderr output, event timeline.


Kafka Topics

Topic                 Partitions   Producers            Consumers
search-requests       50           producer             main cluster, fast cluster
search-results        50           main cluster         -
search-requests_dlq   50           main cluster (DLQ)   -
symbol-counts         50           fast cluster         -

Browse topics in the Kafka UI.


Worker Autodiscovery

All workers write to /shared (a shared volume). The debug UI scans this directory for *-live.db symlinks to discover peer workers. Use the worker dropdown in the top-right nav to switch between workers without remembering ports.
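A minimal sketch of that directory scan, assuming each symlink is named <worker>-live.db and points at the worker's current database file. Both the naming rule and discover_workers are hypothetical; this is not the debug UI's code.

```python
from pathlib import Path
from typing import Dict

SUFFIX = "-live.db"


def discover_workers(shared: Path = Path("/shared")) -> Dict[str, Path]:
    """Map worker name -> live database file for every *-live.db symlink in `shared`."""
    found: Dict[str, Path] = {}
    for link in sorted(shared.glob("*" + SUFFIX)):
        if not link.is_symlink():  # regular files are ignored in this sketch
            continue
        name = link.name[: -len(SUFFIX)]  # assumption: links are named <worker>-live.db
        found[name] = link.resolve()
    return found
```

Inside a worker container, discover_workers() would list every peer that has published a live symlink to the shared volume.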


Chaos Test

The chaos-test.sh script simulates cascading worker failures and recoveries:

cd integration
docker-compose up --build -d
./chaos-test.sh

Scenario: Rolling Outage

Time        Event                             Live workers
t1          ~3000/10000 tasks processed       w1, w2, w3
t1 + 30s    worker-2 stops                    w1, w3
t1 + 150s   worker-3 stops                    w1 only
t1 + 390s   worker-2 starts, worker-1 stops   w2 only
t1 + 450s   worker-1 starts                   w1, w2
t1 + 510s   worker-3 starts                   w1, w2, w3

Total runtime: ~10 minutes. The script polls each worker’s /api/dashboard every 5 seconds and prints per-worker completed task counts and partition counts, so you can watch rebalancing happen.
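The schedule never leaves the consumer group empty: at every point at least one worker owns the 50 partitions. A small replay of the outage table makes that easy to check. This mirrors the table only; chaos-test.sh is a shell script and does not work this way. The partitions-per-worker figure assumes an even spread.

```python
from typing import List, Set, Tuple

# (seconds after t1, workers stopped, workers started), transcribed from the outage table
TIMELINE: List[Tuple[int, Set[str], Set[str]]] = [
    (30, {"w2"}, set()),
    (150, {"w3"}, set()),
    (390, {"w1"}, {"w2"}),
    (450, set(), {"w1"}),
    (510, set(), {"w3"}),
]
ALL = frozenset({"w1", "w2", "w3"})


def live_at(seconds: float) -> Set[str]:
    """Workers running `seconds` after t1, applying each event in order."""
    live = set(ALL)
    for t, stopped, started in TIMELINE:
        if t > seconds:
            break
        live = (live - stopped) | started
    return live


for t in (0, 30, 150, 390, 450, 510):
    live = sorted(live_at(t))
    print(f"t1 + {t:>3}s  live={live}  ~{50 / len(live):.0f} partitions each")
```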

What to observe:

  • Partition reassignment in /history (filter by assigned/revoked events)
  • Pool utilization spike on surviving worker in /live
  • Consumer lag changes in /partitions
  • No message loss: total processed across all workers should equal total produced

Tuning for Experiments

Adjust message volume

Override TOTAL_MESSAGES on the producer:

# docker-compose.yml → producer → environment
TOTAL_MESSAGES: "20000"

Change executor concurrency

Override per-worker via environment:

# docker-compose.yml → worker-1 → environment
DK_EXECUTOR__MAX_EXECUTORS: "8"
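With the defaults in the worker table, the main cluster can run roughly 3 × 4 = 12 tasks at once, and raising worker-1 to 8 makes it 16. The generic asyncio sketch below shows what such a cap means in practice: however many tasks are queued, at most max_executors run concurrently. It is an illustration of a semaphore-gated pool, not drakkar's executor code, and the framework may enforce the limit differently.

```python
import asyncio


async def run_tasks(n_tasks: int, max_executors: int) -> int:
    """Run n_tasks fake jobs behind a semaphore and return the peak concurrency seen."""
    gate = asyncio.Semaphore(max_executors)
    running = 0
    peak = 0

    async def one_task() -> None:
        nonlocal running, peak
        async with gate:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for a subprocess such as run-rg
            running -= 1

    await asyncio.gather(*(one_task() for _ in range(n_tasks)))
    return peak


print(asyncio.run(run_tasks(n_tasks=50, max_executors=8)))  # 8
```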

Test with fewer partitions

Edit integration/infra/init-kafka.sh:

--partitions 10   # instead of 50
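Fewer partitions also changes how much work each consumer can own. Assuming an even spread (the real assignment depends on the Kafka client's partition assignor), 50 partitions over the three main-cluster workers split 17/17/16, 10 partitions split 4/3/3, and a worker left alone during the chaos test owns all of them. A round-robin sketch of that arithmetic:

```python
from typing import Dict, List


def spread(partitions: int, workers: List[str]) -> Dict[str, int]:
    """Partition counts per worker under a simple round-robin assignment."""
    counts = {w: 0 for w in workers}
    for p in range(partitions):
        counts[workers[p % len(workers)]] += 1
    return counts


print(spread(50, ["worker-1", "worker-2", "worker-3"]))  # {'worker-1': 17, 'worker-2': 17, 'worker-3': 16}
print(spread(10, ["worker-1", "worker-2", "worker-3"]))  # {'worker-1': 4, 'worker-2': 3, 'worker-3': 3}
```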

Simulate slow tasks

Increase the slow-outlier percentage in integration/infra/producer.py:

# line 86: change 0.03 to 0.15 for 15% slow tasks
if random.random() < 0.15:

Change rebalance speed

The main cluster uses session_timeout_ms: 10000 (10s) for fast rebalance detection. Increase for production-like behavior:

# integration/worker/drakkar.yaml
session_timeout_ms: 45000

Disable debug thresholds

To see ALL tasks in the debug UI (including fast ones):

# integration/worker/drakkar.yaml or fast-worker/drakkar.yaml
debug:
  ws_min_duration_ms: 0
  event_min_duration_ms: 0
  output_min_duration_ms: 0
  log_min_duration_ms: 0
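The four *_min_duration_ms keys decide which tasks are too fast to show in each debug channel. A sketch of the filtering effect, assuming a task is kept when its duration is at least the threshold. The exact comparison and what each channel covers are assumptions, and the 100 ms value is hypothetical.

```python
from typing import Dict, List

# Threshold keys from the debug: block; 0 means "show everything".
THRESHOLDS_MS = {
    "ws_min_duration_ms": 0,
    "event_min_duration_ms": 0,
    "output_min_duration_ms": 0,
    "log_min_duration_ms": 0,
}


def visible(durations_ms: List[float], thresholds: Dict[str, int]) -> Dict[str, List[float]]:
    """Tasks each channel would keep under an assumed `duration >= threshold` rule."""
    return {key: [d for d in durations_ms if d >= t] for key, t in thresholds.items()}


fast_tasks = [2.0, 5.5, 12.0]  # millisecond-scale symbol counts
print(visible(fast_tasks, {"ws_min_duration_ms": 100}))            # {'ws_min_duration_ms': []}
print(visible(fast_tasks, THRESHOLDS_MS)["ws_min_duration_ms"])    # [2.0, 5.5, 12.0]
```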

Add auth to debug UI

Protect database downloads with a token:

debug:
  auth_token: "my-secret-token"

Then access protected endpoints with ?token=my-secret-token or an Authorization: Bearer my-secret-token header.
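On the server side, a check that accepts either form can look like the sketch below. It illustrates the semantics, not drakkar's implementation. Checking the header before the query string is an assumption, and the /protected path in the usage is hypothetical. hmac.compare_digest is used so the comparison time does not reveal how much of the token matched.

```python
import hmac
from typing import Mapping, Optional
from urllib.parse import parse_qs, urlsplit


def supplied_token(url: str, headers: Mapping[str, str]) -> Optional[str]:
    """Extract a token from 'Authorization: Bearer <t>' or from '?token=<t>'."""
    lowered = {k.lower(): v for k, v in headers.items()}  # header names are case-insensitive
    auth = lowered.get("authorization", "")
    if auth.startswith("Bearer "):
        return auth[len("Bearer "):]
    values = parse_qs(urlsplit(url).query).get("token")
    return values[0] if values else None


def is_authorized(url: str, headers: Mapping[str, str], expected: str) -> bool:
    token = supplied_token(url, headers)
    return token is not None and hmac.compare_digest(token.encode(), expected.encode())


print(is_authorized("http://localhost:8081/protected?token=my-secret-token", {}, "my-secret-token"))  # True
```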