···
···
type processFunc func(ctx context.Context, e *models.Event) error
···
case models.EventKindCommit:
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
case yoten.FeedSessionNSID:
err = i.ingestStudySession(e)
case yoten.ActivityDefNSID:
err = i.ingestActivityDef(e)
case yoten.FeedResourceNSID:
err = i.ingestResource(e)
case yoten.GraphFollowNSID:
case yoten.FeedReactionNSID:
err = i.ingestReaction(e)
case yoten.FeedCommentNSID:
-
log.Printf("failed to ingest event for collection %s: %v", e.Commit.Collection, err)
···
return fmt.Errorf("failed to start transaction: %w", err)
-
log.Printf("upserting profile '%s' from pds request", profile.Did)
err = db.UpsertProfile(tx, &profile)
···
date, err := time.Parse(time.RFC3339, record.Date)
-
log.Printf("invalid record: %s", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
-
log.Println("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
···
return fmt.Errorf("failed to start transaction: %w", err)
-
log.Println("deleting study session from pds request")
err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey)
···
return fmt.Errorf("failed to start transaction: %w", err)
-
log.Println("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
···
case models.CommitOperationDelete:
-
log.Println("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
···
subjectDid := record.Subject
-
log.Println("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
-
log.Println("failed to create notification record:", err)
case models.CommitOperationDelete:
-
log.Println("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
···
-
log.Println("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
-
log.Println("failed to create notification record:", err)
case models.CommitOperationDelete:
-
log.Println("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
···
return fmt.Errorf("invalid resource: %w", err)
-
log.Println("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
···
case models.CommitOperationDelete:
-
log.Println("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
···
-
log.Println("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
-
log.Println("failed to create notification record:", err)
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
-
log.Println("failed to create notification record:", err)
case models.CommitOperationDelete:
-
log.Println("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
···
···
type processFunc func(ctx context.Context, e *models.Event) error
···
+
l := i.Logger.With("kind", e.Kind)
case models.EventKindCommit:
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
+
l = l.With("handler", "ingestProfile")
case yoten.FeedSessionNSID:
+
l = l.With("handler", "ingestStudySession")
err = i.ingestStudySession(e)
case yoten.ActivityDefNSID:
+
l = l.With("handler", "ingestActivityDef")
err = i.ingestActivityDef(e)
case yoten.FeedResourceNSID:
+
l = l.With("handler", "ingestResource")
err = i.ingestResource(e)
case yoten.GraphFollowNSID:
+
l = l.With("handler", "ingestFollow")
case yoten.FeedReactionNSID:
+
l = l.With("handler", "ingestReaction")
err = i.ingestReaction(e)
case yoten.FeedCommentNSID:
+
l = l.With("handler", "ingestComment")
+
l = i.Logger.With("nsid", e.Commit.Collection)
+
l.Error("failed to ingest event", "err", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
+
i.Logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
···
date, err := time.Parse(time.RFC3339, record.Date)
+
i.Logger.Error("invalid record", "err", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
+
i.Logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
···
return fmt.Errorf("failed to start transaction: %w", err)
+
i.Logger.Debug("deleting study session from pds request")
err = db.DeleteStudySessionByRkey(tx, did, e.Commit.RKey)
···
return fmt.Errorf("failed to start transaction: %w", err)
+
i.Logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
···
case models.CommitOperationDelete:
+
i.Logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
···
subjectDid := record.Subject
+
i.Logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
+
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
+
i.Logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
···
+
i.Logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
+
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
+
i.Logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
···
return fmt.Errorf("invalid resource: %w", err)
+
i.Logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
···
case models.CommitOperationDelete:
+
i.Logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
···
+
i.Logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
+
i.Logger.Error("failed to create notification record", "err", err)
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
+
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
+
i.Logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)