package videostream import ( "context" "database/sql" _ "embed" "encoding/json" "fmt" "log" "time" appbsky "github.com/bluesky-social/indigo/api/bsky" jetstream "github.com/bluesky-social/jetstream/pkg/models" "github.com/edavis/bsky-feeds/pkg/feeds" _ "github.com/mattn/go-sqlite3" ) //go:embed schema.sql var ddl string func Handler(ctx context.Context, events *Queue, dbCnx *sql.DB) { var ( dbTx *sql.Tx err error eventCount int ) if _, err = dbCnx.ExecContext(ctx, ddl); err != nil { log.Printf("could not create tables: %v\n", err) } if _, err = dbCnx.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0`); err != nil { log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) } for { select { case <-ctx.Done(): return default: } event, ok := events.Dequeue() if !ok { time.Sleep(100 * time.Millisecond) continue } if dbTx == nil { dbTx, err = dbCnx.BeginTx(ctx, nil) if err != nil { log.Printf("failed to begin transaction: %v\n", err) } } if event.Kind != jetstream.EventKindCommit { continue } if event.Commit.Operation != jetstream.CommitOperationCreate { continue } commit := *event.Commit var post appbsky.FeedPost if err = json.Unmarshal(commit.Record, &post); err != nil { log.Printf("error parsing commit.Record: %v\n", err) continue } if post.Embed != nil && post.Embed.EmbedVideo != nil { uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey) ts := feeds.SafeTimestamp(post.CreatedAt) dbTx.ExecContext(ctx, `insert or ignore into posts (uri, create_ts) values (?, ?)`, uri, ts) } else { continue } eventCount += 1 if eventCount%25 == 0 { // TODO trim if err = dbTx.Commit(); err != nil { log.Printf("commit failed: %v\n", err) } var results CheckpointResults err = dbCnx.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`).Scan(&results.Blocked, &results.Pages, &results.Transferred) switch { case err != nil: log.Printf("failed checkpoint: %v\n", err) case results.Blocked == 1: log.Printf("checkpoint: blocked\n") case results.Pages == results.Transferred: log.Printf("checkpoint: %d pages transferred\n", results.Transferred) case results.Pages != results.Transferred: log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) } dbTx, err = dbCnx.BeginTx(ctx, nil) if err != nil { log.Printf("failed to begin transaction: %v\n", err) } log.Printf("queue size: %d\n", events.Size()) } } }