QueryEngine.ts
QueryEngine.ts
No strong subsystem tag
1296
Lines
46630
Bytes
2
Exports
51
Imports
10
Keywords
What this is
This page documents one file from the repository and includes its full source so you can read it without leaving the docs site.
Beginner explanation
This file is one piece of the larger system. Its name, directory, imports, and exports show where it fits. Start by reading the exports and related files first.
How it is used
Start from the exports list and related files. Those are the easiest clues for where this file fits into the system.
Expert explanation
Architecturally, this file intersects with general runtime concerns. It contains 1296 lines, 51 detected imports, and 2 detected exports.
Important relationships
Detected exports
QueryEngineConfig, QueryEngine
Keywords
message, messages, mutablemessages, yield, uuid, result, user, utils, prev, getsessionid
Detected imports
bun:bundle, @anthropic-ai/sdk/resources/messages.mjs, crypto, lodash-es/last.js, src/bootstrap/state.js, src/entrypoints/agentSdkTypes.js, src/services/api/claude.js, src/services/api/logging.js, src/services/api/logging.js, strip-ansi, ./commands.js, ./commands.js, ./constants/xml.js, ./cost-tracker.js, ./hooks/useCanUseTool.js, ./memdir/memdir.js, ./memdir/paths.js, ./query.js, ./services/api/errors.js, ./services/mcp/types.js, ./state/AppState.js, ./Tool.js, ./tools/AgentTool/loadAgentsDir.js, ./tools/SyntheticOutputTool/SyntheticOutputTool.js, ./types/message.js, ./types/textInputTypes.js, ./utils/abortController.js, ./utils/commitAttribution.js, ./utils/config.js, ./utils/cwd.js, ./utils/envUtils.js, ./utils/fastMode.js, ./utils/fileHistory.js, ./utils/fileStateCache.js, ./utils/headlessProfiler.js, ./utils/hooks/hookHelpers.js, ./utils/log.js, ./utils/messages.js, ./utils/model/model.js, ./utils/plugins/pluginLoader.js, ./utils/processUserInput/processUserInput.js, ./utils/queryContext.js, ./utils/Shell.js, ./utils/sessionStorage.js, ./utils/systemPromptType.js, ./utils/systemTheme.js, ./utils/thinking.js, ./utils/messages/mappers.js, ./utils/messages/systemInit.js, ./utils/permissions/filesystem.js, ./utils/queryHelpers.js
Source notes
This page embeds the full file contents. Small or leaf files are still indexed honestly instead of being over-explained.
Full source
import { feature } from 'bun:bundle'
import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
import { randomUUID } from 'crypto'
import last from 'lodash-es/last.js'
import {
getSessionId,
isSessionPersistenceDisabled,
} from 'src/bootstrap/state.js'
import type {
PermissionMode,
SDKCompactBoundaryMessage,
SDKMessage,
SDKPermissionDenial,
SDKStatus,
SDKUserMessageReplay,
} from 'src/entrypoints/agentSdkTypes.js'
import { accumulateUsage, updateUsage } from 'src/services/api/claude.js'
import type { NonNullableUsage } from 'src/services/api/logging.js'
import { EMPTY_USAGE } from 'src/services/api/logging.js'
import stripAnsi from 'strip-ansi'
import type { Command } from './commands.js'
import { getSlashCommandToolSkills } from './commands.js'
import {
LOCAL_COMMAND_STDERR_TAG,
LOCAL_COMMAND_STDOUT_TAG,
} from './constants/xml.js'
import {
getModelUsage,
getTotalAPIDuration,
getTotalCost,
} from './cost-tracker.js'
import type { CanUseToolFn } from './hooks/useCanUseTool.js'
import { loadMemoryPrompt } from './memdir/memdir.js'
import { hasAutoMemPathOverride } from './memdir/paths.js'
import { query } from './query.js'
import { categorizeRetryableAPIError } from './services/api/errors.js'
import type { MCPServerConnection } from './services/mcp/types.js'
import type { AppState } from './state/AppState.js'
import { type Tools, type ToolUseContext, toolMatchesName } from './Tool.js'
import type { AgentDefinition } from './tools/AgentTool/loadAgentsDir.js'
import { SYNTHETIC_OUTPUT_TOOL_NAME } from './tools/SyntheticOutputTool/SyntheticOutputTool.js'
import type { Message } from './types/message.js'
import type { OrphanedPermission } from './types/textInputTypes.js'
import { createAbortController } from './utils/abortController.js'
import type { AttributionState } from './utils/commitAttribution.js'
import { getGlobalConfig } from './utils/config.js'
import { getCwd } from './utils/cwd.js'
import { isBareMode, isEnvTruthy } from './utils/envUtils.js'
import { getFastModeState } from './utils/fastMode.js'
import {
type FileHistoryState,
fileHistoryEnabled,
fileHistoryMakeSnapshot,
} from './utils/fileHistory.js'
import {
cloneFileStateCache,
type FileStateCache,
} from './utils/fileStateCache.js'
import { headlessProfilerCheckpoint } from './utils/headlessProfiler.js'
import { registerStructuredOutputEnforcement } from './utils/hooks/hookHelpers.js'
import { getInMemoryErrors } from './utils/log.js'
import { countToolCalls, SYNTHETIC_MESSAGES } from './utils/messages.js'
import {
getMainLoopModel,
parseUserSpecifiedModel,
} from './utils/model/model.js'
import { loadAllPluginsCacheOnly } from './utils/plugins/pluginLoader.js'
import {
type ProcessUserInputContext,
processUserInput,
} from './utils/processUserInput/processUserInput.js'
import { fetchSystemPromptParts } from './utils/queryContext.js'
import { setCwd } from './utils/Shell.js'
import {
flushSessionStorage,
recordTranscript,
} from './utils/sessionStorage.js'
import { asSystemPrompt } from './utils/systemPromptType.js'
import { resolveThemeSetting } from './utils/systemTheme.js'
import {
shouldEnableThinkingByDefault,
type ThinkingConfig,
} from './utils/thinking.js'
// Lazy: MessageSelector.tsx pulls React/ink; only needed for message filtering at query time
/* eslint-disable @typescript-eslint/no-require-imports */
function messageSelector(): typeof import('src/components/MessageSelector.js') {
  // Resolved on each call rather than imported at module load; require()
  // caches the module after the first load, so repeat calls are cheap.
  return require('src/components/MessageSelector.js')
}
import {
localCommandOutputToSDKAssistantMessage,
toSDKCompactMetadata,
} from './utils/messages/mappers.js'
import {
buildSystemInitMessage,
sdkCompatToolName,
} from './utils/messages/systemInit.js'
import {
getScratchpadDir,
isScratchpadEnabled,
} from './utils/permissions/filesystem.js'
/* eslint-enable @typescript-eslint/no-require-imports */
import {
handleOrphanedPermission,
isResultSuccessful,
normalizeMessage,
} from './utils/queryHelpers.js'
// Dead code elimination: conditional import for coordinator mode
/* eslint-disable @typescript-eslint/no-require-imports */
// Returns extra user-context entries for coordinator mode. When the
// COORDINATOR_MODE feature is off, the require() branch is not taken and the
// fallback returns an empty object, so spreading the result into userContext
// adds nothing.
// NOTE(review): keep the literal `feature('…') ? require(…) : fallback` shape;
// per the comment above it exists for dead code elimination, and restructuring
// it may defeat that — confirm with the bundler before changing.
const getCoordinatorUserContext: (
mcpClients: ReadonlyArray<{ name: string }>,
scratchpadDir?: string,
) => { [k: string]: string } = feature('COORDINATOR_MODE')
? require('./coordinator/coordinatorMode.js').getCoordinatorUserContext
: () => ({})
/* eslint-enable @typescript-eslint/no-require-imports */
// Dead code elimination: conditional import for snip compaction
/* eslint-disable @typescript-eslint/no-require-imports */
// Both bindings are null when the HISTORY_SNIP feature is off, so every use
// site must null-check them before dereferencing.
// NOTE(review): keep each `feature('HISTORY_SNIP') ? require(…) : null`
// expression in this literal form; presumably the bundler relies on it to drop
// the require — confirm before restructuring.
const snipModule = feature('HISTORY_SNIP')
? (require('./services/compact/snipCompact.js') as typeof import('./services/compact/snipCompact.js'))
: null
const snipProjection = feature('HISTORY_SNIP')
? (require('./services/compact/snipProjection.js') as typeof import('./services/compact/snipProjection.js'))
: null
/* eslint-enable @typescript-eslint/no-require-imports */
/**
 * Construction-time configuration for a QueryEngine.
 *
 * The engine keeps a reference to this object and destructures it again at
 * the start of every submitMessage() call.
 */
export type QueryEngineConfig = {
/** Working directory; applied via setCwd() at the start of every submitMessage() call. */
cwd: string
/**
 * Tools exposed to the model. Also scanned for the StructuredOutput tool to
 * decide whether structured-output enforcement is registered.
 */
tools: Tools
/** Slash commands available to input processing and listed in the system init message. */
commands: Command[]
/** MCP server connections; forwarded to system-prompt assembly, the tool context, and the init message. */
mcpClients: MCPServerConnection[]
/** Agent definitions, exposed to tools as agentDefinitions.activeAgents. */
agents: AgentDefinition[]
/**
 * Permission callback. submitMessage() wraps it so that every decision whose
 * behavior is not 'allow' is recorded and reported as permission_denials on
 * result messages.
 */
canUseTool: CanUseToolFn
/** Reads the current AppState. */
getAppState: () => AppState
/** Updates AppState through an updater function. */
setAppState: (f: (prev: AppState) => AppState) => void
/**
 * Prior conversation history. The engine adopts this array as its message
 * store WITHOUT copying and mutates it in place (push, splice, truncation),
 * so pass a copy if the caller needs to keep its own version intact.
 */
initialMessages?: Message[]
/** File-state cache handed to tools as readFileState; held by reference, not cloned. */
readFileCache: FileStateCache
/** When set, replaces the default system prompt instead of extending it. */
customSystemPrompt?: string
/** Text appended as the last section of the system prompt. */
appendSystemPrompt?: string
/**
 * Model override, parsed with parseUserSpecifiedModel(); getMainLoopModel()
 * is used when omitted. A model chosen by a slash command during input
 * processing takes precedence for that turn.
 */
userSpecifiedModel?: string
/** Forwarded to query() as fallbackModel. */
fallbackModel?: string
/**
 * Thinking configuration. When omitted, { type: 'adaptive' } is used unless
 * shouldEnableThinkingByDefault() returns false, in which case
 * { type: 'disabled' } is used.
 */
thinkingConfig?: ThinkingConfig
/** Turn limit forwarded to query(); when it is reached, an error_max_turns result is yielded. */
maxTurns?: number
/**
 * USD spending cap. After each message from the query loop, getTotalCost()
 * is compared against it and an error_max_budget_usd result is yielded once
 * the cost is >= this value.
 * NOTE(review): getTotalCost() may be cumulative beyond this engine — confirm.
 */
maxBudgetUsd?: number
/** Forwarded to query() as taskBudget. */
taskBudget?: { total: number }
/**
 * JSON schema for structured output. When set and a StructuredOutput tool is
 * present, enforcement is registered, and StructuredOutput calls per
 * submitMessage() are capped by MAX_STRUCTURED_OUTPUT_RETRIES (default 5).
 */
jsonSchema?: Record<string, unknown>
/** Forwarded to the tool context options; defaults to false. */
verbose?: boolean
/**
 * When true, accepted user messages and queued commands are echoed back as
 * SDKUserMessageReplay messages. Defaults to false.
 */
replayUserMessages?: boolean
/** Handler for URL elicitations triggered by MCP tool -32042 errors. */
handleElicitation?: ToolUseContext['handleElicitation']
/** When true, raw stream_event messages are forwarded to the consumer. Defaults to false. */
includePartialMessages?: boolean
/** Status callback forwarded to the tool context. */
setSDKStatus?: (status: SDKStatus) => void
/** Abort controller shared with tool contexts; created with createAbortController() when omitted. */
abortController?: AbortController
/**
 * Pending permission to resolve before the prompt is processed. It is
 * handled at most once per engine instance, on the first submitMessage()
 * call.
 */
orphanedPermission?: OrphanedPermission
/**
* Snip-boundary handler: receives each yielded system message plus the
* current mutableMessages store. Returns undefined if the message is not a
* snip boundary; otherwise returns the replayed snip result. Injected by
* ask() when HISTORY_SNIP is enabled so feature-gated strings stay inside
* the gated module (keeps QueryEngine free of excluded strings and testable
* despite feature() returning false under bun test). SDK-only: the REPL
* keeps full history for UI scrollback and projects on demand via
* projectSnippedView; QueryEngine truncates here to bound memory in long
* headless sessions (no UI to preserve).
*/
snipReplay?: (
yieldedSystemMsg: Message,
store: Message[],
) => { messages: Message[]; executed: boolean } | undefined
}
/**
* QueryEngine owns the query lifecycle and session state for a conversation.
* It extracts the core logic from ask() into a standalone class that can be
* used by both the headless/SDK path and (in a future phase) the REPL.
*
* One QueryEngine per conversation. Each submitMessage() call starts a new
* turn within the same conversation. State (messages, file cache, usage, etc.)
* persists across turns.
*/
export class QueryEngine {
/** Configuration supplied at construction; destructured again at the start of every turn. */
private config: QueryEngineConfig
/**
 * Live conversation store. Adopted from config.initialMessages without
 * copying and mutated in place as turns run (appends, compaction splices).
 */
private mutableMessages: Message[]
/** Abort controller handed to every tool-use context this engine builds. */
private abortController: AbortController
/** Tool uses the permission callback did not allow; reported on result messages. */
private permissionDenials: SDKPermissionDenial[]
/** Running token usage; reassigned as each API message completes (message_stop). */
private totalUsage: NonNullableUsage
/** Guards config.orphanedPermission so it is processed at most once per engine. */
private hasHandledOrphanedPermission = false
/** File-state cache taken from config.readFileCache and shared by reference with tools. */
private readFileState: FileStateCache
/**
 * Skill names discovered during the current turn. The same Set is passed to
 * both tool-use contexts that submitMessage builds, and it is cleared when
 * each submitMessage call begins so it cannot grow without bound across the
 * many turns of a long SDK session.
 */
private discoveredSkillNames = new Set<string>()
/** Nested memory paths already loaded; shared with each turn's tool-use contexts. */
private loadedNestedMemoryPaths = new Set<string>()
constructor(config: QueryEngineConfig) {
  const { initialMessages, abortController, readFileCache } = config
  this.config = config
  // No defensive copy: the caller's array becomes the engine's live store.
  this.mutableMessages = initialMessages ?? []
  this.abortController = abortController ?? createAbortController()
  this.permissionDenials = []
  this.readFileState = readFileCache
  // Start from the shared empty baseline; accumulation reassigns this field.
  this.totalUsage = EMPTY_USAGE
}
async *submitMessage(
prompt: string | ContentBlockParam[],
options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown> {
const {
cwd,
commands,
tools,
mcpClients,
verbose = false,
thinkingConfig,
maxTurns,
maxBudgetUsd,
taskBudget,
canUseTool,
customSystemPrompt,
appendSystemPrompt,
userSpecifiedModel,
fallbackModel,
jsonSchema,
getAppState,
setAppState,
replayUserMessages = false,
includePartialMessages = false,
agents = [],
setSDKStatus,
orphanedPermission,
} = this.config
this.discoveredSkillNames.clear()
setCwd(cwd)
const persistSession = !isSessionPersistenceDisabled()
const startTime = Date.now()
// Wrap canUseTool to track permission denials
const wrappedCanUseTool: CanUseToolFn = async (
tool,
input,
toolUseContext,
assistantMessage,
toolUseID,
forceDecision,
) => {
const result = await canUseTool(
tool,
input,
toolUseContext,
assistantMessage,
toolUseID,
forceDecision,
)
// Track denials for SDK reporting
if (result.behavior !== 'allow') {
this.permissionDenials.push({
tool_name: sdkCompatToolName(tool.name),
tool_use_id: toolUseID,
tool_input: input,
})
}
return result
}
const initialAppState = getAppState()
const initialMainLoopModel = userSpecifiedModel
? parseUserSpecifiedModel(userSpecifiedModel)
: getMainLoopModel()
const initialThinkingConfig: ThinkingConfig = thinkingConfig
? thinkingConfig
: shouldEnableThinkingByDefault() !== false
? { type: 'adaptive' }
: { type: 'disabled' }
headlessProfilerCheckpoint('before_getSystemPrompt')
// Narrow once so TS tracks the type through the conditionals below.
const customPrompt =
typeof customSystemPrompt === 'string' ? customSystemPrompt : undefined
const {
defaultSystemPrompt,
userContext: baseUserContext,
systemContext,
} = await fetchSystemPromptParts({
tools,
mainLoopModel: initialMainLoopModel,
additionalWorkingDirectories: Array.from(
initialAppState.toolPermissionContext.additionalWorkingDirectories.keys(),
),
mcpClients,
customSystemPrompt: customPrompt,
})
headlessProfilerCheckpoint('after_getSystemPrompt')
const userContext = {
...baseUserContext,
...getCoordinatorUserContext(
mcpClients,
isScratchpadEnabled() ? getScratchpadDir() : undefined,
),
}
// When an SDK caller provides a custom system prompt AND has set
// CLAUDE_COWORK_MEMORY_PATH_OVERRIDE, inject the memory-mechanics prompt.
// The env var is an explicit opt-in signal — the caller has wired up
// a memory directory and needs Claude to know how to use it (which
// Write/Edit tools to call, MEMORY.md filename, loading semantics).
// The caller can layer their own policy text via appendSystemPrompt.
const memoryMechanicsPrompt =
customPrompt !== undefined && hasAutoMemPathOverride()
? await loadMemoryPrompt()
: null
const systemPrompt = asSystemPrompt([
...(customPrompt !== undefined ? [customPrompt] : defaultSystemPrompt),
...(memoryMechanicsPrompt ? [memoryMechanicsPrompt] : []),
...(appendSystemPrompt ? [appendSystemPrompt] : []),
])
// Register function hook for structured output enforcement
const hasStructuredOutputTool = tools.some(t =>
toolMatchesName(t, SYNTHETIC_OUTPUT_TOOL_NAME),
)
if (jsonSchema && hasStructuredOutputTool) {
registerStructuredOutputEnforcement(setAppState, getSessionId())
}
let processUserInputContext: ProcessUserInputContext = {
messages: this.mutableMessages,
// Slash commands that mutate the message array (e.g. /force-snip)
// call setMessages(fn). In interactive mode this writes back to
// AppState; in print mode we write back to mutableMessages so the
// rest of the query loop (push at :389, snapshot at :392) sees
// the result. The second processUserInputContext below (after
// slash-command processing) keeps the no-op — nothing else calls
// setMessages past that point.
setMessages: fn => {
this.mutableMessages = fn(this.mutableMessages)
},
onChangeAPIKey: () => {},
handleElicitation: this.config.handleElicitation,
options: {
commands,
debug: false, // we use stdout, so don't want to clobber it
tools,
verbose,
mainLoopModel: initialMainLoopModel,
thinkingConfig: initialThinkingConfig,
mcpClients,
mcpResources: {},
ideInstallationStatus: null,
isNonInteractiveSession: true,
customSystemPrompt,
appendSystemPrompt,
agentDefinitions: { activeAgents: agents, allAgents: [] },
theme: resolveThemeSetting(getGlobalConfig().theme),
maxBudgetUsd,
},
getAppState,
setAppState,
abortController: this.abortController,
readFileState: this.readFileState,
nestedMemoryAttachmentTriggers: new Set<string>(),
loadedNestedMemoryPaths: this.loadedNestedMemoryPaths,
dynamicSkillDirTriggers: new Set<string>(),
discoveredSkillNames: this.discoveredSkillNames,
setInProgressToolUseIDs: () => {},
setResponseLength: () => {},
updateFileHistoryState: (
updater: (prev: FileHistoryState) => FileHistoryState,
) => {
setAppState(prev => {
const updated = updater(prev.fileHistory)
if (updated === prev.fileHistory) return prev
return { ...prev, fileHistory: updated }
})
},
updateAttributionState: (
updater: (prev: AttributionState) => AttributionState,
) => {
setAppState(prev => {
const updated = updater(prev.attribution)
if (updated === prev.attribution) return prev
return { ...prev, attribution: updated }
})
},
setSDKStatus,
}
// Handle orphaned permission (only once per engine lifetime)
if (orphanedPermission && !this.hasHandledOrphanedPermission) {
this.hasHandledOrphanedPermission = true
for await (const message of handleOrphanedPermission(
orphanedPermission,
tools,
this.mutableMessages,
processUserInputContext,
)) {
yield message
}
}
const {
messages: messagesFromUserInput,
shouldQuery,
allowedTools,
model: modelFromUserInput,
resultText,
} = await processUserInput({
input: prompt,
mode: 'prompt',
setToolJSX: () => {},
context: {
...processUserInputContext,
messages: this.mutableMessages,
},
messages: this.mutableMessages,
uuid: options?.uuid,
isMeta: options?.isMeta,
querySource: 'sdk',
})
// Push new messages, including user input and any attachments
this.mutableMessages.push(...messagesFromUserInput)
// Update params to reflect updates from processing /slash commands
const messages = [...this.mutableMessages]
// Persist the user's message(s) to transcript BEFORE entering the query
// loop. The for-await below only calls recordTranscript when ask() yields
// an assistant/user/compact_boundary message — which doesn't happen until
// the API responds. If the process is killed before that (e.g. user clicks
// Stop in cowork seconds after send), the transcript is left with only
// queue-operation entries; getLastSessionLog filters those out, returns
// null, and --resume fails with "No conversation found". Writing now makes
// the transcript resumable from the point the user message was accepted,
// even if no API response ever arrives.
//
// --bare / SIMPLE: fire-and-forget. Scripted calls don't --resume after
// kill-mid-request. The await is ~4ms on SSD, ~30ms under disk contention
// — the single largest controllable critical-path cost after module eval.
// Transcript is still written (for post-hoc debugging); just not blocking.
if (persistSession && messagesFromUserInput.length > 0) {
const transcriptPromise = recordTranscript(messages)
if (isBareMode()) {
void transcriptPromise
} else {
await transcriptPromise
if (
isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH) ||
isEnvTruthy(process.env.CLAUDE_CODE_IS_COWORK)
) {
await flushSessionStorage()
}
}
}
// Filter messages that should be acknowledged after transcript
const replayableMessages = messagesFromUserInput.filter(
msg =>
(msg.type === 'user' &&
!msg.isMeta && // Skip synthetic caveat messages
!msg.toolUseResult && // Skip tool results (they'll be acked from query)
messageSelector().selectableUserMessagesFilter(msg)) || // Skip non-user-authored messages (task notifications, etc.)
(msg.type === 'system' && msg.subtype === 'compact_boundary'), // Always ack compact boundaries
)
const messagesToAck = replayUserMessages ? replayableMessages : []
// Update the ToolPermissionContext based on user input processing (as necessary)
setAppState(prev => ({
...prev,
toolPermissionContext: {
...prev.toolPermissionContext,
alwaysAllowRules: {
...prev.toolPermissionContext.alwaysAllowRules,
command: allowedTools,
},
},
}))
const mainLoopModel = modelFromUserInput ?? initialMainLoopModel
// Recreate after processing the prompt to pick up updated messages and
// model (from slash commands).
processUserInputContext = {
messages,
setMessages: () => {},
onChangeAPIKey: () => {},
handleElicitation: this.config.handleElicitation,
options: {
commands,
debug: false,
tools,
verbose,
mainLoopModel,
thinkingConfig: initialThinkingConfig,
mcpClients,
mcpResources: {},
ideInstallationStatus: null,
isNonInteractiveSession: true,
customSystemPrompt,
appendSystemPrompt,
theme: resolveThemeSetting(getGlobalConfig().theme),
agentDefinitions: { activeAgents: agents, allAgents: [] },
maxBudgetUsd,
},
getAppState,
setAppState,
abortController: this.abortController,
readFileState: this.readFileState,
nestedMemoryAttachmentTriggers: new Set<string>(),
loadedNestedMemoryPaths: this.loadedNestedMemoryPaths,
dynamicSkillDirTriggers: new Set<string>(),
discoveredSkillNames: this.discoveredSkillNames,
setInProgressToolUseIDs: () => {},
setResponseLength: () => {},
updateFileHistoryState: processUserInputContext.updateFileHistoryState,
updateAttributionState: processUserInputContext.updateAttributionState,
setSDKStatus,
}
headlessProfilerCheckpoint('before_skills_plugins')
// Cache-only: headless/SDK/CCR startup must not block on network for
// ref-tracked plugins. CCR populates the cache via CLAUDE_CODE_SYNC_PLUGIN_INSTALL
// (headlessPluginInstall) or CLAUDE_CODE_PLUGIN_SEED_DIR before this runs;
// SDK callers that need fresh source can call /reload-plugins.
const [skills, { enabled: enabledPlugins }] = await Promise.all([
getSlashCommandToolSkills(getCwd()),
loadAllPluginsCacheOnly(),
])
headlessProfilerCheckpoint('after_skills_plugins')
yield buildSystemInitMessage({
tools,
mcpClients,
model: mainLoopModel,
permissionMode: initialAppState.toolPermissionContext
.mode as PermissionMode, // TODO: avoid the cast
commands,
agents,
skills,
plugins: enabledPlugins,
fastMode: initialAppState.fastMode,
})
// Record when system message is yielded for headless latency tracking
headlessProfilerCheckpoint('system_message_yielded')
if (!shouldQuery) {
// Return the results of local slash commands.
// Use messagesFromUserInput (not replayableMessages) for command output
// because selectableUserMessagesFilter excludes local-command-stdout tags.
for (const msg of messagesFromUserInput) {
if (
msg.type === 'user' &&
typeof msg.message.content === 'string' &&
(msg.message.content.includes(`<${LOCAL_COMMAND_STDOUT_TAG}>`) ||
msg.message.content.includes(`<${LOCAL_COMMAND_STDERR_TAG}>`) ||
msg.isCompactSummary)
) {
yield {
type: 'user',
message: {
...msg.message,
content: stripAnsi(msg.message.content),
},
session_id: getSessionId(),
parent_tool_use_id: null,
uuid: msg.uuid,
timestamp: msg.timestamp,
isReplay: !msg.isCompactSummary,
isSynthetic: msg.isMeta || msg.isVisibleInTranscriptOnly,
} as SDKUserMessageReplay
}
// Local command output — yield as a synthetic assistant message so
// RC renders it as assistant-style text rather than a user bubble.
// Emitted as assistant (not the dedicated SDKLocalCommandOutputMessage
// system subtype) so mobile clients + session-ingress can parse it.
if (
msg.type === 'system' &&
msg.subtype === 'local_command' &&
typeof msg.content === 'string' &&
(msg.content.includes(`<${LOCAL_COMMAND_STDOUT_TAG}>`) ||
msg.content.includes(`<${LOCAL_COMMAND_STDERR_TAG}>`))
) {
yield localCommandOutputToSDKAssistantMessage(msg.content, msg.uuid)
}
if (msg.type === 'system' && msg.subtype === 'compact_boundary') {
yield {
type: 'system',
subtype: 'compact_boundary' as const,
session_id: getSessionId(),
uuid: msg.uuid,
compact_metadata: toSDKCompactMetadata(msg.compactMetadata),
} as SDKCompactBoundaryMessage
}
}
if (persistSession) {
await recordTranscript(messages)
if (
isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH) ||
isEnvTruthy(process.env.CLAUDE_CODE_IS_COWORK)
) {
await flushSessionStorage()
}
}
yield {
type: 'result',
subtype: 'success',
is_error: false,
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
num_turns: messages.length - 1,
result: resultText ?? '',
stop_reason: null,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
fast_mode_state: getFastModeState(
mainLoopModel,
initialAppState.fastMode,
),
uuid: randomUUID(),
}
return
}
if (fileHistoryEnabled() && persistSession) {
messagesFromUserInput
.filter(messageSelector().selectableUserMessagesFilter)
.forEach(message => {
void fileHistoryMakeSnapshot(
(updater: (prev: FileHistoryState) => FileHistoryState) => {
setAppState(prev => ({
...prev,
fileHistory: updater(prev.fileHistory),
}))
},
message.uuid,
)
})
}
// Track current message usage (reset on each message_start)
let currentMessageUsage: NonNullableUsage = EMPTY_USAGE
let turnCount = 1
let hasAcknowledgedInitialMessages = false
// Track structured output from StructuredOutput tool calls
let structuredOutputFromTool: unknown
// Track the last stop_reason from assistant messages
let lastStopReason: string | null = null
// Reference-based watermark so error_during_execution's errors[] is
// turn-scoped. A length-based index breaks when the 100-entry ring buffer
// shift()s during the turn — the index slides. If this entry is rotated
// out, lastIndexOf returns -1 and we include everything (safe fallback).
const errorLogWatermark = getInMemoryErrors().at(-1)
// Snapshot count before this query for delta-based retry limiting
const initialStructuredOutputCalls = jsonSchema
? countToolCalls(this.mutableMessages, SYNTHETIC_OUTPUT_TOOL_NAME)
: 0
for await (const message of query({
messages,
systemPrompt,
userContext,
systemContext,
canUseTool: wrappedCanUseTool,
toolUseContext: processUserInputContext,
fallbackModel,
querySource: 'sdk',
maxTurns,
taskBudget,
})) {
// Record assistant, user, and compact boundary messages
if (
message.type === 'assistant' ||
message.type === 'user' ||
(message.type === 'system' && message.subtype === 'compact_boundary')
) {
// Before writing a compact boundary, flush any in-memory-only
// messages up through the preservedSegment tail. Attachments and
// progress are now recorded inline (their switch cases below), but
// this flush still matters for the preservedSegment tail walk.
// If the SDK subprocess restarts before then (claude-desktop kills
// between turns), tailUuid points to a never-written message →
// applyPreservedSegmentRelinks fails its tail→head walk → returns
// without pruning → resume loads full pre-compact history.
if (
persistSession &&
message.type === 'system' &&
message.subtype === 'compact_boundary'
) {
const tailUuid = message.compactMetadata?.preservedSegment?.tailUuid
if (tailUuid) {
const tailIdx = this.mutableMessages.findLastIndex(
m => m.uuid === tailUuid,
)
if (tailIdx !== -1) {
await recordTranscript(this.mutableMessages.slice(0, tailIdx + 1))
}
}
}
messages.push(message)
if (persistSession) {
// Fire-and-forget for assistant messages. claude.ts yields one
// assistant message per content block, then mutates the last
// one's message.usage/stop_reason on message_delta — relying on
// the write queue's 100ms lazy jsonStringify. Awaiting here
// blocks ask()'s generator, so message_delta can't run until
// every block is consumed; the drain timer (started at block 1)
// elapses first. Interactive CC doesn't hit this because
// useLogMessages.ts fire-and-forgets. enqueueWrite is
// order-preserving so fire-and-forget here is safe.
if (message.type === 'assistant') {
void recordTranscript(messages)
} else {
await recordTranscript(messages)
}
}
// Acknowledge initial user messages after first transcript recording
if (!hasAcknowledgedInitialMessages && messagesToAck.length > 0) {
hasAcknowledgedInitialMessages = true
for (const msgToAck of messagesToAck) {
if (msgToAck.type === 'user') {
yield {
type: 'user',
message: msgToAck.message,
session_id: getSessionId(),
parent_tool_use_id: null,
uuid: msgToAck.uuid,
timestamp: msgToAck.timestamp,
isReplay: true,
} as SDKUserMessageReplay
}
}
}
}
if (message.type === 'user') {
turnCount++
}
switch (message.type) {
case 'tombstone':
// Tombstone messages are control signals for removing messages, skip them
break
case 'assistant':
// Capture stop_reason if already set (synthetic messages). For
// streamed responses, this is null at content_block_stop time;
// the real value arrives via message_delta (handled below).
if (message.message.stop_reason != null) {
lastStopReason = message.message.stop_reason
}
this.mutableMessages.push(message)
yield* normalizeMessage(message)
break
case 'progress':
this.mutableMessages.push(message)
// Record inline so the dedup loop in the next ask() call sees it
// as already-recorded. Without this, deferred progress interleaves
// with already-recorded tool_results in mutableMessages, and the
// dedup walk freezes startingParentUuid at the wrong message —
// forking the chain and orphaning the conversation on resume.
if (persistSession) {
messages.push(message)
void recordTranscript(messages)
}
yield* normalizeMessage(message)
break
case 'user':
this.mutableMessages.push(message)
yield* normalizeMessage(message)
break
case 'stream_event':
if (message.event.type === 'message_start') {
// Reset current message usage for new message
currentMessageUsage = EMPTY_USAGE
currentMessageUsage = updateUsage(
currentMessageUsage,
message.event.message.usage,
)
}
if (message.event.type === 'message_delta') {
currentMessageUsage = updateUsage(
currentMessageUsage,
message.event.usage,
)
// Capture stop_reason from message_delta. The assistant message
// is yielded at content_block_stop with stop_reason=null; the
// real value only arrives here (see claude.ts message_delta
// handler). Without this, result.stop_reason is always null.
if (message.event.delta.stop_reason != null) {
lastStopReason = message.event.delta.stop_reason
}
}
if (message.event.type === 'message_stop') {
// Accumulate current message usage into total
this.totalUsage = accumulateUsage(
this.totalUsage,
currentMessageUsage,
)
}
if (includePartialMessages) {
yield {
type: 'stream_event' as const,
event: message.event,
session_id: getSessionId(),
parent_tool_use_id: null,
uuid: randomUUID(),
}
}
break
case 'attachment':
this.mutableMessages.push(message)
// Record inline (same reason as progress above).
if (persistSession) {
messages.push(message)
void recordTranscript(messages)
}
// Extract structured output from StructuredOutput tool calls
if (message.attachment.type === 'structured_output') {
structuredOutputFromTool = message.attachment.data
}
// Handle max turns reached signal from query.ts
else if (message.attachment.type === 'max_turns_reached') {
if (persistSession) {
if (
isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH) ||
isEnvTruthy(process.env.CLAUDE_CODE_IS_COWORK)
) {
await flushSessionStorage()
}
}
yield {
type: 'result',
subtype: 'error_max_turns',
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
is_error: true,
num_turns: message.attachment.turnCount,
stop_reason: lastStopReason,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
fast_mode_state: getFastModeState(
mainLoopModel,
initialAppState.fastMode,
),
uuid: randomUUID(),
errors: [
`Reached maximum number of turns (${message.attachment.maxTurns})`,
],
}
return
}
// Yield queued_command attachments as SDK user message replays
else if (
replayUserMessages &&
message.attachment.type === 'queued_command'
) {
yield {
type: 'user',
message: {
role: 'user' as const,
content: message.attachment.prompt,
},
session_id: getSessionId(),
parent_tool_use_id: null,
uuid: message.attachment.source_uuid || message.uuid,
timestamp: message.timestamp,
isReplay: true,
} as SDKUserMessageReplay
}
break
case 'stream_request_start':
// Don't yield stream request start messages
break
case 'system': {
// Snip boundary: replay on our store to remove zombie messages and
// stale markers. The yielded boundary is a signal, not data to push —
// the replay produces its own equivalent boundary. Without this,
// markers persist and re-trigger on every turn, and mutableMessages
// never shrinks (memory leak in long SDK sessions). The subtype
// check lives inside the injected callback so feature-gated strings
// stay out of this file (excluded-strings check).
const snipResult = this.config.snipReplay?.(
message,
this.mutableMessages,
)
if (snipResult !== undefined) {
if (snipResult.executed) {
this.mutableMessages.length = 0
this.mutableMessages.push(...snipResult.messages)
}
break
}
this.mutableMessages.push(message)
// Yield compact boundary messages to SDK
if (
message.subtype === 'compact_boundary' &&
message.compactMetadata
) {
// Release pre-compaction messages for GC. The boundary was just
// pushed so it's the last element. query.ts already uses
// getMessagesAfterCompactBoundary() internally, so only
// post-boundary messages are needed going forward.
const mutableBoundaryIdx = this.mutableMessages.length - 1
if (mutableBoundaryIdx > 0) {
this.mutableMessages.splice(0, mutableBoundaryIdx)
}
const localBoundaryIdx = messages.length - 1
if (localBoundaryIdx > 0) {
messages.splice(0, localBoundaryIdx)
}
yield {
type: 'system',
subtype: 'compact_boundary' as const,
session_id: getSessionId(),
uuid: message.uuid,
compact_metadata: toSDKCompactMetadata(message.compactMetadata),
}
}
if (message.subtype === 'api_error') {
yield {
type: 'system',
subtype: 'api_retry' as const,
attempt: message.retryAttempt,
max_retries: message.maxRetries,
retry_delay_ms: message.retryInMs,
error_status: message.error.status ?? null,
error: categorizeRetryableAPIError(message.error),
session_id: getSessionId(),
uuid: message.uuid,
}
}
// Don't yield other system messages in headless mode
break
}
case 'tool_use_summary':
// Yield tool use summary messages to SDK
yield {
type: 'tool_use_summary' as const,
summary: message.summary,
preceding_tool_use_ids: message.precedingToolUseIds,
session_id: getSessionId(),
uuid: message.uuid,
}
break
}
// Check if USD budget has been exceeded
if (maxBudgetUsd !== undefined && getTotalCost() >= maxBudgetUsd) {
if (persistSession) {
if (
isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH) ||
isEnvTruthy(process.env.CLAUDE_CODE_IS_COWORK)
) {
await flushSessionStorage()
}
}
yield {
type: 'result',
subtype: 'error_max_budget_usd',
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
is_error: true,
num_turns: turnCount,
stop_reason: lastStopReason,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
fast_mode_state: getFastModeState(
mainLoopModel,
initialAppState.fastMode,
),
uuid: randomUUID(),
errors: [`Reached maximum budget ($${maxBudgetUsd})`],
}
return
}
// Check if structured output retry limit exceeded (only on user messages)
if (message.type === 'user' && jsonSchema) {
const currentCalls = countToolCalls(
this.mutableMessages,
SYNTHETIC_OUTPUT_TOOL_NAME,
)
const callsThisQuery = currentCalls - initialStructuredOutputCalls
const maxRetries = parseInt(
process.env.MAX_STRUCTURED_OUTPUT_RETRIES || '5',
10,
)
if (callsThisQuery >= maxRetries) {
if (persistSession) {
if (
isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH) ||
isEnvTruthy(process.env.CLAUDE_CODE_IS_COWORK)
) {
await flushSessionStorage()
}
}
yield {
type: 'result',
subtype: 'error_max_structured_output_retries',
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
is_error: true,
num_turns: turnCount,
stop_reason: lastStopReason,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
fast_mode_state: getFastModeState(
mainLoopModel,
initialAppState.fastMode,
),
uuid: randomUUID(),
errors: [
`Failed to provide valid structured output after ${maxRetries} attempts`,
],
}
return
}
}
}
// Stop hooks yield progress/attachment messages AFTER the assistant
// response (via yield* handleStopHooks in query.ts). Since #23537 pushes
// those to `messages` inline, last(messages) can be a progress/attachment
// instead of the assistant — which makes textResult extraction below
// return '' and -p mode emit a blank line. Allowlist to assistant|user:
// isResultSuccessful handles both (user with all tool_result blocks is a
// valid successful terminal state).
const result = messages.findLast(
m => m.type === 'assistant' || m.type === 'user',
)
// Capture for the error_during_execution diagnostic — isResultSuccessful
// is a type predicate (message is Message), so inside the false branch
// `result` narrows to never and these accesses don't typecheck.
const edeResultType = result?.type ?? 'undefined'
const edeLastContentType =
result?.type === 'assistant'
? (last(result.message.content)?.type ?? 'none')
: 'n/a'
// Flush buffered transcript writes before yielding result.
// The desktop app kills the CLI process immediately after receiving the
// result message, so any unflushed writes would be lost.
if (persistSession) {
if (
isEnvTruthy(process.env.CLAUDE_CODE_EAGER_FLUSH) ||
isEnvTruthy(process.env.CLAUDE_CODE_IS_COWORK)
) {
await flushSessionStorage()
}
}
if (!isResultSuccessful(result, lastStopReason)) {
yield {
type: 'result',
subtype: 'error_during_execution',
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
is_error: true,
num_turns: turnCount,
stop_reason: lastStopReason,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
fast_mode_state: getFastModeState(
mainLoopModel,
initialAppState.fastMode,
),
uuid: randomUUID(),
// Diagnostic prefix: these are what isResultSuccessful() checks — if
// the result type isn't assistant-with-text/thinking or user-with-
// tool_result, and stop_reason isn't end_turn, that's why this fired.
// errors[] is turn-scoped via the watermark; previously it dumped the
// entire process's logError buffer (ripgrep timeouts, ENOENT, etc).
errors: (() => {
const all = getInMemoryErrors()
const start = errorLogWatermark
? all.lastIndexOf(errorLogWatermark) + 1
: 0
return [
`[ede_diagnostic] result_type=${edeResultType} last_content_type=${edeLastContentType} stop_reason=${lastStopReason}`,
...all.slice(start).map(_ => _.error),
]
})(),
}
return
}
// Extract the text result based on message type
let textResult = ''
let isApiError = false
if (result.type === 'assistant') {
const lastContent = last(result.message.content)
if (
lastContent?.type === 'text' &&
!SYNTHETIC_MESSAGES.has(lastContent.text)
) {
textResult = lastContent.text
}
isApiError = Boolean(result.isApiErrorMessage)
}
yield {
type: 'result',
subtype: 'success',
is_error: isApiError,
duration_ms: Date.now() - startTime,
duration_api_ms: getTotalAPIDuration(),
num_turns: turnCount,
result: textResult,
stop_reason: lastStopReason,
session_id: getSessionId(),
total_cost_usd: getTotalCost(),
usage: this.totalUsage,
modelUsage: getModelUsage(),
permission_denials: this.permissionDenials,
structured_output: structuredOutputFromTool,
fast_mode_state: getFastModeState(
mainLoopModel,
initialAppState.fastMode,
),
uuid: randomUUID(),
}
}
/**
 * Requests cancellation by aborting the engine's AbortController.
 * Whether and how quickly an in-flight submitMessage turn actually stops
 * depends on code that observes this signal, which is not visible here.
 * NOTE(review): confirm the query loop checks the signal between steps.
 */
interrupt(): void {
  const { abortController } = this
  abortController.abort()
}
/**
 * Exposes the engine's live message store.
 *
 * This is the same array instance the engine works on, not a copy.
 * Callers see it as readonly, but submitMessage mutates it in place: it
 * pushes messages and truncates the array on snip and compact boundaries.
 * Copy the result if you need a stable snapshot.
 */
getMessages(): readonly Message[] {
  const { mutableMessages: liveMessages } = this
  return liveMessages
}
/**
 * Returns the engine's current read-file state cache (the instance itself,
 * not a clone). ask() hands this back to its caller through
 * setReadFileCache once a run finishes.
 */
getReadFileState(): FileStateCache {
  const { readFileState } = this
  return readFileState
}
/**
 * Session id associated with this engine.
 *
 * The method delegates to the module-level getSessionId() helper; the bare
 * call resolves to that import, not to this method. No session id is
 * stored on the engine instance itself.
 */
getSessionId(): string {
  const sessionId = getSessionId()
  return sessionId
}
/**
 * Overrides the model by writing `config.userSpecifiedModel`.
 *
 * Nothing is recomputed here. Presumably the new value is picked up the
 * next time submitMessage reads the config; this is not visible in this
 * chunk, so TODO confirm.
 */
setModel(model: string): void {
  const { config } = this
  config.userSpecifiedModel = model
}
}
/**
 * One-shot, non-interactive entry point.
 *
 * Builds a fresh QueryEngine for a single prompt and streams every
 * SDKMessage that engine yields for it. Nothing here asks the user for
 * permission or further input; tool permission decisions go through the
 * supplied `canUseTool`.
 *
 * Read-file cache: the engine starts from a clone of `getReadFileCache()`.
 * Its final read-file state is passed back through `setReadFileCache` when
 * the generator finishes. That happens whether the generator runs to
 * completion, throws, or is closed early via return() (e.g. a `break` out
 * of `for await`).
 *
 * When the HISTORY_SNIP feature is on, a `snipReplay` callback is
 * installed. For a snip-boundary message it forces a snip-compaction of
 * the engine's message store; for any other message it returns undefined.
 */
export async function* ask({
  commands,
  prompt,
  promptUuid,
  isMeta,
  cwd,
  tools,
  mcpClients,
  verbose = false,
  thinkingConfig,
  maxTurns,
  maxBudgetUsd,
  taskBudget,
  canUseTool,
  mutableMessages = [],
  getReadFileCache,
  setReadFileCache,
  customSystemPrompt,
  appendSystemPrompt,
  userSpecifiedModel,
  fallbackModel,
  jsonSchema,
  getAppState,
  setAppState,
  abortController,
  replayUserMessages = false,
  includePartialMessages = false,
  handleElicitation,
  agents = [],
  setSDKStatus,
  orphanedPermission,
}: {
  commands: Command[]
  prompt: string | Array<ContentBlockParam>
  promptUuid?: string
  isMeta?: boolean
  cwd: string
  tools: Tools
  verbose?: boolean
  mcpClients: MCPServerConnection[]
  thinkingConfig?: ThinkingConfig
  maxTurns?: number
  maxBudgetUsd?: number
  taskBudget?: { total: number }
  canUseTool: CanUseToolFn
  mutableMessages?: Message[]
  customSystemPrompt?: string
  appendSystemPrompt?: string
  userSpecifiedModel?: string
  fallbackModel?: string
  jsonSchema?: Record<string, unknown>
  getAppState: () => AppState
  setAppState: (f: (prev: AppState) => AppState) => void
  getReadFileCache: () => FileStateCache
  setReadFileCache: (cache: FileStateCache) => void
  abortController?: AbortController
  replayUserMessages?: boolean
  includePartialMessages?: boolean
  handleElicitation?: ToolUseContext['handleElicitation']
  agents?: AgentDefinition[]
  setSDKStatus?: (status: SDKStatus) => void
  orphanedPermission?: OrphanedPermission
}): AsyncGenerator<SDKMessage, void, unknown> {
  // Feature-gated config fragment. It is kept as a feature() ternary so the
  // disabled branch stays an empty object and the snip helpers are never
  // touched.
  const snipConfig = feature('HISTORY_SNIP')
    ? {
        snipReplay: (candidate: Message, messageStore: Message[]) =>
          snipProjection!.isSnipBoundaryMessage(candidate)
            ? snipModule!.snipCompactIfNeeded(messageStore, { force: true })
            : undefined,
      }
    : {}

  const queryEngine = new QueryEngine({
    cwd,
    tools,
    commands,
    mcpClients,
    agents,
    canUseTool,
    getAppState,
    setAppState,
    initialMessages: mutableMessages,
    // The engine gets its own copy; the caller's cache is only updated via
    // setReadFileCache in the finally block below.
    readFileCache: cloneFileStateCache(getReadFileCache()),
    customSystemPrompt,
    appendSystemPrompt,
    userSpecifiedModel,
    fallbackModel,
    thinkingConfig,
    maxTurns,
    maxBudgetUsd,
    taskBudget,
    jsonSchema,
    verbose,
    handleElicitation,
    replayUserMessages,
    includePartialMessages,
    setSDKStatus,
    abortController,
    orphanedPermission,
    ...snipConfig,
  })

  try {
    yield* queryEngine.submitMessage(prompt, { uuid: promptUuid, isMeta })
  } finally {
    // Runs on completion, on error, and on early return(), so the caller
    // always gets back the engine's final read-file state.
    setReadFileCache(queryEngine.getReadFileState())
  }
}