decentralised sync engine
1import { handshakeTokens } from "@/lib/state";
2import type { AtUri, Did } from "@/lib/types/atproto";
3import { systemsGmstnDevelopmentChannelRecordSchema } from "@/lib/types/lexicon/systems.gmstn.development.channel";
4import {
5 atUriToString,
6 getRecordFromFullAtUri,
7 stringToAtUri,
8} from "@/lib/utils/atproto";
9import { getConstellationBacklink } from "@/lib/utils/constellation";
10import { isDomain } from "@/lib/utils/domains";
11import { connectToShard, getShardEndpointFromDid } from "@/lib/utils/gmstn";
12import { initiateHandshakeTo } from "@/lib/utils/handshake";
13
14export const performHandshakes = async (latticeAtUri: AtUri) => {
15 const latticeAtUriString = atUriToString(latticeAtUri);
16 const constellationBacklinksResult = await getConstellationBacklink({
17 subject: latticeAtUriString,
18 source: {
19 nsid: "systems.gmstn.development.channel",
20 fieldName: "routeThrough.uri",
21 },
22 });
23
24 if (!constellationBacklinksResult.ok) {
25 throw new Error(
26 "Something went wrong fetching constellation backlinks to do Shard handshakes",
27 );
28 }
29
30 const { records: channelBacklinks } = constellationBacklinksResult.data;
31
32 // TODO: For private lattices, do permission check on owner's PDS
33 // and filter out records from unauthorised pdses.
34
35 const channelRecordsPromises = channelBacklinks.map(
36 async ({ did, collection, rkey }) =>
37 await getRecordFromFullAtUri({
38 // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
39 authority: did,
40 collection,
41 rKey: rkey,
42 }),
43 );
44
45 const channelRecordResults = await Promise.all(channelRecordsPromises);
46
47 // mapping of shard -> list of channels (all AtUris)
48 const channelsByShard = new Map<AtUri, Array<AtUri>>();
49
50 channelRecordResults.forEach((result, idx) => {
51 if (!result.ok) return;
52 const { success, data: channelRecord } =
53 systemsGmstnDevelopmentChannelRecordSchema.safeParse(result.data);
54 if (!success) return;
55 const { storeAt } = channelRecord;
56
57 const storeAtAtUriResult = stringToAtUri(storeAt.uri);
58 if (!storeAtAtUriResult.ok) return;
59 const storeAtAtUri = storeAtAtUriResult.data;
60
61 // this is fine because Promise.all() preserves the order of the arrays
62 const {
63 did: authority,
64 collection,
65 rkey: rKey,
66 } = channelBacklinks[idx];
67
68 const existingMapValue = channelsByShard.get(storeAtAtUri);
69
70 const currentChannelUri: Required<AtUri> = {
71 // @ts-expect-error seriously i gotta do something about the template literals not converting properly SIGH
72 authority,
73 collection,
74 rKey,
75 };
76
77 if (!existingMapValue) {
78 channelsByShard.set(storeAtAtUri, [currentChannelUri]);
79 } else {
80 const prevUris = existingMapValue;
81 channelsByShard.set(storeAtAtUri, [...prevUris, currentChannelUri]);
82 }
83 });
84
85 const channelsByShardEntries = channelsByShard.entries();
86
87 for (const entry of channelsByShardEntries) {
88 const shardAtUri = entry[0];
89
90 let shardDid: Did | undefined;
91 // TODO: if the rkey of the shard URI is not a valid domain, then it must be a did:plc
92 // we need to find a better way to enforce this. we really should explore just resolving the
93 // record and then checking the record value for the actual domain instead.
94 // did resolution hard;;
95 if (
96 isDomain(shardAtUri.rKey ?? "") ||
97 shardAtUri.rKey?.startsWith("localhost:")
98 ) {
99 // from the isDomain check, if we pass, we can conclude that
100 shardDid = `did:web:${encodeURIComponent(shardAtUri.rKey ?? "")}`;
101 } else {
102 shardDid = `did:plc:${encodeURIComponent(shardAtUri.rKey ?? "")}`;
103 }
104
105 const channelAtUris = entry[1];
106
107 // FIXME: perf issue. we are awaiting each handshake to resolve before we make new ones
108 // this means that the handshakes are consecutive and not concurrent.
109 // stuff this into a Promise.all by mapping over the array instead
110 const handshakeResult = await initiateHandshakeTo({
111 did: shardDid,
112 channels: channelAtUris,
113 });
114 if (!handshakeResult.ok) {
115 console.error("Handshake to", shardDid, "failed.");
116 console.error(JSON.stringify(handshakeResult.error));
117 continue;
118 }
119 const sessionInfo = handshakeResult.data;
120 console.log("Handshake to", shardAtUri.rKey, "complete!");
121 handshakeTokens.set(shardAtUri, sessionInfo);
122 }
123};
124
125export const connectToShards = async () => {
126 const handshakes = handshakeTokens.entries();
127 const shardConnectionPromises = handshakes
128 .map(async (handshake) => {
129 const atUri = handshake[0];
130 const sessionInfo = handshake[1];
131 const rkey = atUri.rKey ?? "";
132 const shardDid = isDomain(rkey)
133 ? `did:web:${encodeURIComponent(rkey)}`
134 : `did:plc:${rkey}`;
135
136 console.log(shardDid);
137
138 // TODO: again, implement proper did -> endpoint parsing here too.
139 // for now, we just assume did:web and construce a URL based on that.
140 // @ts-expect-error trust me bro it's a string
141 const shardUrlResult = await getShardEndpointFromDid(shardDid);
142
143 if (!shardUrlResult.ok) return;
144
145 return {
146 // TODO: xrpc and lexicon this endpoint
147 shardUrl: `${shardUrlResult.data.origin}/connect`,
148 sessionInfo,
149 };
150 })
151 .toArray();
152
153 const shardConnectionRequests = await Promise.all(shardConnectionPromises);
154
155 return shardConnectionRequests
156 .filter((request) => request !== undefined)
157 .map((request) => connectToShard(request));
158};