forked from tangled.org/core
this repo has no description
at spindle 3.0 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "slices" 8 "time" 9 10 "tangled.sh/tangled.sh/core/api/tangled" 11 "tangled.sh/tangled.sh/core/appview/cache" 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 kc "tangled.sh/tangled.sh/core/knotclient" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/rbac" 17 18 "github.com/posthog/posthog-go" 19) 20 21func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) { 22 knots, err := db.GetCompletedRegistrations(d) 23 if err != nil { 24 return nil, err 25 } 26 27 srcs := make(map[kc.EventSource]struct{}) 28 for _, k := range knots { 29 s := kc.EventSource{k} 30 srcs[s] = struct{}{} 31 } 32 33 logger := log.New("knotstream") 34 cache := cache.New(c.Redis.Addr) 35 cursorStore := kc.NewRedisCursorStore(cache) 36 37 cfg := kc.ConsumerConfig{ 38 Sources: srcs, 39 ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev), 40 RetryInterval: c.Knotstream.RetryInterval, 41 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 42 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 43 WorkerCount: c.Knotstream.WorkerCount, 44 QueueSize: c.Knotstream.QueueSize, 45 Logger: logger, 46 Dev: c.Core.Dev, 47 CursorStore: &cursorStore, 48 } 49 50 return kc.NewEventConsumer(cfg), nil 51} 52 53func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc { 54 return func(ctx context.Context, source kc.EventSource, msg kc.Message) error { 55 switch msg.Nsid { 56 case tangled.GitRefUpdateNSID: 57 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 58 case tangled.PipelineNSID: 59 // TODO 60 } 61 62 return nil 63 } 64} 65 66func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error { 67 var record tangled.GitRefUpdate 68 err := json.Unmarshal(msg.EventJson, &record) 69 if err != nil { 70 return err 71 } 72 73 knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid) 74 if err != nil { 75 return err 76 } 77 if !slices.Contains(knownKnots, source.Knot) { 78 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 79 } 80 81 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 82 if err != nil { 83 return err 84 } 85 count := 0 86 for _, ke := range knownEmails { 87 if record.Meta == nil { 88 continue 89 } 90 if record.Meta.CommitCount == nil { 91 continue 92 } 93 for _, ce := range record.Meta.CommitCount.ByEmail { 94 if ce == nil { 95 continue 96 } 97 if ce.Email == ke.Address { 98 count += int(ce.Count) 99 } 100 } 101 } 102 103 punch := db.Punch{ 104 Did: record.CommitterDid, 105 Date: time.Now(), 106 Count: count, 107 } 108 if err := db.AddPunch(d, punch); err != nil { 109 return err 110 } 111 112 if !dev { 113 err = pc.Enqueue(posthog.Capture{ 114 DistinctId: record.CommitterDid, 115 Event: "git_ref_update", 116 }) 117 if err != nil { 118 // non-fatal, TODO: log this 119 } 120 } 121 122 return nil 123}