···
1
+
package issues_indexer
8
+
"github.com/blevesearch/bleve/v2"
9
+
"github.com/blevesearch/bleve/v2/index/upsidedown"
10
+
"github.com/blevesearch/bleve/v2/search/query"
11
+
"tangled.org/core/appview/db"
12
+
"tangled.org/core/appview/indexer/base36"
13
+
"tangled.org/core/appview/indexer/bleve"
14
+
"tangled.org/core/appview/models"
15
+
"tangled.org/core/appview/pagination"
16
+
tlog "tangled.org/core/log"
19
+
type Indexer struct {
24
+
func NewIndexer(indexDir string) *Indexer {
30
+
// Init initializes the indexer
31
+
func (ix *Indexer) Init(ctx context.Context, e db.Execer) {
32
+
l := tlog.FromContext(ctx)
33
+
existed, err := ix.intialize(ctx)
35
+
l.Error("failed to initialize issue indexer", "err", err)
38
+
l.Debug("Populating the issue indexer")
39
+
err := PopulateIndexer(ctx, ix, e)
41
+
l.Error("failed to populate issue indexer", "err", err)
44
+
l.Info("Initialized the issue indexer")
47
+
func (ix *Indexer) intialize(ctx context.Context) (bool, error) {
48
+
if ix.indexer != nil {
49
+
return false, errors.New("indexer is already initialized")
52
+
indexer, err := openIndexer(ctx, ix.path)
57
+
ix.indexer = indexer
61
+
mapping := bleve.NewIndexMapping()
62
+
indexer, err = bleve.New(ix.path, mapping)
67
+
ix.indexer = indexer
72
+
func openIndexer(ctx context.Context, path string) (bleve.Index, error) {
73
+
l := tlog.FromContext(ctx)
74
+
indexer, err := bleve.Open(path)
76
+
if errors.Is(err, upsidedown.IncompatibleVersion) {
77
+
l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding")
78
+
return nil, os.RemoveAll(path)
85
+
func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error {
86
+
l := tlog.FromContext(ctx)
88
+
err := pagination.IterateAll(
89
+
func(page pagination.Page) ([]models.Issue, error) {
90
+
return db.GetIssuesPaginated(e, page)
92
+
func(issues []models.Issue) error {
93
+
var dataList []*IssueData
94
+
for _, issue := range issues {
95
+
dataList = append(dataList, &IssueData{
97
+
IssueID: issue.IssueId,
100
+
IsOpen: issue.Open,
102
+
err := ix.Index(ctx, dataList...)
110
+
l.Info("issues indexed", "count", count)
114
+
// IssueData data stored and will be indexed
115
+
type IssueData struct {
116
+
ID int64 `json:"id"`
117
+
IssueID int `json:"issue_id"`
118
+
Title string `json:"title"`
119
+
Body string `json:"body"`
121
+
IsOpen bool `json:"is_open"`
122
+
Comments []IssueCommentData `json:"comments"`
125
+
type IssueCommentData struct {
126
+
Body string `json:"body"`
129
+
type SearchResult struct {
134
+
const maxBatchSize = 20
136
+
func (ix *Indexer) Index(ctx context.Context, issues ...*IssueData) error {
137
+
batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize)
138
+
for _, issue := range issues {
139
+
if err := batch.Index(base36.Encode(issue.ID), issue); err != nil {
143
+
return batch.Flush()
146
+
// Search searches for issues
147
+
func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) {
148
+
var queries []query.Query
150
+
if opts.Keyword != "" {
151
+
queries = append(queries, bleve.NewDisjunctionQuery(
152
+
matchAndQuery(opts.Keyword, "title"),
153
+
matchAndQuery(opts.Keyword, "body"),
156
+
queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open"))
157
+
// TODO: append more queries
159
+
var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...)
160
+
searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false)
161
+
res, err := ix.indexer.SearchInContext(ctx, searchReq)
165
+
ret := &SearchResult{
167
+
Hits: make([]int64, len(res.Hits)),
169
+
for i, hit := range res.Hits {
170
+
id, err := base36.Decode(hit.ID)
179
+
func matchAndQuery(keyword, field string) query.Query {
180
+
q := bleve.NewMatchQuery(keyword)
185
+
func boolFieldQuery(val bool, field string) query.Query {
186
+
q := bleve.NewBoolFieldQuery(val)