decentralised sync engine

feat: shard forwarding

serenity c02444e2 4bedde9c

Changed files
+65 -6
src
lib
handlers
utils
+29 -5
src/lib/handlers/connect.ts
···
issuedLatticeTokens,
isValidSession,
} from "@/lib/sessions";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
-
import { rawDataToString } from "@/lib/utils/ws";
export const connectPreHandler: PreHandler = (req, reply, done) => {
const { query } = req;
···
return;
}
-
socket.on("message", (event) => {
-
const message = rawDataToString(event);
-
console.log(message)
-
})
};
···
issuedLatticeTokens,
isValidSession,
} from "@/lib/sessions";
+
import { shardSessions } from "@/lib/state";
+
import type { ShardMessage } from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
+
import { stringToAtUri } from "@/lib/utils/atproto";
+
import { storeMessageInShard } from "@/lib/utils/gmstn";
+
import {
+
rawDataToString,
+
validateWsMessageType,
+
} from "@/lib/utils/ws/validate";
export const connectPreHandler: PreHandler = (req, reply, done) => {
const { query } = req;
···
return;
}
+
socket.on("message", (rawData) => {
+
const event = rawDataToString(rawData);
+
+
const data: unknown = JSON.parse(event);
+
const validateTypeResult = validateWsMessageType(data);
+
if (!validateTypeResult.ok) return;
+
+
const { type: messageType } = validateTypeResult.data;
+
+
switch (messageType) {
+
case "shard/message": {
+
const shardMessage = validateTypeResult.data as ShardMessage;
+
const { channel } = shardMessage;
+
const atUriParseResult = stringToAtUri(channel);
+
if (!atUriParseResult.ok) return;
+
const { data: channelAtUri } = atUriParseResult;
+
+
storeMessageInShard({ channelAtUri, message: shardMessage });
+
}
+
}
+
});
};
+36 -1
src/lib/utils/gmstn.ts
···
import { shardSessions } from "@/lib/state";
-
import type { Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
import { getEndpointFromDid } from "@/lib/utils/atproto";
import WebSocket from "ws";
···
shardSessions.set(sessionInfo, ws);
return ws;
};
···
import { shardSessions } from "@/lib/state";
+
import type { AtUri, Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
+
import type { ShardMessage } from "@/lib/types/messages";
import { getEndpointFromDid } from "@/lib/utils/atproto";
import WebSocket from "ws";
···
shardSessions.set(sessionInfo, ws);
return ws;
};
+
+
export const storeMessageInShard = ({
+
channelAtUri,
+
message,
+
}: {
+
channelAtUri: AtUri;
+
message: ShardMessage;
+
}) => {
+
const sessionInfo = shardSessions
+
.keys()
+
.find((sessionInfo) =>
+
sessionInfo.allowedChannels.some(
+
(allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
+
),
+
);
+
if (!sessionInfo) return;
+
+
const shardSocket = shardSessions.get(sessionInfo);
+
if (!shardSocket) {
+
console.error(
+
"Could find session info object in map, but socket could not be retrieved from map. Race condition?",
+
);
+
return;
+
}
+
if (shardSocket.readyState === WebSocket.OPEN)
+
shardSocket.send(JSON.stringify(message));
+
+
console.log(
+
"Sent off message",
+
message,
+
"to shard located at",
+
shardSocket.url,
+
);
+
};