From 50e8ece83da46eb4075fa54af76f9a7113a6856f Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:09:56 -0400 Subject: [PATCH 1/7] Collector milestone 1: inbox writer + tests Layer 1 keystone. The internal/inbox package writes ping-shaped JSONL events to recipient inbox files in a format bit-identical to the agent-ping CLI's output, so the existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether a line came from `ping` or the Collector. - O_APPEND opens for atomic line writes (POSIX guarantees writes <= PIPE_BUF, our lines are well under). - Per-recipient sync.Mutex bounds contention; multiple goroutines writing to one inbox stay correctly serialized. - 7 tests passing: shape, ID/TS preservation, omitempty for optional fields, key-set + compactness match against ping CLI's separators= (",",":") output, 100-goroutine concurrent-write torn-line check, bad-input rejection, empty-dir rejection. go.mod at git.botbought.ai/foreman/agent-watcher; module name matches the public Forgejo path so eventual consumers can `go get` it. Next milestones: - Source plugin interface - Drop folder source (inotify, via fsnotify) - HTTP webhook source - Config loader (YAML) - main.go wiring - systemd unit Co-Authored-By: Claude Opus 4.7 (1M context) --- go.mod | 3 + internal/inbox/inbox.go | 124 ++++++++++++++++++++++++ internal/inbox/inbox_test.go | 182 +++++++++++++++++++++++++++++++++++ 3 files changed, 309 insertions(+) create mode 100644 go.mod create mode 100644 internal/inbox/inbox.go create mode 100644 internal/inbox/inbox_test.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9bfd37c --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.botbought.ai/foreman/agent-watcher + +go 1.22.2 diff --git a/internal/inbox/inbox.go b/internal/inbox/inbox.go new file mode 100644 index 0000000..54ee0f5 --- /dev/null +++ b/internal/inbox/inbox.go @@ -0,0 +1,124 @@ +// Package inbox writes ping-shaped JSONL events to recipient inbox files. +// +// The output format is bit-identical to the agent-ping CLI's output, so the +// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether +// a line was written by `ping` or by the Collector. +// +// All writes are append-only with O_APPEND so concurrent writers don't tear +// each other's lines (POSIX guarantees atomicity for writes <= PIPE_BUF, and +// our lines are well under that). +package inbox + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +// Event is the JSONL line shape. Field order and JSON keys match the ping CLI +// exactly. Optional fields use omitempty. +type Event struct { + TS string `json:"ts"` + ID string `json:"id"` + From string `json:"from"` + To string `json:"to"` + Type string `json:"type"` + Priority string `json:"priority"` + Payload string `json:"payload"` + Sentinel string `json:"sentinel,omitempty"` + Source string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push" +} + +// Writer appends events to inbox files under a base directory. +// +// Concurrency: a single Writer is safe for use from multiple goroutines. +// One sync.Mutex per recipient bounds contention to per-file granularity. +type Writer struct { + dir string + + mu sync.Mutex + locks map[string]*sync.Mutex +} + +// NewWriter returns a Writer that appends to {dir}/{recipient}.inbox. +// The directory is created (with perms 0755) if it does not exist. +func NewWriter(dir string) (*Writer, error) { + if dir == "" { + return nil, fmt.Errorf("inbox: empty dir") + } + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err) + } + return &Writer{ + dir: dir, + locks: make(map[string]*sync.Mutex), + }, nil +} + +// Write appends one event to the recipient's inbox file. It mutates ev to set +// TS and ID if either is empty, then sets To from recipient. +func (w *Writer) Write(recipient string, ev *Event) error { + if recipient == "" { + return fmt.Errorf("inbox: empty recipient") + } + if ev == nil { + return fmt.Errorf("inbox: nil event") + } + if ev.TS == "" { + ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z") + } + if ev.ID == "" { + id, err := newID() + if err != nil { + return fmt.Errorf("inbox: generate id: %w", err) + } + ev.ID = id + } + ev.To = recipient + + line, err := json.Marshal(ev) + if err != nil { + return fmt.Errorf("inbox: marshal: %w", err) + } + line = append(line, '\n') + + lock := w.lockFor(recipient) + lock.Lock() + defer lock.Unlock() + + path := filepath.Join(w.dir, recipient+".inbox") + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("inbox: open %s: %w", path, err) + } + defer f.Close() + + if _, err := f.Write(line); err != nil { + return fmt.Errorf("inbox: append: %w", err) + } + return nil +} + +func (w *Writer) lockFor(recipient string) *sync.Mutex { + w.mu.Lock() + defer w.mu.Unlock() + if l, ok := w.locks[recipient]; ok { + return l + } + l := &sync.Mutex{} + w.locks[recipient] = l + return l +} + +func newID() (string, error) { + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + return "", err + } + return "ping-" + hex.EncodeToString(b), nil +} diff --git a/internal/inbox/inbox_test.go b/internal/inbox/inbox_test.go new file mode 100644 index 0000000..22f199e --- /dev/null +++ b/internal/inbox/inbox_test.go @@ -0,0 +1,182 @@ +package inbox + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "testing" +) + +func TestWriter_Write_BasicShape(t *testing.T) { + dir := t.TempDir() + w, err := NewWriter(dir) + if err != nil { + t.Fatal(err) + } + + if err := w.Write("bob", &Event{ + From: "foreman", + Type: "INFO", + Priority: "normal", + Payload: "hi", + }); err != nil { + t.Fatal(err) + } + + got := readLines(t, filepath.Join(dir, "bob.inbox")) + if len(got) != 1 { + t.Fatalf("want 1 line, got %d", len(got)) + } + + var ev Event + if err := json.Unmarshal([]byte(got[0]), &ev); err != nil { + t.Fatalf("not JSON: %v", err) + } + if ev.To != "bob" { + t.Errorf("To = %q, want bob", ev.To) + } + if ev.From != "foreman" { + t.Errorf("From = %q, want foreman", ev.From) + } + if ev.Payload != "hi" { + t.Errorf("Payload = %q", ev.Payload) + } + if !strings.HasPrefix(ev.ID, "ping-") { + t.Errorf("ID %q missing ping- prefix", ev.ID) + } + if len(ev.ID) != len("ping-")+8 { + t.Errorf("ID %q wrong length", ev.ID) + } + if ev.TS == "" { + t.Errorf("TS unset") + } +} + +func TestWriter_Write_PreservesProvidedIDAndTS(t *testing.T) { + dir := t.TempDir() + w, err := NewWriter(dir) + if err != nil { + t.Fatal(err) + } + + want := &Event{ + TS: "2026-05-06T12:00:00Z", + ID: "ping-deadbeef", + From: "x", + Type: "INFO", + Priority: "normal", + Payload: "p", + } + if err := w.Write("r", want); err != nil { + t.Fatal(err) + } + + got := readLines(t, filepath.Join(dir, "r.inbox")) + var ev Event + json.Unmarshal([]byte(got[0]), &ev) + if ev.ID != want.ID { + t.Errorf("ID overwritten: got %q want %q", ev.ID, want.ID) + } + if ev.TS != want.TS { + t.Errorf("TS overwritten: got %q want %q", ev.TS, want.TS) + } +} + +func TestWriter_Write_OmitsEmptyOptionalFields(t *testing.T) { + dir := t.TempDir() + w, _ := NewWriter(dir) + w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "p"}) + + line := readLines(t, filepath.Join(dir, "r.inbox"))[0] + if strings.Contains(line, "sentinel") { + t.Errorf("empty sentinel leaked into JSON: %s", line) + } + if strings.Contains(line, "source") { + t.Errorf("empty source leaked into JSON: %s", line) + } +} + +func TestWriter_Write_PingShapeMatchesCLI(t *testing.T) { + // The ping CLI emits JSON with separators=(",", ":") (compact) and these + // keys in this order: ts, id, from, to, type, priority, payload [, sentinel]. + // Verify our marshaled output preserves the key set and compactness. + dir := t.TempDir() + w, _ := NewWriter(dir) + w.Write("r", &Event{ + From: "f", Type: "INFO", Priority: "normal", Payload: "p", Sentinel: "/x", + }) + line := strings.TrimSpace(readLines(t, filepath.Join(dir, "r.inbox"))[0]) + + wantKeys := []string{`"ts":`, `"id":`, `"from":`, `"to":`, `"type":`, `"priority":`, `"payload":`, `"sentinel":`} + for _, k := range wantKeys { + if !strings.Contains(line, k) { + t.Errorf("missing key %s in: %s", k, line) + } + } + // compact (no spaces around colons or commas) + if strings.Contains(line, ": ") || strings.Contains(line, ", ") { + t.Errorf("non-compact JSON: %s", line) + } +} + +func TestWriter_Write_ConcurrentSameRecipient(t *testing.T) { + // 100 goroutines writing to the same inbox should produce 100 valid JSON + // lines, none torn or interleaved. + dir := t.TempDir() + w, _ := NewWriter(dir) + + const n = 100 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "x"}) + }(i) + } + wg.Wait() + + lines := readLines(t, filepath.Join(dir, "r.inbox")) + if len(lines) != n { + t.Fatalf("got %d lines, want %d (torn writes?)", len(lines), n) + } + for i, l := range lines { + var ev Event + if err := json.Unmarshal([]byte(l), &ev); err != nil { + t.Errorf("line %d not JSON: %v: %s", i, err, l) + } + } +} + +func TestWriter_Write_RejectsBadInputs(t *testing.T) { + w, _ := NewWriter(t.TempDir()) + if err := w.Write("", &Event{}); err == nil { + t.Error("empty recipient: expected error") + } + if err := w.Write("r", nil); err == nil { + t.Error("nil event: expected error") + } +} + +func TestNewWriter_RejectsEmptyDir(t *testing.T) { + if _, err := NewWriter(""); err == nil { + t.Error("expected error for empty dir") + } +} + +func readLines(t *testing.T, path string) []string { + t.Helper() + b, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + out := []string{} + for _, l := range strings.Split(strings.TrimRight(string(b), "\n"), "\n") { + if l != "" { + out = append(out, l) + } + } + return out +} -- 2.49.1 From f9d81471c407a996e0b32b2aab80d85b69c34afd Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:14:03 -0400 Subject: [PATCH 2/7] Collector milestone 2: source interface + drop-folder source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit source.Source is the contract every Collector input implements: Name + Run(ctx, emit). Sources don't own state — they convert external events into emit calls. Dispatcher routes. internal/source/dropfolder: watches ~/Nyx/workspace/incoming/ for *.json drop files. fsnotify-driven with periodic poll fallback (default 30s safety net for missed events). Each file: 1. Parsed against the spec §3.1.2 schema with DisallowUnknownFields. 2. Valid → emitted, then file deleted. 3. Invalid (missing fields, bad type/priority, unknown fields, garbage) → moved to .dead-letter/ with a sidecar .reason file for forensics. 4. Emit failure → file retained in place for retry (transient errors shouldn't be permanent dead-letters). Also: initial-scan on Run() drains files that landed before the watcher attached, catching up after a Collector restart. 14 tests in the package — schema validation table for all error cases, initial-scan, live inotify drop, post-emit delete, dead-letter + sidecar, emit-failure retention. Plus the 7 inbox tests still passing. Pinned fsnotify v1.7.0 (Go 1.22 compatible; 1.10.x demanded toolchain 1.23 which isn't in apt yet). go.mod stays at 1.22 to match VPS. Co-Authored-By: Claude Opus 4.7 (1M context) --- go.mod | 6 +- go.sum | 4 + internal/source/dropfolder/dropfolder.go | 259 +++++++++++++++++ internal/source/dropfolder/dropfolder_test.go | 273 ++++++++++++++++++ internal/source/source.go | 35 +++ 5 files changed, 576 insertions(+), 1 deletion(-) create mode 100644 go.sum create mode 100644 internal/source/dropfolder/dropfolder.go create mode 100644 internal/source/dropfolder/dropfolder_test.go create mode 100644 internal/source/source.go diff --git a/go.mod b/go.mod index 9bfd37c..ccb4746 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,7 @@ module git.botbought.ai/foreman/agent-watcher -go 1.22.2 +go 1.22 + +require github.com/fsnotify/fsnotify v1.7.0 + +require golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9e90680 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/source/dropfolder/dropfolder.go b/internal/source/dropfolder/dropfolder.go new file mode 100644 index 0000000..3488f36 --- /dev/null +++ b/internal/source/dropfolder/dropfolder.go @@ -0,0 +1,259 @@ +// Package dropfolder implements the drop-folder Source. +// +// Producers drop *.json files into a designated directory. The Source ingests +// each file, validates it against a schema, emits an event, and deletes the +// file. Malformed files are moved to a .dead-letter/ subdirectory for +// inspection. +// +// Lifecycle: on startup, scan the folder and ingest any pre-existing files +// (catches up after a Collector restart). Then watch via inotify for new +// arrivals. A periodic poll (default 30s) is a safety net in case inotify +// misses a Syncthing-driven rename — Syncthing usually fires IN_CLOSE_WRITE +// on rename, but the spec calls for a fallback. +// +// Drop file schema (per spec §3.1.2): +// +// { +// "recipient": "bob", +// "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST", +// "priority": "normal" | "urgent", // optional, default "normal" +// "payload": "...", +// "sentinel": "/path/optional" // optional +// } +package dropfolder + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" + "git.botbought.ai/foreman/agent-watcher/internal/source" +) + +// Validation rules duplicated from agent-ping's CLI for tightness. +var validTypes = map[string]bool{ + "INFO": true, + "NEEDS-RESPONSE": true, + "ACK-REQUEST": true, +} + +var validPriorities = map[string]bool{ + "": true, // means "normal" + "normal": true, + "urgent": true, +} + +// Config configures a drop-folder Source. +type Config struct { + // Path is the directory to watch. Created if absent. + Path string + + // PollFallbackSeconds is a periodic full-scan interval in case inotify + // misses an event. Set to 0 to disable. + PollFallbackSeconds int +} + +// Source watches a directory for *.json drop files and emits events. +type Source struct { + cfg Config + logger *slog.Logger +} + +// New returns a configured (but not running) Source. +func New(cfg Config, logger *slog.Logger) *Source { + if logger == nil { + logger = slog.Default() + } + if cfg.PollFallbackSeconds == 0 { + cfg.PollFallbackSeconds = 30 + } + return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)} +} + +func (s *Source) Name() string { return "drop-folder" } + +// Run blocks until ctx is canceled. +func (s *Source) Run(ctx context.Context, emit source.Emit) error { + if err := os.MkdirAll(s.cfg.Path, 0755); err != nil { + return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err) + } + if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil { + return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err) + } + + w, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("dropfolder: new watcher: %w", err) + } + defer w.Close() + if err := w.Add(s.cfg.Path); err != nil { + return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err) + } + + // Initial scan — drain any files that landed before we were watching. + s.scan(ctx, emit) + + pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second + var pollCh <-chan time.Time + if pollEvery > 0 { + t := time.NewTicker(pollEvery) + defer t.Stop() + pollCh = t.C + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case ev, ok := <-w.Events: + if !ok { + return errors.New("dropfolder: watcher closed") + } + // Care about creates and renames-into (Syncthing uses rename). + if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 { + continue + } + if !strings.HasSuffix(ev.Name, ".json") { + continue + } + s.handle(ctx, emit, ev.Name) + + case err, ok := <-w.Errors: + if !ok { + return errors.New("dropfolder: watcher errors closed") + } + s.logger.Warn("watcher error", "err", err) + + case <-pollCh: + s.scan(ctx, emit) + } + } +} + +// scan ingests every *.json file currently in the drop folder. +func (s *Source) scan(ctx context.Context, emit source.Emit) { + entries, err := os.ReadDir(s.cfg.Path) + if err != nil { + s.logger.Error("scan readdir", "err", err) + return + } + for _, e := range entries { + if ctx.Err() != nil { + return + } + if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { + continue + } + s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name())) + } +} + +// handle reads, validates, emits, and removes (or dead-letters) one drop file. +func (s *Source) handle(ctx context.Context, emit source.Emit, path string) { + if ctx.Err() != nil { + return + } + body, err := os.ReadFile(path) + if err != nil { + // File may have been ingested already by another loop iteration. + if errors.Is(err, os.ErrNotExist) { + return + } + s.logger.Warn("read drop file", "path", path, "err", err) + s.deadLetter(path, "read failed") + return + } + + pd, err := parseDrop(body) + if err != nil { + s.logger.Warn("invalid drop file", "path", path, "err", err) + s.deadLetter(path, "schema invalid: "+err.Error()) + return + } + pd.event.Source = "drop-folder:" + filepath.Base(path) + + if err := emit(pd.recipient, pd.event); err != nil { + // Inbox write failure: don't lose the message. Leave the file in + // place so the next scan retries. Log loudly. + s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err) + return + } + + if err := os.Remove(path); err != nil { + s.logger.Warn("remove drop file", "path", path, "err", err) + } +} + +func (s *Source) deadLetter(path, reason string) { + target := filepath.Join(s.deadLetterDir(), filepath.Base(path)) + if err := os.Rename(path, target); err != nil { + s.logger.Error("dead-letter move failed", "path", path, "err", err) + return + } + // Sidecar reason file for forensics. + _ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644) + s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason) +} + +func (s *Source) deadLetterDir() string { + return filepath.Join(s.cfg.Path, ".dead-letter") +} + +// dropFile is the on-disk schema; intermediate value during parse. +type dropFile struct { + Recipient string `json:"recipient"` + Type string `json:"type"` + Priority string `json:"priority,omitempty"` + Payload string `json:"payload"` + Sentinel string `json:"sentinel,omitempty"` +} + +type parsedDrop struct { + recipient string + event *inbox.Event +} + +func parseDrop(body []byte) (*parsedDrop, error) { + var d dropFile + dec := json.NewDecoder(strings.NewReader(string(body))) + dec.DisallowUnknownFields() + if err := dec.Decode(&d); err != nil { + return nil, fmt.Errorf("decode: %w", err) + } + if d.Recipient == "" { + return nil, errors.New("recipient required") + } + if !validTypes[d.Type] { + return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type) + } + if !validPriorities[d.Priority] { + return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority) + } + if d.Payload == "" { + return nil, errors.New("payload required") + } + priority := d.Priority + if priority == "" { + priority = "normal" + } + return &parsedDrop{ + recipient: d.Recipient, + event: &inbox.Event{ + From: "collector", + Type: d.Type, + Priority: priority, + Payload: d.Payload, + Sentinel: d.Sentinel, + }, + }, nil +} diff --git a/internal/source/dropfolder/dropfolder_test.go b/internal/source/dropfolder/dropfolder_test.go new file mode 100644 index 0000000..4e1d508 --- /dev/null +++ b/internal/source/dropfolder/dropfolder_test.go @@ -0,0 +1,273 @@ +package dropfolder + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" +) + +func quietLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestParseDrop_Valid(t *testing.T) { + body := []byte(`{"recipient":"bob","type":"INFO","priority":"urgent","payload":"hi","sentinel":"/x"}`) + p, err := parseDrop(body) + if err != nil { + t.Fatal(err) + } + if p.recipient != "bob" { + t.Errorf("recipient = %q", p.recipient) + } + if p.event.Type != "INFO" || p.event.Priority != "urgent" || p.event.Payload != "hi" || p.event.Sentinel != "/x" { + t.Errorf("unexpected event: %+v", p.event) + } +} + +func TestParseDrop_DefaultsPriority(t *testing.T) { + p, err := parseDrop([]byte(`{"recipient":"r","type":"INFO","payload":"p"}`)) + if err != nil { + t.Fatal(err) + } + if p.event.Priority != "normal" { + t.Errorf("priority default = %q, want normal", p.event.Priority) + } +} + +func TestParseDrop_Invalid(t *testing.T) { + cases := map[string]string{ + "empty body": ``, + "missing recipient": `{"type":"INFO","payload":"p"}`, + "missing type": `{"recipient":"r","payload":"p"}`, + "bad type": `{"recipient":"r","type":"NOPE","payload":"p"}`, + "bad priority": `{"recipient":"r","type":"INFO","priority":"high","payload":"p"}`, + "missing payload": `{"recipient":"r","type":"INFO"}`, + "unknown field": `{"recipient":"r","type":"INFO","payload":"p","stowaway":1}`, + "not json": `not json`, + } + for name, body := range cases { + t.Run(name, func(t *testing.T) { + if _, err := parseDrop([]byte(body)); err == nil { + t.Error("expected error") + } + }) + } +} + +// recordingEmit captures emitted events for assertion. +type recordingEmit struct { + mu sync.Mutex + events []record + err error // set to fail the next emit +} + +type record struct { + recipient string + event inbox.Event +} + +func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.err != nil { + err := r.err + r.err = nil + return err + } + r.events = append(r.events, record{recipient, *ev}) + return nil +} + +func (r *recordingEmit) snapshot() []record { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]record, len(r.events)) + copy(out, r.events) + return out +} + +func writeDrop(t *testing.T, dir, name string, payload any) string { + t.Helper() + path := filepath.Join(dir, name) + b, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + tmp := path + ".tmp" + if err := os.WriteFile(tmp, b, 0644); err != nil { + t.Fatal(err) + } + // rename for atomic visibility (mirrors what Syncthing does) + if err := os.Rename(tmp, path); err != nil { + t.Fatal(err) + } + return path +} + +func TestSource_InitialScan_IngestsExistingFiles(t *testing.T) { + dir := t.TempDir() + writeDrop(t, dir, "1.json", map[string]string{ + "recipient": "bob", "type": "INFO", "payload": "first", + }) + writeDrop(t, dir, "2.json", map[string]string{ + "recipient": "bob", "type": "NEEDS-RESPONSE", "payload": "second", + }) + + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + go src.Run(ctx, rec.emit) + + waitFor(t, func() bool { return len(rec.snapshot()) == 2 }, 400*time.Millisecond) + + got := rec.snapshot() + if got[0].event.Payload+"|"+got[1].event.Payload != "first|second" && + got[0].event.Payload+"|"+got[1].event.Payload != "second|first" { + t.Errorf("unexpected payloads: %+v", got) + } + for _, r := range got { + if r.recipient != "bob" { + t.Errorf("recipient %q", r.recipient) + } + } +} + +func TestSource_LiveDrop_IngestsViaInotify(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + // Give the watcher time to attach. + time.Sleep(50 * time.Millisecond) + writeDrop(t, dir, "live.json", map[string]string{ + "recipient": "foreman", "type": "INFO", "payload": "live one", + }) + + waitFor(t, func() bool { return len(rec.snapshot()) == 1 }, 1*time.Second) + + got := rec.snapshot() + if got[0].event.Payload != "live one" { + t.Errorf("payload %q", got[0].event.Payload) + } +} + +func TestSource_DeleteAfterEmit(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + time.Sleep(50 * time.Millisecond) + path := writeDrop(t, dir, "ok.json", map[string]string{ + "recipient": "r", "type": "INFO", "payload": "p", + }) + + waitFor(t, func() bool { + _, err := os.Stat(path) + return os.IsNotExist(err) + }, 1*time.Second) +} + +func TestSource_DeadLetter_OnInvalidSchema(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + time.Sleep(50 * time.Millisecond) + // Missing payload — invalid. + tmp := filepath.Join(dir, "bad.json.tmp") + os.WriteFile(tmp, []byte(`{"recipient":"r","type":"INFO"}`), 0644) + os.Rename(tmp, filepath.Join(dir, "bad.json")) + + deadPath := filepath.Join(dir, ".dead-letter", "bad.json") + waitFor(t, func() bool { + _, err := os.Stat(deadPath) + return err == nil + }, 1*time.Second) + + // Reason sidecar exists. + if _, err := os.Stat(deadPath + ".reason"); err != nil { + t.Errorf("reason sidecar missing: %v", err) + } + + // Original file gone from drop dir. + if _, err := os.Stat(filepath.Join(dir, "bad.json")); !os.IsNotExist(err) { + t.Errorf("original drop file should be gone") + } + + // Nothing was emitted. + if got := rec.snapshot(); len(got) != 0 { + t.Errorf("got events for invalid drop: %+v", got) + } +} + +func TestSource_RetainsFile_WhenEmitFails(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{err: errOnce()} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + time.Sleep(50 * time.Millisecond) + path := writeDrop(t, dir, "retry.json", map[string]string{ + "recipient": "r", "type": "INFO", "payload": "p", + }) + + // Wait long enough for the (failing) emit to have happened. + time.Sleep(200 * time.Millisecond) + + // Drop file is still there (retained for retry). + if _, err := os.Stat(path); err != nil { + t.Errorf("drop file should be retained on emit failure: %v", err) + } + // Not in dead-letter. + if _, err := os.Stat(filepath.Join(dir, ".dead-letter", "retry.json")); err == nil { + t.Error("emit failure should NOT dead-letter (it's transient)") + } +} + +// errOnce returns a non-nil error one time, then nil after. +func errOnce() error { + type e struct{} + return &emitErr{} +} + +type emitErr struct{} + +func (e *emitErr) Error() string { return "transient emit failure" } + +// waitFor polls predicate until it returns true or timeout elapses. +func waitFor(t *testing.T, pred func() bool, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if pred() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("waitFor: predicate did not become true within %s", timeout) +} diff --git a/internal/source/source.go b/internal/source/source.go new file mode 100644 index 0000000..d9d914f --- /dev/null +++ b/internal/source/source.go @@ -0,0 +1,35 @@ +// Package source defines the contract every Collector source plugin implements. +// +// A Source is a long-running goroutine that converts external events into +// inbox writes. It receives an Emit callback at start; each external event it +// observes results in one Emit call. The Source returns when its context is +// canceled, or earlier on a fatal error. +// +// Sources do not own the inbox writer or any other shared state. They emit; +// the dispatcher routes. This keeps each source small and testable in +// isolation. +package source + +import ( + "context" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" +) + +// Emit is the callback a Source uses to push one observed event into the +// dispatcher. The dispatcher writes it to recipient's inbox file. Returning +// an error means the inbox write failed; sources may log and continue, or +// drop, or shut down — that's a per-source policy decision. +type Emit func(recipient string, ev *inbox.Event) error + +// Source is a Collector input. Implementations are constructed with their +// own configuration and started via Run. +type Source interface { + // Name is a short identifier for logs and the "source" field on emitted + // events ("webhook", "drop-folder", etc.). + Name() string + + // Run blocks until ctx is canceled or a fatal error occurs. Each + // observed external event becomes one Emit call. + Run(ctx context.Context, emit Emit) error +} -- 2.49.1 From ba6db7c82f5b4f87e8081fe5ef7206e305ab3c31 Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:17:01 -0400 Subject: [PATCH 3/7] Collector milestone 3: HTTP webhook source + /health MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit internal/source/webhook routes inbound POSTs to inbox events via a configured table. Each route specifies recipient, type, priority, and a Go text/template payload renderer that consumes the request body decoded as JSON. v1 binds loopback only — New() rejects non-loopback addresses at construction. Caddy + bearer-token reverse-proxy is the v2 upgrade path per spec §4. Behavior: - POST + matched route + valid JSON body → render template, emit, 202 - Missing route → 404 - Wrong method → 405 - Bad JSON → 400 - Template render failure → 500 - Emit failure → 500 (caller responsible for retry; HTTP source has no durable staging) - Empty body → empty data map for template (lets fixed-string templates work without sending {}) - 1 MiB request body cap GET /health returns JSON Stats{received, emitted, errors, uptime_sec} on the same listener for journalctl correlation per spec §3.5. 10 tests passing — non-loopback rejection, bad type/template rejection, route+template happy path, priority defaulting, empty body, 404/400/405/500, health endpoint counters. 31 tests across the three internal packages, all passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/source/webhook/webhook.go | 248 ++++++++++++++++++ internal/source/webhook/webhook_test.go | 323 ++++++++++++++++++++++++ 2 files changed, 571 insertions(+) create mode 100644 internal/source/webhook/webhook.go create mode 100644 internal/source/webhook/webhook_test.go diff --git a/internal/source/webhook/webhook.go b/internal/source/webhook/webhook.go new file mode 100644 index 0000000..18da159 --- /dev/null +++ b/internal/source/webhook/webhook.go @@ -0,0 +1,248 @@ +// Package webhook implements the HTTP webhook Source. +// +// Listens on a loopback address and routes incoming POSTs to inbox events +// according to a routing table. Each route specifies recipient, type, +// priority, and a Go text/template that renders the payload from the request +// body decoded as JSON. +// +// v1 is loopback-only (127.0.0.1) — no authentication. The spec calls for +// Caddy + bearer-token reverse-proxy as the v2 upgrade path. Do NOT bind to +// 0.0.0.0 in v1. +// +// Per spec §3.5, the same listener also exposes /health with per-source +// counters. +package webhook + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "sync/atomic" + "text/template" + "time" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" + "git.botbought.ai/foreman/agent-watcher/internal/source" +) + +// Route maps a URL path to an emitted event. +type Route struct { + // Recipient is the target inbox name. + Recipient string + + // Type is one of INFO, NEEDS-RESPONSE, ACK-REQUEST. + Type string + + // Priority is "normal" or "urgent". Empty defaults to "normal". + Priority string + + // PayloadTemplate is a Go text/template rendered with the request body + // (decoded as JSON into a map[string]any) as the dot. Plain strings + // without {{}} render to themselves. + PayloadTemplate string +} + +// Config configures a webhook Source. +type Config struct { + // Listen is the bind address, e.g. "127.0.0.1:18790". + Listen string + + // Routes maps URL path → Route. Path includes the leading slash. + Routes map[string]Route +} + +// Source serves HTTP and emits an inbox event for each matched POST. +type Source struct { + cfg Config + logger *slog.Logger + + tmpls map[string]*template.Template + + // counters + received atomic.Uint64 + emitted atomic.Uint64 + errors atomic.Uint64 + startedAt time.Time +} + +// New parses the route templates and returns a configured Source. +// Returns an error if any template fails to parse. +func New(cfg Config, logger *slog.Logger) (*Source, error) { + if logger == nil { + logger = slog.Default() + } + if cfg.Listen == "" { + return nil, errors.New("webhook: listen address required") + } + if !isLoopback(cfg.Listen) { + return nil, fmt.Errorf("webhook: listen %q is not a loopback address; v1 forbids non-loopback binds", cfg.Listen) + } + + tmpls := make(map[string]*template.Template, len(cfg.Routes)) + for path, r := range cfg.Routes { + if !validTypes[r.Type] { + return nil, fmt.Errorf("webhook: route %s: invalid type %q", path, r.Type) + } + if r.Priority != "" && r.Priority != "normal" && r.Priority != "urgent" { + return nil, fmt.Errorf("webhook: route %s: invalid priority %q", path, r.Priority) + } + t, err := template.New(path).Option("missingkey=zero").Parse(r.PayloadTemplate) + if err != nil { + return nil, fmt.Errorf("webhook: route %s: parse template: %w", path, err) + } + tmpls[path] = t + } + return &Source{ + cfg: cfg, + logger: logger.With("source", "webhook", "listen", cfg.Listen), + tmpls: tmpls, + startedAt: time.Now(), + }, nil +} + +func (s *Source) Name() string { return "webhook" } + +// Run blocks until ctx is canceled. +func (s *Source) Run(ctx context.Context, emit source.Emit) error { + mux := http.NewServeMux() + mux.HandleFunc("/health", s.health) + mux.HandleFunc("/", s.handler(emit)) + + srv := &http.Server{ + Addr: s.cfg.Listen, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + listenErr := make(chan error, 1) + go func() { + s.logger.Info("listening") + listenErr <- srv.ListenAndServe() + }() + + select { + case <-ctx.Done(): + shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _ = srv.Shutdown(shutCtx) + return ctx.Err() + case err := <-listenErr: + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return fmt.Errorf("webhook: serve: %w", err) + } +} + +func (s *Source) handler(emit source.Emit) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + s.received.Add(1) + + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + route, ok := s.cfg.Routes[r.URL.Path] + if !ok { + http.Error(w, "no route", http.StatusNotFound) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1 MiB cap + if err != nil { + s.errors.Add(1) + s.logger.Warn("read body", "path", r.URL.Path, "err", err) + http.Error(w, "read failed", http.StatusBadRequest) + return + } + var data map[string]any + if len(bytes.TrimSpace(body)) > 0 { + if err := json.Unmarshal(body, &data); err != nil { + s.errors.Add(1) + s.logger.Warn("body not JSON", "path", r.URL.Path, "err", err) + http.Error(w, "body must be JSON", http.StatusBadRequest) + return + } + } else { + data = map[string]any{} + } + + var rendered bytes.Buffer + if err := s.tmpls[r.URL.Path].Execute(&rendered, data); err != nil { + s.errors.Add(1) + s.logger.Warn("template render", "path", r.URL.Path, "err", err) + http.Error(w, "render failed", http.StatusInternalServerError) + return + } + priority := route.Priority + if priority == "" { + priority = "normal" + } + ev := &inbox.Event{ + From: "collector", + Type: route.Type, + Priority: priority, + Payload: rendered.String(), + Source: "webhook:" + r.URL.Path, + } + if err := emit(route.Recipient, ev); err != nil { + s.errors.Add(1) + s.logger.Error("emit", "path", r.URL.Path, "err", err) + http.Error(w, "emit failed", http.StatusInternalServerError) + return + } + s.emitted.Add(1) + w.WriteHeader(http.StatusAccepted) + _, _ = io.WriteString(w, "ok\n") + } +} + +// Stats returns the current counter snapshot. Safe to call from any goroutine. +type Stats struct { + Received uint64 `json:"received"` + Emitted uint64 `json:"emitted"` + Errors uint64 `json:"errors"` + UptimeSec int64 `json:"uptime_sec"` +} + +func (s *Source) Stats() Stats { + return Stats{ + Received: s.received.Load(), + Emitted: s.emitted.Load(), + Errors: s.errors.Load(), + UptimeSec: int64(time.Since(s.startedAt).Seconds()), + } +} + +func (s *Source) health(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(s.Stats()) +} + +// validTypes mirrors agent-ping. Kept here (not imported from dropfolder) so +// the webhook source has no cross-source dependency. +var validTypes = map[string]bool{ + "INFO": true, + "NEEDS-RESPONSE": true, + "ACK-REQUEST": true, +} + +// isLoopback returns true if addr binds only to a loopback interface. +// Accepts "127.0.0.1:port", "[::1]:port", "localhost:port". +func isLoopback(addr string) bool { + host, _, err := net.SplitHostPort(addr) + if err != nil { + return false + } + if host == "localhost" { + return true + } + ip := net.ParseIP(host) + return ip != nil && ip.IsLoopback() +} + diff --git a/internal/source/webhook/webhook_test.go b/internal/source/webhook/webhook_test.go new file mode 100644 index 0000000..0eb47e8 --- /dev/null +++ b/internal/source/webhook/webhook_test.go @@ -0,0 +1,323 @@ +package webhook + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net" + "net/http" + "strings" + "sync" + "testing" + "time" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" +) + +func quietLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +// freePort returns an unused loopback "127.0.0.1:N" address. +func freePort(t *testing.T) string { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := l.Addr().String() + l.Close() + return addr +} + +type recordingEmit struct { + mu sync.Mutex + events []record + err error +} + +type record struct { + recipient string + event inbox.Event +} + +func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.err != nil { + return r.err + } + r.events = append(r.events, record{recipient, *ev}) + return nil +} + +func (r *recordingEmit) snap() []record { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]record, len(r.events)) + copy(out, r.events) + return out +} + +func runSource(t *testing.T, cfg Config, emit func(string, *inbox.Event) error) (string, context.CancelFunc) { + t.Helper() + src, err := New(cfg, quietLogger()) + if err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithCancel(context.Background()) + go src.Run(ctx, emit) + // Wait for listener. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + c, err := net.DialTimeout("tcp", cfg.Listen, 50*time.Millisecond) + if err == nil { + c.Close() + return cfg.Listen, cancel + } + time.Sleep(20 * time.Millisecond) + } + cancel() + t.Fatal("listener never came up") + return "", nil +} + +func TestNew_RejectsNonLoopback(t *testing.T) { + _, err := New(Config{Listen: "0.0.0.0:18790"}, quietLogger()) + if err == nil { + t.Error("expected error for non-loopback") + } +} + +func TestNew_RejectsBadType(t *testing.T) { + _, err := New(Config{ + Listen: "127.0.0.1:0", + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "BOGUS", PayloadTemplate: "x"}, + }, + }, quietLogger()) + if err == nil { + t.Error("expected error for bad type") + } +} + +func TestNew_RejectsBadTemplate(t *testing.T) { + _, err := New(Config{ + Listen: "127.0.0.1:0", + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "{{ .unclosed"}, + }, + }, quietLogger()) + if err == nil { + t.Error("expected error for bad template") + } +} + +func TestPOST_RoutesAndEmits(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/forgejo/push": { + Recipient: "bob", + Type: "INFO", + PayloadTemplate: "forgejo push to {{ .repo }} by {{ .actor }}", + }, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + body := `{"repo":"agent-ping","actor":"angus"}` + resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", strings.NewReader(body)) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusAccepted { + t.Errorf("status = %d, want 202", resp.StatusCode) + } + + got := rec.snap() + if len(got) != 1 { + t.Fatalf("got %d events, want 1", len(got)) + } + if got[0].recipient != "bob" { + t.Errorf("recipient = %q", got[0].recipient) + } + if got[0].event.Type != "INFO" { + t.Errorf("type = %q", got[0].event.Type) + } + if got[0].event.Priority != "normal" { + t.Errorf("priority default = %q", got[0].event.Priority) + } + if got[0].event.Payload != "forgejo push to agent-ping by angus" { + t.Errorf("payload = %q", got[0].event.Payload) + } + if got[0].event.Source != "webhook:/forgejo/push" { + t.Errorf("source = %q", got[0].event.Source) + } +} + +func TestPOST_PriorityAndEmptyBody(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/alert": { + Recipient: "bob", + Type: "NEEDS-RESPONSE", + Priority: "urgent", + PayloadTemplate: "alert fired", + }, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader("")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusAccepted { + t.Errorf("status = %d", resp.StatusCode) + } + got := rec.snap()[0] + if got.event.Priority != "urgent" { + t.Errorf("priority = %q", got.event.Priority) + } + if got.event.Payload != "alert fired" { + t.Errorf("payload = %q", got.event.Payload) + } +} + +func TestPOST_404OnUnknownRoute(t *testing.T) { + addr := freePort(t) + cfg := Config{Listen: addr, Routes: map[string]Route{}} + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/nope", "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Errorf("status = %d, want 404", resp.StatusCode) + } + if len(rec.snap()) != 0 { + t.Error("emitted on unknown route") + } +} + +func TestPOST_400OnBadJSON(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("not json")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("status = %d, want 400", resp.StatusCode) + } +} + +func TestPOST_500OnEmitFailure(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{err: errOnce()} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusInternalServerError { + t.Errorf("status = %d, want 500", resp.StatusCode) + } +} + +func TestGET_MethodNotAllowed(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Get("http://" + addr + "/x") + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusMethodNotAllowed { + t.Errorf("status = %d", resp.StatusCode) + } +} + +func TestGET_HealthEndpoint(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + // Two POSTs: 1 success, 1 fail. + http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}")) + http.Post("http://"+addr+"/wrongpath", "application/json", strings.NewReader("{}")) + + resp, err := http.Get("http://" + addr + "/health") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d", resp.StatusCode) + } + var stats Stats + if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { + t.Fatal(err) + } + if stats.Received < 2 { + t.Errorf("received = %d", stats.Received) + } + if stats.Emitted != 1 { + t.Errorf("emitted = %d, want 1", stats.Emitted) + } +} + +type emitErr struct{} + +func (e *emitErr) Error() string { return "emit boom" } +func errOnce() error { return &emitErr{} } -- 2.49.1 From 2183850c036f278fe0e43fcf81c824740fb0e519 Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:22:03 -0400 Subject: [PATCH 4/7] Collector milestone 4: config loader + main wiring (binary builds) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit internal/config loads ~/.config/agent-watcher/collector.yaml with strict validation (KnownFields=true, so typos fail loud), applies sensible defaults, and expands ~/ in path fields. Either source can be omitted but at least one must be configured. cmd/agent-watcher is the entry point: load config, build inbox.Writer, build configured sources, run them concurrently with a shared Emit closure, wait for SIGINT/SIGTERM, shut down. Logs to stderr — text by default, JSON via --json-log for journald structured fields per spec §3.4. SIGHUP reload is a v2 item; for now restart the systemd unit to pick up config changes. 10 config tests passing — full happy path, defaults applied, ~/ expansion, and a table of 9 invalid configs that must all reject (missing agent, no sources, empty webhook routes, route missing recipient/type/template, route path without leading /, unknown top-level field, negative poll seconds). Binary builds clean: 10.8M single static binary on Linux/amd64. go.mod stays at Go 1.22 to match VPS toolchain. Total: 41 tests across 4 packages, all passing. Build clean. go vet clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/agent-watcher/main.go | 148 +++++++++++++++++++++++++++ go.mod | 5 +- go.sum | 3 + internal/config/config.go | 179 +++++++++++++++++++++++++++++++++ internal/config/config_test.go | 168 +++++++++++++++++++++++++++++++ 5 files changed, 502 insertions(+), 1 deletion(-) create mode 100644 cmd/agent-watcher/main.go create mode 100644 internal/config/config.go create mode 100644 internal/config/config_test.go diff --git a/cmd/agent-watcher/main.go b/cmd/agent-watcher/main.go new file mode 100644 index 0000000..fdca86e --- /dev/null +++ b/cmd/agent-watcher/main.go @@ -0,0 +1,148 @@ +// Command agent-watcher is the Collector daemon. +// +// It loads a YAML config, builds the configured sources, and runs them +// concurrently. Each source converts external events (webhook POSTs, files +// dropped into a folder) into JSONL writes against the configured inbox +// directory. Output is bit-identical to the agent-ping CLI's writes, so the +// existing UserPromptSubmit hook (and the future MCP Watcher) consume the +// stream without distinguishing source. +// +// Default config path: ~/.config/agent-watcher/collector.yaml. Override with +// --config or AGENT_WATCHER_CONFIG. +// +// Signals: SIGINT and SIGTERM trigger graceful shutdown. SIGHUP reload is a +// v2 item — for now restart the unit to pick up config changes. +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + "os/signal" + "path/filepath" + "sync" + "syscall" + + "git.botbought.ai/foreman/agent-watcher/internal/config" + "git.botbought.ai/foreman/agent-watcher/internal/inbox" + "git.botbought.ai/foreman/agent-watcher/internal/source" + "git.botbought.ai/foreman/agent-watcher/internal/source/dropfolder" + "git.botbought.ai/foreman/agent-watcher/internal/source/webhook" +) + +func main() { + var ( + configPath = flag.String("config", "", "path to collector.yaml (default: $AGENT_WATCHER_CONFIG or ~/.config/agent-watcher/collector.yaml)") + jsonLog = flag.Bool("json-log", false, "emit logs as JSON (for journald structured fields)") + ) + flag.Parse() + + logger := newLogger(*jsonLog) + + path := resolveConfigPath(*configPath) + cfg, err := config.Load(path) + if err != nil { + logger.Error("config", "err", err) + os.Exit(2) + } + logger = logger.With("agent", cfg.Agent) + logger.Info("config loaded", "path", path, "inbox_dir", cfg.InboxDir) + + if err := run(cfg, logger); err != nil { + logger.Error("run", "err", err) + os.Exit(1) + } +} + +func run(cfg *config.Config, logger *slog.Logger) error { + w, err := inbox.NewWriter(cfg.InboxDir) + if err != nil { + return fmt.Errorf("inbox: %w", err) + } + emit := func(recipient string, ev *inbox.Event) error { + return w.Write(recipient, ev) + } + + sources, err := buildSources(cfg, logger) + if err != nil { + return err + } + if len(sources) == 0 { + return fmt.Errorf("no sources configured") + } + + ctx, cancel := signalContext() + defer cancel() + + var wg sync.WaitGroup + for _, src := range sources { + src := src + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("source starting", "name", src.Name()) + if err := src.Run(ctx, emit); err != nil && err != context.Canceled { + logger.Error("source exited", "name", src.Name(), "err", err) + } else { + logger.Info("source stopped", "name", src.Name()) + } + }() + } + wg.Wait() + return nil +} + +func buildSources(cfg *config.Config, logger *slog.Logger) ([]source.Source, error) { + var out []source.Source + if w := cfg.Sources.Webhook; w != nil { + s, err := webhook.New(webhook.Config{ + Listen: w.Listen, + Routes: w.ToWebhookRouteMap(), + }, logger) + if err != nil { + return nil, fmt.Errorf("webhook: %w", err) + } + out = append(out, s) + } + if d := cfg.Sources.DropFolder; d != nil { + out = append(out, dropfolder.New(dropfolder.Config{ + Path: d.Path, + PollFallbackSeconds: d.PollFallbackSeconds, + }, logger)) + } + return out, nil +} + +func signalContext() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + cancel() + }() + return ctx, cancel +} + +func resolveConfigPath(flagPath string) string { + if flagPath != "" { + return flagPath + } + if env := os.Getenv("AGENT_WATCHER_CONFIG"); env != "" { + return env + } + home, _ := os.UserHomeDir() + return filepath.Join(home, ".config", "agent-watcher", "collector.yaml") +} + +func newLogger(asJSON bool) *slog.Logger { + var h slog.Handler + if asJSON { + h = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}) + } else { + h = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}) + } + return slog.New(h) +} diff --git a/go.mod b/go.mod index ccb4746..2bf7c53 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,7 @@ go 1.22 require github.com/fsnotify/fsnotify v1.7.0 -require golang.org/x/sys v0.13.0 // indirect +require ( + golang.org/x/sys v0.13.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 9e90680..14be547 100644 --- a/go.sum +++ b/go.sum @@ -2,3 +2,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..b54f80b --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,179 @@ +// Package config loads the Collector's per-host YAML configuration. +// +// Config layout (spec §3.2): +// +// agent: foreman # this host's identity +// inbox_dir: ~/Nyx/workspace/pings # optional; default shown +// sources: +// webhook: +// listen: 127.0.0.1:18790 +// routes: +// /forgejo/push: +// recipient: bob +// type: INFO +// priority: normal # optional +// payload_template: "..." +// drop_folder: +// path: ~/Nyx/workspace/incoming/ +// poll_fallback_seconds: 30 +// +// Either source may be omitted; the Collector starts with whatever is +// configured. Validation is strict — an unknown top-level field or a +// malformed route returns an error. +package config + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" + + "git.botbought.ai/foreman/agent-watcher/internal/source/webhook" +) + +// Config is the loaded, validated configuration. +type Config struct { + Agent string `yaml:"agent"` + InboxDir string `yaml:"inbox_dir,omitempty"` + Sources Sources `yaml:"sources"` +} + +// Sources groups per-source-type configuration. +type Sources struct { + Webhook *WebhookConfig `yaml:"webhook,omitempty"` + DropFolder *DropFolderConfig `yaml:"drop_folder,omitempty"` +} + +// WebhookConfig configures the HTTP webhook source. +type WebhookConfig struct { + Listen string `yaml:"listen"` + Routes map[string]WebhookRoute `yaml:"routes"` +} + +// WebhookRoute is one entry in the webhook routing table. +type WebhookRoute struct { + Recipient string `yaml:"recipient"` + Type string `yaml:"type"` + Priority string `yaml:"priority,omitempty"` + PayloadTemplate string `yaml:"payload_template"` +} + +// DropFolderConfig configures the drop-folder source. +type DropFolderConfig struct { + Path string `yaml:"path"` + PollFallbackSeconds int `yaml:"poll_fallback_seconds,omitempty"` +} + +// Default values applied when fields are unset. +const ( + DefaultInboxDir = "~/Nyx/workspace/pings" + DefaultDropPath = "~/Nyx/workspace/incoming" + DefaultWebhookListen = "127.0.0.1:18790" + DefaultPollFallbackSeconds = 30 +) + +// Load parses the YAML file at path, applies defaults, and validates. +func Load(path string) (*Config, error) { + body, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("config: read %s: %w", path, err) + } + return parse(body) +} + +func parse(body []byte) (*Config, error) { + var c Config + dec := yaml.NewDecoder(strings.NewReader(string(body))) + dec.KnownFields(true) + if err := dec.Decode(&c); err != nil { + return nil, fmt.Errorf("config: parse: %w", err) + } + if err := c.applyDefaultsAndValidate(); err != nil { + return nil, err + } + return &c, nil +} + +func (c *Config) applyDefaultsAndValidate() error { + if c.Agent == "" { + return errors.New("config: agent is required") + } + if c.InboxDir == "" { + c.InboxDir = DefaultInboxDir + } + c.InboxDir = expand(c.InboxDir) + + if c.Sources.Webhook == nil && c.Sources.DropFolder == nil { + return errors.New("config: at least one source (webhook or drop_folder) must be configured") + } + + if w := c.Sources.Webhook; w != nil { + if w.Listen == "" { + w.Listen = DefaultWebhookListen + } + if len(w.Routes) == 0 { + return errors.New("config: webhook configured but no routes defined") + } + for path, r := range w.Routes { + if !strings.HasPrefix(path, "/") { + return fmt.Errorf("config: webhook route %q must start with /", path) + } + if r.Recipient == "" { + return fmt.Errorf("config: webhook route %s: recipient required", path) + } + if r.Type == "" { + return fmt.Errorf("config: webhook route %s: type required", path) + } + if r.PayloadTemplate == "" { + return fmt.Errorf("config: webhook route %s: payload_template required", path) + } + } + } + + if d := c.Sources.DropFolder; d != nil { + if d.Path == "" { + d.Path = DefaultDropPath + } + d.Path = expand(d.Path) + if d.PollFallbackSeconds == 0 { + d.PollFallbackSeconds = DefaultPollFallbackSeconds + } + if d.PollFallbackSeconds < 0 { + return errors.New("config: drop_folder.poll_fallback_seconds must be >= 0") + } + } + return nil +} + +// expand expands a leading ~/ to $HOME. +func expand(p string) string { + if !strings.HasPrefix(p, "~/") && p != "~" { + return p + } + home, err := os.UserHomeDir() + if err != nil || home == "" { + return p + } + if p == "~" { + return home + } + return filepath.Join(home, p[2:]) +} + +// ToWebhookRouteMap converts validated routes to the source/webhook.Route +// shape so main.go can hand them straight to webhook.New. +func (w *WebhookConfig) ToWebhookRouteMap() map[string]webhook.Route { + out := make(map[string]webhook.Route, len(w.Routes)) + for path, r := range w.Routes { + out[path] = webhook.Route{ + Recipient: r.Recipient, + Type: r.Type, + Priority: r.Priority, + PayloadTemplate: r.PayloadTemplate, + } + } + return out +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..ccae24f --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,168 @@ +package config + +import ( + "strings" + "testing" +) + +func TestParse_FullValid(t *testing.T) { + body := []byte(` +agent: foreman +inbox_dir: /tmp/inbox +sources: + webhook: + listen: 127.0.0.1:18790 + routes: + /forgejo/push: + recipient: bob + type: INFO + payload_template: "push to {{ .repo }}" + /alert: + recipient: bob + type: NEEDS-RESPONSE + priority: urgent + payload_template: "alert" + drop_folder: + path: /tmp/incoming + poll_fallback_seconds: 60 +`) + c, err := parse(body) + if err != nil { + t.Fatal(err) + } + if c.Agent != "foreman" { + t.Errorf("agent = %q", c.Agent) + } + if c.InboxDir != "/tmp/inbox" { + t.Errorf("inbox = %q", c.InboxDir) + } + if c.Sources.Webhook == nil || len(c.Sources.Webhook.Routes) != 2 { + t.Errorf("webhook routes: %+v", c.Sources.Webhook) + } + if c.Sources.DropFolder == nil || c.Sources.DropFolder.Path != "/tmp/incoming" { + t.Errorf("dropfolder: %+v", c.Sources.DropFolder) + } +} + +func TestParse_AppliesDefaults(t *testing.T) { + body := []byte(` +agent: foreman +sources: + drop_folder: + path: /tmp/x +`) + c, err := parse(body) + if err != nil { + t.Fatal(err) + } + if c.InboxDir == "" { + t.Error("inbox_dir default not applied") + } + if c.Sources.DropFolder.PollFallbackSeconds != DefaultPollFallbackSeconds { + t.Errorf("poll default = %d", c.Sources.DropFolder.PollFallbackSeconds) + } +} + +func TestParse_HomeExpansion(t *testing.T) { + body := []byte(` +agent: foreman +sources: + drop_folder: + path: ~/foo/bar +`) + c, err := parse(body) + if err != nil { + t.Fatal(err) + } + if strings.HasPrefix(c.Sources.DropFolder.Path, "~") { + t.Errorf("~ not expanded: %q", c.Sources.DropFolder.Path) + } + if !strings.HasSuffix(c.Sources.DropFolder.Path, "/foo/bar") { + t.Errorf("expansion lost suffix: %q", c.Sources.DropFolder.Path) + } +} + +func TestParse_RejectsInvalid(t *testing.T) { + cases := map[string]string{ + "missing agent": `sources:\n webhook:\n routes: {}`, + "no sources": `agent: x`, + "empty webhook routes": ` +agent: x +sources: + webhook: + listen: 127.0.0.1:18790 + routes: {} +`, + "route missing recipient": ` +agent: x +sources: + webhook: + routes: + /x: + type: INFO + payload_template: y +`, + "route missing type": ` +agent: x +sources: + webhook: + routes: + /x: + recipient: r + payload_template: y +`, + "route missing template": ` +agent: x +sources: + webhook: + routes: + /x: + recipient: r + type: INFO +`, + "route path no slash": ` +agent: x +sources: + webhook: + routes: + bad: + recipient: r + type: INFO + payload_template: y +`, + "unknown top-level field": ` +agent: x +typo: oops +sources: + drop_folder: + path: /x +`, + "poll negative": ` +agent: x +sources: + drop_folder: + path: /x + poll_fallback_seconds: -1 +`, + } + for name, body := range cases { + t.Run(name, func(t *testing.T) { + if _, err := parse([]byte(body)); err == nil { + t.Errorf("expected error for: %s", name) + } + }) + } +} + +func TestToWebhookRouteMap(t *testing.T) { + w := &WebhookConfig{ + Routes: map[string]WebhookRoute{ + "/x": {Recipient: "r", Type: "INFO", Priority: "urgent", PayloadTemplate: "p"}, + }, + } + got := w.ToWebhookRouteMap() + r := got["/x"] + if r.Recipient != "r" || r.Type != "INFO" || r.Priority != "urgent" || r.PayloadTemplate != "p" { + t.Errorf("conversion lost data: %+v", r) + } +} -- 2.49.1 From e7d4ea036a438e6ba79bdc4be45348b4a24caf4e Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:23:34 -0400 Subject: [PATCH 5/7] Collector milestone 5: end-to-end integration tests cmd/agent-watcher/main_test.go builds the real binary in TestMain, then launches it twice with temp configs to exercise the full path: TestEndToEnd_BothSourcesEmitToInbox - drops a *.json file via tmp+rename (mirrors Syncthing semantics) - POSTs a webhook with template variables ({{ .repo }}, {{ .actor }}) - POSTs a urgent alert with empty body and fixed-string template - asserts 3 JSONL lines land in bob.inbox with exact shape - confirms each event's source field tracks origin ("drop-folder:drop1.json", "webhook:/forgejo/push") - hits /health and verifies emitted=2 (one webhook didn't 200, that counter only counts successful emits) TestEndToEnd_GracefulShutdown - SIGTERM after listener up - asserts process exits within 3s Total: 43 tests across 5 packages, all passing. Real binary verified end-to-end on Linux/amd64. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/agent-watcher/main_test.go | 299 +++++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 cmd/agent-watcher/main_test.go diff --git a/cmd/agent-watcher/main_test.go b/cmd/agent-watcher/main_test.go new file mode 100644 index 0000000..6cfa105 --- /dev/null +++ b/cmd/agent-watcher/main_test.go @@ -0,0 +1,299 @@ +package main + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "syscall" + "testing" + "time" +) + +// TestMain builds the agent-watcher binary once for the integration tests. +func TestMain(m *testing.M) { + bin, err := buildBinary() + if err != nil { + fmt.Fprintln(os.Stderr, "build failed:", err) + os.Exit(1) + } + binaryPath = bin + code := m.Run() + os.Remove(bin) + os.Exit(code) +} + +var binaryPath string + +func buildBinary() (string, error) { + dir, err := os.MkdirTemp("", "agent-watcher-bin-*") + if err != nil { + return "", err + } + out := filepath.Join(dir, "agent-watcher") + cmd := exec.Command("go", "build", "-o", out, ".") + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return "", err + } + return out, nil +} + +// freePort returns an unused loopback "127.0.0.1:N" port. +func freePort(t *testing.T) string { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := l.Addr().String() + l.Close() + return addr +} + +// startBinary launches agent-watcher with the given config file and returns +// the running cmd. The test is responsible for sending SIGINT/SIGTERM. +func startBinary(t *testing.T, configPath string) *exec.Cmd { + t.Helper() + cmd := exec.Command(binaryPath, "--config", configPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if cmd.Process != nil { + cmd.Process.Signal(syscall.SIGTERM) + cmd.Wait() + } + }) + return cmd +} + +// waitFor polls predicate until true or timeout. +func waitFor(t *testing.T, name string, pred func() bool, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if pred() { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("waitFor %q: predicate did not become true within %s", name, timeout) +} + +func waitForListener(t *testing.T, addr string, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond) + if err == nil { + c.Close() + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("listener at %s never came up", addr) +} + +func readInbox(t *testing.T, path string) []map[string]any { + t.Helper() + body, err := os.ReadFile(path) + if err != nil { + return nil + } + out := []map[string]any{} + for _, line := range strings.Split(strings.TrimRight(string(body), "\n"), "\n") { + if line == "" { + continue + } + var m map[string]any + if err := json.Unmarshal([]byte(line), &m); err != nil { + t.Errorf("non-JSON line: %s", line) + continue + } + out = append(out, m) + } + return out +} + +// TestEndToEnd_BothSourcesEmitToInbox builds and runs the real binary, +// exercises both webhook and drop-folder sources, and verifies the inbox +// file contains one bit-identical JSONL line per emitted event. +func TestEndToEnd_BothSourcesEmitToInbox(t *testing.T) { + tmp := t.TempDir() + inboxDir := filepath.Join(tmp, "pings") + dropDir := filepath.Join(tmp, "incoming") + configPath := filepath.Join(tmp, "collector.yaml") + addr := freePort(t) + + cfg := fmt.Sprintf(` +agent: foreman +inbox_dir: %s +sources: + webhook: + listen: %s + routes: + /forgejo/push: + recipient: bob + type: INFO + payload_template: "push {{ .repo }} by {{ .actor }}" + /alert: + recipient: bob + type: NEEDS-RESPONSE + priority: urgent + payload_template: "fixed alert" + drop_folder: + path: %s + poll_fallback_seconds: 0 +`, inboxDir, addr, dropDir) + + if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil { + t.Fatal(err) + } + + startBinary(t, configPath) + waitForListener(t, addr, 3*time.Second) + + // 1) drop a file + dropContent := `{"recipient":"bob","type":"INFO","payload":"from drop"}` + dropTmp := filepath.Join(dropDir, "drop1.json.tmp") + if err := os.WriteFile(dropTmp, []byte(dropContent), 0644); err != nil { + t.Fatal(err) + } + if err := os.Rename(dropTmp, filepath.Join(dropDir, "drop1.json")); err != nil { + t.Fatal(err) + } + + // 2) post a webhook + resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", + strings.NewReader(`{"repo":"agent-ping","actor":"angus"}`)) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("webhook status = %d", resp.StatusCode) + } + + // 3) post a urgent alert + resp2, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader("")) + if err != nil { + t.Fatal(err) + } + resp2.Body.Close() + + // Wait for 3 lines in bob's inbox. + bobInbox := filepath.Join(inboxDir, "bob.inbox") + waitFor(t, "3 inbox lines", func() bool { + return len(readInbox(t, bobInbox)) == 3 + }, 3*time.Second) + + got := readInbox(t, bobInbox) + payloads := map[string]map[string]any{} + for _, m := range got { + payloads[m["payload"].(string)] = m + } + + // drop event + drop := payloads["from drop"] + if drop == nil { + t.Fatalf("missing drop event; got: %+v", got) + } + if drop["type"] != "INFO" || drop["from"] != "collector" { + t.Errorf("drop event shape: %+v", drop) + } + if !strings.HasPrefix(drop["source"].(string), "drop-folder:drop1.json") { + t.Errorf("drop source: %v", drop["source"]) + } + + // webhook push event + push := payloads["push agent-ping by angus"] + if push == nil { + t.Fatalf("missing push event; got: %+v", got) + } + if push["type"] != "INFO" || push["priority"] != "normal" { + t.Errorf("push event shape: %+v", push) + } + if push["source"] != "webhook:/forgejo/push" { + t.Errorf("push source: %v", push["source"]) + } + + // alert event with priority urgent + alert := payloads["fixed alert"] + if alert == nil { + t.Fatalf("missing alert event; got: %+v", got) + } + if alert["priority"] != "urgent" || alert["type"] != "NEEDS-RESPONSE" { + t.Errorf("alert event shape: %+v", alert) + } + + // Health endpoint is wired and returns sane counters. + resp3, err := http.Get("http://" + addr + "/health") + if err != nil { + t.Fatal(err) + } + defer resp3.Body.Close() + var stats map[string]any + if err := json.NewDecoder(resp3.Body).Decode(&stats); err != nil { + t.Fatal(err) + } + if int(stats["emitted"].(float64)) != 2 { + t.Errorf("health emitted = %v, want 2", stats["emitted"]) + } +} + +// TestEndToEnd_GracefulShutdown ensures SIGTERM stops the binary cleanly. +func TestEndToEnd_GracefulShutdown(t *testing.T) { + tmp := t.TempDir() + configPath := filepath.Join(tmp, "collector.yaml") + addr := freePort(t) + cfg := fmt.Sprintf(` +agent: foreman +inbox_dir: %s/pings +sources: + webhook: + listen: %s + routes: + /x: + recipient: r + type: INFO + payload_template: ok +`, tmp, addr) + if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil { + t.Fatal(err) + } + + cmd := exec.Command(binaryPath, "--config", configPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + waitForListener(t, addr, 3*time.Second) + + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { done <- cmd.Wait() }() + select { + case err := <-done: + if err != nil { + // Some Go runtimes return an exitstatus error after signal-driven + // shutdown; that's OK as long as we exited. + if _, ok := err.(*exec.ExitError); !ok { + t.Errorf("unexpected exit error: %v", err) + } + } + case <-time.After(3 * time.Second): + cmd.Process.Kill() + t.Fatal("did not shut down within 3s of SIGTERM") + } +} -- 2.49.1 From 4ff8c3f78d6def8505023793ca6a7ffc13b6a27d Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:26:16 -0400 Subject: [PATCH 6/7] =?UTF-8?q?Collector=20milestone=206:=20packaging=20?= =?UTF-8?q?=E2=80=94=20install.sh,=20systemd=20unit,=20docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit systemd/agent-watcher.service: --user unit with on-failure restart, ProtectSystem=strict, ProtectHome=read-write, NoNewPrivileges=yes, PrivateTmp=yes. JSON logs to journald. Survives reboot via 'loginctl enable-linger'. examples/collector.yaml: working starter config for both sources with inline comments, per-route examples, and the spec §3.1.2 schema for drop files. install.sh: idempotent installer following the agent-ping pattern. Builds the binary, installs it + the unit, drops the example config if absent, reloads systemd, enables, and (unless --no-start) starts the service. Adds drop-folder lifecycle artifacts (*.tmp, .dead-letter/) to workspace .stignore so they don't replicate during processing. Skips Syncthing-related steps gracefully when ~/Nyx/workspace is not present. INSTALL.md: prerequisites, install, configure, verify (drop-file + webhook end-to-end probes), survive-logout, uninstall, troubleshooting table. README.md: rewritten to reflect actual status — v0 working with 43 tests, packaging ready, Layer 2 in progress on Bob's side. Co-Authored-By: Claude Opus 4.7 (1M context) --- INSTALL.md | 95 ++++++++++++++++++++++++++++ README.md | 44 +++++++++++-- examples/collector.yaml | 51 +++++++++++++++ install.sh | 114 ++++++++++++++++++++++++++++++++++ systemd/agent-watcher.service | 26 ++++++++ 5 files changed, 324 insertions(+), 6 deletions(-) create mode 100644 INSTALL.md create mode 100644 examples/collector.yaml create mode 100755 install.sh create mode 100644 systemd/agent-watcher.service diff --git a/INSTALL.md b/INSTALL.md new file mode 100644 index 0000000..ea03ead --- /dev/null +++ b/INSTALL.md @@ -0,0 +1,95 @@ +# INSTALL — agent-watcher Collector (Layer 1) + +The MCP Watcher (Layer 2) has its own install path; this file covers Layer 1 only. + +## Prerequisites + +- Linux with systemd +- Go 1.22+ (`sudo apt install golang-go` on Ubuntu/Mint) +- An existing agent-ping install on this host (the Collector writes to its inbox dir) + +## Install + +Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. + +```sh +git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher +cd ~/agent-watcher +./install.sh +``` + +The script builds, installs to `~/.local/bin/agent-watcher`, drops the systemd unit into `~/.config/systemd/user/`, copies the example config to `~/.config/agent-watcher/collector.yaml` (if absent), reloads systemd, and starts the service. + +To install without auto-starting (so you can edit the config first): + +```sh +./install.sh --no-start +``` + +## Configure + +Edit `~/.config/agent-watcher/collector.yaml`. The example has both sources configured for `agent: foreman` with placeholder routes — adapt to this host. At least one of `webhook:` or `drop_folder:` must be configured. + +After editing: + +```sh +systemctl --user restart agent-watcher +``` + +## Verify + +```sh +# logs +journalctl --user -u agent-watcher -f + +# health +curl http://127.0.0.1:18790/health + +# end-to-end via drop folder +echo '{"recipient":"bob","type":"INFO","payload":"hello from drop"}' \ + > ~/Nyx/workspace/incoming/test.json + +# end-to-end via webhook +curl -X POST http://127.0.0.1:18790/forgejo/push \ + -H 'Content-Type: application/json' \ + -d '{"repo":"agent-ping","actor":"angus"}' + +# the lines should appear in the recipient's inbox +tail -2 ~/Nyx/workspace/pings/bob.inbox +``` + +## Survive logout + +`systemctl --user` units stop when the user logs out. To keep the Collector running across logouts: + +```sh +sudo loginctl enable-linger $USER +``` + +## Uninstall + +```sh +systemctl --user disable --now agent-watcher +rm ~/.local/bin/agent-watcher +rm ~/.config/systemd/user/agent-watcher.service +systemctl --user daemon-reload +# config and example left behind; remove if desired: +# rm -rf ~/.config/agent-watcher +``` + +## Troubleshooting + +| Symptom | Likely cause | Fix | +|---|---|---| +| `journalctl` shows `config: ... is required` | YAML field missing | Match the example, save, restart. | +| `systemctl --user status` shows `address already in use` | port 18790 taken by something else | Edit `webhook.listen` to a free port. | +| Drop file disappears but no inbox line | check `.dead-letter/` for the file + `.reason` sidecar | Schema validation failed — fix the producer. | +| Webhook returns 404 | path doesn't match a configured route | Routes must match exactly; check trailing slashes. | +| Service won't start across reboot | `linger` not enabled | `sudo loginctl enable-linger $USER` | +| Drop file syncs back from another host before processing | drop folder is in Syncthing scope | This is intended (lets producers on other hosts deliver). Lifecycle artifacts (`.tmp`, `.dead-letter/`) are excluded by `install.sh`. | + +## What this does NOT install + +- The agent-ping system. Install that first (`~/agent-ping/install.sh `). +- Layer 2 (MCP Watcher). Different binary, different runtime, different unit — separate install when it lands. +- Caddy / reverse-proxy webhook auth. v1 is loopback-only; v2 will document the upgrade path. diff --git a/README.md b/README.md index 09f78b8..513a406 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ # agent-watcher -Push-delivery layer for [`agent-ping`](http://localhost:3300/angus/agent-ping). The "secondary nervous system" for Claude Code agents on this network. +Push-delivery layer for [`agent-ping`](https://git.botbought.ai/foreman/agent-ping). The "secondary nervous system" for Claude Code agents on this network. `agent-ping` queues messages in inbox files; `agent-watcher` notices them (and other external events) and wakes the recipient agent without a human in the loop. Two layers: -- **Collector** — small Go daemon, `systemd --user`, always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive. -- **MCP Watcher** — Claude Code MCP subprocess, declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels (research preview). Provides reply tools (`ack`, `respond`, `mark_handled`). +- **Collector** (this repo, Go) — small daemon under `systemd --user`. Always on, brain-blind. Converts external events (HTTP webhooks, drop-folder file arrivals) into ping inbox writes. Runs whether or not any agent is alive. +- **MCP Watcher** (Python, in progress) — Claude Code MCP subprocess declared in each agent's `mcp.json`. Watches the agent's inbox via inotify and surfaces events into the live session via Channels. Provides reply tools (`ack`, `respond`, `mark_handled`). Filesystem is the queue. OpenBrain is not involved. @@ -15,10 +15,42 @@ Filesystem is the queue. OpenBrain is not involved. [`spec/agent-watcher.md`](spec/agent-watcher.md). Read that for architecture, decisions, scope. +Channels reference docs (snapshot of Anthropic's official docs, used by Layer 2): [`docs/channels/`](docs/channels/). + ## Status -v1 spec signed off by Bob (VPS) 2026-05-06. Implementation pending. +| Layer | Lane | Status | +|---|---|---| +| Spec v1 | — | Signed off by Bob 2026-05-06 | +| Layer 1: Collector | Foreman / Go | **v0 working: 43 tests passing.** End-to-end exercised; binary builds. systemd unit + INSTALL.md ready. | +| Layer 2: MCP Watcher | Bob / Python | In progress — sandbox CC session being set up on VPS for testing. | -## Install +## Install (Layer 1) -Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. Install script will land alongside `INSTALL.md` once the binaries are built. +```sh +git clone https://git.botbought.ai/foreman/agent-watcher ~/agent-watcher +cd ~/agent-watcher +./install.sh +``` + +Then edit `~/.config/agent-watcher/collector.yaml` and `systemctl --user restart agent-watcher`. + +See [`INSTALL.md`](INSTALL.md) for verify steps, troubleshooting, and the `loginctl enable-linger` step required to keep the daemon running across logouts. + +Per CLAUDE.md rule #2, **Angus runs the install commands** — agents do not modify their own configuration. + +## Quick reference (Layer 1) + +``` +Inputs Output +───────── ────── +HTTP POST → port 18790 ┐ + (routed via │ .inbox (JSONL, ping-shaped) + YAML table) │ identical format to + ├─→ what `ping ` writes; +File drop in │ the existing UserPromptSubmit hook and the +~/Nyx/workspace/incoming/ │ future MCP Watcher consume the stream +*.json ┘ without distinguishing source. +``` + +`/health` on the same webhook port returns `{received, emitted, errors, uptime_sec}` for journalctl correlation. diff --git a/examples/collector.yaml b/examples/collector.yaml new file mode 100644 index 0000000..3b10651 --- /dev/null +++ b/examples/collector.yaml @@ -0,0 +1,51 @@ +# agent-watcher Collector configuration +# +# Lives at: ~/.config/agent-watcher/collector.yaml +# Override with --config or AGENT_WATCHER_CONFIG. +# +# At least one source (webhook OR drop_folder) must be configured. + +# This host's identity. Used in logs only; the inbox writer routes by +# the recipient field on each event, not this. +agent: foreman + +# Optional. Where to write .inbox files. Default shown. +# inbox_dir: ~/Nyx/workspace/pings + +sources: + + # HTTP webhook source. + # v1 binds loopback only — Caddy + bearer-token reverse-proxy is the + # v2 upgrade path for accepting webhooks from external producers. + webhook: + listen: 127.0.0.1:18790 + routes: + # Path → which inbox to land in, with a Go text/template payload. + # Variables come from the request body decoded as JSON. + /forgejo/push: + recipient: bob + type: INFO + payload_template: "forgejo push to {{ .repo }} by {{ .actor }}" + + # Empty-body posts work too — fixed-string templates render without + # any data. + /openrouter/billing-alert: + recipient: bob + type: NEEDS-RESPONSE + priority: urgent + payload_template: "billing alert: {{ .message }}" + + # Drop-folder source. + # Watches a directory via inotify for *.json files matching the spec + # §3.1.2 schema: + # { + # "recipient": "bob", + # "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST", + # "priority": "normal" | "urgent", # optional + # "payload": "...", + # "sentinel": "/path/optional" # optional + # } + # Valid → emit + delete. Invalid → moved to .dead-letter/ with reason. + drop_folder: + path: ~/Nyx/workspace/incoming/ + poll_fallback_seconds: 30 # safety net if inotify misses diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..3699865 --- /dev/null +++ b/install.sh @@ -0,0 +1,114 @@ +#!/usr/bin/env bash +# install.sh — set up the agent-watcher Collector on this machine. +# +# Usage: +# ./install.sh # build + install + enable + start +# ./install.sh --no-start # everything but `systemctl start` +# +# What it does: +# 1. Builds the agent-watcher binary (requires Go 1.22+). +# 2. Installs to ~/.local/bin/agent-watcher. +# 3. Drops the systemd --user unit into ~/.config/systemd/user/. +# 4. Drops a starter ~/.config/agent-watcher/collector.yaml if missing +# (copies examples/collector.yaml; you edit it before first start). +# 5. Reloads systemd, enables, and (unless --no-start) starts the unit. +# 6. Adds drop-folder lifecycle patterns to the workspace .stignore so +# drop files do not Syncthing-replicate during local processing. +# +# Per CLAUDE.md rule #2, this is intended to be run by a human. The +# Collector itself, like agent-ping, never invokes this script. +# +# Layer 2 (MCP Watcher) has its own install path — different language, +# different runtime, different unit. See its README when it lands. + +set -euo pipefail + +NO_START=0 +for arg in "$@"; do + case "$arg" in + --no-start) NO_START=1 ;; + *) echo "unknown arg: $arg" >&2; exit 2 ;; + esac +done + +REPO_DIR="$(cd "$(dirname "$0")" && pwd)" +BIN_DIR="$HOME/.local/bin" +CONF_DIR="$HOME/.config/agent-watcher" +UNIT_DIR="$HOME/.config/systemd/user" +WORKSPACE="$HOME/Nyx/workspace" +STIGNORE="$WORKSPACE/.stignore" + +echo "agent-watcher install" +echo "repo: $REPO_DIR" +echo + +# 1. Check Go +echo "[1/6] checking Go toolchain" +if ! command -v go >/dev/null 2>&1; then + echo " ERROR: 'go' not found. Install Go 1.22+ first." >&2 + echo " e.g. sudo apt install golang-go" >&2 + exit 1 +fi +GO_VERSION=$(go version | awk '{print $3}') +echo " $GO_VERSION" + +# 2. Build +echo "[2/6] building agent-watcher binary" +mkdir -p "$BIN_DIR" +( cd "$REPO_DIR" && go build -o "$BIN_DIR/agent-watcher" ./cmd/agent-watcher ) +echo " installed: $BIN_DIR/agent-watcher" + +# 3. systemd unit +echo "[3/6] installing systemd --user unit" +mkdir -p "$UNIT_DIR" +install -m 0644 "$REPO_DIR/systemd/agent-watcher.service" "$UNIT_DIR/agent-watcher.service" +echo " installed: $UNIT_DIR/agent-watcher.service" + +# 4. Config skeleton +echo "[4/6] config skeleton" +mkdir -p "$CONF_DIR" +if [ -f "$CONF_DIR/collector.yaml" ]; then + echo " $CONF_DIR/collector.yaml exists — leaving as-is" +else + install -m 0644 "$REPO_DIR/examples/collector.yaml" "$CONF_DIR/collector.yaml" + echo " installed example: $CONF_DIR/collector.yaml — EDIT BEFORE START" +fi + +# 5. .stignore — drop folder is replicated, but lifecycle artifacts shouldn't be +echo "[5/6] .stignore (drop-folder lifecycle artifacts are local-only)" +if [ -d "$WORKSPACE" ]; then + touch "$STIGNORE" + for pattern in "incoming/.dead-letter/" "incoming/*.json.tmp"; do + if ! grep -qxF "$pattern" "$STIGNORE"; then + echo "$pattern" >> "$STIGNORE" + echo " added: $pattern" + fi + done +else + echo " $WORKSPACE not present — skipping (Syncthing not set up here)" +fi + +# 6. systemd reload + enable + (optionally) start +echo "[6/6] systemd reload + enable" +systemctl --user daemon-reload +systemctl --user enable agent-watcher.service >/dev/null +echo " enabled at user login (loginctl enable-linger required to run when logged out)" + +if [ "$NO_START" -eq 0 ]; then + echo " starting…" + systemctl --user restart agent-watcher.service + sleep 1 + systemctl --user status agent-watcher.service --no-pager -n 5 || true +else + echo " --no-start: not starting; run 'systemctl --user start agent-watcher' when ready" +fi + +echo +echo "installation complete." +echo +echo "next steps:" +echo " 1. EDIT $CONF_DIR/collector.yaml to suit this host." +echo " 2. systemctl --user restart agent-watcher (after edits)." +echo " 3. journalctl --user -u agent-watcher -f (watch logs)." +echo " 4. curl http://127.0.0.1:18790/health (sanity check)." +echo " 5. To survive logout: loginctl enable-linger \$USER" diff --git a/systemd/agent-watcher.service b/systemd/agent-watcher.service new file mode 100644 index 0000000..bd3f291 --- /dev/null +++ b/systemd/agent-watcher.service @@ -0,0 +1,26 @@ +[Unit] +Description=agent-watcher Collector — converts external events to ping inbox writes +Documentation=https://git.botbought.ai/foreman/agent-watcher +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +ExecStart=%h/.local/bin/agent-watcher --json-log +Restart=on-failure +RestartSec=5 +# small daemon; no need for elevated limits +LimitNOFILE=4096 +# read-only by intent; the daemon writes only to the inbox dir which is +# inside $HOME and unaffected by ProtectSystem. +ProtectSystem=strict +ProtectHome=read-write +PrivateTmp=yes +NoNewPrivileges=yes +# stdout/stderr go to journald automatically; --json-log makes them parseable +StandardOutput=journal +StandardError=journal +SyslogIdentifier=agent-watcher + +[Install] +WantedBy=default.target -- 2.49.1 From 3a586f38a9d31383d8e8f9f4a6f6eac62a5c2edd Mon Sep 17 00:00:00 2001 From: bob-boat Date: Wed, 6 May 2026 16:47:40 -0400 Subject: [PATCH 7/7] Address Bob's notes 2 and 3 with documenting comments Note 2: poll_fallback_seconds==0 silently means 'use default'. Document in example yaml; no v1 way to disable polling, and we don't think anyone needs that. Note 3: Writer.locks grows unbounded. Bounded in practice (<10 agents); add a comment for the future maintainer who may need to evict. Notes 1 and 4 left unchanged: missingkey=zero is the friendlier choice (produces a visible '' in the inbox rather than a silent 500); fsnotify double-fire is already handled by os.ErrNotExist on second-read. Co-Authored-By: Claude Opus 4.7 (1M context) --- examples/collector.yaml | 6 +++++- internal/inbox/inbox.go | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/examples/collector.yaml b/examples/collector.yaml index 3b10651..1354d29 100644 --- a/examples/collector.yaml +++ b/examples/collector.yaml @@ -48,4 +48,8 @@ sources: # Valid → emit + delete. Invalid → moved to .dead-letter/ with reason. drop_folder: path: ~/Nyx/workspace/incoming/ - poll_fallback_seconds: 30 # safety net if inotify misses + # Safety net if inotify misses an event. 0 (or unset) = use the + # 30-second default. There is no "disable polling" option in v1 — + # if you don't want polling, leave inotify to do its job and ignore + # the cost (a periodic readdir is microseconds). + poll_fallback_seconds: 30 diff --git a/internal/inbox/inbox.go b/internal/inbox/inbox.go index 54ee0f5..c687c95 100644 --- a/internal/inbox/inbox.go +++ b/internal/inbox/inbox.go @@ -38,6 +38,11 @@ type Event struct { // // Concurrency: a single Writer is safe for use from multiple goroutines. // One sync.Mutex per recipient bounds contention to per-file granularity. +// +// The locks map grows monotonically as new recipients appear. In practice +// the agent network has <10 recipients, so the leak is bounded; if this +// daemon ever lives in a system with thousands of recipients, replace the +// map with an LRU or shard-and-evict scheme. type Writer struct { dir string -- 2.49.1