messageQueueManager.ts
utils/messageQueueManager.ts
No strong subsystem tag
548
Lines
16563
Bytes
32
Exports
12
Imports
10
Keywords
What this is
This page documents one file from the repository and includes its full source so you can read it without leaving the docs site.
Beginner explanation
This file is one piece of the larger system. Its name, directory, imports, and exports show where it fits. Start by reading the exports and related files first.
How it is used
Start from the exports list and related files. Those are the easiest clues for where this file fits into the system.
Expert explanation
Architecturally, this file intersects with general runtime concerns. It contains 548 lines, 12 detected imports, and 32 detected exports.
Important relationships
Detected exports
SetAppStatesubscribeToCommandQueuegetCommandQueueSnapshotgetCommandQueuegetCommandQueueLengthhasCommandsInQueuerecheckCommandQueueenqueueenqueuePendingNotificationdequeuedequeueAllpeekdequeueAllMatchingremoveremoveByFilterclearCommandQueueresetCommandQueueisPromptInputModeEditableisQueuedCommandEditableisQueuedCommandVisiblePopAllEditableResultpopAllEditablesubscribeToPendingNotificationsgetPendingNotificationsSnapshothasPendingNotificationsgetPendingNotificationsCountrecheckPendingNotificationsdequeuePendingNotificationresetPendingNotificationsclearPendingNotificationsgetCommandsByMaxPriorityisSlashCommand
Keywords
commandqueuecommandslengthqueuedcommandcommandpriorityqueueimagesnotifysubscribersfilter
Detected imports
bun:bundle@anthropic-ai/sdk/resources/messages.mjssrc/types/utils.js../bootstrap/state.js../state/AppState.js../types/messageQueueTypes.js../types/textInputTypes.js./config.js./messages.js./objectGroupBy.js./sessionStorage.js./signal.js
Source notes
This page embeds the full file contents. Small or leaf files are still indexed honestly instead of being over-explained.
Full source
import { feature } from 'bun:bundle'
import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
import type { Permutations } from 'src/types/utils.js'
import { getSessionId } from '../bootstrap/state.js'
import type { AppState } from '../state/AppState.js'
import type {
QueueOperation,
QueueOperationMessage,
} from '../types/messageQueueTypes.js'
import type {
EditablePromptInputMode,
PromptInputMode,
QueuedCommand,
QueuePriority,
} from '../types/textInputTypes.js'
import type { PastedContent } from './config.js'
import { extractTextContent } from './messages.js'
import { objectGroupBy } from './objectGroupBy.js'
import { recordQueueOperation } from './sessionStorage.js'
import { createSignal } from './signal.js'
export type SetAppState = (f: (prev: AppState) => AppState) => void
// ============================================================================
// Logging helper
// ============================================================================
function logOperation(operation: QueueOperation, content?: string): void {
const sessionId = getSessionId()
const queueOp: QueueOperationMessage = {
type: 'queue-operation',
operation,
timestamp: new Date().toISOString(),
sessionId,
...(content !== undefined && { content }),
}
void recordQueueOperation(queueOp)
}
// ============================================================================
// Unified command queue (module-level, independent of React state)
//
// All commands — user input, task notifications, orphaned permissions — go
// through this single queue. React components subscribe via
// useSyncExternalStore (subscribeToCommandQueue / getCommandQueueSnapshot).
// Non-React code (print.ts streaming loop) reads directly via
// getCommandQueue() / getCommandQueueLength().
//
// Priority determines dequeue order: 'now' > 'next' > 'later'.
// Within the same priority, commands are processed FIFO.
// ============================================================================
const commandQueue: QueuedCommand[] = []
/** Frozen snapshot — recreated on every mutation for useSyncExternalStore. */
let snapshot: readonly QueuedCommand[] = Object.freeze([])
const queueChanged = createSignal()
function notifySubscribers(): void {
snapshot = Object.freeze([...commandQueue])
queueChanged.emit()
}
// ============================================================================
// useSyncExternalStore interface
// ============================================================================
/**
* Subscribe to command queue changes.
* Compatible with React's useSyncExternalStore.
*/
export const subscribeToCommandQueue = queueChanged.subscribe
/**
* Get current snapshot of the command queue.
* Compatible with React's useSyncExternalStore.
* Returns a frozen array that only changes reference on mutation.
*/
export function getCommandQueueSnapshot(): readonly QueuedCommand[] {
return snapshot
}
// ============================================================================
// Read operations (for non-React code)
// ============================================================================
/**
* Get a mutable copy of the current queue.
* Use for one-off reads where you need the actual commands.
*/
export function getCommandQueue(): QueuedCommand[] {
return [...commandQueue]
}
/**
* Get the current queue length without copying.
*/
export function getCommandQueueLength(): number {
return commandQueue.length
}
/**
* Check if there are commands in the queue.
*/
export function hasCommandsInQueue(): boolean {
return commandQueue.length > 0
}
/**
* Trigger a re-check by notifying subscribers.
* Use after async processing completes to ensure remaining commands
* are picked up by useSyncExternalStore consumers.
*/
export function recheckCommandQueue(): void {
if (commandQueue.length > 0) {
notifySubscribers()
}
}
// ============================================================================
// Write operations
// ============================================================================
/**
* Add a command to the queue.
* Used for user-initiated commands (prompt, bash, orphaned-permission).
* Defaults priority to 'next' (processed before task notifications).
*/
export function enqueue(command: QueuedCommand): void {
commandQueue.push({ ...command, priority: command.priority ?? 'next' })
notifySubscribers()
logOperation(
'enqueue',
typeof command.value === 'string' ? command.value : undefined,
)
}
/**
* Add a task notification to the queue.
* Convenience wrapper that defaults priority to 'later' so user input
* is never starved by system messages.
*/
export function enqueuePendingNotification(command: QueuedCommand): void {
commandQueue.push({ ...command, priority: command.priority ?? 'later' })
notifySubscribers()
logOperation(
'enqueue',
typeof command.value === 'string' ? command.value : undefined,
)
}
const PRIORITY_ORDER: Record<QueuePriority, number> = {
now: 0,
next: 1,
later: 2,
}
/**
* Remove and return the highest-priority command, or undefined if empty.
* Within the same priority level, commands are dequeued FIFO.
*
* An optional `filter` narrows the candidates: only commands for which the
* predicate returns `true` are considered. Non-matching commands stay in the
* queue untouched. This lets between-turn drains (SDK, REPL) restrict to
* main-thread commands (`cmd.agentId === undefined`) without restructuring
* the existing while-loop patterns.
*/
export function dequeue(
filter?: (cmd: QueuedCommand) => boolean,
): QueuedCommand | undefined {
if (commandQueue.length === 0) {
return undefined
}
// Find the first command with the highest priority (respecting filter)
let bestIdx = -1
let bestPriority = Infinity
for (let i = 0; i < commandQueue.length; i++) {
const cmd = commandQueue[i]!
if (filter && !filter(cmd)) continue
const priority = PRIORITY_ORDER[cmd.priority ?? 'next']
if (priority < bestPriority) {
bestIdx = i
bestPriority = priority
}
}
if (bestIdx === -1) return undefined
const [dequeued] = commandQueue.splice(bestIdx, 1)
notifySubscribers()
logOperation('dequeue')
return dequeued
}
/**
* Remove and return all commands from the queue.
* Logs a dequeue operation for each command.
*/
export function dequeueAll(): QueuedCommand[] {
if (commandQueue.length === 0) {
return []
}
const commands = [...commandQueue]
commandQueue.length = 0
notifySubscribers()
for (const _cmd of commands) {
logOperation('dequeue')
}
return commands
}
/**
* Return the highest-priority command without removing it, or undefined if empty.
* Accepts an optional `filter` — only commands passing the predicate are considered.
*/
export function peek(
filter?: (cmd: QueuedCommand) => boolean,
): QueuedCommand | undefined {
if (commandQueue.length === 0) {
return undefined
}
let bestIdx = -1
let bestPriority = Infinity
for (let i = 0; i < commandQueue.length; i++) {
const cmd = commandQueue[i]!
if (filter && !filter(cmd)) continue
const priority = PRIORITY_ORDER[cmd.priority ?? 'next']
if (priority < bestPriority) {
bestIdx = i
bestPriority = priority
}
}
if (bestIdx === -1) return undefined
return commandQueue[bestIdx]
}
/**
* Remove and return all commands matching a predicate, preserving priority order.
* Non-matching commands stay in the queue.
*/
export function dequeueAllMatching(
predicate: (cmd: QueuedCommand) => boolean,
): QueuedCommand[] {
const matched: QueuedCommand[] = []
const remaining: QueuedCommand[] = []
for (const cmd of commandQueue) {
if (predicate(cmd)) {
matched.push(cmd)
} else {
remaining.push(cmd)
}
}
if (matched.length === 0) {
return []
}
commandQueue.length = 0
commandQueue.push(...remaining)
notifySubscribers()
for (const _cmd of matched) {
logOperation('dequeue')
}
return matched
}
/**
* Remove specific commands from the queue by reference identity.
* Callers must pass the same object references that are in the queue
* (e.g. from getCommandsByMaxPriority). Logs a 'remove' operation for each.
*/
export function remove(commandsToRemove: QueuedCommand[]): void {
if (commandsToRemove.length === 0) {
return
}
const before = commandQueue.length
for (let i = commandQueue.length - 1; i >= 0; i--) {
if (commandsToRemove.includes(commandQueue[i]!)) {
commandQueue.splice(i, 1)
}
}
if (commandQueue.length !== before) {
notifySubscribers()
}
for (const _cmd of commandsToRemove) {
logOperation('remove')
}
}
/**
* Remove commands matching a predicate.
* Returns the removed commands.
*/
export function removeByFilter(
predicate: (cmd: QueuedCommand) => boolean,
): QueuedCommand[] {
const removed: QueuedCommand[] = []
for (let i = commandQueue.length - 1; i >= 0; i--) {
if (predicate(commandQueue[i]!)) {
removed.unshift(commandQueue.splice(i, 1)[0]!)
}
}
if (removed.length > 0) {
notifySubscribers()
for (const _cmd of removed) {
logOperation('remove')
}
}
return removed
}
/**
* Clear all commands from the queue.
* Used by ESC cancellation to discard queued notifications.
*/
export function clearCommandQueue(): void {
if (commandQueue.length === 0) {
return
}
commandQueue.length = 0
notifySubscribers()
}
/**
* Clear all commands and reset snapshot.
* Used for test cleanup.
*/
export function resetCommandQueue(): void {
commandQueue.length = 0
snapshot = Object.freeze([])
}
// ============================================================================
// Editable mode helpers
// ============================================================================
const NON_EDITABLE_MODES = new Set<PromptInputMode>([
'task-notification',
] satisfies Permutations<Exclude<PromptInputMode, EditablePromptInputMode>>)
export function isPromptInputModeEditable(
mode: PromptInputMode,
): mode is EditablePromptInputMode {
return !NON_EDITABLE_MODES.has(mode)
}
/**
* Whether this queued command can be pulled into the input buffer via UP/ESC.
* System-generated commands (proactive ticks, scheduled tasks, plan
* verification, channel messages) contain raw XML and must not leak into
* the user's input.
*/
export function isQueuedCommandEditable(cmd: QueuedCommand): boolean {
return isPromptInputModeEditable(cmd.mode) && !cmd.isMeta
}
/**
* Whether this queued command should render in the queue preview under the
* prompt. Superset of editable — channel messages show (so the keyboard user
* sees what arrived) but stay non-editable (raw XML).
*/
export function isQueuedCommandVisible(cmd: QueuedCommand): boolean {
if (
(feature('KAIROS') || feature('KAIROS_CHANNELS')) &&
cmd.origin?.kind === 'channel'
)
return true
return isQueuedCommandEditable(cmd)
}
/**
* Extract text from a queued command value.
* For strings, returns the string.
* For ContentBlockParam[], extracts text from text blocks.
*/
function extractTextFromValue(value: string | ContentBlockParam[]): string {
return typeof value === 'string' ? value : extractTextContent(value, '\n')
}
/**
* Extract images from ContentBlockParam[] and convert to PastedContent format.
* Returns empty array for string values or if no images found.
*/
function extractImagesFromValue(
value: string | ContentBlockParam[],
startId: number,
): PastedContent[] {
if (typeof value === 'string') {
return []
}
const images: PastedContent[] = []
let imageIndex = 0
for (const block of value) {
if (block.type === 'image' && block.source.type === 'base64') {
images.push({
id: startId + imageIndex,
type: 'image',
content: block.source.data,
mediaType: block.source.media_type,
filename: `image${imageIndex + 1}`,
})
imageIndex++
}
}
return images
}
export type PopAllEditableResult = {
text: string
cursorOffset: number
images: PastedContent[]
}
/**
* Pop all editable commands and combine them with current input for editing.
* Notification modes (task-notification) are left in the queue
* to be auto-processed later.
* Returns object with combined text, cursor offset, and images to restore.
* Returns undefined if no editable commands in queue.
*/
export function popAllEditable(
currentInput: string,
currentCursorOffset: number,
): PopAllEditableResult | undefined {
if (commandQueue.length === 0) {
return undefined
}
const { editable = [], nonEditable = [] } = objectGroupBy(
[...commandQueue],
cmd => (isQueuedCommandEditable(cmd) ? 'editable' : 'nonEditable'),
)
if (editable.length === 0) {
return undefined
}
// Extract text from queued commands (handles both strings and ContentBlockParam[])
const queuedTexts = editable.map(cmd => extractTextFromValue(cmd.value))
const newInput = [...queuedTexts, currentInput].filter(Boolean).join('\n')
// Calculate cursor offset: length of joined queued commands + 1 + current cursor offset
const cursorOffset = queuedTexts.join('\n').length + 1 + currentCursorOffset
// Extract images from queued commands
const images: PastedContent[] = []
let nextImageId = Date.now() // Use timestamp as base for unique IDs
for (const cmd of editable) {
// handlePromptSubmit queues images in pastedContents (value is a string).
// Preserve the original PastedContent id so imageStore lookups still work.
if (cmd.pastedContents) {
for (const content of Object.values(cmd.pastedContents)) {
if (content.type === 'image') {
images.push(content)
}
}
}
// Bridge/remote commands may embed images directly in ContentBlockParam[].
const cmdImages = extractImagesFromValue(cmd.value, nextImageId)
images.push(...cmdImages)
nextImageId += cmdImages.length
}
for (const command of editable) {
logOperation(
'popAll',
typeof command.value === 'string' ? command.value : undefined,
)
}
// Replace queue contents with only the non-editable commands
commandQueue.length = 0
commandQueue.push(...nonEditable)
notifySubscribers()
return { text: newInput, cursorOffset, images }
}
// ============================================================================
// Backward-compatible aliases (deprecated — prefer new names)
// ============================================================================
/** @deprecated Use subscribeToCommandQueue */
export const subscribeToPendingNotifications = subscribeToCommandQueue
/** @deprecated Use getCommandQueueSnapshot */
export function getPendingNotificationsSnapshot(): readonly QueuedCommand[] {
return snapshot
}
/** @deprecated Use hasCommandsInQueue */
export const hasPendingNotifications = hasCommandsInQueue
/** @deprecated Use getCommandQueueLength */
export const getPendingNotificationsCount = getCommandQueueLength
/** @deprecated Use recheckCommandQueue */
export const recheckPendingNotifications = recheckCommandQueue
/** @deprecated Use dequeue */
export function dequeuePendingNotification(): QueuedCommand | undefined {
return dequeue()
}
/** @deprecated Use resetCommandQueue */
export const resetPendingNotifications = resetCommandQueue
/** @deprecated Use clearCommandQueue */
export const clearPendingNotifications = clearCommandQueue
/**
* Get commands at or above a given priority level without removing them.
* Useful for mid-chain draining where only urgent items should be processed.
*
* Priority order: 'now' (0) > 'next' (1) > 'later' (2).
* Passing 'now' returns only now-priority commands; 'later' returns everything.
*/
export function getCommandsByMaxPriority(
maxPriority: QueuePriority,
): QueuedCommand[] {
const threshold = PRIORITY_ORDER[maxPriority]
return commandQueue.filter(
cmd => PRIORITY_ORDER[cmd.priority ?? 'next'] <= threshold,
)
}
/**
* Returns true if the command is a slash command that should be routed through
* processSlashCommand rather than sent to the model as text.
*
* Commands with `skipSlashCommands` (e.g. bridge/CCR messages) are NOT treated
* as slash commands — their text is meant for the model.
*/
export function isSlashCommand(cmd: QueuedCommand): boolean {
return (
typeof cmd.value === 'string' &&
cmd.value.trim().startsWith('/') &&
!cmd.skipSlashCommands
)
}