Streaming Guide
Deep Research streams real-time progress via Server-Sent Events (SSE). This guide covers all event types, reconnection handling, and code examples.
Overview
When you create a research task, the response includes a streamUrl. Connect to this URL to receive events as the multi-agent system searches the web, extracts content, and synthesizes findings.
SSE is the primary way to consume Deep Research results. It provides immediate visibility into progress without polling, and supports reconnection if your connection drops.
Connecting to a Stream
Endpoint:
GET /research/tasks/{taskId}/stream
Query Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| detail | string | Task's detail value | basic or detailed |
| fromSequence | integer | 0 | Resume from this sequence number |
curl:
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream" \
-H "Authorization: Bearer YOUR_API_KEY"
JavaScript (EventSource):
const es = new EventSource(
  `https://api.feeds.onhelix.ai/research/tasks/${taskId}/stream`,
  { headers: { Authorization: 'Bearer YOUR_API_KEY' } }
);
Note: the browser-native EventSource constructor does not accept custom headers. This example assumes an EventSource implementation that does, such as the eventsource package for Node.js.
Python (sseclient, installed via pip install sseclient-py):
import requests
import sseclient
response = requests.get(
    f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/stream',
    headers={'Authorization': 'Bearer YOUR_API_KEY'},
    stream=True,
)
client = sseclient.SSEClient(response)
Detail Levels
basic (default)
High-level progress events. Best for end-user UIs where you want to show progress bars, source lists, and the final report.
Events: topic, progress, source, result, error, done
detailed
Everything in basic plus agent-level observability events. Best for debugging, developer tools, or building advanced visualizations of the agent system.
Additional events: agent, tool, text, thinking
Override the detail level per connection:
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?detail=detailed" \
-H "Authorization: Bearer YOUR_API_KEY"
Event Reference
Basic Events
These events are sent at both detail levels (basic and detailed).
topic
A sub-topic research task has started or completed.
{
"topic": "solid-state-battery-materials",
"index": 1,
"status": "started"
}
| Field | Type | Description |
|---|---|---|
| topic | string | Sub-topic name or agent identifier |
| index | number | Topic number (1-based) |
| status | string | started or completed |
progress
Overall research progress update. Sent when a topic completes or a sub-report is saved.
{
"topics_total": 3,
"topics_completed": 1,
"sources_found": 5
}
| Field | Type | Description |
|---|---|---|
| topics_total | number | Total number of sub-topics being researched |
| topics_completed | number | Number of sub-topics completed so far |
| sources_found | number | Total web sources discovered so far |
source
A web source was discovered and saved.
{
"url": "https://example.com/solid-state-research",
"title": "Solid-State Battery Breakthroughs in 2025",
"topic": "solid-state-battery-materials"
}
| Field | Type | Description |
|---|---|---|
| url | string | URL of the web source |
| title | string | Page title |
| topic | string | Sub-topic this source relates to |
result
Research is complete. Contains the full result object.
{
"report": "# Solid-State Battery Technology\n\n## Overview\n\n...",
"topics_researched": [
"solid-state-battery-materials",
"ev-integration",
"manufacturing-challenges"
],
"sources": [
{
"url": "https://example.com/research",
"title": "Battery Research Paper"
}
],
"sub_reports_count": 3,
"confidence_level": "high"
}
| Field | Type | Description |
|---|---|---|
| report | string | Comprehensive research report in Markdown |
| topics_researched | string[] | List of sub-topics investigated |
| sources | object[] | Web sources cited in the report |
| sub_reports_count | number | Number of sub-reports synthesized |
| confidence_level | string | high, medium, or low |
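Once the result payload is parsed, its fields can be consumed directly. As a sketch, here is a helper that summarizes a result object — summarize_result is illustrative, not part of the API, and the sample dict mirrors the example above:

```python
def summarize_result(result: dict) -> str:
    """Build a short human-readable summary from a parsed `result` payload."""
    lines = [
        f"Confidence: {result['confidence_level']}",
        f"Sub-reports synthesized: {result['sub_reports_count']}",
        f"Topics: {', '.join(result['topics_researched'])}",
        f"Sources cited: {len(result['sources'])}",
    ]
    return "\n".join(lines)

# Sample payload shaped like the `result` event above
sample = {
    "report": "# Solid-State Battery Technology\n\n## Overview\n\n...",
    "topics_researched": ["solid-state-battery-materials", "ev-integration"],
    "sources": [{"url": "https://example.com/research", "title": "Battery Research Paper"}],
    "sub_reports_count": 3,
    "confidence_level": "high",
}
print(summarize_result(sample))
```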
error
Research encountered an error.
{
"message": "Workflow execution timed out"
}
| Field | Type | Description |
|---|---|---|
| message | string | Error description |
done
Stream has ended. This is always the last event sent. Close your connection after receiving it.
{}
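Tying the basic events together, a minimal client-side dispatch loop might look like the following sketch. It assumes events have already been parsed into (name, data) pairs; the handling shown is illustrative:

```python
def run_basic_events(events):
    """Dispatch parsed (event_name, data) pairs until 'done' arrives."""
    sources = []
    result = None
    for name, data in events:
        if name == "topic":
            print(f"Topic {data['index']} ({data['topic']}): {data['status']}")
        elif name == "progress":
            print(f"{data['topics_completed']}/{data['topics_total']} topics done")
        elif name == "source":
            sources.append(data)
        elif name == "result":
            result = data
        elif name == "error":
            raise RuntimeError(data["message"])
        elif name == "done":
            break  # always the last event; stop consuming and close the connection
    return result, sources
```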
Detailed Events
These events are only sent when detail=detailed.
agent
Agent lifecycle event. Tracks when sub-researcher agents start and end.
Agent started:
{
"type": "start",
"id": "sub-researcher-abc123",
"topic": "solid-state-battery-materials"
}
Agent ended:
{
"type": "end",
"id": "sub-researcher-abc123",
"status": "completed"
}
| Field | Type | Description |
|---|---|---|
| type | string | start or end |
| id | string | Unique agent identifier |
| topic | string | Sub-topic assigned (on start) |
| status | string | Agent exit status (on end) |
tool
Tool execution event. Tracks when agents use tools (web search, content extraction, etc.).
Tool started:
{
"type": "start",
"name": "web_search",
"agent_id": "sub-researcher-abc123"
}
Tool ended:
{
"type": "end",
"name": "web_search",
"agent_id": "sub-researcher-abc123",
"results_count": 8
}
| Field | Type | Description |
|---|---|---|
| type | string | start or end |
| name | string | Tool name (e.g., web_search) |
| agent_id | string | Agent that invoked the tool |
| results_count | number | Number of results returned (on end) |
text
LLM text generation delta from an agent.
{
"agent_id": "sub-researcher-abc123",
"delta": "Based on the sources analyzed, solid-state batteries "
}
| Field | Type | Description |
|---|---|---|
| agent_id | string | Agent generating text |
| delta | string | Text chunk (incremental) |
thinking
Agent reasoning/thinking content.
{
"agent_id": "sub-researcher-abc123",
"content": "I should search for recent papers on sulfide-based solid electrolytes..."
}
| Field | Type | Description |
|---|---|---|
| agent_id | string | Agent that is reasoning |
| content | string | Thinking/reasoning content |
Stream Resumption
Every SSE event the server sends includes an id: line containing a sequence number. This is your checkpoint — track it on the client as events arrive:
event: topic
data: {"topic":"battery-materials","index":1,"status":"started"}
id: 3

event: source
data: {"url":"https://...","title":"...","topic":"battery-materials"}
id: 7
If your connection drops, reconnect with the last id you received as the fromSequence query parameter:
GET /research/tasks/{taskId}/stream?fromSequence=7
The server replays all events after that sequence, so you won't miss anything. For a fresh connection, fromSequence defaults to 0 (start from the beginning).
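To make the checkpoint concrete, here is a sketch of a minimal parser for the wire format above. It tracks the id: field as it goes and skips keepalive comment lines; a real SSE client library does this for you, so this is purely illustrative:

```python
def parse_sse(raw: str):
    """Parse raw SSE text into (event_name, data, last_id) tuples.

    Tracks the id: field as the resumption checkpoint and ignores
    keepalive comment lines (those starting with ':').
    """
    events = []
    event_name, data, last_id = "message", [], 0
    for line in raw.splitlines():
        if line.startswith(":"):
            continue  # keepalive comment, ignore
        if line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        elif line.startswith("id:"):
            last_id = int(line[len("id:"):].strip())
        elif line == "":  # a blank line terminates one event
            if data:
                events.append((event_name, "\n".join(data), last_id))
            event_name, data = "message", []
    if data:  # flush a trailing event with no final blank line
        events.append((event_name, "\n".join(data), last_id))
    return events
```

Feeding it the two example events above yields ("topic", ..., 3) and ("source", ..., 7); the last tuple's id is the value to pass as fromSequence on reconnect.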
In JavaScript, the sequence number is available via event.lastEventId:
let lastSequence = 0;
es.addEventListener('progress', (e) => {
  lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
  // handle event...
});
Keepalive: The server sends SSE comment lines (: prefix) every 15 seconds to keep the connection alive and prevent proxies or load balancers from timing out.
Stream availability: If you connect immediately after creating a task, the stream may not exist yet. The server polls for up to 30 seconds, sending keepalive comments while waiting. If the stream still isn't available, an error event is sent.
Snapshot + Stream Pattern
For long-running research tasks (5-10 minutes with hundreds of events), replaying the entire stream from fromSequence=0 on reconnection can be slow. The snapshot endpoint lets you load the accumulated state instantly, then connect to the SSE stream for only new events.
Step 1: Fetch the snapshot to hydrate your UI:
GET /research/tasks/{taskId}/snapshot
Step 2: Connect to the SSE stream starting from the snapshot's sequence:
GET /research/tasks/{taskId}/stream?fromSequence={streamSequence}
Any events that occurred between loading the snapshot and connecting to the stream are replayed automatically — because the snapshot records the sequence number it reflects, resuming from that sequence guarantees no events are missed.
JavaScript:
async function reconnect(taskId) {
  // 1. Load snapshot for instant UI hydration
  const snapshotRes = await fetch(
    `https://api.feeds.onhelix.ai/research/tasks/${taskId}/snapshot`,
    { headers: { Authorization: 'Bearer YOUR_API_KEY' } }
  );
  const { data: snapshot } = await snapshotRes.json();
  // Hydrate UI from snapshot
  updateProgress(snapshot.progress);
  renderTopics(snapshot.topics);
  renderSources(snapshot.sources);
  if (snapshot.result) renderResult(snapshot.result);
  // 2. Connect to stream for new events only
  let lastSequence = snapshot.streamSequence;
  const url = `https://api.feeds.onhelix.ai/research/tasks/${taskId}/stream?fromSequence=${lastSequence}`;
  const es = new EventSource(url, {
    headers: { Authorization: 'Bearer YOUR_API_KEY' },
  });
  es.addEventListener('progress', (e) => {
    lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
    updateProgress(JSON.parse(e.data));
  });
  // ... handle other events
}
Python:
import requests
import sseclient
import json
def reconnect(task_id: str):
    headers = {'Authorization': 'Bearer YOUR_API_KEY'}
    # 1. Load snapshot
    snapshot_res = requests.get(
        f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/snapshot',
        headers=headers,
    )
    snapshot = snapshot_res.json()['data']
    # Hydrate state from snapshot
    print(f"Progress: {snapshot['progress']['topics_completed']}/{snapshot['progress']['topics_total']}")
    for source in snapshot['sources']:
        print(f"  Source: {source['title']}")
    # 2. Stream only new events
    stream_res = requests.get(
        f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/stream',
        params={'fromSequence': snapshot['streamSequence']},
        headers=headers,
        stream=True,
    )
    client = sseclient.SSEClient(stream_res)
    for event in client.events():
        data = json.loads(event.data)
        if event.event == 'done':
            break
        # ... handle events
When to use this pattern:
- Page refreshes or tab switches during a running research task
- Reconnecting after a long disconnection
- Late-joining clients that need the current state immediately
- Any scenario where replaying the full event stream would be too slow
Code Examples
JavaScript / Node.js
Complete example with event handling and automatic reconnection:
const API_KEY = 'YOUR_API_KEY';
const BASE_URL = 'https://api.feeds.onhelix.ai';
// Assumes an EventSource implementation that supports custom headers
// (e.g. the eventsource package in Node.js).
async function research(topic) {
  // 1. Create the task
  const createRes = await fetch(`${BASE_URL}/research/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ input: topic }),
  });
  const { data: task } = await createRes.json();
  console.log(`Task created: ${task.id}`);
  // 2. Stream results
  let lastSequence = 0;
  let finished = false;
  const sources = [];
  function connect() {
    const url = `${BASE_URL}/research/tasks/${task.id}/stream?fromSequence=${lastSequence}`;
    const es = new EventSource(url, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });
    es.addEventListener('topic', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { topic, status } = JSON.parse(e.data);
      console.log(`Topic "${topic}": ${status}`);
    });
    es.addEventListener('progress', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { topics_completed, topics_total, sources_found } = JSON.parse(e.data);
      console.log(`Progress: ${topics_completed}/${topics_total} topics, ${sources_found} sources`);
    });
    es.addEventListener('source', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const source = JSON.parse(e.data);
      sources.push(source);
      console.log(`Source found: ${source.title} (${source.url})`);
    });
    es.addEventListener('result', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const result = JSON.parse(e.data);
      console.log('\n--- Research Report ---');
      console.log(result.report);
      console.log(`\nConfidence: ${result.confidence_level}`);
      console.log(`Sources: ${sources.length}`);
    });
    // A server-sent "error" event carries data; connection failures do not.
    es.addEventListener('error', (e) => {
      if (e.data) {
        const { message } = JSON.parse(e.data);
        console.error(`Research error: ${message}`);
        finished = true;
        es.close();
      }
    });
    es.addEventListener('done', () => {
      finished = true;
      es.close();
    });
    // Auto-reconnect on connection loss (not after a normal end of stream)
    es.onerror = (e) => {
      if (finished || e.data) return;
      es.close();
      console.log('Connection lost, reconnecting...');
      setTimeout(connect, 3000);
    };
  }
  connect();
}
research('Impact of quantum computing on cryptography');
Python
Complete example with reconnection logic:
import requests
import sseclient
import json
import time
API_KEY = 'YOUR_API_KEY'
BASE_URL = 'https://api.feeds.onhelix.ai'
HEADERS = {'Authorization': f'Bearer {API_KEY}'}
def research(topic: str) -> dict:
    # 1. Create the task
    response = requests.post(
        f'{BASE_URL}/research/tasks',
        json={'input': topic},
        headers={**HEADERS, 'Content-Type': 'application/json'},
    )
    response.raise_for_status()
    task = response.json()['data']
    print(f"Task created: {task['id']}")
    # 2. Stream results with reconnection
    last_sequence = 0
    result = None
    max_retries = 5
    for attempt in range(max_retries):
        try:
            stream_response = requests.get(
                f"{BASE_URL}/research/tasks/{task['id']}/stream",
                params={'fromSequence': last_sequence},
                headers=HEADERS,
                stream=True,
            )
            stream_response.raise_for_status()
            client = sseclient.SSEClient(stream_response)
            for event in client.events():
                if event.id:
                    last_sequence = int(event.id)
                data = json.loads(event.data)
                if event.event == 'topic':
                    print(f"Topic \"{data['topic']}\": {data['status']}")
                elif event.event == 'progress':
                    print(
                        f"Progress: {data['topics_completed']}/{data['topics_total']} topics, "
                        f"{data['sources_found']} sources"
                    )
                elif event.event == 'source':
                    print(f"Source: {data['title']} ({data['url']})")
                elif event.event == 'result':
                    result = data
                    print(f"\nReport:\n{data['report'][:200]}...")
                elif event.event == 'error':
                    print(f"Error: {data['message']}")
                    return data
                elif event.event == 'done':
                    return result
            break  # Stream ended normally
        except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
            print(f"Connection lost, reconnecting (attempt {attempt + 1})...")
            time.sleep(3)
    return result
result = research('Impact of quantum computing on cryptography')
curl
# Basic streaming
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream" \
-H "Authorization: Bearer YOUR_API_KEY"
# Detailed streaming
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?detail=detailed" \
-H "Authorization: Bearer YOUR_API_KEY"
# Resume from sequence 42
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?fromSequence=42" \
-H "Authorization: Bearer YOUR_API_KEY"
Note: With curl, SSE events are printed as raw text. Each event has an event: line followed by a data: line.
Common Patterns
Progress Bar
Use progress events to build a progress bar:
es.addEventListener('progress', (e) => {
  const { topics_completed, topics_total } = JSON.parse(e.data);
  const percent = Math.round((topics_completed / topics_total) * 100);
  updateProgressBar(percent);
});
Source List Accumulation
Collect sources as they're discovered:
const sources = [];
es.addEventListener('source', (e) => {
  const source = JSON.parse(e.data);
  sources.push(source);
  renderSourceList(sources);
});
Agent Visualization (Detailed Mode)
Track active agents and their tool usage:
const agents = new Map();
es.addEventListener('agent', (e) => {
  const { type, id, topic, status } = JSON.parse(e.data);
  if (type === 'start') {
    agents.set(id, { topic, tools: [], status: 'running' });
  } else {
    const agent = agents.get(id);
    if (agent) agent.status = status;
  }
  renderAgentDashboard(agents);
});
es.addEventListener('tool', (e) => {
  const { type, name, agent_id, results_count } = JSON.parse(e.data);
  const agent = agents.get(agent_id);
  if (agent) {
    if (type === 'start') {
      agent.tools.push({ name, status: 'running' });
    } else {
      const tool = agent.tools.find(
        (t) => t.name === name && t.status === 'running'
      );
      if (tool) {
        tool.status = 'done';
        tool.results_count = results_count;
      }
    }
  }
  renderAgentDashboard(agents);
});
Troubleshooting
Connection Drops
SSE connections can be interrupted by network issues, proxies, or load balancers. Always implement reconnection with fromSequence:
- Track the id field from each event
- On disconnect, reconnect with ?fromSequence={lastId}
- The server replays missed events automatically
No Events Received
If you connect but receive no events:
- The stream may not be ready yet. The server waits up to 30 seconds for stream availability
- Check that the task ID is correct and the task status is running
- Verify your API key has access to the task
Duplicate Events After Reconnection
When reconnecting with fromSequence, you may receive the event at that sequence again. Use the sequence ID to deduplicate on the client side.
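A simple guard is enough. As a sketch (assuming events carry their sequence number, taken from the id: field):

```python
def dedupe(events, last_sequence=0):
    """Yield only events whose sequence number hasn't been processed yet."""
    for name, data, seq in events:
        if seq <= last_sequence:
            continue  # already handled before the reconnect
        last_sequence = seq
        yield name, data, seq
```

After reconnecting with fromSequence=7, an event replayed with sequence 7 is silently skipped and processing resumes at 8.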