forked from tangled.org/core
this repo has no description
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "reflect" 9 "strings" 10 11 _ "github.com/mattn/go-sqlite3" 12) 13 14type DB struct { 15 *sql.DB 16} 17 18type Execer interface { 19 Query(query string, args ...any) (*sql.Rows, error) 20 QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) 21 QueryRow(query string, args ...any) *sql.Row 22 QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row 23 Exec(query string, args ...any) (sql.Result, error) 24 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 25 Prepare(query string) (*sql.Stmt, error) 26 PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) 27} 28 29func Make(dbPath string) (*DB, error) { 30 db, err := sql.Open("sqlite3", dbPath) 31 if err != nil { 32 return nil, err 33 } 34 _, err = db.Exec(` 35 pragma journal_mode = WAL; 36 pragma synchronous = normal; 37 pragma foreign_keys = on; 38 pragma temp_store = memory; 39 pragma mmap_size = 30000000000; 40 pragma page_size = 32768; 41 pragma auto_vacuum = incremental; 42 pragma busy_timeout = 5000; 43 44 create table if not exists registrations ( 45 id integer primary key autoincrement, 46 domain text not null unique, 47 did text not null, 48 secret text not null, 49 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 50 registered text 51 ); 52 create table if not exists public_keys ( 53 id integer primary key autoincrement, 54 did text not null, 55 name text not null, 56 key text not null, 57 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 58 unique(did, name, key) 59 ); 60 create table if not exists repos ( 61 id integer primary key autoincrement, 62 did text not null, 63 name text not null, 64 knot text not null, 65 rkey text not null, 66 at_uri text not null unique, 67 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 68 unique(did, name, knot, rkey) 69 ); 70 create table if not exists collaborators ( 71 id integer primary key autoincrement, 72 did text not null, 73 repo integer not null, 74 foreign key (repo) references repos(id) on delete cascade 75 ); 76 create table if not exists follows ( 77 user_did text not null, 78 subject_did text not null, 79 rkey text not null, 80 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 81 primary key (user_did, subject_did), 82 check (user_did <> subject_did) 83 ); 84 create table if not exists issues ( 85 id integer primary key autoincrement, 86 owner_did text not null, 87 repo_at text not null, 88 issue_id integer not null, 89 title text not null, 90 body text not null, 91 open integer not null default 1, 92 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 93 issue_at text, 94 unique(repo_at, issue_id), 95 foreign key (repo_at) references repos(at_uri) on delete cascade 96 ); 97 create table if not exists comments ( 98 id integer primary key autoincrement, 99 owner_did text not null, 100 issue_id integer not null, 101 repo_at text not null, 102 comment_id integer not null, 103 comment_at text not null, 104 body text not null, 105 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 106 unique(issue_id, comment_id), 107 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade 108 ); 109 create table if not exists pulls ( 110 -- identifiers 111 id integer primary key autoincrement, 112 pull_id integer not null, 113 114 -- at identifiers 115 repo_at text not null, 116 owner_did text not null, 117 rkey text not null, 118 pull_at text, 119 120 -- content 121 title text not null, 122 body text not null, 123 target_branch text not null, 124 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed 125 126 -- meta 127 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 128 129 -- constraints 130 unique(repo_at, pull_id), 131 foreign key (repo_at) references repos(at_uri) on delete cascade 132 ); 133 134 -- every pull must have atleast 1 submission: the initial submission 135 create table if not exists pull_submissions ( 136 -- identifiers 137 id integer primary key autoincrement, 138 pull_id integer not null, 139 140 -- at identifiers 141 repo_at text not null, 142 143 -- content, these are immutable, and require a resubmission to update 144 round_number integer not null default 0, 145 patch text, 146 147 -- meta 148 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 149 150 -- constraints 151 unique(repo_at, pull_id, round_number), 152 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade 153 ); 154 155 create table if not exists pull_comments ( 156 -- identifiers 157 id integer primary key autoincrement, 158 pull_id integer not null, 159 submission_id integer not null, 160 161 -- at identifiers 162 repo_at text not null, 163 owner_did text not null, 164 comment_at text not null, 165 166 -- content 167 body text not null, 168 169 -- meta 170 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 171 172 -- constraints 173 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade, 174 foreign key (submission_id) references pull_submissions(id) on delete cascade 175 ); 176 177 create table if not exists _jetstream ( 178 id integer primary key autoincrement, 179 last_time_us integer not null 180 ); 181 182 create table if not exists repo_issue_seqs ( 183 repo_at text primary key, 184 next_issue_id integer not null default 1 185 ); 186 187 create table if not exists repo_pull_seqs ( 188 repo_at text primary key, 189 next_pull_id integer not null default 1 190 ); 191 192 create table if not exists stars ( 193 id integer primary key autoincrement, 194 starred_by_did text not null, 195 repo_at text not null, 196 rkey text not null, 197 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 198 foreign key (repo_at) references repos(at_uri) on delete cascade, 199 unique(starred_by_did, repo_at) 200 ); 201 202 create table if not exists emails ( 203 id integer primary key autoincrement, 204 did text not null, 205 email text not null, 206 verified integer not null default 0, 207 verification_code text not null, 208 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 209 is_primary integer not null default 0, 210 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 211 unique(did, email) 212 ); 213 214 create table if not exists artifacts ( 215 -- id 216 id integer primary key autoincrement, 217 did text not null, 218 rkey text not null, 219 220 -- meta 221 repo_at text not null, 222 tag binary(20) not null, 223 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 224 225 -- data 226 blob_cid text not null, 227 name text not null, 228 size integer not null default 0, 229 mimetype string not null default "*/*", 230 231 -- constraints 232 unique(did, rkey), -- record must be unique 233 unique(repo_at, tag, name), -- for a given tag object, each file must be unique 234 foreign key (repo_at) references repos(at_uri) on delete cascade 235 ); 236 237 create table if not exists profile ( 238 -- id 239 id integer primary key autoincrement, 240 did text not null, 241 242 -- data 243 description text not null, 244 include_bluesky integer not null default 0, 245 location text, 246 247 -- constraints 248 unique(did) 249 ); 250 create table if not exists profile_links ( 251 -- id 252 id integer primary key autoincrement, 253 did text not null, 254 255 -- data 256 link text not null, 257 258 -- constraints 259 foreign key (did) references profile(did) on delete cascade 260 ); 261 create table if not exists profile_stats ( 262 -- id 263 id integer primary key autoincrement, 264 did text not null, 265 266 -- data 267 kind text not null check (kind in ( 268 "merged-pull-request-count", 269 "closed-pull-request-count", 270 "open-pull-request-count", 271 "open-issue-count", 272 "closed-issue-count", 273 "repository-count" 274 )), 275 276 -- constraints 277 foreign key (did) references profile(did) on delete cascade 278 ); 279 create table if not exists profile_pinned_repositories ( 280 -- id 281 id integer primary key autoincrement, 282 did text not null, 283 284 -- data 285 at_uri text not null, 286 287 -- constraints 288 unique(did, at_uri), 289 foreign key (did) references profile(did) on delete cascade, 290 foreign key (at_uri) references repos(at_uri) on delete cascade 291 ); 292 293 create table if not exists oauth_requests ( 294 id integer primary key autoincrement, 295 auth_server_iss text not null, 296 state text not null, 297 did text not null, 298 handle text not null, 299 pds_url text not null, 300 pkce_verifier text not null, 301 dpop_auth_server_nonce text not null, 302 dpop_private_jwk text not null 303 ); 304 305 create table if not exists oauth_sessions ( 306 id integer primary key autoincrement, 307 did text not null, 308 handle text not null, 309 pds_url text not null, 310 auth_server_iss text not null, 311 access_jwt text not null, 312 refresh_jwt text not null, 313 dpop_pds_nonce text, 314 dpop_auth_server_nonce text not null, 315 dpop_private_jwk text not null, 316 expiry text not null 317 ); 318 319 create table if not exists punchcard ( 320 did text not null, 321 date text not null, -- yyyy-mm-dd 322 count integer, 323 primary key (did, date) 324 ); 325 326 create table if not exists spindles ( 327 id integer primary key autoincrement, 328 owner text not null, 329 instance text not null, 330 verified text, -- time of verification 331 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 332 333 unique(instance) 334 ); 335 336 create table if not exists pipelines ( 337 -- identifiers 338 id integer primary key autoincrement, 339 knot text not null, 340 rkey text not null, 341 342 repo_owner text not null, 343 repo_name text not null, 344 345 -- every pipeline must be associated with exactly one commit 346 sha text not null check (length(sha) = 40), 347 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 348 349 -- trigger data 350 trigger_id integer not null, 351 352 unique(knot, rkey), 353 foreign key (trigger_id) references triggers(id) on delete cascade 354 ); 355 356 create table if not exists triggers ( 357 -- primary key 358 id integer primary key autoincrement, 359 360 -- top-level fields 361 kind text not null, 362 363 -- pushTriggerData fields 364 push_ref text, 365 push_new_sha text check (length(push_new_sha) = 40), 366 push_old_sha text check (length(push_old_sha) = 40), 367 368 -- pullRequestTriggerData fields 369 pr_source_branch text, 370 pr_target_branch text, 371 pr_source_sha text check (length(pr_source_sha) = 40), 372 pr_action text 373 ); 374 375 create table if not exists pipeline_statuses ( 376 -- identifiers 377 id integer primary key autoincrement, 378 spindle text not null, 379 rkey text not null, 380 381 -- referenced pipeline. these form the (did, rkey) pair 382 pipeline_knot text not null, 383 pipeline_rkey text not null, 384 385 -- content 386 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 387 workflow text not null, 388 status text not null, 389 error text, 390 exit_code integer not null default 0, 391 392 unique (spindle, rkey), 393 foreign key (pipeline_knot, pipeline_rkey) 394 references pipelines (knot, rkey) 395 on delete cascade 396 ); 397 398 create table if not exists migrations ( 399 id integer primary key autoincrement, 400 name text unique 401 ); 402 `) 403 if err != nil { 404 return nil, err 405 } 406 407 // run migrations 408 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error { 409 tx.Exec(` 410 alter table repos add column description text check (length(description) <= 200); 411 `) 412 return nil 413 }) 414 415 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error { 416 // add unconstrained column 417 _, err := tx.Exec(` 418 alter table public_keys 419 add column rkey text; 420 `) 421 if err != nil { 422 return err 423 } 424 425 // backfill 426 _, err = tx.Exec(` 427 update public_keys 428 set rkey = '' 429 where rkey is null; 430 `) 431 if err != nil { 432 return err 433 } 434 435 return nil 436 }) 437 438 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error { 439 _, err := tx.Exec(` 440 alter table comments drop column comment_at; 441 alter table comments add column rkey text; 442 `) 443 return err 444 }) 445 446 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error { 447 _, err := tx.Exec(` 448 alter table comments add column deleted text; -- timestamp 449 alter table comments add column edited text; -- timestamp 450 `) 451 return err 452 }) 453 454 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error { 455 _, err := tx.Exec(` 456 alter table pulls add column source_branch text; 457 alter table pulls add column source_repo_at text; 458 alter table pull_submissions add column source_rev text; 459 `) 460 return err 461 }) 462 463 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error { 464 _, err := tx.Exec(` 465 alter table repos add column source text; 466 `) 467 return err 468 }) 469 470 // disable foreign-keys for the next migration 471 // NOTE: this cannot be done in a transaction, so it is run outside [0] 472 // 473 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys 474 db.Exec("pragma foreign_keys = off;") 475 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error { 476 _, err := tx.Exec(` 477 create table pulls_new ( 478 -- identifiers 479 id integer primary key autoincrement, 480 pull_id integer not null, 481 482 -- at identifiers 483 repo_at text not null, 484 owner_did text not null, 485 rkey text not null, 486 487 -- content 488 title text not null, 489 body text not null, 490 target_branch text not null, 491 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted 492 493 -- source info 494 source_branch text, 495 source_repo_at text, 496 497 -- stacking 498 stack_id text, 499 change_id text, 500 parent_change_id text, 501 502 -- meta 503 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 504 505 -- constraints 506 unique(repo_at, pull_id), 507 foreign key (repo_at) references repos(at_uri) on delete cascade 508 ); 509 510 insert into pulls_new ( 511 id, pull_id, 512 repo_at, owner_did, rkey, 513 title, body, target_branch, state, 514 source_branch, source_repo_at, 515 created 516 ) 517 select 518 id, pull_id, 519 repo_at, owner_did, rkey, 520 title, body, target_branch, state, 521 source_branch, source_repo_at, 522 created 523 FROM pulls; 524 525 drop table pulls; 526 alter table pulls_new rename to pulls; 527 `) 528 return err 529 }) 530 db.Exec("pragma foreign_keys = on;") 531 532 // run migrations 533 runMigration(db, "add-spindle-to-repos", func(tx *sql.Tx) error { 534 tx.Exec(` 535 alter table repos add column spindle text; 536 `) 537 return nil 538 }) 539 540 return &DB{db}, nil 541} 542 543type migrationFn = func(*sql.Tx) error 544 545func runMigration(d *sql.DB, name string, migrationFn migrationFn) error { 546 tx, err := d.Begin() 547 if err != nil { 548 return err 549 } 550 defer tx.Rollback() 551 552 var exists bool 553 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 554 if err != nil { 555 return err 556 } 557 558 if !exists { 559 // run migration 560 err = migrationFn(tx) 561 if err != nil { 562 log.Printf("Failed to run migration %s: %v", name, err) 563 return err 564 } 565 566 // mark migration as complete 567 _, err = tx.Exec("insert into migrations (name) values (?)", name) 568 if err != nil { 569 log.Printf("Failed to mark migration %s as complete: %v", name, err) 570 return err 571 } 572 573 // commit the transaction 574 if err := tx.Commit(); err != nil { 575 return err 576 } 577 578 log.Printf("migration %s applied successfully", name) 579 } else { 580 log.Printf("skipped migration %s, already applied", name) 581 } 582 583 return nil 584} 585 586type filter struct { 587 key string 588 arg any 589 cmp string 590} 591 592func newFilter(key, cmp string, arg any) filter { 593 return filter{ 594 key: key, 595 arg: arg, 596 cmp: cmp, 597 } 598} 599 600func FilterEq(key string, arg any) filter { return newFilter(key, "=", arg) } 601func FilterNotEq(key string, arg any) filter { return newFilter(key, "<>", arg) } 602func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) } 603func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) } 604func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) } 605func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) } 606func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) } 607 608func (f filter) Condition() string { 609 rv := reflect.ValueOf(f.arg) 610 kind := rv.Kind() 611 612 // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)` 613 if kind == reflect.Slice || kind == reflect.Array { 614 if rv.Len() == 0 { 615 panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key)) 616 } 617 618 placeholders := make([]string, rv.Len()) 619 for i := range placeholders { 620 placeholders[i] = "?" 621 } 622 623 return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", ")) 624 } 625 626 return fmt.Sprintf("%s %s ?", f.key, f.cmp) 627} 628 629func (f filter) Arg() []any { 630 rv := reflect.ValueOf(f.arg) 631 kind := rv.Kind() 632 if kind == reflect.Slice || kind == reflect.Array { 633 if rv.Len() == 0 { 634 panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key)) 635 } 636 637 out := make([]any, rv.Len()) 638 for i := range rv.Len() { 639 out[i] = rv.Index(i).Interface() 640 } 641 return out 642 } 643 644 return []any{f.arg} 645}