this repo has no description
1package main 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "os" 10 "os/signal" 11 "time" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/fxamacker/cbor/v2" 15 "github.com/gorilla/websocket" 16 "github.com/redis/go-redis/v9" 17 "github.com/urfave/cli/v2" 18) 19 20const ( 21 BskyModDid = `did:plc:ar7c4by46qjdydhdevvrndac` // @moderation.bsky.app 22 BskyModLabelEndpoint = `wss://mod.bsky.app/xrpc/com.atproto.label.subscribeLabels` 23 24 ActionsConfig = `bskymodactions:config` 25 LabelsAdded = `bskymodactions:added` 26 LabelsRemoved = `bskymodactions:removed` 27) 28 29func main() { 30 app := cli.App{ 31 Name: "bsky-modactions", 32 } 33 34 app.Action = func(cctx *cli.Context) error { 35 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) 36 defer stop() 37 38 rdb := redis.NewClient(&redis.Options{ 39 Addr: "localhost:6379", 40 Password: "", 41 DB: 0, 42 }) 43 44 url := BskyModLabelEndpoint 45 seq, err := rdb.HGet(ctx, ActionsConfig, "seq").Result() 46 if err == nil && seq != "" { // err check reversed 47 url += fmt.Sprintf("?cursor=%s", seq) 48 } 49 slog.Info("connecting to websocket", "url", url) 50 51 wsconn, _, err := websocket.DefaultDialer.DialContext(ctx, url, http.Header{ 52 "User-Agent": []string{"bsky-modactions/0.1 (@bskycharts.edavis.dev)"}, 53 }) 54 if err != nil { 55 return err 56 } 57 58 go func() { 59 for { 60 select { 61 case <-ctx.Done(): 62 return 63 default: 64 } 65 66 _, p, err := wsconn.ReadMessage() 67 if err != nil { 68 slog.Error("error reading message from websocket", "err", err) 69 continue 70 } 71 72 var info comatproto.LabelSubscribeLabels_Info 73 rest, err := cbor.UnmarshalFirst(p, &info) 74 if err != nil { 75 slog.Error("error unmarshalling info", "err", err) 76 continue 77 } 78 79 var labels comatproto.LabelSubscribeLabels_Labels 80 err = labels.UnmarshalCBOR(bytes.NewReader(rest)) 81 if err != nil { 82 slog.Error("error unmarshalling label", "err", err) 83 continue 84 } 85 for _, label := range labels.Labels { 86 if label.Src != BskyModDid { 87 continue 88 } 89 90 key := LabelsAdded 91 if label.Neg != nil && *label.Neg { 92 key = LabelsRemoved 93 } 94 95 if err := rdb.ZIncrBy(ctx, key, 1, label.Val).Err(); err != nil { 96 slog.Error("error incrementing key", "key", key) 97 } 98 } 99 100 if err := rdb.HSet(ctx, ActionsConfig, "seq", labels.Seq).Err(); err != nil { 101 slog.Error("error updating seq", "err", err) 102 } 103 } 104 }() 105 106 mux := http.NewServeMux() 107 mux.HandleFunc("/config", configHandler) 108 mux.HandleFunc("/", valueHandler) 109 110 srv := &http.Server{ 111 Addr: "127.0.0.1:4456", 112 Handler: mux, 113 } 114 115 go func() { 116 if err := srv.ListenAndServe(); err != nil { 117 slog.Error("error starting HTTP server", "err", err) 118 return 119 } 120 }() 121 122 <-ctx.Done() 123 stop() 124 slog.Info("shutting down") 125 126 endctx, cancel := context.WithTimeout(context.TODO(), time.Minute) 127 defer cancel() 128 129 if err := srv.Shutdown(endctx); err != nil { 130 slog.Error("error shutting down server", "err", err) 131 } 132 133 return nil 134 } 135 136 if err := app.Run(os.Args); err != nil { 137 slog.Error("error running app", "err", err) 138 } 139}