package sinks

import (
	"bytes"
	"fmt"
	"sync"
	"text/template"
	"time"

	pubsub "github.com/alash3al/go-pubsub"
	"github.com/rs/zerolog/log"

	"code.chrissexton.org/cws/lameralert/config"
)

// Console is a sink that renders every message it receives through a
// text/template and prints the result to standard output, prefixed with the
// sink's id.
type Console struct {
	id string

	// mu guards sub: Subscribe writes it from the caller's goroutine while
	// serve reads it from the goroutine started in NewConsoleID.
	mu  sync.RWMutex
	sub *pubsub.Subscriber

	c       *config.Config
	cfgRoot string
}

// NewConsoleID creates a Console identified by id and starts its serve loop in
// a background goroutine. The sink reads its settings from config under the
// "console.<id>" key prefix. No messages are printed until Subscribe has been
// called successfully.
func NewConsoleID(config *config.Config, id string) *Console {
	c := &Console{id: id, c: config, cfgRoot: fmt.Sprintf("console.%s", id)}
	go c.serve()
	return c
}

// Subscribe attaches the sink to broker and subscribes it to topic. It returns
// the error from broker.Attach, if any; in that case the sink's current
// subscriber is left untouched.
func (c *Console) Subscribe(broker *pubsub.Broker, topic string) error {
	sub, err := broker.Attach()
	if err != nil {
		log.Error().Err(err).Msg("Error attaching to broker")
		return err
	}
	broker.Subscribe(sub, topic)
	// Publish the subscriber only once it is fully set up so the serve loop
	// never observes a half-initialised value.
	c.setSubscriber(sub)
	return nil
}

// subscriber returns the current subscriber, or nil if Subscribe has not yet
// succeeded.
func (c *Console) subscriber() *pubsub.Subscriber {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.sub
}

// setSubscriber stores sub as the subscriber used by the serve loop.
func (c *Console) setSubscriber(sub *pubsub.Subscriber) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sub = sub
}

// serve waits until a subscriber has been attached, polling with a delay that
// starts at one second and doubles after each check, then prints every message
// delivered to that subscriber. It returns when the subscriber's message
// channel is closed.
func (c *Console) serve() {
	wait := 1
	for c.subscriber() == nil {
		time.Sleep(time.Duration(wait) * time.Second)
		wait *= 2
	}
	log.Info().Msgf("Console started up with wait %d", wait)

	for {
		// Re-read the subscriber on every iteration so a later Subscribe
		// call is picked up, as it was before the mutex was introduced.
		ev, ok := <-c.subscriber().GetMessages()
		if !ok || ev == nil {
			// A closed channel yields nil messages forever; stop instead
			// of spinning and dereferencing nil.
			log.Info().Msgf("Console %s message channel closed, stopping", c.id)
			return
		}

		out, err := c.render(ev.GetPayload())
		if err != nil {
			// The template comes from runtime configuration, so a bad one
			// must not take the process down; skip this message instead.
			log.Error().Err(err).Str("sink", c.id).Msg("could not render message")
			continue
		}
		fmt.Printf("%s %s\n", c.id, out)
	}
}

// render formats payload with the template stored under "<cfgRoot>.tpl".
// "{{.}}" is passed as the second argument to GetString (presumably the
// fallback used when the key is unset; confirm against config.Config). The
// template is looked up and parsed on every call, so the configured value is
// re-read for each message.
func (c *Console) render(payload interface{}) (string, error) {
	tpl := c.c.GetString(c.cfgRoot+".tpl", "{{.}}")
	t, err := template.New(c.cfgRoot).Parse(tpl)
	if err != nil {
		return "", fmt.Errorf("parsing template %q: %w", tpl, err)
	}

	var buf bytes.Buffer
	if err := t.Execute(&buf, payload); err != nil {
		return "", fmt.Errorf("executing template %q: %w", tpl, err)
	}
	return buf.String(), nil
}