Fork of github.com/did-method-plc/did-method-plc
1import { Kysely, Migrator, PostgresDialect, sql } from 'kysely'
2import { Pool as PgPool, types as pgTypes } from 'pg'
3import { CID } from 'multiformats/cid'
4import { cidForCbor } from '@atproto/common'
5import * as plc from '@did-plc/lib'
6import { ServerError } from '../error'
7import * as migrations from '../migrations'
8import { DatabaseSchema, PlcDatabase } from './types'
9import MockDatabase from './mock'
10import { enforceOpsRateLimit } from '../constraints'
11
12export * from './mock'
13export * from './types'
14
/**
 * Postgres-backed implementation of {@link PlcDatabase} built on Kysely.
 *
 * Holds the Kysely handle, an optional Postgres schema name, and a Kysely
 * `Migrator` wired to the project's `migrations` module.
 */
export class Database implements PlcDatabase {
  migrator: Migrator

  /**
   * @param db Kysely instance used for every query issued by this class.
   * @param schema Optional Postgres schema; when set it is also used as the
   *   schema of the migration bookkeeping table.
   */
  constructor(public db: Kysely<DatabaseSchema>, public schema?: string) {
    this.migrator = new Migrator({
      db,
      migrationTableSchema: schema,
      provider: {
        async getMigrations() {
          return migrations
        },
      },
    })
  }

  /**
   * Builds a `Database` on top of a new `pg` connection pool.
   *
   * @param opts Connection URL, optional schema and pool tuning values.
   * @returns A `Database` wrapping a Kysely instance that uses the new pool.
   * @throws Error when `opts.schema` contains characters outside `[A-Za-z_]`.
   */
  static postgres(opts: PgOptions): Database {
    const { schema } = opts
    const pool = new PgPool({
      connectionString: opts.url,
      max: opts.poolSize,
      maxUses: opts.poolMaxUses,
      idleTimeoutMillis: opts.poolIdleTimeoutMs,
    })

    // Select count(*) and other pg bigints as js integer.
    // Note: this registers the parser on the shared `pg` types object, and
    // values above Number.MAX_SAFE_INTEGER will lose precision.
    pgTypes.setTypeParser(pgTypes.builtins.INT8, (n) => parseInt(n, 10))

    // Setup schema usage, primarily for test parallelism (each test suite
    // runs in its own pg schema).
    if (schema !== undefined) {
      // The schema name is interpolated into SQL below, so it is restricted
      // to a conservative character set first.
      if (!/^[a-z_]+$/i.test(schema)) {
        throw new Error(
          `Postgres schema must only contain [A-Za-z_]: ${schema}`,
        )
      }
      // NOTE(review): the promise returned by client.query is handed back to
      // the event emitter and never awaited here — confirm a failing
      // SET search_path is surfaced somewhere.
      pool.on('connect', (client) =>
        // Shared objects such as extensions will go in the public schema
        client.query(`SET search_path TO "${schema}",public`),
      )
    }

    const db = new Kysely<DatabaseSchema>({
      dialect: new PostgresDialect({ pool }),
    })

    return new Database(db, schema)
  }

  /** Creates a new `MockDatabase` instance. */
  static mock(): MockDatabase {
    return new MockDatabase()
  }

  /** Destroys the underlying Kysely instance. */
  async close(): Promise<void> {
    await this.db.destroy()
  }

  /** Runs `select 1`; rejects if the query cannot be executed. */
  async healthCheck(): Promise<void> {
    await sql`select 1`.execute(this.db)
  }

  /**
   * Migrates to the named migration, creating the configured schema first
   * when one is set.
   *
   * @param migration Name of the target migration.
   * @returns The per-migration results reported by the migrator.
   * @throws The migrator's error, or a generic Error when no results exist.
   */
  async migrateToOrThrow(migration: string) {
    if (this.schema !== undefined) {
      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
    }
    const { error, results } = await this.migrator.migrateTo(migration)
    if (error) {
      throw error
    }
    if (!results) {
      throw new Error('An unknown failure occurred while migrating')
    }
    return results
  }

  /**
   * Migrates to the latest migration, creating the configured schema first
   * when one is set.
   *
   * @returns The per-migration results reported by the migrator.
   * @throws The migrator's error, or a generic Error when no results exist.
   */
  async migrateToLatestOrThrow() {
    if (this.schema !== undefined) {
      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
    }
    const { error, results } = await this.migrator.migrateToLatest()
    if (error) {
      throw error
    }
    if (!results) {
      throw new Error('An unknown failure occurred while migrating')
    }
    return results
  }

  /**
   * Validates `proposed` against the stored operation log of `did` and, if
   * valid, appends it inside a single transaction.
   *
   * @param did DID the operation applies to.
   * @param proposed Operation or tombstone to append.
   * @throws Whatever `plc.assureValidNextOp` / `enforceOpsRateLimit` throw.
   * @throws ServerError (409) when, after the insert, the second most recent
   *   non-nullified operation does not match the proposed `prev`.
   */
  async validateAndAddOp(
    did: string,
    proposed: plc.CompatibleOpOrTombstone,
  ): Promise<void> {
    const ops = await this.indexedOpsForDid(did)
    // throws if invalid
    const { nullified, prev } = await plc.assureValidNextOp(did, ops, proposed)
    // do not enforce rate limits on recovery operations to prevent DDOS by a bad actor
    if (nullified.length === 0) {
      enforceOpsRateLimit(ops)
    }

    const cid = await cidForCbor(proposed)

    await this.db.transaction().execute(async (tx) => {
      // grab a row lock on user table
      const userLock = await tx
        .selectFrom('dids')
        .forUpdate()
        .selectAll()
        .where('did', '=', did)
        .executeTakeFirst()

      if (!userLock) {
        await tx.insertInto('dids').values({ did }).execute()
      }

      await tx
        .insertInto('operations')
        .values({
          did,
          operation: proposed,
          cid: cid.toString(),
          nullified: false,
        })
        .execute()

      // Guarded on length: an `in` filter over an empty list is not valid SQL.
      if (nullified.length > 0) {
        const nullifiedCids = nullified.map((c) => c.toString())
        await tx
          .updateTable('operations')
          .set({ nullified: true })
          .where('did', '=', did)
          .where('cid', 'in', nullifiedCids)
          .execute()
      }

      // verify that the 2nd to last tx matches the proposed prev
      // otherwise rollback to prevent forks in history
      const mostRecent = await tx
        .selectFrom('operations')
        .select('cid')
        .where('did', '=', did)
        .where('nullified', '=', false)
        .orderBy('createdAt', 'desc')
        .limit(2)
        .execute()

      // Index 0 is expected to be the row just inserted; index 1 (if any) is
      // the operation that preceded it.
      const previous = mostRecent[1]
      const isMatch = prev
        ? previous !== undefined && prev.equals(CID.parse(previous.cid))
        : prev === null && previous === undefined
      if (!isMatch) {
        // Throwing inside the transaction callback rolls the insert back.
        throw new ServerError(
          409,
          `Proposed prev does not match the most recent operation: ${
            previous?.cid ?? 'none'
          }`,
        )
      }
    })
  }

  /**
   * Finds the CID of the newest non-nullified operation for `did`, ignoring
   * any CID listed in `notIncluded`.
   *
   * @param did DID to look up.
   * @param notIncluded CIDs to exclude; may be empty.
   * @returns The matching CID, or `null` when no row qualifies.
   */
  async mostRecentCid(did: string, notIncluded: CID[]): Promise<CID | null> {
    const notIncludedStr = notIncluded.map((cid) => cid.toString())

    const base = this.db
      .selectFrom('operations')
      .select('cid')
      .where('did', '=', did)
      .where('nullified', '=', false)
    // An empty `not in ()` list is a syntax error in Postgres, so the filter
    // is only applied when there is something to exclude.
    const filtered =
      notIncludedStr.length > 0
        ? base.where('cid', 'not in', notIncludedStr)
        : base

    const found = await filtered
      .orderBy('createdAt', 'desc')
      .limit(1)
      .executeTakeFirst()
    return found ? CID.parse(found.cid) : null
  }

  /**
   * Returns the non-nullified operations for `did`, oldest first, without
   * their indexing metadata.
   */
  async opsForDid(did: string): Promise<plc.CompatibleOpOrTombstone[]> {
    const ops = await this.indexedOpsForDid(did)
    return ops.map((op) => op.operation)
  }

  /**
   * Returns the stored operations for `did` ordered by `createdAt` ascending.
   *
   * @param did DID to look up.
   * @param includeNullified When `true`, nullified rows are returned too
   *   (default `false`).
   * @returns Rows with the stored `cid` string parsed into a `CID`.
   */
  async indexedOpsForDid(
    did: string,
    includeNullified = false,
  ): Promise<plc.IndexedOperation[]> {
    let builder = this.db
      .selectFrom('operations')
      .selectAll()
      .where('did', '=', did)
      .orderBy('createdAt', 'asc')
    if (!includeNullified) {
      builder = builder.where('nullified', '=', false)
    }
    const res = await builder.execute()
    return res.map((row) => ({
      did: row.did,
      operation: row.operation,
      cid: CID.parse(row.cid),
      nullified: row.nullified,
      createdAt: row.createdAt,
    }))
  }

  /**
   * Returns the newest non-nullified operation for `did`, or `null` when the
   * DID has none.
   */
  async lastOpForDid(did: string): Promise<plc.CompatibleOpOrTombstone | null> {
    const res = await this.db
      .selectFrom('operations')
      .selectAll()
      .where('did', '=', did)
      .where('nullified', '=', false)
      .orderBy('createdAt', 'desc')
      .limit(1)
      .executeTakeFirst()
    return res?.operation ?? null
  }

  /**
   * Exports up to `count` operations across all DIDs, oldest first.
   *
   * @param count Maximum number of rows to return.
   * @param after When given, only rows with `createdAt` strictly greater than
   *   this date are returned.
   * @returns The rows with `createdAt` converted to an ISO-8601 string.
   */
  async exportOps(count: number, after?: Date): Promise<plc.ExportedOp[]> {
    let builder = this.db
      .selectFrom('operations')
      .selectAll()
      .orderBy('createdAt', 'asc')
      .limit(count)
    if (after) {
      builder = builder.where('createdAt', '>', after)
    }
    const res = await builder.execute()
    return res.map((row) => ({
      ...row,
      createdAt: row.createdAt.toISOString(),
    }))
  }
}
239
/** Settings consumed by `Database.postgres` when it creates its pg pool. */
export type PgOptions = {
  /** Forwarded to the pool as `connectionString`. */
  url: string
  /** Optional Postgres schema name; must match `[A-Za-z_]+` when provided. */
  schema?: string
  /** Forwarded to the pool as `max`. */
  poolSize?: number
  /** Forwarded to the pool as `maxUses`. */
  poolMaxUses?: number
  /** Forwarded to the pool as `idleTimeoutMillis`. */
  poolIdleTimeoutMs?: number
}
247
248export default Database