···
9
+
"tangled.sh/tangled.sh/core/api/tangled"
10
+
"tangled.sh/tangled.sh/core/appview/cache"
11
+
"tangled.sh/tangled.sh/core/appview/config"
12
+
"tangled.sh/tangled.sh/core/appview/db"
13
+
kc "tangled.sh/tangled.sh/core/knotclient"
14
+
"tangled.sh/tangled.sh/core/log"
15
+
"tangled.sh/tangled.sh/core/rbac"
18
+
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
19
+
knots, err := db.GetCompletedRegistrations(d)
24
+
srcs := make(map[kc.EventSource]struct{})
25
+
for _, k := range knots {
26
+
s := kc.EventSource{k}
27
+
srcs[s] = struct{}{}
30
+
logger := log.New("knotstream")
31
+
cache := cache.New(c.Redis.Addr)
32
+
cursorStore := kc.NewRedisCursorStore(cache)
34
+
cfg := kc.ConsumerConfig{
36
+
ProcessFunc: knotstreamIngester(d, enforcer),
37
+
RetryInterval: c.Knotstream.RetryInterval,
38
+
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
39
+
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
40
+
WorkerCount: c.Knotstream.WorkerCount,
41
+
QueueSize: c.Knotstream.QueueSize,
44
+
CursorStore: &cursorStore,
47
+
return kc.NewEventConsumer(cfg), nil
50
+
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
51
+
return func(source kc.EventSource, msg kc.Message) error {
53
+
case tangled.GitRefUpdateNSID:
54
+
return ingestRefUpdate(d, enforcer, source, msg)
55
+
case tangled.PipelineNSID:
63
+
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
64
+
var record tangled.GitRefUpdate
65
+
err := json.Unmarshal(msg.EventJson, &record)
70
+
knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
75
+
if !slices.Contains(knownKnots, source.Knot) {
76
+
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
80
+
Did: record.CommitterDid,
84
+
if err := db.AddPunch(d, punch); err != nil {