1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8
9 "github.com/bluesky-social/jetstream/pkg/models"
10 tangled "github.com/sotangled/tangled/api/tangled"
11 "github.com/sotangled/tangled/appview/db"
12)
13
14type Ingester func(ctx context.Context, e *models.Event) error
15
16func jetstreamIngester(d db.DbWrapper) Ingester {
17 return func(ctx context.Context, e *models.Event) error {
18 var err error
19 defer func() {
20 eventTime := e.TimeUS
21 lastTimeUs := eventTime + 1
22 if err := d.UpdateLastTimeUs(lastTimeUs); err != nil {
23 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
24 }
25 }()
26
27 if e.Kind != models.EventKindCommit {
28 return nil
29 }
30
31 did := e.Did
32 raw := json.RawMessage(e.Commit.Record)
33
34 switch e.Commit.Collection {
35 case tangled.GraphFollowNSID:
36 record := tangled.GraphFollow{}
37 err := json.Unmarshal(raw, &record)
38 if err != nil {
39 log.Println("invalid record")
40 return err
41 }
42 err = db.AddFollow(d, did, record.Subject, e.Commit.RKey)
43 if err != nil {
44 return fmt.Errorf("failed to add follow to db: %w", err)
45 }
46 }
47
48 return err
49 }
50}