forked from tangled.org/core
this repo has no description
at camo 3.1 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/bluesky-social/jetstream/pkg/models" 11 tangled "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/appview/db" 13) 14 15type Ingester func(ctx context.Context, e *models.Event) error 16 17func Ingest(d db.DbWrapper) Ingester { 18 return func(ctx context.Context, e *models.Event) error { 19 var err error 20 defer func() { 21 eventTime := e.TimeUS 22 lastTimeUs := eventTime + 1 23 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 24 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 25 } 26 }() 27 28 if e.Kind != models.EventKindCommit { 29 return nil 30 } 31 32 switch e.Commit.Collection { 33 case tangled.GraphFollowNSID: 34 ingestFollow(&d, e) 35 case tangled.FeedStarNSID: 36 ingestStar(&d, e) 37 case tangled.PublicKeyNSID: 38 ingestPublicKey(&d, e) 39 } 40 41 return err 42 } 43} 44 45func ingestStar(d *db.DbWrapper, e *models.Event) error { 46 var err error 47 did := e.Did 48 49 switch e.Commit.Operation { 50 case models.CommitOperationCreate, models.CommitOperationUpdate: 51 var subjectUri syntax.ATURI 52 53 raw := json.RawMessage(e.Commit.Record) 54 record := tangled.FeedStar{} 55 err := json.Unmarshal(raw, &record) 56 if err != nil { 57 log.Println("invalid record") 58 return err 59 } 60 61 subjectUri, err = syntax.ParseATURI(record.Subject) 62 if err != nil { 63 log.Println("invalid record") 64 return err 65 } 66 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 67 case models.CommitOperationDelete: 68 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 69 } 70 71 if err != nil { 72 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 73 } 74 75 return nil 76} 77 78func ingestFollow(d *db.DbWrapper, e *models.Event) error { 79 var err error 80 did := e.Did 81 82 switch e.Commit.Operation { 83 case models.CommitOperationCreate, models.CommitOperationUpdate: 84 raw := json.RawMessage(e.Commit.Record) 85 record := tangled.GraphFollow{} 86 err = json.Unmarshal(raw, &record) 87 if err != nil { 88 log.Println("invalid record") 89 return err 90 } 91 92 subjectDid := record.Subject 93 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 94 case models.CommitOperationDelete: 95 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 96 } 97 98 if err != nil { 99 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 100 } 101 102 return nil 103} 104 105func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 106 did := e.Did 107 var err error 108 109 switch e.Commit.Operation { 110 case models.CommitOperationCreate, models.CommitOperationUpdate: 111 log.Println("processing add of pubkey") 112 raw := json.RawMessage(e.Commit.Record) 113 record := tangled.PublicKey{} 114 err = json.Unmarshal(raw, &record) 115 if err != nil { 116 log.Printf("invalid record: %s", err) 117 return err 118 } 119 120 name := record.Name 121 key := record.Key 122 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 123 case models.CommitOperationDelete: 124 log.Println("processing delete of pubkey") 125 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 126 } 127 128 if err != nil { 129 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 130 } 131 132 return nil 133}