decentralised sync engine
at main 4.6 kB view raw
1import { 2 createNewSession, 3 deleteSession, 4 issuedLatticeTokens, 5 isValidSession, 6} from "@/lib/sessions"; 7import { 8 requestHistoryMessageSchema, 9 shardMessageSchema, 10} from "@/lib/types/messages"; 11import type { PreHandler, WsRouteHandler } from "@/lib/types/routes"; 12import { stringToAtUri } from "@/lib/utils/atproto"; 13import { 14 sendHistoryRequestToShard, 15 sendToChannelClients, 16 storeMessageInShard, 17} from "@/lib/utils/gmstn"; 18import { 19 rawDataToString, 20 validateWsMessageType, 21} from "@/lib/utils/ws/validate"; 22import { z } from "zod"; 23 24export const connectPreHandler: PreHandler = (req, reply, done) => { 25 const { query } = req; 26 if (!query) return; 27 if (!(typeof query === "object" && "token" in query)) { 28 reply.code(400).send("Provide token in query params"); 29 return; 30 } 31 32 const sessionToken = query.token as string; 33 34 const sessionInfo = issuedLatticeTokens.get(sessionToken); 35 if (!sessionInfo) { 36 reply 37 .code(404) 38 .send( 39 "Session token could not resolve to existing session. retry?", 40 ); 41 return; 42 } 43 44 if (!isValidSession(sessionInfo)) { 45 reply 46 .code(403) 47 .send( 48 "Session token resolved to session, but did not pass verification. this should not happen.", 49 ); 50 return; 51 } 52 53 console.log( 54 "Found session:", 55 sessionInfo.id, 56 "from session token", 57 sessionToken, 58 ); 59 done(); 60}; 61 62export const connectWsHandler: WsRouteHandler = (socket, req) => { 63 const { query } = req; 64 if (!query) return; 65 if (!(typeof query === "object" && "token" in query)) { 66 socket.close(); 67 return; 68 } 69 const sessionToken = query.token as string; 70 71 const sessionInfo = issuedLatticeTokens.get(sessionToken); 72 if (!sessionInfo) { 73 socket.close(); 74 return; 75 } 76 77 const sessionCreateResult = createNewSession({ sessionInfo, socket }); 78 if (!sessionCreateResult.ok) { 79 socket.close(); 80 return; 81 } 82 83 socket.on("message", (rawData) => { 84 const event = rawDataToString(rawData); 85 86 const data: unknown = JSON.parse(event); 87 const validateTypeResult = validateWsMessageType(data); 88 if (!validateTypeResult.ok) return; 89 90 const { type: messageType } = validateTypeResult.data; 91 92 switch (messageType) { 93 case "shard/message": { 94 const { 95 success, 96 error, 97 data: shardMessage, 98 } = shardMessageSchema.safeParse(validateTypeResult.data); 99 if (!success) { 100 console.error( 101 "could not parse", 102 validateTypeResult.data, 103 "as a valid ShardMessage.", 104 ); 105 console.error(z.treeifyError(error)); 106 return; 107 } 108 const { channel } = shardMessage; 109 110 const atUriParseResult = stringToAtUri(channel); 111 if (!atUriParseResult.ok) return; 112 const { data: channelAtUri } = atUriParseResult; 113 114 sendToChannelClients({ channelAtUri, message: shardMessage }); 115 storeMessageInShard({ channelAtUri, message: shardMessage }); 116 break; 117 } 118 case "shard/requestHistory": { 119 const { 120 success, 121 error, 122 data: requestHistoryMessage, 123 } = requestHistoryMessageSchema.safeParse( 124 validateTypeResult.data, 125 ); 126 if (!success) { 127 console.error( 128 "could not parse", 129 validateTypeResult.data, 130 "as a valid history request message.", 131 ); 132 console.error(z.treeifyError(error)); 133 return; 134 } 135 136 const { channel } = requestHistoryMessage; 137 138 const atUriParseResult = stringToAtUri(channel); 139 if (!atUriParseResult.ok) return; 140 const { data: channelAtUri } = atUriParseResult; 141 142 sendHistoryRequestToShard({ 143 channelAtUri, 144 message: requestHistoryMessage, 145 }); 146 } 147 } 148 }); 149 150 socket.on("close", () => { 151 deleteSession(sessionInfo); 152 }); 153};