Pardon our dust! All this is still a work in progress.

Graph

Execution engine for running task graphs

The Graph is the runtime that:

  • executes tasks
  • tracks dependencies
  • memoizes results
  • optionally enforces per-lane concurrency (when laneConcurrency is provided)
  • emits observability events

Creating a Graph

import { graph } from "@hello-terrain/work";

const g = graph();

The graph() function creates a new execution context with its own task registry and state.

API Reference

graph()

Creates a new graph instance.

Returns: Graph

Graph Methods

add(task)

Registers a task with the graph. Tasks must be added before they can be executed.

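import { param, task } from "@hello-terrain/work";

const someParam = param(21); // example input; any param works here

const myTask = task((get, work) => {
  const v = get(someParam);
  return work(() => v * 2);
});
g.add(myTask);

Parameter | Type    | Description
task      | Task<T> | The task to register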

Returns: Graph (for chaining, as in graph().add(myTask).set(...))

Tasks that you pass as targets must be registered (via g.add(task)), otherwise the graph can’t run them. Upstream tasks referenced via get(otherTask) will be registered automatically when discovered.

run(options?)

Executes the graph, computing all registered tasks (or specified targets) with dependency resolution.

const report = await g.run();
console.log(report.status); // "ok", "error", or "cancelled"

Option          | Type                   | Description
targets         | TaskRef[]              | Specific tasks to run (defaults to all)
laneConcurrency | Record<string, number> | Max concurrent tasks per lane
signal          | AbortSignal            | Abort signal for cancellation
resources       | unknown                | Resources passed to task contexts

Returns: Promise<RunReport>

dispose()

Releases resources held by the graph (currently: param subscriptions) and clears internal state. Use this when graphs are created/destroyed frequently (e.g. in hot-reload or per-level/per-scene lifecycles).

g.dispose();

get(task)

Returns the computed value of a task. Throws if the task hasn't been run or is not in a ready state.

await g.run();
const result = g.get(myTask);

Parameter | Type       | Description
task      | TaskRef<T> | The task to get the value for

Returns: T - The computed value

Throws: NoComputedValueError if task is not ready, UnknownNodeError if task is not registered

peek(task)

Returns the computed value of a task, or undefined if not ready. Does not throw.

const maybeResult = g.peek(myTask);
if (maybeResult !== undefined) {
  console.log("Got result:", maybeResult);
}

Parameter | Type       | Description
task      | TaskRef<T> | The task to peek at

Returns: T | undefined
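
The difference between get() and peek() can be illustrated with a small stand-in store. This is a sketch, not the library's implementation: ValueStore, its Map-based storage, and the plain Error are hypothetical, whereas the real get() throws the error types documented above.

```typescript
// Hypothetical stand-in for the graph's value storage (not the library's code).
class ValueStore<K, V> {
  private readonly values = new Map<K, V>();

  // Record a computed value, as a run would.
  store(key: K, value: V): void {
    this.values.set(key, value);
  }

  // Throwing accessor, analogous to g.get(task).
  get(key: K): V {
    if (!this.values.has(key)) {
      throw new Error("no computed value");
    }
    return this.values.get(key) as V;
  }

  // Non-throwing accessor, analogous to g.peek(task).
  peek(key: K): V | undefined {
    return this.values.get(key);
  }
}

const store = new ValueStore<string, number | undefined>();
store.store("a", 42);
store.store("b", undefined); // a value that is legitimately undefined

const peekA = store.peek("a");       // 42
const peekB = store.peek("b");       // undefined, although "b" has a value
const peekMissing = store.peek("c"); // undefined, because "c" has none
```

In this sketch peek() cannot tell a stored undefined from a missing value. If a task can legitimately return undefined, calling get() inside try/catch is the unambiguous check; whether the library's peek() has the same ambiguity is not stated here.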

set(param, callback)

Takes graph-local ownership of a param's value. After calling set(), the graph stores its own copy and detaches from the external param.subscribe() flow. This enables multiple graphs to share the same module-scope param() token with isolated runtime values.

import { param } from "@hello-terrain/work";

const rootSize = param(256);

const g = graph()
  .add(myTask)
  .set(rootSize, () => 128);   // graph-local value: 128

// Update later
g.set(rootSize, (prev) => prev * 2);  // graph-local value: 256

Parameter | Type           | Description
param     | ParamRef<T>    | The param to take ownership of
callback  | (prev: T) => T | Function that receives current value and returns new value

Returns: Graph (for chaining)

After graph.set(), external param.set() calls no longer affect this graph's value for that param. Other graphs that haven't called graph.set() on the same param are unaffected.
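
The ownership rule can be modelled in a few lines. The sketch below is an illustration under stated assumptions, not the library's code: SharedParam and LocalParams are hypothetical stand-ins for a module-scope param and for one graph's view of its params.

```typescript
// Hypothetical stand-in for a module-scope param: one shared, mutable value.
class SharedParam<T> {
  value: T;
  constructor(initial: T) {
    this.value = initial;
  }
}

// Hypothetical stand-in for one graph's view of its params.
class LocalParams {
  // Local copies for params this graph has taken ownership of.
  private readonly owned = new Map<object, unknown>();

  // The local copy wins once it exists; otherwise follow the shared value.
  read<T>(p: SharedParam<T>): T {
    return this.owned.has(p) ? (this.owned.get(p) as T) : p.value;
  }

  // Take ownership: derive the new local value from the current one.
  set<T>(p: SharedParam<T>, callback: (prev: T) => T): this {
    this.owned.set(p, callback(this.read(p)));
    return this;
  }
}

const rootSize = new SharedParam(256);
const g1 = new LocalParams().set(rootSize, () => 128); // g1 owns its copy
const g2 = new LocalParams();                          // g2 never calls set()

rootSize.value = 1024; // external update to the shared param

const g1Size = g1.read(rootSize); // 128: unaffected by the external update
const g2Size = g2.read(rootSize); // 1024: still follows the shared param
```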

on(callback)

Subscribes to graph events for monitoring, logging, or debugging.

// Subscribe to all events (legacy)
const unsubAll = g.on((event) => {
  console.log(event.type, event);
});

// Subscribe to a group of events
const unsubTasks = g.on("task:*", (event) => {
  console.log(event.type, event.taskId);
});

// Subscribe to a single event type
const unsubCacheHit = g.on("task:cacheHit", (event) => {
  console.log("cache hit", event.taskId);
});

Signature               | Description
on(cb)                  | Subscribe to all events (legacy)
on("task:*", cb)        | Subscribe to all task events
on("run:*", cb)         | Subscribe to all run events
on("task:cacheHit", cb) | Subscribe to a single event type

Returns: () => void - Unsubscribe function
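
To make the pattern forms concrete, here is a minimal sketch of group matching and unsubscription. It is an assumption about how "task:*" style patterns could be interpreted, not the library's implementation; matchesPattern and Emitter are hypothetical names.

```typescript
// Hypothetical matcher: "task:*" matches any type in the "task:" group;
// any other pattern must equal the event type exactly.
function matchesPattern(pattern: string, eventType: string): boolean {
  if (pattern.endsWith(":*")) {
    const prefix = pattern.slice(0, -1); // "task:*" -> "task:"
    return eventType.startsWith(prefix);
  }
  return pattern === eventType;
}

type Listener = (event: { type: string }) => void;

interface Subscription {
  pattern: string | null; // null means "all events"
  cb: Listener;
}

// Hypothetical emitter with the same call shapes as g.on(...).
class Emitter {
  private readonly listeners = new Set<Subscription>();

  on(cb: Listener): () => void;
  on(pattern: string, cb: Listener): () => void;
  on(a: string | Listener, b?: Listener): () => void {
    const entry: Subscription =
      typeof a === "string" ? { pattern: a, cb: b as Listener } : { pattern: null, cb: a };
    this.listeners.add(entry);
    // The returned function removes exactly this subscription.
    return () => {
      this.listeners.delete(entry);
    };
  }

  emit(event: { type: string }): void {
    this.listeners.forEach(({ pattern, cb }) => {
      if (pattern === null || matchesPattern(pattern, event.type)) cb(event);
    });
  }
}

const emitter = new Emitter();
const seen: string[] = [];
const unsubscribe = emitter.on("task:*", (e) => seen.push(e.type));

emitter.emit({ type: "task:start" });  // delivered
emitter.emit({ type: "run:start" });   // not in the "task:" group
unsubscribe();
emitter.emit({ type: "task:finish" }); // not delivered after unsubscribing
// seen is now ["task:start"]
```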

Run Options

targets

Run only specific tasks (and their dependencies):

// Only compute these tasks and their dependencies
await g.run({
  targets: [taskA, taskB],
});

laneConcurrency

Control how many tasks can run in parallel per lane:

await g.run({
  laneConcurrency: {
    cpu: 4,     // Max 4 CPU-bound tasks in parallel
    io: 10,     // Max 10 I/O-bound tasks in parallel
    gpu: 1,     // Only 1 GPU task at a time
  },
});

If laneConcurrency is omitted (or {}), the graph does not throttle task concurrency by lane.

When laneConcurrency is provided and non-empty:

  • Tasks without an explicit lane default to "cpu".
  • Lanes not listed in laneConcurrency default to 1 permit.
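
The defaulting rules above can be written as a small pure function. This is a sketch of the documented rules only; permitsFor is a hypothetical helper, and using Infinity to represent "no throttling" is a choice made for this illustration.

```typescript
// Hypothetical helper: how many permits a task's lane gets under the rules above.
// Returns Infinity when lane throttling is disabled.
function permitsFor(
  taskLane: string | undefined,
  laneConcurrency?: Record<string, number>,
): number {
  // Omitted or empty config: no per-lane throttling.
  if (!laneConcurrency || Object.keys(laneConcurrency).length === 0) {
    return Infinity;
  }
  const lane = taskLane ?? "cpu";    // tasks without a lane default to "cpu"
  return laneConcurrency[lane] ?? 1; // unlisted lanes default to 1 permit
}

const unthrottled = permitsFor("gpu");                 // Infinity
const defaultLane = permitsFor(undefined, { cpu: 4 }); // 4
const unlistedLane = permitsFor("io", { cpu: 4 });     // 1
```

One practical consequence of the second rule: adding a single lane to laneConcurrency limits every unlisted lane to one task at a time, so it is usually worth listing every lane your tasks use.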

signal

Cancel a running graph:

const controller = new AbortController();

// Start the run
const promise = g.run({ signal: controller.signal });

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

const report = await promise;
console.log(report.status); // "cancelled"
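
Aborting a signal does not interrupt JavaScript that is already running; code has to observe the signal, either by checking signal.aborted or by passing the signal to an API such as fetch. The sketch below shows the checking style with a hypothetical function, sumWithCancellation, that is not part of the library.

```typescript
// Hypothetical CPU-bound work that checks the signal between items.
function sumWithCancellation(
  values: number[],
  signal: AbortSignal,
): { status: "ok" | "cancelled"; total: number } {
  let total = 0;
  for (const v of values) {
    if (signal.aborted) {
      return { status: "cancelled", total }; // stop early, keep the partial total
    }
    total += v;
  }
  return { status: "ok", total };
}

const abortDemo = new AbortController();

// Signal not aborted yet: the work runs to completion.
const finished = sumWithCancellation([1, 2, 3], abortDemo.signal);

// Once aborted, the same work stops at its first check.
abortDemo.abort();
const stopped = sumWithCancellation([1, 2, 3], abortDemo.signal);
```

Long-running work inside a task can follow the same pattern with ctx.signal, which the Async Pipeline example passes to fetch.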

resources

Pass shared resources to all tasks:

const db = new Database();
const cache = new Cache();

await g.run({
  resources: { db, cache },
});

// In tasks (userId is a param defined elsewhere):
const fetchUser = task(async (get, work, ctx) => {
  const { db } = ctx.resources;
  return db.users.find(get(userId));
});
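
The options table lists resources as unknown, so in strict TypeScript, destructuring ctx.resources directly may not type-check. One way to handle that is a type guard that narrows the value once at the top of a task. This is a sketch: AppResources, isAppResources, and requireResources are hypothetical names, and the db shape is invented for illustration.

```typescript
// Hypothetical resource shape for illustration; not part of the library.
interface AppResources {
  db: { findUser(id: string): string | undefined };
}

// Type guard that narrows an unknown value to AppResources.
function isAppResources(value: unknown): value is AppResources {
  if (typeof value !== "object" || value === null) return false;
  const db = (value as { db?: unknown }).db;
  return (
    typeof db === "object" &&
    db !== null &&
    typeof (db as { findUser?: unknown }).findUser === "function"
  );
}

// Narrow once; fail loudly if the run was started without the expected resources.
function requireResources(value: unknown): AppResources {
  if (!isAppResources(value)) {
    throw new TypeError("run() was called without the expected resources");
  }
  return value;
}

// Usage with a hand-built resources object standing in for ctx.resources.
const users = new Map<string, string>([["u1", "Ada"]]);
const resources: unknown = { db: { findUser: (id: string) => users.get(id) } };

const { db } = requireResources(resources);
const userName = db.findUser("u1"); // "Ada"
```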

Run Report

The run() method returns a report with execution statistics:

interface RunReport {
  runId: string;      // Unique run identifier
  status: RunStatus;  // "ok", "error", or "cancelled"
  startedAt: number;  // Start timestamp (ms)
  finishedAt: number; // End timestamp (ms)
  durationMs: number; // Total duration
  taskCount: number;  // Tasks executed (not from cache)
  cacheHits: number;  // Tasks served from cache
}

const report = await g.run();

console.log(`Run ${report.runId}`);
console.log(`Status: ${report.status}`);
console.log(`Duration: ${report.durationMs}ms`);
console.log(`Tasks: ${report.taskCount} executed, ${report.cacheHits} cached`);
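
Because taskCount excludes cached tasks, a cache hit ratio has to be derived from both counters. The helper below is a sketch with a hypothetical name, cacheHitRatio; it redeclares the RunReport fields from above so that it stands alone, and the sample report values are invented.

```typescript
// Mirrors the RunReport fields documented above.
type RunStatus = "ok" | "error" | "cancelled";

interface RunReport {
  runId: string;
  status: RunStatus;
  startedAt: number;
  finishedAt: number;
  durationMs: number;
  taskCount: number; // tasks executed (not from cache)
  cacheHits: number; // tasks served from cache
}

// Hypothetical helper: fraction of tasks served from cache (0 when nothing ran).
function cacheHitRatio(report: RunReport): number {
  const total = report.taskCount + report.cacheHits;
  return total === 0 ? 0 : report.cacheHits / total;
}

// Invented sample report: one task executed, three served from cache.
const sampleReport: RunReport = {
  runId: "run-1",
  status: "ok",
  startedAt: 1000,
  finishedAt: 1012,
  durationMs: 12,
  taskCount: 1,
  cacheHits: 3,
};

const ratio = cacheHitRatio(sampleReport); // 0.75
```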

Graph Events

Subscribe to events for observability:

Event Type    | Properties                           | Description
run:start     | runId, at                            | Run begins
run:finish    | runId, at, status                    | Run completes
task:start    | runId, taskId, at, lane              | Task begins
task:finish   | runId, taskId, at, durationMs        | Task completes
task:cacheHit | runId, taskId, at                    | Task served from cache
task:error    | runId, taskId, at, durationMs, error | Task failed
param:set     | paramId, at                          | Param value set via graph.set()

g.on("run:*", (event) => console.log(`[${event.runId}]`, event.type));
g.on("task:*", (event) => console.log(`[${event.runId}]`, event.type, event.taskId));
g.on("task:error", (event) => console.error(`[${event.runId}]`, event.taskId, event.error));
g.on("param:*", (event) => console.log("param event", event.type, event.paramId));
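
As a sketch of what a listener can do with these events, the collector below sums durationMs per task. The property names come from the table above; the property types, the GraphEvent union, and the TaskTimings class are assumptions made for this illustration.

```typescript
// Event shapes built from the table above; the field types are assumed.
type GraphEvent =
  | { type: "run:start"; runId: string; at: number }
  | { type: "run:finish"; runId: string; at: number; status: string }
  | { type: "task:start"; runId: string; taskId: string; at: number; lane: string }
  | { type: "task:finish"; runId: string; taskId: string; at: number; durationMs: number }
  | { type: "task:cacheHit"; runId: string; taskId: string; at: number }
  | { type: "task:error"; runId: string; taskId: string; at: number; durationMs: number; error: unknown }
  | { type: "param:set"; paramId: string; at: number };

// Hypothetical collector: total time spent per task across finish and error events.
class TaskTimings {
  private readonly totals = new Map<string, number>();

  // Arrow property so it can be passed around as a standalone callback.
  readonly handle = (event: GraphEvent): void => {
    if (event.type === "task:finish" || event.type === "task:error") {
      const previous = this.totals.get(event.taskId) ?? 0;
      this.totals.set(event.taskId, previous + event.durationMs);
    }
  };

  // Task with the largest accumulated duration, or undefined if none was seen.
  slowest(): string | undefined {
    let best: string | undefined;
    let bestMs = -1;
    this.totals.forEach((ms, taskId) => {
      if (ms > bestMs) {
        best = taskId;
        bestMs = ms;
      }
    });
    return best;
  }
}

// Feed hand-written events; with a real graph the handler would be subscribed instead.
const timings = new TaskTimings();
timings.handle({ type: "task:finish", runId: "r1", taskId: "sum", at: 0, durationMs: 4 });
timings.handle({ type: "task:finish", runId: "r1", taskId: "doubled", at: 0, durationMs: 9 });
timings.handle({ type: "task:cacheHit", runId: "r1", taskId: "average", at: 0 });

const slowestTask = timings.slowest(); // "doubled"
```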

Error Handling

UnknownNodeError

Thrown when attempting to access a task that hasn't been registered:

try {
  g.get(unregisteredTask);
} catch (e) {
  if (e instanceof UnknownNodeError) {
    console.error(`Task ${e.id} is not registered`);
  }
}

NoComputedValueError

Thrown when accessing a task that hasn't been computed yet:

try {
  g.get(myTask); // Without running first
} catch (e) {
  if (e instanceof NoComputedValueError) {
    console.error(`Task ${e.id} has not been computed`);
  }
}

CyclicalGraphError

Thrown when a cycle is detected during execution:

// This would create a cycle
const a = task((get, work) => {
  const bv = get(b);
  return work(() => bv + 1);
});
const b = task((get, work) => {
  const av = get(a);
  return work(() => av + 1);
});

// Running will throw CyclicalGraphError
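
For intuition, a cycle like the one above can be found with a depth-first search over the dependency map. This sketch shows one common technique; how the library detects cycles internally is not stated here, and findCycle and Deps are hypothetical names.

```typescript
// Dependency map: task name -> names of the tasks it reads via get().
type Deps = Record<string, string[]>;

// Returns one cycle as a path (for example ["a", "b", "a"]), or null if there is none.
function findCycle(deps: Deps): string[] | null {
  const done = new Set<string>(); // nodes whose dependencies are fully explored
  const stack: string[] = [];     // nodes on the current DFS path

  function visit(node: string): string[] | null {
    const at = stack.indexOf(node);
    if (at !== -1) return stack.slice(at).concat(node); // back edge: cycle found
    if (done.has(node)) return null;
    stack.push(node);
    for (const dep of deps[node] ?? []) {
      const cycle = visit(dep);
      if (cycle) return cycle;
    }
    stack.pop();
    done.add(node);
    return null;
  }

  for (const node of Object.keys(deps)) {
    const cycle = visit(node);
    if (cycle) return cycle;
  }
  return null;
}

const cyclic = findCycle({ a: ["b"], b: ["a"] }); // ["a", "b", "a"]
const acyclic = findCycle({ a: ["b"], b: [] });   // null
```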

CancelledError

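Thrown when a run is aborted. Note that an aborted run can also resolve with report.status set to "cancelled" (see signal above), so a try/catch alone is not a complete cancellation check: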

const controller = new AbortController();
controller.abort();

try {
  await g.run({ signal: controller.signal });
} catch (e) {
  // Tasks may throw the abort reason
}

Examples

Complete Example

import { param, task, graph } from "@hello-terrain/work";

// Inputs
const items = param<number[]>([1, 2, 3, 4, 5]).displayName("items");
const multiplier = param(2).displayName("multiplier");

// Computations
const doubled = task((get, work) => {
  const xs = get(items);
  const m = get(multiplier);
  return work(() => xs.map((x) => x * m));
}).displayName("doubled");

const sum = task((get, work) => {
  const xs = get(doubled);
  return work(() => xs.reduce((a, b) => a + b, 0));
}).displayName("sum");

const average = task((get, work) => {
  const s = get(sum);
  const values = get(doubled);
  return work(() => s / values.length);
}).displayName("average");

// Setup
const g = graph();
g.add(doubled);
g.add(sum);
g.add(average);

// Monitor execution
g.on((e) => {
  if (e.type === "task:finish") {
    console.log(`${e.taskId} took ${e.durationMs}ms`);
  }
});

// Initial run
let report = await g.run();
console.log("Sum:", g.get(sum));         // 30
console.log("Average:", g.get(average)); // 6
console.log("Cache hits:", report.cacheHits); // 0

// Update input and re-run
multiplier.set(() => 3);
report = await g.run();
console.log("Sum:", g.get(sum));         // 45
console.log("Average:", g.get(average)); // 9
console.log("Cache hits:", report.cacheHits); // 0 (all recomputed due to dependency change)

// Re-run without changes
report = await g.run();
console.log("Cache hits:", report.cacheHits); // 3 (all from cache)
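
The cache-hit counts in the comments above are what version-based memoization would produce: a task recomputes only when an input has changed since it last ran. The sketch below illustrates that idea with one input and one derived value. It is an assumption about the general technique, not the library's implementation; Versioned and Memo are hypothetical names.

```typescript
// Hypothetical input cell: every set() bumps a version counter.
class Versioned<T> {
  value: T;
  version = 0;
  constructor(initial: T) {
    this.value = initial;
  }
  set(next: T): void {
    this.value = next;
    this.version++;
  }
}

// Hypothetical derived cell: recomputes only when the input version changed.
class Memo<I, O> {
  executed = 0;  // analogous to report.taskCount
  cacheHits = 0; // analogous to report.cacheHits
  private lastSeen = -1;
  private cached: O | undefined;
  private readonly input: Versioned<I>;
  private readonly compute: (input: I) => O;

  constructor(input: Versioned<I>, compute: (input: I) => O) {
    this.input = input;
    this.compute = compute;
  }

  read(): O {
    if (this.lastSeen === this.input.version) {
      this.cacheHits++;
      return this.cached as O; // input unchanged: serve the cached value
    }
    const value = this.compute(this.input.value);
    this.cached = value;
    this.lastSeen = this.input.version;
    this.executed++;
    return value;
  }
}

const factor = new Versioned(2);
const scaled = new Memo(factor, (m) => [1, 2, 3].map((x) => x * m));

scaled.read();                     // first read computes
scaled.read();                     // unchanged input: cache hit
factor.set(3);
const afterChange = scaled.read(); // input changed: recomputes to [3, 6, 9]
```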

Async Pipeline

import { param, task, graph } from "@hello-terrain/work";

const apiUrl = param("https://api.example.com");

const fetchUsers = task(async (get, work, ctx) => {
  const url = get(apiUrl);
  return work(async () => {
    const response = await fetch(`${url}/users`, { signal: ctx.signal });
    return response.json();
  });
}).displayName("fetchUsers").lane("network");

const fetchPosts = task(async (get, work, ctx) => {
  const url = get(apiUrl);
  return work(async () => {
    const response = await fetch(`${url}/posts`, { signal: ctx.signal });
    return response.json();
  });
}).displayName("fetchPosts").lane("network");

const combined = task((get, work) => {
  const users = get(fetchUsers);
  const posts = get(fetchPosts);
  return work(() => ({ users, posts }));
}).displayName("combined");

const g = graph();
g.add(combined);

// Both fetches run in parallel (same lane, configured concurrency)
await g.run({
  laneConcurrency: { network: 5 },
});

g.get(combined); // { users: [...], posts: [...] }

Multi-Instance with Shared Tasks

Define params and tasks at module scope, then create multiple graphs with isolated values using graph.set():

import { param, task, graph } from "@hello-terrain/work";

// Module-scope params (shared declarations with defaults)
const rootSize = param(256);
const maxLevel = param(16);

// Module-scope task (no factory wrapper needed)
const configTask = task((get, work) => {
  const size = get(rootSize);
  const level = get(maxLevel);
  return work(() => ({ size, level }));
}).displayName("configTask");

// Two graphs sharing the same task/param tokens, each with isolated values
const terrain1 = graph()
  .add(configTask)
  .set(rootSize, () => 128)
  .set(maxLevel, () => 12);

const terrain2 = graph()
  .add(configTask)
  .set(rootSize, () => 512)
  .set(maxLevel, () => 20);

await terrain1.run();
await terrain2.run();

console.log(terrain1.get(configTask)); // { size: 128, level: 12 }
console.log(terrain2.get(configTask)); // { size: 512, level: 20 }

// Per-instance updates
terrain1.set(rootSize, () => 256);
await terrain1.run();
console.log(terrain1.get(configTask)); // { size: 256, level: 12 }
console.log(terrain2.get(configTask)); // { size: 512, level: 20 } — unaffected

Performance

This runtime is designed to be used in a hot loop; we track performance with mitata.

  • Run locally: pnpm --filter @hello-terrain/work bench
  • Latest published results: Benchmarks

Inspecting the Graph (inspect)

Use g.inspect() to export a snapshot of the graph for visualization/debugging. This does not run tasks; it only reflects what the graph currently knows (including discovered dependencies).

const snapshot = g.inspect({ includeRuntime: true, includeTopo: true });

console.log(snapshot.nodes);
console.log(snapshot.edges);
console.log(snapshot.meta?.topoOrder);

The returned shape is:

  • nodes: params + tasks (optionally includes state, dirty, version)
  • edges: from -> to dependencies (kind: "param" | "task")
  • meta: internal counters like compileCount and optional topoOrder
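
A snapshot like this can be converted to Graphviz DOT for visualization. The converter below is a sketch under assumptions: it uses only from, to, and kind on edges as described above, and it assumes each node has a string id field and that from and to hold those ids. toDot and the Snapshot types are hypothetical names.

```typescript
// Assumed snapshot shape, limited to the fields used here.
// The `id` field on nodes is an assumption; check the actual snapshot first.
interface SnapshotNode {
  id: string;
}
interface SnapshotEdge {
  from: string;
  to: string;
  kind: "param" | "task";
}
interface Snapshot {
  nodes: SnapshotNode[];
  edges: SnapshotEdge[];
}

// Render the snapshot as a DOT digraph; ids are quoted via JSON.stringify.
function toDot(snapshot: Snapshot): string {
  const lines = ["digraph G {"];
  for (const node of snapshot.nodes) {
    lines.push(`  ${JSON.stringify(node.id)};`);
  }
  for (const edge of snapshot.edges) {
    // Dashed edges for param dependencies, solid for task dependencies.
    const style = edge.kind === "param" ? " [style=dashed]" : "";
    lines.push(`  ${JSON.stringify(edge.from)} -> ${JSON.stringify(edge.to)}${style};`);
  }
  lines.push("}");
  return lines.join("\n");
}

// Hand-built snapshot standing in for g.inspect() output.
const dot = toDot({
  nodes: [{ id: "items" }, { id: "doubled" }, { id: "sum" }],
  edges: [
    { from: "items", to: "doubled", kind: "param" },
    { from: "doubled", to: "sum", kind: "task" },
  ],
});
```

The resulting string can be pasted into any Graphviz viewer or rendered with the dot command-line tool.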