this repo has no description
1package main
2
3import (
4 "context"
5 "database/sql"
6 _ "embed"
7 "encoding/json"
8 "log"
9 "os/signal"
10 "syscall"
11 "time"
12
13 jetstream "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/gorilla/websocket"
15 _ "github.com/mattn/go-sqlite3"
16)
17
// CheckpointResults mirrors the single row returned by
// `PRAGMA wal_checkpoint(...)`; the fields are scanned in column order.
type CheckpointResults struct {
	Blocked     int // 1 if the checkpoint was blocked from completing, else 0
	Pages       int // pages written to the WAL
	Transferred int // WAL pages copied back into the database file
}
23
// AppBskyAllowlist is the set of record collections whose create commits are
// recorded. Only key presence is consulted by the handler; every value is true.
var AppBskyAllowlist = map[string]bool{
	// actor
	"app.bsky.actor.profile": true,

	// feed
	"app.bsky.feed.generator":  true,
	"app.bsky.feed.like":       true,
	"app.bsky.feed.post":       true,
	"app.bsky.feed.postgate":   true,
	"app.bsky.feed.repost":     true,
	"app.bsky.feed.threadgate": true,

	// graph
	"app.bsky.graph.block":       true,
	"app.bsky.graph.follow":      true,
	"app.bsky.graph.list":        true,
	"app.bsky.graph.listblock":   true,
	"app.bsky.graph.listitem":    true,
	"app.bsky.graph.starterpack": true,

	// labeler
	"app.bsky.labeler.service": true,

	// chat
	"chat.bsky.actor.declaration": true,
}
41
const (
	// JetstreamUrl is the Jetstream subscription endpoint main dials.
	// A public alternative is
	//   wss://jetstream1.us-west.bsky.network/subscribe
	//
	// TODO(ejd): attach a reconnect cursor
	JetstreamUrl = `ws://localhost:6008/subscribe`

	// userTimestampUpdate upserts a (did, ts) row into users. It takes three
	// parameters: the DID, the timestamp for the insert, and the timestamp
	// again for the conflict update.
	userTimestampUpdate = `insert into users (did, ts) values (?, ?) on conflict (did) do update set ts = ?`
)
47
// ddl holds the contents of schema.sql, embedded into the binary at build
// time; handler executes it once at startup to create the tables.
//
//go:embed schema.sql
var ddl string
50
51func handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) {
52 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil {
53 log.Printf("could not create tables: %v\n", err)
54 }
55 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil {
56 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
57 }
58
59 var (
60 dbTx *sql.Tx
61 err error
62 eventCount int
63 )
64
65 for evt := range events {
66 if dbTx == nil {
67 dbTx, err = dbCnx.BeginTx(ctx, nil)
68 if err != nil {
69 log.Printf("failed to begin transaction: %v\n", err)
70 }
71 }
72
73 var event jetstream.Event
74 if err := json.Unmarshal(evt, &event); err != nil {
75 continue
76 }
77
78 if event.Kind != jetstream.EventKindCommit {
79 continue
80 }
81 if event.Commit.Operation != jetstream.CommitOperationCreate {
82 // we're missing deletes and updates but this matches how bsky-activity
83 // does it so we stay consistent
84 continue
85 }
86
87 did := event.Did
88 commit := *event.Commit
89 ts := time.Now().UTC().Unix()
90
91 if _, ok := AppBskyAllowlist[commit.Collection]; !ok {
92 continue
93 }
94
95 dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts)
96
97 eventCount += 1
98 if eventCount%1000 == 0 {
99 if err := dbTx.Commit(); err != nil {
100 log.Printf("commit failed: %v\n")
101 }
102
103 var results CheckpointResults
104 err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred)
105 switch {
106 case err != nil:
107 log.Printf("failed checkpoint: %v\n", err)
108 case results.Blocked == 1:
109 log.Printf("checkpoint: blocked\n")
110 case results.Pages == results.Transferred:
111 log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
112 case results.Pages != results.Transferred:
113 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
114 }
115
116 dbTx, err = dbCnx.BeginTx(ctx, nil)
117 if err != nil {
118 log.Printf("failed to begin transaction: %v\n", err)
119 }
120 }
121 }
122}
123
124func main() {
125 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
126 defer stop()
127
128 conn, _, err := websocket.DefaultDialer.Dial(JetstreamUrl, nil)
129 if err != nil {
130 log.Fatalf("failed to open websocket: %v\n", err)
131 }
132 defer func() {
133 if err := conn.Close(); err != nil {
134 log.Printf("failed to close websocket: %v\n", err)
135 }
136 log.Printf("websocket closed\n")
137 }()
138
139 // TODO(ejd): use more readable URL params for this
140 dbCnx, err := sql.Open("sqlite3", "data/bsky-users.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate")
141 if err != nil {
142 log.Fatalf("failed to open database: %v\n", err)
143 }
144 defer func() {
145 if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
146 log.Printf("error doing final WAL checkpoint: %v\n", err)
147 }
148 if err := dbCnx.Close(); err != nil {
149 log.Printf("failed to close db: %v\n", err)
150 }
151 log.Printf("db closed\n")
152 }()
153
154 jetstreamEvents := make(chan []byte)
155 go handler(ctx, jetstreamEvents, dbCnx)
156
157 log.Printf("starting up\n")
158 go func() {
159 for {
160 _, message, err := conn.ReadMessage()
161 if err != nil {
162 stop()
163 }
164 jetstreamEvents <- message
165 }
166 }()
167
168 <-ctx.Done()
169 log.Printf("shutting down\n")
170}