Layer 2 MCP Watcher v0 scaffold

Per spec/agent-watcher.md §4. TypeScript/Node implementation living in
mcp-watcher/ subdirectory, parallel to Layer 1 Collector at repo root.

What lands:
- Core MCP server (src/server.ts) with experimental['claude/channel']:{}
  + tools:{} capability declarations, stdio transport, channel-event
  notifier wired through the inbox watcher.
- Identity resolution mirroring agent-ping's layered model
  (PING_AGENT_IDENTITY env, $CLAUDE_HOME/ping-agent, ~/.ping-agent).
- Inbox reader with HWM tracking, sentinel deferral (warn-after-3),
  atomic HWM writes via tmp+rename.
- chokidar-backed file watcher with coalesced drain, urgent-first
  ordering, recentEvents map for tool sender lookup.
- Three reply tools (ack / respond / mark_handled) with cross-host
  write discipline (writes to local inbox files; Syncthing replicates).
- Sentinel file (.<agent>.watcher-active) for hook coexistence per
  spec §4.3 — agent-ping hook stands down when the watcher is in
  charge of delivery on this host. Sentinel + hwm in .stignore.
- 35 unit tests passing (vitest): inbox parsing, HWM round-trip,
  sentinel deferral semantics, identity layers, tool I/O, watcher
  drain + ordering + restart-from-hwm.
- install.sh (Angus-executed, rule-2 compliant) installs deps,
  builds, symlinks ~/.local/bin/agent-watcher-mcp, prints mcp.json
  registration snippet for paste.
- README documents launch flag, sandbox CLAUDE_HOME pattern,
  hook coexistence, observability, v2 limitations.

Not yet:
- Integration test against a real Claude Code session — gated on
  Angus spinning up a sandbox CC session on the VPS with
  CLAUDE_HOME=~/.claude-sandbox.
- agent-ping hook update to read the sentinel and stand down.
  Separate small PR against agent-ping.

Interface contract with Layer 1: the inbox JSONL line shape from
inbox.ts::PingEvent matches inbox.Event in the Collector — bit-
identical reads regardless of source.
This commit is contained in:
bob 2026-05-06 17:44:57 -03:00
parent 3eda72df28
commit c22558c67a
18 changed files with 4786 additions and 0 deletions

View file

@ -0,0 +1,57 @@
// Identity resolution for the MCP Watcher.
//
// Mirrors the layered resolution shipped in agent-ping (PR #1). The CLI
// has four layers (--as flag, env, $CLAUDE_HOME/ping-agent, ~/.ping-agent);
// the watcher subprocess has three (no argv override useful in MCP context).
//
// Spec: agent-ping spec §9; agent-watcher spec §4.4.
import { homedir } from "node:os";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
export interface ResolvedIdentity {
agent: string;
source: string;
}
/**
* Resolve this process's agent identity.
* Layers, first match wins:
* 1. PING_AGENT_IDENTITY env var
* 2. $CLAUDE_HOME/ping-agent (if CLAUDE_HOME is set and file exists)
* 3. ~/.ping-agent
*
* Throws if no layer resolves.
*/
export function resolveIdentity(env: NodeJS.ProcessEnv = process.env): ResolvedIdentity {
const envId = (env.PING_AGENT_IDENTITY ?? "").trim();
if (envId) {
return { agent: envId, source: "PING_AGENT_IDENTITY env var" };
}
const claudeHome = (env.CLAUDE_HOME ?? "").trim();
if (claudeHome) {
const f = join(claudeHome, "ping-agent");
if (existsSync(f)) {
const name = readFileSync(f, "utf8").trim();
if (name) {
return { agent: name, source: f };
}
}
}
const home = env.HOME ?? homedir();
const defaultFile = join(home, ".ping-agent");
if (existsSync(defaultFile)) {
const name = readFileSync(defaultFile, "utf8").trim();
if (name) {
return { agent: name, source: defaultFile };
}
}
throw new Error(
`agent-watcher-mcp: no identity resolved. Set PING_AGENT_IDENTITY, ` +
`$CLAUDE_HOME/ping-agent, or ${defaultFile}.`,
);
}

147
mcp-watcher/src/inbox.ts Normal file
View file

@ -0,0 +1,147 @@
// Inbox reader + HWM tracker.
//
// JSONL inbox files are produced by the agent-ping CLI and the Layer 1
// Collector. This module reads new lines since the last delivered ts,
// applies sentinel-deferral logic (warn-after-3 per spec §6 of agent-ping
// + §4.1 of agent-watcher), and atomically advances the HWM.
//
// The HWM file is local-only (not Syncthing-replicated; in .stignore).
import { existsSync, readFileSync, writeFileSync, renameSync, mkdirSync } from "node:fs";
import { dirname } from "node:path";
export interface PingEvent {
ts: string;
id: string;
from: string;
to: string;
type: string;
priority?: string;
payload: string;
sentinel?: string;
source?: string; // collector-only debug field
}
export interface HwmState {
last_delivered_ts: string;
pending_attempts: Record<string, number>;
}
export const DEFER_LIMIT = 3;
export function readInbox(path: string): PingEvent[] {
if (!existsSync(path)) return [];
const body = readFileSync(path, "utf8");
const out: PingEvent[] = [];
for (const raw of body.split("\n")) {
const line = raw.trim();
if (!line) continue;
try {
const parsed = JSON.parse(line);
if (parsed && typeof parsed === "object" && parsed.id && parsed.ts) {
out.push(parsed as PingEvent);
}
} catch {
// Skip malformed lines; logged at the call site.
}
}
return out;
}
export function readHwm(path: string): HwmState {
if (!existsSync(path)) {
return { last_delivered_ts: "", pending_attempts: {} };
}
try {
const parsed = JSON.parse(readFileSync(path, "utf8"));
return {
last_delivered_ts: parsed.last_delivered_ts ?? "",
pending_attempts: parsed.pending_attempts ?? {},
};
} catch {
return { last_delivered_ts: "", pending_attempts: {} };
}
}
/** Atomic write via tmp-file + rename. Avoids torn writes if the process dies mid-write. */
export function writeHwm(path: string, state: HwmState): void {
mkdirSync(dirname(path), { recursive: true });
const tmp = path + ".tmp";
writeFileSync(tmp, JSON.stringify(state) + "\n", "utf8");
renameSync(tmp, path);
}
export interface Deliverable {
event: PingEvent;
/** Prefix prepended to the payload when sentinel was missing on the 3rd attempt. */
warning?: string;
}
export interface ReadResult {
/** Events ready to deliver (HWM should advance to cover these). */
deliverable: Deliverable[];
/** Events deferred this round; HWM should NOT advance past them yet. */
deferred: PingEvent[];
/** Updated HWM state to write after delivery succeeds. */
nextHwm: HwmState;
}
/**
* Read new pings since the HWM, applying sentinel deferral.
*
* `sentinelExists` is injected so tests don't need real files. In production
* pass `(p) => existsSync(p)` from the caller.
*/
export function unreadSinceHwm(
events: PingEvent[],
hwm: HwmState,
sentinelExists: (path: string) => boolean,
): ReadResult {
const deliverable: Deliverable[] = [];
const deferred: PingEvent[] = [];
const nextAttempts: Record<string, number> = { ...hwm.pending_attempts };
for (const ev of events) {
if (hwm.last_delivered_ts && ev.ts <= hwm.last_delivered_ts) continue;
if (ev.sentinel && !sentinelExists(ev.sentinel)) {
const attempts = (nextAttempts[ev.id] ?? 0) + 1;
nextAttempts[ev.id] = attempts;
if (attempts < DEFER_LIMIT) {
deferred.push(ev);
continue;
}
// Third attempt — deliver with warning.
deliverable.push({
event: ev,
warning: `referenced file ${ev.sentinel} hasn't synced after ${attempts} attempts`,
});
delete nextAttempts[ev.id];
continue;
}
deliverable.push({ event: ev });
delete nextAttempts[ev.id];
}
// HWM advances to cover the highest-ts deliverable, but NOT past any deferred event.
// Otherwise we'd skip past deferred items on next read.
const minDeferredTs = deferred.length > 0
? deferred.reduce((m, e) => (e.ts < m ? e.ts : m), deferred[0].ts)
: null;
let nextTs = hwm.last_delivered_ts;
for (const d of deliverable) {
if (minDeferredTs && d.event.ts >= minDeferredTs) continue;
if (d.event.ts > nextTs) nextTs = d.event.ts;
}
return {
deliverable,
deferred,
nextHwm: {
last_delivered_ts: nextTs,
pending_attempts: nextAttempts,
},
};
}

33
mcp-watcher/src/paths.ts Normal file
View file

@ -0,0 +1,33 @@
// Path helpers — single source of truth for where files live.
//
// All paths derive from $HOME (or os.homedir() fallback) and the agent's
// resolved identity. Centralizing them here keeps inbox, hwm, sentinel,
// and ack-log paths consistent across modules and trivially mockable in
// tests by overriding `home`.
import { homedir } from "node:os";
import { join } from "node:path";
export interface Paths {
workspace: string;
pingsDir: string;
acksLog: string;
agentsFile: string;
inbox(agent: string): string;
hwm(agent: string): string;
watcherActive(agent: string): string;
}
export function makePaths(home: string = process.env.HOME ?? homedir()): Paths {
const workspace = join(home, "Nyx", "workspace");
const pingsDir = join(workspace, "pings");
return {
workspace,
pingsDir,
acksLog: join(workspace, "acks", "log"),
agentsFile: join(pingsDir, ".agents"),
inbox: (agent: string) => join(pingsDir, `${agent}.inbox`),
hwm: (agent: string) => join(pingsDir, `.${agent}.hwm`),
watcherActive: (agent: string) => join(pingsDir, `.${agent}.watcher-active`),
};
}

View file

@ -0,0 +1,41 @@
// Sentinel file: signals the agent-ping UserPromptSubmit hook to stand
// down while the watcher is delivering pings via Channels (spec §4.3).
//
// The file lives at `pings/.<agent>.watcher-active` and is local-only
// (must be in .stignore — otherwise one host's watcher silences another
// host's hook).
//
// Lifecycle:
// - On startup: write the file with the current PID + timestamp.
// - On graceful shutdown (SIGINT/SIGTERM): remove it.
// - On crash: file is left behind. The hook checks file age + PID
// liveness to ignore stale sentinels. (See agent-ping hook update.)
import { writeFileSync, unlinkSync, mkdirSync, existsSync } from "node:fs";
import { dirname } from "node:path";
export interface SentinelHandle {
release(): void;
}
export function claimSentinel(path: string): SentinelHandle {
mkdirSync(dirname(path), { recursive: true });
const body = JSON.stringify({
pid: process.pid,
started_at: new Date().toISOString(),
});
writeFileSync(path, body + "\n", "utf8");
let released = false;
return {
release() {
if (released) return;
released = true;
try {
if (existsSync(path)) unlinkSync(path);
} catch {
// best-effort
}
},
};
}

135
mcp-watcher/src/server.ts Normal file
View file

@ -0,0 +1,135 @@
#!/usr/bin/env node
// agent-watcher-mcp — Layer 2 of the agent-watcher system.
//
// An MCP server (stdio transport) that:
// 1. Watches the local agent's ping inbox file via inotify (chokidar).
// 2. Surfaces unread pings into the Claude Code session as channel events
// (notifications/claude/channel) per the channels-reference.
// 3. Exposes three reply tools (ack / respond / mark_handled) so Claude
// can react.
// 4. Writes a `.watcher-active` sentinel so the existing agent-ping
// UserPromptSubmit hook stands down while the watcher is in charge
// of delivery.
//
// Spec: ../spec/agent-watcher.md §4.
//
// Launch (Claude Code v2.1.80+, research preview):
// claude --dangerously-load-development-channels server:agent-watcher
//
// mcp.json registration:
// { "mcpServers": { "agent-watcher": { "command": "agent-watcher-mcp" } } }
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
ListToolsRequestSchema,
CallToolRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { resolveIdentity } from "./identity.js";
import { makePaths } from "./paths.js";
import { InboxWatcher } from "./watcher.js";
import { claimSentinel } from "./sentinel.js";
import { toolDefinitions, callTool } from "./tools.js";
import type { PingEvent } from "./inbox.js";
const SERVER_NAME = "agent-watcher";
const SERVER_VERSION = "0.1.0";
const INSTRUCTIONS = [
"Pings from other agents arrive as <channel source=\"agent-watcher\" ...> events.",
"Each event has these attributes: ping_id, sender, type, priority, ts.",
"Treat them as priority context. Respond patterns:",
"- type=NEEDS-RESPONSE → call the `respond` tool with the ping_id and a payload",
"- type=ACK-REQUEST → call the `ack` tool with the ping_id (no content reply needed)",
"- type=INFO → call `mark_handled` once you've factored it in (or call ack to record it)",
"- priority=urgent events render first regardless of timestamp",
"If a ping body begins with \"[WARNING: referenced file ... hasn't synced]\" the referenced",
"artifact wasn't on disk when the watcher delivered. Acknowledge but flag the missing file.",
].join("\n");
async function main(): Promise<void> {
const id = resolveIdentity();
const paths = makePaths();
const mcp = new Server(
{ name: SERVER_NAME, version: SERVER_VERSION },
{
capabilities: {
experimental: { "claude/channel": {} },
tools: {},
},
instructions: INSTRUCTIONS,
},
);
const watcher = new InboxWatcher({
paths,
agent: id.agent,
notify: async (event: PingEvent, warning?: string) => {
const content = warning
? `[WARNING: ${warning}]\n${event.payload}`
: event.payload;
// meta keys must be identifier-safe (no hyphens). Map ping fields:
// - keep ping_id, sender, ts, priority verbatim
// - rename `type` → `ping_type` so it doesn't collide with anything
// the runtime might reserve
const meta: Record<string, string> = {
ping_id: event.id,
sender: event.from,
ping_type: event.type,
ts: event.ts,
};
if (event.priority) meta.priority = event.priority;
if (event.sentinel) meta.sentinel = sanitizeForMeta(event.sentinel);
await mcp.notification({
method: "notifications/claude/channel",
params: { content, meta },
});
},
});
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: toolDefinitions(),
}));
mcp.setRequestHandler(CallToolRequestSchema, async (req) => {
const args = (req.params.arguments ?? {}) as Record<string, unknown>;
const result = callTool(req.params.name, args, {
paths,
agent: id.agent,
recentEvents: () => watcher.recentEvents(),
});
if (!result.ok) {
throw new Error(result.text);
}
return { content: [{ type: "text", text: result.text }] };
});
// Sentinel: signal the agent-ping hook to stand down on this host.
const sentinel = claimSentinel(paths.watcherActive(id.agent));
const cleanup = () => sentinel.release();
process.on("SIGINT", () => { cleanup(); process.exit(0); });
process.on("SIGTERM", () => { cleanup(); process.exit(0); });
process.on("exit", cleanup);
// Connect stdio first so notifications during the initial drain land cleanly.
const transport = new StdioServerTransport();
await mcp.connect(transport);
// Now drain + start watching.
await watcher.start();
}
/** Channel meta values must be identifier-safe strings. Replace anything weird. */
function sanitizeForMeta(s: string): string {
return s.replace(/[^A-Za-z0-9_/.:\-]/g, "_");
}
main().catch((err: unknown) => {
const msg = err instanceof Error ? err.message : String(err);
process.stderr.write(`agent-watcher-mcp: fatal: ${msg}\n`);
process.exit(1);
});

196
mcp-watcher/src/tools.ts Normal file
View file

@ -0,0 +1,196 @@
// Reply tools exposed by the MCP server: ack, respond, mark_handled.
//
// These are standard MCP tools per the channels-reference. Claude calls them
// to send signals back to the originating sender or to mark a ping as handled
// without a content reply.
//
// Cross-host write discipline (spec §4.2): the tools always write to the
// local filesystem. Syncthing replicates outward. They never write directly
// to a remote-host path.
import { appendFileSync, mkdirSync, existsSync, writeFileSync, renameSync, readdirSync } from "node:fs";
import { dirname } from "node:path";
import { randomBytes } from "node:crypto";
import type { Paths } from "./paths.js";
import type { PingEvent } from "./inbox.js";
const VALID_TYPES = new Set(["INFO", "NEEDS-RESPONSE", "ACK-REQUEST"]);
export interface ToolDeps {
paths: Paths;
agent: string;
/** All pings the watcher has seen; used to look up ping_id → original sender. */
recentEvents: () => Map<string, PingEvent>;
}
/** JSON Schema for the three tools. Returned by ListToolsRequestSchema. */
export function toolDefinitions() {
return [
{
name: "ack",
description:
"Acknowledge receipt of a ping. Writes a delivery record to the shared acks log. " +
"Use for ACK-REQUEST type pings, or when you want to confirm a NEEDS-RESPONSE was seen " +
"before composing a longer reply.",
inputSchema: {
type: "object",
properties: {
ping_id: { type: "string", description: "The ping id (from the <channel ping_id=...> attribute)." },
message: { type: "string", description: "Optional short message to record alongside the ack." },
},
required: ["ping_id"],
},
},
{
name: "respond",
description:
"Send a response ping back to the original sender. Writes a JSONL line to <sender>.inbox. " +
"Use for content replies to NEEDS-RESPONSE pings.",
inputSchema: {
type: "object",
properties: {
ping_id: { type: "string", description: "The original ping id this is responding to." },
payload: { type: "string", description: "The response message." },
type: {
type: "string",
enum: ["INFO", "NEEDS-RESPONSE", "ACK-REQUEST"],
description: "Type of the outgoing ping. Default INFO.",
},
priority: {
type: "string",
enum: ["normal", "urgent"],
description: "Outgoing priority. Default normal.",
},
},
required: ["ping_id", "payload"],
},
},
{
name: "mark_handled",
description:
"Mark a ping as processed without sending a reply. Records an ack with kind=delivered. " +
"Use for INFO pings you've factored into your reasoning.",
inputSchema: {
type: "object",
properties: {
ping_id: { type: "string", description: "The ping id to mark handled." },
},
required: ["ping_id"],
},
},
];
}
export interface CallResult {
ok: boolean;
text: string;
}
export function callTool(name: string, args: Record<string, unknown>, deps: ToolDeps): CallResult {
switch (name) {
case "ack":
return doAck(args as { ping_id: string; message?: string }, deps);
case "respond":
return doRespond(
args as { ping_id: string; payload: string; type?: string; priority?: string },
deps,
);
case "mark_handled":
return doMarkHandled(args as { ping_id: string }, deps);
default:
return { ok: false, text: `unknown tool: ${name}` };
}
}
function lookupOriginalSender(ping_id: string, deps: ToolDeps): string | null {
const ev = deps.recentEvents().get(ping_id);
return ev?.from ?? null;
}
function writeAck(deps: ToolDeps, entry: Record<string, unknown>): void {
mkdirSync(dirname(deps.paths.acksLog), { recursive: true });
appendFileSync(deps.paths.acksLog, JSON.stringify(entry) + "\n", "utf8");
}
function doAck(args: { ping_id: string; message?: string }, deps: ToolDeps): CallResult {
if (!args.ping_id) return { ok: false, text: "ping_id required" };
const sender = lookupOriginalSender(args.ping_id, deps);
const entry: Record<string, unknown> = {
ts: nowUtc(),
acked_by: deps.agent,
ping_id: args.ping_id,
original_sender: sender,
ack_kind: "delivered",
};
if (args.message) entry.message = args.message;
const ev = deps.recentEvents().get(args.ping_id);
if (ev) entry.original_type = ev.type;
writeAck(deps, entry);
return { ok: true, text: `acked ${args.ping_id}` };
}
function doRespond(
args: { ping_id: string; payload: string; type?: string; priority?: string },
deps: ToolDeps,
): CallResult {
if (!args.ping_id) return { ok: false, text: "ping_id required" };
if (!args.payload) return { ok: false, text: "payload required" };
const recipient = lookupOriginalSender(args.ping_id, deps);
if (!recipient) {
return {
ok: false,
text: `cannot respond: ping ${args.ping_id} not found in recent events. ` +
`It may have been delivered before this watcher started.`,
};
}
const type = (args.type ?? "INFO").toUpperCase();
if (!VALID_TYPES.has(type)) {
return { ok: false, text: `invalid type ${args.type}; want INFO|NEEDS-RESPONSE|ACK-REQUEST` };
}
const priority = args.priority ?? "normal";
if (priority !== "normal" && priority !== "urgent") {
return { ok: false, text: `invalid priority ${priority}; want normal|urgent` };
}
const newEvent: PingEvent = {
ts: nowUtc(),
id: "ping-" + randomBytes(4).toString("hex"),
from: deps.agent,
to: recipient,
type,
priority,
payload: args.payload,
};
const target = deps.paths.inbox(recipient);
appendInboxAtomic(target, newEvent);
// Also log the ack so the original sender's `acks/log` reflects "responded".
writeAck(deps, {
ts: nowUtc(),
acked_by: deps.agent,
ping_id: args.ping_id,
original_sender: recipient,
ack_kind: "responded",
response_id: newEvent.id,
});
return { ok: true, text: `responded to ${args.ping_id}${recipient}.inbox (${newEvent.id})` };
}
function doMarkHandled(args: { ping_id: string }, deps: ToolDeps): CallResult {
if (!args.ping_id) return { ok: false, text: "ping_id required" };
return doAck({ ping_id: args.ping_id, message: "marked handled, no reply needed" }, deps);
}
function appendInboxAtomic(path: string, ev: PingEvent): void {
mkdirSync(dirname(path), { recursive: true });
// O_APPEND on POSIX guarantees atomicity for writes <= PIPE_BUF (4 KiB).
// Our JSONL line is well under that, so a plain appendFileSync is safe
// even if other processes (ping CLI, Collector) are also appending.
appendFileSync(path, JSON.stringify(ev) + "\n", "utf8");
}
function nowUtc(): string {
return new Date().toISOString().replace(/\.\d{3}Z$/, "Z");
}

147
mcp-watcher/src/watcher.ts Normal file
View file

@ -0,0 +1,147 @@
// Inbox watcher — wraps chokidar on the inbox file and emits channel
// notifications via the supplied notifier callback.
//
// Behaviour:
// 1. On startup: read the inbox, drain anything since the HWM, emit
// pending pings.
// 2. Set up chokidar on the inbox file. On change, re-read, emit new lines.
// 3. Sentinel-deferred pings remain in the inbox; on each change-event
// we re-evaluate them. The HWM file does NOT advance past them.
//
// The notifier is the MCP server's `mcp.notification()` wrapped to take a
// PingEvent + optional warning. Decoupled so this module is testable
// without spinning up an MCP transport.
import chokidar, { type FSWatcher } from "chokidar";
import { existsSync } from "node:fs";
import type { Paths } from "./paths.js";
import {
readInbox,
readHwm,
writeHwm,
unreadSinceHwm,
type Deliverable,
type HwmState,
type PingEvent,
} from "./inbox.js";
export type Notifier = (event: PingEvent, warning?: string) => Promise<void>;
export interface WatcherOptions {
paths: Paths;
agent: string;
notify: Notifier;
/** Override for testability. Defaults to fs.existsSync. */
sentinelExists?: (p: string) => boolean;
/** Called on every drain after writeHwm; useful for tests + telemetry. */
onDrain?: (result: { delivered: number; deferred: number }) => void;
/** Override for testability — chokidar in prod, mock in tests. */
startWatcher?: (path: string, onChange: () => void) => FSWatcher | { close(): Promise<void> };
}
export class InboxWatcher {
private opts: WatcherOptions;
private fsWatcher: FSWatcher | { close(): Promise<void> } | null = null;
/** All seen events keyed by id, used by the respond tool to look up sender. */
private seen = new Map<string, PingEvent>();
private draining = false;
private pendingDrain = false;
constructor(opts: WatcherOptions) {
this.opts = opts;
}
async start(): Promise<void> {
// Initial drain
await this.drain();
const inboxPath = this.opts.paths.inbox(this.opts.agent);
const start = this.opts.startWatcher ?? defaultChokidar;
this.fsWatcher = start(inboxPath, () => {
void this.drain();
});
}
async stop(): Promise<void> {
if (this.fsWatcher) {
await this.fsWatcher.close();
this.fsWatcher = null;
}
}
/** Snapshot of seen events. Used by the respond tool. */
recentEvents(): Map<string, PingEvent> {
return this.seen;
}
/** Drain new pings since HWM. Coalesces back-to-back fires. */
private async drain(): Promise<void> {
if (this.draining) {
this.pendingDrain = true;
return;
}
this.draining = true;
try {
do {
this.pendingDrain = false;
await this.drainOnce();
} while (this.pendingDrain);
} finally {
this.draining = false;
}
}
private async drainOnce(): Promise<void> {
const inboxPath = this.opts.paths.inbox(this.opts.agent);
const hwmPath = this.opts.paths.hwm(this.opts.agent);
const events = readInbox(inboxPath);
// Track every seen event so respond() can look them up.
for (const ev of events) this.seen.set(ev.id, ev);
const hwm = readHwm(hwmPath);
const sentinelExists = this.opts.sentinelExists ?? ((p: string) => existsSync(p));
const result = unreadSinceHwm(events, hwm, sentinelExists);
// Emit normal-priority pings in ts order; urgent first.
const sorted: Deliverable[] = [...result.deliverable].sort((a, b) => {
const aUrg = a.event.priority === "urgent" ? 0 : 1;
const bUrg = b.event.priority === "urgent" ? 0 : 1;
if (aUrg !== bUrg) return aUrg - bUrg;
return a.event.ts.localeCompare(b.event.ts);
});
for (const d of sorted) {
await this.opts.notify(d.event, d.warning);
}
// Persist HWM regardless of deferred count — pending_attempts changes too.
if (sorted.length > 0 || hwmDifferent(hwm, result.nextHwm)) {
writeHwm(hwmPath, result.nextHwm);
}
this.opts.onDrain?.({ delivered: sorted.length, deferred: result.deferred.length });
}
}
function hwmDifferent(a: HwmState, b: HwmState): boolean {
if (a.last_delivered_ts !== b.last_delivered_ts) return true;
const ak = Object.keys(a.pending_attempts);
const bk = Object.keys(b.pending_attempts);
if (ak.length !== bk.length) return true;
for (const k of ak) {
if (a.pending_attempts[k] !== b.pending_attempts[k]) return true;
}
return false;
}
function defaultChokidar(path: string, onChange: () => void): FSWatcher {
const w = chokidar.watch(path, {
persistent: true,
ignoreInitial: true,
awaitWriteFinish: { stabilityThreshold: 100, pollInterval: 50 },
});
w.on("add", onChange);
w.on("change", onChange);
return w;
}