diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9bfd37c --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.botbought.ai/foreman/agent-watcher + +go 1.22.2 diff --git a/internal/inbox/inbox.go b/internal/inbox/inbox.go new file mode 100644 index 0000000..54ee0f5 --- /dev/null +++ b/internal/inbox/inbox.go @@ -0,0 +1,124 @@ +// Package inbox writes ping-shaped JSONL events to recipient inbox files. +// +// The output format is bit-identical to the agent-ping CLI's output, so the +// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether +// a line was written by `ping` or by the Collector. +// +// All writes are append-only with O_APPEND so concurrent writers don't tear +// each other's lines (POSIX guarantees atomicity for writes <= PIPE_BUF, and +// our lines are well under that). +package inbox + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +// Event is the JSONL line shape. Field order and JSON keys match the ping CLI +// exactly. Optional fields use omitempty. +type Event struct { + TS string `json:"ts"` + ID string `json:"id"` + From string `json:"from"` + To string `json:"to"` + Type string `json:"type"` + Priority string `json:"priority"` + Payload string `json:"payload"` + Sentinel string `json:"sentinel,omitempty"` + Source string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push" +} + +// Writer appends events to inbox files under a base directory. +// +// Concurrency: a single Writer is safe for use from multiple goroutines. +// One sync.Mutex per recipient bounds contention to per-file granularity. +type Writer struct { + dir string + + mu sync.Mutex + locks map[string]*sync.Mutex +} + +// NewWriter returns a Writer that appends to {dir}/{recipient}.inbox. +// The directory is created (with perms 0755) if it does not exist. +func NewWriter(dir string) (*Writer, error) { + if dir == "" { + return nil, fmt.Errorf("inbox: empty dir") + } + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err) + } + return &Writer{ + dir: dir, + locks: make(map[string]*sync.Mutex), + }, nil +} + +// Write appends one event to the recipient's inbox file. It mutates ev to set +// TS and ID if either is empty, then sets To from recipient. +func (w *Writer) Write(recipient string, ev *Event) error { + if recipient == "" { + return fmt.Errorf("inbox: empty recipient") + } + if ev == nil { + return fmt.Errorf("inbox: nil event") + } + if ev.TS == "" { + ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z") + } + if ev.ID == "" { + id, err := newID() + if err != nil { + return fmt.Errorf("inbox: generate id: %w", err) + } + ev.ID = id + } + ev.To = recipient + + line, err := json.Marshal(ev) + if err != nil { + return fmt.Errorf("inbox: marshal: %w", err) + } + line = append(line, '\n') + + lock := w.lockFor(recipient) + lock.Lock() + defer lock.Unlock() + + path := filepath.Join(w.dir, recipient+".inbox") + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("inbox: open %s: %w", path, err) + } + defer f.Close() + + if _, err := f.Write(line); err != nil { + return fmt.Errorf("inbox: append: %w", err) + } + return nil +} + +func (w *Writer) lockFor(recipient string) *sync.Mutex { + w.mu.Lock() + defer w.mu.Unlock() + if l, ok := w.locks[recipient]; ok { + return l + } + l := &sync.Mutex{} + w.locks[recipient] = l + return l +} + +func newID() (string, error) { + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + return "", err + } + return "ping-" + hex.EncodeToString(b), nil +} diff --git a/internal/inbox/inbox_test.go b/internal/inbox/inbox_test.go new file mode 100644 index 0000000..22f199e --- /dev/null +++ b/internal/inbox/inbox_test.go @@ -0,0 +1,182 @@ +package inbox + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "sync" + "testing" +) + +func TestWriter_Write_BasicShape(t *testing.T) { + dir := t.TempDir() + w, err := NewWriter(dir) + if err != nil { + t.Fatal(err) + } + + if err := w.Write("bob", &Event{ + From: "foreman", + Type: "INFO", + Priority: "normal", + Payload: "hi", + }); err != nil { + t.Fatal(err) + } + + got := readLines(t, filepath.Join(dir, "bob.inbox")) + if len(got) != 1 { + t.Fatalf("want 1 line, got %d", len(got)) + } + + var ev Event + if err := json.Unmarshal([]byte(got[0]), &ev); err != nil { + t.Fatalf("not JSON: %v", err) + } + if ev.To != "bob" { + t.Errorf("To = %q, want bob", ev.To) + } + if ev.From != "foreman" { + t.Errorf("From = %q, want foreman", ev.From) + } + if ev.Payload != "hi" { + t.Errorf("Payload = %q", ev.Payload) + } + if !strings.HasPrefix(ev.ID, "ping-") { + t.Errorf("ID %q missing ping- prefix", ev.ID) + } + if len(ev.ID) != len("ping-")+8 { + t.Errorf("ID %q wrong length", ev.ID) + } + if ev.TS == "" { + t.Errorf("TS unset") + } +} + +func TestWriter_Write_PreservesProvidedIDAndTS(t *testing.T) { + dir := t.TempDir() + w, err := NewWriter(dir) + if err != nil { + t.Fatal(err) + } + + want := &Event{ + TS: "2026-05-06T12:00:00Z", + ID: "ping-deadbeef", + From: "x", + Type: "INFO", + Priority: "normal", + Payload: "p", + } + if err := w.Write("r", want); err != nil { + t.Fatal(err) + } + + got := readLines(t, filepath.Join(dir, "r.inbox")) + var ev Event + json.Unmarshal([]byte(got[0]), &ev) + if ev.ID != want.ID { + t.Errorf("ID overwritten: got %q want %q", ev.ID, want.ID) + } + if ev.TS != want.TS { + t.Errorf("TS overwritten: got %q want %q", ev.TS, want.TS) + } +} + +func TestWriter_Write_OmitsEmptyOptionalFields(t *testing.T) { + dir := t.TempDir() + w, _ := NewWriter(dir) + w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "p"}) + + line := readLines(t, filepath.Join(dir, "r.inbox"))[0] + if strings.Contains(line, "sentinel") { + t.Errorf("empty sentinel leaked into JSON: %s", line) + } + if strings.Contains(line, "source") { + t.Errorf("empty source leaked into JSON: %s", line) + } +} + +func TestWriter_Write_PingShapeMatchesCLI(t *testing.T) { + // The ping CLI emits JSON with separators=(",", ":") (compact) and these + // keys in this order: ts, id, from, to, type, priority, payload [, sentinel]. + // Verify our marshaled output preserves the key set and compactness. + dir := t.TempDir() + w, _ := NewWriter(dir) + w.Write("r", &Event{ + From: "f", Type: "INFO", Priority: "normal", Payload: "p", Sentinel: "/x", + }) + line := strings.TrimSpace(readLines(t, filepath.Join(dir, "r.inbox"))[0]) + + wantKeys := []string{`"ts":`, `"id":`, `"from":`, `"to":`, `"type":`, `"priority":`, `"payload":`, `"sentinel":`} + for _, k := range wantKeys { + if !strings.Contains(line, k) { + t.Errorf("missing key %s in: %s", k, line) + } + } + // compact (no spaces around colons or commas) + if strings.Contains(line, ": ") || strings.Contains(line, ", ") { + t.Errorf("non-compact JSON: %s", line) + } +} + +func TestWriter_Write_ConcurrentSameRecipient(t *testing.T) { + // 100 goroutines writing to the same inbox should produce 100 valid JSON + // lines, none torn or interleaved. + dir := t.TempDir() + w, _ := NewWriter(dir) + + const n = 100 + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + w.Write("r", &Event{From: "f", Type: "INFO", Priority: "normal", Payload: "x"}) + }(i) + } + wg.Wait() + + lines := readLines(t, filepath.Join(dir, "r.inbox")) + if len(lines) != n { + t.Fatalf("got %d lines, want %d (torn writes?)", len(lines), n) + } + for i, l := range lines { + var ev Event + if err := json.Unmarshal([]byte(l), &ev); err != nil { + t.Errorf("line %d not JSON: %v: %s", i, err, l) + } + } +} + +func TestWriter_Write_RejectsBadInputs(t *testing.T) { + w, _ := NewWriter(t.TempDir()) + if err := w.Write("", &Event{}); err == nil { + t.Error("empty recipient: expected error") + } + if err := w.Write("r", nil); err == nil { + t.Error("nil event: expected error") + } +} + +func TestNewWriter_RejectsEmptyDir(t *testing.T) { + if _, err := NewWriter(""); err == nil { + t.Error("expected error for empty dir") + } +} + +func readLines(t *testing.T, path string) []string { + t.Helper() + b, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + out := []string{} + for _, l := range strings.Split(strings.TrimRight(string(b), "\n"), "\n") { + if l != "" { + out = append(out, l) + } + } + return out +}