this repo has no description
1package videostream
2
3import (
4 "context"
5 "database/sql"
6 _ "embed"
7 "encoding/json"
8 "fmt"
9 "log"
10 "time"
11
12 appbsky "github.com/bluesky-social/indigo/api/bsky"
13 jetstream "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/edavis/bsky-feeds/pkg/feeds"
15 _ "github.com/mattn/go-sqlite3"
16)
17
18//go:embed schema.sql
19var ddl string
20
21func Handler(ctx context.Context, events *Queue, dbCnx *sql.DB) {
22 var (
23 dbTx *sql.Tx
24 err error
25 eventCount int
26 )
27
28 if _, err = dbCnx.ExecContext(ctx, ddl); err != nil {
29 log.Printf("could not create tables: %v\n", err)
30 }
31 if _, err = dbCnx.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0`); err != nil {
32 log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
33 }
34
35 for {
36 select {
37 case <-ctx.Done():
38 return
39 default:
40 }
41
42 event, ok := events.Dequeue()
43 if !ok {
44 time.Sleep(100 * time.Millisecond)
45 continue
46 }
47
48 if dbTx == nil {
49 dbTx, err = dbCnx.BeginTx(ctx, nil)
50 if err != nil {
51 log.Printf("failed to begin transaction: %v\n", err)
52 }
53 }
54
55 if event.Kind != jetstream.EventKindCommit {
56 continue
57 }
58
59 if event.Commit.Operation != jetstream.CommitOperationCreate {
60 continue
61 }
62
63 commit := *event.Commit
64 var post appbsky.FeedPost
65 if err = json.Unmarshal(commit.Record, &post); err != nil {
66 log.Printf("error parsing commit.Record: %v\n", err)
67 continue
68 }
69
70 if post.Embed != nil && post.Embed.EmbedVideo != nil {
71 uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey)
72 ts := feeds.SafeTimestamp(post.CreatedAt)
73 dbTx.ExecContext(ctx, `insert or ignore into posts (uri, create_ts) values (?, ?)`, uri, ts)
74 } else {
75 continue
76 }
77
78 eventCount += 1
79 if eventCount%25 == 0 {
80 // TODO trim
81
82 if err = dbTx.Commit(); err != nil {
83 log.Printf("commit failed: %v\n", err)
84 }
85
86 var results CheckpointResults
87 err = dbCnx.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`).Scan(&results.Blocked, &results.Pages, &results.Transferred)
88 switch {
89 case err != nil:
90 log.Printf("failed checkpoint: %v\n", err)
91 case results.Blocked == 1:
92 log.Printf("checkpoint: blocked\n")
93 case results.Pages == results.Transferred:
94 log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
95 case results.Pages != results.Transferred:
96 log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
97 }
98
99 dbTx, err = dbCnx.BeginTx(ctx, nil)
100 if err != nil {
101 log.Printf("failed to begin transaction: %v\n", err)
102 }
103
104 log.Printf("queue size: %d\n", events.Size())
105 }
106 }
107}