decentralised sync engine

feat: broadcast incoming message to clients

serenity 9a18a2ca 5576769c

Changed files
+64 -11
src
lib
+19 -3
src/lib/handlers/connect.ts
···
issuedLatticeTokens,
isValidSession,
} from "@/lib/sessions";
-
import type { ShardMessage } from "@/lib/types/messages";
+
import { shardMessageSchema } from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
import { stringToAtUri } from "@/lib/utils/atproto";
-
import { storeMessageInShard } from "@/lib/utils/gmstn";
+
import { sendToChannelClients, storeMessageInShard } from "@/lib/utils/gmstn";
import {
rawDataToString,
validateWsMessageType,
} from "@/lib/utils/ws/validate";
+
import { z } from "zod";
export const connectPreHandler: PreHandler = (req, reply, done) => {
const { query } = req;
···
switch (messageType) {
case "shard/message": {
-
const shardMessage = validateTypeResult.data as ShardMessage;
+
const {
+
success,
+
error,
+
data: shardMessage,
+
} = shardMessageSchema.safeParse(validateTypeResult.data);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid ShardMessage.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
const { channel } = shardMessage;
+
const atUriParseResult = stringToAtUri(channel);
if (!atUriParseResult.ok) return;
const { data: channelAtUri } = atUriParseResult;
+
sendToChannelClients({ channelAtUri, message: shardMessage });
storeMessageInShard({ channelAtUri, message: shardMessage });
}
}
+4 -4
src/lib/sessions.ts
···
return sessionInfo;
};
-
export const activeLatticeSessions = new Map<LatticeSessionInfo, WebSocket>();
+
export const clientSessions = new Map<LatticeSessionInfo, WebSocket>();
export const isValidSession = (sessionInfo: LatticeSessionInfo) => {
return (
···
} catch {
return { ok: false };
}
-
activeLatticeSessions.set(sessionInfo, socket);
+
clientSessions.set(sessionInfo, socket);
return { ok: true, data: { sessionSocket: socket } };
};
export const deleteSession = (
sessionInfo: LatticeSessionInfo,
): Result<undefined, undefined> => {
-
if (!activeLatticeSessions.has(sessionInfo)) return { ok: false };
+
if (!clientSessions.has(sessionInfo)) return { ok: false };
try {
-
activeLatticeSessions.delete(sessionInfo);
+
clientSessions.delete(sessionInfo);
} catch {
return { ok: false };
}
+41 -4
src/lib/utils/gmstn.ts
···
+
import { clientSessions } from "@/lib/sessions";
import { shardSessions } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
···
channelAtUri: AtUri;
message: ShardMessage;
}) => {
-
const sessionInfo = shardSessions
+
const shardSessionInfo = shardSessions
.keys()
.find((sessionInfo) =>
sessionInfo.allowedChannels.some(
(allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
),
);
-
if (!sessionInfo) return;
+
if (!shardSessionInfo) return;
-
const shardSocket = shardSessions.get(sessionInfo);
+
const shardSocket = shardSessions.get(shardSessionInfo);
if (!shardSocket) {
console.error(
"Could find session info object in map, but socket could not be retrieved from map. Race condition?",
);
return;
}
+
const messageToSendToShard = {
+
...message,
+
sessionToken: shardSessionInfo.token,
+
};
if (shardSocket.readyState === WebSocket.OPEN)
-
shardSocket.send(JSON.stringify(message));
+
shardSocket.send(JSON.stringify(messageToSendToShard));
console.log(
"Sent off message",
···
shardSocket.url,
);
};
+
+
export const sendToChannelClients = ({
+
channelAtUri,
+
message,
+
}: {
+
channelAtUri: AtUri;
+
message: ShardMessage;
+
}) => {
+
const sessions = clientSessions
+
.keys()
+
.filter((sessionInfo) =>
+
sessionInfo.allowedChannels.some(
+
(allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
+
),
+
);
+
+
const clientSockets = sessions
+
.map((session) => {
+
return clientSessions.get(session);
+
})
+
.filter((e) => e !== undefined);
+
+
clientSockets.forEach((clientSocket) => {
+
clientSocket.send(JSON.stringify(message));
+
console.log(
+
"Sent off message",
+
message,
+
"to clientSocket pointing to",
+
clientSocket.url,
+
);
+
});
+
};