Graph
Execution engine for running task graphs
The Graph is the runtime that:
- executes tasks
- tracks dependencies
- memoizes results
- optionally enforces per-lane concurrency (when laneConcurrency is provided)
- emits observability events
Creating a Graph
import { graph } from "@hello-terrain/work";
const g = graph();
The graph() function creates a new execution context with its own task registry and state.
API Reference
graph()
Creates a new graph instance.
Returns: Graph
Graph Methods
add(task)
Registers a task with the graph. Tasks must be added before they can be executed.
const myTask = task((get, work) => {
const v = get(someParam);
return work(() => v * 2);
});
g.add(myTask);
| Parameter | Type | Description |
|---|---|---|
| task | Task<T> | The task to register |
Returns: TaskRef<T> - The same task reference (for chaining)
Tasks that you pass as targets must be registered (via g.add(task)), otherwise the graph can’t run them.
Upstream tasks referenced via get(otherTask) will be registered automatically when discovered.
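For example, a minimal sketch of automatic upstream registration (the task bodies here are illustrative, not from the library):

```typescript
import { task, graph } from "@hello-terrain/work";

const upstream = task((get, work) => work(() => 21));

const downstream = task((get, work) => {
  // Referencing upstream via get() is enough for the graph to
  // discover and register it — no explicit g.add(upstream) needed.
  const v = get(upstream);
  return work(() => v * 2);
});

const g = graph();
g.add(downstream); // only the target must be added explicitly

await g.run();
console.log(g.get(downstream)); // 42
```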
run(options?)
Executes the graph, computing all registered tasks (or specified targets) with dependency resolution.
const report = await g.run();
console.log(report.status); // "ok", "error", or "cancelled"
| Option | Type | Description |
|---|---|---|
| targets | TaskRef[] | Specific tasks to run (defaults to all) |
| laneConcurrency | Record<string, number> | Max concurrent tasks per lane |
| signal | AbortSignal | Abort signal for cancellation |
| resources | unknown | Resources passed to task contexts |
Returns: Promise<RunReport>
dispose()
Releases resources held by the graph (currently: param subscriptions) and clears internal state. Use this when graphs are created/destroyed frequently (e.g. in hot-reload or per-level/per-scene lifecycles).
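A sketch of a per-scene lifecycle, assuming a hypothetical `buildSceneTasks` callback that registers that scene's tasks:

```typescript
import { graph } from "@hello-terrain/work";

// Create a graph per scene, run it, and dispose it when the scene
// unloads so param subscriptions and internal state don't leak.
async function runScene(
  buildSceneTasks: (g: ReturnType<typeof graph>) => void,
) {
  const g = graph();
  buildSceneTasks(g); // registers this scene's tasks via g.add(...)
  try {
    return await g.run();
  } finally {
    g.dispose(); // releases param subscriptions and clears state
  }
}
```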
g.dispose();
get(task)
Returns the computed value of a task. Throws if the task hasn't been run or is not in a ready state.
await g.run();
const result = g.get(myTask);
| Parameter | Type | Description |
|---|---|---|
| task | TaskRef<T> | The task to get the value for |
Returns: T - The computed value
Throws: NoComputedValueError if task is not ready, UnknownNodeError if task is not registered
peek(task)
Returns the computed value of a task, or undefined if not ready. Does not throw.
const maybeResult = g.peek(myTask);
if (maybeResult !== undefined) {
console.log("Got result:", maybeResult);
}
| Parameter | Type | Description |
|---|---|---|
| task | TaskRef<T> | The task to peek at |
Returns: T | undefined
set(param, callback)
Takes graph-local ownership of a param's value. After calling set(), the graph stores its
own copy and detaches from the external param.subscribe() flow. This enables multiple graphs
to share the same module-scope param() token with isolated runtime values.
import { param } from "@hello-terrain/work";
const rootSize = param(256);
const g = graph()
.add(myTask)
.set(rootSize, () => 128); // graph-local value: 128
// Update later
g.set(rootSize, (prev) => prev * 2); // graph-local value: 256
| Parameter | Type | Description |
|---|---|---|
| param | ParamRef<T> | The param to take ownership of |
| callback | (prev: T) => T | Function that receives the current value and returns the new value |
Returns: Graph (for chaining)
After graph.set(), external param.set() calls no longer affect this graph's value for that param.
Other graphs that haven't called graph.set() on the same param are unaffected.
on(callback)
Subscribes to graph events for monitoring, logging, or debugging.
// Subscribe to all events (legacy)
const unsubAll = g.on((event) => {
console.log(event.type, event);
});
// Subscribe to a group of events
const unsubTasks = g.on("task:*", (event) => {
console.log(event.type, event.taskId);
});
// Subscribe to a single event type
const unsubCacheHit = g.on("task:cacheHit", (event) => {
console.log("cache hit", event.taskId);
});
| Signature | Description |
|---|---|
| on(cb) | Subscribe to all events (legacy) |
| on("task:*", cb) | Subscribe to all task events |
| on("run:*", cb) | Subscribe to all run events |
| on("task:cacheHit", cb) | Subscribe to a single event type |
Returns: () => void - Unsubscribe function
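The returned function removes the subscription; a short sketch:

```typescript
import { graph } from "@hello-terrain/work";

const g = graph();

const unsubscribe = g.on("task:error", (event) => {
  console.error(`[${event.runId}]`, event.taskId, event.error);
});

await g.run();

// Later runs will no longer invoke this listener.
unsubscribe();
```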
Run Options
targets
Run only specific tasks (and their dependencies):
// Only compute these tasks and their dependencies
await g.run({
targets: [taskA, taskB],
});
laneConcurrency
Control how many tasks can run in parallel per lane:
await g.run({
laneConcurrency: {
cpu: 4, // Max 4 CPU-bound tasks in parallel
io: 10, // Max 10 I/O-bound tasks in parallel
gpu: 1, // Only 1 GPU task at a time
},
});
If laneConcurrency is omitted (or {}), the graph does not throttle task concurrency by lane.
When laneConcurrency is provided and non-empty:
- Tasks without an explicit lane default to "cpu".
- Lanes not listed in laneConcurrency default to 1 permit.
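A sketch of those defaults (the tasks here are illustrative placeholders):

```typescript
import { task, graph } from "@hello-terrain/work";

// Neither task declares a lane, so both fall into the default "cpu" lane.
const heavyA = task((get, work) => work(() => 1));
const heavyB = task((get, work) => work(() => 2));

const g = graph();
g.add(heavyA);
g.add(heavyB);

// laneConcurrency is non-empty but does not list "cpu", so "cpu"
// defaults to 1 permit: heavyA and heavyB run one at a time.
// The io limit would only apply to tasks that declare .lane("io").
await g.run({
  laneConcurrency: { io: 10 },
});
```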
signal
Cancel a running graph:
const controller = new AbortController();
// Start the run
const promise = g.run({ signal: controller.signal });
// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);
const report = await promise;
console.log(report.status); // "cancelled"
resources
Pass shared resources to all tasks:
const db = new Database();
const cache = new Cache();
await g.run({
resources: { db, cache },
});
// In tasks:
const fetchUser = task(async (get, work, ctx) => {
const { db } = ctx.resources;
return db.users.find(get(userId));
});
Run Report
The run() method returns a report with execution statistics:
interface RunReport {
runId: string; // Unique run identifier
status: RunStatus; // "ok", "error", or "cancelled"
startedAt: number; // Start timestamp (ms)
finishedAt: number; // End timestamp (ms)
durationMs: number; // Total duration
taskCount: number; // Tasks executed (not from cache)
cacheHits: number; // Tasks served from cache
}
const report = await g.run();
console.log(`Run ${report.runId}`);
console.log(`Status: ${report.status}`);
console.log(`Duration: ${report.durationMs}ms`);
console.log(`Tasks: ${report.taskCount} executed, ${report.cacheHits} cached`);
Graph Events
Subscribe to events for observability:
| Event Type | Properties | Description |
|---|---|---|
| run:start | runId, at | Run begins |
| run:finish | runId, at, status | Run completes |
| task:start | runId, taskId, at, lane | Task begins |
| task:finish | runId, taskId, at, durationMs | Task completes |
| task:cacheHit | runId, taskId, at | Task served from cache |
| task:error | runId, taskId, at, durationMs, error | Task failed |
| param:set | paramId, at | Param value set via graph.set() |
g.on("run:*", (event) => console.log(`[${event.runId}]`, event.type));
g.on("task:*", (event) => console.log(`[${event.runId}]`, event.type, event.taskId));
g.on("task:error", (event) => console.error(`[${event.runId}]`, event.taskId, event.error));
g.on("param:*", (event) => console.log("param event", event.type, event.paramId));Error Handling
UnknownNodeError
Thrown when attempting to access a task that hasn't been registered:
try {
g.get(unregisteredTask);
} catch (e) {
if (e instanceof UnknownNodeError) {
console.error(`Task ${e.id} is not registered`);
}
}
NoComputedValueError
Thrown when accessing a task that hasn't been computed yet:
try {
g.get(myTask); // Without running first
} catch (e) {
if (e instanceof NoComputedValueError) {
console.error(`Task ${e.id} has not been computed`);
}
}
CyclicalGraphError
Thrown when a cycle is detected during execution:
// This would create a cycle
const a = task((get, work) => {
const bv = get(b);
return work(() => bv + 1);
});
const b = task((get, work) => {
const av = get(a);
return work(() => av + 1);
});
// Running will throw CyclicalGraphError
CancelledError
Thrown when a run is aborted:
const controller = new AbortController();
controller.abort();
try {
await g.run({ signal: controller.signal });
} catch (e) {
// Tasks may throw the abort reason
}
Examples
Complete Example
import { param, task, graph } from "@hello-terrain/work";
// Inputs
const items = param<number[]>([1, 2, 3, 4, 5]).displayName("items");
const multiplier = param(2).displayName("multiplier");
// Computations
const doubled = task((get, work) => {
const xs = get(items);
const m = get(multiplier);
return work(() => xs.map((x) => x * m));
}).displayName("doubled");
const sum = task((get, work) => {
const xs = get(doubled);
return work(() => xs.reduce((a, b) => a + b, 0));
}).displayName("sum");
const average = task((get, work) => {
const s = get(sum);
const values = get(doubled);
return work(() => s / values.length);
}).displayName("average");
// Setup
const g = graph();
g.add(doubled);
g.add(sum);
g.add(average);
// Monitor execution
g.on((e) => {
if (e.type === "task:finish") {
console.log(`${e.taskId} took ${e.durationMs}ms`);
}
});
// Initial run
let report = await g.run();
console.log("Sum:", g.get(sum)); // 30
console.log("Average:", g.get(average)); // 6
console.log("Cache hits:", report.cacheHits); // 0
// Update input and re-run
multiplier.set(() => 3);
report = await g.run();
console.log("Sum:", g.get(sum)); // 45
console.log("Average:", g.get(average)); // 9
console.log("Cache hits:", report.cacheHits); // 0 (all recomputed due to dependency change)
// Re-run without changes
report = await g.run();
console.log("Cache hits:", report.cacheHits); // 3 (all from cache)Async Pipeline
const apiUrl = param("https://api.example.com");
const fetchUsers = task(async (get, work, ctx) => {
const url = get(apiUrl);
return work(async () => {
const response = await fetch(`${url}/users`, { signal: ctx.signal });
return response.json();
});
}).displayName("fetchUsers").lane("network");
const fetchPosts = task(async (get, work, ctx) => {
const url = get(apiUrl);
return work(async () => {
const response = await fetch(`${url}/posts`, { signal: ctx.signal });
return response.json();
});
}).displayName("fetchPosts").lane("network");
const combined = task((get, work) => {
const users = get(fetchUsers);
const posts = get(fetchPosts);
return work(() => ({ users, posts }));
}).displayName("combined");
const g = graph();
g.add(combined);
// Both fetches run in parallel (same lane, configured concurrency)
await g.run({
laneConcurrency: { network: 5 },
});
g.get(combined); // { users: [...], posts: [...] }
Multi-Instance with Shared Tasks
Define params and tasks at module scope, then create multiple graphs with isolated values using graph.set():
import { param, task, graph } from "@hello-terrain/work";
// Module-scope params (shared declarations with defaults)
const rootSize = param(256);
const maxLevel = param(16);
// Module-scope task (no factory wrapper needed)
const configTask = task((get, work) => {
const size = get(rootSize);
const level = get(maxLevel);
return work(() => ({ size, level }));
}).displayName("configTask");
// Two graphs sharing the same task/param tokens, each with isolated values
const terrain1 = graph()
.add(configTask)
.set(rootSize, () => 128)
.set(maxLevel, () => 12);
const terrain2 = graph()
.add(configTask)
.set(rootSize, () => 512)
.set(maxLevel, () => 20);
await terrain1.run();
await terrain2.run();
console.log(terrain1.get(configTask)); // { size: 128, level: 12 }
console.log(terrain2.get(configTask)); // { size: 512, level: 20 }
// Per-instance updates
terrain1.set(rootSize, () => 256);
await terrain1.run();
console.log(terrain1.get(configTask)); // { size: 256, level: 12 }
console.log(terrain2.get(configTask)); // { size: 512, level: 20 } (unaffected)
Performance
This runtime is designed to be used in a hot loop; we track performance with mitata.
- Run locally: pnpm --filter @hello-terrain/work bench
- Latest published results: Benchmarks
Inspecting the Graph (inspect)
Use g.inspect() to export a snapshot of the graph for visualization/debugging. This does not
run tasks; it only reflects what the graph currently knows (including discovered dependencies).
const snapshot = g.inspect({ includeRuntime: true, includeTopo: true });
console.log(snapshot.nodes);
console.log(snapshot.edges);
console.log(snapshot.meta?.topoOrder);
The returned shape is:
- nodes: params + tasks (optionally includes state, dirty, version)
- edges: from -> to dependencies (kind: "param" | "task")
- meta: internal counters like compileCount and optional topoOrder