From 05b8981ceace764e195b20803848f316f0ecd69a Mon Sep 17 00:00:00 2001 From: Chris Sexton Date: Tue, 24 Dec 2019 07:20:27 -0500 Subject: [PATCH] initial commit --- .gitignore | 98 ++++++++++++++++++++ cmd/lamer/lamer.go | 16 ++++ config/config.go | 197 ++++++++++++++++++++++++++++++++++++++++ config/web.go | 71 +++++++++++++++ event/event.go | 9 ++ go.mod | 15 +++ go.sum | 28 ++++++ lameralert.go | 61 +++++++++++++ sinks/console.go | 59 ++++++++++++ sinks/pushover.go | 81 +++++++++++++++++ sinks/sink.go | 7 ++ sources/generic_rest.go | 50 ++++++++++ sources/rss.go | 118 ++++++++++++++++++++++++ sources/source.go | 8 ++ 14 files changed, 818 insertions(+) create mode 100644 .gitignore create mode 100644 cmd/lamer/lamer.go create mode 100644 config/config.go create mode 100644 config/web.go create mode 100644 event/event.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 lameralert.go create mode 100644 sinks/console.go create mode 100644 sinks/pushover.go create mode 100644 sinks/sink.go create mode 100644 sources/generic_rest.go create mode 100644 sources/rss.go create mode 100644 sources/source.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b33e4b9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,98 @@ + +# Created by https://www.gitignore.io/api/go,linux,macos,windows +# Edit at https://www.gitignore.io/?templates=go,linux,macos,windows + +### Go ### +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +### Go Patch ### +/vendor/ +/Godeps/ + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### macOS ### +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +### Windows ### +# Windows thumbnail cache files +Thumbs.db +Thumbs.db:encryptable +ehthumbs.db +ehthumbs_vista.db + +# Dump file +*.stackdump + +# Folder config file +[Dd]esktop.ini + +# Recycle Bin used on file shares +$RECYCLE.BIN/ + +# Windows Installer files +*.cab +*.msi +*.msix +*.msm +*.msp + +# Windows shortcuts +*.lnk + +# End of https://www.gitignore.io/api/go,linux,macos,windows +.idea +cmd/lamer/lamer +lameralert.db diff --git a/cmd/lamer/lamer.go b/cmd/lamer/lamer.go new file mode 100644 index 0000000..48a3e8b --- /dev/null +++ b/cmd/lamer/lamer.go @@ -0,0 +1,16 @@ +package main + +import ( + "flag" + + "github.com/rs/zerolog/log" + + "code.chrissexton.org/cws/lameralert" +) + +func main() { + flag.Parse() + log.Info().Msg("lamer startup") + done := lameralert.Setup() + <-done +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..77f561d --- /dev/null +++ b/config/config.go @@ -0,0 +1,197 @@ +package config + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/boltdb/bolt" + "github.com/gorilla/mux" + "github.com/rs/zerolog/log" +) + +var ( + bucket = []byte("config") +) + +type configValue struct { + Key string `json:"key"` + Value interface{} `json:"value"` +} + +func (value configValue) getBytes() ([]byte, error) { + return json.Marshal(value) +} + +func decodeConfig(in []byte) (configValue, error) { + var out configValue + err := json.Unmarshal(in, &out) + return out, err +} + +type Config struct { + // I'm not completely sure if this mutex is necessary. + // It is left here because I'm opening an closing the db + // for every operation. + sync.Mutex + path string +} + +func (c *Config) db() *bolt.DB { + db, err := bolt.Open(c.path, 0666, + &bolt.Options{ + Timeout: 1 * time.Second, + }) + if err != nil { + log.Fatal().Msgf("Could not open database: %s", err) + } + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + if err != nil { + log.Fatal().Err(err).Msg("Could not open bucket") + } + return db +} + +func (c *Config) getAll() (map[string]configValue, error) { + c.Lock() + defer c.Unlock() + db := c.db() + defer db.Close() + vals := map[string]configValue{} + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + err := b.ForEach(func(k, v []byte) error { + value, err := decodeConfig(v) + if err != nil { + return fmt.Errorf("could not convert value: %w", err) + } + vals[string(k)] = value + return nil + }) + if err != nil { + return fmt.Errorf("error in forEach: %w", err) + } + return nil + }) + return vals, err +} + +func (c *Config) get(key string) (configValue, error) { + c.Lock() + defer c.Unlock() + db := c.db() + defer db.Close() + var out configValue + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + value, err := decodeConfig(b.Get([]byte(key))) + if err != nil { + return err + } + out = value + return nil + }) + if err != nil { + return configValue{}, err + } + return out, nil +} + +func (c *Config) set(key string, value configValue) error { + c.Lock() + defer c.Unlock() + db := c.db() + defer db.Close() + err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + v, err := value.getBytes() + if err != nil { + return err + } + err = b.Put([]byte(key), v) + return err + }) + if err != nil { + log.Error().Msgf("Error setting key: %s", err) + } + return err +} + +func (c *Config) delete(key string) error { + c.Lock() + defer c.Unlock() + db := c.db() + defer db.Close() + err := db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + return b.Delete([]byte(key)) + }) + return err +} + +// Public interface + +func New(r *mux.Router) *Config { + c := &Config{ + path: "lameralert.db", + } + c.setup(r) + return c +} + +func (c *Config) GetInt64(key string, defaultValue int64) int64 { + v, err := c.get(key) + if err != nil { + return defaultValue + } + return int64(v.Value.(float64)) +} + +func (c *Config) GetInt(key string, defaultValue int) int { + v, err := c.get(key) + if err != nil { + return defaultValue + } + return int(v.Value.(float64)) +} + +func (c *Config) GetStringSlice(key string, defaultValue []string) []string { + v, err := c.get(key) + if err != nil { + log.Error().Msgf("GetStringSlice errored looking for %s: %s", key, err) + return defaultValue + } + return interfaceSliceToStringSlice(v.Value.([]interface{})) +} + +func interfaceSliceToStringSlice(in []interface{}) []string { + out := []string{} + for _, it := range in { + out = append(out, it.(string)) + } + return out +} + +func (c *Config) GetStringMap(key string, defaultValue map[string]string) map[string]string { + v, err := c.get(key) + if err != nil { + return defaultValue + } + return v.Value.(map[string]string) +} + +func (c *Config) GetString(key, defaultValue string) string { + v, err := c.get(key) + if err != nil { + return defaultValue + } + return v.Value.(string) +} + +func (c *Config) Set(key string, value interface{}) error { + return c.set(key, configValue{key, value}) +} diff --git a/config/web.go b/config/web.go new file mode 100644 index 0000000..e721c0c --- /dev/null +++ b/config/web.go @@ -0,0 +1,71 @@ +package config + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + "github.com/rs/zerolog/log" +) + +func (c *Config) setup(r *mux.Router) { + r.HandleFunc("/{key}", c.deleteValue).Methods(http.MethodDelete) + r.HandleFunc("/{key}", c.getValue).Methods(http.MethodGet) + r.HandleFunc("/", c.getConfigValues).Methods(http.MethodGet) + r.HandleFunc("/", c.postConfigValue).Methods(http.MethodPost) +} + +func (c *Config) getConfigValues(w http.ResponseWriter, r *http.Request) { + log.Debug().Msg("getConfigValues") + vals, err := c.getAll() + if err != nil { + http.Error(w, mkJSONError(err.Error()), http.StatusInternalServerError) + } + json.NewEncoder(w).Encode(vals) +} + +func (c *Config) postConfigValue(w http.ResponseWriter, r *http.Request) { + log.Debug().Msg("postConfigValues") + configPost := configValue{} + err := json.NewDecoder(r.Body).Decode(&configPost) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + c.set(configPost.Key, configPost) + resp := struct { + Status string `json:"status"` + }{"success"} + log.Debug().Msgf("post success, returning struct %v", resp) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + http.Error(w, mkJSONError(err.Error()), http.StatusInternalServerError) + } +} + +func (c *Config) deleteValue(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + log.Debug().Msgf("deleteValue %s", vars["key"]) + err := c.delete(vars["key"]) + if err != nil { + http.Error(w, mkJSONError(err.Error()), http.StatusInternalServerError) + } + http.Error(w, "", http.StatusNoContent) +} + +func (c *Config) getValue(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + log.Debug().Msgf("getValue %s", vars["key"]) + v, err := c.get(vars["key"]) + if err != nil { + http.Error(w, mkJSONError(err.Error()), http.StatusNotFound) + return + } + json.NewEncoder(w).Encode(v) +} + +func mkJSONError(msg string) string { + j, _ := json.Marshal(struct { + Error string `json:"error"` + }{msg}) + return string(j) +} diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..1aac23d --- /dev/null +++ b/event/event.go @@ -0,0 +1,9 @@ +package event + +import "time" + +type Event struct { + Time time.Time + Error error + Payload map[string]string +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..95ca625 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module code.chrissexton.org/cws/lameralert + +go 1.13 + +require ( + github.com/PuerkitoBio/goquery v1.5.0 // indirect + github.com/alash3al/go-pubsub v0.0.0-20190804113747-4147c4964332 + github.com/boltdb/bolt v1.3.1 + github.com/gorilla/mux v1.7.3 + github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636 + github.com/mmcdole/gofeed v1.0.0-beta2 + github.com/mmcdole/goxpp v0.0.0-20181012175147-0068e33feabf // indirect + github.com/rs/zerolog v1.17.2 + golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ac186c0 --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg= +github.com/alash3al/go-pubsub v0.0.0-20190804113747-4147c4964332 h1:Kgkjunxmj2d7DSIg0joTQcRRuKnT0VfwtQVwMdHGKqo= +github.com/alash3al/go-pubsub v0.0.0-20190804113747-4147c4964332/go.mod h1:OFNZAVEGEivuAfXw6Kc5539cV2jhjycsoSmXKwf9yBs= +github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636 h1:6agUllU8gUNAallyB+afeLXMRLL6Q1z+S6YC7Pi1EIY= +github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636/go.mod h1:NbuXd8Iwy5dU99qFToB8mSE29qJOQNW/bphiV8CWj/k= +github.com/mmcdole/gofeed v1.0.0-beta2/go.mod h1:/BF9JneEL2/flujm8XHoxUcghdTV6vvb3xx/vKyChFU= +github.com/mmcdole/goxpp v0.0.0-20181012175147-0068e33feabf/go.mod h1:pasqhqstspkosTneA62Nc+2p9SOBBYAPbnmRRWPQ0V8= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.17.2 h1:RMRHFw2+wF7LO0QqtELQwo8hqSmqISyCJeFeAAuWcRo= +github.com/rs/zerolog v1.17.2/go.mod h1:9nvC1axdVrAHcu/s9taAVfBuIdTZLVQmKQyvrUjF5+I= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/lameralert.go b/lameralert.go new file mode 100644 index 0000000..a294d52 --- /dev/null +++ b/lameralert.go @@ -0,0 +1,61 @@ +package lameralert + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/rs/zerolog/log" + + "code.chrissexton.org/cws/lameralert/config" + "code.chrissexton.org/cws/lameralert/event" + "code.chrissexton.org/cws/lameralert/sinks" + "code.chrissexton.org/cws/lameralert/sources" +) + +type Connection struct { + source sources.Source + sink sinks.Sink + options map[string]interface{} +} + +func NewConnection(from sources.Source, to sinks.Sink) Connection { + for _, topic := range from.GetTopics() { + to.Subscribe(from.GetSender(), topic) + } + return Connection{ + source: from, + sink: to, + } +} + +var mapping = map[chan event.Event][]chan event.Event{} + +func logger(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Do stuff here + log.Debug().Msgf("%s - %s", r.Method, r.RequestURI) + // Call the next handler, which can be another middleware in the chain, or the final handler. + next.ServeHTTP(w, r) + }) +} +func Setup() chan bool { + r := mux.NewRouter() + r.Use(logger) + c := config.New(r.PathPrefix("/config").Subrouter()) + ch := make(chan bool) + + h := sources.NewGenericRest(r.PathPrefix("/rest").Subrouter()) + NewConnection(h, sinks.NewConsoleID(c, "00")) + NewConnection(h, sinks.NewConsoleID(c, "01")) + + rss := sources.NewRSS(c, r.PathPrefix("/rss/webshit").Subrouter(), "webshit", "http://n-gate.com/hackernews/index.rss") + NewConnection(rss, sinks.NewConsoleID(c, "webshit-sink")) + if push, err := sinks.NewPushover(c, "webshit-push"); err != nil { + log.Fatal().Msgf("error: %s", err) + } else { + NewConnection(rss, push) + } + + http.ListenAndServe(":9090", r) + return ch +} diff --git a/sinks/console.go b/sinks/console.go new file mode 100644 index 0000000..fc1f8a1 --- /dev/null +++ b/sinks/console.go @@ -0,0 +1,59 @@ +package sinks + +import ( + "bytes" + "fmt" + "text/template" + "time" + + pubsub "github.com/alash3al/go-pubsub" + "github.com/rs/zerolog/log" + + "code.chrissexton.org/cws/lameralert/config" +) + +type Console struct { + id string + sub *pubsub.Subscriber + c *config.Config + cfgRoot string +} + +func NewConsoleID(config *config.Config, id string) *Console { + c := &Console{id: id, c: config, cfgRoot: fmt.Sprintf("console.%s", id)} + go c.serve() + return c +} + +func (c *Console) Subscribe(broker *pubsub.Broker, topic string) error { + var err error + c.sub, err = broker.Attach() + if err != nil { + log.Error().Err(err).Msg("Error attaching to broker") + return err + } + broker.Subscribe(c.sub, topic) + return nil +} + +func (c *Console) serve() { + wait := 1 + for c.sub == nil { + t := time.NewTimer(time.Duration(wait) * time.Second) + <-t.C + wait *= 2 + } + log.Info().Msgf("Console started up with wait %d", wait) + for { + select { + case ev := <-c.sub.GetMessages(): + tpl := c.c.GetString(c.cfgRoot+".tpl", "{{.}}") + t := template.Must(template.New(c.cfgRoot).Parse(tpl)) + buf := bytes.Buffer{} + if err := t.Execute(&buf, ev.GetPayload()); err != nil { + fmt.Printf("%s could not execute template: %s", c.id, err) + } + fmt.Printf("%s %s\n", c.id, buf.String()) + } + } +} diff --git a/sinks/pushover.go b/sinks/pushover.go new file mode 100644 index 0000000..3bbd364 --- /dev/null +++ b/sinks/pushover.go @@ -0,0 +1,81 @@ +package sinks + +import ( + "bytes" + "fmt" + "text/template" + "time" + + pubsub "github.com/alash3al/go-pubsub" + "github.com/gregdel/pushover" + "github.com/rs/zerolog/log" + + "code.chrissexton.org/cws/lameralert/config" +) + +type Pushover struct { + c *config.Config + sub *pubsub.Subscriber + cfgRoot string + name string + p *pushover.Pushover +} + +func NewPushover(config *config.Config, name string) (*Pushover, error) { + token := config.GetString("pushover.token", "") + if token == "" { + err := fmt.Errorf("token is empty") + log.Error().Msgf("Could not create pushover for %s: %s.", name, err) + return nil, err + } + p := &Pushover{ + c: config, + name: name, + cfgRoot: fmt.Sprintf("pushover.%s", name), + p: pushover.New(token), + } + go p.serve() + return p, nil +} + +func (p *Pushover) Subscribe(broker *pubsub.Broker, topic string) error { + var err error + p.sub, err = broker.Attach() + if err != nil { + log.Error().Err(err).Msg("Error attaching to broker") + return err + } + broker.Subscribe(p.sub, topic) + return nil +} + +func (p *Pushover) serve() { + wait := 1 + for p.sub == nil { + t := time.NewTimer(time.Duration(wait) * time.Second) + <-t.C + wait *= 2 + } + for { + select { + case ev := <-p.sub.GetMessages(): + tpl := p.c.GetString(p.cfgRoot+".tpl", "{{.}}") + t := template.Must(template.New(p.cfgRoot).Parse(tpl)) + buf := bytes.Buffer{} + if err := t.Execute(&buf, ev.GetPayload()); err != nil { + fmt.Printf("%s could not execute template: %s", p.name, err) + } + recipients := p.c.GetStringSlice(p.cfgRoot+".recipients", []string{}) + log.Debug().Msgf("%s got a message, pushing to %v", p.name, recipients) + msg := pushover.NewMessage(buf.String()) + for _, r := range recipients { + to := pushover.NewRecipient(r) + _, err := p.p.SendMessage(msg, to) + if err != nil { + log.Error().Msgf("Error pushing %s to %s: %s", p.name, r, err) + } + log.Info().Msgf("%s sent pushover to %s", p.name, r) + } + } + } +} diff --git a/sinks/sink.go b/sinks/sink.go new file mode 100644 index 0000000..4935b25 --- /dev/null +++ b/sinks/sink.go @@ -0,0 +1,7 @@ +package sinks + +import pubsub "github.com/alash3al/go-pubsub" + +type Sink interface { + Subscribe(broker *pubsub.Broker, topic string) error +} diff --git a/sources/generic_rest.go b/sources/generic_rest.go new file mode 100644 index 0000000..ad82c86 --- /dev/null +++ b/sources/generic_rest.go @@ -0,0 +1,50 @@ +package sources + +import ( + "io/ioutil" + "net/http" + "time" + + pubsub "github.com/alash3al/go-pubsub" + "github.com/gorilla/mux" + + "code.chrissexton.org/cws/lameralert/event" +) + +type GenericRest struct { + broker *pubsub.Broker + router *mux.Router +} + +func (gr *GenericRest) GetTopics() []string { + return []string{"REST"} +} + +func NewGenericRest(r *mux.Router) *GenericRest { + gr := &GenericRest{ + router: r, + broker: pubsub.NewBroker(), + } + go gr.serve() + return gr +} + +func (gr *GenericRest) GetSender() *pubsub.Broker { + return gr.broker +} + +func (gr *GenericRest) ping(w http.ResponseWriter, r *http.Request) { + body, _ := ioutil.ReadAll(r.Body) + defer r.Body.Close() + payload := map[string]string{"body": string(body)} + ev := event.Event{ + Time: time.Now(), + Error: nil, + Payload: payload, + } + gr.broker.Broadcast(ev, gr.GetTopics()[0]) +} + +func (gr *GenericRest) serve() { + gr.router.HandleFunc("", gr.ping) +} diff --git a/sources/rss.go b/sources/rss.go new file mode 100644 index 0000000..23a5da5 --- /dev/null +++ b/sources/rss.go @@ -0,0 +1,118 @@ +package sources + +import ( + "crypto/sha1" + "fmt" + "net/http" + "time" + + pubsub "github.com/alash3al/go-pubsub" + "github.com/gorilla/mux" + "github.com/mmcdole/gofeed" + "github.com/rs/zerolog/log" + + "code.chrissexton.org/cws/lameralert/config" + "code.chrissexton.org/cws/lameralert/event" +) + +type RSS struct { + broker *pubsub.Broker + router *mux.Router + config *config.Config + configRoot string + url string + name string +} + +func NewRSS(c *config.Config, r *mux.Router, name, url string) *RSS { + rss := RSS{ + broker: pubsub.NewBroker(), + router: r, + config: c, + url: url, + name: name, + configRoot: fmt.Sprintf("rss.%s", name), + } + go rss.serve() + return &rss +} + +func (rss *RSS) GetSender() *pubsub.Broker { + return rss.broker +} + +func (rss *RSS) GetTopics() []string { + return []string{rss.name} +} + +func (rss *RSS) pingHard(w http.ResponseWriter, r *http.Request) { + rss.check(true) + http.Error(w, "", http.StatusNoContent) +} + +func (rss *RSS) ping(w http.ResponseWriter, r *http.Request) { + rss.check(false) + http.Error(w, "", http.StatusNoContent) +} + +func (rss *RSS) serve() { + rss.router.HandleFunc("/force", rss.pingHard) + rss.router.HandleFunc("", rss.ping) + for { + wait := rss.config.GetInt(rss.configRoot+".wait", 900) + t := time.NewTimer(time.Duration(wait) * time.Second) + log.Debug().Msgf("%s waiting %v", rss.configRoot+".wait", wait) + <-t.C + log.Debug().Msgf("%s checking after %v", rss.name, wait) + rss.check(false) + } +} + +func in(item string, slice []string) bool { + for _, it := range slice { + if item == it { + return true + } + } + return false +} + +func hash(item string) string { + return fmt.Sprintf("%x", sha1.Sum([]byte(item))) +} + +func (rss *RSS) check(force bool) { + fp := gofeed.NewParser() + feed, err := fp.ParseURL(rss.url) + if err != nil { + log.Error().Err(err).Msg("error getting RSS") + return + } + + cache := rss.config.GetStringSlice(rss.configRoot+".cache", []string{}) + newCache := []string{} + + if force { + cache = []string{} + } + + for _, it := range feed.Items { + h := hash(it.Title + it.Description) + newCache = append(newCache, h) + if !in(h, cache) { + payload := map[string]string{ + "title": it.Title, + "description": it.Description, + "content": it.Content, + "published": it.Published, + } + ev := event.Event{ + Time: time.Now(), + Payload: payload, + } + rss.broker.Broadcast(ev, rss.name) + } + } + + rss.config.Set(rss.configRoot+".cache", newCache) +} diff --git a/sources/source.go b/sources/source.go new file mode 100644 index 0000000..b95f59e --- /dev/null +++ b/sources/source.go @@ -0,0 +1,8 @@ +package sources + +import pubsub "github.com/alash3al/go-pubsub" + +type Source interface { + GetSender() *pubsub.Broker + GetTopics() []string +}