Compare commits
9 commits
bob/mcp-wa
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 39a95ad49f | |||
| f06349eab4 | |||
|
|
3a586f38a9 | ||
|
|
4ff8c3f78d | ||
|
|
e7d4ea036a | ||
|
|
2183850c03 | ||
|
|
ba6db7c82f | ||
|
|
f9d81471c4 | ||
|
|
50e8ece83d |
18 changed files with 2588 additions and 6 deletions
95
INSTALL.md
Normal file
95
INSTALL.md
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
# INSTALL — agent-watcher Collector (Layer 1)
|
||||
|
||||
The MCP Watcher (Layer 2) has its own install path; this file covers Layer 1 only.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Linux with systemd
|
||||
- Go 1.22+ (`sudo apt install golang-go` on Ubuntu/Mint)
|
||||
- An existing agent-ping install on this host (the Collector writes to its inbox dir)
|
||||
|
||||
## Install
|
||||
|
||||
Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration.
|
||||
|
||||
```sh
|
||||
git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher
|
||||
cd ~/agent-watcher
|
||||
./install.sh
|
||||
```
|
||||
|
||||
The script builds, installs to `~/.local/bin/agent-watcher`, drops the systemd unit into `~/.config/systemd/user/`, copies the example config to `~/.config/agent-watcher/collector.yaml` (if absent), reloads systemd, and starts the service.
|
||||
|
||||
To install without auto-starting (so you can edit the config first):
|
||||
|
||||
```sh
|
||||
./install.sh --no-start
|
||||
```
|
||||
|
||||
## Configure
|
||||
|
||||
Edit `~/.config/agent-watcher/collector.yaml`. The example has both sources configured for `agent: foreman` with placeholder routes — adapt to this host. At least one of `webhook:` or `drop_folder:` must be configured.
|
||||
|
||||
After editing:
|
||||
|
||||
```sh
|
||||
systemctl --user restart agent-watcher
|
||||
```
|
||||
|
||||
## Verify
|
||||
|
||||
```sh
|
||||
# logs
|
||||
journalctl --user -u agent-watcher -f
|
||||
|
||||
# health
|
||||
curl http://127.0.0.1:18790/health
|
||||
|
||||
# end-to-end via drop folder
|
||||
echo '{"recipient":"bob","type":"INFO","payload":"hello from drop"}' \
|
||||
> ~/Nyx/workspace/incoming/test.json
|
||||
|
||||
# end-to-end via webhook
|
||||
curl -X POST http://127.0.0.1:18790/forgejo/push \
|
||||
-H 'Content-Type: application/json' \
|
||||
-d '{"repo":"agent-ping","actor":"angus"}'
|
||||
|
||||
# the lines should appear in the recipient's inbox
|
||||
tail -2 ~/Nyx/workspace/pings/bob.inbox
|
||||
```
|
||||
|
||||
## Survive logout
|
||||
|
||||
`systemctl --user` units stop when the user logs out. To keep the Collector running across logouts:
|
||||
|
||||
```sh
|
||||
sudo loginctl enable-linger $USER
|
||||
```
|
||||
|
||||
## Uninstall
|
||||
|
||||
```sh
|
||||
systemctl --user disable --now agent-watcher
|
||||
rm ~/.local/bin/agent-watcher
|
||||
rm ~/.config/systemd/user/agent-watcher.service
|
||||
systemctl --user daemon-reload
|
||||
# config and example left behind; remove if desired:
|
||||
# rm -rf ~/.config/agent-watcher
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
| Symptom | Likely cause | Fix |
|
||||
|---|---|---|
|
||||
| `journalctl` shows `config: ... is required` | YAML field missing | Match the example, save, restart. |
|
||||
| `systemctl --user status` shows `address already in use` | port 18790 taken by something else | Edit `webhook.listen` to a free port. |
|
||||
| Drop file disappears but no inbox line | check `.dead-letter/` for the file + `.reason` sidecar | Schema validation failed — fix the producer. |
|
||||
| Webhook returns 404 | path doesn't match a configured route | Routes must match exactly; check trailing slashes. |
|
||||
| Service won't start across reboot | `linger` not enabled | `sudo loginctl enable-linger $USER` |
|
||||
| Drop file syncs back from another host before processing | drop folder is in Syncthing scope | This is intended (lets producers on other hosts deliver). Lifecycle artifacts (`.tmp`, `.dead-letter/`) are excluded by `install.sh`. |
|
||||
|
||||
## What this does NOT install
|
||||
|
||||
- The agent-ping system. Install that first (`~/agent-ping/install.sh <name>`).
|
||||
- Layer 2 (MCP Watcher). Different binary, different runtime, different unit — separate install when it lands.
|
||||
- Caddy / reverse-proxy webhook auth. v1 is loopback-only; v2 will document the upgrade path.
|
||||
44
README.md
44
README.md
|
|
@ -1,13 +1,13 @@
|
|||
# agent-watcher
|
||||
|
||||
Push-delivery layer for [`agent-ping`](http://localhost:3300/angus/agent-ping). The "secondary nervous system" for Claude Code agents on this network.
|
||||
Push-delivery layer for [`agent-ping`](https://git.botbought.ai/foreman/agent-ping). The "secondary nervous system" for Claude Code agents on this network.
|
||||
|
||||
`agent-ping` queues messages in inbox files; `agent-watcher` notices them (and other external events) and wakes the recipient agent without a human in the loop.
|
||||
|
||||
Two layers:
|
||||
|
||||
- **Collector** — small Go daemon, `systemd --user`, always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive.
|
||||
- **MCP Watcher** — Claude Code MCP subprocess, declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels (research preview). Provides reply tools (`ack`, `respond`, `mark_handled`).
|
||||
- **Collector** (this repo, Go) — small daemon under `systemd --user`. Always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive.
|
||||
- **MCP Watcher** (Python, in progress) — Claude Code MCP subprocess declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels. Provides reply tools (`ack`, `respond`, `mark_handled`).
|
||||
|
||||
Filesystem is the queue. OpenBrain is not involved.
|
||||
|
||||
|
|
@ -15,10 +15,42 @@ Filesystem is the queue. OpenBrain is not involved.
|
|||
|
||||
[`spec/agent-watcher.md`](spec/agent-watcher.md). Read that for architecture, decisions, scope.
|
||||
|
||||
Channels reference docs (snapshot of Anthropic's official docs, used by Layer 2): [`docs/channels/`](docs/channels/).
|
||||
|
||||
## Status
|
||||
|
||||
v1 spec signed off by Bob (VPS) 2026-05-06. Implementation pending.
|
||||
| Layer | Lane | Status |
|
||||
|---|---|---|
|
||||
| Spec v1 | — | Signed off by Bob 2026-05-06 |
|
||||
| Layer 1: Collector | Foreman / Go | **v0 working: 43 tests passing.** End-to-end exercised; binary builds. systemd unit + INSTALL.md ready. |
|
||||
| Layer 2: MCP Watcher | Bob / Python | In progress — sandbox CC session being set up on VPS for testing. |
|
||||
|
||||
## Install
|
||||
## Install (Layer 1)
|
||||
|
||||
Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. Install script will land alongside `INSTALL.md` once the binaries are built.
|
||||
```sh
|
||||
git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher
|
||||
cd ~/agent-watcher
|
||||
./install.sh
|
||||
```
|
||||
|
||||
Then edit `~/.config/agent-watcher/collector.yaml` and `systemctl --user restart agent-watcher`.
|
||||
|
||||
See [`INSTALL.md`](INSTALL.md) for verify steps, troubleshooting, and the `loginctl enable-linger` step required to keep the daemon running across logouts.
|
||||
|
||||
Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration.
|
||||
|
||||
## Quick reference (Layer 1)
|
||||
|
||||
```
|
||||
Inputs Output
|
||||
───────── ──────
|
||||
HTTP POST → port 18790 ┐
|
||||
(routed via │ <recipient>.inbox (JSONL, ping-shaped)
|
||||
YAML table) │ identical format to
|
||||
├─→ what `ping <recipient> <payload>` writes;
|
||||
File drop in │ the existing UserPromptSubmit hook and the
|
||||
~/Nyx/workspace/incoming/ │ future MCP Watcher consume the stream
|
||||
*.json ┘ without distinguishing source.
|
||||
```
|
||||
|
||||
`/health` on the same webhook port returns `{received, emitted, errors, uptime_sec}` for journalctl correlation.
|
||||
|
|
|
|||
148
cmd/agent-watcher/main.go
Normal file
148
cmd/agent-watcher/main.go
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
// Command agent-watcher is the Collector daemon.
|
||||
//
|
||||
// It loads a YAML config, builds the configured sources, and runs them
|
||||
// concurrently. Each source converts external events (webhook POSTs, files
|
||||
// dropped into a folder) into JSONL writes against the configured inbox
|
||||
// directory. Output is bit-identical to the agent-ping CLI's writes, so the
|
||||
// existing UserPromptSubmit hook (and the future MCP Watcher) consume the
|
||||
// stream without distinguishing source.
|
||||
//
|
||||
// Default config path: ~/.config/agent-watcher/collector.yaml. Override with
|
||||
// --config or AGENT_WATCHER_CONFIG.
|
||||
//
|
||||
// Signals: SIGINT and SIGTERM trigger graceful shutdown. SIGHUP reload is a
|
||||
// v2 item — for now restart the unit to pick up config changes.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/config"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source/dropfolder"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source/webhook"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var (
|
||||
configPath = flag.String("config", "", "path to collector.yaml (default: $AGENT_WATCHER_CONFIG or ~/.config/agent-watcher/collector.yaml)")
|
||||
jsonLog = flag.Bool("json-log", false, "emit logs as JSON (for journald structured fields)")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
logger := newLogger(*jsonLog)
|
||||
|
||||
path := resolveConfigPath(*configPath)
|
||||
cfg, err := config.Load(path)
|
||||
if err != nil {
|
||||
logger.Error("config", "err", err)
|
||||
os.Exit(2)
|
||||
}
|
||||
logger = logger.With("agent", cfg.Agent)
|
||||
logger.Info("config loaded", "path", path, "inbox_dir", cfg.InboxDir)
|
||||
|
||||
if err := run(cfg, logger); err != nil {
|
||||
logger.Error("run", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run(cfg *config.Config, logger *slog.Logger) error {
|
||||
w, err := inbox.NewWriter(cfg.InboxDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("inbox: %w", err)
|
||||
}
|
||||
emit := func(recipient string, ev *inbox.Event) error {
|
||||
return w.Write(recipient, ev)
|
||||
}
|
||||
|
||||
sources, err := buildSources(cfg, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(sources) == 0 {
|
||||
return fmt.Errorf("no sources configured")
|
||||
}
|
||||
|
||||
ctx, cancel := signalContext()
|
||||
defer cancel()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, src := range sources {
|
||||
src := src
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
logger.Info("source starting", "name", src.Name())
|
||||
if err := src.Run(ctx, emit); err != nil && err != context.Canceled {
|
||||
logger.Error("source exited", "name", src.Name(), "err", err)
|
||||
} else {
|
||||
logger.Info("source stopped", "name", src.Name())
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildSources(cfg *config.Config, logger *slog.Logger) ([]source.Source, error) {
|
||||
var out []source.Source
|
||||
if w := cfg.Sources.Webhook; w != nil {
|
||||
s, err := webhook.New(webhook.Config{
|
||||
Listen: w.Listen,
|
||||
Routes: w.ToWebhookRouteMap(),
|
||||
}, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webhook: %w", err)
|
||||
}
|
||||
out = append(out, s)
|
||||
}
|
||||
if d := cfg.Sources.DropFolder; d != nil {
|
||||
out = append(out, dropfolder.New(dropfolder.Config{
|
||||
Path: d.Path,
|
||||
PollFallbackSeconds: d.PollFallbackSeconds,
|
||||
}, logger))
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func signalContext() (context.Context, context.CancelFunc) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-sigCh
|
||||
cancel()
|
||||
}()
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
func resolveConfigPath(flagPath string) string {
|
||||
if flagPath != "" {
|
||||
return flagPath
|
||||
}
|
||||
if env := os.Getenv("AGENT_WATCHER_CONFIG"); env != "" {
|
||||
return env
|
||||
}
|
||||
home, _ := os.UserHomeDir()
|
||||
return filepath.Join(home, ".config", "agent-watcher", "collector.yaml")
|
||||
}
|
||||
|
||||
func newLogger(asJSON bool) *slog.Logger {
|
||||
var h slog.Handler
|
||||
if asJSON {
|
||||
h = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})
|
||||
} else {
|
||||
h = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})
|
||||
}
|
||||
return slog.New(h)
|
||||
}
|
||||
299
cmd/agent-watcher/main_test.go
Normal file
299
cmd/agent-watcher/main_test.go
Normal file
|
|
@ -0,0 +1,299 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestMain builds the agent-watcher binary once for the integration tests.
|
||||
func TestMain(m *testing.M) {
|
||||
bin, err := buildBinary()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "build failed:", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
binaryPath = bin
|
||||
code := m.Run()
|
||||
os.Remove(bin)
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
var binaryPath string
|
||||
|
||||
func buildBinary() (string, error) {
|
||||
dir, err := os.MkdirTemp("", "agent-watcher-bin-*")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
out := filepath.Join(dir, "agent-watcher")
|
||||
cmd := exec.Command("go", "build", "-o", out, ".")
|
||||
cmd.Stderr = os.Stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// freePort returns an unused loopback "127.0.0.1:N" port.
|
||||
func freePort(t *testing.T) string {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addr := l.Addr().String()
|
||||
l.Close()
|
||||
return addr
|
||||
}
|
||||
|
||||
// startBinary launches agent-watcher with the given config file and returns
|
||||
// the running cmd. The test is responsible for sending SIGINT/SIGTERM.
|
||||
func startBinary(t *testing.T, configPath string) *exec.Cmd {
|
||||
t.Helper()
|
||||
cmd := exec.Command(binaryPath, "--config", configPath)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if cmd.Process != nil {
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
cmd.Wait()
|
||||
}
|
||||
})
|
||||
return cmd
|
||||
}
|
||||
|
||||
// waitFor polls predicate until true or timeout.
|
||||
func waitFor(t *testing.T, name string, pred func() bool, timeout time.Duration) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
if pred() {
|
||||
return
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("waitFor %q: predicate did not become true within %s", name, timeout)
|
||||
}
|
||||
|
||||
func waitForListener(t *testing.T, addr string, timeout time.Duration) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond)
|
||||
if err == nil {
|
||||
c.Close()
|
||||
return
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("listener at %s never came up", addr)
|
||||
}
|
||||
|
||||
func readInbox(t *testing.T, path string) []map[string]any {
|
||||
t.Helper()
|
||||
body, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
out := []map[string]any{}
|
||||
for _, line := range strings.Split(strings.TrimRight(string(body), "\n"), "\n") {
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var m map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &m); err != nil {
|
||||
t.Errorf("non-JSON line: %s", line)
|
||||
continue
|
||||
}
|
||||
out = append(out, m)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// TestEndToEnd_BothSourcesEmitToInbox builds and runs the real binary,
|
||||
// exercises both webhook and drop-folder sources, and verifies the inbox
|
||||
// file contains one bit-identical JSONL line per emitted event.
|
||||
func TestEndToEnd_BothSourcesEmitToInbox(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
inboxDir := filepath.Join(tmp, "pings")
|
||||
dropDir := filepath.Join(tmp, "incoming")
|
||||
configPath := filepath.Join(tmp, "collector.yaml")
|
||||
addr := freePort(t)
|
||||
|
||||
cfg := fmt.Sprintf(`
|
||||
agent: foreman
|
||||
inbox_dir: %s
|
||||
sources:
|
||||
webhook:
|
||||
listen: %s
|
||||
routes:
|
||||
/forgejo/push:
|
||||
recipient: bob
|
||||
type: INFO
|
||||
payload_template: "push {{ .repo }} by {{ .actor }}"
|
||||
/alert:
|
||||
recipient: bob
|
||||
type: NEEDS-RESPONSE
|
||||
priority: urgent
|
||||
payload_template: "fixed alert"
|
||||
drop_folder:
|
||||
path: %s
|
||||
poll_fallback_seconds: 0
|
||||
`, inboxDir, addr, dropDir)
|
||||
|
||||
if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
startBinary(t, configPath)
|
||||
waitForListener(t, addr, 3*time.Second)
|
||||
|
||||
// 1) drop a file
|
||||
dropContent := `{"recipient":"bob","type":"INFO","payload":"from drop"}`
|
||||
dropTmp := filepath.Join(dropDir, "drop1.json.tmp")
|
||||
if err := os.WriteFile(dropTmp, []byte(dropContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.Rename(dropTmp, filepath.Join(dropDir, "drop1.json")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// 2) post a webhook
|
||||
resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json",
|
||||
strings.NewReader(`{"repo":"agent-ping","actor":"angus"}`))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
t.Fatalf("webhook status = %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// 3) post a urgent alert
|
||||
resp2, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader(""))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp2.Body.Close()
|
||||
|
||||
// Wait for 3 lines in bob's inbox.
|
||||
bobInbox := filepath.Join(inboxDir, "bob.inbox")
|
||||
waitFor(t, "3 inbox lines", func() bool {
|
||||
return len(readInbox(t, bobInbox)) == 3
|
||||
}, 3*time.Second)
|
||||
|
||||
got := readInbox(t, bobInbox)
|
||||
payloads := map[string]map[string]any{}
|
||||
for _, m := range got {
|
||||
payloads[m["payload"].(string)] = m
|
||||
}
|
||||
|
||||
// drop event
|
||||
drop := payloads["from drop"]
|
||||
if drop == nil {
|
||||
t.Fatalf("missing drop event; got: %+v", got)
|
||||
}
|
||||
if drop["type"] != "INFO" || drop["from"] != "collector" {
|
||||
t.Errorf("drop event shape: %+v", drop)
|
||||
}
|
||||
if !strings.HasPrefix(drop["source"].(string), "drop-folder:drop1.json") {
|
||||
t.Errorf("drop source: %v", drop["source"])
|
||||
}
|
||||
|
||||
// webhook push event
|
||||
push := payloads["push agent-ping by angus"]
|
||||
if push == nil {
|
||||
t.Fatalf("missing push event; got: %+v", got)
|
||||
}
|
||||
if push["type"] != "INFO" || push["priority"] != "normal" {
|
||||
t.Errorf("push event shape: %+v", push)
|
||||
}
|
||||
if push["source"] != "webhook:/forgejo/push" {
|
||||
t.Errorf("push source: %v", push["source"])
|
||||
}
|
||||
|
||||
// alert event with priority urgent
|
||||
alert := payloads["fixed alert"]
|
||||
if alert == nil {
|
||||
t.Fatalf("missing alert event; got: %+v", got)
|
||||
}
|
||||
if alert["priority"] != "urgent" || alert["type"] != "NEEDS-RESPONSE" {
|
||||
t.Errorf("alert event shape: %+v", alert)
|
||||
}
|
||||
|
||||
// Health endpoint is wired and returns sane counters.
|
||||
resp3, err := http.Get("http://" + addr + "/health")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp3.Body.Close()
|
||||
var stats map[string]any
|
||||
if err := json.NewDecoder(resp3.Body).Decode(&stats); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if int(stats["emitted"].(float64)) != 2 {
|
||||
t.Errorf("health emitted = %v, want 2", stats["emitted"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestEndToEnd_GracefulShutdown ensures SIGTERM stops the binary cleanly.
|
||||
func TestEndToEnd_GracefulShutdown(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
configPath := filepath.Join(tmp, "collector.yaml")
|
||||
addr := freePort(t)
|
||||
cfg := fmt.Sprintf(`
|
||||
agent: foreman
|
||||
inbox_dir: %s/pings
|
||||
sources:
|
||||
webhook:
|
||||
listen: %s
|
||||
routes:
|
||||
/x:
|
||||
recipient: r
|
||||
type: INFO
|
||||
payload_template: ok
|
||||
`, tmp, addr)
|
||||
if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cmd := exec.Command(binaryPath, "--config", configPath)
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
if err := cmd.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
waitForListener(t, addr, 3*time.Second)
|
||||
|
||||
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() { done <- cmd.Wait() }()
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
// Some Go runtimes return an exitstatus error after signal-driven
|
||||
// shutdown; that's OK as long as we exited.
|
||||
if _, ok := err.(*exec.ExitError); !ok {
|
||||
t.Errorf("unexpected exit error: %v", err)
|
||||
}
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
cmd.Process.Kill()
|
||||
t.Fatal("did not shut down within 3s of SIGTERM")
|
||||
}
|
||||
}
|
||||
55
examples/collector.yaml
Normal file
55
examples/collector.yaml
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
# agent-watcher Collector configuration
|
||||
#
|
||||
# Lives at: ~/.config/agent-watcher/collector.yaml
|
||||
# Override with --config or AGENT_WATCHER_CONFIG.
|
||||
#
|
||||
# At least one source (webhook OR drop_folder) must be configured.
|
||||
|
||||
# This host's identity. Used in logs only; the inbox writer routes by
|
||||
# the recipient field on each event, not this.
|
||||
agent: foreman
|
||||
|
||||
# Optional. Where to write <recipient>.inbox files. Default shown.
|
||||
# inbox_dir: ~/Nyx/workspace/pings
|
||||
|
||||
sources:
|
||||
|
||||
# HTTP webhook source.
|
||||
# v1 binds loopback only — Caddy + bearer-token reverse-proxy is the
|
||||
# v2 upgrade path for accepting webhooks from external producers.
|
||||
webhook:
|
||||
listen: 127.0.0.1:18790
|
||||
routes:
|
||||
# Path → which inbox to land in, with a Go text/template payload.
|
||||
# Variables come from the request body decoded as JSON.
|
||||
/forgejo/push:
|
||||
recipient: bob
|
||||
type: INFO
|
||||
payload_template: "forgejo push to {{ .repo }} by {{ .actor }}"
|
||||
|
||||
# Empty-body posts work too — fixed-string templates render without
|
||||
# any data.
|
||||
/openrouter/billing-alert:
|
||||
recipient: bob
|
||||
type: NEEDS-RESPONSE
|
||||
priority: urgent
|
||||
payload_template: "billing alert: {{ .message }}"
|
||||
|
||||
# Drop-folder source.
|
||||
# Watches a directory via inotify for *.json files matching the spec
|
||||
# §3.1.2 schema:
|
||||
# {
|
||||
# "recipient": "bob",
|
||||
# "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST",
|
||||
# "priority": "normal" | "urgent", # optional
|
||||
# "payload": "...",
|
||||
# "sentinel": "/path/optional" # optional
|
||||
# }
|
||||
# Valid → emit + delete. Invalid → moved to .dead-letter/ with reason.
|
||||
drop_folder:
|
||||
path: ~/Nyx/workspace/incoming/
|
||||
# Safety net if inotify misses an event. 0 (or unset) = use the
|
||||
# 30-second default. There is no "disable polling" option in v1 —
|
||||
# if you don't want polling, leave inotify to do its job and ignore
|
||||
# the cost (a periodic readdir is microseconds).
|
||||
poll_fallback_seconds: 30
|
||||
10
go.mod
Normal file
10
go.mod
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
module git.botbought.ai/foreman/agent-watcher
|
||||
|
||||
go 1.22
|
||||
|
||||
require github.com/fsnotify/fsnotify v1.7.0
|
||||
|
||||
require (
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
7
go.sum
Normal file
7
go.sum
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
114
install.sh
Executable file
114
install.sh
Executable file
|
|
@ -0,0 +1,114 @@
|
|||
#!/usr/bin/env bash
|
||||
# install.sh — set up the agent-watcher Collector on this machine.
|
||||
#
|
||||
# Usage:
|
||||
# ./install.sh # build + install + enable + start
|
||||
# ./install.sh --no-start # everything but `systemctl start`
|
||||
#
|
||||
# What it does:
|
||||
# 1. Builds the agent-watcher binary (requires Go 1.22+).
|
||||
# 2. Installs to ~/.local/bin/agent-watcher.
|
||||
# 3. Drops the systemd --user unit into ~/.config/systemd/user/.
|
||||
# 4. Drops a starter ~/.config/agent-watcher/collector.yaml if missing
|
||||
# (copies examples/collector.yaml; you edit it before first start).
|
||||
# 5. Reloads systemd, enables, and (unless --no-start) starts the unit.
|
||||
# 6. Adds drop-folder lifecycle patterns to the workspace .stignore so
|
||||
# drop files do not Syncthing-replicate during local processing.
|
||||
#
|
||||
# Per CLAUDE.md rule #2, this is intended to be run by a human. The
|
||||
# Collector itself, like agent-ping, never invokes this script.
|
||||
#
|
||||
# Layer 2 (MCP Watcher) has its own install path — different language,
|
||||
# different runtime, different unit. See its README when it lands.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
NO_START=0
|
||||
for arg in "$@"; do
|
||||
case "$arg" in
|
||||
--no-start) NO_START=1 ;;
|
||||
*) echo "unknown arg: $arg" >&2; exit 2 ;;
|
||||
esac
|
||||
done
|
||||
|
||||
REPO_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
BIN_DIR="$HOME/.local/bin"
|
||||
CONF_DIR="$HOME/.config/agent-watcher"
|
||||
UNIT_DIR="$HOME/.config/systemd/user"
|
||||
WORKSPACE="$HOME/Nyx/workspace"
|
||||
STIGNORE="$WORKSPACE/.stignore"
|
||||
|
||||
echo "agent-watcher install"
|
||||
echo "repo: $REPO_DIR"
|
||||
echo
|
||||
|
||||
# 1. Check Go
|
||||
echo "[1/6] checking Go toolchain"
|
||||
if ! command -v go >/dev/null 2>&1; then
|
||||
echo " ERROR: 'go' not found. Install Go 1.22+ first." >&2
|
||||
echo " e.g. sudo apt install golang-go" >&2
|
||||
exit 1
|
||||
fi
|
||||
GO_VERSION=$(go version | awk '{print $3}')
|
||||
echo " $GO_VERSION"
|
||||
|
||||
# 2. Build
|
||||
echo "[2/6] building agent-watcher binary"
|
||||
mkdir -p "$BIN_DIR"
|
||||
( cd "$REPO_DIR" && go build -o "$BIN_DIR/agent-watcher" ./cmd/agent-watcher )
|
||||
echo " installed: $BIN_DIR/agent-watcher"
|
||||
|
||||
# 3. systemd unit
|
||||
echo "[3/6] installing systemd --user unit"
|
||||
mkdir -p "$UNIT_DIR"
|
||||
install -m 0644 "$REPO_DIR/systemd/agent-watcher.service" "$UNIT_DIR/agent-watcher.service"
|
||||
echo " installed: $UNIT_DIR/agent-watcher.service"
|
||||
|
||||
# 4. Config skeleton
|
||||
echo "[4/6] config skeleton"
|
||||
mkdir -p "$CONF_DIR"
|
||||
if [ -f "$CONF_DIR/collector.yaml" ]; then
|
||||
echo " $CONF_DIR/collector.yaml exists — leaving as-is"
|
||||
else
|
||||
install -m 0644 "$REPO_DIR/examples/collector.yaml" "$CONF_DIR/collector.yaml"
|
||||
echo " installed example: $CONF_DIR/collector.yaml — EDIT BEFORE START"
|
||||
fi
|
||||
|
||||
# 5. .stignore — drop folder is replicated, but lifecycle artifacts shouldn't be
|
||||
echo "[5/6] .stignore (drop-folder lifecycle artifacts are local-only)"
|
||||
if [ -d "$WORKSPACE" ]; then
|
||||
touch "$STIGNORE"
|
||||
for pattern in "incoming/.dead-letter/" "incoming/*.json.tmp"; do
|
||||
if ! grep -qxF "$pattern" "$STIGNORE"; then
|
||||
echo "$pattern" >> "$STIGNORE"
|
||||
echo " added: $pattern"
|
||||
fi
|
||||
done
|
||||
else
|
||||
echo " $WORKSPACE not present — skipping (Syncthing not set up here)"
|
||||
fi
|
||||
|
||||
# 6. systemd reload + enable + (optionally) start
|
||||
echo "[6/6] systemd reload + enable"
|
||||
systemctl --user daemon-reload
|
||||
systemctl --user enable agent-watcher.service >/dev/null
|
||||
echo " enabled at user login (loginctl enable-linger required to run when logged out)"
|
||||
|
||||
if [ "$NO_START" -eq 0 ]; then
|
||||
echo " starting…"
|
||||
systemctl --user restart agent-watcher.service
|
||||
sleep 1
|
||||
systemctl --user status agent-watcher.service --no-pager -n 5 || true
|
||||
else
|
||||
echo " --no-start: not starting; run 'systemctl --user start agent-watcher' when ready"
|
||||
fi
|
||||
|
||||
echo
|
||||
echo "installation complete."
|
||||
echo
|
||||
echo "next steps:"
|
||||
echo " 1. EDIT $CONF_DIR/collector.yaml to suit this host."
|
||||
echo " 2. systemctl --user restart agent-watcher (after edits)."
|
||||
echo " 3. journalctl --user -u agent-watcher -f (watch logs)."
|
||||
echo " 4. curl http://127.0.0.1:18790/health (sanity check)."
|
||||
echo " 5. To survive logout: loginctl enable-linger \$USER"
|
||||
179
internal/config/config.go
Normal file
179
internal/config/config.go
Normal file
|
|
@ -0,0 +1,179 @@
|
|||
// Package config loads the Collector's per-host YAML configuration.
|
||||
//
|
||||
// Config layout (spec §3.2):
|
||||
//
|
||||
// agent: foreman # this host's identity
|
||||
// inbox_dir: ~/Nyx/workspace/pings # optional; default shown
|
||||
// sources:
|
||||
// webhook:
|
||||
// listen: 127.0.0.1:18790
|
||||
// routes:
|
||||
// /forgejo/push:
|
||||
// recipient: bob
|
||||
// type: INFO
|
||||
// priority: normal # optional
|
||||
// payload_template: "..."
|
||||
// drop_folder:
|
||||
// path: ~/Nyx/workspace/incoming/
|
||||
// poll_fallback_seconds: 30
|
||||
//
|
||||
// Either source may be omitted; the Collector starts with whatever is
|
||||
// configured. Validation is strict — an unknown top-level field or a
|
||||
// malformed route returns an error.
|
||||
package config
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source/webhook"
|
||||
)
|
||||
|
||||
// Config is the loaded, validated configuration.
|
||||
type Config struct {
|
||||
Agent string `yaml:"agent"`
|
||||
InboxDir string `yaml:"inbox_dir,omitempty"`
|
||||
Sources Sources `yaml:"sources"`
|
||||
}
|
||||
|
||||
// Sources groups per-source-type configuration.
|
||||
type Sources struct {
|
||||
Webhook *WebhookConfig `yaml:"webhook,omitempty"`
|
||||
DropFolder *DropFolderConfig `yaml:"drop_folder,omitempty"`
|
||||
}
|
||||
|
||||
// WebhookConfig configures the HTTP webhook source.
|
||||
type WebhookConfig struct {
|
||||
Listen string `yaml:"listen"`
|
||||
Routes map[string]WebhookRoute `yaml:"routes"`
|
||||
}
|
||||
|
||||
// WebhookRoute is one entry in the webhook routing table.
|
||||
type WebhookRoute struct {
|
||||
Recipient string `yaml:"recipient"`
|
||||
Type string `yaml:"type"`
|
||||
Priority string `yaml:"priority,omitempty"`
|
||||
PayloadTemplate string `yaml:"payload_template"`
|
||||
}
|
||||
|
||||
// DropFolderConfig configures the drop-folder source.
|
||||
type DropFolderConfig struct {
|
||||
Path string `yaml:"path"`
|
||||
PollFallbackSeconds int `yaml:"poll_fallback_seconds,omitempty"`
|
||||
}
|
||||
|
||||
// Default values applied when fields are unset.
|
||||
const (
|
||||
DefaultInboxDir = "~/Nyx/workspace/pings"
|
||||
DefaultDropPath = "~/Nyx/workspace/incoming"
|
||||
DefaultWebhookListen = "127.0.0.1:18790"
|
||||
DefaultPollFallbackSeconds = 30
|
||||
)
|
||||
|
||||
// Load parses the YAML file at path, applies defaults, and validates.
|
||||
func Load(path string) (*Config, error) {
|
||||
body, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("config: read %s: %w", path, err)
|
||||
}
|
||||
return parse(body)
|
||||
}
|
||||
|
||||
func parse(body []byte) (*Config, error) {
|
||||
var c Config
|
||||
dec := yaml.NewDecoder(strings.NewReader(string(body)))
|
||||
dec.KnownFields(true)
|
||||
if err := dec.Decode(&c); err != nil {
|
||||
return nil, fmt.Errorf("config: parse: %w", err)
|
||||
}
|
||||
if err := c.applyDefaultsAndValidate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func (c *Config) applyDefaultsAndValidate() error {
|
||||
if c.Agent == "" {
|
||||
return errors.New("config: agent is required")
|
||||
}
|
||||
if c.InboxDir == "" {
|
||||
c.InboxDir = DefaultInboxDir
|
||||
}
|
||||
c.InboxDir = expand(c.InboxDir)
|
||||
|
||||
if c.Sources.Webhook == nil && c.Sources.DropFolder == nil {
|
||||
return errors.New("config: at least one source (webhook or drop_folder) must be configured")
|
||||
}
|
||||
|
||||
if w := c.Sources.Webhook; w != nil {
|
||||
if w.Listen == "" {
|
||||
w.Listen = DefaultWebhookListen
|
||||
}
|
||||
if len(w.Routes) == 0 {
|
||||
return errors.New("config: webhook configured but no routes defined")
|
||||
}
|
||||
for path, r := range w.Routes {
|
||||
if !strings.HasPrefix(path, "/") {
|
||||
return fmt.Errorf("config: webhook route %q must start with /", path)
|
||||
}
|
||||
if r.Recipient == "" {
|
||||
return fmt.Errorf("config: webhook route %s: recipient required", path)
|
||||
}
|
||||
if r.Type == "" {
|
||||
return fmt.Errorf("config: webhook route %s: type required", path)
|
||||
}
|
||||
if r.PayloadTemplate == "" {
|
||||
return fmt.Errorf("config: webhook route %s: payload_template required", path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if d := c.Sources.DropFolder; d != nil {
|
||||
if d.Path == "" {
|
||||
d.Path = DefaultDropPath
|
||||
}
|
||||
d.Path = expand(d.Path)
|
||||
if d.PollFallbackSeconds == 0 {
|
||||
d.PollFallbackSeconds = DefaultPollFallbackSeconds
|
||||
}
|
||||
if d.PollFallbackSeconds < 0 {
|
||||
return errors.New("config: drop_folder.poll_fallback_seconds must be >= 0")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// expand expands a leading ~/ to $HOME.
|
||||
func expand(p string) string {
|
||||
if !strings.HasPrefix(p, "~/") && p != "~" {
|
||||
return p
|
||||
}
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil || home == "" {
|
||||
return p
|
||||
}
|
||||
if p == "~" {
|
||||
return home
|
||||
}
|
||||
return filepath.Join(home, p[2:])
|
||||
}
|
||||
|
||||
// ToWebhookRouteMap converts validated routes to the source/webhook.Route
|
||||
// shape so main.go can hand them straight to webhook.New.
|
||||
func (w *WebhookConfig) ToWebhookRouteMap() map[string]webhook.Route {
|
||||
out := make(map[string]webhook.Route, len(w.Routes))
|
||||
for path, r := range w.Routes {
|
||||
out[path] = webhook.Route{
|
||||
Recipient: r.Recipient,
|
||||
Type: r.Type,
|
||||
Priority: r.Priority,
|
||||
PayloadTemplate: r.PayloadTemplate,
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
168
internal/config/config_test.go
Normal file
168
internal/config/config_test.go
Normal file
|
|
@ -0,0 +1,168 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParse_FullValid(t *testing.T) {
|
||||
body := []byte(`
|
||||
agent: foreman
|
||||
inbox_dir: /tmp/inbox
|
||||
sources:
|
||||
webhook:
|
||||
listen: 127.0.0.1:18790
|
||||
routes:
|
||||
/forgejo/push:
|
||||
recipient: bob
|
||||
type: INFO
|
||||
payload_template: "push to {{ .repo }}"
|
||||
/alert:
|
||||
recipient: bob
|
||||
type: NEEDS-RESPONSE
|
||||
priority: urgent
|
||||
payload_template: "alert"
|
||||
drop_folder:
|
||||
path: /tmp/incoming
|
||||
poll_fallback_seconds: 60
|
||||
`)
|
||||
c, err := parse(body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if c.Agent != "foreman" {
|
||||
t.Errorf("agent = %q", c.Agent)
|
||||
}
|
||||
if c.InboxDir != "/tmp/inbox" {
|
||||
t.Errorf("inbox = %q", c.InboxDir)
|
||||
}
|
||||
if c.Sources.Webhook == nil || len(c.Sources.Webhook.Routes) != 2 {
|
||||
t.Errorf("webhook routes: %+v", c.Sources.Webhook)
|
||||
}
|
||||
if c.Sources.DropFolder == nil || c.Sources.DropFolder.Path != "/tmp/incoming" {
|
||||
t.Errorf("dropfolder: %+v", c.Sources.DropFolder)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParse_AppliesDefaults(t *testing.T) {
|
||||
body := []byte(`
|
||||
agent: foreman
|
||||
sources:
|
||||
drop_folder:
|
||||
path: /tmp/x
|
||||
`)
|
||||
c, err := parse(body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if c.InboxDir == "" {
|
||||
t.Error("inbox_dir default not applied")
|
||||
}
|
||||
if c.Sources.DropFolder.PollFallbackSeconds != DefaultPollFallbackSeconds {
|
||||
t.Errorf("poll default = %d", c.Sources.DropFolder.PollFallbackSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParse_HomeExpansion(t *testing.T) {
|
||||
body := []byte(`
|
||||
agent: foreman
|
||||
sources:
|
||||
drop_folder:
|
||||
path: ~/foo/bar
|
||||
`)
|
||||
c, err := parse(body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if strings.HasPrefix(c.Sources.DropFolder.Path, "~") {
|
||||
t.Errorf("~ not expanded: %q", c.Sources.DropFolder.Path)
|
||||
}
|
||||
if !strings.HasSuffix(c.Sources.DropFolder.Path, "/foo/bar") {
|
||||
t.Errorf("expansion lost suffix: %q", c.Sources.DropFolder.Path)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParse_RejectsInvalid(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"missing agent": `sources:\n webhook:\n routes: {}`,
|
||||
"no sources": `agent: x`,
|
||||
"empty webhook routes": `
|
||||
agent: x
|
||||
sources:
|
||||
webhook:
|
||||
listen: 127.0.0.1:18790
|
||||
routes: {}
|
||||
`,
|
||||
"route missing recipient": `
|
||||
agent: x
|
||||
sources:
|
||||
webhook:
|
||||
routes:
|
||||
/x:
|
||||
type: INFO
|
||||
payload_template: y
|
||||
`,
|
||||
"route missing type": `
|
||||
agent: x
|
||||
sources:
|
||||
webhook:
|
||||
routes:
|
||||
/x:
|
||||
recipient: r
|
||||
payload_template: y
|
||||
`,
|
||||
"route missing template": `
|
||||
agent: x
|
||||
sources:
|
||||
webhook:
|
||||
routes:
|
||||
/x:
|
||||
recipient: r
|
||||
type: INFO
|
||||
`,
|
||||
"route path no slash": `
|
||||
agent: x
|
||||
sources:
|
||||
webhook:
|
||||
routes:
|
||||
bad:
|
||||
recipient: r
|
||||
type: INFO
|
||||
payload_template: y
|
||||
`,
|
||||
"unknown top-level field": `
|
||||
agent: x
|
||||
typo: oops
|
||||
sources:
|
||||
drop_folder:
|
||||
path: /x
|
||||
`,
|
||||
"poll negative": `
|
||||
agent: x
|
||||
sources:
|
||||
drop_folder:
|
||||
path: /x
|
||||
poll_fallback_seconds: -1
|
||||
`,
|
||||
}
|
||||
for name, body := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if _, err := parse([]byte(body)); err == nil {
|
||||
t.Errorf("expected error for: %s", name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestToWebhookRouteMap(t *testing.T) {
|
||||
w := &WebhookConfig{
|
||||
Routes: map[string]WebhookRoute{
|
||||
"/x": {Recipient: "r", Type: "INFO", Priority: "urgent", PayloadTemplate: "p"},
|
||||
},
|
||||
}
|
||||
got := w.ToWebhookRouteMap()
|
||||
r := got["/x"]
|
||||
if r.Recipient != "r" || r.Type != "INFO" || r.Priority != "urgent" || r.PayloadTemplate != "p" {
|
||||
t.Errorf("conversion lost data: %+v", r)
|
||||
}
|
||||
}
|
||||
129
internal/inbox/inbox.go
Normal file
129
internal/inbox/inbox.go
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
// Package inbox writes ping-shaped JSONL events to recipient inbox files.
|
||||
//
|
||||
// The output format is bit-identical to the agent-ping CLI's output, so the
|
||||
// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether
|
||||
// a line was written by `ping` or by the Collector.
|
||||
//
|
||||
// All writes are append-only with O_APPEND so concurrent writers don't tear
|
||||
// each other's lines (POSIX guarantees atomicity for writes <= PIPE_BUF, and
|
||||
// our lines are well under that).
|
||||
package inbox
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Event is the JSONL line shape. Field order and JSON keys match the ping CLI
|
||||
// exactly. Optional fields use omitempty.
|
||||
type Event struct {
|
||||
TS string `json:"ts"`
|
||||
ID string `json:"id"`
|
||||
From string `json:"from"`
|
||||
To string `json:"to"`
|
||||
Type string `json:"type"`
|
||||
Priority string `json:"priority"`
|
||||
Payload string `json:"payload"`
|
||||
Sentinel string `json:"sentinel,omitempty"`
|
||||
Source string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push"
|
||||
}
|
||||
|
||||
// Writer appends events to inbox files under a base directory.
|
||||
//
|
||||
// Concurrency: a single Writer is safe for use from multiple goroutines.
|
||||
// One sync.Mutex per recipient bounds contention to per-file granularity.
|
||||
//
|
||||
// The locks map grows monotonically as new recipients appear. In practice
|
||||
// the agent network has <10 recipients, so the leak is bounded; if this
|
||||
// daemon ever lives in a system with thousands of recipients, replace the
|
||||
// map with an LRU or shard-and-evict scheme.
|
||||
type Writer struct {
|
||||
dir string
|
||||
|
||||
mu sync.Mutex
|
||||
locks map[string]*sync.Mutex
|
||||
}
|
||||
|
||||
// NewWriter returns a Writer that appends to {dir}/{recipient}.inbox.
|
||||
// The directory is created (with perms 0755) if it does not exist.
|
||||
func NewWriter(dir string) (*Writer, error) {
|
||||
if dir == "" {
|
||||
return nil, fmt.Errorf("inbox: empty dir")
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err)
|
||||
}
|
||||
return &Writer{
|
||||
dir: dir,
|
||||
locks: make(map[string]*sync.Mutex),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Write appends one event to the recipient's inbox file. It mutates ev to set
|
||||
// TS and ID if either is empty, then sets To from recipient.
|
||||
func (w *Writer) Write(recipient string, ev *Event) error {
|
||||
if recipient == "" {
|
||||
return fmt.Errorf("inbox: empty recipient")
|
||||
}
|
||||
if ev == nil {
|
||||
return fmt.Errorf("inbox: nil event")
|
||||
}
|
||||
if ev.TS == "" {
|
||||
ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z")
|
||||
}
|
||||
if ev.ID == "" {
|
||||
id, err := newID()
|
||||
if err != nil {
|
||||
return fmt.Errorf("inbox: generate id: %w", err)
|
||||
}
|
||||
ev.ID = id
|
||||
}
|
||||
ev.To = recipient
|
||||
|
||||
line, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
return fmt.Errorf("inbox: marshal: %w", err)
|
||||
}
|
||||
line = append(line, '\n')
|
||||
|
||||
lock := w.lockFor(recipient)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
path := filepath.Join(w.dir, recipient+".inbox")
|
||||
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("inbox: open %s: %w", path, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err := f.Write(line); err != nil {
|
||||
return fmt.Errorf("inbox: append: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) lockFor(recipient string) *sync.Mutex {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if l, ok := w.locks[recipient]; ok {
|
||||
return l
|
||||
}
|
||||
l := &sync.Mutex{}
|
||||
w.locks[recipient] = l
|
||||
return l
|
||||
}
|
||||
|
||||
func newID() (string, error) {
|
||||
b := make([]byte, 4)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return "ping-" + hex.EncodeToString(b), nil
|
||||
}
|
||||
182
internal/inbox/inbox_test.go
Normal file
182
internal/inbox/inbox_test.go
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
package inbox
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestWriter_Write_BasicShape(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
w, err := NewWriter(dir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := w.Write("bob", &Event{
|
||||
From: "foreman",
|
||||
Type: "INFO",
|
||||
Priority: "normal",
|
||||
Payload: "hi",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := readLines(t, filepath.Join(dir, "bob.inbox"))
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("want 1 line, got %d", len(got))
|
||||
}
|
||||
|
||||
var ev Event
|
||||
if err := json.Unmarshal([]byte(got[0]), &ev); err != nil {
|
||||
t.Fatalf("not JSON: %v", err)
|
||||
}
|
||||
if ev.To != "bob" {
|
||||
t.Errorf("To = %q, want bob", ev.To)
|
||||
}
|
||||
if ev.From != "foreman" {
|
||||
t.Errorf("From = %q, want foreman", ev.From)
|
||||
}
|
||||
if ev.Payload != "hi" {
|
||||
t.Errorf("Payload = %q", ev.Payload)
|
||||
}
|
||||
if !strings.HasPrefix(ev.ID, "ping-") {
|
||||
t.Errorf("ID %q missing ping- prefix", ev.ID)
|
||||
}
|
||||
if len(ev.ID) != len("ping-")+8 {
|
||||
t.Errorf("ID %q wrong length", ev.ID)
|
||||
}
|
||||
if ev.TS == "" {
|
||||
t.Errorf("TS unset")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_Write_PreservesProvidedIDAndTS(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
w, err := NewWriter(dir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
want := &Event{
|
||||
TS: "2026-05-06T12:00:00Z",
|
||||
ID: "ping-deadbeef",
|
||||
From: "x",
|
||||
Type: "INFO",
|
||||
Priority: "normal",
|
||||
Payload: "p",
|
||||
}
|
||||
if err := w.Write("r", want); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := readLines(t, filepath.Join(dir, "r.inbox"))
|
||||
var ev Event
|
||||
json.Unmarshal([]byte(got[0]), &ev)
|
||||
if ev.ID != want.ID {
|
||||
t.Errorf("ID overwritten: got %q want %q", ev.ID, want.ID)
|
||||
}
|
||||
if ev.TS != want.TS {
|
||||
t.Errorf("TS overwritten: got %q want %q", ev.TS, want.TS)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_Write_OmitsEmptyOptionalFields(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
w, _ := NewWriter(dir)
|
||||
w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "p"})
|
||||
|
||||
line := readLines(t, filepath.Join(dir, "r.inbox"))[0]
|
||||
if strings.Contains(line, "sentinel") {
|
||||
t.Errorf("empty sentinel leaked into JSON: %s", line)
|
||||
}
|
||||
if strings.Contains(line, "source") {
|
||||
t.Errorf("empty source leaked into JSON: %s", line)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_Write_PingShapeMatchesCLI(t *testing.T) {
|
||||
// The ping CLI emits JSON with separators=(",", ":") (compact) and these
|
||||
// keys in this order: ts, id, from, to, type, priority, payload [, sentinel].
|
||||
// Verify our marshaled output preserves the key set and compactness.
|
||||
dir := t.TempDir()
|
||||
w, _ := NewWriter(dir)
|
||||
w.Write("r", &Event{
|
||||
From: "f", Type: "INFO", Priority: "normal", Payload: "p", Sentinel: "/x",
|
||||
})
|
||||
line := strings.TrimSpace(readLines(t, filepath.Join(dir, "r.inbox"))[0])
|
||||
|
||||
wantKeys := []string{`"ts":`, `"id":`, `"from":`, `"to":`, `"type":`, `"priority":`, `"payload":`, `"sentinel":`}
|
||||
for _, k := range wantKeys {
|
||||
if !strings.Contains(line, k) {
|
||||
t.Errorf("missing key %s in: %s", k, line)
|
||||
}
|
||||
}
|
||||
// compact (no spaces around colons or commas)
|
||||
if strings.Contains(line, ": ") || strings.Contains(line, ", ") {
|
||||
t.Errorf("non-compact JSON: %s", line)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_Write_ConcurrentSameRecipient(t *testing.T) {
|
||||
// 100 goroutines writing to the same inbox should produce 100 valid JSON
|
||||
// lines, none torn or interleaved.
|
||||
dir := t.TempDir()
|
||||
w, _ := NewWriter(dir)
|
||||
|
||||
const n = 100
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < n; i++ {
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "x"})
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
lines := readLines(t, filepath.Join(dir, "r.inbox"))
|
||||
if len(lines) != n {
|
||||
t.Fatalf("got %d lines, want %d (torn writes?)", len(lines), n)
|
||||
}
|
||||
for i, l := range lines {
|
||||
var ev Event
|
||||
if err := json.Unmarshal([]byte(l), &ev); err != nil {
|
||||
t.Errorf("line %d not JSON: %v: %s", i, err, l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_Write_RejectsBadInputs(t *testing.T) {
|
||||
w, _ := NewWriter(t.TempDir())
|
||||
if err := w.Write("", &Event{}); err == nil {
|
||||
t.Error("empty recipient: expected error")
|
||||
}
|
||||
if err := w.Write("r", nil); err == nil {
|
||||
t.Error("nil event: expected error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewWriter_RejectsEmptyDir(t *testing.T) {
|
||||
if _, err := NewWriter(""); err == nil {
|
||||
t.Error("expected error for empty dir")
|
||||
}
|
||||
}
|
||||
|
||||
func readLines(t *testing.T, path string) []string {
|
||||
t.Helper()
|
||||
b, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
out := []string{}
|
||||
for _, l := range strings.Split(strings.TrimRight(string(b), "\n"), "\n") {
|
||||
if l != "" {
|
||||
out = append(out, l)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
259
internal/source/dropfolder/dropfolder.go
Normal file
259
internal/source/dropfolder/dropfolder.go
Normal file
|
|
@ -0,0 +1,259 @@
|
|||
// Package dropfolder implements the drop-folder Source.
|
||||
//
|
||||
// Producers drop *.json files into a designated directory. The Source ingests
|
||||
// each file, validates it against a schema, emits an event, and deletes the
|
||||
// file. Malformed files are moved to a .dead-letter/ subdirectory for
|
||||
// inspection.
|
||||
//
|
||||
// Lifecycle: on startup, scan the folder and ingest any pre-existing files
|
||||
// (catches up after a Collector restart). Then watch via inotify for new
|
||||
// arrivals. A periodic poll (default 30s) is a safety net in case inotify
|
||||
// misses a Syncthing-driven rename — Syncthing usually fires IN_CLOSE_WRITE
|
||||
// on rename, but the spec calls for a fallback.
|
||||
//
|
||||
// Drop file schema (per spec §3.1.2):
|
||||
//
|
||||
// {
|
||||
// "recipient": "bob",
|
||||
// "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST",
|
||||
// "priority": "normal" | "urgent", // optional, default "normal"
|
||||
// "payload": "...",
|
||||
// "sentinel": "/path/optional" // optional
|
||||
// }
|
||||
package dropfolder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source"
|
||||
)
|
||||
|
||||
// Validation rules duplicated from agent-ping's CLI for tightness.
|
||||
var validTypes = map[string]bool{
|
||||
"INFO": true,
|
||||
"NEEDS-RESPONSE": true,
|
||||
"ACK-REQUEST": true,
|
||||
}
|
||||
|
||||
var validPriorities = map[string]bool{
|
||||
"": true, // means "normal"
|
||||
"normal": true,
|
||||
"urgent": true,
|
||||
}
|
||||
|
||||
// Config configures a drop-folder Source.
|
||||
type Config struct {
|
||||
// Path is the directory to watch. Created if absent.
|
||||
Path string
|
||||
|
||||
// PollFallbackSeconds is a periodic full-scan interval in case inotify
|
||||
// misses an event. Set to 0 to disable.
|
||||
PollFallbackSeconds int
|
||||
}
|
||||
|
||||
// Source watches a directory for *.json drop files and emits events.
|
||||
type Source struct {
|
||||
cfg Config
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// New returns a configured (but not running) Source.
|
||||
func New(cfg Config, logger *slog.Logger) *Source {
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
if cfg.PollFallbackSeconds == 0 {
|
||||
cfg.PollFallbackSeconds = 30
|
||||
}
|
||||
return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)}
|
||||
}
|
||||
|
||||
func (s *Source) Name() string { return "drop-folder" }
|
||||
|
||||
// Run blocks until ctx is canceled.
|
||||
func (s *Source) Run(ctx context.Context, emit source.Emit) error {
|
||||
if err := os.MkdirAll(s.cfg.Path, 0755); err != nil {
|
||||
return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err)
|
||||
}
|
||||
if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil {
|
||||
return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err)
|
||||
}
|
||||
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("dropfolder: new watcher: %w", err)
|
||||
}
|
||||
defer w.Close()
|
||||
if err := w.Add(s.cfg.Path); err != nil {
|
||||
return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err)
|
||||
}
|
||||
|
||||
// Initial scan — drain any files that landed before we were watching.
|
||||
s.scan(ctx, emit)
|
||||
|
||||
pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second
|
||||
var pollCh <-chan time.Time
|
||||
if pollEvery > 0 {
|
||||
t := time.NewTicker(pollEvery)
|
||||
defer t.Stop()
|
||||
pollCh = t.C
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
||||
case ev, ok := <-w.Events:
|
||||
if !ok {
|
||||
return errors.New("dropfolder: watcher closed")
|
||||
}
|
||||
// Care about creates and renames-into (Syncthing uses rename).
|
||||
if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 {
|
||||
continue
|
||||
}
|
||||
if !strings.HasSuffix(ev.Name, ".json") {
|
||||
continue
|
||||
}
|
||||
s.handle(ctx, emit, ev.Name)
|
||||
|
||||
case err, ok := <-w.Errors:
|
||||
if !ok {
|
||||
return errors.New("dropfolder: watcher errors closed")
|
||||
}
|
||||
s.logger.Warn("watcher error", "err", err)
|
||||
|
||||
case <-pollCh:
|
||||
s.scan(ctx, emit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// scan ingests every *.json file currently in the drop folder.
|
||||
func (s *Source) scan(ctx context.Context, emit source.Emit) {
|
||||
entries, err := os.ReadDir(s.cfg.Path)
|
||||
if err != nil {
|
||||
s.logger.Error("scan readdir", "err", err)
|
||||
return
|
||||
}
|
||||
for _, e := range entries {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
|
||||
continue
|
||||
}
|
||||
s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name()))
|
||||
}
|
||||
}
|
||||
|
||||
// handle reads, validates, emits, and removes (or dead-letters) one drop file.
|
||||
func (s *Source) handle(ctx context.Context, emit source.Emit, path string) {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
body, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
// File may have been ingested already by another loop iteration.
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return
|
||||
}
|
||||
s.logger.Warn("read drop file", "path", path, "err", err)
|
||||
s.deadLetter(path, "read failed")
|
||||
return
|
||||
}
|
||||
|
||||
pd, err := parseDrop(body)
|
||||
if err != nil {
|
||||
s.logger.Warn("invalid drop file", "path", path, "err", err)
|
||||
s.deadLetter(path, "schema invalid: "+err.Error())
|
||||
return
|
||||
}
|
||||
pd.event.Source = "drop-folder:" + filepath.Base(path)
|
||||
|
||||
if err := emit(pd.recipient, pd.event); err != nil {
|
||||
// Inbox write failure: don't lose the message. Leave the file in
|
||||
// place so the next scan retries. Log loudly.
|
||||
s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := os.Remove(path); err != nil {
|
||||
s.logger.Warn("remove drop file", "path", path, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Source) deadLetter(path, reason string) {
|
||||
target := filepath.Join(s.deadLetterDir(), filepath.Base(path))
|
||||
if err := os.Rename(path, target); err != nil {
|
||||
s.logger.Error("dead-letter move failed", "path", path, "err", err)
|
||||
return
|
||||
}
|
||||
// Sidecar reason file for forensics.
|
||||
_ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644)
|
||||
s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason)
|
||||
}
|
||||
|
||||
func (s *Source) deadLetterDir() string {
|
||||
return filepath.Join(s.cfg.Path, ".dead-letter")
|
||||
}
|
||||
|
||||
// dropFile is the on-disk schema; intermediate value during parse.
|
||||
type dropFile struct {
|
||||
Recipient string `json:"recipient"`
|
||||
Type string `json:"type"`
|
||||
Priority string `json:"priority,omitempty"`
|
||||
Payload string `json:"payload"`
|
||||
Sentinel string `json:"sentinel,omitempty"`
|
||||
}
|
||||
|
||||
type parsedDrop struct {
|
||||
recipient string
|
||||
event *inbox.Event
|
||||
}
|
||||
|
||||
func parseDrop(body []byte) (*parsedDrop, error) {
|
||||
var d dropFile
|
||||
dec := json.NewDecoder(strings.NewReader(string(body)))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&d); err != nil {
|
||||
return nil, fmt.Errorf("decode: %w", err)
|
||||
}
|
||||
if d.Recipient == "" {
|
||||
return nil, errors.New("recipient required")
|
||||
}
|
||||
if !validTypes[d.Type] {
|
||||
return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type)
|
||||
}
|
||||
if !validPriorities[d.Priority] {
|
||||
return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority)
|
||||
}
|
||||
if d.Payload == "" {
|
||||
return nil, errors.New("payload required")
|
||||
}
|
||||
priority := d.Priority
|
||||
if priority == "" {
|
||||
priority = "normal"
|
||||
}
|
||||
return &parsedDrop{
|
||||
recipient: d.Recipient,
|
||||
event: &inbox.Event{
|
||||
From: "collector",
|
||||
Type: d.Type,
|
||||
Priority: priority,
|
||||
Payload: d.Payload,
|
||||
Sentinel: d.Sentinel,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
273
internal/source/dropfolder/dropfolder_test.go
Normal file
273
internal/source/dropfolder/dropfolder_test.go
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
package dropfolder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
)
|
||||
|
||||
func quietLogger() *slog.Logger {
|
||||
return slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
}
|
||||
|
||||
func TestParseDrop_Valid(t *testing.T) {
|
||||
body := []byte(`{"recipient":"bob","type":"INFO","priority":"urgent","payload":"hi","sentinel":"/x"}`)
|
||||
p, err := parseDrop(body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if p.recipient != "bob" {
|
||||
t.Errorf("recipient = %q", p.recipient)
|
||||
}
|
||||
if p.event.Type != "INFO" || p.event.Priority != "urgent" || p.event.Payload != "hi" || p.event.Sentinel != "/x" {
|
||||
t.Errorf("unexpected event: %+v", p.event)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDrop_DefaultsPriority(t *testing.T) {
|
||||
p, err := parseDrop([]byte(`{"recipient":"r","type":"INFO","payload":"p"}`))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if p.event.Priority != "normal" {
|
||||
t.Errorf("priority default = %q, want normal", p.event.Priority)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDrop_Invalid(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"empty body": ``,
|
||||
"missing recipient": `{"type":"INFO","payload":"p"}`,
|
||||
"missing type": `{"recipient":"r","payload":"p"}`,
|
||||
"bad type": `{"recipient":"r","type":"NOPE","payload":"p"}`,
|
||||
"bad priority": `{"recipient":"r","type":"INFO","priority":"high","payload":"p"}`,
|
||||
"missing payload": `{"recipient":"r","type":"INFO"}`,
|
||||
"unknown field": `{"recipient":"r","type":"INFO","payload":"p","stowaway":1}`,
|
||||
"not json": `not json`,
|
||||
}
|
||||
for name, body := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if _, err := parseDrop([]byte(body)); err == nil {
|
||||
t.Error("expected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// recordingEmit captures emitted events for assertion.
|
||||
type recordingEmit struct {
|
||||
mu sync.Mutex
|
||||
events []record
|
||||
err error // set to fail the next emit
|
||||
}
|
||||
|
||||
type record struct {
|
||||
recipient string
|
||||
event inbox.Event
|
||||
}
|
||||
|
||||
func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if r.err != nil {
|
||||
err := r.err
|
||||
r.err = nil
|
||||
return err
|
||||
}
|
||||
r.events = append(r.events, record{recipient, *ev})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *recordingEmit) snapshot() []record {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
out := make([]record, len(r.events))
|
||||
copy(out, r.events)
|
||||
return out
|
||||
}
|
||||
|
||||
func writeDrop(t *testing.T, dir, name string, payload any) string {
|
||||
t.Helper()
|
||||
path := filepath.Join(dir, name)
|
||||
b, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tmp := path + ".tmp"
|
||||
if err := os.WriteFile(tmp, b, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// rename for atomic visibility (mirrors what Syncthing does)
|
||||
if err := os.Rename(tmp, path); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
func TestSource_InitialScan_IngestsExistingFiles(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeDrop(t, dir, "1.json", map[string]string{
|
||||
"recipient": "bob", "type": "INFO", "payload": "first",
|
||||
})
|
||||
writeDrop(t, dir, "2.json", map[string]string{
|
||||
"recipient": "bob", "type": "NEEDS-RESPONSE", "payload": "second",
|
||||
})
|
||||
|
||||
rec := &recordingEmit{}
|
||||
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
go src.Run(ctx, rec.emit)
|
||||
|
||||
waitFor(t, func() bool { return len(rec.snapshot()) == 2 }, 400*time.Millisecond)
|
||||
|
||||
got := rec.snapshot()
|
||||
if got[0].event.Payload+"|"+got[1].event.Payload != "first|second" &&
|
||||
got[0].event.Payload+"|"+got[1].event.Payload != "second|first" {
|
||||
t.Errorf("unexpected payloads: %+v", got)
|
||||
}
|
||||
for _, r := range got {
|
||||
if r.recipient != "bob" {
|
||||
t.Errorf("recipient %q", r.recipient)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSource_LiveDrop_IngestsViaInotify(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
rec := &recordingEmit{}
|
||||
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go src.Run(ctx, rec.emit)
|
||||
|
||||
// Give the watcher time to attach.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
writeDrop(t, dir, "live.json", map[string]string{
|
||||
"recipient": "foreman", "type": "INFO", "payload": "live one",
|
||||
})
|
||||
|
||||
waitFor(t, func() bool { return len(rec.snapshot()) == 1 }, 1*time.Second)
|
||||
|
||||
got := rec.snapshot()
|
||||
if got[0].event.Payload != "live one" {
|
||||
t.Errorf("payload %q", got[0].event.Payload)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSource_DeleteAfterEmit(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
rec := &recordingEmit{}
|
||||
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go src.Run(ctx, rec.emit)
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
path := writeDrop(t, dir, "ok.json", map[string]string{
|
||||
"recipient": "r", "type": "INFO", "payload": "p",
|
||||
})
|
||||
|
||||
waitFor(t, func() bool {
|
||||
_, err := os.Stat(path)
|
||||
return os.IsNotExist(err)
|
||||
}, 1*time.Second)
|
||||
}
|
||||
|
||||
func TestSource_DeadLetter_OnInvalidSchema(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
rec := &recordingEmit{}
|
||||
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go src.Run(ctx, rec.emit)
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
// Missing payload — invalid.
|
||||
tmp := filepath.Join(dir, "bad.json.tmp")
|
||||
os.WriteFile(tmp, []byte(`{"recipient":"r","type":"INFO"}`), 0644)
|
||||
os.Rename(tmp, filepath.Join(dir, "bad.json"))
|
||||
|
||||
deadPath := filepath.Join(dir, ".dead-letter", "bad.json")
|
||||
waitFor(t, func() bool {
|
||||
_, err := os.Stat(deadPath)
|
||||
return err == nil
|
||||
}, 1*time.Second)
|
||||
|
||||
// Reason sidecar exists.
|
||||
if _, err := os.Stat(deadPath + ".reason"); err != nil {
|
||||
t.Errorf("reason sidecar missing: %v", err)
|
||||
}
|
||||
|
||||
// Original file gone from drop dir.
|
||||
if _, err := os.Stat(filepath.Join(dir, "bad.json")); !os.IsNotExist(err) {
|
||||
t.Errorf("original drop file should be gone")
|
||||
}
|
||||
|
||||
// Nothing was emitted.
|
||||
if got := rec.snapshot(); len(got) != 0 {
|
||||
t.Errorf("got events for invalid drop: %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSource_RetainsFile_WhenEmitFails(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
rec := &recordingEmit{err: errOnce()}
|
||||
src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go src.Run(ctx, rec.emit)
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
path := writeDrop(t, dir, "retry.json", map[string]string{
|
||||
"recipient": "r", "type": "INFO", "payload": "p",
|
||||
})
|
||||
|
||||
// Wait long enough for the (failing) emit to have happened.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Drop file is still there (retained for retry).
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
t.Errorf("drop file should be retained on emit failure: %v", err)
|
||||
}
|
||||
// Not in dead-letter.
|
||||
if _, err := os.Stat(filepath.Join(dir, ".dead-letter", "retry.json")); err == nil {
|
||||
t.Error("emit failure should NOT dead-letter (it's transient)")
|
||||
}
|
||||
}
|
||||
|
||||
// errOnce returns a non-nil error one time, then nil after.
|
||||
func errOnce() error {
|
||||
type e struct{}
|
||||
return &emitErr{}
|
||||
}
|
||||
|
||||
type emitErr struct{}
|
||||
|
||||
func (e *emitErr) Error() string { return "transient emit failure" }
|
||||
|
||||
// waitFor polls predicate until it returns true or timeout elapses.
|
||||
func waitFor(t *testing.T, pred func() bool, timeout time.Duration) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
if pred() {
|
||||
return
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("waitFor: predicate did not become true within %s", timeout)
|
||||
}
|
||||
35
internal/source/source.go
Normal file
35
internal/source/source.go
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
// Package source defines the contract every Collector source plugin implements.
|
||||
//
|
||||
// A Source is a long-running goroutine that converts external events into
|
||||
// inbox writes. It receives an Emit callback at start; each external event it
|
||||
// observes results in one Emit call. The Source returns when its context is
|
||||
// canceled, or earlier on a fatal error.
|
||||
//
|
||||
// Sources do not own the inbox writer or any other shared state. They emit;
|
||||
// the dispatcher routes. This keeps each source small and testable in
|
||||
// isolation.
|
||||
package source
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
)
|
||||
|
||||
// Emit is the callback a Source uses to push one observed event into the
|
||||
// dispatcher. The dispatcher writes it to recipient's inbox file. Returning
|
||||
// an error means the inbox write failed; sources may log and continue, or
|
||||
// drop, or shut down — that's a per-source policy decision.
|
||||
type Emit func(recipient string, ev *inbox.Event) error
|
||||
|
||||
// Source is a Collector input. Implementations are constructed with their
|
||||
// own configuration and started via Run.
|
||||
type Source interface {
|
||||
// Name is a short identifier for logs and the "source" field on emitted
|
||||
// events ("webhook", "drop-folder", etc.).
|
||||
Name() string
|
||||
|
||||
// Run blocks until ctx is canceled or a fatal error occurs. Each
|
||||
// observed external event becomes one Emit call.
|
||||
Run(ctx context.Context, emit Emit) error
|
||||
}
|
||||
248
internal/source/webhook/webhook.go
Normal file
248
internal/source/webhook/webhook.go
Normal file
|
|
@ -0,0 +1,248 @@
|
|||
// Package webhook implements the HTTP webhook Source.
|
||||
//
|
||||
// Listens on a loopback address and routes incoming POSTs to inbox events
|
||||
// according to a routing table. Each route specifies recipient, type,
|
||||
// priority, and a Go text/template that renders the payload from the request
|
||||
// body decoded as JSON.
|
||||
//
|
||||
// v1 is loopback-only (127.0.0.1) — no authentication. The spec calls for
|
||||
// Caddy + bearer-token reverse-proxy as the v2 upgrade path. Do NOT bind to
|
||||
// 0.0.0.0 in v1.
|
||||
//
|
||||
// Per spec §3.5, the same listener also exposes /health with per-source
|
||||
// counters.
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source"
|
||||
)
|
||||
|
||||
// Route maps a URL path to an emitted event.
|
||||
type Route struct {
|
||||
// Recipient is the target inbox name.
|
||||
Recipient string
|
||||
|
||||
// Type is one of INFO, NEEDS-RESPONSE, ACK-REQUEST.
|
||||
Type string
|
||||
|
||||
// Priority is "normal" or "urgent". Empty defaults to "normal".
|
||||
Priority string
|
||||
|
||||
// PayloadTemplate is a Go text/template rendered with the request body
|
||||
// (decoded as JSON into a map[string]any) as the dot. Plain strings
|
||||
// without {{}} render to themselves.
|
||||
PayloadTemplate string
|
||||
}
|
||||
|
||||
// Config configures a webhook Source.
|
||||
type Config struct {
|
||||
// Listen is the bind address, e.g. "127.0.0.1:18790".
|
||||
Listen string
|
||||
|
||||
// Routes maps URL path → Route. Path includes the leading slash.
|
||||
Routes map[string]Route
|
||||
}
|
||||
|
||||
// Source serves HTTP and emits an inbox event for each matched POST.
|
||||
type Source struct {
|
||||
cfg Config
|
||||
logger *slog.Logger
|
||||
|
||||
tmpls map[string]*template.Template
|
||||
|
||||
// counters
|
||||
received atomic.Uint64
|
||||
emitted atomic.Uint64
|
||||
errors atomic.Uint64
|
||||
startedAt time.Time
|
||||
}
|
||||
|
||||
// New parses the route templates and returns a configured Source.
|
||||
// Returns an error if any template fails to parse.
|
||||
func New(cfg Config, logger *slog.Logger) (*Source, error) {
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
if cfg.Listen == "" {
|
||||
return nil, errors.New("webhook: listen address required")
|
||||
}
|
||||
if !isLoopback(cfg.Listen) {
|
||||
return nil, fmt.Errorf("webhook: listen %q is not a loopback address; v1 forbids non-loopback binds", cfg.Listen)
|
||||
}
|
||||
|
||||
tmpls := make(map[string]*template.Template, len(cfg.Routes))
|
||||
for path, r := range cfg.Routes {
|
||||
if !validTypes[r.Type] {
|
||||
return nil, fmt.Errorf("webhook: route %s: invalid type %q", path, r.Type)
|
||||
}
|
||||
if r.Priority != "" && r.Priority != "normal" && r.Priority != "urgent" {
|
||||
return nil, fmt.Errorf("webhook: route %s: invalid priority %q", path, r.Priority)
|
||||
}
|
||||
t, err := template.New(path).Option("missingkey=zero").Parse(r.PayloadTemplate)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("webhook: route %s: parse template: %w", path, err)
|
||||
}
|
||||
tmpls[path] = t
|
||||
}
|
||||
return &Source{
|
||||
cfg: cfg,
|
||||
logger: logger.With("source", "webhook", "listen", cfg.Listen),
|
||||
tmpls: tmpls,
|
||||
startedAt: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Source) Name() string { return "webhook" }
|
||||
|
||||
// Run blocks until ctx is canceled.
|
||||
func (s *Source) Run(ctx context.Context, emit source.Emit) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/health", s.health)
|
||||
mux.HandleFunc("/", s.handler(emit))
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: s.cfg.Listen,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
listenErr := make(chan error, 1)
|
||||
go func() {
|
||||
s.logger.Info("listening")
|
||||
listenErr <- srv.ListenAndServe()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
_ = srv.Shutdown(shutCtx)
|
||||
return ctx.Err()
|
||||
case err := <-listenErr:
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("webhook: serve: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Source) handler(emit source.Emit) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
s.received.Add(1)
|
||||
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
route, ok := s.cfg.Routes[r.URL.Path]
|
||||
if !ok {
|
||||
http.Error(w, "no route", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1 MiB cap
|
||||
if err != nil {
|
||||
s.errors.Add(1)
|
||||
s.logger.Warn("read body", "path", r.URL.Path, "err", err)
|
||||
http.Error(w, "read failed", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
var data map[string]any
|
||||
if len(bytes.TrimSpace(body)) > 0 {
|
||||
if err := json.Unmarshal(body, &data); err != nil {
|
||||
s.errors.Add(1)
|
||||
s.logger.Warn("body not JSON", "path", r.URL.Path, "err", err)
|
||||
http.Error(w, "body must be JSON", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
data = map[string]any{}
|
||||
}
|
||||
|
||||
var rendered bytes.Buffer
|
||||
if err := s.tmpls[r.URL.Path].Execute(&rendered, data); err != nil {
|
||||
s.errors.Add(1)
|
||||
s.logger.Warn("template render", "path", r.URL.Path, "err", err)
|
||||
http.Error(w, "render failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
priority := route.Priority
|
||||
if priority == "" {
|
||||
priority = "normal"
|
||||
}
|
||||
ev := &inbox.Event{
|
||||
From: "collector",
|
||||
Type: route.Type,
|
||||
Priority: priority,
|
||||
Payload: rendered.String(),
|
||||
Source: "webhook:" + r.URL.Path,
|
||||
}
|
||||
if err := emit(route.Recipient, ev); err != nil {
|
||||
s.errors.Add(1)
|
||||
s.logger.Error("emit", "path", r.URL.Path, "err", err)
|
||||
http.Error(w, "emit failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
s.emitted.Add(1)
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
_, _ = io.WriteString(w, "ok\n")
|
||||
}
|
||||
}
|
||||
|
||||
// Stats returns the current counter snapshot. Safe to call from any goroutine.
|
||||
type Stats struct {
|
||||
Received uint64 `json:"received"`
|
||||
Emitted uint64 `json:"emitted"`
|
||||
Errors uint64 `json:"errors"`
|
||||
UptimeSec int64 `json:"uptime_sec"`
|
||||
}
|
||||
|
||||
func (s *Source) Stats() Stats {
|
||||
return Stats{
|
||||
Received: s.received.Load(),
|
||||
Emitted: s.emitted.Load(),
|
||||
Errors: s.errors.Load(),
|
||||
UptimeSec: int64(time.Since(s.startedAt).Seconds()),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Source) health(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(s.Stats())
|
||||
}
|
||||
|
||||
// validTypes mirrors agent-ping. Kept here (not imported from dropfolder) so
|
||||
// the webhook source has no cross-source dependency.
|
||||
var validTypes = map[string]bool{
|
||||
"INFO": true,
|
||||
"NEEDS-RESPONSE": true,
|
||||
"ACK-REQUEST": true,
|
||||
}
|
||||
|
||||
// isLoopback returns true if addr binds only to a loopback interface.
|
||||
// Accepts "127.0.0.1:port", "[::1]:port", "localhost:port".
|
||||
func isLoopback(addr string) bool {
|
||||
host, _, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if host == "localhost" {
|
||||
return true
|
||||
}
|
||||
ip := net.ParseIP(host)
|
||||
return ip != nil && ip.IsLoopback()
|
||||
}
|
||||
|
||||
323
internal/source/webhook/webhook_test.go
Normal file
323
internal/source/webhook/webhook_test.go
Normal file
|
|
@ -0,0 +1,323 @@
|
|||
package webhook
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
)
|
||||
|
||||
func quietLogger() *slog.Logger {
|
||||
return slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
}
|
||||
|
||||
// freePort returns an unused loopback "127.0.0.1:N" address.
|
||||
func freePort(t *testing.T) string {
|
||||
t.Helper()
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addr := l.Addr().String()
|
||||
l.Close()
|
||||
return addr
|
||||
}
|
||||
|
||||
type recordingEmit struct {
|
||||
mu sync.Mutex
|
||||
events []record
|
||||
err error
|
||||
}
|
||||
|
||||
type record struct {
|
||||
recipient string
|
||||
event inbox.Event
|
||||
}
|
||||
|
||||
func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
r.events = append(r.events, record{recipient, *ev})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *recordingEmit) snap() []record {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
out := make([]record, len(r.events))
|
||||
copy(out, r.events)
|
||||
return out
|
||||
}
|
||||
|
||||
func runSource(t *testing.T, cfg Config, emit func(string, *inbox.Event) error) (string, context.CancelFunc) {
|
||||
t.Helper()
|
||||
src, err := New(cfg, quietLogger())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go src.Run(ctx, emit)
|
||||
// Wait for listener.
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
c, err := net.DialTimeout("tcp", cfg.Listen, 50*time.Millisecond)
|
||||
if err == nil {
|
||||
c.Close()
|
||||
return cfg.Listen, cancel
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
cancel()
|
||||
t.Fatal("listener never came up")
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func TestNew_RejectsNonLoopback(t *testing.T) {
|
||||
_, err := New(Config{Listen: "0.0.0.0:18790"}, quietLogger())
|
||||
if err == nil {
|
||||
t.Error("expected error for non-loopback")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNew_RejectsBadType(t *testing.T) {
|
||||
_, err := New(Config{
|
||||
Listen: "127.0.0.1:0",
|
||||
Routes: map[string]Route{
|
||||
"/x": {Recipient: "r", Type: "BOGUS", PayloadTemplate: "x"},
|
||||
},
|
||||
}, quietLogger())
|
||||
if err == nil {
|
||||
t.Error("expected error for bad type")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNew_RejectsBadTemplate(t *testing.T) {
|
||||
_, err := New(Config{
|
||||
Listen: "127.0.0.1:0",
|
||||
Routes: map[string]Route{
|
||||
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "{{ .unclosed"},
|
||||
},
|
||||
}, quietLogger())
|
||||
if err == nil {
|
||||
t.Error("expected error for bad template")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPOST_RoutesAndEmits(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{
|
||||
Listen: addr,
|
||||
Routes: map[string]Route{
|
||||
"/forgejo/push": {
|
||||
Recipient: "bob",
|
||||
Type: "INFO",
|
||||
PayloadTemplate: "forgejo push to {{ .repo }} by {{ .actor }}",
|
||||
},
|
||||
},
|
||||
}
|
||||
rec := &recordingEmit{}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
body := `{"repo":"agent-ping","actor":"angus"}`
|
||||
resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", strings.NewReader(body))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
t.Errorf("status = %d, want 202", resp.StatusCode)
|
||||
}
|
||||
|
||||
got := rec.snap()
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("got %d events, want 1", len(got))
|
||||
}
|
||||
if got[0].recipient != "bob" {
|
||||
t.Errorf("recipient = %q", got[0].recipient)
|
||||
}
|
||||
if got[0].event.Type != "INFO" {
|
||||
t.Errorf("type = %q", got[0].event.Type)
|
||||
}
|
||||
if got[0].event.Priority != "normal" {
|
||||
t.Errorf("priority default = %q", got[0].event.Priority)
|
||||
}
|
||||
if got[0].event.Payload != "forgejo push to agent-ping by angus" {
|
||||
t.Errorf("payload = %q", got[0].event.Payload)
|
||||
}
|
||||
if got[0].event.Source != "webhook:/forgejo/push" {
|
||||
t.Errorf("source = %q", got[0].event.Source)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPOST_PriorityAndEmptyBody(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{
|
||||
Listen: addr,
|
||||
Routes: map[string]Route{
|
||||
"/alert": {
|
||||
Recipient: "bob",
|
||||
Type: "NEEDS-RESPONSE",
|
||||
Priority: "urgent",
|
||||
PayloadTemplate: "alert fired",
|
||||
},
|
||||
},
|
||||
}
|
||||
rec := &recordingEmit{}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
resp, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader(""))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
t.Errorf("status = %d", resp.StatusCode)
|
||||
}
|
||||
got := rec.snap()[0]
|
||||
if got.event.Priority != "urgent" {
|
||||
t.Errorf("priority = %q", got.event.Priority)
|
||||
}
|
||||
if got.event.Payload != "alert fired" {
|
||||
t.Errorf("payload = %q", got.event.Payload)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPOST_404OnUnknownRoute(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{Listen: addr, Routes: map[string]Route{}}
|
||||
rec := &recordingEmit{}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
resp, err := http.Post("http://"+addr+"/nope", "application/json", strings.NewReader("{}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusNotFound {
|
||||
t.Errorf("status = %d, want 404", resp.StatusCode)
|
||||
}
|
||||
if len(rec.snap()) != 0 {
|
||||
t.Error("emitted on unknown route")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPOST_400OnBadJSON(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{
|
||||
Listen: addr,
|
||||
Routes: map[string]Route{
|
||||
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
|
||||
},
|
||||
}
|
||||
rec := &recordingEmit{}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("not json"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusBadRequest {
|
||||
t.Errorf("status = %d, want 400", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPOST_500OnEmitFailure(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{
|
||||
Listen: addr,
|
||||
Routes: map[string]Route{
|
||||
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
|
||||
},
|
||||
}
|
||||
rec := &recordingEmit{err: errOnce()}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusInternalServerError {
|
||||
t.Errorf("status = %d, want 500", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGET_MethodNotAllowed(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{
|
||||
Listen: addr,
|
||||
Routes: map[string]Route{
|
||||
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
|
||||
},
|
||||
}
|
||||
rec := &recordingEmit{}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
resp, err := http.Get("http://" + addr + "/x")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusMethodNotAllowed {
|
||||
t.Errorf("status = %d", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGET_HealthEndpoint(t *testing.T) {
|
||||
addr := freePort(t)
|
||||
cfg := Config{
|
||||
Listen: addr,
|
||||
Routes: map[string]Route{
|
||||
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
|
||||
},
|
||||
}
|
||||
rec := &recordingEmit{}
|
||||
_, cancel := runSource(t, cfg, rec.emit)
|
||||
defer cancel()
|
||||
|
||||
// Two POSTs: 1 success, 1 fail.
|
||||
http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}"))
|
||||
http.Post("http://"+addr+"/wrongpath", "application/json", strings.NewReader("{}"))
|
||||
|
||||
resp, err := http.Get("http://" + addr + "/health")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("status = %d", resp.StatusCode)
|
||||
}
|
||||
var stats Stats
|
||||
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if stats.Received < 2 {
|
||||
t.Errorf("received = %d", stats.Received)
|
||||
}
|
||||
if stats.Emitted != 1 {
|
||||
t.Errorf("emitted = %d, want 1", stats.Emitted)
|
||||
}
|
||||
}
|
||||
|
||||
type emitErr struct{}
|
||||
|
||||
func (e *emitErr) Error() string { return "emit boom" }
|
||||
func errOnce() error { return &emitErr{} }
|
||||
26
systemd/agent-watcher.service
Normal file
26
systemd/agent-watcher.service
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
[Unit]
|
||||
Description=agent-watcher Collector — converts external events to ping inbox writes
|
||||
Documentation=https://git.botbought.ai/foreman/agent-watcher
|
||||
After=network-online.target
|
||||
Wants=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=%h/.local/bin/agent-watcher --json-log
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
# small daemon; no need for elevated limits
|
||||
LimitNOFILE=4096
|
||||
# read-only by intent; the daemon writes only to the inbox dir which is
|
||||
# inside $HOME and unaffected by ProtectSystem.
|
||||
ProtectSystem=strict
|
||||
ProtectHome=read-write
|
||||
PrivateTmp=yes
|
||||
NoNewPrivileges=yes
|
||||
# stdout/stderr go to journald automatically; --json-log makes them parseable
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
SyslogIdentifier=agent-watcher
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
Loading…
Add table
Add a link
Reference in a new issue