back interdiff of round #2 and #1

appview,knotserver,spindle: rework jetstream #480

merged
opened by oppi.li targeting master from push-mtsxyxnkznyy

do not return errors from ingesters, this causes the read loop to be killed.

Signed-off-by: oppiliappan me@oppi.li

files
appview
jetstream
knotserver
log
spindle
ERROR
appview/ingester.go

Failed to calculate interdiff for this file.

ERROR
knotserver/handler.go

Failed to calculate interdiff for this file.

ERROR
knotserver/ingester.go

Failed to calculate interdiff for this file.

ERROR
log/log.go

Failed to calculate interdiff for this file.

ERROR
spindle/ingester.go

Failed to calculate interdiff for this file.

NEW
jetstream/jetstream.go
···
type processor func(context.Context, *models.Event) error
func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
-
// empty filter => all dids allowed
-
if len(j.wantedDids) == 0 {
-
return processFunc
-
}
// since this closure references j.WantedDids; it should auto-update
// existing instances of the closure when j.WantedDids is mutated
return func(ctx context.Context, evt *models.Event) error {
if _, ok := j.wantedDids[evt.Did]; ok {
return processFunc(ctx, evt)
} else {
···
type processor func(context.Context, *models.Event) error
func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
// since this closure references j.WantedDids; it should auto-update
// existing instances of the closure when j.WantedDids is mutated
return func(ctx context.Context, evt *models.Event) error {
+
// empty filter => all dids allowed
+
if len(j.wantedDids) == 0 {
+
return processFunc(ctx, evt)
+
}
+
if _, ok := j.wantedDids[evt.Did]; ok {
return processFunc(ctx, evt)
} else {