Data Package

The @hazeljs/data package provides data processing and ETL capabilities for HazelJS applications. It includes declarative pipelines with schema validation, stream processing, built-in transformers, and data quality checks.

Purpose

Building data pipelines typically requires orchestrating multiple steps—normalization, validation, enrichment—with proper error handling and schema enforcement. The @hazeljs/data package simplifies this by providing:

  • Declarative Pipelines – Use @Pipeline, @Transform, and @Validate decorators to define ETL flows
  • Schema Validation – Fluent Schema API for type-safe validation (string, number, object, array, email, oneOf)
  • ETL Service – Execute multi-step pipelines with ordered execution and error handling
  • Built-in Transformers – trimString, toLowerCase, parseJson, pick, omit, renameKeys
  • Data Quality – QualityService for completeness, notNull, and custom checks
  • Stream Processing – StreamBuilder and StreamProcessor for batch and streaming workloads
  • Flink Integration – Optional Apache Flink deployment for distributed stream processing

Architecture

The package uses a decorator-driven pipeline architecture with service orchestration:

graph TD
  A["@Pipeline Decorator<br/>(Defines Pipeline Class)"] --> B["@Transform / @Validate<br/>(Step Decorators)"]
  B --> C["ETLService<br/>(Orchestrates Execution)"]
  C --> D["PipelineBase.execute()<br/>(Single Entry Point)"]
  
  E["SchemaValidator<br/>(Fluent Schema)"] --> F["@Validate Step"]
  
  G["StreamService"] --> H["Batch Processing<br/>(processBatch)"]
  G --> I["StreamProcessor<br/>(Streaming)"]
  
  C --> J["Step 1 → Step 2 → Step 3"]
  
  style A fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
  style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
  style C fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff
  style D fill:#f59e0b,stroke:#fbbf24,stroke-width:2px,color:#fff

Key Components

  1. DataModule – Registers SchemaValidator, ETLService, PipelineBuilder, StreamService, QualityService, FlinkService
  2. ETLService – Executes pipelines by invoking decorated steps in order
  3. PipelineBase – Base class providing execute() for pipelines
  4. SchemaValidator – Validates data against fluent schemas
  5. Decorators – @Pipeline, @Transform, @Validate, @Stream for declarative pipelines

Advantages

1. Declarative ETL

Define pipelines with decorators—no manual step wiring or execution logic.

2. Type-Safe Validation

Fluent Schema API with TypeScript support for validating request bodies, API responses, and data imports.

3. Reusable Transformers

Built-in and custom transformers compose easily within pipeline steps.

4. Data Quality Built-In

QualityService for completeness checks, null checks, and custom validation rules.

5. Batch and Streaming

StreamService for batch processing; StreamBuilder and Flink for streaming workloads.

Installation

npm install @hazeljs/data @hazeljs/core

Quick Start

1. Import DataModule

import { HazelApp } from '@hazeljs/core';
import { DataModule } from '@hazeljs/data';

const app = new HazelApp({
  imports: [DataModule.forRoot()],
});

app.listen(3000);

2. Define a Pipeline

import { Injectable } from '@hazeljs/core';
import {
  Pipeline,
  PipelineBase,
  Transform,
  Validate,
  ETLService,
  Schema,
} from '@hazeljs/data';

const OrderSchema = Schema.object()
  .prop('id', Schema.string().required())
  .prop('customerId', Schema.string().required())
  .prop('status', Schema.string().oneOf(['pending', 'paid', 'shipped', 'delivered', 'cancelled']))
  .prop('items', Schema.array().items(Schema.object()
    .prop('sku', Schema.string().minLength(1))
    .prop('qty', Schema.number().min(1))
    .prop('price', Schema.number().min(0))
  ))
  .required();

@Pipeline('order-processing')
@Injectable()
export class OrderProcessingPipeline extends PipelineBase {
  constructor(etlService: ETLService) {
    super(etlService);
  }

  @Transform({ step: 1, name: 'normalize' })
  async normalize(data: Record<string, unknown>): Promise<Record<string, unknown>> {
    return {
      ...data,
      status: String(data.status).toLowerCase(),
    };
  }

  @Validate({ step: 2, schema: OrderSchema })
  async validate(data: Record<string, unknown>): Promise<Record<string, unknown>> {
    return data;
  }

  @Transform({ step: 3, name: 'enrich' })
  async enrich(data: Record<string, unknown> & { items?: { qty: number; price: number }[] }): Promise<Record<string, unknown>> {
    const items = data.items ?? [];
    const subtotal = items.reduce((sum, i) => sum + i.qty * i.price, 0);
    const tax = subtotal * 0.1;
    return {
      ...data,
      subtotal,
      tax,
      total: subtotal + tax,
      processedAt: new Date().toISOString(),
    };
  }
}

3. Execute from a Controller

import { Controller, Post, Body, Inject } from '@hazeljs/core';
import { OrderProcessingPipeline } from './pipelines/order-processing.pipeline';

@Controller('data')
export class DataController {
  constructor(
    @Inject(OrderProcessingPipeline) private pipeline: OrderProcessingPipeline
  ) {}

  @Post('pipeline/orders')
  async processOrder(@Body() body: unknown) {
    const result = await this.pipeline.execute(body);
    return { ok: true, data: result };
  }
}

Batch Processing

Process arrays through pipelines with StreamService:

import { StreamService, ETLService } from '@hazeljs/data';

// etlService is typically obtained via dependency injection; shown here as an existing instance
const streamService = new StreamService(etlService);
const results = await streamService.processBatch(OrderProcessingPipeline, orders);

Schema Validation

Build schemas with the fluent API:

import { Schema, SchemaValidator } from '@hazeljs/data';

const UserSchema = Schema.object()
  .prop('email', Schema.string().format('email').required())
  .prop('name', Schema.string().minLength(1).maxLength(200))
  .prop('age', Schema.number().min(0).max(150))
  .prop('role', Schema.string().oneOf(['user', 'admin', 'moderator', 'guest']))
  .required();

const validator = new SchemaValidator();
const { value, error } = validator.validate(UserSchema, rawData);

Data Quality

Run quality checks on datasets:

import { QualityService } from '@hazeljs/data';

const qualityService = new QualityService();
const report = await qualityService.check(records, {
  completeness: ['id', 'email', 'createdAt'],
  notNull: ['id', 'status'],
});

Flink Configuration

For distributed stream processing with Apache Flink:

DataModule.forRoot({
  flink: {
    url: process.env.FLINK_REST_URL ?? 'http://localhost:8081',
    timeout: 30000,
  },
});

Built-in Transformers

  • trimString – Trim whitespace from strings
  • toLowerCase / toUpperCase – Convert string case
  • parseJson / stringifyJson – JSON parsing and serialization
  • pick – Select specific keys from objects
  • omit – Remove specific keys from objects
  • renameKeys – Rename object keys

Related Resources