1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "slices"
8 "time"
9
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/appview/cache"
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 kc "tangled.sh/tangled.sh/core/knotclient"
15 "tangled.sh/tangled.sh/core/knotclient/cursor"
16 "tangled.sh/tangled.sh/core/log"
17 "tangled.sh/tangled.sh/core/rbac"
18
19 "github.com/posthog/posthog-go"
20)
21
22func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
23 knots, err := db.GetCompletedRegistrations(d)
24 if err != nil {
25 return nil, err
26 }
27
28 srcs := make(map[kc.EventSource]struct{})
29 for _, k := range knots {
30 s := kc.EventSource{k}
31 srcs[s] = struct{}{}
32 }
33
34 logger := log.New("knotstream")
35 cache := cache.New(c.Redis.Addr)
36 cursorStore := cursor.NewRedisCursorStore(cache)
37
38 cfg := kc.ConsumerConfig{
39 Sources: srcs,
40 ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev),
41 RetryInterval: c.Knotstream.RetryInterval,
42 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
43 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
44 WorkerCount: c.Knotstream.WorkerCount,
45 QueueSize: c.Knotstream.QueueSize,
46 Logger: logger,
47 Dev: c.Core.Dev,
48 CursorStore: &cursorStore,
49 }
50
51 return kc.NewEventConsumer(cfg), nil
52}
53
54func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
55 return func(ctx context.Context, source kc.EventSource, msg kc.Message) error {
56 switch msg.Nsid {
57 case tangled.GitRefUpdateNSID:
58 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
59 case tangled.PipelineNSID:
60 // TODO
61 }
62
63 return nil
64 }
65}
66
67func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error {
68 var record tangled.GitRefUpdate
69 err := json.Unmarshal(msg.EventJson, &record)
70 if err != nil {
71 return err
72 }
73
74 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
75 if err != nil {
76 return err
77 }
78 if !slices.Contains(knownKnots, source.Knot) {
79 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
80 }
81
82 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
83 if err != nil {
84 return err
85 }
86 count := 0
87 for _, ke := range knownEmails {
88 if record.Meta == nil {
89 continue
90 }
91 if record.Meta.CommitCount == nil {
92 continue
93 }
94 for _, ce := range record.Meta.CommitCount.ByEmail {
95 if ce == nil {
96 continue
97 }
98 if ce.Email == ke.Address {
99 count += int(ce.Count)
100 }
101 }
102 }
103
104 punch := db.Punch{
105 Did: record.CommitterDid,
106 Date: time.Now(),
107 Count: count,
108 }
109 if err := db.AddPunch(d, punch); err != nil {
110 return err
111 }
112
113 if !dev {
114 err = pc.Enqueue(posthog.Capture{
115 DistinctId: record.CommitterDid,
116 Event: "git_ref_update",
117 })
118 if err != nil {
119 // non-fatal, TODO: log this
120 }
121 }
122
123 return nil
124}