decentralised sync engine
1import {
2 createNewSession,
3 deleteSession,
4 issuedLatticeTokens,
5 isValidSession,
6} from "@/lib/sessions";
7import {
8 requestHistoryMessageSchema,
9 shardMessageSchema,
10} from "@/lib/types/messages";
11import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
12import { stringToAtUri } from "@/lib/utils/atproto";
13import {
14 sendHistoryRequestToShard,
15 sendToChannelClients,
16 storeMessageInShard,
17} from "@/lib/utils/gmstn";
18import {
19 rawDataToString,
20 validateWsMessageType,
21} from "@/lib/utils/ws/validate";
22import { z } from "zod";
23
24export const connectPreHandler: PreHandler = (req, reply, done) => {
25 const { query } = req;
26 if (!query) return;
27 if (!(typeof query === "object" && "token" in query)) {
28 reply.code(400).send("Provide token in query params");
29 return;
30 }
31
32 const sessionToken = query.token as string;
33
34 const sessionInfo = issuedLatticeTokens.get(sessionToken);
35 if (!sessionInfo) {
36 reply
37 .code(404)
38 .send(
39 "Session token could not resolve to existing session. retry?",
40 );
41 return;
42 }
43
44 if (!isValidSession(sessionInfo)) {
45 reply
46 .code(403)
47 .send(
48 "Session token resolved to session, but did not pass verification. this should not happen.",
49 );
50 return;
51 }
52
53 console.log(
54 "Found session:",
55 sessionInfo.id,
56 "from session token",
57 sessionToken,
58 );
59 done();
60};
61
62export const connectWsHandler: WsRouteHandler = (socket, req) => {
63 const { query } = req;
64 if (!query) return;
65 if (!(typeof query === "object" && "token" in query)) {
66 socket.close();
67 return;
68 }
69 const sessionToken = query.token as string;
70
71 const sessionInfo = issuedLatticeTokens.get(sessionToken);
72 if (!sessionInfo) {
73 socket.close();
74 return;
75 }
76
77 const sessionCreateResult = createNewSession({ sessionInfo, socket });
78 if (!sessionCreateResult.ok) {
79 socket.close();
80 return;
81 }
82
83 socket.on("message", (rawData) => {
84 const event = rawDataToString(rawData);
85
86 const data: unknown = JSON.parse(event);
87 const validateTypeResult = validateWsMessageType(data);
88 if (!validateTypeResult.ok) return;
89
90 const { type: messageType } = validateTypeResult.data;
91
92 switch (messageType) {
93 case "shard/message": {
94 const {
95 success,
96 error,
97 data: shardMessage,
98 } = shardMessageSchema.safeParse(validateTypeResult.data);
99 if (!success) {
100 console.error(
101 "could not parse",
102 validateTypeResult.data,
103 "as a valid ShardMessage.",
104 );
105 console.error(z.treeifyError(error));
106 return;
107 }
108 const { channel } = shardMessage;
109
110 const atUriParseResult = stringToAtUri(channel);
111 if (!atUriParseResult.ok) return;
112 const { data: channelAtUri } = atUriParseResult;
113
114 sendToChannelClients({ channelAtUri, message: shardMessage });
115 storeMessageInShard({ channelAtUri, message: shardMessage });
116 break;
117 }
118 case "shard/requestHistory": {
119 const {
120 success,
121 error,
122 data: requestHistoryMessage,
123 } = requestHistoryMessageSchema.safeParse(
124 validateTypeResult.data,
125 );
126 if (!success) {
127 console.error(
128 "could not parse",
129 validateTypeResult.data,
130 "as a valid history request message.",
131 );
132 console.error(z.treeifyError(error));
133 return;
134 }
135
136 const { channel } = requestHistoryMessage;
137
138 const atUriParseResult = stringToAtUri(channel);
139 if (!atUriParseResult.ok) return;
140 const { data: channelAtUri } = atUriParseResult;
141
142 sendHistoryRequestToShard({
143 channelAtUri,
144 message: requestHistoryMessage,
145 });
146 }
147 }
148 });
149
150 socket.on("close", () => {
151 deleteSession(sessionInfo);
152 });
153};