agent-watcher/internal/inbox/inbox.go
bob-boat 50e8ece83d Collector milestone 1: inbox writer + tests
Layer 1 keystone. The internal/inbox package writes ping-shaped JSONL
events to recipient inbox files in a format bit-identical to the
agent-ping CLI's output, so the existing UserPromptSubmit hook and the
future MCP Watcher cannot tell whether a line came from `ping` or the
Collector.

- O_APPEND opens for atomic line writes (POSIX guarantees writes <=
  PIPE_BUF, our lines are well under).
- Per-recipient sync.Mutex bounds contention; multiple goroutines
  writing to one inbox stay correctly serialized.
- 7 tests passing: shape, ID/TS preservation, omitempty for optional
  fields, key-set + compactness match against ping CLI's separators=
  (",",":") output, 100-goroutine concurrent-write torn-line check,
  bad-input rejection, empty-dir rejection.

go.mod at git.botbought.ai/foreman/agent-watcher; module name matches
the public Forgejo path so eventual consumers can `go get` it.

Next milestones:
- Source plugin interface
- Drop folder source (inotify, via fsnotify)
- HTTP webhook source
- Config loader (YAML)
- main.go wiring
- systemd unit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 16:09:56 -04:00

124 lines
3.2 KiB
Go

// Package inbox writes ping-shaped JSONL events to recipient inbox files.
//
// The output format is bit-identical to the agent-ping CLI's output, so the
// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether
// a line was written by `ping` or by the Collector.
//
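// All writes are append-only: each file is opened with O_APPEND and each event
// is handed to it in a single Write call, so concurrent writers don't tear each
// other's lines. (With O_APPEND, POSIX moves the offset to end-of-file and
// performs the write as one step; the PIPE_BUF atomicity limit applies to pipes
// and FIFOs, not to regular files.)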
package inbox
import (
	"bytes"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"
)
// Event describes one line of an inbox file; it is serialized as a single
// JSON object.
//
// encoding/json emits struct fields in declaration order, so the order of the
// fields below is the order of the keys on disk. Both the order and the key
// names are meant to match the ping CLI exactly and must not be rearranged.
// Sentinel and Source are optional: they are left out of the output entirely
// when empty. Writer.Write fills in TS and ID when they are empty and always
// overwrites To.
type Event struct {
	TS       string `json:"ts"`
	ID       string `json:"id"`
	From     string `json:"from"`
	To       string `json:"to"`
	Type     string `json:"type"`
	Priority string `json:"priority"`
	Payload  string `json:"payload"`
	Sentinel string `json:"sentinel,omitempty"`
	Source   string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push"
}
// Writer appends events to per-recipient inbox files that all live under one
// base directory.
//
// A single Writer may be shared between goroutines. Each recipient gets its
// own sync.Mutex, so callers only contend with one another when they target
// the same inbox file.
//
// Obtain a Writer from NewWriter: the zero value has a nil locks map, which
// lockFor would try to assign into. Because it contains a mutex, a Writer must
// be passed by pointer rather than copied.
type Writer struct {
	dir string // base directory that holds the inbox files

	mu    sync.Mutex             // guards locks
	locks map[string]*sync.Mutex // one mutex per recipient, keyed by recipient name
}
// NewWriter builds a Writer whose inbox files are {dir}/{recipient}.inbox.
//
// dir must be non-empty. It is created, together with any missing parents,
// using mode 0755 when it does not already exist. An error is returned for an
// empty dir or when the directory cannot be created.
func NewWriter(dir string) (*Writer, error) {
	if len(dir) == 0 {
		return nil, fmt.Errorf("inbox: empty dir")
	}

	err := os.MkdirAll(dir, 0755)
	if err != nil {
		return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err)
	}

	w := new(Writer)
	w.dir = dir
	w.locks = map[string]*sync.Mutex{}
	return w, nil
}
// Write appends ev as one JSON line to {dir}/{recipient}.inbox, creating the
// file with mode 0644 if it does not exist.
//
// ev is modified in place: when TS is empty it is set to the current UTC time
// (second precision, trailing "Z"), when ID is empty a fresh one is generated,
// and To is always overwritten with recipient.
//
// recipient becomes part of a file name, so it must be non-empty and must not
// contain a path separator or a NUL byte; this keeps every inbox inside the
// Writer's base directory. Write returns an error for an invalid recipient, a
// nil ev, or any failure to generate the ID, encode the event, or open, write
// or close the file.
func (w *Writer) Write(recipient string, ev *Event) error {
	if recipient == "" {
		return fmt.Errorf("inbox: empty recipient")
	}
	// Reject anything that could make filepath.Join resolve outside w.dir
	// (for example "../other") or that cannot appear in a file name.
	for i := 0; i < len(recipient); i++ {
		if c := recipient[i]; c == 0 || os.IsPathSeparator(c) {
			return fmt.Errorf("inbox: invalid recipient %q", recipient)
		}
	}
	if ev == nil {
		return fmt.Errorf("inbox: nil event")
	}

	if ev.TS == "" {
		ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z")
	}
	if ev.ID == "" {
		id, err := newID()
		if err != nil {
			return fmt.Errorf("inbox: generate id: %w", err)
		}
		ev.ID = id
	}
	ev.To = recipient

	// json.Marshal rewrites '<', '>' and '&' as \u003c, \u003e and \u0026,
	// which would make a payload containing them differ byte-for-byte from a
	// line produced by a writer that does not HTML-escape. An Encoder with
	// escaping disabled emits them verbatim. Encode also appends the trailing
	// newline that terminates the JSONL record.
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(ev); err != nil {
		return fmt.Errorf("inbox: marshal: %w", err)
	}
	line := buf.Bytes()

	// Serialize writers of the same inbox within this process; the lock is
	// held across open, write and close.
	lock := w.lockFor(recipient)
	lock.Lock()
	defer lock.Unlock()

	path := filepath.Join(w.dir, recipient+".inbox")
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("inbox: open %s: %w", path, err)
	}

	// Always close, but report the write error first if both fail. A Close
	// error on a file that was written to can mean the data did not reach
	// the file, so it must not be dropped.
	_, writeErr := f.Write(line)
	closeErr := f.Close()
	if writeErr != nil {
		return fmt.Errorf("inbox: append: %w", writeErr)
	}
	if closeErr != nil {
		return fmt.Errorf("inbox: close %s: %w", path, closeErr)
	}
	return nil
}
// lockFor returns the mutex that serializes writes to recipient's inbox,
// creating and registering it the first time that recipient is seen.
//
// The lookup and the insertion both happen while w.mu is held, so concurrent
// callers asking for the same recipient always receive the same mutex. This
// function never removes entries from the map.
func (w *Writer) lockFor(recipient string) *sync.Mutex {
	w.mu.Lock()
	defer w.mu.Unlock()

	l, found := w.locks[recipient]
	if !found {
		l = new(sync.Mutex)
		w.locks[recipient] = l
	}
	return l
}
// newID returns a new event ID: the literal prefix "ping-" followed by four
// bytes from crypto/rand rendered as eight lowercase hex digits. The error
// from the random source is returned unchanged.
func newID() (string, error) {
	const prefix = "ping-"

	var raw [4]byte
	_, err := rand.Read(raw[:])
	if err != nil {
		return "", err
	}
	return prefix + hex.EncodeToString(raw[:]), nil
}