1package state
2
3import (
4 "encoding/json"
5 "fmt"
6 "slices"
7 "time"
8
9 "tangled.sh/tangled.sh/core/api/tangled"
10 "tangled.sh/tangled.sh/core/appview/cache"
11 "tangled.sh/tangled.sh/core/appview/config"
12 "tangled.sh/tangled.sh/core/appview/db"
13 kc "tangled.sh/tangled.sh/core/knotclient"
14 "tangled.sh/tangled.sh/core/log"
15 "tangled.sh/tangled.sh/core/rbac"
16
17 "github.com/posthog/posthog-go"
18)
19
20func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
21 knots, err := db.GetCompletedRegistrations(d)
22 if err != nil {
23 return nil, err
24 }
25
26 srcs := make(map[kc.EventSource]struct{})
27 for _, k := range knots {
28 s := kc.EventSource{k}
29 srcs[s] = struct{}{}
30 }
31
32 logger := log.New("knotstream")
33 cache := cache.New(c.Redis.Addr)
34 cursorStore := kc.NewRedisCursorStore(cache)
35
36 cfg := kc.ConsumerConfig{
37 Sources: srcs,
38 ProcessFunc: knotstreamIngester(d, enforcer, posthog, c.Core.Dev),
39 RetryInterval: c.Knotstream.RetryInterval,
40 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
41 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
42 WorkerCount: c.Knotstream.WorkerCount,
43 QueueSize: c.Knotstream.QueueSize,
44 Logger: logger,
45 Dev: c.Core.Dev,
46 CursorStore: &cursorStore,
47 }
48
49 return kc.NewEventConsumer(cfg), nil
50}
51
52func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
53 return func(source kc.EventSource, msg kc.Message) error {
54 switch msg.Nsid {
55 case tangled.GitRefUpdateNSID:
56 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
57 case tangled.PipelineNSID:
58 // TODO
59 }
60
61 return nil
62 }
63}
64
65func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error {
66 var record tangled.GitRefUpdate
67 err := json.Unmarshal(msg.EventJson, &record)
68 if err != nil {
69 return err
70 }
71
72 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
73 if err != nil {
74 return err
75 }
76 if !slices.Contains(knownKnots, source.Knot) {
77 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
78 }
79
80 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
81 if err != nil {
82 return err
83 }
84 count := 0
85 for _, ke := range knownEmails {
86 if record.Meta == nil {
87 continue
88 }
89 if record.Meta.CommitCount == nil {
90 continue
91 }
92 for _, ce := range record.Meta.CommitCount.ByEmail {
93 if ce == nil {
94 continue
95 }
96 if ce.Email == ke.Address {
97 count += int(ce.Count)
98 }
99 }
100 }
101
102 punch := db.Punch{
103 Did: record.CommitterDid,
104 Date: time.Now(),
105 Count: count,
106 }
107 if err := db.AddPunch(d, punch); err != nil {
108 return err
109 }
110
111 if !dev {
112 err = pc.Enqueue(posthog.Capture{
113 DistinctId: record.CommitterDid,
114 Event: "git_ref_update",
115 })
116 if err != nil {
117 // non-fatal, TODO: log this
118 }
119 }
120
121 return nil
122}