this repo has no description
1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8 "net/http"
9 "os"
10 "os/signal"
11 "time"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/fxamacker/cbor/v2"
15 "github.com/gorilla/websocket"
16 "github.com/redis/go-redis/v9"
17 "github.com/urfave/cli/v2"
18)
19
20const (
21 BskyModDid = `did:plc:ar7c4by46qjdydhdevvrndac` // @moderation.bsky.app
22 BskyModLabelEndpoint = `wss://mod.bsky.app/xrpc/com.atproto.label.subscribeLabels`
23
24 ActionsConfig = `bskymodactions:config`
25 LabelsAdded = `bskymodactions:added`
26 LabelsRemoved = `bskymodactions:removed`
27)
28
29func main() {
30 app := cli.App{
31 Name: "bsky-modactions",
32 }
33
34 app.Action = func(cctx *cli.Context) error {
35 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
36 defer stop()
37
38 rdb := redis.NewClient(&redis.Options{
39 Addr: "localhost:6379",
40 Password: "",
41 DB: 0,
42 })
43
44 url := BskyModLabelEndpoint
45 seq, err := rdb.HGet(ctx, ActionsConfig, "seq").Result()
46 if err == nil && seq != "" { // err check reversed
47 url += fmt.Sprintf("?cursor=%s", seq)
48 }
49 slog.Info("connecting to websocket", "url", url)
50
51 wsconn, _, err := websocket.DefaultDialer.DialContext(ctx, url, http.Header{
52 "User-Agent": []string{"bsky-modactions/0.1 (@bskycharts.edavis.dev)"},
53 })
54 if err != nil {
55 return err
56 }
57
58 go func() {
59 for {
60 select {
61 case <-ctx.Done():
62 return
63 default:
64 }
65
66 _, p, err := wsconn.ReadMessage()
67 if err != nil {
68 slog.Error("error reading message from websocket", "err", err)
69 continue
70 }
71
72 var info comatproto.LabelSubscribeLabels_Info
73 rest, err := cbor.UnmarshalFirst(p, &info)
74 if err != nil {
75 slog.Error("error unmarshalling info", "err", err)
76 continue
77 }
78
79 var labels comatproto.LabelSubscribeLabels_Labels
80 err = labels.UnmarshalCBOR(bytes.NewReader(rest))
81 if err != nil {
82 slog.Error("error unmarshalling label", "err", err)
83 continue
84 }
85 for _, label := range labels.Labels {
86 if label.Src != BskyModDid {
87 continue
88 }
89
90 key := LabelsAdded
91 if label.Neg != nil && *label.Neg {
92 key = LabelsRemoved
93 }
94
95 if err := rdb.ZIncrBy(ctx, key, 1, label.Val).Err(); err != nil {
96 slog.Error("error incrementing key", "key", key)
97 }
98 }
99
100 if err := rdb.HSet(ctx, ActionsConfig, "seq", labels.Seq).Err(); err != nil {
101 slog.Error("error updating seq", "err", err)
102 }
103 }
104 }()
105
106 mux := http.NewServeMux()
107 mux.HandleFunc("/config", configHandler)
108 mux.HandleFunc("/", valueHandler)
109
110 srv := &http.Server{
111 Addr: "127.0.0.1:4456",
112 Handler: mux,
113 }
114
115 go func() {
116 if err := srv.ListenAndServe(); err != nil {
117 slog.Error("error starting HTTP server", "err", err)
118 return
119 }
120 }()
121
122 <-ctx.Done()
123 stop()
124 slog.Info("shutting down")
125
126 endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
127 defer cancel()
128
129 if err := srv.Shutdown(endctx); err != nil {
130 slog.Error("error shutting down server", "err", err)
131 }
132
133 return nil
134 }
135
136 if err := app.Run(os.Args); err != nil {
137 slog.Error("error running app", "err", err)
138 }
139}