import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { mkdtempSync, writeFileSync, mkdirSync, appendFileSync, rmSync, readFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { makePaths } from "../src/paths.js"; import { InboxWatcher } from "../src/watcher.js"; import type { PingEvent } from "../src/inbox.js"; let dir: string; beforeEach(() => { dir = mkdtempSync(join(tmpdir(), "watcher-watch-")); }); afterEach(() => { rmSync(dir, { recursive: true, force: true }); }); const ev = (over: Partial): PingEvent => ({ ts: "2026-05-06T10:00:00Z", id: "ping-" + Math.random().toString(36).slice(2, 10), from: "foreman", to: "bob", type: "INFO", payload: "test", ...over, }); const writeInbox = (path: string, events: PingEvent[]) => { mkdirSync(join(path, "..").replace(/[^/]*\/?$/, ""), { recursive: true }); writeFileSync(path, events.map((e) => JSON.stringify(e)).join("\n") + "\n"); }; describe("InboxWatcher initial drain", () => { it("emits all unread events on start, then advances HWM", async () => { const paths = makePaths(dir); mkdirSync(paths.pingsDir, { recursive: true }); writeInbox(paths.inbox("bob"), [ ev({ id: "a", ts: "2026-05-06T10:00:00Z" }), ev({ id: "b", ts: "2026-05-06T11:00:00Z" }), ]); const delivered: { event: PingEvent; warning?: string }[] = []; const w = new InboxWatcher({ paths, agent: "bob", notify: async (event, warning) => { delivered.push({ event, warning }); }, // No-op chokidar substitute startWatcher: () => ({ async close() {} }), }); await w.start(); await w.stop(); expect(delivered.map((d) => d.event.id)).toEqual(["a", "b"]); const hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8")); expect(hwm.last_delivered_ts).toBe("2026-05-06T11:00:00Z"); }); it("orders urgent before normal regardless of timestamp", async () => { const paths = makePaths(dir); mkdirSync(paths.pingsDir, { recursive: true }); writeInbox(paths.inbox("bob"), [ ev({ id: "older-normal", ts: "2026-05-06T10:00:00Z", priority: "normal" }), ev({ id: "newer-urgent", ts: "2026-05-06T11:00:00Z", priority: "urgent" }), ]); const delivered: PingEvent[] = []; const w = new InboxWatcher({ paths, agent: "bob", notify: async (e) => { delivered.push(e); }, startWatcher: () => ({ async close() {} }), }); await w.start(); await w.stop(); expect(delivered.map((d) => d.id)).toEqual(["newer-urgent", "older-normal"]); }); it("defers a sentinel-missing ping and re-attempts on next drain", async () => { const paths = makePaths(dir); mkdirSync(paths.pingsDir, { recursive: true }); writeInbox(paths.inbox("bob"), [ev({ id: "a", sentinel: "/nope" })]); const delivered: PingEvent[] = []; let triggerChange: () => void = () => {}; const w = new InboxWatcher({ paths, agent: "bob", notify: async (e) => { delivered.push(e); }, sentinelExists: () => false, startWatcher: (_path, onChange) => { triggerChange = onChange; return { async close() {} }; }, }); await w.start(); expect(delivered).toEqual([]); // hwm has pending_attempts.a = 1 but ts not advanced let hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8")); expect(hwm.pending_attempts).toEqual({ a: 1 }); expect(hwm.last_delivered_ts).toBe(""); // Two more triggers — third delivery emits with warning triggerChange(); await new Promise((r) => setTimeout(r, 10)); triggerChange(); await new Promise((r) => setTimeout(r, 10)); expect(delivered.map((d) => d.id)).toEqual(["a"]); hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8")); expect(hwm.pending_attempts).toEqual({}); expect(hwm.last_delivered_ts).toBe("2026-05-06T10:00:00Z"); await w.stop(); }); it("populates recentEvents map for tools to look up sender", async () => { const paths = makePaths(dir); mkdirSync(paths.pingsDir, { recursive: true }); writeInbox(paths.inbox("bob"), [ ev({ id: "ping-x", from: "mom" }), ev({ id: "ping-y", from: "foreman" }), ]); const w = new InboxWatcher({ paths, agent: "bob", notify: async () => {}, startWatcher: () => ({ async close() {} }), }); await w.start(); expect(w.recentEvents().get("ping-x")?.from).toBe("mom"); expect(w.recentEvents().get("ping-y")?.from).toBe("foreman"); await w.stop(); }); it("skips pings already covered by HWM on restart", async () => { const paths = makePaths(dir); mkdirSync(paths.pingsDir, { recursive: true }); writeInbox(paths.inbox("bob"), [ ev({ id: "a", ts: "2026-05-06T10:00:00Z" }), ev({ id: "b", ts: "2026-05-06T11:00:00Z" }), ]); writeFileSync( paths.hwm("bob"), JSON.stringify({ last_delivered_ts: "2026-05-06T10:00:00Z", pending_attempts: {} }), ); const delivered: PingEvent[] = []; const w = new InboxWatcher({ paths, agent: "bob", notify: async (e) => { delivered.push(e); }, startWatcher: () => ({ async close() {} }), }); await w.start(); await w.stop(); expect(delivered.map((d) => d.id)).toEqual(["b"]); }); });