Compare commits
No commits in common. "main" and "foreman/collector-scaffold" have entirely different histories.
main
...
foreman/co
18 changed files with 0 additions and 4819 deletions
5
mcp-watcher/.gitignore
vendored
5
mcp-watcher/.gitignore
vendored
|
|
@ -1,5 +0,0 @@
|
||||||
node_modules/
|
|
||||||
dist/
|
|
||||||
*.tsbuildinfo
|
|
||||||
.DS_Store
|
|
||||||
coverage/
|
|
||||||
|
|
@ -1,98 +0,0 @@
|
||||||
# agent-watcher-mcp (Layer 2)
|
|
||||||
|
|
||||||
The MCP Watcher: a Claude Code stdio MCP server that surfaces ping-inbox
|
|
||||||
events into your Claude Code session via Channels, and exposes reply tools
|
|
||||||
(`ack` / `respond` / `mark_handled`) so Claude can react.
|
|
||||||
|
|
||||||
Spec: `../spec/agent-watcher.md` §4.
|
|
||||||
|
|
||||||
## What it does
|
|
||||||
|
|
||||||
1. Resolves identity (env / `$CLAUDE_HOME/ping-agent` / `~/.ping-agent`).
|
|
||||||
2. Writes `pings/.<agent>.watcher-active` so the existing agent-ping
|
|
||||||
`UserPromptSubmit` hook stands down on this host.
|
|
||||||
3. Watches `pings/<agent>.inbox` via inotify (chokidar).
|
|
||||||
4. On change, drains unread pings (since HWM), applies sentinel
|
|
||||||
deferral (warn-after-3), and emits each as a
|
|
||||||
`notifications/claude/channel` event.
|
|
||||||
5. Exposes three MCP tools so Claude can ack, respond, or mark a ping
|
|
||||||
handled.
|
|
||||||
6. Removes the sentinel on graceful shutdown.
|
|
||||||
|
|
||||||
## Install
|
|
||||||
|
|
||||||
Per CLAUDE.md rule #2, the agent does not install on itself. A human
|
|
||||||
runs:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cd /path/to/agent-watcher/mcp-watcher
|
|
||||||
./install.sh
|
|
||||||
```
|
|
||||||
|
|
||||||
This installs deps, builds, symlinks the binary into `~/.local/bin/`,
|
|
||||||
adds `.stignore` patterns, and prints the `mcp.json` snippet to paste
|
|
||||||
into Claude Code's config.
|
|
||||||
|
|
||||||
## Launch
|
|
||||||
|
|
||||||
Channels is in research preview. Start Claude Code with:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
claude --dangerously-load-development-channels server:agent-watcher
|
|
||||||
```
|
|
||||||
|
|
||||||
## Sandbox testing
|
|
||||||
|
|
||||||
To run a separate Claude Code session with a different identity (so it
|
|
||||||
won't compete with the default `~/.ping-agent` for inbox reads):
|
|
||||||
|
|
||||||
```bash
|
|
||||||
mkdir -p ~/.claude-sandbox
|
|
||||||
echo sandbox > ~/.claude-sandbox/ping-agent
|
|
||||||
CLAUDE_CONFIG_DIR=~/.claude-sandbox \
|
|
||||||
claude --dangerously-load-development-channels server:agent-watcher
|
|
||||||
```
|
|
||||||
|
|
||||||
The sandbox session reads `pings/sandbox.inbox` and writes
|
|
||||||
`pings/.sandbox.watcher-active` — fully isolated from prod.
|
|
||||||
|
|
||||||
## How it interacts with agent-ping
|
|
||||||
|
|
||||||
The `agent-ping` UserPromptSubmit hook checks for the sentinel file
|
|
||||||
`pings/.<agent>.watcher-active` at startup. If the file is present and
|
|
||||||
the PID inside is alive, the hook stands down — the watcher is the
|
|
||||||
delivery primitive. If the watcher exits (graceful or crash with stale
|
|
||||||
sentinel cleared by the hook's age check), the hook resumes
|
|
||||||
async-on-next-prompt delivery.
|
|
||||||
|
|
||||||
This means **you can run with the watcher OR the hook OR both
|
|
||||||
configured** — the sentinel arbitrates.
|
|
||||||
|
|
||||||
## Tests
|
|
||||||
|
|
||||||
```bash
|
|
||||||
npm test # unit tests via vitest (35 currently passing)
|
|
||||||
npm run typecheck # tsc --noEmit
|
|
||||||
npm run build # tsc → dist/
|
|
||||||
```
|
|
||||||
|
|
||||||
## Observability
|
|
||||||
|
|
||||||
- Logs to stderr (visible via `claude --debug` or the Claude Code MCP
|
|
||||||
debug log at `~/.claude/debug/<session-id>.txt`).
|
|
||||||
- Sentinel file content includes PID + start time for the hook's
|
|
||||||
age/liveness check.
|
|
||||||
|
|
||||||
## Limitations / v2
|
|
||||||
|
|
||||||
- Sentinel hook coexistence requires the agent-ping hook to know how
|
|
||||||
to read the sentinel. PR pending against `agent-ping` to add the
|
|
||||||
check.
|
|
||||||
- No reconnection / restart on Claude Code session restart — Claude
|
|
||||||
Code spawns the subprocess anew each session, so the watcher
|
|
||||||
re-drains from HWM cleanly.
|
|
||||||
- One watcher process per agent identity. Two sessions with the same
|
|
||||||
identity contending on the same inbox is undefined behaviour
|
|
||||||
(use `CLAUDE_HOME` to scope identities).
|
|
||||||
- Channels is research preview; if the API changes, expect to update
|
|
||||||
the meta-key sanitization or notification shape.
|
|
||||||
|
|
@ -1,118 +0,0 @@
|
||||||
#!/usr/bin/env bash
|
|
||||||
# install.sh — set up the agent-watcher MCP Watcher (Layer 2) on this host.
|
|
||||||
#
|
|
||||||
# Usage:
|
|
||||||
# ./install.sh # install: npm ci, build, link binary, print mcp.json snippet
|
|
||||||
# ./install.sh --no-mcp-json # everything but the mcp.json snippet (you handle registration)
|
|
||||||
#
|
|
||||||
# What it does:
|
|
||||||
# 1. Verifies Node 18+ is available.
|
|
||||||
# 2. Installs dependencies (npm ci) and builds (tsc).
|
|
||||||
# 3. Symlinks dist/server.js to ~/.local/bin/agent-watcher-mcp (chmod +x).
|
|
||||||
# 4. Adds .stignore patterns for the local-only watcher-active sentinel.
|
|
||||||
# 5. Prints the mcp.json snippet for paste into Claude Code's config.
|
|
||||||
#
|
|
||||||
# Per CLAUDE.md rule #2, this is intended to be run by a human. The MCP
|
|
||||||
# Watcher itself, like agent-ping, never invokes this script.
|
|
||||||
#
|
|
||||||
# Layer 1 (Collector) has its own install path. See the repo root install.sh.
|
|
||||||
|
|
||||||
set -euo pipefail
|
|
||||||
|
|
||||||
NO_MCP_JSON=0
|
|
||||||
for arg in "$@"; do
|
|
||||||
case "$arg" in
|
|
||||||
--no-mcp-json) NO_MCP_JSON=1 ;;
|
|
||||||
*) echo "unknown arg: $arg" >&2; exit 2 ;;
|
|
||||||
esac
|
|
||||||
done
|
|
||||||
|
|
||||||
REPO_DIR="$(cd "$(dirname "$0")" && pwd)"
|
|
||||||
BIN_DIR="$HOME/.local/bin"
|
|
||||||
WORKSPACE="$HOME/Nyx/workspace"
|
|
||||||
STIGNORE="$WORKSPACE/.stignore"
|
|
||||||
|
|
||||||
echo "agent-watcher-mcp install"
|
|
||||||
echo "repo: $REPO_DIR"
|
|
||||||
echo
|
|
||||||
|
|
||||||
# 1. Node check
|
|
||||||
echo "[1/5] checking Node"
|
|
||||||
if ! command -v node >/dev/null 2>&1; then
|
|
||||||
echo " ERROR: 'node' not found. Install Node 18+ first." >&2
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
NODE_MAJOR=$(node --version | sed 's/^v//' | cut -d. -f1)
|
|
||||||
if [ "$NODE_MAJOR" -lt 18 ]; then
|
|
||||||
echo " ERROR: Node 18+ required; found $(node --version)." >&2
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo " $(node --version)"
|
|
||||||
|
|
||||||
# 2. Install deps + build
|
|
||||||
echo "[2/5] npm ci + build"
|
|
||||||
( cd "$REPO_DIR" && npm ci --silent && npx tsc )
|
|
||||||
echo " built: $REPO_DIR/dist/server.js"
|
|
||||||
|
|
||||||
# 3. Binary symlink
|
|
||||||
echo "[3/5] linking binary"
|
|
||||||
mkdir -p "$BIN_DIR"
|
|
||||||
ln -sf "$REPO_DIR/dist/server.js" "$BIN_DIR/agent-watcher-mcp"
|
|
||||||
chmod +x "$REPO_DIR/dist/server.js"
|
|
||||||
echo " linked: $BIN_DIR/agent-watcher-mcp -> $REPO_DIR/dist/server.js"
|
|
||||||
|
|
||||||
# 4. .stignore — sentinel + hwm files are local-only
|
|
||||||
echo "[4/5] .stignore (sentinel + hwm are local-only per spec §4.3)"
|
|
||||||
if [ -d "$WORKSPACE" ]; then
|
|
||||||
touch "$STIGNORE"
|
|
||||||
for pattern in "pings/.*.watcher-active" "pings/.*.hwm"; do
|
|
||||||
if ! grep -qxF "$pattern" "$STIGNORE"; then
|
|
||||||
echo "$pattern" >> "$STIGNORE"
|
|
||||||
echo " added: $pattern"
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
else
|
|
||||||
echo " $WORKSPACE not present — skipping (Syncthing not set up here)"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# 5. mcp.json snippet
|
|
||||||
echo "[5/5] mcp.json registration"
|
|
||||||
if [ "$NO_MCP_JSON" -eq 0 ]; then
|
|
||||||
cat <<EOF
|
|
||||||
|
|
||||||
To register the watcher with Claude Code, add this to your MCP config.
|
|
||||||
|
|
||||||
For project-level (.mcp.json in the project root):
|
|
||||||
{
|
|
||||||
"mcpServers": {
|
|
||||||
"agent-watcher": {
|
|
||||||
"command": "agent-watcher-mcp"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
For user-level (~/.claude.json):
|
|
||||||
{
|
|
||||||
"mcpServers": {
|
|
||||||
"agent-watcher": {
|
|
||||||
"command": "$BIN_DIR/agent-watcher-mcp"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Then start Claude Code with the development flag (research preview):
|
|
||||||
|
|
||||||
claude --dangerously-load-development-channels server:agent-watcher
|
|
||||||
|
|
||||||
To use a sandbox identity instead of the default ~/.ping-agent:
|
|
||||||
|
|
||||||
CLAUDE_CONFIG_DIR=~/.claude-sandbox claude --dangerously-load-development-channels server:agent-watcher
|
|
||||||
|
|
||||||
(Drop a 'ping-agent' file in ~/.claude-sandbox containing the sandbox name first.)
|
|
||||||
|
|
||||||
EOF
|
|
||||||
else
|
|
||||||
echo " --no-mcp-json: skipping snippet"
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "installation complete."
|
|
||||||
3197
mcp-watcher/package-lock.json
generated
3197
mcp-watcher/package-lock.json
generated
File diff suppressed because it is too large
Load diff
|
|
@ -1,32 +0,0 @@
|
||||||
{
|
|
||||||
"name": "agent-watcher-mcp",
|
|
||||||
"version": "0.1.0",
|
|
||||||
"description": "Layer 2 MCP Watcher for the agent-watcher system. Reads ping inbox files via inotify and surfaces them into a Claude Code session via Channels.",
|
|
||||||
"private": true,
|
|
||||||
"type": "module",
|
|
||||||
"bin": {
|
|
||||||
"agent-watcher-mcp": "dist/server.js"
|
|
||||||
},
|
|
||||||
"main": "dist/server.js",
|
|
||||||
"scripts": {
|
|
||||||
"build": "tsc",
|
|
||||||
"start": "node dist/server.js",
|
|
||||||
"dev": "tsx src/server.ts",
|
|
||||||
"test": "vitest run",
|
|
||||||
"test:watch": "vitest",
|
|
||||||
"typecheck": "tsc --noEmit"
|
|
||||||
},
|
|
||||||
"dependencies": {
|
|
||||||
"@modelcontextprotocol/sdk": "^1.0.0",
|
|
||||||
"chokidar": "^4.0.0"
|
|
||||||
},
|
|
||||||
"devDependencies": {
|
|
||||||
"@types/node": "^20.0.0",
|
|
||||||
"tsx": "^4.0.0",
|
|
||||||
"typescript": "^5.4.0",
|
|
||||||
"vitest": "^1.6.0"
|
|
||||||
},
|
|
||||||
"engines": {
|
|
||||||
"node": ">=18.0.0"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,61 +0,0 @@
|
||||||
// Identity resolution for the MCP Watcher.
|
|
||||||
//
|
|
||||||
// Mirrors the layered resolution shipped in agent-ping (PR #1). The CLI
|
|
||||||
// has four layers (--as flag, env, $CLAUDE_HOME/ping-agent, ~/.ping-agent);
|
|
||||||
// the watcher subprocess has three (no argv override useful in MCP context).
|
|
||||||
//
|
|
||||||
// Spec: agent-ping spec §9; agent-watcher spec §4.4.
|
|
||||||
|
|
||||||
import { homedir } from "node:os";
|
|
||||||
import { existsSync, readFileSync } from "node:fs";
|
|
||||||
import { join } from "node:path";
|
|
||||||
|
|
||||||
export interface ResolvedIdentity {
|
|
||||||
agent: string;
|
|
||||||
source: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolve this process's agent identity.
|
|
||||||
* Layers, first match wins:
|
|
||||||
* 1. PING_AGENT_IDENTITY env var
|
|
||||||
* 2. $CLAUDE_CONFIG_DIR/ping-agent (the env var Claude Code actually reads;
|
|
||||||
* verified by `strings` on the binary, 2026-05-06 sandbox spike).
|
|
||||||
* 3. $CLAUDE_HOME/ping-agent (kept for compat with anyone setting the
|
|
||||||
* previously-documented variable).
|
|
||||||
* 4. ~/.ping-agent
|
|
||||||
*
|
|
||||||
* Throws if no layer resolves.
|
|
||||||
*/
|
|
||||||
export function resolveIdentity(env: NodeJS.ProcessEnv = process.env): ResolvedIdentity {
|
|
||||||
const envId = (env.PING_AGENT_IDENTITY ?? "").trim();
|
|
||||||
if (envId) {
|
|
||||||
return { agent: envId, source: "PING_AGENT_IDENTITY env var" };
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const varName of ["CLAUDE_CONFIG_DIR", "CLAUDE_HOME"]) {
|
|
||||||
const dir = (env[varName] ?? "").trim();
|
|
||||||
if (!dir) continue;
|
|
||||||
const f = join(dir, "ping-agent");
|
|
||||||
if (existsSync(f)) {
|
|
||||||
const name = readFileSync(f, "utf8").trim();
|
|
||||||
if (name) {
|
|
||||||
return { agent: name, source: f };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const home = env.HOME ?? homedir();
|
|
||||||
const defaultFile = join(home, ".ping-agent");
|
|
||||||
if (existsSync(defaultFile)) {
|
|
||||||
const name = readFileSync(defaultFile, "utf8").trim();
|
|
||||||
if (name) {
|
|
||||||
return { agent: name, source: defaultFile };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new Error(
|
|
||||||
`agent-watcher-mcp: no identity resolved. Set PING_AGENT_IDENTITY, ` +
|
|
||||||
`$CLAUDE_CONFIG_DIR/ping-agent (or $CLAUDE_HOME/ping-agent), or ${defaultFile}.`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
@ -1,147 +0,0 @@
|
||||||
// Inbox reader + HWM tracker.
|
|
||||||
//
|
|
||||||
// JSONL inbox files are produced by the agent-ping CLI and the Layer 1
|
|
||||||
// Collector. This module reads new lines since the last delivered ts,
|
|
||||||
// applies sentinel-deferral logic (warn-after-3 per spec §6 of agent-ping
|
|
||||||
// + §4.1 of agent-watcher), and atomically advances the HWM.
|
|
||||||
//
|
|
||||||
// The HWM file is local-only (not Syncthing-replicated; in .stignore).
|
|
||||||
|
|
||||||
import { existsSync, readFileSync, writeFileSync, renameSync, mkdirSync } from "node:fs";
|
|
||||||
import { dirname } from "node:path";
|
|
||||||
|
|
||||||
export interface PingEvent {
|
|
||||||
ts: string;
|
|
||||||
id: string;
|
|
||||||
from: string;
|
|
||||||
to: string;
|
|
||||||
type: string;
|
|
||||||
priority?: string;
|
|
||||||
payload: string;
|
|
||||||
sentinel?: string;
|
|
||||||
source?: string; // collector-only debug field
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface HwmState {
|
|
||||||
last_delivered_ts: string;
|
|
||||||
pending_attempts: Record<string, number>;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const DEFER_LIMIT = 3;
|
|
||||||
|
|
||||||
export function readInbox(path: string): PingEvent[] {
|
|
||||||
if (!existsSync(path)) return [];
|
|
||||||
const body = readFileSync(path, "utf8");
|
|
||||||
const out: PingEvent[] = [];
|
|
||||||
for (const raw of body.split("\n")) {
|
|
||||||
const line = raw.trim();
|
|
||||||
if (!line) continue;
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(line);
|
|
||||||
if (parsed && typeof parsed === "object" && parsed.id && parsed.ts) {
|
|
||||||
out.push(parsed as PingEvent);
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// Skip malformed lines; logged at the call site.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function readHwm(path: string): HwmState {
|
|
||||||
if (!existsSync(path)) {
|
|
||||||
return { last_delivered_ts: "", pending_attempts: {} };
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(readFileSync(path, "utf8"));
|
|
||||||
return {
|
|
||||||
last_delivered_ts: parsed.last_delivered_ts ?? "",
|
|
||||||
pending_attempts: parsed.pending_attempts ?? {},
|
|
||||||
};
|
|
||||||
} catch {
|
|
||||||
return { last_delivered_ts: "", pending_attempts: {} };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Atomic write via tmp-file + rename. Avoids torn writes if the process dies mid-write. */
|
|
||||||
export function writeHwm(path: string, state: HwmState): void {
|
|
||||||
mkdirSync(dirname(path), { recursive: true });
|
|
||||||
const tmp = path + ".tmp";
|
|
||||||
writeFileSync(tmp, JSON.stringify(state) + "\n", "utf8");
|
|
||||||
renameSync(tmp, path);
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface Deliverable {
|
|
||||||
event: PingEvent;
|
|
||||||
/** Prefix prepended to the payload when sentinel was missing on the 3rd attempt. */
|
|
||||||
warning?: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ReadResult {
|
|
||||||
/** Events ready to deliver (HWM should advance to cover these). */
|
|
||||||
deliverable: Deliverable[];
|
|
||||||
/** Events deferred this round; HWM should NOT advance past them yet. */
|
|
||||||
deferred: PingEvent[];
|
|
||||||
/** Updated HWM state to write after delivery succeeds. */
|
|
||||||
nextHwm: HwmState;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read new pings since the HWM, applying sentinel deferral.
|
|
||||||
*
|
|
||||||
* `sentinelExists` is injected so tests don't need real files. In production
|
|
||||||
* pass `(p) => existsSync(p)` from the caller.
|
|
||||||
*/
|
|
||||||
export function unreadSinceHwm(
|
|
||||||
events: PingEvent[],
|
|
||||||
hwm: HwmState,
|
|
||||||
sentinelExists: (path: string) => boolean,
|
|
||||||
): ReadResult {
|
|
||||||
const deliverable: Deliverable[] = [];
|
|
||||||
const deferred: PingEvent[] = [];
|
|
||||||
const nextAttempts: Record<string, number> = { ...hwm.pending_attempts };
|
|
||||||
|
|
||||||
for (const ev of events) {
|
|
||||||
if (hwm.last_delivered_ts && ev.ts <= hwm.last_delivered_ts) continue;
|
|
||||||
|
|
||||||
if (ev.sentinel && !sentinelExists(ev.sentinel)) {
|
|
||||||
const attempts = (nextAttempts[ev.id] ?? 0) + 1;
|
|
||||||
nextAttempts[ev.id] = attempts;
|
|
||||||
if (attempts < DEFER_LIMIT) {
|
|
||||||
deferred.push(ev);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// Third attempt — deliver with warning.
|
|
||||||
deliverable.push({
|
|
||||||
event: ev,
|
|
||||||
warning: `referenced file ${ev.sentinel} hasn't synced after ${attempts} attempts`,
|
|
||||||
});
|
|
||||||
delete nextAttempts[ev.id];
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
deliverable.push({ event: ev });
|
|
||||||
delete nextAttempts[ev.id];
|
|
||||||
}
|
|
||||||
|
|
||||||
// HWM advances to cover the highest-ts deliverable, but NOT past any deferred event.
|
|
||||||
// Otherwise we'd skip past deferred items on next read.
|
|
||||||
const minDeferredTs = deferred.length > 0
|
|
||||||
? deferred.reduce((m, e) => (e.ts < m ? e.ts : m), deferred[0].ts)
|
|
||||||
: null;
|
|
||||||
|
|
||||||
let nextTs = hwm.last_delivered_ts;
|
|
||||||
for (const d of deliverable) {
|
|
||||||
if (minDeferredTs && d.event.ts >= minDeferredTs) continue;
|
|
||||||
if (d.event.ts > nextTs) nextTs = d.event.ts;
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
deliverable,
|
|
||||||
deferred,
|
|
||||||
nextHwm: {
|
|
||||||
last_delivered_ts: nextTs,
|
|
||||||
pending_attempts: nextAttempts,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
@ -1,33 +0,0 @@
|
||||||
// Path helpers — single source of truth for where files live.
|
|
||||||
//
|
|
||||||
// All paths derive from $HOME (or os.homedir() fallback) and the agent's
|
|
||||||
// resolved identity. Centralizing them here keeps inbox, hwm, sentinel,
|
|
||||||
// and ack-log paths consistent across modules and trivially mockable in
|
|
||||||
// tests by overriding `home`.
|
|
||||||
|
|
||||||
import { homedir } from "node:os";
|
|
||||||
import { join } from "node:path";
|
|
||||||
|
|
||||||
export interface Paths {
|
|
||||||
workspace: string;
|
|
||||||
pingsDir: string;
|
|
||||||
acksLog: string;
|
|
||||||
agentsFile: string;
|
|
||||||
inbox(agent: string): string;
|
|
||||||
hwm(agent: string): string;
|
|
||||||
watcherActive(agent: string): string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function makePaths(home: string = process.env.HOME ?? homedir()): Paths {
|
|
||||||
const workspace = join(home, "Nyx", "workspace");
|
|
||||||
const pingsDir = join(workspace, "pings");
|
|
||||||
return {
|
|
||||||
workspace,
|
|
||||||
pingsDir,
|
|
||||||
acksLog: join(workspace, "acks", "log"),
|
|
||||||
agentsFile: join(pingsDir, ".agents"),
|
|
||||||
inbox: (agent: string) => join(pingsDir, `${agent}.inbox`),
|
|
||||||
hwm: (agent: string) => join(pingsDir, `.${agent}.hwm`),
|
|
||||||
watcherActive: (agent: string) => join(pingsDir, `.${agent}.watcher-active`),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
@ -1,41 +0,0 @@
|
||||||
// Sentinel file: signals the agent-ping UserPromptSubmit hook to stand
|
|
||||||
// down while the watcher is delivering pings via Channels (spec §4.3).
|
|
||||||
//
|
|
||||||
// The file lives at `pings/.<agent>.watcher-active` and is local-only
|
|
||||||
// (must be in .stignore — otherwise one host's watcher silences another
|
|
||||||
// host's hook).
|
|
||||||
//
|
|
||||||
// Lifecycle:
|
|
||||||
// - On startup: write the file with the current PID + timestamp.
|
|
||||||
// - On graceful shutdown (SIGINT/SIGTERM): remove it.
|
|
||||||
// - On crash: file is left behind. The hook checks file age + PID
|
|
||||||
// liveness to ignore stale sentinels. (See agent-ping hook update.)
|
|
||||||
|
|
||||||
import { writeFileSync, unlinkSync, mkdirSync, existsSync } from "node:fs";
|
|
||||||
import { dirname } from "node:path";
|
|
||||||
|
|
||||||
export interface SentinelHandle {
|
|
||||||
release(): void;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function claimSentinel(path: string): SentinelHandle {
|
|
||||||
mkdirSync(dirname(path), { recursive: true });
|
|
||||||
const body = JSON.stringify({
|
|
||||||
pid: process.pid,
|
|
||||||
started_at: new Date().toISOString(),
|
|
||||||
});
|
|
||||||
writeFileSync(path, body + "\n", "utf8");
|
|
||||||
|
|
||||||
let released = false;
|
|
||||||
return {
|
|
||||||
release() {
|
|
||||||
if (released) return;
|
|
||||||
released = true;
|
|
||||||
try {
|
|
||||||
if (existsSync(path)) unlinkSync(path);
|
|
||||||
} catch {
|
|
||||||
// best-effort
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
@ -1,135 +0,0 @@
|
||||||
#!/usr/bin/env node
|
|
||||||
// agent-watcher-mcp — Layer 2 of the agent-watcher system.
|
|
||||||
//
|
|
||||||
// An MCP server (stdio transport) that:
|
|
||||||
// 1. Watches the local agent's ping inbox file via inotify (chokidar).
|
|
||||||
// 2. Surfaces unread pings into the Claude Code session as channel events
|
|
||||||
// (notifications/claude/channel) per the channels-reference.
|
|
||||||
// 3. Exposes three reply tools (ack / respond / mark_handled) so Claude
|
|
||||||
// can react.
|
|
||||||
// 4. Writes a `.watcher-active` sentinel so the existing agent-ping
|
|
||||||
// UserPromptSubmit hook stands down while the watcher is in charge
|
|
||||||
// of delivery.
|
|
||||||
//
|
|
||||||
// Spec: ../spec/agent-watcher.md §4.
|
|
||||||
//
|
|
||||||
// Launch (Claude Code v2.1.80+, research preview):
|
|
||||||
// claude --dangerously-load-development-channels server:agent-watcher
|
|
||||||
//
|
|
||||||
// mcp.json registration:
|
|
||||||
// { "mcpServers": { "agent-watcher": { "command": "agent-watcher-mcp" } } }
|
|
||||||
|
|
||||||
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
|
|
||||||
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
|
|
||||||
import {
|
|
||||||
ListToolsRequestSchema,
|
|
||||||
CallToolRequestSchema,
|
|
||||||
} from "@modelcontextprotocol/sdk/types.js";
|
|
||||||
|
|
||||||
import { resolveIdentity } from "./identity.js";
|
|
||||||
import { makePaths } from "./paths.js";
|
|
||||||
import { InboxWatcher } from "./watcher.js";
|
|
||||||
import { claimSentinel } from "./sentinel.js";
|
|
||||||
import { toolDefinitions, callTool } from "./tools.js";
|
|
||||||
import type { PingEvent } from "./inbox.js";
|
|
||||||
|
|
||||||
const SERVER_NAME = "agent-watcher";
|
|
||||||
const SERVER_VERSION = "0.1.0";
|
|
||||||
|
|
||||||
const INSTRUCTIONS = [
|
|
||||||
"Pings from other agents arrive as <channel source=\"agent-watcher\" ...> events.",
|
|
||||||
"Each event has these attributes: ping_id, sender, type, priority, ts.",
|
|
||||||
"Treat them as priority context. Respond patterns:",
|
|
||||||
"- type=NEEDS-RESPONSE → call the `respond` tool with the ping_id and a payload",
|
|
||||||
"- type=ACK-REQUEST → call the `ack` tool with the ping_id (no content reply needed)",
|
|
||||||
"- type=INFO → call `mark_handled` once you've factored it in (or call ack to record it)",
|
|
||||||
"- priority=urgent events render first regardless of timestamp",
|
|
||||||
"If a ping body begins with \"[WARNING: referenced file ... hasn't synced]\" the referenced",
|
|
||||||
"artifact wasn't on disk when the watcher delivered. Acknowledge but flag the missing file.",
|
|
||||||
].join("\n");
|
|
||||||
|
|
||||||
async function main(): Promise<void> {
|
|
||||||
const id = resolveIdentity();
|
|
||||||
const paths = makePaths();
|
|
||||||
|
|
||||||
const mcp = new Server(
|
|
||||||
{ name: SERVER_NAME, version: SERVER_VERSION },
|
|
||||||
{
|
|
||||||
capabilities: {
|
|
||||||
experimental: { "claude/channel": {} },
|
|
||||||
tools: {},
|
|
||||||
},
|
|
||||||
instructions: INSTRUCTIONS,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
const watcher = new InboxWatcher({
|
|
||||||
paths,
|
|
||||||
agent: id.agent,
|
|
||||||
notify: async (event: PingEvent, warning?: string) => {
|
|
||||||
const content = warning
|
|
||||||
? `[WARNING: ${warning}]\n${event.payload}`
|
|
||||||
: event.payload;
|
|
||||||
|
|
||||||
// meta keys must be identifier-safe (no hyphens). Map ping fields:
|
|
||||||
// - keep ping_id, sender, ts, priority verbatim
|
|
||||||
// - rename `type` → `ping_type` so it doesn't collide with anything
|
|
||||||
// the runtime might reserve
|
|
||||||
const meta: Record<string, string> = {
|
|
||||||
ping_id: event.id,
|
|
||||||
sender: event.from,
|
|
||||||
ping_type: event.type,
|
|
||||||
ts: event.ts,
|
|
||||||
};
|
|
||||||
if (event.priority) meta.priority = event.priority;
|
|
||||||
if (event.sentinel) meta.sentinel = sanitizeForMeta(event.sentinel);
|
|
||||||
|
|
||||||
await mcp.notification({
|
|
||||||
method: "notifications/claude/channel",
|
|
||||||
params: { content, meta },
|
|
||||||
});
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
mcp.setRequestHandler(ListToolsRequestSchema, async () => ({
|
|
||||||
tools: toolDefinitions(),
|
|
||||||
}));
|
|
||||||
|
|
||||||
mcp.setRequestHandler(CallToolRequestSchema, async (req) => {
|
|
||||||
const args = (req.params.arguments ?? {}) as Record<string, unknown>;
|
|
||||||
const result = callTool(req.params.name, args, {
|
|
||||||
paths,
|
|
||||||
agent: id.agent,
|
|
||||||
recentEvents: () => watcher.recentEvents(),
|
|
||||||
});
|
|
||||||
if (!result.ok) {
|
|
||||||
throw new Error(result.text);
|
|
||||||
}
|
|
||||||
return { content: [{ type: "text", text: result.text }] };
|
|
||||||
});
|
|
||||||
|
|
||||||
// Sentinel: signal the agent-ping hook to stand down on this host.
|
|
||||||
const sentinel = claimSentinel(paths.watcherActive(id.agent));
|
|
||||||
const cleanup = () => sentinel.release();
|
|
||||||
process.on("SIGINT", () => { cleanup(); process.exit(0); });
|
|
||||||
process.on("SIGTERM", () => { cleanup(); process.exit(0); });
|
|
||||||
process.on("exit", cleanup);
|
|
||||||
|
|
||||||
// Connect stdio first so notifications during the initial drain land cleanly.
|
|
||||||
const transport = new StdioServerTransport();
|
|
||||||
await mcp.connect(transport);
|
|
||||||
|
|
||||||
// Now drain + start watching.
|
|
||||||
await watcher.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Channel meta values must be identifier-safe strings. Replace anything weird. */
|
|
||||||
function sanitizeForMeta(s: string): string {
|
|
||||||
return s.replace(/[^A-Za-z0-9_/.:\-]/g, "_");
|
|
||||||
}
|
|
||||||
|
|
||||||
main().catch((err: unknown) => {
|
|
||||||
const msg = err instanceof Error ? err.message : String(err);
|
|
||||||
process.stderr.write(`agent-watcher-mcp: fatal: ${msg}\n`);
|
|
||||||
process.exit(1);
|
|
||||||
});
|
|
||||||
|
|
@ -1,196 +0,0 @@
|
||||||
// Reply tools exposed by the MCP server: ack, respond, mark_handled.
|
|
||||||
//
|
|
||||||
// These are standard MCP tools per the channels-reference. Claude calls them
|
|
||||||
// to send signals back to the originating sender or to mark a ping as handled
|
|
||||||
// without a content reply.
|
|
||||||
//
|
|
||||||
// Cross-host write discipline (spec §4.2): the tools always write to the
|
|
||||||
// local filesystem. Syncthing replicates outward. They never write directly
|
|
||||||
// to a remote-host path.
|
|
||||||
|
|
||||||
import { appendFileSync, mkdirSync, existsSync, writeFileSync, renameSync, readdirSync } from "node:fs";
|
|
||||||
import { dirname } from "node:path";
|
|
||||||
import { randomBytes } from "node:crypto";
|
|
||||||
import type { Paths } from "./paths.js";
|
|
||||||
import type { PingEvent } from "./inbox.js";
|
|
||||||
|
|
||||||
const VALID_TYPES = new Set(["INFO", "NEEDS-RESPONSE", "ACK-REQUEST"]);
|
|
||||||
|
|
||||||
export interface ToolDeps {
|
|
||||||
paths: Paths;
|
|
||||||
agent: string;
|
|
||||||
/** All pings the watcher has seen; used to look up ping_id → original sender. */
|
|
||||||
recentEvents: () => Map<string, PingEvent>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** JSON Schema for the three tools. Returned by ListToolsRequestSchema. */
|
|
||||||
export function toolDefinitions() {
|
|
||||||
return [
|
|
||||||
{
|
|
||||||
name: "ack",
|
|
||||||
description:
|
|
||||||
"Acknowledge receipt of a ping. Writes a delivery record to the shared acks log. " +
|
|
||||||
"Use for ACK-REQUEST type pings, or when you want to confirm a NEEDS-RESPONSE was seen " +
|
|
||||||
"before composing a longer reply.",
|
|
||||||
inputSchema: {
|
|
||||||
type: "object",
|
|
||||||
properties: {
|
|
||||||
ping_id: { type: "string", description: "The ping id (from the <channel ping_id=...> attribute)." },
|
|
||||||
message: { type: "string", description: "Optional short message to record alongside the ack." },
|
|
||||||
},
|
|
||||||
required: ["ping_id"],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "respond",
|
|
||||||
description:
|
|
||||||
"Send a response ping back to the original sender. Writes a JSONL line to <sender>.inbox. " +
|
|
||||||
"Use for content replies to NEEDS-RESPONSE pings.",
|
|
||||||
inputSchema: {
|
|
||||||
type: "object",
|
|
||||||
properties: {
|
|
||||||
ping_id: { type: "string", description: "The original ping id this is responding to." },
|
|
||||||
payload: { type: "string", description: "The response message." },
|
|
||||||
type: {
|
|
||||||
type: "string",
|
|
||||||
enum: ["INFO", "NEEDS-RESPONSE", "ACK-REQUEST"],
|
|
||||||
description: "Type of the outgoing ping. Default INFO.",
|
|
||||||
},
|
|
||||||
priority: {
|
|
||||||
type: "string",
|
|
||||||
enum: ["normal", "urgent"],
|
|
||||||
description: "Outgoing priority. Default normal.",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
required: ["ping_id", "payload"],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "mark_handled",
|
|
||||||
description:
|
|
||||||
"Mark a ping as processed without sending a reply. Records an ack with kind=delivered. " +
|
|
||||||
"Use for INFO pings you've factored into your reasoning.",
|
|
||||||
inputSchema: {
|
|
||||||
type: "object",
|
|
||||||
properties: {
|
|
||||||
ping_id: { type: "string", description: "The ping id to mark handled." },
|
|
||||||
},
|
|
||||||
required: ["ping_id"],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
];
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface CallResult {
|
|
||||||
ok: boolean;
|
|
||||||
text: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function callTool(name: string, args: Record<string, unknown>, deps: ToolDeps): CallResult {
|
|
||||||
switch (name) {
|
|
||||||
case "ack":
|
|
||||||
return doAck(args as { ping_id: string; message?: string }, deps);
|
|
||||||
case "respond":
|
|
||||||
return doRespond(
|
|
||||||
args as { ping_id: string; payload: string; type?: string; priority?: string },
|
|
||||||
deps,
|
|
||||||
);
|
|
||||||
case "mark_handled":
|
|
||||||
return doMarkHandled(args as { ping_id: string }, deps);
|
|
||||||
default:
|
|
||||||
return { ok: false, text: `unknown tool: ${name}` };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function lookupOriginalSender(ping_id: string, deps: ToolDeps): string | null {
|
|
||||||
const ev = deps.recentEvents().get(ping_id);
|
|
||||||
return ev?.from ?? null;
|
|
||||||
}
|
|
||||||
|
|
||||||
function writeAck(deps: ToolDeps, entry: Record<string, unknown>): void {
|
|
||||||
mkdirSync(dirname(deps.paths.acksLog), { recursive: true });
|
|
||||||
appendFileSync(deps.paths.acksLog, JSON.stringify(entry) + "\n", "utf8");
|
|
||||||
}
|
|
||||||
|
|
||||||
function doAck(args: { ping_id: string; message?: string }, deps: ToolDeps): CallResult {
|
|
||||||
if (!args.ping_id) return { ok: false, text: "ping_id required" };
|
|
||||||
const sender = lookupOriginalSender(args.ping_id, deps);
|
|
||||||
const entry: Record<string, unknown> = {
|
|
||||||
ts: nowUtc(),
|
|
||||||
acked_by: deps.agent,
|
|
||||||
ping_id: args.ping_id,
|
|
||||||
original_sender: sender,
|
|
||||||
ack_kind: "delivered",
|
|
||||||
};
|
|
||||||
if (args.message) entry.message = args.message;
|
|
||||||
const ev = deps.recentEvents().get(args.ping_id);
|
|
||||||
if (ev) entry.original_type = ev.type;
|
|
||||||
writeAck(deps, entry);
|
|
||||||
return { ok: true, text: `acked ${args.ping_id}` };
|
|
||||||
}
|
|
||||||
|
|
||||||
function doRespond(
|
|
||||||
args: { ping_id: string; payload: string; type?: string; priority?: string },
|
|
||||||
deps: ToolDeps,
|
|
||||||
): CallResult {
|
|
||||||
if (!args.ping_id) return { ok: false, text: "ping_id required" };
|
|
||||||
if (!args.payload) return { ok: false, text: "payload required" };
|
|
||||||
const recipient = lookupOriginalSender(args.ping_id, deps);
|
|
||||||
if (!recipient) {
|
|
||||||
return {
|
|
||||||
ok: false,
|
|
||||||
text: `cannot respond: ping ${args.ping_id} not found in recent events. ` +
|
|
||||||
`It may have been delivered before this watcher started.`,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
const type = (args.type ?? "INFO").toUpperCase();
|
|
||||||
if (!VALID_TYPES.has(type)) {
|
|
||||||
return { ok: false, text: `invalid type ${args.type}; want INFO|NEEDS-RESPONSE|ACK-REQUEST` };
|
|
||||||
}
|
|
||||||
const priority = args.priority ?? "normal";
|
|
||||||
if (priority !== "normal" && priority !== "urgent") {
|
|
||||||
return { ok: false, text: `invalid priority ${priority}; want normal|urgent` };
|
|
||||||
}
|
|
||||||
|
|
||||||
const newEvent: PingEvent = {
|
|
||||||
ts: nowUtc(),
|
|
||||||
id: "ping-" + randomBytes(4).toString("hex"),
|
|
||||||
from: deps.agent,
|
|
||||||
to: recipient,
|
|
||||||
type,
|
|
||||||
priority,
|
|
||||||
payload: args.payload,
|
|
||||||
};
|
|
||||||
|
|
||||||
const target = deps.paths.inbox(recipient);
|
|
||||||
appendInboxAtomic(target, newEvent);
|
|
||||||
|
|
||||||
// Also log the ack so the original sender's `acks/log` reflects "responded".
|
|
||||||
writeAck(deps, {
|
|
||||||
ts: nowUtc(),
|
|
||||||
acked_by: deps.agent,
|
|
||||||
ping_id: args.ping_id,
|
|
||||||
original_sender: recipient,
|
|
||||||
ack_kind: "responded",
|
|
||||||
response_id: newEvent.id,
|
|
||||||
});
|
|
||||||
|
|
||||||
return { ok: true, text: `responded to ${args.ping_id} → ${recipient}.inbox (${newEvent.id})` };
|
|
||||||
}
|
|
||||||
|
|
||||||
function doMarkHandled(args: { ping_id: string }, deps: ToolDeps): CallResult {
|
|
||||||
if (!args.ping_id) return { ok: false, text: "ping_id required" };
|
|
||||||
return doAck({ ping_id: args.ping_id, message: "marked handled, no reply needed" }, deps);
|
|
||||||
}
|
|
||||||
|
|
||||||
function appendInboxAtomic(path: string, ev: PingEvent): void {
|
|
||||||
mkdirSync(dirname(path), { recursive: true });
|
|
||||||
// O_APPEND on POSIX guarantees atomicity for writes <= PIPE_BUF (4 KiB).
|
|
||||||
// Our JSONL line is well under that, so a plain appendFileSync is safe
|
|
||||||
// even if other processes (ping CLI, Collector) are also appending.
|
|
||||||
appendFileSync(path, JSON.stringify(ev) + "\n", "utf8");
|
|
||||||
}
|
|
||||||
|
|
||||||
function nowUtc(): string {
|
|
||||||
return new Date().toISOString().replace(/\.\d{3}Z$/, "Z");
|
|
||||||
}
|
|
||||||
|
|
@ -1,147 +0,0 @@
|
||||||
// Inbox watcher — wraps chokidar on the inbox file and emits channel
|
|
||||||
// notifications via the supplied notifier callback.
|
|
||||||
//
|
|
||||||
// Behaviour:
|
|
||||||
// 1. On startup: read the inbox, drain anything since the HWM, emit
|
|
||||||
// pending pings.
|
|
||||||
// 2. Set up chokidar on the inbox file. On change, re-read, emit new lines.
|
|
||||||
// 3. Sentinel-deferred pings remain in the inbox; on each change-event
|
|
||||||
// we re-evaluate them. The HWM file does NOT advance past them.
|
|
||||||
//
|
|
||||||
// The notifier is the MCP server's `mcp.notification()` wrapped to take a
|
|
||||||
// PingEvent + optional warning. Decoupled so this module is testable
|
|
||||||
// without spinning up an MCP transport.
|
|
||||||
|
|
||||||
import chokidar, { type FSWatcher } from "chokidar";
|
|
||||||
import { existsSync } from "node:fs";
|
|
||||||
import type { Paths } from "./paths.js";
|
|
||||||
import {
|
|
||||||
readInbox,
|
|
||||||
readHwm,
|
|
||||||
writeHwm,
|
|
||||||
unreadSinceHwm,
|
|
||||||
type Deliverable,
|
|
||||||
type HwmState,
|
|
||||||
type PingEvent,
|
|
||||||
} from "./inbox.js";
|
|
||||||
|
|
||||||
export type Notifier = (event: PingEvent, warning?: string) => Promise<void>;
|
|
||||||
|
|
||||||
export interface WatcherOptions {
|
|
||||||
paths: Paths;
|
|
||||||
agent: string;
|
|
||||||
notify: Notifier;
|
|
||||||
/** Override for testability. Defaults to fs.existsSync. */
|
|
||||||
sentinelExists?: (p: string) => boolean;
|
|
||||||
/** Called on every drain after writeHwm; useful for tests + telemetry. */
|
|
||||||
onDrain?: (result: { delivered: number; deferred: number }) => void;
|
|
||||||
/** Override for testability — chokidar in prod, mock in tests. */
|
|
||||||
startWatcher?: (path: string, onChange: () => void) => FSWatcher | { close(): Promise<void> };
|
|
||||||
}
|
|
||||||
|
|
||||||
export class InboxWatcher {
|
|
||||||
private opts: WatcherOptions;
|
|
||||||
private fsWatcher: FSWatcher | { close(): Promise<void> } | null = null;
|
|
||||||
/** All seen events keyed by id, used by the respond tool to look up sender. */
|
|
||||||
private seen = new Map<string, PingEvent>();
|
|
||||||
private draining = false;
|
|
||||||
private pendingDrain = false;
|
|
||||||
|
|
||||||
constructor(opts: WatcherOptions) {
|
|
||||||
this.opts = opts;
|
|
||||||
}
|
|
||||||
|
|
||||||
async start(): Promise<void> {
|
|
||||||
// Initial drain
|
|
||||||
await this.drain();
|
|
||||||
|
|
||||||
const inboxPath = this.opts.paths.inbox(this.opts.agent);
|
|
||||||
const start = this.opts.startWatcher ?? defaultChokidar;
|
|
||||||
this.fsWatcher = start(inboxPath, () => {
|
|
||||||
void this.drain();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async stop(): Promise<void> {
|
|
||||||
if (this.fsWatcher) {
|
|
||||||
await this.fsWatcher.close();
|
|
||||||
this.fsWatcher = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Snapshot of seen events. Used by the respond tool. */
|
|
||||||
recentEvents(): Map<string, PingEvent> {
|
|
||||||
return this.seen;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Drain new pings since HWM. Coalesces back-to-back fires. */
|
|
||||||
private async drain(): Promise<void> {
|
|
||||||
if (this.draining) {
|
|
||||||
this.pendingDrain = true;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.draining = true;
|
|
||||||
try {
|
|
||||||
do {
|
|
||||||
this.pendingDrain = false;
|
|
||||||
await this.drainOnce();
|
|
||||||
} while (this.pendingDrain);
|
|
||||||
} finally {
|
|
||||||
this.draining = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async drainOnce(): Promise<void> {
|
|
||||||
const inboxPath = this.opts.paths.inbox(this.opts.agent);
|
|
||||||
const hwmPath = this.opts.paths.hwm(this.opts.agent);
|
|
||||||
|
|
||||||
const events = readInbox(inboxPath);
|
|
||||||
// Track every seen event so respond() can look them up.
|
|
||||||
for (const ev of events) this.seen.set(ev.id, ev);
|
|
||||||
|
|
||||||
const hwm = readHwm(hwmPath);
|
|
||||||
const sentinelExists = this.opts.sentinelExists ?? ((p: string) => existsSync(p));
|
|
||||||
const result = unreadSinceHwm(events, hwm, sentinelExists);
|
|
||||||
|
|
||||||
// Emit normal-priority pings in ts order; urgent first.
|
|
||||||
const sorted: Deliverable[] = [...result.deliverable].sort((a, b) => {
|
|
||||||
const aUrg = a.event.priority === "urgent" ? 0 : 1;
|
|
||||||
const bUrg = b.event.priority === "urgent" ? 0 : 1;
|
|
||||||
if (aUrg !== bUrg) return aUrg - bUrg;
|
|
||||||
return a.event.ts.localeCompare(b.event.ts);
|
|
||||||
});
|
|
||||||
|
|
||||||
for (const d of sorted) {
|
|
||||||
await this.opts.notify(d.event, d.warning);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Persist HWM regardless of deferred count — pending_attempts changes too.
|
|
||||||
if (sorted.length > 0 || hwmDifferent(hwm, result.nextHwm)) {
|
|
||||||
writeHwm(hwmPath, result.nextHwm);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.opts.onDrain?.({ delivered: sorted.length, deferred: result.deferred.length });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function hwmDifferent(a: HwmState, b: HwmState): boolean {
|
|
||||||
if (a.last_delivered_ts !== b.last_delivered_ts) return true;
|
|
||||||
const ak = Object.keys(a.pending_attempts);
|
|
||||||
const bk = Object.keys(b.pending_attempts);
|
|
||||||
if (ak.length !== bk.length) return true;
|
|
||||||
for (const k of ak) {
|
|
||||||
if (a.pending_attempts[k] !== b.pending_attempts[k]) return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
function defaultChokidar(path: string, onChange: () => void): FSWatcher {
|
|
||||||
const w = chokidar.watch(path, {
|
|
||||||
persistent: true,
|
|
||||||
ignoreInitial: true,
|
|
||||||
awaitWriteFinish: { stabilityThreshold: 100, pollInterval: 50 },
|
|
||||||
});
|
|
||||||
w.on("add", onChange);
|
|
||||||
w.on("change", onChange);
|
|
||||||
return w;
|
|
||||||
}
|
|
||||||
|
|
@ -1,94 +0,0 @@
|
||||||
import { describe, it, expect, beforeEach, afterEach } from "vitest";
|
|
||||||
import { mkdtempSync, writeFileSync, rmSync, mkdirSync } from "node:fs";
|
|
||||||
import { tmpdir } from "node:os";
|
|
||||||
import { join } from "node:path";
|
|
||||||
import { resolveIdentity } from "../src/identity.js";
|
|
||||||
|
|
||||||
let dir: string;
|
|
||||||
beforeEach(() => {
|
|
||||||
dir = mkdtempSync(join(tmpdir(), "watcher-id-"));
|
|
||||||
});
|
|
||||||
afterEach(() => {
|
|
||||||
rmSync(dir, { recursive: true, force: true });
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("resolveIdentity", () => {
|
|
||||||
it("uses PING_AGENT_IDENTITY env var first", () => {
|
|
||||||
writeFileSync(join(dir, ".ping-agent"), "default-agent\n");
|
|
||||||
const r = resolveIdentity({
|
|
||||||
HOME: dir,
|
|
||||||
PING_AGENT_IDENTITY: "envname",
|
|
||||||
});
|
|
||||||
expect(r.agent).toBe("envname");
|
|
||||||
expect(r.source).toMatch(/env/i);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("uses $CLAUDE_CONFIG_DIR/ping-agent next", () => {
|
|
||||||
writeFileSync(join(dir, ".ping-agent"), "default-agent\n");
|
|
||||||
const sandbox = join(dir, "claude-sandbox");
|
|
||||||
mkdirSync(sandbox);
|
|
||||||
writeFileSync(join(sandbox, "ping-agent"), "sandbox\n");
|
|
||||||
const r = resolveIdentity({
|
|
||||||
HOME: dir,
|
|
||||||
CLAUDE_CONFIG_DIR: sandbox,
|
|
||||||
PING_AGENT_IDENTITY: "",
|
|
||||||
});
|
|
||||||
expect(r.agent).toBe("sandbox");
|
|
||||||
expect(r.source).toContain(sandbox);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("falls back to $CLAUDE_HOME/ping-agent when CLAUDE_CONFIG_DIR is unset", () => {
|
|
||||||
writeFileSync(join(dir, ".ping-agent"), "default-agent\n");
|
|
||||||
const claudeHome = join(dir, "claude-old");
|
|
||||||
mkdirSync(claudeHome);
|
|
||||||
writeFileSync(join(claudeHome, "ping-agent"), "compat\n");
|
|
||||||
const r = resolveIdentity({
|
|
||||||
HOME: dir,
|
|
||||||
CLAUDE_HOME: claudeHome,
|
|
||||||
PING_AGENT_IDENTITY: "",
|
|
||||||
});
|
|
||||||
expect(r.agent).toBe("compat");
|
|
||||||
expect(r.source).toContain(claudeHome);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("CLAUDE_CONFIG_DIR takes precedence over CLAUDE_HOME", () => {
|
|
||||||
const sandbox = join(dir, "claude-sandbox");
|
|
||||||
const old = join(dir, "claude-old");
|
|
||||||
mkdirSync(sandbox);
|
|
||||||
mkdirSync(old);
|
|
||||||
writeFileSync(join(sandbox, "ping-agent"), "new\n");
|
|
||||||
writeFileSync(join(old, "ping-agent"), "old\n");
|
|
||||||
const r = resolveIdentity({
|
|
||||||
HOME: dir,
|
|
||||||
CLAUDE_CONFIG_DIR: sandbox,
|
|
||||||
CLAUDE_HOME: old,
|
|
||||||
});
|
|
||||||
expect(r.agent).toBe("new");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("falls through to ~/.ping-agent", () => {
|
|
||||||
writeFileSync(join(dir, ".ping-agent"), "bob\n");
|
|
||||||
const r = resolveIdentity({ HOME: dir });
|
|
||||||
expect(r.agent).toBe("bob");
|
|
||||||
expect(r.source).toContain(".ping-agent");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("throws when nothing resolves", () => {
|
|
||||||
expect(() => resolveIdentity({ HOME: dir })).toThrow(/no identity/);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("ignores empty CLAUDE_CONFIG_DIR/ping-agent and falls through", () => {
|
|
||||||
writeFileSync(join(dir, ".ping-agent"), "bob\n");
|
|
||||||
const sandbox = join(dir, "claude-sandbox");
|
|
||||||
mkdirSync(sandbox);
|
|
||||||
writeFileSync(join(sandbox, "ping-agent"), "");
|
|
||||||
const r = resolveIdentity({ HOME: dir, CLAUDE_CONFIG_DIR: sandbox });
|
|
||||||
expect(r.agent).toBe("bob");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("trims whitespace from identity files", () => {
|
|
||||||
writeFileSync(join(dir, ".ping-agent"), " bob \n\n");
|
|
||||||
const r = resolveIdentity({ HOME: dir });
|
|
||||||
expect(r.agent).toBe("bob");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -1,175 +0,0 @@
|
||||||
import { describe, it, expect, beforeEach, afterEach } from "vitest";
|
|
||||||
import { mkdtempSync, writeFileSync, existsSync, readFileSync, rmSync } from "node:fs";
|
|
||||||
import { tmpdir } from "node:os";
|
|
||||||
import { join } from "node:path";
|
|
||||||
import {
|
|
||||||
readInbox,
|
|
||||||
readHwm,
|
|
||||||
writeHwm,
|
|
||||||
unreadSinceHwm,
|
|
||||||
DEFER_LIMIT,
|
|
||||||
type PingEvent,
|
|
||||||
type HwmState,
|
|
||||||
} from "../src/inbox.js";
|
|
||||||
|
|
||||||
const ev = (over: Partial<PingEvent>): PingEvent => ({
|
|
||||||
ts: "2026-05-06T10:00:00Z",
|
|
||||||
id: "ping-aaa",
|
|
||||||
from: "foreman",
|
|
||||||
to: "bob",
|
|
||||||
type: "INFO",
|
|
||||||
payload: "test",
|
|
||||||
...over,
|
|
||||||
});
|
|
||||||
|
|
||||||
let dir: string;
|
|
||||||
beforeEach(() => {
|
|
||||||
dir = mkdtempSync(join(tmpdir(), "watcher-test-"));
|
|
||||||
});
|
|
||||||
afterEach(() => {
|
|
||||||
rmSync(dir, { recursive: true, force: true });
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("readInbox", () => {
|
|
||||||
it("returns empty for missing file", () => {
|
|
||||||
expect(readInbox(join(dir, "missing.inbox"))).toEqual([]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("parses JSONL, skips blank + malformed lines", () => {
|
|
||||||
const path = join(dir, "x.inbox");
|
|
||||||
writeFileSync(
|
|
||||||
path,
|
|
||||||
[
|
|
||||||
JSON.stringify(ev({ id: "ping-1" })),
|
|
||||||
"",
|
|
||||||
"{ not json",
|
|
||||||
JSON.stringify(ev({ id: "ping-2", ts: "2026-05-06T11:00:00Z" })),
|
|
||||||
"",
|
|
||||||
].join("\n"),
|
|
||||||
);
|
|
||||||
const r = readInbox(path);
|
|
||||||
expect(r.map((e) => e.id)).toEqual(["ping-1", "ping-2"]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("requires id and ts on each line", () => {
|
|
||||||
const path = join(dir, "x.inbox");
|
|
||||||
writeFileSync(path, [
|
|
||||||
JSON.stringify({ ts: "x", payload: "missing id" }),
|
|
||||||
JSON.stringify({ id: "missing-ts", payload: "x" }),
|
|
||||||
JSON.stringify(ev({ id: "good" })),
|
|
||||||
].join("\n"));
|
|
||||||
expect(readInbox(path).map((e) => e.id)).toEqual(["good"]);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("readHwm + writeHwm", () => {
|
|
||||||
it("returns empty state for missing file", () => {
|
|
||||||
const r = readHwm(join(dir, "missing.hwm"));
|
|
||||||
expect(r).toEqual({ last_delivered_ts: "", pending_attempts: {} });
|
|
||||||
});
|
|
||||||
|
|
||||||
it("round-trips through atomic write", () => {
|
|
||||||
const path = join(dir, ".hwm");
|
|
||||||
const state: HwmState = {
|
|
||||||
last_delivered_ts: "2026-05-06T12:00:00Z",
|
|
||||||
pending_attempts: { "ping-x": 2 },
|
|
||||||
};
|
|
||||||
writeHwm(path, state);
|
|
||||||
expect(existsSync(path)).toBe(true);
|
|
||||||
expect(existsSync(path + ".tmp")).toBe(false);
|
|
||||||
expect(readHwm(path)).toEqual(state);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("recovers from corrupt file with empty state", () => {
|
|
||||||
const path = join(dir, ".hwm");
|
|
||||||
writeFileSync(path, "{ corrupt");
|
|
||||||
expect(readHwm(path)).toEqual({ last_delivered_ts: "", pending_attempts: {} });
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("unreadSinceHwm", () => {
|
|
||||||
const yes = () => true;
|
|
||||||
const no = () => false;
|
|
||||||
|
|
||||||
it("returns all events when HWM is empty", () => {
|
|
||||||
const events = [ev({ id: "a", ts: "2026-05-06T10:00:00Z" })];
|
|
||||||
const r = unreadSinceHwm(events, { last_delivered_ts: "", pending_attempts: {} }, yes);
|
|
||||||
expect(r.deliverable.map((d) => d.event.id)).toEqual(["a"]);
|
|
||||||
expect(r.deferred).toEqual([]);
|
|
||||||
expect(r.nextHwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("filters out events with ts <= HWM", () => {
|
|
||||||
const events = [
|
|
||||||
ev({ id: "old", ts: "2026-05-06T09:00:00Z" }),
|
|
||||||
ev({ id: "new", ts: "2026-05-06T11:00:00Z" }),
|
|
||||||
];
|
|
||||||
const r = unreadSinceHwm(
|
|
||||||
events,
|
|
||||||
{ last_delivered_ts: "2026-05-06T10:00:00Z", pending_attempts: {} },
|
|
||||||
yes,
|
|
||||||
);
|
|
||||||
expect(r.deliverable.map((d) => d.event.id)).toEqual(["new"]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("defers events whose sentinel is missing, increments attempts", () => {
|
|
||||||
const events = [ev({ id: "a", sentinel: "/nope" })];
|
|
||||||
const r = unreadSinceHwm(events, { last_delivered_ts: "", pending_attempts: {} }, no);
|
|
||||||
expect(r.deliverable).toEqual([]);
|
|
||||||
expect(r.deferred.map((e) => e.id)).toEqual(["a"]);
|
|
||||||
expect(r.nextHwm.pending_attempts).toEqual({ a: 1 });
|
|
||||||
// HWM does NOT advance past the deferred event
|
|
||||||
expect(r.nextHwm.last_delivered_ts).toBe("");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("delivers with warning on the DEFER_LIMIT-th attempt", () => {
|
|
||||||
const events = [ev({ id: "a", sentinel: "/nope" })];
|
|
||||||
const r = unreadSinceHwm(
|
|
||||||
events,
|
|
||||||
{ last_delivered_ts: "", pending_attempts: { a: DEFER_LIMIT - 1 } },
|
|
||||||
no,
|
|
||||||
);
|
|
||||||
expect(r.deliverable.length).toBe(1);
|
|
||||||
expect(r.deliverable[0].warning).toMatch(/hasn't synced/);
|
|
||||||
expect(r.deferred).toEqual([]);
|
|
||||||
// attempts cleared once delivered
|
|
||||||
expect(r.nextHwm.pending_attempts).toEqual({});
|
|
||||||
expect(r.nextHwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("delivers immediately when sentinel exists", () => {
|
|
||||||
const events = [ev({ id: "a", sentinel: "/exists" })];
|
|
||||||
const r = unreadSinceHwm(events, { last_delivered_ts: "", pending_attempts: {} }, yes);
|
|
||||||
expect(r.deliverable.length).toBe(1);
|
|
||||||
expect(r.deliverable[0].warning).toBeUndefined();
|
|
||||||
expect(r.nextHwm.pending_attempts).toEqual({});
|
|
||||||
});
|
|
||||||
|
|
||||||
it("HWM does not advance past deferred event even when later events deliver", () => {
|
|
||||||
const events = [
|
|
||||||
ev({ id: "deferred", ts: "2026-05-06T10:00:00Z", sentinel: "/nope" }),
|
|
||||||
ev({ id: "later", ts: "2026-05-06T11:00:00Z" }),
|
|
||||||
];
|
|
||||||
const sentinelExists = (p: string) => p !== "/nope";
|
|
||||||
const r = unreadSinceHwm(
|
|
||||||
events,
|
|
||||||
{ last_delivered_ts: "", pending_attempts: {} },
|
|
||||||
sentinelExists,
|
|
||||||
);
|
|
||||||
expect(r.deliverable.map((d) => d.event.id)).toEqual(["later"]);
|
|
||||||
expect(r.deferred.map((e) => e.id)).toEqual(["deferred"]);
|
|
||||||
// HWM stays empty so the deferred event is reconsidered next read
|
|
||||||
expect(r.nextHwm.last_delivered_ts).toBe("");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("clears pending_attempts when sentinel finally lands", () => {
|
|
||||||
const events = [ev({ id: "a", sentinel: "/now-ok" })];
|
|
||||||
const r = unreadSinceHwm(
|
|
||||||
events,
|
|
||||||
{ last_delivered_ts: "", pending_attempts: { a: 2 } },
|
|
||||||
yes,
|
|
||||||
);
|
|
||||||
expect(r.deliverable.length).toBe(1);
|
|
||||||
expect(r.nextHwm.pending_attempts).toEqual({});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -1,159 +0,0 @@
|
||||||
import { describe, it, expect, beforeEach, afterEach } from "vitest";
|
|
||||||
import { mkdtempSync, readFileSync, existsSync, rmSync } from "node:fs";
|
|
||||||
import { tmpdir } from "node:os";
|
|
||||||
import { join } from "node:path";
|
|
||||||
import { makePaths } from "../src/paths.js";
|
|
||||||
import { callTool, toolDefinitions } from "../src/tools.js";
|
|
||||||
import type { PingEvent } from "../src/inbox.js";
|
|
||||||
|
|
||||||
let dir: string;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
dir = mkdtempSync(join(tmpdir(), "watcher-tools-"));
|
|
||||||
});
|
|
||||||
afterEach(() => {
|
|
||||||
rmSync(dir, { recursive: true, force: true });
|
|
||||||
});
|
|
||||||
|
|
||||||
const setupDeps = (recent: Map<string, PingEvent>) => ({
|
|
||||||
paths: makePaths(dir),
|
|
||||||
agent: "bob",
|
|
||||||
recentEvents: () => recent,
|
|
||||||
});
|
|
||||||
|
|
||||||
const sampleEvent = (over: Partial<PingEvent> = {}): PingEvent => ({
|
|
||||||
ts: "2026-05-06T10:00:00Z",
|
|
||||||
id: "ping-orig",
|
|
||||||
from: "foreman",
|
|
||||||
to: "bob",
|
|
||||||
type: "NEEDS-RESPONSE",
|
|
||||||
payload: "do the thing",
|
|
||||||
...over,
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("toolDefinitions", () => {
|
|
||||||
it("declares ack, respond, and mark_handled with required ping_id", () => {
|
|
||||||
const tools = toolDefinitions();
|
|
||||||
const names = tools.map((t) => t.name).sort();
|
|
||||||
expect(names).toEqual(["ack", "mark_handled", "respond"]);
|
|
||||||
for (const t of tools) {
|
|
||||||
expect(t.inputSchema.required).toContain("ping_id");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("callTool — ack", () => {
|
|
||||||
it("appends to acks/log with ack_kind=delivered + original_type", () => {
|
|
||||||
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
|
|
||||||
const deps = setupDeps(recent);
|
|
||||||
const r = callTool("ack", { ping_id: "ping-orig" }, deps);
|
|
||||||
expect(r.ok).toBe(true);
|
|
||||||
const log = readFileSync(deps.paths.acksLog, "utf8").trim();
|
|
||||||
const obj = JSON.parse(log);
|
|
||||||
expect(obj.acked_by).toBe("bob");
|
|
||||||
expect(obj.ping_id).toBe("ping-orig");
|
|
||||||
expect(obj.original_sender).toBe("foreman");
|
|
||||||
expect(obj.ack_kind).toBe("delivered");
|
|
||||||
expect(obj.original_type).toBe("NEEDS-RESPONSE");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("rejects missing ping_id", () => {
|
|
||||||
const r = callTool("ack", {}, setupDeps(new Map()));
|
|
||||||
expect(r.ok).toBe(false);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("callTool — respond", () => {
|
|
||||||
it("writes a JSONL line to <sender>.inbox + ack with ack_kind=responded", () => {
|
|
||||||
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
|
|
||||||
const deps = setupDeps(recent);
|
|
||||||
const r = callTool(
|
|
||||||
"respond",
|
|
||||||
{ ping_id: "ping-orig", payload: "ok done", type: "INFO" },
|
|
||||||
deps,
|
|
||||||
);
|
|
||||||
expect(r.ok).toBe(true);
|
|
||||||
const inboxPath = deps.paths.inbox("foreman");
|
|
||||||
const lines = readFileSync(inboxPath, "utf8").trim().split("\n");
|
|
||||||
expect(lines).toHaveLength(1);
|
|
||||||
const ev = JSON.parse(lines[0]);
|
|
||||||
expect(ev.from).toBe("bob");
|
|
||||||
expect(ev.to).toBe("foreman");
|
|
||||||
expect(ev.type).toBe("INFO");
|
|
||||||
expect(ev.payload).toBe("ok done");
|
|
||||||
expect(ev.id).toMatch(/^ping-[a-f0-9]{8}$/);
|
|
||||||
|
|
||||||
const ack = JSON.parse(readFileSync(deps.paths.acksLog, "utf8").trim());
|
|
||||||
expect(ack.ack_kind).toBe("responded");
|
|
||||||
expect(ack.response_id).toBe(ev.id);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("defaults missing type to INFO + missing priority to normal", () => {
|
|
||||||
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
|
|
||||||
const deps = setupDeps(recent);
|
|
||||||
callTool("respond", { ping_id: "ping-orig", payload: "x" }, deps);
|
|
||||||
const lines = readFileSync(deps.paths.inbox("foreman"), "utf8").trim().split("\n");
|
|
||||||
const ev = JSON.parse(lines[0]);
|
|
||||||
expect(ev.type).toBe("INFO");
|
|
||||||
expect(ev.priority).toBe("normal");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("rejects unknown ping_id", () => {
|
|
||||||
const r = callTool(
|
|
||||||
"respond",
|
|
||||||
{ ping_id: "ping-unknown", payload: "x" },
|
|
||||||
setupDeps(new Map()),
|
|
||||||
);
|
|
||||||
expect(r.ok).toBe(false);
|
|
||||||
expect(r.text).toContain("not found");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("rejects invalid type", () => {
|
|
||||||
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
|
|
||||||
const r = callTool(
|
|
||||||
"respond",
|
|
||||||
{ ping_id: "ping-orig", payload: "x", type: "bogus" },
|
|
||||||
setupDeps(recent),
|
|
||||||
);
|
|
||||||
expect(r.ok).toBe(false);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("callTool — mark_handled", () => {
|
|
||||||
it("writes an ack with explanatory message", () => {
|
|
||||||
const recent = new Map<string, PingEvent>([["ping-orig", sampleEvent()]]);
|
|
||||||
const deps = setupDeps(recent);
|
|
||||||
const r = callTool("mark_handled", { ping_id: "ping-orig" }, deps);
|
|
||||||
expect(r.ok).toBe(true);
|
|
||||||
const ack = JSON.parse(readFileSync(deps.paths.acksLog, "utf8").trim());
|
|
||||||
expect(ack.ack_kind).toBe("delivered");
|
|
||||||
expect(ack.message).toMatch(/handled/i);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("callTool — unknown tool", () => {
|
|
||||||
it("returns ok=false on unknown tool name", () => {
|
|
||||||
const r = callTool("bogus", { ping_id: "x" }, setupDeps(new Map()));
|
|
||||||
expect(r.ok).toBe(false);
|
|
||||||
expect(r.text).toContain("unknown");
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("ack does not crash on unknown ping_id", () => {
|
|
||||||
it("still writes an ack with original_sender=null", () => {
|
|
||||||
const deps = setupDeps(new Map());
|
|
||||||
const r = callTool("ack", { ping_id: "ping-unknown" }, deps);
|
|
||||||
expect(r.ok).toBe(true);
|
|
||||||
const ack = JSON.parse(readFileSync(deps.paths.acksLog, "utf8").trim());
|
|
||||||
expect(ack.original_sender).toBeNull();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("paths writability through tools", () => {
|
|
||||||
it("creates the acks dir on first ack", () => {
|
|
||||||
const deps = setupDeps(new Map([["p1", sampleEvent({ id: "p1" })]]));
|
|
||||||
expect(existsSync(deps.paths.acksLog)).toBe(false);
|
|
||||||
callTool("ack", { ping_id: "p1" }, deps);
|
|
||||||
expect(existsSync(deps.paths.acksLog)).toBe(true);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -1,155 +0,0 @@
|
||||||
import { describe, it, expect, beforeEach, afterEach } from "vitest";
|
|
||||||
import { mkdtempSync, writeFileSync, mkdirSync, appendFileSync, rmSync, readFileSync } from "node:fs";
|
|
||||||
import { tmpdir } from "node:os";
|
|
||||||
import { join } from "node:path";
|
|
||||||
import { makePaths } from "../src/paths.js";
|
|
||||||
import { InboxWatcher } from "../src/watcher.js";
|
|
||||||
import type { PingEvent } from "../src/inbox.js";
|
|
||||||
|
|
||||||
let dir: string;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
dir = mkdtempSync(join(tmpdir(), "watcher-watch-"));
|
|
||||||
});
|
|
||||||
afterEach(() => {
|
|
||||||
rmSync(dir, { recursive: true, force: true });
|
|
||||||
});
|
|
||||||
|
|
||||||
const ev = (over: Partial<PingEvent>): PingEvent => ({
|
|
||||||
ts: "2026-05-06T10:00:00Z",
|
|
||||||
id: "ping-" + Math.random().toString(36).slice(2, 10),
|
|
||||||
from: "foreman",
|
|
||||||
to: "bob",
|
|
||||||
type: "INFO",
|
|
||||||
payload: "test",
|
|
||||||
...over,
|
|
||||||
});
|
|
||||||
|
|
||||||
const writeInbox = (path: string, events: PingEvent[]) => {
|
|
||||||
mkdirSync(join(path, "..").replace(/[^/]*\/?$/, ""), { recursive: true });
|
|
||||||
writeFileSync(path, events.map((e) => JSON.stringify(e)).join("\n") + "\n");
|
|
||||||
};
|
|
||||||
|
|
||||||
describe("InboxWatcher initial drain", () => {
|
|
||||||
it("emits all unread events on start, then advances HWM", async () => {
|
|
||||||
const paths = makePaths(dir);
|
|
||||||
mkdirSync(paths.pingsDir, { recursive: true });
|
|
||||||
writeInbox(paths.inbox("bob"), [
|
|
||||||
ev({ id: "a", ts: "2026-05-06T10:00:00Z" }),
|
|
||||||
ev({ id: "b", ts: "2026-05-06T11:00:00Z" }),
|
|
||||||
]);
|
|
||||||
|
|
||||||
const delivered: { event: PingEvent; warning?: string }[] = [];
|
|
||||||
const w = new InboxWatcher({
|
|
||||||
paths,
|
|
||||||
agent: "bob",
|
|
||||||
notify: async (event, warning) => {
|
|
||||||
delivered.push({ event, warning });
|
|
||||||
},
|
|
||||||
// No-op chokidar substitute
|
|
||||||
startWatcher: () => ({ async close() {} }),
|
|
||||||
});
|
|
||||||
await w.start();
|
|
||||||
await w.stop();
|
|
||||||
|
|
||||||
expect(delivered.map((d) => d.event.id)).toEqual(["a", "b"]);
|
|
||||||
const hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
|
|
||||||
expect(hwm.last_delivered_ts).toBe("2026-05-06T11:00:00Z");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("orders urgent before normal regardless of timestamp", async () => {
|
|
||||||
const paths = makePaths(dir);
|
|
||||||
mkdirSync(paths.pingsDir, { recursive: true });
|
|
||||||
writeInbox(paths.inbox("bob"), [
|
|
||||||
ev({ id: "older-normal", ts: "2026-05-06T10:00:00Z", priority: "normal" }),
|
|
||||||
ev({ id: "newer-urgent", ts: "2026-05-06T11:00:00Z", priority: "urgent" }),
|
|
||||||
]);
|
|
||||||
|
|
||||||
const delivered: PingEvent[] = [];
|
|
||||||
const w = new InboxWatcher({
|
|
||||||
paths, agent: "bob",
|
|
||||||
notify: async (e) => { delivered.push(e); },
|
|
||||||
startWatcher: () => ({ async close() {} }),
|
|
||||||
});
|
|
||||||
await w.start();
|
|
||||||
await w.stop();
|
|
||||||
expect(delivered.map((d) => d.id)).toEqual(["newer-urgent", "older-normal"]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("defers a sentinel-missing ping and re-attempts on next drain", async () => {
|
|
||||||
const paths = makePaths(dir);
|
|
||||||
mkdirSync(paths.pingsDir, { recursive: true });
|
|
||||||
writeInbox(paths.inbox("bob"), [ev({ id: "a", sentinel: "/nope" })]);
|
|
||||||
|
|
||||||
const delivered: PingEvent[] = [];
|
|
||||||
let triggerChange: () => void = () => {};
|
|
||||||
const w = new InboxWatcher({
|
|
||||||
paths, agent: "bob",
|
|
||||||
notify: async (e) => { delivered.push(e); },
|
|
||||||
sentinelExists: () => false,
|
|
||||||
startWatcher: (_path, onChange) => {
|
|
||||||
triggerChange = onChange;
|
|
||||||
return { async close() {} };
|
|
||||||
},
|
|
||||||
});
|
|
||||||
await w.start();
|
|
||||||
expect(delivered).toEqual([]);
|
|
||||||
// hwm has pending_attempts.a = 1 but ts not advanced
|
|
||||||
let hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
|
|
||||||
expect(hwm.pending_attempts).toEqual({ a: 1 });
|
|
||||||
expect(hwm.last_delivered_ts).toBe("");
|
|
||||||
|
|
||||||
// Two more triggers — third delivery emits with warning
|
|
||||||
triggerChange();
|
|
||||||
await new Promise((r) => setTimeout(r, 10));
|
|
||||||
triggerChange();
|
|
||||||
await new Promise((r) => setTimeout(r, 10));
|
|
||||||
|
|
||||||
expect(delivered.map((d) => d.id)).toEqual(["a"]);
|
|
||||||
hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
|
|
||||||
expect(hwm.pending_attempts).toEqual({});
|
|
||||||
expect(hwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z");
|
|
||||||
await w.stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
it("populates recentEvents map for tools to look up sender", async () => {
|
|
||||||
const paths = makePaths(dir);
|
|
||||||
mkdirSync(paths.pingsDir, { recursive: true });
|
|
||||||
writeInbox(paths.inbox("bob"), [
|
|
||||||
ev({ id: "ping-x", from: "mom" }),
|
|
||||||
ev({ id: "ping-y", from: "foreman" }),
|
|
||||||
]);
|
|
||||||
const w = new InboxWatcher({
|
|
||||||
paths, agent: "bob",
|
|
||||||
notify: async () => {},
|
|
||||||
startWatcher: () => ({ async close() {} }),
|
|
||||||
});
|
|
||||||
await w.start();
|
|
||||||
expect(w.recentEvents().get("ping-x")?.from).toBe("mom");
|
|
||||||
expect(w.recentEvents().get("ping-y")?.from).toBe("foreman");
|
|
||||||
await w.stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
it("skips pings already covered by HWM on restart", async () => {
|
|
||||||
const paths = makePaths(dir);
|
|
||||||
mkdirSync(paths.pingsDir, { recursive: true });
|
|
||||||
writeInbox(paths.inbox("bob"), [
|
|
||||||
ev({ id: "a", ts: "2026-05-06T10:00:00Z" }),
|
|
||||||
ev({ id: "b", ts: "2026-05-06T11:00:00Z" }),
|
|
||||||
]);
|
|
||||||
writeFileSync(
|
|
||||||
paths.hwm("bob"),
|
|
||||||
JSON.stringify({ last_delivered_ts: "2026-05-06T10:00:00Z", pending_attempts: {} }),
|
|
||||||
);
|
|
||||||
|
|
||||||
const delivered: PingEvent[] = [];
|
|
||||||
const w = new InboxWatcher({
|
|
||||||
paths, agent: "bob",
|
|
||||||
notify: async (e) => { delivered.push(e); },
|
|
||||||
startWatcher: () => ({ async close() {} }),
|
|
||||||
});
|
|
||||||
await w.start();
|
|
||||||
await w.stop();
|
|
||||||
expect(delivered.map((d) => d.id)).toEqual(["b"]);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
{
|
|
||||||
"compilerOptions": {
|
|
||||||
"target": "ES2022",
|
|
||||||
"module": "Node16",
|
|
||||||
"moduleResolution": "Node16",
|
|
||||||
"outDir": "dist",
|
|
||||||
"rootDir": "src",
|
|
||||||
"strict": true,
|
|
||||||
"esModuleInterop": true,
|
|
||||||
"forceConsistentCasingInFileNames": true,
|
|
||||||
"resolveJsonModule": true,
|
|
||||||
"skipLibCheck": true,
|
|
||||||
"declaration": false,
|
|
||||||
"sourceMap": true
|
|
||||||
},
|
|
||||||
"include": ["src/**/*.ts"],
|
|
||||||
"exclude": ["node_modules", "dist", "test"]
|
|
||||||
}
|
|
||||||
|
|
@ -1,8 +0,0 @@
|
||||||
import { defineConfig } from "vitest/config";
|
|
||||||
|
|
||||||
export default defineConfig({
|
|
||||||
test: {
|
|
||||||
include: ["test/**/*.test.ts"],
|
|
||||||
environment: "node",
|
|
||||||
},
|
|
||||||
});
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue