this repo has no description
1package main 2 3import ( 4 "context" 5 "database/sql" 6 _ "embed" 7 "log" 8 "os/signal" 9 "sync" 10 "syscall" 11 "time" 12 13 jetstream "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/gorilla/websocket" 15 _ "github.com/mattn/go-sqlite3" 16) 17 18type CheckpointResults struct { 19 Blocked int 20 Pages int 21 Transferred int 22} 23 24type Queue struct { 25 lk sync.Mutex 26 events []jetstream.Event 27} 28 29func NewQueue(capacity int) *Queue { 30 return &Queue{ 31 events: make([]jetstream.Event, 0, capacity), 32 } 33} 34 35func (q *Queue) Enqueue(event jetstream.Event) { 36 q.lk.Lock() 37 defer q.lk.Unlock() 38 39 q.events = append(q.events, event) 40} 41 42func (q *Queue) Dequeue() (jetstream.Event, bool) { 43 q.lk.Lock() 44 defer q.lk.Unlock() 45 46 if len(q.events) == 0 { 47 var e jetstream.Event 48 return e, false 49 } 50 51 event := q.events[0] 52 q.events = q.events[1:] 53 return event, true 54} 55 56func (q *Queue) Size() int { 57 q.lk.Lock() 58 defer q.lk.Unlock() 59 60 return len(q.events) 61} 62 63var AppBskyAllowlist = map[string]bool{ 64 "app.bsky.actor.profile": true, 65 "app.bsky.feed.generator": true, 66 "app.bsky.feed.like": true, 67 "app.bsky.feed.post": true, 68 "app.bsky.feed.postgate": true, 69 "app.bsky.feed.repost": true, 70 "app.bsky.feed.threadgate": true, 71 "app.bsky.graph.block": true, 72 "app.bsky.graph.follow": true, 73 "app.bsky.graph.list": true, 74 "app.bsky.graph.listblock": true, 75 "app.bsky.graph.listitem": true, 76 "app.bsky.graph.starterpack": true, 77 "app.bsky.labeler.service": true, 78 "chat.bsky.actor.declaration": true, 79} 80 81const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe` // TODO(ejd): attach a reconnect cursor 82 83const userTimestampUpdate = `insert into users (did, ts) values (?, ?) on conflict (did) do update set ts = ?` 84 85//go:embed schema.sql 86var ddl string 87 88func handler(ctx context.Context, queue *Queue, dbCnx *sql.DB) { 89 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil { 90 log.Printf("could not create tables: %v\n", err) 91 } 92 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil { 93 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 94 } 95 96 var ( 97 dbTx *sql.Tx 98 err error 99 eventCount int 100 ) 101 102queueLoop: 103 for { 104 select { 105 case <-ctx.Done(): 106 break queueLoop 107 default: 108 } 109 110 event, ok := queue.Dequeue() 111 if !ok { 112 time.Sleep(100 * time.Millisecond) 113 continue 114 } 115 116 if dbTx == nil { 117 dbTx, err = dbCnx.BeginTx(ctx, nil) 118 if err != nil { 119 log.Printf("failed to begin transaction: %v\n", err) 120 } 121 } 122 123 if event.Kind != jetstream.EventKindCommit { 124 continue 125 } 126 if event.Commit.Operation != jetstream.CommitOperationCreate { 127 // we're missing deletes and updates but this matches how bsky-activity 128 // does it so we stay consistent 129 continue 130 } 131 132 did := event.Did 133 commit := *event.Commit 134 ts := time.Now().UTC().Unix() 135 136 if _, ok := AppBskyAllowlist[commit.Collection]; !ok { 137 continue 138 } 139 140 dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts) 141 142 eventCount += 1 143 if eventCount%100_000 == 0 { 144 if err = dbTx.Commit(); err != nil { 145 log.Printf("commit failed: %v\n", err) 146 } else { 147 log.Printf("commit successful\n") 148 } 149 150 var results CheckpointResults 151 err = dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 152 switch { 153 case err != nil: 154 log.Printf("failed checkpoint: %v\n", err) 155 case results.Blocked == 1: 156 log.Printf("checkpoint: blocked\n") 157 case results.Pages == results.Transferred: 158 log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 159 case results.Pages != results.Transferred: 160 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 161 } 162 163 dbTx, err = dbCnx.BeginTx(ctx, nil) 164 if err != nil { 165 log.Printf("failed to begin transaction: %v\n", err) 166 } 167 } 168 169 if eventCount%2500 == 0 { 170 log.Printf("queue size: %d\n", queue.Size()) 171 } 172 } 173} 174 175func main() { 176 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 177 defer stop() 178 179 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil) 180 if err != nil { 181 log.Fatalf("failed to open websocket: %v\n", err) 182 } 183 defer func() { 184 if err := conn.Close(); err != nil { 185 log.Printf("failed to close websocket: %v\n", err) 186 } 187 log.Printf("websocket closed\n") 188 }() 189 190 // TODO(ejd): use more readable URL params for this 191 dbCnx, err := sql.Open("sqlite3", "data/bsky-users.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate") 192 if err != nil { 193 log.Fatalf("failed to open database: %v\n", err) 194 } 195 defer func() { 196 if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { 197 log.Printf("error doing final WAL checkpoint: %v\n", err) 198 } 199 if err := dbCnx.Close(); err != nil { 200 log.Printf("failed to close db: %v\n", err) 201 } 202 log.Printf("db closed\n") 203 }() 204 205 queue := NewQueue(100_000) 206 go handler(ctx, queue, dbCnx) 207 208 log.Printf("starting up\n") 209 go func() { 210 for { 211 var event jetstream.Event 212 err := conn.ReadJSON(&event) 213 if err != nil { 214 log.Printf("ReadJSON error: %v\n", err) 215 stop() 216 break 217 } 218 queue.Enqueue(event) 219 } 220 }() 221 222 <-ctx.Done() 223 log.Printf("shutting down\n") 224}