decentralised sync engine
at main 3.7 kB view raw
import { attachHistoryFromShardListener } from "@/lib/listeners/shard-history";
import { clientSessions } from "@/lib/sessions";
import { shardSessions } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import type { ShardSessionInfo } from "@/lib/types/handshake";
import type { RequestHistoryMessage, ShardMessage } from "@/lib/types/messages";
import { getEndpointFromDid } from "@/lib/utils/atproto";
import WebSocket from "ws";

/**
 * Looks up the endpoint for a DID by delegating to `getEndpointFromDid`
 * with the `"GemstoneShard"` service identifier.
 *
 * @param did - DID whose shard endpoint should be looked up.
 * @returns Whatever `getEndpointFromDid` resolves to for that DID.
 */
export const getShardEndpointFromDid = async (did: Did) => {
  return await getEndpointFromDid(did, "GemstoneShard");
};

/**
 * Opens a WebSocket to a shard and registers it in `shardSessions`.
 *
 * The session token is appended to the URL as a `token` query parameter, the
 * socket is stored in `shardSessions` keyed by `sessionInfo`, and the
 * history listener is attached before the socket is returned.
 *
 * @param shardUrl - URL of the shard's WebSocket endpoint.
 * @param sessionInfo - Session descriptor; its `token` is sent to the shard
 *   and the object itself becomes the key in `shardSessions`.
 * @returns The newly created (not necessarily open yet) WebSocket.
 * @throws TypeError if `shardUrl` is not a valid URL.
 */
export const connectToShard = ({
  shardUrl,
  sessionInfo,
}: {
  shardUrl: string;
  sessionInfo: ShardSessionInfo;
}): WebSocket => {
  const endpoint = new URL(shardUrl);
  const { token } = sessionInfo;
  endpoint.searchParams.append("token", token);

  const ws = new WebSocket(endpoint);
  shardSessions.set(sessionInfo, ws);
  // NOTE(review): no "error"/"close" handler is attached here; presumably
  // attachHistoryFromShardListener or a caller takes care of that - confirm.
  attachHistoryFromShardListener(ws);
  return ws;
};

/**
 * Sends `message` as JSON to the first shard session that is allowed to
 * access the given channel.
 *
 * Shared implementation behind `storeMessageInShard` and
 * `sendHistoryRequestToShard`. Nothing is sent (and nothing is logged as
 * sent) when no session matches, when the socket is missing from the map, or
 * when the socket is not in the OPEN state.
 */
const sendToShardForChannel = (
  channelAtUri: AtUri,
  message: ShardMessage | RequestHistoryMessage,
): void => {
  // Channels are matched on their record key only.
  const shardSessionInfo = shardSessions
    .keys()
    .find((sessionInfo) =>
      sessionInfo.allowedChannels.some(
        (allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
      ),
    );
  if (!shardSessionInfo) return;

  const shardSocket = shardSessions.get(shardSessionInfo);
  if (!shardSocket) {
    console.error(
      "Could find session info object in map, but socket could not be retrieved from map. Race condition?",
    );
    return;
  }

  // Only report success when the message was actually handed to the socket;
  // previously the "Sent off" log fired even for sockets that were not open.
  if (shardSocket.readyState !== WebSocket.OPEN) {
    console.warn(
      "Shard socket is not open; message was not sent",
      message,
      "to shard located at",
      shardSocket.url,
    );
    return;
  }

  shardSocket.send(JSON.stringify(message));
  console.log(
    "Sent off message",
    message,
    "to shard located at",
    shardSocket.url,
  );
};

/**
 * Forwards a message to the shard session permitted to access the channel.
 *
 * Silently does nothing if no shard session lists the channel; logs and
 * skips the send if the shard socket is missing or not open.
 *
 * @param channelAtUri - Channel the message belongs to (matched by `rKey`).
 * @param message - Message to serialise as JSON and send to the shard.
 */
export const storeMessageInShard = ({
  channelAtUri,
  message,
}: {
  channelAtUri: AtUri;
  message: ShardMessage;
}): void => {
  sendToShardForChannel(channelAtUri, message);
};

/**
 * Sends a message to every connected client whose session is allowed to
 * access the given channel.
 *
 * Sockets that are not in the OPEN state are skipped with a warning so that
 * one connecting or closed client cannot throw and abort delivery to the
 * remaining clients.
 *
 * @param channelAtUri - Channel the message belongs to (matched by `rKey`).
 * @param message - Message to serialise as JSON and send to each client.
 */
export const sendToChannelClients = ({
  channelAtUri,
  message,
}: {
  channelAtUri: AtUri;
  message: ShardMessage;
}): void => {
  for (const sessionInfo of clientSessions.keys()) {
    const mayAccessChannel = sessionInfo.allowedChannels.some(
      (allowedChannel) => allowedChannel.rKey === channelAtUri.rKey,
    );
    if (!mayAccessChannel) continue;

    const clientSocket = clientSessions.get(sessionInfo);
    if (clientSocket === undefined) continue;

    if (clientSocket.readyState !== WebSocket.OPEN) {
      console.warn(
        "Client socket is not open; message was not sent",
        message,
        "to clientSocket pointing to",
        clientSocket.url,
      );
      continue;
    }

    clientSocket.send(JSON.stringify(message));
    console.log(
      "Sent off message",
      message,
      "to clientSocket pointing to",
      clientSocket.url,
    );
  }
};

/**
 * Sends a history request to the shard session permitted to access the
 * channel.
 *
 * Silently does nothing if no shard session lists the channel; logs and
 * skips the send if the shard socket is missing or not open.
 *
 * @param channelAtUri - Channel whose history is requested (matched by `rKey`).
 * @param message - History request to serialise as JSON and send to the shard.
 */
export const sendHistoryRequestToShard = ({
  channelAtUri,
  message,
}: {
  channelAtUri: AtUri;
  message: RequestHistoryMessage;
}): void => {
  sendToShardForChannel(channelAtUri, message);
};