appview,knotserver,spindle: rework jetstream #480

merged
opened by oppi.li targeting master from push-mtsxyxnkznyy

do not return errors from ingesters, this causes the read loop to be killed.

Signed-off-by: oppiliappan me@oppi.li

Changed files
+64 -54
appview
knotserver
log
spindle
+2 -2
appview/ingester.go
···
}
if err != nil {
-
l.Error("error ingesting record", "err", err)
}
-
return err
}
}
···
}
if err != nil {
+
l.Debug("error ingesting record", "err", err)
}
+
return nil
}
}
+5 -5
knotserver/handler.go
···
return nil, fmt.Errorf("failed to setup enforcer: %w", err)
}
-
err = h.jc.StartJetstream(ctx, h.processMessages)
-
if err != nil {
-
return nil, fmt.Errorf("failed to start jetstream: %w", err)
-
}
-
// Check if the knot knows about any Dids;
// if it does, it is already initialized and we can repopulate the
// Jetstream subscriptions.
···
}
}
r.Get("/", h.Index)
r.Get("/capabilities", h.Capabilities)
r.Get("/version", h.Version)
···
return nil, fmt.Errorf("failed to setup enforcer: %w", err)
}
// Check if the knot knows about any Dids;
// if it does, it is already initialized and we can repopulate the
// Jetstream subscriptions.
···
}
}
+
err = h.jc.StartJetstream(ctx, h.processMessages)
+
if err != nil {
+
return nil, fmt.Errorf("failed to start jetstream: %w", err)
+
}
+
r.Get("/", h.Index)
r.Get("/capabilities", h.Capabilities)
r.Get("/version", h.Version)
+46 -42
knotserver/ingester.go
···
"tangled.sh/tangled.sh/core/workflow"
)
-
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
l := log.FromContext(ctx)
pk := db.PublicKey{
Did: did,
PublicKey: record,
···
return nil
}
-
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
l := log.FromContext(ctx)
if record.Domain != h.c.Server.Hostname {
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
···
return nil
}
-
func (h *Handle) processPull(ctx context.Context, did string, record tangled.RepoPull) error {
l := log.FromContext(ctx)
l = l.With("handler", "processPull")
l = l.With("did", did)
···
return nil
}
-
event := db.Event{
Rkey: TID(),
Nsid: tangled.PipelineNSID,
EventJson: string(eventJson),
}
-
return h.db.InsertEvent(event, h.n)
}
// duplicated from add collaborator
-
func (h *Handle) processCollaborator(ctx context.Context, did string, record tangled.RepoCollaborator) error {
repoAt, err := syntax.ParseATURI(record.Repo)
if err != nil {
return err
···
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
// check perms for this user
-
if ok, err := h.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil {
return fmt.Errorf("insufficient permissions: %w", err)
}
···
}
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
-
did := event.Did
if event.Kind != models.EventKindCommit {
return nil
}
···
}
}()
-
raw := json.RawMessage(event.Commit.Record)
-
switch event.Commit.Collection {
case tangled.PublicKeyNSID:
-
var record tangled.PublicKey
-
if err := json.Unmarshal(raw, &record); err != nil {
-
return fmt.Errorf("failed to unmarshal record: %w", err)
-
}
-
if err := h.processPublicKey(ctx, did, record); err != nil {
-
return fmt.Errorf("failed to process public key: %w", err)
-
}
-
case tangled.KnotMemberNSID:
-
var record tangled.KnotMember
-
if err := json.Unmarshal(raw, &record); err != nil {
-
return fmt.Errorf("failed to unmarshal record: %w", err)
-
}
-
if err := h.processKnotMember(ctx, did, record); err != nil {
-
return fmt.Errorf("failed to process knot member: %w", err)
-
}
-
case tangled.RepoPullNSID:
-
var record tangled.RepoPull
-
if err := json.Unmarshal(raw, &record); err != nil {
-
return fmt.Errorf("failed to unmarshal record: %w", err)
-
}
-
if err := h.processPull(ctx, did, record); err != nil {
-
return fmt.Errorf("failed to process knot member: %w", err)
-
}
-
case tangled.RepoCollaboratorNSID:
-
var record tangled.RepoCollaborator
-
if err := json.Unmarshal(raw, &record); err != nil {
-
return fmt.Errorf("failed to unmarshal record: %w", err)
-
}
-
if err := h.processCollaborator(ctx, did, record); err != nil {
-
return fmt.Errorf("failed to process knot member: %w", err)
-
}
}
-
return err
}
···
"tangled.sh/tangled.sh/core/workflow"
)
+
func (h *Handle) processPublicKey(ctx context.Context, event *models.Event) error {
l := log.FromContext(ctx)
+
raw := json.RawMessage(event.Commit.Record)
+
did := event.Did
+
+
var record tangled.PublicKey
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
+
pk := db.PublicKey{
Did: did,
PublicKey: record,
···
return nil
}
+
func (h *Handle) processKnotMember(ctx context.Context, event *models.Event) error {
l := log.FromContext(ctx)
+
raw := json.RawMessage(event.Commit.Record)
+
did := event.Did
+
+
var record tangled.KnotMember
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
if record.Domain != h.c.Server.Hostname {
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
···
return nil
}
+
func (h *Handle) processPull(ctx context.Context, event *models.Event) error {
+
raw := json.RawMessage(event.Commit.Record)
+
did := event.Did
+
+
var record tangled.RepoPull
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
+
l := log.FromContext(ctx)
l = l.With("handler", "processPull")
l = l.With("did", did)
···
return nil
}
+
ev := db.Event{
Rkey: TID(),
Nsid: tangled.PipelineNSID,
EventJson: string(eventJson),
}
+
return h.db.InsertEvent(ev, h.n)
}
// duplicated from add collaborator
+
func (h *Handle) processCollaborator(ctx context.Context, event *models.Event) error {
+
raw := json.RawMessage(event.Commit.Record)
+
did := event.Did
+
+
var record tangled.RepoCollaborator
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
+
repoAt, err := syntax.ParseATURI(record.Repo)
if err != nil {
return err
···
didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
// check perms for this user
+
if ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo); !ok || err != nil {
return fmt.Errorf("insufficient permissions: %w", err)
}
···
}
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
if event.Kind != models.EventKindCommit {
return nil
}
···
}
}()
switch event.Commit.Collection {
case tangled.PublicKeyNSID:
+
err = h.processPublicKey(ctx, event)
case tangled.KnotMemberNSID:
+
err = h.processKnotMember(ctx, event)
case tangled.RepoPullNSID:
+
err = h.processPull(ctx, event)
case tangled.RepoCollaboratorNSID:
+
err = h.processCollaborator(ctx, event)
+
}
+
if err != nil {
+
h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
}
+
return nil
}
+3 -1
log/log.go
···
// NewHandler sets up a new slog.Handler with the service name
// as an attribute
func NewHandler(name string) slog.Handler {
-
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})
var attrs []slog.Attr
attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
···
// NewHandler sets up a new slog.Handler with the service name
// as an attribute
func NewHandler(name string) slog.Handler {
+
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
+
Level: slog.LevelDebug,
+
})
var attrs []slog.Attr
attrs = append(attrs, slog.Attr{Key: "service", Value: slog.StringValue(name)})
+8 -4
spindle/ingester.go
···
switch e.Commit.Collection {
case tangled.SpindleMemberNSID:
-
s.ingestMember(ctx, e)
case tangled.RepoNSID:
-
s.ingestRepo(ctx, e)
case tangled.RepoCollaboratorNSID:
-
s.ingestCollaborator(ctx, e)
}
-
return err
}
}
···
switch e.Commit.Collection {
case tangled.SpindleMemberNSID:
+
err = s.ingestMember(ctx, e)
case tangled.RepoNSID:
+
err = s.ingestRepo(ctx, e)
case tangled.RepoCollaboratorNSID:
+
err = s.ingestCollaborator(ctx, e)
}
+
if err != nil {
+
s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err)
+
}
+
+
return nil
}
}