audit-engine — Operational-Data Audit Engine

Living specification. Source of truth for the design of issue #39 and its follow-ups. Update as decisions evolve. The version posted as the design comment on #39 is the v1.0 snapshot.


TL;DR

A new Python service, audit-engine, runs as a Docker container on vps-h1 (Hostinger) alongside n8n, claude-proxy, and WAHA. It uses APScheduler in-process for cron-style triggers and a Supabase-hosted control plane (audit.* schema) for project, action, workbook, and run state. Workbook design and report summaries are driven by Claude via the existing claude-proxy:9999 (zero marginal cost, Claude Max OAuth). Connectors are first-party Python plugins (Supabase, HTTP, Wasabi, GitHub, Google Drive) loaded by entry-point discovery, not n8n nodes. Reports render to HTML via Jinja2, then to PDF via the existing pdf-service:8100 on vps-i1. Delivery: email over SMTP (Mailgun), and Google Drive uploads authenticated with a per-project service-account JSON pulled from Supabase Vault at runtime. Failures → Discord webhook + Prometheus alert.


1. Runtime placement

Decision: vps-h1 (Hostinger), new Docker service audit-engine in hostinger/docker-compose.yml.

Option | Pros | Cons | Verdict
--- | --- | --- | ---
vps-h1 container (chosen) | Co-located with claude-proxy → host-local HTTP (negligible latency, auth is a single shared-secret header); n8n peer for webhook handoffs; same VPS already runs Python via WAHA hooks | 2 vCPU / 8 GB shared with n8n + WAHA, so concurrency caps must be enforced | Best fit
vps-i1 container | Co-located with pdf-service and Prometheus | Far from claude-proxy → needs network egress plus an auth round-trip; vps-i1 already carries a heavier load | Rejected
Lambda / Vercel cron | Stateless, no infra | No access to the subscription-backed claude-proxy; cold starts | Rejected
n8n cron workflows | Zero new code | Workbook design (an LLM tool-use loop) is awkward in n8n; per-action state is hard to model | Rejected

Resource budget on vps-h1:

  • audit-engine container: cap 512 MB / 0.5 CPU
  • Concurrency: max 2 concurrent action runs (asyncio.Semaphore(2))
  • pdf-service reached via https://pdf.vps-i1.infra.zintegrowana.online (new Caddy entry) with API key
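A minimal sketch of the concurrency cap, assuming action runs are coroutines awaited on the same event loop as the scheduler. RunGate is a hypothetical name (the spec only fixes asyncio.Semaphore(2)); the peak counter exists only to make the cap observable.

```python
import asyncio
from typing import Awaitable, Callable


class RunGate:
    """Hypothetical wrapper around asyncio.Semaphore that caps concurrent action runs."""

    def __init__(self, limit: int = 2) -> None:
        self._slots = asyncio.Semaphore(limit)
        self.active = 0  # runs currently holding a slot
        self.peak = 0    # highest concurrency observed so far

    async def run(self, job: Callable[[], Awaitable[None]]) -> None:
        # When all slots are taken, the run waits here instead of failing.
        async with self._slots:
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                await job()
            finally:
                self.active -= 1


async def demo() -> int:
    gate = RunGate(limit=2)

    async def fake_run() -> None:
        await asyncio.sleep(0.01)  # stands in for connector and rendering I/O

    await asyncio.gather(*(gate.run(fake_run) for _ in range(5)))
    return gate.peak


if __name__ == "__main__":
    print(asyncio.run(demo()))  # → 2
```

Runs beyond the cap queue on the semaphore rather than being dropped, which pairs with max_instances=1 per action in the scheduler: one action never overlaps itself, and no more than two actions run at once.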

2. Data model — Supabase

Decision: Supabase PostgreSQL (mwkqmgadqnkkihjdeqsi), new schema audit.

Config in Supabase, but every project and action row carries a source_ref column so the operator can declare actions in YAML in the repo and have a sync job upsert them (audits/<project>/<action>.yml).

SQL DDL

CREATE SCHEMA IF NOT EXISTS audit;
 
CREATE TABLE audit.project (
  id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  slug            text NOT NULL UNIQUE,
  display_name    text NOT NULL,
  github_repo     text NOT NULL,
  gdrive_root_id  text,
  gdrive_sa_secret_ref text,
  default_recipients text[] DEFAULT '{}',
  monthly_token_cap  bigint DEFAULT 2000000,
  monthly_token_used bigint DEFAULT 0,
  monthly_window_started_at timestamptz DEFAULT now(),
  source_ref      text,
  created_at      timestamptz DEFAULT now(),
  updated_at      timestamptz DEFAULT now()
);
 
CREATE TYPE audit.action_status AS ENUM ('new','optimizing','optimized','error','disabled');
 
CREATE TABLE audit.action (
  id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  project_id      uuid NOT NULL REFERENCES audit.project(id) ON DELETE CASCADE,
  slug            text NOT NULL,
  name            text NOT NULL,
  description     text NOT NULL,
  schedule_cron   text NOT NULL,
  schedule_tz     text NOT NULL DEFAULT 'Europe/Warsaw',
  output_format   text NOT NULL CHECK (output_format IN ('pdf','markdown','html')),
  delivery        jsonb NOT NULL DEFAULT '{}'::jsonb,
  status          audit.action_status NOT NULL DEFAULT 'new',
  optimized_at    timestamptz,
  last_run_at     timestamptz,
  last_success_at timestamptz,
  last_error      text,
  per_run_token_cap int DEFAULT 100000,
  source_ref      text,
  created_at      timestamptz DEFAULT now(),
  updated_at      timestamptz DEFAULT now(),
  UNIQUE (project_id, slug)
);
 
CREATE TABLE audit.workbook (
  id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  action_id       uuid NOT NULL REFERENCES audit.action(id) ON DELETE CASCADE,
  version         int  NOT NULL,
  spec            jsonb NOT NULL,
  designed_by_model text,
  design_prompt   text,
  design_token_usage int,
  active          boolean NOT NULL DEFAULT false,
  created_at      timestamptz DEFAULT now(),
  UNIQUE (action_id, version)
);
CREATE UNIQUE INDEX ON audit.workbook (action_id) WHERE active;  -- at most one active workbook per action
 
CREATE TABLE audit.run (
  id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  action_id       uuid NOT NULL REFERENCES audit.action(id) ON DELETE CASCADE,
  workbook_id     uuid REFERENCES audit.workbook(id),
  started_at      timestamptz NOT NULL DEFAULT now(),
  finished_at     timestamptz,
  status          text NOT NULL CHECK (status IN ('pending','running','success','failed','partial')),
  trigger         text NOT NULL CHECK (trigger IN ('schedule','manual','retry','reoptimize')),
  token_usage     int DEFAULT 0,
  artifact_uri    text,
  delivery_log    jsonb DEFAULT '[]'::jsonb,
  error           text,
  log             jsonb DEFAULT '[]'::jsonb
);
CREATE INDEX ON audit.run (action_id, started_at DESC);
CREATE INDEX ON audit.run (status, started_at DESC) WHERE status IN ('failed','partial');
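The YAML-to-Supabase sync could reduce each audits/<project>/<action>.yml file to one upsert keyed on the (project_id, slug) constraint above. A sketch, assuming the file has already been parsed into a dict by a YAML loader and that the statement is executed by a driver using %(name)s placeholders (psycopg style); build_action_upsert, SYNCED_COLUMNS, and REQUIRED are hypothetical names. Engine-managed columns (status, last_run_at, and so on) are deliberately excluded so a re-sync never clobbers runtime state.

```python
import json
from typing import Any

# Columns a YAML action file may set; the rest of audit.action is engine-managed.
SYNCED_COLUMNS = ("slug", "name", "description", "schedule_cron", "schedule_tz",
                  "output_format", "delivery", "per_run_token_cap")
REQUIRED = ("slug", "name", "description", "schedule_cron", "output_format")


def build_action_upsert(project_id: str, doc: dict[str, Any],
                        source_ref: str) -> tuple[str, dict[str, Any]]:
    """Return (sql, params) that insert or update one audit.action row."""
    missing = [k for k in REQUIRED if k not in doc]
    unknown = sorted(set(doc) - set(SYNCED_COLUMNS))
    if missing or unknown:
        raise ValueError(f"{source_ref}: missing={missing} unknown={unknown}")

    params: dict[str, Any] = {c: doc[c] for c in SYNCED_COLUMNS if c in doc}
    if "delivery" in params:
        params["delivery"] = json.dumps(params["delivery"])  # cast to jsonb in SQL
    params["project_id"] = project_id
    params["source_ref"] = source_ref

    cols = ["project_id", *[c for c in SYNCED_COLUMNS if c in doc], "source_ref"]
    values = ", ".join(
        f"%({c})s::jsonb" if c == "delivery" else f"%({c})s" for c in cols
    )
    # The conflict-key columns are never rewritten on update.
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in cols if c not in ("project_id", "slug"))
    sql = (
        f"INSERT INTO audit.action ({', '.join(cols)}) VALUES ({values}) "
        f"ON CONFLICT (project_id, slug) DO UPDATE SET {updates}, updated_at = now()"
    )
    return sql, params
```

Rejecting unknown keys keeps typos in YAML from being silently ignored during sync.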

3. Schedule representation

Single schedule_cron text column (5-field cron) + schedule_tz IANA string.

Cron covers every requested schedule in one well-known DSL. APScheduler's CronTrigger.from_crontab(expr, timezone=tz) ingests it in a single call, and croniter validates the expression on write.

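from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from audit_engine.runner import run_action  # assumed location (src/audit_engine/runner.py)

scheduler = AsyncIOScheduler()

def schedule_action(a) -> None:
    """Register (or re-register) one audit.action row as a cron job."""
    scheduler.add_job(
        run_action,
        CronTrigger.from_crontab(a.schedule_cron, timezone=a.schedule_tz),
        args=[a.id],
        id=f"action:{a.id}",      # stable id: a re-sync replaces the job, never duplicates it
        replace_existing=True,
        misfire_grace_time=600,   # still fire if the tick is up to 10 min late
        coalesce=True,            # collapse a backlog of missed ticks into one run
        max_instances=1,          # never overlap two runs of the same action
    )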
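croniter is the planned validator. Purely for illustration, here is a stdlib-only stand-in showing what "validates on write" has to cover: exactly five fields, each built from *, numbers or names, a-b ranges, /n steps, and comma lists within standard cron bounds, plus an IANA timezone that zoneinfo can resolve. validate_cron and validate_tz are hypothetical names. One detail worth checking against the pinned APScheduler version: in APScheduler 3.x, from_crontab passes the day-of-week field straight to CronTrigger, which numbers weekdays 0-6 starting at Monday, so numeric weekdays may not mean what standard cron means; day names such as mon-fri avoid the ambiguity.

```python
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

DAY_NAMES = {n: i for i, n in enumerate(["sun", "mon", "tue", "wed", "thu", "fri", "sat"])}
MONTH_NAMES = {n: i + 1 for i, n in enumerate(
    ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"])}

# (field name, min, max, accepted names) for the five standard cron fields.
FIELDS = [
    ("minute", 0, 59, {}),
    ("hour", 0, 23, {}),
    ("day of month", 1, 31, {}),
    ("month", 1, 12, MONTH_NAMES),
    ("day of week", 0, 7, DAY_NAMES),  # 0 and 7 both mean Sunday in standard cron
]


def _value(token: str, name: str, lo: int, hi: int, names: dict[str, int]) -> int:
    if token.lower() in names:
        return names[token.lower()]
    if not token.isdigit():
        raise ValueError(f"{name}: {token!r} is not a number")
    n = int(token)
    if not lo <= n <= hi:
        raise ValueError(f"{name}: {n} is outside {lo}-{hi}")
    return n


def validate_cron(expr: str) -> None:
    """Raise ValueError unless expr is a well-formed 5-field crontab expression."""
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    for field, (name, lo, hi, names) in zip(parts, FIELDS):
        for item in field.split(","):
            base, slash, step = item.partition("/")
            if slash and not (step.isdigit() and int(step) > 0):
                raise ValueError(f"{name}: bad step {step!r}")
            if base == "*":
                continue
            start, dash, end = base.partition("-")
            first = _value(start, name, lo, hi, names)
            if dash and _value(end, name, lo, hi, names) < first:
                raise ValueError(f"{name}: range {base!r} runs backwards")


def validate_tz(tz: str) -> None:
    """Raise ValueError unless tz is an IANA zone name known to zoneinfo."""
    try:
        ZoneInfo(tz)
    except (ZoneInfoNotFoundError, ValueError) as exc:
        raise ValueError(f"unknown timezone {tz!r}") from exc
```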

4. AI integration

Primary: claude-proxy:9999 (Claude Max OAuth subscription). Fallback: the Anthropic API via ANTHROPIC_API_KEY after three consecutive proxy 5xx responses, and only when AUDIT_ENGINE_API_FALLBACK=1.

Because claude-proxy wraps claude -p and returns plain text, native tool use is unavailable; tool use is emulated with a structured prompt protocol:

  1. Engine sends prompt: “You are designing an audit workbook. Available connectors: [list]. Reply with JSON matching this schema: …”
  2. Claude returns JSON workbook spec.
  3. Engine validates with Pydantic; if invalid, sends back error + asks for fix (max 3 retries).
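The validate-and-retry loop in steps 1-3 can be sketched as below, assuming a complete(prompt) -> str callable that wraps ClaudeClient. validate_spec is a deliberately small stand-in for the Pydantic workbook model (it only checks for a JSON object with version, a non-empty steps list, and deliveries); design_with_retries and WorkbookDesignError are hypothetical names.

```python
import json
from typing import Any, Callable

MAX_FIX_RETRIES = 3
REQUIRED_KEYS = ("version", "steps", "deliveries")


class WorkbookDesignError(Exception):
    """No valid spec after the initial attempt plus MAX_FIX_RETRIES corrections."""


def validate_spec(raw: str) -> dict[str, Any]:
    """Stand-in for the Pydantic model: a JSON object with the required top-level keys."""
    spec = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(spec, dict):
        raise ValueError("top-level JSON value must be an object")
    missing = [k for k in REQUIRED_KEYS if k not in spec]
    if missing:
        raise ValueError(f"missing keys: {missing}")
    if not isinstance(spec["steps"], list) or not spec["steps"]:
        raise ValueError("'steps' must be a non-empty list")
    return spec


def design_with_retries(complete: Callable[[str], str], prompt: str) -> dict[str, Any]:
    """Send the design prompt; on invalid output, resend it with the error attached."""
    message = prompt
    for _ in range(1 + MAX_FIX_RETRIES):
        reply = complete(message)
        try:
            return validate_spec(reply)
        except ValueError as exc:
            # Assumes each proxy call is stateless, so every retry restates the task.
            message = (f"{prompt}\n\nYour previous reply was rejected: {exc}\n"
                       f"Previous reply:\n{reply}\n\nReply with corrected JSON only.")
    raise WorkbookDesignError(f"no valid workbook spec after {MAX_FIX_RETRIES} retries")
```

On WorkbookDesignError the caller would set action.status='error', matching the failure-mode table.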

A thin abstraction, audit_engine.ai.ClaudeClient, hides the proxy/API toggle behind .design_workbook(spec_prompt) and .render_summary(data), plus the lower-level .complete() used for token accounting (§10).

Envelope (per claude-proxy README):

POST http://host.docker.internal:9999/v1/messages
X-Proxy-Secret: $CLAUDE_PROXY_SECRET
{"messages":[{"role":"user","content":"..."}]}
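A stdlib sketch of the proxy call and the retry-then-fallback policy, assuming the envelope above is sent as JSON and that the response body is handed back unparsed (its shape is defined by the claude-proxy README and is not assumed here). ProxyServerError, build_proxy_request, send_via_proxy, and complete_with_fallback are hypothetical names. The engine itself uses httpx asynchronously, so this synchronous urllib version only illustrates the envelope and the control flow; retries here are immediate, with no backoff.

```python
import json
import urllib.error
import urllib.request
from typing import Callable, Optional

PROXY_URL = "http://host.docker.internal:9999/v1/messages"


class ProxyServerError(Exception):
    """claude-proxy answered with an HTTP 5xx status."""


def build_proxy_request(prompt: str, secret: str, url: str = PROXY_URL) -> urllib.request.Request:
    body = json.dumps({"messages": [{"role": "user", "content": prompt}]}).encode()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "X-Proxy-Secret": secret},
    )


def send_via_proxy(req: urllib.request.Request, timeout: float = 300.0) -> str:
    """Return the raw response body; map 5xx responses to ProxyServerError."""
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read().decode()
    except urllib.error.HTTPError as exc:
        if 500 <= exc.code < 600:
            raise ProxyServerError(f"claude-proxy returned {exc.code}") from exc
        raise


def complete_with_fallback(prompt: str,
                           proxy_call: Callable[[str], str],
                           api_call: Optional[Callable[[str], str]],
                           api_fallback_enabled: bool,
                           attempts: int = 3) -> str:
    """Try the proxy `attempts` times; after repeated 5xx use the API only if enabled."""
    last_error: Optional[ProxyServerError] = None
    for _ in range(attempts):
        try:
            return proxy_call(prompt)
        except ProxyServerError as exc:
            last_error = exc
    if api_fallback_enabled and api_call is not None:
        return api_call(prompt)
    raise last_error or ProxyServerError("no proxy attempts were made")
```

In production, proxy_call would be something like lambda p: send_via_proxy(build_proxy_request(p, secret)), with api_fallback_enabled read from AUDIT_ENGINE_API_FALLBACK.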

5. Connector framework

First-party Python plugins via registry decorator. Not n8n nodes.

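from typing import Any, Protocol

class Connector(Protocol):
    name: str                        # registry key, e.g. "supabase.query"
    description: str                 # for LLM context (connector catalog)
    params_schema: dict[str, Any]    # JSON Schema for params
    # Project and ConnectorResult live in audit_engine.models (string forward refs)
    async def execute(self, params: dict[str, Any], project: "Project") -> "ConnectorResult": ...

CONNECTORS: dict[str, type[Connector]] = {}

def register(cls):
    """Class decorator: add cls to CONNECTORS under cls.name, rejecting duplicates."""
    if cls.name in CONNECTORS:
        raise ValueError(f"duplicate connector name: {cls.name}")
    CONNECTORS[cls.name] = cls
    return cls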
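A self-contained sketch of how a runner might resolve a workbook step against the registry, assuming steps carry connector and params keys as in §6. ConnectorResult's fields, the StaticRows test connector, and run_connector_step are hypothetical; project is passed as a plain slug for brevity, and only the schema's required list is checked, whereas a real implementation would presumably validate params against the full JSON Schema.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ConnectorResult:  # hypothetical shape; the real type lives in models.py
    rows: list[dict[str, Any]]
    meta: dict[str, Any] = field(default_factory=dict)


CONNECTORS: dict[str, type] = {}


def register(cls):
    CONNECTORS[cls.name] = cls
    return cls


@register
class StaticRows:
    """Test fixture: returns the literal rows passed in params."""
    name = "static.rows"
    description = "Return the rows given in params unchanged."
    params_schema = {"type": "object", "required": ["rows"],
                     "properties": {"rows": {"type": "array"}}}

    async def execute(self, params: dict[str, Any], project: str) -> ConnectorResult:
        return ConnectorResult(rows=list(params["rows"]), meta={"project": project})


async def run_connector_step(step: dict[str, Any], project: str) -> ConnectorResult:
    """Resolve step["connector"], check required params, and await execute()."""
    name = step.get("connector")
    cls = CONNECTORS.get(name)
    if cls is None:
        raise ValueError(f"unknown connector {name!r}")
    params = step.get("params", {})
    missing = [k for k in cls.params_schema.get("required", []) if k not in params]
    if missing:
        raise ValueError(f"{cls.name}: missing params {missing}")
    return await cls().execute(params, project)
```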

v1 connectors:

Connector | Purpose | Params
--- | --- | ---
supabase.query | Read-only SQL | sql, params, role
supabase.rpc | Call RPC functions | function, args
http.get / http.post | Generic HTTP fetch | url, headers, body, timeout
github.issues_search | GitHub issue search | q, repo
github.repo_stats | Repo metrics | repo
wasabi.list / wasabi.get | S3 access | bucket, prefix / key
gdrive.list / gdrive.read | Drive content | folder_id, mime
prometheus.query | PromQL via Thanos | expr, start, end, step
n8n.run_workflow | Escape hatch | webhook_url, body

6. Workbook representation

JSON validated by Pydantic and stored in audit.workbook.spec; each redesign inserts a new audit.workbook row with the next version number.

{
  "version": "1.0",
  "designed_for": "Weekly incidents summary for et-operational-platform",
  "steps": [
    {
      "id": "fetch_incidents",
      "type": "connector",
      "connector": "supabase.query",
      "params": {
        "sql": "SELECT id, type, severity, created_at, status FROM incidents WHERE created_at >= now() - interval '7 days' ORDER BY created_at DESC"
      },
      "output_var": "incidents"
    },
    {
      "id": "summarize",
      "type": "ai_summary",
      "model": "claude-default",
      "prompt": "Summarize {{ incidents | tojson }} into markdown.",
      "output_var": "summary_md"
    },
    {
      "id": "render",
      "type": "template",
      "template": "report_default.html.j2",
      "context": { "title": "Weekly — {{ run_date }}", "body_md": "{{ summary_md }}" },
      "output_var": "html"
    },
    { "id": "to_pdf", "type": "render_pdf", "input_var": "html", "output_var": "pdf_bytes" }
  ],
  "deliveries": [
    { "type": "email", "to": ["radieu@gmail.com"], "subject": "Weekly — {{ run_date }}" },
    { "type": "gdrive", "folder_id": "${project.gdrive_root_id}", "filename": "weekly-{{ run_date }}.pdf" }
  ],
  "cost_estimate": { "tokens_per_run": 8000 }
}

Step types: connector, ai_summary, template (Jinja2 SandboxedEnvironment), render_pdf, transform_python (whitelisted expressions only — no arbitrary code).
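One way to enforce "whitelisted expressions only" for transform_python is to parse the expression with Python's ast module and reject any node type outside an allow-list before evaluating it with empty builtins. This is a hedged sketch for Python 3.9+, not the engine's confirmed design: ALLOWED_NODES, SAFE_FUNCS, and safe_eval are hypothetical. Attribute access, lambdas, keyword arguments, and calls to anything outside SAFE_FUNCS are refused; CPU and memory limits are not addressed here.

```python
import ast
from typing import Any

# Hypothetical allow-list: pure expressions over the run context, no attribute access.
ALLOWED_NODES = (
    ast.Expression, ast.Constant, ast.Name, ast.Load, ast.Store,
    ast.List, ast.Tuple, ast.Dict, ast.Subscript, ast.Slice,
    ast.ListComp, ast.GeneratorExp, ast.comprehension, ast.IfExp, ast.Call,
    ast.BoolOp, ast.And, ast.Or, ast.UnaryOp, ast.Not, ast.USub,
    ast.BinOp, ast.Add, ast.Sub, ast.Mult, ast.Div, ast.FloorDiv, ast.Mod,
    ast.Compare, ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE, ast.In, ast.NotIn,
)
SAFE_FUNCS = {"len": len, "sum": sum, "min": min, "max": max, "sorted": sorted, "round": round}


def safe_eval(expr: str, variables: dict[str, Any]) -> Any:
    """Evaluate a whitelisted expression; raise ValueError for anything else."""
    try:
        tree = ast.parse(expr, mode="eval")
    except SyntaxError as exc:
        raise ValueError(f"syntax error: {exc.msg}") from exc
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED_NODES):
            raise ValueError(f"disallowed syntax: {type(node).__name__}")
        if isinstance(node, ast.Name) and node.id.startswith("_"):
            raise ValueError(f"disallowed name: {node.id}")
        if isinstance(node, ast.Call) and not (
            isinstance(node.func, ast.Name) and node.func.id in SAFE_FUNCS
        ):
            raise ValueError("only whitelisted functions may be called")
    # Variables go into globals so comprehension bodies can see them;
    # SAFE_FUNCS come last so context data cannot shadow them; builtins are empty.
    scope = {**variables, **SAFE_FUNCS, "__builtins__": {}}
    return eval(compile(tree, "<transform>", "eval"), scope)
```

For example, safe_eval('len([r for r in incidents if r["severity"] == "high"])', context) would count high-severity rows produced by an earlier connector step.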


7. Secrets

Secret | Scope | Storage
--- | --- | ---
GOOGLE_SA_<project_slug> | per-project | Supabase Vault, referenced by project.gdrive_sa_secret_ref
SUPABASE_URL / SUPABASE_SERVICE_KEY | engine-level | hostinger .env
CLAUDE_PROXY_SECRET | engine-level | hostinger .env
PDF_SERVICE_API_KEY | engine-level | hostinger .env
WASABI_* | per-project | Supabase Vault
SMTP_* | engine-level | hostinger .env
ANTHROPIC_API_KEY (fallback) | engine-level | hostinger .env (optional)
DISCORD_WEBHOOK_URL | engine-level | hostinger .env

Runtime injection: audit_engine.secrets.SecretResolver reads each secret from Vault on first use and caches it for the lifetime of the process, so a rotated secret takes effect only after a container restart.
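A minimal sketch of that lazy, per-process cache, assuming an async fetch(name) callable that returns the decrypted value or None. With Supabase Vault the fetch would typically query the vault.decrypted_secrets view (VAULT_QUERY below, written with an asyncpg-style $1 placeholder). The class name comes from the spec, but its constructor and get() method are illustrative, not the module's confirmed API.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Query a fetch function might run against Supabase Vault.
VAULT_QUERY = "select decrypted_secret from vault.decrypted_secrets where name = $1"


class SecretResolver:
    """Reads each secret once per process and caches it until the process exits."""

    def __init__(self, fetch: Callable[[str], Awaitable[Optional[str]]]) -> None:
        self._fetch = fetch
        self._cache: dict[str, str] = {}
        self._lock = asyncio.Lock()  # one Vault round-trip per name, even under concurrency

    async def get(self, name: str) -> str:
        async with self._lock:
            if name not in self._cache:
                value = await self._fetch(name)
                if value is None:
                    raise KeyError(f"secret {name!r} not found in Vault")
                self._cache[name] = value
            return self._cache[name]
```

A single lock serializes first reads, which is acceptable for the handful of secrets each run needs; missing secrets are not cached, so a secret added to Vault later is picked up on the next lookup.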


8. Flow diagrams

See the design comment on issue #39 for the full ASCII diagrams. Summary:

  1. Scheduler tick → load action → check caps → if no workbook, design via AI → execute steps → deliver → update run row.
  2. AI workbook design loop → send catalog + description → receive JSON → validate → dry-run with row_limit=1 → persist + activate.
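The "execute steps" stage of flow 1 and the row_limit=1 dry run of flow 2 could share one loop that stores each step's result in a context under its output_var, so later steps (templates, ai_summary prompts) can reference earlier ones. A sketch with a hypothetical handler signature (step, context, row_limit); execute_steps and Handler are illustrative names, and exceptions propagate so the runner can apply the partial/failed rules from §9.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# Hypothetical handler signature: (step, context, row_limit) -> value stored in output_var.
Handler = Callable[[dict, dict, Optional[int]], Awaitable[Any]]


async def execute_steps(steps: list[dict], handlers: dict[str, Handler],
                        context: dict[str, Any],
                        row_limit: Optional[int] = None) -> list[dict]:
    """Run workbook steps in order, storing each result in context[output_var]."""
    log: list[dict] = []
    for step in steps:
        handler = handlers.get(step["type"])
        if handler is None:
            raise ValueError(f"step {step['id']}: unknown type {step['type']!r}")
        value = await handler(step, context, row_limit)  # errors propagate to the runner
        if "output_var" in step:
            context[step["output_var"]] = value
        log.append({"step": step["id"], "type": step["type"]})
    return log
```

In the engine, the handlers dict would map connector, ai_summary, template, render_pdf, and transform_python to their implementations, and the returned log would feed audit.run.log.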

9. Failure modes

Failure | Detection | Response
--- | --- | ---
Connector raises | try/except per step | run.status='partial' if late, 'failed' if early
LLM design returns invalid JSON | Pydantic validation | retry ≤3 with error feedback, then action.status='error'
Per-action token cap exceeded | running counter | abort run, status=failed, Discord alert
Project monthly cap exceeded | pre-flight check | skip, status=failed, Discord P2
pdf-service unreachable | httpx ConnectError | retry 3× with exponential backoff; fall back to a Markdown attachment
Delivery fails | per-channel try/except | record in delivery_log; status=partial if any channel succeeded
claude-proxy 5xx | httpx response status | retry 3×; API fallback if AUDIT_ENGINE_API_FALLBACK=1
Container crash mid-run | startup janitor | mark stale 'running' rows as failed with error='orphaned'
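The pdf-service row (retry three times with exponential backoff, then fall back to a Markdown attachment) could look like the sketch below. retry_with_backoff and render_or_fallback are hypothetical names. ConnectionError stands in for the transport error; with httpx the caller would pass retry_on=(httpx.ConnectError,), because httpx's exceptions do not derive from the built-in ConnectionError. With the default base_delay of 1 s, the waits between the three attempts are 1 s and 2 s.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(op: Callable[[], Awaitable[T]], *, attempts: int = 3,
                             base_delay: float = 1.0,
                             retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,)) -> T:
    """Await op() up to `attempts` times, sleeping base_delay * 2**i between tries."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for i in range(attempts):
        try:
            return await op()
        except retry_on:
            if i == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** i)
    raise AssertionError("unreachable")


async def render_or_fallback(render_pdf: Callable[[str], Awaitable[bytes]], html: str,
                             markdown: str, base_delay: float = 1.0) -> Tuple[str, bytes]:
    """Return ("pdf", bytes), or ("markdown", bytes) if pdf-service stays unreachable."""
    try:
        pdf = await retry_with_backoff(lambda: render_pdf(html), base_delay=base_delay)
        return "pdf", pdf
    except ConnectionError:
        return "markdown", markdown.encode()
```

The lambda creates a fresh coroutine per attempt, which matters because an awaited coroutine cannot be awaited again.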

Alerting: every failure posts to the Discord webhook and increments audit_run_failures_total in Prometheus; Alertmanager raises a P1 when a project exceeds 3 failures per hour.


10. Cost guardrails

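Cap | Default | Column | Reset
--- | --- | --- | ---
Per-action, per-run | 100k tokens | action.per_run_token_cap | each run
Per-project, monthly | 2M tokens | project.monthly_token_cap | every 30 days, counted from project.monthly_window_started_at (project.monthly_token_used resets to 0)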

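Token accounting: ClaudeClient.complete() returns a token count, estimated as chars/4 on the proxy path (which returns plain text with no usage data) and exact when calling the API. run.token_usage accumulates across calls within a run; the project's monthly running total (project.monthly_token_used) is updated when the run completes.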
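A sketch of both caps, assuming the proxy estimate counts prompt plus reply characters (the spec only says chars/4) and that the monthly window is a fixed 30-day period starting at monthly_window_started_at. estimate_tokens, RunBudget, TokenCapExceeded, and monthly_preflight are hypothetical names; writing a reset window back to audit.project is left to the caller.

```python
import math
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Tuple

WINDOW = timedelta(days=30)


def estimate_tokens(prompt: str, reply: str) -> int:
    """Proxy path: approximate tokens as total characters / 4, rounded up."""
    return math.ceil((len(prompt) + len(reply)) / 4)


class TokenCapExceeded(Exception):
    """Raised when a run crosses action.per_run_token_cap."""


@dataclass
class RunBudget:
    per_run_cap: int  # audit.action.per_run_token_cap
    used: int = 0     # mirrors audit.run.token_usage

    def charge(self, tokens: int) -> None:
        self.used += tokens
        if self.used > self.per_run_cap:
            raise TokenCapExceeded(f"run used {self.used} tokens, cap {self.per_run_cap}")


def monthly_preflight(cap: int, used: int, window_started_at: datetime,
                      now: datetime) -> Tuple[bool, int, datetime]:
    """Return (allowed, used, window_started_at), resetting an expired 30-day window."""
    if now - window_started_at >= WINDOW:
        used, window_started_at = 0, now
    return used < cap, used, window_started_at
```

RunBudget.charge records the tokens before raising, so run.token_usage reflects what was actually consumed even on an aborted run.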

Metrics on /metrics:

audit_tokens_used_total{project="..."}
audit_run_duration_seconds
audit_runs_total{status,project}
audit_active_actions
audit_workbook_design_total{result}
audit_run_failures_total{project}

11. File layout

infra-src/audit-engine/
├── Dockerfile
├── pyproject.toml
├── README.md
├── src/audit_engine/
│   ├── main.py
│   ├── config.py
│   ├── db.py
│   ├── secrets.py
│   ├── scheduler.py
│   ├── runner.py
│   ├── designer.py
│   ├── ai/{__init__.py, claude_client.py, prompts.py}
│   ├── connectors/{__init__.py, base.py, supabase.py, http.py, github.py, wasabi.py, gdrive.py, prometheus.py, n8n.py}
│   ├── render/{__init__.py, jinja_env.py, templates/report_default.html.j2, pdf.py}
│   ├── delivery/{__init__.py, email.py, gdrive.py}
│   ├── models.py
│   ├── alerts.py
│   └── api/{__init__.py, actions.py, projects.py}
├── audits/                              # declarative YAML, synced to Supabase
│   └── README.md
└── tests/

Plus scripts/audit-engine/sync.py for YAML → Supabase upsert.


12. Deploy plan — hostinger/docker-compose.yml addition

audit-engine:
  build: /opt/p24-infra/infra-src/audit-engine
  restart: unless-stopped
  environment:
    - SUPABASE_URL=${SUPABASE_URL}
    - SUPABASE_SERVICE_KEY=${SUPABASE_SERVICE_KEY}
    - CLAUDE_PROXY_URL=http://host.docker.internal:9999
    - CLAUDE_PROXY_SECRET=${CLAUDE_PROXY_SECRET}
    - PDF_SERVICE_URL=https://pdf.vps-i1.infra.zintegrowana.online
    - PDF_SERVICE_API_KEY=${PDF_SERVICE_API_KEY}
    - SMTP_HOST=${SMTP_HOST}
    - SMTP_USER=${SMTP_USER}
    - SMTP_PASSWORD=${SMTP_PASSWORD}
    - SMTP_FROM=audit-engine@services.pinbox24.com
    - DISCORD_WEBHOOK_URL=${P24_DISCORD_INFRA_SCRIPTS_ERRORS_WEBHOOK_URL}
    - AUDIT_ENGINE_API_FALLBACK=0
    - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-}
    - TZ=Europe/Warsaw
  extra_hosts:
    - "host.docker.internal:host-gateway"
  ports:
    - "127.0.0.1:8300:8300"
  labels:
    - traefik.enable=true
    - traefik.http.routers.audit.rule=Host(`audit.vps-h1.infra.zintegrowana.online`)
    - traefik.http.routers.audit.tls=true
    - traefik.http.routers.audit.entrypoints=web,websecure
    - traefik.http.routers.audit.tls.certresolver=mytlschallenge
    - traefik.http.services.audit.loadbalancer.server.port=8300
  deploy:
    resources:
      limits:
        cpus: "0.5"
        memory: 512M

A new pdf-service Caddy route on vps-i1: pdf.vps-i1.infra.zintegrowana.online → 127.0.0.1:8100.


13. Migration plan

Phase | Scope
--- | ---
0 | Design + scaffolding (this PR): docs/audit-engine.md + infra-src/audit-engine/ skeleton
1 | Supabase migration + YAML-to-DB sync CLI
2 | FastAPI app + APScheduler + 1 hardcoded reference action (no AI)
3 | AI workbook designer via claude-proxy
4 | Connector expansion (github, http, wasabi, prometheus, gdrive)
5 | PDF rendering + Google Drive delivery + Vault wiring
6 | /metrics + Prometheus scrape + Alertmanager + Discord
7 | Cost guardrails enforcement
8 | Author production audit actions for et-oper + p24-infra

14. Follow-up issues

  1. audit-engine: Supabase schema migration + YAML-to-DB sync (phase 1)
  2. audit-engine: FastAPI skeleton + APScheduler + reference deterministic action (phase 2)
  3. audit-engine: AI workbook designer via claude-proxy (phase 3)
  4. audit-engine: connector library — github, http, wasabi, prometheus, gdrive (phase 4)
  5. audit-engine: PDF + Google Drive delivery + Vault wiring (phase 5)
  6. audit-engine: Prometheus /metrics + Grafana dashboard + Alertmanager + Discord (phase 6)
  7. audit-engine: cost guardrails (phase 7)
  8. audit-engine: author production actions for et-operational-platform + p24-infra (phase 8)