SessionsWebSocket.ts
remote/SessionsWebSocket.ts
405
Lines
12505
Bytes
2
Exports
10
Imports
10
Keywords
What this is
This page documents one file from the repository and includes its full source so you can read it without leaving the docs site.
Beginner explanation
This file is one piece of the larger system. Its name, directory, imports, and exports show where it fits. Start by reading the exports and related files first.
How it is used
Start from the exports list and related files. Those are the easiest clues for where this file fits into the system.
Expert explanation
Architecturally, this file intersects with session-engine, remote-bridge. It contains 405 lines, 10 detected imports, and 2 detected exports.
Important relationships
Detected exports
SessionsWebSocketCallbacksSessionsWebSocket
Keywords
sessionswebsocketvoidlogfordebuggingmessageprivatecloseconnectedwebsocketdatacallbacks
Detected imports
crypto../constants/oauth.js../entrypoints/agentSdkTypes.js../entrypoints/sdk/controlTypes.js../utils/debug.js../utils/errors.js../utils/log.js../utils/mtls.js../utils/proxy.js../utils/slowOperations.js
Source notes
This page embeds the full file contents. Small or leaf files are still indexed honestly instead of being over-explained.
Full source
import { randomUUID } from 'crypto'
import { getOauthConfig } from '../constants/oauth.js'
import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
import type {
SDKControlCancelRequest,
SDKControlRequest,
SDKControlRequestInner,
SDKControlResponse,
} from '../entrypoints/sdk/controlTypes.js'
import { logForDebugging } from '../utils/debug.js'
import { errorMessage } from '../utils/errors.js'
import { logError } from '../utils/log.js'
import { getWebSocketTLSOptions } from '../utils/mtls.js'
import { getWebSocketProxyAgent, getWebSocketProxyUrl } from '../utils/proxy.js'
import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
const RECONNECT_DELAY_MS = 2000
const MAX_RECONNECT_ATTEMPTS = 5
const PING_INTERVAL_MS = 30000
/**
* Maximum retries for 4001 (session not found). During compaction the
* server may briefly consider the session stale; a short retry window
* lets the client recover without giving up permanently.
*/
const MAX_SESSION_NOT_FOUND_RETRIES = 3
/**
* WebSocket close codes that indicate a permanent server-side rejection.
* The client stops reconnecting immediately.
* Note: 4001 (session not found) is handled separately with limited
* retries since it can be transient during compaction.
*/
const PERMANENT_CLOSE_CODES = new Set([
4003, // unauthorized
])
type WebSocketState = 'connecting' | 'connected' | 'closed'
type SessionsMessage =
| SDKMessage
| SDKControlRequest
| SDKControlResponse
| SDKControlCancelRequest
function isSessionsMessage(value: unknown): value is SessionsMessage {
if (typeof value !== 'object' || value === null || !('type' in value)) {
return false
}
// Accept any message with a string `type` field. Downstream handlers
// (sdkMessageAdapter, RemoteSessionManager) decide what to do with
// unknown types. A hardcoded allowlist here would silently drop new
// message types the backend starts sending before the client is updated.
return typeof value.type === 'string'
}
export type SessionsWebSocketCallbacks = {
onMessage: (message: SessionsMessage) => void
onClose?: () => void
onError?: (error: Error) => void
onConnected?: () => void
/** Fired when a transient close is detected and a reconnect is scheduled.
* onClose fires only for permanent close (server ended / attempts exhausted). */
onReconnecting?: () => void
}
// Common interface between globalThis.WebSocket and ws.WebSocket
type WebSocketLike = {
close(): void
send(data: string): void
ping?(): void // Bun & ws both support this
}
/**
* WebSocket client for connecting to CCR sessions via /v1/sessions/ws/{id}/subscribe
*
* Protocol:
* 1. Connect to wss://api.anthropic.com/v1/sessions/ws/{sessionId}/subscribe?organization_uuid=...
* 2. Send auth message: { type: 'auth', credential: { type: 'oauth', token: '...' } }
* 3. Receive SDKMessage stream from the session
*/
export class SessionsWebSocket {
private ws: WebSocketLike | null = null
private state: WebSocketState = 'closed'
private reconnectAttempts = 0
private sessionNotFoundRetries = 0
private pingInterval: NodeJS.Timeout | null = null
private reconnectTimer: NodeJS.Timeout | null = null
constructor(
private readonly sessionId: string,
private readonly orgUuid: string,
private readonly getAccessToken: () => string,
private readonly callbacks: SessionsWebSocketCallbacks,
) {}
/**
* Connect to the sessions WebSocket endpoint
*/
async connect(): Promise<void> {
if (this.state === 'connecting') {
logForDebugging('[SessionsWebSocket] Already connecting')
return
}
this.state = 'connecting'
const baseUrl = getOauthConfig().BASE_API_URL.replace('https://', 'wss://')
const url = `${baseUrl}/v1/sessions/ws/${this.sessionId}/subscribe?organization_uuid=${this.orgUuid}`
logForDebugging(`[SessionsWebSocket] Connecting to ${url}`)
// Get fresh token for each connection attempt
const accessToken = this.getAccessToken()
const headers = {
Authorization: `Bearer ${accessToken}`,
'anthropic-version': '2023-06-01',
}
if (typeof Bun !== 'undefined') {
// Bun's WebSocket supports headers/proxy options but the DOM typings don't
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
const ws = new globalThis.WebSocket(url, {
headers,
proxy: getWebSocketProxyUrl(url),
tls: getWebSocketTLSOptions() || undefined,
} as unknown as string[])
this.ws = ws
ws.addEventListener('open', () => {
logForDebugging(
'[SessionsWebSocket] Connection opened, authenticated via headers',
)
this.state = 'connected'
this.reconnectAttempts = 0
this.sessionNotFoundRetries = 0
this.startPingInterval()
this.callbacks.onConnected?.()
})
ws.addEventListener('message', (event: MessageEvent) => {
const data =
typeof event.data === 'string' ? event.data : String(event.data)
this.handleMessage(data)
})
ws.addEventListener('error', () => {
const err = new Error('[SessionsWebSocket] WebSocket error')
logError(err)
this.callbacks.onError?.(err)
})
// eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
ws.addEventListener('close', (event: CloseEvent) => {
logForDebugging(
`[SessionsWebSocket] Closed: code=${event.code} reason=${event.reason}`,
)
this.handleClose(event.code)
})
ws.addEventListener('pong', () => {
logForDebugging('[SessionsWebSocket] Pong received')
})
} else {
const { default: WS } = await import('ws')
const ws = new WS(url, {
headers,
agent: getWebSocketProxyAgent(url),
...getWebSocketTLSOptions(),
})
this.ws = ws
ws.on('open', () => {
logForDebugging(
'[SessionsWebSocket] Connection opened, authenticated via headers',
)
// Auth is handled via headers, so we're immediately connected
this.state = 'connected'
this.reconnectAttempts = 0
this.sessionNotFoundRetries = 0
this.startPingInterval()
this.callbacks.onConnected?.()
})
ws.on('message', (data: Buffer) => {
this.handleMessage(data.toString())
})
ws.on('error', (err: Error) => {
logError(new Error(`[SessionsWebSocket] Error: ${err.message}`))
this.callbacks.onError?.(err)
})
ws.on('close', (code: number, reason: Buffer) => {
logForDebugging(
`[SessionsWebSocket] Closed: code=${code} reason=${reason.toString()}`,
)
this.handleClose(code)
})
ws.on('pong', () => {
logForDebugging('[SessionsWebSocket] Pong received')
})
}
}
/**
* Handle incoming WebSocket message
*/
private handleMessage(data: string): void {
try {
const message: unknown = jsonParse(data)
// Forward SDK messages to callback
if (isSessionsMessage(message)) {
this.callbacks.onMessage(message)
} else {
logForDebugging(
`[SessionsWebSocket] Ignoring message type: ${typeof message === 'object' && message !== null && 'type' in message ? String(message.type) : 'unknown'}`,
)
}
} catch (error) {
logError(
new Error(
`[SessionsWebSocket] Failed to parse message: ${errorMessage(error)}`,
),
)
}
}
/**
* Handle WebSocket close
*/
private handleClose(closeCode: number): void {
this.stopPingInterval()
if (this.state === 'closed') {
return
}
this.ws = null
const previousState = this.state
this.state = 'closed'
// Permanent codes: stop reconnecting — server has definitively ended the session
if (PERMANENT_CLOSE_CODES.has(closeCode)) {
logForDebugging(
`[SessionsWebSocket] Permanent close code ${closeCode}, not reconnecting`,
)
this.callbacks.onClose?.()
return
}
// 4001 (session not found) can be transient during compaction: the
// server may briefly consider the session stale while the CLI worker
// is busy with the compaction API call and not emitting events.
if (closeCode === 4001) {
this.sessionNotFoundRetries++
if (this.sessionNotFoundRetries > MAX_SESSION_NOT_FOUND_RETRIES) {
logForDebugging(
`[SessionsWebSocket] 4001 retry budget exhausted (${MAX_SESSION_NOT_FOUND_RETRIES}), not reconnecting`,
)
this.callbacks.onClose?.()
return
}
this.scheduleReconnect(
RECONNECT_DELAY_MS * this.sessionNotFoundRetries,
`4001 attempt ${this.sessionNotFoundRetries}/${MAX_SESSION_NOT_FOUND_RETRIES}`,
)
return
}
// Attempt reconnection if we were connected
if (
previousState === 'connected' &&
this.reconnectAttempts < MAX_RECONNECT_ATTEMPTS
) {
this.reconnectAttempts++
this.scheduleReconnect(
RECONNECT_DELAY_MS,
`attempt ${this.reconnectAttempts}/${MAX_RECONNECT_ATTEMPTS}`,
)
} else {
logForDebugging('[SessionsWebSocket] Not reconnecting')
this.callbacks.onClose?.()
}
}
private scheduleReconnect(delay: number, label: string): void {
this.callbacks.onReconnecting?.()
logForDebugging(
`[SessionsWebSocket] Scheduling reconnect (${label}) in ${delay}ms`,
)
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null
void this.connect()
}, delay)
}
private startPingInterval(): void {
this.stopPingInterval()
this.pingInterval = setInterval(() => {
if (this.ws && this.state === 'connected') {
try {
this.ws.ping?.()
} catch {
// Ignore ping errors, close handler will deal with connection issues
}
}
}, PING_INTERVAL_MS)
}
/**
* Stop ping interval
*/
private stopPingInterval(): void {
if (this.pingInterval) {
clearInterval(this.pingInterval)
this.pingInterval = null
}
}
/**
* Send a control response back to the session
*/
sendControlResponse(response: SDKControlResponse): void {
if (!this.ws || this.state !== 'connected') {
logError(new Error('[SessionsWebSocket] Cannot send: not connected'))
return
}
logForDebugging('[SessionsWebSocket] Sending control response')
this.ws.send(jsonStringify(response))
}
/**
* Send a control request to the session (e.g., interrupt)
*/
sendControlRequest(request: SDKControlRequestInner): void {
if (!this.ws || this.state !== 'connected') {
logError(new Error('[SessionsWebSocket] Cannot send: not connected'))
return
}
const controlRequest: SDKControlRequest = {
type: 'control_request',
request_id: randomUUID(),
request,
}
logForDebugging(
`[SessionsWebSocket] Sending control request: ${request.subtype}`,
)
this.ws.send(jsonStringify(controlRequest))
}
/**
* Check if connected
*/
isConnected(): boolean {
return this.state === 'connected'
}
/**
* Close the WebSocket connection
*/
close(): void {
logForDebugging('[SessionsWebSocket] Closing connection')
this.state = 'closed'
this.stopPingInterval()
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
if (this.ws) {
// Null out event handlers to prevent race conditions during reconnect.
// Under Bun (native WebSocket), onX handlers are the clean way to detach.
// Under Node (ws package), the listeners were attached with .on() in connect(),
// but since we're about to close and null out this.ws, no cleanup is needed.
this.ws.close()
this.ws = null
}
}
/**
* Force reconnect - closes existing connection and establishes a new one.
* Useful when the subscription becomes stale (e.g., after container shutdown).
*/
reconnect(): void {
logForDebugging('[SessionsWebSocket] Force reconnecting')
this.reconnectAttempts = 0
this.sessionNotFoundRetries = 0
this.close()
// Small delay before reconnecting (stored in reconnectTimer so it can be cancelled)
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null
void this.connect()
}, 500)
}
}