Fork of github.com/did-method-plc/did-method-plc
import { Kysely, Migrator, PostgresDialect, sql } from 'kysely'
import { Pool as PgPool, types as pgTypes } from 'pg'
import { CID } from 'multiformats/cid'
import { cidForCbor } from '@atproto/common'
import * as plc from '@did-plc/lib'
import { ServerError } from '../error'
import * as migrations from '../migrations'
import { DatabaseSchema, PlcDatabase } from './types'
import MockDatabase from './mock'
import { enforceOpsRateLimit } from '../constraints'

export * from './mock'
export * from './types'

/**
 * Postgres-backed implementation of {@link PlcDatabase}, built on Kysely.
 *
 * Stores one row per DID in `dids` and one row per operation in `operations`.
 */
export class Database implements PlcDatabase {
  migrator: Migrator

  /**
   * @param db - Kysely instance used for every query.
   * @param schema - Optional Postgres schema; also used as the schema of the
   *   migration bookkeeping table.
   */
  constructor(public db: Kysely<DatabaseSchema>, public schema?: string) {
    this.migrator = new Migrator({
      db,
      migrationTableSchema: schema,
      provider: {
        async getMigrations() {
          return migrations
        },
      },
    })
  }

  /**
   * Builds a `Database` on top of a new `pg` connection pool.
   *
   * Note: this installs a process-wide `pg` type parser for INT8 columns.
   *
   * @param opts - Connection and pool settings, see {@link PgOptions}.
   * @throws Error if `opts.schema` contains anything other than `[A-Za-z_]`.
   */
  static postgres(opts: PgOptions): Database {
    const { schema } = opts
    const pool = new PgPool({
      connectionString: opts.url,
      max: opts.poolSize,
      maxUses: opts.poolMaxUses,
      idleTimeoutMillis: opts.poolIdleTimeoutMs,
    })

    // Select count(*) and other pg bigints as js integer
    // (values beyond Number.MAX_SAFE_INTEGER lose precision with parseInt)
    pgTypes.setTypeParser(pgTypes.builtins.INT8, (n) => parseInt(n, 10))

    // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema)
    if (schema !== undefined) {
      // The schema name is interpolated into SQL below, so it is restricted
      // to a safe character set first.
      if (!/^[a-z_]+$/i.test(schema)) {
        throw new Error(
          `Postgres schema must only contain [A-Za-z_]: ${schema}`,
        )
      }
      pool.on('connect', (client) =>
        // Shared objects such as extensions will go in the public schema
        client.query(`SET search_path TO "${schema}",public`),
      )
    }

    const db = new Kysely<DatabaseSchema>({
      dialect: new PostgresDialect({ pool }),
    })

    return new Database(db, schema)
  }

  /** Returns a new {@link MockDatabase}. */
  static mock(): MockDatabase {
    return new MockDatabase()
  }

  /** Destroys the underlying Kysely instance. */
  async close(): Promise<void> {
    await this.db.destroy()
  }

  /** Runs `select 1` against the database; rejects if the query fails. */
  async healthCheck(): Promise<void> {
    await sql`select 1`.execute(this.db)
  }

  /**
   * Creates the configured schema if needed, then migrates to `migration`.
   *
   * @param migration - Name of the target migration.
   * @returns The per-migration results reported by the migrator.
   * @throws The migrator's error, or a generic Error when no results are returned.
   */
  async migrateToOrThrow(migration: string) {
    if (this.schema !== undefined) {
      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
    }
    const { error, results } = await this.migrator.migrateTo(migration)
    if (error) {
      throw error
    }
    if (!results) {
      throw new Error('An unknown failure occurred while migrating')
    }
    return results
  }

  /**
   * Creates the configured schema if needed, then runs all pending migrations.
   *
   * @returns The per-migration results reported by the migrator.
   * @throws The migrator's error, or a generic Error when no results are returned.
   */
  async migrateToLatestOrThrow() {
    if (this.schema !== undefined) {
      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
    }
    const { error, results } = await this.migrator.migrateToLatest()
    if (error) {
      throw error
    }
    if (!results) {
      throw new Error('An unknown failure occurred while migrating')
    }
    return results
  }

  /**
   * Validates `proposed` against the DID's current operation log and, if it
   * is acceptable, persists it (marking any nullified operations) in a single
   * transaction.
   *
   * @param did - The DID the operation applies to.
   * @param proposed - The operation or tombstone to append.
   * @throws Whatever `plc.assureValidNextOp` throws for an invalid operation.
   * @throws ServerError (409) when, inside the transaction, the operation
   *   preceding the new one does not match the proposed `prev`; the
   *   transaction is rolled back in that case.
   */
  async validateAndAddOp(
    did: string,
    proposed: plc.CompatibleOpOrTombstone,
  ): Promise<void> {
    const ops = await this.indexedOpsForDid(did)
    // throws if invalid
    const { nullified, prev } = await plc.assureValidNextOp(did, ops, proposed)
    // do not enforce rate limits on recovery operations to prevent DDOS by a bad actor
    if (nullified.length === 0) {
      enforceOpsRateLimit(ops)
    }

    const cid = await cidForCbor(proposed)

    await this.db.transaction().execute(async (tx) => {
      // grab a row lock on user table
      const userLock = await tx
        .selectFrom('dids')
        .forUpdate()
        .selectAll()
        .where('did', '=', did)
        .executeTakeFirst()

      if (!userLock) {
        await tx.insertInto('dids').values({ did }).execute()
      }

      await tx
        .insertInto('operations')
        .values({
          did,
          operation: proposed,
          cid: cid.toString(),
          nullified: false,
        })
        .execute()

      if (nullified.length > 0) {
        const nullifiedStrs = nullified.map((cid) => cid.toString())
        await tx
          .updateTable('operations')
          .set({ nullified: true })
          .where('did', '=', did)
          .where('cid', 'in', nullifiedStrs)
          .execute()
      }

      // verify that the 2nd to last tx matches the proposed prev
      // otherwise rollback to prevent forks in history
      const mostRecent = await tx
        .selectFrom('operations')
        .select('cid')
        .where('did', '=', did)
        .where('nullified', '=', false)
        .orderBy('createdAt', 'desc')
        .limit(2)
        .execute()
      // mostRecent[0] is the row inserted above; mostRecent[1] (if any) is
      // the operation it was appended after.
      const previous = mostRecent[1]
      const isMatch =
        prev === null
          ? previous === undefined
          : // guard against a missing row so a mismatch surfaces as a 409
            // rather than a TypeError
            previous !== undefined && prev.equals(CID.parse(previous.cid))
      if (!isMatch) {
        throw new ServerError(
          409,
          `Proposed prev does not match the most recent operation: ${
            previous?.cid ?? 'none'
          }`,
        )
      }
    })
  }

  /**
   * Finds the CID of the newest non-nullified operation for `did`, skipping
   * any CID listed in `notIncluded`.
   *
   * @param did - The DID to look up.
   * @param notIncluded - CIDs to exclude; may be empty.
   * @returns The matching CID, or `null` when no operation qualifies.
   */
  async mostRecentCid(did: string, notIncluded: CID[]): Promise<CID | null> {
    const notIncludedStr = notIncluded.map((cid) => cid.toString())

    let builder = this.db
      .selectFrom('operations')
      .select('cid')
      .where('did', '=', did)
      .where('nullified', '=', false)
      .orderBy('createdAt', 'desc')
    // An empty list would render as `not in ()`, which Postgres rejects,
    // so only add the filter when there is something to exclude.
    if (notIncludedStr.length > 0) {
      builder = builder.where('cid', 'not in', notIncludedStr)
    }
    const found = await builder.executeTakeFirst()
    return found ? CID.parse(found.cid) : null
  }

  /**
   * Returns the non-nullified operations for `did`, oldest first, without
   * their index metadata.
   */
  async opsForDid(did: string): Promise<plc.CompatibleOpOrTombstone[]> {
    const ops = await this.indexedOpsForDid(did)
    return ops.map((op) => op.operation)
  }

  /**
   * Returns the stored operations for `did`, ordered by `createdAt` ascending.
   *
   * @param did - The DID to look up.
   * @param includeNullified - When `false` (default), nullified operations
   *   are filtered out.
   */
  async indexedOpsForDid(
    did: string,
    includeNullified = false,
  ): Promise<plc.IndexedOperation[]> {
    let builder = this.db
      .selectFrom('operations')
      .selectAll()
      .where('did', '=', did)
      .orderBy('createdAt', 'asc')
    if (!includeNullified) {
      builder = builder.where('nullified', '=', false)
    }
    const res = await builder.execute()
    return res.map((row) => ({
      did: row.did,
      operation: row.operation,
      cid: CID.parse(row.cid),
      nullified: row.nullified,
      createdAt: row.createdAt,
    }))
  }

  /**
   * Returns the newest non-nullified operation for `did`, or `null` when the
   * DID has none.
   */
  async lastOpForDid(did: string): Promise<plc.CompatibleOpOrTombstone | null> {
    const res = await this.db
      .selectFrom('operations')
      .selectAll()
      .where('did', '=', did)
      .where('nullified', '=', false)
      .orderBy('createdAt', 'desc')
      .limit(1)
      .executeTakeFirst()
    return res?.operation ?? null
  }

  /**
   * Exports up to `count` operations across all DIDs (nullified ones
   * included), ordered by `createdAt` ascending.
   *
   * @param count - Maximum number of rows to return.
   * @param after - When given, only operations created strictly after this
   *   instant are returned.
   * @returns Rows with `createdAt` serialized as an ISO-8601 string.
   */
  async exportOps(count: number, after?: Date): Promise<plc.ExportedOp[]> {
    let builder = this.db
      .selectFrom('operations')
      .selectAll()
      .orderBy('createdAt', 'asc')
      .limit(count)
    if (after) {
      builder = builder.where('createdAt', '>', after)
    }
    const res = await builder.execute()
    return res.map((row) => ({
      ...row,
      createdAt: row.createdAt.toISOString(),
    }))
  }
}

/** Options accepted by {@link Database.postgres}. */
export type PgOptions = {
  /** Connection string passed to the `pg` pool. */
  url: string
  /** Postgres schema to put first on the search path; letters and `_` only. */
  schema?: string
  /** Passed to the pool as `max`. */
  poolSize?: number
  /** Passed to the pool as `maxUses`. */
  poolMaxUses?: number
  /** Passed to the pool as `idleTimeoutMillis`. */
  poolIdleTimeoutMs?: number
}

export default Database