1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "slices"
8 "time"
9
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/appview/cache"
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 ec "tangled.sh/tangled.sh/core/eventconsumer"
15 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
16 "tangled.sh/tangled.sh/core/log"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/workflow"
19
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 "github.com/posthog/posthog-go"
22)
23
24func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
25 knots, err := db.GetCompletedRegistrations(d)
26 if err != nil {
27 return nil, err
28 }
29
30 srcs := make(map[ec.Source]struct{})
31 for _, k := range knots {
32 s := ec.NewKnotSource(k)
33 srcs[s] = struct{}{}
34 }
35
36 logger := log.New("knotstream")
37 cache := cache.New(c.Redis.Addr)
38 cursorStore := cursor.NewRedisCursorStore(cache)
39
40 cfg := ec.ConsumerConfig{
41 Sources: srcs,
42 ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
43 RetryInterval: c.Knotstream.RetryInterval,
44 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
45 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
46 WorkerCount: c.Knotstream.WorkerCount,
47 QueueSize: c.Knotstream.QueueSize,
48 Logger: logger,
49 Dev: c.Core.Dev,
50 CursorStore: &cursorStore,
51 }
52
53 return ec.NewConsumer(cfg), nil
54}
55
56func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
57 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
58 switch msg.Nsid {
59 case tangled.GitRefUpdateNSID:
60 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
61 case tangled.PipelineNSID:
62 return ingestPipeline(d, source, msg)
63 }
64
65 return nil
66 }
67}
68
69func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
70 var record tangled.GitRefUpdate
71 err := json.Unmarshal(msg.EventJson, &record)
72 if err != nil {
73 return err
74 }
75
76 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
77 if err != nil {
78 return err
79 }
80 if !slices.Contains(knownKnots, source.Key()) {
81 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
82 }
83
84 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
85 if err != nil {
86 return err
87 }
88 count := 0
89 for _, ke := range knownEmails {
90 if record.Meta == nil {
91 continue
92 }
93 if record.Meta.CommitCount == nil {
94 continue
95 }
96 for _, ce := range record.Meta.CommitCount.ByEmail {
97 if ce == nil {
98 continue
99 }
100 if ce.Email == ke.Address {
101 count += int(ce.Count)
102 }
103 }
104 }
105
106 punch := db.Punch{
107 Did: record.CommitterDid,
108 Date: time.Now(),
109 Count: count,
110 }
111 if err := db.AddPunch(d, punch); err != nil {
112 return err
113 }
114
115 if !dev {
116 err = pc.Enqueue(posthog.Capture{
117 DistinctId: record.CommitterDid,
118 Event: "git_ref_update",
119 })
120 if err != nil {
121 // non-fatal, TODO: log this
122 }
123 }
124
125 return nil
126}
127
128func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
129 var record tangled.Pipeline
130 err := json.Unmarshal(msg.EventJson, &record)
131 if err != nil {
132 return err
133 }
134
135 if record.TriggerMetadata == nil {
136 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
137 }
138
139 if record.TriggerMetadata.Repo == nil {
140 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
141 }
142
143 // trigger info
144 var trigger db.Trigger
145 var sha string
146 switch record.TriggerMetadata.Kind {
147 case workflow.TriggerKindPush:
148 trigger.Kind = workflow.TriggerKindPush
149 trigger.PushRef = &record.TriggerMetadata.Push.Ref
150 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
151 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
152 sha = *trigger.PushNewSha
153 case workflow.TriggerKindPullRequest:
154 trigger.Kind = workflow.TriggerKindPush
155 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
156 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
157 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
158 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
159 sha = *trigger.PRSourceSha
160 }
161
162 tx, err := d.Begin()
163 if err != nil {
164 return err
165 }
166
167 triggerId, err := db.AddTrigger(tx, trigger)
168 if err != nil {
169 return err
170 }
171
172 pipeline := db.Pipeline{
173 Rkey: msg.Rkey,
174 Knot: source.Key(),
175 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
176 RepoName: record.TriggerMetadata.Repo.Repo,
177 TriggerId: int(triggerId),
178 Sha: sha,
179 }
180
181 err = db.AddPipeline(tx, pipeline)
182 if err != nil {
183 return err
184 }
185
186 err = tx.Commit()
187 if err != nil {
188 return err
189 }
190
191 return err
192}