feat(consumer/ingester): use slogger #18

merged
opened by brookjeynes.dev targeting master from push-trrpxxyxxmot
Changed files
+34 -22
internal
consumer
server
+30 -20
internal/consumer/ingester.go
···
"context"
"encoding/json"
"fmt"
-
"log"
+
"log/slog"
"strings"
"time"
···
type Ingester struct {
Db db.DbWrapper
Config *config.Config
+
Logger *slog.Logger
}
type processFunc func(ctx context.Context, e *models.Event) error
···
}
}()
+
l := i.Logger.With("kind", e.Kind)
switch e.Kind {
case models.EventKindCommit:
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
+
l = l.With("handler", "ingestProfile")
err = i.ingestProfile(e)
case yoten.FeedSessionNSID:
+
l = l.With("handler", "ingestStudySession")
err = i.ingestStudySession(e)
case yoten.ActivityDefNSID:
+
l = l.With("handler", "ingestActivityDef")
err = i.ingestActivityDef(e)
case yoten.FeedResourceNSID:
+
l = l.With("handler", "ingestResource")
err = i.ingestResource(e)
case yoten.GraphFollowNSID:
+
l = l.With("handler", "ingestFollow")
err = i.ingestFollow(e)
case yoten.FeedReactionNSID:
+
l = l.With("handler", "ingestReaction")
err = i.ingestReaction(e)
case yoten.FeedCommentNSID:
+
l = l.With("handler", "ingestComment")
err = i.ingestComment(e)
}
+
l = i.Logger.With("nsid", e.Commit.Collection)
}
if err != nil {
-
log.Printf("failed to ingest event for collection %s: %v", e.Commit.Collection, err)
+
l.Error("failed to ingest event", "err", err)
}
return nil
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
log.Printf("upserting profile '%s' from pds request", profile.Did)
+
i.Logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
if err != nil {
tx.Rollback()
···
date, err := time.Parse(time.RFC3339, record.Date)
if err != nil {
-
log.Printf("invalid record: %s", err)
+
i.Logger.Error("invalid record", "err", err)
return err
}
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
log.Println("upserting study session from pds request")
+
i.Logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
log.Println("deleting study session from pds request")
+
i.Logger.Debug("deleting study session from pds request")
err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
return fmt.Errorf("failed to start transaction: %w", err)
}
-
log.Println("upserting activity def from pds request")
+
i.Logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
}
return tx.Commit()
case models.CommitOperationDelete:
-
log.Println("deleting activity def from pds request")
+
i.Logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
subjectDid := record.Subject
-
log.Println("upserting follow from pds request")
+
i.Logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
if err != nil {
tx.Rollback()
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
if err != nil {
-
log.Println("failed to create notification record:", err)
+
i.Logger.Error("failed to create notification record", "err", err)
}
return tx.Commit()
case models.CommitOperationDelete:
-
log.Println("deleting follow from pds request")
+
i.Logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
CreatedAt: createdAt,
}
-
log.Println("upserting reaction from pds request")
+
i.Logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
if err != nil {
tx.Rollback()
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
if err != nil {
-
log.Println("failed to create notification record:", err)
+
i.Logger.Error("failed to create notification record", "err", err)
}
return tx.Commit()
case models.CommitOperationDelete:
-
log.Println("deleting reaction from pds request")
+
i.Logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
return fmt.Errorf("invalid resource: %w", err)
}
-
log.Println("upserting resource from pds request")
+
i.Logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
if err != nil {
tx.Rollback()
···
}
return tx.Commit()
case models.CommitOperationDelete:
-
log.Println("deleting resource from pds request")
+
i.Logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
···
CreatedAt: createdAt,
}
-
log.Println("upserting comment from pds request")
+
i.Logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
if err != nil {
tx.Rollback()
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
if err != nil {
-
log.Println("failed to create notification record:", err)
+
i.Logger.Error("failed to create notification record", "err", err)
}
}
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
if err != nil {
-
log.Println("failed to create notification record:", err)
+
i.Logger.Error("failed to create notification record", "err", err)
}
}
return tx.Commit()
case models.CommitOperationDelete:
-
log.Println("deleting comment from pds request")
+
i.Logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
}
if err != nil {
+4 -2
internal/server/app.go
···
yoten.GraphFollowNSID,
},
nil,
-
slog.Default(),
+
log.SubLogger(logger, "jetstream"),
wrapper,
false,
)
···
ingester := consumer.Ingester{
Db: wrapper,
-
Config: config}
+
Config: config,
+
Logger: log.SubLogger(logger, "ingester"),
+
}
err = jc.StartJetstream(ctx, ingester.Ingest())
if err != nil {
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)