Skip to content

Custom transport

RealtimeTransport is the seam between the session’s control logic and the wire. Swap it to wrap a third-party WebSocket library, add runtime proxy headers, or inject a mock for unit tests without a live connection. Both OpenAIVoiceAssistant and OpenAIGroundedVoiceAssistant accept transport: any RealtimeTransport.

For the built-in URLSessionWebSocketTransport that covers most production use cases see WebSocket transport.


public protocol RealtimeTransport: Sendable {
func connect() async throws // open the connection
func send(_ json: String) async throws // send one JSON frame
var events: AsyncStream<String> { get }// inbound frames; finishes on close
func close() async // close the connection and finish events
}

An actor satisfies Sendable automatically and prevents data races on shared state. The pattern mirrors URLSessionWebSocketTransport: a stored continuation drives events, connect/close guard against double-open/double-close, and send buffers frames so tests can assert on them.

import AgentSquad
public actor MockRealtimeTransport: RealtimeTransport {
public nonisolated let events: AsyncStream<String>
private nonisolated let continuation: AsyncStream<String>.Continuation
/// Frames sent by the session — inspect in assertions.
public private(set) var sentFrames: [String] = []
private var connected = false
public init() {
(self.events, self.continuation) = AsyncStream.makeStream(of: String.self)
}
public func connect() async throws {
guard !connected else { throw RealtimeTransportError.alreadyConnected }
connected = true
}
public func send(_ json: String) async throws {
guard connected else { throw RealtimeTransportError.notConnected }
sentFrames.append(json)
}
/// Simulate a server-sent frame (call from your test to drive the session).
public func receive(_ json: String) {
continuation.yield(json)
}
public func close() async {
connected = false
continuation.finish()
}
}

Both OpenAIVoiceAssistant and OpenAIGroundedVoiceAssistant accept transport: any RealtimeTransport, so substitution is a single call-site change:

// Production
let transport = URLSessionWebSocketTransport(apiKey: "sk-...")
// Tests / custom backend
let transport = MockRealtimeTransport()
let assistant = OpenAIVoiceAssistant(
name: "voice-assistant",
transport: transport, // <-- any RealtimeTransport
tools: myToolProvider,
userId: "u1",
sessionId: UUID().uuidString
)

Example: custom backend with extra headers

Section titled “Example: custom backend with extra headers”

For a non-OpenAI endpoint or a proxy that requires custom auth headers, implement RealtimeTransport directly rather than forking URLSessionWebSocketTransport.

import AgentSquad
import Foundation
public actor CustomWebSocketTransport: RealtimeTransport {
public nonisolated let events: AsyncStream<String>
private nonisolated let continuation: AsyncStream<String>.Continuation
private let request: URLRequest
private let session: URLSession
private var task: URLSessionWebSocketTask?
private var receiveLoop: Task<Void, Never>?
public init(url: URL, bearerToken: String, extraHeaders: [String: String] = [:]) {
var req = URLRequest(url: url)
req.setValue("Bearer \(bearerToken)", forHTTPHeaderField: "Authorization")
for (field, value) in extraHeaders { req.setValue(value, forHTTPHeaderField: field) }
self.request = req
self.session = .shared
(self.events, self.continuation) = AsyncStream.makeStream(of: String.self)
}
public func connect() async throws {
guard task == nil else { throw RealtimeTransportError.alreadyConnected }
let task = session.webSocketTask(with: request)
self.task = task
task.resume()
receiveLoop = Task { [weak self] in await self?.pump() }
}
public func send(_ json: String) async throws {
guard let task else { throw RealtimeTransportError.notConnected }
try await task.send(.string(json))
}
public func close() async {
receiveLoop?.cancel()
receiveLoop = nil
task?.cancel(with: .goingAway, reason: nil)
task = nil
continuation.finish()
}
private func pump() async {
guard let task else { return }
while !Task.isCancelled {
do {
switch try await task.receive() {
case .string(let text): continuation.yield(text)
case .data(let data): continuation.yield(String(decoding: data, as: UTF8.self))
@unknown default: break
}
} catch {
continuation.finish()
return
}
}
}
}