this repo has no description
1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "log/slog"
9 "os"
10 "os/signal"
11 "path"
12 "syscall"
13
14 tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot"
15
16 "github.com/avast/retry-go/v4"
17 "github.com/joho/godotenv"
18)
19
// defaultJetstreamAddr is the Jetstream websocket endpoint used when the
// JS_SERVER_ADDR environment variable is unset or empty.
const defaultJetstreamAddr = "wss://jetstream.atproto.tools/subscribe"
23
24func main() {
25 err := run()
26 if err != nil {
27 log.Fatal(err)
28 }
29}
30
31func run() error {
32 err := godotenv.Load()
33 if err != nil && !os.IsNotExist(err) {
34 return fmt.Errorf("error loading .env file")
35 }
36
37 signals := make(chan os.Signal, 1)
38 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
39
40 dbPath := os.Getenv("DATABASE_PATH")
41 if dbPath == "" {
42 dbPath = "./"
43 }
44
45 dbFilename := path.Join(dbPath, "database.db")
46 database, err := tangledalertbot.NewDatabase(dbFilename)
47 if err != nil {
48 return fmt.Errorf("create new store: %w", err)
49 }
50 defer database.Close()
51
52 ctx, cancel := context.WithCancel(context.Background())
53 defer cancel()
54
55 go consumeLoop(ctx, database)
56
57 <-signals
58 cancel()
59
60 return nil
61}
62
63func consumeLoop(ctx context.Context, database *tangledalertbot.Database) {
64 handler := tangledalertbot.NewFeedHandler(database)
65
66 jsServerAddr := os.Getenv("JS_SERVER_ADDR")
67 if jsServerAddr == "" {
68 jsServerAddr = defaultJetstreamAddr
69 }
70
71 consumer := tangledalertbot.NewJetstreamConsumer(jsServerAddr, slog.Default(), handler)
72
73 _ = retry.Do(func() error {
74 err := consumer.Consume(ctx)
75 if err != nil {
76 // if the context has been cancelled then it's time to exit
77 if errors.Is(err, context.Canceled) {
78 return nil
79 }
80 slog.Error("consume loop", "error", err)
81 return err
82 }
83 return nil
84 }, retry.Attempts(0)) // retry indefinitly until context canceled
85
86 slog.Warn("exiting consume loop")
87}