···
9
+
"tangled.sh/tangled.sh/core/api/tangled"
10
+
"tangled.sh/tangled.sh/core/appview/cache"
11
+
"tangled.sh/tangled.sh/core/appview/config"
12
+
"tangled.sh/tangled.sh/core/appview/db"
13
+
kc "tangled.sh/tangled.sh/core/knotclient"
14
+
"tangled.sh/tangled.sh/core/log"
15
+
"tangled.sh/tangled.sh/core/rbac"
18
+
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
19
+
knots, err := db.GetCompletedRegistrations(d)
24
+
var srcs []kc.EventSource
25
+
for _, k := range knots {
26
+
srcs = append(srcs, kc.EventSource{k})
29
+
logger := log.New("knotstream")
30
+
cache := cache.New(c.Redis.Addr)
31
+
cursorStore := kc.NewRedisCursorStore(cache)
33
+
cfg := kc.ConsumerConfig{
35
+
ProcessFunc: knotstreamIngester(d, enforcer),
36
+
RetryInterval: c.Knotstream.RetryInterval,
37
+
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
38
+
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
39
+
WorkerCount: c.Knotstream.WorkerCount,
40
+
QueueSize: c.Knotstream.QueueSize,
43
+
CursorStore: &cursorStore,
46
+
return kc.NewEventConsumer(cfg), nil
49
+
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
50
+
return func(source kc.EventSource, msg kc.Message) error {
52
+
case tangled.GitRefUpdateNSID:
53
+
return ingestRefUpdate(d, enforcer, source, msg)
54
+
case tangled.PipelineNSID:
62
+
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
63
+
var record tangled.GitRefUpdate
64
+
err := json.Unmarshal(msg.EventJson, &record)
69
+
knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
74
+
if !slices.Contains(knownKnots, source.Knot) {
75
+
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
79
+
Did: record.CommitterDid,
83
+
if err := db.AddPunch(d, punch); err != nil {