decentralised sync engine

feat: client lattice connection

serenity 848b4af9 b949cd1d

Changed files
+117 -17
src
lib
handlers
types
routes
connect
+2
src/index.ts
···
wsHandler,
method,
handler: httpHandler,
+
preHandler,
skipRegistrationCheckHttp,
skipRegistrationCheckWs,
} = route;
···
wsHandler: skipRegistrationCheckWs
? wsHandler
: wrapWsRegistrationCheck(wsHandler),
+
preHandler,
});
}
}
+72
src/lib/handlers/connect.ts
···
+
import {
+
createNewSession,
+
issuedLatticeTokens,
+
isValidSession,
+
} from "@/lib/sessions";
+
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
+
import { rawDataToString } from "@/lib/utils/ws";
+
+
export const connectPreHandler: PreHandler = (req, reply, done) => {
+
const { query } = req;
+
if (!query) return;
+
if (!(typeof query === "object" && "token" in query)) {
+
reply.code(400).send("Provide token in query params");
+
return;
+
}
+
+
const sessionToken = query.token as string;
+
+
const sessionInfo = issuedLatticeTokens.get(sessionToken);
+
if (!sessionInfo) {
+
reply
+
.code(404)
+
.send(
+
"Session token could not resolve to existing session. retry?",
+
);
+
return;
+
}
+
+
if (!isValidSession(sessionInfo)) {
+
reply
+
.code(403)
+
.send(
+
"Session token resolved to session, but did not pass verification. this should not happen.",
+
);
+
return;
+
}
+
+
console.log(
+
"Found session:",
+
sessionInfo.id,
+
"from session token",
+
sessionToken,
+
);
+
done();
+
};
+
+
export const connectWsHandler: WsRouteHandler = (socket, req) => {
+
const { query } = req;
+
if (!query) return;
+
if (!(typeof query === "object" && "token" in query)) {
+
socket.close();
+
return;
+
}
+
const sessionToken = query.token as string;
+
+
const sessionInfo = issuedLatticeTokens.get(sessionToken);
+
if (!sessionInfo) {
+
socket.close();
+
return;
+
}
+
+
const sessionCreateResult = createNewSession({ sessionInfo, socket });
+
if (!sessionCreateResult.ok) {
+
socket.close();
+
return;
+
}
+
+
socket.on("message", (event) => {
+
const message = rawDataToString(event);
+
console.log(message)
+
})
+
};
+34 -17
src/lib/types/messages.ts
···
-
import { didPlcSchema } from "@/lib/types/atproto";
+
import { didSchema } from "@/lib/types/atproto";
import { z } from "zod";
-
export const websocketMessageSchema = z.object({
-
type: z.union([z.literal("shard/message"), z.literal("shard/history")]),
-
});
-
+
export const websocketMessageSchema = z
+
.object({
+
type: z.union([
+
z.literal("shard/message"),
+
z.literal("shard/history"),
+
z.literal("shard/requestHistory"),
+
]),
+
})
+
.loose();
export type WebsocketMessage = z.infer<typeof websocketMessageSchema>;
-
export const shardMessageSchema = websocketMessageSchema.extend({
-
type: z.literal("shard/message"),
-
text: z.string(),
-
did: didPlcSchema,
-
timestamp: z.coerce.date(),
-
});
-
+
export const shardMessageSchema = websocketMessageSchema
+
.safeExtend({
+
type: z.literal("shard/message"),
+
channel: z.string(),
+
content: z.string(),
+
sentBy: didSchema,
+
sentAt: z.coerce.date(),
+
})
+
.strict();
export type ShardMessage = z.infer<typeof shardMessageSchema>;
-
export const historyMessageSchema = websocketMessageSchema.extend({
-
type: z.literal("shard/history"),
-
messages: z.optional(z.array(shardMessageSchema)),
-
});
+
export const historyMessageSchema = websocketMessageSchema
+
.safeExtend({
+
type: z.literal("shard/history"),
+
messages: z.optional(z.array(shardMessageSchema)),
+
channel: z.string(),
+
})
+
.strict();
+
export type HistoryMessage = z.infer<typeof historyMessageSchema>;
-
export type HistoryMessage = z.infer<typeof historyMessageSchema>;
+
export const requestHistoryMessageSchema = websocketMessageSchema
+
.safeExtend({
+
type: z.literal("shard/requestHistory"),
+
channel: z.string(),
+
})
+
.strict();
+
export type RequestHistoryMessage = z.infer<typeof requestHistoryMessageSchema>;
+7
src/routes/connect/route.ts
···
+
import { connectPreHandler, connectWsHandler } from "@/lib/handlers/connect";
+
import type { WsRoute } from "@/lib/types/routes";
+
+
export const connectRoute: WsRoute = {
+
wsHandler: connectWsHandler,
+
preHandler: connectPreHandler,
+
};
+2
src/routes/index.ts
···
import type { Route, WsRoute } from "@/lib/types/routes";
+
import { connectRoute } from "@/routes/connect/route";
import { didWebDocRoute } from "@/routes/dot-well-known/did-dot-json/route";
import { handshakeRoute } from "@/routes/handshake/route";
import { indexRoute } from "@/routes/route";
···
"/": indexRoute,
"/.well-known/did.json": didWebDocRoute,
"/handshake": handshakeRoute,
+
"/connect": connectRoute,
};