tracks lexicons and how many times they appeared on the jetstream


+2
.gitignore
···
result
server/bsky_zstd_dictionary
+
fjall_data*
+
.fjall_data*
+12 -2
README.md
···
-
a webapp and server that monitors bluesky's jetsream and counts how many times different types of records are created or deleted. it shows you which collections (like posts, likes, follows, etc.) are most active on the network.
+
a webapp and server that monitors the jetstream and tracks the different
+
lexicons as records are created or deleted. it shows you which collections are
+
most active on the network.
for backend it uses rust with fjall as db, the frontend is built with sveltekit.
see [here](https://gaze.systems/nsid-tracker) for a hosted instance of it.
+
## performance / storage
+
+
it uses about 50MB of space for 620M recorded events (an event currently being
+
just a timestamp in seconds and a deleted boolean), and takes around 50-60ms
+
to query 300-400k events.
+
+
this is on a machine with an AMD EPYC 7281 (32 threads) @ 2.100GHz.
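
for reference, each recorded event is conceptually just this pair (a sketch in
rust mirroring the server's `Hit` struct; the on-disk format is delta-of-delta
encoded, compressed blocks, see server/src/db/block.rs):

struct Hit {
    timestamp: u64, // unix timestamp in seconds
    deleted: bool,  // whether this event was a deletion
}

at 620M events in ~50MB, that works out to well under one byte per event on disk.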
+
## running
### with nix
-
- run the server: `nix run git+https://tangled.sh/@poor.dog/nsid-tracker#server`
+
- build the server: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#server`
- build the client: `nix build git+https://tangled.sh/@poor.dog/nsid-tracker#client`
### manually
+9
client/bun.lock
···
"name": "nsid-tracker",
"dependencies": {
"@number-flow/svelte": "^0.3.9",
+
"svelte-adapter-bun": "^0.5.2",
},
"devDependencies": {
"@eslint/compat": "^1.2.5",
···
"globals": ["globals@16.3.0", "", {}, "sha512-bqWEnJ1Nt3neqx2q5SFfGS8r/ahumIakg3HcwtNlrVlwXIeNumWn/c7Pn/wKzGhf6SaW6H6uWXLqC30STCMchQ=="],
+
"globalyzer": ["globalyzer@0.1.0", "", {}, "sha512-40oNTM9UfG6aBmuKxk/giHn5nQ8RVz/SS4Ir6zgzOv9/qC3kKZ9v4etGTcJbEl/NyVQH7FGU7d+X1egr57Md2Q=="],
+
+
"globrex": ["globrex@0.1.2", "", {}, "sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg=="],
+
"graceful-fs": ["graceful-fs@4.2.11", "", {}, "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ=="],
"graphemer": ["graphemer@1.4.0", "", {}, "sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag=="],
···
"svelte": ["svelte@5.36.8", "", { "dependencies": { "@ampproject/remapping": "^2.3.0", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-8JbZWQu96hMjH/oYQPxXW6taeC6Awl6muGHeZzJTxQx7NGRQ/J9wN1hkzRKLOlSDlbS2igiFg7p5xyTp5uXG3A=="],
+
"svelte-adapter-bun": ["svelte-adapter-bun@0.5.2", "", { "dependencies": { "tiny-glob": "^0.2.9" } }, "sha512-xEtFgaal6UgrCwwkSIcapO9kopoFNUYCYqyKCikdqxX9bz2TDYnrWQZ7qBnkunMxi1HOIERUCvTcebYGiarZLA=="],
+
"svelte-check": ["svelte-check@4.3.0", "", { "dependencies": { "@jridgewell/trace-mapping": "^0.3.25", "chokidar": "^4.0.1", "fdir": "^6.2.0", "picocolors": "^1.0.0", "sade": "^1.7.4" }, "peerDependencies": { "svelte": "^4.0.0 || ^5.0.0-next.0", "typescript": ">=5.0.0" }, "bin": { "svelte-check": "bin/svelte-check" } }, "sha512-Iz8dFXzBNAM7XlEIsUjUGQhbEE+Pvv9odb9+0+ITTgFWZBGeJRRYqHUUglwe2EkLD5LIsQaAc4IUJyvtKuOO5w=="],
"svelte-eslint-parser": ["svelte-eslint-parser@1.3.0", "", { "dependencies": { "eslint-scope": "^8.2.0", "eslint-visitor-keys": "^4.0.0", "espree": "^10.0.0", "postcss": "^8.4.49", "postcss-scss": "^4.0.9", "postcss-selector-parser": "^7.0.0" }, "peerDependencies": { "svelte": "^3.37.0 || ^4.0.0 || ^5.0.0" }, "optionalPeers": ["svelte"] }, "sha512-VCgMHKV7UtOGcGLGNFSbmdm6kEKjtzo5nnpGU/mnx4OsFY6bZ7QwRF5DUx+Hokw5Lvdyo8dpk8B1m8mliomrNg=="],
···
"tapable": ["tapable@2.2.2", "", {}, "sha512-Re10+NauLTMCudc7T5WLFLAwDhQ0JWdrMK+9B2M8zR5hRExKmsRDCBA7/aV/pNJFltmBFO5BAMlQFi/vq3nKOg=="],
"tar": ["tar@7.4.3", "", { "dependencies": { "@isaacs/fs-minipass": "^4.0.0", "chownr": "^3.0.0", "minipass": "^7.1.2", "minizlib": "^3.0.1", "mkdirp": "^3.0.1", "yallist": "^5.0.0" } }, "sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw=="],
+
+
"tiny-glob": ["tiny-glob@0.2.9", "", { "dependencies": { "globalyzer": "0.1.0", "globrex": "^0.1.2" } }, "sha512-g/55ssRPUjShh+xkfx9UPDXqhckHEsHr4Vd9zX55oSdGZc/MD0m3sferOkwWtp98bv+kcVfEHtRJgBVJzelrzg=="],
"tinyglobby": ["tinyglobby@0.2.14", "", { "dependencies": { "fdir": "^6.4.4", "picomatch": "^4.0.2" } }, "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ=="],
+2 -1
client/package.json
···
},
"type": "module",
"dependencies": {
-
"@number-flow/svelte": "^0.3.9"
+
"@number-flow/svelte": "^0.3.9",
+
"svelte-adapter-bun": "^0.5.2"
}
}
+4
client/src/app.css
···
overflow-y: auto; /* Fallback first; browsers that support overlay take the later value */
overflow-y: overlay;
}
+
+
.wsbadge {
+
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
+
}
+9 -9
client/src/app.html
···
<!doctype html>
<html lang="en">
-
<head>
-
<meta charset="utf-8" />
-
<link rel="icon" href="%sveltekit.assets%/favicon.svg" />
-
<meta name="viewport" content="width=device-width, initial-scale=1" />
-
%sveltekit.head%
-
</head>
-
<body data-sveltekit-preload-data="hover">
-
<div style="display: contents">%sveltekit.body%</div>
-
</body>
+
<head>
+
<meta charset="utf-8" />
+
<link rel="icon" href="%sveltekit.assets%/favicon.svg" />
+
<meta name="viewport" content="width=device-width, initial-scale=1" />
+
%sveltekit.head%
+
</head>
+
<body class="bg-white dark:bg-gray-900" data-sveltekit-preload-data="hover">
+
<div style="display: contents">%sveltekit.body%</div>
+
</body>
</html>
+13 -1
client/src/lib/api.ts
···
import { dev } from "$app/environment";
-
import type { Events } from "./types";
+
import type { Events, Since } from "./types";
import { PUBLIC_API_URL } from "$env/static/public";
export const fetchEvents = async (): Promise<Events> => {
···
const data = await response.json();
return data;
};
+
+
export const fetchTrackingSince = async (): Promise<Since> => {
+
const response = await fetch(
+
`${dev ? "http" : "https"}://${PUBLIC_API_URL}/since`,
+
);
+
if (!response.ok) {
+
throw new Error(`(${response.status}): ${await response.json()}`);
+
}
+
+
const data = await response.json();
+
return data;
+
};
+2 -9
client/src/lib/components/BskyToggle.svelte
···
<!-- svelte-ignore a11y_no_static_element_interactions -->
<button
onclick={onBskyToggle}
-
class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300"
+
class="wsbadge !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700"
>
<input checked={dontShowBsky} type="checkbox" />
-
<span class="ml-0.5"> hide app.bsky.* </span>
+
<span class="ml-0.5 text-black dark:text-gray-200"> hide app.bsky.* </span>
</button>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+8 -5
client/src/lib/components/EventCard.svelte
···
</script>
<div
-
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
+
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white dark:bg-gray-800/50 border border-gray-200 dark:border-gray-950 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
class:has-activity={isAnimating}
style="--border-thickness: {borderThickness}px"
>
<div class="flex items-start gap-2">
<div
-
class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full"
+
class="text-sm font-bold text-blue-600 bg-blue-100 dark:bg-indigo-950 px-3 py-1 rounded-full"
>
#{index + 1}
</div>
<div
title={event.nsid}
-
class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1"
+
class="font-mono text-sm text-gray-700 dark:text-gray-300 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 dark:group-hover:bg-gray-700 border-gray-100 dark:border-gray-900 group-hover:border transition-all px-1"
>
{event.nsid}
</div>
···
</div>
</div>
-
<style>
+
<style lang="postcss">
.has-activity {
position: relative;
transition: all 0.2s ease-out;
}
.has-activity::before {
+
@reference "../../app.css";
+
@apply border-blue-500 dark:border-blue-800;
content: "";
position: absolute;
top: calc(-1 * var(--border-thickness));
left: calc(-1 * var(--border-thickness));
right: calc(-1 * var(--border-thickness));
bottom: calc(-1 * var(--border-thickness));
-
border: var(--border-thickness) solid rgba(59, 130, 246, 0.8);
+
border-width: var(--border-thickness);
+
border-style: solid;
border-radius: calc(0.5rem + var(--border-thickness));
pointer-events: none;
transition: all 0.3s ease-out;
+5 -10
client/src/lib/components/FilterControls.svelte
···
</script>
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300"
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 dark:bg-blue-900 hover:bg-blue-200 dark:hover:bg-blue-800 border-blue-300 dark:border-blue-700"
>
-
<label for="filter-regex" class="text-blue-800 mr-1"> filter: </label>
+
<label for="filter-regex" class="text-blue-800 dark:text-gray-200 mr-1">
+
filter:
+
</label>
<input
id="filter-regex"
value={filterRegex}
oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)}
type="text"
placeholder="regex..."
-
class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24"
+
class="bg-blue-50 dark:bg-blue-950 text-blue-900 dark:text-gray-400 placeholder-blue-400 dark:placeholder-blue-700 border border-blue-200 dark:border-blue-700 rounded-full px-1 outline-none focus:border-blue-400 min-w-0 w-24"
/>
</div>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+6 -11
client/src/lib/components/RefreshControl.svelte
···
</script>
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300"
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-lime-100 dark:bg-lime-900 dark:hover:bg-lime-800 hover:bg-lime-200 border-lime-300 dark:border-lime-700"
>
-
<label for="refresh-rate" class="text-green-800 mr-1">refresh:</label>
+
<label for="refresh-rate" class="text-lime-800 dark:text-lime-200 mr-1"
+
>refresh:</label
+
>
<input
id="refresh-rate"
value={refreshRate}
···
pattern="[0-9]*"
min="0"
placeholder="real-time"
-
class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20"
+
class="bg-green-50 dark:bg-green-900 text-lime-900 dark:text-lime-200 placeholder-lime-600 dark:placeholder-lime-400 border border-lime-200 dark:border-lime-700 rounded-full px-1 outline-none focus:border-lime-400 min-w-0 w-20"
/>
-
<span class="text-green-700">s</span>
+
<span class="text-lime-800 dark:text-lime-200">s</span>
</div>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+31
client/src/lib/components/ShowControls.svelte
···
+
<script lang="ts">
+
import type { ShowOption } from "$lib/types";
+
+
interface Props {
+
show: ShowOption;
+
onShowChange: (value: ShowOption) => void;
+
}
+
+
let { show, onShowChange }: Props = $props();
+
+
const showOptions: ShowOption[] = ["server init", "stream start"];
+
</script>
+
+
<div
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-pink-100 dark:bg-pink-800 hover:bg-pink-200 dark:hover:bg-pink-700 border-pink-300 dark:border-pink-700"
+
>
+
<label for="show" class="text-pink-800 dark:text-pink-100 mr-1">
+
show since:
+
</label>
+
<select
+
id="show"
+
value={show}
+
onchange={(e) =>
+
onShowChange((e.target as HTMLSelectElement).value as ShowOption)}
+
class="bg-pink-50 dark:bg-pink-900 text-pink-900 dark:text-pink-100 border border-pink-200 dark:border-pink-700 rounded-full px-1 outline-none focus:border-pink-400 min-w-0"
+
>
+
{#each showOptions as option}
+
<option value={option}>{option}</option>
+
{/each}
+
</select>
+
</div>
+5 -10
client/src/lib/components/SortControls.svelte
···
</script>
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300"
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 dark:bg-purple-800 hover:bg-purple-200 dark:hover:bg-purple-700 border-purple-300 dark:border-purple-700"
>
-
<label for="sort-by" class="text-purple-800 mr-1"> sort by: </label>
+
<label for="sort-by" class="text-purple-800 dark:text-purple-300 mr-1">
+
sort by:
+
</label>
<select
id="sort-by"
value={sortBy}
onchange={(e) =>
onSortChange((e.target as HTMLSelectElement).value as SortOption)}
-
class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0"
+
class="bg-purple-50 dark:bg-purple-900 text-purple-900 dark:text-purple-300 border border-purple-200 dark:border-purple-700 rounded-full px-1 outline-none focus:border-purple-400 min-w-0"
>
{#each sortOptions as option}
<option value={option.value}>{option.label}</option>
{/each}
</select>
</div>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+19 -22
client/src/lib/components/StatsCard.svelte
···
<script lang="ts">
import { formatNumber } from "$lib/format";
-
import NumberFlow from "@number-flow/svelte";
const colorClasses = {
green: {
-
bg: "from-green-50 to-green-100",
-
border: "border-green-200",
-
titleText: "text-green-700",
-
valueText: "text-green-900",
+
bg: "from-green-50 to-green-100 dark:from-green-900 dark:to-green-800",
+
border: "border-green-200 dark:border-green-800",
+
titleText: "text-green-700 dark:text-green-400",
+
valueText: "text-green-900 dark:text-green-200",
},
red: {
-
bg: "from-red-50 to-red-100",
-
border: "border-red-200",
-
titleText: "text-red-700",
-
valueText: "text-red-900",
+
bg: "from-red-50 to-red-100 dark:from-red-900 dark:to-red-800",
+
border: "border-red-200 dark:border-red-800",
+
titleText: "text-red-700 dark:text-red-400",
+
valueText: "text-red-900 dark:text-red-200",
},
turqoise: {
-
bg: "from-teal-50 to-teal-100",
-
border: "border-teal-200",
-
titleText: "text-teal-700",
-
valueText: "text-teal-900",
+
bg: "from-teal-50 to-teal-100 dark:from-teal-900 dark:to-teal-800",
+
border: "border-teal-200 dark:border-teal-800",
+
titleText: "text-teal-700 dark:text-teal-400",
+
valueText: "text-teal-900 dark:text-teal-200",
},
orange: {
-
bg: "from-orange-50 to-orange-100",
-
border: "border-orange-200",
-
titleText: "text-orange-700",
-
valueText: "text-orange-900",
+
bg: "from-orange-50 to-orange-100 dark:from-orange-900 dark:to-orange-800",
+
border: "border-orange-200 dark:border-orange-800",
+
titleText: "text-orange-700 dark:text-orange-400",
+
valueText: "text-orange-900 dark:text-orange-200",
},
};
···
const colors = $derived(colorClasses[colorScheme]);
</script>
-
<div
-
class="bg-gradient-to-r {colors.bg} p-3 md:p-6 rounded-lg border {colors.border}"
-
>
+
<div class="bg-gradient-to-r {colors.bg} p-3 rounded-lg border {colors.border}">
<h3 class="text-base font-medium {colors.titleText} mb-2">
{title}
</h3>
-
<p class="text-xl md:text-3xl font-bold {colors.valueText}">
-
<NumberFlow {value} />
+
<p class="text-xl md:text-2xl font-bold {colors.valueText}">
+
{formatNumber(value)}
</p>
</div>
+18 -14
client/src/lib/components/StatusBadge.svelte
···
const statusConfig = {
connected: {
text: "stream live",
-
classes: "bg-green-100 text-green-800 border-green-200",
+
classes:
+
"bg-green-100 dark:bg-green-900 text-green-800 dark:text-green-200 border-green-200 dark:border-green-800",
},
connecting: {
text: "stream connecting",
-
classes: "bg-yellow-100 text-yellow-800 border-yellow-200",
+
classes:
+
"bg-yellow-100 dark:bg-yellow-900 text-yellow-800 dark:text-yellow-200 border-yellow-200 dark:border-yellow-800",
},
error: {
text: "stream errored",
-
classes: "bg-red-100 text-red-800 border-red-200",
+
classes:
+
"bg-red-100 dark:bg-red-900 text-red-800 dark:text-red-200 border-red-200 dark:border-red-800",
},
disconnected: {
text: "stream offline",
-
classes: "bg-gray-100 text-gray-800 border-gray-200",
+
classes:
+
"bg-gray-100 dark:bg-gray-900 text-gray-800 dark:text-gray-200 border-gray-200 dark:border-gray-800",
},
};
const config = $derived(statusConfig[status]);
</script>
-
<span class="wsbadge {config.classes}">
-
{config.text}
-
</span>
-
-
<style lang="postcss">
-
@reference "../../app.css";
-
.wsbadge {
-
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
-
}
-
</style>
+
<div class="flex flex-row items-center gap-2 wsbadge {config.classes}">
+
<!-- connecting spinner -->
+
{#if status === "connecting"}
+
<div
+
class="animate-spin rounded-full h-4 w-4 border-b-2 border-yellow-800 dark:border-yellow-200"
+
></div>
+
{/if}
+
<!-- status text -->
+
<span>{config.text}</span>
+
</div>
+5 -1
client/src/lib/format.ts
···
return num.toLocaleString();
};
+
const isValidDate = (d: Date) => d instanceof Date && !isNaN(d.getTime());
export const formatTimestamp = (timestamp: number): string => {
-
return new Date(timestamp / 1000).toLocaleString();
+
const date = new Date(timestamp * 1000);
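// interpret the stored timestamp as seconds; if that lands outside the valid
// Date range (e.g. old microsecond-based values), fall back to dividing instead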
+
return isValidDate(date)
+
? date.toLocaleString()
+
: new Date(timestamp / 1000).toLocaleString();
};
+4
client/src/lib/types.ts
···
count: number;
deleted_count: number;
};
+
export type Since = {
+
since: number;
+
};
export type SortOption = "total" | "created" | "deleted" | "date";
+
export type ShowOption = "server init" | "stream start";
+3 -2
client/src/routes/+layout.ts
···
-
export const prerender = true;
-
export const ssr = false;
+
export const prerender = false;
+
export const ssr = true;
+
export const csr = true;
+7
client/src/routes/+page.server.ts
···
+
import { fetchEvents, fetchTrackingSince } from "$lib/api";
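// fetch the initial event snapshot and tracking-since timestamp during SSR,
// so the page renders populated on first load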
+
+
export const load = async () => {
+
const events = await fetchEvents();
+
const trackingSince = await fetchTrackingSince();
+
return { events, trackingSince };
+
};
+130 -73
client/src/routes/+page.svelte
···
<script lang="ts">
import { dev } from "$app/environment";
-
import type { EventRecord, NsidCount, SortOption } from "$lib/types";
+
import type {
+
EventRecord,
+
Events,
+
NsidCount,
+
ShowOption,
+
Since,
+
SortOption,
+
} from "$lib/types";
import { onMount, onDestroy } from "svelte";
-
import { writable } from "svelte/store";
+
import { get, writable } from "svelte/store";
import { PUBLIC_API_URL } from "$env/static/public";
-
import { fetchEvents } from "$lib/api";
+
import { fetchEvents, fetchTrackingSince } from "$lib/api";
import { createRegexFilter } from "$lib/filter";
import StatsCard from "$lib/components/StatsCard.svelte";
import StatusBadge from "$lib/components/StatusBadge.svelte";
···
import SortControls from "$lib/components/SortControls.svelte";
import BskyToggle from "$lib/components/BskyToggle.svelte";
import RefreshControl from "$lib/components/RefreshControl.svelte";
+
import { formatTimestamp } from "$lib/format";
+
import ShowControls from "$lib/components/ShowControls.svelte";
-
const events = writable(new Map<string, EventRecord>());
+
type Props = {
+
data: { events: Events; trackingSince: Since };
+
};
+
+
const { data }: Props = $props();
+
+
const events = writable(
+
new Map<string, EventRecord>(Object.entries(data.events.events)),
+
);
+
const eventsStart = new Map<string, EventRecord>(
+
Object.entries(data.events.events),
+
);
const pendingUpdates = new Map<string, EventRecord>();
-
let eventsList: NsidCount[] = $state([]);
+
let updateTimer: NodeJS.Timeout | null = null;
-
events.subscribe((value) => {
-
eventsList = value
-
.entries()
-
.map(([nsid, event]) => ({
-
nsid,
-
...event,
-
}))
-
.toArray();
-
});
-
let per_second = $state(0);
+
let per_second = $state(data.events.per_second);
+
let tracking_since = $state(data.trackingSince.since);
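// per-nsid count deltas between an older snapshot and the current one; used
// by the "stream start" view to show only activity since the page loaded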
+
const diffEvents = (
+
oldEvents: Map<string, EventRecord>,
+
newEvents: Map<string, EventRecord>,
+
): NsidCount[] => {
+
const nsidCounts: NsidCount[] = [];
+
for (const [nsid, event] of newEvents.entries()) {
+
const oldEvent = oldEvents.get(nsid);
+
if (oldEvent) {
+
const counts = {
+
nsid,
+
count: event.count - oldEvent.count,
+
deleted_count: event.deleted_count - oldEvent.deleted_count,
+
last_seen: event.last_seen,
+
};
+
if (counts.count > 0 || counts.deleted_count > 0)
+
nsidCounts.push(counts);
+
} else {
+
nsidCounts.push({
+
nsid,
+
...event,
+
});
+
}
+
}
+
return nsidCounts;
+
};
+
const applyEvents = (newEvents: Record<string, EventRecord>) => {
+
events.update((map) => {
+
for (const [nsid, event] of Object.entries(newEvents)) {
+
map.set(nsid, event);
+
}
+
return map;
+
});
+
};
+
+
let error: string | null = $state(null);
+
let filterRegex = $state("");
+
let dontShowBsky = $state(false);
+
let sortBy: SortOption = $state("total");
+
let refreshRate = $state("");
+
let changedByUser = $state(false);
+
let show: ShowOption = $state("server init");
+
let eventsList: NsidCount[] = $state([]);
+
let updateEventsList = $derived((value: Map<string, EventRecord>) => {
+
switch (show) {
+
case "server init":
+
eventsList = value
+
.entries()
+
.map(([nsid, event]) => ({
+
nsid,
+
...event,
+
}))
+
.toArray();
+
break;
+
case "stream start":
+
eventsList = diffEvents(eventsStart, value);
+
break;
+
}
+
});
+
events.subscribe((value) => updateEventsList(value));
let all: EventRecord = $derived(
eventsList.reduce(
(acc, event) => {
···
},
),
);
-
let error: string | null = $state(null);
-
let filterRegex = $state("");
-
let dontShowBsky = $state(false);
-
let sortBy: SortOption = $state("total");
-
let refreshRate = $state("");
-
let previousRefreshRate = "";
let websocket: WebSocket | null = null;
let isStreamOpen = $state(false);
···
};
websocket.onmessage = async (event) => {
const jsonData = JSON.parse(event.data);
-
-
if (jsonData.per_second > 0) {
-
per_second = jsonData.per_second;
-
}
-
-
// Store updates in pending map if refresh rate is set
+
per_second = jsonData.per_second;
if (refreshRate) {
for (const [nsid, event] of Object.entries(jsonData.events)) {
pendingUpdates.set(nsid, event as EventRecord);
}
} else {
-
// Apply updates immediately if no refresh rate
-
events.update((map) => {
-
for (const [nsid, event] of Object.entries(
-
jsonData.events,
-
)) {
-
map.set(nsid, event as EventRecord);
-
}
-
return map;
-
});
+
applyEvents(jsonData.events);
}
};
websocket.onerror = (error) => {
···
error = null;
const data = await fetchEvents();
per_second = data.per_second;
-
events.update((map) => {
-
for (const [nsid, event] of Object.entries(data.events)) {
-
map.set(nsid, event);
-
}
-
return map;
-
});
+
applyEvents(data.events);
+
tracking_since = (await fetchTrackingSince()).since;
} catch (err) {
error =
err instanceof Error
···
console.error("error loading data:", err);
}
};
-
-
// Set refresh rate when sort mode changes
-
$effect(() => {
-
if (sortBy === "date" && !refreshRate) {
-
// Only set to 2 if currently empty (real-time)
-
previousRefreshRate = "";
-
refreshRate = "2";
-
} else if (refreshRate === "2" && sortBy !== "date") {
-
// Only restore to empty if we auto-set it and switching away from date
-
refreshRate = previousRefreshRate;
-
previousRefreshRate = "";
-
}
-
});
// Update the refresh timer when refresh rate changes
$effect(() => {
···
/>
</svelte:head>
-
<header class="border-gray-300 border-b mb-4 pb-2">
+
<header
+
class="bg-white dark:bg-gray-900 border-gray-300 dark:border-gray-950 border-b mb-4 pb-2"
+
>
<div
class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center"
>
-
<h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1>
-
<p class="text-lg mt-1 text-gray-600">
-
tracks lexicons seen on the jetstream
+
<h1 class="text-4xl font-bold mr-4 text-gray-900 dark:text-gray-200">
+
lexicon tracker
+
</h1>
+
<p class="text-lg mt-1 text-gray-600 dark:text-gray-300">
+
tracks lexicons seen on the jetstream {tracking_since === 0
+
? ""
+
: `(since: ${formatTimestamp(tracking_since)})`}
</p>
</div>
</header>
-
<div class="md:max-w-[61vw] mx-auto p-2">
-
<div
-
class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 2xl:mx-16 mb-8"
-
>
+
<div class="bg-white dark:bg-gray-900 md:max-w-[61vw] mx-auto p-2">
+
<div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8">
<StatsCard
title="total creation"
value={all.count}
···
{#if error}
<div
-
class="bg-red-100 border border-red-300 text-red-700 px-4 py-3 rounded-lg mb-6"
+
class="bg-red-100 dark:bg-red-900 border border-red-300 dark:border-red-700 text-red-700 dark:text-red-200 px-4 py-3 rounded-lg mb-6"
>
<p>Error: {error}</p>
</div>
···
{#if eventsList.length > 0}
<div class="mb-8">
<div class="flex flex-wrap items-center gap-3 mb-3">
-
<h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2>
+
<h2 class="text-2xl font-bold text-gray-900 dark:text-gray-200">
+
seen lexicons
+
</h2>
<StatusBadge status={websocketStatus} />
</div>
<div class="flex flex-wrap items-center gap-1.5 mb-6">
···
{filterRegex}
onFilterChange={(value) => (filterRegex = value)}
/>
-
<SortControls
-
{sortBy}
-
onSortChange={(value: SortOption) => (sortBy = value)}
-
/>
<BskyToggle
{dontShowBsky}
onBskyToggle={() => (dontShowBsky = !dontShowBsky)}
/>
+
<SortControls
+
{sortBy}
+
onSortChange={(value: SortOption) => {
+
sortBy = value;
+
if (refreshRate === "" && sortBy === "date")
+
refreshRate = "2";
+
else if (refreshRate === "2" && changedByUser === false)
+
refreshRate = "";
+
}}
+
/>
+
<ShowControls
+
{show}
+
onShowChange={(value: ShowOption) => {
+
show = value;
+
updateEventsList(get(events));
+
}}
+
/>
<RefreshControl
{refreshRate}
-
onRefreshChange={(value) => (refreshRate = value)}
+
onRefreshChange={(value) => {
+
refreshRate = value;
+
changedByUser = refreshRate !== "";
+
}}
/>
</div>
<div
···
{/if}
</div>
-
<footer class="py-2 border-t border-gray-200 text-center">
-
<p class="text-gray-600 text-sm">
+
<footer class="py-2 border-t border-gray-200 dark:border-gray-800 text-center">
+
<p class="text-gray-600 dark:text-gray-200 text-sm">
source code <a
href="https://tangled.sh/@poor.dog/nsid-tracker"
target="_blank"
rel="noopener noreferrer"
-
class="text-blue-600 hover:text-blue-800 underline"
+
class="text-blue-600 dark:text-blue-400 hover:text-blue-800 dark:hover:text-blue-600 underline"
>@poor.dog/nsid-tracker</a
>
</p>
+1 -1
client/svelte.config.js
···
-
import adapter from "@sveltejs/adapter-static";
+
import adapter from "svelte-adapter-bun";
import { vitePreprocess } from "@sveltejs/vite-plugin-svelte";
/** @type {import('@sveltejs/kit').Config} */
+1 -1
nix/client-modules.nix
···
src = ../client;
-
outputHash = "sha256-TzTafbNTng/mMyf0yR9Rc6XS9/zzipwmK9SUWm2XxeY=";
+
outputHash = "sha256-njwXk3u0NUsYWLv9EOdCltgQOjTVkcfu+D+0COSw/6I=";
outputHashAlgo = "sha256";
outputHashMode = "recursive";
+10 -2
nix/client.nix
···
{
+
lib,
stdenv,
makeBinaryWrapper,
bun,
···
'';
buildPhase = ''
runHook preBuild
-
bun --prefer-offline run --bun build
+
bun --prefer-offline run build
runHook postBuild
'';
installPhase = ''
runHook preInstall
-
mkdir -p $out
+
+
mkdir -p $out/bin
cp -R ./build/* $out
+
cp -R ./node_modules $out
+
+
makeBinaryWrapper ${bun}/bin/bun $out/bin/website \
+
--prefix PATH : ${lib.makeBinPath [ bun ]} \
+
--add-flags "run --bun --no-install --cwd $out start"
+
runHook postInstall
'';
}
+5 -2
nix/server.nix
···
{
-
rustPlatform,
-
...
+
rustPlatform,
+
cmake,
+
...
}:
rustPlatform.buildRustPackage {
pname = "nsid-tracker-server";
version = "main";
src = ../server;
+
+
nativeBuildInputs = [ cmake ];
cargoLock = {
lockFile = ../server/Cargo.lock;
+1 -1
server/.gitignore
···
target
-
.fjall_data
+
.fjall_data*
+317 -28
server/Cargo.lock
···
"cfg-if",
"getrandom 0.3.3",
"once_cell",
+
"serde",
"version_check",
"zerocopy",
]
···
]
[[package]]
+
name = "alloc-no-stdlib"
+
version = "2.0.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
+
+
[[package]]
+
name = "alloc-stdlib"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
+
dependencies = [
+
"alloc-no-stdlib",
+
]
+
+
[[package]]
name = "anyhow"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
+
name = "arc-swap"
+
version = "1.7.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+
+
[[package]]
+
name = "async-compression"
+
version = "0.4.25"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "40f6024f3f856663b45fd0c9b6f2024034a702f453549449e0d84a305900dad4"
+
dependencies = [
+
"brotli",
+
"flate2",
+
"futures-core",
+
"memchr",
+
"pin-project-lite",
+
"tokio",
+
"zstd",
+
"zstd-safe",
+
]
+
+
[[package]]
name = "async-trait"
version = "0.1.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
dependencies = [
"axum-core",
"bytes",
+
"form_urlencoded",
"futures-util",
"http",
"http-body",
···
"serde",
"serde_json",
"serde_path_to_error",
+
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
···
]
[[package]]
+
name = "branches"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a918aa7a861caeba57e502465c30e3a0d74ae02ee0b9db2933602fdb6a3a90e5"
+
dependencies = [
+
"rustc_version",
+
]
+
+
[[package]]
+
name = "brotli"
+
version = "8.0.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9991eea70ea4f293524138648e41ee89b0b2b12ddef3b255effa43c8056e0e0d"
+
dependencies = [
+
"alloc-no-stdlib",
+
"alloc-stdlib",
+
"brotli-decompressor",
+
]
+
+
[[package]]
+
name = "brotli-decompressor"
+
version = "5.0.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03"
+
dependencies = [
+
"alloc-no-stdlib",
+
"alloc-stdlib",
+
]
+
+
[[package]]
name = "bumpalo"
version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7"
dependencies = [
+
"jobserver",
+
"libc",
"shlex",
]
···
]
[[package]]
+
name = "crc32fast"
+
version = "1.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
+
dependencies = [
+
"cfg-if",
+
]
+
+
[[package]]
+
name = "crossbeam-deque"
+
version = "0.8.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+
dependencies = [
+
"crossbeam-epoch",
+
"crossbeam-utils",
+
]
+
+
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
+
name = "either"
+
version = "1.15.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
+
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "flate2"
+
version = "1.1.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
+
dependencies = [
+
"crc32fast",
+
"miniz_oxide",
+
]
+
+
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
+
name = "form_urlencoded"
+
version = "1.2.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
+
dependencies = [
+
"percent-encoding",
+
]
+
+
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
[[package]]
+
name = "hermit-abi"
+
version = "0.5.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
+
+
[[package]]
name = "http"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "itertools"
+
version = "0.14.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
+
dependencies = [
+
"either",
+
]
+
+
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
+
+
[[package]]
+
name = "jobserver"
+
version = "0.1.33"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
+
dependencies = [
+
"getrandom 0.3.3",
+
"libc",
+
]
[[package]]
name = "js-sys"
···
]
[[package]]
+
name = "num_cpus"
+
version = "1.17.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
+
dependencies = [
+
"hermit-abi",
+
"libc",
+
]
+
+
[[package]]
name = "object"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
-
name = "overload"
-
version = "0.1.1"
+
name = "ordered-varint"
+
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
checksum = "e9cc9f18ab4bad1e01726bda1259feb8f11e5e76308708a966b4c0136e9db34c"
[[package]]
-
name = "papaya"
-
version = "0.2.3"
+
name = "overload"
+
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
-
dependencies = [
-
"equivalent",
-
"seize",
-
]
+
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
···
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
-
name = "pingora-limits"
-
version = "0.5.0"
+
name = "pkg-config"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "a719a8cb5558ca06bd6076c97b8905d500ea556da89e132ba53d4272844f95b9"
-
dependencies = [
-
"ahash",
-
]
+
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "proc-macro2"
···
]
[[package]]
+
name = "quanta"
+
version = "0.12.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
+
dependencies = [
+
"crossbeam-utils",
+
"libc",
+
"once_cell",
+
"raw-cpuid",
+
"wasi 0.11.1+wasi-snapshot-preview1",
+
"web-sys",
+
"winapi",
+
]
+
+
[[package]]
name = "quick_cache"
version = "0.6.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "raw-cpuid"
+
version = "11.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
+
dependencies = [
+
"bitflags",
+
]
+
+
[[package]]
+
name = "rayon"
+
version = "1.10.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+
dependencies = [
+
"either",
+
"rayon-core",
+
]
+
+
[[package]]
+
name = "rayon-core"
+
version = "1.12.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+
dependencies = [
+
"crossbeam-deque",
+
"crossbeam-utils",
+
]
+
+
[[package]]
+
name = "rclite"
+
version = "0.2.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2f528dfeba924f5fc67bb84a17fe043451d1b392758016ce2d9e9116649b0f35"
+
dependencies = [
+
"branches",
+
]
+
+
[[package]]
name = "redox_syscall"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
+
[[package]]
+
name = "rustc_version"
+
version = "0.4.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
+
dependencies = [
+
"semver",
+
]
[[package]]
name = "rustix"
···
[[package]]
+
name = "scc"
+
version = "2.3.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4"
+
dependencies = [
+
"sdd",
+
]
+
+
[[package]]
name = "schannel"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
+
name = "sdd"
+
version = "3.0.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
+
+
[[package]]
name = "security-framework"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
-
name = "seize"
-
version = "0.5.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
-
dependencies = [
-
"libc",
-
"windows-sys 0.52.0",
-
]
-
-
[[package]]
name = "self_cell"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
+
+
[[package]]
+
name = "semver"
+
version = "1.0.26"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
[[package]]
name = "serde"
···
[[package]]
+
name = "serde_urlencoded"
+
version = "0.7.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+
dependencies = [
+
"form_urlencoded",
+
"itoa",
+
"ryu",
+
"serde",
+
]
+
+
[[package]]
name = "server"
version = "0.1.0"
dependencies = [
+
"ahash",
"anyhow",
+
"arc-swap",
"async-trait",
"axum",
"axum-tws",
+
"byteview",
"fjall",
"futures-util",
-
"papaya",
-
"pingora-limits",
+
"itertools",
+
"ordered-varint",
+
"parking_lot",
+
"quanta",
+
"rayon",
+
"rclite",
"rkyv",
"rustls",
+
"scc",
"serde",
"serde_json",
"smol_str",
+
"threadpool",
"tikv-jemallocator",
"tokio",
"tokio-util",
···
[[package]]
+
name = "threadpool"
+
version = "1.8.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+
dependencies = [
+
"num_cpus",
+
]
+
+
[[package]]
name = "tikv-jemalloc-sys"
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [
+
"async-compression",
"bitflags",
"bytes",
+
"futures-core",
"http",
"http-body",
"pin-project-lite",
+
"tokio",
+
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
···
[[package]]
+
name = "web-sys"
+
version = "0.3.77"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
+
dependencies = [
+
"js-sys",
+
"wasm-bindgen",
+
]
+
+
[[package]]
name = "webpki-root-certs"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
+
+
[[package]]
+
name = "zstd"
+
version = "0.13.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
+
dependencies = [
+
"zstd-safe",
+
]
+
+
[[package]]
+
name = "zstd-safe"
+
version = "7.2.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
+
dependencies = [
+
"zstd-sys",
+
]
+
+
[[package]]
+
name = "zstd-sys"
+
version = "2.0.15+zstd.1.5.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237"
+
dependencies = [
+
"cc",
+
"pkg-config",
+
]
+15 -5
server/Cargo.toml
···
async-trait = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing = "0.1"
-
tokio = { version = "1", features = ["full"] }
+
tokio = { version = "1", features = ["full", "parking_lot"] }
tokio-util = { version = "0.7", features = ["tracing"] }
rustls = { version = "0.23", default-features = false, features = ["log", "ring", "std"] }
tokio-websockets = { version = "0.12", features = ["client", "rustls-platform-verifier", "getrandom", "ring"] }
futures-util = "0.3"
-
axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json"] }
+
axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json", "query"] }
axum-tws = { git = "https://github.com/90-008/axum-tws.git", features = ["http2"] }
-
pingora-limits = "0.5"
-
tower-http = {version = "0.6", features = ["request-id", "trace"]}
+
tower-http = {version = "0.6", features = ["request-id", "trace", "compression-full"]}
fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] }
rkyv = {version = "0.8", features = ["unaligned"]}
smol_str = { version = "0.3", features = ["serde"] }
-
papaya = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.141"
+
scc = "2.3.4"
+
ordered-varint = "2.0.0"
+
threadpool = "1.8.1"
+
quanta = "0.12.6"
+
itertools = "0.14.0"
+
byteview = "0.6.1"
+
rayon = "1.10.0"
+
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
+
rclite = "0.2.7"
+
arc-swap = "1.7.1"
+
ahash = { version = "0.8.12", features = ["serde"] }
+
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
+85 -6
server/src/api.rs
···
use std::{
-
collections::HashMap, fmt::Display, net::SocketAddr, ops::Deref, sync::Arc, time::Duration,
+
fmt::Display,
+
net::SocketAddr,
+
ops::{Bound, Deref, RangeBounds},
+
time::Duration,
};
+
use ahash::AHashMap;
use anyhow::anyhow;
-
use axum::{Json, Router, extract::State, http::Request, response::Response, routing::get};
+
use axum::{
+
Json, Router,
+
extract::{Query, State},
+
http::Request,
+
response::Response,
+
routing::get,
+
};
use axum_tws::{Message, WebSocketUpgrade};
-
use serde::Serialize;
+
use rclite::Arc;
+
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio_util::sync::CancellationToken;
use tower_http::{
classify::ServerErrorsFailureClass,
+
compression::CompressionLayer,
request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
trace::TraceLayer,
};
···
let app = Router::new()
.route("/events", get(events))
.route("/stream_events", get(stream_events))
+
.route("/hits", get(hits))
+
.route("/since", get(since))
+
.route_layer(CompressionLayer::new().br(true).deflate(true).gzip(true).zstd(true))
.route_layer(PropagateRequestIdLayer::x_request_id())
.route_layer(
TraceLayer::new_for_http()
···
#[derive(Serialize)]
struct Events {
per_second: usize,
-
events: HashMap<SmolStr, NsidCount>,
+
events: AHashMap<SmolStr, NsidCount>,
}
async fn events(db: State<Arc<Db>>) -> AppResult<Json<Events>> {
-
let mut events = HashMap::new();
+
let mut events = AHashMap::new();
for result in db.get_counts() {
let (nsid, counts) = result?;
events.insert(
···
}))
}
+
#[derive(Debug, Deserialize)]
+
struct HitsQuery {
+
nsid: SmolStr,
+
from: Option<u64>,
+
to: Option<u64>,
+
}
+
+
#[derive(Debug, Serialize)]
+
struct Hit {
+
timestamp: u64,
+
deleted: bool,
+
}
+
+
const MAX_HITS: usize = 100_000;
+
+
#[derive(Debug)]
+
struct HitsRange {
+
from: Bound<u64>,
+
to: Bound<u64>,
+
}
+
+
impl RangeBounds<u64> for HitsRange {
+
fn start_bound(&self) -> Bound<&u64> {
+
self.from.as_ref()
+
}
+
+
fn end_bound(&self) -> Bound<&u64> {
+
self.to.as_ref()
+
}
+
}
+
+
async fn hits(
+
State(db): State<Arc<Db>>,
+
Query(params): Query<HitsQuery>,
+
) -> AppResult<Json<Vec<Hit>>> {
+
let from = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
+
let to = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
+
+
db.get_hits(&params.nsid, HitsRange { from, to }, MAX_HITS)
+
.take(MAX_HITS)
+
.try_fold(Vec::with_capacity(MAX_HITS), |mut acc, hit| {
+
let hit = hit?;
+
let hit_data = hit.deser()?;
+
+
acc.push(Hit {
+
timestamp: hit.timestamp,
+
deleted: hit_data.deleted,
+
});
+
Ok(acc)
+
})
+
.map(Json)
+
}
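
// a sketch (not part of the actual module) of what implementing RangeBounds
// buys: HitsRange slots into anything generic over `R: RangeBounds<u64>`,
// such as the store's range scan, and composes with std helpers like
// `contains`.
fn in_window(range: &HitsRange, ts: u64) -> bool {
    range.contains(&ts)
}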
+
async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response {
let span = tracing::info_span!(parent: Span::current(), "ws");
ws.on_upgrade(move |mut socket| {
(async move {
let mut listener = db.new_listener();
let mut data = Events {
-
events: HashMap::<SmolStr, NsidCount>::with_capacity(10),
+
events: AHashMap::<SmolStr, NsidCount>::with_capacity(10),
per_second: 0,
};
let mut updates = 0;
···
.instrument(span)
})
}
+
+
#[derive(Debug, Serialize)]
+
struct Since {
+
since: u64,
+
}
+
+
async fn since(db: State<Arc<Db>>) -> AppResult<Json<Since>> {
+
Ok(Json(Since {
+
since: db.tracking_since()?,
+
}))
+
}
+521
server/src/db/block.rs
···
+
use std::{
+
io::{self, Read, Write},
+
marker::PhantomData,
+
};
+
+
use rkyv::{
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
bytecheck::CheckBytes,
+
de::Pool,
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
+
util::AlignedVec,
+
};
+
+
use crate::{
+
error::{AppError, AppResult},
+
utils::{ReadVariableExt, WriteVariableExt},
+
};
+
+
pub struct Item<T> {
+
pub timestamp: u64,
+
pub data: AlignedVec,
+
phantom: PhantomData<T>,
+
}
+
+
impl<T> Item<T>
+
where
+
T: Archive,
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
{
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
+
}
+
}
+
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+
pub fn new(timestamp: u64, data: &T) -> Self {
+
Item {
+
timestamp,
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+
phantom: PhantomData,
+
}
+
}
+
}
+
+
pub struct ItemEncoder<W: Write, T> {
+
writer: W,
+
prev_timestamp: u64,
+
prev_delta: i64,
+
item_count: usize,
+
_item: PhantomData<T>,
+
}
+
+
impl<W: Write, T> ItemEncoder<W, T> {
+
pub fn new(writer: W, item_count: usize) -> Self {
+
assert!(item_count > 0);
+
ItemEncoder {
+
writer,
+
prev_timestamp: 0,
+
prev_delta: 0,
+
item_count,
+
_item: PhantomData,
+
}
+
}
+
+
/// NOTE: this is a best effort estimate of the encoded length of the block.
+
/// if T contains variable-length data, the encoded length may be larger than this estimate.
+
pub fn encoded_len(item_count: usize) -> usize {
+
// items length + item count * delta length + data length
+
size_of::<usize>() + item_count * size_of::<(i64, T)>()
+
}
+
+
pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> {
+
if self.prev_timestamp == 0 {
+
self.writer.write_varint(self.item_count)?;
+
// self.writer.write_varint(item.timestamp)?;
+
self.prev_timestamp = item.timestamp;
+
self.write_data(&item.data)?;
+
return Ok(());
+
}
+
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
+
+
self.writer.write_varint(delta - self.prev_delta)?;
+
self.prev_timestamp = item.timestamp;
+
self.prev_delta = delta;
+
+
self.write_data(&item.data)?;
+
+
Ok(())
+
}
+
+
fn write_data(&mut self, data: &[u8]) -> io::Result<()> {
+
self.writer.write_varint(data.len())?;
+
self.writer.write_all(data)?;
+
Ok(())
+
}
+
+
pub fn finish(mut self) -> io::Result<W> {
+
self.writer.flush()?;
+
Ok(self.writer)
+
}
+
}
+
+
pub struct ItemDecoder<R, T> {
+
reader: R,
+
current_timestamp: u64,
+
current_delta: i64,
+
items_read: usize,
+
expected: usize,
+
_item: PhantomData<T>,
+
}
+
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
+
pub fn new(mut reader: R, start_timestamp: u64) -> io::Result<Self> {
+
let expected = match reader.read_varint() {
+
Ok(expected) => expected,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0,
+
Err(e) => return Err(e.into()),
+
};
+
+
Ok(ItemDecoder {
+
reader,
+
current_timestamp: start_timestamp,
+
current_delta: 0,
+
items_read: 0,
+
expected,
+
_item: PhantomData,
+
})
+
}
+
+
pub fn item_count(&self) -> usize {
+
self.expected
+
}
+
+
pub fn decode(&mut self) -> io::Result<Option<Item<T>>> {
+
if self.items_read == 0 {
+
// read the first timestamp
+
// let timestamp = match self.reader.read_varint::<u64>() {
+
// Ok(timestamp) => timestamp,
+
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
// Err(e) => return Err(e.into()),
+
// };
+
// self.current_timestamp = timestamp;
+
+
let Some(data_raw) = self.read_item()? else {
+
return Ok(None);
+
};
+
+
self.items_read += 1;
+
return Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}));
+
}
+
+
if self.items_read >= self.expected {
+
return Ok(None);
+
}
+
+
let Some(_delta) = self.read_timestamp()? else {
+
return Ok(None);
+
};
+
+
// read data
+
let data_raw = match self.read_item()? {
+
Some(data_raw) => data_raw,
+
None => {
+
return Err(io::Error::new(
+
io::ErrorKind::UnexpectedEof,
+
"expected data after delta",
+
)
+
.into());
+
}
+
};
+
+
self.items_read += 1;
+
Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}))
+
}
+
+
// delta-of-delta: timestamps [10, 11, 12, 14] -> deltas [1, 1, 2] -> delta-of-deltas [0, 1]; each varint read below is accumulated into the running delta, which then advances the timestamp
+
fn read_timestamp(&mut self) -> io::Result<Option<u64>> {
+
let delta = match self.reader.read_varint::<i64>() {
+
Ok(delta) => delta,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
self.current_delta += delta;
+
self.current_timestamp =
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
+
Ok(Some(self.current_timestamp))
+
}
+
+
fn read_item(&mut self) -> io::Result<Option<AlignedVec>> {
+
let data_len = match self.reader.read_varint::<usize>() {
+
Ok(data_len) => data_len,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
let mut data_raw = AlignedVec::with_capacity(data_len);
+
for _ in 0..data_len {
+
data_raw.push(0);
+
}
+
self.reader.read_exact(data_raw.as_mut_slice())?;
+
Ok(Some(data_raw))
+
}
+
}
+
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+
type Item = io::Result<Item<T>>;
+
+
fn next(&mut self) -> Option<Self::Item> {
+
self.decode().transpose()
+
}
+
+
fn size_hint(&self) -> (usize, Option<usize>) {
+
(self.expected, Some(self.expected))
+
}
+
}
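
// worked example of the delta-of-delta scheme above (a sketch, not part of
// the crate): steady streams have near-constant deltas, so most stored
// delta-of-deltas are zero and varint-encode to a single byte each.
fn delta_of_delta(timestamps: &[u64]) -> Vec<i64> {
    if timestamps.is_empty() {
        return Vec::new();
    }
    let mut prev_ts = timestamps[0];
    let mut prev_delta = 0i64;
    let mut out = Vec::with_capacity(timestamps.len() - 1);
    for &ts in &timestamps[1..] {
        let delta = (ts as i128 - prev_ts as i128) as i64;
        out.push(delta - prev_delta); // this difference is what gets varint-encoded
        prev_ts = ts;
        prev_delta = delta;
    }
    out
}
// delta_of_delta(&[1000, 1010, 1020, 1025]) == vec![10, 0, -5]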
+
+
#[cfg(test)]
+
mod test {
+
use super::*;
+
use rkyv::{Archive, Deserialize, Serialize};
+
use std::io::Cursor;
+
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+
#[rkyv(compare(PartialEq))]
+
struct TestData {
+
id: u32,
+
value: String,
+
}
+
+
#[test]
+
fn test_encoder_decoder_single_item() {
+
let data = TestData {
+
id: 123,
+
value: "test".to_string(),
+
};
+
+
let item = Item::new(1000, &data);
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, 1);
+
encoder.encode(&item).unwrap();
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_item = decoder.decode().unwrap().unwrap();
+
assert_eq!(decoded_item.timestamp, 1000);
+
+
let decoded_data = decoded_item.deser().unwrap();
+
assert_eq!(decoded_data.id, 123);
+
assert_eq!(decoded_data.value.as_str(), "test");
+
}
+
+
#[test]
+
fn test_encoder_decoder_multiple_items() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
Item::new(
+
1015,
+
&TestData {
+
id: 3,
+
value: "third".to_string(),
+
},
+
),
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "fourth".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let mut decoded_items = Vec::new();
+
while let Some(item) = decoder.decode().unwrap() {
+
decoded_items.push(item);
+
}
+
+
assert_eq!(decoded_items.len(), 4);
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
+
assert_eq!(
+
original.deser().unwrap().value.as_str(),
+
decoded.deser().unwrap().value.as_str()
+
);
+
}
+
}
+
+
#[test]
+
fn test_encoder_decoder_with_iterator() {
+
let items = vec![
+
Item::new(
+
2000,
+
&TestData {
+
id: 10,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
2005,
+
&TestData {
+
id: 20,
+
value: "b".to_string(),
+
},
+
),
+
Item::new(
+
2012,
+
&TestData {
+
id: 30,
+
value: "c".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 3);
+
assert_eq!(decoded_items[0].timestamp, 2000);
+
assert_eq!(decoded_items[1].timestamp, 2005);
+
assert_eq!(decoded_items[2].timestamp, 2012);
+
+
assert_eq!(decoded_items[0].deser().unwrap().id, 10);
+
assert_eq!(decoded_items[1].deser().unwrap().id, 20);
+
assert_eq!(decoded_items[2].deser().unwrap().id, 30);
+
}
+
+
#[test]
+
fn test_delta_compression() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "b".to_string(),
+
},
+
), // delta = 10
+
Item::new(
+
1020,
+
&TestData {
+
id: 3,
+
value: "c".to_string(),
+
},
+
), // delta = 10, delta-of-delta = 0
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "d".to_string(),
+
},
+
), // delta = 5, delta-of-delta = -5
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode and verify
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
+
}
+
}
+
+
#[test]
+
fn test_empty_decode() {
+
let buffer = Vec::new();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let result = decoder.decode().unwrap();
+
assert!(result.is_none());
+
}
+
+
#[test]
+
fn test_backwards_timestamp() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
900,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].timestamp, 1000);
+
assert_eq!(decoded_items[1].timestamp, 900);
+
}
+
+
#[test]
+
fn test_different_data_sizes() {
+
let small_data = TestData {
+
id: 1,
+
value: "x".to_string(),
+
};
+
let large_data = TestData {
+
id: 2,
+
value: "a".repeat(1000),
+
};
+
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].deser().unwrap().value.as_str(), "x");
+
assert_eq!(decoded_items[1].deser().unwrap().value.len(), 1000);
+
assert_eq!(
+
decoded_items[1].deser().unwrap().value.as_str(),
+
"a".repeat(1000)
+
);
+
}
+
}
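For context on `test_delta_compression` above: the encoder stores the first timestamp absolutely and each later one as a delta-of-delta, so evenly spaced events cost almost nothing. A minimal sketch of that transform, with an illustrative helper name — the real `ItemEncoder` in `block.rs` has its own on-disk layout:

```rust
// Sketch only: derive the (first, delta-of-deltas...) sequence the test
// comments above describe.
fn delta_of_deltas(timestamps: &[u64]) -> (u64, Vec<i64>) {
    let first = timestamps[0];
    let mut prev = first as i64;
    let mut prev_delta = 0i64;
    let mut out = Vec::with_capacity(timestamps.len().saturating_sub(1));
    for &ts in &timestamps[1..] {
        let delta = ts as i64 - prev;
        out.push(delta - prev_delta); // 0 whenever the spacing is unchanged
        prev_delta = delta;
        prev = ts as i64;
    }
    (first, out)
}

// matches the test's comments:
// delta_of_deltas(&[1000, 1010, 1020, 1025]) == (1000, vec![10, 0, -5])
```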
+260
server/src/db/handle.rs
···
+
use std::{
+
fmt::Debug,
+
io::Cursor,
+
ops::{Bound, RangeBounds},
+
sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
+
time::Duration,
+
};
+
+
use byteview::ByteView;
+
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot};
+
use itertools::Itertools;
+
use parking_lot::Mutex;
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
+
use smol_str::SmolStr;
+
+
use crate::{
+
db::{EventRecord, NsidHit, block},
+
error::{AppError, AppResult},
+
utils::{
+
ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt,
+
varints_unsigned_encoded,
+
},
+
};
+
+
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
+
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+
pub type Item = block::Item<NsidHit>;
+
+
pub struct Block {
+
pub written: usize,
+
pub key: ByteView,
+
pub data: Vec<u8>,
+
}
+
+
pub struct LexiconHandle {
+
write_tree: Partition,
+
read_tree: ArcliteSwap<Snapshot>,
+
nsid: SmolStr,
+
buf: Arc<Mutex<Vec<EventRecord>>>,
+
last_insert: AtomicU64, // relaxed
+
eps: DefaultRateTracker,
+
}
+
+
impl Debug for LexiconHandle {
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
f.debug_struct("LexiconHandle")
+
.field("nsid", self.nsid())
+
.finish()
+
}
+
}
+
+
impl LexiconHandle {
+
pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
+
let opts = PartitionCreateOptions::default()
+
.block_size(1024 * 48)
+
.compression(fjall::CompressionType::Miniz(9));
+
let write_tree = keyspace.open_partition(nsid, opts).unwrap();
+
let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot()));
+
Self {
+
write_tree,
+
read_tree,
+
nsid: nsid.into(),
+
buf: Default::default(),
+
last_insert: AtomicU64::new(0),
+
eps: RateTracker::new(Duration::from_secs(10)),
+
}
+
}
+
+
#[inline(always)]
+
pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> {
+
self.read_tree.load()
+
}
+
+
#[inline(always)]
+
pub fn update_tree(&self) {
+
self.read_tree
+
.store(ArcRefCnt::new(self.write_tree.snapshot()));
+
}
+
+
#[inline(always)]
+
pub fn span(&self) -> tracing::Span {
+
tracing::info_span!("handle", nsid = %self.nsid)
+
}
+
+
#[inline(always)]
+
pub fn nsid(&self) -> &SmolStr {
+
&self.nsid
+
}
+
+
#[inline(always)]
+
pub fn item_count(&self) -> usize {
+
self.buf.lock().len()
+
}
+
+
pub fn since_last_activity(&self) -> Duration {
+
Duration::from_nanos(
+
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw()),
+
)
+
}
+
+
pub fn suggested_block_size(&self) -> usize {
+
self.eps.rate() as usize * 60
+
}
+
+
pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
+
let mut count = 0;
+
self.buf.lock().extend(events.into_iter().inspect(|_| {
+
count += 1;
+
}));
+
self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
+
self.eps.observe(count);
+
}
+
+
pub fn compact(
+
&self,
+
compact_to: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
+
let _span = self.span().entered();
+
+
let start_limit = match range.start_bound().cloned() {
+
Bound::Included(start) => start,
+
Bound::Excluded(start) => start.saturating_add(1),
+
Bound::Unbounded => 0,
+
};
+
let end_limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
+
let start_key = varints_unsigned_encoded([start_limit]);
+
let end_key = varints_unsigned_encoded([end_limit]);
+
+
let blocks_to_compact = self
+
.read()
+
.range(start_key..end_key)
+
.collect::<Result<Vec<_>, _>>()?;
+
if blocks_to_compact.len() < 2 {
+
return Ok(());
+
}
+
+
let start_blocks_size = blocks_to_compact.len();
+
let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
+
let mut all_items =
+
blocks_to_compact
+
.iter()
+
.try_fold(Vec::new(), |mut acc, (key, value)| {
+
let mut timestamps = Cursor::new(key);
+
let start_timestamp = timestamps.read_varint()?;
+
let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+
let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
+
acc.append(&mut items);
+
AppResult::Ok(acc)
+
})?;
+
+
if sort {
+
all_items.sort_unstable_by_key(|e| e.timestamp);
+
}
+
+
let new_blocks = all_items
+
.into_iter()
+
.chunks(compact_to)
+
.into_iter()
+
.map(|chunk| chunk.collect_vec())
+
.collect_vec()
+
.into_par_iter()
+
.map(|chunk| {
+
let count = chunk.len();
+
Self::encode_block_from_items(chunk, count)
+
})
+
.collect::<Result<Vec<_>, _>>()?;
+
let end_blocks_size = new_blocks.len();
+
+
for key in keys_to_delete {
+
self.write_tree.remove(key.clone())?;
+
}
+
for block in new_blocks {
+
self.write_tree.insert(block.key, block.data)?;
+
}
+
+
let reduction =
+
((start_blocks_size as f64 - end_blocks_size as f64) / start_blocks_size as f64) * 100.0;
+
tracing::info!(
+
{
+
start = start_blocks_size,
+
end = end_blocks_size,
+
},
+
"blocks compacted {reduction:.2}%",
+
);
+
+
Ok(())
+
}
+
+
pub fn insert_block(&self, block: Block) -> AppResult<()> {
+
self.write_tree
+
.insert(block.key, block.data)
+
.map_err(AppError::from)
+
}
+
+
pub fn encode_block_from_items(
+
items: impl IntoIterator<Item = Item>,
+
count: usize,
+
) -> AppResult<Block> {
+
if count == 0 {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::InvalidInput,
+
"no items requested",
+
)
+
.into());
+
}
+
let mut writer =
+
ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
+
let mut start_timestamp = None;
+
let mut end_timestamp = None;
+
let mut written = 0_usize;
+
for item in items.into_iter().take(count) {
+
writer.encode(&item)?;
+
if start_timestamp.is_none() {
+
start_timestamp = Some(item.timestamp);
+
}
+
end_timestamp = Some(item.timestamp);
+
written += 1;
+
}
+
if written != count {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::InvalidData,
+
"unexpected number of items, invalid data?",
+
)
+
.into());
+
}
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+
let value = writer.finish()?;
+
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
+
return Ok(Block {
+
written,
+
key,
+
data: value,
+
});
+
}
+
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items were encoded").into())
+
}
+
+
pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
+
let mut buf = self.buf.lock();
+
let end = item_count.min(buf.len());
+
buf.drain(..end)
+
.map(|event| {
+
Item::new(
+
event.timestamp,
+
&NsidHit {
+
deleted: event.deleted,
+
},
+
)
+
})
+
.collect()
+
}
+
}
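A note on the key layout `encode_block_from_items` produces: a block's key is the `ordered_varint` encoding of its first and last timestamps, which is what lets `compact` (and the read path in `mod.rs` below) turn a key-range scan into a time-range scan. A hypothetical round-trip using the helpers from `utils.rs` in this diff, with illustrative timestamps:

```rust
use std::io::Cursor;

use crate::utils::{ReadVariableExt, varints_unsigned_encoded};

fn key_round_trip() -> std::io::Result<()> {
    let key = varints_unsigned_encoded([1_700_000_000, 1_700_000_060]);
    let mut reader = Cursor::new(&key[..]);
    let start: u64 = reader.read_varint()?; // block's first timestamp
    let end: u64 = reader.read_varint()?; // block's last timestamp
    assert_eq!((start, end), (1_700_000_000, 1_700_000_060));
    Ok(())
}
```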
+504
server/src/db/mod.rs
···
+
use std::{
+
fmt::Debug,
+
io::Cursor,
+
ops::{Bound, Deref, RangeBounds},
+
path::Path,
+
time::Duration,
+
u64,
+
};
+
+
use ahash::{AHashMap, AHashSet};
+
use byteview::StrView;
+
use fjall::{Keyspace, Partition, PartitionCreateOptions};
+
use itertools::{Either, Itertools};
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
+
use smol_str::{SmolStr, ToSmolStr};
+
use tokio::sync::broadcast;
+
use tokio_util::sync::CancellationToken;
+
+
use crate::{
+
db::handle::{ItemDecoder, LexiconHandle},
+
error::{AppError, AppResult},
+
jetstream::JetstreamEvent,
+
utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
+
};
+
+
mod block;
+
mod handle;
+
+
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidCounts {
+
pub count: u128,
+
pub deleted_count: u128,
+
pub last_seen: u64,
+
}
+
+
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidHit {
+
pub deleted: bool,
+
}
+
+
#[derive(Clone)]
+
pub struct EventRecord {
+
pub nsid: SmolStr,
+
pub timestamp: u64, // seconds
+
pub deleted: bool,
+
}
+
+
impl EventRecord {
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+
match event {
+
JetstreamEvent::Commit {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: false,
+
}),
+
JetstreamEvent::Delete {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: true,
+
}),
+
_ => None,
+
}
+
}
+
}
+
+
pub struct DbInfo {
+
pub nsids: AHashMap<SmolStr, Vec<usize>>,
+
pub disk_size: u64,
+
}
+
+
pub struct DbConfig {
+
pub ks_config: fjall::Config,
+
pub min_block_size: usize,
+
pub max_block_size: usize,
+
pub max_last_activity: Duration,
+
}
+
+
impl DbConfig {
+
pub fn path(mut self, path: impl AsRef<Path>) -> Self {
+
self.ks_config = fjall::Config::new(path);
+
self
+
}
+
+
pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
+
self.ks_config = f(self.ks_config);
+
self
+
}
+
}
+
+
impl Default for DbConfig {
+
fn default() -> Self {
+
Self {
+
ks_config: fjall::Config::default()
+
.cache_size(1024 * 1024 * 512)
+
.max_write_buffer_size(u64::MAX),
+
min_block_size: 1000,
+
max_block_size: 250_000,
+
max_last_activity: Duration::from_secs(10),
+
}
+
}
+
}
+
+
// counts is nsid -> NsidCounts
+
// hits is tree per nsid: varint start time + varint end time -> block of hits
+
pub struct Db {
+
pub cfg: DbConfig,
+
pub ks: Keyspace,
+
counts: Partition,
+
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>,
+
sync_pool: threadpool::ThreadPool,
+
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
+
eps: RateTracker<100>, // 100 millis buckets
+
cancel_token: CancellationToken,
+
}
+
+
impl Db {
+
pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
+
tracing::info!("opening db...");
+
let ks = cfg.ks_config.clone().open()?;
+
Ok(Self {
+
cfg,
+
hits: Default::default(),
+
sync_pool: threadpool::Builder::new()
+
.num_threads(rayon::current_num_threads() * 2)
+
.build(),
+
counts: ks.open_partition(
+
"_counts",
+
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
+
)?,
+
ks,
+
event_broadcaster: broadcast::channel(1000).0,
+
eps: RateTracker::new(Duration::from_secs(1)),
+
cancel_token,
+
})
+
}
+
+
#[inline(always)]
+
pub fn shutting_down(&self) -> impl Future<Output = ()> {
+
self.cancel_token.cancelled()
+
}
+
+
#[inline(always)]
+
pub fn is_shutting_down(&self) -> bool {
+
self.cancel_token.is_cancelled()
+
}
+
+
#[inline(always)]
+
pub fn eps(&self) -> usize {
+
self.eps.rate() as usize
+
}
+
+
#[inline(always)]
+
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+
self.event_broadcaster.subscribe()
+
}
+
+
pub fn sync(&self, all: bool) -> AppResult<()> {
+
let start = CLOCK.now();
+
// prepare all the data
+
let nsids_len = self.hits.len();
+
let mut data = Vec::with_capacity(nsids_len);
+
let mut nsids = AHashSet::with_capacity(nsids_len);
+
let _guard = scc::ebr::Guard::new();
+
for (nsid, handle) in self.hits.iter(&_guard) {
+
let mut nsid_data = Vec::with_capacity(2);
+
// let mut total_count = 0;
+
let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
+
// if we were disconnected for a while, sync everything we have to
+
// avoid writing many small blocks; compaction would merge them later
+
// anyway, but this saves work in the meantime
+
let block_size = (is_too_old || all)
+
.then_some(self.cfg.max_block_size)
+
.unwrap_or_else(|| {
+
self.cfg
+
.max_block_size
+
.min(self.cfg.min_block_size.max(handle.suggested_block_size()))
+
});
+
let count = handle.item_count();
+
let data_count = count / block_size;
+
if count > 0 && (all || data_count > 0 || is_too_old) {
+
for _ in 0..data_count {
+
nsid_data.push((handle.clone(), block_size));
+
// total_count += block_size;
+
}
+
// only sync remainder if we haven't met block size
+
let remainder = count % block_size;
+
if (all || data_count == 0) && remainder > 0 {
+
nsid_data.push((handle.clone(), remainder));
+
// total_count += remainder;
+
}
+
}
+
let _span = handle.span().entered();
+
if nsid_data.len() > 0 {
+
// tracing::info!(
+
// {blocks = %nsid_data.len(), count = %total_count},
+
// "will encode & sync",
+
// );
+
nsids.insert(nsid.clone());
+
data.push(nsid_data);
+
}
+
}
+
drop(_guard);
+
+
// process the blocks
+
data.into_par_iter()
+
.map(|chunk| {
+
chunk
+
.into_iter()
+
.map(|(handle, max_block_size)| {
+
(handle.take_block_items(max_block_size), handle)
+
})
+
.collect::<Vec<_>>()
+
.into_par_iter()
+
.map(|(items, handle)| {
+
let count = items.len();
+
let block = LexiconHandle::encode_block_from_items(items, count)?;
+
AppResult::Ok((block, handle))
+
})
+
.collect::<Result<Vec<_>, _>>()
+
})
+
.try_for_each(|chunk| {
+
let chunk = chunk?;
+
for (block, handle) in chunk {
+
self.sync_pool.execute(move || {
+
let _span = handle.span().entered();
+
let written = block.written;
+
match handle.insert_block(block) {
+
Ok(_) => {
+
tracing::info!({count = %written}, "synced")
+
}
+
Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
+
}
+
});
+
}
+
AppResult::Ok(())
+
})?;
+
self.sync_pool.join();
+
+
// update snapshots for all (changed) handles
+
for nsid in nsids {
+
self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
+
}
+
+
tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
+
+
Ok(())
+
}
+
+
pub fn compact(
+
&self,
+
nsid: impl AsRef<str>,
+
max_count: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
+
let Some(handle) = self.get_handle(nsid) else {
+
return Ok(());
+
};
+
handle.compact(max_count, range, sort)?;
+
handle.update_tree();
+
Ok(())
+
}
+
+
pub fn compact_all(
+
&self,
+
max_count: usize,
+
range: impl RangeBounds<u64> + Clone,
+
sort: bool,
+
) -> AppResult<()> {
+
for nsid in self.get_nsids() {
+
self.compact(nsid, max_count, range.clone(), sort)?;
+
}
+
Ok(())
+
}
+
+
pub fn major_compact(&self) -> AppResult<()> {
+
self.compact_all(self.cfg.max_block_size, .., true)?;
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
+
let _guard = scc::ebr::Guard::new();
+
let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
+
Some(handle) => handle.clone(),
+
None => {
+
if self.ks.partition_exists(nsid.as_ref()) {
+
let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
+
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+
handle
+
} else {
+
return None;
+
}
+
}
+
};
+
Some(handle)
+
}
+
+
#[inline(always)]
+
fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
+
self.hits
+
.entry(nsid.clone())
+
.or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
+
}
+
+
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
+
let mut seen_events = 0;
+
for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
+
let mut counts = self.get_count(&key)?;
+
self.ensure_handle(&key).queue(chunk.inspect(|e| {
+
// increment count
+
counts.last_seen = e.timestamp;
+
if e.deleted {
+
counts.deleted_count += 1;
+
} else {
+
counts.count += 1;
+
}
+
seen_events += 1;
+
}));
+
self.insert_count(&key, &counts)?;
+
if self.event_broadcaster.receiver_count() > 0 {
+
let _ = self.event_broadcaster.send((key, counts));
+
}
+
}
+
self.eps.observe(seen_events);
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
+
self.counts
+
.insert(
+
nsid,
+
unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
+
)
+
.map_err(AppError::from)
+
}
+
+
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
+
let Some(raw) = self.counts.get(nsid)? else {
+
return Ok(NsidCounts::default());
+
};
+
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
+
}
+
+
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
+
self.counts.iter().map(|res| {
+
res.map_err(AppError::from).map(|(key, val)| {
+
(
+
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
+
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
+
)
+
})
+
})
+
}
+
+
pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
+
self.ks
+
.list_partitions()
+
.into_iter()
+
.filter(|k| k.deref() != "_counts")
+
}
+
+
pub fn info(&self) -> AppResult<DbInfo> {
+
let mut nsids = AHashMap::new();
+
for nsid in self.get_nsids() {
+
let Some(handle) = self.get_handle(&nsid) else {
+
continue;
+
};
+
let block_lens = handle
+
.read()
+
.iter()
+
.rev()
+
.try_fold(Vec::new(), |mut acc, item| {
+
let (key, value) = item?;
+
let mut timestamps = Cursor::new(key);
+
let start_timestamp = timestamps.read_varint()?;
+
let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+
acc.push(decoder.item_count());
+
AppResult::Ok(acc)
+
})?;
+
nsids.insert(nsid.to_smolstr(), block_lens);
+
}
+
Ok(DbInfo {
+
nsids,
+
disk_size: self.ks.disk_space(),
+
})
+
}
+
+
pub fn get_hits(
+
&self,
+
nsid: &str,
+
range: impl RangeBounds<u64> + std::fmt::Debug,
+
max_items: usize,
+
) -> impl Iterator<Item = AppResult<handle::Item>> {
+
let start_limit = match range.start_bound().cloned() {
+
Bound::Included(start) => start,
+
Bound::Excluded(start) => start.saturating_add(1),
+
Bound::Unbounded => 0,
+
};
+
let end_limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
let end_key = varints_unsigned_encoded([end_limit]);
+
+
let Some(handle) = self.get_handle(nsid) else {
+
return Either::Right(std::iter::empty());
+
};
+
+
// let mut ts = CLOCK.now();
+
let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
+
if current_item_count >= max_items {
+
return Ok((None, current_item_count));
+
}
+
let (key, val) = res?;
+
let mut key_reader = Cursor::new(key);
+
let start_timestamp = key_reader.read_varint::<u64>()?;
+
// let end_timestamp = key_reader.read_varint::<u64>()?;
+
if start_timestamp < start_limit {
+
// tracing::info!(
+
// "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
+
// );
+
return Ok((None, current_item_count));
+
}
+
let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
+
let current_item_count = current_item_count + decoder.item_count();
+
// tracing::info!(
+
// "took {}ns to get block with size {}",
+
// ts.elapsed().as_nanos(),
+
// decoder.item_count()
+
// );
+
// ts = CLOCK.now();
+
Ok((
+
Some(
+
decoder
+
.take_while(move |item| {
+
item.as_ref().map_or(true, |item| {
+
item.timestamp <= end_limit && item.timestamp >= start_limit
+
})
+
})
+
.map(|res| res.map_err(AppError::from)),
+
),
+
current_item_count,
+
))
+
};
+
+
let (blocks, _counted) = handle
+
.read()
+
.range(..end_key)
+
.map(|res| res.map_err(AppError::from))
+
.rev()
+
.fold_while(
+
(Vec::with_capacity(20), 0),
+
|(mut blocks, current_item_count), res| {
+
use itertools::FoldWhile::*;
+
+
match map_block((res, current_item_count)) {
+
Ok((Some(block), current_item_count)) => {
+
blocks.push(Ok(block));
+
Continue((blocks, current_item_count))
+
}
+
Ok((None, current_item_count)) => Done((blocks, current_item_count)),
+
Err(err) => {
+
blocks.push(Err(err));
+
Done((blocks, current_item_count))
+
}
+
}
+
},
+
)
+
.into_inner();
+
+
// tracing::info!(
+
// "got blocks with size {}, item count {counted}",
+
// blocks.len()
+
// );
+
+
Either::Left(blocks.into_iter().rev().flatten().flatten())
+
}
+
+
pub fn tracking_since(&self) -> AppResult<u64> {
+
// HACK: we should actually store when we started tracking, but i'm lazy
+
// this should be accurate enough
+
let Some(handle) = self.get_handle("app.bsky.feed.like") else {
+
return Ok(0);
+
};
+
let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
+
return Ok(0);
+
};
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
+
timestamp_reader
+
.read_varint::<u64>()
+
.map_err(AppError::from)
+
}
+
}
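Taken together, the new `Db` surface replaces the old one-key-per-event model with buffered ingest plus explicit sync into encoded blocks. A rough usage sketch (path and values are illustrative, not from the diff):

```rust
use tokio_util::sync::CancellationToken;

fn demo() -> AppResult<()> {
    let db = Db::new(
        DbConfig::default().path(".fjall_data_demo"),
        CancellationToken::new(),
    )?;

    // queue an event in memory; ingest also bumps the per-nsid counts
    db.ingest_events(std::iter::once(EventRecord {
        nsid: "app.bsky.feed.post".into(),
        timestamp: 1_700_000_000, // seconds
        deleted: false,
    }))?;

    // flush everything buffered into encoded blocks on disk
    db.sync(true)?;

    // read back up to 1000 hits over the full time range
    for hit in db.get_hits("app.bsky.feed.post", .., 1000) {
        let hit = hit?;
        println!("{} deleted={}", hit.timestamp, hit.deser().unwrap().deleted);
    }
    Ok(())
}
```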
-195
server/src/db.rs
···
-
use std::{ops::Deref, path::Path, time::Duration};
-
-
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
-
use pingora_limits::rate::Rate;
-
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
-
use smol_str::SmolStr;
-
use tokio::sync::broadcast;
-
-
use crate::{
-
error::{AppError, AppResult},
-
jetstream::JetstreamEvent,
-
};
-
-
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
-
#[rkyv(compare(PartialEq), derive(Debug))]
-
pub struct NsidCounts {
-
pub count: u128,
-
pub deleted_count: u128,
-
pub last_seen: u64,
-
}
-
-
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
-
#[rkyv(compare(PartialEq), derive(Debug))]
-
pub struct NsidHit {
-
pub deleted: bool,
-
}
-
-
pub struct EventRecord {
-
pub nsid: SmolStr,
-
pub timestamp: u64,
-
pub deleted: bool,
-
}
-
-
impl EventRecord {
-
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
-
match event {
-
JetstreamEvent::Commit {
-
time_us, commit, ..
-
} => Some(Self {
-
nsid: commit.collection.into(),
-
timestamp: time_us,
-
deleted: false,
-
}),
-
JetstreamEvent::Delete {
-
time_us, commit, ..
-
} => Some(Self {
-
nsid: commit.collection.into(),
-
timestamp: time_us,
-
deleted: true,
-
}),
-
_ => None,
-
}
-
}
-
}
-
-
// counts is nsid -> NsidCounts
-
// hits is tree per nsid: timestamp -> NsidHit
-
pub struct Db {
-
inner: Keyspace,
-
hits: papaya::HashMap<SmolStr, Partition>,
-
counts: Partition,
-
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
-
eps: Rate,
-
}
-
-
impl Db {
-
pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
-
tracing::info!("opening db...");
-
let ks = Config::new(path)
-
.cache_size(8 * 1024 * 1024) // from talna
-
.open()?;
-
Ok(Self {
-
hits: Default::default(),
-
counts: ks.open_partition(
-
"_counts",
-
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
-
)?,
-
inner: ks,
-
event_broadcaster: broadcast::channel(1000).0,
-
eps: Rate::new(Duration::from_secs(1)),
-
})
-
}
-
-
pub fn eps(&self) -> usize {
-
self.eps.rate(&()) as usize
-
}
-
-
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
-
self.event_broadcaster.subscribe()
-
}
-
-
#[inline(always)]
-
fn run_in_nsid_tree<T>(
-
&self,
-
nsid: &str,
-
f: impl FnOnce(&Partition) -> AppResult<T>,
-
) -> AppResult<T> {
-
f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || {
-
let opts = PartitionCreateOptions::default()
-
.compression(fjall::CompressionType::Miniz(9))
-
.compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo {
-
limit: 5 * 1024 * 1024 * 1024, // 5 gb
-
ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
-
}));
-
self.inner.open_partition(nsid, opts).unwrap()
-
}))
-
}
-
-
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
-
let EventRecord {
-
nsid,
-
timestamp,
-
deleted,
-
} = e;
-
-
self.insert_event(&nsid, timestamp, deleted)?;
-
// increment count
-
let mut counts = self.get_count(&nsid)?;
-
counts.last_seen = timestamp;
-
if deleted {
-
counts.deleted_count += 1;
-
} else {
-
counts.count += 1;
-
}
-
self.insert_count(&nsid, counts.clone())?;
-
if self.event_broadcaster.receiver_count() > 0 {
-
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
-
}
-
self.eps.observe(&(), 1);
-
Ok(())
-
}
-
-
#[inline(always)]
-
fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> {
-
self.run_in_nsid_tree(nsid, |tree| {
-
tree.insert(
-
timestamp.to_be_bytes(),
-
unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() }
-
.as_slice(),
-
)
-
.map_err(AppError::from)
-
})
-
}
-
-
#[inline(always)]
-
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
-
self.counts
-
.insert(
-
nsid,
-
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
-
)
-
.map_err(AppError::from)
-
}
-
-
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
-
let Some(raw) = self.counts.get(nsid)? else {
-
return Ok(NsidCounts::default());
-
};
-
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
-
}
-
-
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
-
self.counts.iter().map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
})
-
}
-
-
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> {
-
self.inner
-
.list_partitions()
-
.into_iter()
-
.filter(|k| k.deref() != "_counts")
-
}
-
-
pub fn get_hits(
-
&self,
-
nsid: &str,
-
) -> AppResult<impl Iterator<Item = AppResult<(u64, NsidHit)>>> {
-
self.run_in_nsid_tree(nsid, |tree| {
-
Ok(tree.iter().map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
u64::from_be_bytes(key.as_ref().try_into().unwrap()),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
}))
-
})
-
}
-
}
+21 -11
server/src/jetstream.rs
···
pub struct JetstreamClient {
stream: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
tls_connector: tokio_websockets::Connector,
-
url: SmolStr,
+
urls: Vec<SmolStr>,
}
impl JetstreamClient {
-
pub fn new(url: &str) -> AppResult<Self> {
+
pub fn new(urls: impl IntoIterator<Item = impl Into<SmolStr>>) -> AppResult<Self> {
Ok(Self {
stream: None,
tls_connector: tokio_websockets::Connector::new()?,
-
url: SmolStr::new(url),
+
urls: urls.into_iter().map(Into::into).collect(),
})
}
pub async fn connect(&mut self) -> AppResult<()> {
-
let (stream, _) = ClientBuilder::new()
-
.connector(&self.tls_connector)
-
.uri(&self.url)?
-
.connect()
-
.await?;
-
self.stream = Some(stream);
-
tracing::info!("connected to jetstream ({})", self.url);
-
Ok(())
+
for uri in &self.urls {
+
let conn_result = ClientBuilder::new()
+
.connector(&self.tls_connector)
+
.uri(uri)?
+
.connect()
+
.await;
+
match conn_result {
+
Ok((stream, _)) => {
+
self.stream = Some(stream);
+
tracing::info!("connected to jetstream {}", uri);
+
return Ok(());
+
}
+
Err(err) => {
+
tracing::error!("failed to connect to jetstream {uri}: {err}");
+
}
+
};
+
}
+
Err(anyhow!("failed to connect to any jetstream server").into())
}
// automatically retries connection, only returning error if it fails many times
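In short, `connect` now walks the endpoint list in order, logs and skips endpoints that fail, and only errors once every endpoint has been tried. A small usage sketch (endpoints illustrative):

```rust
async fn demo() -> AppResult<()> {
    let mut client = JetstreamClient::new([
        "wss://jetstream1.us-west.bsky.network/subscribe",
        "wss://jetstream2.us-west.bsky.network/subscribe",
    ])?;
    // falls through to the second URL if the first is unreachable
    client.connect().await?;
    Ok(())
}
```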
+264 -47
server/src/main.rs
···
-
use std::{ops::Deref, sync::Arc};
+
use std::{ops::Deref, time::Duration, u64, usize};
+
use itertools::Itertools;
+
use rclite::Arc;
use smol_str::ToSmolStr;
-
#[cfg(not(target_env = "msvc"))]
-
use tikv_jemallocator::Jemalloc;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;
use crate::{
api::serve,
-
db::{Db, EventRecord},
+
db::{Db, DbConfig, EventRecord},
error::AppError,
jetstream::JetstreamClient,
+
utils::{CLOCK, RelativeDateTime, get_time},
};
mod api;
mod db;
mod error;
mod jetstream;
+
mod utils;
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
-
static GLOBAL: Jemalloc = Jemalloc;
+
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[tokio::main]
async fn main() {
···
.compact()
.init();
-
if std::env::args()
-
.nth(1)
-
.map_or(false, |arg| arg == "migrate")
-
{
-
migrate_to_miniz();
-
return;
+
match std::env::args().nth(1).as_deref() {
+
Some("compact") => {
+
compact();
+
return;
+
}
+
Some("migrate") => {
+
migrate();
+
return;
+
}
+
Some("debug") => {
+
debug();
+
return;
+
}
+
Some("print") => {
+
print_all();
+
return;
+
}
+
Some(x) => {
+
tracing::error!("unknown command: {}", x);
+
return;
+
}
+
None => {}
}
-
let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db"));
+
let cancel_token = CancellationToken::new();
+
+
let db = Arc::new(
+
Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
+
);
rustls::crypto::ring::default_provider()
.install_default()
.expect("cant install rustls crypto provider");
-
let mut jetstream =
-
match JetstreamClient::new("wss://jetstream2.us-west.bsky.network/subscribe") {
-
Ok(client) => client,
-
Err(err) => {
-
tracing::error!("can't create jetstream client: {err}");
-
return;
-
}
-
};
+
let urls = [
+
"wss://jetstream1.us-west.bsky.network/subscribe",
+
"wss://jetstream2.us-west.bsky.network/subscribe",
+
"wss://jetstream2.fr.hose.cam/subscribe",
+
"wss://jetstream.fire.hose.cam/subscribe",
+
];
+
let mut jetstream = match JetstreamClient::new(urls) {
+
Ok(client) => client,
+
Err(err) => {
+
tracing::error!("can't create jetstream client: {err}");
+
return;
+
}
+
};
-
let cancel_token = CancellationToken::new();
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
-
let consume_events = tokio::spawn({
let consume_cancel = cancel_token.child_token();
async move {
···
let Some(record) = EventRecord::from_jetstream(event) else {
continue;
};
-
let _ = event_tx.send(record).await;
+
event_tx.send(record).await?;
}
Err(err) => return Err(err),
},
···
let ingest_events = std::thread::spawn({
let db = db.clone();
move || {
-
tracing::info!("starting ingest events thread...");
-
while let Some(e) = event_rx.blocking_recv() {
-
if let Err(e) = db.record_event(e) {
-
tracing::error!("failed to record event: {}", e);
+
let mut buffer = Vec::new();
+
loop {
+
let read = event_rx.blocking_recv_many(&mut buffer, 500);
+
if let Err(err) = db.ingest_events(buffer.drain(..)) {
+
tracing::error!("failed to ingest events: {}", err);
+
}
+
if read == 0 || db.is_shutting_down() {
+
break;
+
}
+
}
+
}
+
});
+
+
let db_task = tokio::task::spawn({
+
let db = db.clone();
+
async move {
+
let sync_period = Duration::from_secs(10);
+
let mut sync_interval = tokio::time::interval(sync_period);
+
sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+
let compact_period = std::time::Duration::from_secs(60 * 30); // 30 mins
+
let mut compact_interval = tokio::time::interval(compact_period);
+
compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+
loop {
+
let sync_db = async || {
+
tokio::task::spawn_blocking({
+
let db = db.clone();
+
move || {
+
if db.is_shutting_down() {
+
return;
+
}
+
match db.sync(false) {
+
Ok(_) => (),
+
Err(e) => tracing::error!("failed to sync db: {}", e),
+
}
+
}
+
})
+
.await
+
.unwrap();
+
};
+
let compact_db = async || {
+
tokio::task::spawn_blocking({
+
let db = db.clone();
+
move || {
+
if db.is_shutting_down() {
+
return;
+
}
+
let end = get_time();
+
let start = end - compact_period;
+
let range = start.as_secs()..end.as_secs();
+
tracing::info!(
+
{
+
start = %RelativeDateTime::from_now(start),
+
end = %RelativeDateTime::from_now(end),
+
},
+
"running compaction...",
+
);
+
match db.compact_all(db.cfg.max_block_size, range, false) {
+
Ok(_) => (),
+
Err(e) => tracing::error!("failed to compact db: {}", e),
+
}
+
}
+
})
+
.await
+
.unwrap();
+
};
+
tokio::select! {
+
_ = sync_interval.tick() => sync_db().await,
+
_ = compact_interval.tick() => compact_db().await,
+
_ = db.shutting_down() => break,
}
}
}
});
tokio::select! {
-
res = serve(db, cancel_token.child_token()) => {
+
res = serve(db.clone(), cancel_token.child_token()) => {
if let Err(e) = res {
tracing::error!("serve failed: {}", e);
}
···
}
tracing::info!("shutting down...");
-
ingest_events
-
.join()
-
.expect("failed to join ingest events thread");
+
cancel_token.cancel();
+
ingest_events.join().expect("failed to join ingest events");
+
db_task.await.expect("cant join db task");
+
db.sync(true).expect("cant sync db");
}
-
fn migrate_to_miniz() {
-
let from = Db::new(".fjall_data").expect("couldnt create db");
-
let to = Db::new(".fjall_data_miniz").expect("couldnt create db");
+
fn print_all() {
+
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
+
let nsids = db.get_nsids().collect::<Vec<_>>();
+
let mut count = 0_usize;
+
for nsid in nsids {
+
println!("{}:", nsid.deref());
+
for hit in db.get_hits(&nsid, .., usize::MAX) {
+
let hit = hit.expect("aaa");
+
println!("{} {}", hit.timestamp, hit.deser().unwrap().deleted);
+
count += 1;
+
}
+
}
+
println!("total hits: {}", count);
+
}
-
let mut total_count = 0_u64;
-
for nsid in from.get_nsids() {
-
tracing::info!("migrating {} ...", nsid.deref());
-
for hit in from.get_hits(&nsid).expect("cant read hits") {
-
let (timestamp, data) = hit.expect("cant read event");
-
to.record_event(EventRecord {
-
nsid: nsid.to_smolstr(),
-
timestamp,
-
deleted: data.deleted,
-
})
-
.expect("cant record event");
-
total_count += 1;
+
fn debug() {
+
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
+
let info = db.info().expect("cant get db info");
+
println!("disk size: {}", info.disk_size);
+
for (nsid, blocks) in info.nsids {
+
print!("{nsid}:");
+
let mut last_size = 0;
+
let mut same_size_count = 0;
+
for item_count in blocks {
+
if item_count == last_size {
+
same_size_count += 1;
+
} else {
+
if same_size_count > 1 {
+
print!("x{}", same_size_count);
+
}
+
print!(" {item_count}");
+
same_size_count = 0;
+
}
+
last_size = item_count;
}
+
print!("\n");
}
+
}
-
tracing::info!("migrated {total_count} events!");
+
fn compact() {
+
let db = Db::new(
+
DbConfig::default().ks(|c| {
+
c.max_journaling_size(u64::MAX)
+
.max_write_buffer_size(u64::MAX)
+
}),
+
CancellationToken::new(),
+
)
+
.expect("couldnt create db");
+
let info = db.info().expect("cant get db info");
+
db.major_compact().expect("cant compact");
+
std::thread::sleep(Duration::from_secs(5));
+
let compacted_info = db.info().expect("cant get db info");
+
println!(
+
"disk size: {} -> {}",
+
info.disk_size, compacted_info.disk_size
+
);
+
for (nsid, blocks) in info.nsids {
+
println!(
+
"{nsid}: {} -> {}",
+
blocks.len(),
+
compacted_info.nsids[&nsid].len()
+
)
+
}
+
}
+
+
fn migrate() {
+
let cancel_token = CancellationToken::new();
+
let from = Arc::new(
+
Db::new(
+
DbConfig::default().path(".fjall_data_from"),
+
cancel_token.child_token(),
+
)
+
.expect("couldnt create db"),
+
);
+
let to = Arc::new(
+
Db::new(
+
DbConfig::default().path(".fjall_data_to").ks(|c| {
+
c.max_journaling_size(u64::MAX)
+
.max_write_buffer_size(u64::MAX)
+
.compaction_workers(rayon::current_num_threads() * 4)
+
.flush_workers(rayon::current_num_threads() * 4)
+
}),
+
cancel_token.child_token(),
+
)
+
.expect("couldnt create db"),
+
);
+
+
let nsids = from.get_nsids().collect::<Vec<_>>();
+
let _eps_thread = std::thread::spawn({
+
let to = to.clone();
+
move || {
+
loop {
+
std::thread::sleep(Duration::from_secs(3));
+
let eps = to.eps();
+
if eps > 0 {
+
tracing::info!("{} rps", eps);
+
}
+
}
+
}
+
});
+
let mut threads = Vec::with_capacity(nsids.len());
+
let start = CLOCK.now();
+
for nsid in nsids {
+
let from = from.clone();
+
let to = to.clone();
+
threads.push(std::thread::spawn(move || {
+
tracing::info!("{}: migrating...", nsid.deref());
+
let mut count = 0_u64;
+
for hits in from
+
.get_hits(&nsid, .., usize::MAX)
+
.chunks(100000)
+
.into_iter()
+
{
+
to.ingest_events(hits.map(|hit| {
+
count += 1;
+
let hit = hit.expect("cant decode hit");
+
EventRecord {
+
nsid: nsid.to_smolstr(),
+
timestamp: hit.timestamp,
+
deleted: hit.deser().unwrap().deleted,
+
}
+
}))
+
.expect("cant record event");
+
}
+
tracing::info!("{}: ingested {} events...", nsid.deref(), count);
+
count
+
}));
+
}
+
let mut total_count = 0_u64;
+
for thread in threads {
+
let count = thread.join().expect("thread panicked");
+
total_count += count;
+
}
+
let read_time = start.elapsed();
+
let read_per_second = total_count as f64 / read_time.as_secs_f64();
+
drop(from);
+
tracing::info!("starting sync!!!");
+
to.sync(true).expect("cant sync");
+
tracing::info!("persisting...");
+
let total_time = start.elapsed();
+
let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
+
tracing::info!(
+
"migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
+
);
}
+388
server/src/utils.rs
···
+
use std::io::{self, Read, Write};
+
use std::ops::Deref;
+
use std::sync::atomic::{AtomicU64, Ordering};
+
use std::time::Duration;
+
+
use arc_swap::RefCnt;
+
use byteview::ByteView;
+
use ordered_varint::Variable;
+
use rclite::Arc;
+
+
pub fn get_time() -> Duration {
+
std::time::SystemTime::now()
+
.duration_since(std::time::UNIX_EPOCH)
+
.unwrap()
+
}
+
+
pub trait WriteVariableExt: Write {
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+
value.encode_variable(self)
+
}
+
}
+
impl<W: Write> WriteVariableExt for W {}
+
+
pub trait ReadVariableExt: Read {
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+
T::decode_variable(self)
+
}
+
}
+
impl<R: Read> ReadVariableExt for R {}
+
+
pub struct WritableByteView {
+
view: ByteView,
+
written: usize,
+
}
+
+
impl WritableByteView {
+
// allocates a zeroed view of capacity bytes, filled via the Write impl below
+
pub fn with_size(capacity: usize) -> Self {
+
Self {
+
view: ByteView::with_size(capacity),
+
written: 0,
+
}
+
}
+
+
#[inline(always)]
+
pub fn into_inner(self) -> ByteView {
+
self.view
+
}
+
}
+
+
impl Write for WritableByteView {
+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+
let len = buf.len();
+
if len > self.view.len() - self.written {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::StorageFull,
+
"buffer full",
+
));
+
}
+
// SAFETY: the bounds check above keeps the copy within the view, and we
+
// own the only handle to it, so get_mut cannot fail and no other
+
// references exist while we write
+
unsafe {
+
std::ptr::copy_nonoverlapping(
+
buf.as_ptr(),
+
self.view
+
.get_mut()
+
.unwrap_unchecked()
+
.as_mut_ptr()
+
.add(self.written),
+
len,
+
);
+
self.written += len;
+
}
+
Ok(len)
+
}
+
+
#[inline(always)]
+
fn flush(&mut self) -> std::io::Result<()> {
+
Ok(())
+
}
+
}
+
+
pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView {
+
let mut buf =
+
WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum());
+
for value in values {
+
// cant fail: the buffer was sized with varint_unsigned_encoded_len above
+
let _ = buf.write_varint(value);
+
}
+
buf.into_inner()
+
}
+
+
// gets the encoded length of a varint-encoded unsigned integer
+
// see ordered_varint
+
pub fn varint_unsigned_encoded_len(value: u64) -> usize {
+
let value = value.to_be_bytes();
+
value
+
.iter()
+
.enumerate()
+
.find_map(|(index, &byte)| {
+
(byte > 0).then(|| {
+
let extra_bytes = 7 - index;
+
(byte < 16)
+
.then(|| extra_bytes + 1)
+
.unwrap_or_else(|| extra_bytes + 2)
+
})
+
})
+
.unwrap_or(0)
+
.max(1)
+
}
+
+
pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
+
std::sync::LazyLock::new(|| quanta::Clock::new());
+
+
/// simple thread-safe rate tracker using time buckets
+
/// divides time into fixed buckets and rotates through them
+
#[derive(Debug)]
+
pub struct RateTracker<const BUCKET_WINDOW: u64> {
+
buckets: Vec<AtomicU64>,
+
last_bucket_time: AtomicU64,
+
bucket_duration_nanos: u64,
+
window_duration: Duration,
+
start_time: u64, // raw time when tracker was created
+
}
+
+
pub type DefaultRateTracker = RateTracker<1000>;
+
+
impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
+
/// create a new rate tracker with the specified time window
+
pub fn new(window_duration: Duration) -> Self {
+
let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
+
let num_buckets =
+
(window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;
+
+
let mut buckets = Vec::with_capacity(num_buckets);
+
for _ in 0..num_buckets {
+
buckets.push(AtomicU64::new(0));
+
}
+
+
let start_time = CLOCK.raw();
+
Self {
+
buckets,
+
bucket_duration_nanos,
+
window_duration,
+
last_bucket_time: AtomicU64::new(0),
+
start_time,
+
}
+
}
+
+
#[inline(always)]
+
fn elapsed(&self) -> u64 {
+
CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
+
}
+
+
/// record an event
+
pub fn observe(&self, count: u64) {
+
self.maybe_advance_buckets();
+
+
let bucket_index = self.get_current_bucket_index();
+
self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
+
}
+
+
/// get the current rate in events per second
+
pub fn rate(&self) -> f64 {
+
self.maybe_advance_buckets();
+
+
let total_events: u64 = self
+
.buckets
+
.iter()
+
.map(|bucket| bucket.load(Ordering::Relaxed))
+
.sum();
+
+
total_events as f64 / self.window_duration.as_secs_f64()
+
}
+
+
fn get_current_bucket_index(&self) -> usize {
+
let bucket_number = self.elapsed() / self.bucket_duration_nanos;
+
(bucket_number as usize) % self.buckets.len()
+
}
+
+
fn maybe_advance_buckets(&self) {
+
let current_bucket_time =
+
(self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
+
let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);
+
+
if current_bucket_time > last_bucket_time {
+
// try to update the last bucket time
+
if self
+
.last_bucket_time
+
.compare_exchange_weak(
+
last_bucket_time,
+
current_bucket_time,
+
Ordering::Relaxed,
+
Ordering::Relaxed,
+
)
+
.is_ok()
+
{
+
// clear buckets that are now too old
+
let buckets_to_advance = ((current_bucket_time - last_bucket_time)
+
/ self.bucket_duration_nanos)
+
.min(self.buckets.len() as u64);
+
+
for i in 0..buckets_to_advance {
+
let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
+
let bucket_index =
+
(bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
+
self.buckets[bucket_index].store(0, Ordering::Relaxed);
+
}
+
}
+
}
+
}
+
}
+
+
#[cfg(test)]
+
mod tests {
+
use super::*;
+
use std::sync::Arc;
+
use std::thread;
+
+
#[test]
+
fn test_rate_tracker_basic() {
+
let tracker = DefaultRateTracker::new(Duration::from_secs(2));
+
+
// record some events
+
tracker.observe(3);
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
+
}
+
+
#[test]
+
fn test_rate_tracker_burst() {
+
let tracker = DefaultRateTracker::new(Duration::from_secs(1));
+
+
// record a lot of events
+
tracker.observe(1000);
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 1000.0); // 1000 events in 1 second
+
}
+
+
#[test]
+
fn test_rate_tracker_threading() {
+
let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
+
let mut handles = vec![];
+
+
for _ in 0..4 {
+
let tracker_clone = Arc::clone(&tracker);
+
let handle = thread::spawn(move || {
+
tracker_clone.observe(10);
+
});
+
handles.push(handle);
+
}
+
+
for handle in handles {
+
handle.join().unwrap();
+
}
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 40.0); // 40 events in 1 second
+
}
+
}
+
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum TimeDirection {
+
Backwards, // Past (default)
+
Forwards, // Future
+
}
+
+
impl Default for TimeDirection {
+
fn default() -> Self {
+
TimeDirection::Backwards
+
}
+
}
+
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct RelativeDateTime {
+
duration: Duration,
+
direction: TimeDirection,
+
}
+
+
impl RelativeDateTime {
+
pub fn new(duration: Duration, direction: TimeDirection) -> Self {
+
Self {
+
duration,
+
direction,
+
}
+
}
+
+
pub fn from_now(duration: Duration) -> Self {
+
let cur = get_time();
+
if duration > cur {
+
Self::new(duration - cur, TimeDirection::Forwards)
+
} else {
+
Self::new(cur - duration, TimeDirection::Backwards)
+
}
+
}
+
}
+
+
impl std::fmt::Display for RelativeDateTime {
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
let secs = self.duration.as_secs();
+
+
if secs == 0 {
+
return write!(f, "now");
+
}
+
+
let (amount, unit) = match secs {
+
0 => unreachable!(), // handled above
+
1..=59 => (secs, "second"),
+
60..=3599 => (secs / 60, "minute"),
+
3600..=86399 => (secs / 3600, "hour"),
+
86400..=2591999 => (secs / 86400, "day"), // up to 29 days
+
2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
+
_ => (secs / 31536000, "year"), // 365 days+
+
};
+
+
let plural = if amount != 1 { "s" } else { "" };
+
+
match self.direction {
+
TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
+
TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
+
}
+
}
+
}
+
+
pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;
+
+
pub struct ArcRefCnt<T>(Arc<T>);
+
+
impl<T> ArcRefCnt<T> {
+
pub fn new(value: T) -> Self {
+
Self(Arc::new(value))
+
}
+
}
+
+
impl<T> Deref for ArcRefCnt<T> {
+
type Target = T;
+
+
fn deref(&self) -> &Self::Target {
+
&self.0
+
}
+
}
+
+
impl<T> Clone for ArcRefCnt<T> {
+
fn clone(&self) -> Self {
+
Self(self.0.clone())
+
}
+
}
+
+
// SAFETY: mirrors arc_swap's RefCnt impl for std::sync::Arc, applied to rclite::Arc
+
unsafe impl<T> RefCnt for ArcRefCnt<T> {
+
type Base = T;
+
+
fn into_ptr(me: Self) -> *mut Self::Base {
+
Arc::into_raw(me.0) as *mut T
+
}
+
+
fn as_ptr(me: &Self) -> *mut Self::Base {
+
// Slightly convoluted way to do this, but this avoids stacked borrows violations. The same
+
// intention as
+
//
+
// me as &T as *const T as *mut T
+
//
+
// We first create a "shallow copy" of me - one that doesn't really own its ref count
+
// (that's OK, me _does_ own it, so it can't be destroyed in the meantime).
+
// Then we can use into_raw (which preserves not having the ref count).
+
//
+
// We need to "revert" the changes we did. In current std implementation, the combination
+
// of from_raw and forget is no-op. But formally, into_raw shall be paired with from_raw
+
// and that read shall be paired with forget to properly "close the brackets". In future
+
// versions of STD, these may become something else that's not really no-op (unlikely, but
+
// possible), so we future-proof it a bit.
+
+
// SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads
+
let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) });
+
let ptr = ptr as *mut T;
+
+
// SAFETY: We got the pointer from into_raw just above
+
std::mem::forget(unsafe { Arc::from_raw(ptr) });
+
+
ptr
+
}
+
+
unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
+
Self(unsafe { Arc::from_raw(ptr) })
+
}
+
}
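One invariant worth spelling out: `varints_unsigned_encoded` sizes its `WritableByteView` with `varint_unsigned_encoded_len` and then writes with `write_varint`, so the two must agree exactly or the `Write` impl above would return `StorageFull`. A sketch of the property check this relies on (test name illustrative):

```rust
#[test]
fn varint_len_matches_encoding() {
    for value in [0u64, 1, 15, 16, 255, 4096, 1 << 32, u64::MAX] {
        let mut buf = Vec::new();
        buf.write_varint(value).unwrap();
        assert_eq!(buf.len(), varint_unsigned_encoded_len(value));
    }
}
```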