How I Built a Real-Time AI App with Cloudflare Durable Objects and Workers AI
A practical look at how to build a real-time, stateful AI interview app using Cloudflare Workers, Durable Objects, and Partykit — with per-user databases, streaming responses, and resilient WebSocket communication.
Want to see it in action?
🎙️ Live demo: www.preprecords.com
🎯 What Problem Are We Trying to Solve?
We want to build a real-time AI interview app that can:
- ✅ Show when the AI is “thinking” or generating a response
- 🔁 Keep state even if the user refreshes the page
- 🚀 Stream AI responses in real time
- 🧠 Offer hints after a user goes idle
- 🔒 Enforce per-user rate limits
⚙️ How we solve this:
- Cloudflare Durable Objects to persist state per user and per interview
- Partykit for WebSocket communication
- Workers AI for inference on the edge
- Lightweight SQLite DBs inside Durable Objects — fast and local
⚙️ TL;DR: Why Durable Objects?
Durable Objects are like tiny cloud computers, scoped to each user or interview. Each one has:
- Its own local SQLite database
- Ability to maintain state across requests
- WebSocket support for real-time sync
- Built-in alarm scheduling (like setTimeout on the edge)
This makes them perfect for scalable, low-latency AI apps.
🗺️ System Architecture
Here’s what our real-time AI architecture looks like:
🧩 Durable Objects + Workers AI
🧱 Core Components
🖥️ The Client: WebSocket Logic and Message Handling
Here’s how the client maintains a WebSocket connection, handles messages, and refreshes tokens when needed.
- Uses WebSocket to communicate with the server and stream AI responses.
- When we receive a message, we parse it using Zod schemas for type safety.
- When an interview message arrives we upsert it into local state. If we’re streaming a message, it might look like this:
socket message -> 'the cat..'
socket message -> 'the cat in the..'
socket message -> 'the cat in the hat'
- We send a short-lived token in the query string to authenticate the WebSocket connection.
- On connection error, we refresh the auth token.
Client Browser
│
└── WebSocket → /party/interview/abc?token=JWT
│
▼
┌────────────────────────────┐
│ Interview Durable Object │
│ (ID: interview-abc) │
└────────────────────────────┘
import usePartySocket from 'partysocket/react'

const socket = usePartySocket({
// party and room help route the request, e.g. /party/interview/1
party: 'interview',
room: interviewId,
host: process.env.WEBSOCKET_HOST,
onOpen: () => {
socket.send(JSON.stringify({ type: GET_ALL_MESSAGES }))
},
// refresh auth token
onError: async () => {
await wsToken.refresh()
},
query: () => ({ token: tokenRef.current }), // sent as ?token=<current auth token>
onMessage: (event) => {
const message = WsInterviewResponseSchema.parse(JSON.parse(event.data))
switch (message.type) {
case INTERVIEW_UPDATED:
// a specific interview was updated, refetch it
break
case INTERVIEW_MESSAGE:
// 👉 upsert message, in case we're streaming a response
setMessages((prev) => {
if (prev.find((m) => m.id === message.id)) {
return prev.map((m) => (m.id === message.id ? message : m))
} else {
return [...prev, message]
}
})
break
case INTERVIEW_STATE:
// the interview state has changed, maybe the AI is thinking or transcribing. Show an appropriate message to the user
break
case RATELIMIT:
// we have reached the rate limit
break
}
}
})
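The upsert in the INTERVIEW_MESSAGE case may be easier to reason about as a pure function. The following is a minimal sketch, assuming a simplified message shape; the `Message` type and the `upsertMessage` name are illustrative and not taken from the app's codebase.

```typescript
// Hypothetical, simplified message shape for illustration.
type Message = { id: string; content: string }

// Replace the message with the same id, or append it if it is new.
function upsertMessage(prev: Message[], incoming: Message): Message[] {
  const exists = prev.some((m) => m.id === incoming.id)
  return exists
    ? prev.map((m) => (m.id === incoming.id ? incoming : m))
    : [...prev, incoming]
}

// Three streamed updates for one message collapse into a single entry.
const updates = ['the cat..', 'the cat in the..', 'the cat in the hat']
const finalState = updates.reduce<Message[]>(
  (state, content) => upsertMessage(state, { id: 'msg-1', content }),
  []
)
```

In a React component this would likely be used as `setMessages((prev) => upsertMessage(prev, message))`.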
The Entry Point
We use Hono to handle API routing and Partykit to manage WebSocket connections.
import { Hono } from 'hono';
import { routePartykitRequest } from 'partyserver';
import { handleError } from './middleware/handleError';
import { interviewRouter } from './routes/interviewRouter';
const app = new Hono();
app.use('*', handleError);
app.route('/api/interview', interviewRouter);
export default {
async fetch(request, env, ctx: ExecutionContext) {
// if we have a valid partykit request, handle it
const partyResponse = await routePartykitRequest(
request,
env as unknown as Record<string, string>,
{
// Authenticate websocket requests
onBeforeConnect: async (request) => {
try {
const userId = await authenticateSocketRequest(request, env)
request.headers.set('X-User-ID', userId)
} catch (error) {
return new Response('Unauthorized', { status: 401 })
}
}
}
)
if (partyResponse) {
return partyResponse
}
// Otherwise, handle with Hono
return app.fetch(request, env, ctx)
}
} satisfies ExportedHandler<Env>
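// A module can only have one default export, so the Durable Object classes are exported by name
export { Interview } from './durableObjects/interview'
export { UserInterviews } from './durableObjects/userInterviews'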
🗃 Durable Objects as Localized State Machines
Think of Durable Objects like folders with a database inside:
For each user we have a userInterviews Durable Object with a SQLite database that stores all the interviews for that user:
📂 userInterviews-1 # interviews for user 1
💾 interviews.sqlite
📂 userInterviews-2 # interviews for user 2
💾 interviews.sqlite
📂 userInterviews-3 # interviews for user 3
💾 interviews.sqlite
For each interview that belongs to a user, we have an interview Durable Object with a SQLite database that stores all the messages for that interview:
📂 interview-1a # interview messages for user 1, interview a
💾 messages.sqlite
📂 interview-1b # interview messages for user 1, interview b
💾 messages.sqlite
📂 interview-2a # interview messages for user 2, interview a
💾 messages.sqlite
Each Durable Object is a singleton, automatically routed by ID (e.g. userId or interviewId).
When a request is made to “get all interviews for user 1”:
[GET] /api/interviews
with an auth token that contains the user ID:
- The request is routed to the userInterviews Durable Object with id=userInterviews-1
- Within that Durable Object, we can query the SQLite database for all interviews for user 1
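// wrangler.json (configuration file)
"durable_objects": {
  "bindings": [
    {
      "name": "user_interviews",
      "class_name": "UserInterviews"
    }
  ]
},
// SQLite-backed Durable Object classes also need a migration entry (the tag name is arbitrary)
"migrations": [
  { "tag": "v1", "new_sqlite_classes": ["UserInterviews"] }
],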
// router.ts
// Hono Router
router.get('/interviews', async (ctx) => {
const { userId } = ctx.get('jwtPayload')
// get the durable object id for the user
const durableObjectId = ctx.env.user_interviews.idFromName(userId)
// get the userInterviews DO
const userInterviewsDO = ctx.env.user_interviews.get(durableObjectId)
// RPC call to get all interviews for the user
const interviews = await userInterviewsDO.getAllInterviews({ userId })
return ctx.json({ success: true, interviews })
})
🧵 UserInterviews Durable Object (Per User)
This DO manages a per-user SQLite DB, CRUD for interviews, and emits WebSocket broadcasts.
Handles:
- interview CRUD
- WebSocket messages for interview state
// lib/messages
export const wsInterviewCreatedMessage = (interview: InterviewT) =>
JSON.stringify({
type: INTERVIEW_CREATED,
interviewId: interview.id
} satisfies WsInterviewCreated)
// durableObjects/userInterviews.ts
import { Server, Connection, ConnectionContext } from 'partyserver'
import { wsInterviewCreatedMessage } from '@/lib/messages'
export class UserInterviews extends Server<Env> {
static options = { hibernate: true }
private readonly db: InterviewDB
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
this.db = new InterviewDB(ctx.storage.sql)
}
async createInterview(interview: CreateInterview): Promise<InterviewT> {
const createdInterview: InterviewT = this.db.createInterview(interview)
// broadcast to all connected clients
this.broadcast(wsInterviewCreatedMessage(createdInterview))
// store in D1
await this.globalDB.createInterview(createdInterview)
return createdInterview
}
}
🧵 Interview Durable Object (Per Interview)
Handles:
- WebSocket messages
- Audio transcription
- Text-based AI chat
- State transitions (idle → transcribing → responding)
┌────────────┐
│ Browser    │
│────────────│
│ - Sends WS │
│   request  │
└────┬───────┘
     │ WebSocket (with token)
     ▼
┌──────────────────────────────┐
│ Interview Durable Object     │
│ ID: interview-abc            │
├──────────────────────────────┤
│ - Parses auth token          │
│ - Maintains connection state │
│ - Updates local state        │
│ - Handles message stream     │
└────┬─────────────┬───────────┘
     │             │
     ▼             ▼
┌────────────┐ ┌──────────────────────────────┐
│ SQLite DB  │ │ Workers AI                   │
│ - Messages │ │ - Inference (LLaMA, Whisper) │
└────────────┘ └──────────────────────────────┘
// durableObjects/interview.ts
export class Interview extends Server<Env> {
static options = {
hibernate: true
}
private readonly db: MessagesDB
private readonly interviewAI: InterviewAI
private readonly rateLimit: RateLimit
private interview: InterviewT | null = null
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
this.interviewAI = new InterviewAI(env)
this.rateLimit = new RateLimit(env)
this.db = new MessagesDB(ctx.storage.sql)
}
// ... connection and message handlers are shown in the sections below
}
Interview Object State Machine
- When a user connects, we send the current state to the client
- This means that if a user refreshes the page, they can see the current state of “processing” or “thinking”

type WsInterviewStateT =
| 'idle'
| 'ai.extracting.skills'
| 'ai.generating.message'
| 'ai.transcribing.text'
| 'ai.generating.hint'
export class Interview extends Server<Env> {
static options = { hibernate: true }
private readonly db: MessagesDB
private readonly interviewAI: InterviewAI
private readonly rateLimit: RateLimit
// are we processing a message, transcribing, or idle?
state: WsInterviewStateT = 'idle'
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
this.interviewAI = new InterviewAI(env)
this.rateLimit = new RateLimit(env)
this.db = new MessagesDB(ctx.storage.sql)
}
async onConnect(conn: Conn, ctx: ConnectionContext): Promise<void> {
const url = new URL(ctx.request.url)
const interviewId = url.pathname.split('/').pop()
// the onBeforeConnect hook in the entry point should have added the user ID to the headers
const userId = ctx.request.headers.get('X-User-ID')
if (!userId || !interviewId) {
throw new UnauthorizedError()
}
conn.setState({ userId, interviewId })
// Send the current state to the client on connect
conn.send(wsInterviewStateMessage(this.state))
}
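The "send the current state on connect" idea can be tried out without any Cloudflare runtime. The sketch below is a simplified stand-in, not the app's code: `StateBroadcaster`, `FakeConnection`, and the `interview.state` message type are hypothetical names chosen for illustration.

```typescript
type InterviewState = 'idle' | 'ai.transcribing.text' | 'ai.generating.message'

// Hypothetical stand-in for a WebSocket connection.
interface FakeConnection {
  send(data: string): void
}

class StateBroadcaster {
  private state: InterviewState = 'idle'
  private readonly connections = new Set<FakeConnection>()

  // New (or refreshed) clients immediately receive the current state.
  connect(conn: FakeConnection): void {
    this.connections.add(conn)
    conn.send(JSON.stringify({ type: 'interview.state', state: this.state }))
  }

  // Store the new state, then notify every connected client.
  setState(state: InterviewState): void {
    this.state = state
    const payload = JSON.stringify({ type: 'interview.state', state })
    this.connections.forEach((conn) => conn.send(payload))
  }
}

const received: string[] = []
const room = new StateBroadcaster()
room.setState('ai.generating.message')
// A client that connects mid-generation, e.g. after a page refresh.
room.connect({ send: (data) => received.push(data) })
```

State held in a class field like this lives only as long as the object stays in memory; anything that must survive eviction would need to be written to storage.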
🎤 Real-time Audio → Text → AI Response
- Uses Zod for message parsing and type safety
- Zod parsing happens on the server and client side
- Text messages are serialized and deserialized using JSON and parsed using Zod
// durableObjects/interview.ts
async onMessage(conn: Conn, message: WSMessage) {
if (message instanceof ArrayBuffer) {
await this.handleAudioMessage(conn, message)
} else if (typeof message === 'string') {
await this.handleTextMessage(conn, message)
}
}
Text messages are parsed with a Zod schema
async handleTextMessage(conn: Conn, message: string) {
try {
const data = WsMessageSchema.parse(JSON.parse(message))
switch (data.type) {
case GET_ALL_MESSAGES:
// Fetch and return all messages for the current interview
break
case USER_TYPING:
case USER_RECORDING:
// reset the hint alarm
break
// NOTE: This wouldn't happen because parsing would have failed
case 'unknown':
}
} catch (error) {
this.handleWebSocketError(conn, error)
}
}
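The app uses Zod for this step. The dependency-free sketch below only illustrates the underlying idea: validate an incoming frame into a discriminated union before switching on it, and treat binary frames as audio. The message type strings and function names are assumptions made for the example.

```typescript
// Hypothetical message types; the real constants live in the app's codebase.
type ClientMessage =
  | { type: 'get.all.messages' }
  | { type: 'user.typing' }
  | { type: 'user.recording' }

const CLIENT_TYPES = ['get.all.messages', 'user.typing', 'user.recording']

// Returns a typed message, or null when the frame is not valid.
function parseClientMessage(raw: string): ClientMessage | null {
  let data: unknown
  try {
    data = JSON.parse(raw)
  } catch {
    return null // not JSON at all
  }
  if (typeof data !== 'object' || data === null) return null
  const type = (data as { type?: unknown }).type
  if (typeof type !== 'string' || CLIENT_TYPES.indexOf(type) === -1) return null
  return { type } as ClientMessage
}

// Binary frames are audio, string frames are JSON commands.
function routeFrame(frame: string | ArrayBuffer): string {
  if (frame instanceof ArrayBuffer) return 'audio'
  const message = parseClientMessage(frame)
  return message ? message.type : 'invalid'
}
```

A schema library gives the same guarantee with far less hand-written checking, which is presumably why the app relies on Zod on both the client and the server.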
Audio messages are transcribed by the AI
- Audio messages are sent through the websocket as an ArrayBuffer
- They’re converted to a Uint8Array and sent to the AI for processing
- We then update the state to show that the AI is transcribing the text
- Storing state means that a user can refresh the page and see the current state
setState(state: WsInterviewStateT) {
this.state = state
this.broadcast(wsInterviewStateMessage(state))
}
async handleAudioMessage(
conn: Conn,
message: ArrayBuffer
) {
const audio = new Uint8Array(message)
// the user ID was stored on the connection in onConnect
const { userId } = conn.state as { userId: string }
// store and broadcast the current state
this.setState(AI_TRANSCRIBING_TEXT)
const transcribedText = await this.interviewAI.transcribeAudio(audio)
// store message in the DB
const upMessage = this.upsertMessage({
id: crypto.randomUUID(),
userId,
messageType: MessageType.ANSWER,
role: MessageRole.USER,
content: transcribedText
})
// the AI is now generating its reply
this.setState(AI_GENERATING_MESSAGE)
// broadcast the transcribed message
this.broadcast(wsInterviewMessage(upMessage))
// Stream a response from the AI to the client
await this.streamInterviewQuestion(userId, this.interview)
}
⏰ AI Hints with Alarms
- After answering a question, if the user is idle for 3 minutes, the AI sends a hint
- If the user starts typing or starts recording, the timer is reset
- This is done by using a Durable Object alarm
- We store alarm metadata and create an alarm in the future
/**
* Reset the hint alarm and schedule a hint.
* Hint will be generated using the last user message.
*/
async resetHintAlarm(interview: InterviewT) {
const { id: interviewId, userId } = interview
const alarmMeta: AlarmMeta = {
type: 'question-hint',
userId,
interviewId,
autoHintScheduled: true
}
await this.ctx.storage.put(this.alarmHintKey, alarmMeta)
await this.ctx.storage.deleteAlarm()
// set the alarm for 3 minutes from now
await this.ctx.storage.setAlarm(Date.now() + this.hintAlarmMs)
}
- When the alarm runs, it checks if there is a hint scheduled
- If so, it calls the AI to generate a hint
/**
* Handles the alarm event.
*/
async alarm() {
const hintAlarm = await this.getHintAlarmMeta()
if (hintAlarm?.autoHintScheduled) {
await this.provideHint(hintAlarm.userId)
await this.deleteHintAlarmMeta()
}
}
}
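The reset-on-activity behaviour can be checked with a small simulation. This is a sketch under stated assumptions: `FakeAlarmStorage` is a synchronous, in-memory stand-in for the alarm part of Durable Object storage (the real methods return promises), and `HintScheduler` and `tick` are hypothetical names; in the real object the runtime invokes `alarm()` instead of a manual `tick`.

```typescript
// Hypothetical stand-in for the alarm part of Durable Object storage:
// one alarm per object, and setting it again replaces the previous time.
class FakeAlarmStorage {
  private alarmAt: number | null = null
  setAlarm(time: number): void { this.alarmAt = time }
  getAlarm(): number | null { return this.alarmAt }
  deleteAlarm(): void { this.alarmAt = null }
}

const HINT_DELAY_MS = 3 * 60 * 1000 // 3 minutes, as described above

class HintScheduler {
  constructor(private readonly storage: FakeAlarmStorage) {}

  // Called on every typing/recording event: push the hint further out.
  reset(now: number): void {
    this.storage.setAlarm(now + HINT_DELAY_MS)
  }

  // Returns true when the alarm is due and a hint should be generated.
  tick(now: number): boolean {
    const alarmAt = this.storage.getAlarm()
    if (alarmAt === null || now < alarmAt) return false
    this.storage.deleteAlarm()
    return true
  }
}

const scheduler = new HintScheduler(new FakeAlarmStorage())
scheduler.reset(0)      // user answered at t = 0
scheduler.reset(60000)  // user started typing at t = 1 min
const firedEarly = scheduler.tick(180000) // 3 min after the first reset
const firedLate = scheduler.tick(240000)  // 3 min after the last reset
```

Because each object has a single alarm slot, resetting is just a matter of setting the alarm again with a later time.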
AI responses are streamed to the client
- Uses the ai-sdk library to stream responses from AI
- We then pipe the response to the client through the websocket
// 📂 @services/interviewAI.ts
import { generateObject, generateText, streamText } from 'ai'
/**
* Streams a chat response from the AI.
*/
streamChatResponse(interview: InterviewT, messages: InterviewMessageT[]) {
const result = streamText({
model: this.workersAI('@cf/meta/llama-3.1-8b-instruct'),
system: InterviewAI.SystemPrompt.interview(interview),
messages: messages.map((m) => ({ role: m.role, content: m.content }))
})
return result.textStream
}
- The streamed response is then broadcast to all connected clients
- After the response is fully generated, it is stored in the database
// 📂 @durableObjects/interview.ts
async streamInterviewQuestion(userId: string, interview: InterviewT) {
const messages = this.getAllMessages({ userId })
const textStream = this.interviewAI.streamChatResponse(interview, messages)
const aiMessage = {
id: crypto.randomUUID(),
role: MessageRole.AI,
messageType: MessageType.QUESTION,
userId
}
let buffer = ''
for await (const chunk of textStream) {
buffer += chunk
this.broadcast(wsInterviewMessage({ ...aiMessage, content: buffer + '...' }))
}
const message = this.upsertMessage({ ...aiMessage, content: buffer })
this.broadcast(wsInterviewMessage(message))
}
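To see what clients receive during streaming, the accumulate-and-broadcast loop can be run against a fake stream. In this sketch `fakeTextStream` stands in for the AI SDK's text stream and `streamToClient` is a hypothetical helper; neither is part of the app.

```typescript
// Hypothetical token stream standing in for the AI SDK's text stream.
async function* fakeTextStream(): AsyncGenerator<string> {
  yield 'the cat'
  yield ' in the'
  yield ' hat'
}

// Accumulate chunks, emitting the growing buffer after each one.
async function streamToClient(
  chunks: AsyncIterable<string>,
  broadcast: (content: string) => void
): Promise<string> {
  let buffer = ''
  for await (const chunk of chunks) {
    buffer += chunk
    broadcast(buffer + '...') // partial message, marked as in progress
  }
  broadcast(buffer) // final message without the trailing marker
  return buffer
}

const frames: string[] = []
const done = streamToClient(fakeTextStream(), (content) => frames.push(content))
```

Each frame carries the full text so far rather than a delta, which is what lets the client simply upsert by message ID.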
AI gateway is used to manage AI requests
- Everything is routed through the AI gateway
- This is a centralized service that manages AI requests:
  - analytics
  - logging
  - rate limiting
  - request retries
- Different AI models are used for different tasks:
  - For transcribing audio: @cf/openai/whisper-tiny-en
  - For chat responses: @cf/meta/llama-3.1-8b-instruct
InterviewAI Service
┌────────────────────────────────────────────┐
│ InterviewAI │
├────────────────────────────────────────────┤
│ Dependencies: │
│ - Workers AI │
│ - Message Templates │
│ - Rate Limiter │
├────────────────────────────────────────────┤
│ Core Functions: │
│ - Text Generation │
│ - Skill Extraction │
│ - Context Management │
│ - Stream Processing │
└────────────────────────────────────────────┘
Implementation Details:
import { createWorkersAI, type WorkersAI } from 'workers-ai-provider'
export class InterviewAI {
private readonly workersAI: WorkersAI
constructor(private readonly env: Env) {
this.workersAI = createWorkersAI({
gateway: { id: this.env.AI_GATEWAY_ID },
binding: env.AI
})
}
// AI processing methods
async generateResponse(...)
async extractSkills(...)
async streamResponse(...)
}
Security
- JWT-based authentication
- Rate limiting
- CORS protection
- Secure WebSocket connections
- Environment-based configuration
🛡 Auth over WebSockets
Browsers can’t set custom headers on WebSocket connections, so we use short-lived tokens in the query string.
Client GETs /auth/token → JWT (5 min)
Client connects: wss://...?token=JWT
Server validates → attaches userId to conn
If the token is invalid, the connection is rejected with a 401 and the client fetches a new token.
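The server-side check in this flow can be sketched as a small function. Everything here is an assumption for illustration: the `TokenClaims` shape, the `authenticateSocketUrl` name, and the injected `verify` callback, which in a real deployment would verify a JWT signature (and would likely be asynchronous).

```typescript
// Claims we assume the short-lived token carries (hypothetical shape).
type TokenClaims = { userId: string; expiresAt: number } // expiresAt in ms

// Hypothetical verifier: a real one would check a JWT signature.
type VerifyToken = (token: string) => TokenClaims | null

// Returns the user ID for a valid connection URL, or null to reject it.
function authenticateSocketUrl(
  url: string,
  verify: VerifyToken,
  now: number = Date.now()
): string | null {
  const token = new URL(url).searchParams.get('token')
  if (!token) return null
  const claims = verify(token)
  if (!claims || claims.expiresAt <= now) return null
  return claims.userId
}

// Toy verifier for the example only: accepts a single hard-coded token
// that expires 5 minutes after t = 0.
const toyVerify: VerifyToken = (token) =>
  token === 'valid-token' ? { userId: 'user-1', expiresAt: 5 * 60 * 1000 } : null

const base = 'wss://example.com/party/interview/abc'
const accepted = authenticateSocketUrl(`${base}?token=valid-token`, toyVerify, 0)
const expired = authenticateSocketUrl(`${base}?token=valid-token`, toyVerify, 6 * 60 * 1000)
const missing = authenticateSocketUrl(base, toyVerify, 0)
```

Keeping the token lifetime short limits the damage if a URL containing the token ends up in logs or browser history.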
⚖️ Rate Limiting with KV
- Rate limiting is done using Cloudflare KV
┌────────────────────────────────────────┐
│ Rate Limiting Service │
├────────────────────────────────────────┤
│ Storage: │
│ - KV Namespace │
├────────────────────────────────────────┤
│ Configuration: │
│ - Max Requests: 100 │
│ - Time Window: 24 hours │
├────────────────────────────────────────┤
│ Methods: │
│ - checkRateLimit() │
│ - resetLimit() │
│ - getRemainingRequests() │
└────────────────────────────────────────┘
- Each user has a key in the KV store: a prefix followed by the user ID
- Every time you check the rate limit, you increment the count
- The count is reset after the time window has passed
- If the count has already reached the max requests when you check, you throw an error
Implementation:
export class RateLimit {
  private readonly maxRequests = 100
  private readonly timeWindow = 24 * 60 * 60 * 1000
  constructor(private readonly env: Env) {}
  async checkRateLimit(userId: string) {
    const rateLimitKey = `${this.prefix}${userId}`
    let userData = await this.env.RATE_LIMIT_KV.get<UserData>(rateLimitKey, 'json')
    //... Rate limit logic (elided): derives `now` and `resetTime`,
    // and initializes userData when the key does not exist yet
    if (userData.requests >= this.maxRequests) {
      throw new RateLimitError(this.maxRequests, resetTime)
    }
    userData.requests++
    await this.env.RATE_LIMIT_KV.put(rateLimitKey, JSON.stringify(userData), {
      // KV requires an integer TTL of at least 60 seconds
      expirationTtl: Math.max(60, Math.ceil((resetTime - now) / 1000))
    })
  }
}
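Since the snippet above elides the core logic, here is one way the fixed-window counter described in the list could look. This is a sketch, not the app's implementation: a `Map` stands in for the KV namespace, and the `ratelimit:` key prefix and the `UserData` shape are assumptions.

```typescript
type UserData = { requests: number; resetTime: number }

// Hypothetical in-memory stand-in for a KV namespace.
const store = new Map<string, UserData>()

const MAX_REQUESTS = 100
const WINDOW_MS = 24 * 60 * 60 * 1000

// Counts one request; returns how many remain, or throws when over the limit.
function checkRateLimit(userId: string, now: number): number {
  const key = `ratelimit:${userId}`
  let data = store.get(key)
  // Start a fresh window on first use or once the old window has expired.
  if (!data || now >= data.resetTime) {
    data = { requests: 0, resetTime: now + WINDOW_MS }
  }
  if (data.requests >= MAX_REQUESTS) {
    throw new Error(`Rate limit reached, resets at ${data.resetTime}`)
  }
  data.requests++
  store.set(key, data)
  return MAX_REQUESTS - data.requests
}
```

KV is eventually consistent, so concurrent requests can race on this read-modify-write; a limit enforced this way should be treated as approximate.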
🔐 Error Handling
The system uses a structured error-handling approach:
- Custom error types for different scenarios
- Serialization-safe error passing
- HTTP status code mapping
- Standardized error responses
┌─────────────────────────────────────────────────────┐
│ Error Hierarchy │
├─────────────────────────────────────────────────────┤
│ Base Error │
│ │ │
│ ┌───────────┴───────────┐ │
│ UnauthorizedError BadRequestError │
│ │ │ │
│ RateLimitError InterviewError │
└─────────────────────────────────────────────────────┘
All errors are serialized and mapped to HTTP status codes.
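export class InterviewError extends Error {
  constructor(
    message: string,
    readonly errorCode = ErrorCodes.UNKNOWN,
    readonly statusCode = 400
  ) {
    // a subclass constructor must call super() before using `this`
    super(message)
    this.name = 'InterviewError'
  }
}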
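The combination of custom error types, serialization, and status mapping might look roughly like the sketch below. The `AppError` base class, the string error codes, and `toErrorResponse` are hypothetical, and the hierarchy is simplified compared with the diagram above.

```typescript
// Hypothetical base class: every app error carries a code and an HTTP status.
class AppError extends Error {
  constructor(
    message: string,
    readonly errorCode: string,
    readonly statusCode: number
  ) {
    super(message)
    this.name = new.target.name
    // Keep instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype)
  }

  // Plain-object form that survives JSON serialization.
  toJSON() {
    return {
      name: this.name,
      message: this.message,
      errorCode: this.errorCode,
      statusCode: this.statusCode
    }
  }
}

class UnauthorizedError extends AppError {
  constructor(message = 'Unauthorized') {
    super(message, 'UNAUTHORIZED', 401)
  }
}

class RateLimitError extends AppError {
  constructor(message = 'Rate limit reached') {
    super(message, 'RATE_LIMITED', 429)
  }
}

// Map any thrown value to a standard response body and status.
function toErrorResponse(error: unknown): { status: number; body: string } {
  if (error instanceof AppError) {
    return { status: error.statusCode, body: JSON.stringify({ success: false, error }) }
  }
  const fallback = { message: 'Internal error' }
  return { status: 500, body: JSON.stringify({ success: false, error: fallback }) }
}
```

Serializing through `toJSON` matters because a plain `Error` stringifies to `{}`, which would drop the message and code on the way to the client.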
📚 Glossary
Term | Description |
---|---|
Durable Object | Singleton compute + local storage |
Partykit | WebSocket infra over DOs |
Workers AI | Cloudflare’s edge inference engine |
Hono | Minimal routing framework |
Zod | Type-safe schema validation |
KV | Key-value storage for caching or rate limits |
🚀 Live Demo
Want to try it out?
👉 Visit www.preprecords.com to test the AI interview experience in real time.
⭐ Some Thoughts on Scaling This
Durable Objects have limits on storage size, so to support long-running or high-volume usage, you can shard and offload data over time:
🧱 Strategies for Scaling Storage
1. Shard Messages by Time
- Split messages across multiple Durable Objects or SQLite DBs
- For example: one DO per user + year or interview + month
- Keeps each object lean and fast
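Sharding by time mostly comes down to how the object name is built. A minimal sketch, assuming a naming scheme of `interview-<id>:<year>-<month>` (the scheme and the `monthlyShardName` helper are illustrative, not from the app):

```typescript
// Build a Durable Object name that includes a UTC year-month bucket.
function monthlyShardName(interviewId: string, date: Date): string {
  const year = date.getUTCFullYear()
  const monthNumber = date.getUTCMonth() + 1 // getUTCMonth() is zero-based
  const month = (monthNumber < 10 ? '0' : '') + monthNumber
  return `interview-${interviewId}:${year}-${month}`
}

const shard = monthlyShardName('abc', new Date(Date.UTC(2025, 2, 15)))
// shard === 'interview-abc:2025-03'
```

The resulting name could then be passed to `idFromName`, as in the router example earlier, so that each month's messages land in their own object.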
2. Offload to R2 (Object Storage)
- Periodically move old messages or interviews to Cloudflare R2
- Set up a Durable Object alarm:
- Check message count or DB size
- If over threshold, move older messages to R2
- Store metadata so they can be rehydrated later
3. Archive Inactive Interviews
- After each message, call resetAlarm() to schedule an alarm 30 days in the future
- On alarm trigger:
- If the interview has been inactive for 30+ days, move it to R2
- If the user revisits the interview later:
- Load metadata from D1 or R2
- Rehydrate it into a new (or temp) Durable Object
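The decision an archive alarm has to make can be isolated into a pure function. This is a sketch under assumptions: `onArchiveAlarm` and the `AlarmDecision` type are hypothetical, and the actual move to R2 is left out.

```typescript
const ARCHIVE_AFTER_MS = 30 * 24 * 60 * 60 * 1000 // 30 days

type AlarmDecision =
  | { action: 'archive' }
  | { action: 'reschedule'; alarmAt: number }

// Decide what an alarm handler should do, given the last activity time.
function onArchiveAlarm(lastMessageAt: number, now: number): AlarmDecision {
  const inactiveFor = now - lastMessageAt
  if (inactiveFor >= ARCHIVE_AFTER_MS) return { action: 'archive' }
  // There was activity since the alarm was set: check again
  // once 30 days have passed since the last message.
  return { action: 'reschedule', alarmAt: lastMessageAt + ARCHIVE_AFTER_MS }
}
```

If the alarm is reliably reset after every message, the reschedule branch should rarely run; it acts as a safety net in case a reset was missed.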
4. Use D1 for Lightweight Metadata
- Store high-level metadata (interview ID, timestamp, archive status) in Cloudflare D1
- This lets you:
  - Index/search interviews across users
  - Track which interviews were offloaded
  - Trigger background workers to archive/restore content
💬 Conclusion
Durable Objects make it easy to build stateful, scalable real-time apps with low latency and high resilience — all without centralized servers.
✅ Key Takeaways
- Durable Objects act like per-user compute + storage nodes.
- WebSocket + Partykit makes real-time sync trivial.
- Workers AI lets you run inference directly at the edge.
- Local SQLite per object makes scaling simple and fast.
- This setup avoids cold starts, supports refresh recovery, and simplifies auth/state.
You get a modern architecture that feels like…