feat(db/xp): improve logging #23

merged
opened by brookjeynes.dev targeting master from push-xlxvwwpkrpwv
Changed files
+53 -43
internal
consumer
db
+35 -35
internal/consumer/ingester.go
···
}
}()
-
l := i.Logger.With("kind", e.Kind)
switch e.Kind {
case models.EventKindCommit:
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
l = l.With("handler", "ingestProfile")
-
err = i.ingestProfile(e)
case yoten.FeedSessionNSID:
l = l.With("handler", "ingestStudySession")
-
err = i.ingestStudySession(e)
case yoten.ActivityDefNSID:
l = l.With("handler", "ingestActivityDef")
-
err = i.ingestActivityDef(e)
case yoten.FeedResourceNSID:
l = l.With("handler", "ingestResource")
-
err = i.ingestResource(e)
case yoten.GraphFollowNSID:
l = l.With("handler", "ingestFollow")
-
err = i.ingestFollow(e)
case yoten.FeedReactionNSID:
l = l.With("handler", "ingestReaction")
-
err = i.ingestReaction(e)
case yoten.FeedCommentNSID:
l = l.With("handler", "ingestComment")
-
err = i.ingestComment(e)
}
-
l = i.Logger.With("nsid", e.Commit.Collection)
}
if err != nil {
l.Error("failed to ingest event", "err", err)
···
}
}
-
func (i *Ingester) ingestProfile(e *models.Event) error {
did := e.Did
if e.Commit.RKey != "self" {
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
i.Logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
if err != nil {
tx.Rollback()
···
return nil
}
-
func (i *Ingester) ingestStudySession(e *models.Event) error {
did := e.Did
switch e.Commit.Operation {
···
date, err := time.Parse(time.RFC3339, record.Date)
if err != nil {
-
i.Logger.Error("invalid record", "err", err)
return err
}
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
i.Logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to upsert study session record: %w", err)
}
-
err = db.UpdateXPForSession(tx, &studySession)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to add xp for session: %w", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
i.Logger.Debug("deleting study session from pds request")
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
if err != nil {
tx.Rollback()
···
return nil
}
-
func (i *Ingester) ingestActivityDef(e *models.Event) error {
did := e.Did
var err error
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
i.Logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
}
return tx.Commit()
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
-
func (i *Ingester) ingestFollow(e *models.Event) error {
var err error
did := e.Did
···
subjectDid := record.Subject
-
i.Logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
if err != nil {
-
i.Logger.Error("failed to create notification record", "err", err)
}
return tx.Commit()
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
-
func (i *Ingester) ingestReaction(e *models.Event) error {
var err error
did := e.Did
···
CreatedAt: createdAt,
}
-
i.Logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
if err != nil {
tx.Rollback()
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
if err != nil {
-
i.Logger.Error("failed to create notification record", "err", err)
}
return tx.Commit()
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
-
func (i *Ingester) ingestResource(e *models.Event) error {
var err error
did := e.Did
···
return fmt.Errorf("invalid resource: %w", err)
}
-
i.Logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
if err != nil {
tx.Rollback()
···
}
return tx.Commit()
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
-
func (i *Ingester) ingestComment(e *models.Event) error {
var err error
did := e.Did
···
CreatedAt: createdAt,
}
-
i.Logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
if err != nil {
tx.Rollback()
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
if err != nil {
-
i.Logger.Error("failed to create notification record", "err", err)
}
}
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
if err != nil {
-
i.Logger.Error("failed to create notification record", "err", err)
}
}
return tx.Commit()
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
}
}()
+
l := i.Logger.With("kind", e.Kind).With("did", e.Did)
switch e.Kind {
case models.EventKindCommit:
+
l = i.Logger.With("nsid", e.Commit.Collection).With("rkey", e.Commit.RKey)
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
l = l.With("handler", "ingestProfile")
+
err = i.ingestProfile(e, l)
case yoten.FeedSessionNSID:
l = l.With("handler", "ingestStudySession")
+
err = i.ingestStudySession(e, l)
case yoten.ActivityDefNSID:
l = l.With("handler", "ingestActivityDef")
+
err = i.ingestActivityDef(e, l)
case yoten.FeedResourceNSID:
l = l.With("handler", "ingestResource")
+
err = i.ingestResource(e, l)
case yoten.GraphFollowNSID:
l = l.With("handler", "ingestFollow")
+
err = i.ingestFollow(e, l)
case yoten.FeedReactionNSID:
l = l.With("handler", "ingestReaction")
+
err = i.ingestReaction(e, l)
case yoten.FeedCommentNSID:
l = l.With("handler", "ingestComment")
+
err = i.ingestComment(e, l)
}
}
if err != nil {
l.Error("failed to ingest event", "err", err)
···
}
}
+
func (i *Ingester) ingestProfile(e *models.Event, logger *slog.Logger) error {
did := e.Did
if e.Commit.RKey != "self" {
···
return fmt.Errorf("failed to start transaction: %w", err)
}
+
logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
if err != nil {
tx.Rollback()
···
return nil
}
+
func (i *Ingester) ingestStudySession(e *models.Event, logger *slog.Logger) error {
did := e.Did
switch e.Commit.Operation {
···
date, err := time.Parse(time.RFC3339, record.Date)
if err != nil {
+
logger.Error("invalid record", "err", err)
return err
}
···
return fmt.Errorf("failed to start transaction: %w", err)
}
+
logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to upsert study session record: %w", err)
}
+
err = db.UpdateXPForSession(tx, &studySession, logger)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to add xp for session: %w", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
}
+
logger.Debug("deleting study session from pds request")
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
if err != nil {
tx.Rollback()
···
return nil
}
+
func (i *Ingester) ingestActivityDef(e *models.Event, logger *slog.Logger) error {
did := e.Did
var err error
···
return fmt.Errorf("failed to start transaction: %w", err)
}
+
logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
}
return tx.Commit()
case models.CommitOperationDelete:
+
logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
+
func (i *Ingester) ingestFollow(e *models.Event, logger *slog.Logger) error {
var err error
did := e.Did
···
subjectDid := record.Subject
+
logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
if err != nil {
+
logger.Error("failed to create notification record", "err", err)
}
return tx.Commit()
case models.CommitOperationDelete:
+
logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
+
func (i *Ingester) ingestReaction(e *models.Event, logger *slog.Logger) error {
var err error
did := e.Did
···
CreatedAt: createdAt,
}
+
logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
if err != nil {
tx.Rollback()
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
if err != nil {
+
logger.Error("failed to create notification record", "err", err)
}
return tx.Commit()
case models.CommitOperationDelete:
+
logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
+
func (i *Ingester) ingestResource(e *models.Event, logger *slog.Logger) error {
var err error
did := e.Did
···
return fmt.Errorf("invalid resource: %w", err)
}
+
logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
if err != nil {
tx.Rollback()
···
}
return tx.Commit()
case models.CommitOperationDelete:
+
logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return nil
}
+
func (i *Ingester) ingestComment(e *models.Event, logger *slog.Logger) error {
var err error
did := e.Did
···
CreatedAt: createdAt,
}
+
logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
if err != nil {
tx.Rollback()
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
if err != nil {
+
logger.Error("failed to create notification record", "err", err)
}
}
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
if err != nil {
+
logger.Error("failed to create notification record", "err", err)
}
}
return tx.Commit()
case models.CommitOperationDelete:
+
logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
+18 -8
internal/db/xp.go
···
import (
"database/sql"
"fmt"
-
"log"
"math"
"time"
)
···
return nil
}
-
func GetXPEventsForUser(e Execer, did string) ([]XpEvent, error) {
rows, err := e.Query(`
select did, session_rkey, xp_gained, created_at
from xp_events
···
createdAtStr,
)
if err != nil {
-
log.Println("failed to scan xp event:", err)
continue
}
···
return xpEvents, nil
}
-
func UpdateXPForSession(e Execer, updatedSession *StudySession) error {
var oldXPGained int
err := e.QueryRow(`
select xp_gained from xp_events where did = ? and session_rkey = ?
`, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained)
if err != nil {
if err == sql.ErrNoRows {
-
log.Println("adding xp for new session")
return AddXPForSession(e, updatedSession.Did, *updatedSession)
}
return fmt.Errorf("failed to get old xp for session: %w", err)
···
return nil
}
_, err = e.Exec(`
insert into xp_events (did, session_rkey, xp_gained)
values (?, ?, ?)
···
return nil
}
-
func RemoveXPForSession(e Execer, did string, sessionRkey string) error {
var xpToRemove int
err := e.QueryRow(`
select xp_gained from xp_events where did = ? and session_rkey = ?
`, did, sessionRkey).Scan(&xpToRemove)
-
if err != nil {
if err == sql.ErrNoRows {
-
log.Printf("no xp event found for session '%s', nothing to remove.", sessionRkey)
return nil
}
return fmt.Errorf("failed to get xp for session being deleted: %w", err)
···
return fmt.Errorf("failed to delete xp_event: %w", err)
}
var currentTotalXP int
err = e.QueryRow("select xp from profiles where did = ?", did).Scan(&currentTotalXP)
if err != nil {
···
import (
"database/sql"
"fmt"
+
"log/slog"
"math"
"time"
)
···
return nil
}
+
func GetXPEventsForUser(e Execer, did string, logger *slog.Logger) ([]XpEvent, error) {
+
l := logger.With("handler", "GetXPEventsForUser")
+
rows, err := e.Query(`
select did, session_rkey, xp_gained, created_at
from xp_events
···
createdAtStr,
)
if err != nil {
+
l.Error("failed to find xp_event", "err", err)
continue
}
···
return xpEvents, nil
}
+
func UpdateXPForSession(e Execer, updatedSession *StudySession, logger *slog.Logger) error {
+
l := logger.With("handler", "UpdateXPForSession")
+
var oldXPGained int
err := e.QueryRow(`
select xp_gained from xp_events where did = ? and session_rkey = ?
`, updatedSession.Did, updatedSession.Rkey).Scan(&oldXPGained)
if err != nil {
if err == sql.ErrNoRows {
+
l.Debug("adding xp for session")
return AddXPForSession(e, updatedSession.Did, *updatedSession)
}
return fmt.Errorf("failed to get old xp for session: %w", err)
···
return nil
}
+
l.Debug("updating xp for session")
+
_, err = e.Exec(`
insert into xp_events (did, session_rkey, xp_gained)
values (?, ?, ?)
···
return nil
}
+
func RemoveXPForSession(e Execer, did string, sessionRkey string, logger *slog.Logger) error {
+
l := logger.With("handler", "RemoveXPForSession")
+
var xpToRemove int
+
err := e.QueryRow(`
select xp_gained from xp_events where did = ? and session_rkey = ?
`, did, sessionRkey).Scan(&xpToRemove)
if err != nil {
if err == sql.ErrNoRows {
+
l.Debug("xp_event not found, nothing to remove")
return nil
}
return fmt.Errorf("failed to get xp for session being deleted: %w", err)
···
return fmt.Errorf("failed to delete xp_event: %w", err)
}
+
l.Debug("deleted xp_event")
+
var currentTotalXP int
err = e.QueryRow("select xp from profiles where did = ?", did).Scan(&currentTotalXP)
if err != nil {