forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "database/sql"
5 "fmt"
6 "maps"
7 "slices"
8 "sort"
9 "strings"
10 "time"
11
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/models"
15 "tangled.org/core/appview/pagination"
16 "tangled.org/core/orm"
17)
18
19func PutIssue(tx *sql.Tx, issue *models.Issue) error {
20 // ensure sequence exists
21 _, err := tx.Exec(`
22 insert or ignore into repo_issue_seqs (repo_at, next_issue_id)
23 values (?, 1)
24 `, issue.RepoAt)
25 if err != nil {
26 return err
27 }
28
29 issues, err := GetIssues(
30 tx,
31 orm.FilterEq("did", issue.Did),
32 orm.FilterEq("rkey", issue.Rkey),
33 )
34 switch {
35 case err != nil:
36 return err
37 case len(issues) == 0:
38 return createNewIssue(tx, issue)
39 case len(issues) != 1: // should be unreachable
40 return fmt.Errorf("invalid number of issues returned: %d", len(issues))
41 default:
42 // if content is identical, do not edit
43 existingIssue := issues[0]
44 if existingIssue.Title == issue.Title && existingIssue.Body == issue.Body {
45 return nil
46 }
47
48 issue.Id = existingIssue.Id
49 issue.IssueId = existingIssue.IssueId
50 return updateIssue(tx, issue)
51 }
52}
53
54func createNewIssue(tx *sql.Tx, issue *models.Issue) error {
55 // get next issue_id
56 var newIssueId int
57 err := tx.QueryRow(`
58 update repo_issue_seqs
59 set next_issue_id = next_issue_id + 1
60 where repo_at = ?
61 returning next_issue_id - 1
62 `, issue.RepoAt).Scan(&newIssueId)
63 if err != nil {
64 return err
65 }
66
67 // insert new issue
68 row := tx.QueryRow(`
69 insert into issues (repo_at, did, rkey, issue_id, title, body)
70 values (?, ?, ?, ?, ?, ?)
71 returning rowid, issue_id
72 `, issue.RepoAt, issue.Did, issue.Rkey, newIssueId, issue.Title, issue.Body)
73
74 err = row.Scan(&issue.Id, &issue.IssueId)
75 if err != nil {
76 return fmt.Errorf("scan row: %w", err)
77 }
78
79 if err := putReferences(tx, issue.AtUri(), issue.References); err != nil {
80 return fmt.Errorf("put reference_links: %w", err)
81 }
82 return nil
83}
84
85func updateIssue(tx *sql.Tx, issue *models.Issue) error {
86 // update existing issue
87 _, err := tx.Exec(`
88 update issues
89 set title = ?, body = ?, edited = ?
90 where did = ? and rkey = ?
91 `, issue.Title, issue.Body, time.Now().Format(time.RFC3339), issue.Did, issue.Rkey)
92 if err != nil {
93 return err
94 }
95
96 if err := putReferences(tx, issue.AtUri(), issue.References); err != nil {
97 return fmt.Errorf("put reference_links: %w", err)
98 }
99 return nil
100}
101
102func GetIssuesPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]models.Issue, error) {
103 issueMap := make(map[string]*models.Issue) // at-uri -> issue
104
105 var conditions []string
106 var args []any
107
108 for _, filter := range filters {
109 conditions = append(conditions, filter.Condition())
110 args = append(args, filter.Arg()...)
111 }
112
113 whereClause := ""
114 if conditions != nil {
115 whereClause = " where " + strings.Join(conditions, " and ")
116 }
117
118 pLower := orm.FilterGte("row_num", page.Offset+1)
119 pUpper := orm.FilterLte("row_num", page.Offset+page.Limit)
120
121 pageClause := ""
122 if page.Limit > 0 {
123 args = append(args, pLower.Arg()...)
124 args = append(args, pUpper.Arg()...)
125 pageClause = " where " + pLower.Condition() + " and " + pUpper.Condition()
126 }
127
128 query := fmt.Sprintf(
129 `
130 select * from (
131 select
132 id,
133 did,
134 rkey,
135 repo_at,
136 issue_id,
137 title,
138 body,
139 open,
140 created,
141 edited,
142 deleted,
143 row_number() over (order by created desc) as row_num
144 from
145 issues
146 %s
147 ) ranked_issues
148 %s
149 `,
150 whereClause,
151 pageClause,
152 )
153
154 rows, err := e.Query(query, args...)
155 if err != nil {
156 return nil, fmt.Errorf("failed to query issues table: %w", err)
157 }
158 defer rows.Close()
159
160 for rows.Next() {
161 var issue models.Issue
162 var createdAt string
163 var editedAt, deletedAt sql.Null[string]
164 var rowNum int64
165 err := rows.Scan(
166 &issue.Id,
167 &issue.Did,
168 &issue.Rkey,
169 &issue.RepoAt,
170 &issue.IssueId,
171 &issue.Title,
172 &issue.Body,
173 &issue.Open,
174 &createdAt,
175 &editedAt,
176 &deletedAt,
177 &rowNum,
178 )
179 if err != nil {
180 return nil, fmt.Errorf("failed to scan issue: %w", err)
181 }
182
183 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
184 issue.Created = t
185 }
186
187 if editedAt.Valid {
188 if t, err := time.Parse(time.RFC3339, editedAt.V); err == nil {
189 issue.Edited = &t
190 }
191 }
192
193 if deletedAt.Valid {
194 if t, err := time.Parse(time.RFC3339, deletedAt.V); err == nil {
195 issue.Deleted = &t
196 }
197 }
198
199 atUri := issue.AtUri().String()
200 issueMap[atUri] = &issue
201 }
202
203 // collect reverse repos
204 repoAts := make([]string, 0, len(issueMap)) // or just []string{}
205 for _, issue := range issueMap {
206 repoAts = append(repoAts, string(issue.RepoAt))
207 }
208
209 repos, err := GetRepos(e, 0, orm.FilterIn("at_uri", repoAts))
210 if err != nil {
211 return nil, fmt.Errorf("failed to build repo mappings: %w", err)
212 }
213
214 repoMap := make(map[string]*models.Repo)
215 for i := range repos {
216 repoMap[string(repos[i].RepoAt())] = &repos[i]
217 }
218
219 for issueAt, i := range issueMap {
220 if r, ok := repoMap[string(i.RepoAt)]; ok {
221 i.Repo = r
222 } else {
223 // do not show up the issue if the repo is deleted
224 // TODO: foreign key where?
225 delete(issueMap, issueAt)
226 }
227 }
228
229 // collect comments
230 issueAts := slices.Collect(maps.Keys(issueMap))
231
232 comments, err := GetIssueComments(e, orm.FilterIn("issue_at", issueAts))
233 if err != nil {
234 return nil, fmt.Errorf("failed to query comments: %w", err)
235 }
236 for i := range comments {
237 issueAt := comments[i].IssueAt
238 if issue, ok := issueMap[issueAt]; ok {
239 issue.Comments = append(issue.Comments, comments[i])
240 }
241 }
242
243 // collect allLabels for each issue
244 allLabels, err := GetLabels(e, orm.FilterIn("subject", issueAts))
245 if err != nil {
246 return nil, fmt.Errorf("failed to query labels: %w", err)
247 }
248 for issueAt, labels := range allLabels {
249 if issue, ok := issueMap[issueAt.String()]; ok {
250 issue.Labels = labels
251 }
252 }
253
254 // collect references for each issue
255 allReferencs, err := GetReferencesAll(e, orm.FilterIn("from_at", issueAts))
256 if err != nil {
257 return nil, fmt.Errorf("failed to query reference_links: %w", err)
258 }
259 for issueAt, references := range allReferencs {
260 if issue, ok := issueMap[issueAt.String()]; ok {
261 issue.References = references
262 }
263 }
264
265 var issues []models.Issue
266 for _, i := range issueMap {
267 issues = append(issues, *i)
268 }
269
270 sort.Slice(issues, func(i, j int) bool {
271 return issues[i].Created.After(issues[j].Created)
272 })
273
274 return issues, nil
275}
276
277func GetIssue(e Execer, repoAt syntax.ATURI, issueId int) (*models.Issue, error) {
278 issues, err := GetIssuesPaginated(
279 e,
280 pagination.Page{},
281 orm.FilterEq("repo_at", repoAt),
282 orm.FilterEq("issue_id", issueId),
283 )
284 if err != nil {
285 return nil, err
286 }
287 if len(issues) != 1 {
288 return nil, sql.ErrNoRows
289 }
290
291 return &issues[0], nil
292}
293
294func GetIssues(e Execer, filters ...orm.Filter) ([]models.Issue, error) {
295 return GetIssuesPaginated(e, pagination.Page{}, filters...)
296}
297
298// GetIssueIDs gets list of all existing issue's IDs
299func GetIssueIDs(e Execer, opts models.IssueSearchOptions) ([]int64, error) {
300 var ids []int64
301
302 var filters []orm.Filter
303 openValue := 0
304 if opts.IsOpen {
305 openValue = 1
306 }
307 filters = append(filters, orm.FilterEq("open", openValue))
308 if opts.RepoAt != "" {
309 filters = append(filters, orm.FilterEq("repo_at", opts.RepoAt))
310 }
311
312 var conditions []string
313 var args []any
314
315 for _, filter := range filters {
316 conditions = append(conditions, filter.Condition())
317 args = append(args, filter.Arg()...)
318 }
319
320 whereClause := ""
321 if conditions != nil {
322 whereClause = " where " + strings.Join(conditions, " and ")
323 }
324 query := fmt.Sprintf(
325 `
326 select
327 id
328 from
329 issues
330 %s
331 limit ? offset ?`,
332 whereClause,
333 )
334 args = append(args, opts.Page.Limit, opts.Page.Offset)
335 rows, err := e.Query(query, args...)
336 if err != nil {
337 return nil, err
338 }
339 defer rows.Close()
340
341 for rows.Next() {
342 var id int64
343 err := rows.Scan(&id)
344 if err != nil {
345 return nil, err
346 }
347
348 ids = append(ids, id)
349 }
350
351 return ids, nil
352}
353
354func AddIssueComment(tx *sql.Tx, c models.IssueComment) (int64, error) {
355 result, err := tx.Exec(
356 `insert into issue_comments (
357 did,
358 rkey,
359 issue_at,
360 body,
361 reply_to,
362 created,
363 edited
364 )
365 values (?, ?, ?, ?, ?, ?, null)
366 on conflict(did, rkey) do update set
367 issue_at = excluded.issue_at,
368 body = excluded.body,
369 edited = case
370 when
371 issue_comments.issue_at != excluded.issue_at
372 or issue_comments.body != excluded.body
373 or issue_comments.reply_to != excluded.reply_to
374 then ?
375 else issue_comments.edited
376 end`,
377 c.Did,
378 c.Rkey,
379 c.IssueAt,
380 c.Body,
381 c.ReplyTo,
382 c.Created.Format(time.RFC3339),
383 time.Now().Format(time.RFC3339),
384 )
385 if err != nil {
386 return 0, err
387 }
388
389 id, err := result.LastInsertId()
390 if err != nil {
391 return 0, err
392 }
393
394 if err := putReferences(tx, c.AtUri(), c.References); err != nil {
395 return 0, fmt.Errorf("put reference_links: %w", err)
396 }
397
398 return id, nil
399}
400
401func DeleteIssueComments(e Execer, filters ...orm.Filter) error {
402 var conditions []string
403 var args []any
404 for _, filter := range filters {
405 conditions = append(conditions, filter.Condition())
406 args = append(args, filter.Arg()...)
407 }
408
409 whereClause := ""
410 if conditions != nil {
411 whereClause = " where " + strings.Join(conditions, " and ")
412 }
413
414 query := fmt.Sprintf(`update issue_comments set body = "", deleted = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') %s`, whereClause)
415
416 _, err := e.Exec(query, args...)
417 return err
418}
419
420func GetIssueComments(e Execer, filters ...orm.Filter) ([]models.IssueComment, error) {
421 commentMap := make(map[string]*models.IssueComment)
422
423 var conditions []string
424 var args []any
425 for _, filter := range filters {
426 conditions = append(conditions, filter.Condition())
427 args = append(args, filter.Arg()...)
428 }
429
430 whereClause := ""
431 if conditions != nil {
432 whereClause = " where " + strings.Join(conditions, " and ")
433 }
434
435 query := fmt.Sprintf(`
436 select
437 id,
438 did,
439 rkey,
440 issue_at,
441 reply_to,
442 body,
443 created,
444 edited,
445 deleted
446 from
447 issue_comments
448 %s
449 `, whereClause)
450
451 rows, err := e.Query(query, args...)
452 if err != nil {
453 return nil, err
454 }
455
456 for rows.Next() {
457 var comment models.IssueComment
458 var created string
459 var rkey, edited, deleted, replyTo sql.Null[string]
460 err := rows.Scan(
461 &comment.Id,
462 &comment.Did,
463 &rkey,
464 &comment.IssueAt,
465 &replyTo,
466 &comment.Body,
467 &created,
468 &edited,
469 &deleted,
470 )
471 if err != nil {
472 return nil, err
473 }
474
475 // this is a remnant from old times, newer comments always have rkey
476 if rkey.Valid {
477 comment.Rkey = rkey.V
478 }
479
480 if t, err := time.Parse(time.RFC3339, created); err == nil {
481 comment.Created = t
482 }
483
484 if edited.Valid {
485 if t, err := time.Parse(time.RFC3339, edited.V); err == nil {
486 comment.Edited = &t
487 }
488 }
489
490 if deleted.Valid {
491 if t, err := time.Parse(time.RFC3339, deleted.V); err == nil {
492 comment.Deleted = &t
493 }
494 }
495
496 if replyTo.Valid {
497 comment.ReplyTo = &replyTo.V
498 }
499
500 atUri := comment.AtUri().String()
501 commentMap[atUri] = &comment
502 }
503
504 if err = rows.Err(); err != nil {
505 return nil, err
506 }
507
508 // collect references for each comments
509 commentAts := slices.Collect(maps.Keys(commentMap))
510 allReferencs, err := GetReferencesAll(e, orm.FilterIn("from_at", commentAts))
511 if err != nil {
512 return nil, fmt.Errorf("failed to query reference_links: %w", err)
513 }
514 for commentAt, references := range allReferencs {
515 if comment, ok := commentMap[commentAt.String()]; ok {
516 comment.References = references
517 }
518 }
519
520 var comments []models.IssueComment
521 for _, c := range commentMap {
522 comments = append(comments, *c)
523 }
524
525 sort.Slice(comments, func(i, j int) bool {
526 return comments[i].Created.After(comments[j].Created)
527 })
528
529 return comments, nil
530}
531
532func DeleteIssues(tx *sql.Tx, did, rkey string) error {
533 _, err := tx.Exec(
534 `delete from issues
535 where did = ? and rkey = ?`,
536 did,
537 rkey,
538 )
539 if err != nil {
540 return fmt.Errorf("delete issue: %w", err)
541 }
542
543 uri := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey))
544 err = deleteReferences(tx, uri)
545 if err != nil {
546 return fmt.Errorf("delete reference_links: %w", err)
547 }
548
549 return nil
550}
551
552func CloseIssues(e Execer, filters ...orm.Filter) error {
553 var conditions []string
554 var args []any
555 for _, filter := range filters {
556 conditions = append(conditions, filter.Condition())
557 args = append(args, filter.Arg()...)
558 }
559
560 whereClause := ""
561 if conditions != nil {
562 whereClause = " where " + strings.Join(conditions, " and ")
563 }
564
565 query := fmt.Sprintf(`update issues set open = 0 %s`, whereClause)
566 _, err := e.Exec(query, args...)
567 return err
568}
569
570func ReopenIssues(e Execer, filters ...orm.Filter) error {
571 var conditions []string
572 var args []any
573 for _, filter := range filters {
574 conditions = append(conditions, filter.Condition())
575 args = append(args, filter.Arg()...)
576 }
577
578 whereClause := ""
579 if conditions != nil {
580 whereClause = " where " + strings.Join(conditions, " and ")
581 }
582
583 query := fmt.Sprintf(`update issues set open = 1 %s`, whereClause)
584 _, err := e.Exec(query, args...)
585 return err
586}
587
588func GetIssueCount(e Execer, repoAt syntax.ATURI) (models.IssueCount, error) {
589 row := e.QueryRow(`
590 select
591 count(case when open = 1 then 1 end) as open_count,
592 count(case when open = 0 then 1 end) as closed_count
593 from issues
594 where repo_at = ?`,
595 repoAt,
596 )
597
598 var count models.IssueCount
599 if err := row.Scan(&count.Open, &count.Closed); err != nil {
600 return models.IssueCount{}, err
601 }
602
603 return count, nil
604}