// Forked from tangled.org/core — the monorepo for Tangled (https://tangled.org).

1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "reflect"
9 "strings"
10
11 _ "github.com/mattn/go-sqlite3"
12)
13
// DB is this package's database handle. It embeds *sql.DB, so all of the
// standard handle's methods are promoted onto DB.
type DB struct {
	*sql.DB
}
17
// Execer is the statement-running surface shared by *sql.DB and *sql.Tx.
// Accepting an Execer lets a helper run either directly against the database
// or inside a caller-owned transaction.
type Execer interface {
	// Plain variants.
	Exec(query string, args ...any) (sql.Result, error)
	Query(query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
	Prepare(query string) (*sql.Stmt, error)

	// Context-aware variants.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}
28
29func Make(dbPath string) (*DB, error) {
30 db, err := sql.Open("sqlite3", dbPath)
31 if err != nil {
32 return nil, err
33 }
34 _, err = db.Exec(`
35 pragma journal_mode = WAL;
36 pragma synchronous = normal;
37 pragma foreign_keys = on;
38 pragma temp_store = memory;
39 pragma mmap_size = 30000000000;
40 pragma page_size = 32768;
41 pragma auto_vacuum = incremental;
42 pragma busy_timeout = 5000;
43
44 create table if not exists registrations (
45 id integer primary key autoincrement,
46 domain text not null unique,
47 did text not null,
48 secret text not null,
49 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
50 registered text
51 );
52 create table if not exists public_keys (
53 id integer primary key autoincrement,
54 did text not null,
55 name text not null,
56 key text not null,
57 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
58 unique(did, name, key)
59 );
60 create table if not exists repos (
61 id integer primary key autoincrement,
62 did text not null,
63 name text not null,
64 knot text not null,
65 rkey text not null,
66 at_uri text not null unique,
67 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
68 unique(did, name, knot, rkey)
69 );
70 create table if not exists collaborators (
71 id integer primary key autoincrement,
72 did text not null,
73 repo integer not null,
74 foreign key (repo) references repos(id) on delete cascade
75 );
76 create table if not exists follows (
77 user_did text not null,
78 subject_did text not null,
79 rkey text not null,
80 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
81 primary key (user_did, subject_did),
82 check (user_did <> subject_did)
83 );
84 create table if not exists issues (
85 id integer primary key autoincrement,
86 owner_did text not null,
87 repo_at text not null,
88 issue_id integer not null,
89 title text not null,
90 body text not null,
91 open integer not null default 1,
92 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
93 issue_at text,
94 unique(repo_at, issue_id),
95 foreign key (repo_at) references repos(at_uri) on delete cascade
96 );
97 create table if not exists comments (
98 id integer primary key autoincrement,
99 owner_did text not null,
100 issue_id integer not null,
101 repo_at text not null,
102 comment_id integer not null,
103 comment_at text not null,
104 body text not null,
105 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
106 unique(issue_id, comment_id),
107 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade
108 );
109 create table if not exists pulls (
110 -- identifiers
111 id integer primary key autoincrement,
112 pull_id integer not null,
113
114 -- at identifiers
115 repo_at text not null,
116 owner_did text not null,
117 rkey text not null,
118 pull_at text,
119
120 -- content
121 title text not null,
122 body text not null,
123 target_branch text not null,
124 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed
125
126 -- meta
127 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
128
129 -- constraints
130 unique(repo_at, pull_id),
131 foreign key (repo_at) references repos(at_uri) on delete cascade
132 );
133
134 -- every pull must have atleast 1 submission: the initial submission
135 create table if not exists pull_submissions (
136 -- identifiers
137 id integer primary key autoincrement,
138 pull_id integer not null,
139
140 -- at identifiers
141 repo_at text not null,
142
143 -- content, these are immutable, and require a resubmission to update
144 round_number integer not null default 0,
145 patch text,
146
147 -- meta
148 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
149
150 -- constraints
151 unique(repo_at, pull_id, round_number),
152 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade
153 );
154
155 create table if not exists pull_comments (
156 -- identifiers
157 id integer primary key autoincrement,
158 pull_id integer not null,
159 submission_id integer not null,
160
161 -- at identifiers
162 repo_at text not null,
163 owner_did text not null,
164 comment_at text not null,
165
166 -- content
167 body text not null,
168
169 -- meta
170 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
171
172 -- constraints
173 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade,
174 foreign key (submission_id) references pull_submissions(id) on delete cascade
175 );
176
177 create table if not exists _jetstream (
178 id integer primary key autoincrement,
179 last_time_us integer not null
180 );
181
182 create table if not exists repo_issue_seqs (
183 repo_at text primary key,
184 next_issue_id integer not null default 1
185 );
186
187 create table if not exists repo_pull_seqs (
188 repo_at text primary key,
189 next_pull_id integer not null default 1
190 );
191
192 create table if not exists stars (
193 id integer primary key autoincrement,
194 starred_by_did text not null,
195 repo_at text not null,
196 rkey text not null,
197 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
198 foreign key (repo_at) references repos(at_uri) on delete cascade,
199 unique(starred_by_did, repo_at)
200 );
201
202 create table if not exists reactions (
203 id integer primary key autoincrement,
204 reacted_by_did text not null,
205 thread_at text not null,
206 kind text not null,
207 rkey text not null,
208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
209 unique(reacted_by_did, thread_at, kind)
210 );
211
212 create table if not exists emails (
213 id integer primary key autoincrement,
214 did text not null,
215 email text not null,
216 verified integer not null default 0,
217 verification_code text not null,
218 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
219 is_primary integer not null default 0,
220 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
221 unique(did, email)
222 );
223
224 create table if not exists artifacts (
225 -- id
226 id integer primary key autoincrement,
227 did text not null,
228 rkey text not null,
229
230 -- meta
231 repo_at text not null,
232 tag binary(20) not null,
233 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
234
235 -- data
236 blob_cid text not null,
237 name text not null,
238 size integer not null default 0,
239 mimetype string not null default "*/*",
240
241 -- constraints
242 unique(did, rkey), -- record must be unique
243 unique(repo_at, tag, name), -- for a given tag object, each file must be unique
244 foreign key (repo_at) references repos(at_uri) on delete cascade
245 );
246
247 create table if not exists profile (
248 -- id
249 id integer primary key autoincrement,
250 did text not null,
251
252 -- data
253 description text not null,
254 include_bluesky integer not null default 0,
255 location text,
256
257 -- constraints
258 unique(did)
259 );
260 create table if not exists profile_links (
261 -- id
262 id integer primary key autoincrement,
263 did text not null,
264
265 -- data
266 link text not null,
267
268 -- constraints
269 foreign key (did) references profile(did) on delete cascade
270 );
271 create table if not exists profile_stats (
272 -- id
273 id integer primary key autoincrement,
274 did text not null,
275
276 -- data
277 kind text not null check (kind in (
278 "merged-pull-request-count",
279 "closed-pull-request-count",
280 "open-pull-request-count",
281 "open-issue-count",
282 "closed-issue-count",
283 "repository-count"
284 )),
285
286 -- constraints
287 foreign key (did) references profile(did) on delete cascade
288 );
289 create table if not exists profile_pinned_repositories (
290 -- id
291 id integer primary key autoincrement,
292 did text not null,
293
294 -- data
295 at_uri text not null,
296
297 -- constraints
298 unique(did, at_uri),
299 foreign key (did) references profile(did) on delete cascade,
300 foreign key (at_uri) references repos(at_uri) on delete cascade
301 );
302
303 create table if not exists oauth_requests (
304 id integer primary key autoincrement,
305 auth_server_iss text not null,
306 state text not null,
307 did text not null,
308 handle text not null,
309 pds_url text not null,
310 pkce_verifier text not null,
311 dpop_auth_server_nonce text not null,
312 dpop_private_jwk text not null
313 );
314
315 create table if not exists oauth_sessions (
316 id integer primary key autoincrement,
317 did text not null,
318 handle text not null,
319 pds_url text not null,
320 auth_server_iss text not null,
321 access_jwt text not null,
322 refresh_jwt text not null,
323 dpop_pds_nonce text,
324 dpop_auth_server_nonce text not null,
325 dpop_private_jwk text not null,
326 expiry text not null
327 );
328
329 create table if not exists punchcard (
330 did text not null,
331 date text not null, -- yyyy-mm-dd
332 count integer,
333 primary key (did, date)
334 );
335
336 create table if not exists spindles (
337 id integer primary key autoincrement,
338 owner text not null,
339 instance text not null,
340 verified text, -- time of verification
341 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
342
343 unique(owner, instance)
344 );
345
346 create table if not exists spindle_members (
347 -- identifiers for the record
348 id integer primary key autoincrement,
349 did text not null,
350 rkey text not null,
351
352 -- data
353 instance text not null,
354 subject text not null,
355 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
356
357 -- constraints
358 unique (did, instance, subject)
359 );
360
361 create table if not exists pipelines (
362 -- identifiers
363 id integer primary key autoincrement,
364 knot text not null,
365 rkey text not null,
366
367 repo_owner text not null,
368 repo_name text not null,
369
370 -- every pipeline must be associated with exactly one commit
371 sha text not null check (length(sha) = 40),
372 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
373
374 -- trigger data
375 trigger_id integer not null,
376
377 unique(knot, rkey),
378 foreign key (trigger_id) references triggers(id) on delete cascade
379 );
380
381 create table if not exists triggers (
382 -- primary key
383 id integer primary key autoincrement,
384
385 -- top-level fields
386 kind text not null,
387
388 -- pushTriggerData fields
389 push_ref text,
390 push_new_sha text check (length(push_new_sha) = 40),
391 push_old_sha text check (length(push_old_sha) = 40),
392
393 -- pullRequestTriggerData fields
394 pr_source_branch text,
395 pr_target_branch text,
396 pr_source_sha text check (length(pr_source_sha) = 40),
397 pr_action text
398 );
399
400 create table if not exists pipeline_statuses (
401 -- identifiers
402 id integer primary key autoincrement,
403 spindle text not null,
404 rkey text not null,
405
406 -- referenced pipeline. these form the (did, rkey) pair
407 pipeline_knot text not null,
408 pipeline_rkey text not null,
409
410 -- content
411 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
412 workflow text not null,
413 status text not null,
414 error text,
415 exit_code integer not null default 0,
416
417 unique (spindle, rkey),
418 foreign key (pipeline_knot, pipeline_rkey)
419 references pipelines (knot, rkey)
420 on delete cascade
421 );
422
423 create table if not exists repo_languages (
424 -- identifiers
425 id integer primary key autoincrement,
426
427 -- repo identifiers
428 repo_at text not null,
429 ref text not null,
430 is_default_ref integer not null default 0,
431
432 -- language breakdown
433 language text not null,
434 bytes integer not null check (bytes >= 0),
435
436 unique(repo_at, ref, language)
437 );
438
439 create table if not exists signups_inflight (
440 id integer primary key autoincrement,
441 email text not null unique,
442 invite_code text not null,
443 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
444 );
445
446 create table if not exists strings (
447 -- identifiers
448 did text not null,
449 rkey text not null,
450
451 -- content
452 filename text not null,
453 description text,
454 content text not null,
455 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
456 edited text,
457
458 primary key (did, rkey)
459 );
460
461 create table if not exists migrations (
462 id integer primary key autoincrement,
463 name text unique
464 );
465 `)
466 if err != nil {
467 return nil, err
468 }
469
470 // run migrations
471 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error {
472 tx.Exec(`
473 alter table repos add column description text check (length(description) <= 200);
474 `)
475 return nil
476 })
477
478 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error {
479 // add unconstrained column
480 _, err := tx.Exec(`
481 alter table public_keys
482 add column rkey text;
483 `)
484 if err != nil {
485 return err
486 }
487
488 // backfill
489 _, err = tx.Exec(`
490 update public_keys
491 set rkey = ''
492 where rkey is null;
493 `)
494 if err != nil {
495 return err
496 }
497
498 return nil
499 })
500
501 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error {
502 _, err := tx.Exec(`
503 alter table comments drop column comment_at;
504 alter table comments add column rkey text;
505 `)
506 return err
507 })
508
509 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error {
510 _, err := tx.Exec(`
511 alter table comments add column deleted text; -- timestamp
512 alter table comments add column edited text; -- timestamp
513 `)
514 return err
515 })
516
517 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error {
518 _, err := tx.Exec(`
519 alter table pulls add column source_branch text;
520 alter table pulls add column source_repo_at text;
521 alter table pull_submissions add column source_rev text;
522 `)
523 return err
524 })
525
526 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error {
527 _, err := tx.Exec(`
528 alter table repos add column source text;
529 `)
530 return err
531 })
532
533 // disable foreign-keys for the next migration
534 // NOTE: this cannot be done in a transaction, so it is run outside [0]
535 //
536 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys
537 db.Exec("pragma foreign_keys = off;")
538 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error {
539 _, err := tx.Exec(`
540 create table pulls_new (
541 -- identifiers
542 id integer primary key autoincrement,
543 pull_id integer not null,
544
545 -- at identifiers
546 repo_at text not null,
547 owner_did text not null,
548 rkey text not null,
549
550 -- content
551 title text not null,
552 body text not null,
553 target_branch text not null,
554 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted
555
556 -- source info
557 source_branch text,
558 source_repo_at text,
559
560 -- stacking
561 stack_id text,
562 change_id text,
563 parent_change_id text,
564
565 -- meta
566 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
567
568 -- constraints
569 unique(repo_at, pull_id),
570 foreign key (repo_at) references repos(at_uri) on delete cascade
571 );
572
573 insert into pulls_new (
574 id, pull_id,
575 repo_at, owner_did, rkey,
576 title, body, target_branch, state,
577 source_branch, source_repo_at,
578 created
579 )
580 select
581 id, pull_id,
582 repo_at, owner_did, rkey,
583 title, body, target_branch, state,
584 source_branch, source_repo_at,
585 created
586 FROM pulls;
587
588 drop table pulls;
589 alter table pulls_new rename to pulls;
590 `)
591 return err
592 })
593 db.Exec("pragma foreign_keys = on;")
594
595 // run migrations
596 runMigration(db, "add-spindle-to-repos", func(tx *sql.Tx) error {
597 tx.Exec(`
598 alter table repos add column spindle text;
599 `)
600 return nil
601 })
602
603 // recreate and add rkey + created columns with default constraint
604 runMigration(db, "rework-collaborators-table", func(tx *sql.Tx) error {
605 // create new table
606 // - repo_at instead of repo integer
607 // - rkey field
608 // - created field
609 _, err := tx.Exec(`
610 create table collaborators_new (
611 -- identifiers for the record
612 id integer primary key autoincrement,
613 did text not null,
614 rkey text,
615
616 -- content
617 subject_did text not null,
618 repo_at text not null,
619
620 -- meta
621 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
622
623 -- constraints
624 foreign key (repo_at) references repos(at_uri) on delete cascade
625 )
626 `)
627 if err != nil {
628 return err
629 }
630
631 // copy data
632 _, err = tx.Exec(`
633 insert into collaborators_new (id, did, rkey, subject_did, repo_at)
634 select
635 c.id,
636 r.did,
637 '',
638 c.did,
639 r.at_uri
640 from collaborators c
641 join repos r on c.repo = r.id
642 `)
643 if err != nil {
644 return err
645 }
646
647 // drop old table
648 _, err = tx.Exec(`drop table collaborators`)
649 if err != nil {
650 return err
651 }
652
653 // rename new table
654 _, err = tx.Exec(`alter table collaborators_new rename to collaborators`)
655 return err
656 })
657
658 return &DB{db}, nil
659}
660
// migrationFn is the body of a single schema migration. It receives the
// transaction the migration must run in and returns a non-nil error to signal
// failure.
type migrationFn = func(tx *sql.Tx) error
662
663func runMigration(d *sql.DB, name string, migrationFn migrationFn) error {
664 tx, err := d.Begin()
665 if err != nil {
666 return err
667 }
668 defer tx.Rollback()
669
670 var exists bool
671 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists)
672 if err != nil {
673 return err
674 }
675
676 if !exists {
677 // run migration
678 err = migrationFn(tx)
679 if err != nil {
680 log.Printf("Failed to run migration %s: %v", name, err)
681 return err
682 }
683
684 // mark migration as complete
685 _, err = tx.Exec("insert into migrations (name) values (?)", name)
686 if err != nil {
687 log.Printf("Failed to mark migration %s as complete: %v", name, err)
688 return err
689 }
690
691 // commit the transaction
692 if err := tx.Commit(); err != nil {
693 return err
694 }
695
696 log.Printf("migration %s applied successfully", name)
697 } else {
698 log.Printf("skipped migration %s, already applied", name)
699 }
700
701 return nil
702}
703
// filter is one SQL predicate of the form "<key> <cmp> <placeholder(s)>".
// Condition renders its SQL text and Arg yields the values to bind.
type filter struct {
	key string // left-hand side of the comparison, rendered verbatim
	arg any    // value to bind; slices and arrays become a parenthesised list
	cmp string // comparison operator, rendered verbatim (e.g. "=", "in")
}
709
710func newFilter(key, cmp string, arg any) filter {
711 return filter{
712 key: key,
713 arg: arg,
714 cmp: cmp,
715 }
716}
717
718func FilterEq(key string, arg any) filter { return newFilter(key, "=", arg) }
719func FilterNotEq(key string, arg any) filter { return newFilter(key, "<>", arg) }
720func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) }
721func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) }
722func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) }
723func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) }
724func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) }
725
726func (f filter) Condition() string {
727 rv := reflect.ValueOf(f.arg)
728 kind := rv.Kind()
729
730 // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)`
731 if kind == reflect.Slice || kind == reflect.Array {
732 if rv.Len() == 0 {
733 // always false
734 return "1 = 0"
735 }
736
737 placeholders := make([]string, rv.Len())
738 for i := range placeholders {
739 placeholders[i] = "?"
740 }
741
742 return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", "))
743 }
744
745 return fmt.Sprintf("%s %s ?", f.key, f.cmp)
746}
747
748func (f filter) Arg() []any {
749 rv := reflect.ValueOf(f.arg)
750 kind := rv.Kind()
751 if kind == reflect.Slice || kind == reflect.Array {
752 if rv.Len() == 0 {
753 return nil
754 }
755
756 out := make([]any, rv.Len())
757 for i := range rv.Len() {
758 out[i] = rv.Index(i).Interface()
759 }
760 return out
761 }
762
763 return []any{f.arg}
764}