forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 9 _ "github.com/mattn/go-sqlite3" 10) 11 12type DB struct { 13 *sql.DB 14} 15 16type Execer interface { 17 Query(query string, args ...any) (*sql.Rows, error) 18 QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) 19 QueryRow(query string, args ...any) *sql.Row 20 QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row 21 Exec(query string, args ...any) (sql.Result, error) 22 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 23 Prepare(query string) (*sql.Stmt, error) 24 PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) 25} 26 27func Make(dbPath string) (*DB, error) { 28 db, err := sql.Open("sqlite3", dbPath) 29 if err != nil { 30 return nil, err 31 } 32 _, err = db.Exec(` 33 pragma journal_mode = WAL; 34 pragma synchronous = normal; 35 pragma foreign_keys = on; 36 pragma temp_store = memory; 37 pragma mmap_size = 30000000000; 38 pragma page_size = 32768; 39 pragma auto_vacuum = incremental; 40 pragma busy_timeout = 5000; 41 42 create table if not exists registrations ( 43 id integer primary key autoincrement, 44 domain text not null unique, 45 did text not null, 46 secret text not null, 47 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 48 registered text 49 ); 50 create table if not exists public_keys ( 51 id integer primary key autoincrement, 52 did text not null, 53 name text not null, 54 key text not null, 55 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 56 unique(did, name, key) 57 ); 58 create table if not exists repos ( 59 id integer primary key autoincrement, 60 did text not null, 61 name text not null, 62 knot text not null, 63 rkey text not null, 64 at_uri text not null unique, 65 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 66 unique(did, name, knot, rkey) 67 ); 68 create table if not exists collaborators ( 69 id integer primary key autoincrement, 70 did text not null, 71 repo integer not null, 72 foreign key (repo) references repos(id) on delete cascade 73 ); 74 create table if not exists follows ( 75 user_did text not null, 76 subject_did text not null, 77 rkey text not null, 78 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 79 primary key (user_did, subject_did), 80 check (user_did <> subject_did) 81 ); 82 create table if not exists issues ( 83 id integer primary key autoincrement, 84 owner_did text not null, 85 repo_at text not null, 86 issue_id integer not null, 87 title text not null, 88 body text not null, 89 open integer not null default 1, 90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 91 issue_at text, 92 unique(repo_at, issue_id), 93 foreign key (repo_at) references repos(at_uri) on delete cascade 94 ); 95 create table if not exists comments ( 96 id integer primary key autoincrement, 97 owner_did text not null, 98 issue_id integer not null, 99 repo_at text not null, 100 comment_id integer not null, 101 comment_at text not null, 102 body text not null, 103 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 104 unique(issue_id, comment_id), 105 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade 106 ); 107 create table if not exists pulls ( 108 -- identifiers 109 id integer primary key autoincrement, 110 pull_id integer not null, 111 112 -- at identifiers 113 repo_at text not null, 114 owner_did text not null, 115 rkey text not null, 116 pull_at text, 117 118 -- content 119 title text not null, 120 body text not null, 121 target_branch text not null, 122 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed 123 124 -- meta 125 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 126 127 -- constraints 128 unique(repo_at, pull_id), 129 foreign key (repo_at) references repos(at_uri) on delete cascade 130 ); 131 132 -- every pull must have atleast 1 submission: the initial submission 133 create table if not exists pull_submissions ( 134 -- identifiers 135 id integer primary key autoincrement, 136 pull_id integer not null, 137 138 -- at identifiers 139 repo_at text not null, 140 141 -- content, these are immutable, and require a resubmission to update 142 round_number integer not null default 0, 143 patch text, 144 145 -- meta 146 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 147 148 -- constraints 149 unique(repo_at, pull_id, round_number), 150 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade 151 ); 152 153 create table if not exists pull_comments ( 154 -- identifiers 155 id integer primary key autoincrement, 156 pull_id integer not null, 157 submission_id integer not null, 158 159 -- at identifiers 160 repo_at text not null, 161 owner_did text not null, 162 comment_at text not null, 163 164 -- content 165 body text not null, 166 167 -- meta 168 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 169 170 -- constraints 171 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade, 172 foreign key (submission_id) references pull_submissions(id) on delete cascade 173 ); 174 175 create table if not exists _jetstream ( 176 id integer primary key autoincrement, 177 last_time_us integer not null 178 ); 179 180 create table if not exists repo_issue_seqs ( 181 repo_at text primary key, 182 next_issue_id integer not null default 1 183 ); 184 185 create table if not exists repo_pull_seqs ( 186 repo_at text primary key, 187 next_pull_id integer not null default 1 188 ); 189 190 create table if not exists stars ( 191 id integer primary key autoincrement, 192 starred_by_did text not null, 193 repo_at text not null, 194 rkey text not null, 195 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 196 foreign key (repo_at) references repos(at_uri) on delete cascade, 197 unique(starred_by_did, repo_at) 198 ); 199 200 create table if not exists emails ( 201 id integer primary key autoincrement, 202 did text not null, 203 email text not null, 204 verified integer not null default 0, 205 verification_code text not null, 206 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 207 is_primary integer not null default 0, 208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 209 unique(did, email) 210 ); 211 212 create table if not exists artifacts ( 213 -- id 214 id integer primary key autoincrement, 215 did text not null, 216 rkey text not null, 217 218 -- meta 219 repo_at text not null, 220 tag binary(20) not null, 221 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 222 223 -- data 224 blob_cid text not null, 225 name text not null, 226 size integer not null default 0, 227 mimetype string not null default "*/*", 228 229 -- constraints 230 unique(did, rkey), -- record must be unique 231 unique(repo_at, tag, name), -- for a given tag object, each file must be unique 232 foreign key (repo_at) references repos(at_uri) on delete cascade 233 ); 234 235 create table if not exists profile ( 236 -- id 237 id integer primary key autoincrement, 238 did text not null, 239 240 -- data 241 description text not null, 242 include_bluesky integer not null default 0, 243 location text, 244 245 -- constraints 246 unique(did) 247 ); 248 create table if not exists profile_links ( 249 -- id 250 id integer primary key autoincrement, 251 did text not null, 252 253 -- data 254 link text not null, 255 256 -- constraints 257 foreign key (did) references profile(did) on delete cascade 258 ); 259 create table if not exists profile_stats ( 260 -- id 261 id integer primary key autoincrement, 262 did text not null, 263 264 -- data 265 kind text not null check (kind in ( 266 "merged-pull-request-count", 267 "closed-pull-request-count", 268 "open-pull-request-count", 269 "open-issue-count", 270 "closed-issue-count", 271 "repository-count" 272 )), 273 274 -- constraints 275 foreign key (did) references profile(did) on delete cascade 276 ); 277 create table if not exists profile_pinned_repositories ( 278 -- id 279 id integer primary key autoincrement, 280 did text not null, 281 282 -- data 283 at_uri text not null, 284 285 -- constraints 286 unique(did, at_uri), 287 foreign key (did) references profile(did) on delete cascade, 288 foreign key (at_uri) references repos(at_uri) on delete cascade 289 ); 290 291 create table if not exists oauth_requests ( 292 id integer primary key autoincrement, 293 auth_server_iss text not null, 294 state text not null, 295 did text not null, 296 handle text not null, 297 pds_url text not null, 298 pkce_verifier text not null, 299 dpop_auth_server_nonce text not null, 300 dpop_private_jwk text not null 301 ); 302 303 create table if not exists oauth_sessions ( 304 id integer primary key autoincrement, 305 did text not null, 306 handle text not null, 307 pds_url text not null, 308 auth_server_iss text not null, 309 access_jwt text not null, 310 refresh_jwt text not null, 311 dpop_pds_nonce text, 312 dpop_auth_server_nonce text not null, 313 dpop_private_jwk text not null, 314 expiry text not null 315 ); 316 317 create table if not exists punchcard ( 318 did text not null, 319 date text not null, -- yyyy-mm-dd 320 count integer, 321 primary key (did, date) 322 ); 323 324 create table if not exists migrations ( 325 id integer primary key autoincrement, 326 name text unique 327 ) 328 `) 329 if err != nil { 330 return nil, err 331 } 332 333 // run migrations 334 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error { 335 tx.Exec(` 336 alter table repos add column description text check (length(description) <= 200); 337 `) 338 return nil 339 }) 340 341 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error { 342 // add unconstrained column 343 _, err := tx.Exec(` 344 alter table public_keys 345 add column rkey text; 346 `) 347 if err != nil { 348 return err 349 } 350 351 // backfill 352 _, err = tx.Exec(` 353 update public_keys 354 set rkey = '' 355 where rkey is null; 356 `) 357 if err != nil { 358 return err 359 } 360 361 return nil 362 }) 363 364 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error { 365 _, err := tx.Exec(` 366 alter table comments drop column comment_at; 367 alter table comments add column rkey text; 368 `) 369 return err 370 }) 371 372 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error { 373 _, err := tx.Exec(` 374 alter table comments add column deleted text; -- timestamp 375 alter table comments add column edited text; -- timestamp 376 `) 377 return err 378 }) 379 380 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error { 381 _, err := tx.Exec(` 382 alter table pulls add column source_branch text; 383 alter table pulls add column source_repo_at text; 384 alter table pull_submissions add column source_rev text; 385 `) 386 return err 387 }) 388 389 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error { 390 _, err := tx.Exec(` 391 alter table repos add column source text; 392 `) 393 return err 394 }) 395 396 // disable foreign-keys for the next migration 397 // NOTE: this cannot be done in a transaction, so it is run outside [0] 398 // 399 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys 400 db.Exec("pragma foreign_keys = off;") 401 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error { 402 _, err := tx.Exec(` 403 create table pulls_new ( 404 -- identifiers 405 id integer primary key autoincrement, 406 pull_id integer not null, 407 408 -- at identifiers 409 repo_at text not null, 410 owner_did text not null, 411 rkey text not null, 412 413 -- content 414 title text not null, 415 body text not null, 416 target_branch text not null, 417 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted 418 419 -- source info 420 source_branch text, 421 source_repo_at text, 422 423 -- stacking 424 stack_id text, 425 change_id text, 426 parent_change_id text, 427 428 -- meta 429 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 430 431 -- constraints 432 unique(repo_at, pull_id), 433 foreign key (repo_at) references repos(at_uri) on delete cascade 434 ); 435 436 insert into pulls_new ( 437 id, pull_id, 438 repo_at, owner_did, rkey, 439 title, body, target_branch, state, 440 source_branch, source_repo_at, 441 created 442 ) 443 select 444 id, pull_id, 445 repo_at, owner_did, rkey, 446 title, body, target_branch, state, 447 source_branch, source_repo_at, 448 created 449 FROM pulls; 450 451 drop table pulls; 452 alter table pulls_new rename to pulls; 453 `) 454 return err 455 }) 456 db.Exec("pragma foreign_keys = on;") 457 458 return &DB{db}, nil 459} 460 461type migrationFn = func(*sql.Tx) error 462 463func runMigration(d *sql.DB, name string, migrationFn migrationFn) error { 464 tx, err := d.Begin() 465 if err != nil { 466 return err 467 } 468 defer tx.Rollback() 469 470 var exists bool 471 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 472 if err != nil { 473 return err 474 } 475 476 if !exists { 477 // run migration 478 err = migrationFn(tx) 479 if err != nil { 480 log.Printf("Failed to run migration %s: %v", name, err) 481 return err 482 } 483 484 // mark migration as complete 485 _, err = tx.Exec("insert into migrations (name) values (?)", name) 486 if err != nil { 487 log.Printf("Failed to mark migration %s as complete: %v", name, err) 488 return err 489 } 490 491 // commit the transaction 492 if err := tx.Commit(); err != nil { 493 return err 494 } 495 496 log.Printf("migration %s applied successfully", name) 497 } else { 498 log.Printf("skipped migration %s, already applied", name) 499 } 500 501 return nil 502} 503 504type filter struct { 505 key string 506 arg any 507 cmp string 508} 509 510func FilterEq(key string, arg any) filter { 511 return filter{ 512 key: key, 513 arg: arg, 514 cmp: "=", 515 } 516} 517 518func FilterNotEq(key string, arg any) filter { 519 return filter{ 520 key: key, 521 arg: arg, 522 cmp: "<>", 523 } 524} 525 526func FilterGte(key string, arg any) filter { 527 return filter{ 528 key: key, 529 arg: arg, 530 cmp: ">=", 531 } 532} 533 534func FilterLte(key string, arg any) filter { 535 return filter{ 536 key: key, 537 arg: arg, 538 cmp: "<=", 539 } 540} 541 542func (f filter) Condition() string { 543 return fmt.Sprintf("%s %s ?", f.key, f.cmp) 544}