1package db
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "reflect"
9 "strings"
10
11 _ "github.com/mattn/go-sqlite3"
12)
13
// DB is this package's database handle. It embeds *sql.DB, so every method of
// the standard handle (Query, Exec, Begin, Close, ...) is promoted and can be
// called directly on a *DB.
type DB struct {
	*sql.DB
}
17
// Execer is the set of statement-running methods used by this package's query
// helpers. Each operation is listed in its plain form and its context-aware
// form, with the signatures used by the database/sql handle types.
type Execer interface {
	// Statements that do not return rows.
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

	// Statements that return any number of rows.
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

	// Statements expected to return at most one row.
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

	// Prepared statements.
	Prepare(query string) (*sql.Stmt, error)
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}
28
29func Make(dbPath string) (*DB, error) {
30 db, err := sql.Open("sqlite3", dbPath)
31 if err != nil {
32 return nil, err
33 }
34 _, err = db.Exec(`
35 pragma journal_mode = WAL;
36 pragma synchronous = normal;
37 pragma foreign_keys = on;
38 pragma temp_store = memory;
39 pragma mmap_size = 30000000000;
40 pragma page_size = 32768;
41 pragma auto_vacuum = incremental;
42 pragma busy_timeout = 5000;
43
44 create table if not exists registrations (
45 id integer primary key autoincrement,
46 domain text not null unique,
47 did text not null,
48 secret text not null,
49 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
50 registered text
51 );
52 create table if not exists public_keys (
53 id integer primary key autoincrement,
54 did text not null,
55 name text not null,
56 key text not null,
57 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
58 unique(did, name, key)
59 );
60 create table if not exists repos (
61 id integer primary key autoincrement,
62 did text not null,
63 name text not null,
64 knot text not null,
65 rkey text not null,
66 at_uri text not null unique,
67 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
68 unique(did, name, knot, rkey)
69 );
70 create table if not exists collaborators (
71 id integer primary key autoincrement,
72 did text not null,
73 repo integer not null,
74 foreign key (repo) references repos(id) on delete cascade
75 );
76 create table if not exists follows (
77 user_did text not null,
78 subject_did text not null,
79 rkey text not null,
80 followed_at text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
81 primary key (user_did, subject_did),
82 check (user_did <> subject_did)
83 );
84 create table if not exists issues (
85 id integer primary key autoincrement,
86 owner_did text not null,
87 repo_at text not null,
88 issue_id integer not null,
89 title text not null,
90 body text not null,
91 open integer not null default 1,
92 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
93 issue_at text,
94 unique(repo_at, issue_id),
95 foreign key (repo_at) references repos(at_uri) on delete cascade
96 );
97 create table if not exists comments (
98 id integer primary key autoincrement,
99 owner_did text not null,
100 issue_id integer not null,
101 repo_at text not null,
102 comment_id integer not null,
103 comment_at text not null,
104 body text not null,
105 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
106 unique(issue_id, comment_id),
107 foreign key (repo_at, issue_id) references issues(repo_at, issue_id) on delete cascade
108 );
109 create table if not exists pulls (
110 -- identifiers
111 id integer primary key autoincrement,
112 pull_id integer not null,
113
114 -- at identifiers
115 repo_at text not null,
116 owner_did text not null,
117 rkey text not null,
118 pull_at text,
119
120 -- content
121 title text not null,
122 body text not null,
123 target_branch text not null,
124 state integer not null default 0 check (state in (0, 1, 2)), -- open, merged, closed
125
126 -- meta
127 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
128
129 -- constraints
130 unique(repo_at, pull_id),
131 foreign key (repo_at) references repos(at_uri) on delete cascade
132 );
133
134 -- every pull must have atleast 1 submission: the initial submission
135 create table if not exists pull_submissions (
136 -- identifiers
137 id integer primary key autoincrement,
138 pull_id integer not null,
139
140 -- at identifiers
141 repo_at text not null,
142
143 -- content, these are immutable, and require a resubmission to update
144 round_number integer not null default 0,
145 patch text,
146
147 -- meta
148 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
149
150 -- constraints
151 unique(repo_at, pull_id, round_number),
152 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade
153 );
154
155 create table if not exists pull_comments (
156 -- identifiers
157 id integer primary key autoincrement,
158 pull_id integer not null,
159 submission_id integer not null,
160
161 -- at identifiers
162 repo_at text not null,
163 owner_did text not null,
164 comment_at text not null,
165
166 -- content
167 body text not null,
168
169 -- meta
170 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
171
172 -- constraints
173 foreign key (repo_at, pull_id) references pulls(repo_at, pull_id) on delete cascade,
174 foreign key (submission_id) references pull_submissions(id) on delete cascade
175 );
176
177 create table if not exists _jetstream (
178 id integer primary key autoincrement,
179 last_time_us integer not null
180 );
181
182 create table if not exists repo_issue_seqs (
183 repo_at text primary key,
184 next_issue_id integer not null default 1
185 );
186
187 create table if not exists repo_pull_seqs (
188 repo_at text primary key,
189 next_pull_id integer not null default 1
190 );
191
192 create table if not exists stars (
193 id integer primary key autoincrement,
194 starred_by_did text not null,
195 repo_at text not null,
196 rkey text not null,
197 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
198 foreign key (repo_at) references repos(at_uri) on delete cascade,
199 unique(starred_by_did, repo_at)
200 );
201
202 create table if not exists emails (
203 id integer primary key autoincrement,
204 did text not null,
205 email text not null,
206 verified integer not null default 0,
207 verification_code text not null,
208 last_sent text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
209 is_primary integer not null default 0,
210 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
211 unique(did, email)
212 );
213
214 create table if not exists artifacts (
215 -- id
216 id integer primary key autoincrement,
217 did text not null,
218 rkey text not null,
219
220 -- meta
221 repo_at text not null,
222 tag binary(20) not null,
223 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
224
225 -- data
226 blob_cid text not null,
227 name text not null,
228 size integer not null default 0,
229 mimetype string not null default "*/*",
230
231 -- constraints
232 unique(did, rkey), -- record must be unique
233 unique(repo_at, tag, name), -- for a given tag object, each file must be unique
234 foreign key (repo_at) references repos(at_uri) on delete cascade
235 );
236
237 create table if not exists profile (
238 -- id
239 id integer primary key autoincrement,
240 did text not null,
241
242 -- data
243 description text not null,
244 include_bluesky integer not null default 0,
245 location text,
246
247 -- constraints
248 unique(did)
249 );
250 create table if not exists profile_links (
251 -- id
252 id integer primary key autoincrement,
253 did text not null,
254
255 -- data
256 link text not null,
257
258 -- constraints
259 foreign key (did) references profile(did) on delete cascade
260 );
261 create table if not exists profile_stats (
262 -- id
263 id integer primary key autoincrement,
264 did text not null,
265
266 -- data
267 kind text not null check (kind in (
268 "merged-pull-request-count",
269 "closed-pull-request-count",
270 "open-pull-request-count",
271 "open-issue-count",
272 "closed-issue-count",
273 "repository-count"
274 )),
275
276 -- constraints
277 foreign key (did) references profile(did) on delete cascade
278 );
279 create table if not exists profile_pinned_repositories (
280 -- id
281 id integer primary key autoincrement,
282 did text not null,
283
284 -- data
285 at_uri text not null,
286
287 -- constraints
288 unique(did, at_uri),
289 foreign key (did) references profile(did) on delete cascade,
290 foreign key (at_uri) references repos(at_uri) on delete cascade
291 );
292
293 create table if not exists oauth_requests (
294 id integer primary key autoincrement,
295 auth_server_iss text not null,
296 state text not null,
297 did text not null,
298 handle text not null,
299 pds_url text not null,
300 pkce_verifier text not null,
301 dpop_auth_server_nonce text not null,
302 dpop_private_jwk text not null
303 );
304
305 create table if not exists oauth_sessions (
306 id integer primary key autoincrement,
307 did text not null,
308 handle text not null,
309 pds_url text not null,
310 auth_server_iss text not null,
311 access_jwt text not null,
312 refresh_jwt text not null,
313 dpop_pds_nonce text,
314 dpop_auth_server_nonce text not null,
315 dpop_private_jwk text not null,
316 expiry text not null
317 );
318
319 create table if not exists punchcard (
320 did text not null,
321 date text not null, -- yyyy-mm-dd
322 count integer,
323 primary key (did, date)
324 );
325
326 create table if not exists spindles (
327 id integer primary key autoincrement,
328 owner text not null,
329 instance text not null,
330 verified text, -- time of verification
331 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
332
333 unique(instance)
334 );
335
336 create table if not exists pipelines (
337 -- identifiers
338 id integer primary key autoincrement,
339 knot text not null,
340 rkey text not null,
341
342 repo_owner text not null,
343 repo_name text not null,
344
345 -- every pipeline must be associated with exactly one commit
346 sha text not null check (length(sha) = 40),
347 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
348
349 -- trigger data
350 trigger_id integer not null,
351
352 unique(knot, rkey),
353 foreign key (trigger_id) references triggers(id) on delete cascade
354 );
355
356 create table if not exists triggers (
357 -- primary key
358 id integer primary key autoincrement,
359
360 -- top-level fields
361 kind text not null,
362
363 -- pushTriggerData fields
364 push_ref text,
365 push_new_sha text check (length(push_new_sha) = 40),
366 push_old_sha text check (length(push_old_sha) = 40),
367
368 -- pullRequestTriggerData fields
369 pr_source_branch text,
370 pr_target_branch text,
371 pr_source_sha text check (length(pr_source_sha) = 40),
372 pr_action text
373 );
374
375 create table if not exists pipeline_statuses (
376 -- identifiers
377 id integer primary key autoincrement,
378 spindle text not null,
379 rkey text not null,
380
381 -- referenced pipeline. these form the (did, rkey) pair
382 pipeline_knot text not null,
383 pipeline_rkey text not null,
384
385 -- content
386 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
387 workflow text not null,
388 status text not null,
389 error text,
390 exit_code integer not null default 0,
391
392 unique (spindle, rkey),
393 foreign key (pipeline_knot, pipeline_rkey)
394 references pipelines (knot, rkey)
395 on delete cascade
396 );
397
398 create table if not exists migrations (
399 id integer primary key autoincrement,
400 name text unique
401 );
402 `)
403 if err != nil {
404 return nil, err
405 }
406
407 // run migrations
408 runMigration(db, "add-description-to-repos", func(tx *sql.Tx) error {
409 tx.Exec(`
410 alter table repos add column description text check (length(description) <= 200);
411 `)
412 return nil
413 })
414
415 runMigration(db, "add-rkey-to-pubkeys", func(tx *sql.Tx) error {
416 // add unconstrained column
417 _, err := tx.Exec(`
418 alter table public_keys
419 add column rkey text;
420 `)
421 if err != nil {
422 return err
423 }
424
425 // backfill
426 _, err = tx.Exec(`
427 update public_keys
428 set rkey = ''
429 where rkey is null;
430 `)
431 if err != nil {
432 return err
433 }
434
435 return nil
436 })
437
438 runMigration(db, "add-rkey-to-comments", func(tx *sql.Tx) error {
439 _, err := tx.Exec(`
440 alter table comments drop column comment_at;
441 alter table comments add column rkey text;
442 `)
443 return err
444 })
445
446 runMigration(db, "add-deleted-and-edited-to-issue-comments", func(tx *sql.Tx) error {
447 _, err := tx.Exec(`
448 alter table comments add column deleted text; -- timestamp
449 alter table comments add column edited text; -- timestamp
450 `)
451 return err
452 })
453
454 runMigration(db, "add-source-info-to-pulls-and-submissions", func(tx *sql.Tx) error {
455 _, err := tx.Exec(`
456 alter table pulls add column source_branch text;
457 alter table pulls add column source_repo_at text;
458 alter table pull_submissions add column source_rev text;
459 `)
460 return err
461 })
462
463 runMigration(db, "add-source-to-repos", func(tx *sql.Tx) error {
464 _, err := tx.Exec(`
465 alter table repos add column source text;
466 `)
467 return err
468 })
469
470 // disable foreign-keys for the next migration
471 // NOTE: this cannot be done in a transaction, so it is run outside [0]
472 //
473 // [0]: https://sqlite.org/pragma.html#pragma_foreign_keys
474 db.Exec("pragma foreign_keys = off;")
475 runMigration(db, "recreate-pulls-column-for-stacking-support", func(tx *sql.Tx) error {
476 _, err := tx.Exec(`
477 create table pulls_new (
478 -- identifiers
479 id integer primary key autoincrement,
480 pull_id integer not null,
481
482 -- at identifiers
483 repo_at text not null,
484 owner_did text not null,
485 rkey text not null,
486
487 -- content
488 title text not null,
489 body text not null,
490 target_branch text not null,
491 state integer not null default 0 check (state in (0, 1, 2, 3)), -- closed, open, merged, deleted
492
493 -- source info
494 source_branch text,
495 source_repo_at text,
496
497 -- stacking
498 stack_id text,
499 change_id text,
500 parent_change_id text,
501
502 -- meta
503 created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
504
505 -- constraints
506 unique(repo_at, pull_id),
507 foreign key (repo_at) references repos(at_uri) on delete cascade
508 );
509
510 insert into pulls_new (
511 id, pull_id,
512 repo_at, owner_did, rkey,
513 title, body, target_branch, state,
514 source_branch, source_repo_at,
515 created
516 )
517 select
518 id, pull_id,
519 repo_at, owner_did, rkey,
520 title, body, target_branch, state,
521 source_branch, source_repo_at,
522 created
523 FROM pulls;
524
525 drop table pulls;
526 alter table pulls_new rename to pulls;
527 `)
528 return err
529 })
530 db.Exec("pragma foreign_keys = on;")
531
532 // run migrations
533 runMigration(db, "add-spindle-to-repos", func(tx *sql.Tx) error {
534 tx.Exec(`
535 alter table repos add column spindle text;
536 `)
537 return nil
538 })
539
540 return &DB{db}, nil
541}
542
// migrationFn is the body of one schema migration. It receives the transaction
// opened by runMigration and reports failure through its return value.
type migrationFn = func(*sql.Tx) error

// runMigration applies the migration called name unless the migrations table
// already lists it.
//
// The lookup, the migration body and the insert that records the name all use
// one transaction. It is committed only when every step succeeds; on any
// failure the deferred Rollback discards it, so the name is not recorded.
//
// Every failure is both logged and returned, wrapped with the migration name.
// Logging here as well as returning is deliberate: the visible caller discards
// the return value, so the log line is the only report of a failed step.
func runMigration(d *sql.DB, name string, migrationFn migrationFn) error {
	tx, err := d.Begin()
	if err != nil {
		log.Printf("Failed to begin transaction for migration %s: %v", name, err)
		return fmt.Errorf("migration %s: begin transaction: %w", name, err)
	}
	// Once Commit has run, Rollback only returns sql.ErrTxDone, which is
	// safe to ignore.
	defer tx.Rollback()

	var exists bool
	err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists)
	if err != nil {
		log.Printf("Failed to check whether migration %s was applied: %v", name, err)
		return fmt.Errorf("migration %s: check applied: %w", name, err)
	}

	if exists {
		log.Printf("skipped migration %s, already applied", name)
		return nil
	}

	// run migration
	if err := migrationFn(tx); err != nil {
		log.Printf("Failed to run migration %s: %v", name, err)
		return fmt.Errorf("migration %s: run: %w", name, err)
	}

	// mark migration as complete
	if _, err := tx.Exec("insert into migrations (name) values (?)", name); err != nil {
		log.Printf("Failed to mark migration %s as complete: %v", name, err)
		return fmt.Errorf("migration %s: mark complete: %w", name, err)
	}

	// commit the transaction
	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit migration %s: %v", name, err)
		return fmt.Errorf("migration %s: commit: %w", name, err)
	}

	log.Printf("migration %s applied successfully", name)
	return nil
}
585
// filter is one `<key> <cmp> <arg>` predicate of a where clause. Condition
// renders the SQL text with placeholders and Arg returns the values to bind,
// in matching order.
//
// key and cmp are interpolated into the SQL text verbatim, so they must come
// from trusted code and never from user input. Only arg is passed as a bound
// parameter.
type filter struct {
	key string
	arg any
	cmp string
}

// newFilter builds a filter comparing key against arg with the operator cmp.
func newFilter(key, cmp string, arg any) filter {
	return filter{
		key: key,
		arg: arg,
		cmp: cmp,
	}
}

// FilterEq matches rows where key = arg.
func FilterEq(key string, arg any) filter { return newFilter(key, "=", arg) }

// FilterNotEq matches rows where key <> arg.
func FilterNotEq(key string, arg any) filter { return newFilter(key, "<>", arg) }

// FilterGte matches rows where key >= arg.
func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) }

// FilterLte matches rows where key <= arg.
func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) }

// FilterIs matches rows where key is arg.
func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) }

// FilterIsNot matches rows where key is not arg.
func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) }

// FilterIn matches rows where key is one of the elements of arg, which is
// expected to be a slice or array. An empty arg matches no rows.
func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) }

// listArg returns the reflected argument and whether it has to be expanded
// into a parenthesised placeholder list. Slices and arrays are lists, except
// for byte slices: those are a single blob value and bind to one placeholder.
func (f filter) listArg() (reflect.Value, bool) {
	rv := reflect.ValueOf(f.arg)
	switch rv.Kind() {
	case reflect.Array:
		return rv, true
	case reflect.Slice:
		return rv, rv.Type().Elem().Kind() != reflect.Uint8
	}
	return rv, false
}

// emptyListCondition returns the constant predicate that replaces a comparison
// against an empty list: "in ()" can never match and "not in ()" always does.
// Any other comparator cannot be given a meaning for an empty list and is
// treated as a programming error.
func (f filter) emptyListCondition() string {
	switch strings.ToLower(f.cmp) {
	case "in":
		return "1 = 0"
	case "not in":
		return "1 = 1"
	}
	panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key))
}

// Condition renders the filter as SQL with one "?" per bound value.
//
// A scalar argument yields `key cmp ?`. A list argument is compiled down to
// `key cmp (?, ?, ...)`, e.g. `FilterIn(k, [1, 2, 3])` becomes `k in (?, ?, ?)`.
// An empty list yields a constant predicate that takes no arguments.
func (f filter) Condition() string {
	rv, isList := f.listArg()
	if !isList {
		return fmt.Sprintf("%s %s ?", f.key, f.cmp)
	}

	n := rv.Len()
	if n == 0 {
		return f.emptyListCondition()
	}

	placeholders := make([]string, n)
	for i := range placeholders {
		placeholders[i] = "?"
	}
	return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", "))
}

// Arg returns the values to bind to the placeholders produced by Condition,
// in the same order: the list elements for a list argument, nothing for an
// empty list, and the argument itself otherwise.
func (f filter) Arg() []any {
	rv, isList := f.listArg()
	if !isList {
		return []any{f.arg}
	}

	n := rv.Len()
	if n == 0 {
		// Panics for the same comparators Condition rejects, keeping the two
		// methods in agreement; otherwise there is nothing to bind.
		f.emptyListCondition()
		return nil
	}

	out := make([]any, n)
	for i := 0; i < n; i++ {
		out[i] = rv.Index(i).Interface()
	}
	return out
}