forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/appview/cache" 13 "tangled.sh/tangled.sh/core/appview/config" 14 "tangled.sh/tangled.sh/core/appview/db" 15 ec "tangled.sh/tangled.sh/core/eventconsumer" 16 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/rbac" 19 "tangled.sh/tangled.sh/core/workflow" 20 21 "github.com/bluesky-social/indigo/atproto/syntax" 22 "github.com/go-git/go-git/v5/plumbing" 23 "github.com/posthog/posthog-go" 24) 25 26func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 27 knots, err := db.GetCompletedRegistrations(d) 28 if err != nil { 29 return nil, err 30 } 31 32 srcs := make(map[ec.Source]struct{}) 33 for _, k := range knots { 34 s := ec.NewKnotSource(k) 35 srcs[s] = struct{}{} 36 } 37 38 logger := log.New("knotstream") 39 cache := cache.New(c.Redis.Addr) 40 cursorStore := cursor.NewRedisCursorStore(cache) 41 42 cfg := ec.ConsumerConfig{ 43 Sources: srcs, 44 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 45 RetryInterval: c.Knotstream.RetryInterval, 46 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 47 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 48 WorkerCount: c.Knotstream.WorkerCount, 49 QueueSize: c.Knotstream.QueueSize, 50 Logger: logger, 51 Dev: c.Core.Dev, 52 CursorStore: &cursorStore, 53 } 54 55 return ec.NewConsumer(cfg), nil 56} 57 58func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 59 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 60 switch msg.Nsid { 61 case tangled.GitRefUpdateNSID: 62 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 63 case tangled.PipelineNSID: 64 return ingestPipeline(d, source, msg) 65 } 66 67 return nil 68 } 69} 70 71func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 72 var record tangled.GitRefUpdate 73 err := json.Unmarshal(msg.EventJson, &record) 74 if err != nil { 75 return err 76 } 77 78 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 79 if err != nil { 80 return err 81 } 82 if !slices.Contains(knownKnots, source.Key()) { 83 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 84 } 85 86 err1 := populatePunchcard(d, record) 87 err2 := updateRepoLanguages(d, record) 88 89 var err3 error 90 if !dev { 91 err3 = pc.Enqueue(posthog.Capture{ 92 DistinctId: record.CommitterDid, 93 Event: "git_ref_update", 94 }) 95 } 96 97 return errors.Join(err1, err2, err3) 98} 99 100func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 101 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 102 if err != nil { 103 return err 104 } 105 106 count := 0 107 for _, ke := range knownEmails { 108 if record.Meta == nil { 109 continue 110 } 111 if record.Meta.CommitCount == nil { 112 continue 113 } 114 for _, ce := range record.Meta.CommitCount.ByEmail { 115 if ce == nil { 116 continue 117 } 118 if ce.Email == ke.Address { 119 count += int(ce.Count) 120 } 121 } 122 } 123 124 punch := db.Punch{ 125 Did: record.CommitterDid, 126 Date: time.Now(), 127 Count: count, 128 } 129 return db.AddPunch(d, punch) 130} 131 132func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 133 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 134 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 135 } 136 137 repos, err := db.GetRepos( 138 d, 139 0, 140 db.FilterEq("did", record.RepoDid), 141 db.FilterEq("name", record.RepoName), 142 ) 143 if err != nil { 144 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 145 } 146 if len(repos) != 1 { 147 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 148 } 149 repo := repos[0] 150 151 ref := plumbing.ReferenceName(record.Ref) 152 if !ref.IsBranch() { 153 return fmt.Errorf("%s is not a valid reference name", ref) 154 } 155 156 var langs []db.RepoLanguage 157 for _, l := range record.Meta.LangBreakdown.Inputs { 158 if l == nil { 159 continue 160 } 161 162 langs = append(langs, db.RepoLanguage{ 163 RepoAt: repo.RepoAt(), 164 Ref: ref.Short(), 165 IsDefaultRef: record.Meta.IsDefaultRef, 166 Language: l.Lang, 167 Bytes: l.Size, 168 }) 169 } 170 171 return db.InsertRepoLanguages(d, langs) 172} 173 174func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 175 var record tangled.Pipeline 176 err := json.Unmarshal(msg.EventJson, &record) 177 if err != nil { 178 return err 179 } 180 181 if record.TriggerMetadata == nil { 182 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 183 } 184 185 if record.TriggerMetadata.Repo == nil { 186 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 187 } 188 189 // does this repo have a spindle configured? 190 repos, err := db.GetRepos( 191 d, 192 0, 193 db.FilterEq("did", record.TriggerMetadata.Repo.Did), 194 db.FilterEq("name", record.TriggerMetadata.Repo.Repo), 195 ) 196 if err != nil { 197 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 198 } 199 if len(repos) != 1 { 200 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 201 } 202 if repos[0].Spindle == "" { 203 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 204 } 205 206 // trigger info 207 var trigger db.Trigger 208 var sha string 209 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 210 switch trigger.Kind { 211 case workflow.TriggerKindPush: 212 trigger.PushRef = &record.TriggerMetadata.Push.Ref 213 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 214 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 215 sha = *trigger.PushNewSha 216 case workflow.TriggerKindPullRequest: 217 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 218 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 219 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 220 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 221 sha = *trigger.PRSourceSha 222 } 223 224 tx, err := d.Begin() 225 if err != nil { 226 return fmt.Errorf("failed to start txn: %w", err) 227 } 228 229 triggerId, err := db.AddTrigger(tx, trigger) 230 if err != nil { 231 return fmt.Errorf("failed to add trigger entry: %w", err) 232 } 233 234 pipeline := db.Pipeline{ 235 Rkey: msg.Rkey, 236 Knot: source.Key(), 237 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 238 RepoName: record.TriggerMetadata.Repo.Repo, 239 TriggerId: int(triggerId), 240 Sha: sha, 241 } 242 243 err = db.AddPipeline(tx, pipeline) 244 if err != nil { 245 return fmt.Errorf("failed to add pipeline: %w", err) 246 } 247 248 err = tx.Commit() 249 if err != nil { 250 return fmt.Errorf("failed to commit txn: %w", err) 251 } 252 253 return nil 254}