this repo has no description

feat: add videostream handler

Eric Davis 14bd694b 80c57926

Changed files
+238 -1
cmd
videostream
pkg
+4 -1
Makefile
···
-
all: bin/mostliked bin/feedweb
+
all: bin/videostream bin/mostliked bin/feedweb
+
+
bin/videostream: cmd/videostream/*.go pkg/videostream/*.go
+
go build -o $@ ./cmd/videostream
bin/mostliked: cmd/mostliked/main.go pkg/mostliked/handler.go db/mostliked/*.go pkg/feeds/*.go
go build -o $@ ./cmd/mostliked
+67
cmd/videostream/main.go
···
+
package main
+
+
import (
+
"context"
+
"database/sql"
+
"log"
+
"os"
+
"os/signal"
+
"syscall"
+
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
+
"github.com/edavis/bsky-feeds/pkg/videostream"
+
"github.com/gorilla/websocket"
+
_ "github.com/mattn/go-sqlite3"
+
)
+
+
// JetstreamUrl is the websocket endpoint main dials; the query string asks
// the server for the app.bsky.feed.post collection only.
const JetstreamUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post"
+
+
func main() {
+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
defer stop()
+
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
+
if err != nil {
+
log.Fatalf("failed to open websocket: %v\n", err)
+
}
+
defer func() {
+
if err := conn.Close(); err != nil {
+
log.Printf("failed to close websocket: %v\n", err)
+
}
+
log.Printf("websocket closed\n")
+
}()
+
+
dbCnx, err := sql.Open("sqlite3", "data/videostream.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate")
+
if err != nil {
+
log.Fatalf("failed to open database: %v\n", err)
+
}
+
defer func() {
+
if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
+
log.Printf("error doing final WAL checkpoint: %v\n", err)
+
}
+
if err := dbCnx.Close(); err != nil {
+
log.Printf("failed to close db: %v\n", err)
+
}
+
log.Printf("db closed\n")
+
}()
+
+
queue := videostream.NewQueue(1000)
+
go videostream.Handler(ctx, queue, dbCnx)
+
+
log.Printf("starting up\n")
+
go func() {
+
for {
+
var event jetstream.Event
+
err := conn.ReadJSON(&event)
+
if err != nil {
+
log.Printf("ReadJSON error: %v\n", err)
+
stop()
+
break
+
}
+
queue.Enqueue(event)
+
}
+
}()
+
+
<-ctx.Done()
+
log.Printf("shutting down\n")
+
}
+7
pkg/videostream/checkpoint.go
···
+
package videostream
+
+
// CheckpointResults receives the three integer columns of the single row
// returned by a `PRAGMA wal_checkpoint(...)` query, in column order.
type CheckpointResults struct {
	// Blocked is the first column; Handler treats a value of 1 as
	// "checkpoint was blocked".
	Blocked int
	// Pages is the second column.
	// NOTE(review): presumably the number of pages in the WAL — confirm
	// against the SQLite documentation.
	Pages int
	// Transferred is the third column; Handler compares it with Pages to
	// tell a complete checkpoint from a partial one.
	Transferred int
}
+1
pkg/videostream/generator.go
···
+
package videostream
+107
pkg/videostream/handler.go
···
+
package videostream
+
+
import (
+
"context"
+
"database/sql"
+
_ "embed"
+
"encoding/json"
+
"fmt"
+
"log"
+
"time"
+
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
+
"github.com/edavis/bsky-feeds/pkg/feeds"
+
_ "github.com/mattn/go-sqlite3"
+
)
+
+
//go:embed schema.sql
+
var ddl string
+
+
func Handler(ctx context.Context, events *Queue, dbCnx *sql.DB) {
+
var (
+
dbTx *sql.Tx
+
err error
+
eventCount int
+
)
+
+
if _, err = dbCnx.ExecContext(ctx, ddl); err != nil {
+
log.Printf("could not create tables: %v\n", err)
+
}
+
if _, err = dbCnx.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0`); err != nil {
+
log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
+
}
+
+
for {
+
select {
+
case <-ctx.Done():
+
return
+
default:
+
}
+
+
event, ok := events.Dequeue()
+
if !ok {
+
time.Sleep(100 * time.Millisecond)
+
continue
+
}
+
+
if dbTx == nil {
+
dbTx, err = dbCnx.BeginTx(ctx, nil)
+
if err != nil {
+
log.Printf("failed to begin transaction: %v\n", err)
+
}
+
}
+
+
if event.Kind != jetstream.EventKindCommit {
+
continue
+
}
+
+
if event.Commit.Operation != jetstream.CommitOperationCreate {
+
continue
+
}
+
+
commit := *event.Commit
+
var post appbsky.FeedPost
+
if err = json.Unmarshal(commit.Record, &post); err != nil {
+
log.Printf("error parsing commit.Record: %v\n", err)
+
continue
+
}
+
+
if post.Embed != nil && post.Embed.EmbedVideo != nil {
+
uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey)
+
ts := feeds.SafeTimestamp(post.CreatedAt)
+
dbTx.ExecContext(ctx, `insert or ignore into posts (uri, create_ts) values (?, ?)`, uri, ts)
+
} else {
+
continue
+
}
+
+
eventCount += 1
+
if eventCount%25 == 0 {
+
// TODO trim
+
+
if err = dbTx.Commit(); err != nil {
+
log.Printf("commit failed: %v\n", err)
+
}
+
+
var results CheckpointResults
+
err = dbCnx.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`).Scan(&results.Blocked, &results.Pages, &results.Transferred)
+
switch {
+
case err != nil:
+
log.Printf("failed checkpoint: %v\n", err)
+
case results.Blocked == 1:
+
log.Printf("checkpoint: blocked\n")
+
case results.Pages == results.Transferred:
+
log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
+
case results.Pages != results.Transferred:
+
log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
+
}
+
+
dbTx, err = dbCnx.BeginTx(ctx, nil)
+
if err != nil {
+
log.Printf("failed to begin transaction: %v\n", err)
+
}
+
+
log.Printf("queue size: %d\n", events.Size())
+
}
+
}
+
}
+46
pkg/videostream/queue.go
···
+
package videostream
+
+
import (
+
"sync"
+
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
+
)
+
+
type Queue struct {
+
lk sync.Mutex
+
events []jetstream.Event
+
}
+
+
func NewQueue(capacity int) *Queue {
+
return &Queue{
+
events: make([]jetstream.Event, 0, capacity),
+
}
+
}
+
+
func (q *Queue) Enqueue(event jetstream.Event) {
+
q.lk.Lock()
+
defer q.lk.Unlock()
+
+
q.events = append(q.events, event)
+
}
+
+
func (q *Queue) Dequeue() (jetstream.Event, bool) {
+
q.lk.Lock()
+
defer q.lk.Unlock()
+
+
if len(q.events) == 0 {
+
var e jetstream.Event
+
return e, false
+
}
+
+
event := q.events[0]
+
q.events = q.events[1:]
+
return event, true
+
}
+
+
func (q *Queue) Size() int {
+
q.lk.Lock()
+
defer q.lk.Unlock()
+
+
return len(q.events)
+
}
+6
pkg/videostream/schema.sql
···
+
-- posts: one row per recorded post, keyed by its URI so that repeated
-- `insert or ignore` statements for the same post are no-ops.
CREATE TABLE IF NOT EXISTS posts (
    uri       TEXT PRIMARY KEY,
    -- NOTE(review): timestamp units are whatever feeds.SafeTimestamp returns — confirm.
    create_ts INT NOT NULL
);

-- ts_idx: index on the creation timestamp.
CREATE INDEX IF NOT EXISTS ts_idx ON posts (create_ts);