BIG pi update with claude chat
This commit is contained in:
779
pi/.pi/agent/shared/claude-stream.ts
Normal file
779
pi/.pi/agent/shared/claude-stream.ts
Normal file
@@ -0,0 +1,779 @@
|
||||
/**
|
||||
* claude-stream — Shared types, rendering, and core spawn/stream logic
|
||||
* for ask-claude and chat-claude pi extensions.
|
||||
*
|
||||
* Both extensions spawn `claude -p --output-format stream-json` and parse
|
||||
* the same streaming protocol. This module provides:
|
||||
* - Block types (ThinkingBlock, ToolBlock, TextBlock)
|
||||
* - Rendering helpers (tool call lines, result boxes, usage formatting)
|
||||
* - runClaude() — the core spawn + stream parser
|
||||
*/
|
||||
|
||||
import { spawn } from "node:child_process";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { diffLines } from "diff";
|
||||
import { getMarkdownTheme } from "@mariozechner/pi-coding-agent";
|
||||
import { Box, Container, Markdown, Spacer, Text } from "@mariozechner/pi-tui";
|
||||
|
||||
// =============================================================================
|
||||
// Block types
|
||||
// =============================================================================
|
||||
|
||||
export interface ThinkingBlock {
|
||||
type: "thinking";
|
||||
text: string;
|
||||
}
|
||||
|
||||
export interface ToolBlock {
|
||||
type: "tool";
|
||||
id: string;
|
||||
name: string;
|
||||
inputJson: string;
|
||||
editContext?: { before: string[]; after: string[]; startLine: number };
|
||||
result?: { text: string; isError: boolean };
|
||||
}
|
||||
|
||||
export interface TextBlock {
|
||||
type: "text";
|
||||
text: string;
|
||||
}
|
||||
|
||||
export type StreamBlock = ThinkingBlock | ToolBlock | TextBlock;
|
||||
|
||||
// =============================================================================
|
||||
// Details interface (stored in tool result, drives rendering)
|
||||
// =============================================================================
|
||||
|
||||
export interface ClaudeDetails {
|
||||
label: string;
|
||||
done: boolean;
|
||||
blocks: StreamBlock[];
|
||||
finalText: string;
|
||||
sessionId?: string;
|
||||
isResume?: boolean;
|
||||
costUsd?: number;
|
||||
inputTokens?: number;
|
||||
outputTokens?: number;
|
||||
cacheReadTokens?: number;
|
||||
cacheWriteTokens?: number;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Rendering helpers
|
||||
// =============================================================================
|
||||
|
||||
export function shortenPath(p: string): string {
|
||||
const home = process.env.HOME ?? "";
|
||||
return home && p.startsWith(home) ? "~" + p.slice(home.length) : p;
|
||||
}
|
||||
|
||||
export function formatUsage(d: ClaudeDetails): string {
|
||||
const parts: string[] = [];
|
||||
if (d.inputTokens) parts.push(`↑${d.inputTokens}`);
|
||||
if (d.outputTokens) parts.push(`↓${d.outputTokens}`);
|
||||
if (d.cacheReadTokens) parts.push(`R${d.cacheReadTokens}`);
|
||||
if (d.cacheWriteTokens) parts.push(`W${d.cacheWriteTokens}`);
|
||||
if (d.costUsd) parts.push(d.costUsd < 0.001 ? `$${(d.costUsd * 1000).toFixed(2)}m` : `$${d.costUsd.toFixed(4)}`);
|
||||
return parts.join(" ");
|
||||
}
|
||||
|
||||
export type Theme = {
|
||||
fg: (c: any, t: string) => string;
|
||||
bg: (c: any, t: string) => string;
|
||||
bold: (t: string) => string;
|
||||
italic: (t: string) => string;
|
||||
dim?: (t: string) => string;
|
||||
};
|
||||
|
||||
export function renderToolCallLine(block: ToolBlock, theme: Theme): string {
|
||||
let input: Record<string, unknown> = {};
|
||||
try { input = JSON.parse(block.inputJson); } catch { /* ok */ }
|
||||
|
||||
switch (block.name.toLowerCase()) {
|
||||
case "read": {
|
||||
const path = shortenPath(String(input.file_path ?? input.path ?? ""));
|
||||
const offset = Number(input.offset ?? 1);
|
||||
const limit = input.limit != null ? Number(input.limit) : undefined;
|
||||
const range = limit != null ? `:${offset}-${offset + limit - 1}` : "";
|
||||
return theme.fg("muted", "read ") + theme.fg("accent", path) + theme.fg("warning", range);
|
||||
}
|
||||
case "bash": {
|
||||
const cmd = String(input.command ?? "").replace(/\n/g, " ↵ ");
|
||||
return theme.fg("muted", "$ ") + theme.fg("toolOutput", cmd);
|
||||
}
|
||||
case "edit": {
|
||||
const path = shortenPath(String(input.file_path ?? input.path ?? ""));
|
||||
return theme.fg("muted", "edit ") + theme.fg("accent", path);
|
||||
}
|
||||
case "write": {
|
||||
const path = shortenPath(String(input.file_path ?? input.path ?? ""));
|
||||
const lines = String(input.content ?? "").split("\n").length;
|
||||
return theme.fg("muted", "write ") + theme.fg("accent", path) + theme.fg("dim", ` (${lines} lines)`);
|
||||
}
|
||||
case "glob": {
|
||||
const pat = String(input.pattern ?? "");
|
||||
const path = input.path ? shortenPath(String(input.path)) : ".";
|
||||
return theme.fg("muted", "glob ") + theme.fg("accent", pat) + theme.fg("dim", ` in ${path}`);
|
||||
}
|
||||
case "grep": {
|
||||
const pat = String(input.pattern ?? "");
|
||||
const path = input.path ? shortenPath(String(input.path)) : ".";
|
||||
return theme.fg("muted", "grep ") + theme.fg("accent", `"${pat}"`) + theme.fg("dim", ` in ${path}`);
|
||||
}
|
||||
case "mcp__pi__ask": {
|
||||
// Surfaces from the pi-ask-bridge MCP server. input is
|
||||
// { questions: [{ id, question, options[], multi?, recommended? }, ...] }
|
||||
// Show the first question text inline; if there are more, append a count.
|
||||
const qs = Array.isArray(input.questions) ? (input.questions as any[]) : [];
|
||||
const first = qs[0];
|
||||
const head = first?.question ? String(first.question) : "(empty)";
|
||||
const more = qs.length > 1 ? ` (+${qs.length - 1} more)` : "";
|
||||
const tag = first?.id ? ` [${first.id}]` : "";
|
||||
return theme.fg("muted", "ask ") + theme.fg("accent", head) + theme.fg("dim", tag + more);
|
||||
}
|
||||
default: {
|
||||
const desc = typeof input.description === "string" ? input.description
|
||||
: typeof input.prompt === "string" ? input.prompt.split("\n")[0]
|
||||
: block.inputJson;
|
||||
return theme.fg("toolTitle", block.name) + theme.fg("dim", " " + desc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function renderToolResultBox(block: ToolBlock, theme: Theme): Text {
|
||||
if (!block.result) return new Text("", 0, 0);
|
||||
|
||||
let input: Record<string, unknown> = {};
|
||||
try { input = JSON.parse(block.inputJson); } catch { /* ok */ }
|
||||
|
||||
// Strip / restyle synthetic envelopes like <tool_use_error> and
|
||||
// <system_reminder> before any tool-specific parsing.
|
||||
const text = transformSpecialTags(block.result.text);
|
||||
const { isError } = block.result;
|
||||
|
||||
switch (block.name.toLowerCase()) {
|
||||
case "read": {
|
||||
const rawLines = text.split("\n").filter((l) => l.length > 0);
|
||||
const parsed = rawLines.map((l) => {
|
||||
const tab = l.indexOf("\t");
|
||||
return tab >= 0 ? { num: l.slice(0, tab), content: l.slice(tab + 1) } : { num: "", content: l };
|
||||
});
|
||||
const maxNumLen = parsed.reduce((m, l) => Math.max(m, l.num.length), 0);
|
||||
return new Text(
|
||||
parsed.map((l) => theme.fg("dim", l.num.padStart(maxNumLen)) + " " + l.content).join("\n"),
|
||||
0, 0,
|
||||
);
|
||||
}
|
||||
case "edit": {
|
||||
if (isError) return new Text(text, 0, 0);
|
||||
const oldStr = String(input.old_string ?? input.oldText ?? "");
|
||||
const newStr = String(input.new_string ?? input.newText ?? "");
|
||||
if (!oldStr && !newStr) return new Text(text.slice(0, 200), 0, 0);
|
||||
const oldLines = oldStr === "" ? [] : oldStr.split("\n");
|
||||
const newLines = newStr === "" ? [] : newStr.split("\n");
|
||||
const ctx = block.editContext;
|
||||
const startLine = ctx?.startLine ?? 1;
|
||||
const header = theme.fg("dim", `@@ -${startLine},${oldLines.length} +${startLine},${newLines.length} @@`);
|
||||
const diff: string[] = [header];
|
||||
for (const l of ctx?.before ?? []) diff.push(theme.fg("dim", " " + l));
|
||||
for (const l of oldLines) diff.push(theme.fg("toolDiffRemoved", "-" + l));
|
||||
for (const l of newLines) diff.push(theme.fg("toolDiffAdded", "+" + l));
|
||||
for (const l of ctx?.after ?? []) diff.push(theme.fg("dim", " " + l));
|
||||
return new Text(diff.join("\n"), 0, 0);
|
||||
}
|
||||
case "write": {
|
||||
if (isError) return new Text(text, 0, 0);
|
||||
const lines = String(input.content ?? "").split("\n");
|
||||
const numWidth = String(lines.length).length;
|
||||
return new Text(
|
||||
lines.map((l, i) => theme.fg("dim", String(i + 1).padStart(numWidth)) + " " + l).join("\n"),
|
||||
0, 0,
|
||||
);
|
||||
}
|
||||
case "bash":
|
||||
return new Text(text.trimEnd(), 0, 0);
|
||||
case "mcp__pi__ask": {
|
||||
// The pi-ask-mcp server returns a JSON array of QuestionResults.
|
||||
// Pretty-print as one "id → answer" line per question instead of
|
||||
// dumping raw JSON into the result banner.
|
||||
if (isError) return new Text(text.trim(), 0, 0);
|
||||
let parsed: any;
|
||||
try { parsed = JSON.parse(text); } catch { return new Text(text.trim(), 0, 0); }
|
||||
if (!Array.isArray(parsed)) return new Text(text.trim(), 0, 0);
|
||||
const lines: string[] = [];
|
||||
for (const r of parsed) {
|
||||
const id = theme.fg("accent", String(r?.id ?? "?"));
|
||||
const opts = Array.isArray(r?.selectedOptions) ? r.selectedOptions : [];
|
||||
const custom = r?.customInput ? String(r.customInput) : "";
|
||||
const arrow = theme.fg("dim", " → ");
|
||||
let answer: string;
|
||||
if (opts.length === 0 && !custom) {
|
||||
answer = theme.fg("warning", "(cancelled)");
|
||||
} else if (opts.length > 0 && custom) {
|
||||
answer = theme.fg("toolOutput", `[${opts.join(", ")}] + Other: "${custom}"`);
|
||||
} else if (custom) {
|
||||
answer = theme.fg("toolOutput", `Other: "${custom}"`);
|
||||
} else {
|
||||
answer = theme.fg("toolOutput", opts.length === 1 ? opts[0] : `[${opts.join(", ")}]`);
|
||||
}
|
||||
lines.push(id + arrow + answer);
|
||||
}
|
||||
return new Text(lines.join("\n"), 0, 0);
|
||||
}
|
||||
default:
|
||||
return new Text(text.trim(), 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
/** Strip ANSI SGR sequences so we can re-style plain text. */
|
||||
function stripAnsi(s: string): string {
|
||||
return s.replace(/\x1b\[[0-9;]*m/g, "");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Special-tag transform — used for tool-result text that embeds synthetic
|
||||
// envelopes like <tool_use_error> or <system_reminder>.
|
||||
//
|
||||
// IMPORTANT: all wrappers use PARTIAL resets (`\x1b[22;23;24;39m` — reset
|
||||
// bold/dim/italic/underline and foreground only). If we emitted \x1b[0m in
|
||||
// the middle of a line, it would also wipe any background that Box.bgFn
|
||||
// painted around us, producing a staircase-shaped hole where the padding
|
||||
// loses its colour. Partial resets leave the background intact.
|
||||
// ---------------------------------------------------------------------------
|
||||
const SGR_FG_RESET = "\x1b[22;23;24;39m";
|
||||
const SGR_BOLD_WHITE = "\x1b[1;97m";
|
||||
const SGR_BOLD_RED = "\x1b[1;91m";
|
||||
const SGR_DIM_GREY = "\x1b[2;90m";
|
||||
|
||||
function transformSpecialTags(text: string): string {
|
||||
// <tool_use_error>message</tool_use_error>
|
||||
// → strip tags, display message in bright bold red
|
||||
text = text.replace(
|
||||
/<tool_use_error>\s*([\s\S]*?)\s*<\/tool_use_error>/g,
|
||||
(_, inner: string) => SGR_BOLD_RED + inner + SGR_FG_RESET,
|
||||
);
|
||||
// <system_reminder> or <system-reminder>
|
||||
// → strip tags, collapse whitespace to one line, truncate at ~100 chars
|
||||
// with a trailing "...", render in dim grey
|
||||
text = text.replace(
|
||||
/<system[-_]reminder>\s*([\s\S]*?)\s*<\/system[-_]reminder>/g,
|
||||
(_, inner: string) => {
|
||||
const oneLine = inner.replace(/\s+/g, " ").trim();
|
||||
const MAX = 100;
|
||||
const snippet = oneLine.length > MAX ? oneLine.slice(0, MAX).trimEnd() : oneLine;
|
||||
return SGR_DIM_GREY + snippet + "..." + SGR_FG_RESET;
|
||||
},
|
||||
);
|
||||
return text;
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a tool block as two stacked banners:
|
||||
* 1) header — bold-white title on an ORANGE background (matches the
|
||||
* chat-claude outer border colour).
|
||||
* 2) result body (if any) — on the theme's pending / success / error bg.
|
||||
*
|
||||
* The header deliberately uses a different background from the result, so
|
||||
* the "staircase" where the banners have different widths is an intentional
|
||||
* stylistic cue, not a rendering glitch.
|
||||
*/
|
||||
const ORANGE_BG_FN = (s: string) => "\x1b[48;5;130m" + s + "\x1b[0m";
|
||||
|
||||
export function renderToolBlock(block: ToolBlock, theme: Theme): Container {
|
||||
const c = new Container();
|
||||
|
||||
// ---- Header banner (orange) ----
|
||||
// Strip per-segment colours from renderToolCallLine so the title renders
|
||||
// uniformly bold-white over the orange background. Partial-reset at the
|
||||
// end keeps the orange bg alive for the trailing padding.
|
||||
const headerText = SGR_BOLD_WHITE + stripAnsi(renderToolCallLine(block, theme)) + SGR_FG_RESET;
|
||||
c.addChild(new Text(headerText, 2, 0, ORANGE_BG_FN));
|
||||
|
||||
// ---- Result body ----
|
||||
if (block.result !== undefined) {
|
||||
const bgFn = block.result.isError
|
||||
? (s: string) => theme.bg("toolErrorBg", s)
|
||||
: (s: string) => theme.bg("toolSuccessBg", s);
|
||||
const box = new Box(2, 0, bgFn);
|
||||
box.addChild(renderToolResultBox(block, theme));
|
||||
c.addChild(box);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
/** Build a unified diff string for an edit tool call. */
|
||||
export function buildEditDiff(oldStr: string, newStr: string, contextLines = 3): string {
|
||||
const changes = diffLines(oldStr, newStr);
|
||||
if (changes.length === 1 && (changes[0].added || changes[0].removed)) {
|
||||
const oldCount = (oldStr ? oldStr.split("\n").length : 0);
|
||||
const newCount = (newStr ? newStr.split("\n").length : 0);
|
||||
const lines: string[] = [
|
||||
`@@ -1,${oldCount} +1,${newCount} @@`,
|
||||
...oldStr.split("\n").map((l) => "-" + l),
|
||||
...newStr.split("\n").map((l) => "+" + l),
|
||||
];
|
||||
return lines.join("\n");
|
||||
}
|
||||
|
||||
const ctxLines: string[] = oldStr.split("\n");
|
||||
const hunks: { start: number; lines: string[] }[] = [];
|
||||
let currentHunk: { start: number; lines: string[] } | null = null;
|
||||
let lineIdx = 0;
|
||||
|
||||
for (const change of changes) {
|
||||
const lines = change.value.endsWith("\n") ? change.value.slice(0, -1).split("\n") : change.value.split("\n");
|
||||
if (change.added) {
|
||||
if (!currentHunk) currentHunk = { start: lineIdx, lines: [] };
|
||||
for (const l of lines) currentHunk.lines.push("+" + l);
|
||||
} else if (change.removed) {
|
||||
if (!currentHunk) currentHunk = { start: lineIdx, lines: [] };
|
||||
for (const l of lines) currentHunk.lines.push("-" + l);
|
||||
lineIdx += lines.length;
|
||||
} else {
|
||||
for (const l of lines) {
|
||||
if (!currentHunk) { lineIdx++; continue; }
|
||||
currentHunk.lines.push(" " + l);
|
||||
if (currentHunk.lines.length >= contextLines * 2 + 1) {
|
||||
hunks.push(currentHunk);
|
||||
currentHunk = null;
|
||||
}
|
||||
lineIdx++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (currentHunk) hunks.push(currentHunk);
|
||||
|
||||
if (hunks.length === 0) return "(no changes)";
|
||||
|
||||
const merged: { start: number; lines: string[] }[] = [];
|
||||
for (const h of hunks) {
|
||||
if (merged.length === 0 || h.start <= merged[merged.length - 1].start + merged[merged.length - 1].lines.length) {
|
||||
if (merged.length > 0) {
|
||||
const prev = merged[merged.length - 1];
|
||||
prev.lines.push(...h.lines.slice(Math.max(0, prev.lines.length - (h.start - prev.start))));
|
||||
} else {
|
||||
merged.push({ ...h, lines: [...h.lines] });
|
||||
}
|
||||
} else {
|
||||
merged.push({ ...h, lines: [...h.lines] });
|
||||
}
|
||||
}
|
||||
|
||||
const resultLines: string[] = [];
|
||||
for (const h of merged) {
|
||||
const removed = h.lines.filter((l) => l.startsWith("-")).length;
|
||||
const added = h.lines.filter((l) => l.startsWith("+")).length;
|
||||
const total = h.lines.length;
|
||||
resultLines.push(`@@ -${h.start},${removed + (total - removed - added)} +${h.start},${added + (total - removed - added)} @@`);
|
||||
resultLines.push(...h.lines);
|
||||
}
|
||||
return resultLines.join("\n");
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Error helpers
|
||||
// =============================================================================
|
||||
|
||||
const ANTHROPIC_ERROR_MAP: [RegExp, string][] = [
|
||||
[/rate_limit/i, "Rate limited by Anthropic — please try again in a few seconds."],
|
||||
[/invalid_api_key/i, "Invalid Anthropic API key — check your ANTHROPIC_API_KEY."],
|
||||
[/permission_error/i, "Permission denied — check your Anthropic account and API key."],
|
||||
[/not_found.*model/i, "Model not found — the specified model is not available."],
|
||||
[/context_length/i, "Context window exceeded — the input is too large for this model."],
|
||||
[/overloaded/i, "Anthropic is overloaded — please retry shortly."],
|
||||
[/billing/i, "Billing issue — check your Anthropic account status."],
|
||||
[/invalid_request/i, "Invalid request — the prompt or parameters may be malformed."],
|
||||
];
|
||||
|
||||
export function formatAnthropicError(raw: string, exitCode?: number): string {
|
||||
for (const [re, msg] of ANTHROPIC_ERROR_MAP) {
|
||||
if (re.test(raw)) return msg;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(raw.trim());
|
||||
if (parsed.type === "error" && parsed.error?.message) {
|
||||
const em = parsed.error.message;
|
||||
for (const [re, msg] of ANTHROPIC_ERROR_MAP) {
|
||||
if (re.test(em)) return msg;
|
||||
}
|
||||
return em;
|
||||
}
|
||||
} catch { /* not JSON */ }
|
||||
for (const line of raw.split("\n")) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed.startsWith("{")) continue;
|
||||
try {
|
||||
const parsed = JSON.parse(trimmed);
|
||||
if (parsed.type === "error" && parsed.error?.message) {
|
||||
const em = parsed.error.message;
|
||||
for (const [re, msg] of ANTHROPIC_ERROR_MAP) {
|
||||
if (re.test(em)) return msg;
|
||||
}
|
||||
return em;
|
||||
}
|
||||
} catch { /* skip */ }
|
||||
}
|
||||
if (raw.trim()) return raw.trim().slice(0, 500);
|
||||
return `claude process exited with code ${exitCode ?? "unknown"}`;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Core: spawn claude CLI and stream all output
|
||||
// =============================================================================
|
||||
|
||||
export interface RunClaudeOptions {
|
||||
model?: string; // default: "sonnet"
|
||||
agent?: string; // --agent <name> (from ~/.claude/agents/)
|
||||
sessionId?: string; // --resume <id> (multi-turn)
|
||||
noSessionPersistence?: boolean; // --no-session-persistence (ask-claude default)
|
||||
enrichEditDiffs?: boolean; // enrich edit tool results with unified diffs
|
||||
mcpConfigPath?: string; // --mcp-config <path> (e.g. pi-ask-bridge)
|
||||
disallowedTools?: string[]; // --disallowed-tools (e.g. ["AskUserQuestion"])
|
||||
// Extended-thinking effort level ("low"|"medium"|"high"|"xhigh"|"max").
|
||||
// REQUIRED for thinking blocks to appear: in `-p` / stream-json mode the
|
||||
// CLI does NOT honour the user's interactive `defaultThinkingLevel`
|
||||
// setting — thinking_delta events are only emitted when `--effort` is
|
||||
// passed explicitly. Pass "off" to leave the flag off entirely.
|
||||
effort?: string;
|
||||
extraEnv?: NodeJS.ProcessEnv; // merged on top of process.env for the child
|
||||
cwd: string;
|
||||
signal?: AbortSignal;
|
||||
timeoutMs?: number; // default: 15 min
|
||||
onUpdate: (partial: { blocks: StreamBlock[]; finalText: string }) => void;
|
||||
}
|
||||
|
||||
export interface RunClaudeResult {
|
||||
finalText: string;
|
||||
blocks: StreamBlock[];
|
||||
sessionId: string;
|
||||
costUsd: number;
|
||||
inputTokens: number;
|
||||
outputTokens: number;
|
||||
cacheReadTokens: number;
|
||||
cacheWriteTokens: number;
|
||||
}
|
||||
|
||||
export async function runClaude(prompt: string, opts: RunClaudeOptions): Promise<RunClaudeResult> {
|
||||
const DEFAULT_TIMEOUT_MS = 15 * 60 * 1000;
|
||||
const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const args = ["-p", "--output-format", "stream-json", "--include-partial-messages"];
|
||||
|
||||
// Session mode: resume takes priority, then agent, then plain model
|
||||
if (opts.sessionId) {
|
||||
args.push("--resume", opts.sessionId);
|
||||
} else if (opts.agent) {
|
||||
args.push("--agent", opts.agent);
|
||||
if (opts.model) args.push("--model", opts.model);
|
||||
if (opts.noSessionPersistence) args.push("--no-session-persistence");
|
||||
} else {
|
||||
args.push("--model", opts.model ?? "sonnet");
|
||||
if (opts.noSessionPersistence) args.push("--no-session-persistence");
|
||||
}
|
||||
|
||||
// Extended-thinking effort — must be passed explicitly in -p mode;
|
||||
// the interactive `defaultThinkingLevel` setting does NOT apply here.
|
||||
// Callers pass "off" to suppress the flag (e.g. ask-claude where
|
||||
// raw speed matters more than thought traces).
|
||||
if (opts.effort && opts.effort !== "off") {
|
||||
args.push("--effort", opts.effort);
|
||||
}
|
||||
|
||||
// MCP config (e.g. pi-ask-bridge for routing AskUserQuestion-style
|
||||
// requests through pi's native UI). Additive — does NOT pass
|
||||
// --strict-mcp-config, so the user's other configured MCP servers
|
||||
// (exa, sentry, …) remain available to Claude.
|
||||
if (opts.mcpConfigPath) {
|
||||
args.push("--mcp-config", opts.mcpConfigPath);
|
||||
}
|
||||
// Tool denylist — typically ["AskUserQuestion"] when the MCP
|
||||
// server is providing a replacement.
|
||||
if (opts.disallowedTools?.length) {
|
||||
args.push("--disallowed-tools", opts.disallowedTools.join(","));
|
||||
}
|
||||
|
||||
const proc = spawn("claude", args, {
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
cwd: opts.cwd,
|
||||
env: opts.extraEnv ? { ...process.env, ...opts.extraEnv } : process.env,
|
||||
});
|
||||
|
||||
try {
|
||||
proc.stdin.write(prompt, "utf8");
|
||||
proc.stdin.end();
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code !== "EPIPE") reject(err);
|
||||
}
|
||||
|
||||
let buffer = "";
|
||||
const blocks: StreamBlock[] = [];
|
||||
const pendingTools = new Map<number, { name: string; inputJson: string; id: string }>();
|
||||
|
||||
let sessionId = "";
|
||||
let costUsd = 0, inputTokens = 0, outputTokens = 0, cacheReadTokens = 0, cacheWriteTokens = 0;
|
||||
|
||||
const getOrCreateTextBlock = (): TextBlock => {
|
||||
const last = blocks[blocks.length - 1];
|
||||
if (last?.type === "text") return last;
|
||||
const b: TextBlock = { type: "text", text: "" };
|
||||
blocks.push(b);
|
||||
return b;
|
||||
};
|
||||
|
||||
const getOrCreateThinkingBlock = (): ThinkingBlock => {
|
||||
const last = blocks[blocks.length - 1];
|
||||
if (last?.type === "thinking") return last;
|
||||
const b: ThinkingBlock = { type: "thinking", text: "" };
|
||||
blocks.push(b);
|
||||
return b;
|
||||
};
|
||||
|
||||
const emit = () => {
|
||||
const finalText = blocks.filter((b): b is TextBlock => b.type === "text").map((b) => b.text).join("");
|
||||
opts.onUpdate({ blocks: [...blocks], finalText });
|
||||
};
|
||||
|
||||
const processLine = (line: string) => {
|
||||
if (!line.trim()) return;
|
||||
let event: any;
|
||||
try { event = JSON.parse(line); } catch { return; }
|
||||
|
||||
if (event.type === "stream_event") {
|
||||
const e = event.event as any;
|
||||
if (!e) return;
|
||||
|
||||
if (e.type === "content_block_start") {
|
||||
const cb = e.content_block;
|
||||
if (cb?.type === "tool_use") {
|
||||
pendingTools.set(e.index as number, { name: cb.name, inputJson: "", id: cb.id });
|
||||
}
|
||||
} else if (e.type === "content_block_delta") {
|
||||
const d = e.delta as any;
|
||||
if (d?.type === "text_delta") {
|
||||
getOrCreateTextBlock().text += d.text as string;
|
||||
emit();
|
||||
} else if (d?.type === "thinking_delta") {
|
||||
getOrCreateThinkingBlock().text += d.thinking as string;
|
||||
emit();
|
||||
} else if (d?.type === "input_json_delta") {
|
||||
const tool = pendingTools.get(e.index as number);
|
||||
if (tool) tool.inputJson += d.partial_json as string ?? "";
|
||||
}
|
||||
} else if (e.type === "content_block_stop") {
|
||||
const tool = pendingTools.get(e.index as number);
|
||||
if (tool) {
|
||||
// Claude CLI's --include-partial-messages can emit an `assistant`
|
||||
// event with the completed tool_use BEFORE content_block_stop
|
||||
// arrives. That path already pushed a block with the same id;
|
||||
// update it in place instead of pushing a duplicate.
|
||||
const existing = blocks.find(
|
||||
(b): b is ToolBlock => b.type === "tool" && b.id === tool.id,
|
||||
);
|
||||
const target: ToolBlock = existing ?? {
|
||||
type: "tool", id: tool.id, name: tool.name, inputJson: tool.inputJson,
|
||||
};
|
||||
if (existing) {
|
||||
// Prefer the streamed inputJson (it's been accumulating and
|
||||
// matches what claude-code actually executed).
|
||||
existing.inputJson = tool.inputJson;
|
||||
}
|
||||
// For edit tools, read file context before the edit executes
|
||||
if (tool.name.toLowerCase() === "edit" && !target.editContext) {
|
||||
try {
|
||||
const inp = JSON.parse(tool.inputJson) as Record<string, unknown>;
|
||||
const fp = String(inp.file_path ?? inp.path ?? "");
|
||||
const oldStr = String(inp.old_string ?? inp.oldText ?? "");
|
||||
if (fp && oldStr) {
|
||||
const fileLines = readFileSync(fp, "utf8").split("\n");
|
||||
const oldLines = oldStr.split("\n");
|
||||
let startIdx = -1;
|
||||
outer: for (let i = 0; i <= fileLines.length - oldLines.length; i++) {
|
||||
for (let j = 0; j < oldLines.length; j++) {
|
||||
if (fileLines[i + j] !== oldLines[j]) continue outer;
|
||||
}
|
||||
startIdx = i;
|
||||
break;
|
||||
}
|
||||
if (startIdx >= 0) {
|
||||
target.editContext = {
|
||||
before: fileLines.slice(Math.max(0, startIdx - 3), startIdx),
|
||||
after: fileLines.slice(startIdx + oldLines.length, startIdx + oldLines.length + 3),
|
||||
startLine: startIdx + 1,
|
||||
};
|
||||
}
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
if (!existing) blocks.push(target);
|
||||
pendingTools.delete(e.index as number);
|
||||
emit();
|
||||
}
|
||||
} else if (e.type === "message_delta") {
|
||||
const u = e.usage as any;
|
||||
if (u) {
|
||||
inputTokens = u.input_tokens ?? inputTokens;
|
||||
outputTokens = u.output_tokens ?? outputTokens;
|
||||
cacheReadTokens = u.cache_read_input_tokens ?? cacheReadTokens;
|
||||
cacheWriteTokens = u.cache_creation_input_tokens ?? cacheWriteTokens;
|
||||
}
|
||||
}
|
||||
} else if (event.type === "user") {
|
||||
for (const c of (event.message?.content ?? []) as any[]) {
|
||||
if (c.type !== "tool_result") continue;
|
||||
// tool_result.content may be a plain string (typical for bash/read
|
||||
// output) or an array of {type,text}/{type,image} blocks. Handle both.
|
||||
let text: string;
|
||||
if (typeof c.content === "string") {
|
||||
text = c.content;
|
||||
} else if (Array.isArray(c.content)) {
|
||||
text = (c.content as any[])
|
||||
.filter((x) => x.type === "text")
|
||||
.map((x) => x.text as string)
|
||||
.join("\n");
|
||||
} else {
|
||||
text = "";
|
||||
}
|
||||
const toolId = c.tool_use_id as string;
|
||||
const toolBlock = blocks.findLast((b): b is ToolBlock => b.type === "tool" && b.id === toolId);
|
||||
if (toolBlock && toolBlock.name.toLowerCase() === "edit" && !c.is_error && opts.enrichEditDiffs) {
|
||||
try {
|
||||
const inp = JSON.parse(toolBlock.inputJson) as Record<string, unknown>;
|
||||
const oldStr = String(inp.old_string ?? inp.oldText ?? "");
|
||||
const newStr = String(inp.new_string ?? inp.newText ?? "");
|
||||
if (oldStr || newStr) {
|
||||
const diff = buildEditDiff(oldStr, newStr);
|
||||
text = `${text}\n\n--- edit diff ---\n${diff}\n--- end diff ---`;
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
if (toolBlock) toolBlock.result = { text, isError: c.is_error === true };
|
||||
emit();
|
||||
}
|
||||
} else if (event.type === "assistant") {
|
||||
const content: any[] = (event as any).message?.content ?? [];
|
||||
for (const cb of content) {
|
||||
if (cb.type !== "tool_use") continue;
|
||||
const exists = blocks.some((b): b is ToolBlock => b.type === "tool" && b.id === cb.id);
|
||||
if (!exists) blocks.push({ type: "tool", id: cb.id, name: cb.name, inputJson: JSON.stringify(cb.input ?? {}) });
|
||||
}
|
||||
emit();
|
||||
} else if (event.type === "result") {
|
||||
sessionId = event.session_id ?? "";
|
||||
costUsd = event.total_cost_usd ?? 0;
|
||||
const u = event.usage as any;
|
||||
if (u) {
|
||||
inputTokens = u.input_tokens ?? inputTokens;
|
||||
outputTokens = u.output_tokens ?? outputTokens;
|
||||
cacheReadTokens = u.cache_read_input_tokens ?? cacheReadTokens;
|
||||
cacheWriteTokens = u.cache_creation_input_tokens ?? cacheWriteTokens;
|
||||
}
|
||||
} else if (event.type === "error") {
|
||||
const errMsg = event.error?.message ?? event.message ?? JSON.stringify(event);
|
||||
reject(new Error(formatAnthropicError(errMsg)));
|
||||
}
|
||||
};
|
||||
|
||||
proc.stdout.on("data", (data: Buffer) => {
|
||||
buffer += data.toString();
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() ?? "";
|
||||
for (const line of lines) processLine(line);
|
||||
});
|
||||
|
||||
let stderrOutput = "";
|
||||
proc.stderr.on("data", (data: Buffer) => { stderrOutput += data.toString(); });
|
||||
|
||||
proc.on("close", (code) => {
|
||||
clearTimeout(timeoutId);
|
||||
if (buffer.trim()) processLine(buffer);
|
||||
const finalText = blocks.filter((b): b is TextBlock => b.type === "text").map((b) => b.text).join("");
|
||||
if (code !== 0) {
|
||||
if (timeoutFired) {
|
||||
reject(new Error(`Claude timed out after ${timeoutMs / 1000}s — the process was killed. Consider using a simpler prompt or a faster model.`));
|
||||
} else {
|
||||
const errMsg = formatAnthropicError(stderrOutput.trim(), code);
|
||||
const detail = finalText ? ` (partial output: ${finalText.slice(0, 100)})` : "";
|
||||
reject(new Error(errMsg + detail));
|
||||
}
|
||||
return;
|
||||
}
|
||||
resolve({ finalText, blocks, sessionId, costUsd, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens });
|
||||
});
|
||||
|
||||
proc.on("error", (err) => {
|
||||
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
reject(new Error("Claude CLI not found — install it with `npm install -g @anthropic-ai/claude-code`"));
|
||||
} else {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
if (opts.signal) {
|
||||
const kill = () => {
|
||||
try { if (!proc.killed) proc.kill("SIGTERM"); } catch { /* ok */ }
|
||||
setTimeout(() => { try { if (!proc.killed) proc.kill("SIGKILL"); } catch { /* ok */ } }, 3000);
|
||||
};
|
||||
if (opts.signal.aborted) kill();
|
||||
else opts.signal.addEventListener("abort", kill, { once: true });
|
||||
}
|
||||
|
||||
let timeoutFired = false;
|
||||
const timeoutId = setTimeout(() => {
|
||||
timeoutFired = true;
|
||||
try { proc.kill("SIGTERM"); } catch { /* ok */ }
|
||||
setTimeout(() => { try { if (!proc.killed) proc.kill("SIGKILL"); } catch { /* ok */ } }, 3000);
|
||||
}, timeoutMs);
|
||||
|
||||
proc.prependListener("close", () => { clearTimeout(timeoutId); });
|
||||
proc.prependListener("error", () => { clearTimeout(timeoutId); });
|
||||
});
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// RenderResult factory — produces the common result rendering for both extensions
|
||||
// =============================================================================
|
||||
|
||||
export function renderClaudeResult(
|
||||
result: { details?: ClaudeDetails; isPartial?: boolean },
|
||||
theme: Theme,
|
||||
opts?: { showSession?: boolean }
|
||||
): Container | Text {
|
||||
const d = result.details as ClaudeDetails | undefined;
|
||||
if (!d) return new Text(theme.fg("muted", "…"), 0, 0);
|
||||
|
||||
const isDone = d.done && !result.isPartial;
|
||||
const icon = isDone ? theme.fg("success", "✓ ") : theme.fg("warning", "⏳ ");
|
||||
const resume = d.isResume ? theme.fg("dim", " ↩") : "";
|
||||
const c = new Container();
|
||||
|
||||
c.addChild(new Text(icon + theme.fg("toolTitle", theme.bold(d.label)) + resume, 0, 0));
|
||||
|
||||
for (const block of d.blocks ?? []) {
|
||||
if (block.type === "thinking" && block.text.trim()) {
|
||||
c.addChild(new Text(theme.fg("dim", theme.italic(block.text.trimEnd())), 0, 0));
|
||||
} else if (block.type === "tool") {
|
||||
c.addChild(renderToolBlock(block, theme));
|
||||
} else if (block.type === "text" && block.text.trim()) {
|
||||
c.addChild(new Spacer(1));
|
||||
if (isDone) {
|
||||
c.addChild(new Markdown(block.text.trim(), 0, 0, getMarkdownTheme()));
|
||||
} else {
|
||||
c.addChild(new Text(theme.fg("dim", block.text.trimEnd()), 0, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isDone) {
|
||||
const usageLine = formatUsage(d);
|
||||
const parts: string[] = [];
|
||||
if (usageLine) parts.push(usageLine);
|
||||
if (opts?.showSession && d.sessionId) parts.push(`session:${d.sessionId.slice(0, 8)}`);
|
||||
if (parts.length > 0) {
|
||||
c.addChild(new Spacer(1));
|
||||
c.addChild(new Text(theme.fg("dim", parts.join(" ")), 0, 0));
|
||||
}
|
||||
}
|
||||
|
||||
return c;
|
||||
}
|
||||
201
pi/.pi/agent/shared/pi-ask-bridge.ts
Normal file
201
pi/.pi/agent/shared/pi-ask-bridge.ts
Normal file
@@ -0,0 +1,201 @@
|
||||
/**
|
||||
* pi-ask-bridge — Unix-domain socket server that pipes ask requests from
|
||||
* the pi-ask-mcp subprocess (running inside Claude CLI inside chat-claude)
|
||||
* into pi's native ask UI (askSingleQuestionWithInlineNote / askQuestionsWithTabs).
|
||||
*
|
||||
* Architecture:
|
||||
*
|
||||
* pi process
|
||||
* └── chat-claude extension
|
||||
* ├── AskBridge (here) — listens on $PI_ASK_SOCKET
|
||||
* └── claude -p ... --mcp-config <generated>
|
||||
* └── pi-ask-mcp/server.js
|
||||
* ↳ on tools/call ask → connects to $PI_ASK_SOCKET,
|
||||
* sends question, awaits answer
|
||||
*
|
||||
* Lifecycle: start one bridge per chat-claude session; close on exit.
|
||||
* Concurrency: pi's ui.custom overlay is modal, so asks are serialised
|
||||
* across all open connections via an internal promise chain.
|
||||
*
|
||||
* Wire format (NDJSON, one message per line):
|
||||
*
|
||||
* request → { id, type: "ask", questions: [{id, question, options[], multi?, recommended?}, ...] }
|
||||
* response ← { id, type: "result", results: [{id, selectedOptions[], customInput?}, ...] }
|
||||
* error ← { id, type: "error", message: "…" }
|
||||
*/
|
||||
|
||||
import { createServer, type Server as NetServer, type Socket } from "node:net";
|
||||
import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { join, dirname } from "node:path";
|
||||
import { tmpdir } from "node:os";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import type { ExtensionUIContext } from "@mariozechner/pi-coding-agent";
|
||||
import { askSingleQuestionWithInlineNote } from "../extensions/pi-ask-tool/ask-inline-ui.js";
|
||||
import { askQuestionsWithTabs } from "../extensions/pi-ask-tool/ask-tabs-ui.js";
|
||||
import type { AskQuestion, AskSelection } from "../extensions/pi-ask-tool/ask-logic.js";
|
||||
|
||||
// =============================================================================
|
||||
// Public API
|
||||
// =============================================================================
|
||||
|
||||
export interface AskBridge {
|
||||
/** Path to the generated --mcp-config JSON, suitable for `claude --mcp-config`. */
|
||||
mcpConfigPath: string;
|
||||
/** Absolute path to the underlying Unix socket (informational). */
|
||||
socketPath: string;
|
||||
/** How many ask requests this bridge has served so far. */
|
||||
count(): number;
|
||||
/** Stop accepting connections, remove socket + temp dir. Idempotent. */
|
||||
close(): void;
|
||||
}
|
||||
|
||||
export interface StartAskBridgeOptions {
|
||||
/** pi UI context (must come from an interactive session). */
|
||||
ui: ExtensionUIContext;
|
||||
/**
|
||||
* Absolute path to extensions/pi-ask-mcp/server.js. Auto-derived from
|
||||
* import.meta.url when omitted (assumes the conventional layout).
|
||||
*/
|
||||
mcpServerEntry?: string;
|
||||
/**
|
||||
* MCP server name surfaced in the tool prefix. Defaults to "pi", which
|
||||
* makes the tool name `mcp__pi__ask` in Claude's tool stream.
|
||||
*/
|
||||
serverName?: string;
|
||||
/** Optional notification fired whenever a new ask is served. */
|
||||
onAsk?: (totalSoFar: number) => void;
|
||||
}
|
||||
|
||||
export function startAskBridge(opts: StartAskBridgeOptions): AskBridge {
|
||||
const dir = mkdtempSync(join(tmpdir(), "pi-ask-")); // 0700 perms
|
||||
const sock = join(dir, "ask.sock");
|
||||
let askCount = 0;
|
||||
let closed = false;
|
||||
|
||||
const server: NetServer = createServer((conn) =>
|
||||
handleConnection(conn, opts, () => {
|
||||
askCount += 1;
|
||||
opts.onAsk?.(askCount);
|
||||
return askCount;
|
||||
}),
|
||||
);
|
||||
server.on("error", () => { /* socket disappeared, etc. — bridge is single-tenant, ignore */ });
|
||||
server.listen(sock);
|
||||
|
||||
const mcpEntry = opts.mcpServerEntry ?? defaultMcpEntry();
|
||||
const serverName = opts.serverName ?? "pi";
|
||||
const cfgPath = join(dir, "mcp.json");
|
||||
writeFileSync(cfgPath, JSON.stringify({
|
||||
mcpServers: {
|
||||
[serverName]: {
|
||||
command: "node",
|
||||
args: [mcpEntry],
|
||||
env: { PI_ASK_SOCKET: sock },
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
return {
|
||||
socketPath: sock,
|
||||
mcpConfigPath: cfgPath,
|
||||
count: () => askCount,
|
||||
close: () => {
|
||||
if (closed) return;
|
||||
closed = true;
|
||||
try { server.close(); } catch { /* noop */ }
|
||||
try { rmSync(dir, { recursive: true, force: true }); } catch { /* noop */ }
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Internals
|
||||
// =============================================================================
|
||||
|
||||
function defaultMcpEntry(): string {
|
||||
// shared/pi-ask-bridge.ts → ../extensions/pi-ask-mcp/server.js
|
||||
const here = dirname(fileURLToPath(import.meta.url));
|
||||
return join(here, "..", "extensions", "pi-ask-mcp", "server.js");
|
||||
}
|
||||
|
||||
// pi.ui.custom is modal — only one overlay can be on screen at a time.
|
||||
// Serialise asks across ALL connections via this single promise chain.
|
||||
let askChain: Promise<unknown> = Promise.resolve();
|
||||
|
||||
function handleConnection(
|
||||
conn: Socket,
|
||||
opts: StartAskBridgeOptions,
|
||||
bumpCount: () => number,
|
||||
) {
|
||||
let buf = "";
|
||||
conn.on("data", (data) => {
|
||||
buf += data.toString();
|
||||
let nl = buf.indexOf("\n");
|
||||
while (nl >= 0) {
|
||||
const line = buf.slice(0, nl);
|
||||
buf = buf.slice(nl + 1);
|
||||
handleLine(line, conn, opts, bumpCount);
|
||||
nl = buf.indexOf("\n");
|
||||
}
|
||||
});
|
||||
conn.on("error", () => { /* peer might disappear if Claude is killed mid-flight */ });
|
||||
}
|
||||
|
||||
function handleLine(
|
||||
line: string,
|
||||
conn: Socket,
|
||||
opts: StartAskBridgeOptions,
|
||||
bumpCount: () => number,
|
||||
) {
|
||||
if (!line.trim()) return;
|
||||
let msg: any;
|
||||
try { msg = JSON.parse(line); } catch { return; }
|
||||
if (msg.type !== "ask") return;
|
||||
const id = String(msg.id ?? "");
|
||||
const questions = Array.isArray(msg.questions) ? (msg.questions as AskQuestion[]) : [];
|
||||
|
||||
askChain = askChain.then(async () => {
|
||||
bumpCount();
|
||||
try {
|
||||
const results = await askViaPiUI(opts.ui, questions);
|
||||
writeReply(conn, { id, type: "result", results });
|
||||
} catch (err) {
|
||||
writeReply(conn, {
|
||||
id,
|
||||
type: "error",
|
||||
message: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function writeReply(conn: Socket, msg: unknown) {
|
||||
try { conn.write(JSON.stringify(msg) + "\n"); conn.end(); } catch { /* gone */ }
|
||||
}
|
||||
|
||||
interface QuestionResult {
|
||||
id: string;
|
||||
selectedOptions: string[];
|
||||
customInput?: string;
|
||||
}
|
||||
|
||||
async function askViaPiUI(
|
||||
ui: ExtensionUIContext,
|
||||
questions: AskQuestion[],
|
||||
): Promise<QuestionResult[]> {
|
||||
if (questions.length === 0) return [];
|
||||
|
||||
if (questions.length === 1 && !questions[0].multi) {
|
||||
const sel: AskSelection = await askSingleQuestionWithInlineNote(ui, questions[0]);
|
||||
return [toResult(questions[0], sel)];
|
||||
}
|
||||
|
||||
const tab = await askQuestionsWithTabs(ui, questions);
|
||||
return questions.map((q, i) => toResult(q, tab.selections[i] ?? { selectedOptions: [] }));
|
||||
}
|
||||
|
||||
function toResult(q: AskQuestion, sel: AskSelection): QuestionResult {
|
||||
const out: QuestionResult = { id: q.id, selectedOptions: [...sel.selectedOptions] };
|
||||
if (sel.customInput) out.customInput = sel.customInput;
|
||||
return out;
|
||||
}
|
||||
Reference in New Issue
Block a user