extractMemories.ts
services/extractMemories/extractMemories.ts
616
Lines
21684
Bytes
4
Exports
26
Imports
10
Keywords
What this is
This page documents one file from the repository and includes its full source so you can read it without leaving the docs site.
Beginner explanation
This file is one piece of the larger system. Its name, directory, imports, and exports show where it fits. Start by reading the exports and related files first.
How it is used
Start from the exports list and related files. Those are the easiest clues for where this file fits into the system.
Expert explanation
Architecturally, this file intersects with memory-layers, integrations. It contains 616 lines, 26 detected imports, and 4 detected exports.
Important relationships
Detected exports
createAutoMemCanUseToolinitExtractMemoriesexecuteExtractMemoriesdrainPendingExtraction
Keywords
toolmessagecontextmessagesextractioninputagentappendsystemmessageresultblock
Detected imports
bun:bundlepath../../bootstrap/state.js../../hooks/useCanUseTool.js../../memdir/memdir.js../../memdir/memoryScan.js../../memdir/paths.js../../Tool.js../../tools/BashTool/toolName.js../../tools/FileEditTool/constants.js../../tools/FileReadTool/prompt.js../../tools/FileWriteTool/prompt.js../../tools/GlobTool/prompt.js../../tools/GrepTool/prompt.js../../tools/REPLTool/constants.js../../types/message.js../../utils/abortController.js../../utils/array.js../../utils/debug.js../../utils/forkedAgent.js../../utils/hooks/postSamplingHooks.js../../utils/messages.js../analytics/growthbook.js../analytics/index.js../analytics/metadata.js./prompts.js
Source notes
This page embeds the full file contents. Small or leaf files are still indexed honestly instead of being over-explained.
Full source
/**
* Extracts durable memories from the current session transcript
* and writes them to the auto-memory directory (~/.claude/projects/<path>/memory/).
*
* It runs once at the end of each complete query loop (when the model produces
* a final response with no tool calls) via handleStopHooks in stopHooks.ts.
*
* Uses the forked agent pattern (runForkedAgent) — a perfect fork of the main
* conversation that shares the parent's prompt cache.
*
* State is closure-scoped inside initExtractMemories() rather than module-level,
* following the same pattern as confidenceRating.ts. Tests call
* initExtractMemories() in beforeEach to get a fresh closure.
*/
import { feature } from 'bun:bundle'
import { basename } from 'path'
import { getIsRemoteMode } from '../../bootstrap/state.js'
import type { CanUseToolFn } from '../../hooks/useCanUseTool.js'
import { ENTRYPOINT_NAME } from '../../memdir/memdir.js'
import {
formatMemoryManifest,
scanMemoryFiles,
} from '../../memdir/memoryScan.js'
import {
getAutoMemPath,
isAutoMemoryEnabled,
isAutoMemPath,
} from '../../memdir/paths.js'
import type { Tool } from '../../Tool.js'
import { BASH_TOOL_NAME } from '../../tools/BashTool/toolName.js'
import { FILE_EDIT_TOOL_NAME } from '../../tools/FileEditTool/constants.js'
import { FILE_READ_TOOL_NAME } from '../../tools/FileReadTool/prompt.js'
import { FILE_WRITE_TOOL_NAME } from '../../tools/FileWriteTool/prompt.js'
import { GLOB_TOOL_NAME } from '../../tools/GlobTool/prompt.js'
import { GREP_TOOL_NAME } from '../../tools/GrepTool/prompt.js'
import { REPL_TOOL_NAME } from '../../tools/REPLTool/constants.js'
import type {
AssistantMessage,
Message,
SystemLocalCommandMessage,
SystemMessage,
} from '../../types/message.js'
import { createAbortController } from '../../utils/abortController.js'
import { count, uniq } from '../../utils/array.js'
import { logForDebugging } from '../../utils/debug.js'
import {
createCacheSafeParams,
runForkedAgent,
} from '../../utils/forkedAgent.js'
import type { REPLHookContext } from '../../utils/hooks/postSamplingHooks.js'
import {
createMemorySavedMessage,
createUserMessage,
} from '../../utils/messages.js'
import { getFeatureValue_CACHED_MAY_BE_STALE } from '../analytics/growthbook.js'
import { logEvent } from '../analytics/index.js'
import { sanitizeToolNameForAnalytics } from '../analytics/metadata.js'
import {
buildExtractAutoOnlyPrompt,
buildExtractCombinedPrompt,
} from './prompts.js'
/* eslint-disable @typescript-eslint/no-require-imports */
const teamMemPaths = feature('TEAMMEM')
? (require('../../memdir/teamMemPaths.js') as typeof import('../../memdir/teamMemPaths.js'))
: null
/* eslint-enable @typescript-eslint/no-require-imports */
// ============================================================================
// Helpers
// ============================================================================
/**
* Returns true if a message is visible to the model (sent in API calls).
* Excludes progress, system, and attachment messages.
*/
function isModelVisibleMessage(message: Message): boolean {
return message.type === 'user' || message.type === 'assistant'
}
function countModelVisibleMessagesSince(
messages: Message[],
sinceUuid: string | undefined,
): number {
if (sinceUuid === null || sinceUuid === undefined) {
return count(messages, isModelVisibleMessage)
}
let foundStart = false
let n = 0
for (const message of messages) {
if (!foundStart) {
if (message.uuid === sinceUuid) {
foundStart = true
}
continue
}
if (isModelVisibleMessage(message)) {
n++
}
}
// If sinceUuid was not found (e.g., removed by context compaction),
// fall back to counting all model-visible messages rather than returning 0
// which would permanently disable extraction for the rest of the session.
if (!foundStart) {
return count(messages, isModelVisibleMessage)
}
return n
}
/**
* Returns true if any assistant message after the cursor UUID contains a
* Write/Edit tool_use block targeting an auto-memory path.
*
* The main agent's prompt has full save instructions — when it writes
* memories, the forked extraction is redundant. runExtraction skips the
* agent and advances the cursor past this range, making the main agent
* and the background agent mutually exclusive per turn.
*/
function hasMemoryWritesSince(
messages: Message[],
sinceUuid: string | undefined,
): boolean {
let foundStart = sinceUuid === undefined
for (const message of messages) {
if (!foundStart) {
if (message.uuid === sinceUuid) {
foundStart = true
}
continue
}
if (message.type !== 'assistant') {
continue
}
const content = (message as AssistantMessage).message.content
if (!Array.isArray(content)) {
continue
}
for (const block of content) {
const filePath = getWrittenFilePath(block)
if (filePath !== undefined && isAutoMemPath(filePath)) {
return true
}
}
}
return false
}
// ============================================================================
// Tool Permissions
// ============================================================================
function denyAutoMemTool(tool: Tool, reason: string) {
logForDebugging(`[autoMem] denied ${tool.name}: ${reason}`)
logEvent('tengu_auto_mem_tool_denied', {
tool_name: sanitizeToolNameForAnalytics(tool.name),
})
return {
behavior: 'deny' as const,
message: reason,
decisionReason: { type: 'other' as const, reason },
}
}
/**
* Creates a canUseTool function that allows Read/Grep/Glob (unrestricted),
* read-only Bash commands, and Edit/Write only for paths within the
* auto-memory directory. Shared by extractMemories and autoDream.
*/
export function createAutoMemCanUseTool(memoryDir: string): CanUseToolFn {
return async (tool: Tool, input: Record<string, unknown>) => {
// Allow REPL — when REPL mode is enabled (ant-default), primitive tools
// are hidden from the tool list so the forked agent calls REPL instead.
// REPL's VM context re-invokes this canUseTool for each inner primitive
// (toolWrappers.ts createToolWrapper), so the Read/Bash/Edit/Write checks
// below still gate the actual file and shell operations. Giving the fork a
// different tool list would break prompt cache sharing (tools are part of
// the cache key — see CacheSafeParams in forkedAgent.ts).
if (tool.name === REPL_TOOL_NAME) {
return { behavior: 'allow' as const, updatedInput: input }
}
// Allow Read/Grep/Glob unrestricted — all inherently read-only
if (
tool.name === FILE_READ_TOOL_NAME ||
tool.name === GREP_TOOL_NAME ||
tool.name === GLOB_TOOL_NAME
) {
return { behavior: 'allow' as const, updatedInput: input }
}
// Allow Bash only for commands that pass BashTool.isReadOnly.
// `tool` IS BashTool here — no static import needed.
if (tool.name === BASH_TOOL_NAME) {
const parsed = tool.inputSchema.safeParse(input)
if (parsed.success && tool.isReadOnly(parsed.data)) {
return { behavior: 'allow' as const, updatedInput: input }
}
return denyAutoMemTool(
tool,
'Only read-only shell commands are permitted in this context (ls, find, grep, cat, stat, wc, head, tail, and similar)',
)
}
if (
(tool.name === FILE_EDIT_TOOL_NAME ||
tool.name === FILE_WRITE_TOOL_NAME) &&
'file_path' in input
) {
const filePath = input.file_path
if (typeof filePath === 'string' && isAutoMemPath(filePath)) {
return { behavior: 'allow' as const, updatedInput: input }
}
}
return denyAutoMemTool(
tool,
`only ${FILE_READ_TOOL_NAME}, ${GREP_TOOL_NAME}, ${GLOB_TOOL_NAME}, read-only ${BASH_TOOL_NAME}, and ${FILE_EDIT_TOOL_NAME}/${FILE_WRITE_TOOL_NAME} within ${memoryDir} are allowed`,
)
}
}
// ============================================================================
// Extract file paths from agent output
// ============================================================================
/**
* Extract file_path from a tool_use block's input, if present.
* Returns undefined when the block is not an Edit/Write tool use or has no file_path.
*/
function getWrittenFilePath(block: {
type: string
name?: string
input?: unknown
}): string | undefined {
if (
block.type !== 'tool_use' ||
(block.name !== FILE_EDIT_TOOL_NAME && block.name !== FILE_WRITE_TOOL_NAME)
) {
return undefined
}
const input = block.input
if (typeof input === 'object' && input !== null && 'file_path' in input) {
const fp = (input as { file_path: unknown }).file_path
return typeof fp === 'string' ? fp : undefined
}
return undefined
}
function extractWrittenPaths(agentMessages: Message[]): string[] {
const paths: string[] = []
for (const message of agentMessages) {
if (message.type !== 'assistant') {
continue
}
const content = (message as AssistantMessage).message.content
if (!Array.isArray(content)) {
continue
}
for (const block of content) {
const filePath = getWrittenFilePath(block)
if (filePath !== undefined) {
paths.push(filePath)
}
}
}
return uniq(paths)
}
// ============================================================================
// Initialization & Closure-scoped State
// ============================================================================
type AppendSystemMessageFn = (
msg: Exclude<SystemMessage, SystemLocalCommandMessage>,
) => void
/** The active extractor function, set by initExtractMemories(). */
let extractor:
| ((
context: REPLHookContext,
appendSystemMessage?: AppendSystemMessageFn,
) => Promise<void>)
| null = null
/** The active drain function, set by initExtractMemories(). No-op until init. */
let drainer: (timeoutMs?: number) => Promise<void> = async () => {}
/**
* Initialize the memory extraction system.
* Creates a fresh closure that captures all mutable state (cursor position,
* overlap guard, pending context). Call once at startup alongside
* initConfidenceRating/initPromptCoaching, or per-test in beforeEach.
*/
export function initExtractMemories(): void {
// --- Closure-scoped mutable state ---
/** Every promise handed out by the extractor that hasn't settled yet.
* Coalesced calls that stash-and-return add fast-resolving promises
* (harmless); the call that starts real work adds a promise covering the
* full trailing-run chain via runExtraction's recursive finally. */
const inFlightExtractions = new Set<Promise<void>>()
/** UUID of the last message processed — cursor so each run only
* considers messages added since the previous extraction. */
let lastMemoryMessageUuid: string | undefined
/** One-shot flag: once we log that the gate is disabled, don't repeat. */
let hasLoggedGateFailure = false
/** True while runExtraction is executing — prevents overlapping runs. */
let inProgress = false
/** Counts eligible turns since the last extraction run. Resets to 0 after each run. */
let turnsSinceLastExtraction = 0
/** When a call arrives during an in-progress run, we stash the context here
* and run one trailing extraction after the current one finishes. */
let pendingContext:
| {
context: REPLHookContext
appendSystemMessage?: AppendSystemMessageFn
}
| undefined
// --- Inner extraction logic ---
async function runExtraction({
context,
appendSystemMessage,
isTrailingRun,
}: {
context: REPLHookContext
appendSystemMessage?: AppendSystemMessageFn
isTrailingRun?: boolean
}): Promise<void> {
const { messages } = context
const memoryDir = getAutoMemPath()
const newMessageCount = countModelVisibleMessagesSince(
messages,
lastMemoryMessageUuid,
)
// Mutual exclusion: when the main agent wrote memories, skip the
// forked agent and advance the cursor past this range so the next
// extraction only considers messages after the main agent's write.
if (hasMemoryWritesSince(messages, lastMemoryMessageUuid)) {
logForDebugging(
'[extractMemories] skipping — conversation already wrote to memory files',
)
const lastMessage = messages.at(-1)
if (lastMessage?.uuid) {
lastMemoryMessageUuid = lastMessage.uuid
}
logEvent('tengu_extract_memories_skipped_direct_write', {
message_count: newMessageCount,
})
return
}
const teamMemoryEnabled = feature('TEAMMEM')
? teamMemPaths!.isTeamMemoryEnabled()
: false
const skipIndex = getFeatureValue_CACHED_MAY_BE_STALE(
'tengu_moth_copse',
false,
)
const canUseTool = createAutoMemCanUseTool(memoryDir)
const cacheSafeParams = createCacheSafeParams(context)
// Only run extraction every N eligible turns (tengu_bramble_lintel, default 1).
// Trailing extractions (from stashed contexts) skip this check since they
// process already-committed work that should not be throttled.
if (!isTrailingRun) {
turnsSinceLastExtraction++
if (
turnsSinceLastExtraction <
(getFeatureValue_CACHED_MAY_BE_STALE('tengu_bramble_lintel', null) ?? 1)
) {
return
}
}
turnsSinceLastExtraction = 0
inProgress = true
const startTime = Date.now()
try {
logForDebugging(
`[extractMemories] starting — ${newMessageCount} new messages, memoryDir=${memoryDir}`,
)
// Pre-inject the memory directory manifest so the agent doesn't spend
// a turn on `ls`. Reuses findRelevantMemories' frontmatter scan.
// Placed after the throttle gate so skipped turns don't pay the scan cost.
const existingMemories = formatMemoryManifest(
await scanMemoryFiles(memoryDir, createAbortController().signal),
)
const userPrompt =
feature('TEAMMEM') && teamMemoryEnabled
? buildExtractCombinedPrompt(
newMessageCount,
existingMemories,
skipIndex,
)
: buildExtractAutoOnlyPrompt(
newMessageCount,
existingMemories,
skipIndex,
)
const result = await runForkedAgent({
promptMessages: [createUserMessage({ content: userPrompt })],
cacheSafeParams,
canUseTool,
querySource: 'extract_memories',
forkLabel: 'extract_memories',
// The extractMemories subagent does not need to record to transcript.
// Doing so can create race conditions with the main thread.
skipTranscript: true,
// Well-behaved extractions complete in 2-4 turns (read → write).
// A hard cap prevents verification rabbit-holes from burning turns.
maxTurns: 5,
})
// Advance the cursor only after a successful run. If the agent errors
// out (caught below), the cursor stays put so those messages are
// reconsidered on the next extraction.
const lastMessage = messages.at(-1)
if (lastMessage?.uuid) {
lastMemoryMessageUuid = lastMessage.uuid
}
const writtenPaths = extractWrittenPaths(result.messages)
const turnCount = count(result.messages, m => m.type === 'assistant')
const totalInput =
result.totalUsage.input_tokens +
result.totalUsage.cache_creation_input_tokens +
result.totalUsage.cache_read_input_tokens
const hitPct =
totalInput > 0
? (
(result.totalUsage.cache_read_input_tokens / totalInput) *
100
).toFixed(1)
: '0.0'
logForDebugging(
`[extractMemories] finished — ${writtenPaths.length} files written, cache: read=${result.totalUsage.cache_read_input_tokens} create=${result.totalUsage.cache_creation_input_tokens} input=${result.totalUsage.input_tokens} (${hitPct}% hit)`,
)
if (writtenPaths.length > 0) {
logForDebugging(
`[extractMemories] memories saved: ${writtenPaths.join(', ')}`,
)
} else {
logForDebugging('[extractMemories] no memories saved this run')
}
// Index file updates are mechanical — the agent touches MEMORY.md to add
// a topic link, but the user-visible "memory" is the topic file itself.
const memoryPaths = writtenPaths.filter(
p => basename(p) !== ENTRYPOINT_NAME,
)
const teamCount = feature('TEAMMEM')
? count(memoryPaths, teamMemPaths!.isTeamMemPath)
: 0
// Log extraction event with usage from the forked agent
logEvent('tengu_extract_memories_extraction', {
input_tokens: result.totalUsage.input_tokens,
output_tokens: result.totalUsage.output_tokens,
cache_read_input_tokens: result.totalUsage.cache_read_input_tokens,
cache_creation_input_tokens:
result.totalUsage.cache_creation_input_tokens,
message_count: newMessageCount,
turn_count: turnCount,
files_written: writtenPaths.length,
memories_saved: memoryPaths.length,
team_memories_saved: teamCount,
duration_ms: Date.now() - startTime,
})
logForDebugging(
`[extractMemories] writtenPaths=${writtenPaths.length} memoryPaths=${memoryPaths.length} appendSystemMessage defined=${appendSystemMessage != null}`,
)
if (memoryPaths.length > 0) {
const msg = createMemorySavedMessage(memoryPaths)
if (feature('TEAMMEM')) {
msg.teamCount = teamCount
}
appendSystemMessage?.(msg)
}
} catch (error) {
// Extraction is best-effort — log but don't notify on error
logForDebugging(`[extractMemories] error: ${error}`)
logEvent('tengu_extract_memories_error', {
duration_ms: Date.now() - startTime,
})
} finally {
inProgress = false
// If a call arrived while we were running, run a trailing extraction
// with the latest stashed context. The trailing run will compute its
// newMessageCount relative to the cursor we just advanced — so it only
// picks up messages added between the two calls, not the full history.
const trailing = pendingContext
pendingContext = undefined
if (trailing) {
logForDebugging(
'[extractMemories] running trailing extraction for stashed context',
)
await runExtraction({
context: trailing.context,
appendSystemMessage: trailing.appendSystemMessage,
isTrailingRun: true,
})
}
}
}
// --- Public entry point (captured by extractor) ---
async function executeExtractMemoriesImpl(
context: REPLHookContext,
appendSystemMessage?: AppendSystemMessageFn,
): Promise<void> {
// Only run for the main agent, not subagents
if (context.toolUseContext.agentId) {
return
}
if (!getFeatureValue_CACHED_MAY_BE_STALE('tengu_passport_quail', false)) {
if (process.env.USER_TYPE === 'ant' && !hasLoggedGateFailure) {
hasLoggedGateFailure = true
logEvent('tengu_extract_memories_gate_disabled', {})
}
return
}
// Check auto-memory is enabled
if (!isAutoMemoryEnabled()) {
return
}
// Skip in remote mode
if (getIsRemoteMode()) {
return
}
// If an extraction is already in progress, stash this context for a
// trailing run (overwrites any previously stashed context — only the
// latest matters since it has the most messages).
if (inProgress) {
logForDebugging(
'[extractMemories] extraction in progress — stashing for trailing run',
)
logEvent('tengu_extract_memories_coalesced', {})
pendingContext = { context, appendSystemMessage }
return
}
await runExtraction({ context, appendSystemMessage })
}
extractor = async (context, appendSystemMessage) => {
const p = executeExtractMemoriesImpl(context, appendSystemMessage)
inFlightExtractions.add(p)
try {
await p
} finally {
inFlightExtractions.delete(p)
}
}
drainer = async (timeoutMs = 60_000) => {
if (inFlightExtractions.size === 0) return
await Promise.race([
Promise.all(inFlightExtractions).catch(() => {}),
// eslint-disable-next-line no-restricted-syntax -- sleep() has no .unref(); timer must not block exit
new Promise<void>(r => setTimeout(r, timeoutMs).unref()),
])
}
}
// ============================================================================
// Public API
// ============================================================================
/**
* Run memory extraction at the end of a query loop.
* Called fire-and-forget from handleStopHooks, alongside prompt suggestion/coaching.
* No-ops until initExtractMemories() has been called.
*/
export async function executeExtractMemories(
context: REPLHookContext,
appendSystemMessage?: AppendSystemMessageFn,
): Promise<void> {
await extractor?.(context, appendSystemMessage)
}
/**
* Awaits all in-flight extractions (including trailing stashed runs) with a
* soft timeout. Called by print.ts after the response is flushed but before
* gracefulShutdownSync, so the forked agent completes before the 5s shutdown
* failsafe kills it. No-op until initExtractMemories() has been called.
*/
export async function drainPendingExtraction(
timeoutMs?: number,
): Promise<void> {
await drainer(timeoutMs)
}