// Source: tangled.org/core (fork), the monorepo for Tangled (https://tangled.org).
// Snapshot taken at branch master (7.2 kB).

1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/cache" 13 "tangled.org/core/appview/config" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/models" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/log" 19 "tangled.org/core/orm" 20 "tangled.org/core/rbac" 21 "tangled.org/core/workflow" 22 23 "github.com/bluesky-social/indigo/atproto/syntax" 24 "github.com/go-git/go-git/v5/plumbing" 25 "github.com/posthog/posthog-go" 26) 27 28func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 29 logger := log.FromContext(ctx) 30 logger = log.SubLogger(logger, "knotstream") 31 32 knots, err := db.GetRegistrations( 33 d, 34 orm.FilterIsNot("registered", "null"), 35 ) 36 if err != nil { 37 return nil, err 38 } 39 40 srcs := make(map[ec.Source]struct{}) 41 for _, k := range knots { 42 s := ec.NewKnotSource(k.Domain) 43 srcs[s] = struct{}{} 44 } 45 46 cache := cache.New(c.Redis.Addr) 47 cursorStore := cursor.NewRedisCursorStore(cache) 48 49 cfg := ec.ConsumerConfig{ 50 Sources: srcs, 51 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 52 RetryInterval: c.Knotstream.RetryInterval, 53 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 54 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 55 WorkerCount: c.Knotstream.WorkerCount, 56 QueueSize: c.Knotstream.QueueSize, 57 Logger: logger, 58 Dev: c.Core.Dev, 59 CursorStore: &cursorStore, 60 } 61 62 return ec.NewConsumer(cfg), nil 63} 64 65func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 66 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 67 switch msg.Nsid { 68 case tangled.GitRefUpdateNSID: 69 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 70 case tangled.PipelineNSID: 71 
return ingestPipeline(d, source, msg) 72 } 73 74 return nil 75 } 76} 77 78func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 79 var record tangled.GitRefUpdate 80 err := json.Unmarshal(msg.EventJson, &record) 81 if err != nil { 82 return err 83 } 84 85 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 86 if err != nil { 87 return err 88 } 89 if !slices.Contains(knownKnots, source.Key()) { 90 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 91 } 92 93 err1 := populatePunchcard(d, record) 94 err2 := updateRepoLanguages(d, record) 95 96 var err3 error 97 if !dev { 98 err3 = pc.Enqueue(posthog.Capture{ 99 DistinctId: record.CommitterDid, 100 Event: "git_ref_update", 101 }) 102 } 103 104 return errors.Join(err1, err2, err3) 105} 106 107func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 108 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 109 if err != nil { 110 return err 111 } 112 113 count := 0 114 for _, ke := range knownEmails { 115 if record.Meta == nil { 116 continue 117 } 118 if record.Meta.CommitCount == nil { 119 continue 120 } 121 for _, ce := range record.Meta.CommitCount.ByEmail { 122 if ce == nil { 123 continue 124 } 125 if ce.Email == ke.Address { 126 count += int(ce.Count) 127 } 128 } 129 } 130 131 punch := models.Punch{ 132 Did: record.CommitterDid, 133 Date: time.Now(), 134 Count: count, 135 } 136 return db.AddPunch(d, punch) 137} 138 139func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 140 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 141 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 142 } 143 144 repos, err := db.GetRepos( 145 d, 146 0, 147 orm.FilterEq("did", record.RepoDid), 148 orm.FilterEq("name", record.RepoName), 149 ) 150 if err != nil { 151 return 
fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 152 } 153 if len(repos) != 1 { 154 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 155 } 156 repo := repos[0] 157 158 ref := plumbing.ReferenceName(record.Ref) 159 if !ref.IsBranch() { 160 return fmt.Errorf("%s is not a valid reference name", ref) 161 } 162 163 var langs []models.RepoLanguage 164 for _, l := range record.Meta.LangBreakdown.Inputs { 165 if l == nil { 166 continue 167 } 168 169 langs = append(langs, models.RepoLanguage{ 170 RepoAt: repo.RepoAt(), 171 Ref: ref.Short(), 172 IsDefaultRef: record.Meta.IsDefaultRef, 173 Language: l.Lang, 174 Bytes: l.Size, 175 }) 176 } 177 178 tx, err := d.Begin() 179 if err != nil { 180 return err 181 } 182 defer tx.Rollback() 183 184 // update appview's cache 185 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 186 if err != nil { 187 fmt.Printf("failed; %s\n", err) 188 // non-fatal 189 } 190 191 return tx.Commit() 192} 193 194func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 195 var record tangled.Pipeline 196 err := json.Unmarshal(msg.EventJson, &record) 197 if err != nil { 198 return err 199 } 200 201 if record.TriggerMetadata == nil { 202 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 203 } 204 205 if record.TriggerMetadata.Repo == nil { 206 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 207 } 208 209 // does this repo have a spindle configured? 
210 repos, err := db.GetRepos( 211 d, 212 0, 213 orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 214 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 215 ) 216 if err != nil { 217 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 218 } 219 if len(repos) != 1 { 220 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 221 } 222 if repos[0].Spindle == "" { 223 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 224 } 225 226 // trigger info 227 var trigger models.Trigger 228 var sha string 229 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 230 switch trigger.Kind { 231 case workflow.TriggerKindPush: 232 trigger.PushRef = &record.TriggerMetadata.Push.Ref 233 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 234 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 235 sha = *trigger.PushNewSha 236 case workflow.TriggerKindPullRequest: 237 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 238 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 239 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 240 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 241 sha = *trigger.PRSourceSha 242 } 243 244 tx, err := d.Begin() 245 if err != nil { 246 return fmt.Errorf("failed to start txn: %w", err) 247 } 248 249 triggerId, err := db.AddTrigger(tx, trigger) 250 if err != nil { 251 return fmt.Errorf("failed to add trigger entry: %w", err) 252 } 253 254 pipeline := models.Pipeline{ 255 Rkey: msg.Rkey, 256 Knot: source.Key(), 257 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 258 RepoName: record.TriggerMetadata.Repo.Repo, 259 TriggerId: int(triggerId), 260 Sha: sha, 261 } 262 263 err = db.AddPipeline(tx, pipeline) 264 if err != nil { 265 return fmt.Errorf("failed to add pipeline: %w", err) 266 } 267 268 err = tx.Commit() 269 if err 
!= nil { 270 return fmt.Errorf("failed to commit txn: %w", err) 271 } 272 273 return nil 274}