package dropfolder import ( "context" "encoding/json" "io" "log/slog" "os" "path/filepath" "sync" "testing" "time" "git.botbought.ai/foreman/agent-watcher/internal/inbox" ) func quietLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } func TestParseDrop_Valid(t *testing.T) { body := []byte(`{"recipient":"bob","type":"INFO","priority":"urgent","payload":"hi","sentinel":"/x"}`) p, err := parseDrop(body) if err != nil { t.Fatal(err) } if p.recipient != "bob" { t.Errorf("recipient = %q", p.recipient) } if p.event.Type != "INFO" || p.event.Priority != "urgent" || p.event.Payload != "hi" || p.event.Sentinel != "/x" { t.Errorf("unexpected event: %+v", p.event) } } func TestParseDrop_DefaultsPriority(t *testing.T) { p, err := parseDrop([]byte(`{"recipient":"r","type":"INFO","payload":"p"}`)) if err != nil { t.Fatal(err) } if p.event.Priority != "normal" { t.Errorf("priority default = %q, want normal", p.event.Priority) } } func TestParseDrop_Invalid(t *testing.T) { cases := map[string]string{ "empty body": ``, "missing recipient": `{"type":"INFO","payload":"p"}`, "missing type": `{"recipient":"r","payload":"p"}`, "bad type": `{"recipient":"r","type":"NOPE","payload":"p"}`, "bad priority": `{"recipient":"r","type":"INFO","priority":"high","payload":"p"}`, "missing payload": `{"recipient":"r","type":"INFO"}`, "unknown field": `{"recipient":"r","type":"INFO","payload":"p","stowaway":1}`, "not json": `not json`, } for name, body := range cases { t.Run(name, func(t *testing.T) { if _, err := parseDrop([]byte(body)); err == nil { t.Error("expected error") } }) } } // recordingEmit captures emitted events for assertion. type recordingEmit struct { mu sync.Mutex events []record err error // set to fail the next emit } type record struct { recipient string event inbox.Event } func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { r.mu.Lock() defer r.mu.Unlock() if r.err != nil { err := r.err r.err = nil return err } r.events = append(r.events, record{recipient, *ev}) return nil } func (r *recordingEmit) snapshot() []record { r.mu.Lock() defer r.mu.Unlock() out := make([]record, len(r.events)) copy(out, r.events) return out } func writeDrop(t *testing.T, dir, name string, payload any) string { t.Helper() path := filepath.Join(dir, name) b, err := json.Marshal(payload) if err != nil { t.Fatal(err) } tmp := path + ".tmp" if err := os.WriteFile(tmp, b, 0644); err != nil { t.Fatal(err) } // rename for atomic visibility (mirrors what Syncthing does) if err := os.Rename(tmp, path); err != nil { t.Fatal(err) } return path } func TestSource_InitialScan_IngestsExistingFiles(t *testing.T) { dir := t.TempDir() writeDrop(t, dir, "1.json", map[string]string{ "recipient": "bob", "type": "INFO", "payload": "first", }) writeDrop(t, dir, "2.json", map[string]string{ "recipient": "bob", "type": "NEEDS-RESPONSE", "payload": "second", }) rec := &recordingEmit{} src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() go src.Run(ctx, rec.emit) waitFor(t, func() bool { return len(rec.snapshot()) == 2 }, 400*time.Millisecond) got := rec.snapshot() if got[0].event.Payload+"|"+got[1].event.Payload != "first|second" && got[0].event.Payload+"|"+got[1].event.Payload != "second|first" { t.Errorf("unexpected payloads: %+v", got) } for _, r := range got { if r.recipient != "bob" { t.Errorf("recipient %q", r.recipient) } } } func TestSource_LiveDrop_IngestsViaInotify(t *testing.T) { dir := t.TempDir() rec := &recordingEmit{} src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go src.Run(ctx, rec.emit) // Give the watcher time to attach. time.Sleep(50 * time.Millisecond) writeDrop(t, dir, "live.json", map[string]string{ "recipient": "foreman", "type": "INFO", "payload": "live one", }) waitFor(t, func() bool { return len(rec.snapshot()) == 1 }, 1*time.Second) got := rec.snapshot() if got[0].event.Payload != "live one" { t.Errorf("payload %q", got[0].event.Payload) } } func TestSource_DeleteAfterEmit(t *testing.T) { dir := t.TempDir() rec := &recordingEmit{} src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go src.Run(ctx, rec.emit) time.Sleep(50 * time.Millisecond) path := writeDrop(t, dir, "ok.json", map[string]string{ "recipient": "r", "type": "INFO", "payload": "p", }) waitFor(t, func() bool { _, err := os.Stat(path) return os.IsNotExist(err) }, 1*time.Second) } func TestSource_DeadLetter_OnInvalidSchema(t *testing.T) { dir := t.TempDir() rec := &recordingEmit{} src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go src.Run(ctx, rec.emit) time.Sleep(50 * time.Millisecond) // Missing payload — invalid. tmp := filepath.Join(dir, "bad.json.tmp") os.WriteFile(tmp, []byte(`{"recipient":"r","type":"INFO"}`), 0644) os.Rename(tmp, filepath.Join(dir, "bad.json")) deadPath := filepath.Join(dir, ".dead-letter", "bad.json") waitFor(t, func() bool { _, err := os.Stat(deadPath) return err == nil }, 1*time.Second) // Reason sidecar exists. if _, err := os.Stat(deadPath + ".reason"); err != nil { t.Errorf("reason sidecar missing: %v", err) } // Original file gone from drop dir. if _, err := os.Stat(filepath.Join(dir, "bad.json")); !os.IsNotExist(err) { t.Errorf("original drop file should be gone") } // Nothing was emitted. if got := rec.snapshot(); len(got) != 0 { t.Errorf("got events for invalid drop: %+v", got) } } func TestSource_RetainsFile_WhenEmitFails(t *testing.T) { dir := t.TempDir() rec := &recordingEmit{err: errOnce()} src := New(Config{Path: dir, PollFallbackSeconds: 0}, quietLogger()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go src.Run(ctx, rec.emit) time.Sleep(50 * time.Millisecond) path := writeDrop(t, dir, "retry.json", map[string]string{ "recipient": "r", "type": "INFO", "payload": "p", }) // Wait long enough for the (failing) emit to have happened. time.Sleep(200 * time.Millisecond) // Drop file is still there (retained for retry). if _, err := os.Stat(path); err != nil { t.Errorf("drop file should be retained on emit failure: %v", err) } // Not in dead-letter. if _, err := os.Stat(filepath.Join(dir, ".dead-letter", "retry.json")); err == nil { t.Error("emit failure should NOT dead-letter (it's transient)") } } // errOnce returns a non-nil error one time, then nil after. func errOnce() error { type e struct{} return &emitErr{} } type emitErr struct{} func (e *emitErr) Error() string { return "transient emit failure" } // waitFor polls predicate until it returns true or timeout elapses. func waitFor(t *testing.T, pred func() bool, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { if pred() { return } time.Sleep(10 * time.Millisecond) } t.Fatalf("waitFor: predicate did not become true within %s", timeout) }