this repo has no description

refactor: rework shutdowns

Changed files
+35 -32
cmd
mostliked
pkg
+27 -22
cmd/mostliked/main.go
···
"os"
"os/signal"
"syscall"
+
"context"
"github.com/edavis/bsky-feeds/pkg/mostliked"
"github.com/gorilla/websocket"
_ "github.com/mattn/go-sqlite3"
)
+
+
//const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&cursor=1728846514000000`
+
const JetstreamUrl = `ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like`
func main() {
-
conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:6008/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like", nil)
+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
defer stop()
+
+
conn, _, err := websocket.DefaultDialer.Dial(JetstreamUrl, nil)
if err != nil {
-
log.Fatal("websocket connection error:", err)
+
log.Fatalf("failed to open websocket: %v\n", err)
}
-
defer conn.Close()
+
defer func() {
+
if err := conn.Close(); err != nil {
+
log.Printf("failed to close websocket: %v\n", err)
+
}
+
log.Println("websocket closed")
+
}()
dbCnx, err := sql.Open("sqlite3", "data/mostliked.db?_journal=WAL&_fk=on")
if err != nil {
-
log.Fatal("error opening db")
+
log.Fatalf("failed to open database: %v\n", err)
}
-
defer dbCnx.Close()
+
defer func() {
+
if err := dbCnx.Close(); err != nil {
+
log.Printf("failed to close db: %v", err)
+
}
+
log.Println("db closed")
+
}()
jetstreamEvents := make(chan []byte)
-
go mostliked.Handler(jetstreamEvents, dbCnx)
-
-
signalChan := make(chan os.Signal, 1)
-
cleanupDone := make(chan struct{})
-
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
-
go func(conn *websocket.Conn, dbCnx *sql.DB, jetstreamEvents chan []byte) {
-
<-signalChan
-
log.Println("shutting down...")
-
conn.Close()
-
dbCnx.Close()
-
close(cleanupDone)
-
}(conn, dbCnx, jetstreamEvents)
+
go mostliked.Handler(ctx, jetstreamEvents, dbCnx)
log.Println("starting up")
-
go func(conn *websocket.Conn) {
+
go func() {
for {
_, message, err := conn.ReadMessage()
if err != nil {
-
log.Println("error reading from websocket:", err)
break
}
jetstreamEvents <- message
}
-
}(conn)
+
}()
-
<-cleanupDone
+
<-ctx.Done()
+
log.Println("shutting down")
}
+1 -1
pkg/mostliked/generator.go
···
ctx := context.Background()
dbCnx, err := sql.Open("sqlite3", "data/mostliked.db?_journal=WAL&_fk=on&mode=ro")
if err != nil {
-
log.Fatal("error opening db")
+
log.Printf("error opening db: %v\n", err)
}
defer dbCnx.Close()
+7 -9
pkg/mostliked/handler.go
···
return ""
}
-
func Handler(events <-chan []byte, dbCnx *sql.DB) {
-
ctx := context.Background()
-
+
func Handler(ctx context.Context, events <-chan []byte, dbCnx *sql.DB) {
if _, err := dbCnx.ExecContext(ctx, ddl); err != nil {
-
log.Fatal("couldn't create tables")
+
log.Printf("couldn't create tables: %v\n", err)
}
queries := db.New(dbCnx)
···
if !txOpen {
dbTx, err = dbCnx.BeginTx(ctx, nil)
if err != nil {
-
log.Fatal(err)
+
log.Printf("failed to begin transaction: %v\n", err)
}
txOpen = true
queriesTx = queries.WithTx(dbTx)
···
continue
}
drafts.Delete(like.Subject.Uri)
-
log.Println("storing", like.Subject.Uri, "in database")
+
// log.Println("storing", like.Subject.Uri, "in database")
err := queriesTx.InsertPost(ctx, db.InsertPostParams{
Uri: like.Subject.Uri,
CreateTs: draftPost.Created,
···
}
eventCount += 1
-
if eventCount % 500 == 0 {
+
if eventCount % 1000 == 0 {
if err := dbTx.Commit(); err != nil {
-
log.Fatalf("commit failed: %v\n", err)
+
log.Printf("commit failed: %v\n", err)
} else {
txOpen = false
}
-
log.Println("db committed")
+
// log.Println("db committed")
}
}
}