decentralised sync engine
1import { OWNER_DID, SERVER_PORT, SERVICE_DID } from "@/lib/env"; 2import { setRegistrationState } from "@/lib/state"; 3import type { AtUri, Did } from "@/lib/types/atproto"; 4import type { SessionInfo } from "@/lib/types/handshake"; 5import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel"; 6import { getRecordFromAtUri, stringToAtUri } from "@/lib/utils/atproto"; 7import { getConstellationBacklink } from "@/lib/utils/constellation"; 8import { isDomain } from "@/lib/utils/domains"; 9import { initiateHandshakeTo } from "@/lib/utils/handshake"; 10import { newErrorResponse } from "@/lib/utils/http/responses"; 11import { connectToPrism } from "@/lib/utils/prism"; 12import { 13 attachLatticeRegistrationListener, 14 wrapHttpRegistrationCheck, 15 wrapWsRegistrationCheck, 16} from "@/lib/utils/registration"; 17import { routes } from "@/routes"; 18import { setupServer } from "@/server"; 19 20const main = async () => { 21 let latticeUrlOrigin = decodeURIComponent( 22 SERVICE_DID.startsWith("did:web:") ? SERVICE_DID.slice(8) : "", 23 ); 24 if (latticeUrlOrigin === "localhost") 25 latticeUrlOrigin += `:${SERVER_PORT.toString()}`; 26 if (latticeUrlOrigin === "") { 27 // TODO: resolve did:plc endpoint to get the origin of the lattice endpoint described by the did:plc doc 28 // for now we just throw. 29 throw new Error( 30 "did:plc support not yet implemented. Provide a did:web for now. did:plc support will come in the future.", 31 ); 32 } 33 34 const latticeRecord = await getRecordFromAtUri({ 35 // @ts-expect-error alas, template literal weirdness continues uwu 36 authority: OWNER_DID, 37 collection: "systems.gmstn.development.lattice", 38 rKey: latticeUrlOrigin, 39 }); 40 41 if (latticeRecord.ok) setRegistrationState(true); 42 43 const prismWebsocket = connectToPrism({ 44 wantedCollections: ["systems.gmstn.development.*"], 45 }); 46 47 // TODO: probably move this to an `attachListeners` hook that attaches the listeners we want. 48 attachLatticeRegistrationListener(prismWebsocket); 49 50 const constellationBacklinksResult = await getConstellationBacklink({ 51 subject: SERVICE_DID, 52 source: { 53 nsid: "systems.gmstn.development.channel", 54 fieldName: "routeThrough.uri", 55 }, 56 }); 57 58 if (!constellationBacklinksResult.ok) { 59 throw new Error( 60 "Something went wrong fetching constellation backlinks to do Shard handshakes", 61 ); 62 } 63 64 const { records: channelBacklinks } = constellationBacklinksResult.data; 65 66 // TODO: For private lattices, do permission check on owner's PDS 67 // and filter out records from unauthorised pdses. 68 69 const channelRecordsPromises = channelBacklinks.map( 70 async ({ did, collection, rkey }) => 71 await getRecordFromAtUri({ 72 // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH 73 authority: did, 74 collection, 75 rKey: rkey, 76 }), 77 ); 78 79 const channelRecordResults = await Promise.all(channelRecordsPromises); 80 81 // mapping of shard -> list of channels (all AtUris) 82 const channelsByShard = new Map<AtUri, Array<AtUri>>(); 83 84 channelRecordResults.forEach((result, idx) => { 85 if (!result.ok) return; 86 const { success, data: channelRecord } = 87 systemsGmstnDevelopmentChannelRecordSchema.safeParse(result.data); 88 if (!success) return; 89 const { storeAt } = channelRecord; 90 91 const storeAtAtUriResult = stringToAtUri(storeAt.uri); 92 if (!storeAtAtUriResult.ok) return; 93 const storeAtAtUri = storeAtAtUriResult.data; 94 95 // this is fine because Promise.all() preserves the order of the arrays 96 const { 97 did: authority, 98 collection, 99 rkey: rKey, 100 } = channelBacklinks[idx]; 101 102 const existingMapValue = channelsByShard.get(storeAtAtUri); 103 104 const currentChannelUri: Required<AtUri> = { 105 // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH 106 authority, 107 collection, 108 rKey, 109 }; 110 111 if (!existingMapValue) { 112 channelsByShard.set(storeAtAtUri, [currentChannelUri]); 113 } else { 114 const prevUris = existingMapValue; 115 channelsByShard.set(storeAtAtUri, [...prevUris, currentChannelUri]); 116 } 117 }); 118 119 const channelSessions = new Map<AtUri, SessionInfo>(); 120 121 const channelsByShardEntries = channelsByShard.entries(); 122 123 for (const entry of channelsByShardEntries) { 124 const shardAtUri = entry[0]; 125 126 let shardDid: Did | undefined; 127 // TODO: if the rkey of the shard URI is not a valid domain, then it must be a did:plc 128 // we need to find a better way to enforce this. we really should explore just resolving the 129 // record and then checking the record value for the actual domain instead. 130 // did resolution hard;; 131 if ( 132 isDomain(shardAtUri.rKey ?? "") || 133 shardAtUri.rKey?.startsWith("localhost:") 134 ) { 135 // from the isDomain check, if we pass, we can conclude that 136 shardDid = `did:web:${encodeURIComponent(shardAtUri.rKey ?? "")}`; 137 } else { 138 shardDid = `did:plc:${encodeURIComponent(shardAtUri.rKey ?? "")}`; 139 } 140 141 const channelAtUris = entry[1]; 142 143 const handshakeResult = await initiateHandshakeTo({ 144 did: shardDid, 145 channels: channelAtUris, 146 }); 147 if (!handshakeResult.ok) return; 148 const sessionInfo = handshakeResult.data; 149 console.log("Handshake to", shardAtUri.rKey, "complete!"); 150 console.log("Session info:", sessionInfo); 151 channelSessions.set(shardAtUri, sessionInfo); 152 } 153 154 const server = await setupServer(); 155 for (const [url, route] of Object.entries(routes)) { 156 if (!route.wsHandler) { 157 const { handler, method, skipRegistrationCheck } = route; 158 server.route({ 159 url, 160 method, 161 handler: skipRegistrationCheck 162 ? handler 163 : wrapHttpRegistrationCheck(handler), 164 }); 165 } else { 166 const { 167 wsHandler, 168 method, 169 handler: httpHandler, 170 skipRegistrationCheckHttp, 171 skipRegistrationCheckWs, 172 } = route; 173 const handler = 174 httpHandler ?? 175 (() => 176 newErrorResponse(404, { 177 message: 178 "This is a websocket only route. Did you mean to initiate a websocket connection here?", 179 })); 180 server.route({ 181 url, 182 method: method ?? "GET", 183 handler: skipRegistrationCheckHttp 184 ? handler 185 : wrapHttpRegistrationCheck(handler), 186 wsHandler: skipRegistrationCheckWs 187 ? wsHandler 188 : wrapWsRegistrationCheck(wsHandler), 189 }); 190 } 191 } 192 193 server.listen({ port: SERVER_PORT }).catch((err: unknown) => { 194 server.log.error(err); 195 process.exit(1); 196 }); 197}; 198 199main() 200 .then(() => { 201 console.log(`Server is running on port ${SERVER_PORT.toString()}`); 202 }) 203 .catch((err: unknown) => { 204 console.error("Something went wrong :("); 205 console.error( 206 "=========================== FULL ERROR BELOW ===========================", 207 ); 208 console.error(err); 209 });