appview: add basic issue indexer (wip) #494

merged
opened by boltless.me targeting master from boltless.me/core: feat/search
  • Heavily inspired by gitea
  • add GetAllIssues which only receives a paginator and gathers all issues ignoring repoAt field

Signed-off-by: Seongmin Lee boltlessengineer@proton.me

Changed files
+391
appview
indexer
issues
models
pages
pagination
state
+1
.gitignore
···
.env
*.rdb
.envrc
+
*.bleve
# Created if following hacking.md
genjwks.out
/nix/vm-data
+20
appview/indexer/base36/base36.go
···
+
// mostly copied from gitea/modules/indexer/internal/base32
+
+
package base36
+
+
import (
+
"fmt"
+
"strconv"
+
)
+
+
// Encode renders i in base 36 (lowercase digits and letters), using
// strconv's canonical formatting.
func Encode(i int64) string {
	return string(strconv.AppendInt(nil, i, 36))
}

// Decode parses a base-36 string produced by Encode back into an int64,
// wrapping the strconv error when s is not valid base 36.
func Decode(s string) (int64, error) {
	v, err := strconv.ParseInt(s, 36, 64)
	if err == nil {
		return v, nil
	}
	return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
}
+58
appview/indexer/bleve/batch.go
···
+
// Copyright 2021 The Gitea Authors. All rights reserved.
+
// SPDX-License-Identifier: MIT
+
+
package bleveutil
+
+
import (
+
"github.com/blevesearch/bleve/v2"
+
)
+
+
// FlushingBatch is a batch of operations that automatically flushes to the
// underlying index once it reaches a certain size.
type FlushingBatch struct {
	maxBatchSize int          // operation count at which the batch auto-flushes
	batch        *bleve.Batch // current in-flight batch; replaced after each Flush
	index        bleve.Index  // destination index the batch is applied to
}
+
+
// NewFlushingBatch creates a new flushing batch for the specified index. Once
+
// the number of operations in the batch reaches the specified limit, the batch
+
// automatically flushes its operations to the index.
+
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
+
return &FlushingBatch{
+
maxBatchSize: maxBatchSize,
+
batch: index.NewBatch(),
+
index: index,
+
}
+
}
+
+
// Index add a new index to batch
+
func (b *FlushingBatch) Index(id string, data any) error {
+
if err := b.batch.Index(id, data); err != nil {
+
return err
+
}
+
return b.flushIfFull()
+
}
+
+
// Delete queues a delete operation for id, then flushes the batch to the
// underlying index if it has grown past its size limit.
func (b *FlushingBatch) Delete(id string) error {
	b.batch.Delete(id)
	return b.flushIfFull()
}
+
+
func (b *FlushingBatch) flushIfFull() error {
+
if b.batch.Size() < b.maxBatchSize {
+
return nil
+
}
+
return b.Flush()
+
}
+
+
// Flush submit the batch and create a new one
+
func (b *FlushingBatch) Flush() error {
+
err := b.index.Batch(b.batch)
+
if err != nil {
+
return err
+
}
+
b.batch = b.index.NewBatch()
+
return nil
+
}
+32
appview/indexer/indexer.go
···
+
package indexer
+
+
import (
+
"context"
+
"log/slog"
+
+
"tangled.org/core/appview/db"
+
issues_indexer "tangled.org/core/appview/indexer/issues"
+
"tangled.org/core/appview/notify"
+
tlog "tangled.org/core/log"
+
)
+
+
// Indexer bundles the per-domain search indexers (currently only issues).
// The embedded notify.BaseNotifier supplies the remaining notify.Notifier
// methods; notifier.go overrides the ones this indexer cares about
// (e.g. NewIssue).
type Indexer struct {
	Issues *issues_indexer.Indexer
	logger *slog.Logger
	notify.BaseNotifier
}
+
+
func New(logger *slog.Logger) *Indexer {
+
return &Indexer {
+
issues_indexer.NewIndexer("indexes.bleve"),
+
logger,
+
notify.BaseNotifier{},
+
}
+
}
+
+
// Init initializes all indexers
func (ix *Indexer) Init(ctx context.Context, db *db.DB) error {
	// Attach the indexer's logger to the context so sub-indexers can
	// retrieve it via tlog.FromContext.
	ctx = tlog.IntoContext(ctx, ix.logger)
	ix.Issues.Init(ctx, db)
	// NOTE(review): Issues.Init logs its failures internally and returns
	// nothing, so this currently always returns nil.
	return nil
}
+197
appview/indexer/issues/indexer.go
···
+
// heavily inspired by gitea's model (basically copy-pasted)
+
package issues_indexer
+
+
import (
+
"context"
+
"errors"
+
"os"
+
+
"github.com/blevesearch/bleve/v2"
+
"github.com/blevesearch/bleve/v2/index/upsidedown"
+
"github.com/blevesearch/bleve/v2/search/query"
+
"tangled.org/core/appview/db"
+
"tangled.org/core/appview/indexer/base36"
+
"tangled.org/core/appview/indexer/bleve"
+
"tangled.org/core/appview/models"
+
"tangled.org/core/appview/pagination"
+
tlog "tangled.org/core/log"
+
)
+
+
// Indexer maintains a bleve full-text index of issues on disk.
type Indexer struct {
	indexer bleve.Index // nil until Init has opened or created the index
	path    string      // on-disk directory of the bleve index
}
+
+
func NewIndexer(indexDir string) *Indexer {
+
return &Indexer{
+
path: indexDir,
+
}
+
}
+
+
// Init initializes the indexer
+
func (ix *Indexer) Init(ctx context.Context, e db.Execer) {
+
l := tlog.FromContext(ctx)
+
existed, err := ix.intialize(ctx)
+
if err != nil {
+
l.Error("failed to initialize issue indexer", "err", err)
+
}
+
if !existed {
+
l.Debug("Populating the issue indexer")
+
err := PopulateIndexer(ctx, ix, e)
+
if err != nil {
+
l.Error("failed to populate issue indexer", "err", err)
+
}
+
}
+
l.Info("Initialized the issue indexer")
+
}
+
+
// intialize opens the bleve index at ix.path, creating a fresh one if none
// exists. It returns whether a pre-existing index was found on disk.
// NOTE(review): the method name is a typo of "initialize"; renaming requires
// updating the call site in Init.
func (ix *Indexer) intialize(ctx context.Context) (bool, error) {
	if ix.indexer != nil {
		return false, errors.New("indexer is already initialized")
	}

	// Try to reuse an index that already exists on disk.
	indexer, err := openIndexer(ctx, ix.path)
	if err != nil {
		return false, err
	}
	if indexer != nil {
		ix.indexer = indexer
		return true, nil
	}

	// No usable index on disk — create a new one with the default mapping.
	mapping := bleve.NewIndexMapping()
	indexer, err = bleve.New(ix.path, mapping)
	if err != nil {
		return false, err
	}

	ix.indexer = indexer

	return false, nil
}
+
+
func openIndexer(ctx context.Context, path string) (bleve.Index, error) {
+
l := tlog.FromContext(ctx)
+
indexer, err := bleve.Open(path)
+
if err != nil {
+
if errors.Is(err, upsidedown.IncompatibleVersion) {
+
l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding")
+
return nil, os.RemoveAll(path)
+
}
+
return nil, nil
+
}
+
return indexer, nil
+
}
+
+
func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error {
+
l := tlog.FromContext(ctx)
+
count := 0
+
err := pagination.IterateAll(
+
func(page pagination.Page) ([]models.Issue, error) {
+
return db.GetIssuesPaginated(e, page)
+
},
+
func(issues []models.Issue) error {
+
count += len(issues)
+
return ix.Index(ctx, issues...)
+
},
+
)
+
l.Info("issues indexed", "count", count)
+
return err
+
}
+
+
// issueData is the document stored in the bleve index for one issue.
type issueData struct {
	ID      int64  `json:"id"`       // database row id; also the doc key, base36-encoded
	RepoAt  string `json:"repo_at"`  // repository AT-URI the issue belongs to
	IssueID int    `json:"issue_id"` // per-repo issue number
	Title   string `json:"title"`
	Body    string `json:"body"`

	IsOpen bool `json:"is_open"`
	// NOTE(review): Comments is never populated by makeIssueData yet.
	Comments []IssueCommentData `json:"comments"`
}
+
+
// makeIssueData converts a models.Issue into its indexable document form.
// Comments are not carried over (see issueData.Comments).
func makeIssueData(issue *models.Issue) issueData {
	return issueData{
		ID:      issue.Id,
		RepoAt:  issue.RepoAt.String(),
		IssueID: issue.IssueId,
		Title:   issue.Title,
		Body:    issue.Body,
		IsOpen:  issue.Open,
	}
}
+
+
// IssueCommentData is the indexed representation of a single issue comment.
type IssueCommentData struct {
	Body string `json:"body"`
}

// SearchResult holds the database ids of matching issues. Total is the full
// hit count, which may exceed len(Hits) when results are paginated.
type SearchResult struct {
	Hits  []int64
	Total uint64
}

// maxBatchSize is the number of buffered operations at which the flushing
// batch writes through to the index.
const maxBatchSize = 20
+
+
func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error {
+
batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize)
+
for _, issue := range issues {
+
issueData := makeIssueData(&issue)
+
if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil {
+
return err
+
}
+
}
+
return batch.Flush()
+
}
+
+
// Search searches for issues
+
func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) {
+
var queries []query.Query
+
+
if opts.Keyword != "" {
+
queries = append(queries, bleve.NewDisjunctionQuery(
+
matchAndQuery(opts.Keyword, "title"),
+
matchAndQuery(opts.Keyword, "body"),
+
))
+
}
+
queries = append(queries, keywordFieldQuery(opts.RepoAt, "repo_at"))
+
queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open"))
+
// TODO: append more queries
+
+
var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...)
+
searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false)
+
res, err := ix.indexer.SearchInContext(ctx, searchReq)
+
if err != nil {
+
return nil, nil
+
}
+
ret := &SearchResult{
+
Total: res.Total,
+
Hits: make([]int64, len(res.Hits)),
+
}
+
for i, hit := range res.Hits {
+
id, err := base36.Decode(hit.ID)
+
if err != nil {
+
return nil, err
+
}
+
ret.Hits[i] = id
+
}
+
return ret, nil
+
}
+
+
// matchAndQuery builds a full-text match query for keyword restricted to the
// given indexed field.
func matchAndQuery(keyword, field string) query.Query {
	q := bleve.NewMatchQuery(keyword)
	q.FieldVal = field
	return q
}
+
+
// boolFieldQuery builds an exact-match query on a boolean field.
func boolFieldQuery(val bool, field string) query.Query {
	q := bleve.NewBoolFieldQuery(val)
	q.FieldVal = field
	return q
}
+
+
// keywordFieldQuery builds an exact (un-analyzed) term query on the given
// field, e.g. for matching a repository AT-URI verbatim.
func keywordFieldQuery(keyword, field string) query.Query {
	q := bleve.NewTermQuery(keyword)
	q.FieldVal = field
	return q
}
+20
appview/indexer/notifier.go
···
+
package indexer
+
+
import (
+
"context"
+
+
"tangled.org/core/appview/models"
+
"tangled.org/core/appview/notify"
+
"tangled.org/core/log"
+
)
+
+
// Compile-time check that Indexer satisfies the notify.Notifier interface.
var _ notify.Notifier = &Indexer{}
+
+
func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) {
+
l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue)
+
l.Debug("indexing new issue")
+
err := ix.Issues.Index(ctx, *issue)
+
if err != nil {
+
l.Error("failed to index an issue", "err", err)
+
}
+
}
+5
appview/issues/issues.go
···
"tangled.org/core/api/tangled"
"tangled.org/core/appview/config"
"tangled.org/core/appview/db"
+
issues_indexer "tangled.org/core/appview/indexer/issues"
"tangled.org/core/appview/models"
"tangled.org/core/appview/notify"
"tangled.org/core/appview/oauth"
···
notifier notify.Notifier
logger *slog.Logger
validator *validator.Validator
+
indexer *issues_indexer.Indexer
}
func New(
···
config *config.Config,
notifier notify.Notifier,
validator *validator.Validator,
+
indexer *issues_indexer.Indexer,
logger *slog.Logger,
) *Issues {
return &Issues{
···
notifier: notifier,
logger: logger,
validator: validator,
+
indexer: indexer,
}
}
···
Rkey: tid.TID(),
Title: r.FormValue("title"),
Body: r.FormValue("body"),
+
Open: true,
Did: user.Did,
Created: time.Now(),
Repo: &f.Repo,
+23
appview/models/search.go
···
+
package models
+
+
import "tangled.org/core/appview/pagination"
+
+
// IssueSearchOptions are the filters and pagination settings applied to an
// issue search.
type IssueSearchOptions struct {
	Keyword string // free-text query matched against title and body; empty disables the keyword filter
	RepoAt  string // repository AT-URI to restrict results to
	IsOpen  bool   // filter on open/closed state

	Page pagination.Page
}
+
+
// func (so *SearchOptions) ToFilters() []filter {
+
// var filters []filter
+
// if so.IsOpen != nil {
+
// openValue := 0
+
// if *so.IsOpen {
+
// openValue = 1
+
// }
+
// filters = append(filters, FilterEq("open", openValue))
+
// }
+
// return filters
+
// }
+1
appview/pages/pages.go
···
LabelDefs map[string]*models.LabelDefinition
Page pagination.Page
FilteringByOpen bool
+
FilterQuery string
}
func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {
+23
appview/pagination/page.go
···
Limit: p.Limit,
}
}
+
+
func IterateAll[T any](
+
fetch func(page Page) ([]T, error),
+
handle func(items []T) error,
+
) error {
+
page := FirstPage()
+
for {
+
items, err := fetch(page)
+
if err != nil {
+
return err
+
}
+
+
err = handle(items)
+
if err != nil {
+
return err
+
}
+
if len(items) < page.Limit {
+
break
+
}
+
page = page.Next()
+
}
+
return nil
+
}
+1
appview/state/router.go
···
s.config,
s.notifier,
s.validator,
+
s.indexer.Issues,
log.SubLogger(s.logger, "issues"),
)
return issues.Router(mw)
+10
appview/state/state.go
···
"tangled.org/core/appview"
"tangled.org/core/appview/config"
"tangled.org/core/appview/db"
+
"tangled.org/core/appview/indexer"
"tangled.org/core/appview/models"
"tangled.org/core/appview/notify"
dbnotify "tangled.org/core/appview/notify/db"
···
type State struct {
db *db.DB
notifier notify.Notifier
+
indexer *indexer.Indexer
oauth *oauth.OAuth
enforcer *rbac.Enforcer
pages *pages.Pages
···
return nil, fmt.Errorf("failed to create db: %w", err)
}
+
indexer := indexer.New(log.SubLogger(logger, "indexer"))
+
err = indexer.Init(ctx, d)
+
if err != nil {
+
return nil, fmt.Errorf("failed to create indexer: %w", err)
+
}
+
enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
if err != nil {
return nil, fmt.Errorf("failed to create enforcer: %w", err)
···
if !config.Core.Dev {
notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
}
+
notifiers = append(notifiers, indexer)
notifier := notify.NewMergedNotifier(notifiers...)
state := &State{
d,
notifier,
+
indexer,
oauth,
enforcer,
pages,