decentralised message store

feat: working storage

serenity e7e4d056 1a19bfd9

drizzle/0000_powerful_professor_monster.sql drizzle/0000_cute_maginty.sql
+1 -1
drizzle/meta/0000_snapshot.json
···
{
"version": "6",
"dialect": "sqlite",
-
"id": "0313582b-1157-40b9-a307-168f53b51012",
+
"id": "86ec75bd-9ac5-453e-a223-8c0b5ee135d4",
"prevId": "00000000-0000-0000-0000-000000000000",
"tables": {
"messages": {
+2 -2
drizzle/meta/_journal.json
···
{
"idx": 0,
"version": "6",
-
"when": 1760861619063,
-
"tag": "0000_powerful_professor_monster",
+
"when": 1761759684737,
+
"tag": "0000_cute_maginty",
"breakpoints": true
}
]
+1
package.json
···
"@atcute/client": "^4.0.5",
"@atcute/crypto": "^2.2.5",
"@atcute/identity-resolver": "^1.1.4",
+
"@atcute/tid": "^1.0.3",
"@atcute/xrpc-server": "^0.1.2",
"@fastify/websocket": "^11.2.0",
"@libsql/client": "^0.15.15",
+8
pnpm-lock.yaml
···
'@atcute/identity-resolver':
specifier: ^1.1.4
version: 1.1.4(@atcute/identity@1.1.1)
+
'@atcute/tid':
+
specifier: ^1.0.3
+
version: 1.0.3
'@atcute/xrpc-server':
specifier: ^0.1.2
version: 0.1.2
···
'@atcute/multibase@1.1.6':
resolution: {integrity: sha512-HBxuCgYLKPPxETV0Rot4VP9e24vKl8JdzGCZOVsDaOXJgbRZoRIF67Lp0H/OgnJeH/Xpva8Z5ReoTNJE5dn3kg==}
+
+
'@atcute/tid@1.0.3':
+
resolution: {integrity: sha512-wfMJx1IMdnu0CZgWl0uR4JO2s6PGT1YPhpytD4ZHzEYKKQVuqV6Eb/7vieaVo1eYNMp2FrY67FZObeR7utRl2w==}
'@atcute/uint8array@1.0.5':
resolution: {integrity: sha512-XLWWxoR2HNl2qU+FCr0rp1APwJXci7HnzbOQLxK55OaMNBXZ19+xNC5ii4QCsThsDxa4JS/JTzuiQLziITWf2Q==}
···
'@atcute/multibase@1.1.6':
dependencies:
'@atcute/uint8array': 1.0.5
+
+
'@atcute/tid@1.0.3': {}
'@atcute/uint8array@1.0.5': {}
+1
src/db/schema/messages.ts
···
channelAtUri: text("channel_at_uri"),
authorDid: text("author_did").notNull(),
content: text("content").notNull(),
+
sentAt: integer("created_at", { mode: "timestamp" }).notNull(),
createdAt: integer("created_at", { mode: "timestamp" })
.notNull()
.default(sql`(unixepoch('now'))`),
+48
src/lib/handlers/connect.ts
···
issuedHandshakes,
isValidSession,
} from "@/lib/sessions";
+
import { shardMessageSchema } from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
+
import { storeMessageInDb } from "@/lib/utils/gmstn";
+
import {
+
rawDataToString,
+
validateWsMessageType,
+
} from "@/lib/utils/ws/validate";
+
import { z } from "zod";
export const connectPreHandler: PreHandler = (req, reply, done) => {
const { query } = req;
···
socket.close();
return;
}
+
+
socket.on("message", (rawData) => {
+
const event = rawDataToString(rawData);
+
+
const data: unknown = JSON.parse(event);
+
const validateTypeResult = validateWsMessageType(data);
+
if (!validateTypeResult.ok) return;
+
+
const { type: messageType } = validateTypeResult.data;
+
+
switch (messageType) {
+
case "shard/message": {
+
const {
+
success,
+
error,
+
data: shardMessage,
+
} = shardMessageSchema.safeParse(validateTypeResult.data);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid ShardMessage.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
+
storeMessageInDb(shardMessage)
+
.then(() => {
+
console.log("stored", shardMessage);
+
})
+
.catch((err: unknown) => {
+
console.error(
+
"something went wrong storing",
+
shardMessage,
+
);
+
console.error(err);
+
});
+
break;
+
}
+
}
+
});
};
+41
src/lib/types/messages.ts
···
+
import { didSchema } from "@/lib/types/atproto";
+
import { z } from "zod";
+
+
export const websocketMessageSchema = z
+
.object({
+
type: z.union([
+
z.literal("shard/message"),
+
z.literal("shard/history"),
+
z.literal("shard/requestHistory"),
+
]),
+
})
+
.loose();
+
export type WebsocketMessage = z.infer<typeof websocketMessageSchema>;
+
+
export const shardMessageSchema = websocketMessageSchema
+
.safeExtend({
+
type: z.literal("shard/message"),
+
channel: z.string(),
+
content: z.string(),
+
sentBy: didSchema,
+
sentAt: z.coerce.date(),
+
})
+
.strict();
+
export type ShardMessage = z.infer<typeof shardMessageSchema>;
+
+
export const historyMessageSchema = websocketMessageSchema
+
.safeExtend({
+
type: z.literal("shard/history"),
+
messages: z.optional(z.array(shardMessageSchema)),
+
channel: z.string(),
+
})
+
.strict();
+
export type HistoryMessage = z.infer<typeof historyMessageSchema>;
+
+
export const requestHistoryMessageSchema = websocketMessageSchema
+
.safeExtend({
+
type: z.literal("shard/requestHistory"),
+
channel: z.string(),
+
})
+
.strict();
+
export type RequestHistoryMessage = z.infer<typeof requestHistoryMessageSchema>;
-40
src/lib/types/ws/index.ts
···
-
import { shardMessagesInsertSchema } from "@/db/schema/messages";
-
import { verificationRequestSchema } from "@/lib/types/ws/verify";
-
import { z } from "zod";
-
-
export const WebSocketMessageType = {
-
SHARD_VERIFY: "shard/verify",
-
SHARD_MESSAGE: "shard/message",
-
} as const;
-
export const webSocketMessageTypeSchema = z.enum(WebSocketMessageType);
-
export type WebSocketMessageType = z.infer<typeof webSocketMessageTypeSchema>;
-
-
const webSocketMessageBase = z.object({
-
type: webSocketMessageTypeSchema,
-
data: z.unknown(),
-
});
-
-
export const verificationMessageSchema = webSocketMessageBase.safeExtend({
-
type: z.literal("shard/verify"),
-
data: verificationRequestSchema,
-
});
-
export type VerificationMessage = z.infer<typeof verificationMessageSchema>;
-
-
// there are only two difficult things in programming.
-
// 1. naming things
-
// 2. cache invalidation
-
// 3. off-by-one errors
-
export const shardMessageMessageSchema = webSocketMessageBase.safeExtend({
-
type: z.literal("shard/message"),
-
data: z.object({
-
message: shardMessagesInsertSchema,
-
sessionId: z.string(),
-
}),
-
});
-
export type ShardMessage = z.infer<typeof shardMessageMessageSchema>;
-
-
export const webSocketMessageSchema = z.union([
-
verificationMessageSchema,
-
shardMessageMessageSchema,
-
]);
-
export type WebSocketMessage = z.infer<typeof webSocketMessageSchema>;
-8
src/lib/types/ws/verify.ts
···
-
import { atUriSchema } from "@/lib/types/atproto";
-
import { z } from "zod";
-
-
export const verificationRequestSchema = z.object({
-
interServiceJwt: z.unknown(),
-
channelAtUris: z.array(atUriSchema),
-
});
-
export type VerificationRequest = z.infer<typeof verificationRequestSchema>;
+32
src/lib/utils/gmstn.ts
···
+
import db from "@/db";
+
import { messagesTable, type ShardMessageInsert } from "@/db/schema/messages";
+
import type { ShardMessage } from "@/lib/types/messages";
+
import * as TID from "@atcute/tid";
+
+
export const storeMessageInDb = async (message: ShardMessage) => {
+
const tid = TID.now();
+
const {
+
content,
+
channel: channelAtUri,
+
sentBy: authorDid,
+
sentAt,
+
} = message;
+
const messageToStore: ShardMessageInsert = {
+
id: tid,
+
authorDid,
+
content,
+
channelAtUri,
+
sentAt,
+
createdAt: new Date(),
+
};
+
+
console.log("sentAt", sentAt.getTime());
+
+
const insertResult = await db.insert(messagesTable).values(messageToStore);
+
+
if (insertResult.rowsAffected > 0) console.log("Stored!");
+
else {
+
console.error("Something went wrong storing", messageToStore);
+
console.error("insertResult:", insertResult);
+
}
+
};
+45
src/lib/utils/ws/validate.ts
···
+
import type { WebsocketMessage } from "@/lib/types/messages";
+
import { websocketMessageSchema } from "@/lib/types/messages";
+
import type { Result } from "@/lib/utils/result";
+
import { z } from "zod";
+
import type { RawData } from "ws";
+
+
export const rawDataToString = (data: RawData): string => {
+
if (Buffer.isBuffer(data)) {
+
return data.toString("utf-8");
+
}
+
if (Array.isArray(data)) {
+
return Buffer.concat(data).toString("utf-8");
+
}
+
return new TextDecoder().decode(data);
+
};
+
+
export const validateWsMessageString = (
+
data: unknown,
+
): Result<string, unknown> => {
+
const { success, error, data: message } = z.string().safeParse(data);
+
if (!success) {
+
console.error("Error decoding websocket message");
+
console.error(error);
+
return { ok: false, error: z.treeifyError(error) };
+
}
+
return { ok: true, data: message };
+
};
+
+
export const validateWsMessageType = (
+
data: unknown,
+
): Result<WebsocketMessage, unknown> => {
+
const {
+
success: wsMessageSuccess,
+
error: wsMessageError,
+
data: wsMessage,
+
} = websocketMessageSchema.safeParse(data);
+
if (!wsMessageSuccess) {
+
console.error(
+
"Error parsing websocket message. The data might be the wrong shape.",
+
);
+
console.error(wsMessageError);
+
return { ok: false, error: z.treeifyError(wsMessageError) };
+
}
+
return { ok: true, data: wsMessage };
+
};