From b598c0404f217a2051d56bd2ab4562a3016e4e1d Mon Sep 17 00:00:00 2001 From: brookjeynes Date: Wed, 15 Oct 2025 12:18:27 +1000 Subject: [PATCH] feat(consumer/ingester): use slogger Change-Id: vxnqqryvltmwrxrrtxvvspklprxowpku Signed-off-by: brookjeynes --- internal/consumer/ingester.go | 50 +++++++++++++++++++++-------------- internal/server/app.go | 6 +++-- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/internal/consumer/ingester.go b/internal/consumer/ingester.go index af23b09..571332b 100644 --- a/internal/consumer/ingester.go +++ b/internal/consumer/ingester.go @@ -4,7 +4,7 @@ import ( "context" "encoding/json" "fmt" - "log" + "log/slog" "strings" "time" @@ -20,6 +20,7 @@ import ( type Ingester struct { Db db.DbWrapper Config *config.Config + Logger *slog.Logger } type processFunc func(ctx context.Context, e *models.Event) error @@ -35,27 +36,36 @@ func (i *Ingester) Ingest() processFunc { } }() + l := i.Logger.With("kind", e.Kind) switch e.Kind { case models.EventKindCommit: switch e.Commit.Collection { case yoten.ActorProfileNSID: + l = l.With("handler", "ingestProfile") err = i.ingestProfile(e) case yoten.FeedSessionNSID: + l = l.With("handler", "ingestStudySession") err = i.ingestStudySession(e) case yoten.ActivityDefNSID: + l = l.With("handler", "ingestActivityDef") err = i.ingestActivityDef(e) case yoten.FeedResourceNSID: + l = l.With("handler", "ingestResource") err = i.ingestResource(e) case yoten.GraphFollowNSID: + l = l.With("handler", "ingestFollow") err = i.ingestFollow(e) case yoten.FeedReactionNSID: + l = l.With("handler", "ingestReaction") err = i.ingestReaction(e) case yoten.FeedCommentNSID: + l = l.With("handler", "ingestComment") err = i.ingestComment(e) } + l = i.Logger.With("nsid", e.Commit.Collection) } if err != nil { - log.Printf("failed to ingest event for collection %s: %v", e.Commit.Collection, err) + l.Error("failed to ingest event", "err", err) } return nil @@ -129,7 +139,7 @@ func (i *Ingester) ingestProfile(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - log.Printf("upserting profile '%s' from pds request", profile.Did) + i.Logger.Debug("upserting profile from pds request") err = db.UpsertProfile(tx, &profile) if err != nil { tx.Rollback() @@ -160,7 +170,7 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { date, err := time.Parse(time.RFC3339, record.Date) if err != nil { - log.Printf("invalid record: %s", err) + i.Logger.Error("invalid record", "err", err) return err } @@ -227,7 +237,7 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - log.Println("upserting study session from pds request") + i.Logger.Debug("upserting study session from pds request") err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey) if err != nil { tx.Rollback() @@ -252,7 +262,7 @@ func (i *Ingester) ingestStudySession(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - log.Println("deleting study session from pds request") + i.Logger.Debug("deleting study session from pds request") err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey) if err != nil { tx.Rollback() @@ -344,7 +354,7 @@ func (i *Ingester) ingestActivityDef(e *models.Event) error { return fmt.Errorf("failed to start transaction: %w", err) } - log.Println("upserting activity def from pds request") + i.Logger.Debug("upserting activity def from pds request") err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey) if err != nil { tx.Rollback() @@ -352,7 +362,7 @@ func (i *Ingester) ingestActivityDef(e *models.Event) error { } return tx.Commit() case models.CommitOperationDelete: - log.Println("deleting activity def from pds request") + i.Logger.Debug("deleting activity def from pds request") err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -387,7 +397,7 @@ func (i *Ingester) ingestFollow(e *models.Event) error { subjectDid := record.Subject - log.Println("upserting follow from pds request") + i.Logger.Debug("upserting follow from pds request") err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey) if err != nil { tx.Rollback() @@ -397,12 +407,12 @@ func (i *Ingester) ingestFollow(e *models.Event) error { subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey) err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow) if err != nil { - log.Println("failed to create notification record:", err) + i.Logger.Error("failed to create notification record", "err", err) } return tx.Commit() case models.CommitOperationDelete: - log.Println("deleting follow from pds request") + i.Logger.Debug("deleting follow from pds request") err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -465,7 +475,7 @@ func (i *Ingester) ingestReaction(e *models.Event) error { CreatedAt: createdAt, } - log.Println("upserting reaction from pds request") + i.Logger.Debug("upserting reaction from pds request") err = db.UpsertReaction(i.Db, reactionEvent) if err != nil { tx.Rollback() @@ -474,12 +484,12 @@ func (i *Ingester) ingestReaction(e *models.Event) error { err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction) if err != nil { - log.Println("failed to create notification record:", err) + i.Logger.Error("failed to create notification record", "err", err) } return tx.Commit() case models.CommitOperationDelete: - log.Println("deleting reaction from pds request") + i.Logger.Debug("deleting reaction from pds request") err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -546,7 +556,7 @@ func (i *Ingester) ingestResource(e *models.Event) error { return fmt.Errorf("invalid resource: %w", err) } - log.Println("upserting resource from pds request") + i.Logger.Debug("upserting resource from pds request") err = db.UpsertResource(i.Db, resource, resource.Rkey) if err != nil { tx.Rollback() @@ -554,7 +564,7 @@ func (i *Ingester) ingestResource(e *models.Event) error { } return tx.Commit() case models.CommitOperationDelete: - log.Println("deleting resource from pds request") + i.Logger.Debug("deleting resource from pds request") err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey) } if err != nil { @@ -626,7 +636,7 @@ func (i *Ingester) ingestComment(e *models.Event) error { CreatedAt: createdAt, } - log.Println("upserting comment from pds request") + i.Logger.Debug("upserting comment from pds request") err = db.UpsertComment(i.Db, comment) if err != nil { tx.Rollback() @@ -637,7 +647,7 @@ func (i *Ingester) ingestComment(e *models.Event) error { if subjectDid.String() != did { err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment) if err != nil { - log.Println("failed to create notification record:", err) + i.Logger.Error("failed to create notification record", "err", err) } } @@ -645,13 +655,13 @@ func (i *Ingester) ingestComment(e *models.Event) error { if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did { err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply) if err != nil { - log.Println("failed to create notification record:", err) + i.Logger.Error("failed to create notification record", "err", err) } } return tx.Commit() case models.CommitOperationDelete: - log.Println("deleting comment from pds request") + i.Logger.Debug("deleting comment from pds request") err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey) } if err != nil { diff --git a/internal/server/app.go b/internal/server/app.go index c7cf92f..20bc6bd 100644 --- a/internal/server/app.go +++ b/internal/server/app.go @@ -85,7 +85,7 @@ func Make(ctx context.Context, config *config.Config) (*Server, error) { yoten.GraphFollowNSID, }, nil, - slog.Default(), + log.SubLogger(logger, "jetstream"), wrapper, false, ) @@ -95,7 +95,9 @@ func Make(ctx context.Context, config *config.Config) (*Server, error) { ingester := consumer.Ingester{ Db: wrapper, - Config: config} + Config: config, + Logger: log.SubLogger(logger, "ingester"), + } err = jc.StartJetstream(ctx, ingester.Ingest()) if err != nil { return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) -- 2.43.0