decentralised sync engine
1import { OWNER_DID, SERVER_PORT, SERVICE_DID } from "@/lib/env";
2import { setRegistrationState } from "@/lib/state";
3import type { AtUri, Did } from "@/lib/types/atproto";
4import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel";
5import { getRecordFromAtUri, stringToAtUri } from "@/lib/utils/atproto";
6import { getConstellationBacklink } from "@/lib/utils/constellation";
7import { newErrorResponse } from "@/lib/utils/http/responses";
8import { connectToPrism } from "@/lib/utils/prism";
9import {
10 attachLatticeRegistrationListener,
11 wrapHttpRegistrationCheck,
12 wrapWsRegistrationCheck,
13} from "@/lib/utils/registration";
14import { routes } from "@/routes";
15import { setupServer } from "@/server";
16
/**
 * Boots the lattice service.
 *
 * Steps, in order:
 *  1. Derive the lattice origin from `SERVICE_DID` (only `did:web` is supported).
 *  2. Look up the lattice record in the owner's repo and, if it exists, mark
 *     the service as registered.
 *  3. Connect to prism and attach the lattice registration listener.
 *  4. Fetch every channel record that routes through this service (via
 *     constellation backlinks) and group the channels by the shard they are
 *     stored at.
 *  5. Register all routes (wrapping handlers with the registration check
 *     unless the route opts out) and start listening on `SERVER_PORT`.
 *
 * The returned promise resolves only once the server is actually listening.
 *
 * @throws Error if `SERVICE_DID` is not a `did:web`, or if the constellation
 *   backlink lookup fails.
 */
const main = async () => {
    const didWebPrefix = "did:web:";
    let latticeUrlOrigin = decodeURIComponent(
        SERVICE_DID.startsWith(didWebPrefix)
            ? SERVICE_DID.slice(didWebPrefix.length)
            : "",
    );
    // A bare `localhost` did:web carries no port, so append the one we serve on.
    if (latticeUrlOrigin === "localhost")
        latticeUrlOrigin += `:${SERVER_PORT.toString()}`;
    if (latticeUrlOrigin === "") {
        // TODO: resolve did:plc endpoint to get the origin of the lattice endpoint described by the did:plc doc
        // for now we just throw.
        throw new Error(
            "did:plc support not yet implemented. Provide a did:web for now. did:plc support will come in the future.",
        );
    }

    const latticeRecord = await getRecordFromAtUri({
        // @ts-expect-error alas, template literal weirdness continues uwu
        authority: OWNER_DID,
        collection: "systems.gmstn.development.lattice",
        rKey: latticeUrlOrigin,
    });

    // If the lookup fails, the registration state is left as it was.
    if (latticeRecord.ok) setRegistrationState(true);

    const prismWebsocket = connectToPrism({
        wantedCollections: ["systems.gmstn.development.*"],
    });

    // TODO: probably move this to an `attachListeners` hook that attaches the listeners we want.
    attachLatticeRegistrationListener(prismWebsocket);

    const constellationBacklinksResult = await getConstellationBacklink({
        subject: SERVICE_DID,
        source: {
            nsid: "systems.gmstn.development.channel",
            fieldName: "routeThrough.uri",
        },
    });

    if (!constellationBacklinksResult.ok) {
        throw new Error(
            "Something went wrong fetching constellation backlinks to do Shard handshakes",
        );
    }

    const { records: channelBacklinks } = constellationBacklinksResult.data;

    // TODO: For private lattices, do permission check on owner's PDS
    // and filter out records from unauthorised pdses.

    // Fetch all channel records in parallel, keeping each backlink paired with
    // its own fetch result so no positional bookkeeping is needed afterwards.
    const channelFetches = await Promise.all(
        channelBacklinks.map(async (backlink) => {
            const { did, collection, rkey } = backlink;
            const result = await getRecordFromAtUri({
                // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
                authority: did,
                collection,
                rKey: rkey,
            });
            return { backlink, result };
        }),
    );

    // mapping of shard -> list of channels (all AtUris)
    // NOTE(review): nothing in this function reads `channelsByShard` yet.
    const channelsByShard = new Map<AtUri, Array<AtUri>>();

    // `Map` compares object keys by identity, and every parsed AtUri is a
    // distinct object, so a lookup with a freshly parsed shard URI would never
    // hit. Index the first AtUri object seen for each shard URI string and
    // reuse it as the map key so channels on the same shard are grouped.
    const shardUriByString = new Map<string, AtUri>();

    for (const { backlink, result } of channelFetches) {
        // Channels that could not be fetched or do not validate are skipped.
        if (!result.ok) continue;
        const { success, data: channelRecord } =
            systemsGmstnDevelopmentChannelRecordSchema.safeParse(result.data);
        if (!success) continue;
        const { storeAt } = channelRecord;

        const storeAtAtUriResult = stringToAtUri(storeAt.uri);
        if (!storeAtAtUriResult.ok) continue;

        let shardUri = shardUriByString.get(storeAt.uri);
        if (!shardUri) {
            shardUri = storeAtAtUriResult.data;
            shardUriByString.set(storeAt.uri, shardUri);
        }

        const { did: authority, collection, rkey: rKey } = backlink;

        const currentChannelUri: Required<AtUri> = {
            // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
            authority,
            collection,
            rKey,
        };

        const existingChannels = channelsByShard.get(shardUri);
        if (existingChannels) {
            existingChannels.push(currentChannelUri);
        } else {
            channelsByShard.set(shardUri, [currentChannelUri]);
        }
    }

    const server = await setupServer();
    for (const [url, route] of Object.entries(routes)) {
        if (!route.wsHandler) {
            // Plain HTTP route.
            const { handler, method, skipRegistrationCheck } = route;
            server.route({
                url,
                method,
                handler: skipRegistrationCheck
                    ? handler
                    : wrapHttpRegistrationCheck(handler),
            });
        } else {
            // Websocket route, optionally with an HTTP handler on the same URL.
            const {
                wsHandler,
                method,
                handler: httpHandler,
                skipRegistrationCheckHttp,
                skipRegistrationCheckWs,
            } = route;
            // Without an HTTP handler, plain HTTP requests get a 404 hint.
            const handler =
                httpHandler ??
                (() =>
                    newErrorResponse(404, {
                        message:
                            "This is a websocket only route. Did you mean to initiate a websocket connection here?",
                    }));
            server.route({
                url,
                method: method ?? "GET",
                handler: skipRegistrationCheckHttp
                    ? handler
                    : wrapHttpRegistrationCheck(handler),
                wsHandler: skipRegistrationCheckWs
                    ? wsHandler
                    : wrapWsRegistrationCheck(wsHandler),
            });
        }
    }

    // Await the bind so callers only see `main()` resolve once the server is
    // really listening; a failed bind is logged and terminates the process.
    try {
        await server.listen({ port: SERVER_PORT });
    } catch (err: unknown) {
        server.log.error(err);
        process.exit(1);
    }
};
160
// Entry point: run the boot sequence, report success, and on failure log the
// error and flag a non-zero exit status so supervisors/CI can detect it.
main()
    .then(() => {
        console.log(`Server is running on port ${SERVER_PORT.toString()}`);
    })
    .catch((err: unknown) => {
        console.error("Something went wrong :(");
        console.error(
            "=========================== FULL ERROR BELOW ===========================",
        );
        console.error(err);
        // Do not force-exit here (lets pending stderr writes flush); just make
        // sure the eventual exit status reflects the failure.
        process.exitCode = 1;
    });