forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/cache" 13 "tangled.org/core/appview/config" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/models" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/log" 19 "tangled.org/core/rbac" 20 "tangled.org/core/workflow" 21 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/posthog/posthog-go" 25) 26 27func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 28 logger := log.FromContext(ctx) 29 logger = log.SubLogger(logger, "knotstream") 30 31 knots, err := db.GetRegistrations( 32 d, 33 db.FilterIsNot("registered", "null"), 34 ) 35 if err != nil { 36 return nil, err 37 } 38 39 srcs := make(map[ec.Source]struct{}) 40 for _, k := range knots { 41 s := ec.NewKnotSource(k.Domain) 42 srcs[s] = struct{}{} 43 } 44 45 cache := cache.New(c.Redis.Addr) 46 cursorStore := cursor.NewRedisCursorStore(cache) 47 48 cfg := ec.ConsumerConfig{ 49 Sources: srcs, 50 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 51 RetryInterval: c.Knotstream.RetryInterval, 52 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 53 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 54 WorkerCount: c.Knotstream.WorkerCount, 55 QueueSize: c.Knotstream.QueueSize, 56 Logger: logger, 57 Dev: c.Core.Dev, 58 CursorStore: &cursorStore, 59 } 60 61 return ec.NewConsumer(cfg), nil 62} 63 64func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 65 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 66 switch msg.Nsid { 67 case tangled.GitRefUpdateNSID: 68 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 69 case tangled.PipelineNSID: 70 return ingestPipeline(d, source, msg) 71 } 72 73 return nil 74 } 75} 76 77func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 78 var record tangled.GitRefUpdate 79 err := json.Unmarshal(msg.EventJson, &record) 80 if err != nil { 81 return err 82 } 83 84 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 85 if err != nil { 86 return err 87 } 88 if !slices.Contains(knownKnots, source.Key()) { 89 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 90 } 91 92 err1 := populatePunchcard(d, record) 93 err2 := updateRepoLanguages(d, record) 94 95 var err3 error 96 if !dev { 97 err3 = pc.Enqueue(posthog.Capture{ 98 DistinctId: record.CommitterDid, 99 Event: "git_ref_update", 100 }) 101 } 102 103 return errors.Join(err1, err2, err3) 104} 105 106func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 107 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 108 if err != nil { 109 return err 110 } 111 112 count := 0 113 for _, ke := range knownEmails { 114 if record.Meta == nil { 115 continue 116 } 117 if record.Meta.CommitCount == nil { 118 continue 119 } 120 for _, ce := range record.Meta.CommitCount.ByEmail { 121 if ce == nil { 122 continue 123 } 124 if ce.Email == ke.Address { 125 count += int(ce.Count) 126 } 127 } 128 } 129 130 punch := models.Punch{ 131 Did: record.CommitterDid, 132 Date: time.Now(), 133 Count: count, 134 } 135 return db.AddPunch(d, punch) 136} 137 138func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 139 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 140 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 141 } 142 143 repos, err := db.GetRepos( 144 d, 145 0, 146 db.FilterEq("did", record.RepoDid), 147 db.FilterEq("name", record.RepoName), 148 ) 149 if err != nil { 150 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 151 } 152 if len(repos) != 1 { 153 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 154 } 155 repo := repos[0] 156 157 ref := plumbing.ReferenceName(record.Ref) 158 if !ref.IsBranch() { 159 return fmt.Errorf("%s is not a valid reference name", ref) 160 } 161 162 var langs []models.RepoLanguage 163 for _, l := range record.Meta.LangBreakdown.Inputs { 164 if l == nil { 165 continue 166 } 167 168 langs = append(langs, models.RepoLanguage{ 169 RepoAt: repo.RepoAt(), 170 Ref: ref.Short(), 171 IsDefaultRef: record.Meta.IsDefaultRef, 172 Language: l.Lang, 173 Bytes: l.Size, 174 }) 175 } 176 177 tx, err := d.Begin() 178 if err != nil { 179 return err 180 } 181 defer tx.Rollback() 182 183 // update appview's cache 184 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 185 if err != nil { 186 fmt.Printf("failed; %s\n", err) 187 // non-fatal 188 } 189 190 return tx.Commit() 191} 192 193func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 194 var record tangled.Pipeline 195 err := json.Unmarshal(msg.EventJson, &record) 196 if err != nil { 197 return err 198 } 199 200 if record.TriggerMetadata == nil { 201 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 202 } 203 204 if record.TriggerMetadata.Repo == nil { 205 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 206 } 207 208 // does this repo have a spindle configured? 209 repos, err := db.GetRepos( 210 d, 211 0, 212 db.FilterEq("did", record.TriggerMetadata.Repo.Did), 213 db.FilterEq("name", record.TriggerMetadata.Repo.Repo), 214 ) 215 if err != nil { 216 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 217 } 218 if len(repos) != 1 { 219 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 220 } 221 if repos[0].Spindle == "" { 222 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 223 } 224 225 // trigger info 226 var trigger models.Trigger 227 var sha string 228 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 229 switch trigger.Kind { 230 case workflow.TriggerKindPush: 231 trigger.PushRef = &record.TriggerMetadata.Push.Ref 232 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 233 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 234 sha = *trigger.PushNewSha 235 case workflow.TriggerKindPullRequest: 236 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 237 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 238 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 239 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 240 sha = *trigger.PRSourceSha 241 } 242 243 tx, err := d.Begin() 244 if err != nil { 245 return fmt.Errorf("failed to start txn: %w", err) 246 } 247 248 triggerId, err := db.AddTrigger(tx, trigger) 249 if err != nil { 250 return fmt.Errorf("failed to add trigger entry: %w", err) 251 } 252 253 pipeline := models.Pipeline{ 254 Rkey: msg.Rkey, 255 Knot: source.Key(), 256 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 257 RepoName: record.TriggerMetadata.Repo.Repo, 258 TriggerId: int(triggerId), 259 Sha: sha, 260 } 261 262 err = db.AddPipeline(tx, pipeline) 263 if err != nil { 264 return fmt.Errorf("failed to add pipeline: %w", err) 265 } 266 267 err = tx.Commit() 268 if err != nil { 269 return fmt.Errorf("failed to commit txn: %w", err) 270 } 271 272 return nil 273}