this repo has no description
1package main
2
import (
	"context"
	"database/sql"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	jetstream "github.com/bluesky-social/jetstream/pkg/models"
	"github.com/edavis/bsky-feeds/pkg/videostream"
	"github.com/gorilla/websocket"
	_ "github.com/mattn/go-sqlite3"
)
16
// JetstreamUrl is the Jetstream websocket subscription endpoint. The
// wantedCollections query parameter restricts the subscription to
// app.bsky.feed.post records.
const JetstreamUrl = `wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post`
18
19func main() {
20 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
21 defer stop()
22
23 conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
24 if err != nil {
25 log.Fatalf("failed to open websocket: %v\n", err)
26 }
27 defer func() {
28 if err := conn.Close(); err != nil {
29 log.Printf("failed to close websocket: %v\n", err)
30 }
31 log.Printf("websocket closed\n")
32 }()
33
34 dbCnx, err := sql.Open("sqlite3", "data/videostream.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate")
35 if err != nil {
36 log.Fatalf("failed to open database: %v\n", err)
37 }
38 defer func() {
39 if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
40 log.Printf("error doing final WAL checkpoint: %v\n", err)
41 }
42 if err := dbCnx.Close(); err != nil {
43 log.Printf("failed to close db: %v\n", err)
44 }
45 log.Printf("db closed\n")
46 }()
47
48 queue := videostream.NewQueue(1000)
49 go videostream.Handler(ctx, queue, dbCnx)
50
51 log.Printf("starting up\n")
52 go func() {
53 for {
54 var event jetstream.Event
55 err := conn.ReadJSON(&event)
56 if err != nil {
57 log.Printf("ReadJSON error: %v\n", err)
58 stop()
59 break
60 }
61 queue.Enqueue(event)
62 }
63 }()
64
65 <-ctx.Done()
66 log.Printf("shutting down\n")
67}