Publisher

The Publisher is a helper for publishing events and delivering them to subscribers. Combined with the Event Iterator, it lets you build streaming responses, real-time updates, and server-sent events with minimal setup.

Installation

sh
# npm
npm install @orpc/experimental-publisher@latest

# yarn
yarn add @orpc/experimental-publisher@latest

# pnpm
pnpm add @orpc/experimental-publisher@latest

# bun
bun add @orpc/experimental-publisher@latest

# deno
deno add npm:@orpc/experimental-publisher@latest

Basic Usage

ts
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'
import { os } from '@orpc/server'
import * as z from 'zod'

const publisher = new MemoryPublisher<{
  'something-updated': {
    id: string
  }
}>()

// Streams every 'something-updated' event to the client
const live = os
  .handler(async function* ({ input, signal }) {
    const iterator = publisher.subscribe('something-updated', { signal })
    for await (const payload of iterator) {
      // Handle payload here or yield directly to client
      yield payload
    }
  })

// Publishes a 'something-updated' event to all subscribers
const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    await publisher.publish('something-updated', { id: input.id })
  })
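
To make the publish/subscribe flow concrete, here is a toy in-memory model of the idea. It is a sketch for illustration only and not how MemoryPublisher is implemented: `TinyPublisher`, `Listener`, and every member shown are hypothetical names, and the sketch leaves out signal handling, resume, and error propagation.

```typescript
// Toy model only. TinyPublisher is a hypothetical name, not part of oRPC.
type Listener<T> = (payload: T) => void

class TinyPublisher<Events extends Record<string, unknown>> {
  private listeners = new Map<keyof Events, Set<Listener<any>>>()

  // Deliver a payload to every active subscriber of `event`.
  publish<K extends keyof Events>(event: K, payload: Events[K]): void {
    const set = this.listeners.get(event)
    if (set) {
      for (const listener of set) listener(payload)
    }
  }

  // Register a listener immediately and return an async iterator that
  // yields payloads in the order they were published.
  subscribe<K extends keyof Events>(event: K) {
    const queue: Events[K][] = []
    let wake: (() => void) | undefined

    const listener: Listener<Events[K]> = (payload) => {
      queue.push(payload)
      wake?.() // Resume the iterator if it is waiting for data
    }

    const set = this.listeners.get(event) ?? new Set<Listener<any>>()
    this.listeners.set(event, set)
    set.add(listener)

    async function* iterate() {
      try {
        while (true) {
          // Drain everything that has been queued so far
          while (queue.length > 0) yield queue.shift() as Events[K]
          // Then sleep until the next publish call wakes us up
          await new Promise<void>((resolve) => { wake = resolve })
        }
      }
      finally {
        // Runs when the consumer stops iterating (for example, via break)
        set.delete(listener)
      }
    }

    return iterate()
  }
}
```

A handler would consume the iterator returned by `subscribe` with `for await`, in the same way the `live` handler consumes `publisher.subscribe`. Queuing payloads between reads means a slow consumer does not lose events in this sketch, at the cost of unbounded memory use.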

TIP

The publisher supports both static and dynamic event names. For names that are only known at runtime, type the event map as a Record:

ts
const publisher = new MemoryPublisher<Record<string, { message: string }>>()

Resume Feature

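The resume feature uses lastEventId to determine where a subscription should resume after a disconnection, so that events retained by the publisher can still be delivered to a reconnecting client.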

WARNING

Most adapters ship with this feature disabled. Each adapter listed under Available Adapters enables it through the resumeRetentionSeconds option.
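
The idea behind resuming can be sketched with a small buffer that assigns increasing ids to events, keeps them for a limited time, and replays everything published after a given lastEventId. This is a conceptual model under stated assumptions, not the adapters' actual storage logic: `ResumeBuffer`, `StoredEvent`, `append`, and `replayAfter` are hypothetical names, and numeric string ids are an assumption made for simplicity.

```typescript
// Illustrative model of resume. ResumeBuffer and its members are
// hypothetical names, not part of @orpc/experimental-publisher.
interface StoredEvent<T> {
  id: string
  payload: T
  storedAt: number // Timestamp in milliseconds
}

class ResumeBuffer<T> {
  private events: StoredEvent<T>[] = []
  private nextId = 1
  private readonly retentionMs: number
  private readonly now: () => number

  // `now` is injectable so the retention window can be tested without waiting.
  constructor(retentionSeconds: number, now: () => number = () => Date.now()) {
    this.retentionMs = retentionSeconds * 1000
    this.now = now
  }

  // Store a payload under a buffer-assigned, increasing id.
  append(payload: T): string {
    this.prune()
    const id = String(this.nextId++)
    this.events.push({ id, payload, storedAt: this.now() })
    return id
  }

  // Return retained events published after `lastEventId`.
  // Without a lastEventId there is nothing to resume from.
  replayAfter(lastEventId: string | undefined): StoredEvent<T>[] {
    this.prune()
    if (lastEventId === undefined) return []
    const last = Number(lastEventId)
    return this.events.filter(event => Number(event.id) > last)
  }

  // Drop events older than the retention window.
  private prune(): void {
    const cutoff = this.now() - this.retentionMs
    this.events = this.events.filter(event => event.storedAt >= cutoff)
  }
}
```

In this model, a client that reconnects after the retention window has passed receives no replayed events, which is why the retention period should be longer than the disconnections you expect to recover from.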

Server Implementation

When subscribing, you must forward the lastEventId to the publisher to enable resuming:

ts
const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      yield payload
    }
  })

Event ID Management

When resume is enabled, the publisher manages event ids itself. This means:

  • Any event id you provide when publishing is ignored and replaced by the publisher
  • When a subscriber yields a custom payload instead of the original one, it must copy the event metadata onto that payload so the event id is preserved

ts
import { getEventMeta, withEventMeta } from '@orpc/server'

const live = os
  .handler(async function* ({ input, signal, lastEventId }) {
    const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
    for await (const payload of iterator) {
      // Preserve event id when yielding custom data
      yield withEventMeta({ custom: 'value' }, { ...getEventMeta(payload) })
    }
  })

const publish = os
  .input(z.object({ id: z.string() }))
  .handler(async ({ input }) => {
    // The event id 'this-will-be-ignored' will be replaced by the publisher
    await publisher.publish('something-updated', withEventMeta({ id: input.id }, { id: 'this-will-be-ignored' }))
  })

Client Implementation

On the client, you can use the Client Retry Plugin, which automatically tracks lastEventId and sends it to the server when reconnecting. Alternatively, you can manage lastEventId manually:

ts
import { getEventMeta } from '@orpc/client'

let lastEventId: string | undefined

while (true) {
  try {
    const iterator = await client.live('input', { lastEventId })

    for await (const payload of iterator) {
      lastEventId = getEventMeta(payload)?.id // Update lastEventId

      console.log(payload)
    }
  }
  catch {
    await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
  }
}
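
The manual loop can also be wrapped in a reusable helper that gives up after a bounded number of connection attempts and stops once the stream completes normally. The following is a minimal sketch under stated assumptions: `consumeWithResume`, `Connect`, and `EventWithId` are hypothetical names, and events carry their id as a plain field rather than being read through getEventMeta, so that the sketch runs without oRPC. A real call such as `client.live(input, { lastEventId })` would need to be adapted to the `connect` shape.

```typescript
// Hypothetical shapes for illustration; not part of @orpc/client.
interface EventWithId<T> {
  id: string
  data: T
}

type Connect<T> = (lastEventId: string | undefined) => AsyncIterable<EventWithId<T>>

async function consumeWithResume<T>(
  connect: Connect<T>,
  onEvent: (data: T) => void,
  options: { maxAttempts: number, delayMs: number },
): Promise<void> {
  let lastEventId: string | undefined

  // `attempt` counts every connection attempt, including the first one
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      for await (const event of connect(lastEventId)) {
        lastEventId = event.id // Remember where to resume from
        onEvent(event.data)
      }
      return // The stream completed normally, so stop reconnecting
    }
    catch (error) {
      if (attempt === options.maxAttempts) throw error // Out of attempts
      // Wait before reconnecting with the last id we saw
      await new Promise<void>(resolve => setTimeout(resolve, options.delayMs))
    }
  }
}
```

Capping the number of attempts is a design choice of this sketch; the loop in the original example retries indefinitely.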

Available Adapters

Name                   Resume Support         Description
MemoryPublisher        Yes (off by default)   A simple in-memory publisher
IORedisPublisher       Yes (off by default)   Adapter for ioredis
UpstashRedisPublisher  Yes (off by default)   Adapter for Upstash Redis

INFO

If you'd like to add a new publisher adapter, please open an issue.

Memory Publisher

ts
import { MemoryPublisher } from '@orpc/experimental-publisher/memory'

const publisher = new MemoryPublisher<{
  'something-updated': {
    id: string
  }
}>({
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
})

INFO

Resume support is disabled by default in MemoryPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.

IORedis Publisher

ts
import { Redis } from 'ioredis'
import { IORedisPublisher } from '@orpc/experimental-publisher/ioredis'

const publisher = new IORedisPublisher<{
  'something-updated': {
    id: string
  }
}>({
  commander: new Redis(), // For executing short-lived commands
  subscriber: new Redis(), // For subscribing to events
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
  prefix: 'orpc:publisher:', // Key prefix to avoid conflicts with other keys
})

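This adapter requires two Redis client connections: one for executing short-lived commands and another for subscribing to events. A Redis connection that has entered subscriber mode cannot issue regular commands, so the two roles cannot share a single client.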

INFO

Resume support is disabled by default in IORedisPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.

Upstash Redis Publisher

ts
import { Redis } from '@upstash/redis'
import { UpstashRedisPublisher } from '@orpc/experimental-publisher/upstash-redis'

const redis = Redis.fromEnv()

const publisher = new UpstashRedisPublisher<{
  'something-updated': {
    id: string
  }
}>(redis, {
  resumeRetentionSeconds: 60 * 2, // Retain events for 2 minutes to support resume
  prefix: 'orpc:publisher:', // Key prefix to avoid conflicts with other keys
})

INFO

Resume support is disabled by default in UpstashRedisPublisher. Enable it by setting resumeRetentionSeconds to an appropriate value.

Released under the MIT License.