Custom transport
RealtimeTransport is the seam between the session’s control logic and the wire. Swap it to wrap a third-party WebSocket library, add runtime proxy headers, or inject a mock for unit tests without a live connection. Both OpenAIVoiceAssistant and OpenAIGroundedVoiceAssistant accept transport: any RealtimeTransport.
For the built-in URLSessionWebSocketTransport that covers most production use cases see WebSocket transport.
Protocol requirements
Section titled “Protocol requirements”public protocol RealtimeTransport: Sendable { func connect() async throws // open the connection func send(_ json: String) async throws // send one JSON frame var events: AsyncStream<String> { get }// inbound frames; finishes on close func close() async // close the connection and finish events}Example: mock transport for unit tests
Section titled “Example: mock transport for unit tests”An actor satisfies Sendable automatically and prevents data races on shared state. The pattern mirrors URLSessionWebSocketTransport: a stored continuation drives events, connect/close guard against double-open/double-close, and send buffers frames so tests can assert on them.
import AgentSquad
public actor MockRealtimeTransport: RealtimeTransport { public nonisolated let events: AsyncStream<String> private nonisolated let continuation: AsyncStream<String>.Continuation
/// Frames sent by the session — inspect in assertions. public private(set) var sentFrames: [String] = [] private var connected = false
public init() { (self.events, self.continuation) = AsyncStream.makeStream(of: String.self) }
public func connect() async throws { guard !connected else { throw RealtimeTransportError.alreadyConnected } connected = true }
public func send(_ json: String) async throws { guard connected else { throw RealtimeTransportError.notConnected } sentFrames.append(json) }
/// Simulate a server-sent frame (call from your test to drive the session). public func receive(_ json: String) { continuation.yield(json) }
public func close() async { connected = false continuation.finish() }}Injecting into a voice assistant
Section titled “Injecting into a voice assistant”Both OpenAIVoiceAssistant and OpenAIGroundedVoiceAssistant accept transport: any RealtimeTransport, so substitution is a single call-site change:
// Productionlet transport = URLSessionWebSocketTransport(apiKey: "sk-...")
// Tests / custom backendlet transport = MockRealtimeTransport()
let assistant = OpenAIVoiceAssistant( name: "voice-assistant", transport: transport, // <-- any RealtimeTransport tools: myToolProvider, userId: "u1", sessionId: UUID().uuidString)Example: custom backend with extra headers
Section titled “Example: custom backend with extra headers”For a non-OpenAI endpoint or a proxy that requires custom auth headers, implement RealtimeTransport directly rather than forking URLSessionWebSocketTransport.
import AgentSquadimport Foundation
public actor CustomWebSocketTransport: RealtimeTransport { public nonisolated let events: AsyncStream<String> private nonisolated let continuation: AsyncStream<String>.Continuation
private let request: URLRequest private let session: URLSession private var task: URLSessionWebSocketTask? private var receiveLoop: Task<Void, Never>?
public init(url: URL, bearerToken: String, extraHeaders: [String: String] = [:]) { var req = URLRequest(url: url) req.setValue("Bearer \(bearerToken)", forHTTPHeaderField: "Authorization") for (field, value) in extraHeaders { req.setValue(value, forHTTPHeaderField: field) } self.request = req self.session = .shared (self.events, self.continuation) = AsyncStream.makeStream(of: String.self) }
public func connect() async throws { guard task == nil else { throw RealtimeTransportError.alreadyConnected } let task = session.webSocketTask(with: request) self.task = task task.resume() receiveLoop = Task { [weak self] in await self?.pump() } }
public func send(_ json: String) async throws { guard let task else { throw RealtimeTransportError.notConnected } try await task.send(.string(json)) }
public func close() async { receiveLoop?.cancel() receiveLoop = nil task?.cancel(with: .goingAway, reason: nil) task = nil continuation.finish() }
private func pump() async { guard let task else { return } while !Task.isCancelled { do { switch try await task.receive() { case .string(let text): continuation.yield(text) case .data(let data): continuation.yield(String(decoding: data, as: UTF8.self)) @unknown default: break } } catch { continuation.finish() return } } }}Related pages
Section titled “Related pages”- Voice overview —
RealtimeRuntime, protocols, and event reference - WebSocket transport —
URLSessionWebSocketTransportreference - OpenAIVoiceAssistant — single-LLM built-in
- OpenAIGroundedVoiceAssistant — grounded two-phase built-in