1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/bluesky-social/jetstream/pkg/models"
11 tangled "github.com/sotangled/tangled/api/tangled"
12 "github.com/sotangled/tangled/appview/db"
13)
14
// Ingester is a callback that handles a single event and returns an error
// if the event could not be handled.
type Ingester func(ctx context.Context, e *models.Event) error
16
17func jetstreamIngester(d db.DbWrapper) Ingester {
18 return func(ctx context.Context, e *models.Event) error {
19 var err error
20 defer func() {
21 eventTime := e.TimeUS
22 lastTimeUs := eventTime + 1
23 if err := d.UpdateLastTimeUs(lastTimeUs); err != nil {
24 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
25 }
26 }()
27
28 if e.Kind != models.EventKindCommit {
29 return nil
30 }
31
32 did := e.Did
33 raw := json.RawMessage(e.Commit.Record)
34
35 switch e.Commit.Collection {
36 case tangled.GraphFollowNSID:
37 record := tangled.GraphFollow{}
38 err := json.Unmarshal(raw, &record)
39 if err != nil {
40 log.Println("invalid record")
41 return err
42 }
43 err = db.AddFollow(d, did, record.Subject, e.Commit.RKey)
44 if err != nil {
45 return fmt.Errorf("failed to add follow to db: %w", err)
46 }
47 case tangled.FeedStarNSID:
48 record := tangled.FeedStar{}
49 err := json.Unmarshal(raw, &record)
50 if err != nil {
51 log.Println("invalid record")
52 return err
53 }
54
55 subjectUri, err := syntax.ParseATURI(record.Subject)
56
57 if err != nil {
58 log.Println("invalid record")
59 return err
60 }
61
62 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
63 if err != nil {
64 return fmt.Errorf("failed to add follow to db: %w", err)
65 }
66 }
67
68 return err
69 }
70}