initial commit
This commit is contained in:
commit
05b8981cea
|
@ -0,0 +1,98 @@
|
|||
|
||||
# Created by https://www.gitignore.io/api/go,linux,macos,windows
|
||||
# Edit at https://www.gitignore.io/?templates=go,linux,macos,windows
|
||||
|
||||
### Go ###
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
*.exe~
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
||||
# Dependency directories (remove the comment below to include it)
|
||||
# vendor/
|
||||
|
||||
### Go Patch ###
|
||||
/vendor/
|
||||
/Godeps/
|
||||
|
||||
### Linux ###
|
||||
*~
|
||||
|
||||
# temporary files which can be created if a process still has a handle open of a deleted file
|
||||
.fuse_hidden*
|
||||
|
||||
# KDE directory preferences
|
||||
.directory
|
||||
|
||||
# Linux trash folder which might appear on any partition or disk
|
||||
.Trash-*
|
||||
|
||||
# .nfs files are created when an open file is removed but is still being accessed
|
||||
.nfs*
|
||||
|
||||
### macOS ###
|
||||
# General
|
||||
.DS_Store
|
||||
.AppleDouble
|
||||
.LSOverride
|
||||
|
||||
# Icon must end with two \r
|
||||
Icon
|
||||
|
||||
# Thumbnails
|
||||
._*
|
||||
|
||||
# Files that might appear in the root of a volume
|
||||
.DocumentRevisions-V100
|
||||
.fseventsd
|
||||
.Spotlight-V100
|
||||
.TemporaryItems
|
||||
.Trashes
|
||||
.VolumeIcon.icns
|
||||
.com.apple.timemachine.donotpresent
|
||||
|
||||
# Directories potentially created on remote AFP share
|
||||
.AppleDB
|
||||
.AppleDesktop
|
||||
Network Trash Folder
|
||||
Temporary Items
|
||||
.apdisk
|
||||
|
||||
### Windows ###
|
||||
# Windows thumbnail cache files
|
||||
Thumbs.db
|
||||
Thumbs.db:encryptable
|
||||
ehthumbs.db
|
||||
ehthumbs_vista.db
|
||||
|
||||
# Dump file
|
||||
*.stackdump
|
||||
|
||||
# Folder config file
|
||||
[Dd]esktop.ini
|
||||
|
||||
# Recycle Bin used on file shares
|
||||
$RECYCLE.BIN/
|
||||
|
||||
# Windows Installer files
|
||||
*.cab
|
||||
*.msi
|
||||
*.msix
|
||||
*.msm
|
||||
*.msp
|
||||
|
||||
# Windows shortcuts
|
||||
*.lnk
|
||||
|
||||
# End of https://www.gitignore.io/api/go,linux,macos,windows
|
||||
.idea
|
||||
cmd/lamer/lamer
|
||||
lameralert.db
|
|
@ -0,0 +1,16 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.chrissexton.org/cws/lameralert"
|
||||
)
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
log.Info().Msg("lamer startup")
|
||||
done := lameralert.Setup()
|
||||
<-done
|
||||
}
|
|
@ -0,0 +1,197 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
var (
|
||||
bucket = []byte("config")
|
||||
)
|
||||
|
||||
type configValue struct {
|
||||
Key string `json:"key"`
|
||||
Value interface{} `json:"value"`
|
||||
}
|
||||
|
||||
func (value configValue) getBytes() ([]byte, error) {
|
||||
return json.Marshal(value)
|
||||
}
|
||||
|
||||
func decodeConfig(in []byte) (configValue, error) {
|
||||
var out configValue
|
||||
err := json.Unmarshal(in, &out)
|
||||
return out, err
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// I'm not completely sure if this mutex is necessary.
|
||||
// It is left here because I'm opening an closing the db
|
||||
// for every operation.
|
||||
sync.Mutex
|
||||
path string
|
||||
}
|
||||
|
||||
func (c *Config) db() *bolt.DB {
|
||||
db, err := bolt.Open(c.path, 0666,
|
||||
&bolt.Options{
|
||||
Timeout: 1 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal().Msgf("Could not open database: %s", err)
|
||||
}
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(bucket)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Could not open bucket")
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
func (c *Config) getAll() (map[string]configValue, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
db := c.db()
|
||||
defer db.Close()
|
||||
vals := map[string]configValue{}
|
||||
err := db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
err := b.ForEach(func(k, v []byte) error {
|
||||
value, err := decodeConfig(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not convert value: %w", err)
|
||||
}
|
||||
vals[string(k)] = value
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error in forEach: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return vals, err
|
||||
}
|
||||
|
||||
func (c *Config) get(key string) (configValue, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
db := c.db()
|
||||
defer db.Close()
|
||||
var out configValue
|
||||
err := db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
value, err := decodeConfig(b.Get([]byte(key)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out = value
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return configValue{}, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *Config) set(key string, value configValue) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
db := c.db()
|
||||
defer db.Close()
|
||||
err := db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
v, err := value.getBytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put([]byte(key), v)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Msgf("Error setting key: %s", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Config) delete(key string) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
db := c.db()
|
||||
defer db.Close()
|
||||
err := db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
return b.Delete([]byte(key))
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// Public interface
|
||||
|
||||
func New(r *mux.Router) *Config {
|
||||
c := &Config{
|
||||
path: "lameralert.db",
|
||||
}
|
||||
c.setup(r)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Config) GetInt64(key string, defaultValue int64) int64 {
|
||||
v, err := c.get(key)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return int64(v.Value.(float64))
|
||||
}
|
||||
|
||||
func (c *Config) GetInt(key string, defaultValue int) int {
|
||||
v, err := c.get(key)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return int(v.Value.(float64))
|
||||
}
|
||||
|
||||
func (c *Config) GetStringSlice(key string, defaultValue []string) []string {
|
||||
v, err := c.get(key)
|
||||
if err != nil {
|
||||
log.Error().Msgf("GetStringSlice errored looking for %s: %s", key, err)
|
||||
return defaultValue
|
||||
}
|
||||
return interfaceSliceToStringSlice(v.Value.([]interface{}))
|
||||
}
|
||||
|
||||
func interfaceSliceToStringSlice(in []interface{}) []string {
|
||||
out := []string{}
|
||||
for _, it := range in {
|
||||
out = append(out, it.(string))
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (c *Config) GetStringMap(key string, defaultValue map[string]string) map[string]string {
|
||||
v, err := c.get(key)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return v.Value.(map[string]string)
|
||||
}
|
||||
|
||||
func (c *Config) GetString(key, defaultValue string) string {
|
||||
v, err := c.get(key)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return v.Value.(string)
|
||||
}
|
||||
|
||||
func (c *Config) Set(key string, value interface{}) error {
|
||||
return c.set(key, configValue{key, value})
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func (c *Config) setup(r *mux.Router) {
|
||||
r.HandleFunc("/{key}", c.deleteValue).Methods(http.MethodDelete)
|
||||
r.HandleFunc("/{key}", c.getValue).Methods(http.MethodGet)
|
||||
r.HandleFunc("/", c.getConfigValues).Methods(http.MethodGet)
|
||||
r.HandleFunc("/", c.postConfigValue).Methods(http.MethodPost)
|
||||
}
|
||||
|
||||
func (c *Config) getConfigValues(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug().Msg("getConfigValues")
|
||||
vals, err := c.getAll()
|
||||
if err != nil {
|
||||
http.Error(w, mkJSONError(err.Error()), http.StatusInternalServerError)
|
||||
}
|
||||
json.NewEncoder(w).Encode(vals)
|
||||
}
|
||||
|
||||
func (c *Config) postConfigValue(w http.ResponseWriter, r *http.Request) {
|
||||
log.Debug().Msg("postConfigValues")
|
||||
configPost := configValue{}
|
||||
err := json.NewDecoder(r.Body).Decode(&configPost)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
}
|
||||
c.set(configPost.Key, configPost)
|
||||
resp := struct {
|
||||
Status string `json:"status"`
|
||||
}{"success"}
|
||||
log.Debug().Msgf("post success, returning struct %v", resp)
|
||||
err = json.NewEncoder(w).Encode(resp)
|
||||
if err != nil {
|
||||
http.Error(w, mkJSONError(err.Error()), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) deleteValue(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
log.Debug().Msgf("deleteValue %s", vars["key"])
|
||||
err := c.delete(vars["key"])
|
||||
if err != nil {
|
||||
http.Error(w, mkJSONError(err.Error()), http.StatusInternalServerError)
|
||||
}
|
||||
http.Error(w, "", http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (c *Config) getValue(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
log.Debug().Msgf("getValue %s", vars["key"])
|
||||
v, err := c.get(vars["key"])
|
||||
if err != nil {
|
||||
http.Error(w, mkJSONError(err.Error()), http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
|
||||
func mkJSONError(msg string) string {
|
||||
j, _ := json.Marshal(struct {
|
||||
Error string `json:"error"`
|
||||
}{msg})
|
||||
return string(j)
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package event
|
||||
|
||||
import "time"
|
||||
|
||||
type Event struct {
|
||||
Time time.Time
|
||||
Error error
|
||||
Payload map[string]string
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
module code.chrissexton.org/cws/lameralert
|
||||
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/PuerkitoBio/goquery v1.5.0 // indirect
|
||||
github.com/alash3al/go-pubsub v0.0.0-20190804113747-4147c4964332
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/gorilla/mux v1.7.3
|
||||
github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636
|
||||
github.com/mmcdole/gofeed v1.0.0-beta2
|
||||
github.com/mmcdole/goxpp v0.0.0-20181012175147-0068e33feabf // indirect
|
||||
github.com/rs/zerolog v1.17.2
|
||||
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // indirect
|
||||
)
|
|
@ -0,0 +1,28 @@
|
|||
github.com/PuerkitoBio/goquery v1.5.0/go.mod h1:qD2PgZ9lccMbQlc7eEOjaeRlFQON7xY8kdmcsrnKqMg=
|
||||
github.com/alash3al/go-pubsub v0.0.0-20190804113747-4147c4964332 h1:Kgkjunxmj2d7DSIg0joTQcRRuKnT0VfwtQVwMdHGKqo=
|
||||
github.com/alash3al/go-pubsub v0.0.0-20190804113747-4147c4964332/go.mod h1:OFNZAVEGEivuAfXw6Kc5539cV2jhjycsoSmXKwf9yBs=
|
||||
github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
|
||||
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
|
||||
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636 h1:6agUllU8gUNAallyB+afeLXMRLL6Q1z+S6YC7Pi1EIY=
|
||||
github.com/gregdel/pushover v0.0.0-20190217183207-15d3fef40636/go.mod h1:NbuXd8Iwy5dU99qFToB8mSE29qJOQNW/bphiV8CWj/k=
|
||||
github.com/mmcdole/gofeed v1.0.0-beta2/go.mod h1:/BF9JneEL2/flujm8XHoxUcghdTV6vvb3xx/vKyChFU=
|
||||
github.com/mmcdole/goxpp v0.0.0-20181012175147-0068e33feabf/go.mod h1:pasqhqstspkosTneA62Nc+2p9SOBBYAPbnmRRWPQ0V8=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
github.com/rs/zerolog v1.17.2 h1:RMRHFw2+wF7LO0QqtELQwo8hqSmqISyCJeFeAAuWcRo=
|
||||
github.com/rs/zerolog v1.17.2/go.mod h1:9nvC1axdVrAHcu/s9taAVfBuIdTZLVQmKQyvrUjF5+I=
|
||||
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
|
||||
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
|
@ -0,0 +1,61 @@
|
|||
package lameralert
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.chrissexton.org/cws/lameralert/config"
|
||||
"code.chrissexton.org/cws/lameralert/event"
|
||||
"code.chrissexton.org/cws/lameralert/sinks"
|
||||
"code.chrissexton.org/cws/lameralert/sources"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
source sources.Source
|
||||
sink sinks.Sink
|
||||
options map[string]interface{}
|
||||
}
|
||||
|
||||
func NewConnection(from sources.Source, to sinks.Sink) Connection {
|
||||
for _, topic := range from.GetTopics() {
|
||||
to.Subscribe(from.GetSender(), topic)
|
||||
}
|
||||
return Connection{
|
||||
source: from,
|
||||
sink: to,
|
||||
}
|
||||
}
|
||||
|
||||
var mapping = map[chan event.Event][]chan event.Event{}
|
||||
|
||||
func logger(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Do stuff here
|
||||
log.Debug().Msgf("%s - %s", r.Method, r.RequestURI)
|
||||
// Call the next handler, which can be another middleware in the chain, or the final handler.
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
func Setup() chan bool {
|
||||
r := mux.NewRouter()
|
||||
r.Use(logger)
|
||||
c := config.New(r.PathPrefix("/config").Subrouter())
|
||||
ch := make(chan bool)
|
||||
|
||||
h := sources.NewGenericRest(r.PathPrefix("/rest").Subrouter())
|
||||
NewConnection(h, sinks.NewConsoleID(c, "00"))
|
||||
NewConnection(h, sinks.NewConsoleID(c, "01"))
|
||||
|
||||
rss := sources.NewRSS(c, r.PathPrefix("/rss/webshit").Subrouter(), "webshit", "http://n-gate.com/hackernews/index.rss")
|
||||
NewConnection(rss, sinks.NewConsoleID(c, "webshit-sink"))
|
||||
if push, err := sinks.NewPushover(c, "webshit-push"); err != nil {
|
||||
log.Fatal().Msgf("error: %s", err)
|
||||
} else {
|
||||
NewConnection(rss, push)
|
||||
}
|
||||
|
||||
http.ListenAndServe(":9090", r)
|
||||
return ch
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package sinks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/alash3al/go-pubsub"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.chrissexton.org/cws/lameralert/config"
|
||||
)
|
||||
|
||||
type Console struct {
|
||||
id string
|
||||
sub *pubsub.Subscriber
|
||||
c *config.Config
|
||||
cfgRoot string
|
||||
}
|
||||
|
||||
func NewConsoleID(config *config.Config, id string) *Console {
|
||||
c := &Console{id: id, c: config, cfgRoot: fmt.Sprintf("console.%s", id)}
|
||||
go c.serve()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Console) Subscribe(broker *pubsub.Broker, topic string) error {
|
||||
var err error
|
||||
c.sub, err = broker.Attach()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error attaching to broker")
|
||||
return err
|
||||
}
|
||||
broker.Subscribe(c.sub, topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Console) serve() {
|
||||
wait := 1
|
||||
for c.sub == nil {
|
||||
t := time.NewTimer(time.Duration(wait) * time.Second)
|
||||
<-t.C
|
||||
wait *= 2
|
||||
}
|
||||
log.Info().Msgf("Console started up with wait %d", wait)
|
||||
for {
|
||||
select {
|
||||
case ev := <-c.sub.GetMessages():
|
||||
tpl := c.c.GetString(c.cfgRoot+".tpl", "{{.}}")
|
||||
t := template.Must(template.New(c.cfgRoot).Parse(tpl))
|
||||
buf := bytes.Buffer{}
|
||||
if err := t.Execute(&buf, ev.GetPayload()); err != nil {
|
||||
fmt.Printf("%s could not execute template: %s", c.id, err)
|
||||
}
|
||||
fmt.Printf("%s %s\n", c.id, buf.String())
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package sinks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/alash3al/go-pubsub"
|
||||
"github.com/gregdel/pushover"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.chrissexton.org/cws/lameralert/config"
|
||||
)
|
||||
|
||||
type Pushover struct {
|
||||
c *config.Config
|
||||
sub *pubsub.Subscriber
|
||||
cfgRoot string
|
||||
name string
|
||||
p *pushover.Pushover
|
||||
}
|
||||
|
||||
func NewPushover(config *config.Config, name string) (*Pushover, error) {
|
||||
token := config.GetString("pushover.token", "")
|
||||
if token == "" {
|
||||
err := fmt.Errorf("token is empty")
|
||||
log.Error().Msgf("Could not create pushover for %s: %s.", name, err)
|
||||
return nil, err
|
||||
}
|
||||
p := &Pushover{
|
||||
c: config,
|
||||
name: name,
|
||||
cfgRoot: fmt.Sprintf("pushover.%s", name),
|
||||
p: pushover.New(token),
|
||||
}
|
||||
go p.serve()
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *Pushover) Subscribe(broker *pubsub.Broker, topic string) error {
|
||||
var err error
|
||||
p.sub, err = broker.Attach()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error attaching to broker")
|
||||
return err
|
||||
}
|
||||
broker.Subscribe(p.sub, topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pushover) serve() {
|
||||
wait := 1
|
||||
for p.sub == nil {
|
||||
t := time.NewTimer(time.Duration(wait) * time.Second)
|
||||
<-t.C
|
||||
wait *= 2
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case ev := <-p.sub.GetMessages():
|
||||
tpl := p.c.GetString(p.cfgRoot+".tpl", "{{.}}")
|
||||
t := template.Must(template.New(p.cfgRoot).Parse(tpl))
|
||||
buf := bytes.Buffer{}
|
||||
if err := t.Execute(&buf, ev.GetPayload()); err != nil {
|
||||
fmt.Printf("%s could not execute template: %s", p.name, err)
|
||||
}
|
||||
recipients := p.c.GetStringSlice(p.cfgRoot+".recipients", []string{})
|
||||
log.Debug().Msgf("%s got a message, pushing to %v", p.name, recipients)
|
||||
msg := pushover.NewMessage(buf.String())
|
||||
for _, r := range recipients {
|
||||
to := pushover.NewRecipient(r)
|
||||
_, err := p.p.SendMessage(msg, to)
|
||||
if err != nil {
|
||||
log.Error().Msgf("Error pushing %s to %s: %s", p.name, r, err)
|
||||
}
|
||||
log.Info().Msgf("%s sent pushover to %s", p.name, r)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
package sinks
|
||||
|
||||
import pubsub "github.com/alash3al/go-pubsub"
|
||||
|
||||
type Sink interface {
|
||||
Subscribe(broker *pubsub.Broker, topic string) error
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/alash3al/go-pubsub"
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
"code.chrissexton.org/cws/lameralert/event"
|
||||
)
|
||||
|
||||
type GenericRest struct {
|
||||
broker *pubsub.Broker
|
||||
router *mux.Router
|
||||
}
|
||||
|
||||
func (gr *GenericRest) GetTopics() []string {
|
||||
return []string{"REST"}
|
||||
}
|
||||
|
||||
func NewGenericRest(r *mux.Router) *GenericRest {
|
||||
gr := &GenericRest{
|
||||
router: r,
|
||||
broker: pubsub.NewBroker(),
|
||||
}
|
||||
go gr.serve()
|
||||
return gr
|
||||
}
|
||||
|
||||
func (gr *GenericRest) GetSender() *pubsub.Broker {
|
||||
return gr.broker
|
||||
}
|
||||
|
||||
func (gr *GenericRest) ping(w http.ResponseWriter, r *http.Request) {
|
||||
body, _ := ioutil.ReadAll(r.Body)
|
||||
defer r.Body.Close()
|
||||
payload := map[string]string{"body": string(body)}
|
||||
ev := event.Event{
|
||||
Time: time.Now(),
|
||||
Error: nil,
|
||||
Payload: payload,
|
||||
}
|
||||
gr.broker.Broadcast(ev, gr.GetTopics()[0])
|
||||
}
|
||||
|
||||
func (gr *GenericRest) serve() {
|
||||
gr.router.HandleFunc("", gr.ping)
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
package sources
|
||||
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/alash3al/go-pubsub"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/mmcdole/gofeed"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.chrissexton.org/cws/lameralert/config"
|
||||
"code.chrissexton.org/cws/lameralert/event"
|
||||
)
|
||||
|
||||
type RSS struct {
|
||||
broker *pubsub.Broker
|
||||
router *mux.Router
|
||||
config *config.Config
|
||||
configRoot string
|
||||
url string
|
||||
name string
|
||||
}
|
||||
|
||||
func NewRSS(c *config.Config, r *mux.Router, name, url string) *RSS {
|
||||
rss := RSS{
|
||||
broker: pubsub.NewBroker(),
|
||||
router: r,
|
||||
config: c,
|
||||
url: url,
|
||||
name: name,
|
||||
configRoot: fmt.Sprintf("rss.%s", name),
|
||||
}
|
||||
go rss.serve()
|
||||
return &rss
|
||||
}
|
||||
|
||||
func (rss *RSS) GetSender() *pubsub.Broker {
|
||||
return rss.broker
|
||||
}
|
||||
|
||||
func (rss *RSS) GetTopics() []string {
|
||||
return []string{rss.name}
|
||||
}
|
||||
|
||||
func (rss *RSS) pingHard(w http.ResponseWriter, r *http.Request) {
|
||||
rss.check(true)
|
||||
http.Error(w, "", http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (rss *RSS) ping(w http.ResponseWriter, r *http.Request) {
|
||||
rss.check(false)
|
||||
http.Error(w, "", http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (rss *RSS) serve() {
|
||||
rss.router.HandleFunc("/force", rss.pingHard)
|
||||
rss.router.HandleFunc("", rss.ping)
|
||||
for {
|
||||
wait := rss.config.GetInt(rss.configRoot+".wait", 900)
|
||||
t := time.NewTimer(time.Duration(wait) * time.Second)
|
||||
log.Debug().Msgf("%s waiting %v", rss.configRoot+".wait", wait)
|
||||
<-t.C
|
||||
log.Debug().Msgf("%s checking after %v", rss.name, wait)
|
||||
rss.check(false)
|
||||
}
|
||||
}
|
||||
|
||||
func in(item string, slice []string) bool {
|
||||
for _, it := range slice {
|
||||
if item == it {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func hash(item string) string {
|
||||
return fmt.Sprintf("%x", sha1.Sum([]byte(item)))
|
||||
}
|
||||
|
||||
func (rss *RSS) check(force bool) {
|
||||
fp := gofeed.NewParser()
|
||||
feed, err := fp.ParseURL(rss.url)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error getting RSS")
|
||||
return
|
||||
}
|
||||
|
||||
cache := rss.config.GetStringSlice(rss.configRoot+".cache", []string{})
|
||||
newCache := []string{}
|
||||
|
||||
if force {
|
||||
cache = []string{}
|
||||
}
|
||||
|
||||
for _, it := range feed.Items {
|
||||
h := hash(it.Title + it.Description)
|
||||
newCache = append(newCache, h)
|
||||
if !in(h, cache) {
|
||||
payload := map[string]string{
|
||||
"title": it.Title,
|
||||
"description": it.Description,
|
||||
"content": it.Content,
|
||||
"published": it.Published,
|
||||
}
|
||||
ev := event.Event{
|
||||
Time: time.Now(),
|
||||
Payload: payload,
|
||||
}
|
||||
rss.broker.Broadcast(ev, rss.name)
|
||||
}
|
||||
}
|
||||
|
||||
rss.config.Set(rss.configRoot+".cache", newCache)
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package sources
|
||||
|
||||
import pubsub "github.com/alash3al/go-pubsub"
|
||||
|
||||
type Source interface {
|
||||
GetSender() *pubsub.Broker
|
||||
GetTopics() []string
|
||||
}
|
Loading…
Reference in New Issue