this repo has no description
at send-alerts 4.1 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "os" 12 "os/signal" 13 "path" 14 "syscall" 15 "time" 16 17 tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot" 18 19 "github.com/avast/retry-go/v4" 20 "github.com/bugsnag/bugsnag-go" 21 "github.com/joho/godotenv" 22) 23 24const ( 25 defaultJetstreamAddr = "wss://jetstream.atproto.tools/subscribe" 26) 27 28func main() { 29 err := run() 30 if err != nil { 31 log.Fatal(err) 32 } 33} 34 35func run() error { 36 err := godotenv.Load() 37 if err != nil && !os.IsNotExist(err) { 38 return fmt.Errorf("error loading .env file") 39 } 40 41 signals := make(chan os.Signal, 1) 42 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 43 44 bugsnag.Configure(bugsnag.Configuration{ 45 APIKey: os.Getenv("BUGSNAG"), 46 }) 47 48 dbPath := os.Getenv("DATABASE_PATH") 49 if dbPath == "" { 50 dbPath = "./" 51 } 52 53 dbFilename := path.Join(dbPath, "database.db") 54 database, err := tangledalertbot.NewDatabase(dbFilename) 55 if err != nil { 56 return fmt.Errorf("create new store: %w", err) 57 } 58 defer database.Close() 59 60 dmService, err := tangledalertbot.NewDmService(database, time.Second*30) 61 if err != nil { 62 return fmt.Errorf("create dm service: %w", err) 63 } 64 65 ctx, cancel := context.WithCancel(context.Background()) 66 defer cancel() 67 68 go consumeLoop(ctx, database) 69 70 go startHttpServer(ctx, database) 71 72 go dmService.Start(ctx) 73 74 <-signals 75 cancel() 76 77 return nil 78} 79 80func consumeLoop(ctx context.Context, database *tangledalertbot.Database) { 81 handler := tangledalertbot.NewFeedHandler(database) 82 83 jsServerAddr := os.Getenv("JS_SERVER_ADDR") 84 if jsServerAddr == "" { 85 jsServerAddr = defaultJetstreamAddr 86 } 87 88 consumer := tangledalertbot.NewJetstreamConsumer(jsServerAddr, slog.Default(), handler) 89 90 _ = retry.Do(func() error { 91 err := consumer.Consume(ctx) 92 if err != nil { 93 // if the context has been cancelled then it's time to exit 94 if errors.Is(err, context.Canceled) { 95 return nil 96 } 97 return err 98 } 99 return nil 100 }, retry.Attempts(0)) // retry indefinitly until context canceled 101 102 slog.Warn("exiting consume loop") 103} 104 105func startHttpServer(ctx context.Context, db *tangledalertbot.Database) { 106 srv := server{ 107 db: db, 108 } 109 mux := http.NewServeMux() 110 mux.HandleFunc("/issues", srv.handleListIssues) 111 mux.HandleFunc("/comments", srv.handleListComments) 112 mux.HandleFunc("/users", srv.handleListUsers) 113 114 err := http.ListenAndServe(":3000", mux) 115 if err != nil { 116 slog.Error("http listen and serve", "error", err) 117 } 118} 119 120type server struct { 121 db *tangledalertbot.Database 122} 123 124func (s *server) handleListIssues(w http.ResponseWriter, r *http.Request) { 125 issues, err := s.db.GetIssues() 126 if err != nil { 127 slog.Error("getting issues from DB", "error", err) 128 http.Error(w, "error getting issues from DB", http.StatusInternalServerError) 129 return 130 } 131 132 b, err := json.Marshal(issues) 133 if err != nil { 134 slog.Error("marshalling issues from DB", "error", err) 135 http.Error(w, "marshalling issues from DB", http.StatusInternalServerError) 136 return 137 } 138 139 w.Header().Set("Content-Type", "application/json") 140 w.Write(b) 141} 142 143func (s *server) handleListComments(w http.ResponseWriter, r *http.Request) { 144 comments, err := s.db.GetComments() 145 if err != nil { 146 slog.Error("getting comments from DB", "error", err) 147 http.Error(w, "error getting comments from DB", http.StatusInternalServerError) 148 return 149 } 150 151 b, err := json.Marshal(comments) 152 if err != nil { 153 slog.Error("marshalling comments from DB", "error", err) 154 http.Error(w, "marshalling comments from DB", http.StatusInternalServerError) 155 return 156 } 157 158 w.Header().Set("Content-Type", "application/json") 159 w.Write(b) 160} 161 162func (s *server) handleListUsers(w http.ResponseWriter, r *http.Request) { 163 users, err := s.db.GetUsers() 164 if err != nil { 165 slog.Error("getting users from DB", "error", err) 166 http.Error(w, "error getting users from DB", http.StatusInternalServerError) 167 return 168 } 169 170 b, err := json.Marshal(users) 171 if err != nil { 172 slog.Error("marshalling users from DB", "error", err) 173 http.Error(w, "marshalling users from DB", http.StatusInternalServerError) 174 return 175 } 176 177 w.Header().Set("Content-Type", "application/json") 178 w.Write(b) 179}