decentralised sync engine

feat!: import a bunch of stuff

serenity be0e93f6 a807aa04

Changed files
+355 -34
src
+191
src/lib/handlers/handshake.ts
···
+
import { OWNER_DID, SERVICE_DID } from "@/lib/env";
+
import { issueNewLatticeToken } from "@/lib/sessions";
+
import { HttpGeneralErrorType } from "@/lib/types/http/errors";
+
import { handshakeDataSchema } from "@/lib/types/http/handlers";
+
import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel";
+
import type { RouteHandler } from "@/lib/types/routes";
+
import { stringToAtUri } from "@/lib/utils/atproto";
+
import {
+
getConstellationBacklink,
+
getPdsRecordFromBacklink,
+
} from "@/lib/utils/constellation";
+
import {
+
newErrorResponse,
+
newSuccessResponse,
+
} from "@/lib/utils/http/responses";
+
import { verifyServiceJwt } from "@/lib/utils/verifyJwt";
+
import { z } from "zod";
+
+
export const handshakeHandler: RouteHandler = async (req) => {
+
const {
+
success: handshakeParseSuccess,
+
error: handshakeParseError,
+
data: handshakeData,
+
} = handshakeDataSchema.safeParse(req.body);
+
if (!handshakeParseSuccess) {
+
return newErrorResponse(400, {
+
message: HttpGeneralErrorType.TYPE_ERROR,
+
details: z.treeifyError(handshakeParseError),
+
});
+
}
+
+
const { interServiceJwt, channelAtUris: channelAtUriStrings } =
+
handshakeData;
+
const allowedChannels = channelAtUriStrings.map((channel) => {
+
const res = stringToAtUri(channel);
+
if (!res.ok) return;
+
return res.data;
+
});
+
+
const verifyJwtResult = await verifyServiceJwt(interServiceJwt);
+
if (!verifyJwtResult.ok) {
+
const { error } = verifyJwtResult;
+
return newErrorResponse(
+
401,
+
{
+
message:
+
"JWT authentication failed. Did you submit the right inter-service JWT to the right endpoint with the right signatures?",
+
details: error,
+
},
+
{
+
headers: {
+
"WWW-Authenticate":
+
'Bearer error="invalid_token", error_description="JWT signature verification failed"',
+
},
+
},
+
);
+
}
+
+
// TODO:
+
// if(PRIVATE_SHARD) doAllowCheck()
+
// see the sequence diagram for the proper flow.
+
// not implemented for now because we support public first
+
+
const constellationResponse = await getConstellationBacklink({
+
subject: `at://${OWNER_DID}/systems.gmstn.development.shard/${SERVICE_DID.slice(8)}`,
+
source: {
+
nsid: "systems.gmstn.development.channel",
+
fieldName: "storeAt.uri",
+
},
+
});
+
if (!constellationResponse.ok) {
+
const { error } = constellationResponse;
+
if ("fetchStatus" in error)
+
return newErrorResponse(error.fetchStatus, {
+
message:
+
"Could not fetch backlinks from constellation. Likely something went wrong on our side.",
+
details: error.message,
+
});
+
else
+
return newErrorResponse(400, {
+
message: HttpGeneralErrorType.TYPE_ERROR,
+
details: z.treeifyError(error),
+
});
+
}
+
+
const pdsRecordFetchPromises = constellationResponse.data.records.map(
+
async (backlink) => {
+
const recordResult = await getPdsRecordFromBacklink(backlink);
+
if (!recordResult.ok) {
+
console.error(
+
`something went wrong fetching the record from the given backlink ${JSON.stringify(backlink)}`,
+
);
+
throw new Error(
+
JSON.stringify({ error: recordResult.error, backlink }),
+
);
+
}
+
return recordResult.data;
+
},
+
);
+
+
let pdsChannelRecords;
+
try {
+
pdsChannelRecords = await Promise.all(pdsRecordFetchPromises);
+
} catch (err) {
+
return newErrorResponse(500, {
+
message:
+
"Something went wrong when fetching backlink channel records. Check the Shard logs if possible.",
+
details: err,
+
});
+
}
+
+
const {
+
success: channelRecordsParseSuccess,
+
error: channelRecordsParseError,
+
data: channelRecordsParsed,
+
} = z
+
.array(systemsGmstnDevelopmentChannelRecordSchema)
+
.safeParse(pdsChannelRecords);
+
if (!channelRecordsParseSuccess) {
+
return newErrorResponse(500, {
+
message:
+
"One of the backlinks returned by Constellation did not resolve to a proper lexicon Channel record.",
+
details: z.treeifyError(channelRecordsParseError),
+
});
+
}
+
+
// TODO:
+
// for private shards, ensure that the channels described by constellation backlinks are made
+
// by authorised parties (check owner pds for workspace management permissions)
+
// do another fetch to owner's pds first to grab the records, then cross-reference with the
+
// did of the backlink. if there are any channels described by unauthorised parties, simply drop them.
+
+
let mismatchOrIncorrect = false;
+
const requestingLatticeDid = verifyJwtResult.value.issuer;
+
+
channelRecordsParsed.forEach((channel) => {
+
if (mismatchOrIncorrect) return;
+
+
const { storeAt: storeAtRecord, routeThrough: routeThroughRecord } =
+
channel;
+
const storeAtRecordParseResult = stringToAtUri(storeAtRecord.uri);
+
if (!storeAtRecordParseResult.ok) {
+
mismatchOrIncorrect = true;
+
return;
+
}
+
const storeAtUri = storeAtRecordParseResult.data;
+
+
// FIXME: this assumes that the current shard's SERVICE_DID is a did:web.
+
// we should resolve the full record or add something that can tell us where to find this shard.
+
// likely, we should simply resolve the described shard record, which we can technically do faaaaar earlier on in the request
+
// or even store it in memory upon first boot of a shard.
+
// also incorrectly assumes that the storeAt rkey is a domain when it can in fact be anything.
+
// we should probably just resolve this properly first but for now, i cba.
+
if (storeAtUri.rKey !== SERVICE_DID.slice(8)) {
+
mismatchOrIncorrect = true;
+
return;
+
}
+
+
const routeThroughRecordParseResult = stringToAtUri(
+
routeThroughRecord.uri,
+
);
+
if (!routeThroughRecordParseResult.ok) {
+
mismatchOrIncorrect = true;
+
return;
+
}
+
const routeThroughUri = routeThroughRecordParseResult.data;
+
+
// FIXME: this also assumes that the requesting lattice's DID is a did:web
+
// see above for the rest of the issues.
+
if (routeThroughUri.rKey === requestingLatticeDid.slice(8)) {
+
mismatchOrIncorrect = true;
+
return;
+
}
+
});
+
+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+
if (mismatchOrIncorrect)
+
return newErrorResponse(400, {
+
message:
+
"Channels provided during the handshake had a mismatch between the channel values. Ensure that you are only submitting exactly the channels you have access to.",
+
});
+
+
// yipee, it's a valid request :3
+
+
const sessionInfo = issueNewLatticeToken({
+
allowedChannels,
+
clientDid: verifyJwtResult.value.issuer,
+
});
+
+
return newSuccessResponse({ sessionInfo });
+
};
+115
src/lib/sessions.ts
···
+
import type { WebSocket } from "ws";
+
import * as crypto from "node:crypto";
+
import { SESSIONS_SECRET } from "@/lib/utils/crypto";
+
import type { Result } from "@/lib/utils/result";
+
import type { AtUri, Did } from "@/lib/types/atproto";
+
import { SERVER_PORT, SERVICE_DID } from "@/lib/env";
+
import type { LatticeSessionInfo } from "@/lib/types/handshake";
+
+
export const generateSessionId = () => {
+
return crypto.randomUUID();
+
};
+
+
export const generateLatticeSessionInfo = (
+
sessionId: string,
+
allowedChannels: Array<AtUri>,
+
clientDid: Did,
+
): LatticeSessionInfo => {
+
const token = crypto.randomBytes(32).toString("base64url");
+
+
const hmac = crypto.createHmac("sha256", SESSIONS_SECRET);
+
hmac.update(`${token}:${sessionId}`);
+
const fingerprint = hmac.digest("hex");
+
+
const latticeDid: Did = SERVICE_DID.includes("localhost")
+
? `${SERVICE_DID}%3A${SERVER_PORT.toString()}`
+
: SERVICE_DID;
+
+
return {
+
id: sessionId,
+
token,
+
fingerprint,
+
allowedChannels,
+
latticeDid,
+
clientDid,
+
};
+
};
+
+
export const verifyLatticeToken = ({
+
token,
+
fingerprint,
+
id: sessionId,
+
}: LatticeSessionInfo) => {
+
const hmac = crypto.createHmac("sha256", SESSIONS_SECRET);
+
hmac.update(`${token}:${sessionId}`);
+
const expectedFingerprint = hmac.digest("hex");
+
+
try {
+
return crypto.timingSafeEqual(
+
Buffer.from(fingerprint, "hex"),
+
Buffer.from(expectedFingerprint, "hex"),
+
);
+
} catch {
+
return false;
+
}
+
};
+
+
export const issuedLatticeTokens = new Map<string, LatticeSessionInfo>();
+
+
export const issueNewLatticeToken = ({
+
allowedChannels,
+
clientDid,
+
}: {
+
allowedChannels: Array<AtUri | undefined>;
+
clientDid: Did;
+
}) => {
+
const filteredChannels = allowedChannels.filter(
+
(channels) => channels !== undefined,
+
);
+
const sessionId = generateSessionId();
+
const sessionInfo = generateLatticeSessionInfo(
+
sessionId,
+
filteredChannels,
+
clientDid,
+
);
+
console.log("Issuing new handshake token with session info", sessionInfo);
+
issuedLatticeTokens.set(sessionInfo.token, sessionInfo);
+
return sessionInfo;
+
};
+
+
export const activeSessions = new Map<string, WebSocket>();
+
+
export const isValidSession = (sessionInfo: LatticeSessionInfo) => {
+
return (
+
issuedLatticeTokens.has(sessionInfo.token) &&
+
verifyLatticeToken(sessionInfo)
+
);
+
};
+
+
export const createNewSession = ({
+
sessionInfo,
+
socket,
+
}: {
+
sessionInfo: LatticeSessionInfo;
+
socket: WebSocket;
+
}): Result<{ sessionSocket: WebSocket }, undefined> => {
+
try {
+
issuedLatticeTokens.delete(sessionInfo.token);
+
} catch {
+
return { ok: false };
+
}
+
activeSessions.set(sessionInfo.id, socket);
+
return { ok: true, data: { sessionSocket: socket } };
+
};
+
+
export const deleteSession = (
+
sessionInfo: LatticeSessionInfo,
+
): Result<undefined, undefined> => {
+
if (!activeSessions.has(sessionInfo.id)) return { ok: false };
+
try {
+
activeSessions.delete(sessionInfo.id);
+
} catch {
+
return { ok: false };
+
}
+
return { ok: true };
+
};
+10
src/lib/types/handshake.ts
···
latticeDid: didSchema,
});
export type ShardSessionInfo = z.infer<typeof shardSessionInfoSchema>;
+
+
export const latticeSessionInfoSchema = z.object({
+
id: z.string(),
+
token: z.string(),
+
fingerprint: z.string(),
+
allowedChannels: z.array(atUriSchema),
+
clientDid: didSchema,
+
latticeDid: didSchema,
+
});
+
export type LatticeSessionInfo = z.infer<typeof latticeSessionInfoSchema>;
+7
src/lib/types/http/handlers.ts
···
+
import { z } from "zod";
+
+
export const handshakeDataSchema = z.object({
+
interServiceJwt: z.string(),
+
channelAtUris: z.array(z.string()),
+
});
+
export type HandshakeData = z.infer<typeof handshakeDataSchema>;
+2 -2
src/lib/types/http/responses.ts
···
-
import { shardSessionInfoSchema } from "@/lib/types/handshake";
+
import { latticeSessionInfoSchema } from "@/lib/types/handshake";
import { httpResponseErrorInfoSchema } from "@/lib/types/http/errors";
import { z } from "zod";
···
>;
export const handshakeResponseSchema = z.object({
-
sessionInfo: shardSessionInfoSchema,
+
sessionInfo: latticeSessionInfoSchema,
});
export type HandshakeResponse = z.infer<typeof handshakeResponseSchema>;
+7
src/lib/utils/crypto.ts
···
+
import * as crypto from "node:crypto";
+
+
export const generateNewSecret = () => {
+
return crypto.randomBytes(32).toString("hex");
+
};
+
+
export const SESSIONS_SECRET = generateNewSecret();
+14
src/lib/utils/verifyJwt.ts
···
+
import { SERVER_PORT, SERVICE_DID } from "@/lib/env";
+
import { didDocResolver } from "@/lib/utils/atproto";
+
import { ServiceJwtVerifier } from "@atcute/xrpc-server/auth";
+
+
export const verifyServiceJwt = async (jwt: string) => {
+
const serviceDid = SERVICE_DID.startsWith("did:web:localhost")
+
? (`${SERVICE_DID}%3A${SERVER_PORT.toString()}` as `did:${string}:${string}`)
+
: SERVICE_DID;
+
const verifier = new ServiceJwtVerifier({
+
resolver: didDocResolver,
+
serviceDid,
+
});
+
return await verifier.verify(jwt);
+
};
+7
src/routes/handshake/route.ts
···
+
import { clientHandshakeHandler } from "@/lib/handlers/handshake";
+
import type { Route } from "@/lib/types/routes";
+
+
export const handshakeRoute: Route = {
+
method: "POST",
+
handler: clientHandshakeHandler,
+
};
+2 -2
src/routes/index.ts
···
import type { Route, WsRoute } from "@/lib/types/routes";
import { didWebDocRoute } from "@/routes/dot-well-known/did-dot-json/route";
+
import { handshakeRoute } from "@/routes/handshake/route";
import { indexRoute } from "@/routes/route";
-
import { testingRoute } from "@/routes/testing/route";
export const routes: Record<string, Route | WsRoute> = {
"/": indexRoute,
"/.well-known/did.json": didWebDocRoute,
-
"/testing": testingRoute,
+
"/handshake": handshakeRoute,
};
-30
src/routes/testing/route.ts
···
-
import type { Route } from "@/lib/types/routes";
-
import { initiateHandshakeTo } from "@/lib/utils/handshake";
-
import {
-
newErrorResponse,
-
newSuccessResponse,
-
} from "@/lib/utils/http/responses";
-
-
export const testingRoute: Route = {
-
method: "GET",
-
handler: async () => {
-
const sessionInfo = await initiateHandshakeTo({
-
did: "did:web:localhost%3A7337",
-
channels: [
-
{
-
authority: "did:plc:knucpdtudgdpyoeydicvhzel",
-
collection: "systems.gmstn.development.channel",
-
rKey: "3m3tpcwneq22e",
-
},
-
],
-
});
-
if (!sessionInfo.ok)
-
return newErrorResponse(400, {
-
message: "something went wrong with the handshake.",
-
details: sessionInfo.error,
-
});
-
return newSuccessResponse({
-
sessionInfo: sessionInfo.data,
-
});
-
},
-
};