
Real-Time Streaming

SDK handles streaming for you

For most use cases, you don't need to manage SSE directly. Use .promise() for components, and WorkflowClient.stream() or waitForRun() for workflows; each of these handles the SSE connection automatically.

This page is reference material for custom implementations.

White Rabbit exposes Server-Sent Events (SSE) endpoints for both workflow runs and component executions. Instead of polling, subscribe once and receive events as they happen.


Why SSE?

  • Zero polling — the server pushes events to you
  • Instant feedback — see each stage complete in real time
  • Automatic close — the stream closes itself when the run reaches a terminal state
  • Heartbeat — a :ping comment is sent every 15 seconds to keep proxies alive

SDK handles SSE automatically

For component execution, .promise() opens and closes the SSE connection:

import { WorkspaceClient, ComponentModule } from 'caller-sdk';

const workspace = new WorkspaceClient({ apiKey: process.env.WR_API_KEY! });

const result = await workspace
  .call(ComponentModule.WAIT_FOR_EVM_TRANSACTION, {
    jsonRpcUrl: 'https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY',
    transactionHash: '0xabc123...',
  })
  .promise({ timeoutMs: 120_000 });

console.log('Confirmed:', result);

For workflow runs, use WorkflowClient.stream() or waitForRun():

import { WorkflowClient } from 'caller-sdk';

const workflow = new WorkflowClient({
  apiKey: process.env.WR_API_KEY!,
  workflowId: process.env.WR_WORKFLOW_ID!,
});
const { runId } = await workflow.trigger();
const run = await workflow.waitForRun(runId); // SSE-backed, resolves on completion

Stream a component execution

SSE /v1/sdk/components/executions/:executionId/stream

Auth: X-Api-Key

Use workspace.execution.stream() — handles connection lifecycle, buffering, and error recovery automatically:

import { WorkspaceClient, ComponentModule } from 'caller-sdk';

const workspace = new WorkspaceClient({ apiKey: process.env.WR_API_KEY! });

const execution = await workspace
  .call(ComponentModule.WAIT_FOR_EVM_TRANSACTION, {
    jsonRpcUrl: 'https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY',
    transactionHash: '0xabc123...',
  })
  .execute();

const sub = workspace.execution.stream(execution.id, {
  onUpdate(event) {
    console.log(event.status, event.output);
    // stream auto-closes on COMPLETED / FAILED
  },
  onError(err) {
    console.error('Stream error:', err.message);
  },
});

// sub.close() to cancel early

Event shape

interface ExecutionStreamEvent {
  id: string;
  status: 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED';
  output: unknown | null; // populated on COMPLETED
  error: unknown | null; // populated on FAILED
  timestamp: string;
}
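Because output is only populated on COMPLETED and error only on FAILED, a custom consumer usually wants to collapse each event into "still running", "succeeded", or "failed" before acting on it. The following is a minimal sketch of that mapping. ExecutionOutcome and toOutcome are hypothetical names used for illustration and are not part of the SDK.

```typescript
type ExecutionStatus = 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED';

interface ExecutionStreamEvent {
  id: string;
  status: ExecutionStatus;
  output: unknown | null;
  error: unknown | null;
  timestamp: string;
}

// Hypothetical result type (not part of the SDK): a discriminated union
// so callers cannot read `output` before the execution has completed.
type ExecutionOutcome =
  | { done: false; status: ExecutionStatus }
  | { done: true; ok: true; output: unknown }
  | { done: true; ok: false; error: unknown };

function toOutcome(event: ExecutionStreamEvent): ExecutionOutcome {
  switch (event.status) {
    case 'COMPLETED':
      return { done: true, ok: true, output: event.output };
    case 'FAILED':
      return { done: true, ok: false, error: event.error };
    default:
      // CREATED and EXECUTING are non-terminal: keep listening.
      return { done: false, status: event.status };
  }
}
```

In a handler, check `outcome.done` first; the stream can be closed as soon as it is true.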

Manual — Node.js (native fetch)

import { WorkspaceClient, ComponentModule } from 'caller-sdk';

const workspace = new WorkspaceClient({ apiKey: process.env.WR_API_KEY! });

// 1. Create execution
const { id: executionId } = await workspace
  .call(ComponentModule.WAIT_FOR_EVM_TRANSACTION, {
    jsonRpcUrl: 'https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY',
    transactionHash: '0xabc123...',
  })
  .execute();

// 2. Open SSE stream manually (prefer workspace.execution.stream() above)
const response = await fetch(
  `https://api.whiterabbit.app/v1/sdk/components/executions/${executionId}/stream`,
  { headers: { 'X-Api-Key': process.env.WR_API_KEY! } },
);
if (!response.ok || !response.body) {
  throw new Error(`Stream request failed with status ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = ''; // holds a partial line when a chunk ends mid-line

read: while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // stream: true keeps multi-byte characters intact across chunk boundaries
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? ''; // last element is incomplete until the next chunk

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue; // skips blank lines and comment lines
    const data = line.slice(6).trim();
    if (!data || data === ':ping') continue;

    const event: ExecutionStreamEvent = JSON.parse(data);
    console.log(event.status);

    if (event.status === 'COMPLETED' || event.status === 'FAILED') {
      await reader.cancel();
      break read; // leave the outer read loop, not just the line loop
    }
  }
}
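The loop above assumes each event fits on a single data: line. For a reusable custom client, the parsing can be pulled into a small incremental parser that follows the general SSE wire format: lines beginning with a colon are comments (which is how a :ping heartbeat comment would arrive), data: lines accumulate, and a blank line dispatches the event. This is a sketch under those assumptions, not the SDK's internal parser; createSseParser and SseMessage are hypothetical names, and lone carriage returns as line endings are not handled.

```typescript
interface SseMessage {
  event: string; // "message" unless an event: field was sent
  data: string;  // data: lines joined with "\n"
}

// Returns a feed(chunk) function. Call it with each decoded text chunk;
// onMessage fires once per complete event, however the chunks were split.
function createSseParser(onMessage: (msg: SseMessage) => void) {
  let buffer = '';
  let dataLines: string[] = [];
  let eventName = 'message';

  return function feed(chunk: string): void {
    buffer += chunk;
    const lines = buffer.split(/\r?\n/);
    buffer = lines.pop() ?? ''; // keep the trailing partial line for later

    for (const line of lines) {
      if (line === '') {
        // Blank line: end of event. Dispatch only if data was collected.
        if (dataLines.length > 0) {
          onMessage({ event: eventName, data: dataLines.join('\n') });
        }
        dataLines = [];
        eventName = 'message';
        continue;
      }
      if (line.startsWith(':')) continue; // comment line, e.g. a heartbeat

      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1); // one optional space

      if (field === 'data') dataLines.push(value);
      else if (field === 'event') eventName = value;
    }
  };
}
```

Inside the read loop, this would replace the manual split: create the parser once, then call `feed(decoder.decode(value, { stream: true }))` for every chunk and run JSON.parse on `msg.data` in the callback.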

Node.js (eventsource package)

npm install eventsource

// Assumes eventsource v2, which accepts a `headers` option. Later major
// versions changed the export and how custom headers are passed, so check
// the package changelog for the version you install.
import EventSource from 'eventsource';

// executionId is the id returned by .execute() in the previous example
const source = new EventSource(
  `https://api.whiterabbit.app/v1/sdk/components/executions/${executionId}/stream`,
  { headers: { 'X-Api-Key': process.env.WR_API_KEY! } },
);

source.onmessage = (event) => {
  if (event.data === ':ping') return;
  const update: ExecutionStreamEvent = JSON.parse(event.data);

  if (update.status === 'COMPLETED') {
    console.log('Output:', update.output);
    source.close();
  }
  if (update.status === 'FAILED') {
    console.error('Failed:', update.error);
    source.close();
  }
};

Stream a workflow run

SSE /v1/sdk/workflows/executions/:runId/stream

Auth: X-Api-Key

Event shape

interface RunStreamEvent {
  id: string; // Run UUID
  status: 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED' | 'CANCELED';
  output: {
    totalUsage?: number;
    pendingStageCount?: number;
    failedStageCount?: number;
    cancelRequestedAt?: string | null;
    cancelReason?: string | null;
    nonTerminalStages?: Array<{ // Currently active stages
      id: string;
      module: string;
      status: 'CREATED' | 'RUNNING';
      executionEventId: string | null;
      executionAttempt: number;
    }>;
  };
  timestamp: string; // ISO 8601
}
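Every field inside output is optional, so a consumer has to supply defaults before rendering progress. As a rough illustration of reading the shape above, the sketch below turns one event into a single status line. describeRun and its wording are hypothetical and not part of the SDK, and the fallback values are assumptions about what is sensible to display.

```typescript
type RunOutput = {
  totalUsage?: number;
  pendingStageCount?: number;
  failedStageCount?: number;
  cancelRequestedAt?: string | null;
  cancelReason?: string | null;
  nonTerminalStages?: Array<{
    id: string;
    module: string;
    status: 'CREATED' | 'RUNNING';
    executionEventId: string | null;
    executionAttempt: number;
  }>;
};

interface RunStreamEvent {
  id: string;
  status: 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED' | 'CANCELED';
  output: RunOutput;
  timestamp: string;
}

// Hypothetical helper: one human-readable line per event.
function describeRun(event: RunStreamEvent): string {
  // Defensive default in case a payload arrives without an output object.
  const output: RunOutput = event.output ?? {};
  switch (event.status) {
    case 'CREATED':
      return 'Queued';
    case 'EXECUTING': {
      const modules = (output.nonTerminalStages ?? []).map((s) => s.module);
      const pending = output.pendingStageCount ?? modules.length;
      return modules.length > 0
        ? `Running ${modules.join(', ')} (${pending} pending)`
        : `Running (${pending} pending)`;
    }
    case 'COMPLETED':
      return `Completed, usage ${output.totalUsage ?? 'unknown'}`;
    case 'FAILED':
      return `Failed (${output.failedStageCount ?? 0} failed stages)`;
    case 'CANCELED':
      return `Canceled: ${output.cancelReason ?? 'no reason given'}`;
  }
}
```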

Browser (native EventSource)

Info: The native EventSource API does not support custom request headers. For browser clients, proxy the stream through your backend or use your app's session cookie for authenticated users.

// No X-Api-Key header can be sent from here: this direct URL only works with
// cookie-based auth. Otherwise, point EventSource at your backend proxy.
const source = new EventSource(
  `https://api.whiterabbit.app/v1/sdk/workflows/executions/${runId}/stream`,
);

source.onmessage = (event) => {
  if (!event.data || event.data === ':ping') return;

  const update: RunStreamEvent = JSON.parse(event.data);

  switch (update.status) {
    case 'EXECUTING':
      console.log(`Running: ${update.output.pendingStageCount} stages pending`);
      break;
    case 'COMPLETED':
      console.log(`Done! Used ${update.output.totalUsage} credits`);
      source.close();
      break;
    case 'FAILED':
      console.error(`Run failed after ${update.output.failedStageCount} stage failures`);
      source.close();
      break;
    case 'CANCELED':
      console.warn(`Canceled: ${update.output.cancelReason}`);
      source.close();
      break;
  }
};

source.onerror = () => {
  // Close explicitly, otherwise EventSource keeps reconnecting on its own
  source.close();
  // fetchRunStatus is your own helper that reads the run's final status via REST
  fetchRunStatus(runId).then(console.log);
};
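Since the browser cannot attach X-Api-Key itself, the proxy option mentioned in the note above amounts to a small backend route that adds the header and passes the event stream through unchanged. The sketch below uses only web-standard Request and Response objects, so it should adapt to most fetch-style server runtimes. proxyRunStream, the FetchLike type, the /api/workflow-run-stream path, and the UUID check are assumptions made for illustration; authenticate your own user before forwarding.

```typescript
const UPSTREAM = 'https://api.whiterabbit.app/v1/sdk/workflows/executions';

// Run IDs are documented as UUIDs; rejecting anything else keeps
// user input from altering the upstream path.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Narrow fetch signature so a fake can be injected in tests.
type FetchLike = (
  url: string,
  init: { headers: Record<string, string>; signal?: AbortSignal },
) => Promise<Response>;

// Hypothetical handler for e.g. GET /api/workflow-run-stream?runId=...
async function proxyRunStream(
  request: Request,
  apiKey: string,
  fetchImpl: FetchLike = fetch,
): Promise<Response> {
  // Assumption: verify the caller's session here before going further.
  const runId = new URL(request.url).searchParams.get('runId') ?? '';
  if (!UUID_RE.test(runId)) {
    return new Response('Invalid runId', { status: 400 });
  }

  const upstream = await fetchImpl(`${UPSTREAM}/${runId}/stream`, {
    headers: { 'X-Api-Key': apiKey, Accept: 'text/event-stream' },
    signal: request.signal, // stop the upstream read if the request is aborted
  });
  if (!upstream.ok || !upstream.body) {
    return new Response('Upstream stream unavailable', { status: 502 });
  }

  // Pass the body through as a stream; do not buffer it.
  return new Response(upstream.body, {
    status: 200,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
    },
  });
}
```

The API key stays on the server, and the browser connects to the same-origin route with a plain EventSource.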

Node.js (native fetch)

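const response = await fetch(
  `https://api.whiterabbit.app/v1/sdk/workflows/executions/${runId}/stream`,
  { headers: { 'X-Api-Key': process.env.WR_API_KEY! } },
);
if (!response.ok || !response.body) {
  throw new Error(`Stream request failed with status ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = ''; // holds a partial line when a chunk ends mid-line

// The server closes the stream on a terminal state, which ends this loop
while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // stream: true keeps multi-byte characters intact across chunk boundaries
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const data = line.slice(6).trim();
    if (!data || data === ':ping') continue;
    const event: RunStreamEvent = JSON.parse(data);
    console.log(event.status, event.output?.pendingStageCount);
  }
}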

Connection behavior

Scenario | Behavior
Execution/run already COMPLETED when you connect | First event emitted immediately, then stream closes
Execution/run already FAILED | First event emitted with error, then stream closes
Proxy/load balancer timeout | :ping heartbeat (every 15s) prevents idle disconnects
Stream closes unexpectedly | Fetch final status via REST; re-subscribing will immediately emit current state
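Because re-subscribing immediately emits the current state, a custom client can recover from an unexpected close by reconnecting until it sees a terminal status. The following is a sketch of that loop with exponential backoff. OpenStream is a hypothetical adapter around whichever transport you use (fetch or EventSource), and followUntilTerminal, backoffDelay, and the retry limits are illustrative choices rather than SDK behavior.

```typescript
type RunStatus = 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED' | 'CANCELED';

interface StatusEvent {
  status: RunStatus;
}

// Hypothetical adapter: opens one SSE connection, forwards each event,
// and resolves when the connection ends for any reason.
type OpenStream = (onEvent: (event: StatusEvent) => void) => Promise<void>;

const TERMINAL = new Set<RunStatus>(['COMPLETED', 'FAILED', 'CANCELED']);

// Exponential backoff, capped: 500ms, 1s, 2s, ... up to maxMs.
function backoffDelay(attempt: number, baseMs = 500, maxMs = 10_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function followUntilTerminal(
  open: OpenStream,
  maxAttempts = 5,
  baseMs = 500,
): Promise<StatusEvent> {
  const state: { last?: StatusEvent } = {};

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await open((event) => {
        state.last = event;
      });
    } catch {
      // Connection error: treat it like an unexpected close and retry.
    }

    if (state.last && TERMINAL.has(state.last.status)) return state.last;
    if (attempt < maxAttempts - 1) await sleep(backoffDelay(attempt, baseMs));
  }

  throw new Error(`No terminal event after ${maxAttempts} attempts`);
}
```

If the loop gives up, fetching the final status via REST remains the fallback described in the table.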

React hook example

import { useEffect, useState } from 'react';

interface RunState {
  status: string;
  pendingStageCount: number;
  totalUsage?: number;
}

export function useRunStream(runId: string | null) {
  const [state, setState] = useState<RunState | null>(null);

  useEffect(() => {
    if (!runId) return;

    // Proxy through your backend to add auth header
    const source = new EventSource(
      `/api/workflow-run-stream?runId=${encodeURIComponent(runId)}`,
    );

    source.onmessage = (e) => {
      if (!e.data || e.data === ':ping') return;
      const update = JSON.parse(e.data);
      setState({
        status: update.status,
        pendingStageCount: update.output?.pendingStageCount ?? 0,
        totalUsage: update.output?.totalUsage,
      });

      // Close on every terminal status, including CANCELED
      if (['COMPLETED', 'FAILED', 'CANCELED'].includes(update.status)) {
        source.close();
      }
    };

    // Close the connection when runId changes or the component unmounts
    return () => source.close();
  }, [runId]);

  return state;
}