decentralised sync engine

Compare changes

Choose any two refs to compare.

Changed files
+256 -23
src
lib
routes
connect
handshake
xrpc
_health
systems.gmstn.development.lattice.getOwner
server
+12
src/lib/types/lexicon/systems.gmstn.development.channel.invite.ts
···
+
import { comAtprotoRepoStrongRefSchema, didSchema } from "@/lib/types/atproto";
+
import { z } from "zod";
+
+
export const systemsGmstnDevelopmentChannelInviteRecordSchema = z.object({
+
$type: z.string(),
+
channel: comAtprotoRepoStrongRefSchema,
+
recipient: didSchema,
+
createdAt: z.coerce.date(),
+
});
+
export type SystemsGmstnDevelopmentChannelInvite = z.infer<
+
typeof systemsGmstnDevelopmentChannelInviteRecordSchema
+
>;
+4 -3
src/lib/types/http/handlers.ts
···
+
import { systemsGmstnDevelopmentChannelMembershipRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel.membership";
import { z } from "zod";
-
export const handshakeDataSchema = z.object({
+
export const latticeHandshakeDataSchema = z.object({
interServiceJwt: z.string(),
-
channelAtUris: z.array(z.string()),
+
memberships: z.array(systemsGmstnDevelopmentChannelMembershipRecordSchema),
});
-
export type HandshakeData = z.infer<typeof handshakeDataSchema>;
+
export type LatticeHandshakeData = z.infer<typeof latticeHandshakeDataSchema>;
+2 -2
src/routes/handshake/route.ts
···
-
import { clientHandshakeHandler } from "@/lib/handlers/handshake";
+
import { latticeHandshakeHandler } from "@/lib/handlers/latticeHandshake";
import type { Route } from "@/lib/types/routes";
export const handshakeRoute: Route = {
method: "POST",
-
handler: clientHandshakeHandler,
+
handler: latticeHandshakeHandler,
};
+5
src/server/index.ts
···
import websocket from "@fastify/websocket";
+
import cors from "@fastify/cors";
import Fastify from "fastify";
export const setupServer = async () => {
···
logger: true,
});
+
await fastify.register(cors, {
+
origin: true,
+
});
+
await fastify.register(websocket);
return fastify;
+12 -1
src/lib/types/routes.ts
···
-
import type { FastifyReply, FastifyRequest } from "fastify";
+
import type {
+
FastifyReply,
+
FastifyRequest,
+
HookHandlerDoneFunction,
+
} from "fastify";
import type { WebSocket } from "ws";
export type RouteHandler = (
···
reply: FastifyReply,
) => Response | Promise<Response>;
+
export type PreHandler = (
+
req: FastifyRequest,
+
reply: FastifyReply,
+
done: HookHandlerDoneFunction,
+
) => void;
+
export type Method = "GET" | "POST" | "PUT" | "DELETE" | "PATCH";
export interface Route {
···
method?: Method;
handler?: RouteHandler;
wsHandler: WsRouteHandler;
+
preHandler?: PreHandler;
skipRegistrationCheckHttp?: true;
skipRegistrationCheckWs?: true;
}
+7
src/routes/connect/route.ts
···
+
import { connectPreHandler, connectWsHandler } from "@/lib/handlers/connect";
+
import type { WsRoute } from "@/lib/types/routes";
+
+
export const connectRoute: WsRoute = {
+
wsHandler: connectWsHandler,
+
preHandler: connectPreHandler,
+
};
+1 -1
src/lib/utils/registration.ts
···
import { prismCommitSchema } from "@/lib/types/prism";
import type { RouteHandler, WsRouteHandler } from "@/lib/types/routes";
import { newErrorResponse } from "@/lib/utils/http/responses";
-
import { rawDataToString } from "@/lib/utils/ws";
+
import { rawDataToString } from "@/lib/utils/ws/validate";
import type { RawData } from "ws";
import type WebSocket from "ws";
-11
src/lib/utils/ws/index.ts
···
-
import type { RawData } from "ws";
-
-
export const rawDataToString = (data: RawData): string => {
-
if (Buffer.isBuffer(data)) {
-
return data.toString("utf-8");
-
}
-
if (Array.isArray(data)) {
-
return Buffer.concat(data).toString("utf-8");
-
}
-
return new TextDecoder().decode(data);
-
};
+45
src/lib/utils/ws/validate.ts
···
+
import type { WebsocketMessage } from "@/lib/types/messages";
+
import { websocketMessageSchema } from "@/lib/types/messages";
+
import type { Result } from "@/lib/utils/result";
+
import { z } from "zod";
+
import type { RawData } from "ws";
+
+
export const rawDataToString = (data: RawData): string => {
+
if (Buffer.isBuffer(data)) {
+
return data.toString("utf-8");
+
}
+
if (Array.isArray(data)) {
+
return Buffer.concat(data).toString("utf-8");
+
}
+
return new TextDecoder().decode(data);
+
};
+
+
export const validateWsMessageString = (
+
data: unknown,
+
): Result<string, unknown> => {
+
const { success, error, data: message } = z.string().safeParse(data);
+
if (!success) {
+
console.error("Error decoding websocket message");
+
console.error(error);
+
return { ok: false, error: z.treeifyError(error) };
+
}
+
return { ok: true, data: message };
+
};
+
+
export const validateWsMessageType = (
+
data: unknown,
+
): Result<WebsocketMessage, unknown> => {
+
const {
+
success: wsMessageSuccess,
+
error: wsMessageError,
+
data: wsMessage,
+
} = websocketMessageSchema.safeParse(data);
+
if (!wsMessageSuccess) {
+
console.error(
+
"Error parsing websocket message. The data might be the wrong shape.",
+
);
+
console.error(wsMessageError);
+
return { ok: false, error: z.treeifyError(wsMessageError) };
+
}
+
return { ok: true, data: wsMessage };
+
};
+44 -2
src/lib/handlers/connect.ts
···
import {
createNewSession,
+
deleteSession,
issuedLatticeTokens,
isValidSession,
} from "@/lib/sessions";
-
import { shardMessageSchema } from "@/lib/types/messages";
+
import {
+
requestHistoryMessageSchema,
+
shardMessageSchema,
+
} from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
import { stringToAtUri } from "@/lib/utils/atproto";
-
import { sendToChannelClients, storeMessageInShard } from "@/lib/utils/gmstn";
+
import {
+
sendHistoryRequestToShard,
+
sendToChannelClients,
+
storeMessageInShard,
+
} from "@/lib/utils/gmstn";
import {
rawDataToString,
validateWsMessageType,
···
sendToChannelClients({ channelAtUri, message: shardMessage });
storeMessageInShard({ channelAtUri, message: shardMessage });
+
break;
+
}
+
case "shard/requestHistory": {
+
const {
+
success,
+
error,
+
data: requestHistoryMessage,
+
} = requestHistoryMessageSchema.safeParse(
+
validateTypeResult.data,
+
);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid history request message.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
+
+
const { channel } = requestHistoryMessage;
+
+
const atUriParseResult = stringToAtUri(channel);
+
if (!atUriParseResult.ok) return;
+
const { data: channelAtUri } = atUriParseResult;
+
+
sendHistoryRequestToShard({
+
channelAtUri,
+
message: requestHistoryMessage,
+
});
}
}
});
+
+
socket.on("close", () => {
+
deleteSession(sessionInfo);
+
});
};
+54
src/lib/listeners/shard-history.ts
···
+
import { clientSessions } from "@/lib/sessions";
+
import { historyMessageSchema } from "@/lib/types/messages";
+
import {
+
rawDataToString,
+
validateWsMessageType,
+
} from "@/lib/utils/ws/validate";
+
import type WebSocket from "ws";
+
import { z } from "zod";
+
+
export const attachHistoryFromShardListener = (socket: WebSocket) => {
+
socket.on("message", (rawData) => {
+
const event = rawDataToString(rawData);
+
+
const data: unknown = JSON.parse(event);
+
const validateTypeResult = validateWsMessageType(data);
+
if (!validateTypeResult.ok) return;
+
+
console.log("received", validateTypeResult.data, "from shard")
+
+
const { type: messageType } = validateTypeResult.data;
+
if (messageType !== "shard/history") return;
+
const {
+
success,
+
error,
+
data: historyMessage,
+
} = historyMessageSchema.safeParse(validateTypeResult.data);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid history message.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
+
const { forClient: intendedRecipient } = historyMessage;
+
const clientSessionInfo = clientSessions
+
.keys()
+
.find((sessionInfo) => sessionInfo.clientDid === intendedRecipient);
+
if (!clientSessionInfo) {
+
console.error("Could not client session info in sessions map.");
+
return;
+
}
+
const clientSocket = clientSessions.get(clientSessionInfo);
+
if (!clientSocket) {
+
console.error(
+
"Could find session info in map but somehow couldn't find socket? This should not happen.",
+
);
+
return;
+
}
+
clientSocket.send(JSON.stringify(historyMessage));
+
console.log("sent off", historyMessage, "to client")
+
});
+
};
+40 -1
src/lib/utils/gmstn.ts
···
+
import { attachHistoryFromShardListener } from "@/lib/listeners/shard-history";
import { clientSessions } from "@/lib/sessions";
import { shardSessions } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
-
import type { ShardMessage } from "@/lib/types/messages";
+
import type { RequestHistoryMessage, ShardMessage } from "@/lib/types/messages";
import { getEndpointFromDid } from "@/lib/utils/atproto";
import WebSocket from "ws";
···
endpoint.searchParams.append("token", token);
const ws = new WebSocket(endpoint);
shardSessions.set(sessionInfo, ws);
+
attachHistoryFromShardListener(ws);
return ws;
};
···
);
});
};
+
+
export const sendHistoryRequestToShard = ({
+
channelAtUri,
+
message,
+
}: {
+
channelAtUri: AtUri;
+
message: RequestHistoryMessage;
+
}) => {
+
const shardSessionInfo = shardSessions
+
.keys()
+
.find((sessionInfo) =>
+
sessionInfo.allowedChannels.some(
+
(allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
+
),
+
);
+
if (!shardSessionInfo) return;
+
+
const shardSocket = shardSessions.get(shardSessionInfo);
+
if (!shardSocket) {
+
console.error(
+
"Could find session info object in map, but socket could not be retrieved from map. Race condition?",
+
);
+
return;
+
}
+
const messageToSendToShard = {
+
...message,
+
};
+
if (shardSocket.readyState === WebSocket.OPEN)
+
shardSocket.send(JSON.stringify(messageToSendToShard));
+
+
console.log(
+
"Sent off message",
+
message,
+
"to shard located at",
+
shardSocket.url,
+
);
+
};
+9
src/lib/handlers/getOwnerDid.ts
···
+
import { OWNER_DID } from "@/lib/env";
+
import { getRegistrationState } from "@/lib/state";
+
import type { RouteHandler } from "@/lib/types/routes";
+
import { newSuccessResponse } from "@/lib/utils/http/responses";
+
+
export const getOwnerHandler: RouteHandler = () => {
+
const { registered } = getRegistrationState();
+
return newSuccessResponse({ registered, ownerDid: OWNER_DID });
+
};
+10
src/routes/xrpc/_health/route.ts
···
+
import type { Route } from "@/lib/types/routes";
+
+
export const healthRoute: Route = {
+
method: "GET",
+
handler: () => {
+
return new Response("this lattice is running at 0.0.1", {
+
headers: { "content-type": "text/plain; charset=utf-8" },
+
});
+
},
+
};
+7
src/routes/xrpc/systems.gmstn.development.lattice.getOwner/route.ts
···
+
import { getOwnerHandler } from "@/lib/handlers/getOwnerDid";
+
import type { Route } from "@/lib/types/routes";
+
+
export const systemsGmstnDevelopmentShardGetOwnerRoute: Route = {
+
method: "GET",
+
handler: getOwnerHandler,
+
};
+1
.gitignore
···
/node_modules
/dist
.env
+
.docker.env
*.tsbuildinfo
+2 -1
src/lib/setup.ts
···
channels: channelAtUris,
});
if (!handshakeResult.ok) {
-
console.error(handshakeResult.error);
+
console.error("Handshake to", shardDid, "failed.");
+
console.error(JSON.stringify(handshakeResult.error));
continue;
}
const sessionInfo = handshakeResult.data;
+1 -1
src/lib/env.ts
···
"Environment variable SERVICE_DID not set. Defaulting to `did:web:localhost`",
);
}
-
export const SERVICE_DID = serviceDidParsed ?? "did:web:localhost";
+
export const SERVICE_DID = serviceDidParsed ?? `did:web:localhost%3A${SERVER_PORT.toString()}`;
const constellationUrl = process.env.CONSTELLATION_URL;
let constellationUrlParsed: URL | undefined;