Real-Time Streaming
For most use cases, you don't need to manage SSE directly. Use .promise() for components and WorkflowClient.stream() or waitForRun() for workflows — both handle the SSE connection automatically.
This page is reference material for custom implementations.
White Rabbit exposes Server-Sent Events (SSE) endpoints for both workflow runs and component executions. Instead of polling, subscribe once and receive events as they happen.
Why SSE?
- Zero polling — the server pushes events to you
- Instant feedback — see each stage complete in real time
- Automatic close — the stream closes itself when the run reaches a terminal state
- Heartbeat — a
:pingcomment is sent every 15 seconds to keep proxies alive
SDK handles SSE automatically
For component execution, .promise() opens and closes the SSE connection:
import { WorkspaceClient, ComponentModule } from 'caller-sdk';
const workspace = new WorkspaceClient({ apiKey: process.env.WR_API_KEY! });
const result = await workspace
.call(ComponentModule.WAIT_FOR_EVM_TRANSACTION, {
jsonRpcUrl: 'https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY',
transactionHash: '0xabc123...',
})
.promise({ timeoutMs: 120_000 });
console.log('Confirmed:', result);
For workflow runs, use WorkflowClient.stream() or waitForRun():
import { WorkflowClient } from 'caller-sdk';
const workflow = new WorkflowClient({
apiKey: process.env.WR_API_KEY!,
workflowId: process.env.WR_WORKFLOW_ID!,
});
const { runId } = await workflow.trigger();
const run = await workflow.waitForRun(runId); // SSE-backed, resolves on completion
Stream a component execution
SSE /v1/sdk/components/executions/:executionId/stream
Auth: X-Api-Key
SDK (recommended)
Use workspace.execution.stream() — handles connection lifecycle, buffering, and error recovery automatically:
import { WorkspaceClient, ComponentModule } from 'caller-sdk';
const workspace = new WorkspaceClient({ apiKey: process.env.WR_API_KEY! });
const execution = await workspace
.call(ComponentModule.WAIT_FOR_EVM_TRANSACTION, {
jsonRpcUrl: 'https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY',
transactionHash: '0xabc123...',
})
.execute();
const sub = workspace.execution.stream(execution.id, {
onUpdate(event) {
console.log(event.status, event.output);
// stream auto-closes on COMPLETED / FAILED
},
onError(err) {
console.error('Stream error:', err.message);
},
});
// sub.close() to cancel early
Event shape
interface ExecutionStreamEvent {
id: string;
status: 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED';
output: unknown | null; // populated on COMPLETED
error: unknown | null; // populated on FAILED
timestamp: string;
}
Manual — Node.js (native fetch)
import { WorkspaceClient, ComponentModule } from 'caller-sdk';
const workspace = new WorkspaceClient({ apiKey: process.env.WR_API_KEY! });
// 1. Create execution
const { id: executionId } = await workspace
.call(ComponentModule.WAIT_FOR_EVM_TRANSACTION, {
jsonRpcUrl: 'https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY',
transactionHash: '0xabc123...',
})
.execute()
.then(e => e);
// 2. Open SSE stream manually (prefer workspace.execution.stream() above)
const response = await fetch(
`https://api.whiterabbit.app/v1/sdk/components/executions/${executionId}/stream`,
{ headers: { 'X-Api-Key': process.env.WR_API_KEY! } },
);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
for (const line of decoder.decode(value).split('\n')) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6).trim();
if (!data || data === ':ping') continue;
const event: ExecutionStreamEvent = JSON.parse(data);
console.log(event.status);
if (event.status === 'COMPLETED' || event.status === 'FAILED') {
reader.cancel();
break;
}
}
}
Node.js (eventsource package)
npm install eventsource
import EventSource from 'eventsource';
const source = new EventSource(
`https://api.whiterabbit.app/v1/sdk/components/executions/${executionId}/stream`,
{ headers: { 'X-Api-Key': process.env.WR_API_KEY! } },
);
source.onmessage = (event) => {
if (event.data === ':ping') return;
const update: ExecutionStreamEvent = JSON.parse(event.data);
if (update.status === 'COMPLETED') {
console.log('Output:', update.output);
source.close();
}
if (update.status === 'FAILED') {
console.error('Failed:', update.error);
source.close();
}
};
Stream a workflow run
SSE /v1/sdk/workflows/executions/:runId/stream
Auth: X-Api-Key
Event shape
interface RunStreamEvent {
id: string; // Run UUID
status: 'CREATED' | 'EXECUTING' | 'COMPLETED' | 'FAILED' | 'CANCELED';
output: {
totalUsage?: number;
pendingStageCount?: number;
failedStageCount?: number;
cancelRequestedAt?: string | null;
cancelReason?: string | null;
nonTerminalStages?: Array<{ // Currently active stages
id: string;
module: string;
status: 'CREATED' | 'RUNNING';
executionEventId: string | null;
executionAttempt: number;
}>;
};
timestamp: string; // ISO 8601
}
Browser (native EventSource)
The native EventSource API does not support custom request headers. For browser clients, proxy the stream through your backend or use your app's session cookie for authenticated users.
const source = new EventSource(
`https://api.whiterabbit.app/v1/sdk/workflows/executions/${runId}/stream`,
);
source.onmessage = (event) => {
if (!event.data || event.data === ':ping') return;
const update: RunStreamEvent = JSON.parse(event.data);
switch (update.status) {
case 'EXECUTING':
console.log(`Running — ${update.output.pendingStageCount} stages pending`);
break;
case 'COMPLETED':
console.log(`Done! Used ${update.output.totalUsage} credits`);
source.close();
break;
case 'FAILED':
console.error(`Run failed after ${update.output.failedStageCount} stage failures`);
source.close();
break;
case 'CANCELED':
console.warn(`Canceled: ${update.output.cancelReason}`);
source.close();
break;
}
};
source.onerror = () => {
source.close();
fetchRunStatus(runId).then(console.log);
};
Node.js (native fetch)
const response = await fetch(
`https://api.whiterabbit.app/v1/sdk/workflows/executions/${runId}/stream`,
{ headers: { 'X-Api-Key': process.env.WR_API_KEY! } },
);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
for (const line of chunk.split('\n')) {
if (line.startsWith('data: ')) {
const data = line.slice(6).trim();
if (!data || data === ':ping') continue;
const event: RunStreamEvent = JSON.parse(data);
console.log(event.status, event.output?.pendingStageCount);
}
}
}
Connection behavior
| Scenario | Behavior |
|---|---|
Execution/run already COMPLETED when you connect | First event emitted immediately, then stream closes |
Execution/run already FAILED | First event emitted with error, then stream closes |
| Proxy/load balancer timeout | :ping heartbeat (every 15s) prevents idle disconnects |
| Stream closes unexpectedly | Fetch final status via REST; re-subscribing will immediately emit current state |
React hook example
import { useEffect, useState } from 'react';
interface RunState {
status: string;
pendingStageCount: number;
totalUsage?: number;
}
export function useRunStream(runId: string | null) {
const [state, setState] = useState<RunState | null>(null);
useEffect(() => {
if (!runId) return;
// Proxy through your backend to add auth header
const source = new EventSource(`/api/workflow-run-stream?runId=${runId}`);
source.onmessage = (e) => {
if (!e.data || e.data === ':ping') return;
const update = JSON.parse(e.data);
setState({
status: update.status,
pendingStageCount: update.output?.pendingStageCount ?? 0,
totalUsage: update.output?.totalUsage,
});
if (update.status === 'COMPLETED' || update.status === 'FAILED') {
source.close();
}
};
return () => source.close();
}, [runId]);
return state;
}