diff --git a/go.mod b/go.mod index 9bfd37c..ccb4746 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,7 @@ module git.botbought.ai/foreman/agent-watcher -go 1.22.2 +go 1.22 + +require github.com/fsnotify/fsnotify v1.7.0 + +require golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9e90680 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/source/dropfolder/dropfolder.go b/internal/source/dropfolder/dropfolder.go new file mode 100644 index 0000000..3488f36 --- /dev/null +++ b/internal/source/dropfolder/dropfolder.go @@ -0,0 +1,259 @@ +// Package dropfolder implements the drop-folder Source. +// +// Producers drop *.json files into a designated directory. The Source ingests +// each file, validates it against a schema, emits an event, and deletes the +// file. Malformed files are moved to a .dead-letter/ subdirectory for +// inspection. +// +// Lifecycle: on startup, scan the folder and ingest any pre-existing files +// (catches up after a Collector restart). Then watch via inotify for new +// arrivals. A periodic poll (default 30s) is a safety net in case inotify +// misses a Syncthing-driven rename — Syncthing usually fires IN_CLOSE_WRITE +// on rename, but the spec calls for a fallback. +// +// Drop file schema (per spec §3.1.2): +// +// { +// "recipient": "bob", +// "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST", +// "priority": "normal" | "urgent", // optional, default "normal" +// "payload": "...", +// "sentinel": "/path/optional" // optional +// } +package dropfolder + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/fsnotify/fsnotify" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" + "git.botbought.ai/foreman/agent-watcher/internal/source" +) + +// Validation rules duplicated from agent-ping's CLI for tightness. +var validTypes = map[string]bool{ + "INFO": true, + "NEEDS-RESPONSE": true, + "ACK-REQUEST": true, +} + +var validPriorities = map[string]bool{ + "": true, // means "normal" + "normal": true, + "urgent": true, +} + +// Config configures a drop-folder Source. +type Config struct { + // Path is the directory to watch. Created if absent. + Path string + + // PollFallbackSeconds is a periodic full-scan interval in case inotify + // misses an event. Set to 0 to disable. + PollFallbackSeconds int +} + +// Source watches a directory for *.json drop files and emits events. +type Source struct { + cfg Config + logger *slog.Logger +} + +// New returns a configured (but not running) Source. +func New(cfg Config, logger *slog.Logger) *Source { + if logger == nil { + logger = slog.Default() + } + if cfg.PollFallbackSeconds == 0 { + cfg.PollFallbackSeconds = 30 + } + return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)} +} + +func (s *Source) Name() string { return "drop-folder" } + +// Run blocks until ctx is canceled. +func (s *Source) Run(ctx context.Context, emit source.Emit) error { + if err := os.MkdirAll(s.cfg.Path, 0755); err != nil { + return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err) + } + if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil { + return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err) + } + + w, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("dropfolder: new watcher: %w", err) + } + defer w.Close() + if err := w.Add(s.cfg.Path); err != nil { + return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err) + } + + // Initial scan — drain any files that landed before we were watching. + s.scan(ctx, emit) + + pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second + var pollCh <-chan time.Time + if pollEvery > 0 { + t := time.NewTicker(pollEvery) + defer t.Stop() + pollCh = t.C + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case ev, ok := <-w.Events: + if !ok { + return errors.New("dropfolder: watcher closed") + } + // Care about creates and renames-into (Syncthing uses rename). + if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 { + continue + } + if !strings.HasSuffix(ev.Name, ".json") { + continue + } + s.handle(ctx, emit, ev.Name) + + case err, ok := <-w.Errors: + if !ok { + return errors.New("dropfolder: watcher errors closed") + } + s.logger.Warn("watcher error", "err", err) + + case <-pollCh: + s.scan(ctx, emit) + } + } +} + +// scan ingests every *.json file currently in the drop folder. +func (s *Source) scan(ctx context.Context, emit source.Emit) { + entries, err := os.ReadDir(s.cfg.Path) + if err != nil { + s.logger.Error("scan readdir", "err", err) + return + } + for _, e := range entries { + if ctx.Err() != nil { + return + } + if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { + continue + } + s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name())) + } +} + +// handle reads, validates, emits, and removes (or dead-letters) one drop file. +func (s *Source) handle(ctx context.Context, emit source.Emit, path string) { + if ctx.Err() != nil { + return + } + body, err := os.ReadFile(path) + if err != nil { + // File may have been ingested already by another loop iteration. + if errors.Is(err, os.ErrNotExist) { + return + } + s.logger.Warn("read drop file", "path", path, "err", err) + s.deadLetter(path, "read failed") + return + } + + pd, err := parseDrop(body) + if err != nil { + s.logger.Warn("invalid drop file", "path", path, "err", err) + s.deadLetter(path, "schema invalid: "+err.Error()) + return + } + pd.event.Source = "drop-folder:" + filepath.Base(path) + + if err := emit(pd.recipient, pd.event); err != nil { + // Inbox write failure: don't lose the message. Leave the file in + // place so the next scan retries. Log loudly. + s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err) + return + } + + if err := os.Remove(path); err != nil { + s.logger.Warn("remove drop file", "path", path, "err", err) + } +} + +func (s *Source) deadLetter(path, reason string) { + target := filepath.Join(s.deadLetterDir(), filepath.Base(path)) + if err := os.Rename(path, target); err != nil { + s.logger.Error("dead-letter move failed", "path", path, "err", err) + return + } + // Sidecar reason file for forensics. + _ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644) + s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason) +} + +func (s *Source) deadLetterDir() string { + return filepath.Join(s.cfg.Path, ".dead-letter") +} + +// dropFile is the on-disk schema; intermediate value during parse. +type dropFile struct { + Recipient string `json:"recipient"` + Type string `json:"type"` + Priority string `json:"priority,omitempty"` + Payload string `json:"payload"` + Sentinel string `json:"sentinel,omitempty"` +} + +type parsedDrop struct { + recipient string + event *inbox.Event +} + +func parseDrop(body []byte) (*parsedDrop, error) { + var d dropFile + dec := json.NewDecoder(strings.NewReader(string(body))) + dec.DisallowUnknownFields() + if err := dec.Decode(&d); err != nil { + return nil, fmt.Errorf("decode: %w", err) + } + if d.Recipient == "" { + return nil, errors.New("recipient required") + } + if !validTypes[d.Type] { + return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type) + } + if !validPriorities[d.Priority] { + return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority) + } + if d.Payload == "" { + return nil, errors.New("payload required") + } + priority := d.Priority + if priority == "" { + priority = "normal" + } + return &parsedDrop{ + recipient: d.Recipient, + event: &inbox.Event{ + From: "collector", + Type: d.Type, + Priority: priority, + Payload: d.Payload, + Sentinel: d.Sentinel, + }, + }, nil +} diff --git a/internal/source/dropfolder/dropfolder_test.go b/internal/source/dropfolder/dropfolder_test.go new file mode 100644 index 0000000..4e1d508 --- /dev/null +++ b/internal/source/dropfolder/dropfolder_test.go @@ -0,0 +1,273 @@ +package dropfolder + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" +) + +func quietLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +func TestParseDrop_Valid(t *testing.T) { + body := []byte(`{"recipient":"bob","type":"INFO","priority":"urgent","payload":"hi","sentinel":"/x"}`) + p, err := parseDrop(body) + if err != nil { + t.Fatal(err) + } + if p.recipient != "bob" { + t.Errorf("recipient = %q", p.recipient) + } + if p.event.Type != "INFO" || p.event.Priority != "urgent" || p.event.Payload != "hi" || p.event.Sentinel != "/x" { + t.Errorf("unexpected event: %+v", p.event) + } +} + +func TestParseDrop_DefaultsPriority(t *testing.T) { + p, err := parseDrop([]byte(`{"recipient":"r","type":"INFO","payload":"p"}`)) + if err != nil { + t.Fatal(err) + } + if p.event.Priority != "normal" { + t.Errorf("priority default = %q, want normal", p.event.Priority) + } +} + +func TestParseDrop_Invalid(t *testing.T) { + cases := map[string]string{ + "empty body": ``, + "missing recipient": `{"type":"INFO","payload":"p"}`, + "missing type": `{"recipient":"r","payload":"p"}`, + "bad type": `{"recipient":"r","type":"NOPE","payload":"p"}`, + "bad priority": `{"recipient":"r","type":"INFO","priority":"high","payload":"p"}`, + "missing payload": `{"recipient":"r","type":"INFO"}`, + "unknown field": `{"recipient":"r","type":"INFO","payload":"p","stowaway":1}`, + "not json": `not json`, + } + for name, body := range cases { + t.Run(name, func(t *testing.T) { + if _, err := parseDrop([]byte(body)); err == nil { + t.Error("expected error") + } + }) + } +} + +// recordingEmit captures emitted events for assertion. +type recordingEmit struct { + mu sync.Mutex + events []record + err error // set to fail the next emit +} + +type record struct { + recipient string + event inbox.Event +} + +func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.err != nil { + err := r.err + r.err = nil + return err + } + r.events = append(r.events, record{recipient, *ev}) + return nil +} + +func (r *recordingEmit) snapshot() []record { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]record, len(r.events)) + copy(out, r.events) + return out +} + +func writeDrop(t *testing.T, dir, name string, payload any) string { + t.Helper() + path := filepath.Join(dir, name) + b, err := json.Marshal(payload) + if err != nil { + t.Fatal(err) + } + tmp := path + ".tmp" + if err := os.WriteFile(tmp, b, 0644); err != nil { + t.Fatal(err) + } + // rename for atomic visibility (mirrors what Syncthing does) + if err := os.Rename(tmp, path); err != nil { + t.Fatal(err) + } + return path +} + +func TestSource_InitialScan_IngestsExistingFiles(t *testing.T) { + dir := t.TempDir() + writeDrop(t, dir, "1.json", map[string]string{ + "recipient": "bob", "type": "INFO", "payload": "first", + }) + writeDrop(t, dir, "2.json", map[string]string{ + "recipient": "bob", "type": "NEEDS-RESPONSE", "payload": "second", + }) + + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + go src.Run(ctx, rec.emit) + + waitFor(t, func() bool { return len(rec.snapshot()) == 2 }, 400*time.Millisecond) + + got := rec.snapshot() + if got[0].event.Payload+"|"+got[1].event.Payload != "first|second" && + got[0].event.Payload+"|"+got[1].event.Payload != "second|first" { + t.Errorf("unexpected payloads: %+v", got) + } + for _, r := range got { + if r.recipient != "bob" { + t.Errorf("recipient %q", r.recipient) + } + } +} + +func TestSource_LiveDrop_IngestsViaInotify(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + // Give the watcher time to attach. + time.Sleep(50 * time.Millisecond) + writeDrop(t, dir, "live.json", map[string]string{ + "recipient": "foreman", "type": "INFO", "payload": "live one", + }) + + waitFor(t, func() bool { return len(rec.snapshot()) == 1 }, 1*time.Second) + + got := rec.snapshot() + if got[0].event.Payload != "live one" { + t.Errorf("payload %q", got[0].event.Payload) + } +} + +func TestSource_DeleteAfterEmit(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + time.Sleep(50 * time.Millisecond) + path := writeDrop(t, dir, "ok.json", map[string]string{ + "recipient": "r", "type": "INFO", "payload": "p", + }) + + waitFor(t, func() bool { + _, err := os.Stat(path) + return os.IsNotExist(err) + }, 1*time.Second) +} + +func TestSource_DeadLetter_OnInvalidSchema(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + time.Sleep(50 * time.Millisecond) + // Missing payload — invalid. + tmp := filepath.Join(dir, "bad.json.tmp") + os.WriteFile(tmp, []byte(`{"recipient":"r","type":"INFO"}`), 0644) + os.Rename(tmp, filepath.Join(dir, "bad.json")) + + deadPath := filepath.Join(dir, ".dead-letter", "bad.json") + waitFor(t, func() bool { + _, err := os.Stat(deadPath) + return err == nil + }, 1*time.Second) + + // Reason sidecar exists. + if _, err := os.Stat(deadPath + ".reason"); err != nil { + t.Errorf("reason sidecar missing: %v", err) + } + + // Original file gone from drop dir. + if _, err := os.Stat(filepath.Join(dir, "bad.json")); !os.IsNotExist(err) { + t.Errorf("original drop file should be gone") + } + + // Nothing was emitted. + if got := rec.snapshot(); len(got) != 0 { + t.Errorf("got events for invalid drop: %+v", got) + } +} + +func TestSource_RetainsFile_WhenEmitFails(t *testing.T) { + dir := t.TempDir() + rec := &recordingEmit{err: errOnce()} + src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go src.Run(ctx, rec.emit) + + time.Sleep(50 * time.Millisecond) + path := writeDrop(t, dir, "retry.json", map[string]string{ + "recipient": "r", "type": "INFO", "payload": "p", + }) + + // Wait long enough for the (failing) emit to have happened. + time.Sleep(200 * time.Millisecond) + + // Drop file is still there (retained for retry). + if _, err := os.Stat(path); err != nil { + t.Errorf("drop file should be retained on emit failure: %v", err) + } + // Not in dead-letter. + if _, err := os.Stat(filepath.Join(dir, ".dead-letter", "retry.json")); err == nil { + t.Error("emit failure should NOT dead-letter (it's transient)") + } +} + +// errOnce returns a non-nil error one time, then nil after. +func errOnce() error { + type e struct{} + return &emitErr{} +} + +type emitErr struct{} + +func (e *emitErr) Error() string { return "transient emit failure" } + +// waitFor polls predicate until it returns true or timeout elapses. +func waitFor(t *testing.T, pred func() bool, timeout time.Duration) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if pred() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("waitFor: predicate did not become true within %s", timeout) +} diff --git a/internal/source/source.go b/internal/source/source.go new file mode 100644 index 0000000..d9d914f --- /dev/null +++ b/internal/source/source.go @@ -0,0 +1,35 @@ +// Package source defines the contract every Collector source plugin implements. +// +// A Source is a long-running goroutine that converts external events into +// inbox writes. It receives an Emit callback at start; each external event it +// observes results in one Emit call. The Source returns when its context is +// canceled, or earlier on a fatal error. +// +// Sources do not own the inbox writer or any other shared state. They emit; +// the dispatcher routes. This keeps each source small and testable in +// isolation. +package source + +import ( + "context" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" +) + +// Emit is the callback a Source uses to push one observed event into the +// dispatcher. The dispatcher writes it to recipient's inbox file. Returning +// an error means the inbox write failed; sources may log and continue, or +// drop, or shut down — that's a per-source policy decision. +type Emit func(recipient string, ev *inbox.Event) error + +// Source is a Collector input. Implementations are constructed with their +// own configuration and started via Run. +type Source interface { + // Name is a short identifier for logs and the "source" field on emitted + // events ("webhook", "drop-folder", etc.). + Name() string + + // Run blocks until ctx is canceled or a fatal error occurs. Each + // observed external event becomes one Emit call. + Run(ctx context.Context, emit Emit) error +}