this repo has no description

feat: use transactions

Changed files
+30 -3
pkg
mostliked
+30 -3
pkg/mostliked/handler.go
···
go trimPostsTable(ctx, queries)
+
var (
+
dbTx *sql.Tx
+
queriesTx *db.Queries
+
txOpen bool
+
err error
+
eventCount int
+
)
+
for evt := range events {
+
if !txOpen {
+
dbTx, err = dbCnx.BeginTx(ctx, nil)
+
if err != nil {
+
log.Fatal(err)
+
}
+
txOpen = true
+
queriesTx = queries.WithTx(dbTx)
+
}
+
var like appbsky.FeedLike
var event jetstream.Event
if err := json.Unmarshal(evt, &event); err != nil {
···
}
drafts.Delete(like.Subject.Uri)
log.Println("storing", like.Subject.Uri, "in database")
-
err := queries.InsertPost(ctx, db.InsertPostParams{
+
err := queriesTx.InsertPost(ctx, db.InsertPostParams{
Uri: like.Subject.Uri,
CreateTs: draftPost.Created,
Likes: draftPost.Likes,
···
log.Println("error inserting post")
}
for _, lang := range draftPost.Languages {
-
err = queries.InsertLang(ctx, db.InsertLangParams{
+
err = queriesTx.InsertLang(ctx, db.InsertLangParams{
Uri: like.Subject.Uri,
Lang: strings.ToLower(lang.IsoCode639_1().String()),
})
···
}
}
} else {
-
err := queries.UpdateLikes(ctx, like.Subject.Uri)
+
err := queriesTx.UpdateLikes(ctx, like.Subject.Uri)
if err != nil {
log.Println("error updating likes")
}
+
}
+
+
eventCount += 1
+
if eventCount % 500 == 0 {
+
if err := dbTx.Commit(); err != nil {
+
log.Fatalf("commit failed: %v\n", err)
+
} else {
+
txOpen = false
+
}
+
log.Println("db committed")
}
}
}