How I Built a Real-Time AI App with Cloudflare Durable Objects and Workers AI


A practical look at how to build a real-time, stateful AI interview app using Cloudflare Workers, Durable Objects, and Partykit — with per-user databases, streaming responses, and resilient WebSocket communication.

Want to see it in action?

🎙️ Live demo: www.preprecords.com

[Screenshot: the app generating a hint]

🎯 What Problem Are We Trying to Solve?

We want to build a real-time AI interview app that can:

⚙️ How we solve this:

⚙️ TL;DR: Why Durable Objects?

Durable Objects are like tiny cloud computers, scoped to each user or interview. Each one has:

This makes them perfect for scalable, low-latency AI apps.

🗺️ System Architecture

Here’s what our real-time AI architecture looks like:

🧩 Durable Objects + Workers AI

[Figure: architecture diagram of the AI app using Durable Objects, Workers AI, and Partykit]

🧱 Core Components


🖥️ The Client: WebSocket Logic and Message Handling

Here’s how the client maintains a WebSocket connection, handles messages, and refreshes tokens when needed.

Client Browser

    └── WebSocket → /party/interview/abc?token=JWT


         ┌────────────────────────────┐
         │ Interview Durable Object   │
         │ (ID: interview-abc)        │
         └────────────────────────────┘

  const socket = usePartySocket({
    // party and room help route the request, e.g. /party/interview/1
    party: 'interview',
    room: interviewId,
    
    host: process.env.WEBSOCKET_HOST,
    onOpen: () => {
      socket.send(JSON.stringify({ type: GET_ALL_MESSAGES }))
    },
    // refresh auth token
    onError: async () => {
      await wsToken.refresh()
    },
    query: () => tokenRef.current,  // get current auth token
    onMessage: (event) => {
      const message = WsInterviewResponseSchema.parse(JSON.parse(event.data))

      switch (message.type) {

        case INTERVIEW_UPDATED:
          // a specific interview was updated, refetch it
          break

        case INTERVIEW_MESSAGE:
          // 👉 upsert message, in case we're streaming a response
          setMessages((prev) => {
            if (prev.find((m) => m.id === message.id)) {
              return prev.map((m) => (m.id === message.id ? message : m))
            } else {
              return [...prev, message]
            }
          })
          break

        case INTERVIEW_STATE:
          // the interview state has changed, maybe the AI is thinking or transcribing. Show an appropriate message to the user
          break

        case RATELIMIT:
          // we have reached the rate limit
          break
      }
    }
  })
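
The upsert in the INTERVIEW_MESSAGE branch is what lets streaming work on the client: the server re-sends the same message id with longer content each time, and the client replaces the existing entry rather than appending a new one. Here is a minimal sketch of that logic as a pure function, assuming a simplified message shape. The names `ChatMessage` and `upsertMessage` are illustrative and not taken from the app.

```typescript
// Simplified message shape (assumption); the real schema has more fields.
interface ChatMessage {
  id: string
  content: string
}

// Replace the message with a matching id, or append it if it is new.
function upsertMessage(prev: ChatMessage[], next: ChatMessage): ChatMessage[] {
  const exists = prev.some((m) => m.id === next.id)
  return exists ? prev.map((m) => (m.id === next.id ? next : m)) : [...prev, next]
}

// Three streamed snapshots of one AI message, followed by a new user message.
const snapshots: ChatMessage[] = [
  { id: 'ai-1', content: 'Tell...' },
  { id: 'ai-1', content: 'Tell me about...' },
  { id: 'ai-1', content: 'Tell me about a recent project.' },
  { id: 'user-1', content: 'Sure.' }
]

// Feeding the snapshots through the upsert leaves one entry per id.
const finalMessages = snapshots.reduce(upsertMessage, [] as ChatMessage[])
```

Because the function returns a new array instead of mutating `prev`, it can be passed straight to a React state setter like the `setMessages` call above.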

The Entry Point

We use Hono to handle API routing and Partykit to manage WebSocket connections.

import { Hono } from 'hono';
import { routePartykitRequest } from 'partyserver';
import { handleError } from './middleware/handleError';
import { interviewRouter } from './routes/interviewRouter';

const app = new Hono();
app.use('*', handleError);
app.route('/api/interview', interviewRouter);

export default {
  async fetch(request, env, ctx: ExecutionContext) {
    
    // if we have a valid partykit request, handle it
    const partyResponse = await routePartykitRequest(
      request,
      env as unknown as Record<string, string>,
      {
        // Authenticate websocket requests
        onBeforeConnect: async (request) => {
          try {
            const userId = await authenticateSocketRequest(request, env)
            request.headers.set('X-User-ID', userId)
          } catch (error) {
            return new Response('Unauthorized', { status: 401 })
          }
        }
      }
    )

    if (partyResponse) {
      return partyResponse
    }

    // Otherwise, handle with Hono
    return app.fetch(request, env, ctx)
  }
} satisfies ExportedHandler<Env>


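// A module can have only one default export, so the Durable Object classes
// are exposed as named exports. Import paths are assumed from the file names
// used elsewhere in this post.
export { Interview } from './durableObjects/interview';
export { UserInterviews } from './durableObjects/userInterviews';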

🗃 Durable Objects as Localized State Machines

Think of Durable Objects like folders with a database inside:

For each user, we have a userInterviews Durable Object with a SQLite database that stores all of that user's interviews:

📂 userInterviews-1   # interviews for user 1
  💾 interviews.sqlite
📂 userInterviews-2   # interviews for user 2
  💾 interviews.sqlite
📂 userInterviews-3   # interviews for user 3
  💾 interviews.sqlite

For each interview that belongs to a user, we have an interview Durable Object with a SQLite database that stores all the messages for that interview:

 📂 interview-1a      # interview messages for user 1, interview a
    💾 messages.sqlite
 📂 interview-1b      # interview messages for user 1, interview b
    💾 messages.sqlite
 📂 interview-2a      # interview messages for user 2, interview a
    💾 messages.sqlite

Each Durable Object is a singleton, automatically routed by ID (e.g. userId or interviewId).
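
One way to picture that routing is a lookup table from name to instance: the same name always resolves to the same object, which is then the only writer for its data. The sketch below is a toy in-memory model of that idea only. `ToyNamespace` and `ToyInterviewObject` are made-up names; real Durable Objects are resolved by the platform through `idFromName` and `get`.

```typescript
// Toy stand-in for a Durable Object: holds the messages for one interview.
class ToyInterviewObject {
  readonly name: string
  readonly messages: string[] = []

  constructor(name: string) {
    this.name = name
  }
}

// Toy stand-in for a Durable Object namespace: one instance per name.
class ToyNamespace {
  private readonly instances = new Map<string, ToyInterviewObject>()

  get(name: string): ToyInterviewObject {
    let instance = this.instances.get(name)
    if (!instance) {
      // First request for this name: create the instance lazily.
      instance = new ToyInterviewObject(name)
      this.instances.set(name, instance)
    }
    return instance
  }
}

const interviews = new ToyNamespace()
interviews.get('interview-1a').messages.push('Hello')
interviews.get('interview-1a').messages.push('Tell me about yourself')
interviews.get('interview-2a').messages.push('Hi')

const sameObject = interviews.get('interview-1a') === interviews.get('interview-1a')
const countFor1a = interviews.get('interview-1a').messages.length
const countFor2a = interviews.get('interview-2a').messages.length
```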

When a request is made to “get all interviews for user 1”:

// wrangler.json (configuration file)
"durable_objects": { 
  "bindings": [
    {
      "name": "user_interviews",
      "class_name": "UserInterviews" 
    }
  ]
},

// router.ts
// Hono Router
router.get('/interviews', async (ctx) => {
  const { userId } = ctx.get('jwtPayload')

  // get the durable object id for the user
  const durableObjectId = ctx.env.user_interviews.idFromName(userId)

  // get the userInterviews DO
  const userInterviewsDO = ctx.env.user_interviews.get(durableObjectId)

  // RPC call to get all interviews for the user
  const interviews = await userInterviewsDO.getAllInterviews({ userId })
  return ctx.json({ success: true, interviews })
})

🧵 UserInterviews Durable Object (Per User)

This DO manages a per-user SQLite DB, CRUD for interviews, and emits WebSocket broadcasts.

Handles:


// lib/messages
export const wsInterviewCreatedMessage = (interview: InterviewT) =>
  JSON.stringify({
    type: INTERVIEW_CREATED,
    interviewId: interview.id
  } satisfies WsInterviewCreated)


// durableObjects/userInterviews.ts
import { Server, Connection, ConnectionContext } from 'partyserver'
import { wsInterviewCreatedMessage } from '@/lib/messages'

export class UserInterviews extends Server<Env> {
    static options = { hibernate: true }
    private readonly db: InterviewDB

    constructor(ctx: DurableObjectState, env: Env) {
      super(ctx, env)
      this.db = new InterviewDB(ctx.storage.sql)
    }

    async createInterview(interview: CreateInterview): Promise<InterviewT> {
      const createdInterview: InterviewT = this.db.createInterview(interview)

      // broadcast to all connected clients
      this.broadcast(wsInterviewCreatedMessage(createdInterview))

      // store in D1
      await this.globalDB.createInterview(createdInterview)

      return createdInterview
    }
}

🧵 Interview Durable Object (Per Interview)

Handles:

// durableObjects/interview.ts
export class Interview extends Server<Env> {
  static options = {
    hibernate: true
  }

  private readonly db: MessagesDB
  private readonly interviewAI: InterviewAI
  private readonly rateLimit: RateLimit
  private interview: InterviewT | null = null


  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env)
    this.interviewAI = new InterviewAI(env)
    this.rateLimit = new RateLimit(env)
    this.db = new MessagesDB(ctx.storage.sql)
  }

Interview Object State Machine

[Figure: Interview object state machine]
type WsInterviewStateT = 
   | 'idle'
   | 'ai.extracting.skills'   
   | 'ai.generating.message'
   | 'ai.transcribing.text'
   | 'ai.generating.hint'


export class Interview extends Server<Env> {
  static options = { hibernate: true }

  private readonly db: MessagesDB
  private readonly interviewAI: InterviewAI
  private readonly rateLimit: RateLimit

  // are we processing a message, transcribing, or idle?
  state: WsInterviewStateT = 'idle'

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env)
    this.interviewAI = new InterviewAI(env)
    this.rateLimit = new RateLimit(env)
    this.db = new MessagesDB(ctx.storage.sql)
  }

  async onConnect(conn: Conn, ctx: ConnectionContext): Promise<void> {
    const url = new URL(ctx.request.url)
    const interviewId = url.pathname.split('/').pop()
    
    // The onBeforeConnect hook in the entry point should have added the user ID to the headers
    const userId = ctx.request.headers.get('X-User-ID')
    
    if (!userId || !interviewId) {
      throw new UnauthorizedError()
    }
    
    conn.setState({ userId, interviewId })
    
    // Send the current state to the client on connect
    conn.send(wsInterviewStateMessage(this.state))
  }
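
The pattern here is that the state lives in one place, every change is pushed to all connected clients, and a newly connected client gets the current value straight away. Below is a dependency-free sketch of that idea. `StateHub`, `connect`, the `Send` callback, and the `'interview.state'` message type are illustrative stand-ins for the Durable Object, `onConnect`, `conn.send`, and the real wire format.

```typescript
type InterviewState =
  | 'idle'
  | 'ai.extracting.skills'
  | 'ai.generating.message'
  | 'ai.transcribing.text'
  | 'ai.generating.hint'

// Stand-in for conn.send (assumption).
type Send = (payload: string) => void

class StateHub {
  private state: InterviewState = 'idle'
  private readonly connections = new Set<Send>()

  // New connections immediately receive the current state.
  connect(send: Send): void {
    this.connections.add(send)
    send(JSON.stringify({ type: 'interview.state', state: this.state }))
  }

  // Store the new state, then broadcast it to every connection.
  setState(state: InterviewState): void {
    this.state = state
    const payload = JSON.stringify({ type: 'interview.state', state })
    this.connections.forEach((send) => send(payload))
  }
}

const hub = new StateHub()
const received: string[] = []

hub.setState('ai.transcribing.text') // nobody connected yet
hub.connect((payload) => received.push(payload)) // late joiner still sees the state
hub.setState('idle')
```

A client that connects in the middle of a transcription therefore shows the right status without waiting for the next transition.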

🎤 Real-time Audio → Text → AI Response


// durableObjects/interview.ts
async onMessage(conn: Conn, message: WSMessage) {
  // await the handlers so a failure surfaces here instead of as an unhandled rejection
  if (message instanceof ArrayBuffer) {
    await this.handleAudioMessage(conn, message)
  } else if (typeof message === 'string') {
    await this.handleTextMessage(conn, message)
  }
}

Text messages are parsed with a Zod schema:

async handleTextMessage(conn: Conn, message: string) {
  try {
    const data = WsMessageSchema.parse(JSON.parse(message))
    switch (data.type) {
      case GET_ALL_MESSAGES:
        // Fetch and return all messages for the current interview
        break
      
      case USER_TYPING:
      case USER_RECORDING:
        // reset the hint alarm
        break

        
      // NOTE: this wouldn't happen, because parsing would have failed
      case 'unknown':
        break
    }
  } catch (error) {
    this.handleWebSocketError(conn, error)
  }
}
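
What the schema buys you is that anything malformed or of an unknown type is rejected before the `switch` runs. The sketch below shows the same guarantee with a hand-written validator standing in for Zod, so that it has no dependencies. The message type strings and the `parseClientMessage` and `handle` helpers are assumptions for illustration, not the app's real schema.

```typescript
// Hypothetical message types; the real constants are not shown in this post.
type ClientMessage =
  | { type: 'get.all.messages' }
  | { type: 'user.typing' }
  | { type: 'user.recording' }

const KNOWN_TYPES = ['get.all.messages', 'user.typing', 'user.recording']

// Parse raw WebSocket text; throw on invalid JSON or an unknown message type.
function parseClientMessage(raw: string): ClientMessage {
  const data: unknown = JSON.parse(raw)
  if (typeof data !== 'object' || data === null) {
    throw new Error('Message must be a JSON object')
  }
  const type = (data as { type?: unknown }).type
  if (typeof type !== 'string' || !KNOWN_TYPES.includes(type)) {
    throw new Error(`Unknown message type: ${String(type)}`)
  }
  return { type } as ClientMessage
}

// Validate first, then dispatch on a type that is known to be valid.
function handle(raw: string): string {
  let message: ClientMessage
  try {
    message = parseClientMessage(raw)
  } catch {
    return 'rejected'
  }
  switch (message.type) {
    case 'get.all.messages':
      return 'send history'
    case 'user.typing':
    case 'user.recording':
      return 'reset hint alarm'
  }
}
```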

Audio messages are transcribed by the AI:


  setState(state: WsInterviewStateT) {
    this.state = state
    this.broadcast(wsInterviewStateMessage(state))
  }

  async handleAudioMessage(
    conn: Conn,
    message: ArrayBuffer
  ) {
    const audio = new Uint8Array(message)
    // userId was stored on the connection in onConnect
    const { userId } = conn.state

    // store and broadcast the current state
    this.setState(AI_TRANSCRIBING_TEXT)
    const transcribedText = await this.interviewAI.transcribeAudio(audio)

    // store the transcribed message in the DB
    const upMessage = this.upsertMessage({
      id: crypto.randomUUID(),
      userId,
      messageType: MessageType.ANSWER,
      role: MessageRole.USER,
      content: transcribedText
    })

    // tell clients the AI is now generating a reply
    this.setState(AI_GENERATING_MESSAGE)

    // broadcast the transcribed message
    this.broadcast(wsInterviewMessage(upMessage))

    // Stream a response from the AI to the client
    await this.streamInterviewQuestion(userId, this.interview)
  }

⏰ AI Hints with Alarms

  /**
   * Reset the hint alarm and schedule a hint.
   * Hint will be generated using the last user message.
   */
  async resetHintAlarm(interview: InterviewT) {
    const { id: interviewId, userId } = interview

    const alarmMeta: AlarmMeta = {
      type: 'question-hint',
      userId,
      interviewId,
      autoHintScheduled: true
    }

    await this.ctx.storage.put(this.alarmHintKey, alarmMeta)
    await this.ctx.storage.deleteAlarm()
    
    // set the alarm for 3 minutes from now
    await this.ctx.storage.setAlarm(Date.now() + this.hintAlarmMs)
  }
  /**
   * Handles the alarm event.
   */
  async alarm() {
    const hintAlarm = await this.getHintAlarmMeta()

    if (hintAlarm?.autoHintScheduled) {
      await this.provideHint(hintAlarm.userId)
      await this.deleteHintAlarmMeta()
    }
  }
}
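
A Durable Object has a single alarm slot, so calling `setAlarm` again moves the deadline. That makes the hint behave like a debounce: it fires only after the user has been quiet for the whole interval. The sketch below models this with an explicit clock so it can run anywhere. `HintTimer` and its methods are illustrative names, and the three-minute interval is taken from the comment in the code above.

```typescript
const HINT_DELAY_MS = 3 * 60 * 1000

class HintTimer {
  private deadline: number | null = null

  // User activity: (re)schedule the hint for HINT_DELAY_MS after `now`.
  reset(now: number): void {
    this.deadline = now + HINT_DELAY_MS
  }

  // Called as time passes: returns true once, when the deadline is reached.
  tick(now: number): boolean {
    if (this.deadline === null || now < this.deadline) return false
    this.deadline = null
    return true
  }
}

const timer = new HintTimer()
timer.reset(0) // user starts typing at t = 0
const firedEarly = timer.tick(120000) // 2 min: too soon
timer.reset(120000) // user types again, deadline moves to 5 min
const firedAtOldDeadline = timer.tick(180000) // 3 min: old deadline no longer applies
const firedAtNewDeadline = timer.tick(300000) // 5 min: hint is due
const firedTwice = timer.tick(360000) // hint was already delivered
```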

AI responses are streamed to the client

// 📂 @services/interviewAI.ts
import { generateObject, generateText, streamText } from 'ai'

/**
 * Streams a chat response from the AI.
 */
streamChatResponse(interview: InterviewT, messages: InterviewMessageT[]) {
  const result = streamText({
    model: this.workersAI('@cf/meta/llama-3.1-8b-instruct'),
    system: InterviewAI.SystemPrompt.interview(interview),
    messages: messages.map((m) => ({ role: m.role, content: m.content }))
  })

  return result.textStream
}
// 📂 @durableObjects/interview.ts
  async streamInterviewQuestion(userId: string, interview: InterviewT) {
    const messages = this.getAllMessages({ userId })
    const textStream = this.interviewAI.streamChatResponse(interview, messages)

    const aiMessage = {
      id: crypto.randomUUID(),
      role: MessageRole.AI,
      messageType: MessageType.QUESTION,
      userId
    }

    let buffer = ''
    for await (const chunk of textStream) {
      buffer += chunk
      this.broadcast(wsInterviewMessage({ ...aiMessage, content: buffer + '...' }))
    }

    const message = this.upsertMessage({ ...aiMessage, content: buffer })
    this.broadcast(wsInterviewMessage(message))
  }

AI Gateway is used to manage AI requests

InterviewAI Service

┌────────────────────────────────────────────┐
│              InterviewAI                   │
├────────────────────────────────────────────┤
│ Dependencies:                              │
│  - Workers AI                              │
│  - Message Templates                       │
│  - Rate Limiter                            │
├────────────────────────────────────────────┤
│ Core Functions:                            │
│  - Text Generation                         │
│  - Skill Extraction                        │
│  - Context Management                      │
│  - Stream Processing                       │
└────────────────────────────────────────────┘

Implementation Details:

import { createWorkersAI, type WorkersAI } from 'workers-ai-provider'

export class InterviewAI {
  private readonly workersAI: WorkersAI

  constructor(private readonly env: Env) {
    // route Workers AI requests through the configured AI Gateway
    this.workersAI = createWorkersAI({
      gateway: { id: env.AI_GATEWAY_ID },
      binding: env.AI
    })
  }

  // AI processing methods (bodies omitted here):
  //   generateResponse(...), extractSkills(...), streamResponse(...)
}

Security

🛡 Auth over WebSockets

Browsers can’t set custom headers on WebSocket connections, so we use short-lived tokens in the query string.

Client GETs /auth/token → JWT (5 min)
Client connects: ws://...?token=JWT
Server validates → attaches userId to conn

If the token is invalid, the connection is rejected and the client fetches a new token.
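
The two halves of that exchange can be sketched as a client helper that puts the token on the URL and server helpers that read it back and check its age. This is a simplified illustration: signature verification is deliberately left out, and `TokenPayload`, `buildSocketUrl`, `extractToken`, and `isExpired` are hypothetical names rather than the app's own.

```typescript
// Assumed payload shape for illustration; a real JWT carries signed claims.
interface TokenPayload {
  userId: string
  expiresAt: number // epoch milliseconds
}

const TOKEN_TTL_MS = 5 * 60 * 1000

// Client side: put the token in the query string of the WebSocket URL.
function buildSocketUrl(host: string, interviewId: string, token: string): string {
  const url = new URL(`wss://${host}/party/interview/${encodeURIComponent(interviewId)}`)
  url.searchParams.set('token', token)
  return url.toString()
}

// Server side: read the token back out of the request URL.
function extractToken(requestUrl: string): string | null {
  return new URL(requestUrl).searchParams.get('token')
}

// Server side: reject payloads that are past their expiry.
function isExpired(payload: TokenPayload, now: number): boolean {
  return now >= payload.expiresAt
}

const issuedAt = 1000000
const payload: TokenPayload = { userId: 'user-1', expiresAt: issuedAt + TOKEN_TTL_MS }
const socketUrl = buildSocketUrl('example.com', 'abc', 'header.payload.signature')
```

Keeping the lifetime short limits the damage if a URL containing the token ends up in a log.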

⚖️ Rate Limiting with KV

┌────────────────────────────────────────┐
│       Rate Limiting Service            │
├────────────────────────────────────────┤
│ Storage:                               │
│  - KV Namespace                        │
├────────────────────────────────────────┤
│ Configuration:                         │
│  - Max Requests: 100                   │
│  - Time Window: 24 hours               │
├────────────────────────────────────────┤
│ Methods:                               │
│  - checkRateLimit()                    │
│  - resetLimit()                        │
│  - getRemainingRequests()              │
└────────────────────────────────────────┘

Implementation:

export class RateLimit {
  private readonly maxRequests = 100
  private readonly timeWindow = 24 * 60 * 60 * 1000

  async checkRateLimit(userId: string) {
    const rateLimitKey = `${this.prefix}${userId}`
    let userData = await this.env.RATE_LIMIT_KV.get<UserData>( rateLimitKey, 'json')
    
    //... Rate limit logic
    if (userData.requests >= this.maxRequests) {
      throw new RateLimitError(this.maxRequests, resetTime)
    }
    
    userData.requests++
    await this.env.RATE_LIMIT_KV.put(rateLimitKey, JSON.stringify(userData), {
      // KV expects a whole number of seconds, with a minimum of 60
      expirationTtl: Math.max(60, Math.ceil((resetTime - now) / 1000))
    })
  }
  
}
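
The elided `//... Rate limit logic` step is presumably a fixed-window counter. The sketch below shows one way it could look, with a `Map` standing in for the KV namespace so that it runs anywhere. The `UserData` shape (`requests`, `resetTime`) and the `InMemoryRateLimit` class are assumptions, not the app's actual code.

```typescript
// Assumed shape of the stored record.
interface UserData {
  requests: number
  resetTime: number // epoch ms at which the current window ends
}

class InMemoryRateLimit {
  private readonly store = new Map<string, UserData>()
  private readonly maxRequests: number
  private readonly timeWindow: number

  constructor(maxRequests = 100, timeWindow = 24 * 60 * 60 * 1000) {
    this.maxRequests = maxRequests
    this.timeWindow = timeWindow
  }

  // Returns the number of requests left; throws when the limit is reached.
  check(userId: string, now: number): number {
    let data = this.store.get(userId)
    // Start a fresh window if none exists or the old one has ended.
    if (!data || now >= data.resetTime) {
      data = { requests: 0, resetTime: now + this.timeWindow }
    }
    if (data.requests >= this.maxRequests) {
      throw new Error(`Rate limit reached, resets at ${data.resetTime}`)
    }
    data.requests++
    this.store.set(userId, data)
    return this.maxRequests - data.requests
  }
}

// Small limits make the behaviour easy to follow: 2 requests per 1000 ms.
const limiter = new InMemoryRateLimit(2, 1000)
const first = limiter.check('user-1', 0)
const second = limiter.check('user-1', 10)
let blocked = false
try {
  limiter.check('user-1', 20)
} catch {
  blocked = true
}
const afterReset = limiter.check('user-1', 1000) // a new window has started
```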

🔐 Error Handling

Errors are organized in a small class hierarchy, so each one carries its own error code and HTTP status:

┌─────────────────────────────────────────────────────┐
│                  Error Hierarchy                    │
├─────────────────────────────────────────────────────┤
│                   Base Error                        │
│                       │                             │
│           ┌───────────┴───────────┐                 │
│     UnauthorizedError       BadRequestError         │
│           │                       │                 │
│    RateLimitError           InterviewError          │
└─────────────────────────────────────────────────────┘

All errors are serialized and mapped to HTTP status codes.

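export class InterviewError extends Error {
  constructor(
    message: string,
    readonly errorCode = ErrorCodes.UNKNOWN,
    readonly statusCode = 400
  ) {
    // a subclass constructor must call super() before the instance can be used
    super(message)
  }
}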
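
To make "serialized and mapped to HTTP status codes" concrete, here is a small sketch of a base class plus a serializer that falls back to 500 for anything unexpected. The class bodies, the error code strings, and `serializeError` are illustrative assumptions. Only the class names `UnauthorizedError` and `BadRequestError` come from the diagram above.

```typescript
class BaseError extends Error {
  readonly statusCode: number
  readonly errorCode: string

  constructor(message: string, errorCode: string, statusCode: number) {
    super(message)
    // Keep `instanceof` working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype)
    this.name = new.target.name
    this.errorCode = errorCode
    this.statusCode = statusCode
  }
}

class UnauthorizedError extends BaseError {
  constructor(message = 'Unauthorized') {
    super(message, 'UNAUTHORIZED', 401)
  }
}

class BadRequestError extends BaseError {
  constructor(message = 'Bad request') {
    super(message, 'BAD_REQUEST', 400)
  }
}

interface SerializedError {
  status: number
  body: { error: string; code: string }
}

// Turn any thrown value into a status code plus a JSON-serializable body.
function serializeError(error: unknown): SerializedError {
  if (error instanceof BaseError) {
    return { status: error.statusCode, body: { error: error.message, code: error.errorCode } }
  }
  // Unknown errors do not leak their message to the client.
  return { status: 500, body: { error: 'Internal error', code: 'UNKNOWN' } }
}
```

The same serializer can back both the HTTP error middleware and the WebSocket error handler, so clients see one error format on both channels.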

📚 Glossary

Term            Description
Durable Object  Singleton compute + local storage
Partykit        WebSocket infra over DOs
Workers AI      Cloudflare’s edge inference engine
Hono            Minimal routing framework
Zod             Type-safe schema validation
KV              Key-value storage for caching or rate limits

🚀 Live Demo

Want to try it out?

👉 Visit www.preprecords.com to test the AI interview experience in real time.


⭐ Some Thoughts on Scaling This

Durable Objects have limits on storage size, so to support long-running or high-volume usage, you can shard and offload data over time:

🧱 Strategies for Scaling Storage

1. Shard Messages by Time

2. Offload to R2 (Object Storage)

3. Archive Inactive Interviews

4. Use D1 for Lightweight Metadata
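
As one possible reading of the first strategy, messages could be routed to a different Durable Object per time bucket by including the bucket in the object name. The sketch below assumes monthly buckets and a naming scheme of `interview-<id>-<YYYY-MM>`. Both are illustrative choices, as are the `messageShardName` and `groupByShard` helpers.

```typescript
// Build a shard name such as "interview-abc-2024-01" from an id and a timestamp.
function messageShardName(interviewId: string, timestamp: Date): string {
  const year = timestamp.getUTCFullYear()
  const month = String(timestamp.getUTCMonth() + 1).padStart(2, '0')
  return `interview-${interviewId}-${year}-${month}`
}

// Count how many messages would land in each shard.
function groupByShard(interviewId: string, timestamps: Date[]): Map<string, number> {
  const counts = new Map<string, number>()
  timestamps.forEach((timestamp) => {
    const shard = messageShardName(interviewId, timestamp)
    counts.set(shard, (counts.get(shard) ?? 0) + 1)
  })
  return counts
}

const counts = groupByShard('abc', [
  new Date(Date.UTC(2024, 0, 15)),
  new Date(Date.UTC(2024, 0, 31, 23, 59)),
  new Date(Date.UTC(2024, 1, 1))
])
```

The resulting name could then be passed to `idFromName`, so older buckets stay small and can later be archived independently.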

💬 Conclusion

Durable Objects make it easy to build stateful, scalable real-time apps with low latency and high resilience — all without centralized servers.

Key Takeaways

You get a modern architecture that feels like…

magic