forked from tangled.org/core
this repo has no description
at master 6.9 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/appview/cache" 13 "tangled.sh/tangled.sh/core/appview/config" 14 "tangled.sh/tangled.sh/core/appview/db" 15 ec "tangled.sh/tangled.sh/core/eventconsumer" 16 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/rbac" 19 "tangled.sh/tangled.sh/core/workflow" 20 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/posthog/posthog-go" 24) 25 26func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 27 knots, err := db.GetRegistrations( 28 d, 29 db.FilterIsNot("registered", "null"), 30 ) 31 if err != nil { 32 return nil, err 33 } 34 35 srcs := make(map[ec.Source]struct{}) 36 for _, k := range knots { 37 s := ec.NewKnotSource(k.Domain) 38 srcs[s] = struct{}{} 39 } 40 41 logger := log.New("knotstream") 42 cache := cache.New(c.Redis.Addr) 43 cursorStore := cursor.NewRedisCursorStore(cache) 44 45 cfg := ec.ConsumerConfig{ 46 Sources: srcs, 47 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 48 RetryInterval: c.Knotstream.RetryInterval, 49 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 50 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 51 WorkerCount: c.Knotstream.WorkerCount, 52 QueueSize: c.Knotstream.QueueSize, 53 Logger: logger, 54 Dev: c.Core.Dev, 55 CursorStore: &cursorStore, 56 } 57 58 return ec.NewConsumer(cfg), nil 59} 60 61func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 62 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 63 switch msg.Nsid { 64 case tangled.GitRefUpdateNSID: 65 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 66 case tangled.PipelineNSID: 67 return ingestPipeline(d, source, msg) 68 } 69 70 return nil 71 } 72} 73 74func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 75 var record tangled.GitRefUpdate 76 err := json.Unmarshal(msg.EventJson, &record) 77 if err != nil { 78 return err 79 } 80 81 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 82 if err != nil { 83 return err 84 } 85 if !slices.Contains(knownKnots, source.Key()) { 86 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 87 } 88 89 err1 := populatePunchcard(d, record) 90 err2 := updateRepoLanguages(d, record) 91 92 var err3 error 93 if !dev { 94 err3 = pc.Enqueue(posthog.Capture{ 95 DistinctId: record.CommitterDid, 96 Event: "git_ref_update", 97 }) 98 } 99 100 return errors.Join(err1, err2, err3) 101} 102 103func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 104 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 105 if err != nil { 106 return err 107 } 108 109 count := 0 110 for _, ke := range knownEmails { 111 if record.Meta == nil { 112 continue 113 } 114 if record.Meta.CommitCount == nil { 115 continue 116 } 117 for _, ce := range record.Meta.CommitCount.ByEmail { 118 if ce == nil { 119 continue 120 } 121 if ce.Email == ke.Address { 122 count += int(ce.Count) 123 } 124 } 125 } 126 127 punch := db.Punch{ 128 Did: record.CommitterDid, 129 Date: time.Now(), 130 Count: count, 131 } 132 return db.AddPunch(d, punch) 133} 134 135func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 136 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 137 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 138 } 139 140 repos, err := db.GetRepos( 141 d, 142 0, 143 db.FilterEq("did", record.RepoDid), 144 db.FilterEq("name", record.RepoName), 145 ) 146 if err != nil { 147 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 148 } 149 if len(repos) != 1 { 150 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 151 } 152 repo := repos[0] 153 154 ref := plumbing.ReferenceName(record.Ref) 155 if !ref.IsBranch() { 156 return fmt.Errorf("%s is not a valid reference name", ref) 157 } 158 159 var langs []db.RepoLanguage 160 for _, l := range record.Meta.LangBreakdown.Inputs { 161 if l == nil { 162 continue 163 } 164 165 langs = append(langs, db.RepoLanguage{ 166 RepoAt: repo.RepoAt(), 167 Ref: ref.Short(), 168 IsDefaultRef: record.Meta.IsDefaultRef, 169 Language: l.Lang, 170 Bytes: l.Size, 171 }) 172 } 173 174 return db.InsertRepoLanguages(d, langs) 175} 176 177func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 178 var record tangled.Pipeline 179 err := json.Unmarshal(msg.EventJson, &record) 180 if err != nil { 181 return err 182 } 183 184 if record.TriggerMetadata == nil { 185 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 186 } 187 188 if record.TriggerMetadata.Repo == nil { 189 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 190 } 191 192 // does this repo have a spindle configured? 193 repos, err := db.GetRepos( 194 d, 195 0, 196 db.FilterEq("did", record.TriggerMetadata.Repo.Did), 197 db.FilterEq("name", record.TriggerMetadata.Repo.Repo), 198 ) 199 if err != nil { 200 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 201 } 202 if len(repos) != 1 { 203 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 204 } 205 if repos[0].Spindle == "" { 206 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 207 } 208 209 // trigger info 210 var trigger db.Trigger 211 var sha string 212 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 213 switch trigger.Kind { 214 case workflow.TriggerKindPush: 215 trigger.PushRef = &record.TriggerMetadata.Push.Ref 216 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 217 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 218 sha = *trigger.PushNewSha 219 case workflow.TriggerKindPullRequest: 220 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 221 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 222 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 223 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 224 sha = *trigger.PRSourceSha 225 } 226 227 tx, err := d.Begin() 228 if err != nil { 229 return fmt.Errorf("failed to start txn: %w", err) 230 } 231 232 triggerId, err := db.AddTrigger(tx, trigger) 233 if err != nil { 234 return fmt.Errorf("failed to add trigger entry: %w", err) 235 } 236 237 pipeline := db.Pipeline{ 238 Rkey: msg.Rkey, 239 Knot: source.Key(), 240 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 241 RepoName: record.TriggerMetadata.Repo.Repo, 242 TriggerId: int(triggerId), 243 Sha: sha, 244 } 245 246 err = db.AddPipeline(tx, pipeline) 247 if err != nil { 248 return fmt.Errorf("failed to add pipeline: %w", err) 249 } 250 251 err = tx.Commit() 252 if err != nil { 253 return fmt.Errorf("failed to commit txn: %w", err) 254 } 255 256 return nil 257}