Compare commits
1 commit
main
...
foreman/sa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
95ff60ce94 |
2 changed files with 87 additions and 0 deletions
|
|
@ -7,6 +7,11 @@
|
||||||
// 2. Set up chokidar on the inbox file. On change, re-read, emit new lines.
|
// 2. Set up chokidar on the inbox file. On change, re-read, emit new lines.
|
||||||
// 3. Sentinel-deferred pings remain in the inbox; on each change-event
|
// 3. Sentinel-deferred pings remain in the inbox; on each change-event
|
||||||
// we re-evaluate them. The HWM file does NOT advance past them.
|
// we re-evaluate them. The HWM file does NOT advance past them.
|
||||||
|
// 4. Safety-poll: a periodic timer also calls drain() regardless of
|
||||||
|
// filesystem events. Catches the rare case where the kernel drops
|
||||||
|
// an inotify event (observed under brief idle gaps + atomic writes).
|
||||||
|
// drain() is idempotent, so the overhead is one stat + HWM compare
|
||||||
|
// when nothing's changed.
|
||||||
//
|
//
|
||||||
// The notifier is the MCP server's `mcp.notification()` wrapped to take a
|
// The notifier is the MCP server's `mcp.notification()` wrapped to take a
|
||||||
// PingEvent + optional warning. Decoupled so this module is testable
|
// PingEvent + optional warning. Decoupled so this module is testable
|
||||||
|
|
@ -37,11 +42,21 @@ export interface WatcherOptions {
|
||||||
onDrain?: (result: { delivered: number; deferred: number }) => void;
|
onDrain?: (result: { delivered: number; deferred: number }) => void;
|
||||||
/** Override for testability — chokidar in prod, mock in tests. */
|
/** Override for testability — chokidar in prod, mock in tests. */
|
||||||
startWatcher?: (path: string, onChange: () => void) => FSWatcher | { close(): Promise<void> };
|
startWatcher?: (path: string, onChange: () => void) => FSWatcher | { close(): Promise<void> };
|
||||||
|
/**
|
||||||
|
* Safety-poll interval in ms. Calls drain() unconditionally to catch
|
||||||
|
* dropped inotify events. Default 30_000. Set to 0 to disable (tests
|
||||||
|
* that don't need the poll, or environments with perfectly-reliable
|
||||||
|
* fs notifications).
|
||||||
|
*/
|
||||||
|
safetyPollMs?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const DEFAULT_SAFETY_POLL_MS = 30_000;
|
||||||
|
|
||||||
export class InboxWatcher {
|
export class InboxWatcher {
|
||||||
private opts: WatcherOptions;
|
private opts: WatcherOptions;
|
||||||
private fsWatcher: FSWatcher | { close(): Promise<void> } | null = null;
|
private fsWatcher: FSWatcher | { close(): Promise<void> } | null = null;
|
||||||
|
private safetyPoll: ReturnType<typeof setInterval> | null = null;
|
||||||
/** All seen events keyed by id, used by the respond tool to look up sender. */
|
/** All seen events keyed by id, used by the respond tool to look up sender. */
|
||||||
private seen = new Map<string, PingEvent>();
|
private seen = new Map<string, PingEvent>();
|
||||||
private draining = false;
|
private draining = false;
|
||||||
|
|
@ -60,9 +75,20 @@ export class InboxWatcher {
|
||||||
this.fsWatcher = start(inboxPath, () => {
|
this.fsWatcher = start(inboxPath, () => {
|
||||||
void this.drain();
|
void this.drain();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const pollMs = this.opts.safetyPollMs ?? DEFAULT_SAFETY_POLL_MS;
|
||||||
|
if (pollMs > 0) {
|
||||||
|
this.safetyPoll = setInterval(() => {
|
||||||
|
void this.drain();
|
||||||
|
}, pollMs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async stop(): Promise<void> {
|
async stop(): Promise<void> {
|
||||||
|
if (this.safetyPoll) {
|
||||||
|
clearInterval(this.safetyPoll);
|
||||||
|
this.safetyPoll = null;
|
||||||
|
}
|
||||||
if (this.fsWatcher) {
|
if (this.fsWatcher) {
|
||||||
await this.fsWatcher.close();
|
await this.fsWatcher.close();
|
||||||
this.fsWatcher = null;
|
this.fsWatcher = null;
|
||||||
|
|
|
||||||
|
|
@ -130,6 +130,67 @@ describe("InboxWatcher initial drain", () => {
|
||||||
await w.stop();
|
await w.stop();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("safety-poll delivers pings appended while no fs-event fires", async () => {
|
||||||
|
// Simulates the dropped-inotify-event case: chokidar mock never fires
|
||||||
|
// onChange. A ping is appended after start(); only the safety-poll
|
||||||
|
// timer can catch it. Asserts the watcher recovers within one poll cycle.
|
||||||
|
const paths = makePaths(dir);
|
||||||
|
mkdirSync(paths.pingsDir, { recursive: true });
|
||||||
|
writeInbox(paths.inbox("bob"), [ev({ id: "before-start", ts: "2026-05-06T10:00:00Z" })]);
|
||||||
|
|
||||||
|
const delivered: PingEvent[] = [];
|
||||||
|
const w = new InboxWatcher({
|
||||||
|
paths,
|
||||||
|
agent: "bob",
|
||||||
|
notify: async (e) => { delivered.push(e); },
|
||||||
|
// Chokidar mock that NEVER fires onChange — simulates a dropped event.
|
||||||
|
startWatcher: () => ({ async close() {} }),
|
||||||
|
safetyPollMs: 20,
|
||||||
|
});
|
||||||
|
await w.start();
|
||||||
|
expect(delivered.map((d) => d.id)).toEqual(["before-start"]);
|
||||||
|
|
||||||
|
// Append a new ping post-start, without notifying chokidar.
|
||||||
|
appendFileSync(
|
||||||
|
paths.inbox("bob"),
|
||||||
|
JSON.stringify(ev({ id: "after-start", ts: "2026-05-06T12:00:00Z" })) + "\n",
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait long enough for at least one safety-poll cycle.
|
||||||
|
await new Promise((r) => setTimeout(r, 80));
|
||||||
|
await w.stop();
|
||||||
|
|
||||||
|
expect(delivered.map((d) => d.id)).toEqual(["before-start", "after-start"]);
|
||||||
|
const hwm = JSON.parse(readFileSync(paths.hwm("bob"), "utf8"));
|
||||||
|
expect(hwm.last_delivered_ts).toBe("2026-05-06T12:00:00Z");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("safetyPollMs: 0 disables the safety-poll timer", async () => {
|
||||||
|
const paths = makePaths(dir);
|
||||||
|
mkdirSync(paths.pingsDir, { recursive: true });
|
||||||
|
writeInbox(paths.inbox("bob"), [ev({ id: "a", ts: "2026-05-06T10:00:00Z" })]);
|
||||||
|
|
||||||
|
const delivered: PingEvent[] = [];
|
||||||
|
const w = new InboxWatcher({
|
||||||
|
paths,
|
||||||
|
agent: "bob",
|
||||||
|
notify: async (e) => { delivered.push(e); },
|
||||||
|
startWatcher: () => ({ async close() {} }),
|
||||||
|
safetyPollMs: 0,
|
||||||
|
});
|
||||||
|
await w.start();
|
||||||
|
|
||||||
|
// Append a ping; with no fs-event AND no safety-poll, it must NOT be delivered.
|
||||||
|
appendFileSync(
|
||||||
|
paths.inbox("bob"),
|
||||||
|
JSON.stringify(ev({ id: "missed", ts: "2026-05-06T11:00:00Z" })) + "\n",
|
||||||
|
);
|
||||||
|
await new Promise((r) => setTimeout(r, 50));
|
||||||
|
await w.stop();
|
||||||
|
|
||||||
|
expect(delivered.map((d) => d.id)).toEqual(["a"]); // only the initial-drain ping
|
||||||
|
});
|
||||||
|
|
||||||
it("skips pings already covered by HWM on restart", async () => {
|
it("skips pings already covered by HWM on restart", async () => {
|
||||||
const paths = makePaths(dir);
|
const paths = makePaths(dir);
|
||||||
mkdirSync(paths.pingsDir, { recursive: true });
|
mkdirSync(paths.pingsDir, { recursive: true });
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue