forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at packages 13 kB view raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 9 _ "github.com/mattn/go-sqlite3" 10) 11 12type DB struct { 13 *sql.DB 14} 15 16type Execer interface { 17 Query(query string, args ...any) (*sql.Rows, error) 18 QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) 19 QueryRow(query string, args ...any) *sql.Row 20 QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row 21 Exec(query string, args ...any) (sql.Result, error) 22 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) 23 Prepare(query string) (*sql.Stmt, error) 24 PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) 25} 26 27func Make(dbPath string) (*DB, error) { 28 db, err := sql.Open("sqlite3", dbPath) 29 if err != nil { 30 return nil, err 31 } 32 _, err = db.Exec(` 33 pragma journal_mode = WAL; 34 pragma synchronous = normal; 35 pragma foreign_keys = on; 36 pragma temp_store = memory; 37 pragma mmap_size = 30000000000; 38 pragma page_size = 32768; 39 pragma auto_vacuum = incremental; 40 pragma busy_timeout = 5000; 41 42 create table if not exists registrations ( 43 id integer primary key autoincrement, 44 domain text not null unique, 45 did text not null, 46 secret text not null, 47 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 48 registered text 49 ); 50 create table if not exists public_keys ( 51 id integer primary key autoincrement, 52 did text not null, 53 name text not null, 54 key text not null, 55 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 56 unique(did, name, key) 57 ); 58 create table if not exists repos ( 59 id integer primary key autoincrement, 60 did text not null, 61 name text not null, 62 knot text not null, 63 rkey text not null, 64 at_uri text not null unique, 65 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 66 unique(did, name, knot, rkey) 67 ); 68 create table if not exists collaborators ( 69 id integer primary key autoincrement, 70 did text not null, 71 repo integer not null, 72 foreign key (repo) references repos(id) on delete cascade 73 ); 74 create table if not exists follows ( 75 user_did text not null, 76 subject_did text not null, 77 rkey text not null, 78 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 79 primary key (user_did, subject_did), 80 check (user_did <> subject_did) 81 ); 82 create table if not exists issues ( 83 id integer primary key autoincrement, 84 owner_did text not null, 85 repo_at text not null, 86 issue_id integer not null, 87 title text not null, 88 body text not null, 89 open integer not null default 1, 90 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 91 issue_at text, 92 unique(repo_at, issue_id), 93 foreign key (repo_at) references repos(at_uri) on delete cascade 94 ); 95 create table if not exists comments ( 96 id integer primary key autoincrement, 97 owner_did text not null, 98 issue_id integer not null, 99 repo_at text not null, 100 comment_id integer not null, 101 comment_at text not null, 102 body text not null, 103 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 104 unique(issue_id, comment_id), 105 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade 106 ); 107 create table if not exists pulls ( 108 -- identifiers 109 id integer primary key autoincrement, 110 pull_id integer not null, 111 112 -- at identifiers 113 repo_at text not null, 114 owner_did text not null, 115 rkey text not null, 116 pull_at text, 117 118 -- content 119 title text not null, 120 body text not null, 121 target_branch text not null, 122 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed 123 124 -- meta 125 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 126 127 -- constraints 128 unique(repo_at, pull_id), 129 foreign key (repo_at) references repos(at_uri) on delete cascade 130 ); 131 132 -- every pull must have atleast 1 submission: the initial submission 133 create table if not exists pull_submissions ( 134 -- identifiers 135 id integer primary key autoincrement, 136 pull_id integer not null, 137 138 -- at identifiers 139 repo_at text not null, 140 141 -- content, these are immutable, and require a resubmission to update 142 round_number integer not null default 0, 143 patch text, 144 145 -- meta 146 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 147 148 -- constraints 149 unique(repo_at, pull_id, round_number), 150 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade 151 ); 152 153 create table if not exists pull_comments ( 154 -- identifiers 155 id integer primary key autoincrement, 156 pull_id integer not null, 157 submission_id integer not null, 158 159 -- at identifiers 160 repo_at text not null, 161 owner_did text not null, 162 comment_at text not null, 163 164 -- content 165 body text not null, 166 167 -- meta 168 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 169 170 -- constraints 171 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade, 172 foreign key (submission_id) references pull_submissions(id) on delete cascade 173 ); 174 175 create table if not exists _jetstream ( 176 id integer primary key autoincrement, 177 last_time_us integer not null 178 ); 179 180 create table if not exists repo_issue_seqs ( 181 repo_at text primary key, 182 next_issue_id integer not null default 1 183 ); 184 185 create table if not exists repo_pull_seqs ( 186 repo_at text primary key, 187 next_pull_id integer not null default 1 188 ); 189 190 create table if not exists stars ( 191 id integer primary key autoincrement, 192 starred_by_did text not null, 193 repo_at text not null, 194 rkey text not null, 195 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 196 foreign key (repo_at) references repos(at_uri) on delete cascade, 197 unique(starred_by_did, repo_at) 198 ); 199 200 create table if not exists emails ( 201 id integer primary key autoincrement, 202 did text not null, 203 email text not null, 204 verified integer not null default 0, 205 verification_code text not null, 206 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 207 is_primary integer not null default 0, 208 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 209 unique(did, email) 210 ); 211 212 create table if not exists artifacts ( 213 -- id 214 id integer primary key autoincrement, 215 did text not null, 216 rkey text not null, 217 218 -- meta 219 repo_at text not null, 220 tag binary(20) not null, 221 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 222 223 -- data 224 blob_cid text not null, 225 name text not null, 226 size integer not null default 0, 227 mimetype string not null default "*/*", 228 229 -- constraints 230 unique(did, rkey), -- record must be unique 231 unique(repo_at, tag, name), -- for a given tag object, each file must be unique 232 foreign key (repo_at) references repos(at_uri) on delete cascade 233 ); 234 235 create table if not exists profile ( 236 -- id 237 id integer primary key autoincrement, 238 did text not null, 239 240 -- data 241 description text not null, 242 include_bluesky integer not null default 0, 243 location text, 244 245 -- constraints 246 unique(did) 247 ); 248 create table if not exists profile_links ( 249 -- id 250 id integer primary key autoincrement, 251 did text not null, 252 253 -- data 254 link text not null, 255 256 -- constraints 257 foreign key (did) references profile(did) on delete cascade 258 ); 259 create table if not exists profile_stats ( 260 -- id 261 id integer primary key autoincrement, 262 did text not null, 263 264 -- data 265 kind text not null check (kind in ( 266 "merged-pull-request-count", 267 "closed-pull-request-count", 268 "open-pull-request-count", 269 "open-issue-count", 270 "closed-issue-count", 271 "repository-count" 272 )), 273 274 -- constraints 275 foreign key (did) references profile(did) on delete cascade 276 ); 277 create table if not exists profile_pinned_repositories ( 278 -- id 279 id integer primary key autoincrement, 280 did text not null, 281 282 -- data 283 at_uri text not null, 284 285 -- constraints 286 unique(did, at_uri), 287 foreign key (did) references profile(did) on delete cascade, 288 foreign key (at_uri) references repos(at_uri) on delete cascade 289 ); 290 291 create table if not exists oauth_requests ( 292 id integer primary key autoincrement, 293 auth_server_iss text not null, 294 state text not null, 295 did text not null, 296 handle text not null, 297 pds_url text not null, 298 pkce_verifier text not null, 299 dpop_auth_server_nonce text not null, 300 dpop_private_jwk text not null 301 ); 302 303 create table if not exists oauth_sessions ( 304 id integer primary key autoincrement, 305 did text not null, 306 handle text not null, 307 pds_url text not null, 308 auth_server_iss text not null, 309 access_jwt text not null, 310 refresh_jwt text not null, 311 dpop_pds_nonce text, 312 dpop_auth_server_nonce text not null, 313 dpop_private_jwk text not null, 314 expiry text not null 315 ); 316 317 create table if not exists migrations ( 318 id integer primary key autoincrement, 319 name text unique 320 ) 321 `) 322 if err != nil { 323 return nil, err 324 } 325 326 // run migrations 327 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error { 328 tx.Exec(` 329 alter table repos add column description text check (length(description) <= 200); 330 `) 331 return nil 332 }) 333 334 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error { 335 // add unconstrained column 336 _, err := tx.Exec(` 337 alter table public_keys 338 add column rkey text; 339 `) 340 if err != nil { 341 return err 342 } 343 344 // backfill 345 _, err = tx.Exec(` 346 update public_keys 347 set rkey = '' 348 where rkey is null; 349 `) 350 if err != nil { 351 return err 352 } 353 354 return nil 355 }) 356 357 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error { 358 _, err := tx.Exec(` 359 alter table comments drop column comment_at; 360 alter table comments add column rkey text; 361 `) 362 return err 363 }) 364 365 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error { 366 _, err := tx.Exec(` 367 alter table comments add column deleted text; -- timestamp 368 alter table comments add column edited text; -- timestamp 369 `) 370 return err 371 }) 372 373 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error { 374 _, err := tx.Exec(` 375 alter table pulls add column source_branch text; 376 alter table pulls add column source_repo_at text; 377 alter table pull_submissions add column source_rev text; 378 `) 379 return err 380 }) 381 382 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error { 383 _, err := tx.Exec(` 384 alter table repos add column source text; 385 `) 386 return err 387 }) 388 389 // disable foreign-keys for the next migration 390 // NOTE: this cannot be done in a transaction, so it is run outside [0] 391 // 392 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys 393 db.Exec("pragma foreign_keys = off;") 394 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error { 395 _, err := tx.Exec(` 396 create table pulls_new ( 397 -- identifiers 398 id integer primary key autoincrement, 399 pull_id integer not null, 400 401 -- at identifiers 402 repo_at text not null, 403 owner_did text not null, 404 rkey text not null, 405 406 -- content 407 title text not null, 408 body text not null, 409 target_branch text not null, 410 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted 411 412 -- source info 413 source_branch text, 414 source_repo_at text, 415 416 -- stacking 417 stack_id text, 418 change_id text, 419 parent_change_id text, 420 421 -- meta 422 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 423 424 -- constraints 425 unique(repo_at, pull_id), 426 foreign key (repo_at) references repos(at_uri) on delete cascade 427 ); 428 429 insert into pulls_new ( 430 id, pull_id, 431 repo_at, owner_did, rkey, 432 title, body, target_branch, state, 433 source_branch, source_repo_at, 434 created 435 ) 436 select 437 id, pull_id, 438 repo_at, owner_did, rkey, 439 title, body, target_branch, state, 440 source_branch, source_repo_at, 441 created 442 FROM pulls; 443 444 drop table pulls; 445 alter table pulls_new rename to pulls; 446 `) 447 return err 448 }) 449 db.Exec("pragma foreign_keys = on;") 450 451 return &DB{db}, nil 452} 453 454type migrationFn = func(*sql.Tx) error 455 456func runMigration(d *sql.DB, name string, migrationFn migrationFn) error { 457 tx, err := d.Begin() 458 if err != nil { 459 return err 460 } 461 defer tx.Rollback() 462 463 var exists bool 464 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 465 if err != nil { 466 return err 467 } 468 469 if !exists { 470 // run migration 471 err = migrationFn(tx) 472 if err != nil { 473 log.Printf("Failed to run migration %s: %v", name, err) 474 return err 475 } 476 477 // mark migration as complete 478 _, err = tx.Exec("insert into migrations (name) values (?)", name) 479 if err != nil { 480 log.Printf("Failed to mark migration %s as complete: %v", name, err) 481 return err 482 } 483 484 // commit the transaction 485 if err := tx.Commit(); err != nil { 486 return err 487 } 488 489 log.Printf("migration %s applied successfully", name) 490 } else { 491 log.Printf("skipped migration %s, already applied", name) 492 } 493 494 return nil 495} 496 497type filter struct { 498 key string 499 arg any 500 cmp string 501} 502 503func FilterEq(key string, arg any) filter { 504 return filter{ 505 key: key, 506 arg: arg, 507 cmp: "=", 508 } 509} 510 511func FilterNotEq(key string, arg any) filter { 512 return filter{ 513 key: key, 514 arg: arg, 515 cmp: "<>", 516 } 517} 518 519func (f filter) Condition() string { 520 return fmt.Sprintf("%s %s ?", f.key, f.cmp) 521}