forked from tangled.org/core
this repo has no description
at master 1.6 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/bluesky-social/jetstream/pkg/models" 11 tangled "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/appview/db" 13) 14 15type Ingester func(ctx context.Context, e *models.Event) error 16 17func jetstreamIngester(d db.DbWrapper) Ingester { 18 return func(ctx context.Context, e *models.Event) error { 19 var err error 20 defer func() { 21 eventTime := e.TimeUS 22 lastTimeUs := eventTime + 1 23 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 24 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 25 } 26 }() 27 28 if e.Kind != models.EventKindCommit { 29 return nil 30 } 31 32 did := e.Did 33 raw := json.RawMessage(e.Commit.Record) 34 35 switch e.Commit.Collection { 36 case tangled.GraphFollowNSID: 37 record := tangled.GraphFollow{} 38 err := json.Unmarshal(raw, &record) 39 if err != nil { 40 log.Println("invalid record") 41 return err 42 } 43 err = db.AddFollow(d, did, record.Subject, e.Commit.RKey) 44 if err != nil { 45 return fmt.Errorf("failed to add follow to db: %w", err) 46 } 47 case tangled.FeedStarNSID: 48 record := tangled.FeedStar{} 49 err := json.Unmarshal(raw, &record) 50 if err != nil { 51 log.Println("invalid record") 52 return err 53 } 54 55 subjectUri, err := syntax.ParseATURI(record.Subject) 56 57 if err != nil { 58 log.Println("invalid record") 59 return err 60 } 61 62 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 63 if err != nil { 64 return fmt.Errorf("failed to add follow to db: %w", err) 65 } 66 } 67 68 return err 69 } 70}