this repo has no description
at test-server 3.4 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 "os" 12 "os/signal" 13 "path" 14 "syscall" 15 16 tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot" 17 18 "github.com/avast/retry-go/v4" 19 "github.com/bugsnag/bugsnag-go" 20 "github.com/joho/godotenv" 21) 22 23const ( 24 defaultJetstreamAddr = "wss://jetstream.atproto.tools/subscribe" 25) 26 27func main() { 28 err := run() 29 if err != nil { 30 log.Fatal(err) 31 } 32} 33 34func run() error { 35 err := godotenv.Load() 36 if err != nil && !os.IsNotExist(err) { 37 return fmt.Errorf("error loading .env file") 38 } 39 40 signals := make(chan os.Signal, 1) 41 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 42 43 bugsnag.Configure(bugsnag.Configuration{ 44 APIKey: os.Getenv("BUGSNAG"), 45 }) 46 47 dbPath := os.Getenv("DATABASE_PATH") 48 if dbPath == "" { 49 dbPath = "./" 50 } 51 52 dbFilename := path.Join(dbPath, "database.db") 53 database, err := tangledalertbot.NewDatabase(dbFilename) 54 if err != nil { 55 return fmt.Errorf("create new store: %w", err) 56 } 57 defer database.Close() 58 59 ctx, cancel := context.WithCancel(context.Background()) 60 defer cancel() 61 62 go consumeLoop(ctx, database) 63 64 go startHttpServer(ctx, database) 65 66 <-signals 67 cancel() 68 69 return nil 70} 71 72func consumeLoop(ctx context.Context, database *tangledalertbot.Database) { 73 handler := tangledalertbot.NewFeedHandler(database) 74 75 jsServerAddr := os.Getenv("JS_SERVER_ADDR") 76 if jsServerAddr == "" { 77 jsServerAddr = defaultJetstreamAddr 78 } 79 80 consumer := tangledalertbot.NewJetstreamConsumer(jsServerAddr, slog.Default(), handler) 81 82 _ = retry.Do(func() error { 83 err := consumer.Consume(ctx) 84 if err != nil { 85 // if the context has been cancelled then it's time to exit 86 if errors.Is(err, context.Canceled) { 87 return nil 88 } 89 slog.Error("consume loop", "error", err) 90 bugsnag.Notify(err) 91 return err 92 } 93 return nil 94 }, retry.Attempts(0)) // retry indefinitly until context canceled 95 96 slog.Warn("exiting consume loop") 97} 98 99func startHttpServer(ctx context.Context, db *tangledalertbot.Database) { 100 srv := server{ 101 db: db, 102 } 103 mux := http.NewServeMux() 104 mux.HandleFunc("/issues", srv.handleListIssues) 105 mux.HandleFunc("/comments", srv.handleListComments) 106 107 err := http.ListenAndServe(":3000", mux) 108 if err != nil { 109 slog.Error("http listen and serve", "error", err) 110 } 111} 112 113type server struct { 114 db *tangledalertbot.Database 115} 116 117func (s *server) handleListIssues(w http.ResponseWriter, r *http.Request) { 118 issues, err := s.db.GetIssues() 119 if err != nil { 120 slog.Error("getting issues from DB", "error", err) 121 http.Error(w, "error getting issues from DB", http.StatusInternalServerError) 122 return 123 } 124 125 b, err := json.Marshal(issues) 126 if err != nil { 127 slog.Error("marshalling issues from DB", "error", err) 128 http.Error(w, "marshalling issues from DB", http.StatusInternalServerError) 129 return 130 } 131 132 w.Header().Set("Content-Type", "application/json") 133 w.Write(b) 134} 135 136func (s *server) handleListComments(w http.ResponseWriter, r *http.Request) { 137 comments, err := s.db.GetComments() 138 if err != nil { 139 slog.Error("getting comments from DB", "error", err) 140 http.Error(w, "error getting comments from DB", http.StatusInternalServerError) 141 return 142 } 143 144 b, err := json.Marshal(comments) 145 if err != nil { 146 slog.Error("marshalling comments from DB", "error", err) 147 http.Error(w, "marshalling comments from DB", http.StatusInternalServerError) 148 return 149 } 150 151 w.Header().Set("Content-Type", "application/json") 152 w.Write(b) 153}