1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "slices"
8 "time"
9
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/appview/cache"
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 kc "tangled.sh/tangled.sh/core/knotclient"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/rbac"
17
18 "github.com/posthog/posthog-go"
19)
20
21func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
22 knots, err := db.GetCompletedRegistrations(d)
23 if err != nil {
24 return nil, err
25 }
26
27 srcs := make(map[kc.EventSource]struct{})
28 for _, k := range knots {
29 s := kc.EventSource{k}
30 srcs[s] = struct{}{}
31 }
32
33 logger := log.New("knotstream")
34 cache := cache.New(c.Redis.Addr)
35 cursorStore := kc.NewRedisCursorStore(cache)
36
37 cfg := kc.ConsumerConfig{
38 Sources: srcs,
39 ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev),
40 RetryInterval: c.Knotstream.RetryInterval,
41 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
42 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
43 WorkerCount: c.Knotstream.WorkerCount,
44 QueueSize: c.Knotstream.QueueSize,
45 Logger: logger,
46 Dev: c.Core.Dev,
47 CursorStore: &cursorStore,
48 }
49
50 return kc.NewEventConsumer(cfg), nil
51}
52
53func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
54 return func(ctx context.Context, source kc.EventSource, msg kc.Message) error {
55 switch msg.Nsid {
56 case tangled.GitRefUpdateNSID:
57 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
58 case tangled.PipelineNSID:
59 // TODO
60 }
61
62 return nil
63 }
64}
65
66func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error {
67 var record tangled.GitRefUpdate
68 err := json.Unmarshal(msg.EventJson, &record)
69 if err != nil {
70 return err
71 }
72
73 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
74 if err != nil {
75 return err
76 }
77 if !slices.Contains(knownKnots, source.Knot) {
78 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
79 }
80
81 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
82 if err != nil {
83 return err
84 }
85 count := 0
86 for _, ke := range knownEmails {
87 if record.Meta == nil {
88 continue
89 }
90 if record.Meta.CommitCount == nil {
91 continue
92 }
93 for _, ce := range record.Meta.CommitCount.ByEmail {
94 if ce == nil {
95 continue
96 }
97 if ce.Email == ke.Address {
98 count += int(ce.Count)
99 }
100 }
101 }
102
103 punch := db.Punch{
104 Did: record.CommitterDid,
105 Date: time.Now(),
106 Count: count,
107 }
108 if err := db.AddPunch(d, punch); err != nil {
109 return err
110 }
111
112 if !dev {
113 err = pc.Enqueue(posthog.Capture{
114 DistinctId: record.CommitterDid,
115 Event: "git_ref_update",
116 })
117 if err != nil {
118 // non-fatal, TODO: log this
119 }
120 }
121
122 return nil
123}