Publisher
The Publisher is a helper for publishing events and subscribing to them. Combined with the Event Iterator, it lets you build streaming responses, real-time updates, and server-sent events with minimal requirements.
Installation
npm install @orpc/experimental-publisher@latest
yarn add @orpc/experimental-publisher@latest
pnpm add @orpc/experimental-publisher@latest
bun add @orpc/experimental-publisher@latest
deno add npm:@orpc/experimental-publisher@latest
Basic Usage
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'
import { os } from '@orpc/server'
import * as z from 'zod'

const publisher = new MemoryPublisher<{
  'something-updated': {
    id: string
  }
}>()

const live = os
  .handler(async function* ({ input, signal }) {
    const iterator = publisher.subscribe('something-updated', { signal })
    for await (const payload of iterator) {
      // Handle payload here or yield directly to client
      yield payload
    }
  })

const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    await publisher.publish('something-updated', { id: input.id })
  })
TIP
The publisher supports both static and dynamic event names.
const publisher = new MemoryPublisher<Record<string, { message: string }>>()
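To make the typing concrete, here is a minimal sketch of how a generic event map can constrain both fixed and dynamic event names. `TinyPublisher` and its `on` method are hypothetical names written only for illustration; they are not the oRPC API, which exposes `subscribe` as an async iterator instead.

```typescript
// Hypothetical sketch: NOT the oRPC implementation, only the typing idea.
type Listener<P> = (payload: P) => void

class TinyPublisher<T extends Record<string, object>> {
  private listeners = new Map<keyof T, Set<Listener<any>>>()

  // Deliver a payload to every listener currently registered for `event`.
  publish<K extends keyof T>(event: K, payload: T[K]): void {
    this.listeners.get(event)?.forEach(listener => listener(payload))
  }

  // Register a listener and return a function that removes it again.
  on<K extends keyof T>(event: K, listener: Listener<T[K]>): () => void {
    const set = this.listeners.get(event) ?? new Set<Listener<any>>()
    this.listeners.set(event, set)
    set.add(listener)
    return () => {
      set.delete(listener)
    }
  }
}

// Static names: only 'something-updated' is accepted by the compiler.
const staticPublisher = new TinyPublisher<{ 'something-updated': { id: string } }>()
staticPublisher.publish('something-updated', { id: '1' })

// Dynamic names: any string key is accepted, all sharing one payload shape.
const dynamicPublisher = new TinyPublisher<Record<string, { message: string }>>()
const received: string[] = []
const off = dynamicPublisher.on('room:42', p => received.push(p.message))

dynamicPublisher.publish('room:42', { message: 'hello' })
dynamicPublisher.publish('room:7', { message: 'other room' }) // no listener on this name
off()
dynamicPublisher.publish('room:42', { message: 'after off' }) // listener already removed
```

With the static map, a misspelled event name fails at compile time; with the `Record<string, …>` map, names such as per-room or per-user channels can be built at runtime.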
Resume Feature
The resume feature uses lastEventId to determine where to resume from after a disconnection.
WARNING
By default, most adapters have this feature disabled.
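Conceptually, resuming means the publisher retains recent events together with their ids and replays whatever came after the id the client last saw. The sketch below illustrates that idea only. `ReplayLog`, its counter-based string ids, and the choice to replay nothing when no `lastEventId` is given are all assumptions made for this example, not a description of how any oRPC adapter is implemented.

```typescript
// Hypothetical sketch of id-based replay; names and id format are assumptions.
interface StoredEvent<P> {
  id: string
  payload: P
}

class ReplayLog<P> {
  private events: StoredEvent<P>[] = []
  private nextId = 1

  // Store a payload and assign it the next sequential id.
  append(payload: P): StoredEvent<P> {
    const event: StoredEvent<P> = { id: String(this.nextId++), payload }
    this.events.push(event)
    return event
  }

  // Return the events stored after `lastEventId`.
  // A subscriber without a lastEventId gets no replay in this sketch.
  since(lastEventId?: string): StoredEvent<P>[] {
    if (lastEventId === undefined) {
      return []
    }
    const last = Number(lastEventId)
    return this.events.filter(event => Number(event.id) > last)
  }
}

const log = new ReplayLog<{ id: string }>()
log.append({ id: 'a' }) // stored with event id '1'
log.append({ id: 'b' }) // stored with event id '2'
log.append({ id: 'c' }) // stored with event id '3'

// A client that disconnected after seeing event '1' is sent events '2' and '3'.
const missed = log.since('1')
```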
Server Implementation
When subscribing, you must forward the lastEventId to the publisher to enable resuming:
const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      yield payload
    }
  })
Event ID Management
The publisher automatically manages event ids when resume is enabled. This means:
- Event ids you provide when publishing will be ignored
- When subscribing, you must forward the event id when yielding custom payloads
import { getEventMeta, withEventMeta } from '@orpc/server'

const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      // Preserve event id when yielding custom data
      yield withEventMeta({ custom: 'value' }, { ...getEventMeta(payload) })
    }
  })

const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    // The event id 'this-will-be-ignored' will be replaced by the publisher
    await publisher.publish('something-updated', withEventMeta({ id: input.id }, { id: 'this-will-be-ignored' }))
  })
Client Implementation
On the client, you can use the Client Retry Plugin, which automatically tracks lastEventId and passes it to the server when reconnecting. Alternatively, you can manage lastEventId manually:
import { getEventMeta } from '@orpc/client'

let lastEventId: string | undefined

while (true) {
  try {
    const iterator = await client.live('input', { lastEventId })
    for await (const payload of iterator) {
      lastEventId = getEventMeta(payload)?.id // Update lastEventId
      console.log(payload)
    }
  }
  catch {
    await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
  }
}
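The effect of tracking lastEventId by hand can be checked without a server. The following sketch replaces the real client with a hypothetical `makeFlakyLive` helper that replays a fixed list of events and drops the connection a set number of times; `LiveEvent`, `makeFlakyLive`, and `consumeWithResume` are illustrative names and not part of oRPC. Unlike the loop above, this version returns once the stream ends normally so that it terminates.

```typescript
// Hypothetical simulation of a resumable stream; no oRPC APIs are used here.
interface LiveEvent {
  id: string
  data: string
}

type Live = (lastEventId?: string) => AsyncGenerator<LiveEvent>

// Build a fake stream that resumes after `lastEventId` and fails
// `failures` times, each time right after delivering one event.
function makeFlakyLive(events: LiveEvent[], failures: number): Live {
  let failuresLeft = failures
  return async function* (lastEventId?: string) {
    const start = lastEventId === undefined
      ? 0
      : events.findIndex(event => event.id === lastEventId) + 1
    for (const event of events.slice(start)) {
      yield event
      if (failuresLeft > 0) {
        failuresLeft--
        throw new Error('connection lost')
      }
    }
  }
}

// Reconnect on failure, passing the id of the last event that was received.
async function consumeWithResume(live: Live): Promise<string[]> {
  const seen: string[] = []
  let lastEventId: string | undefined
  while (true) {
    try {
      for await (const event of live(lastEventId)) {
        lastEventId = event.id
        seen.push(event.data)
      }
      return seen // the stream finished normally
    }
    catch {
      // A real client would wait here before reconnecting.
    }
  }
}

const events: LiveEvent[] = [
  { id: '1', data: 'a' },
  { id: '2', data: 'b' },
  { id: '3', data: 'c' },
]

consumeWithResume(makeFlakyLive(events, 1)).then(seen => console.log(seen))
```

Because the reconnect passes the id of the last event received, the second connection starts at event '2', so nothing is delivered twice and nothing is skipped.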
Available Adapters
Name | Resume Support | Description |
---|---|---|
MemoryPublisher | ✅ | A simple in-memory publisher |
IORedisPublisher | ✅ | Adapter for ioredis |
UpstashRedisPublisher | ✅ | Adapter for Upstash Redis |
INFO
If you'd like to add a new publisher adapter, please open an issue.
Memory Publisher
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'

const publisher = new MemoryPublisher<{
  'something-updated': {
    id: string
  }
}>({
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
})
INFO
Resume support is disabled by default in MemoryPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.
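To help choose a retention value, the sketch below shows one way a time-based retention window could behave: events older than the window are dropped and can no longer be replayed. `RetainedEvents` and its injectable clock are hypothetical, and treating a value of zero or less as "resume disabled" is an assumption of this example rather than documented adapter behavior.

```typescript
// Hypothetical sketch of a retention window; not the MemoryPublisher internals.
class RetainedEvents<P> {
  private entries: { at: number, payload: P }[] = []
  private retentionSeconds: number
  private now: () => number

  constructor(retentionSeconds: number, now: () => number = () => Date.now()) {
    this.retentionSeconds = retentionSeconds
    this.now = now
  }

  // Remember a payload together with the time it was published.
  add(payload: P): void {
    if (this.retentionSeconds <= 0) {
      return // assumption: no retention means nothing is kept for resume
    }
    this.entries.push({ at: this.now(), payload })
    this.prune()
  }

  // Payloads that are still inside the retention window.
  retained(): P[] {
    this.prune()
    return this.entries.map(entry => entry.payload)
  }

  // Drop every entry published before the start of the window.
  private prune(): void {
    const cutoff = this.now() - this.retentionSeconds * 1000
    this.entries = this.entries.filter(entry => entry.at >= cutoff)
  }
}

// Drive the buffer with a fake clock (milliseconds) instead of real time.
let clock = 0
const buffer = new RetainedEvents<string>(60 * 2, () => clock)

buffer.add('first') // published at t = 0s
clock = 60 * 1000
buffer.add('second') // published at t = 60s
clock = 150 * 1000 // at t = 150s, 'first' is older than the 120s window

const stillRetained = buffer.retained()
```

A client that reconnects after the window has passed cannot recover the dropped events, so the retention value should cover the longest disconnection you expect to tolerate.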
IORedis Publisher
import { Redis } from 'ioredis'
import { IORedisPublisher } from '@orpc/experimental-publisher/ioredis'

const publisher = new IORedisPublisher<{
  'something-updated': {
    id: string
  }
}>({
  commander: new Redis(), // For executing short-lived commands
  subscriber: new Redis(), // For subscribing to events
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
  prefix: 'orpc:publisher:', // avoid conflict with other keys
})
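This adapter requires two Redis client instances (connections): one for executing short-lived commands and another for subscribing to events. They must be separate because a Redis connection that has entered subscriber mode cannot issue regular commands.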
INFO
Resume support is disabled by default in IORedisPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.
Upstash Redis Publisher
import { Redis } from '@upstash/redis'
import { UpstashRedisPublisher } from '@orpc/experimental-publisher/upstash-redis'

const redis = Redis.fromEnv()

const publisher = new UpstashRedisPublisher<{
  'something-updated': {
    id: string
  }
}>(redis, {
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
  prefix: 'orpc:publisher:', // avoid conflict with other keys
})
INFO
Resume support is disabled by default in UpstashRedisPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.