forked from tangled.org/core
this repo has no description
at master 7.1 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/cache" 13 "tangled.org/core/appview/config" 14 "tangled.org/core/appview/db" 15 "tangled.org/core/appview/models" 16 ec "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/log" 19 "tangled.org/core/rbac" 20 "tangled.org/core/workflow" 21 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 "github.com/go-git/go-git/v5/plumbing" 24 "github.com/posthog/posthog-go" 25) 26 27func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 28 knots, err := db.GetRegistrations( 29 d, 30 db.FilterIsNot("registered", "null"), 31 ) 32 if err != nil { 33 return nil, err 34 } 35 36 srcs := make(map[ec.Source]struct{}) 37 for _, k := range knots { 38 s := ec.NewKnotSource(k.Domain) 39 srcs[s] = struct{}{} 40 } 41 42 logger := log.New("knotstream") 43 cache := cache.New(c.Redis.Addr) 44 cursorStore := cursor.NewRedisCursorStore(cache) 45 46 cfg := ec.ConsumerConfig{ 47 Sources: srcs, 48 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 49 RetryInterval: c.Knotstream.RetryInterval, 50 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 51 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 52 WorkerCount: c.Knotstream.WorkerCount, 53 QueueSize: c.Knotstream.QueueSize, 54 Logger: logger, 55 Dev: c.Core.Dev, 56 CursorStore: &cursorStore, 57 } 58 59 return ec.NewConsumer(cfg), nil 60} 61 62func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 63 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 64 switch msg.Nsid { 65 case tangled.GitRefUpdateNSID: 66 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 67 case tangled.PipelineNSID: 68 return ingestPipeline(d, source, msg) 69 } 70 71 return nil 72 } 73} 74 75func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 76 var record tangled.GitRefUpdate 77 err := json.Unmarshal(msg.EventJson, &record) 78 if err != nil { 79 return err 80 } 81 82 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 83 if err != nil { 84 return err 85 } 86 if !slices.Contains(knownKnots, source.Key()) { 87 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 88 } 89 90 err1 := populatePunchcard(d, record) 91 err2 := updateRepoLanguages(d, record) 92 93 var err3 error 94 if !dev { 95 err3 = pc.Enqueue(posthog.Capture{ 96 DistinctId: record.CommitterDid, 97 Event: "git_ref_update", 98 }) 99 } 100 101 return errors.Join(err1, err2, err3) 102} 103 104func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 105 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 106 if err != nil { 107 return err 108 } 109 110 count := 0 111 for _, ke := range knownEmails { 112 if record.Meta == nil { 113 continue 114 } 115 if record.Meta.CommitCount == nil { 116 continue 117 } 118 for _, ce := range record.Meta.CommitCount.ByEmail { 119 if ce == nil { 120 continue 121 } 122 if ce.Email == ke.Address { 123 count += int(ce.Count) 124 } 125 } 126 } 127 128 punch := models.Punch{ 129 Did: record.CommitterDid, 130 Date: time.Now(), 131 Count: count, 132 } 133 return db.AddPunch(d, punch) 134} 135 136func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 137 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 138 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 139 } 140 141 repos, err := db.GetRepos( 142 d, 143 0, 144 db.FilterEq("did", record.RepoDid), 145 db.FilterEq("name", record.RepoName), 146 ) 147 if err != nil { 148 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 149 } 150 if len(repos) != 1 { 151 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 152 } 153 repo := repos[0] 154 155 ref := plumbing.ReferenceName(record.Ref) 156 if !ref.IsBranch() { 157 return fmt.Errorf("%s is not a valid reference name", ref) 158 } 159 160 var langs []models.RepoLanguage 161 for _, l := range record.Meta.LangBreakdown.Inputs { 162 if l == nil { 163 continue 164 } 165 166 langs = append(langs, models.RepoLanguage{ 167 RepoAt: repo.RepoAt(), 168 Ref: ref.Short(), 169 IsDefaultRef: record.Meta.IsDefaultRef, 170 Language: l.Lang, 171 Bytes: l.Size, 172 }) 173 } 174 175 tx, err := d.Begin() 176 if err != nil { 177 return err 178 } 179 defer tx.Rollback() 180 181 // update appview's cache 182 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 183 if err != nil { 184 fmt.Printf("failed; %s\n", err) 185 // non-fatal 186 } 187 188 return tx.Commit() 189} 190 191func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 192 var record tangled.Pipeline 193 err := json.Unmarshal(msg.EventJson, &record) 194 if err != nil { 195 return err 196 } 197 198 if record.TriggerMetadata == nil { 199 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 200 } 201 202 if record.TriggerMetadata.Repo == nil { 203 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 204 } 205 206 // does this repo have a spindle configured? 207 repos, err := db.GetRepos( 208 d, 209 0, 210 db.FilterEq("did", record.TriggerMetadata.Repo.Did), 211 db.FilterEq("name", record.TriggerMetadata.Repo.Repo), 212 ) 213 if err != nil { 214 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err) 215 } 216 if len(repos) != 1 { 217 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 218 } 219 if repos[0].Spindle == "" { 220 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 221 } 222 223 // trigger info 224 var trigger models.Trigger 225 var sha string 226 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 227 switch trigger.Kind { 228 case workflow.TriggerKindPush: 229 trigger.PushRef = &record.TriggerMetadata.Push.Ref 230 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 231 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 232 sha = *trigger.PushNewSha 233 case workflow.TriggerKindPullRequest: 234 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 235 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 236 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 237 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 238 sha = *trigger.PRSourceSha 239 } 240 241 tx, err := d.Begin() 242 if err != nil { 243 return fmt.Errorf("failed to start txn: %w", err) 244 } 245 246 triggerId, err := db.AddTrigger(tx, trigger) 247 if err != nil { 248 return fmt.Errorf("failed to add trigger entry: %w", err) 249 } 250 251 pipeline := models.Pipeline{ 252 Rkey: msg.Rkey, 253 Knot: source.Key(), 254 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 255 RepoName: record.TriggerMetadata.Repo.Repo, 256 TriggerId: int(triggerId), 257 Sha: sha, 258 } 259 260 err = db.AddPipeline(tx, pipeline) 261 if err != nil { 262 return fmt.Errorf("failed to add pipeline: %w", err) 263 } 264 265 err = tx.Commit() 266 if err != nil { 267 return fmt.Errorf("failed to commit txn: %w", err) 268 } 269 270 return nil 271}