decentralised sync engine
import { OWNER_DID, SERVER_PORT, SERVICE_DID } from "@/lib/env";
import { setRegistrationState } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel";
import { getRecordFromAtUri, stringToAtUri } from "@/lib/utils/atproto";
import { getConstellationBacklink } from "@/lib/utils/constellation";
import { newErrorResponse } from "@/lib/utils/http/responses";
import { connectToPrism } from "@/lib/utils/prism";
import {
    attachLatticeRegistrationListener,
    wrapHttpRegistrationCheck,
    wrapWsRegistrationCheck,
} from "@/lib/utils/registration";
import { routes } from "@/routes";
import { setupServer } from "@/server";

/**
 * Boots the lattice service.
 *
 * Steps, in order:
 * 1. Derive the lattice origin from `SERVICE_DID` (only `did:web:` is handled).
 * 2. Look up the owner's `systems.gmstn.development.lattice` record keyed by
 *    that origin and, if the lookup succeeds, mark the lattice as registered.
 * 3. Connect to Prism and attach the lattice registration listener.
 * 4. Fetch the channel records that route through this service and group
 *    them by the shard URI they are stored at.
 * 5. Register every entry of `routes` on the server, wrapping handlers in the
 *    registration check unless the route opts out, then start listening on
 *    `SERVER_PORT`.
 *
 * The returned promise resolves only after `server.listen` has resolved.
 *
 * @throws Error when `SERVICE_DID` is not a `did:web:` DID.
 * @throws Error when the constellation backlink lookup does not succeed.
 */
const main = async (): Promise<void> => {
    // Strip the 8-character "did:web:" prefix; any other DID method yields "".
    let latticeUrlOrigin = decodeURIComponent(
        SERVICE_DID.startsWith("did:web:") ? SERVICE_DID.slice(8) : "",
    );
    if (latticeUrlOrigin === "localhost")
        latticeUrlOrigin += `:${SERVER_PORT.toString()}`;
    if (latticeUrlOrigin === "") {
        // TODO: resolve did:plc endpoint to get the origin of the lattice endpoint described by the did:plc doc
        // for now we just throw.
        throw new Error(
            "did:plc support not yet implemented. Provide a did:web for now. did:plc support will come in the future.",
        );
    }

    const latticeRecord = await getRecordFromAtUri({
        // @ts-expect-error template literal types do not convert properly here
        authority: OWNER_DID,
        collection: "systems.gmstn.development.lattice",
        rKey: latticeUrlOrigin,
    });

    if (latticeRecord.ok) setRegistrationState(true);

    const prismWebsocket = connectToPrism({
        wantedCollections: ["systems.gmstn.development.*"],
    });

    // TODO: probably move this to an `attachListeners` hook that attaches the listeners we want.
    attachLatticeRegistrationListener(prismWebsocket);

    const constellationBacklinksResult = await getConstellationBacklink({
        subject: SERVICE_DID,
        source: {
            nsid: "systems.gmstn.development.channel",
            fieldName: "routeThrough.uri",
        },
    });

    if (!constellationBacklinksResult.ok) {
        throw new Error(
            "Something went wrong fetching constellation backlinks to do Shard handshakes",
        );
    }

    const { records: channelBacklinks } = constellationBacklinksResult.data;

    // TODO: For private lattices, do permission check on owner's PDS
    // and filter out records from unauthorised pdses.

    // Fetch all channel records in parallel. Each result is returned together
    // with the backlink it came from, so no index bookkeeping is needed later.
    const channelRecordResults = await Promise.all(
        channelBacklinks.map(async (backlink) => {
            const { did, collection, rkey } = backlink;
            const result = await getRecordFromAtUri({
                // @ts-expect-error template literal types do not convert properly here
                authority: did,
                collection,
                rKey: rkey,
            });
            return { backlink, result };
        }),
    );

    // Mapping of shard AT URI (as a string) -> list of channel AtUris.
    // The key is the string form on purpose: Map compares object keys by
    // identity, so keying on parsed AtUri objects would never group two
    // channels together (assuming stringToAtUri returns a new object per call).
    const channelsByShard = new Map<string, Array<AtUri>>();

    for (const { backlink, result } of channelRecordResults) {
        if (!result.ok) continue;

        const parsed = systemsGmstnDevelopmentChannelRecordSchema.safeParse(
            result.data,
        );
        if (!parsed.success) continue;

        const { storeAt } = parsed.data;

        // Skip channels whose shard reference is not a parseable AT URI.
        if (!stringToAtUri(storeAt.uri).ok) continue;

        const { did: authority, collection, rkey: rKey } = backlink;

        const currentChannelUri: Required<AtUri> = {
            // @ts-expect-error template literal types do not convert properly here
            authority,
            collection,
            rKey,
        };

        const channelsOnShard = channelsByShard.get(storeAt.uri);
        if (channelsOnShard) {
            channelsOnShard.push(currentChannelUri);
        } else {
            channelsByShard.set(storeAt.uri, [currentChannelUri]);
        }
    }

    const server = await setupServer();
    for (const [url, route] of Object.entries(routes)) {
        if (!route.wsHandler) {
            // Plain HTTP route.
            const { handler, method, skipRegistrationCheck } = route;
            server.route({
                url,
                method,
                handler: skipRegistrationCheck
                    ? handler
                    : wrapHttpRegistrationCheck(handler),
            });
        } else {
            // Websocket route, optionally with an HTTP handler on the same URL.
            const {
                wsHandler,
                method,
                handler: httpHandler,
                skipRegistrationCheckHttp,
                skipRegistrationCheckWs,
            } = route;
            // Without an explicit HTTP handler, plain HTTP requests get a 404.
            const handler =
                httpHandler ??
                (() =>
                    newErrorResponse(404, {
                        message:
                            "This is a websocket only route. Did you mean to initiate a websocket connection here?",
                    }));
            server.route({
                url,
                method: method ?? "GET",
                handler: skipRegistrationCheckHttp
                    ? handler
                    : wrapHttpRegistrationCheck(handler),
                wsHandler: skipRegistrationCheckWs
                    ? wsHandler
                    : wrapWsRegistrationCheck(wsHandler),
            });
        }
    }

    // Await the bind so main() only resolves (and the success message below is
    // only printed) once listen() has resolved.
    try {
        await server.listen({ port: SERVER_PORT });
    } catch (err: unknown) {
        server.log.error(err);
        process.exit(1);
    }
};

main()
    .then(() => {
        console.log(`Server is running on port ${SERVER_PORT.toString()}`);
    })
    .catch((err: unknown) => {
        console.error("Something went wrong :(");
        console.error(
            "=========================== FULL ERROR BELOW ===========================",
        );
        console.error(err);
        // Mark the run as failed so the process does not exit with status 0.
        process.exitCode = 1;
    });