forked from tangled.org/core
this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/db" 16) 17 18type Ingester func(ctx context.Context, e *models.Event) error 19 20func Ingest(d db.DbWrapper) Ingester { 21 return func(ctx context.Context, e *models.Event) error { 22 var err error 23 defer func() { 24 eventTime := e.TimeUS 25 lastTimeUs := eventTime + 1 26 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 27 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 28 } 29 }() 30 31 if e.Kind != models.EventKindCommit { 32 return nil 33 } 34 35 switch e.Commit.Collection { 36 case tangled.GraphFollowNSID: 37 ingestFollow(&d, e) 38 case tangled.FeedStarNSID: 39 ingestStar(&d, e) 40 case tangled.PublicKeyNSID: 41 ingestPublicKey(&d, e) 42 case tangled.RepoArtifactNSID: 43 ingestArtifact(&d, e) 44 } 45 46 return err 47 } 48} 49 50func ingestStar(d *db.DbWrapper, e *models.Event) error { 51 var err error 52 did := e.Did 53 54 switch e.Commit.Operation { 55 case models.CommitOperationCreate, models.CommitOperationUpdate: 56 var subjectUri syntax.ATURI 57 58 raw := json.RawMessage(e.Commit.Record) 59 record := tangled.FeedStar{} 60 err := json.Unmarshal(raw, &record) 61 if err != nil { 62 log.Println("invalid record") 63 return err 64 } 65 66 subjectUri, err = syntax.ParseATURI(record.Subject) 67 if err != nil { 68 log.Println("invalid record") 69 return err 70 } 71 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 72 case models.CommitOperationDelete: 73 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 74 } 75 76 if err != nil { 77 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 78 } 79 80 return nil 81} 82 83func ingestFollow(d *db.DbWrapper, e *models.Event) error { 84 var err error 85 did := e.Did 86 87 switch e.Commit.Operation { 88 case models.CommitOperationCreate, models.CommitOperationUpdate: 89 raw := json.RawMessage(e.Commit.Record) 90 record := tangled.GraphFollow{} 91 err = json.Unmarshal(raw, &record) 92 if err != nil { 93 log.Println("invalid record") 94 return err 95 } 96 97 subjectDid := record.Subject 98 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 99 case models.CommitOperationDelete: 100 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 101 } 102 103 if err != nil { 104 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 105 } 106 107 return nil 108} 109 110func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 111 did := e.Did 112 var err error 113 114 switch e.Commit.Operation { 115 case models.CommitOperationCreate, models.CommitOperationUpdate: 116 log.Println("processing add of pubkey") 117 raw := json.RawMessage(e.Commit.Record) 118 record := tangled.PublicKey{} 119 err = json.Unmarshal(raw, &record) 120 if err != nil { 121 log.Printf("invalid record: %s", err) 122 return err 123 } 124 125 name := record.Name 126 key := record.Key 127 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 128 case models.CommitOperationDelete: 129 log.Println("processing delete of pubkey") 130 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 131 } 132 133 if err != nil { 134 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 135 } 136 137 return nil 138} 139 140func ingestArtifact(d *db.DbWrapper, e *models.Event) error { 141 did := e.Did 142 var err error 143 144 switch e.Commit.Operation { 145 case models.CommitOperationCreate, models.CommitOperationUpdate: 146 log.Println("processing add of artifact") 147 raw := json.RawMessage(e.Commit.Record) 148 record := tangled.RepoArtifact{} 149 err = json.Unmarshal(raw, &record) 150 if err != nil { 151 log.Printf("invalid record: %s", err) 152 return err 153 } 154 155 repoAt, err := syntax.ParseATURI(record.Repo) 156 if err != nil { 157 return err 158 } 159 160 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 161 if err != nil { 162 createdAt = time.Now() 163 } 164 165 artifact := db.Artifact{ 166 Did: did, 167 Rkey: e.Commit.RKey, 168 RepoAt: repoAt, 169 Tag: plumbing.Hash(record.Tag), 170 CreatedAt: createdAt, 171 BlobCid: cid.Cid(record.Artifact.Ref), 172 Name: record.Name, 173 Size: uint64(record.Artifact.Size), 174 MimeType: record.Artifact.MimeType, 175 } 176 177 err = db.AddArtifact(d, artifact) 178 case models.CommitOperationDelete: 179 log.Println("processing delete of artifact") 180 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey)) 181 } 182 183 if err != nil { 184 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 185 } 186 187 return nil 188}