this repo has no description
at master 2.6 kB view raw
1package videostream 2 3import ( 4 "context" 5 "database/sql" 6 _ "embed" 7 "encoding/json" 8 "fmt" 9 "log" 10 "time" 11 12 appbsky "github.com/bluesky-social/indigo/api/bsky" 13 jetstream "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/edavis/bsky-feeds/pkg/feeds" 15 _ "github.com/mattn/go-sqlite3" 16) 17 18//go:embed schema.sql 19var ddl string 20 21func Handler(ctx context.Context, events *Queue, dbCnx *sql.DB) { 22 var ( 23 dbTx *sql.Tx 24 err error 25 eventCount int 26 ) 27 28 if _, err = dbCnx.ExecContext(ctx, ddl); err != nil { 29 log.Printf("could not create tables: %v\n", err) 30 } 31 if _, err = dbCnx.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0`); err != nil { 32 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 33 } 34 35 for { 36 select { 37 case <-ctx.Done(): 38 return 39 default: 40 } 41 42 event, ok := events.Dequeue() 43 if !ok { 44 time.Sleep(100 * time.Millisecond) 45 continue 46 } 47 48 if dbTx == nil { 49 dbTx, err = dbCnx.BeginTx(ctx, nil) 50 if err != nil { 51 log.Printf("failed to begin transaction: %v\n", err) 52 } 53 } 54 55 if event.Kind != jetstream.EventKindCommit { 56 continue 57 } 58 59 if event.Commit.Operation != jetstream.CommitOperationCreate { 60 continue 61 } 62 63 commit := *event.Commit 64 var post appbsky.FeedPost 65 if err = json.Unmarshal(commit.Record, &post); err != nil { 66 log.Printf("error parsing commit.Record: %v\n", err) 67 continue 68 } 69 70 if post.Embed != nil && post.Embed.EmbedVideo != nil { 71 uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey) 72 ts := feeds.SafeTimestamp(post.CreatedAt) 73 dbTx.ExecContext(ctx, `insert or ignore into posts (uri, create_ts) values (?, ?)`, uri, ts) 74 } else { 75 continue 76 } 77 78 eventCount += 1 79 if eventCount%25 == 0 { 80 // TODO trim 81 82 if err = dbTx.Commit(); err != nil { 83 log.Printf("commit failed: %v\n", err) 84 } 85 86 var results CheckpointResults 87 err = dbCnx.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`).Scan(&results.Blocked, &results.Pages, &results.Transferred) 88 switch { 89 case err != nil: 90 log.Printf("failed checkpoint: %v\n", err) 91 case results.Blocked == 1: 92 log.Printf("checkpoint: blocked\n") 93 case results.Pages == results.Transferred: 94 log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 95 case results.Pages != results.Transferred: 96 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 97 } 98 99 dbTx, err = dbCnx.BeginTx(ctx, nil) 100 if err != nil { 101 log.Printf("failed to begin transaction: %v\n", err) 102 } 103 104 log.Printf("queue size: %d\n", events.Size()) 105 } 106 } 107}