// Package dropfolder implements the drop-folder Source.
//
// Producers drop *.json files into a designated directory. The Source ingests
// each file, validates it against a schema, emits an event, and deletes the
// file. Malformed files are moved to a .dead-letter/ subdirectory for
// inspection.
//
// Lifecycle: on startup, scan the folder and ingest any pre-existing files
// (catches up after a Collector restart). Then watch via inotify for new
// arrivals. A periodic poll (default 30s) is a safety net in case inotify
// misses a Syncthing-driven rename — Syncthing usually fires IN_CLOSE_WRITE
// on rename, but the spec calls for a fallback.
//
// Drop file schema (per spec §3.1.2):
//
//	{
//	  "recipient": "bob",
//	  "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST",
//	  "priority": "normal" | "urgent",  // optional, default "normal"
//	  "payload": "...",
//	  "sentinel": "/path/optional"      // optional
//	}
package dropfolder

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/fsnotify/fsnotify"

	"git.botbought.ai/foreman/agent-watcher/internal/inbox"
	"git.botbought.ai/foreman/agent-watcher/internal/source"
)

// Validation rules duplicated from agent-ping's CLI for tightness.
//
// validTypes is the set of values accepted for a drop file's "type" field.
var validTypes = map[string]bool{
	"INFO":           true,
	"NEEDS-RESPONSE": true,
	"ACK-REQUEST":    true,
}

// validPriorities is the set of values accepted for a drop file's "priority"
// field. The empty string is accepted because the field is optional.
var validPriorities = map[string]bool{
	"":       true, // means "normal"
	"normal": true,
	"urgent": true,
}

// Config configures a drop-folder Source.
type Config struct {
	// Path is the directory to watch. Created if absent.
	Path string

	// PollFallbackSeconds is the interval, in seconds, of the periodic full
	// scan that backs up inotify in case it misses an event. New replaces 0
	// with the default of 30, so 0 does NOT disable polling; set a negative
	// value to disable it.
	PollFallbackSeconds int
}

// Source watches a directory for *.json drop files and emits events.
type Source struct {
	// cfg is the configuration captured by New, with defaults applied.
	cfg Config
	// logger is the structured logger used for all messages from this Source.
	logger *slog.Logger
}

// New returns a configured (but not running) Source.
func New(cfg Config, logger *slog.Logger) *Source { if logger == nil { logger = slog.Default() } if cfg.PollFallbackSeconds == 0 { cfg.PollFallbackSeconds = 30 } return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)} } func (s *Source) Name() string { return "drop-folder" } // Run blocks until ctx is canceled. func (s *Source) Run(ctx context.Context, emit source.Emit) error { if err := os.MkdirAll(s.cfg.Path, 0755); err != nil { return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err) } if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil { return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err) } w, err := fsnotify.NewWatcher() if err != nil { return fmt.Errorf("dropfolder: new watcher: %w", err) } defer w.Close() if err := w.Add(s.cfg.Path); err != nil { return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err) } // Initial scan — drain any files that landed before we were watching. s.scan(ctx, emit) pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second var pollCh <-chan time.Time if pollEvery > 0 { t := time.NewTicker(pollEvery) defer t.Stop() pollCh = t.C } for { select { case <-ctx.Done(): return ctx.Err() case ev, ok := <-w.Events: if !ok { return errors.New("dropfolder: watcher closed") } // Care about creates and renames-into (Syncthing uses rename). if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 { continue } if !strings.HasSuffix(ev.Name, ".json") { continue } s.handle(ctx, emit, ev.Name) case err, ok := <-w.Errors: if !ok { return errors.New("dropfolder: watcher errors closed") } s.logger.Warn("watcher error", "err", err) case <-pollCh: s.scan(ctx, emit) } } } // scan ingests every *.json file currently in the drop folder. 
func (s *Source) scan(ctx context.Context, emit source.Emit) { entries, err := os.ReadDir(s.cfg.Path) if err != nil { s.logger.Error("scan readdir", "err", err) return } for _, e := range entries { if ctx.Err() != nil { return } if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { continue } s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name())) } } // handle reads, validates, emits, and removes (or dead-letters) one drop file. func (s *Source) handle(ctx context.Context, emit source.Emit, path string) { if ctx.Err() != nil { return } body, err := os.ReadFile(path) if err != nil { // File may have been ingested already by another loop iteration. if errors.Is(err, os.ErrNotExist) { return } s.logger.Warn("read drop file", "path", path, "err", err) s.deadLetter(path, "read failed") return } pd, err := parseDrop(body) if err != nil { s.logger.Warn("invalid drop file", "path", path, "err", err) s.deadLetter(path, "schema invalid: "+err.Error()) return } pd.event.Source = "drop-folder:" + filepath.Base(path) if err := emit(pd.recipient, pd.event); err != nil { // Inbox write failure: don't lose the message. Leave the file in // place so the next scan retries. Log loudly. s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err) return } if err := os.Remove(path); err != nil { s.logger.Warn("remove drop file", "path", path, "err", err) } } func (s *Source) deadLetter(path, reason string) { target := filepath.Join(s.deadLetterDir(), filepath.Base(path)) if err := os.Rename(path, target); err != nil { s.logger.Error("dead-letter move failed", "path", path, "err", err) return } // Sidecar reason file for forensics. _ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644) s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason) } func (s *Source) deadLetterDir() string { return filepath.Join(s.cfg.Path, ".dead-letter") } // dropFile is the on-disk schema; intermediate value during parse. 
type dropFile struct { Recipient string `json:"recipient"` Type string `json:"type"` Priority string `json:"priority,omitempty"` Payload string `json:"payload"` Sentinel string `json:"sentinel,omitempty"` } type parsedDrop struct { recipient string event *inbox.Event } func parseDrop(body []byte) (*parsedDrop, error) { var d dropFile dec := json.NewDecoder(strings.NewReader(string(body))) dec.DisallowUnknownFields() if err := dec.Decode(&d); err != nil { return nil, fmt.Errorf("decode: %w", err) } if d.Recipient == "" { return nil, errors.New("recipient required") } if !validTypes[d.Type] { return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type) } if !validPriorities[d.Priority] { return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority) } if d.Payload == "" { return nil, errors.New("payload required") } priority := d.Priority if priority == "" { priority = "normal" } return &parsedDrop{ recipient: d.Recipient, event: &inbox.Event{ From: "collector", Type: d.Type, Priority: priority, Payload: d.Payload, Sentinel: d.Sentinel, }, }, nil }