this repo has no description
1package main 2 3import ( 4 "context" 5 "database/sql" 6 _ "embed" 7 "encoding/json" 8 "log" 9 "os/signal" 10 "syscall" 11 "time" 12 13 jetstream "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/gorilla/websocket" 15 _ "github.com/mattn/go-sqlite3" 16) 17 18type CheckpointResults struct { 19 Blocked int 20 Pages int 21 Transferred int 22} 23 24var AppBskyAllowlist = map[string]bool{ 25 "app.bsky.actor.profile": true, 26 "app.bsky.feed.generator": true, 27 "app.bsky.feed.like": true, 28 "app.bsky.feed.post": true, 29 "app.bsky.feed.postgate": true, 30 "app.bsky.feed.repost": true, 31 "app.bsky.feed.threadgate": true, 32 "app.bsky.graph.block": true, 33 "app.bsky.graph.follow": true, 34 "app.bsky.graph.list": true, 35 "app.bsky.graph.listblock": true, 36 "app.bsky.graph.listitem": true, 37 "app.bsky.graph.starterpack": true, 38 "app.bsky.labeler.service": true, 39 "chat.bsky.actor.declaration": true, 40} 41 42const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe` // TODO(ejd): attach a reconnect cursor 43 44const userTimestampUpdate = `insert into users (did, ts) values (?, ?) on conflict (did) do update set ts = ?` 45 46//go:embed schema.sql 47var ddl string 48 49func handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) { 50 if _, err := dbCnx.ExecContext(ctx, ddl); err != nil { 51 log.Printf("could not create tables: %v\n", err) 52 } 53 if _, err := dbCnx.ExecContext(ctx, "PRAGMA wal_autocheckpoint = 0"); err != nil { 54 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 55 } 56 57 var ( 58 dbTx *sql.Tx 59 err error 60 eventCount int 61 ) 62 63 for evt := range events { 64 if dbTx == nil { 65 dbTx, err = dbCnx.BeginTx(ctx, nil) 66 if err != nil { 67 log.Printf("failed to begin transaction: %v\n", err) 68 } 69 } 70 71 var event jetstream.Event 72 if err := json.Unmarshal(evt, &event); err != nil { 73 continue 74 } 75 76 if event.Kind != jetstream.EventKindCommit { 77 continue 78 } 79 if event.Commit.Operation != jetstream.CommitOperationCreate { 80 // we're missing deletes and updates but this matches how bsky-activity 81 // does it so we stay consistent 82 continue 83 } 84 85 did := event.Did 86 commit := *event.Commit 87 ts := time.Now().UTC().Unix() 88 89 if _, ok := AppBskyAllowlist[commit.Collection]; !ok { 90 continue 91 } 92 93 dbTx.ExecContext(ctx, userTimestampUpdate, did, ts, ts) 94 95 eventCount += 1 96 if eventCount%1000 == 0 { 97 if err := dbTx.Commit(); err != nil { 98 log.Printf("commit failed: %v\n") 99 } 100 101 var results CheckpointResults 102 err := dbCnx.QueryRowContext(ctx, "PRAGMA wal_checkpoint(RESTART)").Scan(&results.Blocked, &results.Pages, &results.Transferred) 103 switch { 104 case err != nil: 105 log.Printf("failed checkpoint: %v\n", err) 106 case results.Blocked == 1: 107 log.Printf("checkpoint: blocked\n") 108 case results.Pages == results.Transferred: 109 log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 110 case results.Pages != results.Transferred: 111 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 112 } 113 114 dbTx, err = dbCnx.BeginTx(ctx, nil) 115 if err != nil { 116 log.Printf("failed to begin transaction: %v\n", err) 117 } 118 } 119 } 120} 121 122func main() { 123 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 124 defer stop() 125 126 conn, _, err := websocket.DefaultDialer.Dial(JetstreamUrl, nil) 127 if err != nil { 128 log.Fatalf("failed to open websocket: %v\n", err) 129 } 130 defer func() { 131 if err := conn.Close(); err != nil { 132 log.Printf("failed to close websocket: %v\n", err) 133 } 134 log.Printf("websocket closed\n") 135 }() 136 137 // TODO(ejd): use more readable URL params for this 138 dbCnx, err := sql.Open("sqlite3", "data/bsky-users.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate") 139 if err != nil { 140 log.Fatalf("failed to open database: %v\n", err) 141 } 142 defer func() { 143 if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { 144 log.Printf("error doing final WAL checkpoint: %v\n", err) 145 } 146 if err := dbCnx.Close(); err != nil { 147 log.Printf("failed to close db: %v\n", err) 148 } 149 log.Printf("db closed\n") 150 }() 151 152 jetstreamEvents := make(chan []byte) 153 go handler(ctx, jetstreamEvents, dbCnx) 154 155 log.Printf("starting up\n") 156 go func() { 157 for { 158 _, message, err := conn.ReadMessage() 159 if err != nil { 160 stop() 161 } 162 jetstreamEvents <- message 163 } 164 }() 165 166 <-ctx.Done() 167 log.Printf("shutting down\n") 168}