appview: use repo_language table to access language info #317

merged
opened by oppi.li targeting master from push-snktzuwttuvu

this falls back to calling RepoLanguages on the knot, and caches that info at the appview.

Signed-off-by: oppiliappan me@oppi.li

Changed files
+100 -37
appview
repo
state
+44 -25
appview/repo/index.go
···
}
}
-
languageInfo, err := getLanguageInfo(f, signedClient, ref)
if err != nil {
log.Printf("failed to compute language percentages: %s", err)
// non-fatal
···
Languages: languageInfo,
Pipelines: pipelines,
})
-
return
}
-
func getLanguageInfo(
f *reporesolver.ResolvedRepo,
signedClient *knotclient.SignedClient,
ref string,
) ([]types.RepoLanguageDetails, error) {
-
repoLanguages, err := signedClient.RepoLanguages(f.OwnerDid(), f.RepoName, ref)
-
if err != nil {
-
return []types.RepoLanguageDetails{}, err
-
}
-
if repoLanguages == nil {
-
repoLanguages = &types.RepoLanguageResponse{Languages: make(map[string]int64)}
}
-
var totalSize int64
-
for _, fileSize := range repoLanguages.Languages {
-
totalSize += fileSize
}
var languageStats []types.RepoLanguageDetails
-
var otherPercentage float32 = 0
-
-
for lang, size := range repoLanguages.Languages {
-
percentage := (float32(size) / float32(totalSize)) * 100
-
-
if percentage <= 0.5 {
-
otherPercentage += percentage
-
continue
-
}
-
-
color := enry.GetColor(lang)
-
-
languageStats = append(languageStats, types.RepoLanguageDetails{Name: lang, Percentage: percentage, Color: color})
}
sort.Slice(languageStats, func(i, j int) bool {
···
}
}
+
languageInfo, err := rp.getLanguageInfo(f, signedClient, ref)
if err != nil {
log.Printf("failed to compute language percentages: %s", err)
// non-fatal
···
Languages: languageInfo,
Pipelines: pipelines,
})
}
+
func (rp *Repo) getLanguageInfo(
f *reporesolver.ResolvedRepo,
signedClient *knotclient.SignedClient,
ref string,
) ([]types.RepoLanguageDetails, error) {
+
// first attempt to fetch from db
+
langs, err := db.GetRepoLanguages(
+
rp.db,
+
db.FilterEq("repo_at", f.RepoAt),
+
db.FilterEq("ref", ref),
+
)
+
+
if err != nil || langs == nil {
+
// non-fatal, fetch langs from ks
+
ls, err := signedClient.RepoLanguages(f.OwnerDid(), f.RepoName, ref)
+
if err != nil {
+
return nil, err
+
}
+
if ls == nil {
+
return nil, nil
+
}
+
for l, s := range ls.Languages {
+
langs = append(langs, db.RepoLanguage{
+
RepoAt: f.RepoAt,
+
Ref: ref,
+
Language: l,
+
Bytes: s,
+
})
+
}
+
+
// update appview's cache
+
err = db.InsertRepoLanguages(rp.db, langs)
+
if err != nil {
+
// non-fatal
+
log.Println("failed to cache lang results", err)
+
}
}
+
var total int64
+
for _, l := range langs {
+
total += l.Bytes
}
var languageStats []types.RepoLanguageDetails
+
for _, l := range langs {
+
percentage := float32(l.Bytes) / float32(total) * 100
+
color := enry.GetColor(l.Language)
+
languageStats = append(languageStats, types.RepoLanguageDetails{
+
Name: l.Language,
+
Percentage: percentage,
+
Color: color,
+
})
}
sort.Slice(languageStats, func(i, j int) bool {
+56 -12
appview/state/knotstream.go
···
import (
"context"
"encoding/json"
"fmt"
"slices"
"time"
···
"tangled.sh/tangled.sh/core/workflow"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/posthog/posthog-go"
)
···
cfg := ec.ConsumerConfig{
Sources: srcs,
-
ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
RetryInterval: c.Knotstream.RetryInterval,
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
return ec.NewConsumer(cfg), nil
}
-
func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
switch msg.Nsid {
case tangled.GitRefUpdateNSID:
···
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
}
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
if err != nil {
return err
}
count := 0
for _, ke := range knownEmails {
if record.Meta == nil {
···
Date: time.Now(),
Count: count,
}
-
if err := db.AddPunch(d, punch); err != nil {
-
return err
}
-
if !dev {
-
err = pc.Enqueue(posthog.Capture{
-
DistinctId: record.CommitterDid,
-
Event: "git_ref_update",
-
})
-
if err != nil {
-
// non-fatal, TODO: log this
}
}
-
return nil
}
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
···
import (
"context"
"encoding/json"
+
"errors"
"fmt"
"slices"
"time"
···
"tangled.sh/tangled.sh/core/workflow"
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/go-git/go-git/v5/plumbing"
"github.com/posthog/posthog-go"
)
···
cfg := ec.ConsumerConfig{
Sources: srcs,
+
ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
RetryInterval: c.Knotstream.RetryInterval,
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
return ec.NewConsumer(cfg), nil
}
+
func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
switch msg.Nsid {
case tangled.GitRefUpdateNSID:
···
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
}
+
err1 := populatePunchcard(d, record)
+
err2 := updateRepoLanguages(d, record)
+
+
var err3 error
+
if !dev {
+
err3 = pc.Enqueue(posthog.Capture{
+
DistinctId: record.CommitterDid,
+
Event: "git_ref_update",
+
})
+
}
+
+
return errors.Join(err1, err2, err3)
+
}
+
+
func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
if err != nil {
return err
}
+
count := 0
for _, ke := range knownEmails {
if record.Meta == nil {
···
Date: time.Now(),
Count: count,
}
+
return db.AddPunch(d, punch)
+
}
+
+
func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
+
if record.Meta == nil && record.Meta.LangBreakdown == nil {
+
return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
}
+
repos, err := db.GetRepos(
+
d,
+
db.FilterEq("did", record.RepoDid),
+
db.FilterEq("name", record.RepoName),
+
)
+
if err != nil {
+
return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
+
}
+
if len(repos) != 1 {
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
+
}
+
repo := repos[0]
+
+
ref := plumbing.ReferenceName(record.Ref)
+
if !ref.IsBranch() {
+
return fmt.Errorf("%s is not a valid reference name", ref)
+
}
+
+
var langs []db.RepoLanguage
+
for _, l := range record.Meta.LangBreakdown.Inputs {
+
if l == nil {
+
continue
}
+
+
langs = append(langs, db.RepoLanguage{
+
RepoAt: repo.RepoAt(),
+
Ref: ref.Short(),
+
Language: l.Lang,
+
Bytes: l.Size,
+
})
}
+
return db.InsertRepoLanguages(d, langs)
}
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {