From 3e70c81467f2a6eb43e1f3c0fd2bd7739165a340 Mon Sep 17 00:00:00 2001 From: Seongmin Lee Date: Sat, 12 Jul 2025 20:28:10 +0900 Subject: [PATCH] appview: add basic issue indexer Change-Id: ukmllqluzwxxyqwuuknqpxuroyrkpvrw - Heavily inspired by gitea - add `GetAllIssues` which only receives a paginator and gathers all issues ignoring `repoAt` field Signed-off-by: Seongmin Lee --- .gitignore | 1 + appview/indexer/base36/base36.go | 20 +++ appview/indexer/bleve/batch.go | 58 +++++++++ appview/indexer/indexer.go | 32 +++++ appview/indexer/issues/indexer.go | 197 ++++++++++++++++++++++++++++++ appview/indexer/notifier.go | 20 +++ appview/issues/issues.go | 5 + appview/models/search.go | 23 ++++ appview/pages/pages.go | 1 + appview/pagination/page.go | 23 ++++ appview/state/router.go | 1 + appview/state/state.go | 10 ++ 12 files changed, 391 insertions(+) create mode 100644 appview/indexer/base36/base36.go create mode 100644 appview/indexer/bleve/batch.go create mode 100644 appview/indexer/indexer.go create mode 100644 appview/indexer/issues/indexer.go create mode 100644 appview/indexer/notifier.go create mode 100644 appview/models/search.go diff --git a/.gitignore b/.gitignore index e7fdc3d9..1a28a53e 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ patches .env *.rdb .envrc +*.bleve # Created if following hacking.md genjwks.out /nix/vm-data diff --git a/appview/indexer/base36/base36.go b/appview/indexer/base36/base36.go new file mode 100644 index 00000000..d5d3d653 --- /dev/null +++ b/appview/indexer/base36/base36.go @@ -0,0 +1,20 @@ +// mostly copied from gitea/modules/indexer/internal/base32 + +package base36 + +import ( + "fmt" + "strconv" +) + +func Encode(i int64) string { + return strconv.FormatInt(i, 36) +} + +func Decode(s string) (int64, error) { + i, err := strconv.ParseInt(s, 36, 64) + if err != nil { + return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err) + } + return i, nil +} diff --git a/appview/indexer/bleve/batch.go b/appview/indexer/bleve/batch.go new file mode 100644 index 00000000..618564e1 --- /dev/null +++ b/appview/indexer/bleve/batch.go @@ -0,0 +1,58 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package bleveutil + +import ( + "github.com/blevesearch/bleve/v2" +) + +// FlushingBatch is a batch of operations that automatically flushes to the +// underlying index once it reaches a certain size. +type FlushingBatch struct { + maxBatchSize int + batch *bleve.Batch + index bleve.Index +} + +// NewFlushingBatch creates a new flushing batch for the specified index. Once +// the number of operations in the batch reaches the specified limit, the batch +// automatically flushes its operations to the index. +func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch { + return &FlushingBatch{ + maxBatchSize: maxBatchSize, + batch: index.NewBatch(), + index: index, + } +} + +// Index add a new index to batch +func (b *FlushingBatch) Index(id string, data any) error { + if err := b.batch.Index(id, data); err != nil { + return err + } + return b.flushIfFull() +} + +// Delete add a delete index to batch +func (b *FlushingBatch) Delete(id string) error { + b.batch.Delete(id) + return b.flushIfFull() +} + +func (b *FlushingBatch) flushIfFull() error { + if b.batch.Size() < b.maxBatchSize { + return nil + } + return b.Flush() +} + +// Flush submit the batch and create a new one +func (b *FlushingBatch) Flush() error { + err := b.index.Batch(b.batch) + if err != nil { + return err + } + b.batch = b.index.NewBatch() + return nil +} diff --git a/appview/indexer/indexer.go b/appview/indexer/indexer.go new file mode 100644 index 00000000..dc3789ae --- /dev/null +++ b/appview/indexer/indexer.go @@ -0,0 +1,32 @@ +package indexer + +import ( + "context" + "log/slog" + + "tangled.org/core/appview/db" + issues_indexer "tangled.org/core/appview/indexer/issues" + "tangled.org/core/appview/notify" + tlog "tangled.org/core/log" +) + +type Indexer struct { + Issues *issues_indexer.Indexer + logger *slog.Logger + notify.BaseNotifier +} + +func New(logger *slog.Logger) *Indexer { + return &Indexer { + issues_indexer.NewIndexer("indexes.bleve"), + logger, + notify.BaseNotifier{}, + } +} + +// Init initializes all indexers +func (ix *Indexer) Init(ctx context.Context, db *db.DB) error { + ctx = tlog.IntoContext(ctx, ix.logger) + ix.Issues.Init(ctx, db) + return nil +} diff --git a/appview/indexer/issues/indexer.go b/appview/indexer/issues/indexer.go new file mode 100644 index 00000000..627f66fb --- /dev/null +++ b/appview/indexer/issues/indexer.go @@ -0,0 +1,197 @@ +// heavily inspired by gitea's model (basically copy-pasted) +package issues_indexer + +import ( + "context" + "errors" + "os" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/index/upsidedown" + "github.com/blevesearch/bleve/v2/search/query" + "tangled.org/core/appview/db" + "tangled.org/core/appview/indexer/base36" + "tangled.org/core/appview/indexer/bleve" + "tangled.org/core/appview/models" + "tangled.org/core/appview/pagination" + tlog "tangled.org/core/log" +) + +type Indexer struct { + indexer bleve.Index + path string +} + +func NewIndexer(indexDir string) *Indexer { + return &Indexer{ + path: indexDir, + } +} + +// Init initializes the indexer +func (ix *Indexer) Init(ctx context.Context, e db.Execer) { + l := tlog.FromContext(ctx) + existed, err := ix.intialize(ctx) + if err != nil { + l.Error("failed to initialize issue indexer", "err", err) + } + if !existed { + l.Debug("Populating the issue indexer") + err := PopulateIndexer(ctx, ix, e) + if err != nil { + l.Error("failed to populate issue indexer", "err", err) + } + } + l.Info("Initialized the issue indexer") +} + +func (ix *Indexer) intialize(ctx context.Context) (bool, error) { + if ix.indexer != nil { + return false, errors.New("indexer is already initialized") + } + + indexer, err := openIndexer(ctx, ix.path) + if err != nil { + return false, err + } + if indexer != nil { + ix.indexer = indexer + return true, nil + } + + mapping := bleve.NewIndexMapping() + indexer, err = bleve.New(ix.path, mapping) + if err != nil { + return false, err + } + + ix.indexer = indexer + + return false, nil +} + +func openIndexer(ctx context.Context, path string) (bleve.Index, error) { + l := tlog.FromContext(ctx) + indexer, err := bleve.Open(path) + if err != nil { + if errors.Is(err, upsidedown.IncompatibleVersion) { + l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") + return nil, os.RemoveAll(path) + } + return nil, nil + } + return indexer, nil +} + +func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { + l := tlog.FromContext(ctx) + count := 0 + err := pagination.IterateAll( + func(page pagination.Page) ([]models.Issue, error) { + return db.GetIssuesPaginated(e, page) + }, + func(issues []models.Issue) error { + count += len(issues) + return ix.Index(ctx, issues...) + }, + ) + l.Info("issues indexed", "count", count) + return err +} + +// issueData data stored and will be indexed +type issueData struct { + ID int64 `json:"id"` + RepoAt string `json:"repo_at"` + IssueID int `json:"issue_id"` + Title string `json:"title"` + Body string `json:"body"` + + IsOpen bool `json:"is_open"` + Comments []IssueCommentData `json:"comments"` +} + +func makeIssueData(issue *models.Issue) *issueData { + return &issueData{ + ID: issue.Id, + RepoAt: issue.RepoAt.String(), + IssueID: issue.IssueId, + Title: issue.Title, + Body: issue.Body, + IsOpen: issue.Open, + } +} + +type IssueCommentData struct { + Body string `json:"body"` +} + +type SearchResult struct { + Hits []int64 + Total uint64 +} + +const maxBatchSize = 20 + +func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error { + batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) + for _, issue := range issues { + issueData := makeIssueData(&issue) + if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil { + return err + } + } + return batch.Flush() +} + +// Search searches for issues +func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) { + var queries []query.Query + + if opts.Keyword != "" { + queries = append(queries, bleve.NewDisjunctionQuery( + matchAndQuery(opts.Keyword, "title"), + matchAndQuery(opts.Keyword, "body"), + )) + } + queries = append(queries, keywordFieldQuery(opts.RepoAt, "repo_at")) + queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open")) + // TODO: append more queries + + var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) + searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false) + res, err := ix.indexer.SearchInContext(ctx, searchReq) + if err != nil { + return nil, nil + } + ret := &SearchResult{ + Total: res.Total, + Hits: make([]int64, len(res.Hits)), + } + for i, hit := range res.Hits { + id, err := base36.Decode(hit.ID) + if err != nil { + return nil, err + } + ret.Hits[i] = id + } + return ret, nil +} + +func matchAndQuery(keyword, field string) query.Query { + q := bleve.NewMatchQuery(keyword) + q.FieldVal = field + return q +} + +func boolFieldQuery(val bool, field string) query.Query { + q := bleve.NewBoolFieldQuery(val) + q.FieldVal = field + return q +} + +func keywordFieldQuery(keyword, field string) query.Query { + q := bleve.NewTermQuery(keyword) + q.FieldVal = field + return q +} diff --git a/appview/indexer/notifier.go b/appview/indexer/notifier.go new file mode 100644 index 00000000..f862ac73 --- /dev/null +++ b/appview/indexer/notifier.go @@ -0,0 +1,20 @@ +package indexer + +import ( + "context" + + "tangled.org/core/appview/models" + "tangled.org/core/appview/notify" + "tangled.org/core/log" +) + +var _ notify.Notifier = &Indexer{} + +func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) { + l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue) + l.Debug("indexing new issue") + err := ix.Issues.Index(ctx, *issue) + if err != nil { + l.Error("failed to index an issue", "err", err) + } +} diff --git a/appview/issues/issues.go b/appview/issues/issues.go index fb1acc0a..83209ad1 100644 --- a/appview/issues/issues.go +++ b/appview/issues/issues.go @@ -19,6 +19,7 @@ import ( "tangled.org/core/api/tangled" "tangled.org/core/appview/config" "tangled.org/core/appview/db" + issues_indexer "tangled.org/core/appview/indexer/issues" "tangled.org/core/appview/models" "tangled.org/core/appview/notify" "tangled.org/core/appview/oauth" @@ -40,6 +41,7 @@ type Issues struct { notifier notify.Notifier logger *slog.Logger validator *validator.Validator + indexer *issues_indexer.Indexer } func New( @@ -51,6 +53,7 @@ func New( config *config.Config, notifier notify.Notifier, validator *validator.Validator, + indexer *issues_indexer.Indexer, logger *slog.Logger, ) *Issues { return &Issues{ @@ -63,6 +66,7 @@ func New( notifier: notifier, logger: logger, validator: validator, + indexer: indexer, } } @@ -847,6 +851,7 @@ func (rp *Issues) NewIssue(w http.ResponseWriter, r *http.Request) { Rkey: tid.TID(), Title: r.FormValue("title"), Body: r.FormValue("body"), + Open: true, Did: user.Did, Created: time.Now(), Repo: &f.Repo, diff --git a/appview/models/search.go b/appview/models/search.go new file mode 100644 index 00000000..2cd9ea36 --- /dev/null +++ b/appview/models/search.go @@ -0,0 +1,23 @@ +package models + +import "tangled.org/core/appview/pagination" + +type IssueSearchOptions struct { + Keyword string + RepoAt string + IsOpen bool + + Page pagination.Page +} + +// func (so *SearchOptions) ToFilters() []filter { +// var filters []filter +// if so.IsOpen != nil { +// openValue := 0 +// if *so.IsOpen { +// openValue = 1 +// } +// filters = append(filters, FilterEq("open", openValue)) +// } +// return filters +// } diff --git a/appview/pages/pages.go b/appview/pages/pages.go index 296d797d..dd30f870 100644 --- a/appview/pages/pages.go +++ b/appview/pages/pages.go @@ -970,6 +970,7 @@ type RepoIssuesParams struct { LabelDefs map[string]*models.LabelDefinition Page pagination.Page FilteringByOpen bool + FilterQuery string } func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error { diff --git a/appview/pagination/page.go b/appview/pagination/page.go index b3dc781b..9ecef5dc 100644 --- a/appview/pagination/page.go +++ b/appview/pagination/page.go @@ -29,3 +29,26 @@ func (p Page) Next() Page { Limit: p.Limit, } } + +func IterateAll[T any]( + fetch func(page Page) ([]T, error), + handle func(items []T) error, +) error { + page := FirstPage() + for { + items, err := fetch(page) + if err != nil { + return err + } + + err = handle(items) + if err != nil { + return err + } + if len(items) < page.Limit { + break + } + page = page.Next() + } + return nil +} diff --git a/appview/state/router.go b/appview/state/router.go index c430b962..9cd8a91f 100644 --- a/appview/state/router.go +++ b/appview/state/router.go @@ -262,6 +262,7 @@ func (s *State) IssuesRouter(mw *middleware.Middleware) http.Handler { s.config, s.notifier, s.validator, + s.indexer.Issues, log.SubLogger(s.logger, "issues"), ) return issues.Router(mw) diff --git a/appview/state/state.go b/appview/state/state.go index 78c4fbbd..f640eef7 100644 --- a/appview/state/state.go +++ b/appview/state/state.go @@ -14,6 +14,7 @@ import ( "tangled.org/core/appview" "tangled.org/core/appview/config" "tangled.org/core/appview/db" + "tangled.org/core/appview/indexer" "tangled.org/core/appview/models" "tangled.org/core/appview/notify" dbnotify "tangled.org/core/appview/notify/db" @@ -43,6 +44,7 @@ import ( type State struct { db *db.DB notifier notify.Notifier + indexer *indexer.Indexer oauth *oauth.OAuth enforcer *rbac.Enforcer pages *pages.Pages @@ -65,6 +67,12 @@ func Make(ctx context.Context, config *config.Config) (*State, error) { return nil, fmt.Errorf("failed to create db: %w", err) } + indexer := indexer.New(log.SubLogger(logger, "indexer")) + err = indexer.Init(ctx, d) + if err != nil { + return nil, fmt.Errorf("failed to create indexer: %w", err) + } + enforcer, err := rbac.NewEnforcer(config.Core.DbPath) if err != nil { return nil, fmt.Errorf("failed to create enforcer: %w", err) @@ -159,11 +167,13 @@ func Make(ctx context.Context, config *config.Config) (*State, error) { if !config.Core.Dev { notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) } + notifiers = append(notifiers, indexer) notifier := notify.NewMergedNotifier(notifiers...) state := &State{ d, notifier, + indexer, oauth, enforcer, pages, -- 2.43.0