forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1// heavily inspired by gitea's model (basically copy-pasted) 2package pulls_indexer 3 4import ( 5 "context" 6 "errors" 7 "log" 8 "os" 9 10 "github.com/blevesearch/bleve/v2" 11 "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" 12 "github.com/blevesearch/bleve/v2/analysis/token/camelcase" 13 "github.com/blevesearch/bleve/v2/analysis/token/lowercase" 14 "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" 15 "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" 16 "github.com/blevesearch/bleve/v2/index/upsidedown" 17 "github.com/blevesearch/bleve/v2/mapping" 18 "github.com/blevesearch/bleve/v2/search/query" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/indexer/base36" 21 "tangled.org/core/appview/indexer/bleve" 22 "tangled.org/core/appview/models" 23 tlog "tangled.org/core/log" 24) 25 26const ( 27 pullIndexerAnalyzer = "pullIndexer" 28 pullIndexerDocType = "pullIndexerDocType" 29 30 unicodeNormalizeName = "uicodeNormalize" 31) 32 33type Indexer struct { 34 indexer bleve.Index 35 path string 36} 37 38func NewIndexer(indexDir string) *Indexer { 39 return &Indexer{ 40 path: indexDir, 41 } 42} 43 44// Init initializes the indexer 45func (ix *Indexer) Init(ctx context.Context, e db.Execer) { 46 l := tlog.FromContext(ctx) 47 existed, err := ix.intialize(ctx) 48 if err != nil { 49 log.Fatalln("failed to initialize pull indexer", err) 50 } 51 if !existed { 52 l.Debug("Populating the pull indexer") 53 err := PopulateIndexer(ctx, ix, e) 54 if err != nil { 55 log.Fatalln("failed to populate pull indexer", err) 56 } 57 } 58 59 count, _ := ix.indexer.DocCount() 60 l.Info("Initialized the pull indexer", "docCount", count) 61} 62 63func generatePullIndexMapping() (mapping.IndexMapping, error) { 64 mapping := bleve.NewIndexMapping() 65 docMapping := bleve.NewDocumentMapping() 66 67 textFieldMapping := bleve.NewTextFieldMapping() 68 textFieldMapping.Store = false 69 textFieldMapping.IncludeInAll = false 70 71 keywordFieldMapping := bleve.NewKeywordFieldMapping() 72 keywordFieldMapping.Store = false 73 keywordFieldMapping.IncludeInAll = false 74 75 // numericFieldMapping := bleve.NewNumericFieldMapping() 76 77 docMapping.AddFieldMappingsAt("title", textFieldMapping) 78 docMapping.AddFieldMappingsAt("body", textFieldMapping) 79 80 docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping) 81 docMapping.AddFieldMappingsAt("state", keywordFieldMapping) 82 83 err := mapping.AddCustomTokenFilter(unicodeNormalizeName, map[string]any{ 84 "type": unicodenorm.Name, 85 "form": unicodenorm.NFC, 86 }) 87 if err != nil { 88 return nil, err 89 } 90 91 err = mapping.AddCustomAnalyzer(pullIndexerAnalyzer, map[string]any{ 92 "type": custom.Name, 93 "char_filters": []string{}, 94 "tokenizer": unicode.Name, 95 "token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name}, 96 }) 97 if err != nil { 98 return nil, err 99 } 100 101 mapping.DefaultAnalyzer = pullIndexerAnalyzer 102 mapping.AddDocumentMapping(pullIndexerDocType, docMapping) 103 mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping()) 104 mapping.DefaultMapping = bleve.NewDocumentDisabledMapping() 105 106 return mapping, nil 107} 108 109func (ix *Indexer) intialize(ctx context.Context) (bool, error) { 110 if ix.indexer != nil { 111 return false, errors.New("indexer is already initialized") 112 } 113 114 indexer, err := openIndexer(ctx, ix.path) 115 if err != nil { 116 return false, err 117 } 118 if indexer != nil { 119 ix.indexer = indexer 120 return true, nil 121 } 122 123 mapping, err := generatePullIndexMapping() 124 if err != nil { 125 return false, err 126 } 127 indexer, err = bleve.New(ix.path, mapping) 128 if err != nil { 129 return false, err 130 } 131 132 ix.indexer = indexer 133 134 return false, nil 135} 136 137func openIndexer(ctx context.Context, path string) (bleve.Index, error) { 138 l := tlog.FromContext(ctx) 139 indexer, err := bleve.Open(path) 140 if err != nil { 141 if errors.Is(err, upsidedown.IncompatibleVersion) { 142 l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding") 143 return nil, os.RemoveAll(path) 144 } 145 return nil, nil 146 } 147 return indexer, nil 148} 149 150func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error { 151 l := tlog.FromContext(ctx) 152 153 pulls, err := db.GetPulls(e) 154 if err != nil { 155 return err 156 } 157 count := len(pulls) 158 err = ix.Index(ctx, pulls...) 159 if err != nil { 160 return err 161 } 162 l.Info("pulls indexed", "count", count) 163 return err 164} 165 166// pullData data stored and will be indexed 167type pullData struct { 168 ID int64 `json:"id"` 169 RepoAt string `json:"repo_at"` 170 PullID int `json:"pull_id"` 171 Title string `json:"title"` 172 Body string `json:"body"` 173 State string `json:"state"` 174 175 Comments []pullCommentData `json:"comments"` 176} 177 178func makePullData(pull *models.Pull) *pullData { 179 return &pullData{ 180 ID: int64(pull.ID), 181 RepoAt: pull.RepoAt.String(), 182 PullID: pull.PullId, 183 Title: pull.Title, 184 Body: pull.Body, 185 State: pull.State.String(), 186 } 187} 188 189// Type returns the document type, for bleve's mapping.Classifier interface. 190func (i *pullData) Type() string { 191 return pullIndexerDocType 192} 193 194type pullCommentData struct { 195 Body string `json:"body"` 196} 197 198type searchResult struct { 199 Hits []int64 200 Total uint64 201} 202 203const maxBatchSize = 20 204 205func (ix *Indexer) Index(ctx context.Context, pulls ...*models.Pull) error { 206 batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize) 207 for _, pull := range pulls { 208 pullData := makePullData(pull) 209 if err := batch.Index(base36.Encode(pullData.ID), pullData); err != nil { 210 return err 211 } 212 } 213 return batch.Flush() 214} 215 216func (ix *Indexer) Delete(ctx context.Context, pullID int64) error { 217 return ix.indexer.Delete(base36.Encode(pullID)) 218} 219 220// Search searches for pulls 221func (ix *Indexer) Search(ctx context.Context, opts models.PullSearchOptions) (*searchResult, error) { 222 var queries []query.Query 223 224 // TODO(boltless): remove this after implementing pulls page pagination 225 limit := opts.Page.Limit 226 if limit == 0 { 227 limit = 500 228 } 229 230 if opts.Keyword != "" { 231 queries = append(queries, bleve.NewDisjunctionQuery( 232 bleveutil.MatchAndQuery("title", opts.Keyword, pullIndexerAnalyzer, 0), 233 bleveutil.MatchAndQuery("body", opts.Keyword, pullIndexerAnalyzer, 0), 234 )) 235 } 236 queries = append(queries, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt)) 237 queries = append(queries, bleveutil.KeywordFieldQuery("state", opts.State.String())) 238 239 var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...) 240 searchReq := bleve.NewSearchRequestOptions(indexerQuery, limit, opts.Page.Offset, false) 241 res, err := ix.indexer.SearchInContext(ctx, searchReq) 242 if err != nil { 243 return nil, nil 244 } 245 ret := &searchResult{ 246 Total: res.Total, 247 Hits: make([]int64, len(res.Hits)), 248 } 249 for i, hit := range res.Hits { 250 id, err := base36.Decode(hit.ID) 251 if err != nil { 252 return nil, err 253 } 254 ret.Hits[i] = id 255 } 256 return ret, nil 257}