decentralised sync engine
at main 6.1 kB view raw
import { handshakeTokens } from "@/lib/state";
import type { AtUri, Did } from "@/lib/types/atproto";
import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel";
import {
    atUriToString,
    getRecordFromFullAtUri,
    stringToAtUri,
} from "@/lib/utils/atproto";
import { getConstellationBacklink } from "@/lib/utils/constellation";
import { isDomain } from "@/lib/utils/domains";
import { connectToShard, getShardEndpointFromDid } from "@/lib/utils/gmstn";
import { initiateHandshakeTo } from "@/lib/utils/handshake";

/**
 * Derives a shard's DID from the record key of its at-uri.
 *
 * Record keys that pass `isDomain`, or that start with `localhost:`, are
 * treated as `did:web` identifiers; everything else is treated as a
 * `did:plc` identifier. The key is URI-encoded in both cases.
 *
 * Both the handshake step and the connection step go through this helper so
 * that they always agree on the DID of a given shard.
 *
 * TODO: if the rkey of the shard URI is not a valid domain, then it must be a did:plc.
 * We need to find a better way to enforce this. We really should explore just
 * resolving the record and then checking the record value for the actual
 * domain instead. did resolution hard;;
 *
 * @param rkey - Record key of the shard's at-uri (may be an empty string).
 * @returns The derived `did:web:…` or `did:plc:…` identifier.
 */
const shardDidFromRkey = (rkey: string): Did => {
    if (isDomain(rkey) || rkey.startsWith("localhost:")) {
        return `did:web:${encodeURIComponent(rkey)}`;
    }
    return `did:plc:${encodeURIComponent(rkey)}`;
};

/**
 * Performs a handshake with every shard that stores a channel routed through
 * the given lattice, and records the resulting session info in
 * `handshakeTokens`.
 *
 * Steps:
 * 1. Look up `systems.gmstn.development.channel` records whose
 *    `routeThrough.uri` points at the lattice (constellation backlinks).
 * 2. Fetch each of those channel records and validate it against the lexicon
 *    schema; records that fail to fetch, parse, or carry an unparseable
 *    `storeAt.uri` are skipped.
 * 3. Group the channel at-uris by the shard named in `storeAt.uri`.
 * 4. Initiate one handshake per shard, passing all of that shard's channels.
 *
 * A failed handshake is logged and skipped; it does not abort the others.
 *
 * @param latticeAtUri - At-uri of the lattice whose channels should be handshaken.
 * @throws Error if the constellation backlink lookup does not succeed.
 */
export const performHandshakes = async (latticeAtUri: AtUri) => {
    const latticeAtUriString = atUriToString(latticeAtUri);
    const constellationBacklinksResult = await getConstellationBacklink({
        subject: latticeAtUriString,
        source: {
            nsid: "systems.gmstn.development.channel",
            fieldName: "routeThrough.uri",
        },
    });

    if (!constellationBacklinksResult.ok) {
        throw new Error(
            "Something went wrong fetching constellation backlinks to do Shard handshakes",
        );
    }

    const { records: channelBacklinks } = constellationBacklinksResult.data;

    // TODO: For private lattices, do permission check on owner's PDS
    // and filter out records from unauthorised pdses.

    const channelRecordsPromises = channelBacklinks.map(
        async ({ did, collection, rkey }) =>
            await getRecordFromFullAtUri({
                // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
                authority: did,
                collection,
                rKey: rkey,
            }),
    );

    const channelRecordResults = await Promise.all(channelRecordsPromises);

    // Mapping of shard -> list of channels. The map is keyed by the shard's
    // at-uri *string*: a Map compares object keys by identity, and every parsed
    // at-uri is a distinct object, so object keys would never group two
    // channels that live on the same shard.
    const channelsByShard = new Map<
        string,
        { shardAtUri: AtUri; channels: Array<AtUri> }
    >();

    channelRecordResults.forEach((result, idx) => {
        if (!result.ok) return;
        const { success, data: channelRecord } =
            systemsGmstnDevelopmentChannelRecordSchema.safeParse(result.data);
        if (!success) return;
        const { storeAt } = channelRecord;

        const storeAtAtUriResult = stringToAtUri(storeAt.uri);
        if (!storeAtAtUriResult.ok) return;
        const storeAtAtUri = storeAtAtUriResult.data;

        // this is fine because Promise.all() preserves the order of the arrays
        const {
            did: authority,
            collection,
            rkey: rKey,
        } = channelBacklinks[idx];

        const currentChannelUri: Required<AtUri> = {
            // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
            authority,
            collection,
            rKey,
        };

        const shardKey = atUriToString(storeAtAtUri);
        const existingGroup = channelsByShard.get(shardKey);

        if (existingGroup) {
            existingGroup.channels.push(currentChannelUri);
        } else {
            channelsByShard.set(shardKey, {
                shardAtUri: storeAtAtUri,
                channels: [currentChannelUri],
            });
        }
    });

    for (const { shardAtUri, channels } of channelsByShard.values()) {
        const shardDid = shardDidFromRkey(shardAtUri.rKey ?? "");

        // FIXME: perf issue. we are awaiting each handshake to resolve before we make new ones
        // this means that the handshakes are consecutive and not concurrent.
        // stuff this into a Promise.all by mapping over the array instead
        const handshakeResult = await initiateHandshakeTo({
            did: shardDid,
            channels,
        });
        if (!handshakeResult.ok) {
            console.error("Handshake to", shardDid, "failed.");
            console.error(JSON.stringify(handshakeResult.error));
            continue;
        }
        const sessionInfo = handshakeResult.data;
        console.log("Handshake to", shardAtUri.rKey, "complete!");
        handshakeTokens.set(shardAtUri, sessionInfo);
    }
};

/**
 * Starts a connection to every shard that has an entry in `handshakeTokens`.
 *
 * For each entry the shard's DID is derived from the at-uri record key, the
 * shard endpoint is looked up from that DID, and `connectToShard` is called
 * with the endpoint's `/connect` URL and the stored session info. Shards
 * whose endpoint lookup does not succeed are skipped.
 *
 * @returns An array holding the return value of `connectToShard` for each
 * shard whose endpoint was resolved. Those values are not awaited here.
 */
export const connectToShards = async () => {
    const shardConnectionPromises = Array.from(
        handshakeTokens.entries(),
        async ([atUri, sessionInfo]) => {
            // Deliberately widened to `string` so the call below type-checks
            // (or rather, fails to) exactly as it did before the shared helper
            // was introduced; see the @ts-expect-error.
            const shardDid: string = shardDidFromRkey(atUri.rKey ?? "");

            console.log(shardDid);

            // TODO: again, implement proper did -> endpoint parsing here too.
            // for now, we just assume did:web and construct a URL based on that.
            // @ts-expect-error trust me bro it's a string
            const shardUrlResult = await getShardEndpointFromDid(shardDid);

            if (!shardUrlResult.ok) return;

            return {
                // TODO: xrpc and lexicon this endpoint
                shardUrl: `${shardUrlResult.data.origin}/connect`,
                sessionInfo,
            };
        },
    );

    const shardConnectionRequests = await Promise.all(shardConnectionPromises);

    return shardConnectionRequests
        .filter((request) => request !== undefined)
        .map((request) => connectToShard(request));
};