Chat Tutorial: Procedures
Now that the skeleton is in place, let's build the real chat procedures. You'll create queries to fetch messages and room info, mutations to send messages and manage presence, and — most importantly — a subscription that streams chat events in real time over WebSockets.
The Chat Event System
Before writing procedures, we need an event emitter. When a user sends a message, joins a room, or starts typing, we emit an event. Subscriptions listen for these events and forward them to connected clients.
// app/lib/events.ts
export type ChatEvent =
| { type: 'message'; roomId: string; message: { id: string; username: string; content: string; timestamp: number } }
| { type: 'user_joined'; roomId: string; username: string }
| { type: 'user_left'; roomId: string; username: string }
| { type: 'typing'; roomId: string; username: string }
| { type: 'stop_typing'; roomId: string; username: string }
const listeners = new Set<(event: ChatEvent) => void>()
export function emit(event: ChatEvent) {
for (const listener of listeners) {
listener(event)
}
}
export function subscribe(roomId: string): AsyncGenerator<ChatEvent, void, unknown> {
const queue: ChatEvent[] = []
let resolve: (() => void) | null = null
let done = false
const listener = (event: ChatEvent) => {
if (event.roomId !== roomId) return
queue.push(event)
resolve?.()
}
listeners.add(listener)
return {
async next() {
if (done) return { value: undefined, done: true }
while (queue.length === 0) {
await new Promise<void>((r) => { resolve = r })
resolve = null
if (done) return { value: undefined, done: true }
}
return { value: queue.shift()!, done: false }
},
async return() {
done = true
listeners.delete(listener)
resolve?.()
return { value: undefined, done: true }
},
async throw() {
done = true
listeners.delete(listener)
resolve?.()
return { value: undefined, done: true }
},
[Symbol.asyncIterator]() { return this },
}
}This is a simple pub/sub pattern. Each subscription creates an async generator that yields events for a specific room. When the client disconnects, return() is called and the listener is cleaned up.
Complete Procedures
Now build all the chat procedures:
// app/rpc/procedures.ts
import { procedure } from '@ereo/rpc'
import { store } from '~/lib/store'
import { emit, subscribe as subscribeToRoom } from '~/lib/events'
import type { ChatEvent } from '~/lib/events'
const pub = procedure
// --- Queries ---
export const health = pub.query(() => ({
status: 'ok',
rooms: store.getRooms().length,
timestamp: Date.now(),
}))
export const listRooms = pub.query(() => {
return store.getRooms().map((room) => ({
...room,
online: store.getOnlineUsers(room.id).length,
}))
})
export const getRoom = pub.query(
{ parse: (data: unknown) => data as { roomId: string } },
({ input }) => {
const room = store.getRoom(input.roomId)
if (!room) throw new Error('Room not found')
return {
...room,
online: store.getOnlineUsers(room.id),
messages: store.getMessages(room.id, 50),
}
}
)
export const getMessages = pub.query(
{ parse: (data: unknown) => data as { roomId: string; limit?: number } },
({ input }) => {
return store.getMessages(input.roomId, input.limit || 50)
}
)
// --- Mutations ---
export const sendMessage = pub.mutation(
{ parse: (data: unknown) => data as { roomId: string; username: string; content: string } },
({ input }) => {
const { roomId, username, content } = input
if (!content.trim()) {
return { ok: false, error: { code: 'INVALID_INPUT', message: 'Message cannot be empty' } }
}
if (content.length > 1000) {
return { ok: false, error: { code: 'INVALID_INPUT', message: 'Message too long (max 1000 chars)' } }
}
const message = store.addMessage(roomId, username, content.trim())
// Clear typing indicator
store.clearTyping(roomId, username)
// Broadcast to subscribers
emit({
type: 'message',
roomId,
message: {
id: message.id,
username: message.username,
content: message.content,
timestamp: message.timestamp,
},
})
emit({ type: 'stop_typing', roomId, username })
return message
}
)
export const joinRoom = pub.mutation(
{ parse: (data: unknown) => data as { roomId: string; username: string } },
({ input }) => {
const { roomId, username } = input
if (!store.getRoom(roomId)) {
return { ok: false, error: { code: 'NOT_FOUND', message: 'Room not found' } }
}
store.joinRoom(roomId, username)
emit({ type: 'user_joined', roomId, username })
return { online: store.getOnlineUsers(roomId) }
}
)
export const leaveRoom = pub.mutation(
{ parse: (data: unknown) => data as { roomId: string; username: string } },
({ input }) => {
store.leaveRoom(input.roomId, input.username)
store.clearTyping(input.roomId, input.username)
emit({ type: 'user_left', roomId: input.roomId, username: input.username })
emit({ type: 'stop_typing', roomId: input.roomId, username: input.username })
return { success: true }
}
)
export const setTyping = pub.mutation(
{ parse: (data: unknown) => data as { roomId: string; username: string } },
({ input }) => {
store.setTyping(input.roomId, input.username)
emit({ type: 'typing', roomId: input.roomId, username: input.username })
return { success: true }
}
)
export const stopTyping = pub.mutation(
{ parse: (data: unknown) => data as { roomId: string; username: string } },
({ input }) => {
store.clearTyping(input.roomId, input.username)
emit({ type: 'stop_typing', roomId: input.roomId, username: input.username })
return { success: true }
}
)
// --- Subscription ---
export const roomEvents = pub.subscription(
{ parse: (data: unknown) => data as { roomId: string } },
async function* ({ input }): AsyncGenerator<ChatEvent> {
yield* subscribeToRoom(input.roomId)
}
)Update the Router
Add all procedures to the router:
// app/rpc/router.ts
import { createRouter } from '@ereo/rpc'
import {
health,
listRooms,
getRoom,
getMessages,
sendMessage,
joinRoom,
leaveRoom,
setTyping,
stopTyping,
roomEvents,
} from './procedures'
export const router = createRouter({
health,
rooms: {
list: listRooms,
get: getRoom,
join: joinRoom,
leave: leaveRoom,
},
messages: {
list: getMessages,
send: sendMessage,
},
typing: {
start: setTyping,
stop: stopTyping,
},
events: roomEvents,
})
export type AppRouter = typeof routerUnderstanding the Procedure Types
Let's break down how each procedure type works:
Queries (read-only)
// No input — just returns data
export const health = pub.query(() => ({ status: 'ok' }))
// With input — first arg is a schema, second is the handler
export const getRoom = pub.query(
{ parse: (data: unknown) => data as { roomId: string } },
({ input }) => { /* input is typed as { roomId: string } */ }
)Queries are idempotent. They can be cached, retried, and called via GET or POST.
Mutations (write operations)
export const sendMessage = pub.mutation(
{ parse: (data: unknown) => data as { roomId: string; username: string; content: string } },
({ input }) => {
// Side effects: write to store, emit events
const message = store.addMessage(input.roomId, input.username, input.content)
emit({ type: 'message', roomId: input.roomId, message })
return message
}
)Mutations have side effects. They always use POST and should not be retried automatically.
Subscriptions (streaming)
export const roomEvents = pub.subscription(
{ parse: (data: unknown) => data as { roomId: string } },
async function* ({ input }) {
yield* subscribeToRoom(input.roomId)
}
)Subscriptions use async function* (async generators). They yield values over time through a WebSocket connection. When the client unsubscribes or disconnects, the generator's return() method is called for cleanup.
The Event Flow
Here's how sending a message flows through the system:
Client A sends "Hello"
│
▼
rpc.messages.send.mutate({ roomId: 'general', username: 'alice', content: 'Hello' })
│
▼
Server: sendMessage mutation
├── store.addMessage() → saves to in-memory array
├── store.clearTyping() → clears typing indicator
├── emit({ type: 'message', ... }) → broadcasts to all listeners
│ │
│ ├── Client A's subscription listener → queue.push(event) → yields to generator
│ ├── Client B's subscription listener → queue.push(event) → yields to generator
│ └── Client C's subscription listener → queue.push(event) → yields to generator
│
└── returns message object to Client AThe mutation returns immediately to the caller. The subscription delivers the event asynchronously to all connected clients (including the sender).
Test the Procedures
Test with curl:
# List rooms
curl -X POST http://localhost:3000/api/rpc \
-H "Content-Type: application/json" \
-d '{"path":["rooms","list"],"type":"query"}'
# Get a room
curl -X POST http://localhost:3000/api/rpc \
-H "Content-Type: application/json" \
-d '{"path":["rooms","get"],"type":"query","input":{"roomId":"general"}}'
# Send a message
curl -X POST http://localhost:3000/api/rpc \
-H "Content-Type: application/json" \
-d '{"path":["messages","send"],"type":"mutation","input":{"roomId":"general","username":"test","content":"Hello from curl!"}}'
# Get messages
curl -X POST http://localhost:3000/api/rpc \
-H "Content-Type: application/json" \
-d '{"path":["messages","list"],"type":"query","input":{"roomId":"general"}}'The last call should return the message you just sent.
What We've Done
- Built an event emitter for real-time broadcasting
- Created 9 RPC procedures covering all chat operations
- Organized procedures into a nested router (rooms, messages, typing, events)
- Implemented an async generator subscription for room events
- Understood the three procedure types: query, mutation, subscription
- Tested all procedures via curl
Next Step
The procedures work over HTTP but the subscription needs a WebSocket connection. In the next chapter, we'll set up WebSocket handling and connect the real-time event stream.