Collector v0 (Layer 1) #1

Merged
foreman merged 7 commits from foreman/collector-scaffold into main 2026-05-06 17:47:49 -03:00
2 changed files with 571 additions and 0 deletions
Showing only changes of commit ba6db7c82f - Show all commits

View file

@ -0,0 +1,248 @@
// Package webhook implements the HTTP webhook Source.
//
// Listens on a loopback address and routes incoming POSTs to inbox events
// according to a routing table. Each route specifies recipient, type,
// priority, and a Go text/template that renders the payload from the request
// body decoded as JSON.
//
// v1 is loopback-only (127.0.0.1) — no authentication. The spec calls for
// Caddy + bearer-token reverse-proxy as the v2 upgrade path. Do NOT bind to
// 0.0.0.0 in v1.
//
// Per spec §3.5, the same listener also exposes /health with per-source
// counters.
package webhook
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"sync/atomic"
"text/template"
"time"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
"git.botbought.ai/foreman/agent-watcher/internal/source"
)
// Route maps a URL path to an emitted event.
type Route struct {
// Recipient is the target inbox name.
Recipient string
// Type is one of INFO, NEEDS-RESPONSE, ACK-REQUEST.
Type string
// Priority is "normal" or "urgent". Empty defaults to "normal".
Priority string
// PayloadTemplate is a Go text/template rendered with the request body
// (decoded as JSON into a map[string]any) as the dot. Plain strings
// without {{}} render to themselves.
PayloadTemplate string
}
// Config configures a webhook Source.
type Config struct {
// Listen is the bind address, e.g. "127.0.0.1:18790".
Listen string
// Routes maps URL path → Route. Path includes the leading slash.
Routes map[string]Route
}
// Source serves HTTP and emits an inbox event for each matched POST.
type Source struct {
cfg Config
logger *slog.Logger
tmpls map[string]*template.Template
// counters
received atomic.Uint64
emitted atomic.Uint64
errors atomic.Uint64
startedAt time.Time
}
// New parses the route templates and returns a configured Source.
// Returns an error if any template fails to parse.
func New(cfg Config, logger *slog.Logger) (*Source, error) {
if logger == nil {
logger = slog.Default()
}
if cfg.Listen == "" {
return nil, errors.New("webhook: listen address required")
}
if !isLoopback(cfg.Listen) {
return nil, fmt.Errorf("webhook: listen %q is not a loopback address; v1 forbids non-loopback binds", cfg.Listen)
}
tmpls := make(map[string]*template.Template, len(cfg.Routes))
for path, r := range cfg.Routes {
if !validTypes[r.Type] {
return nil, fmt.Errorf("webhook: route %s: invalid type %q", path, r.Type)
}
if r.Priority != "" && r.Priority != "normal" && r.Priority != "urgent" {
return nil, fmt.Errorf("webhook: route %s: invalid priority %q", path, r.Priority)
}
t, err := template.New(path).Option("missingkey=zero").Parse(r.PayloadTemplate)
if err != nil {
return nil, fmt.Errorf("webhook: route %s: parse template: %w", path, err)
}
tmpls[path] = t
}
return &Source{
cfg: cfg,
logger: logger.With("source", "webhook", "listen", cfg.Listen),
tmpls: tmpls,
startedAt: time.Now(),
}, nil
}
func (s *Source) Name() string { return "webhook" }
// Run blocks until ctx is canceled.
func (s *Source) Run(ctx context.Context, emit source.Emit) error {
mux := http.NewServeMux()
mux.HandleFunc("/health", s.health)
mux.HandleFunc("/", s.handler(emit))
srv := &http.Server{
Addr: s.cfg.Listen,
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}
listenErr := make(chan error, 1)
go func() {
s.logger.Info("listening")
listenErr <- srv.ListenAndServe()
}()
select {
case <-ctx.Done():
shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
_ = srv.Shutdown(shutCtx)
return ctx.Err()
case err := <-listenErr:
if errors.Is(err, http.ErrServerClosed) {
return nil
}
return fmt.Errorf("webhook: serve: %w", err)
}
}
func (s *Source) handler(emit source.Emit) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
s.received.Add(1)
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
route, ok := s.cfg.Routes[r.URL.Path]
if !ok {
http.Error(w, "no route", http.StatusNotFound)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1 MiB cap
if err != nil {
s.errors.Add(1)
s.logger.Warn("read body", "path", r.URL.Path, "err", err)
http.Error(w, "read failed", http.StatusBadRequest)
return
}
var data map[string]any
if len(bytes.TrimSpace(body)) > 0 {
if err := json.Unmarshal(body, &data); err != nil {
s.errors.Add(1)
s.logger.Warn("body not JSON", "path", r.URL.Path, "err", err)
http.Error(w, "body must be JSON", http.StatusBadRequest)
return
}
} else {
data = map[string]any{}
}
var rendered bytes.Buffer
if err := s.tmpls[r.URL.Path].Execute(&rendered, data); err != nil {
s.errors.Add(1)
s.logger.Warn("template render", "path", r.URL.Path, "err", err)
http.Error(w, "render failed", http.StatusInternalServerError)
return
}
priority := route.Priority
if priority == "" {
priority = "normal"
}
ev := &inbox.Event{
From: "collector",
Type: route.Type,
Priority: priority,
Payload: rendered.String(),
Source: "webhook:" + r.URL.Path,
}
if err := emit(route.Recipient, ev); err != nil {
s.errors.Add(1)
s.logger.Error("emit", "path", r.URL.Path, "err", err)
http.Error(w, "emit failed", http.StatusInternalServerError)
return
}
s.emitted.Add(1)
w.WriteHeader(http.StatusAccepted)
_, _ = io.WriteString(w, "ok\n")
}
}
// Stats returns the current counter snapshot. Safe to call from any goroutine.
type Stats struct {
Received uint64 `json:"received"`
Emitted uint64 `json:"emitted"`
Errors uint64 `json:"errors"`
UptimeSec int64 `json:"uptime_sec"`
}
func (s *Source) Stats() Stats {
return Stats{
Received: s.received.Load(),
Emitted: s.emitted.Load(),
Errors: s.errors.Load(),
UptimeSec: int64(time.Since(s.startedAt).Seconds()),
}
}
func (s *Source) health(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(s.Stats())
}
// validTypes mirrors agent-ping. Kept here (not imported from dropfolder) so
// the webhook source has no cross-source dependency.
var validTypes = map[string]bool{
"INFO": true,
"NEEDS-RESPONSE": true,
"ACK-REQUEST": true,
}
// isLoopback returns true if addr binds only to a loopback interface.
// Accepts "127.0.0.1:port", "[::1]:port", "localhost:port".
func isLoopback(addr string) bool {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return false
}
if host == "localhost" {
return true
}
ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}

View file

@ -0,0 +1,323 @@
package webhook
import (
"context"
"encoding/json"
"io"
"log/slog"
"net"
"net/http"
"strings"
"sync"
"testing"
"time"
"git.botbought.ai/foreman/agent-watcher/internal/inbox"
)
func quietLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, nil))
}
// freePort returns an unused loopback "127.0.0.1:N" address.
func freePort(t *testing.T) string {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
addr := l.Addr().String()
l.Close()
return addr
}
type recordingEmit struct {
mu sync.Mutex
events []record
err error
}
type record struct {
recipient string
event inbox.Event
}
func (r *recordingEmit) emit(recipient string, ev *inbox.Event) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return r.err
}
r.events = append(r.events, record{recipient, *ev})
return nil
}
func (r *recordingEmit) snap() []record {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]record, len(r.events))
copy(out, r.events)
return out
}
func runSource(t *testing.T, cfg Config, emit func(string, *inbox.Event) error) (string, context.CancelFunc) {
t.Helper()
src, err := New(cfg, quietLogger())
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
go src.Run(ctx, emit)
// Wait for listener.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
c, err := net.DialTimeout("tcp", cfg.Listen, 50*time.Millisecond)
if err == nil {
c.Close()
return cfg.Listen, cancel
}
time.Sleep(20 * time.Millisecond)
}
cancel()
t.Fatal("listener never came up")
return "", nil
}
func TestNew_RejectsNonLoopback(t *testing.T) {
_, err := New(Config{Listen: "0.0.0.0:18790"}, quietLogger())
if err == nil {
t.Error("expected error for non-loopback")
}
}
func TestNew_RejectsBadType(t *testing.T) {
_, err := New(Config{
Listen: "127.0.0.1:0",
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "BOGUS", PayloadTemplate: "x"},
},
}, quietLogger())
if err == nil {
t.Error("expected error for bad type")
}
}
func TestNew_RejectsBadTemplate(t *testing.T) {
_, err := New(Config{
Listen: "127.0.0.1:0",
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "{{ .unclosed"},
},
}, quietLogger())
if err == nil {
t.Error("expected error for bad template")
}
}
func TestPOST_RoutesAndEmits(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/forgejo/push": {
Recipient: "bob",
Type: "INFO",
PayloadTemplate: "forgejo push to {{ .repo }} by {{ .actor }}",
},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
body := `{"repo":"agent-ping","actor":"angus"}`
resp, err := http.Post("http://"+addr+"/forgejo/push", "application/json", strings.NewReader(body))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Errorf("status = %d, want 202", resp.StatusCode)
}
got := rec.snap()
if len(got) != 1 {
t.Fatalf("got %d events, want 1", len(got))
}
if got[0].recipient != "bob" {
t.Errorf("recipient = %q", got[0].recipient)
}
if got[0].event.Type != "INFO" {
t.Errorf("type = %q", got[0].event.Type)
}
if got[0].event.Priority != "normal" {
t.Errorf("priority default = %q", got[0].event.Priority)
}
if got[0].event.Payload != "forgejo push to agent-ping by angus" {
t.Errorf("payload = %q", got[0].event.Payload)
}
if got[0].event.Source != "webhook:/forgejo/push" {
t.Errorf("source = %q", got[0].event.Source)
}
}
func TestPOST_PriorityAndEmptyBody(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/alert": {
Recipient: "bob",
Type: "NEEDS-RESPONSE",
Priority: "urgent",
PayloadTemplate: "alert fired",
},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/alert", "application/json", strings.NewReader(""))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Errorf("status = %d", resp.StatusCode)
}
got := rec.snap()[0]
if got.event.Priority != "urgent" {
t.Errorf("priority = %q", got.event.Priority)
}
if got.event.Payload != "alert fired" {
t.Errorf("payload = %q", got.event.Payload)
}
}
func TestPOST_404OnUnknownRoute(t *testing.T) {
addr := freePort(t)
cfg := Config{Listen: addr, Routes: map[string]Route{}}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/nope", "application/json", strings.NewReader("{}"))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Errorf("status = %d, want 404", resp.StatusCode)
}
if len(rec.snap()) != 0 {
t.Error("emitted on unknown route")
}
}
func TestPOST_400OnBadJSON(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("not json"))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Errorf("status = %d, want 400", resp.StatusCode)
}
}
func TestPOST_500OnEmitFailure(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{err: errOnce()}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}"))
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusInternalServerError {
t.Errorf("status = %d, want 500", resp.StatusCode)
}
}
func TestGET_MethodNotAllowed(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
resp, err := http.Get("http://" + addr + "/x")
if err != nil {
t.Fatal(err)
}
resp.Body.Close()
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Errorf("status = %d", resp.StatusCode)
}
}
func TestGET_HealthEndpoint(t *testing.T) {
addr := freePort(t)
cfg := Config{
Listen: addr,
Routes: map[string]Route{
"/x": {Recipient: "r", Type: "INFO", PayloadTemplate: "x"},
},
}
rec := &recordingEmit{}
_, cancel := runSource(t, cfg, rec.emit)
defer cancel()
// Two POSTs: 1 success, 1 fail.
http.Post("http://"+addr+"/x", "application/json", strings.NewReader("{}"))
http.Post("http://"+addr+"/wrongpath", "application/json", strings.NewReader("{}"))
resp, err := http.Get("http://" + addr + "/health")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("status = %d", resp.StatusCode)
}
var stats Stats
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
t.Fatal(err)
}
if stats.Received < 2 {
t.Errorf("received = %d", stats.Received)
}
if stats.Emitted != 1 {
t.Errorf("emitted = %d, want 1", stats.Emitted)
}
}
type emitErr struct{}
func (e *emitErr) Error() string { return "emit boom" }
func errOnce() error { return &emitErr{} }