Skip to content

Data Pipeline

The data pipeline system provides automatic parallelization and dependency management for data loading. It prevents request waterfalls by analyzing the declared dependencies between loaders and running independent loaders in parallel.

Overview

ts
import { createPipeline, dataSource, cachedSource, optionalSource } from '@ereo/data';

createPipeline

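Creates a data pipeline with automatic parallelization. In the example below, `comments` is declared as depending on `posts` and reads the loaded posts from `data.posts`, while `user` and `posts` have no declared dependencies.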

ts
const pipeline = createPipeline({
  loaders: {
    user: { load: async () => getUser() },
    posts: { load: async () => getPosts() },
    comments: { load: async ({ data }) => getComments(data.posts) },
  },
  dependencies: {
    comments: ['posts'],
  },
});

export const loader = pipeline.toLoader();

PipelineConfig

ts
interface PipelineConfig<TLoaders, P> {
  loaders: TLoaders;
  dependencies?: Partial<Record<keyof TLoaders, (keyof TLoaders)[]>>;
  onError?: (error: Error, key: string) => void;
  metrics?: boolean;
}

DataSource

ts
interface DataSource<T, P> {
  load: (args: LoaderArgs<P>) => T | Promise<T>;
  tags?: string[] | ((params: P) => string[]);
  ttl?: number;
  required?: boolean;
  fallback?: T;
}

Helper Functions

dataSource

Creates a simple data source:

ts
const userSource = dataSource(async ({ params }) => getUser(params.id));

cachedSource

Creates a cached data source:

ts
const cachedPosts = cachedSource(
  async () => db.posts.findMany(),
  { tags: ['posts'], ttl: 300 }
);

optionalSource

Creates an optional data source with a fallback value:

ts
const userPrefs = optionalSource(
  async ({ params }) => getUserPreferences(params.id),
  { defaultTheme: 'light' }
);

PipelineResult

ts
interface PipelineResult<TLoaders> {
  data: { [K in keyof TLoaders]: Awaited<ReturnType<TLoaders[K]['load']>> };
  metrics: PipelineMetrics;
  errors: Map<keyof TLoaders, Error>;
}

Metrics

ts
interface PipelineMetrics {
  total: number;
  loaders: Map<string, LoaderMetrics>;
  executionOrder: ExecutionStep[];
  parallelEfficiency: number;
  waterfalls: WaterfallInfo[];
}

Examples

Basic Pipeline

ts
const pipeline = createPipeline({
  loaders: {
    user: dataSource(async ({ params }) => getUser(params.id)),
    posts: dataSource(async ({ params }) => getPosts(params.id)),
  },
});

export const loader = pipeline.toLoader();

With Dependencies

ts
const pipeline = createPipeline({
  loaders: {
    user: dataSource(async () => getCurrentUser()),
    posts: dataSource(async () => getPosts()),
    comments: dataSource(async ({ data }) => getComments(data.posts)),
  },
  dependencies: {
    comments: ['posts'],
  },
});

With Error Handling

ts
const pipeline = createPipeline({
  loaders: {
    critical: { load: async () => getCriticalData(), required: true },
    optional: { load: async () => getOptionalData(), fallback: null },
  },
  onError: (error, key) => console.error(`${key} failed:`, error),
});

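With Metrics

Set `metrics: true` in the pipeline config, then read `metrics` from the result of `pipeline.execute`. Note: `formatMetrics` is not part of the import shown in the Overview, so import it before use (presumably also from `@ereo/data` — confirm against the package exports).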

ts
const pipeline = createPipeline({
  loaders: { /* ... */ },
  metrics: true,
});

const result = await pipeline.execute(args);
console.log(formatMetrics(result.metrics));

Released under the MIT License.