forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "encoding/json" 5 "fmt" 6 "slices" 7 "time" 8 9 "tangled.sh/tangled.sh/core/api/tangled" 10 "tangled.sh/tangled.sh/core/appview/cache" 11 "tangled.sh/tangled.sh/core/appview/config" 12 "tangled.sh/tangled.sh/core/appview/db" 13 kc "tangled.sh/tangled.sh/core/knotclient" 14 "tangled.sh/tangled.sh/core/log" 15 "tangled.sh/tangled.sh/core/rbac" 16 17 "github.com/posthog/posthog-go" 18) 19 20func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) { 21 knots, err := db.GetCompletedRegistrations(d) 22 if err != nil { 23 return nil, err 24 } 25 26 srcs := make(map[kc.EventSource]struct{}) 27 for _, k := range knots { 28 s := kc.EventSource{k} 29 srcs[s] = struct{}{} 30 } 31 32 logger := log.New("knotstream") 33 cache := cache.New(c.Redis.Addr) 34 cursorStore := kc.NewRedisCursorStore(cache) 35 36 cfg := kc.ConsumerConfig{ 37 Sources: srcs, 38 ProcessFunc: knotstreamIngester(d, enforcer, posthog, c.Core.Dev), 39 RetryInterval: c.Knotstream.RetryInterval, 40 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 41 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 42 WorkerCount: c.Knotstream.WorkerCount, 43 QueueSize: c.Knotstream.QueueSize, 44 Logger: logger, 45 Dev: c.Core.Dev, 46 CursorStore: &cursorStore, 47 } 48 49 return kc.NewEventConsumer(cfg), nil 50} 51 52func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc { 53 return func(source kc.EventSource, msg kc.Message) error { 54 switch msg.Nsid { 55 case tangled.GitRefUpdateNSID: 56 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 57 case tangled.PipelineNSID: 58 // TODO 59 } 60 61 return nil 62 } 63} 64 65func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error { 66 var record tangled.GitRefUpdate 67 err := json.Unmarshal(msg.EventJson, &record) 68 if err != nil { 69 return err 70 } 71 72 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid) 73 if err != nil { 74 return err 75 } 76 if !slices.Contains(knownKnots, source.Knot) { 77 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 78 } 79 80 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 81 if err != nil { 82 return err 83 } 84 count := 0 85 for _, ke := range knownEmails { 86 if record.Meta == nil { 87 continue 88 } 89 if record.Meta.CommitCount == nil { 90 continue 91 } 92 for _, ce := range record.Meta.CommitCount.ByEmail { 93 if ce == nil { 94 continue 95 } 96 if ce.Email == ke.Address { 97 count += int(ce.Count) 98 } 99 } 100 } 101 102 punch := db.Punch{ 103 Did: record.CommitterDid, 104 Date: time.Now(), 105 Count: count, 106 } 107 if err := db.AddPunch(d, punch); err != nil { 108 return err 109 } 110 111 if !dev { 112 err = pc.Enqueue(posthog.Capture{ 113 DistinctId: record.CommitterDid, 114 Event: "git_ref_update", 115 }) 116 if err != nil { 117 // non-fatal, TODO: log this 118 } 119 } 120 121 return nil 122}