Streaming SSE from Durable Endpoints (Developer Preview)
Durable Endpoints can stream data back to clients in real-time using Server-Sent Events (SSE). This lets you stream AI inference tokens, progress updates, or any other data — while keeping the durability guarantees of durable steps.
Streaming works across multiple steps within a single endpoint invocation, and handles the transition from sync to async mode seamlessly. If a step fails and retries, any data streamed during that step is automatically rolled back on the client.
Streaming SSE from Durable Endpoints is currently only available in the TypeScript SDK. This guide assumes you've already set up a Durable Endpoint.
When to use streaming
- AI inference — Stream LLM tokens to the browser as they're generated, so users see results immediately.
- Status updates — Send progress messages during long-running endpoint executions.
- Making existing streaming endpoints durable — Wrap your existing streaming HTTP endpoints with steps to add retries and observability with no loss of functionality.
If you don't need to stream data directly to an HTTP client, consider using Realtime to push updates from background Inngest functions via pub/sub channels.
Quick start
Server
Import `step` from `inngest` and `stream` from `inngest/experimental/durable-endpoints`, then use `stream.push()` or `stream.pipe()` inside your endpoint handler:
```ts
import Anthropic from "@anthropic-ai/sdk";
import { step } from "inngest";
import { stream } from "inngest/experimental/durable-endpoints";

import { inngest } from "@/inngest";

export const GET = inngest.endpoint(async () => {
  // Option A: push() with an SDK event callback
  const text = await step.run("generate", async () => {
    stream.push("Generating...\n");

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 512,
      messages: [{ role: "user", content: "Write a haiku about durability." }],
    });

    response.on("text", (token) => stream.push(token));
    return await response.finalText();
  });

  // Option B: pipe() — streams each chunk AND returns the collected text
  await step.run("translate", async () => {
    stream.push(`\nTranslating...\n`);

    const client = new Anthropic();
    const response = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 256,
      messages: [{ role: "user", content: `Translate to French: ${text}` }],
    });

    return stream.pipe(async function* () {
      for await (const event of response) {
        if (
          event.type === "content_block_delta" &&
          event.delta.type === "text_delta"
        ) {
          yield event.delta.text;
        }
      }
    });
  });

  return Response.json("\nDone!");
});
```
Client
Use `fetchWithStream()` from `inngest/experimental/durable-endpoints/client` to consume the stream. It handles SSE parsing, sync-to-async redirects, and commit/rollback automatically. Chunks arrive on the client in the order they are pushed or yielded on the server.
```tsx
"use client";

import { useState, useRef } from "react";
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";

export default function Generate() {
  const [chunks, setChunks] = useState<string[]>([]);
  const uncommittedCountRef = useRef(0);

  async function run() {
    setChunks([]);
    uncommittedCountRef.current = 0;

    const resp = await fetchWithStream("/api/generate", {
      onData: ({ data }) => {
        if (typeof data === "string") {
          uncommittedCountRef.current++;
          setChunks((prev) => [...prev, data]);
        }
      },
      onRollback: () => {
        // A step failed and will retry — remove the chunks it produced
        const count = uncommittedCountRef.current;
        setChunks((prev) => prev.slice(0, prev.length - count));
        uncommittedCountRef.current = 0;
      },
      onCommit: () => {
        // Step completed — its chunks are now permanent
        uncommittedCountRef.current = 0;
      },
      onStreamError: (error) => {
        setChunks((prev) => [...prev, `Error: ${error}`]);
      },
    });

    // The endpoint's return value is available as the Response body
    const result = await resp.text();
    setChunks((prev) => [...prev, result]);
  }

  return (
    <div>
      <button onClick={run}>Generate</button>
      <pre>{chunks.join("")}</pre>
    </div>
  );
}
```
Server API
stream.push(data)
Send a single chunk of data to the client as an SSE event.
```ts
stream.push("Loading...");
stream.push({ progress: 50, message: "Halfway there" });
```
- Accepts any JSON-serializable value.
- Fire-and-forget — does not block execution or return a value.
- No-op outside of an Inngest execution context, so your code works the same when called outside of a durable endpoint.
push() is ideal for one-off status messages or streaming via provider SDK event callbacks.
stream.pipe(source)
Pipe a stream source to the client and resolve with the concatenated text of all chunks. Each chunk is sent as an SSE event in real-time.
The simplest case is piping a ReadableStream, like a fetch response body:
```ts
const response = await fetch("https://api.example.com/stream");
const text = await stream.pipe(response.body);
// `text` contains the full response; the client received it chunk by chunk
```
When you need to transform or filter chunks before they're sent, pass an async generator function. Each yield sends one chunk to the client:
```ts
const text = await stream.pipe(async function* () {
  for await (const event of response) {
    // Only yield the parts you want the client to see
    if (event.type === "content_block_delta") {
      yield event.delta.text;
    }
  }
});
```
pipe() accepts three source types:
- `ReadableStream` — piped directly, decoded from bytes to string chunks.
- `AsyncIterable<string>` — each value in the iterable becomes a chunk.
- `() => AsyncIterable<string>` — a function that returns an async iterable. This is what lets you pass `async function*` generators directly to `pipe()`.
No-op outside of an Inngest execution context (resolves with an empty string).
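All three source shapes reduce to one async iterable of string chunks. The sketch below shows how that normalization can work; it is illustrative only, not the SDK's actual internals:

```typescript
// Illustrative sketch, not the SDK's internals: how pipe()'s three
// accepted source shapes can be normalized to one async iterable.
type PipeSource =
  | ReadableStream<Uint8Array>
  | AsyncIterable<string>
  | (() => AsyncIterable<string>);

async function* normalize(source: PipeSource): AsyncIterable<string> {
  if (typeof source === "function") {
    // () => AsyncIterable<string>: call it, then delegate to the result
    yield* source();
  } else if (source instanceof ReadableStream) {
    // ReadableStream: decode byte chunks into string chunks
    const decoder = new TextDecoder();
    const reader = source.getReader();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        yield decoder.decode(value, { stream: true });
      }
    } finally {
      reader.releaseLock();
    }
  } else {
    // AsyncIterable<string>: each value is already a chunk
    yield* source;
  }
}

// Concatenating the chunks mirrors the string that pipe() resolves with.
async function collect(source: PipeSource): Promise<string> {
  let text = "";
  for await (const chunk of normalize(source)) text += chunk;
  return text;
}
```

This also explains why a generator function must be passed as a function rather than called: `pipe()` can defer invoking it until the stream is actually consumed.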
For the full stream.push() and stream.pipe() API reference, see the Streaming reference.
Client API
fetchWithStream(url, options)
The primary way to consume a streaming Durable Endpoint. Import it from inngest/experimental/durable-endpoints/client:
```ts
import { fetchWithStream } from "inngest/experimental/durable-endpoints/client";
```
fetchWithStream() returns a Promise<Response> — await it to drive the stream to completion. When the endpoint finishes, the returned Response contains the endpoint's final return value. If the endpoint does not use streaming, fetchWithStream() returns the raw Response as-is.
The core callbacks handle the majority of streaming use cases:
- `onData({ data, hashedStepId })` — Called for each chunk. `data` is the deserialized value; `hashedStepId` identifies which step produced it (or `null` if streamed outside a step). Data should be considered uncommitted until `onCommit` fires.
- `onRollback({ hashedStepId })` — Called when a step fails and will retry. Your code is responsible for tracking and removing the chunks produced by that step (see the Quick start example for a pattern using a ref counter).
- `onCommit({ hashedStepId })` — Called when a step completes successfully. Chunks from that step are now permanent and will never be rolled back.
Because stream.push() accepts any JSON-serializable value, data in the onData callback is typed as unknown. Narrow the type in your callback as needed:
```ts
const uncommittedCount = { current: 0 };

const resp = await fetchWithStream("/api/generate", {
  onData: ({ data }) => {
    if (typeof data === "string") {
      uncommittedCount.current++;
      console.log("Chunk:", data);
    }
  },
  onRollback: () => {
    // Discard uncommitted chunks and reset counter
    uncommittedCount.current = 0;
  },
  onCommit: () => {
    // Chunks are permanent — reset counter
    uncommittedCount.current = 0;
  },
});

const result = await resp.text();
```
For all available options (fetch, fetchOpts, onMetadata, onStreamError, onDone), see the full API reference.
How it works
Sync-to-async transitions
When a client calls a streaming Durable Endpoint, the SSE stream flows directly from your app to the client. If the endpoint needs to go async (e.g. due to step.sleep(), step.waitForEvent(), or a retry), the SDK sends a redirect event telling the client where to reconnect, and the stream continues through the Inngest server.
fetchWithStream() handles this redirect automatically — the client sees a single continuous stream regardless of sync-to-async transitions.
Streaming activation
Streaming is activated lazily. The endpoint only sends an SSE response if:
- The client sends the `Accept: text/event-stream` header (which `fetchWithStream()` does automatically), and
- Your code calls `stream.push()` or `stream.pipe()` during execution.
If neither push() nor pipe() is called, the endpoint behaves like a regular non-streaming Durable Endpoint.
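The activation rule is a simple conjunction of those two conditions. A sketch of the decision, illustrative only and not the SDK's actual logic:

```typescript
// Illustrative only: an SSE response is sent when the client opted in
// via the Accept header AND the handler actually streamed something.
function shouldStream(
  acceptHeader: string | null,
  pushOrPipeCalled: boolean,
): boolean {
  return (acceptHeader ?? "").includes("text/event-stream") && pushOrPipeCalled;
}
```

Because activation is lazy, adding `stream.push()` calls to an endpoint is backward compatible: plain `fetch()` clients that don't send the header keep getting an ordinary response.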
Rollback on retry
Each chunk is tagged with the step that produced it (via hashedStepId). When a step completes, onCommit fires and those chunks become permanent. When a step fails and retries, onRollback fires and your client code should discard the uncommitted chunks from that step. On the retry attempt, the step streams fresh data that replaces what was rolled back. See the Quick start for an implementation pattern.
Data streamed outside of a step.run() is never rolled back.
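The quick-start example uses a single ref counter, which is enough when only one step streams at a time. When several steps stream, the `hashedStepId` on each chunk lets you track uncommitted data per step. A framework-agnostic sketch (illustrative; callback payloads are simplified to strings):

```typescript
type StepId = string | null;

interface Chunk {
  data: string;
  stepId: StepId;
  committed: boolean;
}

// Tracks streamed chunks per step so a rollback discards only that
// step's uncommitted output, preserving arrival order.
class ChunkTracker {
  private chunks: Chunk[] = [];

  onData(data: string, hashedStepId: StepId) {
    // Chunks streamed outside any step (stepId === null) are committed
    // immediately, since the server never rolls them back.
    this.chunks.push({
      data,
      stepId: hashedStepId,
      committed: hashedStepId === null,
    });
  }

  onCommit(hashedStepId: StepId) {
    // Step succeeded: mark its chunks permanent
    for (const c of this.chunks) {
      if (c.stepId === hashedStepId) c.committed = true;
    }
  }

  onRollback(hashedStepId: StepId) {
    // Step failed and will retry: drop its uncommitted chunks
    this.chunks = this.chunks.filter(
      (c) => c.stepId !== hashedStepId || c.committed,
    );
  }

  render(): string {
    return this.chunks.map((c) => c.data).join("");
  }
}
```

Wire the three methods into the matching `fetchWithStream()` callbacks and re-render from `render()` after each event.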
SSE event types
The stream uses SSE with the following event types. The inngest.* events are internal protocol events handled by fetchWithStream() automatically — only inngest.stream events contain user data.
| Event name | Payload | Purpose |
|---|---|---|
| `inngest.metadata` | `{ runId }` | Always first. Identifies the run. |
| `inngest.stream` | `{ data, stepId? }` | User data from `push()` / `pipe()`. |
| `inngest.commit` | `{ hashedStepId }` | Step succeeded — its streamed data is permanent. |
| `inngest.rollback` | `{ hashedStepId }` | Step failed — discard its uncommitted data. |
| `inngest.redirect_info` | `{ runId, url }` | Tells the client to reconnect for async continuation. |
| `inngest.response` | `{ status, response: { body, headers, statusCode } }` | Terminal event — closes the stream. |
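For illustration, a minimal parser for a single frame of this protocol, assuming standard SSE framing (an `event:` line plus one or more `data:` lines, frames separated by a blank line). In practice `fetchWithStream()` does all of this for you:

```typescript
// Illustrative SSE frame parser; the payload is JSON per the table above.
interface SseEvent {
  event: string;
  data: unknown;
}

function parseSseFrame(frame: string): SseEvent {
  let event = "message"; // the SSE default event name
  const dataLines: string[] = [];
  for (const line of frame.split("\n")) {
    if (line.startsWith("event:")) event = line.slice(6).trim();
    else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
  }
  // Multi-line data fields are joined with newlines before parsing
  return { event, data: JSON.parse(dataLines.join("\n")) };
}
```

A raw `inngest.stream` frame on the wire would look roughly like `event: inngest.stream` followed by `data: {"data":"...","stepId":"..."}` and a blank line.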
Limitations
Streaming SSE from Durable Endpoints is currently in developer preview. In addition to any general Durable Endpoint limitations, the following apply:
- 15-minute timeout — Client connections time out after 15 minutes, so your endpoint should complete within this window (including any retries) for the stream to be delivered end-to-end.
- No rollback outside of steps — Data streamed outside of a `step.run()` is never rolled back. If you need rollback guarantees, stream from within a step.
- One streaming parallel step — You can stream from at most one parallel step. Streaming from multiple parallel steps results in interleaved output that the client cannot disambiguate.
- No streaming from child functions — `step.invoke()` calls cannot stream data back to the parent function's client.
- Raw `Response` objects may be lost on async transition — If your endpoint returns a `Response` (like a file download) and goes async, the Response is lost because it can't be memoized. Use `stream.push()` or `stream.pipe()` instead.
SDK support
| SDK | Support | Version |
|---|---|---|
| TypeScript | Developer Preview | >= 4.x (with endpointAdapter) |