diff --git a/internal/source/webhook/webhook.go b/internal/source/webhook/webhook.go new file mode 100644 index 0000000..18da159 --- /dev/null +++ b/internal/source/webhook/webhook.go @@ -0,0 +1,248 @@ +// Package webhook implements the HTTP webhook Source. +// +// Listens on a loopback address and routes incoming POSTs to inbox events +// according to a routing table. Each route specifies recipient, type, +// priority, and a Go text/template that renders the payload from the request +// body decoded as JSON. +// +// v1 is loopback-only (127.0.0.1) — no authentication. The spec calls for +// Caddy + bearer-token reverse-proxy as the v2 upgrade path. Do NOT bind to +// 0.0.0.0 in v1. +// +// Per spec §3.5, the same listener also exposes /health with per-source +// counters. +package webhook + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "sync/atomic" + "text/template" + "time" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" + "git.botbought.ai/foreman/agent-watcher/internal/source" +) + +// Route maps a URL path to an emitted event. +type Route struct { + // Recipient is the target inbox name. + Recipient string + + // Type is one of INFO, NEEDS-RESPONSE, ACK-REQUEST. + Type string + + // Priority is "normal" or "urgent". Empty defaults to "normal". + Priority string + + // PayloadTemplate is a Go text/template rendered with the request body + // (decoded as JSON into a map[string]any) as the dot. Plain strings + // without {{}} render to themselves. + PayloadTemplate string +} + +// Config configures a webhook Source. +type Config struct { + // Listen is the bind address, e.g. "127.0.0.1:18790". + Listen string + + // Routes maps URL path → Route. Path includes the leading slash. + Routes map[string]Route +} + +// Source serves HTTP and emits an inbox event for each matched POST. +type Source struct { + cfg Config + logger *slog.Logger + + tmpls map[string]*template.Template + + // counters + received atomic.Uint64 + emitted atomic.Uint64 + errors atomic.Uint64 + startedAt time.Time +} + +// New parses the route templates and returns a configured Source. +// Returns an error if any template fails to parse. +func New(cfg Config, logger *slog.Logger) (*Source, error) { + if logger == nil { + logger = slog.Default() + } + if cfg.Listen == "" { + return nil, errors.New("webhook: listen address required") + } + if !isLoopback(cfg.Listen) { + return nil, fmt.Errorf("webhook: listen %q is not a loopback address; v1 forbids non-loopback binds", cfg.Listen) + } + + tmpls := make(map[string]*template.Template, len(cfg.Routes)) + for path, r := range cfg.Routes { + if !validTypes[r.Type] { + return nil, fmt.Errorf("webhook: route %s: invalid type %q", path, r.Type) + } + if r.Priority != "" && r.Priority != "normal" && r.Priority != "urgent" { + return nil, fmt.Errorf("webhook: route %s: invalid priority %q", path, r.Priority) + } + t, err := template.New(path).Option("missingkey=zero").Parse(r.PayloadTemplate) + if err != nil { + return nil, fmt.Errorf("webhook: route %s: parse template: %w", path, err) + } + tmpls[path] = t + } + return &Source{ + cfg: cfg, + logger: logger.With("source", "webhook", "listen", cfg.Listen), + tmpls: tmpls, + startedAt: time.Now(), + }, nil +} + +func (s *Source) Name() string { return "webhook" } + +// Run blocks until ctx is canceled. +func (s *Source) Run(ctx context.Context, emit source.Emit) error { + mux := http.NewServeMux() + mux.HandleFunc("/health", s.health) + mux.HandleFunc("/", s.handler(emit)) + + srv := &http.Server{ + Addr: s.cfg.Listen, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + listenErr := make(chan error, 1) + go func() { + s.logger.Info("listening") + listenErr <- srv.ListenAndServe() + }() + + select { + case <-ctx.Done(): + shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + _ = srv.Shutdown(shutCtx) + return ctx.Err() + case err := <-listenErr: + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return fmt.Errorf("webhook: serve: %w", err) + } +} + +func (s *Source) handler(emit source.Emit) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + s.received.Add(1) + + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + route, ok := s.cfg.Routes[r.URL.Path] + if !ok { + http.Error(w, "no route", http.StatusNotFound) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1 MiB cap + if err != nil { + s.errors.Add(1) + s.logger.Warn("read body", "path", r.URL.Path, "err", err) + http.Error(w, "read failed", http.StatusBadRequest) + return + } + var data map[string]any + if len(bytes.TrimSpace(body)) > 0 { + if err := json.Unmarshal(body, &data); err != nil { + s.errors.Add(1) + s.logger.Warn("body not JSON", "path", r.URL.Path, "err", err) + http.Error(w, "body must be JSON", http.StatusBadRequest) + return + } + } else { + data = map[string]any{} + } + + var rendered bytes.Buffer + if err := s.tmpls[r.URL.Path].Execute(&rendered, data); err != nil { + s.errors.Add(1) + s.logger.Warn("template render", "path", r.URL.Path, "err", err) + http.Error(w, "render failed", http.StatusInternalServerError) + return + } + priority := route.Priority + if priority == "" { + priority = "normal" + } + ev := &inbox.Event{ + From: "collector", + Type: route.Type, + Priority: priority, + Payload: rendered.String(), + Source: "webhook:" + r.URL.Path, + } + if err := emit(route.Recipient, ev); err != nil { + s.errors.Add(1) + s.logger.Error("emit", "path", r.URL.Path, "err", err) + http.Error(w, "emit failed", http.StatusInternalServerError) + return + } + s.emitted.Add(1) + w.WriteHeader(http.StatusAccepted) + _, _ = io.WriteString(w, "ok\n") + } +} + +// Stats returns the current counter snapshot. Safe to call from any goroutine. +type Stats struct { + Received uint64 `json:"received"` + Emitted uint64 `json:"emitted"` + Errors uint64 `json:"errors"` + UptimeSec int64 `json:"uptime_sec"` +} + +func (s *Source) Stats() Stats { + return Stats{ + Received: s.received.Load(), + Emitted: s.emitted.Load(), + Errors: s.errors.Load(), + UptimeSec: int64(time.Since(s.startedAt).Seconds()), + } +} + +func (s *Source) health(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(s.Stats()) +} + +// validTypes mirrors agent-ping. Kept here (not imported from dropfolder) so +// the webhook source has no cross-source dependency. +var validTypes = map[string]bool{ + "INFO": true, + "NEEDS-RESPONSE": true, + "ACK-REQUEST": true, +} + +// isLoopback returns true if addr binds only to a loopback interface. +// Accepts "127.0.0.1:port", "[::1]:port", "localhost:port". +func isLoopback(addr string) bool { + host, _, err := net.SplitHostPort(addr) + if err != nil { + return false + } + if host == "localhost" { + return true + } + ip := net.ParseIP(host) + return ip != nil && ip.IsLoopback() +} + diff --git a/internal/source/webhook/webhook_test.go b/internal/source/webhook/webhook_test.go new file mode 100644 index 0000000..0eb47e8 --- /dev/null +++ b/internal/source/webhook/webhook_test.go @@ -0,0 +1,323 @@ +package webhook + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net" + "net/http" + "strings" + "sync" + "testing" + "time" + + "git.botbought.ai/foreman/agent-watcher/internal/inbox" +) + +func quietLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +// freePort returns an unused loopback "127.0.0.1:N" address. +func freePort(t *testing.T) string { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := l.Addr().String() + l.Close() + return addr +} + +type recordingEmit struct { + mu sync.Mutex + events []record + err error +} + +type record struct { + recipient string + event inbox.Event +} + +func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.err != nil { + return r.err + } + r.events = append(r.events, record{recipient, *ev}) + return nil +} + +func (r *recordingEmit) snap() []record { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]record, len(r.events)) + copy(out, r.events) + return out +} + +func runSource(t *testing.T, cfg Config, emit func(string, *inbox.Event) error) (string, context.CancelFunc) { + t.Helper() + src, err := New(cfg, quietLogger()) + if err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithCancel(context.Background()) + go src.Run(ctx, emit) + // Wait for listener. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + c, err := net.DialTimeout("tcp", cfg.Listen, 50*time.Millisecond) + if err == nil { + c.Close() + return cfg.Listen, cancel + } + time.Sleep(20 * time.Millisecond) + } + cancel() + t.Fatal("listener never came up") + return "", nil +} + +func TestNew_RejectsNonLoopback(t *testing.T) { + _, err := New(Config{Listen: "0.0.0.0:18790"}, quietLogger()) + if err == nil { + t.Error("expected error for non-loopback") + } +} + +func TestNew_RejectsBadType(t *testing.T) { + _, err := New(Config{ + Listen: "127.0.0.1:0", + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "BOGUS", PayloadTemplate: "x"}, + }, + }, quietLogger()) + if err == nil { + t.Error("expected error for bad type") + } +} + +func TestNew_RejectsBadTemplate(t *testing.T) { + _, err := New(Config{ + Listen: "127.0.0.1:0", + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "{{ .unclosed"}, + }, + }, quietLogger()) + if err == nil { + t.Error("expected error for bad template") + } +} + +func TestPOST_RoutesAndEmits(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/forgejo/push": { + Recipient: "bob", + Type: "INFO", + PayloadTemplate: "forgejo push to {{ .repo }} by {{ .actor }}", + }, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + body := `{"repo":"agent-ping","actor":"angus"}` + resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", strings.NewReader(body)) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusAccepted { + t.Errorf("status = %d, want 202", resp.StatusCode) + } + + got := rec.snap() + if len(got) != 1 { + t.Fatalf("got %d events, want 1", len(got)) + } + if got[0].recipient != "bob" { + t.Errorf("recipient = %q", got[0].recipient) + } + if got[0].event.Type != "INFO" { + t.Errorf("type = %q", got[0].event.Type) + } + if got[0].event.Priority != "normal" { + t.Errorf("priority default = %q", got[0].event.Priority) + } + if got[0].event.Payload != "forgejo push to agent-ping by angus" { + t.Errorf("payload = %q", got[0].event.Payload) + } + if got[0].event.Source != "webhook:/forgejo/push" { + t.Errorf("source = %q", got[0].event.Source) + } +} + +func TestPOST_PriorityAndEmptyBody(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/alert": { + Recipient: "bob", + Type: "NEEDS-RESPONSE", + Priority: "urgent", + PayloadTemplate: "alert fired", + }, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader("")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusAccepted { + t.Errorf("status = %d", resp.StatusCode) + } + got := rec.snap()[0] + if got.event.Priority != "urgent" { + t.Errorf("priority = %q", got.event.Priority) + } + if got.event.Payload != "alert fired" { + t.Errorf("payload = %q", got.event.Payload) + } +} + +func TestPOST_404OnUnknownRoute(t *testing.T) { + addr := freePort(t) + cfg := Config{Listen: addr, Routes: map[string]Route{}} + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/nope", "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Errorf("status = %d, want 404", resp.StatusCode) + } + if len(rec.snap()) != 0 { + t.Error("emitted on unknown route") + } +} + +func TestPOST_400OnBadJSON(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("not json")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("status = %d, want 400", resp.StatusCode) + } +} + +func TestPOST_500OnEmitFailure(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{err: errOnce()} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusInternalServerError { + t.Errorf("status = %d, want 500", resp.StatusCode) + } +} + +func TestGET_MethodNotAllowed(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + resp, err := http.Get("http://" + addr + "/x") + if err != nil { + t.Fatal(err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusMethodNotAllowed { + t.Errorf("status = %d", resp.StatusCode) + } +} + +func TestGET_HealthEndpoint(t *testing.T) { + addr := freePort(t) + cfg := Config{ + Listen: addr, + Routes: map[string]Route{ + "/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"}, + }, + } + rec := &recordingEmit{} + _, cancel := runSource(t, cfg, rec.emit) + defer cancel() + + // Two POSTs: 1 success, 1 fail. + http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}")) + http.Post("http://"+addr+"/wrongpath", "application/json", strings.NewReader("{}")) + + resp, err := http.Get("http://" + addr + "/health") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d", resp.StatusCode) + } + var stats Stats + if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { + t.Fatal(err) + } + if stats.Received < 2 { + t.Errorf("received = %d", stats.Received) + } + if stats.Emitted != 1 { + t.Errorf("emitted = %d, want 1", stats.Emitted) + } +} + +type emitErr struct{} + +func (e *emitErr) Error() string { return "emit boom" } +func errOnce() error { return &emitErr{} }