Collector milestone 2: source interface + drop-folder source
source.Source is the contract every Collector input implements: Name + Run(ctx, emit). Sources don't own state — they convert external events into emit calls. Dispatcher routes. internal/source/dropfolder: watches ~/Nyx/workspace/incoming/ for *.json drop files. fsnotify-driven with periodic poll fallback (default 30s safety net for missed events). Each file: 1. Parsed against the spec §3.1.2 schema with DisallowUnknownFields. 2. Valid → emitted, then file deleted. 3. Invalid (missing fields, bad type/priority, unknown fields, garbage) → moved to .dead-letter/ with a sidecar .reason file for forensics. 4. Emit failure → file retained in place for retry (transient errors shouldn't be permanent dead-letters). Also: initial-scan on Run() drains files that landed before the watcher attached, catching up after a Collector restart. 14 tests in the package — schema validation table for all error cases, initial-scan, live inotify drop, post-emit delete, dead-letter + sidecar, emit-failure retention. Plus the 7 inbox tests still passing. Pinned fsnotify v1.7.0 (Go 1.22 compatible; 1.10.x demanded toolchain 1.23 which isn't in apt yet). go.mod stays at 1.22 to match VPS. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
50e8ece83d
commit
f9d81471c4
5 changed files with 576 additions and 1 deletions
259
internal/source/dropfolder/dropfolder.go
Normal file
259
internal/source/dropfolder/dropfolder.go
Normal file
|
|
@ -0,0 +1,259 @@
|
|||
// Package dropfolder implements the drop-folder Source.
|
||||
//
|
||||
// Producers drop *.json files into a designated directory. The Source ingests
|
||||
// each file, validates it against a schema, emits an event, and deletes the
|
||||
// file. Malformed files are moved to a .dead-letter/ subdirectory for
|
||||
// inspection.
|
||||
//
|
||||
// Lifecycle: on startup, scan the folder and ingest any pre-existing files
|
||||
// (catches up after a Collector restart). Then watch via inotify for new
|
||||
// arrivals. A periodic poll (default 30s) is a safety net in case inotify
// misses a Syncthing-driven rename — a rename into the folder normally
// arrives as IN_MOVED_TO (an fsnotify Create event), but the spec calls for
// a fallback.
|
||||
//
|
||||
// Drop file schema (per spec §3.1.2):
|
||||
//
|
||||
// {
|
||||
// "recipient": "bob",
|
||||
// "type": "INFO" | "NEEDS-RESPONSE" | "ACK-REQUEST",
|
||||
// "priority": "normal" | "urgent", // optional, default "normal"
|
||||
// "payload": "...",
|
||||
// "sentinel": "/path/optional" // optional
|
||||
// }
|
||||
package dropfolder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
|
||||
"git.botbought.ai/foreman/agent-watcher/internal/source"
|
||||
)
|
||||
|
||||
// Accepted enum values for drop files. The rules are duplicated from
// agent-ping's CLI for tightness.
var (
	// validTypes is the set of accepted "type" values.
	validTypes = map[string]bool{
		"INFO":           true,
		"NEEDS-RESPONSE": true,
		"ACK-REQUEST":    true,
	}

	// validPriorities is the set of accepted "priority" values. The empty
	// string is accepted and means "normal".
	validPriorities = map[string]bool{
		"":       true,
		"normal": true,
		"urgent": true,
	}
)
|
||||
|
||||
// Config configures a drop-folder Source.
type Config struct {
	// Path is the directory to watch. Created if absent.
	Path string

	// PollFallbackSeconds is the interval, in seconds, of the periodic full
	// scan that backs up inotify in case it misses an event. The zero value
	// selects the default: New replaces 0 with 30. Use a negative value to
	// disable polling; Run only starts the ticker for intervals greater
	// than zero.
	PollFallbackSeconds int
}
|
||||
|
||||
// Source watches a directory for *.json drop files and emits events.
|
||||
type Source struct {
|
||||
cfg Config
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// New returns a configured (but not running) Source.
|
||||
func New(cfg Config, logger *slog.Logger) *Source {
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
if cfg.PollFallbackSeconds == 0 {
|
||||
cfg.PollFallbackSeconds = 30
|
||||
}
|
||||
return &Source{cfg: cfg, logger: logger.With("source", "drop-folder", "path", cfg.Path)}
|
||||
}
|
||||
|
||||
// Name reports the fixed identifier of this source, "drop-folder".
func (s *Source) Name() string {
	const name = "drop-folder"
	return name
}
|
||||
|
||||
// Run blocks until ctx is canceled.
|
||||
func (s *Source) Run(ctx context.Context, emit source.Emit) error {
|
||||
if err := os.MkdirAll(s.cfg.Path, 0755); err != nil {
|
||||
return fmt.Errorf("dropfolder: mkdir %s: %w", s.cfg.Path, err)
|
||||
}
|
||||
if err := os.MkdirAll(s.deadLetterDir(), 0755); err != nil {
|
||||
return fmt.Errorf("dropfolder: mkdir dead-letter: %w", err)
|
||||
}
|
||||
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("dropfolder: new watcher: %w", err)
|
||||
}
|
||||
defer w.Close()
|
||||
if err := w.Add(s.cfg.Path); err != nil {
|
||||
return fmt.Errorf("dropfolder: add %s: %w", s.cfg.Path, err)
|
||||
}
|
||||
|
||||
// Initial scan — drain any files that landed before we were watching.
|
||||
s.scan(ctx, emit)
|
||||
|
||||
pollEvery := time.Duration(s.cfg.PollFallbackSeconds) * time.Second
|
||||
var pollCh <-chan time.Time
|
||||
if pollEvery > 0 {
|
||||
t := time.NewTicker(pollEvery)
|
||||
defer t.Stop()
|
||||
pollCh = t.C
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
||||
case ev, ok := <-w.Events:
|
||||
if !ok {
|
||||
return errors.New("dropfolder: watcher closed")
|
||||
}
|
||||
// Care about creates and renames-into (Syncthing uses rename).
|
||||
if ev.Op&(fsnotify.Create|fsnotify.Write) == 0 {
|
||||
continue
|
||||
}
|
||||
if !strings.HasSuffix(ev.Name, ".json") {
|
||||
continue
|
||||
}
|
||||
s.handle(ctx, emit, ev.Name)
|
||||
|
||||
case err, ok := <-w.Errors:
|
||||
if !ok {
|
||||
return errors.New("dropfolder: watcher errors closed")
|
||||
}
|
||||
s.logger.Warn("watcher error", "err", err)
|
||||
|
||||
case <-pollCh:
|
||||
s.scan(ctx, emit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// scan ingests every *.json file currently in the drop folder.
|
||||
func (s *Source) scan(ctx context.Context, emit source.Emit) {
|
||||
entries, err := os.ReadDir(s.cfg.Path)
|
||||
if err != nil {
|
||||
s.logger.Error("scan readdir", "err", err)
|
||||
return
|
||||
}
|
||||
for _, e := range entries {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
|
||||
continue
|
||||
}
|
||||
s.handle(ctx, emit, filepath.Join(s.cfg.Path, e.Name()))
|
||||
}
|
||||
}
|
||||
|
||||
// handle reads, validates, emits, and removes (or dead-letters) one drop file.
|
||||
func (s *Source) handle(ctx context.Context, emit source.Emit, path string) {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
body, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
// File may have been ingested already by another loop iteration.
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return
|
||||
}
|
||||
s.logger.Warn("read drop file", "path", path, "err", err)
|
||||
s.deadLetter(path, "read failed")
|
||||
return
|
||||
}
|
||||
|
||||
pd, err := parseDrop(body)
|
||||
if err != nil {
|
||||
s.logger.Warn("invalid drop file", "path", path, "err", err)
|
||||
s.deadLetter(path, "schema invalid: "+err.Error())
|
||||
return
|
||||
}
|
||||
pd.event.Source = "drop-folder:" + filepath.Base(path)
|
||||
|
||||
if err := emit(pd.recipient, pd.event); err != nil {
|
||||
// Inbox write failure: don't lose the message. Leave the file in
|
||||
// place so the next scan retries. Log loudly.
|
||||
s.logger.Error("emit failed; drop file retained for retry", "path", path, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := os.Remove(path); err != nil {
|
||||
s.logger.Warn("remove drop file", "path", path, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Source) deadLetter(path, reason string) {
|
||||
target := filepath.Join(s.deadLetterDir(), filepath.Base(path))
|
||||
if err := os.Rename(path, target); err != nil {
|
||||
s.logger.Error("dead-letter move failed", "path", path, "err", err)
|
||||
return
|
||||
}
|
||||
// Sidecar reason file for forensics.
|
||||
_ = os.WriteFile(target+".reason", []byte(reason+"\n"), 0644)
|
||||
s.logger.Info("moved to dead-letter", "from", path, "to", target, "reason", reason)
|
||||
}
|
||||
|
||||
func (s *Source) deadLetterDir() string {
|
||||
return filepath.Join(s.cfg.Path, ".dead-letter")
|
||||
}
|
||||
|
||||
// dropFile mirrors the JSON layout of a drop file. It is only an
// intermediate decode target; parseDrop validates it and converts it into a
// parsedDrop.
type dropFile struct {
	Recipient string `json:"recipient"`          // required
	Type      string `json:"type"`               // required; must be a key of validTypes
	Priority  string `json:"priority,omitempty"` // optional; empty means "normal"
	Payload   string `json:"payload"`            // required
	Sentinel  string `json:"sentinel,omitempty"` // optional
}
|
||||
|
||||
type parsedDrop struct {
|
||||
recipient string
|
||||
event *inbox.Event
|
||||
}
|
||||
|
||||
func parseDrop(body []byte) (*parsedDrop, error) {
|
||||
var d dropFile
|
||||
dec := json.NewDecoder(strings.NewReader(string(body)))
|
||||
dec.DisallowUnknownFields()
|
||||
if err := dec.Decode(&d); err != nil {
|
||||
return nil, fmt.Errorf("decode: %w", err)
|
||||
}
|
||||
if d.Recipient == "" {
|
||||
return nil, errors.New("recipient required")
|
||||
}
|
||||
if !validTypes[d.Type] {
|
||||
return nil, fmt.Errorf("type must be INFO|NEEDS-RESPONSE|ACK-REQUEST, got %q", d.Type)
|
||||
}
|
||||
if !validPriorities[d.Priority] {
|
||||
return nil, fmt.Errorf("priority must be normal|urgent, got %q", d.Priority)
|
||||
}
|
||||
if d.Payload == "" {
|
||||
return nil, errors.New("payload required")
|
||||
}
|
||||
priority := d.Priority
|
||||
if priority == "" {
|
||||
priority = "normal"
|
||||
}
|
||||
return &parsedDrop{
|
||||
recipient: d.Recipient,
|
||||
event: &inbox.Event{
|
||||
From: "collector",
|
||||
Type: d.Type,
|
||||
Priority: priority,
|
||||
Payload: d.Payload,
|
||||
Sentinel: d.Sentinel,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue