// Forked from tangled.org/core.
// Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "reflect" 9 "strings" 10 11 _ "github.com/mattn/go-sqlite3" 12) 13 14type DB struct { 15 *sql.DB 16} 17 18type Execer interface { 19 Query(query string, args ...any) (*sql.Rows, error) 20 QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) 21 QueryRow(query string, args ...any) *sql.Row 22 QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row 23 Exec(query string, args ...any) (sql.Result, error) 24 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 25 Prepare(query string) (*sql.Stmt, error) 26 PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) 27} 28 29func Make(dbPath string) (*DB, error) { 30 db, err := sql.Open("sqlite3", dbPath) 31 if err != nil { 32 return nil, err 33 } 34 _, err = db.Exec(` 35 pragma journal_mode = WAL; 36 pragma synchronous = normal; 37 pragma foreign_keys = on; 38 pragma temp_store = memory; 39 pragma mmap_size = 30000000000; 40 pragma page_size = 32768; 41 pragma auto_vacuum = incremental; 42 pragma busy_timeout = 5000; 43 44 create table if not exists registrations ( 45 id integer primary key autoincrement, 46 domain text not null unique, 47 did text not null, 48 secret text not null, 49 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 50 registered text 51 ); 52 create table if not exists public_keys ( 53 id integer primary key autoincrement, 54 did text not null, 55 name text not null, 56 key text not null, 57 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 58 unique(did, name, key) 59 ); 60 create table if not exists repos ( 61 id integer primary key autoincrement, 62 did text not null, 63 name text not null, 64 knot text not null, 65 rkey text not null, 66 at_uri text not null unique, 67 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 68 unique(did, name, knot, rkey) 69 ); 70 create table if not exists 
collaborators ( 71 id integer primary key autoincrement, 72 did text not null, 73 repo integer not null, 74 foreign key (repo) references repos(id) on delete cascade 75 ); 76 create table if not exists follows ( 77 user_did text not null, 78 subject_did text not null, 79 rkey text not null, 80 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 81 primary key (user_did, subject_did), 82 check (user_did <> subject_did) 83 ); 84 create table if not exists issues ( 85 id integer primary key autoincrement, 86 owner_did text not null, 87 repo_at text not null, 88 issue_id integer not null, 89 title text not null, 90 body text not null, 91 open integer not null default 1, 92 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 93 issue_at text, 94 unique(repo_at, issue_id), 95 foreign key (repo_at) references repos(at_uri) on delete cascade 96 ); 97 create table if not exists comments ( 98 id integer primary key autoincrement, 99 owner_did text not null, 100 issue_id integer not null, 101 repo_at text not null, 102 comment_id integer not null, 103 comment_at text not null, 104 body text not null, 105 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 106 unique(issue_id, comment_id), 107 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade 108 ); 109 create table if not exists pulls ( 110 -- identifiers 111 id integer primary key autoincrement, 112 pull_id integer not null, 113 114 -- at identifiers 115 repo_at text not null, 116 owner_did text not null, 117 rkey text not null, 118 pull_at text, 119 120 -- content 121 title text not null, 122 body text not null, 123 target_branch text not null, 124 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed 125 126 -- meta 127 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 128 129 -- constraints 130 unique(repo_at, pull_id), 131 foreign key (repo_at) references repos(at_uri) on 
delete cascade 132 ); 133 134 -- every pull must have atleast 1 submission: the initial submission 135 create table if not exists pull_submissions ( 136 -- identifiers 137 id integer primary key autoincrement, 138 pull_id integer not null, 139 140 -- at identifiers 141 repo_at text not null, 142 143 -- content, these are immutable, and require a resubmission to update 144 round_number integer not null default 0, 145 patch text, 146 147 -- meta 148 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 149 150 -- constraints 151 unique(repo_at, pull_id, round_number), 152 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade 153 ); 154 155 create table if not exists pull_comments ( 156 -- identifiers 157 id integer primary key autoincrement, 158 pull_id integer not null, 159 submission_id integer not null, 160 161 -- at identifiers 162 repo_at text not null, 163 owner_did text not null, 164 comment_at text not null, 165 166 -- content 167 body text not null, 168 169 -- meta 170 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 171 172 -- constraints 173 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade, 174 foreign key (submission_id) references pull_submissions(id) on delete cascade 175 ); 176 177 create table if not exists _jetstream ( 178 id integer primary key autoincrement, 179 last_time_us integer not null 180 ); 181 182 create table if not exists repo_issue_seqs ( 183 repo_at text primary key, 184 next_issue_id integer not null default 1 185 ); 186 187 create table if not exists repo_pull_seqs ( 188 repo_at text primary key, 189 next_pull_id integer not null default 1 190 ); 191 192 create table if not exists stars ( 193 id integer primary key autoincrement, 194 starred_by_did text not null, 195 repo_at text not null, 196 rkey text not null, 197 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 198 foreign key (repo_at) references 
repos(at_uri) on delete cascade, 199 unique(starred_by_did, repo_at) 200 ); 201 202 create table if not exists reactions ( 203 id integer primary key autoincrement, 204 reacted_by_did text not null, 205 thread_at text not null, 206 kind text not null, 207 rkey text not null, 208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 209 unique(reacted_by_did, thread_at, kind) 210 ); 211 212 create table if not exists emails ( 213 id integer primary key autoincrement, 214 did text not null, 215 email text not null, 216 verified integer not null default 0, 217 verification_code text not null, 218 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 219 is_primary integer not null default 0, 220 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 221 unique(did, email) 222 ); 223 224 create table if not exists artifacts ( 225 -- id 226 id integer primary key autoincrement, 227 did text not null, 228 rkey text not null, 229 230 -- meta 231 repo_at text not null, 232 tag binary(20) not null, 233 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 234 235 -- data 236 blob_cid text not null, 237 name text not null, 238 size integer not null default 0, 239 mimetype string not null default "*/*", 240 241 -- constraints 242 unique(did, rkey), -- record must be unique 243 unique(repo_at, tag, name), -- for a given tag object, each file must be unique 244 foreign key (repo_at) references repos(at_uri) on delete cascade 245 ); 246 247 create table if not exists profile ( 248 -- id 249 id integer primary key autoincrement, 250 did text not null, 251 252 -- data 253 description text not null, 254 include_bluesky integer not null default 0, 255 location text, 256 257 -- constraints 258 unique(did) 259 ); 260 create table if not exists profile_links ( 261 -- id 262 id integer primary key autoincrement, 263 did text not null, 264 265 -- data 266 link text not null, 267 268 -- constraints 269 foreign key (did) 
references profile(did) on delete cascade 270 ); 271 create table if not exists profile_stats ( 272 -- id 273 id integer primary key autoincrement, 274 did text not null, 275 276 -- data 277 kind text not null check (kind in ( 278 "merged-pull-request-count", 279 "closed-pull-request-count", 280 "open-pull-request-count", 281 "open-issue-count", 282 "closed-issue-count", 283 "repository-count" 284 )), 285 286 -- constraints 287 foreign key (did) references profile(did) on delete cascade 288 ); 289 create table if not exists profile_pinned_repositories ( 290 -- id 291 id integer primary key autoincrement, 292 did text not null, 293 294 -- data 295 at_uri text not null, 296 297 -- constraints 298 unique(did, at_uri), 299 foreign key (did) references profile(did) on delete cascade, 300 foreign key (at_uri) references repos(at_uri) on delete cascade 301 ); 302 303 create table if not exists oauth_requests ( 304 id integer primary key autoincrement, 305 auth_server_iss text not null, 306 state text not null, 307 did text not null, 308 handle text not null, 309 pds_url text not null, 310 pkce_verifier text not null, 311 dpop_auth_server_nonce text not null, 312 dpop_private_jwk text not null 313 ); 314 315 create table if not exists oauth_sessions ( 316 id integer primary key autoincrement, 317 did text not null, 318 handle text not null, 319 pds_url text not null, 320 auth_server_iss text not null, 321 access_jwt text not null, 322 refresh_jwt text not null, 323 dpop_pds_nonce text, 324 dpop_auth_server_nonce text not null, 325 dpop_private_jwk text not null, 326 expiry text not null 327 ); 328 329 create table if not exists punchcard ( 330 did text not null, 331 date text not null, -- yyyy-mm-dd 332 count integer, 333 primary key (did, date) 334 ); 335 336 create table if not exists spindles ( 337 id integer primary key autoincrement, 338 owner text not null, 339 instance text not null, 340 verified text, -- time of verification 341 created text not null default 
(strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 342 343 unique(owner, instance) 344 ); 345 346 create table if not exists spindle_members ( 347 -- identifiers for the record 348 id integer primary key autoincrement, 349 did text not null, 350 rkey text not null, 351 352 -- data 353 instance text not null, 354 subject text not null, 355 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 356 357 -- constraints 358 unique (did, instance, subject) 359 ); 360 361 create table if not exists pipelines ( 362 -- identifiers 363 id integer primary key autoincrement, 364 knot text not null, 365 rkey text not null, 366 367 repo_owner text not null, 368 repo_name text not null, 369 370 -- every pipeline must be associated with exactly one commit 371 sha text not null check (length(sha) = 40), 372 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 373 374 -- trigger data 375 trigger_id integer not null, 376 377 unique(knot, rkey), 378 foreign key (trigger_id) references triggers(id) on delete cascade 379 ); 380 381 create table if not exists triggers ( 382 -- primary key 383 id integer primary key autoincrement, 384 385 -- top-level fields 386 kind text not null, 387 388 -- pushTriggerData fields 389 push_ref text, 390 push_new_sha text check (length(push_new_sha) = 40), 391 push_old_sha text check (length(push_old_sha) = 40), 392 393 -- pullRequestTriggerData fields 394 pr_source_branch text, 395 pr_target_branch text, 396 pr_source_sha text check (length(pr_source_sha) = 40), 397 pr_action text 398 ); 399 400 create table if not exists pipeline_statuses ( 401 -- identifiers 402 id integer primary key autoincrement, 403 spindle text not null, 404 rkey text not null, 405 406 -- referenced pipeline. 
these form the (did, rkey) pair 407 pipeline_knot text not null, 408 pipeline_rkey text not null, 409 410 -- content 411 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 412 workflow text not null, 413 status text not null, 414 error text, 415 exit_code integer not null default 0, 416 417 unique (spindle, rkey), 418 foreign key (pipeline_knot, pipeline_rkey) 419 references pipelines (knot, rkey) 420 on delete cascade 421 ); 422 423 create table if not exists repo_languages ( 424 -- identifiers 425 id integer primary key autoincrement, 426 427 -- repo identifiers 428 repo_at text not null, 429 ref text not null, 430 is_default_ref integer not null default 0, 431 432 -- language breakdown 433 language text not null, 434 bytes integer not null check (bytes >= 0), 435 436 unique(repo_at, ref, language) 437 ); 438 439 create table if not exists signups_inflight ( 440 id integer primary key autoincrement, 441 email text not null unique, 442 invite_code text not null, 443 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 444 ); 445 446 create table if not exists strings ( 447 -- identifiers 448 did text not null, 449 rkey text not null, 450 451 -- content 452 filename text not null, 453 description text, 454 content text not null, 455 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 456 edited text, 457 458 primary key (did, rkey) 459 ); 460 461 create table if not exists migrations ( 462 id integer primary key autoincrement, 463 name text unique 464 ); 465 `) 466 if err != nil { 467 return nil, err 468 } 469 470 // run migrations 471 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error { 472 tx.Exec(` 473 alter table repos add column description text check (length(description) <= 200); 474 `) 475 return nil 476 }) 477 478 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error { 479 // add unconstrained column 480 _, err := tx.Exec(` 481 alter table public_keys 482 add column rkey text; 
483 `) 484 if err != nil { 485 return err 486 } 487 488 // backfill 489 _, err = tx.Exec(` 490 update public_keys 491 set rkey = '' 492 where rkey is null; 493 `) 494 if err != nil { 495 return err 496 } 497 498 return nil 499 }) 500 501 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error { 502 _, err := tx.Exec(` 503 alter table comments drop column comment_at; 504 alter table comments add column rkey text; 505 `) 506 return err 507 }) 508 509 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error { 510 _, err := tx.Exec(` 511 alter table comments add column deleted text; -- timestamp 512 alter table comments add column edited text; -- timestamp 513 `) 514 return err 515 }) 516 517 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error { 518 _, err := tx.Exec(` 519 alter table pulls add column source_branch text; 520 alter table pulls add column source_repo_at text; 521 alter table pull_submissions add column source_rev text; 522 `) 523 return err 524 }) 525 526 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error { 527 _, err := tx.Exec(` 528 alter table repos add column source text; 529 `) 530 return err 531 }) 532 533 // disable foreign-keys for the next migration 534 // NOTE: this cannot be done in a transaction, so it is run outside [0] 535 // 536 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys 537 db.Exec("pragma foreign_keys = off;") 538 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error { 539 _, err := tx.Exec(` 540 create table pulls_new ( 541 -- identifiers 542 id integer primary key autoincrement, 543 pull_id integer not null, 544 545 -- at identifiers 546 repo_at text not null, 547 owner_did text not null, 548 rkey text not null, 549 550 -- content 551 title text not null, 552 body text not null, 553 target_branch text not null, 554 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted 
555 556 -- source info 557 source_branch text, 558 source_repo_at text, 559 560 -- stacking 561 stack_id text, 562 change_id text, 563 parent_change_id text, 564 565 -- meta 566 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 567 568 -- constraints 569 unique(repo_at, pull_id), 570 foreign key (repo_at) references repos(at_uri) on delete cascade 571 ); 572 573 insert into pulls_new ( 574 id, pull_id, 575 repo_at, owner_did, rkey, 576 title, body, target_branch, state, 577 source_branch, source_repo_at, 578 created 579 ) 580 select 581 id, pull_id, 582 repo_at, owner_did, rkey, 583 title, body, target_branch, state, 584 source_branch, source_repo_at, 585 created 586 FROM pulls; 587 588 drop table pulls; 589 alter table pulls_new rename to pulls; 590 `) 591 return err 592 }) 593 db.Exec("pragma foreign_keys = on;") 594 595 // run migrations 596 runMigration(db, "add-spindle-to-repos", func(tx *sql.Tx) error { 597 tx.Exec(` 598 alter table repos add column spindle text; 599 `) 600 return nil 601 }) 602 603 // recreate and add rkey + created columns with default constraint 604 runMigration(db, "rework-collaborators-table", func(tx *sql.Tx) error { 605 // create new table 606 // - repo_at instead of repo integer 607 // - rkey field 608 // - created field 609 _, err := tx.Exec(` 610 create table collaborators_new ( 611 -- identifiers for the record 612 id integer primary key autoincrement, 613 did text not null, 614 rkey text, 615 616 -- content 617 subject_did text not null, 618 repo_at text not null, 619 620 -- meta 621 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 622 623 -- constraints 624 foreign key (repo_at) references repos(at_uri) on delete cascade 625 ) 626 `) 627 if err != nil { 628 return err 629 } 630 631 // copy data 632 _, err = tx.Exec(` 633 insert into collaborators_new (id, did, rkey, subject_did, repo_at) 634 select 635 c.id, 636 r.did, 637 '', 638 c.did, 639 r.at_uri 640 from collaborators c 641 join 
repos r on c.repo = r.id 642 `) 643 if err != nil { 644 return err 645 } 646 647 // drop old table 648 _, err = tx.Exec(`drop table collaborators`) 649 if err != nil { 650 return err 651 } 652 653 // rename new table 654 _, err = tx.Exec(`alter table collaborators_new rename to collaborators`) 655 return err 656 }) 657 658 return &DB{db}, nil 659} 660 661type migrationFn = func(*sql.Tx) error 662 663func runMigration(d *sql.DB, name string, migrationFn migrationFn) error { 664 tx, err := d.Begin() 665 if err != nil { 666 return err 667 } 668 defer tx.Rollback() 669 670 var exists bool 671 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 672 if err != nil { 673 return err 674 } 675 676 if !exists { 677 // run migration 678 err = migrationFn(tx) 679 if err != nil { 680 log.Printf("Failed to run migration %s: %v", name, err) 681 return err 682 } 683 684 // mark migration as complete 685 _, err = tx.Exec("insert into migrations (name) values (?)", name) 686 if err != nil { 687 log.Printf("Failed to mark migration %s as complete: %v", name, err) 688 return err 689 } 690 691 // commit the transaction 692 if err := tx.Commit(); err != nil { 693 return err 694 } 695 696 log.Printf("migration %s applied successfully", name) 697 } else { 698 log.Printf("skipped migration %s, already applied", name) 699 } 700 701 return nil 702} 703 704type filter struct { 705 key string 706 arg any 707 cmp string 708} 709 710func newFilter(key, cmp string, arg any) filter { 711 return filter{ 712 key: key, 713 arg: arg, 714 cmp: cmp, 715 } 716} 717 718func FilterEq(key string, arg any) filter { return newFilter(key, "=", arg) } 719func FilterNotEq(key string, arg any) filter { return newFilter(key, "<>", arg) } 720func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) } 721func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) } 722func FilterIs(key string, arg any) filter { return newFilter(key, 
"is", arg) } 723func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) } 724func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) } 725 726func (f filter) Condition() string { 727 rv := reflect.ValueOf(f.arg) 728 kind := rv.Kind() 729 730 // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)` 731 if kind == reflect.Slice || kind == reflect.Array { 732 if rv.Len() == 0 { 733 // always false 734 return "1 = 0" 735 } 736 737 placeholders := make([]string, rv.Len()) 738 for i := range placeholders { 739 placeholders[i] = "?" 740 } 741 742 return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", ")) 743 } 744 745 return fmt.Sprintf("%s %s ?", f.key, f.cmp) 746} 747 748func (f filter) Arg() []any { 749 rv := reflect.ValueOf(f.arg) 750 kind := rv.Kind() 751 if kind == reflect.Slice || kind == reflect.Array { 752 if rv.Len() == 0 { 753 return nil 754 } 755 756 out := make([]any, rv.Len()) 757 for i := range rv.Len() { 758 out[i] = rv.Index(i).Interface() 759 } 760 return out 761 } 762 763 return []any{f.arg} 764}