···
-
l := i.Logger.With("kind", e.Kind)
case models.EventKindCommit:
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
l = l.With("handler", "ingestProfile")
-
err = i.ingestProfile(e)
case yoten.FeedSessionNSID:
l = l.With("handler", "ingestStudySession")
-
err = i.ingestStudySession(e)
case yoten.ActivityDefNSID:
l = l.With("handler", "ingestActivityDef")
-
err = i.ingestActivityDef(e)
case yoten.FeedResourceNSID:
l = l.With("handler", "ingestResource")
-
err = i.ingestResource(e)
case yoten.GraphFollowNSID:
l = l.With("handler", "ingestFollow")
-
err = i.ingestFollow(e)
case yoten.FeedReactionNSID:
l = l.With("handler", "ingestReaction")
-
err = i.ingestReaction(e)
case yoten.FeedCommentNSID:
l = l.With("handler", "ingestComment")
-
err = i.ingestComment(e)
-
l = i.Logger.With("nsid", e.Commit.Collection)
l.Error("failed to ingest event", "err", err)
···
-
func (i *Ingester) ingestProfile(e *models.Event) error {
if e.Commit.RKey != "self" {
···
return fmt.Errorf("failed to start transaction: %w", err)
-
i.Logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
···
-
func (i *Ingester) ingestStudySession(e *models.Event) error {
switch e.Commit.Operation {
···
date, err := time.Parse(time.RFC3339, record.Date)
-
i.Logger.Error("invalid record", "err", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
-
i.Logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
return fmt.Errorf("failed to upsert study session record: %w", err)
-
err = db.UpdateXPForSession(tx, &studySession)
return fmt.Errorf("failed to add xp for session: %w", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
-
i.Logger.Debug("deleting study session from pds request")
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
···
-
func (i *Ingester) ingestActivityDef(e *models.Event) error {
···
return fmt.Errorf("failed to start transaction: %w", err)
-
i.Logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
···
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
···
-
func (i *Ingester) ingestFollow(e *models.Event) error {
···
subjectDid := record.Subject
-
i.Logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
-
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
···
-
func (i *Ingester) ingestReaction(e *models.Event) error {
···
-
i.Logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
-
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
···
-
func (i *Ingester) ingestResource(e *models.Event) error {
···
return fmt.Errorf("invalid resource: %w", err)
-
i.Logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
···
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
···
-
func (i *Ingester) ingestComment(e *models.Event) error {
···
-
i.Logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
-
i.Logger.Error("failed to create notification record", "err", err)
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
-
i.Logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
-
i.Logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)
···
+
l := i.Logger.With("kind", e.Kind).With("did", e.Did)
case models.EventKindCommit:
+
l = i.Logger.With("nsid", e.Commit.Collection).With("rkey", e.Commit.RKey)
switch e.Commit.Collection {
case yoten.ActorProfileNSID:
l = l.With("handler", "ingestProfile")
+
err = i.ingestProfile(e, l)
case yoten.FeedSessionNSID:
l = l.With("handler", "ingestStudySession")
+
err = i.ingestStudySession(e, l)
case yoten.ActivityDefNSID:
l = l.With("handler", "ingestActivityDef")
+
err = i.ingestActivityDef(e, l)
case yoten.FeedResourceNSID:
l = l.With("handler", "ingestResource")
+
err = i.ingestResource(e, l)
case yoten.GraphFollowNSID:
l = l.With("handler", "ingestFollow")
+
err = i.ingestFollow(e, l)
case yoten.FeedReactionNSID:
l = l.With("handler", "ingestReaction")
+
err = i.ingestReaction(e, l)
case yoten.FeedCommentNSID:
l = l.With("handler", "ingestComment")
+
err = i.ingestComment(e, l)
l.Error("failed to ingest event", "err", err)
···
+
func (i *Ingester) ingestProfile(e *models.Event, logger *slog.Logger) error {
if e.Commit.RKey != "self" {
···
return fmt.Errorf("failed to start transaction: %w", err)
+
logger.Debug("upserting profile from pds request")
err = db.UpsertProfile(tx, &profile)
···
+
func (i *Ingester) ingestStudySession(e *models.Event, logger *slog.Logger) error {
switch e.Commit.Operation {
···
date, err := time.Parse(time.RFC3339, record.Date)
+
logger.Error("invalid record", "err", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
+
logger.Debug("upserting study session from pds request")
err = db.UpsertStudySession(tx, &studySession, e.Commit.RKey)
return fmt.Errorf("failed to upsert study session record: %w", err)
+
err = db.UpdateXPForSession(tx, &studySession, logger)
return fmt.Errorf("failed to add xp for session: %w", err)
···
return fmt.Errorf("failed to start transaction: %w", err)
+
logger.Debug("deleting study session from pds request")
err = db.RemoveXPForSession(tx, did, e.Commit.RKey, logger)
···
+
func (i *Ingester) ingestActivityDef(e *models.Event, logger *slog.Logger) error {
···
return fmt.Errorf("failed to start transaction: %w", err)
+
logger.Debug("upserting activity def from pds request")
err = db.UpsertActivityDef(tx, &activityDef, e.Commit.RKey)
···
case models.CommitOperationDelete:
+
logger.Debug("deleting activity def from pds request")
err = db.DeleteActivityDefByRkey(i.Db, did, e.Commit.RKey)
···
+
func (i *Ingester) ingestFollow(e *models.Event, logger *slog.Logger) error {
···
subjectDid := record.Subject
+
logger.Debug("upserting follow from pds request")
err = db.AddFollow(tx, did, subjectDid, e.Commit.RKey)
···
subjectUri := fmt.Sprintf("at://%s/%s/%s", did, yoten.GraphFollowNSID, e.Commit.RKey)
err = db.CreateNotification(tx, subjectDid, did, subjectUri, db.NotificationTypeFollow)
+
logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
+
logger.Debug("deleting follow from pds request")
err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
···
+
func (i *Ingester) ingestReaction(e *models.Event, logger *slog.Logger) error {
···
+
logger.Debug("upserting reaction from pds request")
err = db.UpsertReaction(i.Db, reactionEvent)
···
err = db.CreateNotification(tx, subjectDid.String(), did, subject.String(), db.NotificationTypeReaction)
+
logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
+
logger.Debug("deleting reaction from pds request")
err = db.DeleteReactionByRkey(i.Db, did, e.Commit.RKey)
···
+
func (i *Ingester) ingestResource(e *models.Event, logger *slog.Logger) error {
···
return fmt.Errorf("invalid resource: %w", err)
+
logger.Debug("upserting resource from pds request")
err = db.UpsertResource(i.Db, resource, resource.Rkey)
···
case models.CommitOperationDelete:
+
logger.Debug("deleting resource from pds request")
err = db.DeleteResourceByRkey(i.Db, did, e.Commit.RKey)
···
+
func (i *Ingester) ingestComment(e *models.Event, logger *slog.Logger) error {
···
+
logger.Debug("upserting comment from pds request")
err = db.UpsertComment(i.Db, comment)
···
if subjectDid.String() != did {
err = db.CreateNotification(tx, subjectDid.String(), did, subjectUri.String(), db.NotificationTypeComment)
+
logger.Error("failed to create notification record", "err", err)
···
if comment.ParentCommentUri != nil && comment.ParentCommentUri.Authority().String() != did {
err = db.CreateNotification(tx, comment.ParentCommentUri.Authority().String(), did, parentCommentUri.String(), db.NotificationTypeReply)
+
logger.Error("failed to create notification record", "err", err)
case models.CommitOperationDelete:
+
logger.Debug("deleting comment from pds request")
err = db.DeleteCommentByRkey(i.Db, did, e.Commit.RKey)