// Package inbox writes ping-shaped JSONL events to recipient inbox files.
//
// The output format is bit-identical to the agent-ping CLI's output, so the
// existing UserPromptSubmit hook and the future MCP Watcher cannot tell whether
// a line was written by `ping` or by the Collector.
//
// All writes are append-only with O_APPEND, and each event is handed to the
// file as a single write, so concurrent writers don't tear each other's lines
// (POSIX guarantees atomicity for writes <= PIPE_BUF, and our lines are well
// under that).
//
// NOTE(review): the PIPE_BUF guarantee in POSIX is specified for pipes and
// FIFOs. For regular files O_APPEND guarantees that the seek-to-end and the
// write are not separated by another modification of the file. Nothing in this
// package caps the length of a line (Payload is arbitrary), so confirm the
// "well under PIPE_BUF" assumption against the callers.
package inbox

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// Event is the JSONL line shape. Field order and JSON keys match the ping CLI
// exactly. Optional fields use omitempty.
//
// Writer.Write fills in some fields on the caller's behalf: TS is set to the
// current UTC time (layout "2006-01-02T15:04:05Z") when empty, ID is set to
// "ping-" plus 8 hex digits when empty, and To is always overwritten with the
// recipient passed to Write. All other fields are encoded as supplied.
type Event struct {
	TS       string `json:"ts"`
	ID       string `json:"id"`
	From     string `json:"from"`
	To       string `json:"to"`
	Type     string `json:"type"`
	Priority string `json:"priority"`
	Payload  string `json:"payload"`
	Sentinel string `json:"sentinel,omitempty"`
	Source   string `json:"source,omitempty"` // collector-only debug field, e.g. "webhook:/forgejo/push"
}

// Writer appends events to inbox files under a base directory.
//
// Concurrency: a single Writer is safe for use from multiple goroutines.
// One sync.Mutex per recipient bounds contention to per-file granularity.
// These mutexes live in process memory, so they only serialize writers that
// share this Writer; other processes appending to the same file are not
// covered by them.
//
// The locks map grows monotonically as new recipients appear. In practice
// the agent network has <10 recipients, so the leak is bounded; if this
// daemon ever lives in a system with thousands of recipients, replace the
// map with an LRU or shard-and-evict scheme.
type Writer struct {
	// dir is the base directory holding the {recipient}.inbox files.
	dir string
	// mu guards locks.
	mu sync.Mutex
	// locks maps a recipient name to the mutex that serializes appends to
	// that recipient's file. Entries are created on first use and never removed.
	locks map[string]*sync.Mutex
}

// NewWriter returns a Writer that appends to {dir}/{recipient}.inbox.
// The directory is created (with perms 0755) if it does not exist.
// An empty dir is rejected with an error.
func NewWriter(dir string) (*Writer, error) { if dir == "" { return nil, fmt.Errorf("inbox: empty dir") } if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("inbox: mkdir %s: %w", dir, err) } return &Writer{ dir: dir, locks: make(map[string]*sync.Mutex), }, nil } // Write appends one event to the recipient's inbox file. It mutates ev to set // TS and ID if either is empty, then sets To from recipient. func (w *Writer) Write(recipient string, ev *Event) error { if recipient == "" { return fmt.Errorf("inbox: empty recipient") } if ev == nil { return fmt.Errorf("inbox: nil event") } if ev.TS == "" { ev.TS = time.Now().UTC().Format("2006-01-02T15:04:05Z") } if ev.ID == "" { id, err := newID() if err != nil { return fmt.Errorf("inbox: generate id: %w", err) } ev.ID = id } ev.To = recipient line, err := json.Marshal(ev) if err != nil { return fmt.Errorf("inbox: marshal: %w", err) } line = append(line, '\n') lock := w.lockFor(recipient) lock.Lock() defer lock.Unlock() path := filepath.Join(w.dir, recipient+".inbox") f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("inbox: open %s: %w", path, err) } defer f.Close() if _, err := f.Write(line); err != nil { return fmt.Errorf("inbox: append: %w", err) } return nil } func (w *Writer) lockFor(recipient string) *sync.Mutex { w.mu.Lock() defer w.mu.Unlock() if l, ok := w.locks[recipient]; ok { return l } l := &sync.Mutex{} w.locks[recipient] = l return l } func newID() (string, error) { b := make([]byte, 4) if _, err := rand.Read(b); err != nil { return "", err } return "ping-" + hex.EncodeToString(b), nil }