1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/bluesky-social/jetstream/pkg/models"
11 tangled "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/appview/db"
13)
14
15type Ingester func(ctx context.Context, e *models.Event) error
16
17func Ingest(d db.DbWrapper) Ingester {
18 return func(ctx context.Context, e *models.Event) error {
19 var err error
20 defer func() {
21 eventTime := e.TimeUS
22 lastTimeUs := eventTime + 1
23 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
24 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
25 }
26 }()
27
28 if e.Kind != models.EventKindCommit {
29 return nil
30 }
31
32 switch e.Commit.Collection {
33 case tangled.GraphFollowNSID:
34 ingestFollow(&d, e)
35 case tangled.FeedStarNSID:
36 ingestStar(&d, e)
37 case tangled.PublicKeyNSID:
38 ingestPublicKey(&d, e)
39 }
40
41 return err
42 }
43}
44
45func ingestStar(d *db.DbWrapper, e *models.Event) error {
46 var err error
47 did := e.Did
48
49 switch e.Commit.Operation {
50 case models.CommitOperationCreate, models.CommitOperationUpdate:
51 var subjectUri syntax.ATURI
52
53 raw := json.RawMessage(e.Commit.Record)
54 record := tangled.FeedStar{}
55 err := json.Unmarshal(raw, &record)
56 if err != nil {
57 log.Println("invalid record")
58 return err
59 }
60
61 subjectUri, err = syntax.ParseATURI(record.Subject)
62 if err != nil {
63 log.Println("invalid record")
64 return err
65 }
66 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
67 case models.CommitOperationDelete:
68 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
69 }
70
71 if err != nil {
72 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
73 }
74
75 return nil
76}
77
78func ingestFollow(d *db.DbWrapper, e *models.Event) error {
79 var err error
80 did := e.Did
81
82 switch e.Commit.Operation {
83 case models.CommitOperationCreate, models.CommitOperationUpdate:
84 raw := json.RawMessage(e.Commit.Record)
85 record := tangled.GraphFollow{}
86 err = json.Unmarshal(raw, &record)
87 if err != nil {
88 log.Println("invalid record")
89 return err
90 }
91
92 subjectDid := record.Subject
93 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
94 case models.CommitOperationDelete:
95 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
96 }
97
98 if err != nil {
99 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
100 }
101
102 return nil
103}
104
105func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
106 did := e.Did
107 var err error
108
109 switch e.Commit.Operation {
110 case models.CommitOperationCreate, models.CommitOperationUpdate:
111 log.Println("processing add of pubkey")
112 raw := json.RawMessage(e.Commit.Record)
113 record := tangled.PublicKey{}
114 err = json.Unmarshal(raw, &record)
115 if err != nil {
116 log.Printf("invalid record: %s", err)
117 return err
118 }
119
120 name := record.Name
121 key := record.Key
122 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
123 case models.CommitOperationDelete:
124 log.Println("processing delete of pubkey")
125 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
126 }
127
128 if err != nil {
129 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
130 }
131
132 return nil
133}