Neu veröffentlicht: E-Commerce mit Power Pages, Stripe & Analytics

· Architektur  · 7 minuten Lesezeit

Event-Driven Ingestion mit BullMQ und Redis

POST /ingest blockierte die Extension, bis Embedding und Qdrant-Upsert fertig waren. Mit BullMQ und Redis wird der Ingest asynchron: 202 sofort, Verarbeitung im Hintergrund, Statusabfrage über GET /captures/:id/status.

POST /ingest blockierte die Extension, bis Embedding und Qdrant-Upsert fertig waren. Mit BullMQ und Redis wird der Ingest asynchron: 202 sofort, Verarbeitung im Hintergrund, Statusabfrage über GET /captures/:id/status.

Inhalt

Das Problem mit synchronem Ingest

Ich hatte ein funktionierendes System. Die Chrome Extension drückt Ctrl+Q, der Post wird erkannt, Screenshot wird zugeschnitten, und dann kommt POST /ingest. Das Backend macht drei Dinge nacheinander: Bild nach Azure Blob Storage hochladen, Embedding über Ollama oder OpenAI generieren, Vektor in Qdrant speichern.

Funktioniert. Aber mit einem Problem, das ich lange ignoriert hatte.

Das Embedding bei OpenAI dauert zwischen 300 ms und 1.500 ms, je nach Auslastung. Ollama lokal ist noch langsamer. Solange das Backend diese Kette durchläuft, hält es die HTTP-Verbindung zur Extension offen. Die Extension hängt. Der Nutzer wartet. Und wenn das Backend neu gestartet wird, während gerade ein Capture in der Pipeline steckt, geht dieser Capture verloren.

Das ist kein Edge Case. Das ist schlechtes Design.

Was ich stattdessen wollte

Die Extension soll feuern und vergessen. POST /ingest gibt 202 Accepted zurück, sobald das Bild hochgeladen ist. Die Antwort enthält eine captureId. Den Rest erledigt ein Worker im Hintergrund: Embedding und Qdrant-Upsert.

Extension  →  POST /ingest
           ←  202 { captureId }   ← sofort

Backend-Worker (async):
  → generateEmbedding(text)
  → storePoint(vector, payload, captureId)
  → job.state = "completed"

Extension  →  GET /captures/:captureId/status
           ←  { status: "completed" }

Genau das ist das Event-Driven-Prinzip: Produzent und Konsument sind entkoppelt. Der Produzent (API) weiß nicht, wie lange die Verarbeitung dauert. Er gibt Job in eine Queue, fertig.

Warum BullMQ und nicht einfach setImmediate

Die einfachste Variante wäre: setImmediate(() => processInBackground(...)). HTTP antwortet sofort, Worker läuft in demselben Node.js-Prozess, keine neue Abhängigkeit.

Das Problem: Bei einem Prozess-Crash gehen alle Jobs verloren, die noch nicht abgearbeitet wurden. Es gibt keine Retry-Logik. Kein Monitoring. Keine Möglichkeit, von außen zu sehen, was gerade verarbeitet wird.

BullMQ löst das mit Redis als persistenter Queue. Ein Job bleibt in Redis, bis der Worker ihn als completed oder failed markiert. Server-Restart kostet keinen einzigen Job.

Kafka wäre overkill für diesen Use Case. Kafka ist sinnvoll ab einigen Millionen Events pro Tag, wenn mehrere unabhängige Consumer-Groups denselben Stream lesen müssen. BullMQ mit Redis skaliert auf 100.000 Jobs täglich ohne jede Konfigurationsänderung. Mehr Worker-Container starten reicht aus.

Redis in den Docker-Compose-Stack einbauen

In docker-compose.yml reicht ein einzelner Service:

# Added in Phase 2: Redis as BullMQ queue broker
redis:
  image: redis:7-alpine
  container_name: redis_local
  ports:
    - "6379:6379"
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
    interval: 10s
    timeout: 3s
    retries: 5

Der Backend-Service bekommt eine neue Umgebungsvariable:

backend:
  environment:
    - REDIS_URL=redis://redis:6379
  depends_on:
    - redis

Im docker-compose.override.yml wird der Backend-Service noch um redis in der depends_on-Liste erweitert, damit der lokale Stack in der richtigen Reihenfolge hochfährt.

Die Queue-Konfiguration im Backend

In src/services/queue.ts sitzt der BullMQ-Queue-Singleton:

// src/services/queue.ts
import { Queue } from "bullmq";
import type { IngestJobData } from "../types";

function parseRedisUrl(url: string): { host: string; port: number } {
  const parsed = new URL(url);
  return { host: parsed.hostname, port: parseInt(parsed.port || "6379", 10) };
}

export const connection = parseRedisUrl(process.env.REDIS_URL || "redis://localhost:6379");

export const ingestionQueue = new Queue<IngestJobData>("ingestion", {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { count: 500 },
    removeOnFail: { count: 100 },
  },
});

attempts: 3 mit exponentiellem Backoff (2 s, 4 s, 8 s) deckt temporäre Ausfälle von Ollama oder OpenAI ab. Die meisten Embedding-Fehler sind transient.

removeOnComplete und removeOnFail verhindern, dass Redis mit abgelaufenen Job-Einträgen volläuft. Ich behalte 500 erfolgreiche und 100 fehlgeschlagene Jobs für die Statusabfrage.

Der Ingest-Worker

// src/workers/ingest-worker.ts
import { Worker, type Job } from "bullmq";
import type { IngestJobData } from "../types";
import { generateEmbedding } from "../services/ai-provider";
import { storePoint } from "../services/qdrant";
import { connection, INGESTION_QUEUE_NAME } from "../services/queue";

async function processIngestJob(job: Job<IngestJobData>): Promise<void> {
  const { captureId, qdrantPayload, embeddingText } = job.data;
  const vector = await generateEmbedding(embeddingText);
  await storePoint(vector, qdrantPayload, captureId);
}

export function createIngestWorker() {
  const worker = new Worker<IngestJobData>(INGESTION_QUEUE_NAME, processIngestJob, {
    connection,
    concurrency: 3,
  });

  worker.on("failed", (job, err) => {
    if (job && job.attemptsMade >= (job.opts?.attempts ?? 3)) {
      console.error(`[IngestWorker] DEAD LETTER — captureId ${job.data.captureId}:`, err.message);
    }
  });

  return worker;
}

concurrency: 3 bedeutet: der Worker verarbeitet bis zu drei Jobs gleichzeitig. Sinnvoll, wenn mehrere Captures in kurzer Zeit eintreffen. Für OpenAI-Embeddings ist das unproblematisch. Für Ollama lokal kann concurrency: 1 besser sein, um das Modell nicht zu überlasten.

Die storePoint-Funktion in services/qdrant.ts bekommt eine optionale ID:

export async function storePoint(
  vector: number[],
  payload: QdrantPayload,
  id?: string
): Promise<string> {
  const pointId = id ?? crypto.randomUUID();
  // ...
}

Das ist der entscheidende Punkt: Die captureId, die die Extension in 202 { captureId } erhält, ist dieselbe UUID, die später als Qdrant-Dokumenten-ID gespeichert wird. Kein separates ID-Mapping nötig.

Die neue Ingest-Route

// src/routes/ingest.ts (vereinfacht)
router.post("/", async (req, res) => {
  const payload = req.body as PlatformAnalysisPayload;
  if (!payload?.platform || !payload?.image?.base64) {
    res.status(400).json({ error: "Invalid payload" });
    return;
  }
  if (req.auth?.userId) payload.capturedBy.userId = req.auth.userId;

  const captureId = crypto.randomUUID();
  const embeddingText = buildEmbeddingText(payload);

  // Upload image synchronously (fast) — then immediately return
  const blobName = await uploadImage(payload.image.base64, payload.image.mimeType);

  const qdrantPayload = { ...payload, image: { blobName, ... }, embeddingText };

  // Enqueue with captureId as BullMQ job ID
  await ingestionQueue.add("ingest", { captureId, qdrantPayload, embeddingText }, { jobId: captureId });

  res.status(202).json({ captureId });
});

Das Bild wird noch synchron hochgeladen, typischerweise unter 200 ms. Das ist kein Blocking-Problem. Der teure Teil (Embedding) ist vollständig ausgelagert.

Der Statusabfrage-Endpoint

// src/routes/captures.ts
router.get("/:captureId/status", async (req, res) => {
  const job = await statusQueue.getJob(req.params.captureId);
  if (!job) {
    res.json({ captureId: req.params.captureId, status: "unknown" });
    return;
  }
  const state = await job.getState();
  res.json({ captureId: req.params.captureId, status: state, ...(state === "failed" ? { failedReason: job.failedReason } : {}) });
});

Mögliche Status: waiting, active, completed, failed, delayed, unknown.

Die Extension-Seite

In background/main.ts wird das 202-Response jetzt ausgewertet:

if (response.status === 202) {
  const { captureId } = await response.json() as { captureId: string };
  await appendAsyncCapture(captureId, payload);
}

appendAsyncCapture speichert { captureId, channel, platform, capturedAt, status: "waiting" } in chrome.storage.local.asyncCaptures.

Wenn das Popup geöffnet wird, lädt es diese Einträge und fragt für jeden, der noch nicht completed ist, den Backend-Status ab:

const updatedAsync = await refreshAsyncStatuses(asyncCaptures);
await chrome.storage.local.set({ asyncCaptures: trimmed });
renderAsyncCaptures(trimmed);

Das Popup zeigt ein neues “Verarbeitung”-Panel mit farbkodierten Status: gelb für warteschlange/verzögert, blau für verarbeitung, rot für fehlgeschlagen.

Graceful Shutdown

In src/index.ts gibt es jetzt einen ordentlichen Shutdown-Handler:

const worker = createIngestWorker();

const shutdown = async () => {
  console.log("Shutting down worker...");
  await worker.close(); // Wartet, bis der laufende Job abgeschlossen ist
  process.exit(0);
};

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

worker.close() lässt den aktuell laufenden Job zu Ende verarbeiten, bevor der Prozess beendet wird. Kein Datenverlust bei docker compose down.

Was das konkret ändert

Vor Phase 2: POST /ingest hält die Extension für 300 ms bis 3 s fest. Ein Backend-Restart während des Embedings verliert den Job.

Nach Phase 2: POST /ingest antwortet in unter 300 ms (Image-Upload + Queue-Enqueue). Ein Backend-Restart verliert keinen einzigen Job. Redis hält die Queue. Der Worker nimmt den Job nach dem Neustart wieder auf.

Für die nächste Phase (Multi-Tenancy, Quota-Enforcement) ist die Architektur bereits vorbereitet: Workers können horizontal skaliert werden, indem man einfach weitere Backend-Container startet. Keine Code-Änderungen nötig.

Diagramm: BullMQ Event-Driven Ingestion Architektur Abbildung: POST /ingest gibt 202 zurück, sobald das Bild hochgeladen ist. BullMQ übernimmt die Queue-Verwaltung in Redis. Der IngestWorker zieht Jobs, generiert Embeddings und speichert Vektoren in Qdrant. Die Extension kann den Status über GET /captures/:id/status abfragen.


Alle Artikel der Serie

  1. Vision und Systemübersicht: Chrome Extension, RAG-Architektur, Projekthintergrund: Artikel lesen
  2. RAG-System Aufbau: Qdrant, Embeddings, Cosine-Ähnlichkeit in TypeScript: Artikel lesen
  3. AI Provider Abstraktion: Ollama vs. OpenAI, Interface-Design, kein Vendor-Lock-in: Artikel lesen
  4. Chrome Extension MV3: Drei isolierte Laufzeitkontexte, Message Passing, Strategy Pattern: Artikel lesen
  5. Docker Compose Strategie: Override-Pattern, von lokal zu Azure: Artikel lesen
  6. Ollama lokal vs. Docker: Die Entscheidung und ihre Konsequenzen: Artikel lesen
  7. Ollama Auto-Pull Entrypoint: Automatisiertes Modell-Setup beim Container-Start: Artikel lesen
  8. tsconfig und Vite: Node16 vs. bundler, warum Vite eigene Regeln hat: Artikel lesen
  9. Instagram Caption mit MutationObserver vollständig laden: Artikel lesen
  10. Chrome Extension Foundation mit Health-Dot und Retry-Queue: Artikel lesen
  11. Phase 2 Features: Shadow DOM Overlay, Tailwind v4, Duplicate Detection: Artikel lesen
  12. Race Condition bei der Plattformerkennung: Wie ein UI-Event die Instagram-Erkennung bricht: Artikel lesen
  13. PostId-Extraktion in zwei Instagram-Layouts: querySelector vs. Ancestor-Traversal: Artikel lesen
  14. Instagram Karussell vollständig erfassen mit MutationObserver: Artikel lesen
  15. Notiz und Tags beim Screenshot-Speichern: Artikel lesen
  16. Instagram Tastatur-Shortcuts blockieren Chrome Extension Eingaben: Artikel lesen
  17. Lowercase-Normalisierung und Duplikat-Erkennung im Tag-Input: Artikel lesen
  18. Zitadel Login V2 in Docker Compose: drei versteckte Fehler: Artikel lesen
  19. PKCE OAuth in einer Chrome MV3 Extension: Artikel lesen
  20. React Frontend mit react-oidc-context und Zitadel: Artikel lesen
  21. Vite Build-Time-Umgebungsvariablen in Docker: Artikel lesen
  22. Event-Driven Ingestion mit BullMQ und Redis (dieser Artikel)

Du baust ein event-driven Backend mit BullMQ und Redis und steckst bei der Worker-Konfiguration oder der Fehlerbehandlung fest? Lass uns das gemeinsam einschätzen.

Zurück zum Blog

Ähnliche Beiträge

Alle Beiträge ansehen
RAG-System mit Qdrant, Embeddings und Node.js aufbauen

RAG-System mit Qdrant, Embeddings und Node.js aufbauen

Retrieval-Augmented Generation ist keine Theorie. Es ist eine konkrete Architektur aus drei Schritten: Einbetten, Suchen, Generieren. Ich zeige, wie ich das mit Qdrant, nomic-embed-text und llama3.2 komplett lokal und ohne Cloud-Kosten umgesetzt habe.