Durable Event Iterator Integration
Durable Event Iterator allows you to use Event Iterator by separating the streaming to a different service that provides durable event streams, automatic reconnections, recovery of missing events, and more.
INFO
This feature is not limited to Cloudflare Durable Objects, but it is approachable and currently the only supported option.
Installation
npm install @orpc/experimental-durable-event-iterator@latest
yarn add @orpc/experimental-durable-event-iterator@latest
pnpm add @orpc/experimental-durable-event-iterator@latest
bun add @orpc/experimental-durable-event-iterator@latest
deno install npm:@orpc/experimental-durable-event-iterator@latest
WARNING
The experimental-
prefix indicates that this feature is still in development and may change in the future.
Setup your Durable Object
WARNING
This section requires you to be familiar with Cloudflare Durable Objects. Please learn it first before continuing.
Define your Durable Object
Everything you need to do is extend the DurableEventIteratorObject
class. Additionally, you can define RPC Methods to publish events to the connected clients.
import { DurableEventIteratorObject } from '@orpc/experimental-durable-event-iterator/durable-object'
export class ChatRoom extends DurableEventIteratorObject<{ message: string }> {
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env, {
eventRetentionSeconds: 300, // Optional: Set the event retention duration (default is 5 minutes)
customJsonSerializers: [
// Custom JSON serializers
]
})
}
publishMessage(message: string) {
this.dei.websocketManager.publishEvent(this.ctx.getWebSockets(), { message })
}
}
WARNING
Do not use WebSocket Extended Methods like ws.serializeAttachment
and ws.deserializeAttachment
directly because you may interfere with the attachment that durable event iterator relies on. Instead, you should use the serializeAttachment
and deserializeAttachment
methods inside dei.websocketManager
.
Upgrade Durable Event Iterator Request
This step will upgrade and validate the WebSocket request to your Durable Object. You need to provide a signing key to validate the token and the corresponding Durable Object namespace.
import { upgradeDurableEventIteratorRequest } from '@orpc/experimental-durable-event-iterator/durable-object'
export default {
async fetch(request, env) {
const url = new URL(request.url)
if (url.pathname === '/chat-room') {
return upgradeDurableEventIteratorRequest(request, {
signingKey: 'secret-key', // Replace with your actual signing key
namespace: env.CHAT_ROOM,
})
}
return new Response('Not Found', { status: 404 })
},
} satisfies ExportedHandler<Env>
export { ChatRoom }
Server Side Usage
Here we define two procedures: one for listening to messages in the chat room, and another for sending messages to all connected clients by invoking the publishMessage
method on the Durable Object.
INFO
This example assumes your server and Durable Object are running in the same environment, but this is not required. Instead of invoking the publishMessage
method directly, you can send a fetch request to wherever your Durable Object is running to send messages.
import { DurableEventIterator } from '@orpc/experimental-durable-event-iterator'
export const router = {
onMessage: base.handler(({ context }) => {
return new DurableEventIterator<ChatRoom>('some-room', {
signingKey: 'secret-key', // Replace with your actual signing key
})
}),
sendMessage: base
.input(z.object({ message: z.string() }))
.handler(async ({ context, input }) => {
const id = context.env.CHAT_ROOM.idFromName('some-room')
const stub = context.env.CHAT_ROOM.get(id)
await stub.publishMessage(input.message)
}),
}
After that, you need to use DurableEventIteratorHandlerPlugin
in your handler to enable Durable Event Iterator support.
import { DurableEventIteratorHandlerPlugin } from '@orpc/experimental-durable-event-iterator'
const handler = new RPCHandler(router, {
plugins: [
new DurableEventIteratorHandlerPlugin(),
],
})
Client Side Usage
On the client side, you only need to set up the plugin. The rest is the same as Event Iterator. The url
you define inside DurableEventIteratorLinkPlugin
is the URL of your Durable Object upgrade endpoint.
import { DurableEventIteratorLinkPlugin } from '@orpc/experimental-durable-event-iterator/client'
const link = new RPCLink({
url: 'http://localhost:3000/rpc',
plugins: [
new DurableEventIteratorLinkPlugin({
url: 'ws://localhost:3000/chat-room',
}),
],
})
Example
const iterator = await client.onMessage()
for await (const { message } of iterator) {
console.log('Received message:', message)
}
await client.sendMessage({ message: 'Hello, world!' })
Recovery of Missing Events
The Durable Event Iterator automatically persists events for 5 minutes and recovers missed events when clients connect/reconnect, ensuring reliable message delivery even during network interruptions. You can use the eventRetentionSeconds
option to change the retention duration.
Durable Objects RPC
Unlike the Cloudflare Durable Objects RPC, this RPC utilizes oRPC built-in RPC system, allowing clients to easily interact with Durable Objects directly. To use it, you need to define methods that accept a WebSocket
instance as the first argument and return an oRPC Client.
import { DurableEventIteratorObject } from '@orpc/experimental-durable-event-iterator/durable-object'
export class ChatRoom extends DurableEventIteratorObject<
{ message: string }, // Event type
{ userId: string }, // (Optional) Token Attachment
{ something: string }
> {
publishMessage(currentWs: WebSocket) {
return base
.input(z.object({ message: z.string() }))
.handler(({ input, context }) => {
// Get attachments
const wsAttachment = this.dei.websocketManager.deserializeAttachment(currentWs)
const { userId } = wsAttachment['dei:token:payload'].att
const something = wsAttachment.something
// Set attachments
this.dei.websocketManager.serializeAttachment(currentWs, {
something: 'new value',
})
// Publish event to all other connected clients
this.dei.websocketManager.publishEvent(
this.ctx.getWebSockets().filter(ws => ws !== currentWs),
input,
)
})
.callable()
}
/**
* Nested Client
*/
router(ws: WebSocket) {
return {
ping: base.handler(() => 'pong').callable(),
echo: base
.input(z.object({ text: z.string() }))
.handler(({ input }) => `Echo: ${input.text}`)
.callable(),
}
}
}
Server Side Usage
import { DurableEventIterator } from '@orpc/experimental-durable-event-iterator'
export const onMessage = base.handler(({ context }) => {
return new DurableEventIterator<ChatRoom>('some-room', {
signingKey: 'secret-key', // Replace with your actual signing key
att: {
userId: 'user-123', // User-specific data
},
}).rpc('publishMessage', 'router') // List of allowed methods
})
INFO
Clients only have permission to call the methods you defined in the rpc
method. This provides fine-grained access control.
WARNING
The att
(attachment) data can be viewed from the client side, so do not put any sensitive data in it. Use it only for non-sensitive metadata like user IDs or preferences.
Client Side Usage
You can invoke methods defined inside rpc
directly from the client iterator
result.
const iterator = await client.onMessage()
// Listen for events
for await (const { message } of iterator) {
console.log('Received message:', message)
}
// Call RPC methods
await iterator.publishMessage({ message: 'Hello, world!' })
// Call nested router methods
const response = await iterator.router.ping()
console.log(response) // "pong"
const echoResponse = await iterator.router.echo({ text: 'Hello' })
console.log(echoResponse) // "Echo: Hello"
Contract First
This integration also supports Contract First. What you need to do is define an interface that extends DurableEventIteratorObject
.
import type { ContractRouterClient } from '@orpc/contract'
import { oc, type } from '@orpc/contract'
import type { ClientDurableEventIterator } from '@orpc/experimental-durable-event-iterator/client'
import type { DurableEventIteratorObject } from '@orpc/experimental-durable-event-iterator'
export const publishMessageContract = oc.input(z.object({ message: z.string() }))
export interface ChatRoom extends DurableEventIteratorObject<{ message: string }> {
publishMessage(...args: any[]): ContractRouterClient<typeof publishMessageContract>
}
export const contract = {
onMessage: oc.output(type<ClientDurableEventIterator<ChatRoom, 'publishMessage'>>()),
}