Compare commits

..

9 commits

Author SHA1 Message Date
39a95ad49f Merge pull request 'Layer 2 MCP Watcher v0 scaffold' (#2) from bob/mcp-watcher-scaffold into main 2026-05-06 19:26:12 -03:00
f06349eab4 Merge pull request 'Collector v0 (Layer 1)' (#1) from foreman/collector-scaffold into main 2026-05-06 17:47:48 -03:00
bob-boat
3a586f38a9 Address Bob's notes 2 and 3 with documenting comments
Note 2: poll_fallback_seconds==0 silently means 'use default'. Document
in example yaml; no v1 way to disable polling, and we don't think
anyone needs that.

Note 3: Writer.locks grows unbounded. Bounded in practice (<10 agents);
add a comment for the future maintainer who may need to evict.

Notes 1 and 4 left unchanged: missingkey=zero is the friendlier choice
(produces a visible '<no value>' in the inbox rather than a silent
500); fsnotify double-fire is already handled by os.ErrNotExist on
second-read.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:47:40 -04:00
bob-boat
4ff8c3f78d Collector milestone 6: packaging — install.sh, systemd unit, docs
systemd/agent-watcher.service: --user unit with on-failure restart,
ProtectSystem=strict, ProtectHome=read-write, NoNewPrivileges=yes,
PrivateTmp=yes. JSON logs to journald. Survives reboot via
'loginctl enable-linger'.

examples/collector.yaml: working starter config for both sources with
inline comments, per-route examples, and the spec §3.1.2 schema for
drop files.

install.sh: idempotent installer following the agent-ping pattern.
Builds the binary, installs it + the unit, drops the example config if
absent, reloads systemd, enables, and (unless --no-start) starts the
service. Adds drop-folder lifecycle artifacts (*.tmp, .dead-letter/)
to workspace .stignore so they don't replicate during processing.
Skips Syncthing-related steps gracefully when ~/Nyx/workspace is not
present.

INSTALL.md: prerequisites, install, configure, verify (drop-file +
webhook end-to-end probes), survive-logout, uninstall, troubleshooting
table.

README.md: rewritten to reflect actual status — v0 working with 43
tests, packaging ready, Layer 2 in progress on Bob's side.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:26:16 -04:00
bob-boat
e7d4ea036a Collector milestone 5: end-to-end integration tests
cmd/agent-watcher/main_test.go builds the real binary in TestMain, then
launches it twice with temp configs to exercise the full path:

TestEndToEnd_BothSourcesEmitToInbox
  - drops a *.json file via tmp+rename (mirrors Syncthing semantics)
  - POSTs a webhook with template variables ({{ .repo }}, {{ .actor }})
  - POSTs a urgent alert with empty body and fixed-string template
  - asserts 3 JSONL lines land in bob.inbox with exact shape
  - confirms each event's source field tracks origin
    ("drop-folder:drop1.json", "webhook:/forgejo/push")
  - hits /health and verifies emitted=2 (one webhook didn't 200, that
    counter only counts successful emits)

TestEndToEnd_GracefulShutdown
  - SIGTERM after listener up
  - asserts process exits within 3s

Total: 43 tests across 5 packages, all passing. Real binary verified
end-to-end on Linux/amd64.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:23:34 -04:00
bob-boat
2183850c03 Collector milestone 4: config loader + main wiring (binary builds)
internal/config loads ~/.config/agent-watcher/collector.yaml with strict
validation (KnownFields=true, so typos fail loud), applies sensible
defaults, and expands ~/ in path fields. Either source can be omitted
but at least one must be configured.

cmd/agent-watcher is the entry point: load config, build inbox.Writer,
build configured sources, run them concurrently with a shared Emit
closure, wait for SIGINT/SIGTERM, shut down. Logs to stderr — text by
default, JSON via --json-log for journald structured fields per spec
§3.4.

SIGHUP reload is a v2 item; for now restart the systemd unit to pick
up config changes.

10 config tests passing — full happy path, defaults applied, ~/
expansion, and a table of 9 invalid configs that must all reject
(missing agent, no sources, empty webhook routes, route missing
recipient/type/template, route path without leading /, unknown
top-level field, negative poll seconds).

Binary builds clean: 10.8M single static binary on Linux/amd64.
go.mod stays at Go 1.22 to match VPS toolchain.

Total: 41 tests across 4 packages, all passing. Build clean. go vet
clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:22:03 -04:00
bob-boat
ba6db7c82f Collector milestone 3: HTTP webhook source + /health
internal/source/webhook routes inbound POSTs to inbox events via a
configured table. Each route specifies recipient, type, priority, and
a Go text/template payload renderer that consumes the request body
decoded as JSON.

v1 binds loopback only — New() rejects non-loopback addresses at
construction. Caddy + bearer-token reverse-proxy is the v2 upgrade
path per spec §4.

Behavior:
- POST + matched route + valid JSON body → render template, emit, 202
- Missing route → 404
- Wrong method → 405
- Bad JSON → 400
- Template render failure → 500
- Emit failure → 500 (caller responsible for retry; HTTP source has no
  durable staging)
- Empty body → empty data map for template (lets fixed-string templates
  work without sending {})
- 1 MiB request body cap

GET /health returns JSON Stats{received, emitted, errors, uptime_sec}
on the same listener for journalctl correlation per spec §3.5.

10 tests passing — non-loopback rejection, bad type/template
rejection, route+template happy path, priority defaulting, empty body,
404/400/405/500, health endpoint counters.

31 tests across the three internal packages, all passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:17:01 -04:00
bob-boat
f9d81471c4 Collector milestone 2: source interface + drop-folder source
source.Source is the contract every Collector input implements: Name +
Run(ctx, emit). Sources don't own state — they convert external events
into emit calls. Dispatcher routes.

internal/source/dropfolder: watches ~/Nyx/workspace/incoming/ for
*.json drop files. fsnotify-driven with periodic poll fallback (default
30s safety net for missed events). Each file:

1. Parsed against the spec §3.1.2 schema with DisallowUnknownFields.
2. Valid → emitted, then file deleted.
3. Invalid (missing fields, bad type/priority, unknown fields, garbage)
   → moved to .dead-letter/ with a sidecar .reason file for forensics.
4. Emit failure → file retained in place for retry (transient errors
   shouldn't be permanent dead-letters).

Also: initial-scan on Run() drains files that landed before the watcher
attached, catching up after a Collector restart.

14 tests in the package — schema validation table for all error cases,
initial-scan, live inotify drop, post-emit delete, dead-letter +
sidecar, emit-failure retention. Plus the 7 inbox tests still passing.

Pinned fsnotify v1.7.0 (Go 1.22 compatible; 1.10.x demanded toolchain
1.23 which isn't in apt yet). go.mod stays at 1.22 to match VPS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:14:03 -04:00
bob-boat
50e8ece83d Collector milestone 1: inbox writer + tests
Layer 1 keystone. The internal/inbox package writes ping-shaped JSONL
events to recipient inbox files in a format bit-identical to the
agent-ping CLI's output, so the existing UserPromptSubmit hook and the
future MCP Watcher cannot tell whether a line came from `ping` or the
Collector.

- O_APPEND opens for atomic line writes (POSIX guarantees writes <=
  PIPE_BUF, our lines are well under).
- Per-recipient sync.Mutex bounds contention; multiple goroutines
  writing to one inbox stay correctly serialized.
- 7 tests passing: shape, ID/TS preservation, omitempty for optional
  fields, key-set + compactness match against ping CLI's separators=
  (",",":") output, 100-goroutine concurrent-write torn-line check,
  bad-input rejection, empty-dir rejection.

go.mod at git.botbought.ai/foreman/agent-watcher; module name matches
the public Forgejo path so eventual consumers can `go get` it.

Next milestones:
- Source plugin interface
- Drop folder source (inotify, via fsnotify)
- HTTP webhook source
- Config loader (YAML)
- main.go wiring
- systemd unit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:09:56 -04:00
18 changed files with 2588 additions and 6 deletions

95
INSTALL.md Normal file
View file

@ -0,0 +1,95 @@
# INSTALL — agent-watcher Collector (Layer 1)
The MCP Watcher (Layer 2) has its own install path; this file covers Layer 1 only.
## Prerequisites
- Linux with systemd
- Go 1.22+ (`sudo apt install golang-go` on Ubuntu/Mint)
- An existing agent-ping install on this host (the Collector writes to its inbox dir)
## Install
Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration.
```sh
git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher
cd ~/agent-watcher
./install.sh
```
The script builds, installs to `~/.local/bin/agent-watcher`, drops the systemd unit into `~/.config/systemd/user/`, copies the example config to `~/.config/agent-watcher/collector.yaml` (if absent), reloads systemd, and starts the service.
To install without auto-starting (so you can edit the config first):
```sh
./install.sh --no-start
```
## Configure
Edit `~/.config/agent-watcher/collector.yaml`. The example has both sources configured for `agent: foreman` with placeholder routes — adapt to this host. At least one of `webhook:` or `drop_folder:` must be configured.
After editing:
```sh
systemctl --user restart agent-watcher
```
## Verify
```sh
# logs
journalctl --user -u agent-watcher -f
# health
curl http://127.0.0.1:18790/health
# end-to-end via drop folder
echo '{"recipient":"bob","type":"INFO","payload":"hello from drop"}' \
> ~/Nyx/workspace/incoming/test.json
# end-to-end via webhook
curl -X POST http://127.0.0.1:18790/forgejo/push \
-H 'Content-Type: application/json' \
-d '{"repo":"agent-ping","actor":"angus"}'
# the lines should appear in the recipient's inbox
tail -2 ~/Nyx/workspace/pings/bob.inbox
```
## Survive logout
`systemctl --user` units stop when the user logs out. To keep the Collector running across logouts:
```sh
sudo loginctl enable-linger $USER
```
## Uninstall
```sh
systemctl --user disable --now agent-watcher
rm ~/.local/bin/agent-watcher
rm ~/.config/systemd/user/agent-watcher.service
systemctl --user daemon-reload
# config and example left behind; remove if desired:
# rm -rf ~/.config/agent-watcher
```
## Troubleshooting
| Symptom | Likely cause | Fix |
|---|---|---|
| `journalctl` shows `config: ... is required` | YAML field missing | Match the example, save, restart. |
| `systemctl --user status` shows `address already in use` | port 18790 taken by something else | Edit `webhook.listen` to a free port. |
| Drop file disappears but no inbox line | check `.dead-letter/` for the file + `.reason` sidecar | Schema validation failed — fix the producer. |
| Webhook returns 404 | path doesn't match a configured route | Routes must match exactly; check trailing slashes. |
| Service won't start across reboot | `linger` not enabled | `sudo loginctl enable-linger $USER` |
| Drop file syncs back from another host before processing | drop folder is in Syncthing scope | This is intended (lets producers on other hosts deliver). Lifecycle artifacts (`.tmp`, `.dead-letter/`) are excluded by `install.sh`. |
## What this does NOT install
- The agent-ping system. Install that first (`~/agent-ping/install.sh <name>`).
- Layer 2 (MCP Watcher). Different binary, different runtime, different unit — separate install when it lands.
- Caddy / reverse-proxy webhook auth. v1 is loopback-only; v2 will document the upgrade path.

View file

@ -1,13 +1,13 @@
# agent-watcher
Push-delivery layer for [`agent-ping`](http://localhost:3300/angus/agent-ping). The "secondary nervous system" for Claude Code agents on this network.
Push-delivery layer for [`agent-ping`](https://git.botbought.ai/foreman/agent-ping). The "secondary nervous system" for Claude Code agents on this network.
`agent-ping` queues messages in inbox files; `agent-watcher` notices them (and other external events) and wakes the recipient agent without a human in the loop.
Two layers:
- **Collector** — small Go daemon, `systemd --user`, always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive.
- **MCP Watcher** — Claude Code MCP subprocess, declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels (research preview). Provides reply tools (`ack`, `respond`, `mark_handled`).
- **Collector** (this repo, Go) — small daemon under `systemd --user`. Always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive.
- **MCP Watcher** (Python, in progress) — Claude Code MCP subprocess declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels. Provides reply tools (`ack`, `respond`, `mark_handled`).
Filesystem is the queue. OpenBrain is not involved.
@ -15,10 +15,42 @@ Filesystem is the queue. OpenBrain is not involved.
[`spec/agent-watcher.md`](spec/agent-watcher.md). Read that for architecture, decisions, scope.
Channels reference docs (snapshot of Anthropic's official docs, used by Layer 2): [`docs/channels/`](docs/channels/).
## Status
v1 spec signed off by Bob (VPS) 2026-05-06. Implementation pending.
| Layer | Lane | Status |
|---|---|---|
| Spec v1 | — | Signed off by Bob 2026-05-06 |
| Layer 1: Collector | Foreman / Go | **v0 working: 43 tests passing.** End-to-end exercised; binary builds. systemd unit + INSTALL.md ready. |
| Layer 2: MCP Watcher | Bob / Python | In progress — sandbox CC session being set up on VPS for testing. |
## Install
## Install (Layer 1)
Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. Install script will land alongside `INSTALL.md` once the binaries are built.
```sh
git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher
cd ~/agent-watcher
./install.sh
```
Then edit `~/.config/agent-watcher/collector.yaml` and `systemctl --user restart agent-watcher`.
See [`INSTALL.md`](INSTALL.md) for verify steps, troubleshooting, and the `loginctl enable-linger` step required to keep the daemon running across logouts.
Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration.
## Quick reference (Layer 1)
```
Inputs Output
───────── ──────
HTTP POST → port 18790 ┐
(routed via │ <recipient>.inbox (JSONL, ping-shaped)
YAML table) │ identical format to
├─→ what `ping <recipient> <payload>` writes;
File drop in │ the existing UserPromptSubmit hook and the
~/Nyx/workspace/incoming/ │ future MCP Watcher consume the stream
*.json ┘ without distinguishing source.
```
`/health` on the same webhook port returns `{received, emitted, errors, uptime_sec}` for journalctl correlation.

148
cmd/agent-watcher/main.go Normal file
View file

@ -0,0 +1,148 @@
// Command agent-watcher is the Collector daemon.
//
// It loads a YAML config, builds the configured sources, and runs them
// concurrently. Each source converts external events (webhook POSTs, files
// dropped into a folder) into JSONL writes against the configured inbox
// directory. Output is bit-identical to the agent-ping CLI's writes, so the
// existing UserPromptSubmit hook (and the future MCP Watcher) consume the
// stream without distinguishing source.
//
// Default config path: ~/.config/agent-watcher/collector.yaml. Override with
// --config or AGENT_WATCHER_CONFIG.
//
// Signals: SIGINT and SIGTERM trigger graceful shutdown. SIGHUP reload is a
// v2 item — for now restart the unit to pick up config changes.
package main
import (
"context"
"flag"
"fmt"
"log/slog"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"git.botbought.ai/foreman/agent-watcher/internal/config"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
"git.botbought.ai/foreman/agent-watcher/internal/source"
"git.botbought.ai/foreman/agent-watcher/internal/source/dropfolder"
"git.botbought.ai/foreman/agent-watcher/internal/source/webhook"
)
func main() {
var (
configPath = flag.String("config", "", "path to collector.yaml (default: $AGENT_WATCHER_CONFIG or ~/.config/agent-watcher/collector.yaml)")
jsonLog = flag.Bool("json-log", false, "emit logs as JSON (for journald structured fields)")
)
flag.Parse()
logger := newLogger(*jsonLog)
path := resolveConfigPath(*configPath)
cfg, err := config.Load(path)
if err != nil {
logger.Error("config", "err", err)
os.Exit(2)
}
logger = logger.With("agent", cfg.Agent)
logger.Info("config loaded", "path", path, "inbox_dir", cfg.InboxDir)
if err := run(cfg, logger); err != nil {
logger.Error("run", "err", err)
os.Exit(1)
}
}
func run(cfg *config.Config, logger *slog.Logger) error {
w, err := inbox.NewWriter(cfg.InboxDir)
if err != nil {
return fmt.Errorf("inbox: %w", err)
}
emit := func(recipient string, ev *inbox.Event) error {
return w.Write(recipient, ev)
}
sources, err := buildSources(cfg, logger)
if err != nil {
return err
}
if len(sources) == 0 {
return fmt.Errorf("no sources configured")
}
ctx, cancel := signalContext()
defer cancel()
var wg sync.WaitGroup
for _, src := range sources {
src := src
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("source starting", "name", src.Name())
if err := src.Run(ctx, emit); err != nil && err != context.Canceled {
logger.Error("source exited", "name", src.Name(), "err", err)
} else {
logger.Info("source stopped", "name", src.Name())
}
}()
}
wg.Wait()
return nil
}
func buildSources(cfg *config.Config, logger *slog.Logger) ([]source.Source, error) {
var out []source.Source
if w := cfg.Sources.Webhook; w != nil {
s, err := webhook.New(webhook.Config{
Listen: w.Listen,
Routes: w.ToWebhookRouteMap(),
}, logger)
if err != nil {
return nil, fmt.Errorf("webhook: %w", err)
}
out = append(out, s)
}
if d := cfg.Sources.DropFolder; d != nil {
out = append(out, dropfolder.New(dropfolder.Config{
Path: d.Path,
PollFallbackSeconds: d.PollFallbackSeconds,
}, logger))
}
return out, nil
}
func signalContext() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
cancel()
}()
return ctx, cancel
}
func resolveConfigPath(flagPath string) string {
if flagPath != "" {
return flagPath
}
if env := os.Getenv("AGENT_WATCHER_CONFIG"); env != "" {
return env
}
home, _ := os.UserHomeDir()
return filepath.Join(home, ".config", "agent-watcher", "collector.yaml")
}
func newLogger(asJSON bool) *slog.Logger {
var h slog.Handler
if asJSON {
h = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})
} else {
h = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})
}
return slog.New(h)
}

View file

@ -0,0 +1,299 @@
package main
import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"
"testing"
"time"
)
// TestMain builds the agent-watcher binary once for the integration tests.
func TestMain(m *testing.M) {
bin, err := buildBinary()
if err != nil {
fmt.Fprintln(os.Stderr, "build failed:", err)
os.Exit(1)
}
binaryPath = bin
code := m.Run()
os.Remove(bin)
os.Exit(code)
}
var binaryPath string
func buildBinary() (string, error) {
dir, err := os.MkdirTemp("", "agent-watcher-bin-*")
if err != nil {
return "", err
}
out := filepath.Join(dir, "agent-watcher")
cmd := exec.Command("go", "build", "-o", out, ".")
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return "", err
}
return out, nil
}
// freePort returns an unused loopback "127.0.0.1:N" port.
func freePort(t *testing.T) string {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
addr := l.Addr().String()
l.Close()
return addr
}
// startBinary launches agent-watcher with the given config file and returns
// the running cmd. The test is responsible for sending SIGINT/SIGTERM.
func startBinary(t *testing.T, configPath string) *exec.Cmd {
t.Helper()
cmd := exec.Command(binaryPath, "--config", configPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
cmd.Wait()
}
})
return cmd
}
// waitFor polls predicate until true or timeout.
func waitFor(t *testing.T, name string, pred func() bool, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if pred() {
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("waitFor %q: predicate did not become true within %s", name, timeout)
}
func waitForListener(t *testing.T, addr string, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond)
if err == nil {
c.Close()
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("listener at %s never came up", addr)
}
func readInbox(t *testing.T, path string) []map[string]any {
t.Helper()
body, err := os.ReadFile(path)
if err != nil {
return nil
}
out := []map[string]any{}
for _, line := range strings.Split(strings.TrimRight(string(body), "\n"), "\n") {
if line == "" {
continue
}
var m map[string]any
if err := json.Unmarshal([]byte(line), &m); err != nil {
t.Errorf("non-JSON line: %s", line)
continue
}
out = append(out, m)
}
return out
}
// TestEndToEnd_BothSourcesEmitToInbox builds and runs the real binary,
// exercises both webhook and drop-folder sources, and verifies the inbox
// file contains one bit-identical JSONL line per emitted event.
func TestEndToEnd_BothSourcesEmitToInbox(t *testing.T) {
tmp := t.TempDir()
inboxDir := filepath.Join(tmp, "pings")
dropDir := filepath.Join(tmp, "incoming")
configPath := filepath.Join(tmp, "collector.yaml")
addr := freePort(t)
cfg := fmt.Sprintf(`
agent: foreman
inbox_dir: %s
sources:
webhook:
listen: %s
routes:
/forgejo/push:
recipient: bob
type: INFO
payload_template: "push {{ .repo }} by {{ .actor }}"
/alert:
recipient: bob
type: NEEDS-RESPONSE
priority: urgent
payload_template: "fixed alert"
drop_folder:
path: %s
poll_fallback_seconds: 0
`, inboxDir, addr, dropDir)
if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil {
t.Fatal(err)
}
startBinary(t, configPath)
waitForListener(t, addr, 3*time.Second)
// 1) drop a file
dropContent := `{"recipient":"bob","type":"INFO","payload":"from drop"}`
dropTmp := filepath.Join(dropDir, "drop1.json.tmp")
if err := os.WriteFile(dropTmp, []byte(dropContent), 0644); err != nil {
t.Fatal(err)
}
if err := os.Rename(dropTmp, filepath.Join(dropDir, "drop1.json")); err != nil {
t.Fatal(err)
}
// 2) post a webhook
resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json",
strings.NewReader(`{"repo":"agent-ping","actor":"angus"}`))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("webhook status = %d", resp.StatusCode)
}
// 3) post a urgent alert
resp2, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader(""))
if err != nil {
t.Fatal(err)
}
resp2.Body.Close()
// Wait for 3 lines in bob's inbox.
bobInbox := filepath.Join(inboxDir, "bob.inbox")
waitFor(t, "3 inbox lines", func() bool {
return len(readInbox(t, bobInbox)) == 3
}, 3*time.Second)
got := readInbox(t, bobInbox)
payloads := map[string]map[string]any{}
for _, m := range got {
payloads[m["payload"].(string)] = m
}
// drop event
drop := payloads["from drop"]
if drop == nil {
t.Fatalf("missing drop event; got: %+v", got)
}
if drop["type"] != "INFO" || drop["from"] != "collector" {
t.Errorf("drop event shape: %+v", drop)
}
if !strings.HasPrefix(drop["source"].(string), "drop-folder:drop1.json") {
t.Errorf("drop source: %v", drop["source"])
}
// webhook push event
push := payloads["push agent-ping by angus"]
if push == nil {
t.Fatalf("missing push event; got: %+v", got)
}
if push["type"] != "INFO" || push["priority"] != "normal" {
t.Errorf("push event shape: %+v", push)
}
if push["source"] != "webhook:/forgejo/push" {
t.Errorf("push source: %v", push["source"])
}
// alert event with priority urgent
alert := payloads["fixed alert"]
if alert == nil {
t.Fatalf("missing alert event; got: %+v", got)
}
if alert["priority"] != "urgent" || alert["type"] != "NEEDS-RESPONSE" {
t.Errorf("alert event shape: %+v", alert)
}
// Health endpoint is wired and returns sane counters.
resp3, err := http.Get("http://" + addr + "/health")
if err != nil {
t.Fatal(err)
}
defer resp3.Body.Close()
var stats map[string]any
if err := json.NewDecoder(resp3.Body).Decode(&stats); err != nil {
t.Fatal(err)
}
if int(stats["emitted"].(float64)) != 2 {
t.Errorf("health emitted = %v, want 2", stats["emitted"])
}
}
// TestEndToEnd_GracefulShutdown ensures SIGTERM stops the binary cleanly.
func TestEndToEnd_GracefulShutdown(t *testing.T) {
tmp := t.TempDir()
configPath := filepath.Join(tmp, "collector.yaml")
addr := freePort(t)
cfg := fmt.Sprintf(`
agent: foreman
inbox_dir: %s/pings
sources:
webhook:
listen: %s
routes:
/x:
recipient: r
type: INFO
payload_template: ok
`, tmp, addr)
if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil {
t.Fatal(err)
}
cmd := exec.Command(binaryPath, "--config", configPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
waitForListener(t, addr, 3*time.Second)
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
t.Fatal(err)
}
done := make(chan error, 1)
go func() { done <- cmd.Wait() }()
select {
case err := <-done:
if err != nil {
// Some Go runtimes return an exitstatus error after signal-driven
// shutdown; that's OK as long as we exited.
if _, ok := err.(*exec.ExitError); !ok {
t.Errorf("unexpected exit error: %v", err)
}
}
case <-time.After(3 * time.Second):
cmd.Process.Kill()
t.Fatal("did not shut down within 3s of SIGTERM")
}
}

55
examples/collector.yaml Normal file
View file

@ -0,0 +1,55 @@
# agent-watcher Collector configuration
#
# Lives at: ~/.config/agent-watcher/collector.yaml
# Override with --config or AGENT_WATCHER_CONFIG.
#
# At least one source (webhook OR drop_folder) must be configured.
# This host's identity. Used in logs only; the inbox writer routes by
# the recipient field on each event, not this.
agent: foreman
# Optional. Where to write <recipient>.inbox files. Default shown.
# inbox_dir: ~/Nyx/workspace/pings
sources:
# HTTP webhook source.
# v1 binds loopback only — Caddy + bearer-token reverse-proxy is the
# v2 upgrade path for accepting webhooks from external producers.
webhook:
listen: 127.0.0.1:18790
routes:
# Path → which inbox to land in, with a Go text/template payload.
# Variables come from the request body decoded as JSON.
/forgejo/push:
recipient: bob
type: INFO
payload_template: "forgejo push to {{ .repo }} by {{ .actor }}"
# Empty-body posts work too — fixed-string templates render without
# any data.
/openrouter/billing-alert:
recipient: bob
type: NEEDS-RESPONSE
priority: urgent
payload_template: "billing alert: {{ .message }}"
# Drop-folder source.
# Watches a directory via inotify for *.json files matching the spec
# §3.1.2 schema:
# {
# "recipient": "bob",
# "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST",
# "priority": "normal" | "urgent", # optional
# "payload": "...",
# "sentinel": "/path/optional" # optional
# }
# Valid → emit + delete. Invalid → moved to .dead-letter/ with reason.
drop_folder:
path: ~/Nyx/workspace/incoming/
# Safety net if inotify misses an event. 0 (or unset) = use the
# 30-second default. There is no "disable polling" option in v1 —
# if you don't want polling, leave inotify to do its job and ignore
# the cost (a periodic readdir is microseconds).
poll_fallback_seconds: 30

10
go.mod Normal file
View file

@ -0,0 +1,10 @@
module git.botbought.ai/foreman/agent-watcher
go 1.22
require github.com/fsnotify/fsnotify v1.7.0
require (
golang.org/x/sys v0.13.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

7
go.sum Normal file
View file

@ -0,0 +1,7 @@
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

114
install.sh Executable file
View file

@ -0,0 +1,114 @@
#!/usr/bin/env bash
# install.sh — set up the agent-watcher Collector on this machine.
#
# Usage:
# ./install.sh # build + install + enable + start
# ./install.sh --no-start # everything but `systemctl start`
#
# What it does:
# 1. Builds the agent-watcher binary (requires Go 1.22+).
# 2. Installs to ~/.local/bin/agent-watcher.
# 3. Drops the systemd --user unit into ~/.config/systemd/user/.
# 4. Drops a starter ~/.config/agent-watcher/collector.yaml if missing
# (copies examples/collector.yaml; you edit it before first start).
# 5. Reloads systemd, enables, and (unless --no-start) starts the unit.
# 6. Adds drop-folder lifecycle patterns to the workspace .stignore so
# drop files do not Syncthing-replicate during local processing.
#
# Per CLAUDE.md rule #2, this is intended to be run by a human. The
# Collector itself, like agent-ping, never invokes this script.
#
# Layer 2 (MCP Watcher) has its own install path — different language,
# different runtime, different unit. See its README when it lands.
set -euo pipefail
NO_START=0
for arg in "$@"; do
case "$arg" in
--no-start) NO_START=1 ;;
*) echo "unknown arg: $arg" >&2; exit 2 ;;
esac
done
REPO_DIR="$(cd "$(dirname "$0")" && pwd)"
BIN_DIR="$HOME/.local/bin"
CONF_DIR="$HOME/.config/agent-watcher"
UNIT_DIR="$HOME/.config/systemd/user"
WORKSPACE="$HOME/Nyx/workspace"
STIGNORE="$WORKSPACE/.stignore"
echo "agent-watcher install"
echo "repo: $REPO_DIR"
echo
# 1. Check Go
echo "[1/6] checking Go toolchain"
if ! command -v go >/dev/null 2>&1; then
echo " ERROR: 'go' not found. Install Go 1.22+ first." >&2
echo " e.g. sudo apt install golang-go" >&2
exit 1
fi
GO_VERSION=$(go version | awk '{print $3}')
echo " $GO_VERSION"
# 2. Build
echo "[2/6] building agent-watcher binary"
mkdir -p "$BIN_DIR"
( cd "$REPO_DIR" && go build -o "$BIN_DIR/agent-watcher" ./cmd/agent-watcher )
echo " installed: $BIN_DIR/agent-watcher"
# 3. systemd unit
echo "[3/6] installing systemd --user unit"
mkdir -p "$UNIT_DIR"
install -m 0644 "$REPO_DIR/systemd/agent-watcher.service" "$UNIT_DIR/agent-watcher.service"
echo " installed: $UNIT_DIR/agent-watcher.service"
# 4. Config skeleton
echo "[4/6] config skeleton"
mkdir -p "$CONF_DIR"
if [ -f "$CONF_DIR/collector.yaml" ]; then
echo " $CONF_DIR/collector.yaml exists — leaving as-is"
else
install -m 0644 "$REPO_DIR/examples/collector.yaml" "$CONF_DIR/collector.yaml"
echo " installed example: $CONF_DIR/collector.yaml — EDIT BEFORE START"
fi
# 5. .stignore — drop folder is replicated, but lifecycle artifacts shouldn't be
echo "[5/6] .stignore (drop-folder lifecycle artifacts are local-only)"
if [ -d "$WORKSPACE" ]; then
touch "$STIGNORE"
for pattern in "incoming/.dead-letter/" "incoming/*.json.tmp"; do
if ! grep -qxF "$pattern" "$STIGNORE"; then
echo "$pattern" >> "$STIGNORE"
echo " added: $pattern"
fi
done
else
echo " $WORKSPACE not present — skipping (Syncthing not set up here)"
fi
# 6. systemd reload + enable + (optionally) start
echo "[6/6] systemd reload + enable"
systemctl --user daemon-reload
systemctl --user enable agent-watcher.service >/dev/null
echo " enabled at user login (loginctl enable-linger required to run when logged out)"
if [ "$NO_START" -eq 0 ]; then
echo " starting…"
systemctl --user restart agent-watcher.service
sleep 1
systemctl --user status agent-watcher.service --no-pager -n 5 || true
else
echo " --no-start: not starting; run 'systemctl --user start agent-watcher' when ready"
fi
echo
echo "installation complete."
echo
echo "next steps:"
echo " 1. EDIT $CONF_DIR/collector.yaml to suit this host."
echo " 2. systemctl --user restart agent-watcher (after edits)."
echo " 3. journalctl --user -u agent-watcher -f (watch logs)."
echo " 4. curl http://127.0.0.1:18790/health (sanity check)."
echo " 5. To survive logout: loginctl enable-linger \$USER"

179
internal/config/config.go Normal file
View file

@ -0,0 +1,179 @@
// Package config loads the Collector's per-host YAML configuration.
//
// Config layout (spec §3.2):
//
// agent: foreman # this host's identity
// inbox_dir: ~/Nyx/workspace/pings # optional; default shown
// sources:
// webhook:
// listen: 127.0.0.1:18790
// routes:
// /forgejo/push:
// recipient: bob
// type: INFO
// priority: normal # optional
// payload_template: "..."
// drop_folder:
// path: ~/Nyx/workspace/incoming/
// poll_fallback_seconds: 30
//
// Either source may be omitted; the Collector starts with whatever is
// configured. Validation is strict — an unknown top-level field or a
// malformed route returns an error.
package config
import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"gopkg.in/yaml.v3"
"git.botbought.ai/foreman/agent-watcher/internal/source/webhook"
)
// Config is the loaded, validated configuration.
type Config struct {
Agent string `yaml:"agent"`
InboxDir string `yaml:"inbox_dir,omitempty"`
Sources Sources `yaml:"sources"`
}
// Sources groups per-source-type configuration.
type Sources struct {
Webhook *WebhookConfig `yaml:"webhook,omitempty"`
DropFolder *DropFolderConfig `yaml:"drop_folder,omitempty"`
}
// WebhookConfig configures the HTTP webhook source.
type WebhookConfig struct {
Listen string `yaml:"listen"`
Routes map[string]WebhookRoute `yaml:"routes"`
}
// WebhookRoute is one entry in the webhook routing table.
type WebhookRoute struct {
Recipient string `yaml:"recipient"`
Type string `yaml:"type"`
Priority string `yaml:"priority,omitempty"`
PayloadTemplate string `yaml:"payload_template"`
}
// DropFolderConfig configures the drop-folder source.
type DropFolderConfig struct {
Path string `yaml:"path"`
PollFallbackSeconds int `yaml:"poll_fallback_seconds,omitempty"`
}
// Default values applied when fields are unset.
const (
DefaultInboxDir = "~/Nyx/workspace/pings"
DefaultDropPath = "~/Nyx/workspace/incoming"
DefaultWebhookListen = "127.0.0.1:18790"
DefaultPollFallbackSeconds = 30
)
// Load parses the YAML file at path, applies defaults, and validates.
func Load(path string) (*Config, error) {
body, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("config: read %s: %w", path, err)
}
return parse(body)
}
func parse(body []byte) (*Config, error) {
var c Config
dec := yaml.NewDecoder(strings.NewReader(string(body)))
dec.KnownFields(true)
if err := dec.Decode(&c); err != nil {
return nil, fmt.Errorf("config: parse: %w", err)
}
if err := c.applyDefaultsAndValidate(); err != nil {
return nil, err
}
return &c, nil
}
func (c *Config) applyDefaultsAndValidate() error {
if c.Agent == "" {
return errors.New("config: agent is required")
}
if c.InboxDir == "" {
c.InboxDir = DefaultInboxDir
}
c.InboxDir = expand(c.InboxDir)
if c.Sources.Webhook == nil && c.Sources.DropFolder == nil {
return errors.New("config: at least one source (webhook or drop_folder) must be configured")
}
if w := c.Sources.Webhook; w != nil {
if w.Listen == "" {
w.Listen = DefaultWebhookListen
}
if len(w.Routes) == 0 {
return errors.New("config: webhook configured but no routes defined")
}
for path, r := range w.Routes {
if !strings.HasPrefix(path, "/") {
return fmt.Errorf("config: webhook route %q must start with /", path)
}
if r.Recipient == "" {
return fmt.Errorf("config: webhook route %s: recipient required", path)
}
if r.Type == "" {
return fmt.Errorf("config: webhook route %s: type required", path)
}
if r.PayloadTemplate == "" {
return fmt.Errorf("config: webhook route %s: payload_template required", path)
}
}
}
if d := c.Sources.DropFolder; d != nil {
if d.Path == "" {
d.Path = DefaultDropPath
}
d.Path = expand(d.Path)
if d.PollFallbackSeconds == 0 {
d.PollFallbackSeconds = DefaultPollFallbackSeconds
}
if d.PollFallbackSeconds < 0 {
return errors.New("config: drop_folder.poll_fallback_seconds must be >= 0")
}
}
return nil
}
// expand expands a leading ~/ to $HOME.
func expand(p string) string {
if !strings.HasPrefix(p, "~/") && p != "~" {
return p
}
home, err := os.UserHomeDir()
if err != nil || home == "" {
return p
}
if p == "~" {
return home
}
return filepath.Join(home, p[2:])
}
// ToWebhookRouteMap converts validated routes to the source/webhook.Route
// shape so main.go can hand them straight to webhook.New.
func (w *WebhookConfig) ToWebhookRouteMap() map[string]webhook.Route {
out := make(map[string]webhook.Route, len(w.Routes))
for path, r := range w.Routes {
out[path] = webhook.Route{
Recipient: r.Recipient,
Type: r.Type,
Priority: r.Priority,
PayloadTemplate: r.PayloadTemplate,
}
}
return out
}

View file

@ -0,0 +1,168 @@
package config
import (
"strings"
"testing"
)
func TestParse_FullValid(t *testing.T) {
body := []byte(`
agent: foreman
inbox_dir: /tmp/inbox
sources:
webhook:
listen: 127.0.0.1:18790
routes:
/forgejo/push:
recipient: bob
type: INFO
payload_template: "push to {{ .repo }}"
/alert:
recipient: bob
type: NEEDS-RESPONSE
priority: urgent
payload_template: "alert"
drop_folder:
path: /tmp/incoming
poll_fallback_seconds: 60
`)
c, err := parse(body)
if err != nil {
t.Fatal(err)
}
if c.Agent != "foreman" {
t.Errorf("agent = %q", c.Agent)
}
if c.InboxDir != "/tmp/inbox" {
t.Errorf("inbox = %q", c.InboxDir)
}
if c.Sources.Webhook == nil || len(c.Sources.Webhook.Routes) != 2 {
t.Errorf("webhook routes: %+v", c.Sources.Webhook)
}
if c.Sources.DropFolder == nil || c.Sources.DropFolder.Path != "/tmp/incoming" {
t.Errorf("dropfolder: %+v", c.Sources.DropFolder)
}
}
func TestParse_AppliesDefaults(t *testing.T) {
body := []byte(`
agent: foreman
sources:
drop_folder:
path: /tmp/x
`)
c, err := parse(body)
if err != nil {
t.Fatal(err)
}
if c.InboxDir == "" {
t.Error("inbox_dir default not applied")
}
if c.Sources.DropFolder.PollFallbackSeconds != DefaultPollFallbackSeconds {
t.Errorf("poll default = %d", c.Sources.DropFolder.PollFallbackSeconds)
}
}
func TestParse_HomeExpansion(t *testing.T) {
body := []byte(`
agent: foreman
sources:
drop_folder:
path: ~/foo/bar
`)
c, err := parse(body)
if err != nil {
t.Fatal(err)
}
if strings.HasPrefix(c.Sources.DropFolder.Path, "~") {
t.Errorf("~ not expanded: %q", c.Sources.DropFolder.Path)
}
if !strings.HasSuffix(c.Sources.DropFolder.Path, "/foo/bar") {
t.Errorf("expansion lost suffix: %q", c.Sources.DropFolder.Path)
}
}
func TestParse_RejectsInvalid(t *testing.T) {
cases := map[string]string{
"missing agent": `sources:\n webhook:\n routes: {}`,
"no sources": `agent: x`,
"empty webhook routes": `
agent: x
sources:
webhook:
listen: 127.0.0.1:18790
routes: {}
`,
"route missing recipient": `
agent: x
sources:
webhook:
routes:
/x:
type: INFO
payload_template: y
`,
"route missing type": `
agent: x
sources:
webhook:
routes:
/x:
recipient: r
payload_template: y
`,
"route missing template": `
agent: x
sources:
webhook:
routes:
/x:
recipient: r
type: INFO
`,
"route path no slash": `
agent: x
sources:
webhook:
routes:
bad:
recipient: r
type: INFO
payload_template: y
`,
"unknown top-level field": `
agent: x
typo: oops
sources:
drop_folder:
path: /x
`,
"poll negative": `
agent: x
sources:
drop_folder:
path: /x
poll_fallback_seconds: -1
`,
}
for name, body := range cases {
t.Run(name, func(t *testing.T) {
if _, err := parse([]byte(body)); err == nil {
t.Errorf("expected error for: %s", name)
}
})
}
}
func TestToWebhookRouteMap(t *testing.T) {
w := &WebhookConfig{
Routes: map[string]WebhookRoute{
"/x": {Recipient: "r", Type: "INFO", Priority: "urgent", PayloadTemplate: "p"},
},
}
got := w.ToWebhookRouteMap()
r := got["/x"]
if r.Recipient != "r" || r.Type != "INFO" || r.Priority != "urgent" || r.PayloadTemplate != "p" {
t.Errorf("conversion lost data: %+v", r)
}
}

129
internal/inbox/inbox.go Normal file
View file

@ -0,0 +1,129 @@
// Package inbox writes ping-shaped JSONL events to recipient inbox files.
//
// The output format is bit-identical to the agent-ping CLI's output, so the
// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether
// a line was written by `ping` or by the Collector.
//
// All writes are append-only with O_APPEND so concurrent writers don't tear
// each other's lines (POSIX guarantees atomicity for writes <= PIPE_BUF, and
// our lines are well under that).
package inbox
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// Event is the JSONL line shape. Field order and JSON keys match the ping CLI
// exactly. Optional fields use omitempty.
type Event struct {
TS string `json:"ts"`
ID string `json:"id"`
From string `json:"from"`
To string `json:"to"`
Type string `json:"type"`
Priority string `json:"priority"`
Payload string `json:"payload"`
Sentinel string `json:"sentinel,omitempty"`
Source string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push"
}
// Writer appends events to inbox files under a base directory.
//
// Concurrency: a single Writer is safe for use from multiple goroutines.
// One sync.Mutex per recipient bounds contention to per-file granularity.
//
// The locks map grows monotonically as new recipients appear. In practice
// the agent network has <10 recipients, so the leak is bounded; if this
// daemon ever lives in a system with thousands of recipients, replace the
// map with an LRU or shard-and-evict scheme.
type Writer struct {
dir string
mu sync.Mutex
locks map[string]*sync.Mutex
}
// NewWriter returns a Writer that appends to {dir}/{recipient}.inbox.
// The directory is created (with perms 0755) if it does not exist.
func NewWriter(dir string) (*Writer, error) {
if dir == "" {
return nil, fmt.Errorf("inbox: empty dir")
}
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err)
}
return &Writer{
dir: dir,
locks: make(map[string]*sync.Mutex),
}, nil
}
// Write appends one event to the recipient's inbox file. It mutates ev to set
// TS and ID if either is empty, then sets To from recipient.
func (w *Writer) Write(recipient string, ev *Event) error {
if recipient == "" {
return fmt.Errorf("inbox: empty recipient")
}
if ev == nil {
return fmt.Errorf("inbox: nil event")
}
if ev.TS == "" {
ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z")
}
if ev.ID == "" {
id, err := newID()
if err != nil {
return fmt.Errorf("inbox: generate id: %w", err)
}
ev.ID = id
}
ev.To = recipient
line, err := json.Marshal(ev)
if err != nil {
return fmt.Errorf("inbox: marshal: %w", err)
}
line = append(line, '\n')
lock := w.lockFor(recipient)
lock.Lock()
defer lock.Unlock()
path := filepath.Join(w.dir, recipient+".inbox")
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("inbox: open %s: %w", path, err)
}
defer f.Close()
if _, err := f.Write(line); err != nil {
return fmt.Errorf("inbox: append: %w", err)
}
return nil
}
func (w *Writer) lockFor(recipient string) *sync.Mutex {
w.mu.Lock()
defer w.mu.Unlock()
if l, ok := w.locks[recipient]; ok {
return l
}
l := &sync.Mutex{}
w.locks[recipient] = l
return l
}
func newID() (string, error) {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
return "", err
}
return "ping-" + hex.EncodeToString(b), nil
}

View file

@ -0,0 +1,182 @@
package inbox
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"
"testing"
)
func TestWriter_Write_BasicShape(t *testing.T) {
dir := t.TempDir()
w, err := NewWriter(dir)
if err != nil {
t.Fatal(err)
}
if err := w.Write("bob", &Event{
From: "foreman",
Type: "INFO",
Priority: "normal",
Payload: "hi",
}); err != nil {
t.Fatal(err)
}
got := readLines(t, filepath.Join(dir, "bob.inbox"))
if len(got) != 1 {
t.Fatalf("want 1 line, got %d", len(got))
}
var ev Event
if err := json.Unmarshal([]byte(got[0]), &ev); err != nil {
t.Fatalf("not JSON: %v", err)
}
if ev.To != "bob" {
t.Errorf("To = %q, want bob", ev.To)
}
if ev.From != "foreman" {
t.Errorf("From = %q, want foreman", ev.From)
}
if ev.Payload != "hi" {
t.Errorf("Payload = %q", ev.Payload)
}
if !strings.HasPrefix(ev.ID, "ping-") {
t.Errorf("ID %q missing ping- prefix", ev.ID)
}
if len(ev.ID) != len("ping-")+8 {
t.Errorf("ID %q wrong length", ev.ID)
}
if ev.TS == "" {
t.Errorf("TS unset")
}
}
func TestWriter_Write_PreservesProvidedIDAndTS(t *testing.T) {
dir := t.TempDir()
w, err := NewWriter(dir)
if err != nil {
t.Fatal(err)
}
want := &Event{
TS: "2026-05-06T12:00:00Z",
ID: "ping-deadbeef",
From: "x",
Type: "INFO",
Priority: "normal",
Payload: "p",
}
if err := w.Write("r", want); err != nil {
t.Fatal(err)
}
got := readLines(t, filepath.Join(dir, "r.inbox"))
var ev Event
json.Unmarshal([]byte(got[0]), &ev)
if ev.ID != want.ID {
t.Errorf("ID overwritten: got %q want %q", ev.ID, want.ID)
}
if ev.TS != want.TS {
t.Errorf("TS overwritten: got %q want %q", ev.TS, want.TS)
}
}
func TestWriter_Write_OmitsEmptyOptionalFields(t *testing.T) {
dir := t.TempDir()
w, _ := NewWriter(dir)
w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "p"})
line := readLines(t, filepath.Join(dir, "r.inbox"))[0]
if strings.Contains(line, "sentinel") {
t.Errorf("empty sentinel leaked into JSON: %s", line)
}
if strings.Contains(line, "source") {
t.Errorf("empty source leaked into JSON: %s", line)
}
}
func TestWriter_Write_PingShapeMatchesCLI(t *testing.T) {
// The ping CLI emits JSON with separators=(",", ":") (compact) and these
// keys in this order: ts, id, from, to, type, priority, payload [, sentinel].
// Verify our marshaled output preserves the key set and compactness.
dir := t.TempDir()
w, _ := NewWriter(dir)
w.Write("r", &Event{
From: "f", Type: "INFO", Priority: "normal", Payload: "p", Sentinel: "/x",
})
line := strings.TrimSpace(readLines(t, filepath.Join(dir, "r.inbox"))[0])
wantKeys := []string{`"ts":`, `"id":`, `"from":`, `"to":`, `"type":`, `"priority":`, `"payload":`, `"sentinel":`}
for _, k := range wantKeys {
if !strings.Contains(line, k) {
t.Errorf("missing key %s in: %s", k, line)
}
}
// compact (no spaces around colons or commas)
if strings.Contains(line, ": ") || strings.Contains(line, ", ") {
t.Errorf("non-compact JSON: %s", line)
}
}
func TestWriter_Write_ConcurrentSameRecipient(t *testing.T) {
// 100 goroutines writing to the same inbox should produce 100 valid JSON
// lines, none torn or interleaved.
dir := t.TempDir()
w, _ := NewWriter(dir)
const n = 100
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "x"})
}(i)
}
wg.Wait()
lines := readLines(t, filepath.Join(dir, "r.inbox"))
if len(lines) != n {
t.Fatalf("got %d lines, want %d (torn writes?)", len(lines), n)
}
for i, l := range lines {
var ev Event
if err := json.Unmarshal([]byte(l), &ev); err != nil {
t.Errorf("line %d not JSON: %v: %s", i, err, l)
}
}
}
func TestWriter_Write_RejectsBadInputs(t *testing.T) {
w, _ := NewWriter(t.TempDir())
if err := w.Write("", &Event{}); err == nil {
t.Error("empty recipient: expected error")
}
if err := w.Write("r", nil); err == nil {
t.Error("nil event: expected error")
}
}
func TestNewWriter_RejectsEmptyDir(t *testing.T) {
if _, err := NewWriter(""); err == nil {
t.Error("expected error for empty dir")
}
}
func readLines(t *testing.T, path string) []string {
t.Helper()
b, err := os.ReadFile(path)
if err != nil {
t.Fatal(err)
}
out := []string{}
for _, l := range strings.Split(strings.TrimRight(string(b), "\n"), "\n") {
if l != "" {
out = append(out, l)
}
}
return out
}

View file

@ -0,0 +1,259 @@
// Package dropfolder implements the drop-folder Source.
//
// Producers drop *.json files into a designated directory. The Source ingests
// each file, validates it against a schema, emits an event, and deletes the
// file. Malformed files are moved to a .dead-letter/ subdirectory for
// inspection.
//
// Lifecycle: on startup, scan the folder and ingest any pre-existing files
// (catches up after a Collector restart). Then watch via inotify for new
// arrivals. A periodic poll (default 30s) is a safety net in case inotify
// misses a Syncthing-driven rename — Syncthing usually fires IN_CLOSE_WRITE
// on rename, but the spec calls for a fallback.
//
// Drop file schema (per spec §3.1.2):
//
// {
// "recipient": "bob",
// "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST",
// "priority": "normal" | "urgent", // optional, default "normal"
// "payload": "...",
// "sentinel": "/path/optional" // optional
// }
package dropfolder
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
"git.botbought.ai/foreman/agent-watcher/internal/source"
)
// Validation rules duplicated from agent-ping's CLI for tightness.
var validTypes = map[string]bool{
"INFO": true,
"NEEDS-RESPONSE": true,
"ACK-REQUEST": true,
}
var validPriorities = map[string]bool{
"": true, // means "normal"
"normal": true,
"urgent": true,
}
// Config configures a drop-folder Source.
type Config struct {
// Path is the directory to watch. Created if absent.
Path string
// PollFallbackSeconds is a periodic full-scan interval in case inotify
// misses an event. Set to 0 to disable.
PollFallbackSeconds int
}
// Source watches a directory for *.json drop files and emits events.
type Source struct {
cfg Config
logger *slog.Logger
}
// New returns a configured (but not running) Source.
func New(cfg Config, logger *slog.Logger) *Source {
if logger == nil {
logger = slog.Default()
}
if cfg.PollFallbackSeconds == 0 {
cfg.PollFallbackSeconds = 30
}
return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)}
}
func (s *Source) Name() string { return "drop-folder" }
// Run blocks until ctx is canceled.
func (s *Source) Run(ctx context.Context, emit source.Emit) error {
if err := os.MkdirAll(s.cfg.Path, 0755); err != nil {
return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err)
}
if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil {
return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err)
}
w, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("dropfolder: new watcher: %w", err)
}
defer w.Close()
if err := w.Add(s.cfg.Path); err != nil {
return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err)
}
// Initial scan — drain any files that landed before we were watching.
s.scan(ctx, emit)
pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second
var pollCh <-chan time.Time
if pollEvery > 0 {
t := time.NewTicker(pollEvery)
defer t.Stop()
pollCh = t.C
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev, ok := <-w.Events:
if !ok {
return errors.New("dropfolder: watcher closed")
}
// Care about creates and renames-into (Syncthing uses rename).
if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 {
continue
}
if !strings.HasSuffix(ev.Name, ".json") {
continue
}
s.handle(ctx, emit, ev.Name)
case err, ok := <-w.Errors:
if !ok {
return errors.New("dropfolder: watcher errors closed")
}
s.logger.Warn("watcher error", "err", err)
case <-pollCh:
s.scan(ctx, emit)
}
}
}
// scan ingests every *.json file currently in the drop folder.
func (s *Source) scan(ctx context.Context, emit source.Emit) {
entries, err := os.ReadDir(s.cfg.Path)
if err != nil {
s.logger.Error("scan readdir", "err", err)
return
}
for _, e := range entries {
if ctx.Err() != nil {
return
}
if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
continue
}
s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name()))
}
}
// handle reads, validates, emits, and removes (or dead-letters) one drop file.
func (s *Source) handle(ctx context.Context, emit source.Emit, path string) {
if ctx.Err() != nil {
return
}
body, err := os.ReadFile(path)
if err != nil {
// File may have been ingested already by another loop iteration.
if errors.Is(err, os.ErrNotExist) {
return
}
s.logger.Warn("read drop file", "path", path, "err", err)
s.deadLetter(path, "read failed")
return
}
pd, err := parseDrop(body)
if err != nil {
s.logger.Warn("invalid drop file", "path", path, "err", err)
s.deadLetter(path, "schema invalid: "+err.Error())
return
}
pd.event.Source = "drop-folder:" + filepath.Base(path)
if err := emit(pd.recipient, pd.event); err != nil {
// Inbox write failure: don't lose the message. Leave the file in
// place so the next scan retries. Log loudly.
s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err)
return
}
if err := os.Remove(path); err != nil {
s.logger.Warn("remove drop file", "path", path, "err", err)
}
}
func (s *Source) deadLetter(path, reason string) {
target := filepath.Join(s.deadLetterDir(), filepath.Base(path))
if err := os.Rename(path, target); err != nil {
s.logger.Error("dead-letter move failed", "path", path, "err", err)
return
}
// Sidecar reason file for forensics.
_ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644)
s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason)
}
func (s *Source) deadLetterDir() string {
return filepath.Join(s.cfg.Path, ".dead-letter")
}
// dropFile is the on-disk schema; intermediate value during parse.
type dropFile struct {
Recipient string `json:"recipient"`
Type string `json:"type"`
Priority string `json:"priority,omitempty"`
Payload string `json:"payload"`
Sentinel string `json:"sentinel,omitempty"`
}
type parsedDrop struct {
recipient string
event *inbox.Event
}
func parseDrop(body []byte) (*parsedDrop, error) {
var d dropFile
dec := json.NewDecoder(strings.NewReader(string(body)))
dec.DisallowUnknownFields()
if err := dec.Decode(&d); err != nil {
return nil, fmt.Errorf("decode: %w", err)
}
if d.Recipient == "" {
return nil, errors.New("recipient required")
}
if !validTypes[d.Type] {
return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type)
}
if !validPriorities[d.Priority] {
return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority)
}
if d.Payload == "" {
return nil, errors.New("payload required")
}
priority := d.Priority
if priority == "" {
priority = "normal"
}
return &parsedDrop{
recipient: d.Recipient,
event: &inbox.Event{
From: "collector",
Type: d.Type,
Priority: priority,
Payload: d.Payload,
Sentinel: d.Sentinel,
},
}, nil
}

View file

@ -0,0 +1,273 @@
package dropfolder
import (
"context"
"encoding/json"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"testing"
"time"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
)
func quietLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, nil))
}
func TestParseDrop_Valid(t *testing.T) {
body := []byte(`{"recipient":"bob","type":"INFO","priority":"urgent","payload":"hi","sentinel":"/x"}`)
p, err := parseDrop(body)
if err != nil {
t.Fatal(err)
}
if p.recipient != "bob" {
t.Errorf("recipient = %q", p.recipient)
}
if p.event.Type != "INFO" || p.event.Priority != "urgent" || p.event.Payload != "hi" || p.event.Sentinel != "/x" {
t.Errorf("unexpected event: %+v", p.event)
}
}
func TestParseDrop_DefaultsPriority(t *testing.T) {
p, err := parseDrop([]byte(`{"recipient":"r","type":"INFO","payload":"p"}`))
if err != nil {
t.Fatal(err)
}
if p.event.Priority != "normal" {
t.Errorf("priority default = %q, want normal", p.event.Priority)
}
}
func TestParseDrop_Invalid(t *testing.T) {
cases := map[string]string{
"empty body": ``,
"missing recipient": `{"type":"INFO","payload":"p"}`,
"missing type": `{"recipient":"r","payload":"p"}`,
"bad type": `{"recipient":"r","type":"NOPE","payload":"p"}`,
"bad priority": `{"recipient":"r","type":"INFO","priority":"high","payload":"p"}`,
"missing payload": `{"recipient":"r","type":"INFO"}`,
"unknown field": `{"recipient":"r","type":"INFO","payload":"p","stowaway":1}`,
"not json": `not json`,
}
for name, body := range cases {
t.Run(name, func(t *testing.T) {
if _, err := parseDrop([]byte(body)); err == nil {
t.Error("expected error")
}
})
}
}
// recordingEmit captures emitted events for assertion.
type recordingEmit struct {
mu sync.Mutex
events []record
err error // set to fail the next emit
}
type record struct {
recipient string
event inbox.Event
}
func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
err := r.err
r.err = nil
return err
}
r.events = append(r.events, record{recipient, *ev})
return nil
}
func (r *recordingEmit) snapshot() []record {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]record, len(r.events))
copy(out, r.events)
return out
}
func writeDrop(t *testing.T, dir, name string, payload any) string {
t.Helper()
path := filepath.Join(dir, name)
b, err := json.Marshal(payload)
if err != nil {
t.Fatal(err)
}
tmp := path + ".tmp"
if err := os.WriteFile(tmp, b, 0644); err != nil {
t.Fatal(err)
}
// rename for atomic visibility (mirrors what Syncthing does)
if err := os.Rename(tmp, path); err != nil {
t.Fatal(err)
}
return path
}
func TestSource_InitialScan_IngestsExistingFiles(t *testing.T) {
dir := t.TempDir()
writeDrop(t, dir, "1.json", map[string]string{
"recipient": "bob", "type": "INFO", "payload": "first",
})
writeDrop(t, dir, "2.json", map[string]string{
"recipient": "bob", "type": "NEEDS-RESPONSE", "payload": "second",
})
rec := &recordingEmit{}
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go src.Run(ctx, rec.emit)
waitFor(t, func() bool { return len(rec.snapshot()) == 2 }, 400*time.Millisecond)
got := rec.snapshot()
if got[0].event.Payload+"|"+got[1].event.Payload != "first|second" &&
got[0].event.Payload+"|"+got[1].event.Payload != "second|first" {
t.Errorf("unexpected payloads: %+v", got)
}
for _, r := range got {
if r.recipient != "bob" {
t.Errorf("recipient %q", r.recipient)
}
}
}
func TestSource_LiveDrop_IngestsViaInotify(t *testing.T) {
dir := t.TempDir()
rec := &recordingEmit{}
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go src.Run(ctx, rec.emit)
// Give the watcher time to attach.
time.Sleep(50 * time.Millisecond)
writeDrop(t, dir, "live.json", map[string]string{
"recipient": "foreman", "type": "INFO", "payload": "live one",
})
waitFor(t, func() bool { return len(rec.snapshot()) == 1 }, 1*time.Second)
got := rec.snapshot()
if got[0].event.Payload != "live one" {
t.Errorf("payload %q", got[0].event.Payload)
}
}
func TestSource_DeleteAfterEmit(t *testing.T) {
dir := t.TempDir()
rec := &recordingEmit{}
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go src.Run(ctx, rec.emit)
time.Sleep(50 * time.Millisecond)
path := writeDrop(t, dir, "ok.json", map[string]string{
"recipient": "r", "type": "INFO", "payload": "p",
})
waitFor(t, func() bool {
_, err := os.Stat(path)
return os.IsNotExist(err)
}, 1*time.Second)
}
func TestSource_DeadLetter_OnInvalidSchema(t *testing.T) {
dir := t.TempDir()
rec := &recordingEmit{}
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go src.Run(ctx, rec.emit)
time.Sleep(50 * time.Millisecond)
// Missing payload — invalid.
tmp := filepath.Join(dir, "bad.json.tmp")
os.WriteFile(tmp, []byte(`{"recipient":"r","type":"INFO"}`), 0644)
os.Rename(tmp, filepath.Join(dir, "bad.json"))
deadPath := filepath.Join(dir, ".dead-letter", "bad.json")
waitFor(t, func() bool {
_, err := os.Stat(deadPath)
return err == nil
}, 1*time.Second)
// Reason sidecar exists.
if _, err := os.Stat(deadPath + ".reason"); err != nil {
t.Errorf("reason sidecar missing: %v", err)
}
// Original file gone from drop dir.
if _, err := os.Stat(filepath.Join(dir, "bad.json")); !os.IsNotExist(err) {
t.Errorf("original drop file should be gone")
}
// Nothing was emitted.
if got := rec.snapshot(); len(got) != 0 {
t.Errorf("got events for invalid drop: %+v", got)
}
}
func TestSource_RetainsFile_WhenEmitFails(t *testing.T) {
dir := t.TempDir()
rec := &recordingEmit{err: errOnce()}
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go src.Run(ctx, rec.emit)
time.Sleep(50 * time.Millisecond)
path := writeDrop(t, dir, "retry.json", map[string]string{
"recipient": "r", "type": "INFO", "payload": "p",
})
// Wait long enough for the (failing) emit to have happened.
time.Sleep(200 * time.Millisecond)
// Drop file is still there (retained for retry).
if _, err := os.Stat(path); err != nil {
t.Errorf("drop file should be retained on emit failure: %v", err)
}
// Not in dead-letter.
if _, err := os.Stat(filepath.Join(dir, ".dead-letter", "retry.json")); err == nil {
t.Error("emit failure should NOT dead-letter (it's transient)")
}
}
// errOnce returns a non-nil error one time, then nil after.
func errOnce() error {
type e struct{}
return &emitErr{}
}
type emitErr struct{}
func (e *emitErr) Error() string { return "transient emit failure" }
// waitFor polls predicate until it returns true or timeout elapses.
func waitFor(t *testing.T, pred func() bool, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if pred() {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("waitFor: predicate did not become true within %s", timeout)
}

35
internal/source/source.go Normal file
View file

@ -0,0 +1,35 @@
// Package source defines the contract every Collector source plugin implements.
//
// A Source is a long-running goroutine that converts external events into
// inbox writes. It receives an Emit callback at start; each external event it
// observes results in one Emit call. The Source returns when its context is
// canceled, or earlier on a fatal error.
//
// Sources do not own the inbox writer or any other shared state. They emit;
// the dispatcher routes. This keeps each source small and testable in
// isolation.
package source
import (
"context"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
)
// Emit is the callback a Source uses to push one observed event into the
// dispatcher. The dispatcher writes it to recipient's inbox file. Returning
// an error means the inbox write failed; sources may log and continue, or
// drop, or shut down — that's a per-source policy decision.
type Emit func(recipient string, ev *inbox.Event) error
// Source is a Collector input. Implementations are constructed with their
// own configuration and started via Run.
type Source interface {
// Name is a short identifier for logs and the "source" field on emitted
// events ("webhook", "drop-folder", etc.).
Name() string
// Run blocks until ctx is canceled or a fatal error occurs. Each
// observed external event becomes one Emit call.
Run(ctx context.Context, emit Emit) error
}

View file

@ -0,0 +1,248 @@
// Package webhook implements the HTTP webhook Source.
//
// Listens on a loopback address and routes incoming POSTs to inbox events
// according to a routing table. Each route specifies recipient, type,
// priority, and a Go text/template that renders the payload from the request
// body decoded as JSON.
//
// v1 is loopback-only (127.0.0.1) — no authentication. The spec calls for
// Caddy + bearer-token reverse-proxy as the v2 upgrade path. Do NOT bind to
// 0.0.0.0 in v1.
//
// Per spec §3.5, the same listener also exposes /health with per-source
// counters.
package webhook
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"sync/atomic"
"text/template"
"time"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
"git.botbought.ai/foreman/agent-watcher/internal/source"
)
// Route maps a URL path to an emitted event.
type Route struct {
// Recipient is the target inbox name.
Recipient string
// Type is one of INFO, NEEDS-RESPONSE, ACK-REQUEST.
Type string
// Priority is "normal" or "urgent". Empty defaults to "normal".
Priority string
// PayloadTemplate is a Go text/template rendered with the request body
// (decoded as JSON into a map[string]any) as the dot. Plain strings
// without {{}} render to themselves.
PayloadTemplate string
}
// Config configures a webhook Source.
type Config struct {
// Listen is the bind address, e.g. "127.0.0.1:18790".
Listen string
// Routes maps URL path → Route. Path includes the leading slash.
Routes map[string]Route
}
// Source serves HTTP and emits an inbox event for each matched POST.
type Source struct {
cfg Config
logger *slog.Logger
tmpls map[string]*template.Template
// counters
received atomic.Uint64
emitted atomic.Uint64
errors atomic.Uint64
startedAt time.Time
}
// New parses the route templates and returns a configured Source.
// Returns an error if any template fails to parse.
func New(cfg Config, logger *slog.Logger) (*Source, error) {
if logger == nil {
logger = slog.Default()
}
if cfg.Listen == "" {
return nil, errors.New("webhook: listen address required")
}
if !isLoopback(cfg.Listen) {
return nil, fmt.Errorf("webhook: listen %q is not a loopback address; v1 forbids non-loopback binds", cfg.Listen)
}
tmpls := make(map[string]*template.Template, len(cfg.Routes))
for path, r := range cfg.Routes {
if !validTypes[r.Type] {
return nil, fmt.Errorf("webhook: route %s: invalid type %q", path, r.Type)
}
if r.Priority != "" && r.Priority != "normal" && r.Priority != "urgent" {
return nil, fmt.Errorf("webhook: route %s: invalid priority %q", path, r.Priority)
}
t, err := template.New(path).Option("missingkey=zero").Parse(r.PayloadTemplate)
if err != nil {
return nil, fmt.Errorf("webhook: route %s: parse template: %w", path, err)
}
tmpls[path] = t
}
return &Source{
cfg: cfg,
logger: logger.With("source", "webhook", "listen", cfg.Listen),
tmpls: tmpls,
startedAt: time.Now(),
}, nil
}
func (s *Source) Name() string { return "webhook" }
// Run blocks until ctx is canceled.
func (s *Source) Run(ctx context.Context, emit source.Emit) error {
mux := http.NewServeMux()
mux.HandleFunc("/health", s.health)
mux.HandleFunc("/", s.handler(emit))
srv := &http.Server{
Addr: s.cfg.Listen,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}
listenErr := make(chan error, 1)
go func() {
s.logger.Info("listening")
listenErr <- srv.ListenAndServe()
}()
select {
case <-ctx.Done():
shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
_ = srv.Shutdown(shutCtx)
return ctx.Err()
case err := <-listenErr:
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return fmt.Errorf("webhook: serve: %w", err)
}
}
func (s *Source) handler(emit source.Emit) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
s.received.Add(1)
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
route, ok := s.cfg.Routes[r.URL.Path]
if !ok {
http.Error(w, "no route", http.StatusNotFound)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1 MiB cap
if err != nil {
s.errors.Add(1)
s.logger.Warn("read body", "path", r.URL.Path, "err", err)
http.Error(w, "read failed", http.StatusBadRequest)
return
}
var data map[string]any
if len(bytes.TrimSpace(body)) > 0 {
if err := json.Unmarshal(body, &data); err != nil {
s.errors.Add(1)
s.logger.Warn("body not JSON", "path", r.URL.Path, "err", err)
http.Error(w, "body must be JSON", http.StatusBadRequest)
return
}
} else {
data = map[string]any{}
}
var rendered bytes.Buffer
if err := s.tmpls[r.URL.Path].Execute(&rendered, data); err != nil {
s.errors.Add(1)
s.logger.Warn("template render", "path", r.URL.Path, "err", err)
http.Error(w, "render failed", http.StatusInternalServerError)
return
}
priority := route.Priority
if priority == "" {
priority = "normal"
}
ev := &inbox.Event{
From: "collector",
Type: route.Type,
Priority: priority,
Payload: rendered.String(),
Source: "webhook:" + r.URL.Path,
}
if err := emit(route.Recipient, ev); err != nil {
s.errors.Add(1)
s.logger.Error("emit", "path", r.URL.Path, "err", err)
http.Error(w, "emit failed", http.StatusInternalServerError)
return
}
s.emitted.Add(1)
w.WriteHeader(http.StatusAccepted)
_, _ = io.WriteString(w, "ok\n")
}
}
// Stats returns the current counter snapshot. Safe to call from any goroutine.
type Stats struct {
Received uint64 `json:"received"`
Emitted uint64 `json:"emitted"`
Errors uint64 `json:"errors"`
UptimeSec int64 `json:"uptime_sec"`
}
func (s *Source) Stats() Stats {
return Stats{
Received: s.received.Load(),
Emitted: s.emitted.Load(),
Errors: s.errors.Load(),
UptimeSec: int64(time.Since(s.startedAt).Seconds()),
}
}
func (s *Source) health(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(s.Stats())
}
// validTypes mirrors agent-ping. Kept here (not imported from dropfolder) so
// the webhook source has no cross-source dependency.
var validTypes = map[string]bool{
"INFO": true,
"NEEDS-RESPONSE": true,
"ACK-REQUEST": true,
}
// isLoopback returns true if addr binds only to a loopback interface.
// Accepts "127.0.0.1:port", "[::1]:port", "localhost:port".
func isLoopback(addr string) bool {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return false
}
if host == "localhost" {
return true
}
ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}

View file

@ -0,0 +1,323 @@
package webhook
import (
"context"
"encoding/json"
"io"
"log/slog"
"net"
"net/http"
"strings"
"sync"
"testing"
"time"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
)
func quietLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, nil))
}
// freePort returns an unused loopback "127.0.0.1:N" address.
func freePort(t *testing.T) string {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
addr := l.Addr().String()
l.Close()
return addr
}
type recordingEmit struct {
mu sync.Mutex
events []record
err error
}
type record struct {
recipient string
event inbox.Event
}
func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return r.err
}
r.events = append(r.events, record{recipient, *ev})
return nil
}
func (r *recordingEmit) snap() []record {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]record, len(r.events))
copy(out, r.events)
return out
}
func runSource(t *testing.T, cfg Config, emit func(string, *inbox.Event) error) (string, context.CancelFunc) {
t.Helper()
src, err := New(cfg, quietLogger())
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
go src.Run(ctx, emit)
// Wait for listener.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", cfg.Listen, 50*time.Millisecond)
if err == nil {
c.Close()
return cfg.Listen, cancel
}
time.Sleep(20 * time.Millisecond)
}
cancel()
t.Fatal("listener never came up")
return "", nil
}
func TestNew_RejectsNonLoopback(t *testing.T) {
_, err := New(Config{Listen: "0.0.0.0:18790"}, quietLogger())
if err == nil {
t.Error("expected error for non-loopback")
}
}
func TestNew_RejectsBadType(t *testing.T) {
_, err := New(Config{
Listen: "127.0.0.1:0",
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "BOGUS", PayloadTemplate: "x"},
},
}, quietLogger())
if err == nil {
t.Error("expected error for bad type")
}
}
func TestNew_RejectsBadTemplate(t *testing.T) {
_, err := New(Config{
Listen: "127.0.0.1:0",
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "{{ .unclosed"},
},
}, quietLogger())
if err == nil {
t.Error("expected error for bad template")
}
}
func TestPOST_RoutesAndEmits(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/forgejo/push": {
Recipient: "bob",
Type: "INFO",
PayloadTemplate: "forgejo push to {{ .repo }} by {{ .actor }}",
},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
body := `{"repo":"agent-ping","actor":"angus"}`
resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", strings.NewReader(body))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Errorf("status = %d, want 202", resp.StatusCode)
}
got := rec.snap()
if len(got) != 1 {
t.Fatalf("got %d events, want 1", len(got))
}
if got[0].recipient != "bob" {
t.Errorf("recipient = %q", got[0].recipient)
}
if got[0].event.Type != "INFO" {
t.Errorf("type = %q", got[0].event.Type)
}
if got[0].event.Priority != "normal" {
t.Errorf("priority default = %q", got[0].event.Priority)
}
if got[0].event.Payload != "forgejo push to agent-ping by angus" {
t.Errorf("payload = %q", got[0].event.Payload)
}
if got[0].event.Source != "webhook:/forgejo/push" {
t.Errorf("source = %q", got[0].event.Source)
}
}
func TestPOST_PriorityAndEmptyBody(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/alert": {
Recipient: "bob",
Type: "NEEDS-RESPONSE",
Priority: "urgent",
PayloadTemplate: "alert fired",
},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader(""))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Errorf("status = %d", resp.StatusCode)
}
got := rec.snap()[0]
if got.event.Priority != "urgent" {
t.Errorf("priority = %q", got.event.Priority)
}
if got.event.Payload != "alert fired" {
t.Errorf("payload = %q", got.event.Payload)
}
}
func TestPOST_404OnUnknownRoute(t *testing.T) {
addr := freePort(t)
cfg := Config{Listen: addr, Routes: map[string]Route{}}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/nope", "application/json", strings.NewReader("{}"))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Errorf("status = %d, want 404", resp.StatusCode)
}
if len(rec.snap()) != 0 {
t.Error("emitted on unknown route")
}
}
func TestPOST_400OnBadJSON(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("not json"))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Errorf("status = %d, want 400", resp.StatusCode)
}
}
func TestPOST_500OnEmitFailure(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{err: errOnce()}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}"))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("status = %d, want 500", resp.StatusCode)
}
}
func TestGET_MethodNotAllowed(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Get("http://" + addr + "/x")
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Errorf("status = %d", resp.StatusCode)
}
}
func TestGET_HealthEndpoint(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
// Two POSTs: 1 success, 1 fail.
http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}"))
http.Post("http://"+addr+"/wrongpath", "application/json", strings.NewReader("{}"))
resp, err := http.Get("http://" + addr + "/health")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("status = %d", resp.StatusCode)
}
var stats Stats
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
t.Fatal(err)
}
if stats.Received < 2 {
t.Errorf("received = %d", stats.Received)
}
if stats.Emitted != 1 {
t.Errorf("emitted = %d, want 1", stats.Emitted)
}
}
type emitErr struct{}
func (e *emitErr) Error() string { return "emit boom" }
func errOnce() error { return &emitErr{} }

View file

@ -0,0 +1,26 @@
[Unit]
Description=agent-watcher Collector — converts external events to ping inbox writes
Documentation=https://git.botbought.ai/foreman/agent-watcher
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=%h/.local/bin/agent-watcher --json-log
Restart=on-failure
RestartSec=5
# small daemon; no need for elevated limits
LimitNOFILE=4096
# read-only by intent; the daemon writes only to the inbox dir which is
# inside $HOME and unaffected by ProtectSystem.
ProtectSystem=strict
ProtectHome=read-write
PrivateTmp=yes
NoNewPrivileges=yes
# stdout/stderr go to journald automatically; --json-log makes them parseable
StandardOutput=journal
StandardError=journal
SyslogIdentifier=agent-watcher
[Install]
WantedBy=default.target