Streaming Guide

Deep Research streams real-time progress via Server-Sent Events (SSE). This guide covers all event types, reconnection handling, and code examples.

Overview

When you create a research task, the response includes a streamUrl. Connect to this URL to receive events as the multi-agent system searches the web, extracts content, and synthesizes findings.

SSE is the primary way to consume Deep Research results. It provides immediate visibility into progress without polling, and supports reconnection if your connection drops.

Connecting to a Stream

Endpoint:

GET /research/tasks/{taskId}/stream

Query Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| detail | string | Task's detail value | basic or detailed |
| fromSequence | integer | 0 | Resume from this sequence number |
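
The two query parameters can be combined on one URL. The sketch below shows one way to assemble it; `build_stream_url` is a hypothetical helper written for this guide, not part of any SDK, and the client-side validation of detail is an assumption about what you might want, not a server requirement.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.feeds.onhelix.ai"


def build_stream_url(task_id, detail=None, from_sequence=None):
    """Return the stream URL, adding only the query parameters that were set.

    Hypothetical helper: omitting a parameter lets the server apply its default.
    """
    params = {}
    if detail is not None:
        if detail not in ("basic", "detailed"):
            raise ValueError("detail must be 'basic' or 'detailed'")
        params["detail"] = detail
    if from_sequence is not None:
        params["fromSequence"] = int(from_sequence)
    url = f"{BASE_URL}/research/tasks/{task_id}/stream"
    return f"{url}?{urlencode(params)}" if params else url
```

For example, `build_stream_url("abc", detail="detailed", from_sequence=7)` produces a URL ending in `/stream?detail=detailed&fromSequence=7`.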

curl:

curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream" \
-H "Authorization: Bearer YOUR_API_KEY"

JavaScript (EventSource):

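// Note: the browser's native EventSource does not accept custom headers.
// This snippet assumes an implementation that supports a `headers` option,
// for example version 2.x of the `eventsource` npm package for Node.js.
const es = new EventSource(
  `https://api.feeds.onhelix.ai/research/tasks/${taskId}/stream`,
  { headers: { Authorization: 'Bearer YOUR_API_KEY' } }
);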

Python (sseclient):

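# SSEClient(response) and client.events() match the sseclient-py package
# (pip install sseclient-py), which is imported as `sseclient`.
import requests
import sseclient

response = requests.get(
    f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/stream',
    headers={'Authorization': 'Bearer YOUR_API_KEY'},
    stream=True,
)
client = sseclient.SSEClient(response)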

Detail Levels

basic (default)

High-level progress events. Best for end-user UIs where you want to show progress bars, source lists, and the final report.

Events: topic, progress, source, result, error, done

detailed

Everything in basic plus agent-level observability events. Best for debugging, developer tools, or building advanced visualizations of the agent system.

Additional events: agent, tool, text, thinking

Override the detail level per connection:

curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?detail=detailed" \
-H "Authorization: Bearer YOUR_API_KEY"
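
If your client registers handlers by event name, it can help to know which names to expect at each detail level. A minimal sketch using the event lists above; the constant and function names are hypothetical.

```python
# Event names taken from the two detail levels described above.
BASIC_EVENTS = frozenset({"topic", "progress", "source", "result", "error", "done"})
DETAILED_ONLY_EVENTS = frozenset({"agent", "tool", "text", "thinking"})


def expected_events(detail="basic"):
    """Return the set of event names a stream at this detail level can emit."""
    if detail == "basic":
        return BASIC_EVENTS
    if detail == "detailed":
        return BASIC_EVENTS | DETAILED_ONLY_EVENTS
    raise ValueError("detail must be 'basic' or 'detailed'")


def is_expected(event_name, detail="basic"):
    """True if `event_name` belongs to the given detail level."""
    return event_name in expected_events(detail)
```

A client might use `is_expected` to log and skip event names it does not recognize instead of failing on them.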

Event Reference

Basic Events

These events are sent at both basic and detailed detail levels.

topic

A sub-topic research task has started or completed.

{
  "topic": "solid-state-battery-materials",
  "index": 1,
  "status": "started"
}

| Field | Type | Description |
| --- | --- | --- |
| topic | string | Sub-topic name or agent identifier |
| index | number | Topic number (1-based) |
| status | string | started or completed |

progress

Overall research progress update. Sent when a topic completes or a sub-report is saved.

{
  "topics_total": 3,
  "topics_completed": 1,
  "sources_found": 5
}

| Field | Type | Description |
| --- | --- | --- |
| topics_total | number | Total number of sub-topics being researched |
| topics_completed | number | Number of sub-topics completed so far |
| sources_found | number | Total web sources discovered so far |

source

A web source was discovered and saved.

{
  "url": "https://example.com/solid-state-research",
  "title": "Solid-State Battery Breakthroughs in 2025",
  "topic": "solid-state-battery-materials"
}

| Field | Type | Description |
| --- | --- | --- |
| url | string | URL of the web source |
| title | string | Page title |
| topic | string | Sub-topic this source relates to |

result

Research is complete. Contains the full result object.

{
  "report": "# Solid-State Battery Technology\n\n## Overview\n\n...",
  "topics_researched": [
    "solid-state-battery-materials",
    "ev-integration",
    "manufacturing-challenges"
  ],
  "sources": [
    {
      "url": "https://example.com/research",
      "title": "Battery Research Paper"
    }
  ],
  "sub_reports_count": 3,
  "confidence_level": "high"
}

| Field | Type | Description |
| --- | --- | --- |
| report | string | Comprehensive research report in Markdown |
| topics_researched | string[] | List of sub-topics investigated |
| sources | object[] | Web sources cited in the report |
| sub_reports_count | number | Number of sub-reports synthesized |
| confidence_level | string | high, medium, or low |

error

Research encountered an error.

{
  "message": "Workflow execution timed out"
}

| Field | Type | Description |
| --- | --- | --- |
| message | string | Error description |

done

Stream has ended. This is always the last event sent. Close your connection after receiving it.

{}
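
The basic events can be consumed with a small dispatch loop. The sketch below is illustrative only: `consume`, `ResearchError`, and the `(event_name, payload)` tuple shape are hypothetical choices for this example. It keeps the result payload, turns an error event into an exception, and stops reading at done.

```python
class ResearchError(Exception):
    """Raised when the stream delivers an `error` event (hypothetical helper)."""


def consume(events, on_event=None):
    """Process (event_name, payload) pairs until `done`; return the result payload.

    `events` is any iterable of already-decoded events. Reading them from
    the network is out of scope for this sketch.
    """
    result = None
    for name, payload in events:
        if on_event is not None:
            on_event(name, payload)
        if name == "result":
            result = payload
        elif name == "error":
            raise ResearchError(payload.get("message", "unknown error"))
        elif name == "done":
            break  # done is always the last event, so stop reading here
    return result
```

Returning `None` when no result event arrived lets the caller tell a finished stream without a report apart from a failed one.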

Detailed Events

These events are only sent when detail=detailed.

agent

Agent lifecycle event. Tracks when sub-researcher agents start and end.

Agent started:

{
  "type": "start",
  "id": "sub-researcher-abc123",
  "topic": "solid-state-battery-materials"
}

Agent ended:

{
  "type": "end",
  "id": "sub-researcher-abc123",
  "status": "completed"
}

| Field | Type | Description |
| --- | --- | --- |
| type | string | start or end |
| id | string | Unique agent identifier |
| topic | string | Sub-topic assigned (on start) |
| status | string | Agent exit status (on end) |

tool

Tool execution event. Tracks when agents use tools (web search, content extraction, etc.).

Tool started:

{
  "type": "start",
  "name": "web_search",
  "agent_id": "sub-researcher-abc123"
}

Tool ended:

{
  "type": "end",
  "name": "web_search",
  "agent_id": "sub-researcher-abc123",
  "results_count": 8
}

| Field | Type | Description |
| --- | --- | --- |
| type | string | start or end |
| name | string | Tool name (e.g., web_search) |
| agent_id | string | Agent that invoked the tool |
| results_count | number | Number of results returned (on end) |

text

LLM text generation delta from an agent.

{
  "agent_id": "sub-researcher-abc123",
  "delta": "Based on the sources analyzed, solid-state batteries "
}

| Field | Type | Description |
| --- | --- | --- |
| agent_id | string | Agent generating text |
| delta | string | Text chunk (incremental) |
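
Because each text event carries only an incremental chunk, a client has to concatenate the deltas to recover what an agent has written so far. A minimal sketch, assuming deltas for one agent arrive in order; `TextAccumulator` is a hypothetical helper.

```python
from collections import defaultdict


class TextAccumulator:
    """Collects `text` event deltas per agent (hypothetical helper)."""

    def __init__(self):
        self._chunks = defaultdict(list)

    def add(self, payload):
        """Store one decoded `text` payload with `agent_id` and `delta` keys."""
        self._chunks[payload["agent_id"]].append(payload["delta"])

    def text_for(self, agent_id):
        """Return everything received so far for this agent."""
        return "".join(self._chunks.get(agent_id, []))
```

Keeping the chunks in a list and joining on read avoids repeated string concatenation when an agent emits many small deltas.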

thinking

Agent reasoning/thinking content.

{
  "agent_id": "sub-researcher-abc123",
  "content": "I should search for recent papers on sulfide-based solid electrolytes..."
}

| Field | Type | Description |
| --- | --- | --- |
| agent_id | string | Agent that is reasoning |
| content | string | Thinking/reasoning content |

Stream Resumption

Every SSE event the server sends includes an id: line containing a sequence number. This is your checkpoint — track it on the client as events arrive:

event: topic
data: {"topic":"battery-materials","index":1,"status":"started"}
id: 3

event: source
data: {"url":"https://...","title":"...","topic":"battery-materials"}
id: 7

If your connection drops, reconnect with the last id you received as the fromSequence query parameter:

GET /research/tasks/{taskId}/stream?fromSequence=7

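The server replays the events that follow that sequence, so you won't miss anything. The event at the given sequence itself may be delivered again (see Duplicate Events After Reconnection). For a fresh connection, fromSequence defaults to 0 (start from the beginning).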
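
To make the checkpointing concrete, here is a minimal sketch of reading raw SSE lines and tracking the last id. In practice an SSE client library does this parsing for you; `parse_sse` is a hypothetical, simplified parser that follows the standard SSE framing (field lines, comment lines starting with a colon, and a blank line ending each event).

```python
def parse_sse(lines):
    """Yield (event_name, data, last_id) for each complete SSE event.

    `lines` is an iterable of decoded text lines. As in the SSE format,
    the last seen id persists until a later event replaces it.
    """
    event, data, last_id = "message", [], None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the event; events without data are dropped.
            if data:
                yield event, "\n".join(data), last_id
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, such as a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)
        elif field == "id":
            last_id = value
```

After consuming the two example events above, the last id is "7", so a reconnect would use `?fromSequence=7`.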

In JavaScript, the sequence number is available via event.lastEventId:

let lastSequence = 0;

es.addEventListener('progress', (e) => {
  lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
  // handle event...
});

Keepalive: The server sends SSE comment lines (: prefix) every 15 seconds to keep the connection alive and prevent proxies or load balancers from timing out.
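
Since something should arrive at least every 15 seconds, a long silence is a reasonable signal that the connection is dead even if no error was raised. The sketch below shows one way to track that. `KeepaliveWatchdog` is a hypothetical helper, and tolerating two missed keepalives is an arbitrary assumption you should tune.

```python
import time


class KeepaliveWatchdog:
    """Flags a stream as stale after too long without any bytes (hypothetical)."""

    def __init__(self, interval=15.0, missed_allowed=2, clock=time.monotonic):
        # With the defaults, the stream is stale after 45 seconds of silence.
        self.deadline = interval * (missed_allowed + 1)
        self._clock = clock
        self._last_seen = clock()

    def touch(self):
        """Call whenever an event or a keepalive comment arrives."""
        self._last_seen = self._clock()

    def is_stale(self):
        """True if nothing has arrived within the deadline."""
        return self._clock() - self._last_seen > self.deadline
```

When `is_stale()` returns true, close the connection and reconnect with fromSequence as described above.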

Stream availability: If you connect immediately after creating a task, the stream may not exist yet. The server polls for up to 30 seconds, sending keepalive comments while waiting. If the stream still isn't available, an error event is sent.

Snapshot + Stream Pattern

For long-running research tasks (5-10 minutes with hundreds of events), replaying the entire stream from fromSequence=0 on reconnection can be slow. The snapshot endpoint lets you load the accumulated state instantly, then connect to the SSE stream for only new events.

Step 1: Fetch the snapshot to hydrate your UI:

GET /research/tasks/{taskId}/snapshot

Step 2: Connect to the SSE stream starting from the snapshot's streamSequence value:

GET /research/tasks/{taskId}/stream?fromSequence={streamSequence}

Any events that occurred between loading the snapshot and connecting to the stream are replayed automatically. Because the stream resumes from the sequence recorded in the snapshot, no events are missed.
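
One way to picture the pattern is as a reducer: the snapshot is the starting state, and each streamed event updates it. The sketch below assumes the snapshot exposes progress, sources, result, and streamSequence keys, and that it already reflects every event up to streamSequence. `apply_event` is a hypothetical helper, not part of the API.

```python
def apply_event(state, sequence, event, payload):
    """Fold one streamed event into snapshot-shaped state (illustrative only).

    Events at or below the snapshot's sequence are skipped, on the
    assumption that the snapshot already includes them.
    """
    if sequence <= state["streamSequence"]:
        return state
    if event == "progress":
        state["progress"] = payload
    elif event == "source":
        state["sources"].append(payload)
    elif event == "result":
        state["result"] = payload
    state["streamSequence"] = sequence
    return state
```

Advancing streamSequence on every applied event means the same state object can be used as the checkpoint for the next reconnect.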

JavaScript:

async function reconnect(taskId) {
  // 1. Load snapshot for instant UI hydration
  const snapshotRes = await fetch(
    `https://api.feeds.onhelix.ai/research/tasks/${taskId}/snapshot`,
    { headers: { Authorization: 'Bearer YOUR_API_KEY' } }
  );
  const { data: snapshot } = await snapshotRes.json();

  // Hydrate UI from snapshot
  updateProgress(snapshot.progress);
  renderTopics(snapshot.topics);
  renderSources(snapshot.sources);
  if (snapshot.result) renderResult(snapshot.result);

  // 2. Connect to stream for new events only
  let lastSequence = snapshot.streamSequence;
  const url = `https://api.feeds.onhelix.ai/research/tasks/${taskId}/stream?fromSequence=${lastSequence}`;
  const es = new EventSource(url, {
    headers: { Authorization: 'Bearer YOUR_API_KEY' },
  });

  es.addEventListener('progress', (e) => {
    lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
    updateProgress(JSON.parse(e.data));
  });

  // ... handle other events
}

Python:

import requests
import sseclient
import json

def reconnect(task_id: str):
    headers = {'Authorization': 'Bearer YOUR_API_KEY'}

    # 1. Load snapshot
    snapshot_res = requests.get(
        f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/snapshot',
        headers=headers,
    )
    snapshot = snapshot_res.json()['data']

    # Hydrate state from snapshot
    print(f"Progress: {snapshot['progress']['topics_completed']}/{snapshot['progress']['topics_total']}")
    for source in snapshot['sources']:
        print(f"  Source: {source['title']}")

    # 2. Stream only new events
    stream_res = requests.get(
        f'https://api.feeds.onhelix.ai/research/tasks/{task_id}/stream',
        params={'fromSequence': snapshot['streamSequence']},
        headers=headers,
        stream=True,
    )
    client = sseclient.SSEClient(stream_res)

    for event in client.events():
        data = json.loads(event.data)
        if event.event == 'done':
            break
        # ... handle events

When to use this pattern:

  • Page refreshes or tab switches during a running research task
  • Reconnecting after a long disconnection
  • Late-joining clients that need the current state immediately
  • Any scenario where replaying the full event stream would be too slow

Code Examples

JavaScript / Node.js

Complete example with event handling and automatic reconnection:

const API_KEY = 'YOUR_API_KEY';
const BASE_URL = 'https://api.feeds.onhelix.ai';

async function research(topic) {
  // 1. Create the task
  const createRes = await fetch(`${BASE_URL}/research/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({ input: topic }),
  });

  const { data: task } = await createRes.json();
  console.log(`Task created: ${task.id}`);

  // 2. Stream results
  let lastSequence = 0;
  const sources = [];

  function connect() {
    const url = `${BASE_URL}/research/tasks/${task.id}/stream?fromSequence=${lastSequence}`;
    // Assumes an EventSource implementation that accepts a `headers` option;
    // the browser's native EventSource does not.
    const es = new EventSource(url, {
      headers: { Authorization: `Bearer ${API_KEY}` },
    });

    es.addEventListener('topic', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { topic, status } = JSON.parse(e.data);
      console.log(`Topic "${topic}": ${status}`);
    });

    es.addEventListener('progress', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const { topics_completed, topics_total, sources_found } = JSON.parse(
        e.data
      );
      console.log(
        `Progress: ${topics_completed}/${topics_total} topics, ${sources_found} sources`
      );
    });

    es.addEventListener('source', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const source = JSON.parse(e.data);
      sources.push(source);
      console.log(`Source found: ${source.title} (${source.url})`);
    });

    es.addEventListener('result', (e) => {
      lastSequence = parseInt(e.lastEventId, 10) || lastSequence;
      const result = JSON.parse(e.data);
      console.log('\n--- Research Report ---');
      console.log(result.report);
      console.log(`\nConfidence: ${result.confidence_level}`);
      console.log(`Sources: ${sources.length}`);
    });

    // 'error' listeners receive both server-sent `error` events and
    // connection failures. Only server-sent events carry a data payload,
    // so one handler distinguishes the two and avoids reconnecting after
    // the research itself has failed.
    es.addEventListener('error', (e) => {
      es.close();
      if (e.data) {
        // Research failed: report it and stop.
        const { message } = JSON.parse(e.data);
        console.error(`Research error: ${message}`);
        return;
      }
      // Connection lost: resume from the last sequence seen.
      console.log('Connection lost, reconnecting...');
      setTimeout(connect, 3000);
    });

    es.addEventListener('done', () => {
      es.close();
    });
  }

  connect();
}

research('Impact of quantum computing on cryptography');

Python

Complete example with reconnection logic:

import requests
import sseclient
import json
import time

API_KEY = 'YOUR_API_KEY'
BASE_URL = 'https://api.feeds.onhelix.ai'
HEADERS = {'Authorization': f'Bearer {API_KEY}'}


def research(topic: str) -> dict:
    # 1. Create the task
    response = requests.post(
        f'{BASE_URL}/research/tasks',
        json={'input': topic},
        headers={**HEADERS, 'Content-Type': 'application/json'},
    )
    response.raise_for_status()
    task = response.json()['data']
    print(f"Task created: {task['id']}")

    # 2. Stream results with reconnection
    last_sequence = 0
    result = None
    max_retries = 5

    for attempt in range(max_retries):
        try:
            stream_response = requests.get(
                f"{BASE_URL}/research/tasks/{task['id']}/stream",
                params={'fromSequence': last_sequence},
                headers=HEADERS,
                stream=True,
            )
            stream_response.raise_for_status()
            client = sseclient.SSEClient(stream_response)

            for event in client.events():
                if event.id:
                    last_sequence = int(event.id)

                data = json.loads(event.data)

                if event.event == 'topic':
                    print(f"Topic \"{data['topic']}\": {data['status']}")

                elif event.event == 'progress':
                    print(
                        f"Progress: {data['topics_completed']}/{data['topics_total']} topics, "
                        f"{data['sources_found']} sources"
                    )

                elif event.event == 'source':
                    print(f"Source: {data['title']} ({data['url']})")

                elif event.event == 'result':
                    result = data
                    print(f"\nReport:\n{data['report'][:200]}...")

                elif event.event == 'error':
                    print(f"Error: {data['message']}")
                    return data

                elif event.event == 'done':
                    return result

            break  # Stream ended normally

        except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
            print(f"Connection lost, reconnecting (attempt {attempt + 1})...")
            time.sleep(3)

    return result


result = research('Impact of quantum computing on cryptography')

curl

# Basic streaming
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream" \
-H "Authorization: Bearer YOUR_API_KEY"

# Detailed streaming
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?detail=detailed" \
-H "Authorization: Bearer YOUR_API_KEY"

# Resume from sequence 42
curl -N "https://api.feeds.onhelix.ai/research/tasks/{taskId}/stream?fromSequence=42" \
-H "Authorization: Bearer YOUR_API_KEY"

Note: With curl, SSE events are printed as raw text. Each event has an event: line, a data: line, and an id: line, followed by a blank line.

Common Patterns

Progress Bar

Use progress events to build a progress bar:

es.addEventListener('progress', (e) => {
  const { topics_completed, topics_total } = JSON.parse(e.data);
  // Guard against division by zero in case topics_total is 0
  const percent =
    topics_total > 0
      ? Math.round((topics_completed / topics_total) * 100)
      : 0;
  updateProgressBar(percent);
});

Source List Accumulation

Collect sources as they're discovered:

const sources = [];

es.addEventListener('source', (e) => {
  const source = JSON.parse(e.data);
  sources.push(source);
  renderSourceList(sources);
});
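
If the stream is replayed after a reconnect, the same source event can arrive twice and a plain append would list it twice. The sketch below keeps one entry per URL. Treating the url field as the identity of a source is an assumption made for this example, and `SourceList` is a hypothetical helper.

```python
class SourceList:
    """Accumulates `source` payloads, keeping one entry per URL (hypothetical)."""

    def __init__(self):
        # Plain dicts preserve insertion order, so discovery order is kept.
        self._by_url = {}

    def add(self, source):
        """Add a source payload; return True if its URL was not seen before."""
        url = source["url"]
        if url in self._by_url:
            return False
        self._by_url[url] = source
        return True

    def as_list(self):
        """Return the sources in the order they were first seen."""
        return list(self._by_url.values())
```

The boolean return value lets a UI re-render only when something new was actually added.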

Agent Visualization (Detailed Mode)

Track active agents and their tool usage:

const agents = new Map();

es.addEventListener('agent', (e) => {
  const { type, id, topic, status } = JSON.parse(e.data);
  if (type === 'start') {
    agents.set(id, { topic, tools: [], status: 'running' });
  } else {
    const agent = agents.get(id);
    if (agent) agent.status = status;
  }
  renderAgentDashboard(agents);
});

es.addEventListener('tool', (e) => {
  const { type, name, agent_id, results_count } = JSON.parse(e.data);
  const agent = agents.get(agent_id);
  if (agent) {
    if (type === 'start') {
      agent.tools.push({ name, status: 'running' });
    } else {
      const tool = agent.tools.find(
        (t) => t.name === name && t.status === 'running'
      );
      if (tool) {
        tool.status = 'done';
        tool.results_count = results_count;
      }
    }
  }
  renderAgentDashboard(agents);
});

Troubleshooting

Connection Drops

SSE connections can be interrupted by network issues, proxies, or load balancers. Always implement reconnection with fromSequence:

  • Track the id field from each event
  • On disconnect, reconnect with ?fromSequence={lastId}
  • The server replays missed events automatically

No Events Received

If you connect but receive no events:

  • The stream may not be ready yet. The server waits up to 30 seconds for the stream to become available, then sends an error event
  • Check that the task ID is correct and the task status is running
  • Verify your API key has access to the task

Duplicate Events After Reconnection

When reconnecting with fromSequence, you may receive the event at that sequence again. Deduplicate on the client side by skipping any event whose id is less than or equal to the last one you processed.
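
A minimal sketch of that deduplication, assuming sequence numbers only increase over the life of a stream. `skip_replayed` and the `(sequence, name, payload)` tuple shape are hypothetical choices for this example.

```python
def skip_replayed(events, last_sequence=None):
    """Yield only events newer than `last_sequence`.

    `events` is an iterable of (sequence, name, payload) tuples. Pass the
    last id you processed before the reconnect; pass None on a fresh
    connection so nothing is filtered.
    """
    for sequence, name, payload in events:
        if last_sequence is not None and sequence <= last_sequence:
            continue  # already processed before the reconnect
        last_sequence = sequence
        yield sequence, name, payload
```

Wrapping the event iterator this way keeps the handlers themselves free of replay logic.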