Flow Package
The @hazeljs/flow package is a durable execution graph engine for Node.js—a workflow OS kernel that handles persistence, retries, timeouts, idempotency, and concurrency so you can focus on business logic. It is fully independent of Hazel core and works with Express, Fastify, NestJS, or plain Node.
Purpose
Modern applications rely on workflows: multi-step processes that span services, require human input, and must survive restarts (order fulfillment, approval flows, fraud review, onboarding). The @hazeljs/flow package provides:
- In-memory by default — No database or env vars required; add optional Postgres when you need durability
- Execution graphs — Nodes and edges with branching, conditional logic, and first-class WAIT/resume
- Durability — Optional Prisma storage for crash recovery and multi-process safety
- Audit trail — Timeline of events for every run
- Retries, timeouts, idempotency — Built into the engine
Key Features
| Feature | Description |
|---|---|
| Zero config | In-memory storage out of the box; no DATABASE_URL or migrations required |
| Decorator API | @Flow, @Entry, @Node, @Edge — define flows in one file |
| Wait & resume | Return { status: 'wait', reason: '...' }; call resumeRun(runId, payload) when ready |
| Idempotency | Per-node keys so payment/notification steps never run twice |
| Retries & backoff | maxAttempts, backoff: 'fixed' \| 'exponential', baseDelayMs, maxDelayMs |
| Timeouts | Per-node timeoutMs so stuck nodes don't block forever |
| Conditional edges | Branch with when(ctx) predicates; edges are evaluated by priority, and ambiguous matches fail with AMBIGUOUS_EDGE instead of silently taking the wrong path |
| Concurrency safety | Postgres advisory locks (with Prisma) or in-process per-run lock |
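The in-process per-run lock mentioned in the table can be pictured as a keyed mutex: work for the same run is serialized, while different runs proceed independently. The sketch below is only an illustration of that idea, not the engine's actual implementation; `PerRunLock` and `withLock` are hypothetical names.

```typescript
// Hypothetical keyed mutex: tasks for the same runId run one at a time, in call order.
class PerRunLock {
  // For each runId, a promise that resolves once all queued tasks have finished.
  private readonly tails = new Map<string, Promise<void>>();

  async withLock<T>(runId: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(runId) ?? Promise.resolve();
    let release!: () => void;
    const current = new Promise<void>((resolve) => {
      release = resolve;
    });
    // The new tail resolves only after every earlier task and this one are done.
    const tail = previous.then(() => current);
    this.tails.set(runId, tail);

    await previous; // wait for earlier tasks on the same run
    try {
      return await task();
    } finally {
      release();
      // Drop the entry if nobody queued behind us, so the map does not grow forever.
      if (this.tails.get(runId) === tail) this.tails.delete(runId);
    }
  }
}
```

A lock like this only protects a single Node process. Across processes, the table's other option (Postgres advisory locks through the Prisma adapter) is the relevant mechanism.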
Architecture
graph TD
  A["Your App / Flow Runtime"] --> B["FlowEngine"]
  B --> C["Storage"]
  C --> D["In-Memory (default)"]
  C --> E["Prisma (optional)"]
  E --> F["Postgres"]
  B --> G["Flow Definitions"]
  G --> H["@Flow @Node @Edge"]
  B --> I["Runs & Timeline"]
  style A fill:#3b82f6,stroke:#60a5fa,color:#fff
  style B fill:#6366f1,stroke:#818cf8,color:#fff
  style C fill:#8b5cf6,stroke:#a78bfa,color:#fff
  style D fill:#10b981,stroke:#34d399,color:#fff
  style E fill:#f59e0b,stroke:#fbbf24,color:#fff
  style G fill:#ec4899,stroke:#f472b6,color:#fff
- FlowEngine — Registers definitions, starts runs, ticks execution, resumes waiting runs
- Storage — In-memory by default; use createPrismaStorage(prisma) from @hazeljs/flow/prisma for Postgres
- Flow definitions — Built with decorators or the functional flow() builder
Installation
No database required for in-memory mode:
pnpm add @hazeljs/flow
For durable persistence (optional):
pnpm add @hazeljs/flow @prisma/client
Then run the flow package's Prisma migrations (see Persistence).
Quick Start
Define a flow (decorator-based)
import { FlowEngine, Flow, Entry, Node, Edge, buildFlowDefinition } from '@hazeljs/flow';
import type { FlowContext, NodeResult } from '@hazeljs/flow';

@Flow('order-flow', '1.0.0')
class OrderFlow {
  @Entry()
  @Node('validate')
  @Edge('charge')
  async validate(ctx: FlowContext): Promise<NodeResult> {
    // validate input
    return { status: 'ok', output: { orderId: ctx.input.orderId } };
  }

  @Node('charge')
  async charge(ctx: FlowContext): Promise<NodeResult> {
    // charge payment
    return { status: 'ok', output: { charged: true } };
  }
}

const engine = new FlowEngine(); // in-memory storage
await engine.registerDefinition(buildFlowDefinition(OrderFlow));

const { runId } = await engine.startRun({
  flowId: 'order-flow',
  version: '1.0.0',
  input: { orderId: 'ORD-123' },
});

// Tick the run until it leaves the RUNNING state
let run = await engine.getRun(runId);
while (run?.status === 'RUNNING') {
  run = await engine.tick(runId);
}
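The tick loop at the end of the example can be wrapped in a small helper so that callers do not repeat it. The following is a minimal sketch, typed against a structural interface that exposes only the getRun and tick calls used above; `RunSnapshot`, `TickableEngine`, `driveRun`, and the `maxTicks` guard are hypothetical names introduced here and are not part of @hazeljs/flow.

```typescript
// Hypothetical minimal view of a run: only the status field the loop inspects.
interface RunSnapshot {
  status: string;
}

// Hypothetical structural type covering the two engine calls used by the loop.
interface TickableEngine {
  getRun(runId: string): Promise<RunSnapshot | null | undefined>;
  tick(runId: string): Promise<RunSnapshot | null | undefined>;
}

// Tick a run until it is no longer RUNNING, or fail after maxTicks iterations
// so a misbehaving flow cannot spin forever.
async function driveRun(
  engine: TickableEngine,
  runId: string,
  maxTicks = 1000,
): Promise<RunSnapshot | null | undefined> {
  let run = await engine.getRun(runId);
  let ticks = 0;
  while (run?.status === 'RUNNING') {
    if (ticks >= maxTicks) {
      throw new Error(`Run ${runId} is still RUNNING after ${maxTicks} ticks`);
    }
    run = await engine.tick(runId);
    ticks += 1;
  }
  return run;
}
```

Because the helper stops as soon as the status is anything other than RUNNING, it also returns when a node pauses the run to wait for external input; the caller decides what to do with the returned status.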
Functional builder (alternative)
import { FlowEngine, flow } from '@hazeljs/flow';

const def = flow('my-flow', '1.0.0')
  .entry('start')
  .node('start', async (ctx) => ({ status: 'ok', output: 1 }))
  .node('end', async (ctx) => ({ status: 'ok', output: ctx.outputs.start }))
  .edge('start', 'end')
  .build();

const engine = new FlowEngine();
await engine.registerDefinition(def);
Persistence
By default the engine uses in-memory storage. For crash recovery and multi-process safety, use Postgres via the Prisma adapter:
- Install: pnpm add @prisma/client
- Set DATABASE_URL and run migrations from the flow package's prisma/schema
- Create the engine with Prisma storage:
import { FlowEngine, buildFlowDefinition } from '@hazeljs/flow';
import { createPrismaStorage, createFlowPrismaClient } from '@hazeljs/flow/prisma';
const prisma = createFlowPrismaClient(); // uses DATABASE_URL
const engine = new FlowEngine({ storage: createPrismaStorage(prisma) });
// same registerDefinition, startRun, tick...
The migration SQL in @hazeljs/flow's prisma/migrations/ is only needed when you use this adapter.
Wait and resume (human-in-the-loop)
Pause a run for external input (approval, webhook, payment callback):
@Node('await-approval')
async awaitApproval(ctx: FlowContext): Promise<NodeResult> {
  // Returning 'wait' pauses the run until resumeRun is called
  return { status: 'wait', reason: 'awaiting_approval' };
}
The run is persisted (or held in memory). When the approval arrives, resume it:
await engine.resumeRun(runId, { approved: true });
You need no polling loop and no custom state table. With Flow Runtime, your UI or approval service calls POST /v1/runs/:runId/resume.
Idempotency
Prevent duplicate side effects (e.g. charging a card twice) with per-node idempotency keys:
@Node('charge', { idempotencyKey: (ctx) => `order:${ctx.input.orderId}:charge` })
async charge(ctx: FlowContext): Promise<NodeResult> {
  // only runs once per key; cached output reused on retry
  return { status: 'ok', output: { charged: true } };
}
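To make the "runs once per key" behavior concrete, here is a minimal in-memory sketch of the idea: the first successful result for a key is stored and handed back to later callers. It is an illustration under assumptions, not the engine's storage logic; `IdempotencyCache`, `chargeOnce`, and the choice not to cache failures are all hypothetical.

```typescript
// Hypothetical in-memory idempotency store: maps a key to the result of the
// first call made with that key.
class IdempotencyCache<T> {
  private readonly results = new Map<string, Promise<T>>();

  // Run `effect` at most once per key; later calls receive the stored result.
  run(key: string, effect: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key);
    if (existing) return existing;

    const pending = effect().catch((err) => {
      // Assumption of this sketch: failed attempts are not cached,
      // so a retry with the same key runs the effect again.
      this.results.delete(key);
      throw err;
    });
    this.results.set(key, pending);
    return pending;
  }
}

// Usage: the key mirrors the one built in the decorator example above.
const cache = new IdempotencyCache<{ charged: boolean }>();
let charges = 0; // counts how many times the side effect really ran

const chargeOnce = (orderId: string) =>
  cache.run(`order:${orderId}:charge`, async () => {
    charges += 1;
    return { charged: true };
  });
```

Calling `chargeOnce('ORD-123')` twice runs the side effect once and returns the same stored result both times. A durable variant would keep the key and output in persistent storage so the guarantee survives restarts.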
Retries and timeouts
Attach a retry policy and timeout to any node:
@Node('call-api', {
  retry: { maxAttempts: 3, backoff: 'exponential', baseDelayMs: 1000, maxDelayMs: 10000 },
  timeoutMs: 5000,
})
async callApi(ctx: FlowContext): Promise<NodeResult> {
  // call the external API here
  return { status: 'ok', output: { done: true } };
}
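To give a feel for what the retry fields mean, the sketch below computes the waits between attempts for a policy like the one above. The formula is an assumption based on the common convention (a constant baseDelayMs for 'fixed'; baseDelayMs doubled per attempt and capped at maxDelayMs for 'exponential'); the engine's exact schedule, including any jitter, may differ. `RetryPolicy`, `retryDelayMs`, and `retrySchedule` are hypothetical names.

```typescript
type Backoff = 'fixed' | 'exponential';

// Hypothetical shape mirroring the retry options shown above.
interface RetryPolicy {
  maxAttempts: number;
  backoff: Backoff;
  baseDelayMs: number;
  maxDelayMs?: number;
}

// Delay to wait after failed attempt number `attempt` (1-based).
// Assumed formula: fixed -> baseDelayMs;
// exponential -> baseDelayMs * 2^(attempt - 1), capped at maxDelayMs.
function retryDelayMs(policy: RetryPolicy, attempt: number): number {
  const raw =
    policy.backoff === 'exponential'
      ? policy.baseDelayMs * 2 ** (attempt - 1)
      : policy.baseDelayMs;
  return Math.min(raw, policy.maxDelayMs ?? Infinity);
}

// All waits between attempts: maxAttempts attempts means maxAttempts - 1 waits.
function retrySchedule(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt < policy.maxAttempts; attempt += 1) {
    delays.push(retryDelayMs(policy, attempt));
  }
  return delays;
}

// The policy from the example: three attempts, so two waits.
const schedule = retrySchedule({
  maxAttempts: 3,
  backoff: 'exponential',
  baseDelayMs: 1000,
  maxDelayMs: 10000,
});
// Under the assumed formula, schedule is [1000, 2000].
```

With more attempts the cap becomes visible: under the same assumption, six attempts would wait 1000, 2000, 4000, 8000, and then 10000 ms, because the uncapped fifth delay of 16000 ms exceeds maxDelayMs.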
Conditional edges (branching)
Branch on run state with when(ctx):
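// Predicates are kept mutually exclusive so a score never matches two edges at the same priority
@Edge('score', 'approve', { when: (ctx) => (ctx.outputs.score as number) < 30 })
@Edge('score', 'review', { when: (ctx) => (ctx.outputs.score as number) >= 30 && (ctx.outputs.score as number) < 70 })
@Edge('score', 'reject', { when: (ctx) => (ctx.outputs.score as number) >= 70 })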
Edges are evaluated by priority; multiple matches at the same priority yield AMBIGUOUS_EDGE.
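The selection rule above can be sketched as a small pure function. This is an illustration under assumptions, not the engine's code: `EdgeSketch`, `pickNextNode`, the `priority` field name, its default of 0, the "higher number wins" ordering, and the treatment of an edge without a predicate as always matching are all hypothetical.

```typescript
// Hypothetical edge shape for this sketch.
interface EdgeSketch<C> {
  to: string;
  priority?: number; // assumed: higher wins, default 0
  when?: (ctx: C) => boolean; // assumed: no predicate means "always matches"
}

// Pick the single matching edge with the highest priority.
// Several matches at that priority raise AMBIGUOUS_EDGE.
function pickNextNode<C>(edges: EdgeSketch<C>[], ctx: C): string | undefined {
  const matches = edges.filter((edge) => (edge.when ? edge.when(ctx) : true));
  if (matches.length === 0) return undefined; // nothing to follow

  const top = Math.max(...matches.map((edge) => edge.priority ?? 0));
  const winners = matches.filter((edge) => (edge.priority ?? 0) === top);
  if (winners.length > 1) {
    const targets = winners.map((edge) => edge.to).join(', ');
    throw new Error(`AMBIGUOUS_EDGE: ${targets}`);
  }
  return winners[0].to;
}

// Overlapping predicates are safe here because each edge has its own priority.
type ScoreCtx = { outputs: { score: number } };
const scoreEdges: EdgeSketch<ScoreCtx>[] = [
  { to: 'approve', priority: 2, when: (ctx) => ctx.outputs.score < 30 },
  { to: 'review', priority: 1, when: (ctx) => ctx.outputs.score < 70 },
  { to: 'reject' }, // fallback at the default priority
];
```

In this sketch a score of 10 matches all three edges and resolves to 'approve' by priority, 50 resolves to 'review', and 90 falls through to 'reject'. Giving two overlapping edges the same priority would throw instead of choosing one arbitrarily.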
Run as an HTTP service
Use @hazeljs/flow-runtime to run flows behind a REST API, or invoke it from your app with runFlowRuntime({ flows, port, databaseUrl?, services }) so you don't reimplement the server.
When to use it
Good fit: Order processing, approval workflows, fraud/review queues, onboarding, AI agent orchestration, document workflows, multi-step integrations with retries and audit.
Less ideal: Simple one-off jobs (use Cron or a queue), real-time streaming (use WebSocket), high-throughput event sourcing (use Kafka).
What's next?
- Flow Runtime — Standalone HTTP service and runFlowRuntime() for your app
- Prisma — Optional persistence adapter uses Prisma; same patterns for app data
- Cron — For scheduled jobs; use Flow for multi-step workflows with wait/resume