audit-engine — Operational-Data Audit Engine
Living specification. Source of truth for the design of issue #39 and its follow-ups. Update as decisions evolve. The version posted as the design comment on #39 is the v1.0 snapshot.
TL;DR
A new Python service called audit-engine runs as a Docker container on vps-h1 (Hostinger) alongside n8n, claude-proxy, and WAHA. It uses APScheduler in-process for cron-style triggers and a Supabase-hosted control plane (audit.* schema) for project/action/workbook/run state. Workbook design and report rendering are driven by Claude via the existing claude-proxy:9999 (zero marginal cost, Claude Max OAuth). Connectors are first-party Python plugins (Supabase, HTTP, Wasabi, GitHub, Google Drive) registered through a decorator-based registry (see section 5), not n8n nodes. Reports render to HTML via Jinja2, then to PDF via the existing pdf-service:8100 on vps-i1. Delivery: SMTP (Mailgun) and Google Drive via per-project service-account JSON pulled from Supabase Vault at runtime. Failures trigger a Discord webhook and a Prometheus alert.
1. Runtime placement
Decision: vps-h1 (Hostinger), new Docker service audit-engine in hostinger/docker-compose.yml.
| Option | Pros | Cons | Verdict |
|---|---|---|---|
| vps-h1 container (chosen) | Co-located with claude-proxy → loopback HTTP (zero latency, zero auth complexity); n8n peer for webhook handoffs; same VPS already runs Python via WAHA hooks | 2 vCPU / 8 GB shared with n8n+WAHA — must enforce concurrency caps | Best fit |
| vps-i1 container | Co-located with pdf-service, prometheus | Far from claude-proxy → needs network egress + auth round-trip; vps-i1 already heavier | Rejected |
| Lambda / Vercel cron | Stateless, no infra | No subscription claude-proxy access; cold starts | Rejected |
| n8n cron workflows | Zero new code | Workbook design (LLM tool-use loop) awkward in n8n; per-action state hard to model | Rejected |
Resource budget on vps-h1:
- audit-engine container: cap 512 MB / 0.5 CPU
- Concurrency: max 2 concurrent action runs (asyncio.Semaphore(2))
- pdf-service reached via https://pdf.vps-i1.infra.zintegrowana.online (new Caddy entry) with API key
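The concurrency cap above can be sketched with a shared semaphore. This is a minimal illustration, not the engine's runner: `run_with_cap`, `demo`, and `fake_action` are hypothetical names, and the sleep stands in for a real action run.

```python
import asyncio

MAX_CONCURRENT_RUNS = 2  # the cap stated in the resource budget


async def run_with_cap(sem, action_id, work):
    """Run `work(action_id)` while holding one of the semaphore slots."""
    async with sem:
        return await work(action_id)


async def demo():
    """Launch five fake runs and record the peak number running at once."""
    sem = asyncio.Semaphore(MAX_CONCURRENT_RUNS)
    in_flight = 0
    peak = 0

    async def fake_action(action_id):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for connector and AI calls
        in_flight -= 1
        return action_id

    results = await asyncio.gather(
        *(run_with_cap(sem, i, fake_action) for i in range(5))
    )
    return results, peak
```

Runs beyond the cap wait on the semaphore rather than being rejected, so a burst of scheduled actions is serialized instead of dropped.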
2. Data model — Supabase
Decision: Supabase PostgreSQL (mwkqmgadqnkkihjdeqsi), new schema audit.
Config in Supabase, but every project and action row carries a source_ref column so the operator can declare actions in YAML in the repo and have a sync job upsert them (audits/<project>/<action>.yml).
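A rough sketch of how the sync job might turn one declared action into an upsert row keyed on (project, slug). It assumes the YAML file has already been parsed into a dict; `action_row_from_declaration` and `ACTION_COLUMNS` are illustrative names, and the choice to derive the slug from the file name is an assumption.

```python
from pathlib import PurePosixPath

# Columns of audit.action that an operator may declare in YAML (assumed subset).
ACTION_COLUMNS = (
    "name", "description", "schedule_cron", "schedule_tz",
    "output_format", "delivery", "per_run_token_cap",
)


def action_row_from_declaration(path: str, declared: dict) -> dict:
    """Map audits/<project>/<action>.yml plus its parsed content to an upsert row."""
    p = PurePosixPath(path)
    if len(p.parts) != 3 or p.parts[0] != "audits" or p.suffix != ".yml":
        raise ValueError(f"expected audits/<project>/<action>.yml, got {path!r}")
    row = {key: declared[key] for key in ACTION_COLUMNS if key in declared}
    row["slug"] = p.stem        # action slug taken from the file name
    row["source_ref"] = path    # lets the DB row be traced back to the repo file
    return {"project_slug": p.parts[1], "row": row}
```

The returned `project_slug` would be resolved to `project_id` before the upsert, which can then target the existing UNIQUE (project_id, slug) constraint.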
SQL DDL
CREATE SCHEMA IF NOT EXISTS audit;
CREATE TABLE audit.project (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
slug text NOT NULL UNIQUE,
display_name text NOT NULL,
github_repo text NOT NULL,
gdrive_root_id text,
gdrive_sa_secret_ref text,
default_recipients text[] DEFAULT '{}',
monthly_token_cap bigint DEFAULT 2000000,
monthly_token_used bigint DEFAULT 0,
monthly_window_started_at timestamptz DEFAULT now(),
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now()
);
CREATE TYPE audit.action_status AS ENUM ('new','optimizing','optimized','error','disabled');
CREATE TABLE audit.action (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
project_id uuid NOT NULL REFERENCES audit.project(id) ON DELETE CASCADE,
slug text NOT NULL,
name text NOT NULL,
description text NOT NULL,
schedule_cron text NOT NULL,
schedule_tz text NOT NULL DEFAULT 'Europe/Warsaw',
output_format text NOT NULL CHECK (output_format IN ('pdf','markdown','html')),
delivery jsonb NOT NULL DEFAULT '{}'::jsonb,
status audit.action_status NOT NULL DEFAULT 'new',
optimized_at timestamptz,
last_run_at timestamptz,
last_success_at timestamptz,
last_error text,
per_run_token_cap int DEFAULT 100000,
source_ref text,
created_at timestamptz DEFAULT now(),
updated_at timestamptz DEFAULT now(),
UNIQUE (project_id, slug)
);
CREATE TABLE audit.workbook (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
action_id uuid NOT NULL REFERENCES audit.action(id) ON DELETE CASCADE,
version int NOT NULL,
spec jsonb NOT NULL,
designed_by_model text,
design_prompt text,
design_token_usage int,
active boolean NOT NULL DEFAULT false,
created_at timestamptz DEFAULT now(),
UNIQUE (action_id, version)
);
CREATE INDEX ON audit.workbook (action_id) WHERE active;
CREATE TABLE audit.run (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
action_id uuid NOT NULL REFERENCES audit.action(id) ON DELETE CASCADE,
workbook_id uuid REFERENCES audit.workbook(id),
started_at timestamptz NOT NULL DEFAULT now(),
finished_at timestamptz,
status text NOT NULL CHECK (status IN ('pending','running','success','failed','partial')),
trigger text NOT NULL CHECK (trigger IN ('schedule','manual','retry','reoptimize')),
token_usage int DEFAULT 0,
artifact_uri text,
delivery_log jsonb DEFAULT '[]'::jsonb,
error text,
log jsonb DEFAULT '[]'::jsonb
);
CREATE INDEX ON audit.run (action_id, started_at DESC);
CREATE INDEX ON audit.run (status, started_at DESC) WHERE status IN ('failed','partial');
3. Schedule representation
Single schedule_cron text column (5-field cron) + schedule_tz IANA string.
Cron expresses every requested case in one well-known DSL. APScheduler’s CronTrigger.from_crontab(expr, timezone=tz) is one-line ingestion. croniter validates on write.
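To illustrate what validate-on-write means for the 5-field format, here is a stdlib-only stand-in. It is not croniter: it accepts only numeric fields with `*`, lists, ranges, and steps, and rejects names such as MON or JAN. `validate_cron` and `CRON_FIELDS` are hypothetical names.

```python
# (field name, lowest allowed value, highest allowed value)
CRON_FIELDS = (
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day_of_month", 1, 31),
    ("month", 1, 12),
    ("day_of_week", 0, 7),
)


def _check_part(part: str, lo: int, hi: int, name: str) -> None:
    """Validate one comma-separated element such as '*', '5', '1-5' or '*/15'."""
    base, slash, step = part.partition("/")
    if slash and (not step.isdigit() or int(step) == 0):
        raise ValueError(f"{name}: bad step in {part!r}")
    if base == "*":
        return
    start, dash, end = base.partition("-")
    for bound in ([start, end] if dash else [start]):
        if not bound.isdigit() or not lo <= int(bound) <= hi:
            raise ValueError(f"{name}: {part!r} outside {lo}-{hi}")
    if dash and int(start) > int(end):
        raise ValueError(f"{name}: reversed range {part!r}")


def validate_cron(expr: str) -> str:
    """Return expr unchanged if it is a well-formed 5-field cron string."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    for value, (name, lo, hi) in zip(fields, CRON_FIELDS):
        for part in value.split(","):
            _check_part(part, lo, hi, name)
    return expr
```

Rejecting a bad expression when the row is written keeps the failure next to the operator's change instead of surfacing later when the scheduler loads the action.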
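from apscheduler.triggers.cron import CronTrigger

# `scheduler`, `run_action`, and the action row `a` come from the surrounding engine code.
scheduler.add_job(
    run_action,
    CronTrigger.from_crontab(a.schedule_cron, timezone=a.schedule_tz),
    args=[a.id],
    id=f"action:{a.id}",
    replace_existing=True,   # re-registering an action overwrites its job
    misfire_grace_time=600,  # still run if up to 10 minutes late
    coalesce=True,           # collapse several missed firings into one run
    max_instances=1,         # never overlap runs of the same action
)
4. AI integration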
Primary: claude-proxy:9999 (Claude Max OAuth subscription). Fallback: the Anthropic API via ANTHROPIC_API_KEY after three failed proxy attempts (5xx), and only when AUDIT_ENGINE_API_FALLBACK=1.
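The primary/fallback rule can be sketched independently of any HTTP client. In this illustration `call_proxy` and `call_api` are injected callables and `ProxyServerError` is a hypothetical exception the proxy transport would raise on a 5xx; none of these names come from the engine.

```python
class ProxyServerError(Exception):
    """Hypothetical: raised by the proxy transport on an HTTP 5xx response."""


def complete_with_fallback(prompt, call_proxy, call_api=None, max_proxy_attempts=3):
    """Try the proxy up to max_proxy_attempts times, then the API if one is configured.

    Returns (reply, backend) so the caller knows which token accounting applies.
    """
    last_error = None
    for _ in range(max_proxy_attempts):
        try:
            return call_proxy(prompt), "proxy"
        except ProxyServerError as exc:
            last_error = exc
    if call_api is None:
        # Fallback disabled: surface the last proxy failure.
        raise last_error
    return call_api(prompt), "api"
```

Passing `call_api=None` models a deployment where the fallback is switched off, in which case the last proxy error propagates to the run.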
Since claude-proxy returns plain text from claude -p, tool-use is implemented as a structured prompt protocol:
- Engine sends prompt: “You are designing an audit workbook. Available connectors: [list]. Reply with JSON matching this schema: …”
- Claude returns JSON workbook spec.
- Engine validates with Pydantic; if invalid, sends back error + asks for fix (max 3 retries).
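The three steps above amount to a validate-and-retry loop. The sketch below uses `json` plus a hand-written check as a stand-in for the Pydantic model, and reads "max 3 retries" as one initial attempt plus up to three corrections; `ask`, `validate_spec`, and `design_workbook` are illustrative names.

```python
import json

MAX_RETRIES = 3


def validate_spec(spec) -> list:
    """Stand-in for the Pydantic model: return a list of problems (empty if valid)."""
    if not isinstance(spec, dict):
        return ["top level must be a JSON object"]
    steps = spec.get("steps")
    if not isinstance(steps, list) or not steps:
        return ["'steps' must be a non-empty list"]
    return [
        f"step {i} needs an 'id'"
        for i, step in enumerate(steps)
        if not isinstance(step, dict) or "id" not in step
    ]


def design_workbook(ask, base_prompt: str, max_retries: int = MAX_RETRIES) -> dict:
    """Call ask(prompt) until the reply parses and validates, feeding errors back."""
    prompt = base_prompt
    for _ in range(1 + max_retries):
        reply = ask(prompt)
        try:
            problems = validate_spec(json.loads(reply))
        except json.JSONDecodeError as exc:
            problems = [f"invalid JSON: {exc.msg}"]
        if not problems:
            return json.loads(reply)
        feedback = "; ".join(problems)
        prompt = (
            f"{base_prompt}\n\nYour previous reply was rejected: {feedback}. "
            "Reply with corrected JSON only."
        )
    raise ValueError("workbook design failed validation after retries")
```

The caller would translate the final `ValueError` into action.status='error', as listed under failure modes.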
Thin abstraction audit_engine.ai.ClaudeClient exposes .design_workbook(spec_prompt) and .render_summary(data) hiding the proxy/API toggle.
Envelope (per claude-proxy README):
POST http://host.docker.internal:9999/v1/messages
X-Proxy-Secret: $CLAUDE_PROXY_SECRET
{"messages":[{"role":"user","content":"..."}]}
5. Connector framework
First-party Python plugins via registry decorator. Not n8n nodes.
from typing import Protocol

# Project and ConnectorResult are engine model types defined elsewhere in the package.
class Connector(Protocol):
    name: str            # "supabase.query"
    description: str     # for LLM context
    params_schema: dict  # JSON Schema

    async def execute(self, params: dict, project: Project) -> ConnectorResult: ...

CONNECTORS: dict[str, type[Connector]] = {}

def register(cls):
    CONNECTORS[cls.name] = cls
    return cls

v1 connectors:
| Connector | Purpose | Params |
|---|---|---|
| supabase.query | Read-only SQL | sql, params, role |
| supabase.rpc | Call RPC functions | function, args |
| http.get / http.post | Generic HTTP fetch | url, headers, body, timeout |
| github.issues_search | GH issue search | q, repo |
| github.repo_stats | Repo metrics | repo |
| wasabi.list / wasabi.get | S3 access | bucket, prefix / key |
| gdrive.list / gdrive.read | Drive content | folder_id, mime |
| prometheus.query | PromQL via Thanos | expr, start, end, step |
| n8n.run_workflow | Escape hatch | webhook_url, body |
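A self-contained sketch of how a connector might register itself and be looked up by name. `StaticRows`, `catalog`, and `run_connector` are hypothetical, and the `Project` and `ConnectorResult` dataclasses are minimal placeholders for the engine's real models.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class Project:            # placeholder for the engine's project model
    slug: str


@dataclass
class ConnectorResult:    # placeholder result type
    rows: list = field(default_factory=list)


class Connector(Protocol):
    name: str
    description: str
    params_schema: dict

    async def execute(self, params: dict, project: Project) -> ConnectorResult: ...


CONNECTORS: dict = {}


def register(cls):
    """Class decorator: index the connector class under its dotted name."""
    CONNECTORS[cls.name] = cls
    return cls


@register
class StaticRows:
    """Hypothetical demo connector that returns the rows it is given."""
    name = "demo.static_rows"
    description = "Return the rows passed in params (demo only)."
    params_schema = {
        "type": "object",
        "properties": {"rows": {"type": "array"}},
        "required": ["rows"],
    }

    async def execute(self, params: dict, project: Project) -> ConnectorResult:
        return ConnectorResult(rows=list(params["rows"]))


def catalog() -> list:
    """Connector descriptions in the shape the design prompt could embed."""
    return [
        {"name": c.name, "description": c.description, "params_schema": c.params_schema}
        for c in CONNECTORS.values()
    ]


async def run_connector(name: str, params: dict, project: Project) -> ConnectorResult:
    """Instantiate the registered connector and execute it."""
    return await CONNECTORS[name]().execute(params, project)
```

Because registration happens at import time, the registry is only complete once every connector module has been imported.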
6. Workbook representation
JSON validated by Pydantic, versioned in audit.workbook.spec.
{
"version": "1.0",
"designed_for": "Weekly incidents summary for et-operational-platform",
"steps": [
{
"id": "fetch_incidents",
"connector": "supabase.query",
"params": {
"sql": "SELECT id, type, severity, created_at, status FROM incidents WHERE created_at >= now() - interval '7 days' ORDER BY created_at DESC"
},
"output_var": "incidents"
},
{
"id": "summarize",
"type": "ai_summary",
"model": "claude-default",
"prompt": "Summarize {{ incidents | tojson }} into markdown.",
"output_var": "summary_md"
},
{
"id": "render",
"type": "template",
"template": "report_default.html.j2",
"context": { "title": "Weekly — {{ run_date }}", "body_md": "{{ summary_md }}" },
"output_var": "html"
},
{ "id": "to_pdf", "type": "render_pdf", "input_var": "html", "output_var": "pdf_bytes" }
],
"deliveries": [
{ "type": "email", "to": ["radieu@gmail.com"], "subject": "Weekly — {{ run_date }}" },
{ "type": "gdrive", "folder_id": "${project.gdrive_root_id}", "filename": "weekly-{{ run_date }}.pdf" }
],
"cost_estimate": { "tokens_per_run": 8000 }
}
Step types: connector, ai_summary, template (Jinja2 SandboxedEnvironment), render_pdf, transform_python (whitelisted expressions only, no arbitrary code).
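One way to read the workbook format is as an ordered pipeline where each step writes its result under `output_var` for later steps. The sketch below assumes that a step carrying a `connector` key and no `type` is a connector step, as in the example spec; `run_steps` and the handler signature are illustrative.

```python
STEP_TYPES = {"connector", "ai_summary", "template", "render_pdf", "transform_python"}


def step_type(step: dict) -> str:
    """Resolve the step type; connector steps may omit 'type' (assumption)."""
    if "type" in step:
        return step["type"]
    if "connector" in step:
        return "connector"
    raise ValueError(f"step {step.get('id')!r} has neither 'type' nor 'connector'")


def run_steps(spec: dict, handlers: dict) -> dict:
    """Execute steps in order; each handler sees the outputs of earlier steps."""
    context: dict = {}
    for step in spec["steps"]:
        kind = step_type(step)
        if kind not in STEP_TYPES:
            raise ValueError(f"unknown step type {kind!r} in step {step['id']!r}")
        # handlers maps a step type to a callable(step, context) -> value
        context[step["output_var"]] = handlers[kind](step, context)
    return context
```

Rejecting unknown step types before dispatch keeps an LLM-designed spec from reaching anything outside the five listed types.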
7. Secrets
| Secret | Scope | Storage |
|---|---|---|
| GOOGLE_SA_<project_slug> | per-project | Supabase Vault, project.gdrive_sa_secret_ref |
| SUPABASE_URL/_SERVICE_KEY | engine-level | hostinger .env |
| CLAUDE_PROXY_SECRET | engine-level | hostinger .env |
| PDF_SERVICE_API_KEY | engine-level | hostinger .env |
| WASABI_* | per-project | Vault |
| SMTP_* | engine-level | hostinger .env |
| ANTHROPIC_API_KEY (fallback) | engine-level | hostinger .env optional |
| DISCORD_WEBHOOK_URL | engine-level | hostinger .env |
Runtime injection: audit_engine.secrets.SecretResolver reads Vault on first use per process, caches for process lifetime.
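The read-once, cache-for-process-lifetime behaviour might look like the following. The Vault lookup is injected as a `fetch` callable so the sketch stays independent of any Supabase client; the constructor signature and the lock are assumptions.

```python
import threading


class SecretResolver:
    """Resolve secret references lazily and cache them for the process lifetime."""

    def __init__(self, fetch):
        self._fetch = fetch           # callable(ref) -> secret value, e.g. a Vault read
        self._cache = {}
        self._lock = threading.Lock()

    def get(self, ref: str):
        # The lock keeps two concurrent first reads from both hitting Vault.
        with self._lock:
            if ref not in self._cache:
                self._cache[ref] = self._fetch(ref)
            return self._cache[ref]
```

A consequence of process-lifetime caching is that a rotated secret is only picked up after the container restarts.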
8. Flow diagrams
See the design comment on issue #39 for the full ASCII diagrams. Summary:
- Scheduler tick → load action → check caps → if no workbook, design via AI → execute steps → deliver → update run row.
- AI workbook design loop → send catalog + description → receive JSON → validate → dry-run with row_limit=1 → persist + activate.
9. Failure modes
| Failure | Detection | Response |
|---|---|---|
| Connector raises | try/except per step | run.status='partial' if late, failed if early |
| LLM design invalid JSON | Pydantic validate | retry ≤3 with error feedback, then action.status='error' |
| Per-action token cap exceeded | running counter | abort run, status=failed, Discord alert |
| Project monthly cap exceeded | pre-flight check | skip, status=failed, Discord P2 |
| pdf-service unreachable | httpx ConnectError | retry 3× exp; fall back to markdown attachment |
| Delivery fails | per-channel try/except | record in delivery_log; partial if any succeeded |
| claude-proxy 5xx | httpx | retry 3×; API fallback if AUDIT_ENGINE_API_FALLBACK=1 |
| Container crash mid-run | startup janitor | mark stale running rows → failed with error='orphaned' |
Alerting: Discord webhook plus a Prometheus alert on audit_run_failures_total (Alertmanager P1 if >3 failures/h per project).
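The delivery row of the table (per-channel try/except, record in delivery_log, partial if any succeeded) can be sketched as below. `deliver_all` and the `channels` mapping are hypothetical, and the shape of each log entry is an assumption.

```python
def deliver_all(artifact, deliveries: list, channels: dict):
    """Attempt every delivery independently and derive the overall status.

    channels maps a delivery type ("email", "gdrive") to a callable(artifact, delivery).
    """
    log = []
    for delivery in deliveries:
        entry = {"type": delivery["type"], "ok": False, "error": None}
        try:
            channels[delivery["type"]](artifact, delivery)
            entry["ok"] = True
        except Exception as exc:  # one failing channel must not block the others
            entry["error"] = f"{type(exc).__name__}: {exc}"
        log.append(entry)

    succeeded = sum(entry["ok"] for entry in log)
    if succeeded == len(log):
        status = "success"   # also covers a workbook with no deliveries
    elif succeeded > 0:
        status = "partial"
    else:
        status = "failed"
    return status, log
```

The returned log is what would be written to run.delivery_log, and the status feeds run.status.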
10. Cost guardrails
| Cap | Default | Column | Reset |
|---|---|---|---|
| Per-action per-run | 100k tokens | action.per_run_token_cap | each run |
| Per-project monthly | 2M tokens | project.monthly_token_cap | rolling 30d via project.monthly_window_started_at |
Token accounting: ClaudeClient.complete() returns estimated tokens (chars/4 for proxy; exact from API). run.token_usage accumulates; project monthly running total updated on completion.
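A sketch of the two caps under stated assumptions: the chars/4 estimate counts prompt and reply together, and the 30-day window resets in place once it has elapsed. Rows are modelled as plain dicts keyed by the column names from the DDL; the function names are illustrative.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(days=30)


def estimate_tokens(prompt: str, reply: str) -> int:
    """Proxy-path estimate: roughly one token per four characters."""
    return (len(prompt) + len(reply)) // 4


def preflight(project: dict, now: datetime) -> bool:
    """Reset the window if it has elapsed, then report whether a run may start."""
    if now - project["monthly_window_started_at"] >= WINDOW:
        project["monthly_window_started_at"] = now
        project["monthly_token_used"] = 0
    return project["monthly_token_used"] < project["monthly_token_cap"]


def charge(run: dict, action: dict, tokens: int) -> bool:
    """Add tokens to the run's counter; False means the per-run cap is exceeded."""
    run["token_usage"] += tokens
    return run["token_usage"] <= action["per_run_token_cap"]
```

Because the proxy path only estimates usage, the caps act as approximate guardrails rather than exact billing limits.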
Metrics on /metrics:
audit_tokens_used_total{project="..."}
audit_run_duration_seconds
audit_runs_total{status,project}
audit_active_actions
audit_workbook_design_total{result}
11. File layout
infra-src/audit-engine/
├── Dockerfile
├── pyproject.toml
├── README.md
├── src/audit_engine/
│ ├── main.py
│ ├── config.py
│ ├── db.py
│ ├── secrets.py
│ ├── scheduler.py
│ ├── runner.py
│ ├── designer.py
│ ├── ai/{__init__.py, claude_client.py, prompts.py}
│ ├── connectors/{__init__.py, base.py, supabase.py, http.py, github.py, wasabi.py, gdrive.py, prometheus.py, n8n.py}
│ ├── render/{__init__.py, jinja_env.py, templates/report_default.html.j2, pdf.py}
│ ├── delivery/{__init__.py, email.py, gdrive.py}
│ ├── models.py
│ ├── alerts.py
│ └── api/{__init__.py, actions.py, projects.py}
├── audits/ # declarative YAML, synced to Supabase
│ └── README.md
└── tests/
Plus scripts/audit-engine/sync.py for YAML → Supabase upsert.
12. Deploy plan — hostinger/docker-compose.yml addition
audit-engine:
  build: /opt/p24-infra/infra-src/audit-engine
  restart: unless-stopped
  environment:
    - SUPABASE_URL=${SUPABASE_URL}
    - SUPABASE_SERVICE_KEY=${SUPABASE_SERVICE_KEY}
    - CLAUDE_PROXY_URL=http://host.docker.internal:9999
    - CLAUDE_PROXY_SECRET=${CLAUDE_PROXY_SECRET}
    - PDF_SERVICE_URL=https://pdf.vps-i1.infra.zintegrowana.online
    - PDF_SERVICE_API_KEY=${PDF_SERVICE_API_KEY}
    - SMTP_HOST=${SMTP_HOST}
    - SMTP_USER=${SMTP_USER}
    - SMTP_PASSWORD=${SMTP_PASSWORD}
    - SMTP_FROM=audit-engine@services.pinbox24.com
    - DISCORD_WEBHOOK_URL=${P24_DISCORD_INFRA_SCRIPTS_ERRORS_WEBHOOK_URL}
    - AUDIT_ENGINE_API_FALLBACK=0
    - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
    - TZ=Europe/Warsaw
  extra_hosts:
    - "host.docker.internal:host-gateway"
  ports:
    - "127.0.0.1:8300:8300"
  labels:
    - traefik.enable=true
    - traefik.http.routers.audit.rule=Host(`audit.vps-h1.infra.zintegrowana.online`)
    - traefik.http.routers.audit.tls=true
    - traefik.http.routers.audit.entrypoints=web,websecure
    - traefik.http.routers.audit.tls.certresolver=mytlschallenge
    - traefik.http.services.audit.loadbalancer.server.port=8300
  deploy:
    resources:
      limits:
        memory: 512M
A new pdf-service Caddy route on vps-i1: pdf.vps-i1.infra.zintegrowana.online → 127.0.0.1:8100.
13. Migration plan
| Phase | Scope |
|---|---|
| 0. Design + scaffolding (this PR) | docs/audit-engine.md + infra-src/audit-engine/ skeleton |
| 1 | Supabase migration + YAML-to-DB sync CLI |
| 2 | FastAPI app + APScheduler + 1 hardcoded reference action (no AI) |
| 3 | AI workbook designer via claude-proxy |
| 4 | Connector expansion (github, http, wasabi, prometheus, gdrive) |
| 5 | PDF rendering + Google Drive delivery + Vault wiring |
| 6 | /metrics + Prometheus scrape + Alertmanager + Discord |
| 7 | Cost guardrails enforcement |
| 8 | Author production audit actions for et-oper + p24-infra |
14. Follow-up issues
- audit-engine: Supabase schema migration + YAML-to-DB sync (phase 1)
- audit-engine: FastAPI skeleton + APScheduler + reference deterministic action (phase 2)
- audit-engine: AI workbook designer via claude-proxy (phase 3)
- audit-engine: connector library — github, http, wasabi, prometheus, gdrive (phase 4)
- audit-engine: PDF + Google Drive delivery + Vault wiring (phase 5)
- audit-engine: Prometheus /metrics + Grafana dashboard + Alertmanager + Discord (phase 6)
- audit-engine: cost guardrails (phase 7)
- audit-engine: author production actions for et-operational-platform + p24-infra (phase 8)