tracks lexicons and how many times they appeared on the jetstream

+2
.gitignore
···
result
server/bsky_zstd_dictionary
+
fjall_data*
+
.fjall_data*
+12 -3
README.md
···
-
a webapp and server that monitors the jetstream and tracks the different lexicons as they are created or deleted.
-
it shows you which collections are most active on the network.
+
a webapp and server that monitors the jetstream and tracks the different
+
lexicons as they are created or deleted. it shows you which collections are most
+
active on the network.
for the backend it uses rust with fjall as the db; the frontend is built with sveltekit.
see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it.
+
## performance / storage
+
+
it uses about 50MB of space for 620M recorded events (an event being just a
+
timestamp in seconds and a deleted boolean for now), and takes around 50-60ms
+
to query 300-400k events.
+
+
this is on a machine with AMD EPYC 7281 (32) @ 2.100GHz.
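+
+
a rough sanity check on those numbers (back-of-envelope only, not a benchmark):
+
+
```rust
+
fn main() {
+
    // ~50MB on disk spread over ~620M recorded events
+
    let bits_per_event = 50_000_000.0_f64 * 8.0 / 620_000_000.0_f64;
+
    println!("{bits_per_event:.2} bits/event"); // ≈ 0.65
+
}
+
```
+
+
well under a byte per event is only plausible because block timestamps are
+
delta-of-delta encoded as varints and blocks are compressed (see
+
`server/src/db/block.rs` and `server/src/db/handle.rs` below).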
+
## running
### with nix
-
- run the server: `nix run git+https://tangled.sh/@poor.dog/nsid-tracker#server`
+
- build the server: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#server`
- build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client`
### manually
+9
client/bun.lock
···
"name": "nsid-tracker",
"dependencies": {
"@number-flow/svelte": "^0.3.9",
+
"svelte-adapter-bun": "^0.5.2",
},
"devDependencies": {
"@eslint/compat": "^1.2.5",
···
"globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="],
+
"globalyzer": ["globalyzer@0.1.0", "", {}, "sha512-40oNTM9UfG6aBmuKxk/giHn5nQ8RVz/SS4Ir6zgzOv9/qC3kKZ9v4etGTcJbEl/NyVQH7FGU7d+X1egr57Md2Q=="],
+
+
"globrex": ["globrex@0.1.2", "", {}, "sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg=="],
+
"graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="],
"graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="],
···
"svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="],
+
"svelte-adapter-bun": ["svelte-adapter-bun@0.5.2", "", { "dependencies": { "tiny-glob": "^0.2.9" } }, "sha512-xEtFgaal6UgrCwwkSIcapO9kopoFNUYCYqyKCikdqxX9bz2TDYnrWQZ7qBnkunMxi1HOIERUCvTcebYGiarZLA=="],
+
"svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { "svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="],
"svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, "sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="],
···
"tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="],
"tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="],
+
+
"tiny-glob": ["tiny-glob@0.2.9", "", { "dependencies": { "globalyzer": "0.1.0", "globrex": "^0.1.2" } }, "sha512-g/55ssRPUjShh+xkfx9UPDXqhckHEsHr4Vd9zX55oSdGZc/MD0m3sferOkwWtp98bv+kcVfEHtRJgBVJzelrzg=="],
"tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="],
+2 -1
client/package.json
···
},
"type": "module",
"dependencies": {
-
"@number-flow/svelte": "^0.3.9"
+
"@number-flow/svelte": "^0.3.9",
+
"svelte-adapter-bun": "^0.5.2"
}
}
+4
client/src/app.css
···
overflow-y: overlay;
overflow-y: auto; /* Fallback for browsers that don't support overlay */
}
+
+
.wsbadge {
+
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
+
}
+9 -9
client/src/app.html
···
<!doctype html>
<html lang="en">
-
<head>
-
<meta charset="utf-8" />
-
<link rel="icon" href="%sveltekit.assets%/favicon.svg" />
-
<meta name="viewport" content="width=device-width, initial-scale=1" />
-
%sveltekit.head%
-
</head>
-
<body data-sveltekit-preload-data="hover">
-
<div style="display: contents">%sveltekit.body%</div>
-
</body>
+
<head>
+
<meta charset="utf-8" />
+
<link rel="icon" href="%sveltekit.assets%/favicon.svg" />
+
<meta name="viewport" content="width=device-width, initial-scale=1" />
+
%sveltekit.head%
+
</head>
+
<body class="bg-white dark:bg-gray-900" data-sveltekit-preload-data="hover">
+
<div style="display: contents">%sveltekit.body%</div>
+
</body>
</html>
+13 -1
client/src/lib/api.ts
···
import { dev } from "$app/environment";
-
import type { Events } from "./types";
+
import type { Events, Since } from "./types";
import { PUBLIC_API_URL } from "$env/static/public";
export const fetchEvents = async (): Promise<Events> => {
···
const data = await response.json();
return data;
};
+
+
export const fetchTrackingSince = async (): Promise<Since> => {
+
const response = await fetch(
+
`${dev ? "http" : "https"}://${PUBLIC_API_URL}/since`,
+
);
+
if (!response.ok) {
+
throw new Error(`(${response.status}): ${await response.json()}`);
+
}
+
+
const data = await response.json();
+
return data;
+
};
+2 -9
client/src/lib/components/BskyToggle.svelte
···
<!-- svelte-ignore a11y_no_static_element_interactions -->
<button
onclick={onBskyToggle}
-
class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300"
+
class="wsbadge !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700"
>
<input checked={dontShowBsky} type="checkbox" />
-
<span class="ml-0.5"> hide app.bsky.* </span>
+
<span class="ml-0.5 text-black dark:text-gray-200"> hide app.bsky.* </span>
</button>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+8 -5
client/src/lib/components/EventCard.svelte
···
</script>
<div
-
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
+
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white dark:bg-gray-800/50 border border-gray-200 dark:border-gray-950 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
class:has-activity={isAnimating}
style="--border-thickness: {borderThickness}px"
>
<div class="flex items-start gap-2">
<div
-
class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full"
+
class="text-sm font-bold text-blue-600 bg-blue-100 dark:bg-indigo-950 px-3 py-1 rounded-full"
>
#{index + 1}
</div>
<div
title={event.nsid}
-
class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1"
+
class="font-mono text-sm text-gray-700 dark:text-gray-300 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 dark:group-hover:bg-gray-700 border-gray-100 dark:border-gray-900 group-hover:border transition-all px-1"
>
{event.nsid}
</div>
···
</div>
</div>
-
<style>
+
<style lang="postcss">
.has-activity {
position: relative;
transition: all 0.2s ease-out;
}
.has-activity::before {
+
@reference "../../app.css";
+
@apply border-blue-500 dark:border-blue-800;
content: "";
position: absolute;
top: calc(-1 * var(--border-thickness));
left: calc(-1 * var(--border-thickness));
right: calc(-1 * var(--border-thickness));
bottom: calc(-1 * var(--border-thickness));
-
border: var(--border-thickness) solid rgba(59, 130, 246, 0.8);
+
border-width: var(--border-thickness);
+
border-style: solid;
border-radius: calc(0.5rem + var(--border-thickness));
pointer-events: none;
transition: all 0.3s ease-out;
+5 -10
client/src/lib/components/FilterControls.svelte
···
</script>
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300"
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700"
>
-
<label for="filter-regex" class="text-blue-800 mr-1"> filter: </label>
+
<label for="filter-regex" class="text-blue-800 dark:text-gray-200 mr-1">
+
filter:
+
</label>
<input
id="filter-regex"
value={filterRegex}
oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)}
type="text"
placeholder="regex..."
-
class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24"
+
class="bg-blue-50 dark:bg-blue-950 text-blue-900 dark:text-gray-400 placeholder-blue-400 dark:placeholder-blue-700 border border-blue-200 dark:border-blue-700 rounded-full px-1 outline-none focus:border-blue-400 min-w-0 w-24"
/>
</div>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+6 -11
client/src/lib/components/RefreshControl.svelte
···
</script>
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300"
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-lime-100 dark:bg-lime-900 dark:hover:bg-lime-800 hover:bg-lime-200 border-lime-300 dark:border-lime-700"
>
-
<label for="refresh-rate" class="text-green-800 mr-1">refresh:</label>
+
<label for="refresh-rate" class="text-lime-800 dark:text-lime-200 mr-1"
+
>refresh:</label
+
>
<input
id="refresh-rate"
value={refreshRate}
···
pattern="[0-9]*"
min="0"
placeholder="real-time"
-
class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20"
+
class="bg-green-50 dark:bg-green-900 text-lime-900 dark:text-lime-200 placeholder-lime-600 dark:placeholder-lime-400 border border-lime-200 dark:border-lime-700 rounded-full px-1 outline-none focus:border-lime-400 min-w-0 w-20"
/>
-
<span class="text-green-700">s</span>
+
<span class="text-lime-800 dark:text-lime-200">s</span>
</div>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+31
client/src/lib/components/ShowControls.svelte
···
+
<script lang="ts">
+
import type { ShowOption } from "$lib/types";
+
+
interface Props {
+
show: ShowOption;
+
onShowChange: (value: ShowOption) => void;
+
}
+
+
let { show, onShowChange }: Props = $props();
+
+
const showOptions: ShowOption[] = ["server init", "stream start"];
+
</script>
+
+
<div
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-pink-100 dark:bg-pink-800 hover:bg-pink-200 dark:hover:bg-pink-700 border-pink-300 dark:border-pink-700"
+
>
+
<label for="show" class="text-pink-800 dark:text-pink-100 mr-1">
+
show since:
+
</label>
+
<select
+
id="show"
+
value={show}
+
onchange={(e) =>
+
onShowChange((e.target as HTMLSelectElement).value as ShowOption)}
+
class="bg-pink-50 dark:bg-pink-900 text-pink-900 dark:text-pink-100 border border-pink-200 dark:border-pink-700 rounded-full px-1 outline-none focus:border-pink-400 min-w-0"
+
>
+
{#each showOptions as option}
+
<option value={option}>{option}</option>
+
{/each}
+
</select>
+
</div>
+5 -10
client/src/lib/components/SortControls.svelte
···
</script>
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300"
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 dark:bg-purple-800 hover:bg-purple-200 dark:hover:bg-purple-700 border-purple-300 dark:border-purple-700"
>
-
<label for="sort-by" class="text-purple-800 mr-1"> sort by: </label>
+
<label for="sort-by" class="text-purple-800 dark:text-purple-300 mr-1">
+
sort by:
+
</label>
<select
id="sort-by"
value={sortBy}
onchange={(e) =>
onSortChange((e.target as HTMLSelectElement).value as SortOption)}
-
class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0"
+
class="bg-purple-50 dark:bg-purple-900 text-purple-900 dark:text-purple-300 border border-purple-200 dark:border-purple-700 rounded-full px-1 outline-none focus:border-purple-400 min-w-0"
>
{#each sortOptions as option}
<option value={option.value}>{option.label}</option>
{/each}
</select>
</div>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+19 -22
client/src/lib/components/StatsCard.svelte
···
<script lang="ts">
import { formatNumber } from "$lib/format";
-
import NumberFlow from "@number-flow/svelte";
const colorClasses = {
green: {
-
bg: "from-green-50 to-green-100",
-
border: "border-green-200",
-
titleText: "text-green-700",
-
valueText: "text-green-900",
+
bg: "from-green-50 to-green-100 dark:from-green-900 dark:to-green-800",
+
border: "border-green-200 dark:border-green-800",
+
titleText: "text-green-700 dark:text-green-400",
+
valueText: "text-green-900 dark:text-green-200",
},
red: {
-
bg: "from-red-50 to-red-100",
-
border: "border-red-200",
-
titleText: "text-red-700",
-
valueText: "text-red-900",
+
bg: "from-red-50 to-red-100 dark:from-red-900 dark:to-red-800",
+
border: "border-red-200 dark:border-red-800",
+
titleText: "text-red-700 dark:text-red-400",
+
valueText: "text-red-900 dark:text-red-200",
},
turqoise: {
-
bg: "from-teal-50 to-teal-100",
-
border: "border-teal-200",
-
titleText: "text-teal-700",
-
valueText: "text-teal-900",
+
bg: "from-teal-50 to-teal-100 dark:from-teal-900 dark:to-teal-800",
+
border: "border-teal-200 dark:border-teal-800",
+
titleText: "text-teal-700 dark:text-teal-400",
+
valueText: "text-teal-900 dark:text-teal-200",
},
orange: {
-
bg: "from-orange-50 to-orange-100",
-
border: "border-orange-200",
-
titleText: "text-orange-700",
-
valueText: "text-orange-900",
+
bg: "from-orange-50 to-orange-100 dark:from-orange-900 dark:to-orange-800",
+
border: "border-orange-200 dark:border-orange-800",
+
titleText: "text-orange-700 dark:text-orange-400",
+
valueText: "text-orange-900 dark:text-orange-200",
},
};
···
const colors = $derived(colorClasses[colorScheme]);
</script>
-
<div
-
class="bg-gradient-to-r {colors.bg} p-3 md:p-6 rounded-lg border {colors.border}"
-
>
+
<div class="bg-gradient-to-r {colors.bg} p-3 rounded-lg border {colors.border}">
<h3 class="text-base font-medium {colors.titleText} mb-2">
{title}
</h3>
-
<p class="text-xl md:text-3xl font-bold {colors.valueText}">
-
<NumberFlow {value} />
+
<p class="text-xl md:text-2xl font-bold {colors.valueText}">
+
{formatNumber(value)}
</p>
</div>
+18 -14
client/src/lib/components/StatusBadge.svelte
···
const statusConfig = {
connected: {
text: "stream live",
-
classes: "bg-green-100 text-green-800 border-green-200",
+
classes:
+
"bg-green-100 dark:bg-green-900 text-green-800 dark:text-green-200 border-green-200 dark:border-green-800",
},
connecting: {
text: "stream connecting",
-
classes: "bg-yellow-100 text-yellow-800 border-yellow-200",
+
classes:
+
"bg-yellow-100 dark:bg-yellow-900 text-yellow-800 dark:text-yellow-200 border-yellow-200 dark:border-yellow-800",
},
error: {
text: "stream errored",
-
classes: "bg-red-100 text-red-800 border-red-200",
+
classes:
+
"bg-red-100 dark:bg-red-900 text-red-800 dark:text-red-200 border-red-200 dark:border-red-800",
},
disconnected: {
text: "stream offline",
-
classes: "bg-gray-100 text-gray-800 border-gray-200",
+
classes:
+
"bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 border-gray-200 dark:border-gray-800",
},
};
const config = $derived(statusConfig[status]);
</script>
-
<span class="wsbadge {config.classes}">
-
{config.text}
-
</span>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+
<div class="flex flex-row items-center gap-2 wsbadge {config.classes}">
+
<!-- connecting spinner -->
+
{#if status === "connecting"}
+
<div
+
class="animate-spin rounded-full h-4 w-4 border-b-2 border-yellow-800 dark:border-yellow-200"
+
></div>
+
{/if}
+
<!-- status text -->
+
<span>{config.text}</span>
+
</div>
+5 -1
client/src/lib/format.ts
···
return num.toLocaleString();
};
+
const isValidDate = (d: Date) => d instanceof Date && !isNaN(d.getTime());
export const formatTimestamp = (timestamp: number): string => {
-
return new Date(timestamp / 1000).toLocaleString();
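+
// interpret the timestamp as seconds first; microsecond-scale values overflow
+
// Date's range and come back invalid, so fall back to treating them as micros.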
+
const date = new Date(timestamp * 1000);
+
return isValidDate(date)
+
? date.toLocaleString()
+
: new Date(timestamp / 1000).toLocaleString();
};
+4
client/src/lib/types.ts
···
count: number;
deleted_count: number;
};
+
export type Since = {
+
since: number;
+
};
export type SortOption = "total" | "created" | "deleted" | "date";
+
export type ShowOption = "server init" | "stream start";
+3 -2
client/src/routes/+layout.ts
···
-
export const prerender = true;
-
export const ssr = false;
+
export const prerender = false;
+
export const ssr = true;
+
export const csr = true;
+7
client/src/routes/+page.server.ts
···
+
import { fetchEvents, fetchTrackingSince } from "$lib/api";
+
+
export const load = async () => {
+
const events = await fetchEvents();
+
const trackingSince = await fetchTrackingSince();
+
return { events, trackingSince };
+
};
+127 -75
client/src/routes/+page.svelte
···
<script lang="ts">
import { dev } from "$app/environment";
-
import type { EventRecord, NsidCount, SortOption } from "$lib/types";
+
import type {
+
EventRecord,
+
Events,
+
NsidCount,
+
ShowOption,
+
Since,
+
SortOption,
+
} from "$lib/types";
import { onMount, onDestroy } from "svelte";
-
import { writable } from "svelte/store";
+
import { get, writable } from "svelte/store";
import { PUBLIC_API_URL } from "$env/static/public";
-
import { fetchEvents } from "$lib/api";
+
import { fetchEvents, fetchTrackingSince } from "$lib/api";
import { createRegexFilter } from "$lib/filter";
import StatsCard from "$lib/components/StatsCard.svelte";
import StatusBadge from "$lib/components/StatusBadge.svelte";
···
import SortControls from "$lib/components/SortControls.svelte";
import BskyToggle from "$lib/components/BskyToggle.svelte";
import RefreshControl from "$lib/components/RefreshControl.svelte";
+
import { formatTimestamp } from "$lib/format";
+
import ShowControls from "$lib/components/ShowControls.svelte";
-
const events = writable(new Map<string, EventRecord>());
+
type Props = {
+
data: { events: Events; trackingSince: Since };
+
};
+
+
const { data }: Props = $props();
+
+
const events = writable(
+
new Map<string, EventRecord>(Object.entries(data.events.events)),
+
);
+
const eventsStart = new Map<string, EventRecord>(
+
Object.entries(data.events.events),
+
);
const pendingUpdates = new Map<string, EventRecord>();
+
+
let updateTimer: NodeJS.Timeout | null = null;
+
let per_second = $state(data.events.per_second);
+
let tracking_since = $state(data.trackingSince.since);
+
+
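// per-nsid count deltas between the initial page snapshot and the given map;
+
// used by the "stream start" show mode
+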
const diffEvents = (
+
oldEvents: Map<string, EventRecord>,
+
newEvents: Map<string, EventRecord>,
+
): NsidCount[] => {
+
const nsidCounts: NsidCount[] = [];
+
for (const [nsid, event] of newEvents.entries()) {
+
const oldEvent = oldEvents.get(nsid);
+
if (oldEvent) {
+
const counts = {
+
nsid,
+
count: event.count - oldEvent.count,
+
deleted_count: event.deleted_count - oldEvent.deleted_count,
+
last_seen: event.last_seen,
+
};
+
if (counts.count > 0 || counts.deleted_count > 0)
+
nsidCounts.push(counts);
+
} else {
+
nsidCounts.push({
+
nsid,
+
...event,
+
});
+
}
+
}
+
return nsidCounts;
+
};
+
const applyEvents = (newEvents: Record<string, EventRecord>) => {
+
events.update((map) => {
+
for (const [nsid, event] of Object.entries(newEvents)) {
+
map.set(nsid, event);
+
}
+
return map;
+
});
+
};
+
+
let error: string | null = $state(null);
+
let filterRegex = $state("");
+
let dontShowBsky = $state(false);
+
let sortBy: SortOption = $state("total");
+
let refreshRate = $state("");
+
let changedByUser = $state(false);
+
let show: ShowOption = $state("server init");
let eventsList: NsidCount[] = $state([]);
-
let updateTimer: NodeJS.Timeout | null = null;
-
events.subscribe((value) => {
-
eventsList = value
-
.entries()
-
.map(([nsid, event]) => ({
-
nsid,
-
...event,
-
}))
-
.toArray();
+
let updateEventsList = $derived((value: Map<string, EventRecord>) => {
+
switch (show) {
+
case "server init":
+
eventsList = value
+
.entries()
+
.map(([nsid, event]) => ({
+
nsid,
+
...event,
+
}))
+
.toArray();
+
break;
+
case "stream start":
+
eventsList = diffEvents(eventsStart, value);
+
break;
+
}
});
-
let per_second = $state(0);
-
+
events.subscribe((value) => updateEventsList(value));
let all: EventRecord = $derived(
eventsList.reduce(
(acc, event) => {
···
},
),
);
-
let error: string | null = $state(null);
-
let filterRegex = $state("");
-
let dontShowBsky = $state(false);
-
let sortBy: SortOption = $state("total");
-
let refreshRate = $state("");
-
let previousRefreshRate = $state("");
let websocket: WebSocket | null = null;
let isStreamOpen = $state(false);
···
};
websocket.onmessage = async (event) => {
const jsonData = JSON.parse(event.data);
-
-
if (jsonData.per_second > 0) {
-
per_second = jsonData.per_second;
-
}
-
-
// Store updates in pending map if refresh rate is set
+
per_second = jsonData.per_second;
if (refreshRate) {
for (const [nsid, event] of Object.entries(jsonData.events)) {
pendingUpdates.set(nsid, event as EventRecord);
}
} else {
-
// Apply updates immediately if no refresh rate
-
events.update((map) => {
-
for (const [nsid, event] of Object.entries(
-
jsonData.events,
-
)) {
-
map.set(nsid, event as EventRecord);
-
}
-
return map;
-
});
+
applyEvents(jsonData.events);
}
};
websocket.onerror = (error) => {
···
error = null;
const data = await fetchEvents();
per_second = data.per_second;
-
events.update((map) => {
-
for (const [nsid, event] of Object.entries(data.events)) {
-
map.set(nsid, event);
-
}
-
return map;
-
});
+
applyEvents(data.events);
+
tracking_since = (await fetchTrackingSince()).since;
} catch (err) {
error =
err instanceof Error
···
console.error("error loading data:", err);
}
};
-
-
// Set refresh rate when sort mode changes
-
$effect(() => {
-
if (sortBy === "date" && refreshRate === "") {
-
// Only set to 2 if we're in real-time mode
-
refreshRate = "2";
-
} else if (
-
sortBy !== "date" &&
-
refreshRate === "2" &&
-
previousRefreshRate === "2"
-
) {
-
// Return to real-time mode if we auto-set the refresh rate
-
refreshRate = "";
-
}
-
});
// Update the refresh timer when refresh rate changes
$effect(() => {
···
/>
</svelte:head>
-
<header class="border-gray-300 border-b mb-4 pb-2">
+
<header
+
class="bg-white dark:bg-gray-900 border-gray-300 dark:border-gray-950 border-b mb-4 pb-2"
+
>
<div
class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center"
>
-
<h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1>
-
<p class="text-lg mt-1 text-gray-600">
-
tracks lexicons seen on the jetstream
+
<h1 class="text-4xl font-bold mr-4 text-gray-900 dark:text-gray-200">
+
lexicon tracker
+
</h1>
+
<p class="text-lg mt-1 text-gray-600 dark:text-gray-300">
+
tracks lexicons seen on the jetstream {tracking_since === 0
+
? ""
+
: `(since: ${formatTimestamp(tracking_since)})`}
</p>
</div>
</header>
-
<div class="md:max-w-[61vw] mx-auto p-2">
-
<div
-
class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 2xl:mx-16 mb-8"
-
>
+
<div class="bg-white dark:bg-gray-900 md:max-w-[61vw] mx-auto p-2">
+
<div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8">
<StatsCard
title="total creation"
value={all.count}
···
{#if error}
<div
-
class="bg-red-100 border border-red-300 text-red-700 px-4 py-3 rounded-lg mb-6"
+
class="bg-red-100 dark:bg-red-900 border border-red-300 dark:border-red-700 text-red-700 dark:text-red-200 px-4 py-3 rounded-lg mb-6"
>
<p>Error: {error}</p>
</div>
···
{#if eventsList.length > 0}
<div class="mb-8">
<div class="flex flex-wrap items-center gap-3 mb-3">
-
<h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2>
+
<h2 class="text-2xl font-bold text-gray-900 dark:text-gray-200">
+
seen lexicons
+
</h2>
<StatusBadge status={websocketStatus} />
</div>
<div class="flex flex-wrap items-center gap-1.5 mb-6">
<FilterControls
{filterRegex}
onFilterChange={(value) => (filterRegex = value)}
-
/>
-
<SortControls
-
{sortBy}
-
onSortChange={(value: SortOption) => (sortBy = value)}
/>
<BskyToggle
{dontShowBsky}
onBskyToggle={() => (dontShowBsky = !dontShowBsky)}
/>
+
<SortControls
+
{sortBy}
+
onSortChange={(value: SortOption) => {
+
sortBy = value;
+
if (refreshRate === "" && sortBy === "date")
+
refreshRate = "2";
+
else if (refreshRate === "2" && changedByUser === false)
+
refreshRate = "";
+
}}
+
/>
+
<ShowControls
+
{show}
+
onShowChange={(value: ShowOption) => {
+
show = value;
+
updateEventsList(get(events));
+
}}
+
/>
<RefreshControl
{refreshRate}
onRefreshChange={(value) => {
-
previousRefreshRate = refreshRate;
refreshRate = value;
+
changedByUser = refreshRate !== "";
}}
/>
</div>
···
{/if}
</div>
-
<footer class="py-2 border-t border-gray-200 text-center">
-
<p class="text-gray-600 text-sm">
+
<footer class="py-2 border-t border-gray-200 dark:border-gray-800 text-center">
+
<p class="text-gray-600 dark:text-gray-200 text-sm">
source code <a
href="https://tangled.sh/@poor.dog/nsid-tracker"
target="_blank"
rel="noopener noreferrer"
-
class="text-blue-600 hover:text-blue-800 underline"
+
class="text-blue-600 dark:text-blue-400 hover:text-blue-800 dark:hover:text-blue-600 underline"
>@poor.dog/nsid-tracker</a
>
</p>
+1 -1
client/svelte.config.js
···
-
import adapter from "@sveltejs/adapter-static";
+
import adapter from "svelte-adapter-bun";
import { vitePreprocess } from "@sveltejs/vite-plugin-svelte";
/** @type {import('@sveltejs/kit').Config} */
+1 -1
nix/client-modules.nix
···
src = ../client;
-
outputHash = "sha256-t8PJFo+3XGkzmMNbw9Rf9cS5Ob5YtI8ucX3ay+u9a3M=";
+
outputHash = "sha256-njwXk3u0NUsYWLv9EOdCltgQOjTVkcfu+D+0COSw/6I=";
outputHashAlgo = "sha256";
outputHashMode = "recursive";
+10 -2
nix/client.nix
···
{
+
lib,
stdenv,
makeBinaryWrapper,
bun,
···
'';
buildPhase = ''
runHook preBuild
-
bun --prefer-offline run --bun build
+
bun --prefer-offline run build
runHook postBuild
'';
installPhase = ''
runHook preInstall
-
mkdir -p $out
+
+
mkdir -p $out/bin
cp -R ./build/* $out
+
cp -R ./node_modules $out
+
+
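# expose a `website` entrypoint that runs the built sveltekit server via bun
+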
makeBinaryWrapper ${bun}/bin/bun $out/bin/website \
+
--prefix PATH : ${lib.makeBinPath [ bun ]} \
+
--add-flags "run --bun --no-install --cwd $out start"
+
runHook postInstall
'';
}
+5 -2
nix/server.nix
···
{
-
rustPlatform,
-
...
+
rustPlatform,
+
cmake,
+
...
}:
rustPlatform.buildRustPackage {
pname = "nsid-tracker-server";
version = "main";
src = ../server;
+
+
nativeBuildInputs = [ cmake ];
cargoLock = {
lockFile = ../server/Cargo.lock;
+1 -1
server/.gitignore
···
target
-
.fjall_data
+
.fjall_data*
+176 -31
server/Cargo.lock
···
"cfg-if",
"getrandom 0.3.3",
"once_cell",
+
"serde",
"version_check",
"zerocopy",
]
···
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
+
+
[[package]]
+
name = "arc-swap"
+
version = "1.7.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "async-compression"
···
]
[[package]]
+
name = "branches"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a918aa7a861caeba57e502465c30e3a0d74ae02ee0b9db2933602fdb6a3a90e5"
+
dependencies = [
+
"rustc_version",
+
]
+
+
[[package]]
name = "brotli"
version = "8.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
dependencies = [
"cfg-if",
+
]
+
+
[[package]]
+
name = "crossbeam-deque"
+
version = "0.8.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+
dependencies = [
+
"crossbeam-epoch",
+
"crossbeam-utils",
]
[[package]]
···
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
+
name = "either"
+
version = "1.15.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
+
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
[[package]]
+
name = "hermit-abi"
+
version = "0.5.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
+
+
[[package]]
name = "http"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "itertools"
+
version = "0.14.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
+
dependencies = [
+
"either",
+
]
+
+
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "num_cpus"
+
version = "1.17.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
+
dependencies = [
+
"hermit-abi",
+
"libc",
+
]
+
+
[[package]]
name = "object"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
-
name = "overload"
-
version = "0.1.1"
+
name = "ordered-varint"
+
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
checksum = "e9cc9f18ab4bad1e01726bda1259feb8f11e5e76308708a966b4c0136e9db34c"
[[package]]
-
name = "papaya"
-
version = "0.2.3"
+
name = "overload"
+
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
-
dependencies = [
-
"equivalent",
-
"seize",
-
]
+
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
···
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
-
name = "pingora-limits"
-
version = "0.5.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "a719a8cb5558ca06bd6076c97b8905d500ea556da89e132ba53d4272844f95b9"
-
dependencies = [
-
"ahash",
-
]
-
-
[[package]]
name = "pkg-config"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "quanta"
+
version = "0.12.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
+
dependencies = [
+
"crossbeam-utils",
+
"libc",
+
"once_cell",
+
"raw-cpuid",
+
"wasi 0.11.1+wasi-snapshot-preview1",
+
"web-sys",
+
"winapi",
+
]
+
+
[[package]]
name = "quick_cache"
version = "0.6.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "raw-cpuid"
+
version = "11.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
+
dependencies = [
+
"bitflags",
+
]
+
+
[[package]]
+
name = "rayon"
+
version = "1.10.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+
dependencies = [
+
"either",
+
"rayon-core",
+
]
+
+
[[package]]
+
name = "rayon-core"
+
version = "1.12.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+
dependencies = [
+
"crossbeam-deque",
+
"crossbeam-utils",
+
]
+
+
[[package]]
+
name = "rclite"
+
version = "0.2.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2f528dfeba924f5fc67bb84a17fe043451d1b392758016ce2d9e9116649b0f35"
+
dependencies = [
+
"branches",
+
]
+
+
[[package]]
name = "redox_syscall"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
+
name = "rustc_version"
+
version = "0.4.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
+
dependencies = [
+
"semver",
+
]
+
+
[[package]]
name = "rustix"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "scc"
+
version = "2.3.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4"
+
dependencies = [
+
"sdd",
+
]
+
+
[[package]]
name = "schannel"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
+
name = "sdd"
+
version = "3.0.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
+
+
[[package]]
name = "security-framework"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
-
name = "seize"
-
version = "0.5.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
-
dependencies = [
-
"libc",
-
"windows-sys 0.52.0",
-
]
-
-
[[package]]
name = "self_cell"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
+
+
[[package]]
+
name = "semver"
+
version = "1.0.26"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
[[package]]
name = "serde"
···
name = "server"
version = "0.1.0"
dependencies = [
+
"ahash",
"anyhow",
+
"arc-swap",
"async-trait",
"axum",
"axum-tws",
+
"byteview",
"fjall",
"futures-util",
-
"papaya",
-
"pingora-limits",
+
"itertools",
+
"ordered-varint",
+
"parking_lot",
+
"quanta",
+
"rayon",
+
"rclite",
"rkyv",
"rustls",
+
"scc",
"serde",
"serde_json",
"smol_str",
+
"threadpool",
"tikv-jemallocator",
"tokio",
"tokio-util",
···
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
+
]
+
+
[[package]]
+
name = "threadpool"
+
version = "1.8.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+
dependencies = [
+
"num_cpus",
[[package]]
···
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
+
]
+
+
[[package]]
+
name = "web-sys"
+
version = "0.3.77"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
+
dependencies = [
+
"js-sys",
+
"wasm-bindgen",
[[package]]
+13 -3
server/Cargo.toml
···
async-trait = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing = "0.1"
-
tokio = { version = "1", features = ["full"] }
+
tokio = { version = "1", features = ["full", "parking_lot"] }
tokio-util = { version = "0.7", features = ["tracing"] }
rustls = { version = "0.23", default-features = false, features = ["log", "ring", "std"] }
tokio-websockets = { version = "0.12", features = ["client", "rustls-platform-verifier", "getrandom", "ring"] }
futures-util = "0.3"
axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json", "query"] }
axum-tws = { git = "https://github.com/90-008/axum-tws.git", features = ["http2"] }
-
pingora-limits = "0.5"
tower-http = {version = "0.6", features = ["request-id", "trace", "compression-full"]}
fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] }
rkyv = {version = "0.8", features = ["unaligned"]}
smol_str = { version = "0.3", features = ["serde"] }
-
papaya = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.141"
+
scc = "2.3.4"
+
ordered-varint = "2.0.0"
+
threadpool = "1.8.1"
+
quanta = "0.12.6"
+
itertools = "0.14.0"
+
byteview = "0.6.1"
+
rayon = "1.10.0"
+
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
+
rclite = "0.2.7"
+
arc-swap = "1.7.1"
+
ahash = { version = "0.8.12", features = ["serde"] }
+
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
+50 -29
server/src/api.rs
···
use std::{
-
collections::HashMap,
fmt::Display,
net::SocketAddr,
-
ops::Deref,
-
sync::Arc,
-
time::{Duration, UNIX_EPOCH},
+
ops::{Bound, Deref, RangeBounds},
+
time::Duration,
};
+
use ahash::AHashMap;
use anyhow::anyhow;
use axum::{
Json, Router,
···
routing::get,
};
use axum_tws::{Message, WebSocketUpgrade};
+
use rclite::Arc;
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio_util::sync::CancellationToken;
···
.route("/events", get(events))
.route("/stream_events", get(stream_events))
.route("/hits", get(hits))
+
.route("/since", get(since))
.route_layer(CompressionLayer::new().br(true).deflate(true).gzip(true).zstd(true))
.route_layer(PropagateRequestIdLayer::x_request_id())
.route_layer(
···
#[derive(Serialize)]
struct Events {
per_second: usize,
-
events: HashMap<SmolStr, NsidCount>,
+
events: AHashMap<SmolStr, NsidCount>,
}
async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> {
-
let mut events = HashMap::new();
+
let mut events = AHashMap::new();
for result in db.get_counts() {
let (nsid, counts) = result?;
events.insert(
···
to: Option<u64>,
}
-
#[derive(Serialize)]
+
#[derive(Debug, Serialize)]
struct Hit {
timestamp: u64,
deleted: bool,
···
const MAX_HITS: usize = 100_000;
+
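/// time range for a hits query; bounds are inclusive when given, unbounded when omitted
+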
#[derive(Debug)]
+
struct HitsRange {
+
from: Bound<u64>,
+
to: Bound<u64>,
+
}
+
+
impl RangeBounds<u64> for HitsRange {
+
fn start_bound(&self) -> Bound<&u64> {
+
self.from.as_ref()
+
}
+
+
fn end_bound(&self) -> Bound<&u64> {
+
self.to.as_ref()
+
}
+
}
+
async fn hits(
State(db): State<Arc<Db>>,
Query(params): Query<HitsQuery>,
) -> AppResult<Json<Vec<Hit>>> {
-
let maybe_hits = db
-
.get_hits(
-
&params.nsid,
-
params.to.unwrap_or(0)
-
..params.from.unwrap_or(
-
std::time::SystemTime::now()
-
.duration_since(UNIX_EPOCH)
-
.expect("oops")
-
.as_micros() as u64,
-
),
-
)?
-
.take(MAX_HITS);
-
let mut hits = Vec::with_capacity(maybe_hits.size_hint().0);
+
let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
+
let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
-
for maybe_hit in maybe_hits {
-
let (timestamp, hit) = maybe_hit?;
-
hits.push(Hit {
-
timestamp,
-
deleted: hit.deleted,
-
});
-
}
+
db.get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS)
+
.take(MAX_HITS)
+
.try_fold(Vec::with_capacity(MAX_HITS), |mut acc, hit| {
+
let hit = hit?;
+
let hit_data = hit.deser()?;
-
Ok(Json(hits))
+
acc.push(Hit {
+
timestamp: hit.timestamp,
+
deleted: hit_data.deleted,
+
});
+
Ok(acc)
+
})
+
.map(Json)
}
async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response {
···
(async move {
let mut listener = db.new_listener();
let mut data = Events {
-
events: HashMap::<SmolStr, NsidCount>::with_capacity(10),
+
events: AHashMap::<SmolStr, NsidCount>::with_capacity(10),
per_second: 0,
};
let mut updates = 0;
···
.instrument(span)
})
}
+
+
#[derive(Debug, Serialize)]
+
struct Since {
+
since: u64,
+
}
+
+
async fn since(db: State<Arc<Db>>) -> AppResult<Json<Since>> {
+
Ok(Json(Since {
+
since: db.tracking_since()?,
+
}))
+
}
+521
server/src/db/block.rs
···
+
use std::{
+
io::{self, Read, Write},
+
marker::PhantomData,
+
};
+
+
use rkyv::{
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
bytecheck::CheckBytes,
+
de::Pool,
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
+
util::AlignedVec,
+
};
+
+
use crate::{
+
error::{AppError, AppResult},
+
utils::{ReadVariableExt, WriteVariableExt},
+
};
+
+
pub struct Item<T> {
+
pub timestamp: u64,
+
pub data: AlignedVec,
+
phantom: PhantomData<T>,
+
}
+
+
impl<T> Item<T>
+
where
+
T: Archive,
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
{
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
+
}
+
}
+
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+
pub fn new(timestamp: u64, data: &T) -> Self {
+
Item {
+
timestamp,
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+
phantom: PhantomData,
+
}
+
}
+
}
+
+
pub struct ItemEncoder<W: Write, T> {
+
writer: W,
+
prev_timestamp: u64,
+
prev_delta: i64,
+
item_count: usize,
+
_item: PhantomData<T>,
+
}
+
+
impl<W: Write, T> ItemEncoder<W, T> {
+
pub fn new(writer: W, item_count: usize) -> Self {
+
assert!(item_count > 0);
+
ItemEncoder {
+
writer,
+
prev_timestamp: 0,
+
prev_delta: 0,
+
item_count,
+
_item: PhantomData,
+
}
+
}
+
+
/// NOTE: this is a best effort estimate of the encoded length of the block.
+
/// if T contains variable-length data, the encoded length may be larger than this estimate.
+
pub fn encoded_len(item_count: usize) -> usize {
+
// items length + item count * delta length + data length
+
size_of::<usize>() + item_count * size_of::<(i64, T)>()
+
}
+
+
pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> {
+
if self.prev_timestamp == 0 {
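+
// first item: write only the item-count header; the block's absolute start
+
// timestamp lives in the block key, so it is not repeated in the body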
+
self.writer.write_varint(self.item_count)?;
+
// self.writer.write_varint(item.timestamp)?;
+
self.prev_timestamp = item.timestamp;
+
self.write_data(&item.data)?;
+
return Ok(());
+
}
+
+
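// delta-of-delta: events tend to be evenly spaced, so the change between
+
// successive deltas is near zero and varint-encodes to a single byte
+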
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
+
+
self.writer.write_varint(delta - self.prev_delta)?;
+
self.prev_timestamp = item.timestamp;
+
self.prev_delta = delta;
+
+
self.write_data(&item.data)?;
+
+
Ok(())
+
}
+
+
fn write_data(&mut self, data: &[u8]) -> io::Result<()> {
+
self.writer.write_varint(data.len())?;
+
self.writer.write_all(data)?;
+
Ok(())
+
}
+
+
pub fn finish(mut self) -> io::Result<W> {
+
self.writer.flush()?;
+
Ok(self.writer)
+
}
+
}
+
+
pub struct ItemDecoder<R, T> {
+
reader: R,
+
current_timestamp: u64,
+
current_delta: i64,
+
items_read: usize,
+
expected: usize,
+
_item: PhantomData<T>,
+
}
+
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
+
pub fn new(mut reader: R, start_timestamp: u64) -> io::Result<Self> {
+
let expected = match reader.read_varint() {
+
Ok(expected) => expected,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0,
+
Err(e) => return Err(e.into()),
+
};
+
+
Ok(ItemDecoder {
+
reader,
+
current_timestamp: start_timestamp,
+
current_delta: 0,
+
items_read: 0,
+
expected,
+
_item: PhantomData,
+
})
+
}
+
+
pub fn item_count(&self) -> usize {
+
self.expected
+
}
+
+
pub fn decode(&mut self) -> io::Result<Option<Item<T>>> {
+
if self.items_read == 0 {
+
// read the first timestamp
+
// let timestamp = match self.reader.read_varint::<u64>() {
+
// Ok(timestamp) => timestamp,
+
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
// Err(e) => return Err(e.into()),
+
// };
+
// self.current_timestamp = timestamp;
+
+
let Some(data_raw) = self.read_item()? else {
+
return Ok(None);
+
};
+
+
self.items_read += 1;
+
return Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}));
+
}
+
+
if self.items_read >= self.expected {
+
return Ok(None);
+
}
+
+
let Some(_delta) = self.read_timestamp()? else {
+
return Ok(None);
+
};
+
+
// read data
+
let data_raw = match self.read_item()? {
+
Some(data_raw) => data_raw,
+
None => {
+
return Err(io::Error::new(
+
io::ErrorKind::UnexpectedEof,
+
"expected data after delta",
+
)
+
.into());
+
}
+
};
+
+
self.items_read += 1;
+
Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}))
+
}
+
+
// [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1]
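+
// i.e. timestamps -> deltas -> deltas-of-deltas; decoding accumulates each
+
// stored value into current_delta, then advances current_timestamp by it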
+
fn read_timestamp(&mut self) -> io::Result<Option<u64>> {
+
let delta = match self.reader.read_varint::<i64>() {
+
Ok(delta) => delta,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
self.current_delta += delta;
+
self.current_timestamp =
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
+
Ok(Some(self.current_timestamp))
+
}
+
+
fn read_item(&mut self) -> io::Result<Option<AlignedVec>> {
+
let data_len = match self.reader.read_varint::<usize>() {
+
Ok(data_len) => data_len,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
let mut data_raw = AlignedVec::with_capacity(data_len);
+
for _ in 0..data_len {
+
data_raw.push(0);
+
}
+
self.reader.read_exact(data_raw.as_mut_slice())?;
+
Ok(Some(data_raw))
+
}
+
}
+
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+
type Item = io::Result<Item<T>>;
+
+
fn next(&mut self) -> Option<Self::Item> {
+
self.decode().transpose()
+
}
+
+
fn size_hint(&self) -> (usize, Option<usize>) {
+
(self.expected, Some(self.expected))
+
}
+
}
+
+
#[cfg(test)]
+
mod test {
+
use super::*;
+
use rkyv::{Archive, Deserialize, Serialize};
+
use std::io::Cursor;
+
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+
#[rkyv(compare(PartialEq))]
+
struct TestData {
+
id: u32,
+
value: String,
+
}
+
+
#[test]
+
fn test_encoder_decoder_single_item() {
+
let data = TestData {
+
id: 123,
+
value: "test".to_string(),
+
};
+
+
let item = Item::new(1000, &data);
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, 1);
+
encoder.encode(&item).unwrap();
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_item = decoder.decode().unwrap().unwrap();
+
assert_eq!(decoded_item.timestamp, 1000);
+
+
let decoded_data = decoded_item.deser().unwrap();
+
assert_eq!(decoded_data.id, 123);
+
assert_eq!(decoded_data.value.as_str(), "test");
+
}
+
+
#[test]
+
fn test_encoder_decoder_multiple_items() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
Item::new(
+
1015,
+
&TestData {
+
id: 3,
+
value: "third".to_string(),
+
},
+
),
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "fourth".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let mut decoded_items = Vec::new();
+
while let Some(item) = decoder.decode().unwrap() {
+
decoded_items.push(item);
+
}
+
+
assert_eq!(decoded_items.len(), 4);
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
+
assert_eq!(
+
original.deser().unwrap().value.as_str(),
+
decoded.deser().unwrap().value.as_str()
+
);
+
}
+
}
+
+
#[test]
+
fn test_encoder_decoder_with_iterator() {
+
let items = vec![
+
Item::new(
+
2000,
+
&TestData {
+
id: 10,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
2005,
+
&TestData {
+
id: 20,
+
value: "b".to_string(),
+
},
+
),
+
Item::new(
+
2012,
+
&TestData {
+
id: 30,
+
value: "c".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 3);
+
assert_eq!(decoded_items[0].timestamp, 2000);
+
assert_eq!(decoded_items[1].timestamp, 2005);
+
assert_eq!(decoded_items[2].timestamp, 2012);
+
+
assert_eq!(decoded_items[0].deser().unwrap().id, 10);
+
assert_eq!(decoded_items[1].deser().unwrap().id, 20);
+
assert_eq!(decoded_items[2].deser().unwrap().id, 30);
+
}
+
+
#[test]
+
fn test_delta_compression() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "b".to_string(),
+
},
+
), // delta = 10
+
Item::new(
+
1020,
+
&TestData {
+
id: 3,
+
value: "c".to_string(),
+
},
+
), // delta = 10, delta-of-delta = 0
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "d".to_string(),
+
},
+
), // delta = 5, delta-of-delta = -5
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode and verify
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
+
}
+
}
+
+
#[test]
+
fn test_empty_decode() {
+
let buffer = Vec::new();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let result = decoder.decode().unwrap();
+
assert!(result.is_none());
+
}
+
+
#[test]
+
fn test_backwards_timestamp() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
900,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].timestamp, 1000);
+
assert_eq!(decoded_items[1].timestamp, 900);
+
}
+
+
#[test]
+
fn test_different_data_sizes() {
+
let small_data = TestData {
+
id: 1,
+
value: "x".to_string(),
+
};
+
let large_data = TestData {
+
id: 2,
+
value: "a".repeat(1000),
+
};
+
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].deser().unwrap().value.as_str(), "x");
+
assert_eq!(decoded_items[1].deser().unwrap().value.len(), 1000);
+
assert_eq!(
+
decoded_items[1].deser().unwrap().value.as_str(),
+
"a".repeat(1000)
+
);
+
}
+
}
+260
server/src/db/handle.rs
···
+
use std::{
+
fmt::Debug,
+
io::Cursor,
+
ops::{Bound, RangeBounds},
+
sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
+
time::Duration,
+
};
+
+
use byteview::ByteView;
+
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot};
+
use itertools::Itertools;
+
use parking_lot::Mutex;
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
+
use smol_str::SmolStr;
+
+
use crate::{
+
db::{EventRecord, NsidHit, block},
+
error::{AppError, AppResult},
+
utils::{
+
ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt,
+
varints_unsigned_encoded,
+
},
+
};
+
+
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
+
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+
pub type Item = block::Item<NsidHit>;
+
+
pub struct Block {
+
pub written: usize,
+
pub key: ByteView,
+
pub data: Vec<u8>,
+
}
+
+
pub struct LexiconHandle {
+
write_tree: Partition,
+
read_tree: ArcliteSwap<Snapshot>,
+
nsid: SmolStr,
+
buf: Arc<Mutex<Vec<EventRecord>>>,
+
last_insert: AtomicU64, // relaxed
+
eps: DefaultRateTracker,
+
}
+
+
impl Debug for LexiconHandle {
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
f.debug_struct("LexiconHandle")
+
.field("nsid", self.nsid())
+
.finish()
+
}
+
}
+
+
impl LexiconHandle {
+
pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
+
let opts = PartitionCreateOptions::default()
+
.block_size(1024 * 48)
+
.compression(fjall::CompressionType::Miniz(9));
+
let write_tree = keyspace.open_partition(nsid, opts).unwrap();
+
let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot()));
+
Self {
+
write_tree,
+
read_tree,
+
nsid: nsid.into(),
+
buf: Default::default(),
+
last_insert: AtomicU64::new(0),
+
eps: RateTracker::new(Duration::from_secs(10)),
+
}
+
}
+
+
#[inline(always)]
+
pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> {
+
self.read_tree.load()
+
}
+
+
#[inline(always)]
+
pub fn update_tree(&self) {
+
self.read_tree
+
.store(ArcRefCnt::new(self.write_tree.snapshot()));
+
}
+
+
#[inline(always)]
+
pub fn span(&self) -> tracing::Span {
+
tracing::info_span!("handle", nsid = %self.nsid)
+
}
+
+
#[inline(always)]
+
pub fn nsid(&self) -> &SmolStr {
+
&self.nsid
+
}
+
+
#[inline(always)]
+
pub fn item_count(&self) -> usize {
+
self.buf.lock().len()
+
}
+
+
pub fn since_last_activity(&self) -> Duration {
+
Duration::from_nanos(
+
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()),
+
)
+
}
+
+
pub fn suggested_block_size(&self) -> usize {
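+
// aim for roughly one minute's worth of events per block at the observed rate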
+
self.eps.rate() as usize * 60
+
}
+
+
pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
+
let mut count = 0;
+
self.buf.lock().extend(events.into_iter().inspect(|_| {
+
count += 1;
+
}));
+
self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
+
self.eps.observe(count);
+
}
+
+
pub fn compact(
+
&self,
+
compact_to: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
+
let _span = self.span().entered();
+
+
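// merge the blocks covering `range`: decode every item, optionally re-sort by
+
// timestamp, re-encode into `compact_to`-item chunks in parallel, then swap
+
// the old blocks for the new ones
+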
let start_limit = match range.start_bound().cloned() {
+
Bound::Included(start) => start,
+
Bound::Excluded(start) => start.saturating_add(1),
+
Bound::Unbounded => 0,
+
};
+
let end_limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
+
let start_key = varints_unsigned_encoded([start_limit]);
+
let end_key = varints_unsigned_encoded([end_limit]);
+
+
let blocks_to_compact = self
+
.read()
+
.range(start_key..end_key)
+
.collect::<Result<Vec<_>, _>>()?;
+
if blocks_to_compact.len() < 2 {
+
return Ok(());
+
}
+
+
let start_blocks_size = blocks_to_compact.len();
+
let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
+
let mut all_items =
+
blocks_to_compact
+
.iter()
+
.try_fold(Vec::new(), |mut acc, (key, value)| {
+
let mut timestamps = Cursor::new(key);
+
let start_timestamp = timestamps.read_varint()?;
+
let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+
let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
+
acc.append(&mut items);
+
AppResult::Ok(acc)
+
})?;
+
+
if sort {
+
all_items.sort_unstable_by_key(|e| e.timestamp);
+
}
+
+
let new_blocks = all_items
+
.into_iter()
+
.chunks(compact_to)
+
.into_iter()
+
.map(|chunk| chunk.collect_vec())
+
.collect_vec()
+
.into_par_iter()
+
.map(|chunk| {
+
let count = chunk.len();
+
Self::encode_block_from_items(chunk, count)
+
})
+
.collect::<Result<Vec<_>, _>>()?;
+
let end_blocks_size = new_blocks.len();
+
+
for key in keys_to_delete {
+
self.write_tree.remove(key.clone())?;
+
}
+
for block in new_blocks {
+
self.write_tree.insert(block.key, block.data)?;
+
}
+
+
let reduction =
+
(start_blocks_size as f64 - end_blocks_size as f64) / start_blocks_size as f64 * 100.0;
+
tracing::info!(
+
{
+
start = start_blocks_size,
+
end = end_blocks_size,
+
},
+
"blocks compacted {reduction:.2}%",
+
);
+
+
Ok(())
+
}
+
+
pub fn insert_block(&self, block: Block) -> AppResult<()> {
+
self.write_tree
+
.insert(block.key, block.data)
+
.map_err(AppError::from)
+
}
+
+
pub fn encode_block_from_items(
+
items: impl IntoIterator<Item = Item>,
+
count: usize,
+
) -> AppResult<Block> {
+
if count == 0 {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::InvalidInput,
+
"no items requested",
+
)
+
.into());
+
}
+
let mut writer =
+
ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
+
let mut start_timestamp = None;
+
let mut end_timestamp = None;
+
let mut written = 0_usize;
+
for item in items.into_iter().take(count) {
+
writer.encode(&item)?;
+
if start_timestamp.is_none() {
+
start_timestamp = Some(item.timestamp);
+
}
+
end_timestamp = Some(item.timestamp);
+
written += 1;
+
}
+
if written != count {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::InvalidData,
+
"unexpected number of items, invalid data?",
+
)
+
.into());
+
}
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+
let value = writer.finish()?;
+
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
+
return Ok(Block {
+
written,
+
key,
+
data: value,
+
});
+
}
+
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items in queue").into())
+
}
+
+
pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
+
let mut buf = self.buf.lock();
+
let end = item_count.min(buf.len());
+
buf.drain(..end)
+
.map(|event| {
+
Item::new(
+
event.timestamp,
+
&NsidHit {
+
deleted: event.deleted,
+
},
+
)
+
})
+
.collect()
+
}
+
}
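The encode/decode pair above is the write path shared by sync() and compact(): encode_block_from_items packs up to `count` items into one block whose key is the [start, end] timestamp pair, and the decoder needs the start timestamp handed back because it is not stored inside the block itself. A minimal round-trip sketch, written as a hypothetical test inside this module (since `block` is private to `db`):

    #[cfg(test)]
    mod roundtrip_sketch {
        use super::*;

        #[test]
        fn encode_then_decode() {
            let items = vec![
                Item::new(100, &NsidHit { deleted: false }),
                Item::new(101, &NsidHit { deleted: true }),
            ];
            let block = LexiconHandle::encode_block_from_items(items, 2).unwrap();
            assert_eq!(block.written, 2);

            // the key packs [start_timestamp, end_timestamp] as ordered varints
            let mut key = Cursor::new(&block.key[..]);
            let start: u64 = key.read_varint().unwrap();
            assert_eq!(start, 100);

            // decoding takes the start timestamp from the key, not the block
            let decoded = block::ItemDecoder::new(Cursor::new(block.data), start)
                .unwrap()
                .collect::<Result<Vec<Item>, _>>()
                .unwrap();
            assert_eq!(decoded.len(), 2);
            assert!(decoded[1].deser().unwrap().deleted);
        }
    }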
+504
server/src/db/mod.rs
···
+
use std::{
+
fmt::Debug,
+
io::Cursor,
+
ops::{Bound, Deref, RangeBounds},
+
path::Path,
+
time::Duration,
+
};
+
+
use ahash::{AHashMap, AHashSet};
+
use byteview::StrView;
+
use fjall::{Keyspace, Partition, PartitionCreateOptions};
+
use itertools::{Either, Itertools};
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
+
use smol_str::{SmolStr, ToSmolStr};
+
use tokio::sync::broadcast;
+
use tokio_util::sync::CancellationToken;
+
+
use crate::{
+
db::handle::{ItemDecoder, LexiconHandle},
+
error::{AppError, AppResult},
+
jetstream::JetstreamEvent,
+
utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
+
};
+
+
mod block;
+
mod handle;
+
+
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidCounts {
+
pub count: u128,
+
pub deleted_count: u128,
+
pub last_seen: u64,
+
}
+
+
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidHit {
+
pub deleted: bool,
+
}
+
+
#[derive(Clone)]
+
pub struct EventRecord {
+
pub nsid: SmolStr,
+
pub timestamp: u64, // seconds
+
pub deleted: bool,
+
}
+
+
impl EventRecord {
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+
match event {
+
JetstreamEvent::Commit {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: false,
+
}),
+
JetstreamEvent::Delete {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: true,
+
}),
+
_ => None,
+
}
+
}
+
}
+
+
pub struct DbInfo {
+
pub nsids: AHashMap<SmolStr, Vec<usize>>,
+
pub disk_size: u64,
+
}
+
+
pub struct DbConfig {
+
pub ks_config: fjall::Config,
+
pub min_block_size: usize,
+
pub max_block_size: usize,
+
pub max_last_activity: Duration,
+
}
+
+
impl DbConfig {
+
pub fn path(mut self, path: impl AsRef<Path>) -> Self {
+
self.ks_config = fjall::Config::new(path);
+
self
+
}
+
+
pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
+
self.ks_config = f(self.ks_config);
+
self
+
}
+
}
+
+
impl Default for DbConfig {
+
fn default() -> Self {
+
Self {
+
ks_config: fjall::Config::default()
+
.cache_size(1024 * 1024 * 512)
+
.max_write_buffer_size(u64::MAX),
+
min_block_size: 1000,
+
max_block_size: 250_000,
+
max_last_activity: Duration::from_secs(10),
+
}
+
}
+
}
+
+
// counts is nsid -> NsidCounts
+
// hits is tree per nsid: varint start time + varint end time -> block of hits
+
pub struct Db {
+
pub cfg: DbConfig,
+
pub ks: Keyspace,
+
counts: Partition,
+
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>,
+
sync_pool: threadpool::ThreadPool,
+
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
+
eps: RateTracker<100>, // 100 millis buckets
+
cancel_token: CancellationToken,
+
}
+
+
impl Db {
+
pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
+
tracing::info!("opening db...");
+
let ks = cfg.ks_config.clone().open()?;
+
Ok(Self {
+
cfg,
+
hits: Default::default(),
+
sync_pool: threadpool::Builder::new()
+
.num_threads(rayon::current_num_threads() * 2)
+
.build(),
+
counts: ks.open_partition(
+
"_counts",
+
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
+
)?,
+
ks,
+
event_broadcaster: broadcast::channel(1000).0,
+
eps: RateTracker::new(Duration::from_secs(1)),
+
cancel_token,
+
})
+
}
+
+
#[inline(always)]
+
pub fn shutting_down(&self) -> impl Future<Output = ()> {
+
self.cancel_token.cancelled()
+
}
+
+
#[inline(always)]
+
pub fn is_shutting_down(&self) -> bool {
+
self.cancel_token.is_cancelled()
+
}
+
+
#[inline(always)]
+
pub fn eps(&self) -> usize {
+
self.eps.rate() as usize
+
}
+
+
#[inline(always)]
+
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+
self.event_broadcaster.subscribe()
+
}
+
+
pub fn sync(&self, all: bool) -> AppResult<()> {
+
let start = CLOCK.now();
+
// prepare all the data
+
let nsids_len = self.hits.len();
+
let mut data = Vec::with_capacity(nsids_len);
+
let mut nsids = AHashSet::with_capacity(nsids_len);
+
let _guard = scc::ebr::Guard::new();
+
for (nsid, handle) in self.hits.iter(&_guard) {
+
let mut nsid_data = Vec::with_capacity(2);
+
// let mut total_count = 0;
+
let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
+
// if we were disconnected for a long time, sync everything we have at once
+
// to avoid producing many small blocks (compaction would merge them later
+
// anyway, but larger blocks up front reduce the work it has to do)
+
let block_size = (is_too_old || all)
+
.then_some(self.cfg.max_block_size)
+
.unwrap_or_else(|| {
+
self.cfg
+
.max_block_size
+
.min(self.cfg.min_block_size.max(handle.suggested_block_size()))
+
});
+
let count = handle.item_count();
+
let data_count = count / block_size;
+
if count > 0 && (all || data_count > 0 || is_too_old) {
+
for _ in 0..data_count {
+
nsid_data.push((handle.clone(), block_size));
+
// total_count += block_size;
+
}
+
// only sync remainder if we haven't met block size
+
let remainder = count % block_size;
+
if (all || data_count == 0) && remainder > 0 {
+
nsid_data.push((handle.clone(), remainder));
+
// total_count += remainder;
+
}
+
}
+
let _span = handle.span().entered();
+
if !nsid_data.is_empty() {
+
// tracing::info!(
+
// {blocks = %nsid_data.len(), count = %total_count},
+
// "will encode & sync",
+
// );
+
nsids.insert(nsid.clone());
+
data.push(nsid_data);
+
}
+
}
+
drop(_guard);
+
+
// process the blocks
+
data.into_par_iter()
+
.map(|chunk| {
+
chunk
+
.into_iter()
+
.map(|(handle, max_block_size)| {
+
(handle.take_block_items(max_block_size), handle)
+
})
+
.collect::<Vec<_>>()
+
.into_par_iter()
+
.map(|(items, handle)| {
+
let count = items.len();
+
let block = LexiconHandle::encode_block_from_items(items, count)?;
+
AppResult::Ok((block, handle))
+
})
+
.collect::<Result<Vec<_>, _>>()
+
})
+
.try_for_each(|chunk| {
+
let chunk = chunk?;
+
for (block, handle) in chunk {
+
self.sync_pool.execute(move || {
+
let _span = handle.span().entered();
+
let written = block.written;
+
match handle.insert_block(block) {
+
Ok(_) => {
+
tracing::info!({count = %written}, "synced")
+
}
+
Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
+
}
+
});
+
}
+
AppResult::Ok(())
+
})?;
+
self.sync_pool.join();
+
+
// update snapshots for all (changed) handles
+
for nsid in nsids {
+
self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
+
}
+
+
tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
+
+
Ok(())
+
}
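// to make the sizing rule above concrete, with the default DbConfig
// (min_block_size 1000, max_block_size 250_000) and
// suggested_block_size() = eps * 60, i.e. about one minute of events:
//
//     let clamp = |suggested: usize| 250_000usize.min(1_000.max(suggested));
//     assert_eq!(clamp(500 * 60), 30_000);     // 500 eps -> 30k-item blocks
//     assert_eq!(clamp(3 * 60), 1_000);        // quiet nsid -> floored at min
//     assert_eq!(clamp(10_000 * 60), 250_000); // busy nsid -> capped at max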
+
+
pub fn compact(
+
&self,
+
nsid: impl AsRef<str>,
+
max_count: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
+
let Some(handle) = self.get_handle(nsid) else {
+
return Ok(());
+
};
+
handle.compact(max_count, range, sort)?;
+
handle.update_tree();
+
Ok(())
+
}
+
+
pub fn compact_all(
+
&self,
+
max_count: usize,
+
range: impl RangeBounds<u64> + Clone,
+
sort: bool,
+
) -> AppResult<()> {
+
for nsid in self.get_nsids() {
+
self.compact(nsid, max_count, range.clone(), sort)?;
+
}
+
Ok(())
+
}
+
+
pub fn major_compact(&self) -> AppResult<()> {
+
self.compact_all(self.cfg.max_block_size, .., true)?;
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
+
let _guard = scc::ebr::Guard::new();
+
let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
+
Some(handle) => handle.clone(),
+
None => {
+
if self.ks.partition_exists(nsid.as_ref()) {
+
let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
+
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+
handle
+
} else {
+
return None;
+
}
+
}
+
};
+
Some(handle)
+
}
+
+
#[inline(always)]
+
fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
+
self.hits
+
.entry(nsid.clone())
+
.or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
+
}
+
+
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
+
let mut seen_events = 0;
+
for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
+
let mut counts = self.get_count(&key)?;
+
self.ensure_handle(&key).queue(chunk.inspect(|e| {
+
// increment count
+
counts.last_seen = e.timestamp;
+
if e.deleted {
+
counts.deleted_count += 1;
+
} else {
+
counts.count += 1;
+
}
+
seen_events += 1;
+
}));
+
self.insert_count(&key, &counts)?;
+
if self.event_broadcaster.receiver_count() > 0 {
+
let _ = self.event_broadcaster.send((key, counts));
+
}
+
}
+
self.eps.observe(seen_events);
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
+
self.counts
+
.insert(
+
nsid,
+
unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
+
)
+
.map_err(AppError::from)
+
}
+
+
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
+
let Some(raw) = self.counts.get(nsid)? else {
+
return Ok(NsidCounts::default());
+
};
+
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
+
}
+
+
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
+
self.counts.iter().map(|res| {
+
res.map_err(AppError::from).map(|(key, val)| {
+
(
+
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
+
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
+
)
+
})
+
})
+
}
+
+
pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
+
self.ks
+
.list_partitions()
+
.into_iter()
+
.filter(|k| k.deref() != "_counts")
+
}
+
+
pub fn info(&self) -> AppResult<DbInfo> {
+
let mut nsids = AHashMap::new();
+
for nsid in self.get_nsids() {
+
let Some(handle) = self.get_handle(&nsid) else {
+
continue;
+
};
+
let block_lens = handle
+
.read()
+
.iter()
+
.rev()
+
.try_fold(Vec::new(), |mut acc, item| {
+
let (key, value) = item?;
+
let mut timestamps = Cursor::new(key);
+
let start_timestamp = timestamps.read_varint()?;
+
let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+
acc.push(decoder.item_count());
+
AppResult::Ok(acc)
+
})?;
+
nsids.insert(nsid.to_smolstr(), block_lens);
+
}
+
Ok(DbInfo {
+
nsids,
+
disk_size: self.ks.disk_space(),
+
})
+
}
+
+
pub fn get_hits(
+
&self,
+
nsid: &str,
+
range: impl RangeBounds<u64> + std::fmt::Debug,
+
max_items: usize,
+
) -> impl Iterator<Item = AppResult<handle::Item>> {
+
let start_limit = match range.start_bound().cloned() {
+
Bound::Included(start) => start,
+
Bound::Excluded(start) => start.saturating_add(1),
+
Bound::Unbounded => 0,
+
};
+
let end_limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
let end_key = varints_unsigned_encoded([end_limit]);
+
+
let Some(handle) = self.get_handle(nsid) else {
+
return Either::Right(std::iter::empty());
+
};
+
+
// let mut ts = CLOCK.now();
+
let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
+
if current_item_count >= max_items {
+
return Ok((None, current_item_count));
+
}
+
let (key, val) = res?;
+
let mut key_reader = Cursor::new(key);
+
let start_timestamp = key_reader.read_varint::<u64>()?;
+
// let end_timestamp = key_reader.read_varint::<u64>()?;
+
if start_timestamp < start_limit {
+
// tracing::info!(
+
// "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
+
// );
+
return Ok((None, current_item_count));
+
}
+
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
+
let current_item_count = current_item_count + decoder.item_count();
+
// tracing::info!(
+
// "took {}ns to get block with size {}",
+
// ts.elapsed().as_nanos(),
+
// decoder.item_count()
+
// );
+
// ts = CLOCK.now();
+
Ok((
+
Some(
+
decoder
+
.take_while(move |item| {
+
item.as_ref().map_or(true, |item| {
+
item.timestamp <= end_limit && item.timestamp >= start_limit
+
})
+
})
+
.map(|res| res.map_err(AppError::from)),
+
),
+
current_item_count,
+
))
+
};
+
+
let (blocks, _counted) = handle
+
.read()
+
.range(..end_key)
+
.map(|res| res.map_err(AppError::from))
+
.rev()
+
.fold_while(
+
(Vec::with_capacity(20), 0),
+
|(mut blocks, current_item_count), res| {
+
use itertools::FoldWhile::*;
+
+
match map_block((res, current_item_count)) {
+
Ok((Some(block), current_item_count)) => {
+
blocks.push(Ok(block));
+
Continue((blocks, current_item_count))
+
}
+
Ok((None, current_item_count)) => Done((blocks, current_item_count)),
+
Err(err) => {
+
blocks.push(Err(err));
+
Done((blocks, current_item_count))
+
}
+
}
+
},
+
)
+
.into_inner();
+
+
// tracing::info!(
+
// "got blocks with size {}, item count {counted}",
+
// blocks.len()
+
// );
+
+
Either::Left(blocks.into_iter().rev().flatten().flatten())
+
}
+
+
pub fn tracking_since(&self) -> AppResult<u64> {
+
// HACK: we should actually store when we started tracking but im lazy
+
// this should be accurate enough
+
let Some(handle) = self.get_handle("app.bsky.feed.like") else {
+
return Ok(0);
+
};
+
let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
+
return Ok(0);
+
};
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
+
timestamp_reader
+
.read_varint::<u64>()
+
.map_err(AppError::from)
+
}
+
}
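The schema comment at the top of Db is the crux of the layout: because ordered_varint keeps lexicographic byte order equal to numeric order, the raw key range scans in get_hits() and compact() double as time-range scans. A minimal sketch of the key layout, using the helpers from server/src/utils.rs (timestamps hypothetical):

    use std::io::Cursor;
    use crate::utils::{ReadVariableExt, varints_unsigned_encoded};

    fn key_layout_sketch() {
        // key for a block covering seconds 1_700_000_000..=1_700_000_059
        let key = varints_unsigned_encoded([1_700_000_000, 1_700_000_059]);

        // the same two varints come back out in order
        let mut reader = Cursor::new(&key[..]);
        let start: u64 = reader.read_varint().unwrap();
        let end: u64 = reader.read_varint().unwrap();
        assert_eq!((start, end), (1_700_000_000, 1_700_000_059));
    }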
-227
server/src/db.rs
···
-
use std::{
-
ops::{Bound, Deref, RangeBounds},
-
path::Path,
-
time::Duration,
-
};
-
-
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
-
use pingora_limits::rate::Rate;
-
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
-
use smol_str::SmolStr;
-
use tokio::sync::broadcast;
-
-
use crate::{
-
error::{AppError, AppResult},
-
jetstream::JetstreamEvent,
-
};
-
-
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
-
#[rkyv(compare(PartialEq), derive(Debug))]
-
pub struct NsidCounts {
-
pub count: u128,
-
pub deleted_count: u128,
-
pub last_seen: u64,
-
}
-
-
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
-
#[rkyv(compare(PartialEq), derive(Debug))]
-
pub struct NsidHit {
-
pub deleted: bool,
-
}
-
-
pub struct EventRecord {
-
pub nsid: SmolStr,
-
pub timestamp: u64,
-
pub deleted: bool,
-
}
-
-
impl EventRecord {
-
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
-
match event {
-
JetstreamEvent::Commit {
-
time_us, commit, ..
-
} => Some(Self {
-
nsid: commit.collection.into(),
-
timestamp: time_us,
-
deleted: false,
-
}),
-
JetstreamEvent::Delete {
-
time_us, commit, ..
-
} => Some(Self {
-
nsid: commit.collection.into(),
-
timestamp: time_us,
-
deleted: true,
-
}),
-
_ => None,
-
}
-
}
-
}
-
-
// counts is nsid -> NsidCounts
-
// hits is tree per nsid: timestamp -> NsidHit
-
pub struct Db {
-
inner: Keyspace,
-
hits: papaya::HashMap<SmolStr, Partition>,
-
counts: Partition,
-
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
-
eps: Rate,
-
}
-
-
impl Db {
-
pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
-
tracing::info!("opening db...");
-
let ks = Config::new(path)
-
.cache_size(8 * 1024 * 1024) // from talna
-
.open()?;
-
Ok(Self {
-
hits: Default::default(),
-
counts: ks.open_partition(
-
"_counts",
-
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
-
)?,
-
inner: ks,
-
event_broadcaster: broadcast::channel(1000).0,
-
eps: Rate::new(Duration::from_secs(1)),
-
})
-
}
-
-
pub fn eps(&self) -> usize {
-
self.eps.rate(&()) as usize
-
}
-
-
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
-
self.event_broadcaster.subscribe()
-
}
-
-
#[inline(always)]
-
fn run_in_nsid_tree<T>(
-
&self,
-
nsid: &str,
-
f: impl FnOnce(&Partition) -> AppResult<T>,
-
) -> AppResult<T> {
-
f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || {
-
let opts = PartitionCreateOptions::default()
-
.compression(fjall::CompressionType::Miniz(9))
-
.compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo {
-
limit: 5 * 1024 * 1024 * 1024, // 5 gb
-
ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
-
}));
-
self.inner.open_partition(nsid, opts).unwrap()
-
}))
-
}
-
-
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
-
let EventRecord {
-
nsid,
-
timestamp,
-
deleted,
-
} = e;
-
-
self.insert_event(&nsid, timestamp, deleted)?;
-
// increment count
-
let mut counts = self.get_count(&nsid)?;
-
counts.last_seen = timestamp;
-
if deleted {
-
counts.deleted_count += 1;
-
} else {
-
counts.count += 1;
-
}
-
self.insert_count(&nsid, counts.clone())?;
-
if self.event_broadcaster.receiver_count() > 0 {
-
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
-
}
-
self.eps.observe(&(), 1);
-
Ok(())
-
}
-
-
#[inline(always)]
-
fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> {
-
self.run_in_nsid_tree(nsid, |tree| {
-
tree.insert(
-
timestamp.to_be_bytes(),
-
unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() }
-
.as_slice(),
-
)
-
.map_err(AppError::from)
-
})
-
}
-
-
#[inline(always)]
-
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
-
self.counts
-
.insert(
-
nsid,
-
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
-
)
-
.map_err(AppError::from)
-
}
-
-
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
-
let Some(raw) = self.counts.get(nsid)? else {
-
return Ok(NsidCounts::default());
-
};
-
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
-
}
-
-
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
-
self.counts.iter().map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
})
-
}
-
-
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> {
-
self.inner
-
.list_partitions()
-
.into_iter()
-
.filter(|k| k.deref() != "_counts")
-
}
-
-
pub fn get_hits(
-
&self,
-
nsid: &str,
-
range: impl RangeBounds<u64>,
-
) -> AppResult<Box<dyn Iterator<Item = AppResult<(u64, NsidHit)>>>> {
-
let start = range.start_bound().cloned().map(u64::to_be_bytes);
-
let end = range.end_bound().cloned().map(u64::to_be_bytes);
-
-
let _guard = self.hits.guard();
-
let Some(tree) = self.hits.get(nsid, &_guard) else {
-
return Ok(Box::new(std::iter::empty()));
-
};
-
-
Ok(Box::new(tree.range(TimestampRange { start, end }).map(
-
|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
u64::from_be_bytes(key.as_ref().try_into().unwrap()),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
},
-
)))
-
}
-
}
-
-
type TimestampRepr = [u8; 8];
-
-
struct TimestampRange {
-
start: Bound<TimestampRepr>,
-
end: Bound<TimestampRepr>,
-
}
-
-
impl RangeBounds<TimestampRepr> for TimestampRange {
-
#[inline(always)]
-
fn start_bound(&self) -> Bound<&TimestampRepr> {
-
self.start.as_ref()
-
}
-
-
#[inline(always)]
-
fn end_bound(&self) -> Bound<&TimestampRepr> {
-
self.end.as_ref()
-
}
-
}
+21 -11
server/src/jetstream.rs
···
pub struct JetstreamClient {
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
tls_connector: tokio_websockets::Connector,
-
url: SmolStr,
+
urls: Vec<SmolStr>,
}
impl JetstreamClient {
-
pub fn new(url: &str) -> AppResult<Self> {
+
pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> {
Ok(Self {
stream: None,
tls_connector: tokio_websockets::Connector::new()?,
-
url: SmolStr::new(url),
+
urls: urls.into_iter().map(Into::into).collect(),
})
}
pub async fn connect(&mut self) -> AppResult<()> {
-
let (stream, _) = ClientBuilder::new()
-
.connector(&self.tls_connector)
-
.uri(&self.url)?
-
.connect()
-
.await?;
-
self.stream = Some(stream);
-
tracing::info!("connected to jetstream ({})", self.url);
-
Ok(())
+
for uri in &self.urls {
+
let conn_result = ClientBuilder::new()
+
.connector(&self.tls_connector)
+
.uri(uri)?
+
.connect()
+
.await;
+
match conn_result {
+
Ok((stream, _)) => {
+
self.stream = Some(stream);
+
tracing::info!("connected to jetstream {}", uri);
+
return Ok(());
+
}
+
Err(err) => {
+
tracing::error!("failed to connect to jetstream {uri}: {err}");
+
}
+
};
+
}
+
Err(anyhow!("failed to connect to any jetstream server").into())
}
// automatically retries connection, only returning error if it fails many times
+264 -47
server/src/main.rs
···
-
use std::{ops::Deref, sync::Arc};
+
use std::{ops::Deref, time::Duration};
+
use itertools::Itertools;
+
use rclite::Arc;
use smol_str::ToSmolStr;
-
#[cfg(not(target_env = "msvc"))]
-
use tikv_jemallocator::Jemalloc;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;
use crate::{
api::serve,
-
db::{Db, EventRecord},
+
db::{Db, DbConfig, EventRecord},
error::AppError,
jetstream::JetstreamClient,
+
utils::{CLOCK, RelativeDateTime, get_time},
};
mod api;
mod db;
mod error;
mod jetstream;
+
mod utils;
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
-
static GLOBAL: Jemalloc = Jemalloc;
+
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
async fn main() {
···
.compact()
.init();
-
if std::env::args()
-
.nth(1)
-
.map_or(false, |arg| arg == "migrate")
-
{
-
migrate_to_miniz();
-
return;
+
match std::env::args().nth(1).as_deref() {
+
Some("compact") => {
+
compact();
+
return;
+
}
+
Some("migrate") => {
+
migrate();
+
return;
+
}
+
Some("debug") => {
+
debug();
+
return;
+
}
+
Some("print") => {
+
print_all();
+
return;
+
}
+
Some(x) => {
+
tracing::error!("unknown command: {}", x);
+
return;
+
}
+
None => {}
}
-
let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db"));
+
let cancel_token = CancellationToken::new();
+
+
let db = Arc::new(
+
Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
+
);
rustls::crypto::ring::default_provider()
.install_default()
.expect("cant install rustls crypto provider");
-
let mut jetstream =
-
match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
-
Ok(client) => client,
-
Err(err) => {
-
tracing::error!("can't create jetstream client: {err}");
-
return;
-
}
-
};
+
let urls = [
+
"wss://jetstream1.us-west.bsky.network/subscribe",
+
"wss://jetstream2.us-west.bsky.network/subscribe",
+
"wss://jetstream2.fr.hose.cam/subscribe",
+
"wss://jetstream.fire.hose.cam/subscribe",
+
];
+
let mut jetstream = match JetstreamClient::new(urls) {
+
Ok(client) => client,
+
Err(err) => {
+
tracing::error!("can't create jetstream client: {err}");
+
return;
+
}
+
};
-
let cancel_token = CancellationToken::new();
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
-
let consume_events = tokio::spawn({
let consume_cancel = cancel_token.child_token();
async move {
···
let Some(record) = EventRecord::from_jetstream(event) else {
continue;
};
-
let _ = event_tx.send(record).await;
+
event_tx.send(record).await?;
}
Err(err) => return Err(err),
},
···
let ingest_events = std::thread::spawn({
let db = db.clone();
move || {
-
tracing::info!("starting ingest events thread...");
-
while let Some(e) = event_rx.blocking_recv() {
-
if let Err(e) = db.record_event(e) {
-
tracing::error!("failed to record event: {}", e);
+
let mut buffer = Vec::new();
+
loop {
+
let read = event_rx.blocking_recv_many(&mut buffer, 500);
+
if let Err(err) = db.ingest_events(buffer.drain(..)) {
+
tracing::error!("failed to ingest events: {}", err);
+
}
+
if read == 0 || db.is_shutting_down() {
+
break;
+
}
+
}
+
}
+
});
+
+
let db_task = tokio::task::spawn({
+
let db = db.clone();
+
async move {
+
let sync_period = Duration::from_secs(10);
+
let mut sync_interval = tokio::time::interval(sync_period);
+
sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+
let compact_period = std::time::Duration::from_secs(60 * 30); // 30 mins
+
let mut compact_interval = tokio::time::interval(compact_period);
+
compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+
loop {
+
let sync_db = async || {
+
tokio::task::spawn_blocking({
+
let db = db.clone();
+
move || {
+
if db.is_shutting_down() {
+
return;
+
}
+
match db.sync(false) {
+
Ok(_) => (),
+
Err(e) => tracing::error!("failed to sync db: {}", e),
+
}
+
}
+
})
+
.await
+
.unwrap();
+
};
+
let compact_db = async || {
+
tokio::task::spawn_blocking({
+
let db = db.clone();
+
move || {
+
if db.is_shutting_down() {
+
return;
+
}
+
let end = get_time();
+
let start = end - compact_period;
+
let range = start.as_secs()..end.as_secs();
+
tracing::info!(
+
{
+
start = %RelativeDateTime::from_now(start),
+
end = %RelativeDateTime::from_now(end),
+
},
+
"running compaction...",
+
);
+
match db.compact_all(db.cfg.max_block_size, range, false) {
+
Ok(_) => (),
+
Err(e) => tracing::error!("failed to compact db: {}", e),
+
}
+
}
+
})
+
.await
+
.unwrap();
+
};
+
tokio::select! {
+
_ = sync_interval.tick() => sync_db().await,
+
_ = compact_interval.tick() => compact_db().await,
+
_ = db.shutting_down() => break,
}
}
}
});
tokio::select! {
-
res = serve(db, cancel_token.child_token()) => {
+
res = serve(db.clone(), cancel_token.child_token()) => {
if let Err(e) = res {
tracing::error!("serve failed: {}", e);
}
···
}
tracing::info!("shutting down...");
-
ingest_events
-
.join()
-
.expect("failed to join ingest events thread");
+
cancel_token.cancel();
+
ingest_events.join().expect("failed to join ingest events");
+
db_task.await.expect("cant join db task");
+
db.sync(true).expect("cant sync db");
}
-
fn migrate_to_miniz() {
-
let from = Db::new(".fjall_data").expect("couldnt create db");
-
let to = Db::new(".fjall_data_miniz").expect("couldnt create db");
+
fn print_all() {
+
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
+
let nsids = db.get_nsids().collect::<Vec<_>>();
+
let mut count = 0_usize;
+
for nsid in nsids {
+
println!("{}:", nsid.deref());
+
for hit in db.get_hits(&nsid, .., usize::MAX) {
+
let hit = hit.expect("aaa");
+
println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
+
count += 1;
+
}
+
}
+
println!("total hits: {}", count);
+
}
-
let mut total_count = 0_u64;
-
for nsid in from.get_nsids() {
-
tracing::info!("migrating {} ...", nsid.deref());
-
for hit in from.get_hits(&nsid, ..).expect("cant read hits") {
-
let (timestamp, data) = hit.expect("cant read event");
-
to.record_event(EventRecord {
-
nsid: nsid.to_smolstr(),
-
timestamp,
-
deleted: data.deleted,
-
})
-
.expect("cant record event");
-
total_count += 1;
+
fn debug() {
+
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
+
let info = db.info().expect("cant get db info");
+
println!("disk size: {}", info.disk_size);
+
for (nsid, blocks) in info.nsids {
+
print!("{nsid}:");
+
let mut last_size = 0;
+
let mut same_size_count = 0;
+
for item_count in blocks {
+
if item_count == last_size {
+
same_size_count += 1;
+
} else {
+
if same_size_count > 1 {
+
print!("x{}", same_size_count);
+
}
+
print!(" {item_count}");
+
same_size_count = 0;
+
}
+
last_size = item_count;
}
+
print!("\n");
}
+
}
-
tracing::info!("migrated {total_count} events!");
+
fn compact() {
+
let db = Db::new(
+
DbConfig::default().ks(|c| {
+
c.max_journaling_size(u64::MAX)
+
.max_write_buffer_size(u64::MAX)
+
}),
+
CancellationToken::new(),
+
)
+
.expect("couldnt create db");
+
let info = db.info().expect("cant get db info");
+
db.major_compact().expect("cant compact");
+
std::thread::sleep(Duration::from_secs(5));
+
let compacted_info = db.info().expect("cant get db info");
+
println!(
+
"disk size: {} -> {}",
+
info.disk_size, compacted_info.disk_size
+
);
+
for (nsid, blocks) in info.nsids {
+
println!(
+
"{nsid}: {} -> {}",
+
blocks.len(),
+
compacted_info.nsids[&nsid].len()
+
)
+
}
+
}
+
+
fn migrate() {
+
let cancel_token = CancellationToken::new();
+
let from = Arc::new(
+
Db::new(
+
DbConfig::default().path(".fjall_data_from"),
+
cancel_token.child_token(),
+
)
+
.expect("couldnt create db"),
+
);
+
let to = Arc::new(
+
Db::new(
+
DbConfig::default().path(".fjall_data_to").ks(|c| {
+
c.max_journaling_size(u64::MAX)
+
.max_write_buffer_size(u64::MAX)
+
.compaction_workers(rayon::current_num_threads() * 4)
+
.flush_workers(rayon::current_num_threads() * 4)
+
}),
+
cancel_token.child_token(),
+
)
+
.expect("couldnt create db"),
+
);
+
+
let nsids = from.get_nsids().collect::<Vec<_>>();
+
let _eps_thread = std::thread::spawn({
+
let to = to.clone();
+
move || {
+
loop {
+
std::thread::sleep(Duration::from_secs(3));
+
let eps = to.eps();
+
if eps > 0 {
+
tracing::info!("{} rps", eps);
+
}
+
}
+
}
+
});
+
let mut threads = Vec::with_capacity(nsids.len());
+
let start = CLOCK.now();
+
for nsid in nsids {
+
let from = from.clone();
+
let to = to.clone();
+
threads.push(std::thread::spawn(move || {
+
tracing::info!("{}: migrating...", nsid.deref());
+
let mut count = 0_u64;
+
for hits in from
+
.get_hits(&nsid, .., usize::MAX)
+
.chunks(100000)
+
.into_iter()
+
{
+
to.ingest_events(hits.map(|hit| {
+
count += 1;
+
let hit = hit.expect("cant decode hit");
+
EventRecord {
+
nsid: nsid.to_smolstr(),
+
timestamp: hit.timestamp,
+
deleted: hit.deser().unwrap().deleted,
+
}
+
}))
+
.expect("cant record event");
+
}
+
tracing::info!("{}: ingested {} events...", nsid.deref(), count);
+
count
+
}));
+
}
+
let mut total_count = 0_u64;
+
for thread in threads {
+
let count = thread.join().expect("thread panicked");
+
total_count += count;
+
}
+
let read_time = start.elapsed();
+
let read_per_second = total_count as f64 / read_time.as_secs_f64();
+
drop(from);
+
tracing::info!("starting sync!!!");
+
to.sync(true).expect("cant sync");
+
tracing::info!("persisting...");
+
let total_time = start.elapsed();
+
let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
+
tracing::info!(
+
"migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
+
);
}
+388
server/src/utils.rs
···
+
use std::io::{self, Read, Write};
+
use std::ops::Deref;
+
use std::sync::atomic::{AtomicU64, Ordering};
+
use std::time::Duration;
+
+
use arc_swap::RefCnt;
+
use byteview::ByteView;
+
use ordered_varint::Variable;
+
use rclite::Arc;
+
+
pub fn get_time() -> Duration {
+
std::time::SystemTime::now()
+
.duration_since(std::time::UNIX_EPOCH)
+
.unwrap()
+
}
+
+
pub trait WriteVariableExt: Write {
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+
value.encode_variable(self)
+
}
+
}
+
impl<W: Write> WriteVariableExt for W {}
+
+
pub trait ReadVariableExt: Read {
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+
T::decode_variable(self)
+
}
+
}
+
impl<R: Read> ReadVariableExt for R {}
+
+
pub struct WritableByteView {
+
view: ByteView,
+
written: usize,
+
}
+
+
impl WritableByteView {
+
// returns None if the view already has a reference to it
+
pub fn with_size(capacity: usize) -> Self {
+
Self {
+
view: ByteView::with_size(capacity),
+
written: 0,
+
}
+
}
+
+
#[inline(always)]
+
pub fn into_inner(self) -> ByteView {
+
self.view
+
}
+
}
+
+
impl Write for WritableByteView {
+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+
let len = buf.len();
+
if len > self.view.len() - self.written {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::StorageFull,
+
"buffer full",
+
));
+
}
+
// SAFETY: the length check above keeps the copy in bounds, and we hold the
+
// only reference to the view, so get_mut cannot observe another owner
+
unsafe {
+
std::ptr::copy_nonoverlapping(
+
buf.as_ptr(),
+
self.view
+
.get_mut()
+
.unwrap_unchecked()
+
.as_mut_ptr()
+
.add(self.written),
+
len,
+
);
+
self.written += len;
+
}
+
Ok(len)
+
}
+
+
#[inline(always)]
+
fn flush(&mut self) -> std::io::Result<()> {
+
Ok(())
+
}
+
}
+
+
pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView {
+
let mut buf =
+
WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum());
+
for value in values {
+
// cannot fail: the buffer was sized exactly via varint_unsigned_encoded_len
+
let _ = buf.write_varint(value);
+
}
+
buf.into_inner()
+
}
+
+
// gets the encoded length of a varint-encoded unsigned integer
+
// see ordered_varint
+
pub fn varint_unsigned_encoded_len(value: u64) -> usize {
+
let value = value.to_be_bytes();
+
value
+
.iter()
+
.enumerate()
+
.find_map(|(index, &byte)| {
+
(byte > 0).then(|| {
+
let extra_bytes = 7 - index;
+
(byte < 16)
+
.then(|| extra_bytes + 1)
+
.unwrap_or_else(|| extra_bytes + 2)
+
})
+
})
+
.unwrap_or(0)
+
.max(1)
+
}
+
+
pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
+
std::sync::LazyLock::new(|| quanta::Clock::new());
+
+
/// simple thread-safe rate tracker using time buckets
+
/// divides time into fixed buckets and rotates through them
+
#[derive(Debug)]
+
pub struct RateTracker<const BUCKET_WINDOW: u64> {
+
buckets: Vec<AtomicU64>,
+
last_bucket_time: AtomicU64,
+
bucket_duration_nanos: u64,
+
window_duration: Duration,
+
start_time: u64, // raw time when tracker was created
+
}
+
+
pub type DefaultRateTracker = RateTracker<1000>;
+
+
impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
+
/// create a new rate tracker with the specified time window
+
pub fn new(window_duration: Duration) -> Self {
+
let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
+
let num_buckets =
+
(window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;
+
+
let mut buckets = Vec::with_capacity(num_buckets);
+
for _ in 0..num_buckets {
+
buckets.push(AtomicU64::new(0));
+
}
+
+
let start_time = CLOCK.raw();
+
Self {
+
buckets,
+
bucket_duration_nanos,
+
window_duration,
+
last_bucket_time: AtomicU64::new(0),
+
start_time,
+
}
+
}
+
+
#[inline(always)]
+
fn elapsed(&self) -> u64 {
+
CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
+
}
+
+
/// record an event
+
pub fn observe(&self, count: u64) {
+
self.maybe_advance_buckets();
+
+
let bucket_index = self.get_current_bucket_index();
+
self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
+
}
+
+
/// get the current rate in events per second
+
pub fn rate(&self) -> f64 {
+
self.maybe_advance_buckets();
+
+
let total_events: u64 = self
+
.buckets
+
.iter()
+
.map(|bucket| bucket.load(Ordering::Relaxed))
+
.sum();
+
+
total_events as f64 / self.window_duration.as_secs_f64()
+
}
+
+
fn get_current_bucket_index(&self) -> usize {
+
let bucket_number = self.elapsed() / self.bucket_duration_nanos;
+
(bucket_number as usize) % self.buckets.len()
+
}
+
+
fn maybe_advance_buckets(&self) {
+
let current_bucket_time =
+
(self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
+
let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);
+
+
if current_bucket_time > last_bucket_time {
+
// try to update the last bucket time
+
if self
+
.last_bucket_time
+
.compare_exchange_weak(
+
last_bucket_time,
+
current_bucket_time,
+
Ordering::Relaxed,
+
Ordering::Relaxed,
+
)
+
.is_ok()
+
{
+
// clear buckets that are now too old
+
let buckets_to_advance = ((current_bucket_time - last_bucket_time)
+
/ self.bucket_duration_nanos)
+
.min(self.buckets.len() as u64);
+
+
for i in 0..buckets_to_advance {
+
let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
+
let bucket_index =
+
(bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
+
self.buckets[bucket_index].store(0, Ordering::Relaxed);
+
}
+
}
+
}
+
}
+
}
+
+
#[cfg(test)]
+
mod tests {
+
use super::*;
+
use std::sync::Arc;
+
use std::thread;
+
+
#[test]
+
fn test_rate_tracker_basic() {
+
let tracker = DefaultRateTracker::new(Duration::from_secs(2));
+
+
// record some events
+
tracker.observe(3);
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
+
}
+
+
#[test]
+
fn test_rate_tracker_burst() {
+
let tracker = DefaultRateTracker::new(Duration::from_secs(1));
+
+
// record a lot of events
+
tracker.observe(1000);
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 1000.0); // 1000 events in 1 second
+
}
+
+
#[test]
+
fn test_rate_tracker_threading() {
+
let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
+
let mut handles = vec![];
+
+
for _ in 0..4 {
+
let tracker_clone = Arc::clone(&tracker);
+
let handle = thread::spawn(move || {
+
tracker_clone.observe(10);
+
});
+
handles.push(handle);
+
}
+
+
for handle in handles {
+
handle.join().unwrap();
+
}
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 40.0); // 40 events in 1 second
+
}
+
}
+
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum TimeDirection {
+
Backwards, // Past (default)
+
Forwards, // Future
+
}
+
+
impl Default for TimeDirection {
+
fn default() -> Self {
+
TimeDirection::Backwards
+
}
+
}
+
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct RelativeDateTime {
+
duration: Duration,
+
direction: TimeDirection,
+
}
+
+
impl RelativeDateTime {
+
pub fn new(duration: Duration, direction: TimeDirection) -> Self {
+
Self {
+
duration,
+
direction,
+
}
+
}
+
+
pub fn from_now(duration: Duration) -> Self {
+
let cur = get_time();
+
if duration > cur {
+
Self::new(duration - cur, TimeDirection::Forwards)
+
} else {
+
Self::new(cur - duration, TimeDirection::Backwards)
+
}
+
}
+
}
+
+
impl std::fmt::Display for RelativeDateTime {
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
let secs = self.duration.as_secs();
+
+
if secs == 0 {
+
return write!(f, "now");
+
}
+
+
let (amount, unit) = match secs {
+
0 => unreachable!(), // handled above
+
1..=59 => (secs, "second"),
+
60..=3599 => (secs / 60, "minute"),
+
3600..=86399 => (secs / 3600, "hour"),
+
86400..=2591999 => (secs / 86400, "day"), // up to 29 days
+
2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
+
_ => (secs / 31536000, "year"), // 365 days+
+
};
+
+
let plural = if amount != 1 { "s" } else { "" };
+
+
match self.direction {
+
TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
+
TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
+
}
+
}
+
}
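// for illustration, how the bucketing above renders:
//
//     let ago = RelativeDateTime::new(Duration::from_secs(90), TimeDirection::Backwards);
//     assert_eq!(ago.to_string(), "1 minute ago"); // 90s lands in the 60..=3599 arm
//     let soon = RelativeDateTime::new(Duration::from_secs(7200), TimeDirection::Forwards);
//     assert_eq!(soon.to_string(), "in 2 hours");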
+
+
pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;
+
+
pub struct ArcRefCnt<T>(Arc<T>);
+
+
impl<T> ArcRefCnt<T> {
+
pub fn new(value: T) -> Self {
+
Self(Arc::new(value))
+
}
+
}
+
+
impl<T> Deref for ArcRefCnt<T> {
+
type Target = T;
+
+
fn deref(&self) -> &Self::Target {
+
&self.0
+
}
+
}
+
+
impl<T> Clone for ArcRefCnt<T> {
+
fn clone(&self) -> Self {
+
Self(self.0.clone())
+
}
+
}
+
+
// SAFETY: mirrors arc_swap's RefCnt impl for std::sync::Arc; rclite's Arc
// upholds the same into_raw/from_raw contract
+
unsafe impl<T> RefCnt for ArcRefCnt<T> {
+
type Base = T;
+
+
fn into_ptr(me: Self) -> *mut Self::Base {
+
Arc::into_raw(me.0) as *mut T
+
}
+
+
fn as_ptr(me: &Self) -> *mut Self::Base {
+
// Slightly convoluted way to do this, but this avoids stacked borrows violations. The same
+
// intention as
+
//
+
// me as &T as *const T as *mut T
+
//
+
// We first create a "shallow copy" of me - one that doesn't really own its ref count
+
// (that's OK, me _does_ own it, so it can't be destroyed in the meantime).
+
// Then we can use into_raw (which preserves not having the ref count).
+
//
+
// We need to "revert" the changes we did. In current std implementation, the combination
+
// of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw
+
// and that read shall be paired with forget to properly "close the brackets". In future
+
// versions of STD, these may become something else that's not really no-op (unlikely, but
+
// possible), so we future-proof it a bit.
+
+
// SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads
+
let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) });
+
let ptr = ptr as *mut T;
+
+
// SAFETY: We got the pointer from into_raw just above
+
std::mem::forget(unsafe { Arc::from_raw(ptr) });
+
+
ptr
+
}
+
+
unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
+
Self(unsafe { Arc::from_raw(ptr) })
+
}
+
}
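A closing note on the varint helpers: varint_unsigned_encoded_len re-derives ordered_varint's length rule by hand instead of calling the crate, so a small property test pinning the two together would catch drift if the encoding ever changes. A sketch of such a test (hypothetical, alongside the rate tracker tests above):

    #[test]
    fn encoded_len_matches_ordered_varint() {
        // compare the hand-rolled length rule against the real encoder
        for value in [0u64, 1, 15, 16, 255, 4095, 1 << 20, u64::MAX] {
            let mut buf = Vec::new();
            let written = buf.write_varint(value).unwrap();
            assert_eq!(written, varint_unsigned_encoded_len(value));
        }
    }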