forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "slices" 8 "time" 9 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/appview/cache" 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 kc "tangled.sh/tangled.sh/core/knotclient" 15 "tangled.sh/tangled.sh/core/knotclient/cursor" 16 "tangled.sh/tangled.sh/core/log" 17 "tangled.sh/tangled.sh/core/rbac" 18 19 "github.com/posthog/posthog-go" 20) 21 22func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) { 23 knots, err := db.GetCompletedRegistrations(d) 24 if err != nil { 25 return nil, err 26 } 27 28 srcs := make(map[kc.EventSource]struct{}) 29 for _, k := range knots { 30 s := kc.EventSource{k} 31 srcs[s] = struct{}{} 32 } 33 34 logger := log.New("knotstream") 35 cache := cache.New(c.Redis.Addr) 36 cursorStore := cursor.NewRedisCursorStore(cache) 37 38 cfg := kc.ConsumerConfig{ 39 Sources: srcs, 40 ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev), 41 RetryInterval: c.Knotstream.RetryInterval, 42 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 43 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 44 WorkerCount: c.Knotstream.WorkerCount, 45 QueueSize: c.Knotstream.QueueSize, 46 Logger: logger, 47 Dev: c.Core.Dev, 48 CursorStore: &cursorStore, 49 } 50 51 return kc.NewEventConsumer(cfg), nil 52} 53 54func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc { 55 return func(ctx context.Context, source kc.EventSource, msg kc.Message) error { 56 switch msg.Nsid { 57 case tangled.GitRefUpdateNSID: 58 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 59 case tangled.PipelineNSID: 60 // TODO 61 } 62 63 return nil 64 } 65} 66 67func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error { 68 var record tangled.GitRefUpdate 69 err := json.Unmarshal(msg.EventJson, &record) 70 if err != nil { 71 return err 72 } 73 74 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 75 if err != nil { 76 return err 77 } 78 if !slices.Contains(knownKnots, source.Knot) { 79 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 80 } 81 82 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 83 if err != nil { 84 return err 85 } 86 count := 0 87 for _, ke := range knownEmails { 88 if record.Meta == nil { 89 continue 90 } 91 if record.Meta.CommitCount == nil { 92 continue 93 } 94 for _, ce := range record.Meta.CommitCount.ByEmail { 95 if ce == nil { 96 continue 97 } 98 if ce.Email == ke.Address { 99 count += int(ce.Count) 100 } 101 } 102 } 103 104 punch := db.Punch{ 105 Did: record.CommitterDid, 106 Date: time.Now(), 107 Count: count, 108 } 109 if err := db.AddPunch(d, punch); err != nil { 110 return err 111 } 112 113 if !dev { 114 err = pc.Enqueue(posthog.Capture{ 115 DistinctId: record.CommitterDid, 116 Event: "git_ref_update", 117 }) 118 if err != nil { 119 // non-fatal, TODO: log this 120 } 121 } 122 123 return nil 124}