1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "reflect"
9 "strings"
10
11 _ "github.com/mattn/go-sqlite3"
12)
13
14type DB struct {
15 *sql.DB
16}
17
18type Execer interface {
19 Query(query string, args ...any) (*sql.Rows, error)
20 QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
21 QueryRow(query string, args ...any) *sql.Row
22 QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
23 Exec(query string, args ...any) (sql.Result, error)
24 ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
25 Prepare(query string) (*sql.Stmt, error)
26 PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
27}
28
29func Make(dbPath string) (*DB, error) {
30 db, err := sql.Open("sqlite3", dbPath)
31 if err != nil {
32 return nil, err
33 }
34 _, err = db.Exec(`
35 pragma journal_mode = WAL;
36 pragma synchronous = normal;
37 pragma foreign_keys = on;
38 pragma temp_store = memory;
39 pragma mmap_size = 30000000000;
40 pragma page_size = 32768;
41 pragma auto_vacuum = incremental;
42 pragma busy_timeout = 5000;
43
44 create table if not exists registrations (
45 id integer primary key autoincrement,
46 domain text not null unique,
47 did text not null,
48 secret text not null,
49 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
50 registered text
51 );
52 create table if not exists public_keys (
53 id integer primary key autoincrement,
54 did text not null,
55 name text not null,
56 key text not null,
57 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
58 unique(did, name, key)
59 );
60 create table if not exists repos (
61 id integer primary key autoincrement,
62 did text not null,
63 name text not null,
64 knot text not null,
65 rkey text not null,
66 at_uri text not null unique,
67 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
68 unique(did, name, knot, rkey)
69 );
70 create table if not exists collaborators (
71 id integer primary key autoincrement,
72 did text not null,
73 repo integer not null,
74 foreign key (repo) references repos(id) on delete cascade
75 );
76 create table if not exists follows (
77 user_did text not null,
78 subject_did text not null,
79 rkey text not null,
80 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
81 primary key (user_did, subject_did),
82 check (user_did <> subject_did)
83 );
84 create table if not exists issues (
85 id integer primary key autoincrement,
86 owner_did text not null,
87 repo_at text not null,
88 issue_id integer not null,
89 title text not null,
90 body text not null,
91 open integer not null default 1,
92 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
93 issue_at text,
94 unique(repo_at, issue_id),
95 foreign key (repo_at) references repos(at_uri) on delete cascade
96 );
97 create table if not exists comments (
98 id integer primary key autoincrement,
99 owner_did text not null,
100 issue_id integer not null,
101 repo_at text not null,
102 comment_id integer not null,
103 comment_at text not null,
104 body text not null,
105 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
106 unique(issue_id, comment_id),
107 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade
108 );
109 create table if not exists pulls (
110 -- identifiers
111 id integer primary key autoincrement,
112 pull_id integer not null,
113
114 -- at identifiers
115 repo_at text not null,
116 owner_did text not null,
117 rkey text not null,
118 pull_at text,
119
120 -- content
121 title text not null,
122 body text not null,
123 target_branch text not null,
124 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed
125
126 -- meta
127 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
128
129 -- constraints
130 unique(repo_at, pull_id),
131 foreign key (repo_at) references repos(at_uri) on delete cascade
132 );
133
134 -- every pull must have atleast 1 submission: the initial submission
135 create table if not exists pull_submissions (
136 -- identifiers
137 id integer primary key autoincrement,
138 pull_id integer not null,
139
140 -- at identifiers
141 repo_at text not null,
142
143 -- content, these are immutable, and require a resubmission to update
144 round_number integer not null default 0,
145 patch text,
146
147 -- meta
148 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
149
150 -- constraints
151 unique(repo_at, pull_id, round_number),
152 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade
153 );
154
155 create table if not exists pull_comments (
156 -- identifiers
157 id integer primary key autoincrement,
158 pull_id integer not null,
159 submission_id integer not null,
160
161 -- at identifiers
162 repo_at text not null,
163 owner_did text not null,
164 comment_at text not null,
165
166 -- content
167 body text not null,
168
169 -- meta
170 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
171
172 -- constraints
173 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade,
174 foreign key (submission_id) references pull_submissions(id) on delete cascade
175 );
176
177 create table if not exists _jetstream (
178 id integer primary key autoincrement,
179 last_time_us integer not null
180 );
181
182 create table if not exists repo_issue_seqs (
183 repo_at text primary key,
184 next_issue_id integer not null default 1
185 );
186
187 create table if not exists repo_pull_seqs (
188 repo_at text primary key,
189 next_pull_id integer not null default 1
190 );
191
192 create table if not exists stars (
193 id integer primary key autoincrement,
194 starred_by_did text not null,
195 repo_at text not null,
196 rkey text not null,
197 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
198 foreign key (repo_at) references repos(at_uri) on delete cascade,
199 unique(starred_by_did, repo_at)
200 );
201
202 create table if not exists emails (
203 id integer primary key autoincrement,
204 did text not null,
205 email text not null,
206 verified integer not null default 0,
207 verification_code text not null,
208 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
209 is_primary integer not null default 0,
210 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
211 unique(did, email)
212 );
213
214 create table if not exists artifacts (
215 -- id
216 id integer primary key autoincrement,
217 did text not null,
218 rkey text not null,
219
220 -- meta
221 repo_at text not null,
222 tag binary(20) not null,
223 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
224
225 -- data
226 blob_cid text not null,
227 name text not null,
228 size integer not null default 0,
229 mimetype string not null default "*/*",
230
231 -- constraints
232 unique(did, rkey), -- record must be unique
233 unique(repo_at, tag, name), -- for a given tag object, each file must be unique
234 foreign key (repo_at) references repos(at_uri) on delete cascade
235 );
236
237 create table if not exists profile (
238 -- id
239 id integer primary key autoincrement,
240 did text not null,
241
242 -- data
243 description text not null,
244 include_bluesky integer not null default 0,
245 location text,
246
247 -- constraints
248 unique(did)
249 );
250 create table if not exists profile_links (
251 -- id
252 id integer primary key autoincrement,
253 did text not null,
254
255 -- data
256 link text not null,
257
258 -- constraints
259 foreign key (did) references profile(did) on delete cascade
260 );
261 create table if not exists profile_stats (
262 -- id
263 id integer primary key autoincrement,
264 did text not null,
265
266 -- data
267 kind text not null check (kind in (
268 "merged-pull-request-count",
269 "closed-pull-request-count",
270 "open-pull-request-count",
271 "open-issue-count",
272 "closed-issue-count",
273 "repository-count"
274 )),
275
276 -- constraints
277 foreign key (did) references profile(did) on delete cascade
278 );
279 create table if not exists profile_pinned_repositories (
280 -- id
281 id integer primary key autoincrement,
282 did text not null,
283
284 -- data
285 at_uri text not null,
286
287 -- constraints
288 unique(did, at_uri),
289 foreign key (did) references profile(did) on delete cascade,
290 foreign key (at_uri) references repos(at_uri) on delete cascade
291 );
292
293 create table if not exists oauth_requests (
294 id integer primary key autoincrement,
295 auth_server_iss text not null,
296 state text not null,
297 did text not null,
298 handle text not null,
299 pds_url text not null,
300 pkce_verifier text not null,
301 dpop_auth_server_nonce text not null,
302 dpop_private_jwk text not null
303 );
304
305 create table if not exists oauth_sessions (
306 id integer primary key autoincrement,
307 did text not null,
308 handle text not null,
309 pds_url text not null,
310 auth_server_iss text not null,
311 access_jwt text not null,
312 refresh_jwt text not null,
313 dpop_pds_nonce text,
314 dpop_auth_server_nonce text not null,
315 dpop_private_jwk text not null,
316 expiry text not null
317 );
318
319 create table if not exists punchcard (
320 did text not null,
321 date text not null, -- yyyy-mm-dd
322 count integer,
323 primary key (did, date)
324 );
325
326 create table if not exists spindles (
327 id integer primary key autoincrement,
328 owner text not null,
329 instance text not null,
330 verified text, -- time of verification
331 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
332
333 unique(did, instance)
334 );
335
336 create table if not exists spindle_members (
337 -- identifiers for the record
338 id integer primary key autoincrement,
339 did text not null,
340 rkey text not null,
341
342 -- data
343 instance text not null,
344 subject text not null,
345 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
346
347 -- constraints
348 foreign key (did, instance) references spindles(owner, instance) on delete cascade,
349 unique (did, instance, subject)
350 );
351
352 create table if not exists pipelines (
353 -- identifiers
354 id integer primary key autoincrement,
355 knot text not null,
356 rkey text not null,
357
358 repo_owner text not null,
359 repo_name text not null,
360
361 -- every pipeline must be associated with exactly one commit
362 sha text not null check (length(sha) = 40),
363 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
364
365 -- trigger data
366 trigger_id integer not null,
367
368 unique(knot, rkey),
369 foreign key (trigger_id) references triggers(id) on delete cascade
370 );
371
372 create table if not exists triggers (
373 -- primary key
374 id integer primary key autoincrement,
375
376 -- top-level fields
377 kind text not null,
378
379 -- pushTriggerData fields
380 push_ref text,
381 push_new_sha text check (length(push_new_sha) = 40),
382 push_old_sha text check (length(push_old_sha) = 40),
383
384 -- pullRequestTriggerData fields
385 pr_source_branch text,
386 pr_target_branch text,
387 pr_source_sha text check (length(pr_source_sha) = 40),
388 pr_action text
389 );
390
391 create table if not exists pipeline_statuses (
392 -- identifiers
393 id integer primary key autoincrement,
394 spindle text not null,
395 rkey text not null,
396
397 -- referenced pipeline. these form the (did, rkey) pair
398 pipeline_knot text not null,
399 pipeline_rkey text not null,
400
401 -- content
402 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
403 workflow text not null,
404 status text not null,
405 error text,
406 exit_code integer not null default 0,
407
408 unique (spindle, rkey),
409 foreign key (pipeline_knot, pipeline_rkey)
410 references pipelines (knot, rkey)
411 on delete cascade
412 );
413
414 create table if not exists migrations (
415 id integer primary key autoincrement,
416 name text unique
417 );
418 `)
419 if err != nil {
420 return nil, err
421 }
422
423 // run migrations
424 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error {
425 tx.Exec(`
426 alter table repos add column description text check (length(description) <= 200);
427 `)
428 return nil
429 })
430
431 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error {
432 // add unconstrained column
433 _, err := tx.Exec(`
434 alter table public_keys
435 add column rkey text;
436 `)
437 if err != nil {
438 return err
439 }
440
441 // backfill
442 _, err = tx.Exec(`
443 update public_keys
444 set rkey = ''
445 where rkey is null;
446 `)
447 if err != nil {
448 return err
449 }
450
451 return nil
452 })
453
454 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error {
455 _, err := tx.Exec(`
456 alter table comments drop column comment_at;
457 alter table comments add column rkey text;
458 `)
459 return err
460 })
461
462 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error {
463 _, err := tx.Exec(`
464 alter table comments add column deleted text; -- timestamp
465 alter table comments add column edited text; -- timestamp
466 `)
467 return err
468 })
469
470 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error {
471 _, err := tx.Exec(`
472 alter table pulls add column source_branch text;
473 alter table pulls add column source_repo_at text;
474 alter table pull_submissions add column source_rev text;
475 `)
476 return err
477 })
478
479 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error {
480 _, err := tx.Exec(`
481 alter table repos add column source text;
482 `)
483 return err
484 })
485
486 // disable foreign-keys for the next migration
487 // NOTE: this cannot be done in a transaction, so it is run outside [0]
488 //
489 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys
490 db.Exec("pragma foreign_keys = off;")
491 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error {
492 _, err := tx.Exec(`
493 create table pulls_new (
494 -- identifiers
495 id integer primary key autoincrement,
496 pull_id integer not null,
497
498 -- at identifiers
499 repo_at text not null,
500 owner_did text not null,
501 rkey text not null,
502
503 -- content
504 title text not null,
505 body text not null,
506 target_branch text not null,
507 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted
508
509 -- source info
510 source_branch text,
511 source_repo_at text,
512
513 -- stacking
514 stack_id text,
515 change_id text,
516 parent_change_id text,
517
518 -- meta
519 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
520
521 -- constraints
522 unique(repo_at, pull_id),
523 foreign key (repo_at) references repos(at_uri) on delete cascade
524 );
525
526 insert into pulls_new (
527 id, pull_id,
528 repo_at, owner_did, rkey,
529 title, body, target_branch, state,
530 source_branch, source_repo_at,
531 created
532 )
533 select
534 id, pull_id,
535 repo_at, owner_did, rkey,
536 title, body, target_branch, state,
537 source_branch, source_repo_at,
538 created
539 FROM pulls;
540
541 drop table pulls;
542 alter table pulls_new rename to pulls;
543 `)
544 return err
545 })
546 db.Exec("pragma foreign_keys = on;")
547
548 // run migrations
549 runMigration(db, "add-spindle-to-repos", func(tx *sql.Tx) error {
550 tx.Exec(`
551 alter table repos add column spindle text;
552 `)
553 return nil
554 })
555
556 return &DB{db}, nil
557}
558
559type migrationFn = func(*sql.Tx) error
560
561func runMigration(d *sql.DB, name string, migrationFn migrationFn) error {
562 tx, err := d.Begin()
563 if err != nil {
564 return err
565 }
566 defer tx.Rollback()
567
568 var exists bool
569 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists)
570 if err != nil {
571 return err
572 }
573
574 if !exists {
575 // run migration
576 err = migrationFn(tx)
577 if err != nil {
578 log.Printf("Failed to run migration %s: %v", name, err)
579 return err
580 }
581
582 // mark migration as complete
583 _, err = tx.Exec("insert into migrations (name) values (?)", name)
584 if err != nil {
585 log.Printf("Failed to mark migration %s as complete: %v", name, err)
586 return err
587 }
588
589 // commit the transaction
590 if err := tx.Commit(); err != nil {
591 return err
592 }
593
594 log.Printf("migration %s applied successfully", name)
595 } else {
596 log.Printf("skipped migration %s, already applied", name)
597 }
598
599 return nil
600}
601
602type filter struct {
603 key string
604 arg any
605 cmp string
606}
607
608func newFilter(key, cmp string, arg any) filter {
609 return filter{
610 key: key,
611 arg: arg,
612 cmp: cmp,
613 }
614}
615
616func FilterEq(key string, arg any) filter { return newFilter(key, "=", arg) }
617func FilterNotEq(key string, arg any) filter { return newFilter(key, "<>", arg) }
618func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) }
619func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) }
620func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) }
621func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) }
622func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) }
623
624func (f filter) Condition() string {
625 rv := reflect.ValueOf(f.arg)
626 kind := rv.Kind()
627
628 // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)`
629 if kind == reflect.Slice || kind == reflect.Array {
630 if rv.Len() == 0 {
631 panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key))
632 }
633
634 placeholders := make([]string, rv.Len())
635 for i := range placeholders {
636 placeholders[i] = "?"
637 }
638
639 return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", "))
640 }
641
642 return fmt.Sprintf("%s %s ?", f.key, f.cmp)
643}
644
645func (f filter) Arg() []any {
646 rv := reflect.ValueOf(f.arg)
647 kind := rv.Kind()
648 if kind == reflect.Slice || kind == reflect.Array {
649 if rv.Len() == 0 {
650 panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key))
651 }
652
653 out := make([]any, rv.Len())
654 for i := range rv.Len() {
655 out[i] = rv.Index(i).Interface()
656 }
657 return out
658 }
659
660 return []any{f.arg}
661}