// This repo has no description.
1package main 2 3import ( 4 "context" 5 "database/sql" 6 _ "embed" 7 "encoding/json" 8 "log" 9 "os/signal" 10 "syscall" 11 "time" 12 13 jetstream "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/gorilla/websocket" 15 _ "github.com/mattn/go-sqlite3" 16) 17 18type CheckpointResults struct { 19 Blocked int 20 Pages int 21 Transferred int 22} 23 24var AppBskyAllowlist = map[string]bool{ 25 "app.bsky.actor.profile": true, 26 "app.bsky.feed.generator": true, 27 "app.bsky.feed.like": true, 28 "app.bsky.feed.post": true, 29 "app.bsky.feed.postgate": true, 30 "app.bsky.feed.repost": true, 31 "app.bsky.feed.threadgate": true, 32 "app.bsky.graph.block": true, 33 "app.bsky.graph.follow": true, 34 "app.bsky.graph.list": true, 35 "app.bsky.graph.listblock": true, 36 "app.bsky.graph.listitem": true, 37 "app.bsky.graph.starterpack": true, 38 "app.bsky.labeler.service": true, 39 "chat.bsky.actor.declaration": true, 40} 41 42// const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe` 43 44const JetstreamUrl = `ws://localhost:6008/subscribe` // TODO(ejd): attach a reconnect cursor 45 46const userTimestampUpdate = `insert into users (did, ts) values (?, ?) 
on conflict (did) do update set ts = ?` 47 48//go:embed schema.sql 49var ddl string 50 51func handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) { 52 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil { 53 log.Printf("could not create tables: %v\n", err) 54 } 55 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil { 56 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 57 } 58 59 var ( 60 dbTx *sql.Tx 61 err error 62 eventCount int 63 ) 64 65 for evt := range events { 66 if dbTx == nil { 67 dbTx, err = dbCnx.BeginTx(ctx, nil) 68 if err != nil { 69 log.Printf("failed to begin transaction: %v\n", err) 70 } 71 } 72 73 var event jetstream.Event 74 if err := json.Unmarshal(evt, &event); err != nil { 75 continue 76 } 77 78 if event.Kind != jetstream.EventKindCommit { 79 continue 80 } 81 if event.Commit.Operation != jetstream.CommitOperationCreate { 82 // we're missing deletes and updates but this matches how bsky-activity 83 // does it so we stay consistent 84 continue 85 } 86 87 did := event.Did 88 commit := *event.Commit 89 ts := time.Now().UTC().Unix() 90 91 if _, ok := AppBskyAllowlist[commit.Collection]; !ok { 92 continue 93 } 94 95 dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts) 96 97 eventCount += 1 98 if eventCount%1000 == 0 { 99 if err := dbTx.Commit(); err != nil { 100 log.Printf("commit failed: %v\n") 101 } 102 103 var results CheckpointResults 104 err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 105 switch { 106 case err != nil: 107 log.Printf("failed checkpoint: %v\n", err) 108 case results.Blocked == 1: 109 log.Printf("checkpoint: blocked\n") 110 case results.Pages == results.Transferred: 111 log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 112 case results.Pages != results.Transferred: 113 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 114 } 115 116 
dbTx, err = dbCnx.BeginTx(ctx, nil) 117 if err != nil { 118 log.Printf("failed to begin transaction: %v\n", err) 119 } 120 } 121 } 122} 123 124func main() { 125 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 126 defer stop() 127 128 conn, _, err := websocket.DefaultDialer.Dial(JetstreamUrl, nil) 129 if err != nil { 130 log.Fatalf("failed to open websocket: %v\n", err) 131 } 132 defer func() { 133 if err := conn.Close(); err != nil { 134 log.Printf("failed to close websocket: %v\n", err) 135 } 136 log.Printf("websocket closed\n") 137 }() 138 139 // TODO(ejd): use more readable URL params for this 140 dbCnx, err := sql.Open("sqlite3", "data/bsky-users.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate") 141 if err != nil { 142 log.Fatalf("failed to open database: %v\n", err) 143 } 144 defer func() { 145 if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { 146 log.Printf("error doing final WAL checkpoint: %v\n", err) 147 } 148 if err := dbCnx.Close(); err != nil { 149 log.Printf("failed to close db: %v\n", err) 150 } 151 log.Printf("db closed\n") 152 }() 153 154 jetstreamEvents := make(chan []byte) 155 go handler(ctx, jetstreamEvents, dbCnx) 156 157 log.Printf("starting up\n") 158 go func() { 159 for { 160 _, message, err := conn.ReadMessage() 161 if err != nil { 162 stop() 163 } 164 jetstreamEvents <- message 165 } 166 }() 167 168 <-ctx.Done() 169 log.Printf("shutting down\n") 170}