10 production-tested patterns for event-driven Node.js. Outbox, Saga, Circuit Breaker, Idempotent Consumer, Event Sourcing, CQRS — the building blocks of systems that survive real failures.
01 — Outbox Pattern
Guaranteed message delivery without two-phase commit
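If you write to your database and then publish to a message queue, a crash between those two steps means the event is lost forever. The Outbox pattern records the event in the same database transaction as the business write, and a separate relay publishes it afterwards. Delivery becomes at-least-once, so consumers must tolerate duplicates.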
Implementation — atomic write + async relay
// Step 1: Write business data + outbox event in ONE transaction
await db.transaction(async (trx) => {
  const order = await trx('orders').insert({ userId, total, status: 'pending' }).returning('*');
  // Write to outbox table in the SAME transaction
  await trx('outbox_events').insert({
    aggregate_id: order[0].id,
    event_type: 'OrderCreated',
    payload: JSON.stringify({ orderId: order[0].id, userId, total }),
    created_at: new Date(),
    published: false
  });
});
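// Step 2: Relay process polls outbox and publishes (can be a separate service)
// Assumes `channel` is an amqplib ConfirmChannel, so waitForConfirms() resolves once the broker has accepted the batch
async function relayOutboxEvents() {
  // Row locks are held only until the transaction ends, so select, publish, and mark inside one
  await db.transaction(async (trx) => {
    const events = await trx('outbox_events')
      .where({ published: false })
      .orderBy('created_at')
      .limit(100)
      .forUpdate().skipLocked(); // concurrent relays skip rows locked here
    for (const event of events) {
      channel.publish('events', event.event_type, Buffer.from(event.payload), { persistent: true });
    }
    await channel.waitForConfirms(); // a crash after this line republishes the batch: at-least-once
    await trx('outbox_events')
      .whereIn('id', events.map((e) => e.id))
      .update({ published: true, published_at: new Date() });
  });
}
// Start the next poll only after the current one finishes, so runs never overlap
async function pollOutbox() {
  try { await relayOutboxEvents(); } catch (err) { logger.error({ err }, 'Outbox relay failed'); }
  setTimeout(pollOutbox, 1000);
}
pollOutbox();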
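The relay publishes first and marks the row afterwards, so a crash between those two steps causes a second delivery rather than a lost event. The following is a minimal in-memory sketch of that behaviour. Every name in it (`OutboxRow`, `relayOnce`, `publishToBroker`, `markRowPublished`) is a hypothetical stand-in for the table and broker, not part of the implementation above.

```typescript
// In-memory sketch: all names are hypothetical stand-ins for the table and broker.
type OutboxRow = { id: number; eventType: string; published: boolean };

// One relay pass: publish every unpublished row, then mark it published.
function relayOnce(
  rows: OutboxRow[],
  publish: (row: OutboxRow) => void,
  markPublished: (row: OutboxRow) => void,
): void {
  for (const row of rows.filter((r) => !r.published)) {
    publish(row);
    markPublished(row); // if this throws, the row stays unpublished
  }
}

const outboxRows: OutboxRow[] = [{ id: 1, eventType: "OrderCreated", published: false }];
const deliveredIds: number[] = [];
let failNextMark = true; // simulate one crash between publish and mark

const publishToBroker = (row: OutboxRow): void => {
  deliveredIds.push(row.id);
};
const markRowPublished = (row: OutboxRow): void => {
  if (failNextMark) {
    failNextMark = false;
    throw new Error("simulated crash after publish");
  }
  row.published = true;
};

try {
  relayOnce(outboxRows, publishToBroker, markRowPublished);
} catch {
  // the relay process restarts and polls again
}
relayOnce(outboxRows, publishToBroker, markRowPublished);

console.log(deliveredIds); // [ 1, 1 ]: delivered twice, never lost
```

The duplicate delivery is why the Outbox pattern is normally paired with an idempotent consumer.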
02 — Saga Orchestration
Distributed transactions without two-phase commit
A single business operation (place order → reserve inventory → charge payment → notify) spans multiple services. If payment fails after inventory is reserved, you need to compensate. The Saga pattern orchestrates this through compensating transactions.
Orchestrator-based Saga in Node.js
class OrderSaga {
  async execute(orderId: string) {
    const state = { orderId, compensations: [] as Array<() => Promise<unknown>> };
    try {
      // Step 1: Reserve inventory
      await inventoryService.reserve(orderId);
      state.compensations.push(() => inventoryService.release(orderId));
      // Step 2: Charge payment
      const charge = await paymentService.charge(orderId);
      state.compensations.push(() => paymentService.refund(charge.id));
      // Step 3: Schedule delivery
      await deliveryService.schedule(orderId);
      state.compensations.push(() => deliveryService.cancel(orderId));
      // All steps succeeded
      await orderRepo.markConfirmed(orderId);
    } catch (err) {
      // Run compensations in REVERSE order; log failures and keep going
      for (const compensate of [...state.compensations].reverse()) {
        await compensate().catch(e => logger.error({ e, orderId }, 'Compensation failed'));
      }
      // `err` is `unknown` in TypeScript, so narrow it before reading `.message`
      const reason = err instanceof Error ? err.message : String(err);
      await orderRepo.markFailed(orderId, reason);
      throw err;
    }
  }
}
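To see the compensation order without any real services, the same idea can be sketched as a generic runner over a list of steps. This is an illustrative sketch only: `SagaStep`, `SagaResult`, `runSaga`, and the demo steps are hypothetical names, and a production orchestrator would also persist saga state so it can resume after a crash.

```typescript
type SagaStep = {
  name: string;
  action: () => Promise<void>;
  compensate: () => Promise<void>;
};

type SagaResult = { ok: boolean; compensated: string[]; compensationFailures: string[] };

async function runSaga(steps: SagaStep[]): Promise<SagaResult> {
  const completed: SagaStep[] = [];
  try {
    for (const step of steps) {
      await step.action();
      completed.push(step); // only completed steps need undoing
    }
    return { ok: true, compensated: [], compensationFailures: [] };
  } catch {
    const compensated: string[] = [];
    const compensationFailures: string[] = [];
    // Undo in reverse order; keep going even if one compensation fails
    for (const step of [...completed].reverse()) {
      try {
        await step.compensate();
        compensated.push(step.name);
      } catch {
        compensationFailures.push(step.name); // needs a retry or manual repair
      }
    }
    return { ok: false, compensated, compensationFailures };
  }
}

const noop = async (): Promise<void> => {};
const demoSteps: SagaStep[] = [
  { name: "reserve", action: noop, compensate: noop },
  { name: "charge", action: noop, compensate: noop },
  { name: "schedule", action: async () => { throw new Error("no couriers"); }, compensate: noop },
];

runSaga(demoSteps).then((result) => console.log(result.compensated)); // [ 'charge', 'reserve' ]
```

The failed step itself is never compensated because it never completed; only the steps before it are undone, newest first.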
03 — Circuit Breaker
Stop calling a service that's already down
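Without a circuit breaker, every call to a failing downstream service waits for its timeout. In Node.js those pending calls pile up, holding sockets and memory, and the failure cascades into your own service. The breaker moves between three states:
CLOSED (normal): calls pass through while failures are counted.
OPEN (rejecting): calls fail fast without reaching the service.
HALF-OPEN (probing): after the reset timeout, a trial call decides whether the circuit closes or reopens.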
Circuit Breaker with opossum (Node.js)
import CircuitBreaker from 'opossum';
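// Wrap the call in an arrow function so `charge` keeps its `this` binding
const breaker = new CircuitBreaker((orderId, amount) => paymentService.charge(orderId, amount), {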
timeout: 3000, // fail if call takes > 3s
errorThresholdPercentage: 50, // open if >50% fail in rolling window
resetTimeout: 10000, // after 10s, try HALF-OPEN
volumeThreshold: 5, // minimum 5 calls before opening
});
breaker.on('open', () => logger.warn('Circuit OPEN — payment service is down'));
breaker.on('halfOpen', () => logger.info('Circuit HALF-OPEN — probing payment service'));
breaker.on('close', () => logger.info('Circuit CLOSED — payment service recovered'));
breaker.on('fallback', (result) => logger.warn({ result }, 'Fallback triggered'));
// Fallback: queue the payment for retry instead of failing the user
breaker.fallback(async (orderId) => {
await orderQueue.push({ type: 'PENDING_PAYMENT', orderId, retryAfter: Date.now() + 10000 });
return { status: 'queued', message: 'Payment processing delayed — we\'ll retry automatically' };
});
// Use it:
const result = await breaker.fire(orderId, amount);
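The three states are easier to follow in a hand-rolled version. The sketch below is not how opossum is implemented: it is a simplified, synchronous breaker that opens after a number of consecutive failures (opossum uses an error percentage over a rolling window), and `SimpleBreaker`, `BreakerState`, and the injected `now` clock are hypothetical names chosen for illustration.

```typescript
type BreakerState = "CLOSED" | "OPEN" | "HALF_OPEN";

class SimpleBreaker {
  private state: BreakerState = "CLOSED";
  private consecutiveFailures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(failureThreshold: number, resetTimeoutMs: number, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;
  }

  // OPEN turns into HALF_OPEN once the reset timeout has elapsed
  getState(): BreakerState {
    if (this.state === "OPEN" && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = "HALF_OPEN";
    }
    return this.state;
  }

  call<T>(fn: () => T): T {
    if (this.getState() === "OPEN") {
      throw new Error("circuit open: call rejected without reaching the service");
    }
    const wasProbing = this.state === "HALF_OPEN";
    try {
      const result = fn();
      this.consecutiveFailures = 0;
      this.state = "CLOSED"; // any success, including a successful probe, closes the circuit
      return result;
    } catch (err) {
      this.consecutiveFailures += 1;
      if (wasProbing || this.consecutiveFailures >= this.failureThreshold) {
        this.state = "OPEN"; // failed probe or too many failures: reject calls again
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}

let fakeTime = 0; // injected clock so the demo needs no real waiting
const demoBreaker = new SimpleBreaker(2, 10_000, () => fakeTime);
const failingCall = (): string => { throw new Error("service down"); };

for (let i = 0; i < 2; i++) {
  try { demoBreaker.call(failingCall); } catch { /* counted as a failure */ }
}
console.log(demoBreaker.getState()); // OPEN
fakeTime = 10_000;
console.log(demoBreaker.getState()); // HALF_OPEN
demoBreaker.call(() => "ok");
console.log(demoBreaker.getState()); // CLOSED
```

Injecting the clock keeps the state transitions deterministic, which also makes the breaker straightforward to unit test.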
04 — Idempotent Consumer
Handle duplicate messages without double-processing
In any at-least-once messaging system, the same message can arrive more than once — after a consumer crash, network retry, or broker redelivery. Your consumer must produce the same result whether it processes a message once or ten times.
The idempotency key pattern — database deduplication
// Use message ID as an idempotency key in the DB
// Assumes PostgreSQL and a UNIQUE constraint on processed_messages.message_id
channel.consume('payments', async (msg) => {
  if (!msg) return; // null means the broker cancelled this consumer
  const messageId = msg.properties.messageId; // must be set by publisher
  const data = JSON.parse(msg.content.toString());
  try {
    await db.transaction(async (trx) => {
      // Claim the message first: a repeated or concurrent delivery conflicts on the unique key,
      // which closes the race left open by a separate "check, then insert"
      const claimed = await trx('processed_messages')
        .insert({ message_id: messageId, processed_at: new Date(), queue: 'payments' })
        .onConflict('message_id')
        .ignore()
        .returning('message_id');
      if (claimed.length === 0) {
        logger.info({ messageId }, 'Duplicate message, skipping');
        return;
      }
      // Process the payment atomically with the claim
      await trx('payments').insert({ orderId: data.orderId, amount: data.amount, status: 'completed' });
    });
    channel.ack(msg); // ack duplicates too so they don't requeue
  } catch (err) {
    logger.error({ err, messageId }, 'Payment processing failed');
    channel.nack(msg, false, true); // requeue; route repeated failures to a dead-letter queue
  }
});
// Publisher must set a stable, unique messageId:
channel.publish('exchange', 'payments', Buffer.from(JSON.stringify(payload)), {
  persistent: true,
  messageId: `payment-${orderId}` // stable per business event: no timestamp, so a retried publish reuses the same ID
});
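Stripped of the database and broker, the deduplication logic is small. The sketch below keeps processed IDs in a `Set` that stands in for the `processed_messages` table; `makeIdempotent`, `handlePayment`, and `chargedTotal` are hypothetical names, and unlike the transactional version it offers no protection across process restarts.

```typescript
// Wraps a handler so each messageId takes effect at most once.
function makeIdempotent<T>(
  handler: (data: T) => void,
): (messageId: string, data: T) => "processed" | "duplicate" {
  const processedIds = new Set<string>(); // stands in for the processed_messages table
  return (messageId, data) => {
    if (processedIds.has(messageId)) return "duplicate";
    handler(data);
    processedIds.add(messageId); // recorded only if the handler did not throw
    return "processed";
  };
}

let chargedTotal = 0;
const handlePayment = makeIdempotent<{ amount: number }>((payment) => {
  chargedTotal += payment.amount;
});

console.log(handlePayment("payment-42", { amount: 100 })); // processed
console.log(handlePayment("payment-42", { amount: 100 })); // duplicate (simulated redelivery)
console.log(chargedTotal); // 100
```

The second delivery is acknowledged as a duplicate and the customer is charged once, which is the property the consumer above gets from the unique constraint.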
05 — Event Sourcing
State is a projection of history
Instead of storing the current state of an entity, event sourcing stores every change as an immutable event. The current state is reconstructed by replaying the event log. This gives you a full audit trail and the ability to rebuild any past state.
Event store + projection example
// Events are immutable facts, stored forever
interface OrderEvent {
  eventId: string;
  aggregateId: string; // orderId
  eventType: 'OrderCreated' | 'ItemAdded' | 'PaymentReceived' | 'OrderShipped';
  payload: unknown;
  timestamp: Date;
  version: number; // monotonic, per aggregate
}
// Append to event store (never update, never delete)
// Assumes versions start at 1 and a UNIQUE constraint on (aggregate_id, version)
async function appendEvents(orderId: string, events: OrderEvent[]) {
  const expectedVersion = events[0].version - 1;
  await db.transaction(async (trx) => {
    // Optimistic concurrency: fail if someone else wrote since we last read.
    // An INSERT ignores WHERE/HAVING clauses, so the version check is a separate query.
    const row = await trx('order_events')
      .where({ aggregate_id: orderId })
      .max('version as maxVersion')
      .first();
    if (Number(row?.maxVersion ?? 0) !== expectedVersion) {
      throw new Error(`Concurrency conflict on order ${orderId}: expected version ${expectedVersion}`);
    }
    // A concurrent writer that also passed the check fails here on the unique constraint
    await trx('order_events').insert(events);
  });
}
// Reconstruct current state by replaying events
async function getOrder(orderId: string) {
  const events = await db('order_events')
    .where({ aggregate_id: orderId })
    .orderBy('version');
  return events.reduce((state: OrderState, event: OrderEvent) => {
    // `payload` is `unknown`, so each case casts it to the shape that event type carries
    switch (event.eventType) {
      case 'OrderCreated': return { ...state, ...(event.payload as Partial<OrderState>), status: 'pending' };
      case 'PaymentReceived': return { ...state, status: 'paid', paidAt: event.timestamp };
      case 'OrderShipped': return { ...state, status: 'shipped', trackingId: (event.payload as { trackingId: string }).trackingId };
      default: return state;
    }
  }, {} as OrderState);
}
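Because state is a pure fold over events, rebuilding a past state only requires stopping the replay early. The sketch below shows this in memory with typed payloads instead of `payload: unknown`; `OrderEvt`, `OrderView`, `applyEvent`, and `replay` are hypothetical names for illustration, not the schema used above.

```typescript
// Hypothetical event shapes with typed payloads
type OrderEvt =
  | { type: "OrderCreated"; version: number; total: number }
  | { type: "PaymentReceived"; version: number }
  | { type: "OrderShipped"; version: number; trackingId: string };

type OrderView = {
  status: "none" | "pending" | "paid" | "shipped";
  total: number;
  trackingId?: string;
};

// Pure function: current state + one event -> next state
function applyEvent(state: OrderView, event: OrderEvt): OrderView {
  switch (event.type) {
    case "OrderCreated":
      return { ...state, status: "pending", total: event.total };
    case "PaymentReceived":
      return { ...state, status: "paid" };
    case "OrderShipped":
      return { ...state, status: "shipped", trackingId: event.trackingId };
  }
}

// Replay events in version order, optionally stopping at a past version
function replay(events: OrderEvt[], upToVersion: number = Infinity): OrderView {
  const initial: OrderView = { status: "none", total: 0 };
  return events
    .filter((e) => e.version <= upToVersion)
    .sort((a, b) => a.version - b.version)
    .reduce<OrderView>(applyEvent, initial);
}

const history: OrderEvt[] = [
  { type: "OrderCreated", version: 1, total: 250 },
  { type: "PaymentReceived", version: 2 },
  { type: "OrderShipped", version: 3, trackingId: "TRK-1" },
];

console.log(replay(history).status);    // shipped
console.log(replay(history, 2).status); // paid (state as of version 2)
```

For long-lived aggregates the replay cost grows with the event count, which is the usual motivation for periodic snapshots.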
06 — CQRS
Separate the read model from the write model
Command Query Responsibility Segregation (CQRS) splits your system into two paths: commands (writes, which change state and emit events) and queries (reads, which are optimised for the UI). This lets you scale and optimise each independently.
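The split can be sketched in a few lines: a command validates and appends an event, a projection folds that event into a read model, and a query only ever reads the read model. Everything here (`ItemAdded`, `writeLog`, `totalsView`, `addItemCommand`, `getOrderTotal`) is a hypothetical in-memory illustration, and the projection runs synchronously only to keep the example short.

```typescript
type ItemAdded = { type: "ItemAdded"; orderId: string; sku: string; price: number };

const writeLog: ItemAdded[] = [];             // write model: append-only events
const totalsView = new Map<string, number>(); // read model: shaped for one query

// Projection: folds each event into the read model
function projectItemAdded(event: ItemAdded): void {
  totalsView.set(event.orderId, (totalsView.get(event.orderId) ?? 0) + event.price);
}

// Command: validates, changes state, emits an event; returns no data
function addItemCommand(orderId: string, sku: string, price: number): void {
  if (price <= 0) throw new Error("price must be positive");
  const event: ItemAdded = { type: "ItemAdded", orderId, sku, price };
  writeLog.push(event);
  // Synchronous here; in practice often asynchronous via a queue,
  // which makes reads eventually consistent
  projectItemAdded(event);
}

// Query: reads the precomputed view and never touches the write model
function getOrderTotal(orderId: string): number {
  return totalsView.get(orderId) ?? 0;
}

addItemCommand("order-1", "sku-a", 30);
addItemCommand("order-1", "sku-b", 12);
console.log(getOrderTotal("order-1")); // 42
```

Because the query path reads a view built for it, the view can be stored, indexed, and scaled separately from the event log that the commands write to.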