Fork of github.com/did-method-plc/did-method-plc

Merge pull request #4 from bluesky-social/auditability

Auditability

Changed files
+190 -90
packages
+35 -10
packages/lib/src/client.ts
···
-
import { cidForCbor } from '@atproto/common'
+
import { check, cidForCbor } from '@atproto/common'
import { Keypair } from '@atproto/crypto'
import axios from 'axios'
-
import { didForCreateOp, signOperation } from './operations'
+
import { didForCreateOp, normalizeOp, signOperation } from './operations'
import * as t from './types'
export class Client {
···
}
async getDocumentData(did: string): Promise<t.DocumentData> {
-
const res = await axios.get(`${this.url}/data/${encodeURIComponent(did)}`)
+
const res = await axios.get(`${this.url}/${encodeURIComponent(did)}/data`)
return res.data
}
-
async getOperationLog(did: string): Promise<t.Operation[]> {
-
const res = await axios.get(`${this.url}/log/${encodeURIComponent(did)}`)
-
return res.data.log
+
async getOperationLog(did: string): Promise<t.CompatibleOpOrTombstone[]> {
+
const res = await axios.get(`${this.url}/${encodeURIComponent(did)}/log`)
+
return res.data
+
}
+
+
async getAuditableLog(did: string): Promise<t.ExportedOp[]> {
+
const res = await axios.get(
+
`${this.url}/${encodeURIComponent(did)}/log/audit`,
+
)
+
return res.data
}
postOpUrl(did: string): string {
return `${this.url}/${encodeURIComponent(did)}`
}
-
async getLastOp(did: string): Promise<t.Operation> {
-
const res = await axios.get(`${this.url}/last/${encodeURIComponent(did)}`)
+
async getLastOp(did: string): Promise<t.CompatibleOpOrTombstone> {
+
const res = await axios.get(
+
`${this.url}/${encodeURIComponent(did)}/log/last`,
+
)
return res.data
}
···
key: Keypair,
) {
const lastOp = await this.getLastOp(did)
+
if (check.is(lastOp, t.def.tombstone)) {
+
throw new Error('Cannot apply op to tombstone')
+
}
const prev = await cidForCbor(lastOp)
-
const { signingKey, rotationKeys, handles, services } = lastOp
+
const { signingKey, rotationKeys, handles, services } = normalizeOp(lastOp)
const op = await signOperation(
{
signingKey,
···
return did
}
-
async sendOperation(did: string, op: t.Operation) {
+
async sendOperation(did: string, op: t.OpOrTombstone) {
await axios.post(this.postOpUrl(did), op)
+
}
+
+
async export(after?: string, count?: number): Promise<t.ExportedOp[]> {
+
const url = new URL(`${this.url}/export`)
+
if (after) {
+
url.searchParams.append('after', after)
+
}
+
if (count !== undefined) {
+
url.searchParams.append('count', count.toString(10))
+
}
+
const res = await axios.get(url.toString())
+
const lines = res.data.split('\n')
+
return lines.map((l) => JSON.parse(l))
}
async health() {
+11
packages/lib/src/types.ts
···
})
export type IndexedOperation = z.infer<typeof indexedOperation>
+
export const exportedOp = z.object({
+
did: z.string(),
+
operation: compatibleOpOrTombstone,
+
cid: z.string(),
+
nullified: z.boolean(),
+
createdAt: z.string(),
+
})
+
export type ExportedOp = z.infer<typeof exportedOp>
+
export const didDocVerificationMethod = z.object({
id: z.string(),
type: z.string(),
···
opOrTombstone,
compatibleOp,
compatibleOpOrTombstone,
+
indexedOperation,
+
exportedOp,
didDocument,
}
+1 -1
packages/lib/tests/compatibility.test.ts
···
const result = await assureValidNextOp(did, [indexedLegacy], nextOp)
expect(result.nullified.length).toBe(0)
-
expect(result.prev?.equals(legacyCid))
+
expect(result.prev?.equals(legacyCid)).toBeTruthy()
})
})
+3 -3
packages/lib/tests/recovery.test.ts
···
expect(res.nullified.length).toBe(2)
expect(res.nullified[0].equals(log[1].cid))
expect(res.nullified[1].equals(log[2].cid))
-
expect(res.prev?.equals(createCid))
+
expect(res.prev?.equals(createCid)).toBeTruthy()
log = [log[0], rotate.indexed]
})
···
const res = await data.assureValidNextOp(did, log, rotate.op)
expect(res.nullified.length).toBe(1)
expect(res.nullified[0].equals(log[1].cid))
-
expect(res.prev?.equals(createCid))
+
expect(res.prev?.equals(createCid)).toBeTruthy()
log = [log[0], rotate.indexed]
})
···
)
expect(result.nullified.length).toBe(1)
expect(result.nullified[0].equals(cid))
-
expect(result.prev?.equals(createCid))
+
expect(result.prev?.equals(createCid)).toBeTruthy()
})
})
+42 -46
packages/server/src/db/index.ts
···
-
import { Generated, Kysely, Migrator, PostgresDialect, sql } from 'kysely'
+
import { Kysely, Migrator, PostgresDialect, sql } from 'kysely'
import { Pool as PgPool, types as pgTypes } from 'pg'
import { CID } from 'multiformats/cid'
-
import { cidForCbor, check } from '@atproto/common'
+
import { cidForCbor } from '@atproto/common'
import * as plc from '@did-plc/lib'
import { ServerError } from '../error'
import * as migrations from '../migrations'
-
import { OpLogExport, PlcDatabase } from './types'
+
import { DatabaseSchema, PlcDatabase } from './types'
import MockDatabase from './mock'
export * from './mock'
···
}
async validateAndAddOp(did: string, proposed: plc.Operation): Promise<void> {
-
const ops = await this._opsForDid(did)
+
const ops = await this.indexedOpsForDid(did)
// throws if invalid
const { nullified, prev } = await plc.assureValidNextOp(did, ops, proposed)
const cid = await cidForCbor(proposed)
···
return found ? CID.parse(found.cid) : null
}
-
async opsForDid(did: string): Promise<plc.OpOrTombstone[]> {
-
const ops = await this._opsForDid(did)
-
return ops.map((op) => {
-
if (check.is(op.operation, plc.def.createOpV1)) {
-
return plc.normalizeOp(op.operation)
-
}
-
return op.operation
-
})
+
async opsForDid(did: string): Promise<plc.CompatibleOpOrTombstone[]> {
+
const ops = await this.indexedOpsForDid(did)
+
return ops.map((op) => op.operation)
}
-
async _opsForDid(did: string): Promise<plc.IndexedOperation[]> {
-
const res = await this.db
+
async indexedOpsForDid(
+
did: string,
+
includeNullified = false,
+
): Promise<plc.IndexedOperation[]> {
+
let builder = this.db
.selectFrom('operations')
.selectAll()
.where('did', '=', did)
-
.where('nullified', '=', false)
.orderBy('createdAt', 'asc')
-
.execute()
-
+
if (!includeNullified) {
+
builder = builder.where('nullified', '=', false)
+
}
+
const res = await builder.execute()
return res.map((row) => ({
did: row.did,
operation: row.operation,
···
}))
}
-
async fullExport(): Promise<Record<string, OpLogExport>> {
-
return {}
-
// const res = await this.db
-
// .selectFrom('operations')
-
// .selectAll()
-
// .orderBy('did')
-
// .orderBy('createdAt')
-
// .execute()
-
// return res.reduce((acc, cur) => {
-
// acc[cur.did] ??= []
-
// acc[cur.did].push({
-
// op: cur.operation),
-
// nullified: cur.nullified === 1,
-
// createdAt: cur.createdAt,
-
// })
-
// return acc
-
// }, {} as Record<string, OpLogExport>)
+
async lastOpForDid(did: string): Promise<plc.CompatibleOpOrTombstone | null> {
+
const res = await this.db
+
.selectFrom('operations')
+
.selectAll()
+
.where('did', '=', did)
+
.where('nullified', '=', false)
+
.orderBy('createdAt', 'desc')
+
.limit(1)
+
.executeTakeFirst()
+
return res?.operation ?? null
}
-
}
-
export default Database
-
-
interface OperationsTable {
-
did: string
-
operation: plc.CompatibleOpOrTombstone
-
cid: string
-
nullified: boolean
-
createdAt: Generated<Date>
+
async exportOps(count: number, after?: Date): Promise<plc.ExportedOp[]> {
+
let builder = this.db
+
.selectFrom('operations')
+
.selectAll()
+
.orderBy('createdAt', 'asc')
+
.limit(count)
+
if (after) {
+
builder = builder.where('createdAt', '>', after)
+
}
+
const res = await builder.execute()
+
return res.map((row) => ({
+
...row,
+
createdAt: row.createdAt.toISOString(),
+
}))
+
}
}
-
interface DatabaseSchema {
-
operations: OperationsTable
-
}
+
export default Database
+22 -7
packages/server/src/db/mock.ts
···
import { cidForCbor, check } from '@atproto/common'
import * as plc from '@did-plc/lib'
import { ServerError } from '../error'
-
import { OpLogExport, PlcDatabase } from './types'
+
import { PlcDatabase } from './types'
type Contents = Record<string, plc.IndexedOperation[]>
···
}
}
-
async opsForDid(did: string): Promise<plc.OpOrTombstone[]> {
-
const ops = await this._opsForDid(did)
+
async opsForDid(did: string): Promise<plc.CompatibleOpOrTombstone[]> {
+
const ops = await this.indexedOpsForDid(did)
return ops.map((op) => {
if (check.is(op.operation, plc.def.createOpV1)) {
return plc.normalizeOp(op.operation)
···
})
}
-
async _opsForDid(did: string): Promise<plc.IndexedOperation[]> {
-
return this.contents[did] ?? []
+
async indexedOpsForDid(
+
did: string,
+
includeNull = false,
+
): Promise<plc.IndexedOperation[]> {
+
const ops = this.contents[did] ?? []
+
if (includeNull) {
+
return ops
+
}
+
return ops.filter((op) => op.nullified === false)
+
}
+
+
async lastOpForDid(did: string): Promise<plc.CompatibleOpOrTombstone | null> {
+
const op = this.contents[did]?.at(-1)
+
+
if (!op) return null
+
return op.operation
}
-
async fullExport(): Promise<Record<string, OpLogExport>> {
-
return {}
+
// disabled in mocks
+
async exportOps(_count: number, _after?: Date): Promise<plc.ExportedOp[]> {
+
return []
}
}
+17 -8
packages/server/src/db/types.ts
···
import * as plc from '@did-plc/lib'
+
import { Generated } from 'kysely'
export interface PlcDatabase {
close(): Promise<void>
healthCheck(): Promise<void>
validateAndAddOp(did: string, proposed: plc.Operation): Promise<void>
-
opsForDid(did: string): Promise<plc.OpOrTombstone[]>
-
_opsForDid(did: string): Promise<plc.IndexedOperation[]>
-
fullExport(): Promise<Record<string, OpLogExport>>
+
opsForDid(did: string): Promise<plc.CompatibleOpOrTombstone[]>
+
indexedOpsForDid(
+
did: string,
+
includeNull?: boolean,
+
): Promise<plc.IndexedOperation[]>
+
lastOpForDid(did: string): Promise<plc.CompatibleOpOrTombstone | null>
+
exportOps(count: number, after?: Date): Promise<plc.ExportedOp[]>
}
-
export type OpLogExport = OpExport[]
-
-
export type OpExport = {
-
op: Record<string, unknown>
+
export interface OperationsTable {
+
did: string
+
operation: plc.CompatibleOpOrTombstone
+
cid: string
nullified: boolean
-
createdAt: string
+
createdAt: Generated<Date>
+
}
+
+
export interface DatabaseSchema {
+
operations: OperationsTable
}
+36 -13
packages/server/src/routes.ts
···
res.send({ version })
})
-
// @TODO paginate & test this
+
// Export ops in the form of paginated json lines
router.get('/export', async function (req, res) {
-
const fullExport = await ctx.db.fullExport()
+
const parsedCount = req.count ? parseInt(req.count, 10) : 1000
+
if (isNaN(parsedCount) || parsedCount < 1) {
+
throw new ServerError(400, 'Invalid count parameter')
+
}
+
const count = Math.min(parsedCount, 1000)
+
const after = req.query.after ? new Date(req.query.after) : undefined
+
const ops = await ctx.db.exportOps(count, after)
res.setHeader('content-type', 'application/jsonlines')
res.status(200)
-
for (const [did, ops] of Object.entries(fullExport)) {
-
const line = JSON.stringify({ did, ops })
+
for (let i = 0; i < ops.length; i++) {
+
if (i > 0) {
+
res.write('\n')
+
}
+
const line = JSON.stringify(ops[i])
res.write(line)
-
res.write('\n')
}
res.end()
})
···
})
// Get data for a DID document
-
router.get('/data/:did', async function (req, res) {
+
router.get('/:did/data', async function (req, res) {
const { did } = req.params
const log = await ctx.db.opsForDid(did)
if (log.length === 0) {
···
})
// Get operation log for a DID
-
router.get('/log/:did', async function (req, res) {
+
router.get('/:did/log', async function (req, res) {
const { did } = req.params
const log = await ctx.db.opsForDid(did)
if (log.length === 0) {
throw new ServerError(404, `DID not registered: ${did}`)
}
-
res.json({ log })
+
res.json(log)
+
})
+
+
// Get operation log for a DID
+
router.get('/:did/log/audit', async function (req, res) {
+
const { did } = req.params
+
const ops = await ctx.db.indexedOpsForDid(did, true)
+
if (ops.length === 0) {
+
throw new ServerError(404, `DID not registered: ${did}`)
+
}
+
const log = ops.map((op) => ({
+
...op,
+
cid: op.cid.toString(),
+
createdAt: op.createdAt.toISOString(),
+
}))
+
+
res.json(log)
})
// Get the most recent operation in the log for a DID
-
router.get('/last/:did', async function (req, res) {
+
router.get('/:did/log/last', async function (req, res) {
const { did } = req.params
-
const log = await ctx.db.opsForDid(did)
-
const curr = log.at(-1)
-
if (!curr) {
+
const last = await ctx.db.lastOpForDid(did)
+
if (!last) {
throw new ServerError(404, `DID not registered: ${did}`)
}
-
res.json(curr)
+
res.json(last)
})
// Update or create a DID doc
+23 -2
packages/server/tests/server.test.ts
···
import { EcdsaKeypair } from '@atproto/crypto'
import * as plc from '@did-plc/lib'
import { CloseFn, runTestServer } from './_util'
-
import { cidForCbor } from '@atproto/common'
+
import { check, cidForCbor } from '@atproto/common'
import { AxiosError } from 'axios'
import { Database } from '../src'
import { signOperation } from '@did-plc/lib'
···
const newKey = await EcdsaKeypair.create()
const ops = await client.getOperationLog(did)
const forkPoint = ops.at(-2)
-
if (!forkPoint) {
+
if (!check.is(forkPoint, plc.def.operation)) {
throw new Error('Could not find fork point')
}
const forkCid = await cidForCbor(forkPoint)
···
expect(doc.rotationKeys).toEqual([newKey.did()])
expect(doc.handles).toEqual([handle])
expect(doc.services).toEqual({ atpPds })
+
})
+
+
it('retrieves the auditable operation log', async () => {
+
const log = await client.getOperationLog(did)
+
const auditable = await client.getAuditableLog(did)
+
// has one nullifed op
+
expect(auditable.length).toBe(log.length + 1)
+
expect(auditable.filter((op) => op.nullified).length).toBe(1)
+
expect(auditable.at(-2)?.nullified).toBe(true)
+
expect(
+
auditable.every((op) => check.is(op, plc.def.exportedOp)),
+
).toBeTruthy()
})
it('retrieves the did doc', async () => {
···
const ops = await client.getOperationLog(did)
await plc.validateOperationLog(did, ops)
+
})
+
+
it('exports the data set', async () => {
+
const data = await client.export()
+
expect(data.every((row) => check.is(row, plc.def.exportedOp))).toBeTruthy()
+
expect(data.length).toBe(58)
+
for (let i = 1; i < data.length; i++) {
+
expect(data[i].createdAt >= data[i - 1].createdAt).toBeTruthy()
+
}
})
it('healthcheck succeeds when database is available.', async () => {