File · high importance · source

firstPartyEventLoggingExporter.ts

services/analytics/firstPartyEventLoggingExporter.ts

807
Lines
26362
Bytes
1
Exports
25
Imports
10
Keywords

What this is

This page documents one file from the repository and includes its full source so you can read it without leaving the docs site.

Beginner explanation

This file is one piece of the larger system. Its name, directory, imports, and exports show where it fits. Start by reading the exports and related files first.

How it is used

Start from the exports list and related files. Those are the easiest clues for where this file fits into the system.

Expert explanation

Architecturally, this file intersects with the integrations layer. It contains 807 lines, 25 detected imports, and 1 detected export.

Important relationships

Detected exports

  • FirstPartyEventLoggingExporter

Keywords

events · private · event · length · void · filepath · process · logForDebugging · failed · logging

Detected imports

  • @opentelemetry/api
  • @opentelemetry/core
  • @opentelemetry/sdk-logs
  • axios
  • crypto
  • fs/promises
  • path
  • src/utils/user.js
  • ../../bootstrap/state.js
  • ../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js
  • ../../types/generated/events_mono/growthbook/v1/growthbook_experiment_event.js
  • ../../utils/auth.js
  • ../../utils/config.js
  • ../../utils/debug.js
  • ../../utils/envUtils.js
  • ../../utils/errors.js
  • ../../utils/http.js
  • ../../utils/json.js
  • ../../utils/log.js
  • ../../utils/sleep.js
  • ../../utils/slowOperations.js
  • ../../utils/userAgent.js
  • ../oauth/client.js
  • ./index.js
  • ./metadata.js

Source notes

This page embeds the full file contents. Small or leaf files are still indexed honestly instead of being over-explained.

Open parent directory

Full source

import type { HrTime } from '@opentelemetry/api'
import { type ExportResult, ExportResultCode } from '@opentelemetry/core'
import type {
  LogRecordExporter,
  ReadableLogRecord,
} from '@opentelemetry/sdk-logs'
import axios from 'axios'
import { randomUUID } from 'crypto'
import { appendFile, mkdir, readdir, unlink, writeFile } from 'fs/promises'
import * as path from 'path'
import type { CoreUserData } from 'src/utils/user.js'
import {
  getIsNonInteractiveSession,
  getSessionId,
} from '../../bootstrap/state.js'
import { ClaudeCodeInternalEvent } from '../../types/generated/events_mono/claude_code/v1/claude_code_internal_event.js'
import { GrowthbookExperimentEvent } from '../../types/generated/events_mono/growthbook/v1/growthbook_experiment_event.js'
import {
  getClaudeAIOAuthTokens,
  hasProfileScope,
  isClaudeAISubscriber,
} from '../../utils/auth.js'
import { checkHasTrustDialogAccepted } from '../../utils/config.js'
import { logForDebugging } from '../../utils/debug.js'
import { getClaudeConfigHomeDir } from '../../utils/envUtils.js'
import { errorMessage, isFsInaccessible, toError } from '../../utils/errors.js'
import { getAuthHeaders } from '../../utils/http.js'
import { readJSONLFile } from '../../utils/json.js'
import { logError } from '../../utils/log.js'
import { sleep } from '../../utils/sleep.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
import { isOAuthTokenExpired } from '../oauth/client.js'
import { stripProtoFields } from './index.js'
import { type EventMetadata, to1PEventFormat } from './metadata.js'

// Unique ID for this process run - used to isolate failed event files between runs.
// Files from a *previous* run of the same session are identified by NOT containing
// this UUID (see retryPreviousBatches) and are retried at startup.
const BATCH_UUID = randomUUID()

// File prefix for failed event storage; full name is
// `${FILE_PREFIX}${sessionId}.${BATCH_UUID}.json` (see getCurrentBatchFilePath).
const FILE_PREFIX = '1p_failed_events.'

// Storage directory for failed events - evaluated at runtime to respect CLAUDE_CONFIG_DIR in tests
function getStorageDir(): string {
  // Resolved lazily on every call (not cached at module load) so that tests
  // overriding CLAUDE_CONFIG_DIR see the updated location.
  const configHome = getClaudeConfigHomeDir()
  return path.join(configHome, 'telemetry')
}

// API envelope - event_data is the JSON output from proto toJSON()
type FirstPartyEventLoggingEvent = {
  // Discriminator telling the backend which proto schema event_data follows
  event_type: 'ClaudeCodeInternalEvent' | 'GrowthbookExperimentEvent'
  // Plain-JSON proto payload (result of <Proto>.toJSON()); opaque to this file
  event_data: unknown
}

// Request body for POST /api/event_logging/batch (see sendBatchWithRetry)
type FirstPartyEventLoggingPayload = {
  events: FirstPartyEventLoggingEvent[]
}

/**
 * Exporter for 1st-party event logging to /api/event_logging/batch.
 *
 * Export cycles are controlled by OpenTelemetry's BatchLogRecordProcessor, which
 * triggers export() when either:
 * - Time interval elapses (default: 5 seconds via scheduledDelayMillis)
 * - Batch size is reached (default: 200 events via maxExportBatchSize)
 *
 * This exporter adds resilience on top:
 * - Append-only log for failed events (concurrency-safe)
 * - Quadratic backoff retry for failed events, dropped after maxAttempts
 * - Immediate retry of queued events when any export succeeds (endpoint is healthy)
 * - Chunking large event sets into smaller batches
 * - Auth fallback: retries without auth on 401 errors
 */
export class FirstPartyEventLoggingExporter implements LogRecordExporter {
  private readonly endpoint: string
  private readonly timeout: number
  private readonly maxBatchSize: number
  private readonly skipAuth: boolean
  private readonly batchDelayMs: number
  private readonly baseBackoffDelayMs: number
  private readonly maxBackoffDelayMs: number
  private readonly maxAttempts: number
  private readonly isKilled: () => boolean
  // Exports currently in flight; awaited by forceFlush()
  private pendingExports: Promise<void>[] = []
  private isShutdown = false
  // Timer abstraction (injectable for tests); returns a cancel function
  private readonly schedule: (
    fn: () => Promise<void>,
    delayMs: number,
  ) => () => void
  // Cancel handle for the pending backoff timer, or null when none is scheduled
  private cancelBackoff: (() => void) | null = null
  // Consecutive send cycles since the last success; drives quadratic backoff
  // and the maxAttempts drop threshold. Reset to 0 by resetBackoff().
  private attempts = 0
  // True while retryFailedEvents() is actively sending; prevents overlapping retries
  private isRetrying = false
  // Context string from the most recent failed POST, surfaced in error messages
  private lastExportErrorContext: string | undefined

  constructor(
    options: {
      timeout?: number
      maxBatchSize?: number
      skipAuth?: boolean
      batchDelayMs?: number
      baseBackoffDelayMs?: number
      maxBackoffDelayMs?: number
      maxAttempts?: number
      path?: string
      baseUrl?: string
      // Injected killswitch probe. Checked per-POST so that disabling the
      // firstParty sink also stops backoff retries (not just new emits).
      // Passed in rather than imported to avoid a cycle with firstPartyEventLogger.ts.
      isKilled?: () => boolean
      schedule?: (fn: () => Promise<void>, delayMs: number) => () => void
    } = {},
  ) {
    // Default: prod, except when ANTHROPIC_BASE_URL is explicitly staging.
    // Overridable via tengu_1p_event_batch_config.baseUrl.
    const baseUrl =
      options.baseUrl ||
      (process.env.ANTHROPIC_BASE_URL === 'https://api-staging.anthropic.com'
        ? 'https://api-staging.anthropic.com'
        : 'https://api.anthropic.com')

    this.endpoint = `${baseUrl}${options.path || '/api/event_logging/batch'}`

    this.timeout = options.timeout || 10000
    this.maxBatchSize = options.maxBatchSize || 200
    this.skipAuth = options.skipAuth ?? false
    this.batchDelayMs = options.batchDelayMs || 100
    this.baseBackoffDelayMs = options.baseBackoffDelayMs || 500
    this.maxBackoffDelayMs = options.maxBackoffDelayMs || 30000
    this.maxAttempts = options.maxAttempts ?? 8
    this.isKilled = options.isKilled ?? (() => false)
    this.schedule =
      options.schedule ??
      ((fn, ms) => {
        const t = setTimeout(fn, ms)
        return () => clearTimeout(t)
      })

    // Retry any failed events from previous runs of this session (in background)
    void this.retryPreviousBatches()
  }

  /**
   * Number of events currently queued on disk for this run's batch file.
   * Expose for testing.
   */
  async getQueuedEventCount(): Promise<number> {
    return (await this.loadEventsFromCurrentBatch()).length
  }

  // --- Storage helpers ---

  /**
   * Path of the failed-event file for THIS process run: keyed by session id
   * plus BATCH_UUID so concurrent runs of the same session never collide.
   */
  private getCurrentBatchFilePath(): string {
    return path.join(
      getStorageDir(),
      `${FILE_PREFIX}${getSessionId()}.${BATCH_UUID}.json`,
    )
  }

  /**
   * Reads a JSONL file of queued events. Returns [] on any read/parse
   * failure (missing file, inaccessible dir, corrupt line) — telemetry is
   * best-effort and must never throw here.
   */
  private async loadEventsFromFile(
    filePath: string,
  ): Promise<FirstPartyEventLoggingEvent[]> {
    try {
      return await readJSONLFile<FirstPartyEventLoggingEvent>(filePath)
    } catch {
      return []
    }
  }

  /** Convenience: load the queued events for this run's batch file. */
  private async loadEventsFromCurrentBatch(): Promise<
    FirstPartyEventLoggingEvent[]
  > {
    return this.loadEventsFromFile(this.getCurrentBatchFilePath())
  }

  /**
   * Overwrites the file with the given events (one JSON object per line).
   * An empty list deletes the file instead, so "no queued events" and
   * "no file" are the same state. All fs errors are logged, not thrown.
   */
  private async saveEventsToFile(
    filePath: string,
    events: FirstPartyEventLoggingEvent[],
  ): Promise<void> {
    try {
      if (events.length === 0) {
        try {
          await unlink(filePath)
        } catch {
          // File doesn't exist, nothing to delete
        }
      } else {
        // Ensure storage directory exists
        await mkdir(getStorageDir(), { recursive: true })
        // Write as JSON lines (one event per line)
        const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
        await writeFile(filePath, content, 'utf8')
      }
    } catch (error) {
      logError(error)
    }
  }

  /**
   * Appends events to the file as JSON lines without rewriting existing
   * content, so concurrent writers don't clobber each other's events.
   * No-op for an empty list; fs errors are logged, not thrown.
   */
  private async appendEventsToFile(
    filePath: string,
    events: FirstPartyEventLoggingEvent[],
  ): Promise<void> {
    if (events.length === 0) return
    try {
      // Ensure storage directory exists
      await mkdir(getStorageDir(), { recursive: true })
      // Append as JSON lines (one event per line) - atomic on most filesystems
      const content = events.map(e => jsonStringify(e)).join('\n') + '\n'
      await appendFile(filePath, content, 'utf8')
    } catch (error) {
      logError(error)
    }
  }

  /** Best-effort unlink; missing file or permission errors are ignored. */
  private async deleteFile(filePath: string): Promise<void> {
    try {
      await unlink(filePath)
    } catch {
      // File doesn't exist or can't be deleted, ignore
    }
  }

  // --- Previous batch retry (startup) ---

  /**
   * At startup, finds failed-event files left by PREVIOUS runs of this
   * session (same session id, different BATCH_UUID) and retries each one
   * in the background. An inaccessible storage dir is treated as "nothing
   * to retry"; other errors are logged.
   */
  private async retryPreviousBatches(): Promise<void> {
    try {
      const prefix = `${FILE_PREFIX}${getSessionId()}.`
      let files: string[]
      try {
        files = (await readdir(getStorageDir()))
          .filter((f: string) => f.startsWith(prefix) && f.endsWith('.json'))
          .filter((f: string) => !f.includes(BATCH_UUID)) // Exclude current batch
      } catch (e) {
        if (isFsInaccessible(e)) return
        throw e
      }

      for (const file of files) {
        const filePath = path.join(getStorageDir(), file)
        void this.retryFileInBackground(filePath)
      }
    } catch (error) {
      logError(error)
    }
  }

  /**
   * Retries one previous-run file: deletes it outright when the shared
   * attempts budget is exhausted or the file is empty; otherwise re-sends
   * its events, deleting the file on full success or rewriting it with
   * only the still-failing events.
   */
  private async retryFileInBackground(filePath: string): Promise<void> {
    if (this.attempts >= this.maxAttempts) {
      await this.deleteFile(filePath)
      return
    }

    const events = await this.loadEventsFromFile(filePath)
    if (events.length === 0) {
      await this.deleteFile(filePath)
      return
    }

    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: retrying ${events.length} events from previous batch`,
      )
    }

    const failedEvents = await this.sendEventsInBatches(events)
    if (failedEvents.length === 0) {
      await this.deleteFile(filePath)
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging('1P event logging: previous batch retry succeeded')
      }
    } else {
      // Save only the failed events back (not all original events)
      await this.saveEventsToFile(filePath, failedEvents)
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          `1P event logging: previous batch retry failed, ${failedEvents.length} events remain`,
        )
      }
    }
  }

  /**
   * LogRecordExporter entry point, invoked by BatchLogRecordProcessor.
   * Fails immediately after shutdown; otherwise kicks off doExport() and
   * tracks the promise in pendingExports so forceFlush() can await it.
   */
  async export(
    logs: ReadableLogRecord[],
    resultCallback: (result: ExportResult) => void,
  ): Promise<void> {
    if (this.isShutdown) {
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          '1P event logging export failed: Exporter has been shutdown',
        )
      }
      resultCallback({
        code: ExportResultCode.FAILED,
        error: new Error('Exporter has been shutdown'),
      })
      return
    }

    const exportPromise = this.doExport(logs, resultCallback)
    this.pendingExports.push(exportPromise)

    // Clean up completed exports
    void exportPromise.finally(() => {
      const index = this.pendingExports.indexOf(exportPromise)
      if (index > -1) {
        void this.pendingExports.splice(index, 1)
      }
    })
  }

  /**
   * Core export pipeline for one OTel batch:
   * 1. keep only records from the claude_code events scope,
   * 2. transform them to the 1P wire format,
   * 3. drop everything if the attempts budget is exhausted,
   * 4. send; on failure queue to disk and schedule a backoff retry,
   * 5. on success reset backoff and immediately drain any queued events.
   * Always reports the outcome through resultCallback; never throws.
   */
  private async doExport(
    logs: ReadableLogRecord[],
    resultCallback: (result: ExportResult) => void,
  ): Promise<void> {
    try {
      // Filter for event logs only (by scope name)
      const eventLogs = logs.filter(
        log =>
          log.instrumentationScope?.name === 'com.anthropic.claude_code.events',
      )

      if (eventLogs.length === 0) {
        resultCallback({ code: ExportResultCode.SUCCESS })
        return
      }

      // Transform new logs (failed events are retried independently via backoff)
      const events = this.transformLogsToEvents(eventLogs).events

      if (events.length === 0) {
        resultCallback({ code: ExportResultCode.SUCCESS })
        return
      }

      if (this.attempts >= this.maxAttempts) {
        resultCallback({
          code: ExportResultCode.FAILED,
          error: new Error(
            `Dropped ${events.length} events: max attempts (${this.maxAttempts}) reached`,
          ),
        })
        return
      }

      // Send events
      const failedEvents = await this.sendEventsInBatches(events)
      this.attempts++

      if (failedEvents.length > 0) {
        await this.queueFailedEvents(failedEvents)
        this.scheduleBackoffRetry()
        const context = this.lastExportErrorContext
          ? ` (${this.lastExportErrorContext})`
          : ''
        resultCallback({
          code: ExportResultCode.FAILED,
          error: new Error(
            `Failed to export ${failedEvents.length} events${context}`,
          ),
        })
        return
      }

      // Success - reset backoff and immediately retry any queued events
      this.resetBackoff()
      if ((await this.getQueuedEventCount()) > 0 && !this.isRetrying) {
        void this.retryFailedEvents()
      }
      resultCallback({ code: ExportResultCode.SUCCESS })
    } catch (error) {
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          `1P event logging export failed: ${errorMessage(error)}`,
        )
      }
      logError(error)
      resultCallback({
        code: ExportResultCode.FAILED,
        error: toError(error),
      })
    }
  }

  /**
   * Splits events into maxBatchSize chunks and POSTs them sequentially with
   * batchDelayMs between chunks. Returns the events that were NOT delivered
   * (empty array on full success); also records the last error context in
   * lastExportErrorContext for the caller's error message.
   */
  private async sendEventsInBatches(
    events: FirstPartyEventLoggingEvent[],
  ): Promise<FirstPartyEventLoggingEvent[]> {
    // Chunk events into batches
    const batches: FirstPartyEventLoggingEvent[][] = []
    for (let i = 0; i < events.length; i += this.maxBatchSize) {
      batches.push(events.slice(i, i + this.maxBatchSize))
    }

    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: exporting ${events.length} events in ${batches.length} batch(es)`,
      )
    }

    // Send each batch with delay between them. On first failure, assume the
    // endpoint is down and short-circuit: queue the failed batch plus all
    // remaining unsent batches without POSTing them. The backoff retry will
    // probe again with a single batch next tick.
    const failedBatchEvents: FirstPartyEventLoggingEvent[] = []
    let lastErrorContext: string | undefined
    for (let i = 0; i < batches.length; i++) {
      const batch = batches[i]!
      try {
        await this.sendBatchWithRetry({ events: batch })
      } catch (error) {
        lastErrorContext = getAxiosErrorContext(error)
        for (let j = i; j < batches.length; j++) {
          failedBatchEvents.push(...batches[j]!)
        }
        if (process.env.USER_TYPE === 'ant') {
          const skipped = batches.length - 1 - i
          logForDebugging(
            `1P event logging: batch ${i + 1}/${batches.length} failed (${lastErrorContext}); short-circuiting ${skipped} remaining batch(es)`,
          )
        }
        break
      }

      if (i < batches.length - 1 && this.batchDelayMs > 0) {
        await sleep(this.batchDelayMs)
      }
    }

    if (failedBatchEvents.length > 0 && lastErrorContext) {
      this.lastExportErrorContext = lastErrorContext
    }

    return failedBatchEvents
  }

  /**
   * Persists failed events to this run's batch file (append-only, so a
   * concurrent retry pass rewriting the file doesn't lose them) and logs
   * the failure with the last error context.
   */
  private async queueFailedEvents(
    events: FirstPartyEventLoggingEvent[],
  ): Promise<void> {
    const filePath = this.getCurrentBatchFilePath()

    // Append-only: just add new events to file (atomic on most filesystems)
    await this.appendEventsToFile(filePath, events)

    const context = this.lastExportErrorContext
      ? ` (${this.lastExportErrorContext})`
      : ''
    const message = `1P event logging: ${events.length} events failed to export${context}`
    logError(new Error(message))
  }

  /**
   * Schedules a single retryFailedEvents() tick after a quadratic backoff
   * delay. At most one timer exists at a time; no-op if a timer is already
   * pending, a retry is in progress, or the exporter is shut down.
   */
  private scheduleBackoffRetry(): void {
    // Don't schedule if already retrying or shutdown
    if (this.cancelBackoff || this.isRetrying || this.isShutdown) {
      return
    }

    // Quadratic backoff (matching Statsig SDK): base * attempts²
    const delay = Math.min(
      this.baseBackoffDelayMs * this.attempts * this.attempts,
      this.maxBackoffDelayMs,
    )

    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: scheduling backoff retry in ${delay}ms (attempt ${this.attempts})`,
      )
    }

    this.cancelBackoff = this.schedule(async () => {
      this.cancelBackoff = null
      await this.retryFailedEvents()
    }, delay)
  }

  /**
   * Drains the on-disk queue: loads queued events, deletes the file (the
   * events live in memory for the attempt), re-sends, and on failure writes
   * the leftovers back and schedules another backoff tick. Loops while
   * sends succeed so newly queued events are drained too; drops everything
   * once the attempts budget is exhausted.
   */
  private async retryFailedEvents(): Promise<void> {
    const filePath = this.getCurrentBatchFilePath()

    // Keep retrying while there are events and endpoint is healthy
    while (!this.isShutdown) {
      const events = await this.loadEventsFromFile(filePath)
      if (events.length === 0) break

      if (this.attempts >= this.maxAttempts) {
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            `1P event logging: max attempts (${this.maxAttempts}) reached, dropping ${events.length} events`,
          )
        }
        await this.deleteFile(filePath)
        this.resetBackoff()
        return
      }

      this.isRetrying = true

      // Clear file before retry (we have events in memory now)
      await this.deleteFile(filePath)

      if (process.env.USER_TYPE === 'ant') {
        logForDebugging(
          `1P event logging: retrying ${events.length} failed events (attempt ${this.attempts + 1})`,
        )
      }

      const failedEvents = await this.sendEventsInBatches(events)
      this.attempts++

      this.isRetrying = false

      if (failedEvents.length > 0) {
        // Write failures back to disk
        await this.saveEventsToFile(filePath, failedEvents)
        this.scheduleBackoffRetry()
        return // Failed - wait for backoff
      }

      // Success - reset backoff and continue loop to drain any newly queued events
      this.resetBackoff()
      if (process.env.USER_TYPE === 'ant') {
        logForDebugging('1P event logging: backoff retry succeeded')
      }
    }
  }

  /** Resets the attempts counter and cancels any pending backoff timer. */
  private resetBackoff(): void {
    this.attempts = 0
    if (this.cancelBackoff) {
      this.cancelBackoff()
      this.cancelBackoff = null
    }
  }

  /**
   * POSTs one batch to the endpoint. Throws when the killswitch is active
   * or the request ultimately fails (caller short-circuits and queues).
   * Auth is skipped when explicitly configured, workspace trust is not yet
   * established, the OAuth token lacks user:profile scope, or it is known
   * expired; a 401 on an authenticated request triggers one unauthenticated
   * retry.
   */
  private async sendBatchWithRetry(
    payload: FirstPartyEventLoggingPayload,
  ): Promise<void> {
    if (this.isKilled()) {
      // Throw so the caller short-circuits remaining batches and queues
      // everything to disk. Zero network traffic while killed; the backoff
      // timer keeps ticking and will resume POSTs as soon as the GrowthBook
      // cache picks up the cleared flag.
      throw new Error('firstParty sink killswitch active')
    }

    const baseHeaders: Record<string, string> = {
      'Content-Type': 'application/json',
      'User-Agent': getClaudeCodeUserAgent(),
      'x-service-name': 'claude-code',
    }

    // Skip auth if trust hasn't been established yet
    // This prevents executing apiKeyHelper commands before the trust dialog
    // Non-interactive sessions implicitly have workspace trust
    const hasTrust =
      checkHasTrustDialogAccepted() || getIsNonInteractiveSession()
    if (process.env.USER_TYPE === 'ant' && !hasTrust) {
      logForDebugging('1P event logging: Trust not accepted')
    }

    // Skip auth when the OAuth token is expired or lacks user:profile
    // scope (service key sessions). Falls through to unauthenticated send.
    let shouldSkipAuth = this.skipAuth || !hasTrust
    if (!shouldSkipAuth && isClaudeAISubscriber()) {
      const tokens = getClaudeAIOAuthTokens()
      if (!hasProfileScope()) {
        shouldSkipAuth = true
      } else if (tokens && isOAuthTokenExpired(tokens.expiresAt)) {
        shouldSkipAuth = true
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            '1P event logging: OAuth token expired, skipping auth to avoid 401',
          )
        }
      }
    }

    // Try with auth headers first (unless trust not established or token is known to be expired)
    const authResult = shouldSkipAuth
      ? { headers: {}, error: 'trust not established or Oauth token expired' }
      : getAuthHeaders()
    const useAuth = !authResult.error

    if (!useAuth && process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: auth not available, sending without auth`,
      )
    }

    const headers = useAuth
      ? { ...baseHeaders, ...authResult.headers }
      : baseHeaders

    try {
      const response = await axios.post(this.endpoint, payload, {
        timeout: this.timeout,
        headers,
      })
      this.logSuccess(payload.events.length, useAuth, response.data)
      return
    } catch (error) {
      // Handle 401 by retrying without auth
      if (
        useAuth &&
        axios.isAxiosError(error) &&
        error.response?.status === 401
      ) {
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            '1P event logging: 401 auth error, retrying without auth',
          )
        }
        const response = await axios.post(this.endpoint, payload, {
          timeout: this.timeout,
          headers: baseHeaders,
        })
        this.logSuccess(payload.events.length, false, response.data)
        return
      }

      throw error
    }
  }

  /**
   * Debug-only success logging (ant users): event count, whether auth
   * headers were sent, and the pretty-printed API response body.
   */
  private logSuccess(
    eventCount: number,
    withAuth: boolean,
    responseData: unknown,
  ): void {
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging(
        `1P event logging: ${eventCount} events exported successfully${withAuth ? ' (with auth)' : ' (without auth)'}`,
      )
      logForDebugging(`API Response: ${jsonStringify(responseData, null, 2)}`)
    }
  }

  /** Converts an OTel HrTime tuple [seconds, nanoseconds] to a JS Date. */
  private hrTimeToDate(hrTime: HrTime): Date {
    const [seconds, nanoseconds] = hrTime
    return new Date(seconds * 1000 + nanoseconds / 1000000)
  }

  /**
   * Maps OTel log records to 1P wire events. Three paths per record:
   * - GrowthbookExperimentEvent attributes → experiment event proto;
   * - missing core_metadata → partial ClaudeCodeInternalEvent carrying a
   *   transform_error marker so the gap is visible server-side;
   * - normal path → to1PEventFormat, with _PROTO_* keys hoisted to
   *   dedicated proto fields and stripped from additional_metadata.
   */
  private transformLogsToEvents(
    logs: ReadableLogRecord[],
  ): FirstPartyEventLoggingPayload {
    const events: FirstPartyEventLoggingEvent[] = []

    for (const log of logs) {
      const attributes = log.attributes || {}

      // Check if this is a GrowthBook experiment event
      if (attributes.event_type === 'GrowthbookExperimentEvent') {
        const timestamp = this.hrTimeToDate(log.hrTime)
        const account_uuid = attributes.account_uuid as string | undefined
        const organization_uuid = attributes.organization_uuid as
          | string
          | undefined
        events.push({
          event_type: 'GrowthbookExperimentEvent',
          event_data: GrowthbookExperimentEvent.toJSON({
            event_id: attributes.event_id as string,
            timestamp,
            experiment_id: attributes.experiment_id as string,
            variation_id: attributes.variation_id as number,
            environment: attributes.environment as string,
            user_attributes: attributes.user_attributes as string,
            experiment_metadata: attributes.experiment_metadata as string,
            device_id: attributes.device_id as string,
            session_id: attributes.session_id as string,
            auth:
              account_uuid || organization_uuid
                ? { account_uuid, organization_uuid }
                : undefined,
          }),
        })
        continue
      }

      // Extract event name
      const eventName =
        (attributes.event_name as string) || (log.body as string) || 'unknown'

      // Extract metadata objects directly (no JSON parsing needed)
      const coreMetadata = attributes.core_metadata as EventMetadata | undefined
      const userMetadata = attributes.user_metadata as CoreUserData
      const eventMetadata = (attributes.event_metadata || {}) as Record<
        string,
        unknown
      >

      if (!coreMetadata) {
        // Emit partial event if core metadata is missing
        if (process.env.USER_TYPE === 'ant') {
          logForDebugging(
            `1P event logging: core_metadata missing for event ${eventName}`,
          )
        }
        events.push({
          event_type: 'ClaudeCodeInternalEvent',
          event_data: ClaudeCodeInternalEvent.toJSON({
            event_id: attributes.event_id as string | undefined,
            event_name: eventName,
            client_timestamp: this.hrTimeToDate(log.hrTime),
            session_id: getSessionId(),
            additional_metadata: Buffer.from(
              jsonStringify({
                transform_error: 'core_metadata attribute is missing',
              }),
            ).toString('base64'),
          }),
        })
        continue
      }

      // Transform to 1P format
      const formatted = to1PEventFormat(
        coreMetadata,
        userMetadata,
        eventMetadata,
      )

      // _PROTO_* keys are PII-tagged values meant only for privileged BQ
      // columns. Hoist known keys to proto fields, then defensively strip any
      // remaining _PROTO_* so an unrecognized future key can't silently land
      // in the general-access additional_metadata blob. sink.ts applies the
      // same strip before Datadog; this closes the 1P side.
      const {
        _PROTO_skill_name,
        _PROTO_plugin_name,
        _PROTO_marketplace_name,
        ...rest
      } = formatted.additional
      const additionalMetadata = stripProtoFields(rest)

      events.push({
        event_type: 'ClaudeCodeInternalEvent',
        event_data: ClaudeCodeInternalEvent.toJSON({
          event_id: attributes.event_id as string | undefined,
          event_name: eventName,
          client_timestamp: this.hrTimeToDate(log.hrTime),
          device_id: attributes.user_id as string | undefined,
          email: userMetadata?.email,
          auth: formatted.auth,
          ...formatted.core,
          env: formatted.env,
          process: formatted.process,
          skill_name:
            typeof _PROTO_skill_name === 'string'
              ? _PROTO_skill_name
              : undefined,
          plugin_name:
            typeof _PROTO_plugin_name === 'string'
              ? _PROTO_plugin_name
              : undefined,
          marketplace_name:
            typeof _PROTO_marketplace_name === 'string'
              ? _PROTO_marketplace_name
              : undefined,
          additional_metadata:
            Object.keys(additionalMetadata).length > 0
              ? Buffer.from(jsonStringify(additionalMetadata)).toString(
                  'base64',
                )
              : undefined,
        }),
      })
    }

    return { events }
  }

  /**
   * Marks the exporter shut down (new exports are rejected), cancels any
   * pending backoff timer, and waits for in-flight exports to finish.
   */
  async shutdown(): Promise<void> {
    this.isShutdown = true
    this.resetBackoff()
    await this.forceFlush()
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging('1P event logging exporter shutdown complete')
    }
  }

  /** Awaits all in-flight export promises; does not trigger new sends. */
  async forceFlush(): Promise<void> {
    await Promise.all(this.pendingExports)
    if (process.env.USER_TYPE === 'ant') {
      logForDebugging('1P event logging exporter flush complete')
    }
  }
}

/**
 * Builds a compact, comma-separated diagnostic string for a failed export.
 * For axios errors it includes, when present: the `request-id` response
 * header, HTTP status, axios error code, and the error message. Any other
 * error falls back to errorMessage().
 */
function getAxiosErrorContext(error: unknown): string {
  if (!axios.isAxiosError(error)) {
    return errorMessage(error)
  }

  const parts: string[] = []

  const requestId = error.response?.headers?.['request-id']
  const status = error.response?.status

  if (requestId) parts.push(`request-id=${requestId}`)
  if (status) parts.push(`status=${status}`)
  if (error.code) parts.push(`code=${error.code}`)
  if (error.message) parts.push(error.message)

  return parts.join(', ')
}