this repo has no description
1package main
2
3import (
4 "context"
5 "database/sql"
6 _ "embed"
7 "log"
8 "os/signal"
9 "sync"
10 "syscall"
11 "time"
12
13 jetstream "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/gorilla/websocket"
15 _ "github.com/mattn/go-sqlite3"
16)
17
// CheckpointResults holds the single row produced by
// `PRAGMA wal_checkpoint(...)`, with fields in the same order as the
// row's columns so it can be filled by one Scan call.
type CheckpointResults struct {
	// Blocked is 1 when the checkpoint was prevented from completing, 0 otherwise.
	Blocked int
	// Pages is the number of pages reported in the WAL.
	Pages int
	// Transferred is the number of WAL pages copied back into the database.
	Transferred int
}
23
24type Queue struct {
25 lk sync.Mutex
26 events []jetstream.Event
27}
28
29func NewQueue(capacity int) *Queue {
30 return &Queue{
31 events: make([]jetstream.Event, 0, capacity),
32 }
33}
34
35func (q *Queue) Enqueue(event jetstream.Event) {
36 q.lk.Lock()
37 defer q.lk.Unlock()
38
39 q.events = append(q.events, event)
40}
41
42func (q *Queue) Dequeue() (jetstream.Event, bool) {
43 q.lk.Lock()
44 defer q.lk.Unlock()
45
46 if len(q.events) == 0 {
47 var e jetstream.Event
48 return e, false
49 }
50
51 event := q.events[0]
52 q.events = q.events[1:]
53 return event, true
54}
55
56func (q *Queue) Size() int {
57 q.lk.Lock()
58 defer q.lk.Unlock()
59
60 return len(q.events)
61}
62
// AppBskyAllowlist is the set of record collections (NSIDs) whose create
// commits count as user activity. Only key presence is consulted; every
// value is true.
//
// NOTE(review): despite the name, the set also includes a chat.bsky
// collection.
var AppBskyAllowlist = map[string]bool{
	// app.bsky.actor
	"app.bsky.actor.profile": true,

	// app.bsky.feed
	"app.bsky.feed.generator":  true,
	"app.bsky.feed.like":       true,
	"app.bsky.feed.post":       true,
	"app.bsky.feed.postgate":   true,
	"app.bsky.feed.repost":     true,
	"app.bsky.feed.threadgate": true,

	// app.bsky.graph
	"app.bsky.graph.block":       true,
	"app.bsky.graph.follow":      true,
	"app.bsky.graph.list":        true,
	"app.bsky.graph.listblock":   true,
	"app.bsky.graph.listitem":    true,
	"app.bsky.graph.starterpack": true,

	// app.bsky.labeler
	"app.bsky.labeler.service": true,

	// chat.bsky.actor
	"chat.bsky.actor.declaration": true,
}
80
// JetstreamUrl is the Jetstream websocket endpoint that main subscribes to.
// No cursor query parameter is attached, so presumably a reconnect does not
// replay events missed while disconnected. Confirm against the Jetstream docs.
const JetstreamUrl = "wss://jetstream1.us-west.bsky.network/subscribe" // TODO(ejd): attach a reconnect cursor
82
// userTimestampUpdate upserts a users row keyed by did. It takes three
// positional arguments: did, ts for the insert, and ts again for the
// conflict update.
const userTimestampUpdate = "insert into users (did, ts) values (?, ?) on conflict (did) do update set ts = ?"
84
85//go:embed schema.sql
86var ddl string
87
88func handler(ctx context.Context, queue *Queue, dbCnx *sql.DB) {
89 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil {
90 log.Printf("could not create tables: %v\n", err)
91 }
92 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil {
93 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
94 }
95
96 var (
97 dbTx *sql.Tx
98 err error
99 eventCount int
100 )
101
102queueLoop:
103 for {
104 select {
105 case <-ctx.Done():
106 break queueLoop
107 default:
108 }
109
110 event, ok := queue.Dequeue()
111 if !ok {
112 time.Sleep(100 * time.Millisecond)
113 continue
114 }
115
116 if dbTx == nil {
117 dbTx, err = dbCnx.BeginTx(ctx, nil)
118 if err != nil {
119 log.Printf("failed to begin transaction: %v\n", err)
120 }
121 }
122
123 if event.Kind != jetstream.EventKindCommit {
124 continue
125 }
126 if event.Commit.Operation != jetstream.CommitOperationCreate {
127 // we're missing deletes and updates but this matches how bsky-activity
128 // does it so we stay consistent
129 continue
130 }
131
132 did := event.Did
133 commit := *event.Commit
134 ts := time.Now().UTC().Unix()
135
136 if _, ok := AppBskyAllowlist[commit.Collection]; !ok {
137 continue
138 }
139
140 dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts)
141
142 eventCount += 1
143 if eventCount%100_000 == 0 {
144 if err = dbTx.Commit(); err != nil {
145 log.Printf("commit failed: %v\n", err)
146 } else {
147 log.Printf("commit successful\n")
148 }
149
150 var results CheckpointResults
151 err = dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred)
152 switch {
153 case err != nil:
154 log.Printf("failed checkpoint: %v\n", err)
155 case results.Blocked == 1:
156 log.Printf("checkpoint: blocked\n")
157 case results.Pages == results.Transferred:
158 log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
159 case results.Pages != results.Transferred:
160 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
161 }
162
163 dbTx, err = dbCnx.BeginTx(ctx, nil)
164 if err != nil {
165 log.Printf("failed to begin transaction: %v\n", err)
166 }
167 }
168
169 if eventCount%2500 == 0 {
170 log.Printf("queue size: %d\n", queue.Size())
171 }
172 }
173}
174
175func main() {
176 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
177 defer stop()
178
179 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
180 if err != nil {
181 log.Fatalf("failed to open websocket: %v\n", err)
182 }
183 defer func() {
184 if err := conn.Close(); err != nil {
185 log.Printf("failed to close websocket: %v\n", err)
186 }
187 log.Printf("websocket closed\n")
188 }()
189
190 // TODO(ejd): use more readable URL params for this
191 dbCnx, err := sql.Open("sqlite3", "data/bsky-users.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate")
192 if err != nil {
193 log.Fatalf("failed to open database: %v\n", err)
194 }
195 defer func() {
196 if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
197 log.Printf("error doing final WAL checkpoint: %v\n", err)
198 }
199 if err := dbCnx.Close(); err != nil {
200 log.Printf("failed to close db: %v\n", err)
201 }
202 log.Printf("db closed\n")
203 }()
204
205 queue := NewQueue(100_000)
206 go handler(ctx, queue, dbCnx)
207
208 log.Printf("starting up\n")
209 go func() {
210 for {
211 var event jetstream.Event
212 err := conn.ReadJSON(&event)
213 if err != nil {
214 log.Printf("ReadJSON error: %v\n", err)
215 stop()
216 break
217 }
218 queue.Enqueue(event)
219 }
220 }()
221
222 <-ctx.Done()
223 log.Printf("shutting down\n")
224}