forked from tangled.org/core
this repo has no description
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 9 "github.com/bluesky-social/jetstream/pkg/models" 10 tangled "github.com/sotangled/tangled/api/tangled" 11 "github.com/sotangled/tangled/appview/db" 12) 13 14type Ingester func(ctx context.Context, e *models.Event) error 15 16func jetstreamIngester(d db.DbWrapper) Ingester { 17 return func(ctx context.Context, e *models.Event) error { 18 var err error 19 defer func() { 20 eventTime := e.TimeUS 21 lastTimeUs := eventTime + 1 22 if err := d.UpdateLastTimeUs(lastTimeUs); err != nil { 23 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 24 } 25 }() 26 27 if e.Kind != models.EventKindCommit { 28 return nil 29 } 30 31 did := e.Did 32 raw := json.RawMessage(e.Commit.Record) 33 34 switch e.Commit.Collection { 35 case tangled.GraphFollowNSID: 36 record := tangled.GraphFollow{} 37 err := json.Unmarshal(raw, &record) 38 if err != nil { 39 log.Println("invalid record") 40 return err 41 } 42 err = db.AddFollow(d, did, record.Subject, e.Commit.RKey) 43 if err != nil { 44 return fmt.Errorf("failed to add follow to db: %w", err) 45 } 46 } 47 48 return err 49 } 50}