decentralised sync engine

feat: websocket sessions yay

serenity 9eda0144 c1b967a8

Changed files
+66 -4
src
+14 -4
src/index.ts
···
import { OWNER_DID, SERVER_PORT, SERVICE_DID } from "@/lib/env";
-
import { performHandshakes } from "@/lib/setup";
+
import { connectToShards, performHandshakes } from "@/lib/setup";
import { handshakeTokens, setRegistrationState } from "@/lib/state";
import type { AtUri } from "@/lib/types/atproto";
import { getRecordFromAtUri } from "@/lib/utils/atproto";
···
});
// TODO: probably move this to an `attachListeners` hook that attaches the listeners we want.
+
// least tested. will probably have nuances we need to work on in the future
attachLatticeRegistrationListener(prismWebsocket);
await performHandshakes(latticeAtUri);
+
+
const test = await connectToShards();
+
console.log("connected to", test.map((socket) => socket.url));
// TODO: change this to the actual WS sessions
const handshakeTokenEntries = handshakeTokens.entries().toArray();
if (handshakeTokenEntries.length === 0) {
-
console.warn("Warning: there are zero handshake tokens on this Lattice.");
-
console.warn("If you're hacking locally, you might want to make sure that there's a running Shard as well.")
+
console.warn(
+
"Warning: there are zero handshake tokens on this Lattice.",
+
);
+
console.warn(
+
"If you're hacking locally, you might want to make sure that there's a running Shard as well.",
+
);
// NOTE: might change in the future
-
console.warn("Channel records connecting a Lattice to a Shard are not supported now. Dev lattices must point to a dev Shard if both are running locally")
+
console.warn(
+
"Channel records connecting a Lattice to a Shard are not supported now. Dev lattices must point to a dev Shard if both are running locally",
+
);
}
};
+39
src/lib/setup.ts
···
} from "@/lib/utils/atproto";
import { getConstellationBacklink } from "@/lib/utils/constellation";
import { isDomain } from "@/lib/utils/domains";
+
import { connectToShard, getShardEndpointFromDid } from "@/lib/utils/gmstn";
import { initiateHandshakeTo } from "@/lib/utils/handshake";
export const performHandshakes = async (latticeAtUri: AtUri) => {
···
const channelAtUris = entry[1];
+
// FIXME: perf issue. we are awaiting each handshake to resolve before we make new ones
+
// this means that the handshakes are consecutive and not concurrent.
+
// stuff this into a Promise.all by mapping over the array instead
const handshakeResult = await initiateHandshakeTo({
did: shardDid,
channels: channelAtUris,
···
handshakeTokens.set(shardAtUri, sessionInfo);
}
};
+
+
export const connectToShards = async () => {
+
const shardSessions = handshakeTokens.entries();
+
const shardConnectionPromises = shardSessions
+
.map(async (session) => {
+
const atUri = session[0];
+
const { token } = session[1];
+
const rkey = atUri.rKey ?? "";
+
const shardDid = isDomain(rkey)
+
? `did:web:${encodeURIComponent(rkey)}`
+
: `did:plc:${rkey}`;
+
+
console.log(shardDid);
+
+
// TODO: again, implement proper did -> endpoint parsing here too.
+
// for now, we just assume did:web and construce a URL based on that.
+
// @ts-expect-error trust me bro it's a string
+
const shardUrlResult = await getShardEndpointFromDid(shardDid);
+
+
if (!shardUrlResult.ok) return;
+
+
return {
+
// TODO: xrpc and lexicon this endpoint
+
shardUrl: `${shardUrlResult.data.origin}/connect`,
+
sessionToken: token,
+
};
+
})
+
.toArray();
+
+
const shardConnectionRequests = await Promise.all(shardConnectionPromises);
+
+
return shardConnectionRequests
+
.filter((request) => request !== undefined)
+
.map((request) => connectToShard(request));
+
};
+13
src/lib/utils/gmstn.ts
···
import type { Did } from "@/lib/types/atproto";
import { getEndpointFromDid } from "@/lib/utils/atproto";
+
import WebSocket from "ws";
export const getShardEndpointFromDid = async (did: Did) => {
return await getEndpointFromDid(did, "GemstoneShard");
};
+
+
export const connectToShard = ({
+
shardUrl,
+
sessionToken,
+
}: {
+
shardUrl: string;
+
sessionToken: string;
+
}) => {
+
const endpoint = new URL(shardUrl);
+
endpoint.searchParams.append("token", sessionToken);
+
return new WebSocket(endpoint);
+
};