diff --git a/cmd/agent-watcher/main.go b/cmd/agent-watcher/main.go new file mode 100644 index 0000000..fdca86e --- /dev/null +++ b/cmd/agent-watcher/main.go @@ -0,0 +1,148 @@ +// Command agent-watcher is the Collector daemon. +// +// It loads a YAML config, builds the configured sources, and runs them +// concurrently. Each source converts external events (webhook POSTs, files +// dropped into a folder) into JSONL writes against the configured inbox +// directory. Output is bit-identical to the agent-ping CLI's writes, so the +// existing UserPromptSubmit hook (and the future MCP Watcher) consume the +// stream without distinguishing source. +// +// Default config path: ~/.config/agent-watcher/collector.yaml. Override with +// --config or AGENT_WATCHER_CONFIG. +// +// Signals: SIGINT and SIGTERM trigger graceful shutdown. SIGHUP reload is a +// v2 item — for now restart the unit to pick up config changes. +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + "os/signal" + "path/filepath" + "sync" + "syscall" + + "git.botbought.ai/foreman/agent-watcher/internal/config" + "git.botbought.ai/foreman/agent-watcher/internal/inbox" + "git.botbought.ai/foreman/agent-watcher/internal/source" + "git.botbought.ai/foreman/agent-watcher/internal/source/dropfolder" + "git.botbought.ai/foreman/agent-watcher/internal/source/webhook" +) + +func main() { + var ( + configPath = flag.String("config", "", "path to collector.yaml (default: $AGENT_WATCHER_CONFIG or ~/.config/agent-watcher/collector.yaml)") + jsonLog = flag.Bool("json-log", false, "emit logs as JSON (for journald structured fields)") + ) + flag.Parse() + + logger := newLogger(*jsonLog) + + path := resolveConfigPath(*configPath) + cfg, err := config.Load(path) + if err != nil { + logger.Error("config", "err", err) + os.Exit(2) + } + logger = logger.With("agent", cfg.Agent) + logger.Info("config loaded", "path", path, "inbox_dir", cfg.InboxDir) + + if err := run(cfg, logger); err != nil { + logger.Error("run", "err", err) + os.Exit(1) + } +} + +func run(cfg *config.Config, logger *slog.Logger) error { + w, err := inbox.NewWriter(cfg.InboxDir) + if err != nil { + return fmt.Errorf("inbox: %w", err) + } + emit := func(recipient string, ev *inbox.Event) error { + return w.Write(recipient, ev) + } + + sources, err := buildSources(cfg, logger) + if err != nil { + return err + } + if len(sources) == 0 { + return fmt.Errorf("no sources configured") + } + + ctx, cancel := signalContext() + defer cancel() + + var wg sync.WaitGroup + for _, src := range sources { + src := src + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("source starting", "name", src.Name()) + if err := src.Run(ctx, emit); err != nil && err != context.Canceled { + logger.Error("source exited", "name", src.Name(), "err", err) + } else { + logger.Info("source stopped", "name", src.Name()) + } + }() + } + wg.Wait() + return nil +} + +func buildSources(cfg *config.Config, logger *slog.Logger) ([]source.Source, error) { + var out []source.Source + if w := cfg.Sources.Webhook; w != nil { + s, err := webhook.New(webhook.Config{ + Listen: w.Listen, + Routes: w.ToWebhookRouteMap(), + }, logger) + if err != nil { + return nil, fmt.Errorf("webhook: %w", err) + } + out = append(out, s) + } + if d := cfg.Sources.DropFolder; d != nil { + out = append(out, dropfolder.New(dropfolder.Config{ + Path: d.Path, + PollFallbackSeconds: d.PollFallbackSeconds, + }, logger)) + } + return out, nil +} + +func signalContext() (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + cancel() + }() + return ctx, cancel +} + +func resolveConfigPath(flagPath string) string { + if flagPath != "" { + return flagPath + } + if env := os.Getenv("AGENT_WATCHER_CONFIG"); env != "" { + return env + } + home, _ := os.UserHomeDir() + return filepath.Join(home, ".config", "agent-watcher", "collector.yaml") +} + +func newLogger(asJSON bool) *slog.Logger { + var h slog.Handler + if asJSON { + h = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}) + } else { + h = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}) + } + return slog.New(h) +} diff --git a/go.mod b/go.mod index ccb4746..2bf7c53 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,7 @@ go 1.22 require github.com/fsnotify/fsnotify v1.7.0 -require golang.org/x/sys v0.13.0 // indirect +require ( + golang.org/x/sys v0.13.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 9e90680..14be547 100644 --- a/go.sum +++ b/go.sum @@ -2,3 +2,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..b54f80b --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,179 @@ +// Package config loads the Collector's per-host YAML configuration. +// +// Config layout (spec §3.2): +// +// agent: foreman # this host's identity +// inbox_dir: ~/Nyx/workspace/pings # optional; default shown +// sources: +// webhook: +// listen: 127.0.0.1:18790 +// routes: +// /forgejo/push: +// recipient: bob +// type: INFO +// priority: normal # optional +// payload_template: "..." +// drop_folder: +// path: ~/Nyx/workspace/incoming/ +// poll_fallback_seconds: 30 +// +// Either source may be omitted; the Collector starts with whatever is +// configured. Validation is strict — an unknown top-level field or a +// malformed route returns an error. +package config + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" + + "git.botbought.ai/foreman/agent-watcher/internal/source/webhook" +) + +// Config is the loaded, validated configuration. +type Config struct { + Agent string `yaml:"agent"` + InboxDir string `yaml:"inbox_dir,omitempty"` + Sources Sources `yaml:"sources"` +} + +// Sources groups per-source-type configuration. +type Sources struct { + Webhook *WebhookConfig `yaml:"webhook,omitempty"` + DropFolder *DropFolderConfig `yaml:"drop_folder,omitempty"` +} + +// WebhookConfig configures the HTTP webhook source. +type WebhookConfig struct { + Listen string `yaml:"listen"` + Routes map[string]WebhookRoute `yaml:"routes"` +} + +// WebhookRoute is one entry in the webhook routing table. +type WebhookRoute struct { + Recipient string `yaml:"recipient"` + Type string `yaml:"type"` + Priority string `yaml:"priority,omitempty"` + PayloadTemplate string `yaml:"payload_template"` +} + +// DropFolderConfig configures the drop-folder source. +type DropFolderConfig struct { + Path string `yaml:"path"` + PollFallbackSeconds int `yaml:"poll_fallback_seconds,omitempty"` +} + +// Default values applied when fields are unset. +const ( + DefaultInboxDir = "~/Nyx/workspace/pings" + DefaultDropPath = "~/Nyx/workspace/incoming" + DefaultWebhookListen = "127.0.0.1:18790" + DefaultPollFallbackSeconds = 30 +) + +// Load parses the YAML file at path, applies defaults, and validates. +func Load(path string) (*Config, error) { + body, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("config: read %s: %w", path, err) + } + return parse(body) +} + +func parse(body []byte) (*Config, error) { + var c Config + dec := yaml.NewDecoder(strings.NewReader(string(body))) + dec.KnownFields(true) + if err := dec.Decode(&c); err != nil { + return nil, fmt.Errorf("config: parse: %w", err) + } + if err := c.applyDefaultsAndValidate(); err != nil { + return nil, err + } + return &c, nil +} + +func (c *Config) applyDefaultsAndValidate() error { + if c.Agent == "" { + return errors.New("config: agent is required") + } + if c.InboxDir == "" { + c.InboxDir = DefaultInboxDir + } + c.InboxDir = expand(c.InboxDir) + + if c.Sources.Webhook == nil && c.Sources.DropFolder == nil { + return errors.New("config: at least one source (webhook or drop_folder) must be configured") + } + + if w := c.Sources.Webhook; w != nil { + if w.Listen == "" { + w.Listen = DefaultWebhookListen + } + if len(w.Routes) == 0 { + return errors.New("config: webhook configured but no routes defined") + } + for path, r := range w.Routes { + if !strings.HasPrefix(path, "/") { + return fmt.Errorf("config: webhook route %q must start with /", path) + } + if r.Recipient == "" { + return fmt.Errorf("config: webhook route %s: recipient required", path) + } + if r.Type == "" { + return fmt.Errorf("config: webhook route %s: type required", path) + } + if r.PayloadTemplate == "" { + return fmt.Errorf("config: webhook route %s: payload_template required", path) + } + } + } + + if d := c.Sources.DropFolder; d != nil { + if d.Path == "" { + d.Path = DefaultDropPath + } + d.Path = expand(d.Path) + if d.PollFallbackSeconds == 0 { + d.PollFallbackSeconds = DefaultPollFallbackSeconds + } + if d.PollFallbackSeconds < 0 { + return errors.New("config: drop_folder.poll_fallback_seconds must be >= 0") + } + } + return nil +} + +// expand expands a leading ~/ to $HOME. +func expand(p string) string { + if !strings.HasPrefix(p, "~/") && p != "~" { + return p + } + home, err := os.UserHomeDir() + if err != nil || home == "" { + return p + } + if p == "~" { + return home + } + return filepath.Join(home, p[2:]) +} + +// ToWebhookRouteMap converts validated routes to the source/webhook.Route +// shape so main.go can hand them straight to webhook.New. +func (w *WebhookConfig) ToWebhookRouteMap() map[string]webhook.Route { + out := make(map[string]webhook.Route, len(w.Routes)) + for path, r := range w.Routes { + out[path] = webhook.Route{ + Recipient: r.Recipient, + Type: r.Type, + Priority: r.Priority, + PayloadTemplate: r.PayloadTemplate, + } + } + return out +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..ccae24f --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,168 @@ +package config + +import ( + "strings" + "testing" +) + +func TestParse_FullValid(t *testing.T) { + body := []byte(` +agent: foreman +inbox_dir: /tmp/inbox +sources: + webhook: + listen: 127.0.0.1:18790 + routes: + /forgejo/push: + recipient: bob + type: INFO + payload_template: "push to {{ .repo }}" + /alert: + recipient: bob + type: NEEDS-RESPONSE + priority: urgent + payload_template: "alert" + drop_folder: + path: /tmp/incoming + poll_fallback_seconds: 60 +`) + c, err := parse(body) + if err != nil { + t.Fatal(err) + } + if c.Agent != "foreman" { + t.Errorf("agent = %q", c.Agent) + } + if c.InboxDir != "/tmp/inbox" { + t.Errorf("inbox = %q", c.InboxDir) + } + if c.Sources.Webhook == nil || len(c.Sources.Webhook.Routes) != 2 { + t.Errorf("webhook routes: %+v", c.Sources.Webhook) + } + if c.Sources.DropFolder == nil || c.Sources.DropFolder.Path != "/tmp/incoming" { + t.Errorf("dropfolder: %+v", c.Sources.DropFolder) + } +} + +func TestParse_AppliesDefaults(t *testing.T) { + body := []byte(` +agent: foreman +sources: + drop_folder: + path: /tmp/x +`) + c, err := parse(body) + if err != nil { + t.Fatal(err) + } + if c.InboxDir == "" { + t.Error("inbox_dir default not applied") + } + if c.Sources.DropFolder.PollFallbackSeconds != DefaultPollFallbackSeconds { + t.Errorf("poll default = %d", c.Sources.DropFolder.PollFallbackSeconds) + } +} + +func TestParse_HomeExpansion(t *testing.T) { + body := []byte(` +agent: foreman +sources: + drop_folder: + path: ~/foo/bar +`) + c, err := parse(body) + if err != nil { + t.Fatal(err) + } + if strings.HasPrefix(c.Sources.DropFolder.Path, "~") { + t.Errorf("~ not expanded: %q", c.Sources.DropFolder.Path) + } + if !strings.HasSuffix(c.Sources.DropFolder.Path, "/foo/bar") { + t.Errorf("expansion lost suffix: %q", c.Sources.DropFolder.Path) + } +} + +func TestParse_RejectsInvalid(t *testing.T) { + cases := map[string]string{ + "missing agent": `sources:\n webhook:\n routes: {}`, + "no sources": `agent: x`, + "empty webhook routes": ` +agent: x +sources: + webhook: + listen: 127.0.0.1:18790 + routes: {} +`, + "route missing recipient": ` +agent: x +sources: + webhook: + routes: + /x: + type: INFO + payload_template: y +`, + "route missing type": ` +agent: x +sources: + webhook: + routes: + /x: + recipient: r + payload_template: y +`, + "route missing template": ` +agent: x +sources: + webhook: + routes: + /x: + recipient: r + type: INFO +`, + "route path no slash": ` +agent: x +sources: + webhook: + routes: + bad: + recipient: r + type: INFO + payload_template: y +`, + "unknown top-level field": ` +agent: x +typo: oops +sources: + drop_folder: + path: /x +`, + "poll negative": ` +agent: x +sources: + drop_folder: + path: /x + poll_fallback_seconds: -1 +`, + } + for name, body := range cases { + t.Run(name, func(t *testing.T) { + if _, err := parse([]byte(body)); err == nil { + t.Errorf("expected error for: %s", name) + } + }) + } +} + +func TestToWebhookRouteMap(t *testing.T) { + w := &WebhookConfig{ + Routes: map[string]WebhookRoute{ + "/x": {Recipient: "r", Type: "INFO", Priority: "urgent", PayloadTemplate: "p"}, + }, + } + got := w.ToWebhookRouteMap() + r := got["/x"] + if r.Recipient != "r" || r.Type != "INFO" || r.Priority != "urgent" || r.PayloadTemplate != "p" { + t.Errorf("conversion lost data: %+v", r) + } +}