From 88004a7b2ddc1529d19e674fe686a5a6ddabe7fe Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Tue, 12 Aug 2025 21:02:56 +0100 Subject: [PATCH] appview,knotserver,spindle: rework jetstream Change-Id: mtsxyxnkznyylrnuvsuvxmvsxlnsskxv do not return errors from ingesters, this causes the read loop to be killed. Signed-off-by: oppiliappan --- appview/ingester.go | 4 +- jetstream/jetstream.go | 10 +++-- knotserver/handler.go | 10 ++--- knotserver/ingester.go | 88 ++++++++++++++++++++++-------------------- log/log.go | 4 +- spindle/ingester.go | 12 ++++-- 6 files changed, 70 insertions(+), 58 deletions(-) diff --git a/appview/ingester.go b/appview/ingester.go index b8585fdd..35e65458 100644 --- a/appview/ingester.go +++ b/appview/ingester.go @@ -71,10 +71,10 @@ func (i *Ingester) Ingest() processFunc { } if err != nil { - l.Error("error ingesting record", "err", err) + l.Debug("error ingesting record", "err", err) } - return err + return nil } } diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index fd4506e2..e2935f69 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -68,13 +68,15 @@ func (j *JetstreamClient) RemoveDid(did string) { type processor func(context.Context, *models.Event) error func (j *JetstreamClient) withDidFilter(processFunc processor) processor { - // empty filter => all dids allowed - if len(j.wantedDids) == 0 { - return processFunc - } // since this closure references j.WantedDids; it should auto-update // existing instances of the closure when j.WantedDids is mutated return func(ctx context.Context, evt *models.Event) error { + + // empty filter => all dids allowed + if len(j.wantedDids) == 0 { + return processFunc(ctx, evt) + } + if _, ok := j.wantedDids[evt.Did]; ok { return processFunc(ctx, evt) } else { diff --git a/knotserver/handler.go b/knotserver/handler.go index 766346ef..2c52cc71 100644 --- a/knotserver/handler.go +++ b/knotserver/handler.go @@ -52,11 +52,6 @@ func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, j return nil, fmt.Errorf("failed to setup enforcer: %w", err) } - err = h.jc.StartJetstream(ctx, h.processMessages) - if err != nil { - return nil, fmt.Errorf("failed to start jetstream: %w", err) - } - // Check if the knot knows about any Dids; // if it does, it is already initialized and we can repopulate the // Jetstream subscriptions. @@ -73,6 +68,11 @@ func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, j } } + err = h.jc.StartJetstream(ctx, h.processMessages) + if err != nil { + return nil, fmt.Errorf("failed to start jetstream: %w", err) + } + r.Get("/", h.Index) r.Get("/capabilities", h.Capabilities) r.Get("/version", h.Version) diff --git a/knotserver/ingester.go b/knotserver/ingester.go index 1f1cc218..4aebb664 100644 --- a/knotserver/ingester.go +++ b/knotserver/ingester.go @@ -25,8 +25,16 @@ import ( "tangled.sh/tangled.sh/core/workflow" ) -func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { +func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error { l := log.FromContext(ctx) + raw := json.RawMessage(event.Commit.Record) + did := event.Did + + var record tangled.PublicKey + if err := json.Unmarshal(raw, &record); err != nil { + return fmt.Errorf("failed to unmarshal record: %w", err) + } + pk := db.PublicKey{ Did: did, PublicKey: record, @@ -39,8 +47,15 @@ func (h *Handle) processPublicKey(ctx context.Context, did string, record tangle return nil } -func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { +func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error { l := log.FromContext(ctx) + raw := json.RawMessage(event.Commit.Record) + did := event.Did + + var record tangled.KnotMember + if err := json.Unmarshal(raw, &record); err != nil { + return fmt.Errorf("failed to unmarshal record: %w", err) + } if record.Domain != h.c.Server.Hostname { l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) @@ -72,7 +87,15 @@ func (h *Handle) processKnotMember(ctx context.Context, did string, record tangl return nil } -func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error { +func (h *Handle) processPull(ctx context.Context, event *models.Event) error { + raw := json.RawMessage(event.Commit.Record) + did := event.Did + + var record tangled.RepoPull + if err := json.Unmarshal(raw, &record); err != nil { + return fmt.Errorf("failed to unmarshal record: %w", err) + } + l := log.FromContext(ctx) l = l.With("handler", "processPull") l = l.With("did", did) @@ -204,17 +227,25 @@ func (h *Handle) processPull(ctx context.Context, did string, record tangled.Rep return nil } - event := db.Event{ + ev := db.Event{ Rkey: TID(), Nsid: tangled.PipelineNSID, EventJson: string(eventJson), } - return h.db.InsertEvent(event, h.n) + return h.db.InsertEvent(ev, h.n) } // duplicated from add collaborator -func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error { +func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error { + raw := json.RawMessage(event.Commit.Record) + did := event.Did + + var record tangled.RepoCollaborator + if err := json.Unmarshal(raw, &record); err != nil { + return fmt.Errorf("failed to unmarshal record: %w", err) + } + repoAt, err := syntax.ParseATURI(record.Repo) if err != nil { return err @@ -247,7 +278,7 @@ func (h *Handle) processCollaborator(ctx context.Context, did string, record tan didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) // check perms for this user - if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { + if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil { return fmt.Errorf("insufficient permissions: %w", err) } @@ -307,7 +338,6 @@ func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { } func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { - did := event.Did if event.Kind != models.EventKindCommit { return nil } @@ -321,46 +351,20 @@ func (h *Handle) processMessages(ctx context.Context, event *models.Event) error } }() - raw := json.RawMessage(event.Commit.Record) - switch event.Commit.Collection { case tangled.PublicKeyNSID: - var record tangled.PublicKey - if err := json.Unmarshal(raw, &record); err != nil { - return fmt.Errorf("failed to unmarshal record: %w", err) - } - if err := h.processPublicKey(ctx, did, record); err != nil { - return fmt.Errorf("failed to process public key: %w", err) - } - + err = h.processPublicKey(ctx, event) case tangled.KnotMemberNSID: - var record tangled.KnotMember - if err := json.Unmarshal(raw, &record); err != nil { - return fmt.Errorf("failed to unmarshal record: %w", err) - } - if err := h.processKnotMember(ctx, did, record); err != nil { - return fmt.Errorf("failed to process knot member: %w", err) - } - + err = h.processKnotMember(ctx, event) case tangled.RepoPullNSID: - var record tangled.RepoPull - if err := json.Unmarshal(raw, &record); err != nil { - return fmt.Errorf("failed to unmarshal record: %w", err) - } - if err := h.processPull(ctx, did, record); err != nil { - return fmt.Errorf("failed to process knot member: %w", err) - } - + err = h.processPull(ctx, event) case tangled.RepoCollaboratorNSID: - var record tangled.RepoCollaborator - if err := json.Unmarshal(raw, &record); err != nil { - return fmt.Errorf("failed to unmarshal record: %w", err) - } - if err := h.processCollaborator(ctx, did, record); err != nil { - return fmt.Errorf("failed to process knot member: %w", err) - } + err = h.processCollaborator(ctx, event) + } + if err != nil { + h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) } - return err + return nil } diff --git a/log/log.go b/log/log.go index df039cb1..ab0590c2 100644 --- a/log/log.go +++ b/log/log.go @@ -9,7 +9,9 @@ import ( // NewHandler sets up a new slog.Handler with the service name // as an attribute func NewHandler(name string) slog.Handler { - handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{}) + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }) var attrs []slog.Attr attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)}) diff --git a/spindle/ingester.go b/spindle/ingester.go index cf45fd6f..161a64d6 100644 --- a/spindle/ingester.go +++ b/spindle/ingester.go @@ -40,14 +40,18 @@ func (s *Spindle) ingest() Ingester { switch e.Commit.Collection { case tangled.SpindleMemberNSID: - s.ingestMember(ctx, e) + err = s.ingestMember(ctx, e) case tangled.RepoNSID: - s.ingestRepo(ctx, e) + err = s.ingestRepo(ctx, e) case tangled.RepoCollaboratorNSID: - s.ingestCollaborator(ctx, e) + err = s.ingestCollaborator(ctx, e) } - return err + if err != nil { + s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) + } + + return nil } } -- 2.43.0