forked from tangled.org/core
this repo has no description
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "slices" 8 "time" 9 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/appview/cache" 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 ec "tangled.sh/tangled.sh/core/eventconsumer" 15 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 16 "tangled.sh/tangled.sh/core/log" 17 "tangled.sh/tangled.sh/core/rbac" 18 "tangled.sh/tangled.sh/core/workflow" 19 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 "github.com/posthog/posthog-go" 22) 23 24func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 25 knots, err := db.GetCompletedRegistrations(d) 26 if err != nil { 27 return nil, err 28 } 29 30 srcs := make(map[ec.Source]struct{}) 31 for _, k := range knots { 32 s := ec.NewKnotSource(k) 33 srcs[s] = struct{}{} 34 } 35 36 logger := log.New("knotstream") 37 cache := cache.New(c.Redis.Addr) 38 cursorStore := cursor.NewRedisCursorStore(cache) 39 40 cfg := ec.ConsumerConfig{ 41 Sources: srcs, 42 ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev), 43 RetryInterval: c.Knotstream.RetryInterval, 44 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 45 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 46 WorkerCount: c.Knotstream.WorkerCount, 47 QueueSize: c.Knotstream.QueueSize, 48 Logger: logger, 49 Dev: c.Core.Dev, 50 CursorStore: &cursorStore, 51 } 52 53 return ec.NewConsumer(cfg), nil 54} 55 56func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 57 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 58 switch msg.Nsid { 59 case tangled.GitRefUpdateNSID: 60 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 61 case tangled.PipelineNSID: 62 return ingestPipeline(d, source, msg) 63 } 64 65 return nil 66 } 67} 68 69func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 70 var record tangled.GitRefUpdate 71 err := json.Unmarshal(msg.EventJson, &record) 72 if err != nil { 73 return err 74 } 75 76 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 77 if err != nil { 78 return err 79 } 80 if !slices.Contains(knownKnots, source.Key()) { 81 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 82 } 83 84 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 85 if err != nil { 86 return err 87 } 88 count := 0 89 for _, ke := range knownEmails { 90 if record.Meta == nil { 91 continue 92 } 93 if record.Meta.CommitCount == nil { 94 continue 95 } 96 for _, ce := range record.Meta.CommitCount.ByEmail { 97 if ce == nil { 98 continue 99 } 100 if ce.Email == ke.Address { 101 count += int(ce.Count) 102 } 103 } 104 } 105 106 punch := db.Punch{ 107 Did: record.CommitterDid, 108 Date: time.Now(), 109 Count: count, 110 } 111 if err := db.AddPunch(d, punch); err != nil { 112 return err 113 } 114 115 if !dev { 116 err = pc.Enqueue(posthog.Capture{ 117 DistinctId: record.CommitterDid, 118 Event: "git_ref_update", 119 }) 120 if err != nil { 121 // non-fatal, TODO: log this 122 } 123 } 124 125 return nil 126} 127 128func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 129 var record tangled.Pipeline 130 err := json.Unmarshal(msg.EventJson, &record) 131 if err != nil { 132 return err 133 } 134 135 if record.TriggerMetadata == nil { 136 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 137 } 138 139 if record.TriggerMetadata.Repo == nil { 140 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 141 } 142 143 // trigger info 144 var trigger db.Trigger 145 var sha string 146 switch record.TriggerMetadata.Kind { 147 case workflow.TriggerKindPush: 148 trigger.Kind = workflow.TriggerKindPush 149 trigger.PushRef = &record.TriggerMetadata.Push.Ref 150 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 151 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 152 sha = *trigger.PushNewSha 153 case workflow.TriggerKindPullRequest: 154 trigger.Kind = workflow.TriggerKindPush 155 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 156 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 157 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 158 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 159 sha = *trigger.PRSourceSha 160 } 161 162 tx, err := d.Begin() 163 if err != nil { 164 return err 165 } 166 167 triggerId, err := db.AddTrigger(tx, trigger) 168 if err != nil { 169 return err 170 } 171 172 pipeline := db.Pipeline{ 173 Rkey: msg.Rkey, 174 Knot: source.Key(), 175 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 176 RepoName: record.TriggerMetadata.Repo.Repo, 177 TriggerId: int(triggerId), 178 Sha: sha, 179 } 180 181 err = db.AddPipeline(tx, pipeline) 182 if err != nil { 183 return err 184 } 185 186 err = tx.Commit() 187 if err != nil { 188 return err 189 } 190 191 return err 192}