···
···
type processFunc func(ctx context.Context, e *models.Event) error
···
39
+
l := i.Logger.With("kind", e.Kind)
case models.EventKindCommit:
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
44
+
l = l.With("handler", "ingestProfile")
case yoten.FeedSessionNSID:
47
+
l = l.With("handler", "ingestStudySession")
err = i.ingestStudySession(e)
case yoten.ActivityDefNSID:
50
+
l = l.With("handler", "ingestActivityDef")
err = i.ingestActivityDef(e)
case yoten.FeedResourceNSID:
53
+
l = l.With("handler", "ingestResource")
err = i.ingestResource(e)
case yoten.GraphFollowNSID:
56
+
l = l.With("handler", "ingestFollow")
case yoten.FeedReactionNSID:
59
+
l = l.With("handler", "ingestReaction")
err = i.ingestReaction(e)
case yoten.FeedCommentNSID:
62
+
l = l.With("handler", "ingestComment")
65
+
l = i.Logger.With("nsid", e.Commit.Collection)
58
-
log.Printf("failed to ingest event for collection %s: %v", e.Commit.Collection, err)
68
+
l.Error("failed to ingest event", "err", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
132
-
log.Printf("upserting profile '%s' from pds request", profile.Did)
142
+
i.Logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
···
date, err := time.Parse(time.RFC3339, record.Date)
163
-
log.Printf("invalid record: %s", err)
173
+
i.Logger.Error("invalid record", "err", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
230
-
log.Println("upserting study session from pds request")
240
+
i.Logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
···
return fmt.Errorf("failed to start transaction: %w", err)
255
-
log.Println("deleting study session from pds request")
265
+
i.Logger.Debug("deleting study session from pds request")
err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey)
···
return fmt.Errorf("failed to start transaction: %w", err)
347
-
log.Println("upserting activity def from pds request")
357
+
i.Logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
···
case models.CommitOperationDelete:
355
-
log.Println("deleting activity def from pds request")
365
+
i.Logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
···
subjectDid := record.Subject
390
-
log.Println("upserting follow from pds request")
400
+
i.Logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
400
-
log.Println("failed to create notification record:", err)
410
+
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
405
-
log.Println("deleting follow from pds request")
415
+
i.Logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
···
468
-
log.Println("upserting reaction from pds request")
478
+
i.Logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
477
-
log.Println("failed to create notification record:", err)
487
+
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
482
-
log.Println("deleting reaction from pds request")
492
+
i.Logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
···
return fmt.Errorf("invalid resource: %w", err)
549
-
log.Println("upserting resource from pds request")
559
+
i.Logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
···
case models.CommitOperationDelete:
557
-
log.Println("deleting resource from pds request")
567
+
i.Logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
···
629
-
log.Println("upserting comment from pds request")
639
+
i.Logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
640
-
log.Println("failed to create notification record:", err)
650
+
i.Logger.Error("failed to create notification record", "err", err)
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
648
-
log.Println("failed to create notification record:", err)
658
+
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
654
-
log.Println("deleting comment from pds request")
664
+
i.Logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)