Flow Package

The @hazeljs/flow package is a durable execution graph engine for Node.js—a workflow OS kernel that handles persistence, retries, timeouts, idempotency, and concurrency so you can focus on business logic. It is fully independent of Hazel core and works with Express, Fastify, NestJS, or plain Node.

Purpose

Modern applications rely on workflows: multi-step processes that span services, require human input, and must survive restarts (order fulfillment, approval flows, fraud review, onboarding). The @hazeljs/flow package provides:

  • In-memory by default — No database or env vars required; add optional Postgres when you need durability
  • Execution graphs — Nodes and edges with branching, conditional logic, and first-class WAIT/resume
  • Durability — Optional Prisma storage for crash recovery and multi-process safety
  • Audit trail — Timeline of events for every run
  • Retries, timeouts, idempotency — Built into the engine

Key Features

| Feature | Description |
| --- | --- |
| Zero config | In-memory storage out of the box; no DATABASE_URL or migrations required |
| Decorator API | @Flow, @Entry, @Node, @Edge: define flows in one file |
| Wait & resume | Return { status: 'wait', reason: '...' }; call resumeRun(runId, payload) when ready |
| Idempotency | Per-node keys so payment/notification steps never run twice |
| Retries & backoff | maxAttempts, backoff ('fixed' or 'exponential'), baseDelayMs, maxDelayMs |
| Timeouts | Per-node timeoutMs so stuck nodes don't block forever |
| Conditional edges | Branch with when(ctx) predicates; deterministic, no silent wrong-path bugs |
| Concurrency safety | Postgres advisory locks (with Prisma) or in-process per-run lock |

Architecture

graph TD
  A["Your App / Flow Runtime"] --> B["FlowEngine"]
  B --> C["Storage"]
  C --> D["In-Memory (default)"]
  C --> E["Prisma (optional)"]
  E --> F["Postgres"]
  B --> G["Flow Definitions"]
  G --> H["@Flow @Node @Edge"]
  B --> I["Runs & Timeline"]
  style A fill:#3b82f6,stroke:#60a5fa,color:#fff
  style B fill:#6366f1,stroke:#818cf8,color:#fff
  style C fill:#8b5cf6,stroke:#a78bfa,color:#fff
  style D fill:#10b981,stroke:#34d399,color:#fff
  style E fill:#f59e0b,stroke:#fbbf24,color:#fff
  style G fill:#ec4899,stroke:#f472b6,color:#fff
  • FlowEngine — Registers definitions, starts runs, ticks execution, resumes waiting runs
  • Storage — In-memory by default; use createPrismaStorage(prisma) from @hazeljs/flow/prisma for Postgres
  • Flow definitions — Built with decorators or the functional flow() builder

Installation

No database required for in-memory mode:

pnpm add @hazeljs/flow

For durable persistence (optional):

pnpm add @hazeljs/flow @prisma/client

Then run the flow package's Prisma migrations (see Persistence).

Quick Start

Define a flow (decorator-based)

import { FlowEngine, Flow, Entry, Node, Edge, buildFlowDefinition } from '@hazeljs/flow';
import type { FlowContext, NodeResult } from '@hazeljs/flow';

@Flow('order-flow', '1.0.0')
class OrderFlow {
  @Entry()
  @Node('validate')
  @Edge('charge')
  async validate(ctx: FlowContext): Promise<NodeResult> {
    // validate input
    return { status: 'ok', output: { orderId: ctx.input.orderId } };
  }

  @Node('charge')
  async charge(ctx: FlowContext): Promise<NodeResult> {
    // charge payment
    return { status: 'ok', output: { charged: true } };
  }
}

const engine = new FlowEngine();  // in-memory storage
await engine.registerDefinition(buildFlowDefinition(OrderFlow));

const { runId } = await engine.startRun({
  flowId: 'order-flow',
  version: '1.0.0',
  input: { orderId: 'ORD-123' },
});

let run = await engine.getRun(runId);
while (run?.status === 'RUNNING') {
  run = await engine.tick(runId);
}
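
The loop above keeps ticking until the run leaves RUNNING. During development it can help to cap the number of iterations so a node that never settles does not spin forever. The following is a minimal sketch, not part of the package: `TickableEngine`, `RunSnapshot`, and `driveRun` are hypothetical names, and the return types of `getRun` and `tick` are assumed from how the Quick Start uses them.

```typescript
// Hypothetical structural types: only the members used in the Quick Start loop.
interface RunSnapshot {
  status: string;
}

interface TickableEngine {
  getRun(runId: string): Promise<RunSnapshot | null | undefined>;
  tick(runId: string): Promise<RunSnapshot | null | undefined>;
}

// Tick a run until it leaves RUNNING, giving up after maxTicks iterations.
async function driveRun(
  engine: TickableEngine,
  runId: string,
  maxTicks = 1000,
): Promise<RunSnapshot | null | undefined> {
  let run = await engine.getRun(runId);
  let ticks = 0;
  while (run?.status === 'RUNNING') {
    if (ticks >= maxTicks) {
      throw new Error(`Run ${runId} still RUNNING after ${maxTicks} ticks`);
    }
    run = await engine.tick(runId);
    ticks += 1;
  }
  return run;
}
```

If the real engine's methods match this shape, it could be passed in directly, for example `await driveRun(engine, runId, 100)`.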

Functional builder (alternative)

import { FlowEngine, flow } from '@hazeljs/flow';

const def = flow('my-flow', '1.0.0')
  .entry('start')
  .node('start', async (ctx) => ({ status: 'ok', output: 1 }))
  .node('end', async (ctx) => ({ status: 'ok', output: ctx.outputs.start }))
  .edge('start', 'end')
  .build();

const engine = new FlowEngine();
await engine.registerDefinition(def);

Persistence

By default the engine uses in-memory storage. For crash recovery and multi-process safety, use Postgres via the Prisma adapter:

  1. Install: pnpm add @prisma/client
  2. Set DATABASE_URL and run migrations from the flow package's prisma/ schema
  3. Create the engine with Prisma storage:

import { FlowEngine, buildFlowDefinition } from '@hazeljs/flow';
import { createPrismaStorage, createFlowPrismaClient } from '@hazeljs/flow/prisma';

const prisma = createFlowPrismaClient();  // uses DATABASE_URL
const engine = new FlowEngine({ storage: createPrismaStorage(prisma) });
// same registerDefinition, startRun, tick...

The migration SQL in @hazeljs/flow's prisma/migrations/ is only needed when you use this adapter.

Wait and resume (human-in-the-loop)

Pause a run for external input (approval, webhook, payment callback):

@Node('await-approval')
async awaitApproval(ctx: FlowContext): Promise<NodeResult> {
  return { status: 'wait', reason: 'awaiting_approval' };
}

The run is persisted (or held in memory). When the approval arrives, resume it:

await engine.resumeRun(runId, { approved: true });

No polling or custom state tables. With Flow Runtime, your UI or approval service calls POST /v1/runs/:runId/resume.
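
To make the lifecycle concrete, here is a small library-independent sketch of the state transition that wait/resume implies. It does not reproduce the engine's internals: the `WAITING` status name, the `RunState` and `SketchNodeResult` shapes, and both helper functions are assumptions made for illustration.

```typescript
// Hypothetical, simplified stand-ins for the engine's run and result types.
type RunStatus = 'RUNNING' | 'WAITING';

interface RunState {
  runId: string;
  status: RunStatus;
  waitReason?: string;
  resumePayload?: unknown;
}

type SketchNodeResult =
  | { status: 'ok'; output?: unknown }
  | { status: 'wait'; reason: string };

// Apply a node's result: a 'wait' result parks the run until it is resumed.
function applyResult(run: RunState, result: SketchNodeResult): RunState {
  if (result.status === 'wait') {
    return { ...run, status: 'WAITING', waitReason: result.reason };
  }
  return { ...run, status: 'RUNNING' };
}

// Resume a parked run with an external payload; reject resumes of runs that are not waiting.
function resume(run: RunState, payload: unknown): RunState {
  if (run.status !== 'WAITING') {
    throw new Error(`Run ${run.runId} is ${run.status}, not WAITING`);
  }
  return { runId: run.runId, status: 'RUNNING', resumePayload: payload };
}
```

Rejecting a resume on a run that is not waiting is one way to keep a duplicate approval callback from advancing the run twice; whether the engine behaves this way is not stated here.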

Idempotency

Prevent duplicate side effects (e.g. charging a card twice) with per-node idempotency keys:

@Node('charge', { idempotencyKey: (ctx) => `order:${ctx.input.orderId}:charge` })
async charge(ctx: FlowContext): Promise<NodeResult> {
  // only runs once per key; cached output reused on retry
  return { status: 'ok', output: { charged: true } };
}
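
The idea behind an idempotency key can be sketched independently of the engine: remember the result for each key and return it instead of re-running the side effect. The `runOnce` helper and the in-memory `Map` below are hypothetical and only illustrate the pattern; how @hazeljs/flow stores and looks up keys is not described in this page.

```typescript
// Hypothetical in-memory cache: idempotency key -> result of the first execution.
const completed = new Map<string, Promise<unknown>>();

// Run `effect` at most once per key; later calls with the same key reuse the first result.
function runOnce<T>(key: string, effect: () => Promise<T>): Promise<T> {
  const cached = completed.get(key);
  if (cached !== undefined) {
    return cached as Promise<T>;
  }
  const pending = effect();
  completed.set(key, pending);
  // Forget failures so a later retry can attempt the effect again.
  pending.catch(() => completed.delete(key));
  return pending;
}
```

A key such as `order:ORD-123:charge` ties the cached result to one order and one step, so a retry of the same step reuses the stored output while a different order still executes.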

Retries and timeouts

Attach a retry policy and timeout to any node:

@Node('call-api', {
  retry: { maxAttempts: 3, backoff: 'exponential', baseDelayMs: 1000, maxDelayMs: 10000 },
  timeoutMs: 5000,
})
async callApi(ctx: FlowContext): Promise<NodeResult> {
  // ...
}
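
To show how the retry options interact, here is a minimal sketch of a delay calculation. The option names mirror the policy above, but the formula is an assumption (base delay doubled per retry, capped at `maxDelayMs`); the engine's actual schedule, and whether it adds jitter, is not stated here. `backoffDelayMs` is a hypothetical helper.

```typescript
// Mirrors the retry options shown above; the delay formula itself is an assumption.
interface RetryPolicy {
  maxAttempts: number;
  backoff: 'fixed' | 'exponential';
  baseDelayMs: number;
  maxDelayMs: number;
}

// Delay to wait before retry number `attempt` (1 = first retry).
function backoffDelayMs(policy: RetryPolicy, attempt: number): number {
  if (policy.backoff === 'fixed') {
    return Math.min(policy.baseDelayMs, policy.maxDelayMs);
  }
  const exponential = policy.baseDelayMs * 2 ** (attempt - 1);
  return Math.min(exponential, policy.maxDelayMs);
}
```

Under this assumed formula, `baseDelayMs: 1000` and `maxDelayMs: 10000` give delays of 1000, 2000, 4000, 8000, and 10000 ms for retries 1 through 5, with the last value capped.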

Conditional edges (branching)

Branch on run state with when(ctx):

@Edge('score', 'approve', { when: (ctx) => (ctx.outputs.score as number) < 30 })
@Edge('score', 'review', { when: (ctx) => (ctx.outputs.score as number) < 70 })
@Edge('score', 'reject')

Edges are evaluated by priority; multiple matches at the same priority yield AMBIGUOUS_EDGE.
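
The priority rule can be illustrated with a small standalone sketch. Everything here is hypothetical: the `SketchEdge` shape, the numeric `priority` field, and the choice that a higher number wins are assumptions, not the library's API. The point is only to show why overlapping predicates need distinct priorities.

```typescript
// Hypothetical shapes for illustration; not the library's types.
interface SketchContext {
  outputs: Record<string, unknown>;
}

interface SketchEdge {
  from: string;
  to: string;
  priority: number; // assumption: a higher number is preferred
  when?: (ctx: SketchContext) => boolean;
}

// Pick the matching edge with the highest priority; a tie at that priority is an error.
function selectNext(edges: SketchEdge[], from: string, ctx: SketchContext): string | undefined {
  let best: SketchEdge | undefined;
  let tie = false;
  for (const edge of edges) {
    if (edge.from !== from) continue;
    if (edge.when && !edge.when(ctx)) continue;
    if (!best || edge.priority > best.priority) {
      best = edge;
      tie = false;
    } else if (edge.priority === best.priority) {
      tie = true;
    }
  }
  if (tie) throw new Error('AMBIGUOUS_EDGE');
  return best?.to;
}

const edges: SketchEdge[] = [
  { from: 'score', to: 'approve', priority: 2, when: (ctx) => (ctx.outputs.score as number) < 30 },
  { from: 'score', to: 'review', priority: 1, when: (ctx) => (ctx.outputs.score as number) < 70 },
  { from: 'score', to: 'reject', priority: 0 },
];
```

With these priorities a score of 10 matches all three edges but resolves to `approve`, because it has the highest priority. If the three edges shared one priority, the same score would match more than once and raise `AMBIGUOUS_EDGE`.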

Run as an HTTP service

Use @hazeljs/flow-runtime to run flows behind a REST API, or invoke it from your app with runFlowRuntime({ flows, port, databaseUrl?, services }) so you don't reimplement the server.

When to use it

Good fit: Order processing, approval workflows, fraud/review queues, onboarding, AI agent orchestration, document workflows, multi-step integrations with retries and audit.

Less ideal: Simple one-off jobs (use Cron or a queue), real-time streaming (use WebSocket), high-throughput event sourcing (use Kafka).

What's next?

  • Flow Runtime — Standalone HTTP service and runFlowRuntime() for your app
  • Prisma — Optional persistence adapter uses Prisma; same patterns for app data
  • Cron — For scheduled jobs; use Flow for multi-step workflows with wait/resume