diff --git a/INSTALL.md b/INSTALL.md deleted file mode 100644 index ea03ead..0000000 --- a/INSTALL.md +++ /dev/null @@ -1,95 +0,0 @@ -# INSTALL — agent-watcher Collector (Layer 1) - -The MCP Watcher (Layer 2) has its own install path; this file covers Layer 1 only. - -## Prerequisites - -- Linux with systemd -- Go 1.22+ (`sudo apt install golang-go` on Ubuntu/Mint) -- An existing agent-ping install on this host (the Collector writes to its inbox dir) - -## Install - -Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. - -```sh -git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher -cd ~/agent-watcher -./install.sh -``` - -The script builds, installs to `~/.local/bin/agent-watcher`, drops the systemd unit into `~/.config/systemd/user/`, copies the example config to `~/.config/agent-watcher/collector.yaml` (if absent), reloads systemd, and starts the service. - -To install without auto-starting (so you can edit the config first): - -```sh -./install.sh --no-start -``` - -## Configure - -Edit `~/.config/agent-watcher/collector.yaml`. The example has both sources configured for `agent: foreman` with placeholder routes — adapt to this host. At least one of `webhook:` or `drop_folder:` must be configured. - -After editing: - -```sh -systemctl --user restart agent-watcher -``` - -## Verify - -```sh -# logs -journalctl --user -u agent-watcher -f - -# health -curl http://127.0.0.1:18790/health - -# end-to-end via drop folder -echo '{"recipient":"bob","type":"INFO","payload":"hello from drop"}' \ - > ~/Nyx/workspace/incoming/test.json - -# end-to-end via webhook -curl -X POST http://127.0.0.1:18790/forgejo/push \ - -H 'Content-Type: application/json' \ - -d '{"repo":"agent-ping","actor":"angus"}' - -# the lines should appear in the recipient's inbox -tail -2 ~/Nyx/workspace/pings/bob.inbox -``` - -## Survive logout - -`systemctl --user` units stop when the user logs out. To keep the Collector running across logouts: - -```sh -sudo loginctl enable-linger $USER -``` - -## Uninstall - -```sh -systemctl --user disable --now agent-watcher -rm ~/.local/bin/agent-watcher -rm ~/.config/systemd/user/agent-watcher.service -systemctl --user daemon-reload -# config and example left behind; remove if desired: -# rm -rf ~/.config/agent-watcher -``` - -## Troubleshooting - -| Symptom | Likely cause | Fix | -|---|---|---| -| `journalctl` shows `config: ... is required` | YAML field missing | Match the example, save, restart. | -| `systemctl --user status` shows `address already in use` | port 18790 taken by something else | Edit `webhook.listen` to a free port. | -| Drop file disappears but no inbox line | check `.dead-letter/` for the file + `.reason` sidecar | Schema validation failed — fix the producer. | -| Webhook returns 404 | path doesn't match a configured route | Routes must match exactly; check trailing slashes. | -| Service won't start across reboot | `linger` not enabled | `sudo loginctl enable-linger $USER` | -| Drop file syncs back from another host before processing | drop folder is in Syncthing scope | This is intended (lets producers on other hosts deliver). Lifecycle artifacts (`.tmp`, `.dead-letter/`) are excluded by `install.sh`. | - -## What this does NOT install - -- The agent-ping system. Install that first (`~/agent-ping/install.sh `). -- Layer 2 (MCP Watcher). Different binary, different runtime, different unit — separate install when it lands. -- Caddy / reverse-proxy webhook auth. v1 is loopback-only; v2 will document the upgrade path. diff --git a/README.md b/README.md index 513a406..09f78b8 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ # agent-watcher -Push-delivery layer for [`agent-ping`](https://git.botbought.ai/foreman/agent-ping). The "secondary nervous system" for Claude Code agents on this network. +Push-delivery layer for [`agent-ping`](http://localhost:3300/angus/agent-ping). The "secondary nervous system" for Claude Code agents on this network. `agent-ping` queues messages in inbox files; `agent-watcher` notices them (and other external events) and wakes the recipient agent without a human in the loop. Two layers: -- **Collector** (this repo, Go) — small daemon under `systemd --user`. Always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive. -- **MCP Watcher** (Python, in progress) — Claude Code MCP subprocess declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels. Provides reply tools (`ack`, `respond`, `mark_handled`). +- **Collector** — small Go daemon, `systemd --user`, always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive. +- **MCP Watcher** — Claude Code MCP subprocess, declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels (research preview). Provides reply tools (`ack`, `respond`, `mark_handled`). Filesystem is the queue. OpenBrain is not involved. @@ -15,42 +15,10 @@ Filesystem is the queue. OpenBrain is not involved. [`spec/agent-watcher.md`](spec/agent-watcher.md). Read that for architecture, decisions, scope. -Channels reference docs (snapshot of Anthropic's official docs, used by Layer 2): [`docs/channels/`](docs/channels/). - ## Status -| Layer | Lane | Status | -|---|---|---| -| Spec v1 | — | Signed off by Bob 2026-05-06 | -| Layer 1: Collector | Foreman / Go | **v0 working: 43 tests passing.** End-to-end exercised; binary builds. systemd unit + INSTALL.md ready. | -| Layer 2: MCP Watcher | Bob / Python | In progress — sandbox CC session being set up on VPS for testing. | +v1 spec signed off by Bob (VPS) 2026-05-06. Implementation pending. -## Install (Layer 1) +## Install -```sh -git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher -cd ~/agent-watcher -./install.sh -``` - -Then edit `~/.config/agent-watcher/collector.yaml` and `systemctl --user restart agent-watcher`. - -See [`INSTALL.md`](INSTALL.md) for verify steps, troubleshooting, and the `loginctl enable-linger` step required to keep the daemon running across logouts. - -Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. - -## Quick reference (Layer 1) - -``` -Inputs Output -───────── ────── -HTTP POST → port 18790 ┐ - (routed via │ .inbox (JSONL, ping-shaped) - YAML table) │ identical format to - ├─→ what `ping ` writes; -File drop in │ the existing UserPromptSubmit hook and the -~/Nyx/workspace/incoming/ │ future MCP Watcher consume the stream -*.json ┘ without distinguishing source. -``` - -`/health` on the same webhook port returns `{received, emitted, errors, uptime_sec}` for journalctl correlation. +Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. Install script will land alongside `INSTALL.md` once the binaries are built. diff --git a/cmd/agent-watcher/main.go b/cmd/agent-watcher/main.go deleted file mode 100644 index fdca86e..0000000 --- a/cmd/agent-watcher/main.go +++ /dev/null @@ -1,148 +0,0 @@ -// Command agent-watcher is the Collector daemon. -// -// It loads a YAML config, builds the configured sources, and runs them -// concurrently. Each source converts external events (webhook POSTs, files -// dropped into a folder) into JSONL writes against the configured inbox -// directory. Output is bit-identical to the agent-ping CLI's writes, so the -// existing UserPromptSubmit hook (and the future MCP Watcher) consume the -// stream without distinguishing source. -// -// Default config path: ~/.config/agent-watcher/collector.yaml. Override with -// --config or AGENT_WATCHER_CONFIG. -// -// Signals: SIGINT and SIGTERM trigger graceful shutdown. SIGHUP reload is a -// v2 item — for now restart the unit to pick up config changes. -package main - -import ( - "context" - "flag" - "fmt" - "log/slog" - "os" - "os/signal" - "path/filepath" - "sync" - "syscall" - - "git.botbought.ai/foreman/agent-watcher/internal/config" - "git.botbought.ai/foreman/agent-watcher/internal/inbox" - "git.botbought.ai/foreman/agent-watcher/internal/source" - "git.botbought.ai/foreman/agent-watcher/internal/source/dropfolder" - "git.botbought.ai/foreman/agent-watcher/internal/source/webhook" -) - -func main() { - var ( - configPath = flag.String("config", "", "path to collector.yaml (default: $AGENT_WATCHER_CONFIG or ~/.config/agent-watcher/collector.yaml)") - jsonLog = flag.Bool("json-log", false, "emit logs as JSON (for journald structured fields)") - ) - flag.Parse() - - logger := newLogger(*jsonLog) - - path := resolveConfigPath(*configPath) - cfg, err := config.Load(path) - if err != nil { - logger.Error("config", "err", err) - os.Exit(2) - } - logger = logger.With("agent", cfg.Agent) - logger.Info("config loaded", "path", path, "inbox_dir", cfg.InboxDir) - - if err := run(cfg, logger); err != nil { - logger.Error("run", "err", err) - os.Exit(1) - } -} - -func run(cfg *config.Config, logger *slog.Logger) error { - w, err := inbox.NewWriter(cfg.InboxDir) - if err != nil { - return fmt.Errorf("inbox: %w", err) - } - emit := func(recipient string, ev *inbox.Event) error { - return w.Write(recipient, ev) - } - - sources, err := buildSources(cfg, logger) - if err != nil { - return err - } - if len(sources) == 0 { - return fmt.Errorf("no sources configured") - } - - ctx, cancel := signalContext() - defer cancel() - - var wg sync.WaitGroup - for _, src := range sources { - src := src - wg.Add(1) - go func() { - defer wg.Done() - logger.Info("source starting", "name", src.Name()) - if err := src.Run(ctx, emit); err != nil && err != context.Canceled { - logger.Error("source exited", "name", src.Name(), "err", err) - } else { - logger.Info("source stopped", "name", src.Name()) - } - }() - } - wg.Wait() - return nil -} - -func buildSources(cfg *config.Config, logger *slog.Logger) ([]source.Source, error) { - var out []source.Source - if w := cfg.Sources.Webhook; w != nil { - s, err := webhook.New(webhook.Config{ - Listen: w.Listen, - Routes: w.ToWebhookRouteMap(), - }, logger) - if err != nil { - return nil, fmt.Errorf("webhook: %w", err) - } - out = append(out, s) - } - if d := cfg.Sources.DropFolder; d != nil { - out = append(out, dropfolder.New(dropfolder.Config{ - Path: d.Path, - PollFallbackSeconds: d.PollFallbackSeconds, - }, logger)) - } - return out, nil -} - -func signalContext() (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(context.Background()) - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigCh - cancel() - }() - return ctx, cancel -} - -func resolveConfigPath(flagPath string) string { - if flagPath != "" { - return flagPath - } - if env := os.Getenv("AGENT_WATCHER_CONFIG"); env != "" { - return env - } - home, _ := os.UserHomeDir() - return filepath.Join(home, ".config", "agent-watcher", "collector.yaml") -} - -func newLogger(asJSON bool) *slog.Logger { - var h slog.Handler - if asJSON { - h = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}) - } else { - h = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}) - } - return slog.New(h) -} diff --git a/cmd/agent-watcher/main_test.go b/cmd/agent-watcher/main_test.go deleted file mode 100644 index 6cfa105..0000000 --- a/cmd/agent-watcher/main_test.go +++ /dev/null @@ -1,299 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "net" - "net/http" - "os" - "os/exec" - "path/filepath" - "strings" - "syscall" - "testing" - "time" -) - -// TestMain builds the agent-watcher binary once for the integration tests. -func TestMain(m *testing.M) { - bin, err := buildBinary() - if err != nil { - fmt.Fprintln(os.Stderr, "build failed:", err) - os.Exit(1) - } - binaryPath = bin - code := m.Run() - os.Remove(bin) - os.Exit(code) -} - -var binaryPath string - -func buildBinary() (string, error) { - dir, err := os.MkdirTemp("", "agent-watcher-bin-*") - if err != nil { - return "", err - } - out := filepath.Join(dir, "agent-watcher") - cmd := exec.Command("go", "build", "-o", out, ".") - cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return "", err - } - return out, nil -} - -// freePort returns an unused loopback "127.0.0.1:N" port. -func freePort(t *testing.T) string { - t.Helper() - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - addr := l.Addr().String() - l.Close() - return addr -} - -// startBinary launches agent-watcher with the given config file and returns -// the running cmd. The test is responsible for sending SIGINT/SIGTERM. -func startBinary(t *testing.T, configPath string) *exec.Cmd { - t.Helper() - cmd := exec.Command(binaryPath, "--config", configPath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - t.Cleanup(func() { - if cmd.Process != nil { - cmd.Process.Signal(syscall.SIGTERM) - cmd.Wait() - } - }) - return cmd -} - -// waitFor polls predicate until true or timeout. -func waitFor(t *testing.T, name string, pred func() bool, timeout time.Duration) { - t.Helper() - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - if pred() { - return - } - time.Sleep(20 * time.Millisecond) - } - t.Fatalf("waitFor %q: predicate did not become true within %s", name, timeout) -} - -func waitForListener(t *testing.T, addr string, timeout time.Duration) { - t.Helper() - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond) - if err == nil { - c.Close() - return - } - time.Sleep(20 * time.Millisecond) - } - t.Fatalf("listener at %s never came up", addr) -} - -func readInbox(t *testing.T, path string) []map[string]any { - t.Helper() - body, err := os.ReadFile(path) - if err != nil { - return nil - } - out := []map[string]any{} - for _, line := range strings.Split(strings.TrimRight(string(body), "\n"), "\n") { - if line == "" { - continue - } - var m map[string]any - if err := json.Unmarshal([]byte(line), &m); err != nil { - t.Errorf("non-JSON line: %s", line) - continue - } - out = append(out, m) - } - return out -} - -// TestEndToEnd_BothSourcesEmitToInbox builds and runs the real binary, -// exercises both webhook and drop-folder sources, and verifies the inbox -// file contains one bit-identical JSONL line per emitted event. -func TestEndToEnd_BothSourcesEmitToInbox(t *testing.T) { - tmp := t.TempDir() - inboxDir := filepath.Join(tmp, "pings") - dropDir := filepath.Join(tmp, "incoming") - configPath := filepath.Join(tmp, "collector.yaml") - addr := freePort(t) - - cfg := fmt.Sprintf(` -agent: foreman -inbox_dir: %s -sources: - webhook: - listen: %s - routes: - /forgejo/push: - recipient: bob - type: INFO - payload_template: "push {{ .repo }} by {{ .actor }}" - /alert: - recipient: bob - type: NEEDS-RESPONSE - priority: urgent - payload_template: "fixed alert" - drop_folder: - path: %s - poll_fallback_seconds: 0 -`, inboxDir, addr, dropDir) - - if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil { - t.Fatal(err) - } - - startBinary(t, configPath) - waitForListener(t, addr, 3*time.Second) - - // 1) drop a file - dropContent := `{"recipient":"bob","type":"INFO","payload":"from drop"}` - dropTmp := filepath.Join(dropDir, "drop1.json.tmp") - if err := os.WriteFile(dropTmp, []byte(dropContent), 0644); err != nil { - t.Fatal(err) - } - if err := os.Rename(dropTmp, filepath.Join(dropDir, "drop1.json")); err != nil { - t.Fatal(err) - } - - // 2) post a webhook - resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", - strings.NewReader(`{"repo":"agent-ping","actor":"angus"}`)) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusAccepted { - t.Fatalf("webhook status = %d", resp.StatusCode) - } - - // 3) post a urgent alert - resp2, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader("")) - if err != nil { - t.Fatal(err) - } - resp2.Body.Close() - - // Wait for 3 lines in bob's inbox. - bobInbox := filepath.Join(inboxDir, "bob.inbox") - waitFor(t, "3 inbox lines", func() bool { - return len(readInbox(t, bobInbox)) == 3 - }, 3*time.Second) - - got := readInbox(t, bobInbox) - payloads := map[string]map[string]any{} - for _, m := range got { - payloads[m["payload"].(string)] = m - } - - // drop event - drop := payloads["from drop"] - if drop == nil { - t.Fatalf("missing drop event; got: %+v", got) - } - if drop["type"] != "INFO" || drop["from"] != "collector" { - t.Errorf("drop event shape: %+v", drop) - } - if !strings.HasPrefix(drop["source"].(string), "drop-folder:drop1.json") { - t.Errorf("drop source: %v", drop["source"]) - } - - // webhook push event - push := payloads["push agent-ping by angus"] - if push == nil { - t.Fatalf("missing push event; got: %+v", got) - } - if push["type"] != "INFO" || push["priority"] != "normal" { - t.Errorf("push event shape: %+v", push) - } - if push["source"] != "webhook:/forgejo/push" { - t.Errorf("push source: %v", push["source"]) - } - - // alert event with priority urgent - alert := payloads["fixed alert"] - if alert == nil { - t.Fatalf("missing alert event; got: %+v", got) - } - if alert["priority"] != "urgent" || alert["type"] != "NEEDS-RESPONSE" { - t.Errorf("alert event shape: %+v", alert) - } - - // Health endpoint is wired and returns sane counters. - resp3, err := http.Get("http://" + addr + "/health") - if err != nil { - t.Fatal(err) - } - defer resp3.Body.Close() - var stats map[string]any - if err := json.NewDecoder(resp3.Body).Decode(&stats); err != nil { - t.Fatal(err) - } - if int(stats["emitted"].(float64)) != 2 { - t.Errorf("health emitted = %v, want 2", stats["emitted"]) - } -} - -// TestEndToEnd_GracefulShutdown ensures SIGTERM stops the binary cleanly. -func TestEndToEnd_GracefulShutdown(t *testing.T) { - tmp := t.TempDir() - configPath := filepath.Join(tmp, "collector.yaml") - addr := freePort(t) - cfg := fmt.Sprintf(` -agent: foreman -inbox_dir: %s/pings -sources: - webhook: - listen: %s - routes: - /x: - recipient: r - type: INFO - payload_template: ok -`, tmp, addr) - if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil { - t.Fatal(err) - } - - cmd := exec.Command(binaryPath, "--config", configPath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - waitForListener(t, addr, 3*time.Second) - - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - t.Fatal(err) - } - - done := make(chan error, 1) - go func() { done <- cmd.Wait() }() - select { - case err := <-done: - if err != nil { - // Some Go runtimes return an exitstatus error after signal-driven - // shutdown; that's OK as long as we exited. - if _, ok := err.(*exec.ExitError); !ok { - t.Errorf("unexpected exit error: %v", err) - } - } - case <-time.After(3 * time.Second): - cmd.Process.Kill() - t.Fatal("did not shut down within 3s of SIGTERM") - } -} diff --git a/examples/collector.yaml b/examples/collector.yaml deleted file mode 100644 index 1354d29..0000000 --- a/examples/collector.yaml +++ /dev/null @@ -1,55 +0,0 @@ -# agent-watcher Collector configuration -# -# Lives at: ~/.config/agent-watcher/collector.yaml -# Override with --config or AGENT_WATCHER_CONFIG. -# -# At least one source (webhook OR drop_folder) must be configured. - -# This host's identity. Used in logs only; the inbox writer routes by -# the recipient field on each event, not this. -agent: foreman - -# Optional. Where to write .inbox files. Default shown. -# inbox_dir: ~/Nyx/workspace/pings - -sources: - - # HTTP webhook source. - # v1 binds loopback only — Caddy + bearer-token reverse-proxy is the - # v2 upgrade path for accepting webhooks from external producers. - webhook: - listen: 127.0.0.1:18790 - routes: - # Path → which inbox to land in, with a Go text/template payload. - # Variables come from the request body decoded as JSON. - /forgejo/push: - recipient: bob - type: INFO - payload_template: "forgejo push to {{ .repo }} by {{ .actor }}" - - # Empty-body posts work too — fixed-string templates render without - # any data. - /openrouter/billing-alert: - recipient: bob - type: NEEDS-RESPONSE - priority: urgent - payload_template: "billing alert: {{ .message }}" - - # Drop-folder source. - # Watches a directory via inotify for *.json files matching the spec - # §3.1.2 schema: - # { - # "recipient": "bob", - # "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST", - # "priority": "normal" | "urgent", # optional - # "payload": "...", - # "sentinel": "/path/optional" # optional - # } - # Valid → emit + delete. Invalid → moved to .dead-letter/ with reason. - drop_folder: - path: ~/Nyx/workspace/incoming/ - # Safety net if inotify misses an event. 0 (or unset) = use the - # 30-second default. There is no "disable polling" option in v1 — - # if you don't want polling, leave inotify to do its job and ignore - # the cost (a periodic readdir is microseconds). - poll_fallback_seconds: 30 diff --git a/go.mod b/go.mod deleted file mode 100644 index 2bf7c53..0000000 --- a/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module git.botbought.ai/foreman/agent-watcher - -go 1.22 - -require github.com/fsnotify/fsnotify v1.7.0 - -require ( - golang.org/x/sys v0.13.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/go.sum b/go.sum deleted file mode 100644 index 14be547..0000000 --- a/go.sum +++ /dev/null @@ -1,7 +0,0 @@ -github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= -github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/install.sh b/install.sh deleted file mode 100755 index 3699865..0000000 --- a/install.sh +++ /dev/null @@ -1,114 +0,0 @@ -#!/usr/bin/env bash -# install.sh — set up the agent-watcher Collector on this machine. -# -# Usage: -# ./install.sh # build + install + enable + start -# ./install.sh --no-start # everything but `systemctl start` -# -# What it does: -# 1. Builds the agent-watcher binary (requires Go 1.22+). -# 2. Installs to ~/.local/bin/agent-watcher. -# 3. Drops the systemd --user unit into ~/.config/systemd/user/. -# 4. Drops a starter ~/.config/agent-watcher/collector.yaml if missing -# (copies examples/collector.yaml; you edit it before first start). -# 5. Reloads systemd, enables, and (unless --no-start) starts the unit. -# 6. Adds drop-folder lifecycle patterns to the workspace .stignore so -# drop files do not Syncthing-replicate during local processing. -# -# Per CLAUDE.md rule #2, this is intended to be run by a human. The -# Collector itself, like agent-ping, never invokes this script. -# -# Layer 2 (MCP Watcher) has its own install path — different language, -# different runtime, different unit. See its README when it lands. - -set -euo pipefail - -NO_START=0 -for arg in "$@"; do - case "$arg" in - --no-start) NO_START=1 ;; - *) echo "unknown arg: $arg" >&2; exit 2 ;; - esac -done - -REPO_DIR="$(cd "$(dirname "$0")" && pwd)" -BIN_DIR="$HOME/.local/bin" -CONF_DIR="$HOME/.config/agent-watcher" -UNIT_DIR="$HOME/.config/systemd/user" -WORKSPACE="$HOME/Nyx/workspace" -STIGNORE="$WORKSPACE/.stignore" - -echo "agent-watcher install" -echo "repo: $REPO_DIR" -echo - -# 1. Check Go -echo "[1/6] checking Go toolchain" -if ! command -v go >/dev/null 2>&1; then - echo " ERROR: 'go' not found. Install Go 1.22+ first." >&2 - echo " e.g. sudo apt install golang-go" >&2 - exit 1 -fi -GO_VERSION=$(go version | awk '{print $3}') -echo " $GO_VERSION" - -# 2. Build -echo "[2/6] building agent-watcher binary" -mkdir -p "$BIN_DIR" -( cd "$REPO_DIR" && go build -o "$BIN_DIR/agent-watcher" ./cmd/agent-watcher ) -echo " installed: $BIN_DIR/agent-watcher" - -# 3. systemd unit -echo "[3/6] installing systemd --user unit" -mkdir -p "$UNIT_DIR" -install -m 0644 "$REPO_DIR/systemd/agent-watcher.service" "$UNIT_DIR/agent-watcher.service" -echo " installed: $UNIT_DIR/agent-watcher.service" - -# 4. Config skeleton -echo "[4/6] config skeleton" -mkdir -p "$CONF_DIR" -if [ -f "$CONF_DIR/collector.yaml" ]; then - echo " $CONF_DIR/collector.yaml exists — leaving as-is" -else - install -m 0644 "$REPO_DIR/examples/collector.yaml" "$CONF_DIR/collector.yaml" - echo " installed example: $CONF_DIR/collector.yaml — EDIT BEFORE START" -fi - -# 5. .stignore — drop folder is replicated, but lifecycle artifacts shouldn't be -echo "[5/6] .stignore (drop-folder lifecycle artifacts are local-only)" -if [ -d "$WORKSPACE" ]; then - touch "$STIGNORE" - for pattern in "incoming/.dead-letter/" "incoming/*.json.tmp"; do - if ! grep -qxF "$pattern" "$STIGNORE"; then - echo "$pattern" >> "$STIGNORE" - echo " added: $pattern" - fi - done -else - echo " $WORKSPACE not present — skipping (Syncthing not set up here)" -fi - -# 6. systemd reload + enable + (optionally) start -echo "[6/6] systemd reload + enable" -systemctl --user daemon-reload -systemctl --user enable agent-watcher.service >/dev/null -echo " enabled at user login (loginctl enable-linger required to run when logged out)" - -if [ "$NO_START" -eq 0 ]; then - echo " starting…" - systemctl --user restart agent-watcher.service - sleep 1 - systemctl --user status agent-watcher.service --no-pager -n 5 || true -else - echo " --no-start: not starting; run 'systemctl --user start agent-watcher' when ready" -fi - -echo -echo "installation complete." -echo -echo "next steps:" -echo " 1. EDIT $CONF_DIR/collector.yaml to suit this host." -echo " 2. systemctl --user restart agent-watcher (after edits)." -echo " 3. journalctl --user -u agent-watcher -f (watch logs)." -echo " 4. curl http://127.0.0.1:18790/health (sanity check)." -echo " 5. To survive logout: loginctl enable-linger \$USER" diff --git a/internal/config/config.go b/internal/config/config.go deleted file mode 100644 index b54f80b..0000000 --- a/internal/config/config.go +++ /dev/null @@ -1,179 +0,0 @@ -// Package config loads the Collector's per-host YAML configuration. -// -// Config layout (spec §3.2): -// -// agent: foreman # this host's identity -// inbox_dir: ~/Nyx/workspace/pings # optional; default shown -// sources: -// webhook: -// listen: 127.0.0.1:18790 -// routes: -// /forgejo/push: -// recipient: bob -// type: INFO -// priority: normal # optional -// payload_template: "..." -// drop_folder: -// path: ~/Nyx/workspace/incoming/ -// poll_fallback_seconds: 30 -// -// Either source may be omitted; the Collector starts with whatever is -// configured. Validation is strict — an unknown top-level field or a -// malformed route returns an error. -package config - -import ( - "errors" - "fmt" - "os" - "path/filepath" - "strings" - - "gopkg.in/yaml.v3" - - "git.botbought.ai/foreman/agent-watcher/internal/source/webhook" -) - -// Config is the loaded, validated configuration. -type Config struct { - Agent string `yaml:"agent"` - InboxDir string `yaml:"inbox_dir,omitempty"` - Sources Sources `yaml:"sources"` -} - -// Sources groups per-source-type configuration. -type Sources struct { - Webhook *WebhookConfig `yaml:"webhook,omitempty"` - DropFolder *DropFolderConfig `yaml:"drop_folder,omitempty"` -} - -// WebhookConfig configures the HTTP webhook source. -type WebhookConfig struct { - Listen string `yaml:"listen"` - Routes map[string]WebhookRoute `yaml:"routes"` -} - -// WebhookRoute is one entry in the webhook routing table. -type WebhookRoute struct { - Recipient string `yaml:"recipient"` - Type string `yaml:"type"` - Priority string `yaml:"priority,omitempty"` - PayloadTemplate string `yaml:"payload_template"` -} - -// DropFolderConfig configures the drop-folder source. -type DropFolderConfig struct { - Path string `yaml:"path"` - PollFallbackSeconds int `yaml:"poll_fallback_seconds,omitempty"` -} - -// Default values applied when fields are unset. -const ( - DefaultInboxDir = "~/Nyx/workspace/pings" - DefaultDropPath = "~/Nyx/workspace/incoming" - DefaultWebhookListen = "127.0.0.1:18790" - DefaultPollFallbackSeconds = 30 -) - -// Load parses the YAML file at path, applies defaults, and validates. -func Load(path string) (*Config, error) { - body, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("config: read %s: %w", path, err) - } - return parse(body) -} - -func parse(body []byte) (*Config, error) { - var c Config - dec := yaml.NewDecoder(strings.NewReader(string(body))) - dec.KnownFields(true) - if err := dec.Decode(&c); err != nil { - return nil, fmt.Errorf("config: parse: %w", err) - } - if err := c.applyDefaultsAndValidate(); err != nil { - return nil, err - } - return &c, nil -} - -func (c *Config) applyDefaultsAndValidate() error { - if c.Agent == "" { - return errors.New("config: agent is required") - } - if c.InboxDir == "" { - c.InboxDir = DefaultInboxDir - } - c.InboxDir = expand(c.InboxDir) - - if c.Sources.Webhook == nil && c.Sources.DropFolder == nil { - return errors.New("config: at least one source (webhook or drop_folder) must be configured") - } - - if w := c.Sources.Webhook; w != nil { - if w.Listen == "" { - w.Listen = DefaultWebhookListen - } - if len(w.Routes) == 0 { - return errors.New("config: webhook configured but no routes defined") - } - for path, r := range w.Routes { - if !strings.HasPrefix(path, "/") { - return fmt.Errorf("config: webhook route %q must start with /", path) - } - if r.Recipient == "" { - return fmt.Errorf("config: webhook route %s: recipient required", path) - } - if r.Type == "" { - return fmt.Errorf("config: webhook route %s: type required", path) - } - if r.PayloadTemplate == "" { - return fmt.Errorf("config: webhook route %s: payload_template required", path) - } - } - } - - if d := c.Sources.DropFolder; d != nil { - if d.Path == "" { - d.Path = DefaultDropPath - } - d.Path = expand(d.Path) - if d.PollFallbackSeconds == 0 { - d.PollFallbackSeconds = DefaultPollFallbackSeconds - } - if d.PollFallbackSeconds < 0 { - return errors.New("config: drop_folder.poll_fallback_seconds must be >= 0") - } - } - return nil -} - -// expand expands a leading ~/ to $HOME. -func expand(p string) string { - if !strings.HasPrefix(p, "~/") && p != "~" { - return p - } - home, err := os.UserHomeDir() - if err != nil || home == "" { - return p - } - if p == "~" { - return home - } - return filepath.Join(home, p[2:]) -} - -// ToWebhookRouteMap converts validated routes to the source/webhook.Route -// shape so main.go can hand them straight to webhook.New. -func (w *WebhookConfig) ToWebhookRouteMap() map[string]webhook.Route { - out := make(map[string]webhook.Route, len(w.Routes)) - for path, r := range w.Routes { - out[path] = webhook.Route{ - Recipient: r.Recipient, - Type: r.Type, - Priority: r.Priority, - PayloadTemplate: r.PayloadTemplate, - } - } - return out -} diff --git a/internal/config/config_test.go b/internal/config/config_test.go deleted file mode 100644 index ccae24f..0000000 --- a/internal/config/config_test.go +++ /dev/null @@ -1,168 +0,0 @@ -package config - -import ( - "strings" - "testing" -) - -func TestParse_FullValid(t *testing.T) { - body := []byte(` -agent: foreman -inbox_dir: /tmp/inbox -sources: - webhook: - listen: 127.0.0.1:18790 - routes: - /forgejo/push: - recipient: bob - type: INFO - payload_template: "push to {{ .repo }}" - /alert: - recipient: bob - type: NEEDS-RESPONSE - priority: urgent - payload_template: "alert" - drop_folder: - path: /tmp/incoming - poll_fallback_seconds: 60 -`) - c, err := parse(body) - if err != nil { - t.Fatal(err) - } - if c.Agent != "foreman" { - t.Errorf("agent = %q", c.Agent) - } - if c.InboxDir != "/tmp/inbox" { - t.Errorf("inbox = %q", c.InboxDir) - } - if c.Sources.Webhook == nil || len(c.Sources.Webhook.Routes) != 2 { - t.Errorf("webhook routes: %+v", c.Sources.Webhook) - } - if c.Sources.DropFolder == nil || c.Sources.DropFolder.Path != "/tmp/incoming" { - t.Errorf("dropfolder: %+v", c.Sources.DropFolder) - } -} - -func TestParse_AppliesDefaults(t *testing.T) { - body := []byte(` -agent: foreman -sources: - drop_folder: - path: /tmp/x -`) - c, err := parse(body) - if err != nil { - t.Fatal(err) - } - if c.InboxDir == "" { - t.Error("inbox_dir default not applied") - } - if c.Sources.DropFolder.PollFallbackSeconds != DefaultPollFallbackSeconds { - t.Errorf("poll default = %d", c.Sources.DropFolder.PollFallbackSeconds) - } -} - -func TestParse_HomeExpansion(t *testing.T) { - body := []byte(` -agent: foreman -sources: - drop_folder: - path: ~/foo/bar -`) - c, err := parse(body) - if err != nil { - t.Fatal(err) - } - if strings.HasPrefix(c.Sources.DropFolder.Path, "~") { - t.Errorf("~ not expanded: %q", c.Sources.DropFolder.Path) - } - if !strings.HasSuffix(c.Sources.DropFolder.Path, "/foo/bar") { - t.Errorf("expansion lost suffix: %q", c.Sources.DropFolder.Path) - } -} - -func TestParse_RejectsInvalid(t *testing.T) { - cases := map[string]string{ - "missing agent": `sources:\n webhook:\n routes: {}`, - "no sources": `agent: x`, - "empty webhook routes": ` -agent: x -sources: - webhook: - listen: 127.0.0.1:18790 - routes: {} -`, - "route missing recipient": ` -agent: x -sources: - webhook: - routes: - /x: - type: INFO - payload_template: y -`, - "route missing type": ` -agent: x -sources: - webhook: - routes: - /x: - recipient: r - payload_template: y -`, - "route missing template": ` -agent: x -sources: - webhook: - routes: - /x: - recipient: r - type: INFO -`, - "route path no slash": ` -agent: x -sources: - webhook: - routes: - bad: - recipient: r - type: INFO - payload_template: y -`, - "unknown top-level field": ` -agent: x -typo: oops -sources: - drop_folder: - path: /x -`, - "poll negative": ` -agent: x -sources: - drop_folder: - path: /x - poll_fallback_seconds: -1 -`, - } - for name, body := range cases { - t.Run(name, func(t *testing.T) { - if _, err := parse([]byte(body)); err == nil { - t.Errorf("expected error for: %s", name) - } - }) - } -} - -func TestToWebhookRouteMap(t *testing.T) { - w := &WebhookConfig{ - Routes: map[string]WebhookRoute{ - "/x": {Recipient: "r", Type: "INFO", Priority: "urgent", PayloadTemplate: "p"}, - }, - } - got := w.ToWebhookRouteMap() - r := got["/x"] - if r.Recipient != "r" || r.Type != "INFO" || r.Priority != "urgent" || r.PayloadTemplate != "p" { - t.Errorf("conversion lost data: %+v", r) - } -} diff --git a/internal/inbox/inbox.go b/internal/inbox/inbox.go deleted file mode 100644 index c687c95..0000000 --- a/internal/inbox/inbox.go +++ /dev/null @@ -1,129 +0,0 @@ -// Package inbox writes ping-shaped JSONL events to recipient inbox files. -// -// The output format is bit-identical to the agent-ping CLI's output, so the -// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether -// a line was written by `ping` or by the Collector. -// -// All writes are append-only with O_APPEND so concurrent writers don't tear -// each other's lines (POSIX guarantees atomicity for writes <= PIPE_BUF, and -// our lines are well under that). -package inbox - -import ( - "crypto/rand" - "encoding/hex" - "encoding/json" - "fmt" - "os" - "path/filepath" - "sync" - "time" -) - -// Event is the JSONL line shape. Field order and JSON keys match the ping CLI -// exactly. Optional fields use omitempty. -type Event struct { - TS string `json:"ts"` - ID string `json:"id"` - From string `json:"from"` - To string `json:"to"` - Type string `json:"type"` - Priority string `json:"priority"` - Payload string `json:"payload"` - Sentinel string `json:"sentinel,omitempty"` - Source string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push" -} - -// Writer appends events to inbox files under a base directory. -// -// Concurrency: a single Writer is safe for use from multiple goroutines. -// One sync.Mutex per recipient bounds contention to per-file granularity. -// -// The locks map grows monotonically as new recipients appear. In practice -// the agent network has <10 recipients, so the leak is bounded; if this -// daemon ever lives in a system with thousands of recipients, replace the -// map with an LRU or shard-and-evict scheme. -type Writer struct { - dir string - - mu sync.Mutex - locks map[string]*sync.Mutex -} - -// NewWriter returns a Writer that appends to {dir}/{recipient}.inbox. -// The directory is created (with perms 0755) if it does not exist. -func NewWriter(dir string) (*Writer, error) { - if dir == "" { - return nil, fmt.Errorf("inbox: empty dir") - } - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err) - } - return &Writer{ - dir: dir, - locks: make(map[string]*sync.Mutex), - }, nil -} - -// Write appends one event to the recipient's inbox file. It mutates ev to set -// TS and ID if either is empty, then sets To from recipient. -func (w *Writer) Write(recipient string, ev *Event) error { - if recipient == "" { - return fmt.Errorf("inbox: empty recipient") - } - if ev == nil { - return fmt.Errorf("inbox: nil event") - } - if ev.TS == "" { - ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z") - } - if ev.ID == "" { - id, err := newID() - if err != nil { - return fmt.Errorf("inbox: generate id: %w", err) - } - ev.ID = id - } - ev.To = recipient - - line, err := json.Marshal(ev) - if err != nil { - return fmt.Errorf("inbox: marshal: %w", err) - } - line = append(line, '\n') - - lock := w.lockFor(recipient) - lock.Lock() - defer lock.Unlock() - - path := filepath.Join(w.dir, recipient+".inbox") - f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return fmt.Errorf("inbox: open %s: %w", path, err) - } - defer f.Close() - - if _, err := f.Write(line); err != nil { - return fmt.Errorf("inbox: append: %w", err) - } - return nil -} - -func (w *Writer) lockFor(recipient string) *sync.Mutex { - w.mu.Lock() - defer w.mu.Unlock() - if l, ok := w.locks[recipient]; ok { - return l - } - l := &sync.Mutex{} - w.locks[recipient] = l - return l -} - -func newID() (string, error) { - b := make([]byte, 4) - if _, err := rand.Read(b); err != nil { - return "", err - } - return "ping-" + hex.EncodeToString(b), nil -} diff --git a/internal/inbox/inbox_test.go b/internal/inbox/inbox_test.go deleted file mode 100644 index 22f199e..0000000 --- a/internal/inbox/inbox_test.go +++ /dev/null @@ -1,182 +0,0 @@ -package inbox - -import ( - "encoding/json" - "os" - "path/filepath" - "strings" - "sync" - "testing" -) - -func TestWriter_Write_BasicShape(t *testing.T) { - dir := t.TempDir() - w, err := NewWriter(dir) - if err != nil { - t.Fatal(err) - } - - if err := w.Write("bob", &Event{ - From: "foreman", - Type: "INFO", - Priority: "normal", - Payload: "hi", - }); err != nil { - t.Fatal(err) - } - - got := readLines(t, filepath.Join(dir, "bob.inbox")) - if len(got) != 1 { - t.Fatalf("want 1 line, got %d", len(got)) - } - - var ev Event - if err := json.Unmarshal([]byte(got[0]), &ev); err != nil { - t.Fatalf("not JSON: %v", err) - } - if ev.To != "bob" { - t.Errorf("To = %q, want bob", ev.To) - } - if ev.From != "foreman" { - t.Errorf("From = %q, want foreman", ev.From) - } - if ev.Payload != "hi" { - t.Errorf("Payload = %q", ev.Payload) - } - if !strings.HasPrefix(ev.ID, "ping-") { - t.Errorf("ID %q missing ping- prefix", ev.ID) - } - if len(ev.ID) != len("ping-")+8 { - t.Errorf("ID %q wrong length", ev.ID) - } - if ev.TS == "" { - t.Errorf("TS unset") - } -} - -func TestWriter_Write_PreservesProvidedIDAndTS(t *testing.T) { - dir := t.TempDir() - w, err := NewWriter(dir) - if err != nil { - t.Fatal(err) - } - - want := &Event{ - TS: "2026-05-06T12:00:00Z", - ID: "ping-deadbeef", - From: "x", - Type: "INFO", - Priority: "normal", - Payload: "p", - } - if err := w.Write("r", want); err != nil { - t.Fatal(err) - } - - got := readLines(t, filepath.Join(dir, "r.inbox")) - var ev Event - json.Unmarshal([]byte(got[0]), &ev) - if ev.ID != want.ID { - t.Errorf("ID overwritten: got %q want %q", ev.ID, want.ID) - } - if ev.TS != want.TS { - t.Errorf("TS overwritten: got %q want %q", ev.TS, want.TS) - } -} - -func TestWriter_Write_OmitsEmptyOptionalFields(t *testing.T) { - dir := t.TempDir() - w, _ := NewWriter(dir) - w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "p"}) - - line := readLines(t, filepath.Join(dir, "r.inbox"))[0] - if strings.Contains(line, "sentinel") { - t.Errorf("empty sentinel leaked into JSON: %s", line) - } - if strings.Contains(line, "source") { - t.Errorf("empty source leaked into JSON: %s", line) - } -} - -func TestWriter_Write_PingShapeMatchesCLI(t *testing.T) { - // The ping CLI emits JSON with separators=(",", ":") (compact) and these - // keys in this order: ts, id, from, to, type, priority, payload [, sentinel]. - // Verify our marshaled output preserves the key set and compactness. - dir := t.TempDir() - w, _ := NewWriter(dir) - w.Write("r", &Event{ - From: "f", Type: "INFO", Priority: "normal", Payload: "p", Sentinel: "/x", - }) - line := strings.TrimSpace(readLines(t, filepath.Join(dir, "r.inbox"))[0]) - - wantKeys := []string{`"ts":`, `"id":`, `"from":`, `"to":`, `"type":`, `"priority":`, `"payload":`, `"sentinel":`} - for _, k := range wantKeys { - if !strings.Contains(line, k) { - t.Errorf("missing key %s in: %s", k, line) - } - } - // compact (no spaces around colons or commas) - if strings.Contains(line, ": ") || strings.Contains(line, ", ") { - t.Errorf("non-compact JSON: %s", line) - } -} - -func TestWriter_Write_ConcurrentSameRecipient(t *testing.T) { - // 100 goroutines writing to the same inbox should produce 100 valid JSON - // lines, none torn or interleaved. - dir := t.TempDir() - w, _ := NewWriter(dir) - - const n = 100 - var wg sync.WaitGroup - for i := 0; i < n; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "x"}) - }(i) - } - wg.Wait() - - lines := readLines(t, filepath.Join(dir, "r.inbox")) - if len(lines) != n { - t.Fatalf("got %d lines, want %d (torn writes?)", len(lines), n) - } - for i, l := range lines { - var ev Event - if err := json.Unmarshal([]byte(l), &ev); err != nil { - t.Errorf("line %d not JSON: %v: %s", i, err, l) - } - } -} - -func TestWriter_Write_RejectsBadInputs(t *testing.T) { - w, _ := NewWriter(t.TempDir()) - if err := w.Write("", &Event{}); err == nil { - t.Error("empty recipient: expected error") - } - if err := w.Write("r", nil); err == nil { - t.Error("nil event: expected error") - } -} - -func TestNewWriter_RejectsEmptyDir(t *testing.T) { - if _, err := NewWriter(""); err == nil { - t.Error("expected error for empty dir") - } -} - -func readLines(t *testing.T, path string) []string { - t.Helper() - b, err := os.ReadFile(path) - if err != nil { - t.Fatal(err) - } - out := []string{} - for _, l := range strings.Split(strings.TrimRight(string(b), "\n"), "\n") { - if l != "" { - out = append(out, l) - } - } - return out -} diff --git a/internal/source/dropfolder/dropfolder.go b/internal/source/dropfolder/dropfolder.go deleted file mode 100644 index 3488f36..0000000 --- a/internal/source/dropfolder/dropfolder.go +++ /dev/null @@ -1,259 +0,0 @@ -// Package dropfolder implements the drop-folder Source. -// -// Producers drop *.json files into a designated directory. The Source ingests -// each file, validates it against a schema, emits an event, and deletes the -// file. Malformed files are moved to a .dead-letter/ subdirectory for -// inspection. -// -// Lifecycle: on startup, scan the folder and ingest any pre-existing files -// (catches up after a Collector restart). Then watch via inotify for new -// arrivals. A periodic poll (default 30s) is a safety net in case inotify -// misses a Syncthing-driven rename — Syncthing usually fires IN_CLOSE_WRITE -// on rename, but the spec calls for a fallback. -// -// Drop file schema (per spec §3.1.2): -// -// { -// "recipient": "bob", -// "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST", -// "priority": "normal" | "urgent", // optional, default "normal" -// "payload": "...", -// "sentinel": "/path/optional" // optional -// } -package dropfolder - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "log/slog" - "os" - "path/filepath" - "strings" - "time" - - "github.com/fsnotify/fsnotify" - - "git.botbought.ai/foreman/agent-watcher/internal/inbox" - "git.botbought.ai/foreman/agent-watcher/internal/source" -) - -// Validation rules duplicated from agent-ping's CLI for tightness. -var validTypes = map[string]bool{ - "INFO": true, - "NEEDS-RESPONSE": true, - "ACK-REQUEST": true, -} - -var validPriorities = map[string]bool{ - "": true, // means "normal" - "normal": true, - "urgent": true, -} - -// Config configures a drop-folder Source. -type Config struct { - // Path is the directory to watch. Created if absent. - Path string - - // PollFallbackSeconds is a periodic full-scan interval in case inotify - // misses an event. Set to 0 to disable. - PollFallbackSeconds int -} - -// Source watches a directory for *.json drop files and emits events. -type Source struct { - cfg Config - logger *slog.Logger -} - -// New returns a configured (but not running) Source. -func New(cfg Config, logger *slog.Logger) *Source { - if logger == nil { - logger = slog.Default() - } - if cfg.PollFallbackSeconds == 0 { - cfg.PollFallbackSeconds = 30 - } - return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)} -} - -func (s *Source) Name() string { return "drop-folder" } - -// Run blocks until ctx is canceled. -func (s *Source) Run(ctx context.Context, emit source.Emit) error { - if err := os.MkdirAll(s.cfg.Path, 0755); err != nil { - return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err) - } - if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil { - return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err) - } - - w, err := fsnotify.NewWatcher() - if err != nil { - return fmt.Errorf("dropfolder: new watcher: %w", err) - } - defer w.Close() - if err := w.Add(s.cfg.Path); err != nil { - return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err) - } - - // Initial scan — drain any files that landed before we were watching. - s.scan(ctx, emit) - - pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second - var pollCh <-chan time.Time - if pollEvery > 0 { - t := time.NewTicker(pollEvery) - defer t.Stop() - pollCh = t.C - } - - for { - select { - case <-ctx.Done(): - return ctx.Err() - - case ev, ok := <-w.Events: - if !ok { - return errors.New("dropfolder: watcher closed") - } - // Care about creates and renames-into (Syncthing uses rename). - if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 { - continue - } - if !strings.HasSuffix(ev.Name, ".json") { - continue - } - s.handle(ctx, emit, ev.Name) - - case err, ok := <-w.Errors: - if !ok { - return errors.New("dropfolder: watcher errors closed") - } - s.logger.Warn("watcher error", "err", err) - - case <-pollCh: - s.scan(ctx, emit) - } - } -} - -// scan ingests every *.json file currently in the drop folder. -func (s *Source) scan(ctx context.Context, emit source.Emit) { - entries, err := os.ReadDir(s.cfg.Path) - if err != nil { - s.logger.Error("scan readdir", "err", err) - return - } - for _, e := range entries { - if ctx.Err() != nil { - return - } - if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { - continue - } - s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name())) - } -} - -// handle reads, validates, emits, and removes (or dead-letters) one drop file. -func (s *Source) handle(ctx context.Context, emit source.Emit, path string) { - if ctx.Err() != nil { - return - } - body, err := os.ReadFile(path) - if err != nil { - // File may have been ingested already by another loop iteration. - if errors.Is(err, os.ErrNotExist) { - return - } - s.logger.Warn("read drop file", "path", path, "err", err) - s.deadLetter(path, "read failed") - return - } - - pd, err := parseDrop(body) - if err != nil { - s.logger.Warn("invalid drop file", "path", path, "err", err) - s.deadLetter(path, "schema invalid: "+err.Error()) - return - } - pd.event.Source = "drop-folder:" + filepath.Base(path) - - if err := emit(pd.recipient, pd.event); err != nil { - // Inbox write failure: don't lose the message. Leave the file in - // place so the next scan retries. Log loudly. - s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err) - return - } - - if err := os.Remove(path); err != nil { - s.logger.Warn("remove drop file", "path", path, "err", err) - } -} - -func (s *Source) deadLetter(path, reason string) { - target := filepath.Join(s.deadLetterDir(), filepath.Base(path)) - if err := os.Rename(path, target); err != nil { - s.logger.Error("dead-letter move failed", "path", path, "err", err) - return - } - // Sidecar reason file for forensics. - _ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644) - s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason) -} - -func (s *Source) deadLetterDir() string { - return filepath.Join(s.cfg.Path, ".dead-letter") -} - -// dropFile is the on-disk schema; intermediate value during parse. -type dropFile struct { - Recipient string `json:"recipient"` - Type string `json:"type"` - Priority string `json:"priority,omitempty"` - Payload string `json:"payload"` - Sentinel string `json:"sentinel,omitempty"` -} - -type parsedDrop struct { - recipient string - event *inbox.Event -} - -func parseDrop(body []byte) (*parsedDrop, error) { - var d dropFile - dec := json.NewDecoder(strings.NewReader(string(body))) - dec.DisallowUnknownFields() - if err := dec.Decode(&d); err != nil { - return nil, fmt.Errorf("decode: %w", err) - } - if d.Recipient == "" { - return nil, errors.New("recipient required") - } - if !validTypes[d.Type] { - return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type) - } - if !validPriorities[d.Priority] { - return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority) - } - if d.Payload == "" { - return nil, errors.New("payload required") - } - priority := d.Priority - if priority == "" { - priority = "normal" - } - return &parsedDrop{ - recipient: d.Recipient, - event: &inbox.Event{ - From: "collector", - Type: d.Type, - Priority: priority, - Payload: d.Payload, - Sentinel: d.Sentinel, - }, - }, nil -} diff --git a/internal/source/dropfolder/dropfolder_test.go b/internal/source/dropfolder/dropfolder_test.go deleted file mode 100644 index 4e1d508..0000000 --- a/internal/source/dropfolder/dropfolder_test.go +++ /dev/null @@ -1,273 +0,0 @@ -package dropfolder - -import ( - "context" - "encoding/json" - "io" - "log/slog" - "os" - "path/filepath" - "sync" - "testing" - "time" - - "git.botbought.ai/foreman/agent-watcher/internal/inbox" -) - -func quietLogger() *slog.Logger { - return slog.New(slog.NewTextHandler(io.Discard, nil)) -} - -func TestParseDrop_Valid(t *testing.T) { - body := []byte(`{"recipient":"bob","type":"INFO","priority":"urgent","payload":"hi","sentinel":"/x"}`) - p, err := parseDrop(body) - if err != nil { - t.Fatal(err) - } - if p.recipient != "bob" { - t.Errorf("recipient = %q", p.recipient) - } - if p.event.Type != "INFO" || p.event.Priority != "urgent" || p.event.Payload != "hi" || p.event.Sentinel != "/x" { - t.Errorf("unexpected event: %+v", p.event) - } -} - -func TestParseDrop_DefaultsPriority(t *testing.T) { - p, err := parseDrop([]byte(`{"recipient":"r","type":"INFO","payload":"p"}`)) - if err != nil { - t.Fatal(err) - } - if p.event.Priority != "normal" { - t.Errorf("priority default = %q, want normal", p.event.Priority) - } -} - -func TestParseDrop_Invalid(t *testing.T) { - cases := map[string]string{ - "empty body": ``, - "missing recipient": `{"type":"INFO","payload":"p"}`, - "missing type": `{"recipient":"r","payload":"p"}`, - "bad type": `{"recipient":"r","type":"NOPE","payload":"p"}`, - "bad priority": `{"recipient":"r","type":"INFO","priority":"high","payload":"p"}`, - "missing payload": `{"recipient":"r","type":"INFO"}`, - "unknown field": `{"recipient":"r","type":"INFO","payload":"p","stowaway":1}`, - "not json": `not json`, - } - for name, body := range cases { - t.Run(name, func(t *testing.T) { - if _, err := parseDrop([]byte(body)); err == nil { - t.Error("expected error") - } - }) - } -} - -// recordingEmit captures emitted events for assertion. -type recordingEmit struct { - mu sync.Mutex - events []record - err error // set to fail the next emit -} - -type record struct { - recipient string - event inbox.Event -} - -func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { - r.mu.Lock() - defer r.mu.Unlock() - if r.err != nil { - err := r.err - r.err = nil - return err - } - r.events = append(r.events, record{recipient, *ev}) - return nil -} - -func (r *recordingEmit) snapshot() []record { - r.mu.Lock() - defer r.mu.Unlock() - out := make([]record, len(r.events)) - copy(out, r.events) - return out -} - -func writeDrop(t *testing.T, dir, name string, payload any) string { - t.Helper() - path := filepath.Join(dir, name) - b, err := json.Marshal(payload) - if err != nil { - t.Fatal(err) - } - tmp := path + ".tmp" - if err := os.WriteFile(tmp, b, 0644); err != nil { - t.Fatal(err) - } - // rename for atomic visibility (mirrors what Syncthing does) - if err := os.Rename(tmp, path); err != nil { - t.Fatal(err) - } - return path -} - -func TestSource_InitialScan_IngestsExistingFiles(t *testing.T) { - dir := t.TempDir() - writeDrop(t, dir, "1.json", map[string]string{ - "recipient": "bob", "type": "INFO", "payload": "first", - }) - writeDrop(t, dir, "2.json", map[string]string{ - "recipient": "bob", "type": "NEEDS-RESPONSE", "payload": "second", - }) - - rec := &recordingEmit{} - src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) - - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() - go src.Run(ctx, rec.emit) - - waitFor(t, func() bool { return len(rec.snapshot()) == 2 }, 400*time.Millisecond) - - got := rec.snapshot() - if got[0].event.Payload+"|"+got[1].event.Payload != "first|second" && - got[0].event.Payload+"|"+got[1].event.Payload != "second|first" { - t.Errorf("unexpected payloads: %+v", got) - } - for _, r := range got { - if r.recipient != "bob" { - t.Errorf("recipient %q", r.recipient) - } - } -} - -func TestSource_LiveDrop_IngestsViaInotify(t *testing.T) { - dir := t.TempDir() - rec := &recordingEmit{} - src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go src.Run(ctx, rec.emit) - - // Give the watcher time to attach. - time.Sleep(50 * time.Millisecond) - writeDrop(t, dir, "live.json", map[string]string{ - "recipient": "foreman", "type": "INFO", "payload": "live one", - }) - - waitFor(t, func() bool { return len(rec.snapshot()) == 1 }, 1*time.Second) - - got := rec.snapshot() - if got[0].event.Payload != "live one" { - t.Errorf("payload %q", got[0].event.Payload) - } -} - -func TestSource_DeleteAfterEmit(t *testing.T) { - dir := t.TempDir() - rec := &recordingEmit{} - src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go src.Run(ctx, rec.emit) - - time.Sleep(50 * time.Millisecond) - path := writeDrop(t, dir, "ok.json", map[string]string{ - "recipient": "r", "type": "INFO", "payload": "p", - }) - - waitFor(t, func() bool { - _, err := os.Stat(path) - return os.IsNotExist(err) - }, 1*time.Second) -} - -func TestSource_DeadLetter_OnInvalidSchema(t *testing.T) { - dir := t.TempDir() - rec := &recordingEmit{} - src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go src.Run(ctx, rec.emit) - - time.Sleep(50 * time.Millisecond) - // Missing payload — invalid. - tmp := filepath.Join(dir, "bad.json.tmp") - os.WriteFile(tmp, []byte(`{"recipient":"r","type":"INFO"}`), 0644) - os.Rename(tmp, filepath.Join(dir, "bad.json")) - - deadPath := filepath.Join(dir, ".dead-letter", "bad.json") - waitFor(t, func() bool { - _, err := os.Stat(deadPath) - return err == nil - }, 1*time.Second) - - // Reason sidecar exists. - if _, err := os.Stat(deadPath + ".reason"); err != nil { - t.Errorf("reason sidecar missing: %v", err) - } - - // Original file gone from drop dir. - if _, err := os.Stat(filepath.Join(dir, "bad.json")); !os.IsNotExist(err) { - t.Errorf("original drop file should be gone") - } - - // Nothing was emitted. - if got := rec.snapshot(); len(got) != 0 { - t.Errorf("got events for invalid drop: %+v", got) - } -} - -func TestSource_RetainsFile_WhenEmitFails(t *testing.T) { - dir := t.TempDir() - rec := &recordingEmit{err: errOnce()} - src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go src.Run(ctx, rec.emit) - - time.Sleep(50 * time.Millisecond) - path := writeDrop(t, dir, "retry.json", map[string]string{ - "recipient": "r", "type": "INFO", "payload": "p", - }) - - // Wait long enough for the (failing) emit to have happened. - time.Sleep(200 * time.Millisecond) - - // Drop file is still there (retained for retry). - if _, err := os.Stat(path); err != nil { - t.Errorf("drop file should be retained on emit failure: %v", err) - } - // Not in dead-letter. - if _, err := os.Stat(filepath.Join(dir, ".dead-letter", "retry.json")); err == nil { - t.Error("emit failure should NOT dead-letter (it's transient)") - } -} - -// errOnce returns a non-nil error one time, then nil after. -func errOnce() error { - type e struct{} - return &emitErr{} -} - -type emitErr struct{} - -func (e *emitErr) Error() string { return "transient emit failure" } - -// waitFor polls predicate until it returns true or timeout elapses. -func waitFor(t *testing.T, pred func() bool, timeout time.Duration) { - t.Helper() - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - if pred() { - return - } - time.Sleep(10 * time.Millisecond) - } - t.Fatalf("waitFor: predicate did not become true within %s", timeout) -} diff --git a/internal/source/source.go b/internal/source/source.go deleted file mode 100644 index d9d914f..0000000 --- a/internal/source/source.go +++ /dev/null @@ -1,35 +0,0 @@ -// Package source defines the contract every Collector source plugin implements. -// -// A Source is a long-running goroutine that converts external events into -// inbox writes. It receives an Emit callback at start; each external event it -// observes results in one Emit call. The Source returns when its context is -// canceled, or earlier on a fatal error. -// -// Sources do not own the inbox writer or any other shared state. They emit; -// the dispatcher routes. This keeps each source small and testable in -// isolation. -package source - -import ( - "context" - - "git.botbought.ai/foreman/agent-watcher/internal/inbox" -) - -// Emit is the callback a Source uses to push one observed event into the -// dispatcher. The dispatcher writes it to recipient's inbox file. Returning -// an error means the inbox write failed; sources may log and continue, or -// drop, or shut down — that's a per-source policy decision. -type Emit func(recipient string, ev *inbox.Event) error - -// Source is a Collector input. Implementations are constructed with their -// own configuration and started via Run. -type Source interface { - // Name is a short identifier for logs and the "source" field on emitted - // events ("webhook", "drop-folder", etc.). - Name() string - - // Run blocks until ctx is canceled or a fatal error occurs. Each - // observed external event becomes one Emit call. - Run(ctx context.Context, emit Emit) error -} diff --git a/internal/source/webhook/webhook.go b/internal/source/webhook/webhook.go deleted file mode 100644 index 18da159..0000000 --- a/internal/source/webhook/webhook.go +++ /dev/null @@ -1,248 +0,0 @@ -// Package webhook implements the HTTP webhook Source. -// -// Listens on a loopback address and routes incoming POSTs to inbox events -// according to a routing table. Each route specifies recipient, type, -// priority, and a Go text/template that renders the payload from the request -// body decoded as JSON. -// -// v1 is loopback-only (127.0.0.1) — no authentication. The spec calls for -// Caddy + bearer-token reverse-proxy as the v2 upgrade path. Do NOT bind to -// 0.0.0.0 in v1. -// -// Per spec §3.5, the same listener also exposes /health with per-source -// counters. -package webhook - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "log/slog" - "net" - "net/http" - "sync/atomic" - "text/template" - "time" - - "git.botbought.ai/foreman/agent-watcher/internal/inbox" - "git.botbought.ai/foreman/agent-watcher/internal/source" -) - -// Route maps a URL path to an emitted event. -type Route struct { - // Recipient is the target inbox name. - Recipient string - - // Type is one of INFO, NEEDS-RESPONSE, ACK-REQUEST. - Type string - - // Priority is "normal" or "urgent". Empty defaults to "normal". - Priority string - - // PayloadTemplate is a Go text/template rendered with the request body - // (decoded as JSON into a map[string]any) as the dot. Plain strings - // without {{}} render to themselves. - PayloadTemplate string -} - -// Config configures a webhook Source. -type Config struct { - // Listen is the bind address, e.g. "127.0.0.1:18790". - Listen string - - // Routes maps URL path → Route. Path includes the leading slash. - Routes map[string]Route -} - -// Source serves HTTP and emits an inbox event for each matched POST. -type Source struct { - cfg Config - logger *slog.Logger - - tmpls map[string]*template.Template - - // counters - received atomic.Uint64 - emitted atomic.Uint64 - errors atomic.Uint64 - startedAt time.Time -} - -// New parses the route templates and returns a configured Source. -// Returns an error if any template fails to parse. -func New(cfg Config, logger *slog.Logger) (*Source, error) { - if logger == nil { - logger = slog.Default() - } - if cfg.Listen == "" { - return nil, errors.New("webhook: listen address required") - } - if !isLoopback(cfg.Listen) { - return nil, fmt.Errorf("webhook: listen %q is not a loopback address; v1 forbids non-loopback binds", cfg.Listen) - } - - tmpls := make(map[string]*template.Template, len(cfg.Routes)) - for path, r := range cfg.Routes { - if !validTypes[r.Type] { - return nil, fmt.Errorf("webhook: route %s: invalid type %q", path, r.Type) - } - if r.Priority != "" && r.Priority != "normal" && r.Priority != "urgent" { - return nil, fmt.Errorf("webhook: route %s: invalid priority %q", path, r.Priority) - } - t, err := template.New(path).Option("missingkey=zero").Parse(r.PayloadTemplate) - if err != nil { - return nil, fmt.Errorf("webhook: route %s: parse template: %w", path, err) - } - tmpls[path] = t - } - return &Source{ - cfg: cfg, - logger: logger.With("source", "webhook", "listen", cfg.Listen), - tmpls: tmpls, - startedAt: time.Now(), - }, nil -} - -func (s *Source) Name() string { return "webhook" } - -// Run blocks until ctx is canceled. -func (s *Source) Run(ctx context.Context, emit source.Emit) error { - mux := http.NewServeMux() - mux.HandleFunc("/health", s.health) - mux.HandleFunc("/", s.handler(emit)) - - srv := &http.Server{ - Addr: s.cfg.Listen, - Handler: mux, - ReadHeaderTimeout: 5 * time.Second, - } - - listenErr := make(chan error, 1) - go func() { - s.logger.Info("listening") - listenErr <- srv.ListenAndServe() - }() - - select { - case <-ctx.Done(): - shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - _ = srv.Shutdown(shutCtx) - return ctx.Err() - case err := <-listenErr: - if errors.Is(err, http.ErrServerClosed) { - return nil - } - return fmt.Errorf("webhook: serve: %w", err) - } -} - -func (s *Source) handler(emit source.Emit) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - s.received.Add(1) - - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - route, ok := s.cfg.Routes[r.URL.Path] - if !ok { - http.Error(w, "no route", http.StatusNotFound) - return - } - body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1 MiB cap - if err != nil { - s.errors.Add(1) - s.logger.Warn("read body", "path", r.URL.Path, "err", err) - http.Error(w, "read failed", http.StatusBadRequest) - return - } - var data map[string]any - if len(bytes.TrimSpace(body)) > 0 { - if err := json.Unmarshal(body, &data); err != nil { - s.errors.Add(1) - s.logger.Warn("body not JSON", "path", r.URL.Path, "err", err) - http.Error(w, "body must be JSON", http.StatusBadRequest) - return - } - } else { - data = map[string]any{} - } - - var rendered bytes.Buffer - if err := s.tmpls[r.URL.Path].Execute(&rendered, data); err != nil { - s.errors.Add(1) - s.logger.Warn("template render", "path", r.URL.Path, "err", err) - http.Error(w, "render failed", http.StatusInternalServerError) - return - } - priority := route.Priority - if priority == "" { - priority = "normal" - } - ev := &inbox.Event{ - From: "collector", - Type: route.Type, - Priority: priority, - Payload: rendered.String(), - Source: "webhook:" + r.URL.Path, - } - if err := emit(route.Recipient, ev); err != nil { - s.errors.Add(1) - s.logger.Error("emit", "path", r.URL.Path, "err", err) - http.Error(w, "emit failed", http.StatusInternalServerError) - return - } - s.emitted.Add(1) - w.WriteHeader(http.StatusAccepted) - _, _ = io.WriteString(w, "ok\n") - } -} - -// Stats returns the current counter snapshot. Safe to call from any goroutine. -type Stats struct { - Received uint64 `json:"received"` - Emitted uint64 `json:"emitted"` - Errors uint64 `json:"errors"` - UptimeSec int64 `json:"uptime_sec"` -} - -func (s *Source) Stats() Stats { - return Stats{ - Received: s.received.Load(), - Emitted: s.emitted.Load(), - Errors: s.errors.Load(), - UptimeSec: int64(time.Since(s.startedAt).Seconds()), - } -} - -func (s *Source) health(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(s.Stats()) -} - -// validTypes mirrors agent-ping. Kept here (not imported from dropfolder) so -// the webhook source has no cross-source dependency. -var validTypes = map[string]bool{ - "INFO": true, - "NEEDS-RESPONSE": true, - "ACK-REQUEST": true, -} - -// isLoopback returns true if addr binds only to a loopback interface. -// Accepts "127.0.0.1:port", "[::1]:port", "localhost:port". -func isLoopback(addr string) bool { - host, _, err := net.SplitHostPort(addr) - if err != nil { - return false - } - if host == "localhost" { - return true - } - ip := net.ParseIP(host) - return ip != nil && ip.IsLoopback() -} - diff --git a/internal/source/webhook/webhook_test.go b/internal/source/webhook/webhook_test.go deleted file mode 100644 index 0eb47e8..0000000 --- a/internal/source/webhook/webhook_test.go +++ /dev/null @@ -1,323 +0,0 @@ -package webhook - -import ( - "context" - "encoding/json" - "io" - "log/slog" - "net" - "net/http" - "strings" - "sync" - "testing" - "time" - - "git.botbought.ai/foreman/agent-watcher/internal/inbox" -) - -func quietLogger() *slog.Logger { - return slog.New(slog.NewTextHandler(io.Discard, nil)) -} - -// freePort returns an unused loopback "127.0.0.1:N" address. -func freePort(t *testing.T) string { - t.Helper() - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) - } - addr := l.Addr().String() - l.Close() - return addr -} - -type recordingEmit struct { - mu sync.Mutex - events []record - err error -} - -type record struct { - recipient string - event inbox.Event -} - -func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { - r.mu.Lock() - defer r.mu.Unlock() - if r.err != nil { - return r.err - } - r.events = append(r.events, record{recipient, *ev}) - return nil -} - -func (r *recordingEmit) snap() []record { - r.mu.Lock() - defer r.mu.Unlock() - out := make([]record, len(r.events)) - copy(out, r.events) - return out -} - -func runSource(t *testing.T, cfg Config, emit func(string, *inbox.Event) error) (string, context.CancelFunc) { - t.Helper() - src, err := New(cfg, quietLogger()) - if err != nil { - t.Fatal(err) - } - ctx, cancel := context.WithCancel(context.Background()) - go src.Run(ctx, emit) - // Wait for listener. - deadline := time.Now().Add(2 * time.Second) - for time.Now().Before(deadline) { - c, err := net.DialTimeout("tcp", cfg.Listen, 50*time.Millisecond) - if err == nil { - c.Close() - return cfg.Listen, cancel - } - time.Sleep(20 * time.Millisecond) - } - cancel() - t.Fatal("listener never came up") - return "", nil -} - -func TestNew_RejectsNonLoopback(t *testing.T) { - _, err := New(Config{Listen: "0.0.0.0:18790"}, quietLogger()) - if err == nil { - t.Error("expected error for non-loopback") - } -} - -func TestNew_RejectsBadType(t *testing.T) { - _, err := New(Config{ - Listen: "127.0.0.1:0", - Routes: map[string]Route{ - "/x": {Recipient: "r", Type: "BOGUS", PayloadTemplate: "x"}, - }, - }, quietLogger()) - if err == nil { - t.Error("expected error for bad type") - } -} - -func TestNew_RejectsBadTemplate(t *testing.T) { - _, err := New(Config{ - Listen: "127.0.0.1:0", - Routes: map[string]Route{ - "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "{{ .unclosed"}, - }, - }, quietLogger()) - if err == nil { - t.Error("expected error for bad template") - } -} - -func TestPOST_RoutesAndEmits(t *testing.T) { - addr := freePort(t) - cfg := Config{ - Listen: addr, - Routes: map[string]Route{ - "/forgejo/push": { - Recipient: "bob", - Type: "INFO", - PayloadTemplate: "forgejo push to {{ .repo }} by {{ .actor }}", - }, - }, - } - rec := &recordingEmit{} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - body := `{"repo":"agent-ping","actor":"angus"}` - resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", strings.NewReader(body)) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusAccepted { - t.Errorf("status = %d, want 202", resp.StatusCode) - } - - got := rec.snap() - if len(got) != 1 { - t.Fatalf("got %d events, want 1", len(got)) - } - if got[0].recipient != "bob" { - t.Errorf("recipient = %q", got[0].recipient) - } - if got[0].event.Type != "INFO" { - t.Errorf("type = %q", got[0].event.Type) - } - if got[0].event.Priority != "normal" { - t.Errorf("priority default = %q", got[0].event.Priority) - } - if got[0].event.Payload != "forgejo push to agent-ping by angus" { - t.Errorf("payload = %q", got[0].event.Payload) - } - if got[0].event.Source != "webhook:/forgejo/push" { - t.Errorf("source = %q", got[0].event.Source) - } -} - -func TestPOST_PriorityAndEmptyBody(t *testing.T) { - addr := freePort(t) - cfg := Config{ - Listen: addr, - Routes: map[string]Route{ - "/alert": { - Recipient: "bob", - Type: "NEEDS-RESPONSE", - Priority: "urgent", - PayloadTemplate: "alert fired", - }, - }, - } - rec := &recordingEmit{} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - resp, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader("")) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusAccepted { - t.Errorf("status = %d", resp.StatusCode) - } - got := rec.snap()[0] - if got.event.Priority != "urgent" { - t.Errorf("priority = %q", got.event.Priority) - } - if got.event.Payload != "alert fired" { - t.Errorf("payload = %q", got.event.Payload) - } -} - -func TestPOST_404OnUnknownRoute(t *testing.T) { - addr := freePort(t) - cfg := Config{Listen: addr, Routes: map[string]Route{}} - rec := &recordingEmit{} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - resp, err := http.Post("http://"+addr+"/nope", "application/json", strings.NewReader("{}")) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusNotFound { - t.Errorf("status = %d, want 404", resp.StatusCode) - } - if len(rec.snap()) != 0 { - t.Error("emitted on unknown route") - } -} - -func TestPOST_400OnBadJSON(t *testing.T) { - addr := freePort(t) - cfg := Config{ - Listen: addr, - Routes: map[string]Route{ - "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, - }, - } - rec := &recordingEmit{} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("not json")) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusBadRequest { - t.Errorf("status = %d, want 400", resp.StatusCode) - } -} - -func TestPOST_500OnEmitFailure(t *testing.T) { - addr := freePort(t) - cfg := Config{ - Listen: addr, - Routes: map[string]Route{ - "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, - }, - } - rec := &recordingEmit{err: errOnce()} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}")) - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusInternalServerError { - t.Errorf("status = %d, want 500", resp.StatusCode) - } -} - -func TestGET_MethodNotAllowed(t *testing.T) { - addr := freePort(t) - cfg := Config{ - Listen: addr, - Routes: map[string]Route{ - "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, - }, - } - rec := &recordingEmit{} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - resp, err := http.Get("http://" + addr + "/x") - if err != nil { - t.Fatal(err) - } - resp.Body.Close() - if resp.StatusCode != http.StatusMethodNotAllowed { - t.Errorf("status = %d", resp.StatusCode) - } -} - -func TestGET_HealthEndpoint(t *testing.T) { - addr := freePort(t) - cfg := Config{ - Listen: addr, - Routes: map[string]Route{ - "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, - }, - } - rec := &recordingEmit{} - _, cancel := runSource(t, cfg, rec.emit) - defer cancel() - - // Two POSTs: 1 success, 1 fail. - http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}")) - http.Post("http://"+addr+"/wrongpath", "application/json", strings.NewReader("{}")) - - resp, err := http.Get("http://" + addr + "/health") - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Fatalf("status = %d", resp.StatusCode) - } - var stats Stats - if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { - t.Fatal(err) - } - if stats.Received < 2 { - t.Errorf("received = %d", stats.Received) - } - if stats.Emitted != 1 { - t.Errorf("emitted = %d, want 1", stats.Emitted) - } -} - -type emitErr struct{} - -func (e *emitErr) Error() string { return "emit boom" } -func errOnce() error { return &emitErr{} } diff --git a/systemd/agent-watcher.service b/systemd/agent-watcher.service deleted file mode 100644 index bd3f291..0000000 --- a/systemd/agent-watcher.service +++ /dev/null @@ -1,26 +0,0 @@ -[Unit] -Description=agent-watcher Collector — converts external events to ping inbox writes -Documentation=https://git.botbought.ai/foreman/agent-watcher -After=network-online.target -Wants=network-online.target - -[Service] -Type=simple -ExecStart=%h/.local/bin/agent-watcher --json-log -Restart=on-failure -RestartSec=5 -# small daemon; no need for elevated limits -LimitNOFILE=4096 -# read-only by intent; the daemon writes only to the inbox dir which is -# inside $HOME and unaffected by ProtectSystem. -ProtectSystem=strict -ProtectHome=read-write -PrivateTmp=yes -NoNewPrivileges=yes -# stdout/stderr go to journald automatically; --json-log makes them parseable -StandardOutput=journal -StandardError=journal -SyslogIdentifier=agent-watcher - -[Install] -WantedBy=default.target