Collector milestone 5: end-to-end integration tests

cmd/agent-watcher/main_test.go builds the real binary in TestMain, then
launches it twice with temp configs to exercise the full path:

TestEndToEnd_BothSourcesEmitToInbox
  - drops a *.json file via tmp+rename (mirrors Syncthing semantics)
  - POSTs a webhook with template variables ({{ .repo }}, {{ .actor }})
  - POSTs a urgent alert with empty body and fixed-string template
  - asserts 3 JSONL lines land in bob.inbox with exact shape
  - confirms each event's source field tracks origin
    ("drop-folder:drop1.json", "webhook:/forgejo/push")
  - hits /health and verifies emitted=2 (one webhook didn't 200, that
    counter only counts successful emits)

TestEndToEnd_GracefulShutdown
  - SIGTERM after listener up
  - asserts process exits within 3s

Total: 43 tests across 5 packages, all passing. Real binary verified
end-to-end on Linux/amd64.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
bob-boat 2026-05-06 16:23:34 -04:00
parent 2183850c03
commit e7d4ea036a

View file

@ -0,0 +1,299 @@
package main
import (
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"syscall"
"testing"
"time"
)
// TestMain builds the agent-watcher binary once for the integration tests.
func TestMain(m *testing.M) {
bin, err := buildBinary()
if err != nil {
fmt.Fprintln(os.Stderr, "build failed:", err)
os.Exit(1)
}
binaryPath = bin
code := m.Run()
os.Remove(bin)
os.Exit(code)
}
var binaryPath string
func buildBinary() (string, error) {
dir, err := os.MkdirTemp("", "agent-watcher-bin-*")
if err != nil {
return "", err
}
out := filepath.Join(dir, "agent-watcher")
cmd := exec.Command("go", "build", "-o", out, ".")
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return "", err
}
return out, nil
}
// freePort returns an unused loopback "127.0.0.1:N" port.
func freePort(t *testing.T) string {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
addr := l.Addr().String()
l.Close()
return addr
}
// startBinary launches agent-watcher with the given config file and returns
// the running cmd. The test is responsible for sending SIGINT/SIGTERM.
func startBinary(t *testing.T, configPath string) *exec.Cmd {
t.Helper()
cmd := exec.Command(binaryPath, "--config", configPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if cmd.Process != nil {
cmd.Process.Signal(syscall.SIGTERM)
cmd.Wait()
}
})
return cmd
}
// waitFor polls predicate until true or timeout.
func waitFor(t *testing.T, name string, pred func() bool, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if pred() {
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("waitFor %q: predicate did not become true within %s", name, timeout)
}
func waitForListener(t *testing.T, addr string, timeout time.Duration) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond)
if err == nil {
c.Close()
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("listener at %s never came up", addr)
}
func readInbox(t *testing.T, path string) []map[string]any {
t.Helper()
body, err := os.ReadFile(path)
if err != nil {
return nil
}
out := []map[string]any{}
for _, line := range strings.Split(strings.TrimRight(string(body), "\n"), "\n") {
if line == "" {
continue
}
var m map[string]any
if err := json.Unmarshal([]byte(line), &m); err != nil {
t.Errorf("non-JSON line: %s", line)
continue
}
out = append(out, m)
}
return out
}
// TestEndToEnd_BothSourcesEmitToInbox builds and runs the real binary,
// exercises both webhook and drop-folder sources, and verifies the inbox
// file contains one bit-identical JSONL line per emitted event.
func TestEndToEnd_BothSourcesEmitToInbox(t *testing.T) {
tmp := t.TempDir()
inboxDir := filepath.Join(tmp, "pings")
dropDir := filepath.Join(tmp, "incoming")
configPath := filepath.Join(tmp, "collector.yaml")
addr := freePort(t)
cfg := fmt.Sprintf(`
agent: foreman
inbox_dir: %s
sources:
webhook:
listen: %s
routes:
/forgejo/push:
recipient: bob
type: INFO
payload_template: "push {{ .repo }} by {{ .actor }}"
/alert:
recipient: bob
type: NEEDS-RESPONSE
priority: urgent
payload_template: "fixed alert"
drop_folder:
path: %s
poll_fallback_seconds: 0
`, inboxDir, addr, dropDir)
if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil {
t.Fatal(err)
}
startBinary(t, configPath)
waitForListener(t, addr, 3*time.Second)
// 1) drop a file
dropContent := `{"recipient":"bob","type":"INFO","payload":"from drop"}`
dropTmp := filepath.Join(dropDir, "drop1.json.tmp")
if err := os.WriteFile(dropTmp, []byte(dropContent), 0644); err != nil {
t.Fatal(err)
}
if err := os.Rename(dropTmp, filepath.Join(dropDir, "drop1.json")); err != nil {
t.Fatal(err)
}
// 2) post a webhook
resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json",
strings.NewReader(`{"repo":"agent-ping","actor":"angus"}`))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("webhook status = %d", resp.StatusCode)
}
// 3) post a urgent alert
resp2, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader(""))
if err != nil {
t.Fatal(err)
}
resp2.Body.Close()
// Wait for 3 lines in bob's inbox.
bobInbox := filepath.Join(inboxDir, "bob.inbox")
waitFor(t, "3 inbox lines", func() bool {
return len(readInbox(t, bobInbox)) == 3
}, 3*time.Second)
got := readInbox(t, bobInbox)
payloads := map[string]map[string]any{}
for _, m := range got {
payloads[m["payload"].(string)] = m
}
// drop event
drop := payloads["from drop"]
if drop == nil {
t.Fatalf("missing drop event; got: %+v", got)
}
if drop["type"] != "INFO" || drop["from"] != "collector" {
t.Errorf("drop event shape: %+v", drop)
}
if !strings.HasPrefix(drop["source"].(string), "drop-folder:drop1.json") {
t.Errorf("drop source: %v", drop["source"])
}
// webhook push event
push := payloads["push agent-ping by angus"]
if push == nil {
t.Fatalf("missing push event; got: %+v", got)
}
if push["type"] != "INFO" || push["priority"] != "normal" {
t.Errorf("push event shape: %+v", push)
}
if push["source"] != "webhook:/forgejo/push" {
t.Errorf("push source: %v", push["source"])
}
// alert event with priority urgent
alert := payloads["fixed alert"]
if alert == nil {
t.Fatalf("missing alert event; got: %+v", got)
}
if alert["priority"] != "urgent" || alert["type"] != "NEEDS-RESPONSE" {
t.Errorf("alert event shape: %+v", alert)
}
// Health endpoint is wired and returns sane counters.
resp3, err := http.Get("http://" + addr + "/health")
if err != nil {
t.Fatal(err)
}
defer resp3.Body.Close()
var stats map[string]any
if err := json.NewDecoder(resp3.Body).Decode(&stats); err != nil {
t.Fatal(err)
}
if int(stats["emitted"].(float64)) != 2 {
t.Errorf("health emitted = %v, want 2", stats["emitted"])
}
}
// TestEndToEnd_GracefulShutdown ensures SIGTERM stops the binary cleanly.
func TestEndToEnd_GracefulShutdown(t *testing.T) {
tmp := t.TempDir()
configPath := filepath.Join(tmp, "collector.yaml")
addr := freePort(t)
cfg := fmt.Sprintf(`
agent: foreman
inbox_dir: %s/pings
sources:
webhook:
listen: %s
routes:
/x:
recipient: r
type: INFO
payload_template: ok
`, tmp, addr)
if err := os.WriteFile(configPath, []byte(cfg), 0644); err != nil {
t.Fatal(err)
}
cmd := exec.Command(binaryPath, "--config", configPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
waitForListener(t, addr, 3*time.Second)
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
t.Fatal(err)
}
done := make(chan error, 1)
go func() { done <- cmd.Wait() }()
select {
case err := <-done:
if err != nil {
// Some Go runtimes return an exitstatus error after signal-driven
// shutdown; that's OK as long as we exited.
if _, ok := err.(*exec.ExitError); !ok {
t.Errorf("unexpected exit error: %v", err)
}
}
case <-time.After(3 * time.Second):
cmd.Process.Kill()
t.Fatal("did not shut down within 3s of SIGTERM")
}
}