Compare commits

..

No commits in common. "main" and "foreman/collector-scaffold" have entirely different histories.

18 changed files with 0 additions and 4819 deletions

View file

@ -1,5 +0,0 @@
node_modules/
dist/
*.tsbuildinfo
.DS_Store
coverage/

View file

@ -1,98 +0,0 @@
# agent-watcher-mcp (Layer 2)
The MCP Watcher: a Claude Code stdio MCP server that surfaces ping-inbox
events into your Claude Code session via Channels, and exposes reply tools
(`ack` / `respond` / `mark_handled`) so Claude can react.
Spec: `../spec/agent-watcher.md` §4.
## What it does
1. Resolves identity (env / `$CLAUDE_HOME/ping-agent` / `~/.ping-agent`).
2. Writes `pings/.<agent>.watcher-active` so the existing agent-ping
`UserPromptSubmit` hook stands down on this host.
3. Watches `pings/<agent>.inbox` via inotify (chokidar).
4. On change, drains unread pings (since HWM), applies sentinel
deferral (warn-after-3), and emits each as a
`notifications/claude/channel` event.
5. Exposes three MCP tools so Claude can ack, respond, or mark a ping
handled.
6. Removes the sentinel on graceful shutdown.
## Install
Per CLAUDE.md rule #2, the agent does not install on itself. A human
runs:
```bash
cd /path/to/agent-watcher/mcp-watcher
./install.sh
```
This installs deps, builds, symlinks the binary into `~/.local/bin/`,
adds `.stignore` patterns, and prints the `mcp.json` snippet to paste
into Claude Code's config.
## Launch
Channels is in research preview. Start Claude Code with:
```bash
claude --dangerously-load-development-channels server:agent-watcher
```
## Sandbox testing
To run a separate Claude Code session with a different identity (so it
won't compete with the default `~/.ping-agent` for inbox reads):
```bash
mkdir -p ~/.claude-sandbox
echo sandbox > ~/.claude-sandbox/ping-agent
CLAUDE_CONFIG_DIR=~/.claude-sandbox \
claude --dangerously-load-development-channels server:agent-watcher
```
The sandbox session reads `pings/sandbox.inbox` and writes
`pings/.sandbox.watcher-active` — fully isolated from prod.
## How it interacts with agent-ping
The `agent-ping` UserPromptSubmit hook checks for the sentinel file
`pings/.<agent>.watcher-active` at startup. If the file is present and
the PID inside is alive, the hook stands down — the watcher is the
delivery primitive. If the watcher exits (graceful or crash with stale
sentinel cleared by the hook's age check), the hook resumes
async-on-next-prompt delivery.
This means **you can run with the watcher OR the hook OR both
configured** — the sentinel arbitrates.
## Tests
```bash
npm test # unit tests via vitest (35 currently passing)
npm run typecheck # tsc --noEmit
npm run build # tsc → dist/
```
## Observability
- Logs to stderr (visible via `claude --debug` or the Claude Code MCP
debug log at `~/.claude/debug/<session-id>.txt`).
- Sentinel file content includes PID + start time for the hook's
age/liveness check.
## Limitations / v2
- Sentinel hook coexistence requires the agent-ping hook to know how
to read the sentinel. PR pending against `agent-ping` to add the
check.
- No reconnection / restart on Claude Code session restart — Claude
Code spawns the subprocess anew each session, so the watcher
re-drains from HWM cleanly.
- One watcher process per agent identity. Two sessions with the same
identity contending on the same inbox is undefined behaviour
(use `CLAUDE_HOME` to scope identities).
- Channels is research preview; if the API changes, expect to update
the meta-key sanitization or notification shape.

View file

@ -1,118 +0,0 @@
#!/usr/bin/env bash
# install.sh — set up the agent-watcher MCP Watcher (Layer 2) on this host.
#
# Usage:
# ./install.sh # install: npm ci, build, link binary, print mcp.json snippet
# ./install.sh --no-mcp-json # everything but the mcp.json snippet (you handle registration)
#
# What it does:
# 1. Verifies Node 18+ is available.
# 2. Installs dependencies (npm ci) and builds (tsc).
# 3. Symlinks dist/server.js to ~/.local/bin/agent-watcher-mcp (chmod +x).
# 4. Adds .stignore patterns for the local-only watcher-active sentinel.
# 5. Prints the mcp.json snippet for paste into Claude Code's config.
#
# Per CLAUDE.md rule #2, this is intended to be run by a human. The MCP
# Watcher itself, like agent-ping, never invokes this script.
#
# Layer 1 (Collector) has its own install path. See the repo root install.sh.
set -euo pipefail
NO_MCP_JSON=0
for arg in "$@"; do
case "$arg" in
--no-mcp-json) NO_MCP_JSON=1 ;;
*) echo "unknown arg: $arg" >&2; exit 2 ;;
esac
done
REPO_DIR="$(cd "$(dirname "$0")" && pwd)"
BIN_DIR="$HOME/.local/bin"
WORKSPACE="$HOME/Nyx/workspace"
STIGNORE="$WORKSPACE/.stignore"
echo "agent-watcher-mcp install"
echo "repo: $REPO_DIR"
echo
# 1. Node check
echo "[1/5] checking Node"
if ! command -v node >/dev/null 2>&1; then
echo " ERROR: 'node' not found. Install Node 18+ first." >&2
exit 1
fi
NODE_MAJOR=$(node --version | sed 's/^v//' | cut -d. -f1)
if [ "$NODE_MAJOR" -lt 18 ]; then
echo " ERROR: Node 18+ required; found $(node --version)." >&2
exit 1
fi
echo " $(node --version)"
# 2. Install deps + build
echo "[2/5] npm ci + build"
( cd "$REPO_DIR" && npm ci --silent && npx tsc )
echo " built: $REPO_DIR/dist/server.js"
# 3. Binary symlink
echo "[3/5] linking binary"
mkdir -p "$BIN_DIR"
ln -sf "$REPO_DIR/dist/server.js" "$BIN_DIR/agent-watcher-mcp"
chmod +x "$REPO_DIR/dist/server.js"
echo " linked: $BIN_DIR/agent-watcher-mcp -> $REPO_DIR/dist/server.js"
# 4. .stignore — sentinel + hwm files are local-only
echo "[4/5] .stignore (sentinel + hwm are local-only per spec §4.3)"
if [ -d "$WORKSPACE" ]; then
touch "$STIGNORE"
for pattern in "pings/.*.watcher-active" "pings/.*.hwm"; do
if ! grep -qxF "$pattern" "$STIGNORE"; then
echo "$pattern" >> "$STIGNORE"
echo " added: $pattern"
fi
done
else
echo " $WORKSPACE not present — skipping (Syncthing not set up here)"
fi
# 5. mcp.json snippet
echo "[5/5] mcp.json registration"
if [ "$NO_MCP_JSON" -eq 0 ]; then
cat <<EOF
To register the watcher with Claude Code, add this to your MCP config.
For project-level (.mcp.json in the project root):
{
"mcpServers": {
"agent-watcher": {
"command": "agent-watcher-mcp"
}
}
}
For user-level (~/.claude.json):
{
"mcpServers": {
"agent-watcher": {
"command": "$BIN_DIR/agent-watcher-mcp"
}
}
}
Then start Claude Code with the development flag (research preview):
claude --dangerously-load-development-channels server:agent-watcher
To use a sandbox identity instead of the default ~/.ping-agent:
CLAUDE_CONFIG_DIR=~/.claude-sandbox claude --dangerously-load-development-channels server:agent-watcher
(Drop a 'ping-agent' file in ~/.claude-sandbox containing the sandbox name first.)
EOF
else
echo " --no-mcp-json: skipping snippet"
fi
echo "installation complete."

File diff suppressed because it is too large Load diff

View file

@ -1,32 +0,0 @@
{
"name": "agent-watcher-mcp",
"version": "0.1.0",
"description": "Layer 2 MCP Watcher for the agent-watcher system. Reads ping inbox files via inotify and surfaces them into a Claude Code session via Channels.",
"private": true,
"type": "module",
"bin": {
"agent-watcher-mcp": "dist/server.js"
},
"main": "dist/server.js",
"scripts": {
"build": "tsc",
"start": "node dist/server.js",
"dev": "tsx src/server.ts",
"test": "vitest run",
"test:watch": "vitest",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"chokidar": "^4.0.0"
},
"devDependencies": {
"@types/node": "^20.0.0",
"tsx": "^4.0.0",
"typescript": "^5.4.0",
"vitest": "^1.6.0"
},
"engines": {
"node": ">=18.0.0"
}
}

View file

@ -1,61 +0,0 @@
// Identity resolution for the MCP Watcher.
//
// Mirrors the layered resolution shipped in agent-ping (PR #1). The CLI
// has four layers (--as flag, env, $CLAUDE_HOME/ping-agent, ~/.ping-agent);
// the watcher subprocess has three (no argv override useful in MCP context).
//
// Spec: agent-ping spec §9; agent-watcher spec §4.4.
import { homedir } from "node:os";
import { existsSync, readFileSync } from "node:fs";
import { join } from "node:path";
export interface ResolvedIdentity {
agent: string;
source: string;
}
/**
* Resolve this process's agent identity.
* Layers, first match wins:
* 1. PING_AGENT_IDENTITY env var
* 2. $CLAUDE_CONFIG_DIR/ping-agent (the env var Claude Code actually reads;
* verified by `strings` on the binary, 2026-05-06 sandbox spike).
* 3. $CLAUDE_HOME/ping-agent (kept for compat with anyone setting the
* previously-documented variable).
* 4. ~/.ping-agent
*
* Throws if no layer resolves.
*/
export function resolveIdentity(env: NodeJS.ProcessEnv = process.env): ResolvedIdentity {
const envId = (env.PING_AGENT_IDENTITY ?? "").trim();
if (envId) {
return { agent: envId, source: "PING_AGENT_IDENTITY env var" };
}
for (const varName of ["CLAUDE_CONFIG_DIR", "CLAUDE_HOME"]) {
const dir = (env[varName] ?? "").trim();
if (!dir) continue;
const f = join(dir, "ping-agent");
if (existsSync(f)) {
const name = readFileSync(f, "utf8").trim();
if (name) {
return { agent: name, source: f };
}
}
}
const home = env.HOME ?? homedir();
const defaultFile = join(home, ".ping-agent");
if (existsSync(defaultFile)) {
const name = readFileSync(defaultFile, "utf8").trim();
if (name) {
return { agent: name, source: defaultFile };
}
}
throw new Error(
`agent-watcher-mcp: no identity resolved. Set PING_AGENT_IDENTITY, ` +
`$CLAUDE_CONFIG_DIR/ping-agent (or $CLAUDE_HOME/ping-agent), or ${defaultFile}.`,
);
}

View file

@ -1,147 +0,0 @@
// Inbox reader + HWM tracker.
//
// JSONL inbox files are produced by the agent-ping CLI and the Layer 1
// Collector. This module reads new lines since the last delivered ts,
// applies sentinel-deferral logic (warn-after-3 per spec §6 of agent-ping
// + §4.1 of agent-watcher), and atomically advances the HWM.
//
// The HWM file is local-only (not Syncthing-replicated; in .stignore).
import { existsSync, readFileSync, writeFileSync, renameSync, mkdirSync } from "node:fs";
import { dirname } from "node:path";
export interface PingEvent {
ts: string;
id: string;
from: string;
to: string;
type: string;
priority?: string;
payload: string;
sentinel?: string;
source?: string; // collector-only debug field
}
export interface HwmState {
last_delivered_ts: string;
pending_attempts: Record<string, number>;
}
export const DEFER_LIMIT = 3;
export function readInbox(path: string): PingEvent[] {
if (!existsSync(path)) return [];
const body = readFileSync(path, "utf8");
const out: PingEvent[] = [];
for (const raw of body.split("\n")) {
const line = raw.trim();
if (!line) continue;
try {
const parsed = JSON.parse(line);
if (parsed && typeof parsed === "object" && parsed.id && parsed.ts) {
out.push(parsed as PingEvent);
}
} catch {
// Skip malformed lines; logged at the call site.
}
}
return out;
}
export function readHwm(path: string): HwmState {
if (!existsSync(path)) {
return { last_delivered_ts: "", pending_attempts: {} };
}
try {
const parsed = JSON.parse(readFileSync(path, "utf8"));
return {
last_delivered_ts: parsed.last_delivered_ts ?? "",
pending_attempts: parsed.pending_attempts ?? {},
};
} catch {
return { last_delivered_ts: "", pending_attempts: {} };
}
}
/** Atomic write via tmp-file + rename. Avoids torn writes if the process dies mid-write. */
export function writeHwm(path: string, state: HwmState): void {
mkdirSync(dirname(path), { recursive: true });
const tmp = path + ".tmp";
writeFileSync(tmp, JSON.stringify(state) + "\n", "utf8");
renameSync(tmp, path);
}
export interface Deliverable {
event: PingEvent;
/** Prefix prepended to the payload when sentinel was missing on the 3rd attempt. */
warning?: string;
}
export interface ReadResult {
/** Events ready to deliver (HWM should advance to cover these). */
deliverable: Deliverable[];
/** Events deferred this round; HWM should NOT advance past them yet. */
deferred: PingEvent[];
/** Updated HWM state to write after delivery succeeds. */
nextHwm: HwmState;
}
/**
* Read new pings since the HWM, applying sentinel deferral.
*
* `sentinelExists` is injected so tests don't need real files. In production
* pass `(p) => existsSync(p)` from the caller.
*/
export function unreadSinceHwm(
events: PingEvent[],
hwm: HwmState,
sentinelExists: (path: string) => boolean,
): ReadResult {
const deliverable: Deliverable[] = [];
const deferred: PingEvent[] = [];
const nextAttempts: Record<string, number> = { ...hwm.pending_attempts };
for (const ev of events) {
if (hwm.last_delivered_ts && ev.ts <= hwm.last_delivered_ts) continue;
if (ev.sentinel && !sentinelExists(ev.sentinel)) {
const attempts = (nextAttempts[ev.id] ?? 0) + 1;
nextAttempts[ev.id] = attempts;
if (attempts < DEFER_LIMIT) {
deferred.push(ev);
continue;
}
// Third attempt — deliver with warning.
deliverable.push({
event: ev,
warning: `referenced file ${ev.sentinel} hasn't synced after ${attempts} attempts`,
});
delete nextAttempts[ev.id];
continue;
}
deliverable.push({ event: ev });
delete nextAttempts[ev.id];
}
// HWM advances to cover the highest-ts deliverable, but NOT past any deferred event.
// Otherwise we'd skip past deferred items on next read.
const minDeferredTs = deferred.length > 0
? deferred.reduce((m, e) => (e.ts < m ? e.ts : m), deferred[0].ts)
: null;
let nextTs = hwm.last_delivered_ts;
for (const d of deliverable) {
if (minDeferredTs && d.event.ts >= minDeferredTs) continue;
if (d.event.ts > nextTs) nextTs = d.event.ts;
}
return {
deliverable,
deferred,
nextHwm: {
last_delivered_ts: nextTs,
pending_attempts: nextAttempts,
},
};
}

View file

@ -1,33 +0,0 @@
// Path helpers — single source of truth for where files live.
//
// All paths derive from $HOME (or os.homedir() fallback) and the agent's
// resolved identity. Centralizing them here keeps inbox, hwm, sentinel,
// and ack-log paths consistent across modules and trivially mockable in
// tests by overriding `home`.
import { homedir } from "node:os";
import { join } from "node:path";
export interface Paths {
workspace: string;
pingsDir: string;
acksLog: string;
agentsFile: string;
inbox(agent: string): string;
hwm(agent: string): string;
watcherActive(agent: string): string;
}
export function makePaths(home: string = process.env.HOME ?? homedir()): Paths {
const workspace = join(home, "Nyx", "workspace");
const pingsDir = join(workspace, "pings");
return {
workspace,
pingsDir,
acksLog: join(workspace, "acks", "log"),
agentsFile: join(pingsDir, ".agents"),
inbox: (agent: string) => join(pingsDir, `${agent}.inbox`),
hwm: (agent: string) => join(pingsDir, `.${agent}.hwm`),
watcherActive: (agent: string) => join(pingsDir, `.${agent}.watcher-active`),
};
}

View file

@ -1,41 +0,0 @@
// Sentinel file: signals the agent-ping UserPromptSubmit hook to stand
// down while the watcher is delivering pings via Channels (spec §4.3).
//
// The file lives at `pings/.<agent>.watcher-active` and is local-only
// (must be in .stignore — otherwise one host's watcher silences another
// host's hook).
//
// Lifecycle:
// - On startup: write the file with the current PID + timestamp.
// - On graceful shutdown (SIGINT/SIGTERM): remove it.
// - On crash: file is left behind. The hook checks file age + PID
// liveness to ignore stale sentinels. (See agent-ping hook update.)
import { writeFileSync, unlinkSync, mkdirSync, existsSync } from "node:fs";
import { dirname } from "node:path";
export interface SentinelHandle {
release(): void;
}
export function claimSentinel(path: string): SentinelHandle {
mkdirSync(dirname(path), { recursive: true });
const body = JSON.stringify({
pid: process.pid,
started_at: new Date().toISOString(),
});
writeFileSync(path, body + "\n", "utf8");
let released = false;
return {
release() {
if (released) return;
released = true;
try {
if (existsSync(path)) unlinkSync(path);
} catch {
// best-effort
}
},
};
}

View file

@ -1,135 +0,0 @@
#!/usr/bin/env node
// agent-watcher-mcp — Layer 2 of the agent-watcher system.
//
// An MCP server (stdio transport) that:
// 1. Watches the local agent's ping inbox file via inotify (chokidar).
// 2. Surfaces unread pings into the Claude Code session as channel events
// (notifications/claude/channel) per the channels-reference.
// 3. Exposes three reply tools (ack / respond / mark_handled) so Claude
// can react.
// 4. Writes a `.watcher-active` sentinel so the existing agent-ping
// UserPromptSubmit hook stands down while the watcher is in charge
// of delivery.
//
// Spec: ../spec/agent-watcher.md §4.
//
// Launch (Claude Code v2.1.80+, research preview):
// claude --dangerously-load-development-channels server:agent-watcher
//
// mcp.json registration:
// { "mcpServers": { "agent-watcher": { "command": "agent-watcher-mcp" } } }
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
ListToolsRequestSchema,
CallToolRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { resolveIdentity } from "./identity.js";
import { makePaths } from "./paths.js";
import { InboxWatcher } from "./watcher.js";
import { claimSentinel } from "./sentinel.js";
import { toolDefinitions, callTool } from "./tools.js";
import type { PingEvent } from "./inbox.js";
const SERVER_NAME = "agent-watcher";
const SERVER_VERSION = "0.1.0";
const INSTRUCTIONS = [
"Pings from other agents arrive as <channel source=\"agent-watcher\" ...> events.",
"Each event has these attributes: ping_id, sender, type, priority, ts.",
"Treat them as priority context. Respond patterns:",
"- type=NEEDS-RESPONSE → call the `respond` tool with the ping_id and a payload",
"- type=ACK-REQUEST → call the `ack` tool with the ping_id (no content reply needed)",
"- type=INFO → call `mark_handled` once you've factored it in (or call ack to record it)",
"- priority=urgent events render first regardless of timestamp",
"If a ping body begins with \"[WARNING: referenced file ... hasn't synced]\" the referenced",
"artifact wasn't on disk when the watcher delivered. Acknowledge but flag the missing file.",
].join("\n");
async function main(): Promise<void> {
const id = resolveIdentity();
const paths = makePaths();
const mcp = new Server(
{ name: SERVER_NAME, version: SERVER_VERSION },
{
capabilities: {
experimental: { "claude/channel": {} },
tools: {},
},
instructions: INSTRUCTIONS,
},
);
const watcher = new InboxWatcher({
paths,
agent: id.agent,
notify: async (event: PingEvent, warning?: string) => {
const content = warning
? `[WARNING: ${warning}]\n${event.payload}`
: event.payload;
// meta keys must be identifier-safe (no hyphens). Map ping fields:
// - keep ping_id, sender, ts, priority verbatim
// - rename `type` → `ping_type` so it doesn't collide with anything
// the runtime might reserve
const meta: Record<string, string> = {
ping_id: event.id,
sender: event.from,
ping_type: event.type,
ts: event.ts,
};
if (event.priority) meta.priority = event.priority;
if (event.sentinel) meta.sentinel = sanitizeForMeta(event.sentinel);
await mcp.notification({
method: "notifications/claude/channel",
params: { content, meta },
});
},
});
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: toolDefinitions(),
}));
mcp.setRequestHandler(CallToolRequestSchema, async (req) => {
const args = (req.params.arguments ?? {}) as Record<string, unknown>;
const result = callTool(req.params.name, args, {
paths,
agent: id.agent,
recentEvents: () => watcher.recentEvents(),
});
if (!result.ok) {
throw new Error(result.text);
}
return { content: [{ type: "text", text: result.text }] };
});
// Sentinel: signal the agent-ping hook to stand down on this host.
const sentinel = claimSentinel(paths.watcherActive(id.agent));
const cleanup = () => sentinel.release();
process.on("SIGINT", () => { cleanup(); process.exit(0); });
process.on("SIGTERM", () => { cleanup(); process.exit(0); });
process.on("exit", cleanup);
// Connect stdio first so notifications during the initial drain land cleanly.
const transport = new StdioServerTransport();
await mcp.connect(transport);
// Now drain + start watching.
await watcher.start();
}
/** Channel meta values must be identifier-safe strings. Replace anything weird. */
function sanitizeForMeta(s: string): string {
return s.replace(/[^A-Za-z0-9_/.:\-]/g, "_");
}
main().catch((err: unknown) => {
const msg = err instanceof Error ? err.message : String(err);
process.stderr.write(`agent-watcher-mcp: fatal: ${msg}\n`);
process.exit(1);
});

View file

@ -1,196 +0,0 @@
// Reply tools exposed by the MCP server: ack, respond, mark_handled.
//
// These are standard MCP tools per the channels-reference. Claude calls them
// to send signals back to the originating sender or to mark a ping as handled
// without a content reply.
//
// Cross-host write discipline (spec §4.2): the tools always write to the
// local filesystem. Syncthing replicates outward. They never write directly
// to a remote-host path.
import { appendFileSync, mkdirSync, existsSync, writeFileSync, renameSync, readdirSync } from "node:fs";
import { dirname } from "node:path";
import { randomBytes } from "node:crypto";
import type { Paths } from "./paths.js";
import type { PingEvent } from "./inbox.js";
const VALID_TYPES = new Set(["INFO", "NEEDS-RESPONSE", "ACK-REQUEST"]);
export interface ToolDeps {
paths: Paths;
agent: string;
/** All pings the watcher has seen; used to look up ping_id → original sender. */
recentEvents: () => Map<string, PingEvent>;
}
/** JSON Schema for the three tools. Returned by ListToolsRequestSchema. */
export function toolDefinitions() {
return [
{
name: "ack",
description:
"Acknowledge receipt of a ping. Writes a delivery record to the shared acks log. " +
"Use for ACK-REQUEST type pings, or when you want to confirm a NEEDS-RESPONSE was seen " +
"before composing a longer reply.",
inputSchema: {
type: "object",
properties: {
ping_id: { type: "string", description: "The ping id (from the <channel ping_id=...> attribute)." },
message: { type: "string", description: "Optional short message to record alongside the ack." },
},
required: ["ping_id"],
},
},
{
name: "respond",
description:
"Send a response ping back to the original sender. Writes a JSONL line to <sender>.inbox. " +
"Use for content replies to NEEDS-RESPONSE pings.",
inputSchema: {
type: "object",
properties: {
ping_id: { type: "string", description: "The original ping id this is responding to." },
payload: { type: "string", description: "The response message." },
type: {
type: "string",
enum: ["INFO", "NEEDS-RESPONSE", "ACK-REQUEST"],
description: "Type of the outgoing ping. Default INFO.",
},
priority: {
type: "string",
enum: ["normal", "urgent"],
description: "Outgoing priority. Default normal.",
},
},
required: ["ping_id", "payload"],
},
},
{
name: "mark_handled",
description:
"Mark a ping as processed without sending a reply. Records an ack with kind=delivered. " +
"Use for INFO pings you've factored into your reasoning.",
inputSchema: {
type: "object",
properties: {
ping_id: { type: "string", description: "The ping id to mark handled." },
},
required: ["ping_id"],
},
},
];
}
export interface CallResult {
ok: boolean;
text: string;
}
export function callTool(name: string, args: Record<string, unknown>, deps: ToolDeps): CallResult {
switch (name) {
case "ack":
return doAck(args as { ping_id: string; message?: string }, deps);
case "respond":
return doRespond(
args as { ping_id: string; payload: string; type?: string; priority?: string },
deps,
);
case "mark_handled":
return doMarkHandled(args as { ping_id: string }, deps);
default:
return { ok: false, text: `unknown tool: ${name}` };
}
}
function lookupOriginalSender(ping_id: string, deps: ToolDeps): string | null {
const ev = deps.recentEvents().get(ping_id);
return ev?.from ?? null;
}
function writeAck(deps: ToolDeps, entry: Record<string, unknown>): void {
mkdirSync(dirname(deps.paths.acksLog), { recursive: true });
appendFileSync(deps.paths.acksLog, JSON.stringify(entry) + "\n", "utf8");
}
function doAck(args: { ping_id: string; message?: string }, deps: ToolDeps): CallResult {
if (!args.ping_id) return { ok: false, text: "ping_id required" };
const sender = lookupOriginalSender(args.ping_id, deps);
const entry: Record<string, unknown> = {
ts: nowUtc(),
acked_by: deps.agent,
ping_id: args.ping_id,
original_sender: sender,
ack_kind: "delivered",
};
if (args.message) entry.message = args.message;
const ev = deps.recentEvents().get(args.ping_id);
if (ev) entry.original_type = ev.type;
writeAck(deps, entry);
return { ok: true, text: `acked ${args.ping_id}` };
}
function doRespond(
args: { ping_id: string; payload: string; type?: string; priority?: string },
deps: ToolDeps,
): CallResult {
if (!args.ping_id) return { ok: false, text: "ping_id required" };
if (!args.payload) return { ok: false, text: "payload required" };
const recipient = lookupOriginalSender(args.ping_id, deps);
if (!recipient) {
return {
ok: false,
text: `cannot respond: ping ${args.ping_id} not found in recent events. ` +
`It may have been delivered before this watcher started.`,
};
}
const type = (args.type ?? "INFO").toUpperCase();
if (!VALID_TYPES.has(type)) {
return { ok: false, text: `invalid type ${args.type}; want INFO|NEEDS-RESPONSE|ACK-REQUEST` };
}
const priority = args.priority ?? "normal";
if (priority !== "normal" && priority !== "urgent") {
return { ok: false, text: `invalid priority ${priority}; want normal|urgent` };
}
const newEvent: PingEvent = {
ts: nowUtc(),
id: "ping-" + randomBytes(4).toString("hex"),
from: deps.agent,
to: recipient,
type,
priority,
payload: args.payload,
};
const target = deps.paths.inbox(recipient);
appendInboxAtomic(target, newEvent);
// Also log the ack so the original sender's `acks/log` reflects "responded".
writeAck(deps, {
ts: nowUtc(),
acked_by: deps.agent,
ping_id: args.ping_id,
original_sender: recipient,
ack_kind: "responded",
response_id: newEvent.id,
});
return { ok: true, text: `responded to ${args.ping_id}${recipient}.inbox (${newEvent.id})` };
}
function doMarkHandled(args: { ping_id: string }, deps: ToolDeps): CallResult {
if (!args.ping_id) return { ok: false, text: "ping_id required" };
return doAck({ ping_id: args.ping_id, message: "marked handled, no reply needed" }, deps);
}
function appendInboxAtomic(path: string, ev: PingEvent): void {
mkdirSync(dirname(path), { recursive: true });
// O_APPEND on POSIX guarantees atomicity for writes <= PIPE_BUF (4 KiB).
// Our JSONL line is well under that, so a plain appendFileSync is safe
// even if other processes (ping CLI, Collector) are also appending.
appendFileSync(path, JSON.stringify(ev) + "\n", "utf8");
}
function nowUtc(): string {
return new Date().toISOString().replace(/\.\d{3}Z$/, "Z");
}

View file

@ -1,147 +0,0 @@
// Inbox watcher — wraps chokidar on the inbox file and emits channel
// notifications via the supplied notifier callback.
//
// Behaviour:
// 1. On startup: read the inbox, drain anything since the HWM, emit
// pending pings.
// 2. Set up chokidar on the inbox file. On change, re-read, emit new lines.
// 3. Sentinel-deferred pings remain in the inbox; on each change-event
// we re-evaluate them. The HWM file does NOT advance past them.
//
// The notifier is the MCP server's `mcp.notification()` wrapped to take a
// PingEvent + optional warning. Decoupled so this module is testable
// without spinning up an MCP transport.
import chokidar, { type FSWatcher } from "chokidar";
import { existsSync } from "node:fs";
import type { Paths } from "./paths.js";
import {
readInbox,
readHwm,
writeHwm,
unreadSinceHwm,
type Deliverable,
type HwmState,
type PingEvent,
} from "./inbox.js";
export type Notifier = (event: PingEvent, warning?: string) => Promise<void>;
export interface WatcherOptions {
paths: Paths;
agent: string;
notify: Notifier;
/** Override for testability. Defaults to fs.existsSync. */
sentinelExists?: (p: string) => boolean;
/** Called on every drain after writeHwm; useful for tests + telemetry. */
onDrain?: (result: { delivered: number; deferred: number }) => void;
/** Override for testability — chokidar in prod, mock in tests. */
startWatcher?: (path: string, onChange: () => void) => FSWatcher | { close(): Promise<void> };
}
export class InboxWatcher {
private opts: WatcherOptions;
private fsWatcher: FSWatcher | { close(): Promise<void> } | null = null;
/** All seen events keyed by id, used by the respond tool to look up sender. */
private seen = new Map<string, PingEvent>();
private draining = false;
private pendingDrain = false;
constructor(opts: WatcherOptions) {
this.opts = opts;
}
async start(): Promise<void> {
// Initial drain
await this.drain();
const inboxPath = this.opts.paths.inbox(this.opts.agent);
const start = this.opts.startWatcher ?? defaultChokidar;
this.fsWatcher = start(inboxPath, () => {
void this.drain();
});
}
async stop(): Promise<void> {
if (this.fsWatcher) {
await this.fsWatcher.close();
this.fsWatcher = null;
}
}
/** Snapshot of seen events. Used by the respond tool. */
recentEvents(): Map<string, PingEvent> {
return this.seen;
}
/** Drain new pings since HWM. Coalesces back-to-back fires. */
private async drain(): Promise<void> {
if (this.draining) {
this.pendingDrain = true;
return;
}
this.draining = true;
try {
do {
this.pendingDrain = false;
await this.drainOnce();
} while (this.pendingDrain);
} finally {
this.draining = false;
}
}
private async drainOnce(): Promise<void> {
const inboxPath = this.opts.paths.inbox(this.opts.agent);
const hwmPath = this.opts.paths.hwm(this.opts.agent);
const events = readInbox(inboxPath);
// Track every seen event so respond() can look them up.
for (const ev of events) this.seen.set(ev.id, ev);
const hwm = readHwm(hwmPath);
const sentinelExists = this.opts.sentinelExists ?? ((p: string) => existsSync(p));
const result = unreadSinceHwm(events, hwm, sentinelExists);
// Emit normal-priority pings in ts order; urgent first.
const sorted: Deliverable[] = [...result.deliverable].sort((a, b) => {
const aUrg = a.event.priority === "urgent" ? 0 : 1;
const bUrg = b.event.priority === "urgent" ? 0 : 1;
if (aUrg !== bUrg) return aUrg - bUrg;
return a.event.ts.localeCompare(b.event.ts);
});
for (const d of sorted) {
await this.opts.notify(d.event, d.warning);
}
// Persist HWM regardless of deferred count — pending_attempts changes too.
if (sorted.length > 0 || hwmDifferent(hwm, result.nextHwm)) {
writeHwm(hwmPath, result.nextHwm);
}
this.opts.onDrain?.({ delivered: sorted.length, deferred: result.deferred.length });
}
}
function hwmDifferent(a: HwmState, b: HwmState): boolean {
if (a.last_delivered_ts !== b.last_delivered_ts) return true;
const ak = Object.keys(a.pending_attempts);
const bk = Object.keys(b.pending_attempts);
if (ak.length !== bk.length) return true;
for (const k of ak) {
if (a.pending_attempts[k] !== b.pending_attempts[k]) return true;
}
return false;
}
function defaultChokidar(path: string, onChange: () => void): FSWatcher {
const w = chokidar.watch(path, {
persistent: true,
ignoreInitial: true,
awaitWriteFinish: { stabilityThreshold: 100, pollInterval: 50 },
});
w.on("add", onChange);
w.on("change", onChange);
return w;
}

View file

@ -1,94 +0,0 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, writeFileSync, rmSync, mkdirSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { resolveIdentity } from "../src/identity.js";
let dir: string;
beforeEach(() => {
dir = mkdtempSync(join(tmpdir(), "watcher-id-"));
});
afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});
describe("resolveIdentity", () => {
it("uses PING_AGENT_IDENTITY env var first", () => {
writeFileSync(join(dir, ".ping-agent"), "default-agent\n");
const r = resolveIdentity({
HOME: dir,
PING_AGENT_IDENTITY: "envname",
});
expect(r.agent).toBe("envname");
expect(r.source).toMatch(/env/i);
});
it("uses $CLAUDE_CONFIG_DIR/ping-agent next", () => {
writeFileSync(join(dir, ".ping-agent"), "default-agent\n");
const sandbox = join(dir, "claude-sandbox");
mkdirSync(sandbox);
writeFileSync(join(sandbox, "ping-agent"), "sandbox\n");
const r = resolveIdentity({
HOME: dir,
CLAUDE_CONFIG_DIR: sandbox,
PING_AGENT_IDENTITY: "",
});
expect(r.agent).toBe("sandbox");
expect(r.source).toContain(sandbox);
});
it("falls back to $CLAUDE_HOME/ping-agent when CLAUDE_CONFIG_DIR is unset", () => {
writeFileSync(join(dir, ".ping-agent"), "default-agent\n");
const claudeHome = join(dir, "claude-old");
mkdirSync(claudeHome);
writeFileSync(join(claudeHome, "ping-agent"), "compat\n");
const r = resolveIdentity({
HOME: dir,
CLAUDE_HOME: claudeHome,
PING_AGENT_IDENTITY: "",
});
expect(r.agent).toBe("compat");
expect(r.source).toContain(claudeHome);
});
it("CLAUDE_CONFIG_DIR takes precedence over CLAUDE_HOME", () => {
const sandbox = join(dir, "claude-sandbox");
const old = join(dir, "claude-old");
mkdirSync(sandbox);
mkdirSync(old);
writeFileSync(join(sandbox, "ping-agent"), "new\n");
writeFileSync(join(old, "ping-agent"), "old\n");
const r = resolveIdentity({
HOME: dir,
CLAUDE_CONFIG_DIR: sandbox,
CLAUDE_HOME: old,
});
expect(r.agent).toBe("new");
});
it("falls through to ~/.ping-agent", () => {
writeFileSync(join(dir, ".ping-agent"), "bob\n");
const r = resolveIdentity({ HOME: dir });
expect(r.agent).toBe("bob");
expect(r.source).toContain(".ping-agent");
});
it("throws when nothing resolves", () => {
expect(() => resolveIdentity({ HOME: dir })).toThrow(/no identity/);
});
it("ignores empty CLAUDE_CONFIG_DIR/ping-agent and falls through", () => {
writeFileSync(join(dir, ".ping-agent"), "bob\n");
const sandbox = join(dir, "claude-sandbox");
mkdirSync(sandbox);
writeFileSync(join(sandbox, "ping-agent"), "");
const r = resolveIdentity({ HOME: dir, CLAUDE_CONFIG_DIR: sandbox });
expect(r.agent).toBe("bob");
});
it("trims whitespace from identity files", () => {
writeFileSync(join(dir, ".ping-agent"), " bob \n\n");
const r = resolveIdentity({ HOME: dir });
expect(r.agent).toBe("bob");
});
});

View file

@ -1,175 +0,0 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, writeFileSync, existsSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
readInbox,
readHwm,
writeHwm,
unreadSinceHwm,
DEFER_LIMIT,
type PingEvent,
type HwmState,
} from "../src/inbox.js";
const ev = (over: Partial<PingEvent>): PingEvent => ({
ts: "2026-05-06T10:00:00Z",
id: "ping-aaa",
from: "foreman",
to: "bob",
type: "INFO",
payload: "test",
...over,
});
let dir: string;
beforeEach(() => {
dir = mkdtempSync(join(tmpdir(), "watcher-test-"));
});
afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});
describe("readInbox", () => {
it("returns empty for missing file", () => {
expect(readInbox(join(dir, "missing.inbox"))).toEqual([]);
});
it("parses JSONL, skips blank + malformed lines", () => {
const path = join(dir, "x.inbox");
writeFileSync(
path,
[
JSON.stringify(ev({ id: "ping-1" })),
"",
"{ not json",
JSON.stringify(ev({ id: "ping-2", ts: "2026-05-06T11:00:00Z" })),
"",
].join("\n"),
);
const r = readInbox(path);
expect(r.map((e) => e.id)).toEqual(["ping-1", "ping-2"]);
});
it("requires id and ts on each line", () => {
const path = join(dir, "x.inbox");
writeFileSync(path, [
JSON.stringify({ ts: "x", payload: "missing id" }),
JSON.stringify({ id: "missing-ts", payload: "x" }),
JSON.stringify(ev({ id: "good" })),
].join("\n"));
expect(readInbox(path).map((e) => e.id)).toEqual(["good"]);
});
});
describe("readHwm + writeHwm", () => {
it("returns empty state for missing file", () => {
const r = readHwm(join(dir, "missing.hwm"));
expect(r).toEqual({ last_delivered_ts: "", pending_attempts: {} });
});
it("round-trips through atomic write", () => {
const path = join(dir, ".hwm");
const state: HwmState = {
last_delivered_ts: "2026-05-06T12:00:00Z",
pending_attempts: { "ping-x": 2 },
};
writeHwm(path, state);
expect(existsSync(path)).toBe(true);
expect(existsSync(path + ".tmp")).toBe(false);
expect(readHwm(path)).toEqual(state);
});
it("recovers from corrupt file with empty state", () => {
const path = join(dir, ".hwm");
writeFileSync(path, "{ corrupt");
expect(readHwm(path)).toEqual({ last_delivered_ts: "", pending_attempts: {} });
});
});
describe("unreadSinceHwm", () => {
const yes = () => true;
const no = () => false;
it("returns all events when HWM is empty", () => {
const events = [ev({ id: "a", ts: "2026-05-06T10:00:00Z" })];
const r = unreadSinceHwm(events, { last_delivered_ts: "", pending_attempts: {} }, yes);
expect(r.deliverable.map((d) => d.event.id)).toEqual(["a"]);
expect(r.deferred).toEqual([]);
expect(r.nextHwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z");
});
it("filters out events with ts <= HWM", () => {
const events = [
ev({ id: "old", ts: "2026-05-06T09:00:00Z" }),
ev({ id: "new", ts: "2026-05-06T11:00:00Z" }),
];
const r = unreadSinceHwm(
events,
{ last_delivered_ts: "2026-05-06T10:00:00Z", pending_attempts: {} },
yes,
);
expect(r.deliverable.map((d) => d.event.id)).toEqual(["new"]);
});
it("defers events whose sentinel is missing, increments attempts", () => {
const events = [ev({ id: "a", sentinel: "/nope" })];
const r = unreadSinceHwm(events, { last_delivered_ts: "", pending_attempts: {} }, no);
expect(r.deliverable).toEqual([]);
expect(r.deferred.map((e) => e.id)).toEqual(["a"]);
expect(r.nextHwm.pending_attempts).toEqual({ a: 1 });
// HWM does NOT advance past the deferred event
expect(r.nextHwm.last_delivered_ts).toBe("");
});
it("delivers with warning on the DEFER_LIMIT-th attempt", () => {
const events = [ev({ id: "a", sentinel: "/nope" })];
const r = unreadSinceHwm(
events,
{ last_delivered_ts: "", pending_attempts: { a: DEFER_LIMIT - 1 } },
no,
);
expect(r.deliverable.length).toBe(1);
expect(r.deliverable[0].warning).toMatch(/hasn't synced/);
expect(r.deferred).toEqual([]);
// attempts cleared once delivered
expect(r.nextHwm.pending_attempts).toEqual({});
expect(r.nextHwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z");
});
it("delivers immediately when sentinel exists", () => {
const events = [ev({ id: "a", sentinel: "/exists" })];
const r = unreadSinceHwm(events, { last_delivered_ts: "", pending_attempts: {} }, yes);
expect(r.deliverable.length).toBe(1);
expect(r.deliverable[0].warning).toBeUndefined();
expect(r.nextHwm.pending_attempts).toEqual({});
});
it("HWM does not advance past deferred event even when later events deliver", () => {
const events = [
ev({ id: "deferred", ts: "2026-05-06T10:00:00Z", sentinel: "/nope" }),
ev({ id: "later", ts: "2026-05-06T11:00:00Z" }),
];
const sentinelExists = (p: string) => p !== "/nope";
const r = unreadSinceHwm(
events,
{ last_delivered_ts: "", pending_attempts: {} },
sentinelExists,
);
expect(r.deliverable.map((d) => d.event.id)).toEqual(["later"]);
expect(r.deferred.map((e) => e.id)).toEqual(["deferred"]);
// HWM stays empty so the deferred event is reconsidered next read
expect(r.nextHwm.last_delivered_ts).toBe("");
});
it("clears pending_attempts when sentinel finally lands", () => {
const events = [ev({ id: "a", sentinel: "/now-ok" })];
const r = unreadSinceHwm(
events,
{ last_delivered_ts: "", pending_attempts: { a: 2 } },
yes,
);
expect(r.deliverable.length).toBe(1);
expect(r.nextHwm.pending_attempts).toEqual({});
});
});

View file

@ -1,159 +0,0 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, readFileSync, existsSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { makePaths } from "../src/paths.js";
import { callTool, toolDefinitions } from "../src/tools.js";
import type { PingEvent } from "../src/inbox.js";
let dir: string;
beforeEach(() => {
dir = mkdtempSync(join(tmpdir(), "watcher-tools-"));
});
afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});
const setupDeps = (recent: Map<string, PingEvent>) => ({
paths: makePaths(dir),
agent: "bob",
recentEvents: () => recent,
});
const sampleEvent = (over: Partial<PingEvent> = {}): PingEvent => ({
ts: "2026-05-06T10:00:00Z",
id: "ping-orig",
from: "foreman",
to: "bob",
type: "NEEDS-RESPONSE",
payload: "do the thing",
...over,
});
describe("toolDefinitions", () => {
it("declares ack, respond, and mark_handled with required ping_id", () => {
const tools = toolDefinitions();
const names = tools.map((t) => t.name).sort();
expect(names).toEqual(["ack", "mark_handled", "respond"]);
for (const t of tools) {
expect(t.inputSchema.required).toContain("ping_id");
}
});
});
describe("callTool — ack", () => {
it("appends to acks/log with ack_kind=delivered + original_type", () => {
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
const deps = setupDeps(recent);
const r = callTool("ack", { ping_id: "ping-orig" }, deps);
expect(r.ok).toBe(true);
const log = readFileSync(deps.paths.acksLog, "utf8").trim();
const obj = JSON.parse(log);
expect(obj.acked_by).toBe("bob");
expect(obj.ping_id).toBe("ping-orig");
expect(obj.original_sender).toBe("foreman");
expect(obj.ack_kind).toBe("delivered");
expect(obj.original_type).toBe("NEEDS-RESPONSE");
});
it("rejects missing ping_id", () => {
const r = callTool("ack", {}, setupDeps(new Map()));
expect(r.ok).toBe(false);
});
});
describe("callTool — respond", () => {
it("writes a JSONL line to <sender>.inbox + ack with ack_kind=responded", () => {
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
const deps = setupDeps(recent);
const r = callTool(
"respond",
{ ping_id: "ping-orig", payload: "ok done", type: "INFO" },
deps,
);
expect(r.ok).toBe(true);
const inboxPath = deps.paths.inbox("foreman");
const lines = readFileSync(inboxPath, "utf8").trim().split("\n");
expect(lines).toHaveLength(1);
const ev = JSON.parse(lines[0]);
expect(ev.from).toBe("bob");
expect(ev.to).toBe("foreman");
expect(ev.type).toBe("INFO");
expect(ev.payload).toBe("ok done");
expect(ev.id).toMatch(/^ping-[a-f0-9]{8}$/);
const ack = JSON.parse(readFileSync(deps.paths.acksLog, "utf8").trim());
expect(ack.ack_kind).toBe("responded");
expect(ack.response_id).toBe(ev.id);
});
it("defaults missing type to INFO + missing priority to normal", () => {
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
const deps = setupDeps(recent);
callTool("respond", { ping_id: "ping-orig", payload: "x" }, deps);
const lines = readFileSync(deps.paths.inbox("foreman"), "utf8").trim().split("\n");
const ev = JSON.parse(lines[0]);
expect(ev.type).toBe("INFO");
expect(ev.priority).toBe("normal");
});
it("rejects unknown ping_id", () => {
const r = callTool(
"respond",
{ ping_id: "ping-unknown", payload: "x" },
setupDeps(new Map()),
);
expect(r.ok).toBe(false);
expect(r.text).toContain("not found");
});
it("rejects invalid type", () => {
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
const r = callTool(
"respond",
{ ping_id: "ping-orig", payload: "x", type: "bogus" },
setupDeps(recent),
);
expect(r.ok).toBe(false);
});
});
describe("callTool — mark_handled", () => {
it("writes an ack with explanatory message", () => {
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
const deps = setupDeps(recent);
const r = callTool("mark_handled", { ping_id: "ping-orig" }, deps);
expect(r.ok).toBe(true);
const ack = JSON.parse(readFileSync(deps.paths.acksLog, "utf8").trim());
expect(ack.ack_kind).toBe("delivered");
expect(ack.message).toMatch(/handled/i);
});
});
describe("callTool — unknown tool", () => {
it("returns ok=false on unknown tool name", () => {
const r = callTool("bogus", { ping_id: "x" }, setupDeps(new Map()));
expect(r.ok).toBe(false);
expect(r.text).toContain("unknown");
});
});
describe("ack does not crash on unknown ping_id", () => {
it("still writes an ack with original_sender=null", () => {
const deps = setupDeps(new Map());
const r = callTool("ack", { ping_id: "ping-unknown" }, deps);
expect(r.ok).toBe(true);
const ack = JSON.parse(readFileSync(deps.paths.acksLog, "utf8").trim());
expect(ack.original_sender).toBeNull();
});
});
describe("paths writability through tools", () => {
it("creates the acks dir on first ack", () => {
const deps = setupDeps(new Map([["p1", sampleEvent({ id: "p1" })]]));
expect(existsSync(deps.paths.acksLog)).toBe(false);
callTool("ack", { ping_id: "p1" }, deps);
expect(existsSync(deps.paths.acksLog)).toBe(true);
});
});

View file

@ -1,155 +0,0 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, writeFileSync, mkdirSync, appendFileSync, rmSync, readFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { makePaths } from "../src/paths.js";
import { InboxWatcher } from "../src/watcher.js";
import type { PingEvent } from "../src/inbox.js";
let dir: string;
beforeEach(() => {
dir = mkdtempSync(join(tmpdir(), "watcher-watch-"));
});
afterEach(() => {
rmSync(dir, { recursive: true, force: true });
});
const ev = (over: Partial<PingEvent>): PingEvent => ({
ts: "2026-05-06T10:00:00Z",
id: "ping-" + Math.random().toString(36).slice(2, 10),
from: "foreman",
to: "bob",
type: "INFO",
payload: "test",
...over,
});
const writeInbox = (path: string, events: PingEvent[]) => {
mkdirSync(join(path, "..").replace(/[^/]*\/?$/, ""), { recursive: true });
writeFileSync(path, events.map((e) => JSON.stringify(e)).join("\n") + "\n");
};
describe("InboxWatcher initial drain", () => {
it("emits all unread events on start, then advances HWM", async () => {
const paths = makePaths(dir);
mkdirSync(paths.pingsDir, { recursive: true });
writeInbox(paths.inbox("bob"), [
ev({ id: "a", ts: "2026-05-06T10:00:00Z" }),
ev({ id: "b", ts: "2026-05-06T11:00:00Z" }),
]);
const delivered: { event: PingEvent; warning?: string }[] = [];
const w = new InboxWatcher({
paths,
agent: "bob",
notify: async (event, warning) => {
delivered.push({ event, warning });
},
// No-op chokidar substitute
startWatcher: () => ({ async close() {} }),
});
await w.start();
await w.stop();
expect(delivered.map((d) => d.event.id)).toEqual(["a", "b"]);
const hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
expect(hwm.last_delivered_ts).toBe("2026-05-06T11:00:00Z");
});
it("orders urgent before normal regardless of timestamp", async () => {
const paths = makePaths(dir);
mkdirSync(paths.pingsDir, { recursive: true });
writeInbox(paths.inbox("bob"), [
ev({ id: "older-normal", ts: "2026-05-06T10:00:00Z", priority: "normal" }),
ev({ id: "newer-urgent", ts: "2026-05-06T11:00:00Z", priority: "urgent" }),
]);
const delivered: PingEvent[] = [];
const w = new InboxWatcher({
paths, agent: "bob",
notify: async (e) => { delivered.push(e); },
startWatcher: () => ({ async close() {} }),
});
await w.start();
await w.stop();
expect(delivered.map((d) => d.id)).toEqual(["newer-urgent", "older-normal"]);
});
it("defers a sentinel-missing ping and re-attempts on next drain", async () => {
const paths = makePaths(dir);
mkdirSync(paths.pingsDir, { recursive: true });
writeInbox(paths.inbox("bob"), [ev({ id: "a", sentinel: "/nope" })]);
const delivered: PingEvent[] = [];
let triggerChange: () => void = () => {};
const w = new InboxWatcher({
paths, agent: "bob",
notify: async (e) => { delivered.push(e); },
sentinelExists: () => false,
startWatcher: (_path, onChange) => {
triggerChange = onChange;
return { async close() {} };
},
});
await w.start();
expect(delivered).toEqual([]);
// hwm has pending_attempts.a = 1 but ts not advanced
let hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
expect(hwm.pending_attempts).toEqual({ a: 1 });
expect(hwm.last_delivered_ts).toBe("");
// Two more triggers — third delivery emits with warning
triggerChange();
await new Promise((r) => setTimeout(r, 10));
triggerChange();
await new Promise((r) => setTimeout(r, 10));
expect(delivered.map((d) => d.id)).toEqual(["a"]);
hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
expect(hwm.pending_attempts).toEqual({});
expect(hwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z");
await w.stop();
});
it("populates recentEvents map for tools to look up sender", async () => {
const paths = makePaths(dir);
mkdirSync(paths.pingsDir, { recursive: true });
writeInbox(paths.inbox("bob"), [
ev({ id: "ping-x", from: "mom" }),
ev({ id: "ping-y", from: "foreman" }),
]);
const w = new InboxWatcher({
paths, agent: "bob",
notify: async () => {},
startWatcher: () => ({ async close() {} }),
});
await w.start();
expect(w.recentEvents().get("ping-x")?.from).toBe("mom");
expect(w.recentEvents().get("ping-y")?.from).toBe("foreman");
await w.stop();
});
it("skips pings already covered by HWM on restart", async () => {
const paths = makePaths(dir);
mkdirSync(paths.pingsDir, { recursive: true });
writeInbox(paths.inbox("bob"), [
ev({ id: "a", ts: "2026-05-06T10:00:00Z" }),
ev({ id: "b", ts: "2026-05-06T11:00:00Z" }),
]);
writeFileSync(
paths.hwm("bob"),
JSON.stringify({ last_delivered_ts: "2026-05-06T10:00:00Z", pending_attempts: {} }),
);
const delivered: PingEvent[] = [];
const w = new InboxWatcher({
paths, agent: "bob",
notify: async (e) => { delivered.push(e); },
startWatcher: () => ({ async close() {} }),
});
await w.start();
await w.stop();
expect(delivered.map((d) => d.id)).toEqual(["b"]);
});
});

View file

@ -1,18 +0,0 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "dist",
"rootDir": "src",
"strict": true,
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"skipLibCheck": true,
"declaration": false,
"sourceMap": true
},
"include": ["src/**/*.ts"],
"exclude": ["node_modules", "dist", "test"]
}

View file

@ -1,8 +0,0 @@
import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
include: ["test/**/*.test.ts"],
environment: "node",
},
});