decentralised sync engine
1import { attachHistoryFromShardListener } from "@/lib/listeners/shard-history";
2import { clientSessions } from "@/lib/sessions";
3import { shardSessions } from "@/lib/state";
4import type { AtUri, Did } from "@/lib/types/atproto";
5import type { ShardSessionInfo } from "@/lib/types/handshake";
6import type { RequestHistoryMessage, ShardMessage } from "@/lib/types/messages";
7import { getEndpointFromDid } from "@/lib/utils/atproto";
8import WebSocket from "ws";
9
/**
 * Looks up the endpoint for a DID by delegating to `getEndpointFromDid`
 * with the literal service name `"GemstoneShard"`.
 *
 * @param did - DID to resolve.
 * @returns Whatever `getEndpointFromDid` resolves to for this DID and service.
 */
export const getShardEndpointFromDid = async (did: Did) => {
  const shardEndpoint = await getEndpointFromDid(did, "GemstoneShard");
  return shardEndpoint;
};
13
/**
 * Opens a WebSocket to a shard and registers it in `shardSessions`.
 *
 * The session token is appended to the shard URL as the `token` query
 * parameter, the socket is stored under `sessionInfo`, and the shard-history
 * listener is attached before the socket is returned.
 *
 * @param shardUrl - URL of the shard; parsed with `new URL`, which throws a
 *   `TypeError` when the string is not a valid URL.
 * @param sessionInfo - Session descriptor; its `token` is sent to the shard and
 *   the object itself is used as the key in `shardSessions`.
 * @returns The newly created socket. It is returned immediately, so callers
 *   should not assume the connection is already open.
 */
export const connectToShard = ({
  shardUrl,
  sessionInfo,
}: {
  shardUrl: string;
  sessionInfo: ShardSessionInfo;
}) => {
  const endpoint = new URL(shardUrl);
  endpoint.searchParams.append("token", sessionInfo.token);

  const ws = new WebSocket(endpoint);

  // An EventEmitter "error" without a listener is thrown as an uncaught
  // exception, so a shard that refuses the connection would otherwise be able
  // to take the whole process down. Log `shardUrl` rather than the final
  // endpoint so the token does not end up in the logs.
  ws.on("error", (err: Error) => {
    console.error("Shard socket error for", shardUrl, err);
  });

  // Drop the registration once the socket is gone so later lookups do not
  // keep selecting a dead connection. The identity check avoids removing a
  // newer socket that may have been registered under the same session info.
  ws.on("close", () => {
    if (shardSessions.get(sessionInfo) === ws) {
      shardSessions.delete(sessionInfo);
    }
  });

  shardSessions.set(sessionInfo, ws);
  attachHistoryFromShardListener(ws);
  return ws;
};
29
/**
 * Sends a message to the first shard session whose `allowedChannels` contains
 * a channel with the same `rKey` as `channelAtUri`.
 *
 * Nothing is sent (and the function returns silently) when no session matches.
 * When the matching socket is not open, the message is dropped and a warning
 * is logged instead of a success message.
 *
 * @param channelAtUri - AT URI of the target channel; only its `rKey` is used
 *   for matching.
 * @param message - Message to serialise as JSON and send to the shard.
 */
export const storeMessageInShard = ({
  channelAtUri,
  message,
}: {
  channelAtUri: AtUri;
  message: ShardMessage;
}) => {
  const shardSessionInfo = shardSessions
    .keys()
    .find((sessionInfo) =>
      sessionInfo.allowedChannels.some(
        (allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
      ),
    );
  if (!shardSessionInfo) return;

  const shardSocket = shardSessions.get(shardSessionInfo);
  if (!shardSocket) {
    console.error(
      "Could find session info object in map, but socket could not be retrieved from map. Race condition?",
    );
    return;
  }

  // Only claim success when the message was actually handed to the socket.
  if (shardSocket.readyState !== WebSocket.OPEN) {
    console.warn(
      "Did not send message",
      message,
      "because the shard socket is not open. readyState:",
      shardSocket.readyState,
    );
    return;
  }

  shardSocket.send(JSON.stringify(message));

  // NOTE(review): sockets created by connectToShard carry the session token in
  // their URL query string, so this log line likely exposes it — consider
  // redacting the query before logging.
  console.log(
    "Sent off message",
    message,
    "to shard located at",
    shardSocket.url,
  );
};
66
/**
 * Broadcasts a message to every client session whose `allowedChannels`
 * contains a channel with the same `rKey` as `channelAtUri`.
 *
 * The message is serialised once and sent to each matching socket that is
 * currently open. Sockets that are not open are skipped with a warning rather
 * than being reported as successfully sent.
 *
 * @param channelAtUri - AT URI of the channel; only its `rKey` is used for
 *   matching.
 * @param message - Message to serialise as JSON and send to each client.
 */
export const sendToChannelClients = ({
  channelAtUri,
  message,
}: {
  channelAtUri: AtUri;
  message: ShardMessage;
}) => {
  // The payload is identical for every recipient, so build it a single time.
  const payload = JSON.stringify(message);

  clientSessions
    .keys()
    .filter((sessionInfo) =>
      sessionInfo.allowedChannels.some(
        (allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
      ),
    )
    .map((session) => clientSessions.get(session))
    .filter((socket) => socket !== undefined)
    .forEach((clientSocket) => {
      // Mirror the shard senders: never send on, or report success for, a
      // socket that is not open.
      if (clientSocket.readyState !== WebSocket.OPEN) {
        console.warn(
          "Did not send message",
          message,
          "because the client socket is not open. readyState:",
          clientSocket.readyState,
        );
        return;
      }

      clientSocket.send(payload);
      console.log(
        "Sent off message",
        message,
        "to clientSocket pointing to",
        clientSocket.url,
      );
    });
};
98
/**
 * Sends a history request to the first shard session whose `allowedChannels`
 * contains a channel with the same `rKey` as `channelAtUri`.
 *
 * Nothing is sent (and the function returns silently) when no session matches.
 * When the matching socket is not open, the request is dropped and a warning
 * is logged instead of a success message.
 *
 * @param channelAtUri - AT URI of the channel whose history is requested; only
 *   its `rKey` is used for matching.
 * @param message - History request to serialise as JSON and send to the shard.
 */
export const sendHistoryRequestToShard = ({
  channelAtUri,
  message,
}: {
  channelAtUri: AtUri;
  message: RequestHistoryMessage;
}) => {
  const shardSessionInfo = shardSessions
    .keys()
    .find((sessionInfo) =>
      sessionInfo.allowedChannels.some(
        (allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
      ),
    );
  if (!shardSessionInfo) return;

  const shardSocket = shardSessions.get(shardSessionInfo);
  if (!shardSocket) {
    console.error(
      "Could find session info object in map, but socket could not be retrieved from map. Race condition?",
    );
    return;
  }

  // Only claim success when the request was actually handed to the socket.
  if (shardSocket.readyState !== WebSocket.OPEN) {
    console.warn(
      "Did not send message",
      message,
      "because the shard socket is not open. readyState:",
      shardSocket.readyState,
    );
    return;
  }

  shardSocket.send(JSON.stringify(message));

  // NOTE(review): sockets created by connectToShard carry the session token in
  // their URL query string, so this log line likely exposes it — consider
  // redacting the query before logging.
  console.log(
    "Sent off message",
    message,
    "to shard located at",
    shardSocket.url,
  );
};