decentralised message store

feat: restore history from shard and close session if dropped

serenity fffcd95b 24f66858

Changed files
+89 -5
src
lib
handlers
types
utils
+68 -5
src/lib/handlers/connect.ts
···
import {
createNewSession,
+
deleteSession,
issuedHandshakes,
isValidSession,
} from "@/lib/sessions";
-
import { shardMessageSchema } from "@/lib/types/messages";
+
import type { Did } from "@/lib/types/atproto";
+
import type { HistoryMessage } from "@/lib/types/messages";
+
import {
+
requestHistoryMessageSchema,
+
shardMessageSchema,
+
} from "@/lib/types/messages";
import type { PreHandler, WsRouteHandler } from "@/lib/types/routes";
import { atUriToString } from "@/lib/utils/atproto";
-
import { storeMessageInDb } from "@/lib/utils/gmstn";
+
import { getChannelHistory, storeMessageInDb } from "@/lib/utils/gmstn";
import {
rawDataToString,
validateWsMessageType,
···
}
// convert at uri objects array to set of at uri strings for easier lookup.
-
const socketAllowedChannels = new Set(sessionInfo.allowedChannels.map(channel => atUriToString(channel)));
+
const socketAllowedChannels = new Set(
+
sessionInfo.allowedChannels.map((channel) => atUriToString(channel)),
+
);
+
+
const socketLatticeDid = sessionInfo.latticeDid;
socket.on("message", (rawData) => {
const event = rawDataToString(rawData);
···
return;
}
-
const { channel } = shardMessage
-
if(!socketAllowedChannels.has(channel)) return;
+
const { channel, routedThrough } = shardMessage;
+
if (!socketAllowedChannels.has(channel)) return;
+
if (routedThrough !== socketLatticeDid) return;
storeMessageInDb(shardMessage)
.then(() => {
···
});
break;
}
+
case "shard/requestHistory": {
+
const {
+
success,
+
error,
+
data: requestHistoryMessage,
+
} = requestHistoryMessageSchema.safeParse(
+
validateTypeResult.data,
+
);
+
if (!success) {
+
console.error(
+
"could not parse",
+
validateTypeResult.data,
+
"as a valid request history message.",
+
);
+
console.error(z.treeifyError(error));
+
return;
+
}
+
const { channel, requestedBy } = requestHistoryMessage;
+
if (!socketAllowedChannels.has(channel)) return;
+
+
(async () => {
+
const messagesResult = await getChannelHistory(channel);
+
if (!messagesResult.ok) {
+
console.error(messagesResult.error);
+
throw new Error(
+
"Channel history function returned 0 results.",
+
);
+
}
+
const historyMessage: HistoryMessage = {
+
type: "shard/history",
+
channel,
+
messages: messagesResult.data.map((message) => ({
+
type: "shard/message",
+
channel,
+
content: message.content,
+
sentBy: message.authorDid as Did,
+
sentAt: message.createdAt,
+
routedThrough: socketLatticeDid,
+
})),
+
forClient: requestedBy,
+
};
+
socket.send(JSON.stringify(historyMessage));
+
console.log("sent off", historyMessage, "back to lattice");
+
})().catch((e: unknown) => {
+
console.error("Could not get channel history.");
+
console.error(e);
+
});
+
}
}
+
});
+
+
socket.on("close", () => {
+
deleteSession(sessionInfo);
});
};
+2
src/lib/types/messages.ts
···
type: z.literal("shard/history"),
messages: z.optional(z.array(shardMessageSchema)),
channel: z.string(),
+
forClient: didSchema,
})
.strict();
export type HistoryMessage = z.infer<typeof historyMessageSchema>;
···
.safeExtend({
type: z.literal("shard/requestHistory"),
channel: z.string(),
+
requestedBy: didSchema,
})
.strict();
export type RequestHistoryMessage = z.infer<typeof requestHistoryMessageSchema>;
+19
src/lib/utils/gmstn.ts
···
import db from "@/db";
+
import type { ShardMessageSelect } from "@/db/schema/messages";
import { messagesTable, type ShardMessageInsert } from "@/db/schema/messages";
import type { ShardMessage } from "@/lib/types/messages";
+
import type { Result } from "@/lib/utils/result";
import * as TID from "@atcute/tid";
+
import { eq } from "drizzle-orm";
export const storeMessageInDb = async (message: ShardMessage) => {
const tid = TID.now();
···
console.error("insertResult:", insertResult);
}
};
+
+
export const getChannelHistory = async (
+
channelAtUriString: string,
+
): Promise<Result<Array<ShardMessageSelect>, unknown>> => {
+
const messages = await db
+
.select()
+
.from(messagesTable)
+
.where(eq(messagesTable.channelAtUri, channelAtUriString))
+
.limit(100);
+
if (messages.length === 0)
+
return {
+
ok: false,
+
error: "Channel either has no messages, or the provided channel at uri is wrong.",
+
};
+
return { ok: true, data: messages };
+
};