Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
eb7c7f6
feat: add @trigger.dev/ai package with TriggerChatTransport
cursoragent Feb 15, 2026
c14a53d
test: add comprehensive unit tests for TriggerChatTransport
cursoragent Feb 15, 2026
b8384f4
refactor: polish TriggerChatTransport implementation
cursoragent Feb 15, 2026
a620a97
test: add abort signal, multiple sessions, and body merging tests
cursoragent Feb 15, 2026
5d1eb4e
chore: add changeset for @trigger.dev/ai package
cursoragent Feb 15, 2026
6f2d5ea
refactor: remove internal ChatSessionState from public exports
cursoragent Feb 15, 2026
6206718
feat: support dynamic accessToken function for token refresh
cursoragent Feb 15, 2026
9cb40c6
refactor: avoid double-resolving accessToken in sendMessages
cursoragent Feb 15, 2026
1c382a3
feat: add chat transport and AI chat helpers to @trigger.dev/sdk
cursoragent Feb 15, 2026
4af7539
test: move chat transport tests to @trigger.dev/sdk
cursoragent Feb 15, 2026
ccd7274
refactor: delete packages/ai/ — moved to @trigger.dev/sdk subpaths
cursoragent Feb 15, 2026
821e955
chore: update changeset to target @trigger.dev/sdk
cursoragent Feb 15, 2026
2dbcd51
fix: address CodeRabbit review feedback
cursoragent Feb 15, 2026
331ed26
docs(ai): add AI Chat with useChat guide
cursoragent Feb 15, 2026
1b7316d
feat(reference): add ai-chat Next.js reference project
cursoragent Feb 15, 2026
6354075
fix(reference): use compatible @ai-sdk v3 packages, await convertToMo…
cursoragent Feb 15, 2026
a1ddc27
Use a single run with iterative waitpoint token completions
ericallam Feb 21, 2026
1fe0a98
Added tool example
ericallam Feb 21, 2026
065aa99
expose a useTriggerChatTransport hook
ericallam Feb 21, 2026
9d88886
use input streams and rename chatTask and chatState to chat.task and …
ericallam Mar 3, 2026
ae7819c
add stopping support and fix issue with the OpenAI responses API and …
ericallam Mar 4, 2026
4cf3b45
Add warmTimeoutInSeconds option
ericallam Mar 4, 2026
5ba3fb6
Add clientData support
ericallam Mar 4, 2026
61d0a89
provide already converted UIMessages to the run function for better dx
ericallam Mar 4, 2026
afd3fb7
Added better telemetry support to view turns
ericallam Mar 4, 2026
0b2cdab
Fix double looping when resuming from an input stream waitpoint
ericallam Mar 4, 2026
0cb7217
Add some pending message support in the example
ericallam Mar 4, 2026
48a960f
Accumulate messages in the task, allowing us to only have to send use…
ericallam Mar 5, 2026
8689c9b
build full example with persisting messages, adding necessary hooks, …
ericallam Mar 5, 2026
3714c9d
Add ai chat to the sidebar for now
ericallam Mar 5, 2026
914effa
remove postinstall hook
ericallam Mar 5, 2026
129dbb1
feat: add onTurnStart hook, lastEventId support, and stream resume de…
ericallam Mar 5, 2026
9cd5047
Minor fixes around reconnecting streams
ericallam Mar 6, 2026
e8cbdea
update pnpm link file
ericallam Mar 6, 2026
8057b57
fixed chat tests
ericallam Mar 6, 2026
aaa7266
use locals for the chat pipe counter instead of a module global
ericallam Mar 6, 2026
bd2ba5b
Add triggerOptions to the transport, auto-tag with the chat ID
ericallam Mar 6, 2026
6abb29b
Make clientData typesafe and pass to all chat.task hooks
ericallam Mar 6, 2026
36537c5
feat: add chat.local for per-run typed data with Proxy access and dir…
ericallam Mar 6, 2026
617e6a4
feat(chat): add stop handling, abort cleanup, continuation support, a…
ericallam Mar 7, 2026
099c9c6
Some improvements to the example ai-chat
ericallam Mar 7, 2026
e9d513b
feat(chat): expose typed chat.stream, add deepResearch subtask exampl…
ericallam Mar 8, 2026
e774b24
feat(ai): pass chat context and toolCallId to subtasks, add typed ai.…
ericallam Mar 8, 2026
a368214
feat(chat): add preload support, dynamic tools, and preload-specific …
ericallam Mar 9, 2026
e9d23ab
docs: add mermaid architecture diagrams for ai-chat system
ericallam Mar 9, 2026
810b262
docs: add sequence diagrams to ai-chat guide
ericallam Mar 9, 2026
ce76bf4
feat(chat): auto-hydrate chat.local values in ai.tool subtasks
ericallam Mar 9, 2026
7654654
feat(chat): add chat.defer(), preload toggle, TTFB measurement, and f…
ericallam Mar 9, 2026
6111a3b
fix(reference): replace hand-rolled HTML stripping with turndown
ericallam Mar 9, 2026
768aa66
feat(streams): add inputStream.waitWithWarmup(), warm timeout config …
ericallam Mar 9, 2026
83124ff
feat(chat): add composable primitives, raw task example, and task mod…
ericallam Mar 10, 2026
6d939a0
Introduce the chat session API and better docs organization
ericallam Mar 10, 2026
c113cb9
Add support for toUIMessageStream() options
ericallam Mar 10, 2026
fbc4106
Add metadata to the streamText call
ericallam Mar 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .changeset/ai-sdk-chat-transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
"@trigger.dev/sdk": minor
---

Add AI SDK chat transport integration via two new subpath exports:

**`@trigger.dev/sdk/chat`** (frontend, browser-safe):
- `TriggerChatTransport` — custom `ChatTransport` for the AI SDK's `useChat` hook that runs chat completions as durable Trigger.dev tasks
- `createChatTransport()` — factory function

```tsx
import { useChat } from "@ai-sdk/react";
import { TriggerChatTransport } from "@trigger.dev/sdk/chat";

const { messages, sendMessage } = useChat({
transport: new TriggerChatTransport({
task: "my-chat-task",
accessToken,
}),
});
```

**`@trigger.dev/sdk/ai`** (backend, extends existing `ai.tool`/`ai.currentToolOptions`):
- `chatTask()` — pre-typed task wrapper with auto-pipe support
- `pipeChat()` — pipe a `StreamTextResult` or stream to the frontend
- `CHAT_STREAM_KEY` — the default stream key constant
- `ChatTaskPayload` type

```ts
import { chatTask } from "@trigger.dev/sdk/ai";
import { streamText, convertToModelMessages } from "ai";

export const myChatTask = chatTask({
id: "my-chat-task",
run: async ({ messages }) => {
return streamText({
model: openai("gpt-4o"),
messages: convertToModelMessages(messages),
});
},
});
```
22 changes: 22 additions & 0 deletions .claude/rules/package-installation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
paths:
- "**/package.json"
---

# Installing Packages

When adding a new dependency to any package.json in the monorepo:

1. **Look up the latest version** on npm before adding:
```bash
pnpm view <package-name> version
```
If unsure which version to use (e.g. major version compatibility), confirm with the user.

2. **Edit the package.json directly** — do NOT use `pnpm add` as it can cause issues in the monorepo. Add the dependency with the correct version range (typically `^x.y.z`).

3. **Run `pnpm i` from the repo root** after editing to install and update the lockfile:
```bash
pnpm i
```
Always run from the repo root, not from the package directory.
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This file provides guidance to Claude Code when working with this repository. Su

This is a pnpm 10.23.0 monorepo using Turborepo. Run commands from root with `pnpm run`.

**Adding dependencies:** Edit `package.json` directly instead of using `pnpm add`, then run `pnpm i` from the repo root. See `.claude/rules/package-installation.md` for the full process.

```bash
pnpm run docker # Start Docker services (PostgreSQL, Redis, Electric)
pnpm run db:migrate # Run database migrations
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export {
getSchemaParseFn,
type AnySchemaParseFn,
type SchemaParseFn,
type inferSchemaOut,
isSchemaZodEsque,
isSchemaValibotEsque,
isSchemaArkTypeEsque,
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/v3/inputStreams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export class InputStreamsAPI implements InputStreamManager {
return this.#getManager().lastSeqNum(streamId);
}

public setLastSeqNum(streamId: string, seqNum: number): void {
this.#getManager().setLastSeqNum(streamId, seqNum);
}

public shiftBuffer(streamId: string): boolean {
return this.#getManager().shiftBuffer(streamId);
}

public disconnectStream(streamId: string): void {
this.#getManager().disconnectStream(streamId);
}

public clearHandlers(): void {
this.#getManager().clearHandlers();
}
Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/v3/inputStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ export class StandardInputStreamManager implements InputStreamManager {
return this.seqNums.get(streamId);
}

setLastSeqNum(streamId: string, seqNum: number): void {
const current = this.seqNums.get(streamId);
// Only advance forward, never backward
if (current === undefined || seqNum > current) {
this.seqNums.set(streamId, seqNum);
}
}

shiftBuffer(streamId: string): boolean {
const buffered = this.buffer.get(streamId);
if (buffered && buffered.length > 0) {
buffered.shift();
if (buffered.length === 0) {
this.buffer.delete(streamId);
}
return true;
}
return false;
}

setRunId(runId: string, streamsVersion?: string): void {
this.currentRunId = runId;
this.streamsVersion = streamsVersion;
Expand Down Expand Up @@ -158,6 +178,15 @@ export class StandardInputStreamManager implements InputStreamManager {
}
}

disconnectStream(streamId: string): void {
const tail = this.tails.get(streamId);
if (tail) {
tail.abortController.abort();
this.tails.delete(streamId);
}
this.buffer.delete(streamId);
}

connectTail(runId: string, _fromSeq?: number): void {
// No-op: tails are now created per-stream lazily
}
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/v3/inputStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export class NoopInputStreamManager implements InputStreamManager {
return undefined;
}

setLastSeqNum(_streamId: string, _seqNum: number): void {}

shiftBuffer(_streamId: string): boolean { return false; }

disconnectStream(_streamId: string): void {}

clearHandlers(): void {}
reset(): void {}
disconnect(): void {}
Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/v3/inputStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ export interface InputStreamManager {
*/
lastSeqNum(streamId: string): number | undefined;

/**
* Advance the last-seen S2 sequence number for the given input stream.
* Used after `.wait()` resumes to prevent the SSE tail from replaying
* the record that was consumed via the waitpoint path.
*/
setLastSeqNum(streamId: string, seqNum: number): void;

/**
* Remove and discard the first buffered item for the given input stream.
* Used after `.wait()` resumes to remove the duplicate that the SSE tail
* buffered while the waitpoint was being completed via a separate path.
* Returns true if an item was removed, false if the buffer was empty.
*/
shiftBuffer(streamId: string): boolean;

/**
* Disconnect the SSE tail and clear the buffer for a specific input stream.
* Used before suspending via `.wait()` so the tail doesn't buffer duplicates
* of data that will be delivered through the waitpoint path.
*/
disconnectStream(streamId: string): void;

/**
* Clear all persistent `.on()` handlers and abort tails that have no remaining once waiters.
* Called automatically when a task run completes.
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/v3/realtimeStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
RealtimeStreamInstance,
RealtimeStreamOperationOptions,
RealtimeStreamsManager,
StreamWriteResult,
} from "./types.js";

export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
Expand All @@ -16,7 +17,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
) {}
// Track active streams - using a Set allows multiple streams for the same key to coexist
private activeStreams = new Set<{
wait: () => Promise<void>;
wait: () => Promise<StreamWriteResult>;
abortController: AbortController;
}>();

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/realtimeStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class NoopRealtimeStreamsManager implements RealtimeStreamsManager {
options?: RealtimeStreamOperationOptions
): RealtimeStreamInstance<T> {
return {
wait: () => Promise.resolve(),
wait: () => Promise.resolve({}),
get stream(): AsyncIterableStream<T> {
return createAsyncIterableStreamFromAsyncIterable(source);
},
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/v3/realtimeStreams/streamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
import { AnyZodFetchOptions } from "../zodfetch.js";
import { StreamsWriterV1 } from "./streamsWriterV1.js";
import { StreamsWriterV2 } from "./streamsWriterV2.js";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type StreamInstanceOptions<T> = {
apiClient: ApiClient;
Expand Down Expand Up @@ -63,8 +63,9 @@ export class StreamInstance<T> implements StreamsWriter {
return streamWriter;
}

public async wait(): Promise<void> {
return this.streamPromise.then((writer) => writer.wait());
public async wait(): Promise<StreamWriteResult> {
const writer = await this.streamPromise;
return writer.wait();
}

public get stream(): AsyncIterableStream<T> {
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { request as httpsRequest } from "node:https";
import { request as httpRequest } from "node:http";
import { URL } from "node:url";
import { randomBytes } from "node:crypto";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type StreamsWriterV1Options<T> = {
baseUrl: string;
Expand Down Expand Up @@ -258,8 +258,9 @@ export class StreamsWriterV1<T> implements StreamsWriter {
await this.makeRequest(0);
}

public async wait(): Promise<void> {
return this.streamPromise;
public async wait(): Promise<StreamWriteResult> {
await this.streamPromise;
return {};
}

public [Symbol.asyncIterator]() {
Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { S2, AppendRecord, BatchTransform } from "@s2-dev/streamstore";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";
import { nanoid } from "nanoid";

export type StreamsWriterV2Options<T = any> = {
Expand Down Expand Up @@ -54,6 +54,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
private readonly maxInflightBytes: number;
private aborted = false;
private sessionWritable: WritableStream<any> | null = null;
private lastSeqNum: number | undefined;

constructor(private options: StreamsWriterV2Options<T>) {
this.debug = options.debug ?? false;
Expand Down Expand Up @@ -169,9 +170,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
const lastAcked = session.lastAckedPosition();

if (lastAcked?.end) {
const recordsWritten = lastAcked.end.seqNum;
this.lastSeqNum = lastAcked.end.seqNum;
this.log(
`[S2MetadataStream] Written ${recordsWritten} records, ending at seqNum=${lastAcked.end.seqNum}`
`[S2MetadataStream] Written ${this.lastSeqNum} records, ending at seqNum=${this.lastSeqNum}`
);
}
} catch (error) {
Expand All @@ -184,8 +185,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
}
}

public async wait(): Promise<void> {
public async wait(): Promise<StreamWriteResult> {
await this.streamPromise;
return { lastEventId: this.lastSeqNum?.toString() };
}

public [Symbol.asyncIterator]() {
Expand Down
36 changes: 33 additions & 3 deletions packages/core/src/v3/realtimeStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ export interface RealtimeStreamsManager {
): Promise<void>;
}

export type StreamWriteResult = {
lastEventId?: string;
};

export interface RealtimeStreamInstance<T> {
wait(): Promise<void>;
wait(): Promise<StreamWriteResult>;
get stream(): AsyncIterableStream<T>;
}

export interface StreamsWriter {
wait(): Promise<void>;
wait(): Promise<StreamWriteResult>;
}

export type RealtimeDefinedStream<TPart> = {
Expand Down Expand Up @@ -71,6 +75,10 @@ export type PipeStreamOptions = {
* Additional request options for the API call.
*/
requestOptions?: ApiRequestOptions;
/** Override the default span name for this operation. */
spanName?: string;
/** When true, the span will be collapsed in the dashboard. */
collapsed?: boolean;
};

/**
Expand All @@ -89,7 +97,7 @@ export type PipeStreamResult<T> = {
* to the realtime stream. Use this to wait for the stream to complete before
* finishing your task.
*/
waitUntilComplete: () => Promise<void>;
waitUntilComplete: () => Promise<StreamWriteResult>;
};

/**
Expand Down Expand Up @@ -185,6 +193,14 @@ export type RealtimeDefinedInputStream<TData> = {
* Uses a waitpoint token internally. Can only be called inside a task.run().
*/
wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise<TData>;
/**
* Wait for data with a warm phase before suspending.
*
* Keeps the task warm (active, using compute) for `warmTimeoutInSeconds`,
* then suspends via `.wait()` if no data arrives. If data arrives during
* the warm phase the task responds instantly without suspending.
*/
waitWithWarmup: (options: InputStreamWaitWithWarmupOptions) => Promise<{ ok: true; output: TData } | { ok: false; error?: any }>;
/**
* Send data to this input stream on a specific run.
* This is used from outside the task (e.g., from your backend or another task).
Expand All @@ -199,6 +215,8 @@ export type InputStreamSubscription = {
export type InputStreamOnceOptions = {
signal?: AbortSignal;
timeoutMs?: number;
/** Override the default span name for this operation. */
spanName?: string;
};

export type SendInputStreamOptions = {
Expand Down Expand Up @@ -234,6 +252,18 @@ export type InputStreamWaitOptions = {
* and filtering waitpoints via `wait.listTokens()`.
*/
tags?: string[];

/** Override the default span name for this operation. */
spanName?: string;
};

export type InputStreamWaitWithWarmupOptions = {
/** Seconds to keep the task warm before suspending. */
warmTimeoutInSeconds: number;
/** Maximum time to wait after suspending (duration string, e.g. "1h"). */
timeout?: string;
/** Override the default span name for the outer operation. */
spanName?: string;
};

export type InferInputStreamType<T> = T extends RealtimeDefinedInputStream<infer TData>
Expand Down
Loading
Loading