forked from tangled.org/core
this repo has no description
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "reflect" 9 "strings" 10 11 _ "github.com/mattn/go-sqlite3" 12) 13 14type DB struct { 15 *sql.DB 16} 17 18type Execer interface { 19 Query(query string, args ...any) (*sql.Rows, error) 20 QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) 21 QueryRow(query string, args ...any) *sql.Row 22 QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row 23 Exec(query string, args ...any) (sql.Result, error) 24 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 25 Prepare(query string) (*sql.Stmt, error) 26 PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) 27} 28 29func Make(dbPath string) (*DB, error) { 30 // https://github.com/mattn/go-sqlite3#connection-string 31 opts := []string{ 32 "_foreign_keys=1", 33 "_journal_mode=WAL", 34 "_synchronous=NORMAL", 35 "_auto_vacuum=incremental", 36 } 37 38 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 39 if err != nil { 40 return nil, err 41 } 42 43 ctx := context.Background() 44 45 conn, err := db.Conn(ctx) 46 if err != nil { 47 return nil, err 48 } 49 defer conn.Close() 50 51 _, err = conn.ExecContext(ctx, ` 52 create table if not exists registrations ( 53 id integer primary key autoincrement, 54 domain text not null unique, 55 did text not null, 56 secret text not null, 57 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 58 registered text 59 ); 60 create table if not exists public_keys ( 61 id integer primary key autoincrement, 62 did text not null, 63 name text not null, 64 key text not null, 65 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 66 unique(did, name, key) 67 ); 68 create table if not exists repos ( 69 id integer primary key autoincrement, 70 did text not null, 71 name text not null, 72 knot text not null, 73 rkey text not null, 74 at_uri text not null unique, 75 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 76 unique(did, name, knot, rkey) 77 ); 78 create table if not exists collaborators ( 79 id integer primary key autoincrement, 80 did text not null, 81 repo integer not null, 82 foreign key (repo) references repos(id) on delete cascade 83 ); 84 create table if not exists follows ( 85 user_did text not null, 86 subject_did text not null, 87 rkey text not null, 88 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 89 primary key (user_did, subject_did), 90 check (user_did <> subject_did) 91 ); 92 create table if not exists issues ( 93 id integer primary key autoincrement, 94 owner_did text not null, 95 repo_at text not null, 96 issue_id integer not null, 97 title text not null, 98 body text not null, 99 open integer not null default 1, 100 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 101 issue_at text, 102 unique(repo_at, issue_id), 103 foreign key (repo_at) references repos(at_uri) on delete cascade 104 ); 105 create table if not exists comments ( 106 id integer primary key autoincrement, 107 owner_did text not null, 108 issue_id integer not null, 109 repo_at text not null, 110 comment_id integer not null, 111 comment_at text not null, 112 body text not null, 113 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 114 unique(issue_id, comment_id), 115 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade 116 ); 117 create table if not exists pulls ( 118 -- identifiers 119 id integer primary key autoincrement, 120 pull_id integer not null, 121 122 -- at identifiers 123 repo_at text not null, 124 owner_did text not null, 125 rkey text not null, 126 pull_at text, 127 128 -- content 129 title text not null, 130 body text not null, 131 target_branch text not null, 132 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed 133 134 -- meta 135 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 136 137 -- constraints 138 unique(repo_at, pull_id), 139 foreign key (repo_at) references repos(at_uri) on delete cascade 140 ); 141 142 -- every pull must have atleast 1 submission: the initial submission 143 create table if not exists pull_submissions ( 144 -- identifiers 145 id integer primary key autoincrement, 146 pull_id integer not null, 147 148 -- at identifiers 149 repo_at text not null, 150 151 -- content, these are immutable, and require a resubmission to update 152 round_number integer not null default 0, 153 patch text, 154 155 -- meta 156 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 157 158 -- constraints 159 unique(repo_at, pull_id, round_number), 160 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade 161 ); 162 163 create table if not exists pull_comments ( 164 -- identifiers 165 id integer primary key autoincrement, 166 pull_id integer not null, 167 submission_id integer not null, 168 169 -- at identifiers 170 repo_at text not null, 171 owner_did text not null, 172 comment_at text not null, 173 174 -- content 175 body text not null, 176 177 -- meta 178 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 179 180 -- constraints 181 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade, 182 foreign key (submission_id) references pull_submissions(id) on delete cascade 183 ); 184 185 create table if not exists _jetstream ( 186 id integer primary key autoincrement, 187 last_time_us integer not null 188 ); 189 190 create table if not exists repo_issue_seqs ( 191 repo_at text primary key, 192 next_issue_id integer not null default 1 193 ); 194 195 create table if not exists repo_pull_seqs ( 196 repo_at text primary key, 197 next_pull_id integer not null default 1 198 ); 199 200 create table if not exists stars ( 201 id integer primary key autoincrement, 202 starred_by_did text not null, 203 repo_at text not null, 204 rkey text not null, 205 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 206 foreign key (repo_at) references repos(at_uri) on delete cascade, 207 unique(starred_by_did, repo_at) 208 ); 209 210 create table if not exists reactions ( 211 id integer primary key autoincrement, 212 reacted_by_did text not null, 213 thread_at text not null, 214 kind text not null, 215 rkey text not null, 216 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 217 unique(reacted_by_did, thread_at, kind) 218 ); 219 220 create table if not exists emails ( 221 id integer primary key autoincrement, 222 did text not null, 223 email text not null, 224 verified integer not null default 0, 225 verification_code text not null, 226 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 227 is_primary integer not null default 0, 228 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 229 unique(did, email) 230 ); 231 232 create table if not exists artifacts ( 233 -- id 234 id integer primary key autoincrement, 235 did text not null, 236 rkey text not null, 237 238 -- meta 239 repo_at text not null, 240 tag binary(20) not null, 241 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 242 243 -- data 244 blob_cid text not null, 245 name text not null, 246 size integer not null default 0, 247 mimetype string not null default "*/*", 248 249 -- constraints 250 unique(did, rkey), -- record must be unique 251 unique(repo_at, tag, name), -- for a given tag object, each file must be unique 252 foreign key (repo_at) references repos(at_uri) on delete cascade 253 ); 254 255 create table if not exists profile ( 256 -- id 257 id integer primary key autoincrement, 258 did text not null, 259 260 -- data 261 description text not null, 262 include_bluesky integer not null default 0, 263 location text, 264 265 -- constraints 266 unique(did) 267 ); 268 create table if not exists profile_links ( 269 -- id 270 id integer primary key autoincrement, 271 did text not null, 272 273 -- data 274 link text not null, 275 276 -- constraints 277 foreign key (did) references profile(did) on delete cascade 278 ); 279 create table if not exists profile_stats ( 280 -- id 281 id integer primary key autoincrement, 282 did text not null, 283 284 -- data 285 kind text not null check (kind in ( 286 "merged-pull-request-count", 287 "closed-pull-request-count", 288 "open-pull-request-count", 289 "open-issue-count", 290 "closed-issue-count", 291 "repository-count" 292 )), 293 294 -- constraints 295 foreign key (did) references profile(did) on delete cascade 296 ); 297 create table if not exists profile_pinned_repositories ( 298 -- id 299 id integer primary key autoincrement, 300 did text not null, 301 302 -- data 303 at_uri text not null, 304 305 -- constraints 306 unique(did, at_uri), 307 foreign key (did) references profile(did) on delete cascade, 308 foreign key (at_uri) references repos(at_uri) on delete cascade 309 ); 310 311 create table if not exists oauth_requests ( 312 id integer primary key autoincrement, 313 auth_server_iss text not null, 314 state text not null, 315 did text not null, 316 handle text not null, 317 pds_url text not null, 318 pkce_verifier text not null, 319 dpop_auth_server_nonce text not null, 320 dpop_private_jwk text not null 321 ); 322 323 create table if not exists oauth_sessions ( 324 id integer primary key autoincrement, 325 did text not null, 326 handle text not null, 327 pds_url text not null, 328 auth_server_iss text not null, 329 access_jwt text not null, 330 refresh_jwt text not null, 331 dpop_pds_nonce text, 332 dpop_auth_server_nonce text not null, 333 dpop_private_jwk text not null, 334 expiry text not null 335 ); 336 337 create table if not exists punchcard ( 338 did text not null, 339 date text not null, -- yyyy-mm-dd 340 count integer, 341 primary key (did, date) 342 ); 343 344 create table if not exists spindles ( 345 id integer primary key autoincrement, 346 owner text not null, 347 instance text not null, 348 verified text, -- time of verification 349 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 350 351 unique(owner, instance) 352 ); 353 354 create table if not exists spindle_members ( 355 -- identifiers for the record 356 id integer primary key autoincrement, 357 did text not null, 358 rkey text not null, 359 360 -- data 361 instance text not null, 362 subject text not null, 363 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 364 365 -- constraints 366 unique (did, instance, subject) 367 ); 368 369 create table if not exists pipelines ( 370 -- identifiers 371 id integer primary key autoincrement, 372 knot text not null, 373 rkey text not null, 374 375 repo_owner text not null, 376 repo_name text not null, 377 378 -- every pipeline must be associated with exactly one commit 379 sha text not null check (length(sha) = 40), 380 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 381 382 -- trigger data 383 trigger_id integer not null, 384 385 unique(knot, rkey), 386 foreign key (trigger_id) references triggers(id) on delete cascade 387 ); 388 389 create table if not exists triggers ( 390 -- primary key 391 id integer primary key autoincrement, 392 393 -- top-level fields 394 kind text not null, 395 396 -- pushTriggerData fields 397 push_ref text, 398 push_new_sha text check (length(push_new_sha) = 40), 399 push_old_sha text check (length(push_old_sha) = 40), 400 401 -- pullRequestTriggerData fields 402 pr_source_branch text, 403 pr_target_branch text, 404 pr_source_sha text check (length(pr_source_sha) = 40), 405 pr_action text 406 ); 407 408 create table if not exists pipeline_statuses ( 409 -- identifiers 410 id integer primary key autoincrement, 411 spindle text not null, 412 rkey text not null, 413 414 -- referenced pipeline. these form the (did, rkey) pair 415 pipeline_knot text not null, 416 pipeline_rkey text not null, 417 418 -- content 419 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 420 workflow text not null, 421 status text not null, 422 error text, 423 exit_code integer not null default 0, 424 425 unique (spindle, rkey), 426 foreign key (pipeline_knot, pipeline_rkey) 427 references pipelines (knot, rkey) 428 on delete cascade 429 ); 430 431 create table if not exists repo_languages ( 432 -- identifiers 433 id integer primary key autoincrement, 434 435 -- repo identifiers 436 repo_at text not null, 437 ref text not null, 438 is_default_ref integer not null default 0, 439 440 -- language breakdown 441 language text not null, 442 bytes integer not null check (bytes >= 0), 443 444 unique(repo_at, ref, language) 445 ); 446 447 create table if not exists signups_inflight ( 448 id integer primary key autoincrement, 449 email text not null unique, 450 invite_code text not null, 451 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) 452 ); 453 454 create table if not exists strings ( 455 -- identifiers 456 did text not null, 457 rkey text not null, 458 459 -- content 460 filename text not null, 461 description text, 462 content text not null, 463 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 464 edited text, 465 466 primary key (did, rkey) 467 ); 468 469 create table if not exists migrations ( 470 id integer primary key autoincrement, 471 name text unique 472 ); 473 `) 474 if err != nil { 475 return nil, err 476 } 477 478 // run migrations 479 runMigration(conn, "add-description-to-repos", func(tx *sql.Tx) error { 480 tx.Exec(` 481 alter table repos add column description text check (length(description) <= 200); 482 `) 483 return nil 484 }) 485 486 runMigration(conn, "add-rkey-to-pubkeys", func(tx *sql.Tx) error { 487 // add unconstrained column 488 _, err := tx.Exec(` 489 alter table public_keys 490 add column rkey text; 491 `) 492 if err != nil { 493 return err 494 } 495 496 // backfill 497 _, err = tx.Exec(` 498 update public_keys 499 set rkey = '' 500 where rkey is null; 501 `) 502 if err != nil { 503 return err 504 } 505 506 return nil 507 }) 508 509 runMigration(conn, "add-rkey-to-comments", func(tx *sql.Tx) error { 510 _, err := tx.Exec(` 511 alter table comments drop column comment_at; 512 alter table comments add column rkey text; 513 `) 514 return err 515 }) 516 517 runMigration(conn, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error { 518 _, err := tx.Exec(` 519 alter table comments add column deleted text; -- timestamp 520 alter table comments add column edited text; -- timestamp 521 `) 522 return err 523 }) 524 525 runMigration(conn, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error { 526 _, err := tx.Exec(` 527 alter table pulls add column source_branch text; 528 alter table pulls add column source_repo_at text; 529 alter table pull_submissions add column source_rev text; 530 `) 531 return err 532 }) 533 534 runMigration(conn, "add-source-to-repos", func(tx *sql.Tx) error { 535 _, err := tx.Exec(` 536 alter table repos add column source text; 537 `) 538 return err 539 }) 540 541 // disable foreign-keys for the next migration 542 // NOTE: this cannot be done in a transaction, so it is run outside [0] 543 // 544 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys 545 conn.ExecContext(ctx, "pragma foreign_keys = off;") 546 runMigration(conn, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error { 547 _, err := tx.Exec(` 548 create table pulls_new ( 549 -- identifiers 550 id integer primary key autoincrement, 551 pull_id integer not null, 552 553 -- at identifiers 554 repo_at text not null, 555 owner_did text not null, 556 rkey text not null, 557 558 -- content 559 title text not null, 560 body text not null, 561 target_branch text not null, 562 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted 563 564 -- source info 565 source_branch text, 566 source_repo_at text, 567 568 -- stacking 569 stack_id text, 570 change_id text, 571 parent_change_id text, 572 573 -- meta 574 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 575 576 -- constraints 577 unique(repo_at, pull_id), 578 foreign key (repo_at) references repos(at_uri) on delete cascade 579 ); 580 581 insert into pulls_new ( 582 id, pull_id, 583 repo_at, owner_did, rkey, 584 title, body, target_branch, state, 585 source_branch, source_repo_at, 586 created 587 ) 588 select 589 id, pull_id, 590 repo_at, owner_did, rkey, 591 title, body, target_branch, state, 592 source_branch, source_repo_at, 593 created 594 FROM pulls; 595 596 drop table pulls; 597 alter table pulls_new rename to pulls; 598 `) 599 return err 600 }) 601 conn.ExecContext(ctx, "pragma foreign_keys = on;") 602 603 // run migrations 604 runMigration(conn, "add-spindle-to-repos", func(tx *sql.Tx) error { 605 tx.Exec(` 606 alter table repos add column spindle text; 607 `) 608 return nil 609 }) 610 611 // recreate and add rkey + created columns with default constraint 612 runMigration(conn, "rework-collaborators-table", func(tx *sql.Tx) error { 613 // create new table 614 // - repo_at instead of repo integer 615 // - rkey field 616 // - created field 617 _, err := tx.Exec(` 618 create table collaborators_new ( 619 -- identifiers for the record 620 id integer primary key autoincrement, 621 did text not null, 622 rkey text, 623 624 -- content 625 subject_did text not null, 626 repo_at text not null, 627 628 -- meta 629 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 630 631 -- constraints 632 foreign key (repo_at) references repos(at_uri) on delete cascade 633 ) 634 `) 635 if err != nil { 636 return err 637 } 638 639 // copy data 640 _, err = tx.Exec(` 641 insert into collaborators_new (id, did, rkey, subject_did, repo_at) 642 select 643 c.id, 644 r.did, 645 '', 646 c.did, 647 r.at_uri 648 from collaborators c 649 join repos r on c.repo = r.id 650 `) 651 if err != nil { 652 return err 653 } 654 655 // drop old table 656 _, err = tx.Exec(`drop table collaborators`) 657 if err != nil { 658 return err 659 } 660 661 // rename new table 662 _, err = tx.Exec(`alter table collaborators_new rename to collaborators`) 663 return err 664 }) 665 666 return &DB{db}, nil 667} 668 669type migrationFn = func(*sql.Tx) error 670 671func runMigration(c *sql.Conn, name string, migrationFn migrationFn) error { 672 tx, err := c.BeginTx(context.Background(), nil) 673 if err != nil { 674 return err 675 } 676 defer tx.Rollback() 677 678 var exists bool 679 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 680 if err != nil { 681 return err 682 } 683 684 if !exists { 685 // run migration 686 err = migrationFn(tx) 687 if err != nil { 688 log.Printf("Failed to run migration %s: %v", name, err) 689 return err 690 } 691 692 // mark migration as complete 693 _, err = tx.Exec("insert into migrations (name) values (?)", name) 694 if err != nil { 695 log.Printf("Failed to mark migration %s as complete: %v", name, err) 696 return err 697 } 698 699 // commit the transaction 700 if err := tx.Commit(); err != nil { 701 return err 702 } 703 704 log.Printf("migration %s applied successfully", name) 705 } else { 706 log.Printf("skipped migration %s, already applied", name) 707 } 708 709 return nil 710} 711 712type filter struct { 713 key string 714 arg any 715 cmp string 716} 717 718func newFilter(key, cmp string, arg any) filter { 719 return filter{ 720 key: key, 721 arg: arg, 722 cmp: cmp, 723 } 724} 725 726func FilterEq(key string, arg any) filter { return newFilter(key, "=", arg) } 727func FilterNotEq(key string, arg any) filter { return newFilter(key, "<>", arg) } 728func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) } 729func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) } 730func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) } 731func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) } 732func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) } 733 734func (f filter) Condition() string { 735 rv := reflect.ValueOf(f.arg) 736 kind := rv.Kind() 737 738 // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)` 739 if (kind == reflect.Slice && rv.Type().Elem().Kind() != reflect.Uint8) || kind == reflect.Array { 740 if rv.Len() == 0 { 741 // always false 742 return "1 = 0" 743 } 744 745 placeholders := make([]string, rv.Len()) 746 for i := range placeholders { 747 placeholders[i] = "?" 748 } 749 750 return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", ")) 751 } 752 753 return fmt.Sprintf("%s %s ?", f.key, f.cmp) 754} 755 756func (f filter) Arg() []any { 757 rv := reflect.ValueOf(f.arg) 758 kind := rv.Kind() 759 if (kind == reflect.Slice && rv.Type().Elem().Kind() != reflect.Uint8) || kind == reflect.Array { 760 if rv.Len() == 0 { 761 return nil 762 } 763 764 out := make([]any, rv.Len()) 765 for i := range rv.Len() { 766 out[i] = rv.Index(i).Interface() 767 } 768 return out 769 } 770 771 return []any{f.arg} 772}