From fe7e3a1614cb85b732d78fc223479e5379f48e17 Mon Sep 17 00:00:00 2001 From: brookjeynes Date: Thu, 16 Oct 2025 18:16:16 +1000 Subject: [PATCH] feat(db/xp): improve logging Change-Id: nnxyqszptutqvwsktylqqyquznnkrqmq Signed-off-by: brookjeynes --- internal/consumer/ingester.go | 70 +++++++++++++++++------------------ internal/db/xp.go | 26 +++++++++---- 2 files changed, 53 insertions(+), 43 deletions(-) diff --git a/internal/consumer/ingester.go b/internal/consumer/ingester.go index 63bec44..5360f94 100644 --- a/internal/consumer/ingester.go +++ b/internal/consumer/ingester.go @@ -36,33 +36,33 @@ func (i *Ingester) Ingest() processFunc { } }() - l := i.Logger.With("kind", e.Kind) + l := i.Logger.With("kind", e.Kind).With("did", e.Did) switch e.Kind { case models.EventKindCommit: + l = i.Logger.With("nsid", e.Commit.Collection).With("rkey", e.Commit.RKey) switch e.Commit.Collection { case yoten.ActorProfileNSID: l = l.With("handler", "ingestProfile") - err = i.ingestProfile(e) + err = i.ingestProfile(e, l) case yoten.FeedSessionNSID: l = l.With("handler", "ingestStudySession") - err = i.ingestStudySession(e) + err = i.ingestStudySession(e, l) case yoten.ActivityDefNSID: l = l.With("handler", "ingestActivityDef") - err = i.ingestActivityDef(e) + err = i.ingestActivityDef(e, l) case yoten.FeedResourceNSID: l = l.With("handler", "ingestResource") - err = i.ingestResource(e) + err = i.ingestResource(e, l) case yoten.GraphFollowNSID: l = l.With("handler", "ingestFollow") - err = i.ingestFollow(e) + err = i.ingestFollow(e, l) case yoten.FeedReactionNSID: l = l.With("handler", "ingestReaction") - err = i.ingestReaction(e) + err = i.ingestReaction(e, l) case yoten.FeedCommentNSID: l = l.With("handler", "ingestComment") - err = i.ingestComment(e) + err = i.ingestComment(e, l) } - l = i.Logger.With("nsid", e.Commit.Collection) } if err != nil { l.Error("failed to ingest event", "err", err) @@ -72,7 +72,7 @@ func (i *Ingester) Ingest() processFunc { } } -func (i *Ingester) ingestProfile(e *models.Event) error { +func (i *Ingester) ingestProfile(e *models.Event, logger *slog.Logger) error { did := e.Did if e.Commit.RKey != "self" { @@ -139,7 +139,7 @@ func (i *Ingester) ingestProfile(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - i.Logger.Debug("upserting profile from pds request") + logger.Debug("upserting profile from pds request") err = db.UpsertProfile(tx, &profile) if err != nil { tx.Rollback() @@ -151,7 +151,7 @@ func (i *Ingester) ingestProfile(e *models.Event) error { return nil } -func (i *Ingester) ingestStudySession(e *models.Event) error { +func (i *Ingester) ingestStudySession(e *models.Event, logger *slog.Logger) error { did := e.Did switch e.Commit.Operation { @@ -170,7 +170,7 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { date, err := time.Parse(time.RFC3339, record.Date) if err != nil { - i.Logger.Error("invalid record", "err", err) + logger.Error("invalid record", "err", err) return err } @@ -237,14 +237,14 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - i.Logger.Debug("upserting study session from pds request") + logger.Debug("upserting study session from pds request") err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey) if err != nil { tx.Rollback() return fmt.Errorf("failed to upsert study session record: %w", err) } - err = db.UpdateXPForSession(tx, &studySession) + err = db.UpdateXPForSession(tx, &studySession, logger) if err != nil { tx.Rollback() return fmt.Errorf("failed to add xp for session: %w", err) @@ -262,7 +262,7 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - i.Logger.Debug("deleting study session from pds request") + logger.Debug("deleting study session from pds request") err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger) if err != nil { tx.Rollback() @@ -281,7 +281,7 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { return nil } -func (i *Ingester) ingestActivityDef(e *models.Event) error { +func (i *Ingester) ingestActivityDef(e *models.Event, logger *slog.Logger) error { did := e.Did var err error @@ -354,7 +354,7 @@ func (i *Ingester) ingestActivityDef(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - i.Logger.Debug("upserting activity def from pds request") + logger.Debug("upserting activity def from pds request") err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey) if err != nil { tx.Rollback() @@ -362,7 +362,7 @@ func (i *Ingester) ingestActivityDef(e *models.Event) error { } return tx.Commit() case models.CommitOperationDelete: - i.Logger.Debug("deleting activity def from pds request") + logger.Debug("deleting activity def from pds request") err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -372,7 +372,7 @@ func (i *Ingester) ingestActivityDef(e *models.Event) error { return nil } -func (i *Ingester) ingestFollow(e *models.Event) error { +func (i *Ingester) ingestFollow(e *models.Event, logger *slog.Logger) error { var err error did := e.Did @@ -397,7 +397,7 @@ func (i *Ingester) ingestFollow(e *models.Event) error { subjectDid := record.Subject - i.Logger.Debug("upserting follow from pds request") + logger.Debug("upserting follow from pds request") err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey) if err != nil { tx.Rollback() @@ -407,12 +407,12 @@ func (i *Ingester) ingestFollow(e *models.Event) error { subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey) err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow) if err != nil { - i.Logger.Error("failed to create notification record", "err", err) + logger.Error("failed to create notification record", "err", err) } return tx.Commit() case models.CommitOperationDelete: - i.Logger.Debug("deleting follow from pds request") + logger.Debug("deleting follow from pds request") err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -422,7 +422,7 @@ func (i *Ingester) ingestFollow(e *models.Event) error { return nil } -func (i *Ingester) ingestReaction(e *models.Event) error { +func (i *Ingester) ingestReaction(e *models.Event, logger *slog.Logger) error { var err error did := e.Did @@ -475,7 +475,7 @@ func (i *Ingester) ingestReaction(e *models.Event) error { CreatedAt: createdAt, } - i.Logger.Debug("upserting reaction from pds request") + logger.Debug("upserting reaction from pds request") err = db.UpsertReaction(i.Db, reactionEvent) if err != nil { tx.Rollback() @@ -484,12 +484,12 @@ func (i *Ingester) ingestReaction(e *models.Event) error { err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction) if err != nil { - i.Logger.Error("failed to create notification record", "err", err) + logger.Error("failed to create notification record", "err", err) } return tx.Commit() case models.CommitOperationDelete: - i.Logger.Debug("deleting reaction from pds request") + logger.Debug("deleting reaction from pds request") err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -499,7 +499,7 @@ func (i *Ingester) ingestReaction(e *models.Event) error { return nil } -func (i *Ingester) ingestResource(e *models.Event) error { +func (i *Ingester) ingestResource(e *models.Event, logger *slog.Logger) error { var err error did := e.Did @@ -556,7 +556,7 @@ func (i *Ingester) ingestResource(e *models.Event) error { return fmt.Errorf("invalid resource: %w", err) } - i.Logger.Debug("upserting resource from pds request") + logger.Debug("upserting resource from pds request") err = db.UpsertResource(i.Db, resource, resource.Rkey) if err != nil { tx.Rollback() @@ -564,7 +564,7 @@ func (i *Ingester) ingestResource(e *models.Event) error { } return tx.Commit() case models.CommitOperationDelete: - i.Logger.Debug("deleting resource from pds request") + logger.Debug("deleting resource from pds request") err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -574,7 +574,7 @@ func (i *Ingester) ingestResource(e *models.Event) error { return nil } -func (i *Ingester) ingestComment(e *models.Event) error { +func (i *Ingester) ingestComment(e *models.Event, logger *slog.Logger) error { var err error did := e.Did @@ -636,7 +636,7 @@ func (i *Ingester) ingestComment(e *models.Event) error { CreatedAt: createdAt, } - i.Logger.Debug("upserting comment from pds request") + logger.Debug("upserting comment from pds request") err = db.UpsertComment(i.Db, comment) if err != nil { tx.Rollback() @@ -647,7 +647,7 @@ func (i *Ingester) ingestComment(e *models.Event) error { if subjectDid.String() != did { err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment) if err != nil { - i.Logger.Error("failed to create notification record", "err", err) + logger.Error("failed to create notification record", "err", err) } } @@ -655,13 +655,13 @@ func (i *Ingester) ingestComment(e *models.Event) error { if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did { err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply) if err != nil { - i.Logger.Error("failed to create notification record", "err", err) + logger.Error("failed to create notification record", "err", err) } } return tx.Commit() case models.CommitOperationDelete: - i.Logger.Debug("deleting comment from pds request") + logger.Debug("deleting comment from pds request") err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey) } if err != nil { diff --git a/internal/db/xp.go b/internal/db/xp.go index f537f88..e27f2e2 100644 --- a/internal/db/xp.go +++ b/internal/db/xp.go @@ -3,7 +3,7 @@ package db import ( "database/sql" "fmt" - "log" + "log/slog" "math" "time" ) @@ -105,7 +105,9 @@ func updateUserXPAndLevel(e Execer, did string, xpGained int) error { return nil } -func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) { +func GetXPEventsForUser(e Execer, did string, logger *slog.Logger) ([]XpEvent, error) { + l := logger.With("handler", "GetXPEventsForUser") + rows, err := e.Query(` select did, session_rkey, xp_gained, created_at from xp_events @@ -129,7 +131,7 @@ func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) { createdAtStr, ) if err != nil { - log.Println("failed to scan xp event:", err) + l.Error("failed to find xp_event", "err", err) continue } @@ -148,14 +150,16 @@ func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) { return xpEvents, nil } -func UpdateXPForSession(e Execer, updatedSession *StudySession) error { +func UpdateXPForSession(e Execer, updatedSession *StudySession, logger *slog.Logger) error { + l := logger.With("handler", "UpdateXPForSession") + var oldXPGained int err := e.QueryRow(` select xp_gained from xp_events where did = ? and session_rkey = ? `, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained) if err != nil { if err == sql.ErrNoRows { - log.Println("adding xp for new session") + l.Debug("adding xp for session") return AddXPForSession(e, updatedSession.Did, *updatedSession) } return fmt.Errorf("failed to get old xp for session: %w", err) @@ -166,6 +170,8 @@ func UpdateXPForSession(e Execer, updatedSession *StudySession) error { return nil } + l.Debug("updating xp for session") + _, err = e.Exec(` insert into xp_events (did, session_rkey, xp_gained) values (?, ?, ?) @@ -194,15 +200,17 @@ func UpdateXPForSession(e Execer, updatedSession *StudySession) error { return nil } -func RemoveXPForSession(e Execer, did string, sessionRkey string) error { +func RemoveXPForSession(e Execer, did string, sessionRkey string, logger *slog.Logger) error { + l := logger.With("handler", "RemoveXPForSession") + var xpToRemove int + err := e.QueryRow(` select xp_gained from xp_events where did = ? and session_rkey = ? `, did, sessionRkey).Scan(&xpToRemove) - if err != nil { if err == sql.ErrNoRows { - log.Printf("no xp event found for session '%s', nothing to remove.", sessionRkey) + l.Debug("xp_event not found, nothing to remove") return nil } return fmt.Errorf("failed to get xp for session being deleted: %w", err) @@ -213,6 +221,8 @@ func RemoveXPForSession(e Execer, did string, sessionRkey string) error { return fmt.Errorf("failed to delete xp_event: %w", err) } + l.Debug("deleted xp_event") + var currentTotalXP int err = e.QueryRow("select xp from profiles where did = ?", did).Scan(¤tTotalXP) if err != nil { -- 2.43.0