Incident: Pipeline-Crash durch Payloads

Das Problem: Externe APIs sind in der Praxis oft inkonsistent. Letzte Woche schlugen unsere Nightly-Builds fehl, weil ein wesentliches externes Kassensystem ohne Vorwarnung Schlüssel im JSON-Payload veränderte – „customer_id" wurde zu „custId", „lifetime_value" zu „ltv_eur", „phone_number" zu „contact_phone". Diese drei Umbenennungen triggerten 14 Downstream-Validierungsfehler, die kaskadierend drei weitere Pipelines zu Fall brachten. Bis zum Morgen-Standup fehlten 6 Stunden Transaktionsdaten in unserem Data Warehouse.

Der starre Regex-Ansatz und seine Grenzen

Der erste Instinkt war, einen Mapping-Layer vor der Pydantic-Validierung zu schalten: ein Python-Dictionary mit expliziten Key-Umbenennungen wie {'custId': 'customer_id', 'ltv_eur': 'lifetime_value'}. Für flache Objekte mit stabilen Abweichungsmustern funktioniert das. Die Realität unserer Kassensystem-Payloads ist aber ein dreifach verschachteltes JSON mit insgesamt 47 Feldern – Transaktionsdaten, Kundenprofile, Kassenkonfigurationen und Artikelpositionen in einem Payload. Der Mapper wuchs innerhalb von zwei Wochen auf über 200 hartcodierte Mappings. Jede neue Abweichung erforderte einen manuellen PR, ein Code-Review und ein Deployment – für einen trivialen Tippfehler im Feldnamen des Drittanbieters. Maintenance-Aufwand: ca. 3 Stunden pro Woche für einen Senior-Entwickler.

Split Screen JSON Payloads

Die Komplexitätsfalle tiefer JSONs

Das eigentliche Problem liegt nicht auf der ersten Ebene des JSONs. Pydantic's ValidationError auf Top-Level-Feldern ist noch einfach abzufangen. Kritisch wird es bei tief verschachtelten Strukturen. Wenn payload['transaction']['items'][0]['sku_code'] plötzlich payload['transaction']['line_items'][0]['article_id'] heißt, gibt Pydantic lediglich einen generischen field required-Fehler zurück, ohne Angabe des exakten Pfads. Das Debugging dauerte im Schnitt 45 Minuten pro Incident, weil das fehlerhafte Feld erst durch manuelles Diffing des Payload-Logs gegen das Schema identifiziert werden musste.

Ada's dynamischer Fuzzy Key Mapper

Die neue Architektur setzt auf einen zweistufigen Self-Healing-Mechanismus. Stufe 1: Pydantic v2 mit model_config = ConfigDict(populate_by_name=True) und expliziten Field-Aliases für bekannte Varianten. Stufe 2: Bei einem ValidationError extrahiert ein Error-Parser die fehlerhaften Pfade aus dem Pydantic-Fehlerobjekt (error['loc']). Diese Pfade plus das rohe fehlerhafte JSON und das vollständige Ziel-Schema werden als strukturierter Prompt an gpt-4o-mini gesendet. Das Modell versteht semantisch, dass „contactPhone" und „phone_number" dasselbe meinen, baut das JSON nach dem Zielschema um und gibt es als valides JSON zurück. Das reparierte Mapping wird sofort in Redis gecacht mit einem TTL von 30 Tagen, Key-Format: payload_map:{hash(incoming_keys)}. Beim nächsten identischen Payload-Pattern greift der Cache in unter 1 ms – kein LLM-Aufruf, keine Kosten.

from pydantic import BaseModel, ValidationError, ConfigDict, Field
from openai import OpenAI
import redis, json, hashlib

client = OpenAI()
r = redis.Redis(host='localhost', port=6379, db=1)

class CustomerData(BaseModel):
    model_config = ConfigDict(populate_by_name=True)
    customer_id: str = Field(alias='custId', default=None)
    lifetime_value: float = Field(alias='ltv_eur', default=None)
    phone_number: str = Field(alias='contact_phone', default=None)

def get_cache_key(incoming: dict) -> str:
    key_str = json.dumps(sorted(incoming.keys()))
    return f'payload_map:{hashlib.md5(key_str.encode()).hexdigest()}'

def llm_repair_payload(incoming: dict, schema: dict) -> dict:
    cache_key = get_cache_key(incoming)
    cached = r.get(cache_key)
    if cached:
        mapping = json.loads(cached)
        return {mapping.get(k, k): v for k, v in incoming.items()}

    prompt = (
        f'Target schema: {json.dumps(schema)}\n'
        f'Incoming JSON: {json.dumps(incoming)}\n'
        'Return ONLY valid JSON matching the target schema. '
        'Map fields by semantic similarity. No explanation.'
    )
    response = client.chat.completions.create(
        model='gpt-4o-mini',
        temperature=0,
        response_format={'type': 'json_object'},
        messages=[{'role': 'user', 'content': prompt}]
    )
    repaired = json.loads(response.choices[0].message.content)

    mapping = {old: new for old, new in zip(incoming.keys(), repaired.keys())}
    r.setex(cache_key, 2592000, json.dumps(mapping))  # 30 days TTL
    return repaired

def parse_customer(incoming_json: dict) -> CustomerData:
    try:
        return CustomerData(**incoming_json)
    except ValidationError:
        repaired = llm_repair_payload(incoming_json, CustomerData.model_json_schema())
        return CustomerData(**repaired)

Ergebnisse und Alerting

Seit dem Deployment wird jeder LLM-Repair-Vorgang in eine separate Postgres-Tabelle geloggt mit Zeitstempel, den betroffenen Feldern und dem Cache-Status. Ein wöchentlicher Cronjob analysiert diese Tabelle und sendet eine Slack-Nachricht, wenn derselbe Payload-Typ dreimal ohne Cache-Hit repariert wurde – ein Indikator, dass der externe Provider seine Struktur erneut geändert hat. Die Pipeline-Resilienz gegen externe Schemaänderungen stieg von 72 % auf 99.8 %, der manuelle Maintenance-Aufwand sank von 3 Stunden auf unter 20 Minuten pro Woche.