1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/db"
16)
17
18type Ingester func(ctx context.Context, e *models.Event) error
19
20func Ingest(d db.DbWrapper) Ingester {
21 return func(ctx context.Context, e *models.Event) error {
22 var err error
23 defer func() {
24 eventTime := e.TimeUS
25 lastTimeUs := eventTime + 1
26 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
27 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
28 }
29 }()
30
31 if e.Kind != models.EventKindCommit {
32 return nil
33 }
34
35 switch e.Commit.Collection {
36 case tangled.GraphFollowNSID:
37 ingestFollow(&d, e)
38 case tangled.FeedStarNSID:
39 ingestStar(&d, e)
40 case tangled.PublicKeyNSID:
41 ingestPublicKey(&d, e)
42 case tangled.RepoArtifactNSID:
43 ingestArtifact(&d, e)
44 }
45
46 return err
47 }
48}
49
50func ingestStar(d *db.DbWrapper, e *models.Event) error {
51 var err error
52 did := e.Did
53
54 switch e.Commit.Operation {
55 case models.CommitOperationCreate, models.CommitOperationUpdate:
56 var subjectUri syntax.ATURI
57
58 raw := json.RawMessage(e.Commit.Record)
59 record := tangled.FeedStar{}
60 err := json.Unmarshal(raw, &record)
61 if err != nil {
62 log.Println("invalid record")
63 return err
64 }
65
66 subjectUri, err = syntax.ParseATURI(record.Subject)
67 if err != nil {
68 log.Println("invalid record")
69 return err
70 }
71 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
72 case models.CommitOperationDelete:
73 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
74 }
75
76 if err != nil {
77 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
78 }
79
80 return nil
81}
82
83func ingestFollow(d *db.DbWrapper, e *models.Event) error {
84 var err error
85 did := e.Did
86
87 switch e.Commit.Operation {
88 case models.CommitOperationCreate, models.CommitOperationUpdate:
89 raw := json.RawMessage(e.Commit.Record)
90 record := tangled.GraphFollow{}
91 err = json.Unmarshal(raw, &record)
92 if err != nil {
93 log.Println("invalid record")
94 return err
95 }
96
97 subjectDid := record.Subject
98 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
99 case models.CommitOperationDelete:
100 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
101 }
102
103 if err != nil {
104 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
105 }
106
107 return nil
108}
109
110func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
111 did := e.Did
112 var err error
113
114 switch e.Commit.Operation {
115 case models.CommitOperationCreate, models.CommitOperationUpdate:
116 log.Println("processing add of pubkey")
117 raw := json.RawMessage(e.Commit.Record)
118 record := tangled.PublicKey{}
119 err = json.Unmarshal(raw, &record)
120 if err != nil {
121 log.Printf("invalid record: %s", err)
122 return err
123 }
124
125 name := record.Name
126 key := record.Key
127 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
128 case models.CommitOperationDelete:
129 log.Println("processing delete of pubkey")
130 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
131 }
132
133 if err != nil {
134 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
135 }
136
137 return nil
138}
139
140func ingestArtifact(d *db.DbWrapper, e *models.Event) error {
141 did := e.Did
142 var err error
143
144 switch e.Commit.Operation {
145 case models.CommitOperationCreate, models.CommitOperationUpdate:
146 log.Println("processing add of artifact")
147 raw := json.RawMessage(e.Commit.Record)
148 record := tangled.RepoArtifact{}
149 err = json.Unmarshal(raw, &record)
150 if err != nil {
151 log.Printf("invalid record: %s", err)
152 return err
153 }
154
155 repoAt, err := syntax.ParseATURI(record.Repo)
156 if err != nil {
157 return err
158 }
159
160 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
161 if err != nil {
162 createdAt = time.Now()
163 }
164
165 artifact := db.Artifact{
166 Did: did,
167 Rkey: e.Commit.RKey,
168 RepoAt: repoAt,
169 Tag: plumbing.Hash(record.Tag),
170 CreatedAt: createdAt,
171 BlobCid: cid.Cid(record.Artifact.Ref),
172 Name: record.Name,
173 Size: uint64(record.Artifact.Size),
174 MimeType: record.Artifact.MimeType,
175 }
176
177 err = db.AddArtifact(d, artifact)
178 case models.CommitOperationDelete:
179 log.Println("processing delete of artifact")
180 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey))
181 }
182
183 if err != nil {
184 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
185 }
186
187 return nil
188}