Agent runs are a resource, not an ephemeral stream
An agent run is a persisted resource with an ID and lifecycle — the SSE stream is a window into it, not the run itself.
nestjs-realtime-stream-agent-runs-as-resource
Why it matters
| Stake | If ignored |
|---|---|
| Data corruption | |
| Bad UX | |
How to fix
An agent run is a resource with an ID, not just a passing stream of text to the user. It is persisted in the DB with its state (pending, running, completed, failed, or canceled), its accumulated output, and its tool calls. The SSE stream is a window into that resource, not the resource itself.
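As a minimal sketch of that shape, the run record and its allowed state changes might look like the following. The `AgentRun` and `ToolCall` field names, the transition table, and the `transition` helper are all illustrative assumptions, not taken from any library or from this codebase.

```typescript
// Hypothetical shape of a persisted run; field names are assumptions.
type RunState = 'pending' | 'running' | 'completed' | 'failed' | 'canceled';

interface ToolCall {
  name: string;
  args: unknown;
}

interface AgentRun {
  id: string;
  userId: string;
  state: RunState;
  output: string; // accumulated output so far
  toolCalls: ToolCall[];
}

// Assumed transition table: terminal states have no outgoing transitions.
const allowed: Record<RunState, RunState[]> = {
  pending: ['running', 'failed', 'canceled'],
  running: ['completed', 'failed', 'canceled'],
  completed: [],
  failed: [],
  canceled: [],
};

// Returns a copy of the run in the next state, or throws on an illegal move.
function transition(run: AgentRun, next: RunState): AgentRun {
  if (allowed[run.state].indexOf(next) === -1) {
    throw new Error(`Illegal transition: ${run.state} -> ${next}`);
  }
  return { ...run, state: next };
}
```

Keeping the transition check in one place means a late cancel or a duplicate worker completion cannot silently overwrite a terminal state.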
Implementing this rule requires a documented disconnect policy (see the cancellation modes in the related LLM-streaming rule). Whatever policy is chosen, an explicit cancel (DELETE, a Stop button, or POST .../cancel) must exist. A naive req.on('close') abort must not break client resume: a dropped connection is not, by itself, an explicit cancel.
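The sketch below separates the two signals under one possible policy, "keep running on disconnect". It is an in-memory illustration only: `RunRegistry` and all of its methods are hypothetical names, and a real service would keep run state in the DB and follow whichever disconnect policy the project documents.

```typescript
// All names are hypothetical; a real service would keep run state in the DB.
type LiveState = 'running' | 'canceled';
type ChunkListener = (chunk: string) => void;

class RunRegistry {
  private states = new Map<string, LiveState>();
  private listeners = new Map<string, Set<ChunkListener>>();

  start(runId: string): void {
    this.states.set(runId, 'running');
    this.listeners.set(runId, new Set<ChunkListener>());
  }

  // Returns a detach function, meant to be called when the connection closes.
  subscribe(runId: string, listener: ChunkListener): () => void {
    const set = this.listeners.get(runId);
    if (!set) throw new Error(`Unknown run: ${runId}`);
    set.add(listener);
    // A disconnect only removes this listener; the run's state is untouched.
    return () => {
      set.delete(listener);
    };
  }

  // Explicit cancel is the only path here that changes the run's state.
  cancel(runId: string): void {
    if (this.states.get(runId) === 'running') {
      this.states.set(runId, 'canceled');
    }
  }

  stateOf(runId: string): LiveState | undefined {
    return this.states.get(runId);
  }

  listenerCount(runId: string): number {
    const set = this.listeners.get(runId);
    return set ? set.size : 0;
  }
}
```

With this split, a close handler would call the detach function returned by `subscribe`, while the cancel endpoint would call `cancel`; the two never share a code path.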
Examples
ts
@Sse('chat')
async chat(@Body() dto: PromptDto) {
  // Anti-pattern: streams straight to the client; nothing is persisted except the final message.
}
ts
// Preferred: create and persist the run first, then expose it through separate endpoints.
@Post('runs')
async startRun(@Body() dto: StartRunDto, @CurrentUser() user: User): Promise<{ runId: string }> {
  const run = await this.runs.create({ userId: user.id, prompt: dto.prompt });
  // Await the enqueue (assuming queue.add returns a promise) so a queue failure
  // surfaces here instead of leaving the run stuck in pending.
  await this.queue.add('execute-run', { runId: run.id });
  return { runId: run.id };
}

@Sse('runs/:id/events')
streamRun(@Param('id') runId: string, @CurrentUser() user: User): Observable<MessageEvent> {
  // Can also replay events that already happened, if the user joined late.
  return this.runs.subscribeToEvents(runId, user);
}

@Get('runs/:id')
async getRun(@Param('id') runId: string, @CurrentUser() user: User): Promise<RunDto> {
  return this.runs.getForUser(runId, user);
}

@Post('runs/:id/cancel')
async cancel(@Param('id') runId: string, @CurrentUser() user: User): Promise<void> {
  await this.runs.cancel(runId, user);
}
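The events endpoint above mentions replaying events for a client that joined late. One way to support that is an append-only log with sequence numbers, sketched here in memory. `RunEventLog`, `RunEvent`, and the `afterSeq` cursor are hypothetical names for illustration; the source does not show how `subscribeToEvents` is actually implemented.

```typescript
// Hypothetical append-only event log for a single run; names are illustrative.
interface RunEvent {
  seq: number; // monotonically increasing, usable as the SSE event id
  data: string;
}

class RunEventLog {
  private events: RunEvent[] = [];
  private listeners: Array<(e: RunEvent) => void> = [];

  // Stores the event, then forwards it to every currently attached listener.
  append(data: string): RunEvent {
    const event: RunEvent = { seq: this.events.length + 1, data };
    this.events.push(event);
    for (const listener of this.listeners.slice()) {
      listener(event);
    }
    return event;
  }

  // Replays every stored event after `afterSeq`, then forwards live events.
  // Returns a detach function for when the connection closes.
  subscribe(afterSeq: number, onEvent: (e: RunEvent) => void): () => void {
    for (const event of this.events) {
      if (event.seq > afterSeq) onEvent(event);
    }
    this.listeners.push(onEvent);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== onEvent);
    };
  }
}
```

A browser `EventSource` sends the last received event id in the `Last-Event-ID` header when it reconnects, so a server that emits `seq` as the event id could pass that value as `afterSeq` and resume without gaps or duplicates.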