package main import ( "bytes" "context" "fmt" "log/slog" "net/http" "os" "os/signal" "time" comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/fxamacker/cbor/v2" "github.com/gorilla/websocket" "github.com/redis/go-redis/v9" "github.com/urfave/cli/v2" ) const ( BskyModDid = `did:plc:ar7c4by46qjdydhdevvrndac` // @moderation.bsky.app BskyModLabelEndpoint = `wss://mod.bsky.app/xrpc/com.atproto.label.subscribeLabels` ActionsConfig = `bskymodactions:config` LabelsAdded = `bskymodactions:added` LabelsRemoved = `bskymodactions:removed` ) func main() { app := cli.App{ Name: "bsky-modactions", } app.Action = func(cctx *cli.Context) error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) url := BskyModLabelEndpoint seq, err := rdb.HGet(ctx, ActionsConfig, "seq").Result() if err == nil && seq != "" { // err check reversed url += fmt.Sprintf("?cursor=%s", seq) } slog.Info("connecting to websocket", "url", url) wsconn, _, err := websocket.DefaultDialer.DialContext(ctx, url, http.Header{ "User-Agent": []string{"bsky-modactions/0.1 (@bskycharts.edavis.dev)"}, }) if err != nil { return err } go func() { for { select { case <-ctx.Done(): return default: } _, p, err := wsconn.ReadMessage() if err != nil { slog.Error("error reading message from websocket", "err", err) continue } var info comatproto.LabelSubscribeLabels_Info rest, err := cbor.UnmarshalFirst(p, &info) if err != nil { slog.Error("error unmarshalling info", "err", err) continue } var labels comatproto.LabelSubscribeLabels_Labels err = labels.UnmarshalCBOR(bytes.NewReader(rest)) if err != nil { slog.Error("error unmarshalling label", "err", err) continue } for _, label := range labels.Labels { if label.Src != BskyModDid { continue } key := LabelsAdded if label.Neg != nil && *label.Neg { key = LabelsRemoved } if err := rdb.ZIncrBy(ctx, key, 1, label.Val).Err(); err != nil { slog.Error("error incrementing key", "key", key) } } if err := rdb.HSet(ctx, ActionsConfig, "seq", labels.Seq).Err(); err != nil { slog.Error("error updating seq", "err", err) } } }() mux := http.NewServeMux() mux.HandleFunc("/config", configHandler) mux.HandleFunc("/", valueHandler) srv := &http.Server{ Addr: "127.0.0.1:4456", Handler: mux, } go func() { if err := srv.ListenAndServe(); err != nil { slog.Error("error starting HTTP server", "err", err) return } }() <-ctx.Done() stop() slog.Info("shutting down") endctx, cancel := context.WithTimeout(context.TODO(), time.Minute) defer cancel() if err := srv.Shutdown(endctx); err != nil { slog.Error("error shutting down server", "err", err) } return nil } if err := app.Run(os.Args); err != nil { slog.Error("error running app", "err", err) } }