decentralised sync engine

feat: restore history from shard and close session if dropped

serenity fe69cc3a 08dc4206

Changed files
+140 -3
src
lib
handlers
listeners
types
utils
+44 -2
src/lib/handlers/connect.ts
···
import {
createNewSession,
issuedLatticeTokens,
isValidSession,
} from "@/lib/sessions";
-
import { shardMessageSchema } from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
import { stringToAtUri } from "@/lib/utils/atproto";
-
import { sendToChannelClients, storeMessageInShard } from "@/lib/utils/gmstn";
import {
rawDataToString,
validateWsMessageType,
···
sendToChannelClients({ channelAtUri, message: shardMessage });
storeMessageInShard({ channelAtUri, message: shardMessage });
}
}
});
};
···
import {
createNewSession,
+
deleteSession,
issuedLatticeTokens,
isValidSession,
} from "@/lib/sessions";
+
import {
+
requestHistoryMessageSchema,
+
shardMessageSchema,
+
} from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
import { stringToAtUri } from "@/lib/utils/atproto";
+
import {
+
sendHistoryRequestToShard,
+
sendToChannelClients,
+
storeMessageInShard,
+
} from "@/lib/utils/gmstn";
import {
rawDataToString,
validateWsMessageType,
···
sendToChannelClients({ channelAtUri, message: shardMessage });
storeMessageInShard({ channelAtUri, message: shardMessage });
+
break;
+
}
+
case "shard/requestHistory": {
+
const {
+
success,
+
error,
+
data: requestHistoryMessage,
+
} = requestHistoryMessageSchema.safeParse(
+
validateTypeResult.data,
+
);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid history request message.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
+
+
const { channel } = requestHistoryMessage;
+
+
const atUriParseResult = stringToAtUri(channel);
+
if (!atUriParseResult.ok) return;
+
const { data: channelAtUri } = atUriParseResult;
+
+
sendHistoryRequestToShard({
+
channelAtUri,
+
message: requestHistoryMessage,
+
});
}
}
+
});
+
+
socket.on("close", () => {
+
deleteSession(sessionInfo);
});
};
+54
src/lib/listeners/shard-history.ts
···
···
+
import { clientSessions } from "@/lib/sessions";
+
import { historyMessageSchema } from "@/lib/types/messages";
+
import {
+
rawDataToString,
+
validateWsMessageType,
+
} from "@/lib/utils/ws/validate";
+
import type WebSocket from "ws";
+
import { z } from "zod";
+
+
export const attachHistoryFromShardListener = (socket: WebSocket) => {
+
socket.on("message", (rawData) => {
+
const event = rawDataToString(rawData);
+
+
const data: unknown = JSON.parse(event);
+
const validateTypeResult = validateWsMessageType(data);
+
if (!validateTypeResult.ok) return;
+
+
console.log("received", validateTypeResult.data, "from shard")
+
+
const { type: messageType } = validateTypeResult.data;
+
if (messageType !== "shard/history") return;
+
const {
+
success,
+
error,
+
data: historyMessage,
+
} = historyMessageSchema.safeParse(validateTypeResult.data);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid history message.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
+
const { forClient: intendedRecipient } = historyMessage;
+
const clientSessionInfo = clientSessions
+
.keys()
+
.find((sessionInfo) => sessionInfo.clientDid === intendedRecipient);
+
if (!clientSessionInfo) {
+
console.error("Could not client session info in sessions map.");
+
return;
+
}
+
const clientSocket = clientSessions.get(clientSessionInfo);
+
if (!clientSocket) {
+
console.error(
+
"Could find session info in map but somehow couldn't find socket? This should not happen.",
+
);
+
return;
+
}
+
clientSocket.send(JSON.stringify(historyMessage));
+
console.log("sent off", historyMessage, "to client")
+
});
+
};
+2
src/lib/types/messages.ts
···
type: z.literal("shard/history"),
messages: z.optional(z.array(shardMessageSchema)),
channel: z.string(),
})
.strict();
export type HistoryMessage = z.infer<typeof historyMessageSchema>;
···
.safeExtend({
type: z.literal("shard/requestHistory"),
channel: z.string(),
})
.strict();
export type RequestHistoryMessage = z.infer<typeof requestHistoryMessageSchema>;
···
type: z.literal("shard/history"),
messages: z.optional(z.array(shardMessageSchema)),
channel: z.string(),
+
forClient: didSchema,
})
.strict();
export type HistoryMessage = z.infer<typeof historyMessageSchema>;
···
.safeExtend({
type: z.literal("shard/requestHistory"),
channel: z.string(),
+
requestedBy: didSchema,
})
.strict();
export type RequestHistoryMessage = z.infer<typeof requestHistoryMessageSchema>;
+40 -1
src/lib/utils/gmstn.ts
···
import { clientSessions } from "@/lib/sessions";
import { shardSessions } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
-
import type { ShardMessage } from "@/lib/types/messages";
import { getEndpointFromDid } from "@/lib/utils/atproto";
import WebSocket from "ws";
···
endpoint.searchParams.append("token", token);
const ws = new WebSocket(endpoint);
shardSessions.set(sessionInfo, ws);
return ws;
};
···
);
});
};
···
+
import { attachHistoryFromShardListener } from "@/lib/listeners/shard-history";
import { clientSessions } from "@/lib/sessions";
import { shardSessions } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
+
import type { RequestHistoryMessage, ShardMessage } from "@/lib/types/messages";
import { getEndpointFromDid } from "@/lib/utils/atproto";
import WebSocket from "ws";
···
endpoint.searchParams.append("token", token);
const ws = new WebSocket(endpoint);
shardSessions.set(sessionInfo, ws);
+
attachHistoryFromShardListener(ws);
return ws;
};
···
);
});
};
+
+
export const sendHistoryRequestToShard = ({
+
channelAtUri,
+
message,
+
}: {
+
channelAtUri: AtUri;
+
message: RequestHistoryMessage;
+
}) => {
+
const shardSessionInfo = shardSessions
+
.keys()
+
.find((sessionInfo) =>
+
sessionInfo.allowedChannels.some(
+
(allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
+
),
+
);
+
if (!shardSessionInfo) return;
+
+
const shardSocket = shardSessions.get(shardSessionInfo);
+
if (!shardSocket) {
+
console.error(
+
"Could find session info object in map, but socket could not be retrieved from map. Race condition?",
+
);
+
return;
+
}
+
const messageToSendToShard = {
+
...message,
+
};
+
if (shardSocket.readyState === WebSocket.OPEN)
+
shardSocket.send(JSON.stringify(messageToSendToShard));
+
+
console.log(
+
"Sent off message",
+
message,
+
"to shard located at",
+
shardSocket.url,
+
);
+
};