decentralised sync engine
1import { OWNER_DID, SERVER_PORT, SERVICE_DID } from "@/lib/env";
2import { setRegistrationState } from "@/lib/state";
3import type { AtUri, Did } from "@/lib/types/atproto";
4import type { SessionInfo } from "@/lib/types/handshake";
5import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel";
6import { getRecordFromAtUri, stringToAtUri } from "@/lib/utils/atproto";
7import { getConstellationBacklink } from "@/lib/utils/constellation";
8import { isDomain } from "@/lib/utils/domains";
9import { initiateHandshakeTo } from "@/lib/utils/handshake";
10import { newErrorResponse } from "@/lib/utils/http/responses";
11import { connectToPrism } from "@/lib/utils/prism";
12import {
13 attachLatticeRegistrationListener,
14 wrapHttpRegistrationCheck,
15 wrapWsRegistrationCheck,
16} from "@/lib/utils/registration";
17import { routes } from "@/routes";
18import { setupServer } from "@/server";
19
/**
 * Boots the lattice service.
 *
 * In order, this:
 *  1. derives the lattice origin from `SERVICE_DID` (only `did:web` is handled);
 *  2. looks up the owner's `systems.gmstn.development.lattice` record for that
 *     origin and marks the lattice as registered when the lookup succeeds;
 *  3. connects to prism and attaches the lattice registration listener;
 *  4. fetches every channel record that routes through this lattice, groups the
 *     channels by the shard they are stored at, and initiates one handshake per
 *     shard;
 *  5. registers all routes (wrapped in registration checks unless a route opts
 *     out) and starts listening on `SERVER_PORT`.
 *
 * A shard whose handshake fails is logged and skipped; it does not prevent the
 * server from starting. If the server cannot start listening, the error is
 * logged and the process exits with code 1.
 *
 * @throws Error when `SERVICE_DID` is not a `did:web`, or when the constellation
 *   backlink lookup fails.
 */
const main = async (): Promise<void> => {
    // For a did:web, everything after the "did:web:" prefix (8 characters) is
    // the percent-encoded origin. Any other DID method yields "" here.
    let latticeUrlOrigin = decodeURIComponent(
        SERVICE_DID.startsWith("did:web:") ? SERVICE_DID.slice(8) : "",
    );
    // A bare "localhost" has no port, so append the port this server uses.
    if (latticeUrlOrigin === "localhost")
        latticeUrlOrigin += `:${SERVER_PORT.toString()}`;
    if (latticeUrlOrigin === "") {
        // TODO: resolve did:plc endpoint to get the origin of the lattice endpoint described by the did:plc doc
        // for now we just throw.
        throw new Error(
            "did:plc support not yet implemented. Provide a did:web for now. did:plc support will come in the future.",
        );
    }

    const latticeRecord = await getRecordFromAtUri({
        // @ts-expect-error alas, template literal weirdness continues uwu
        authority: OWNER_DID,
        collection: "systems.gmstn.development.lattice",
        rKey: latticeUrlOrigin,
    });

    // A successful lookup is treated as "this lattice is registered". A failed
    // lookup leaves the registration state untouched.
    if (latticeRecord.ok) setRegistrationState(true);

    const prismWebsocket = connectToPrism({
        wantedCollections: ["systems.gmstn.development.*"],
    });

    // TODO: probably move this to an `attachListeners` hook that attaches the listeners we want.
    attachLatticeRegistrationListener(prismWebsocket);

    // Find every channel record whose `routeThrough.uri` points at this lattice.
    const constellationBacklinksResult = await getConstellationBacklink({
        subject: SERVICE_DID,
        source: {
            nsid: "systems.gmstn.development.channel",
            fieldName: "routeThrough.uri",
        },
    });

    if (!constellationBacklinksResult.ok) {
        throw new Error(
            "Something went wrong fetching constellation backlinks to do Shard handshakes",
        );
    }

    const { records: channelBacklinks } = constellationBacklinksResult.data;

    // TODO: For private lattices, do permission check on owner's PDS
    // and filter out records from unauthorised pdses.

    const channelRecordsPromises = channelBacklinks.map(
        async ({ did, collection, rkey }) =>
            await getRecordFromAtUri({
                // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
                authority: did,
                collection,
                rKey: rkey,
            }),
    );

    const channelRecordResults = await Promise.all(channelRecordsPromises);

    // `Map` compares object keys by identity, and every parsed AtUri is its own
    // object, so shards are keyed by a canonical string built from the parsed
    // fields. Otherwise two channels stored at the same shard would never land
    // in the same group.
    const toShardKey = (uri: AtUri): string =>
        `${uri.authority}/${uri.collection ?? ""}/${uri.rKey ?? ""}`;

    // mapping of shard -> list of channels (all AtUris)
    const channelsByShard = new Map<
        string,
        { shardAtUri: AtUri; channels: Array<AtUri> }
    >();

    channelRecordResults.forEach((result, idx) => {
        // Records that could not be fetched, do not match the channel schema,
        // or carry an unparsable `storeAt.uri` are silently left out.
        if (!result.ok) return;
        const { success, data: channelRecord } =
            systemsGmstnDevelopmentChannelRecordSchema.safeParse(result.data);
        if (!success) return;
        const { storeAt } = channelRecord;

        const storeAtAtUriResult = stringToAtUri(storeAt.uri);
        if (!storeAtAtUriResult.ok) return;
        const storeAtAtUri = storeAtAtUriResult.data;

        // this is fine because Promise.all() preserves the order of the arrays
        const {
            did: authority,
            collection,
            rkey: rKey,
        } = channelBacklinks[idx];

        const currentChannelUri: Required<AtUri> = {
            // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
            authority,
            collection,
            rKey,
        };

        const shardKey = toShardKey(storeAtAtUri);
        const existingGroup = channelsByShard.get(shardKey);

        if (existingGroup) {
            existingGroup.channels.push(currentChannelUri);
        } else {
            channelsByShard.set(shardKey, {
                shardAtUri: storeAtAtUri,
                channels: [currentChannelUri],
            });
        }
    });

    // Session info per shard. Nothing in this function reads it back yet.
    const channelSessions = new Map<AtUri, SessionInfo>();

    for (const {
        shardAtUri,
        channels: channelAtUris,
    } of channelsByShard.values()) {
        const shardRKey = shardAtUri.rKey ?? "";

        // TODO: if the rkey of the shard URI is not a valid domain, then it must be a did:plc
        // we need to find a better way to enforce this. we really should explore just resolving the
        // record and then checking the record value for the actual domain instead.
        // did resolution hard;;
        //
        // If the rkey passes the domain check (or is a localhost origin with a
        // port), we conclude that the shard is addressed by a did:web built
        // from that rkey; anything else is treated as a did:plc identifier.
        const shardDid: Did =
            isDomain(shardRKey) || shardRKey.startsWith("localhost:")
                ? `did:web:${encodeURIComponent(shardRKey)}`
                : `did:plc:${encodeURIComponent(shardRKey)}`;

        const handshakeResult = await initiateHandshakeTo({
            did: shardDid,
            channels: channelAtUris,
        });
        if (!handshakeResult.ok) {
            // Skip only this shard. Returning here would abandon the remaining
            // shards and the server startup below.
            console.error(
                "Handshake to",
                shardAtUri.rKey,
                "failed; skipping this shard.",
            );
            continue;
        }
        const sessionInfo = handshakeResult.data;
        console.log("Handshake to", shardAtUri.rKey, "complete!");
        console.log("Session info:", sessionInfo);
        channelSessions.set(shardAtUri, sessionInfo);
    }

    const server = await setupServer();
    for (const [url, route] of Object.entries(routes)) {
        if (!route.wsHandler) {
            // Plain HTTP route.
            const { handler, method, skipRegistrationCheck } = route;
            server.route({
                url,
                method,
                handler: skipRegistrationCheck
                    ? handler
                    : wrapHttpRegistrationCheck(handler),
            });
        } else {
            // Websocket route, optionally with an HTTP handler alongside it.
            const {
                wsHandler,
                method,
                handler: httpHandler,
                skipRegistrationCheckHttp,
                skipRegistrationCheckWs,
            } = route;
            // Without an explicit HTTP handler, plain HTTP requests get a 404
            // explaining that the route is websocket-only.
            const handler =
                httpHandler ??
                (() =>
                    newErrorResponse(404, {
                        message:
                            "This is a websocket only route. Did you mean to initiate a websocket connection here?",
                    }));
            server.route({
                url,
                method: method ?? "GET",
                handler: skipRegistrationCheckHttp
                    ? handler
                    : wrapHttpRegistrationCheck(handler),
                wsHandler: skipRegistrationCheckWs
                    ? wsHandler
                    : wrapWsRegistrationCheck(wsHandler),
            });
        }
    }

    // Await the listen call so this function only resolves once the server is
    // actually accepting connections.
    try {
        await server.listen({ port: SERVER_PORT });
    } catch (err: unknown) {
        server.log.error(err);
        process.exit(1);
    }
};
198
// Entry point: run `main`, announce the port once it resolves, and dump the
// full error if anything rejects along the way. Failures are only logged here;
// no exit code is set.
void (async () => {
    try {
        await main();
        console.log(`Server is running on port ${SERVER_PORT.toString()}`);
    } catch (err: unknown) {
        console.error("Something went wrong :(");
        console.error(
            "=========================== FULL ERROR BELOW ===========================",
        );
        console.error(err);
    }
})();