tracks lexicons and how many times they appear on the jetstream


+2
.gitignore
···
result
server/bsky_zstd_dictionary
+
fjall_data*
+
.fjall_data*
+2 -1
README.md
···
-
a webapp and server that monitors bluesky's jetstream and counts how many times different types of records are created or deleted. it shows you which collections (like posts, likes, follows, etc.) are most active on the network.
+
a webapp and server that monitors the jetstream and tracks the different lexicons as records are created or deleted.
+
it shows you which collections are most active on the network.
for the backend it uses rust with fjall as the db; the frontend is built with sveltekit.
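
(aside: a minimal sketch of the counting idea in rust, assuming the public jetstream JSON shape where commit and delete events carry a "commit.collection" NSID; the real server persists counts in fjall rather than a HashMap.)

use std::collections::HashMap;

// sketch: bump a per-collection counter for one raw jetstream message.
fn count_event(counts: &mut HashMap<String, u64>, raw: &str) {
    let Ok(event) = serde_json::from_str::<serde_json::Value>(raw) else {
        return;
    };
    // commit and delete events carry the record's collection NSID,
    // e.g. "app.bsky.feed.post"
    if let Some(collection) = event
        .pointer("/commit/collection")
        .and_then(serde_json::Value::as_str)
    {
        *counts.entry(collection.to_string()).or_insert(0) += 1;
    }
}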
+7
client/bun.lock
···
"workspaces": {
"": {
"name": "nsid-tracker",
+
"dependencies": {
+
"@number-flow/svelte": "^0.3.9",
+
},
"devDependencies": {
"@eslint/compat": "^1.2.5",
"@eslint/js": "^9.18.0",
···
"@nodelib/fs.stat": ["@nodelib/fs.stat@2.0.5", "", {}, "sha512-RkhPPp2zrqDAQA/2jNhnztcPAlv64XdhIp7a7454A5ovI7Bukxgt7MX7udwAu3zg1DcpPU0rz3VV1SeaqvY4+A=="],
"@nodelib/fs.walk": ["@nodelib/fs.walk@1.2.8", "", { "dependencies": { "@nodelib/fs.scandir": "2.1.5", "fastq": "^1.6.0" } }, "sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg=="],
+
+
"@number-flow/svelte": ["@number-flow/svelte@0.3.9", "", { "dependencies": { "esm-env": "^1.1.4", "number-flow": "0.5.8" }, "peerDependencies": { "svelte": "^4 || ^5" } }, "sha512-CTw1+e0074GzbPX2IHcNCaK8nqxGNCOIUnQUjEjhcmBwBxOAhN3GYLQ6cJHvhQnWwplVe4eQ3z+c25Vttr2stQ=="],
"@polka/url": ["@polka/url@1.0.0-next.29", "", {}, "sha512-wwQAWhWSuHaag8c4q/KN/vCoeOJYshAIvMQwD4GpSb3OiZklFfvAgmj0VCBBImRpuF/aFgIRzllXlVX93Jevww=="],
···
"nanoid": ["nanoid@3.3.11", "", { "bin": { "nanoid": "bin/nanoid.cjs" } }, "sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w=="],
"natural-compare": ["natural-compare@1.4.0", "", {}, "sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw=="],
+
+
"number-flow": ["number-flow@0.5.8", "", { "dependencies": { "esm-env": "^1.1.4" } }, "sha512-FPr1DumWyGi5Nucoug14bC6xEz70A1TnhgSHhKyfqjgji2SOTz+iLJxKtv37N5JyJbteGYCm6NQ9p1O4KZ7iiA=="],
"optionator": ["optionator@0.9.4", "", { "dependencies": { "deep-is": "^0.1.3", "fast-levenshtein": "^2.0.6", "levn": "^0.4.1", "prelude-ls": "^1.2.1", "type-check": "^0.4.0", "word-wrap": "^1.2.5" } }, "sha512-6IpQ7mKUxRcZNLIObR0hz7lxsapSSIYNZJwXPGeF0mTVqGKFIXj1DQcMoT22S3ROcLyY/rz0PWaWZ9ayWmad9g=="],
+4 -1
client/package.json
···
"check:watch": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json --watch",
"lint": "eslint ."
},
-
"type": "module"
+
"type": "module",
+
"dependencies": {
+
"@number-flow/svelte": "^0.3.9"
+
}
}
+13 -1
client/src/lib/api.ts
···
import { dev } from "$app/environment";
-
import type { Events } from "./types";
+
import type { Events, Since } from "./types";
import { PUBLIC_API_URL } from "$env/static/public";
export const fetchEvents = async (): Promise<Events> => {
···
const data = await response.json();
return data;
};
+
+
export const fetchTrackingSince = async (): Promise<Since> => {
+
const response = await fetch(
+
`${dev ? "http" : "https"}://${PUBLIC_API_URL}/since`,
+
);
+
if (!response.ok) {
+
throw new Error(`(${response.status}): ${await response.json()}`);
+
}
+
+
const data = await response.json();
+
return data;
+
};
+25
client/src/lib/components/BskyToggle.svelte
···
+
<script lang="ts">
+
interface Props {
+
dontShowBsky: boolean;
+
onBskyToggle: () => void;
+
}
+
+
let { dontShowBsky, onBskyToggle }: Props = $props();
+
</script>
+
+
<!-- svelte-ignore a11y_click_events_have_key_events -->
+
<!-- svelte-ignore a11y_no_static_element_interactions -->
+
<button
+
onclick={onBskyToggle}
+
class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300"
+
>
+
<input checked={dontShowBsky} type="checkbox" />
+
<span class="ml-0.5"> hide app.bsky.* </span>
+
</button>
+
+
<style lang="postcss">
+
@reference "../../app.css";
+
.wsbadge {
+
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
+
}
+
</style>
+21 -14
client/src/lib/components/EventCard.svelte
···
<script lang="ts">
import { formatNumber, formatTimestamp } from "$lib/format";
-
import type { EventRecord } from "$lib/types";
+
import type { NsidCount } from "$lib/types";
import { onMount, onDestroy } from "svelte";
interface Props {
-
event: EventRecord;
+
event: NsidCount;
index: number;
}
···
let isAnimating = $state(false);
// Constants for border behavior
-
const MAX_BORDER_THICKNESS = 7; // Maximum border thickness in pixels
+
const MAX_BORDER_THICKNESS = 6; // Maximum border thickness in pixels
const INITIAL_THICKNESS_ADD = 2; // How much thickness to add for first/slow events
const RAPID_SUCCESSION_THRESHOLD = 50; // ms - events faster than this are considered rapid
const DECAY_RATE = 0.1; // How much thickness to remove per decay tick
···
</script>
<div
-
class="mx-auto md:mx-0 bg-white border border-gray-200 rounded-lg p-2 md:p-6 hover:shadow-lg transition-all duration-200 hover:-translate-y-1 transform"
+
class="group flex flex-col gap-2 p-1.5 md:p-3 min-h-64 bg-white border border-gray-200 rounded-lg hover:shadow-lg md:hover:-translate-y-1 transition-all duration-200 transform"
class:has-activity={isAnimating}
style="--border-thickness: {borderThickness}px"
>
-
<div class="flex justify-between items-start mb-3">
+
<div class="flex items-start gap-2">
<div
class="text-sm font-bold text-blue-600 bg-blue-100 px-3 py-1 rounded-full"
>
#{index + 1}
</div>
+
<div
+
title={event.nsid}
+
class="font-mono text-sm text-gray-700 mt-0.5 leading-relaxed rounded-full text-nowrap text-ellipsis overflow-hidden group-hover:overflow-visible group-hover:bg-gray-50 border-gray-100 group-hover:border transition-all px-1"
+
>
+
{event.nsid}
+
</div>
</div>
-
<div class="font-mono text-sm text-gray-700 mb-2 break-all leading-relaxed">
-
{event.nsid}
-
</div>
-
<div class="text-lg font-bold text-green-600">
-
{formatNumber(event.count)} created
-
</div>
-
<div class="text-lg font-bold text-red-600 mb-3">
-
{formatNumber(event.deleted_count)} deleted
+
<div class="mt-auto flex flex-col gap-1">
+
<div class="text-3xl font-bold text-green-600">
+
{formatNumber(event.count)}
+
<div class="text-xl">created</div>
+
</div>
+
<div class="text-3xl font-bold text-red-600">
+
{formatNumber(event.deleted_count)}
+
<div class="text-xl">deleted</div>
+
</div>
</div>
-
<div class="text-xs text-gray-500">
+
<div class="text-xs text-gray-500 mt-auto">
last: {formatTimestamp(event.last_seen)}
</div>
</div>
+13 -28
client/src/lib/components/FilterControls.svelte
···
<script lang="ts">
interface Props {
filterRegex: string;
-
dontShowBsky: boolean;
onFilterChange: (value: string) => void;
-
onBskyToggle: () => void;
}
-
let { filterRegex, dontShowBsky, onFilterChange, onBskyToggle }: Props =
-
$props();
+
let { filterRegex, onFilterChange }: Props = $props();
</script>
-
<div class="flex flex-wrap items-center gap-3 mb-6">
-
<div
-
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300"
-
>
-
<label for="filter-regex" class="text-blue-800 mr-1"> filter: </label>
-
<input
-
id="filter-regex"
-
value={filterRegex}
-
oninput={(e) =>
-
onFilterChange((e.target as HTMLInputElement).value)}
-
type="text"
-
placeholder="regex..."
-
class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24"
-
/>
-
</div>
-
<!-- svelte-ignore a11y_click_events_have_key_events -->
-
<!-- svelte-ignore a11y_no_static_element_interactions -->
-
<button
-
onclick={onBskyToggle}
-
class="wsbadge !mt-0 !font-normal bg-yellow-100 hover:bg-yellow-200 border-yellow-300"
-
>
-
<input checked={dontShowBsky} type="checkbox" />
-
<span class="ml-0.5"> hide app.bsky.* </span>
-
</button>
+
<div
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-blue-100 hover:bg-blue-200 border-blue-300"
+
>
+
<label for="filter-regex" class="text-blue-800 mr-1"> filter: </label>
+
<input
+
id="filter-regex"
+
value={filterRegex}
+
oninput={(e) => onFilterChange((e.target as HTMLInputElement).value)}
+
type="text"
+
placeholder="regex..."
+
class="bg-blue-50 text-blue-900 placeholder-blue-400 border border-blue-200 rounded-full px-1 outline-none focus:bg-white focus:border-blue-400 min-w-0 w-24"
+
/>
</div>
<style lang="postcss">
+37
client/src/lib/components/RefreshControl.svelte
···
+
<script lang="ts">
+
interface Props {
+
refreshRate: string;
+
onRefreshChange: (value: string) => void;
+
}
+
+
let { refreshRate, onRefreshChange }: Props = $props();
+
</script>
+
+
<div
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-green-100 hover:bg-green-200 border-green-300"
+
>
+
<label for="refresh-rate" class="text-green-800 mr-1">refresh:</label>
+
<input
+
id="refresh-rate"
+
value={refreshRate}
+
oninput={(e) => {
+
const el = e.target as HTMLInputElement;
+
if (!el.validity.valid) el.value = el.value.replace(/\D+/g, "");
+
onRefreshChange(el.value);
+
}}
+
type="text"
+
inputmode="numeric"
+
pattern="[0-9]*"
+
min="0"
+
placeholder="real-time"
+
class="bg-green-50 text-green-900 placeholder-green-400 border border-green-200 rounded-full px-1 outline-none focus:bg-white focus:border-green-400 min-w-0 w-20"
+
/>
+
<span class="text-green-700">s</span>
+
</div>
+
+
<style lang="postcss">
+
@reference "../../app.css";
+
.wsbadge {
+
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
+
}
+
</style>
+41
client/src/lib/components/SortControls.svelte
···
+
<script lang="ts">
+
import type { SortOption } from "$lib/types";
+
+
interface Props {
+
sortBy: SortOption;
+
onSortChange: (value: SortOption) => void;
+
}
+
+
let { sortBy, onSortChange }: Props = $props();
+
+
const sortOptions = [
+
{ value: "total" as const, label: "total count" },
+
{ value: "created" as const, label: "created count" },
+
{ value: "deleted" as const, label: "deleted count" },
+
{ value: "date" as const, label: "newest first" },
+
];
+
</script>
+
+
<div
+
class="wsbadge !pl-2 !px-1 !mt-0 !font-normal bg-purple-100 hover:bg-purple-200 border-purple-300"
+
>
+
<label for="sort-by" class="text-purple-800 mr-1"> sort by: </label>
+
<select
+
id="sort-by"
+
value={sortBy}
+
onchange={(e) =>
+
onSortChange((e.target as HTMLSelectElement).value as SortOption)}
+
class="bg-purple-50 text-purple-900 border border-purple-200 rounded-full px-1 outline-none focus:bg-white focus:border-purple-400 min-w-0"
+
>
+
{#each sortOptions as option}
+
<option value={option.value}>{option.label}</option>
+
{/each}
+
</select>
+
</div>
+
+
<style lang="postcss">
+
@reference "../../app.css";
+
.wsbadge {
+
@apply text-sm font-semibold mt-1.5 px-2.5 py-0.5 rounded-full border;
+
}
+
</style>
+5 -6
client/src/lib/components/StatsCard.svelte
···
<script lang="ts">
import { formatNumber } from "$lib/format";
+
import NumberFlow from "@number-flow/svelte";
const colorClasses = {
green: {
···
const colors = $derived(colorClasses[colorScheme]);
</script>
-
<div
-
class="bg-gradient-to-r {colors.bg} p-3 md:p-6 rounded-lg border {colors.border}"
-
>
-
<h3 class="text-sm md:text-base font-medium {colors.titleText} mb-2">
+
<div class="bg-gradient-to-r {colors.bg} p-3 rounded-lg border {colors.border}">
+
<h3 class="text-base font-medium {colors.titleText} mb-2">
{title}
</h3>
-
<p class="text-base md:text-3xl font-bold {colors.valueText}">
-
{formatNumber(value)}
+
<p class="text-xl md:text-2xl font-bold {colors.valueText}">
+
<NumberFlow {value} />
</p>
</div>
+4 -4
client/src/lib/components/StatusBadge.svelte
···
const statusConfig = {
connected: {
-
text: "live",
+
text: "stream live",
classes: "bg-green-100 text-green-800 border-green-200",
},
connecting: {
-
text: "connecting",
+
text: "stream connecting",
classes: "bg-yellow-100 text-yellow-800 border-yellow-200",
},
error: {
-
text: "error",
+
text: "stream errored",
classes: "bg-red-100 text-red-800 border-red-200",
},
disconnected: {
-
text: "offline",
+
text: "stream offline",
classes: "bg-gray-100 text-gray-800 border-gray-200",
},
};
+5
client/src/lib/types.ts
···
count: number;
deleted_count: number;
};
+
export type Since = {
+
since: number;
+
};
+
+
export type SortOption = "total" | "created" | "deleted" | "date";
+126 -31
client/src/routes/+page.svelte
···
<script lang="ts">
import { dev } from "$app/environment";
-
import type { EventRecord, NsidCount } from "$lib/types";
+
import type { EventRecord, NsidCount, SortOption } from "$lib/types";
import { onMount, onDestroy } from "svelte";
import { writable } from "svelte/store";
import { PUBLIC_API_URL } from "$env/static/public";
-
import { fetchEvents } from "$lib/api";
+
import { fetchEvents, fetchTrackingSince } from "$lib/api";
import { createRegexFilter } from "$lib/filter";
import StatsCard from "$lib/components/StatsCard.svelte";
import StatusBadge from "$lib/components/StatusBadge.svelte";
import EventCard from "$lib/components/EventCard.svelte";
import FilterControls from "$lib/components/FilterControls.svelte";
+
import SortControls from "$lib/components/SortControls.svelte";
+
import BskyToggle from "$lib/components/BskyToggle.svelte";
+
import RefreshControl from "$lib/components/RefreshControl.svelte";
+
import { formatTimestamp } from "$lib/format";
const events = writable(new Map<string, EventRecord>());
+
const pendingUpdates = new Map<string, EventRecord>();
let eventsList: NsidCount[] = $state([]);
+
let updateTimer: NodeJS.Timeout | null = null;
events.subscribe((value) => {
eventsList = value
.entries()
···
...event,
}))
.toArray();
-
eventsList.sort((a, b) => b.count - a.count);
});
let per_second = $state(0);
+
let tracking_since = $state(0);
let all: EventRecord = $derived(
eventsList.reduce(
···
let error: string | null = $state(null);
let filterRegex = $state("");
let dontShowBsky = $state(false);
+
let sortBy: SortOption = $state("total");
+
let refreshRate = $state("");
+
let changedByUser = $state(false);
let websocket: WebSocket | null = null;
let isStreamOpen = $state(false);
···
if (jsonData.per_second > 0) {
per_second = jsonData.per_second;
}
-
events.update((map) => {
+
+
// Store updates in pending map if refresh rate is set
+
if (refreshRate) {
for (const [nsid, event] of Object.entries(jsonData.events)) {
-
map.set(nsid, event as EventRecord);
+
pendingUpdates.set(nsid, event as EventRecord);
}
-
return map;
-
});
+
} else {
+
// Apply updates immediately if no refresh rate
+
events.update((map) => {
+
for (const [nsid, event] of Object.entries(
+
jsonData.events,
+
)) {
+
map.set(nsid, event as EventRecord);
+
}
+
return map;
+
});
+
}
};
websocket.onerror = (error) => {
console.error("ws error:", error);
···
}
return map;
});
+
tracking_since = (await fetchTrackingSince()).since;
} catch (err) {
error =
err instanceof Error
···
}
};
+
// Update the refresh timer when refresh rate changes
+
$effect(() => {
+
if (updateTimer) {
+
clearInterval(updateTimer);
+
updateTimer = null;
+
}
+
+
if (refreshRate) {
+
const rate = parseInt(refreshRate, 10) * 1000; // Convert to milliseconds
+
if (!isNaN(rate) && rate > 0) {
+
updateTimer = setInterval(() => {
+
if (pendingUpdates.size > 0) {
+
events.update((map) => {
+
for (const [nsid, event] of pendingUpdates) {
+
map.set(nsid, event);
+
}
+
pendingUpdates.clear();
+
return map;
+
});
+
}
+
}, rate);
+
}
+
}
+
});
+
onMount(() => {
loadData();
connectToStream();
});
onDestroy(() => {
+
// Clear refresh timer
+
if (updateTimer) {
+
clearInterval(updateTimer);
+
updateTimer = null;
+
}
// Close WebSocket connection
if (websocket) {
websocket.close();
}
});
+
const sortEvents = (events: NsidCount[], sortBy: SortOption) => {
+
const sorted = [...events];
+
switch (sortBy) {
+
case "total":
+
sorted.sort(
+
(a, b) =>
+
b.count + b.deleted_count - (a.count + a.deleted_count),
+
);
+
break;
+
case "created":
+
sorted.sort((a, b) => b.count - a.count);
+
break;
+
case "deleted":
+
sorted.sort((a, b) => b.deleted_count - a.deleted_count);
+
break;
+
case "date":
+
sorted.sort((a, b) => b.last_seen - a.last_seen);
+
break;
+
}
+
return sorted;
+
};
+
const filterEvents = (events: NsidCount[]) => {
let filtered = events;
···
</script>
<svelte:head>
-
<title>bluesky jetstream tracker</title>
+
<title>lexicon tracker</title>
<meta
name="description"
content="tracks bluesky jetstream events by collection"
/>
</svelte:head>
-
<div class="md:max-w-[60vw] mx-auto p-2">
-
<header class="text-center mb-8">
-
<h1 class="text-4xl font-bold mb-2 text-gray-900">
-
bluesky event tracker
-
</h1>
-
<p class="text-gray-600">
-
tracking of bluesky events by collection from the jetstream
-
</p>
-
</header>
-
+
<header class="border-gray-300 border-b mb-4 pb-2">
<div
-
class="mx-auto w-fit grid grid-cols-2 md:grid-cols-4 gap-2 md:gap-5 mb-8"
+
class="px-2 md:ml-[19vw] mx-auto flex flex-wrap items-center text-center"
>
+
<h1 class="text-4xl font-bold mr-4 text-gray-900">lexicon tracker</h1>
+
<p class="text-lg mt-1 text-gray-600">
+
tracks lexicons seen on the jetstream {tracking_since === 0
+
? ""
+
: `(since: ${formatTimestamp(tracking_since)})`}
+
</p>
+
</div>
+
</header>
+
<div class="md:max-w-[61vw] mx-auto p-2">
+
<div class="min-w-fit grid grid-cols-2 xl:grid-cols-4 gap-2 2xl:gap-6 mb-8">
<StatsCard
title="total creation"
value={all.count}
···
{#if eventsList.length > 0}
<div class="mb-8">
<div class="flex flex-wrap items-center gap-3 mb-3">
-
<h2 class="text-2xl font-bold text-gray-900">
-
events by collection
-
</h2>
+
<h2 class="text-2xl font-bold text-gray-900">seen lexicons</h2>
<StatusBadge status={websocketStatus} />
</div>
-
<FilterControls
-
{filterRegex}
-
{dontShowBsky}
-
onFilterChange={(value) => (filterRegex = value)}
-
onBskyToggle={() => (dontShowBsky = !dontShowBsky)}
-
/>
-
<div class="grid grid-cols-2 lg:grid-cols-3 xl:grid-cols-4 gap-4">
-
{#each filterEvents(eventsList) as event, index (event.nsid)}
+
<div class="flex flex-wrap items-center gap-1.5 mb-6">
+
<FilterControls
+
{filterRegex}
+
onFilterChange={(value) => (filterRegex = value)}
+
/>
+
<BskyToggle
+
{dontShowBsky}
+
onBskyToggle={() => (dontShowBsky = !dontShowBsky)}
+
/>
+
<SortControls
+
{sortBy}
+
onSortChange={(value: SortOption) => {
+
sortBy = value;
+
if (refreshRate === "" && sortBy === "date")
+
refreshRate = "2";
+
else if (refreshRate === "2" && changedByUser === false)
+
refreshRate = "";
+
}}
+
/>
+
<RefreshControl
+
{refreshRate}
+
onRefreshChange={(value) => {
+
refreshRate = value;
+
changedByUser = refreshRate !== "";
+
}}
+
/>
+
</div>
+
<div
+
class="grid grid-cols-1 lg:grid-cols-2 xl:grid-cols-3 2xl:grid-cols-4 gap-4"
+
>
+
{#each sortEvents(filterEvents(eventsList), sortBy) as event, index (event.nsid)}
<EventCard {event} {index} />
{/each}
</div>
···
{:else}
<div class="text-center py-12 bg-gray-50 rounded-lg">
<div class="text-gray-400 text-4xl mb-4">๐Ÿ“Š</div>
-
<p class="text-gray-600">no events tracked yet.</p>
+
<p class="text-gray-600">no events tracked yet. try refreshing?</p>
</div>
{/if}
</div>
+1 -1
nix/client-modules.nix
···
src = ../client;
-
outputHash = "sha256-TzTafbNTng/mMyf0yR9Rc6XS9/zzipwmK9SUWm2XxeY=";
+
outputHash = "sha256-t8PJFo+3XGkzmMNbw9Rf9cS5Ob5YtI8ucX3ay+u9a3M=";
outputHashAlgo = "sha256";
outputHashMode = "recursive";
+5 -2
nix/server.nix
···
{
-
rustPlatform,
-
...
+
rustPlatform,
+
cmake,
+
...
}:
rustPlatform.buildRustPackage {
pname = "nsid-tracker-server";
version = "main";
src = ../server;
+
+
nativeBuildInputs = [ cmake ];
cargoLock = {
lockFile = ../server/Cargo.lock;
+1 -1
server/.gitignore
···
target
-
.fjall_data
+
.fjall_data*
+328 -53
server/Cargo.lock
···
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
-
name = "ahash"
-
version = "0.8.12"
+
name = "aho-corasick"
+
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
+
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [
-
"cfg-if",
-
"getrandom 0.3.3",
-
"once_cell",
-
"version_check",
-
"zerocopy",
+
"memchr",
]
[[package]]
-
name = "aho-corasick"
-
version = "1.1.3"
+
name = "alloc-no-stdlib"
+
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
+
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
+
+
[[package]]
+
name = "alloc-stdlib"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
-
"memchr",
+
"alloc-no-stdlib",
]
[[package]]
···
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
+
name = "async-compression"
+
version = "0.4.25"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "40f6024f3f856663b45fd0c9b6f2024034a702f453549449e0d84a305900dad4"
+
dependencies = [
+
"brotli",
+
"flate2",
+
"futures-core",
+
"memchr",
+
"pin-project-lite",
+
"tokio",
+
"zstd",
+
"zstd-safe",
+
]
+
+
[[package]]
name = "async-trait"
version = "0.1.88"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
dependencies = [
"axum-core",
"bytes",
+
"form_urlencoded",
"futures-util",
"http",
"http-body",
···
"serde",
"serde_json",
"serde_path_to_error",
+
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
···
]
[[package]]
+
name = "branches"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a918aa7a861caeba57e502465c30e3a0d74ae02ee0b9db2933602fdb6a3a90e5"
+
dependencies = [
+
"rustc_version",
+
]
+
+
[[package]]
+
name = "brotli"
+
version = "8.0.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9991eea70ea4f293524138648e41ee89b0b2b12ddef3b255effa43c8056e0e0d"
+
dependencies = [
+
"alloc-no-stdlib",
+
"alloc-stdlib",
+
"brotli-decompressor",
+
]
+
+
[[package]]
+
name = "brotli-decompressor"
+
version = "5.0.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03"
+
dependencies = [
+
"alloc-no-stdlib",
+
"alloc-stdlib",
+
]
+
+
[[package]]
name = "bumpalo"
version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7"
dependencies = [
+
"jobserver",
+
"libc",
"shlex",
]
···
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
+
name = "cmake"
+
version = "0.1.54"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0"
+
dependencies = [
+
"cc",
+
]
+
+
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "crc32fast"
+
version = "1.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
+
dependencies = [
+
"cfg-if",
+
]
+
+
[[package]]
+
name = "crossbeam-deque"
+
version = "0.8.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+
dependencies = [
+
"crossbeam-epoch",
+
"crossbeam-utils",
+
]
+
+
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57"
[[package]]
+
name = "either"
+
version = "1.15.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
+
+
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"std-semaphore",
"tempfile",
"xxhash-rust",
+
]
+
+
[[package]]
+
name = "flate2"
+
version = "1.1.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
+
dependencies = [
+
"crc32fast",
+
"miniz_oxide",
]
[[package]]
···
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
+
name = "form_urlencoded"
+
version = "1.2.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
+
dependencies = [
+
"percent-encoding",
+
]
+
+
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
[[package]]
+
name = "hermit-abi"
+
version = "0.5.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
+
+
[[package]]
name = "http"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"bitflags",
"cfg-if",
"libc",
+
]
+
+
[[package]]
+
name = "itertools"
+
version = "0.14.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
+
dependencies = [
+
"either",
]
[[package]]
···
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
+
name = "jobserver"
+
version = "0.1.33"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
+
dependencies = [
+
"getrandom 0.3.3",
+
"libc",
+
]
+
+
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
]
[[package]]
+
name = "num_cpus"
+
version = "1.17.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
+
dependencies = [
+
"hermit-abi",
+
"libc",
+
]
+
+
[[package]]
name = "object"
version = "0.36.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
+
name = "ordered-varint"
+
version = "2.0.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e9cc9f18ab4bad1e01726bda1259feb8f11e5e76308708a966b4c0136e9db34c"
+
+
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
-
name = "papaya"
-
version = "0.2.3"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
-
dependencies = [
-
"equivalent",
-
"seize",
-
]
-
-
[[package]]
name = "parking_lot"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
-
name = "pingora-limits"
-
version = "0.5.0"
+
name = "pkg-config"
+
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "a719a8cb5558ca06bd6076c97b8905d500ea556da89e132ba53d4272844f95b9"
-
dependencies = [
-
"ahash",
-
]
+
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "proc-macro2"
···
]
[[package]]
+
name = "quanta"
+
version = "0.12.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
+
dependencies = [
+
"crossbeam-utils",
+
"libc",
+
"once_cell",
+
"raw-cpuid",
+
"wasi 0.11.1+wasi-snapshot-preview1",
+
"web-sys",
+
"winapi",
+
]
+
+
[[package]]
name = "quick_cache"
version = "0.6.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
+
name = "raw-cpuid"
+
version = "11.5.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146"
+
dependencies = [
+
"bitflags",
+
]
+
+
[[package]]
+
name = "rayon"
+
version = "1.10.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+
dependencies = [
+
"either",
+
"rayon-core",
+
]
+
+
[[package]]
+
name = "rayon-core"
+
version = "1.12.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+
dependencies = [
+
"crossbeam-deque",
+
"crossbeam-utils",
+
]
+
+
[[package]]
+
name = "rclite"
+
version = "0.2.7"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2f528dfeba924f5fc67bb84a17fe043451d1b392758016ce2d9e9116649b0f35"
+
dependencies = [
+
"branches",
+
]
+
+
[[package]]
name = "redox_syscall"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
+
[[package]]
+
name = "rustc_version"
+
version = "0.4.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
+
dependencies = [
+
"semver",
+
]
[[package]]
name = "rustix"
···
[[package]]
+
name = "scc"
+
version = "2.3.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "22b2d775fb28f245817589471dd49c5edf64237f4a19d10ce9a92ff4651a27f4"
+
dependencies = [
+
"sdd",
+
]
+
+
[[package]]
name = "schannel"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
+
name = "sdd"
+
version = "3.0.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
+
+
[[package]]
name = "security-framework"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
-
name = "seize"
-
version = "0.5.0"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
-
dependencies = [
-
"libc",
-
"windows-sys 0.52.0",
-
]
-
-
[[package]]
name = "self_cell"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
+
+
[[package]]
+
name = "semver"
+
version = "1.0.26"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0"
[[package]]
name = "serde"
···
[[package]]
+
name = "serde_urlencoded"
+
version = "0.7.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
+
dependencies = [
+
"form_urlencoded",
+
"itoa",
+
"ryu",
+
"serde",
+
]
+
+
[[package]]
name = "server"
version = "0.1.0"
dependencies = [
···
"async-trait",
"axum",
"axum-tws",
+
"byteview",
"fjall",
"futures-util",
-
"papaya",
-
"pingora-limits",
+
"itertools",
+
"ordered-varint",
+
"parking_lot",
+
"quanta",
+
"rayon",
+
"rclite",
"rkyv",
"rustls",
+
"scc",
"serde",
"serde_json",
"smol_str",
+
"snmalloc-rs",
+
"threadpool",
"tikv-jemallocator",
"tokio",
"tokio-util",
···
[[package]]
+
name = "snmalloc-rs"
+
version = "0.3.8"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "eb317153089fdfa4d8a2eec059d40a5a23c3bde43995ea23b19121c3f621e74a"
+
dependencies = [
+
"snmalloc-sys",
+
]
+
+
[[package]]
+
name = "snmalloc-sys"
+
version = "0.3.8"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "065fea53d32bb77bc36cca466cb191f2e5216ebfd0ed360b1d64889ee6e559ea"
+
dependencies = [
+
"cmake",
+
]
+
+
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
+
]
+
+
[[package]]
+
name = "threadpool"
+
version = "1.8.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+
dependencies = [
+
"num_cpus",
]

[[package]]
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2"
dependencies = [
+
"async-compression",
"bitflags",
"bytes",
+
"futures-core",
"http",
"http-body",
"pin-project-lite",
+
"tokio",
+
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
···
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
+
]
+
+
[[package]]
+
name = "web-sys"
+
version = "0.3.77"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
+
dependencies = [
+
"js-sys",
+
"wasm-bindgen",
]

[[package]]
···
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
[[package]]
-
name = "zerocopy"
-
version = "0.8.26"
+
name = "zeroize"
+
version = "1.8.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
+
+
[[package]]
+
name = "zstd"
+
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f"
+
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
-
"zerocopy-derive",
+
"zstd-safe",
]

[[package]]
-
name = "zerocopy-derive"
-
version = "0.8.26"
+
name = "zstd-safe"
+
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181"
+
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
-
"proc-macro2",
-
"quote",
-
"syn",
+
"zstd-sys",
]

[[package]]
-
name = "zeroize"
-
version = "1.8.1"
+
name = "zstd-sys"
+
version = "2.0.15+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde"
+
checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237"
+
dependencies = [
+
"cc",
+
"pkg-config",
+
]
+15 -5
server/Cargo.toml
···
async-trait = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter"]}
tracing = "0.1"
-
tokio = { version = "1", features = ["full"] }
+
tokio = { version = "1", features = ["full", "parking_lot"] }
tokio-util = { version = "0.7", features = ["tracing"] }
rustls = { version = "0.23", default-features = false, features = ["log", "ring", "std"] }
tokio-websockets = { version = "0.12", features = ["client", "rustls-platform-verifier", "getrandom", "ring"] }
futures-util = "0.3"
-
axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json"] }
+
axum = { version = "0.8", default-features = false, features = ["http1", "tokio", "tracing", "json", "query"] }
axum-tws = { git = "https://github.com/90-008/axum-tws.git", features = ["http2"] }
-
pingora-limits = "0.5"
-
tower-http = {version = "0.6", features = ["request-id", "trace"]}
+
tower-http = {version = "0.6", features = ["request-id", "trace", "compression-full"]}
fjall = { version = "2", default-features = false, features = ["miniz", "lz4"] }
rkyv = {version = "0.8", features = ["unaligned"]}
smol_str = { version = "0.3", features = ["serde"] }
-
papaya = "0.2"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.141"
+
scc = "2.3.4"
+
ordered-varint = "2.0.0"
+
threadpool = "1.8.1"
+
quanta = "0.12.6"
+
itertools = "0.14.0"
+
byteview = "0.6.1"
+
rayon = "1.10.0"
+
parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
+
rclite = "0.2.7"
+
+
[target.'cfg(target_env = "msvc")'.dependencies]
+
snmalloc-rs = "0.3.8"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
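
(aside: the msvc / non-msvc split above only declares the allocator dependencies; registering the global allocator presumably happens in main.rs, which this diff does not show. a hedged sketch of that wiring:)

// assumed wiring, not part of this diff: select the allocator per target.
#[cfg(target_env = "msvc")]
#[global_allocator]
static GLOBAL: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;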
+84 -3
server/src/api.rs
···
use std::{
-
collections::HashMap, fmt::Display, net::SocketAddr, ops::Deref, sync::Arc, time::Duration,
+
collections::HashMap,
+
fmt::Display,
+
net::SocketAddr,
+
ops::{Bound, Deref, RangeBounds},
+
time::Duration,
};
use anyhow::anyhow;
-
use axum::{Json, Router, extract::State, http::Request, response::Response, routing::get};
+
use axum::{
+
Json, Router,
+
extract::{Query, State},
+
http::Request,
+
response::Response,
+
routing::get,
+
};
use axum_tws::{Message, WebSocketUpgrade};
-
use serde::Serialize;
+
use rclite::Arc;
+
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio_util::sync::CancellationToken;
use tower_http::{
classify::ServerErrorsFailureClass,
+
compression::CompressionLayer,
request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
trace::TraceLayer,
};
···
let app = Router::new()
.route("/events", get(events))
.route("/stream_events", get(stream_events))
+
.route("/hits", get(hits))
+
.route("/since", get(since))
+
.route_layer(CompressionLayer::new().br(true).deflate(true).gzip(true).zstd(true))
.route_layer(PropagateRequestIdLayer::x_request_id())
.route_layer(
TraceLayer::new_for_http()
···
}))
}
+
#[derive(Debug, Deserialize)]
+
struct HitsQuery {
+
nsid: SmolStr,
+
from: Option<u64>,
+
to: Option<u64>,
+
}
+
+
#[derive(Debug, Serialize)]
+
struct Hit {
+
timestamp: u64,
+
deleted: bool,
+
}
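+
// cap /hits responses so a huge time range cannot return an unbounded payload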
+
+
const MAX_HITS: usize = 100_000;
+
+
#[derive(Debug)]
+
struct HitsRange {
+
from: Bound<u64>,
+
to: Bound<u64>,
+
}
+
+
impl RangeBounds<u64> for HitsRange {
+
fn start_bound(&self) -> Bound<&u64> {
+
self.from.as_ref()
+
}
+
+
fn end_bound(&self) -> Bound<&u64> {
+
self.to.as_ref()
+
}
+
}
+
+
async fn hits(
+
State(db): State<Arc<Db>>,
+
Query(params): Query<HitsQuery>,
+
) -> AppResult<Json<Vec<Hit>>> {
+
let from = params.from.map(Bound::Included).unwrap_or(Bound::Unbounded);
+
let to = params.to.map(Bound::Included).unwrap_or(Bound::Unbounded);
+
let maybe_hits = db
+
.get_hits(&params.nsid, HitsRange { from, to })
+
.take(MAX_HITS);
+
let mut hits = Vec::with_capacity(maybe_hits.size_hint().0);
+
+
for maybe_hit in maybe_hits {
+
let hit = maybe_hit?;
+
let hit_data = hit.deser()?;
+
+
hits.push(Hit {
+
timestamp: hit.timestamp,
+
deleted: hit_data.deleted,
+
});
+
}
+
+
Ok(Json(hits))
+
}
+
async fn stream_events(db: State<Arc<Db>>, ws: WebSocketUpgrade) -> Response {
let span = tracing::info_span!(parent: Span::current(), "ws");
ws.on_upgrade(move |mut socket| {
···
.instrument(span)
})
}
+
+
#[derive(Debug, Serialize)]
+
struct Since {
+
since: u64,
+
}
+
+
async fn since(db: State<Arc<Db>>) -> AppResult<Json<Since>> {
+
Ok(Json(Since {
+
since: db.tracking_since()?,
+
}))
+
}
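
(aside: a usage sketch for HitsRange. it implements std's RangeBounds, so the provided contains method works directly; the struct is repeated so the example stands alone, and the values are hypothetical.)

use std::ops::{Bound, RangeBounds};

struct HitsRange {
    from: Bound<u64>,
    to: Bound<u64>,
}

impl RangeBounds<u64> for HitsRange {
    fn start_bound(&self) -> Bound<&u64> {
        self.from.as_ref()
    }
    fn end_bound(&self) -> Bound<&u64> {
        self.to.as_ref()
    }
}

fn main() {
    // e.g. ?from=1000 with `to` omitted: inclusive start, open end
    let range = HitsRange {
        from: Bound::Included(1000),
        to: Bound::Unbounded,
    };
    assert!(range.contains(&1000));
    assert!(!range.contains(&999));
}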
+521
server/src/db/block.rs
···
+
use std::{
+
io::{self, Read, Write},
+
marker::PhantomData,
+
usize,
+
};
+
+
use rkyv::{
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
bytecheck::CheckBytes,
+
de::Pool,
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
+
util::AlignedVec,
+
};
+
+
use crate::{
+
error::{AppError, AppResult},
+
utils::{ReadVariableExt, WriteVariableExt},
+
};
+
+
pub struct Item<T> {
+
pub timestamp: u64,
+
pub data: AlignedVec,
+
phantom: PhantomData<T>,
+
}
+
+
impl<T> Item<T>
+
where
+
T: Archive,
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
{
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
+
}
+
}
+
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+
pub fn new(timestamp: u64, data: &T) -> Self {
+
Item {
+
timestamp,
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+
phantom: PhantomData,
+
}
+
}
+
}
+
+
pub struct ItemEncoder<W: Write, T> {
+
writer: W,
+
prev_timestamp: u64,
+
prev_delta: i64,
+
item_count: usize,
+
_item: PhantomData<T>,
+
}
+
+
impl<W: Write, T> ItemEncoder<W, T> {
+
pub fn new(writer: W, item_count: usize) -> Self {
+
assert!(item_count > 0);
+
ItemEncoder {
+
writer,
+
prev_timestamp: 0,
+
prev_delta: 0,
+
item_count,
+
_item: PhantomData,
+
}
+
}
+
+
/// NOTE: this is a best effort estimate of the encoded length of the block.
+
/// if T contains variable-length data, the encoded length may be larger than this estimate.
+
pub fn encoded_len(item_count: usize) -> usize {
+
// items length + item count * delta length + data length
+
size_of::<usize>() + item_count * size_of::<(i64, T)>()
+
}
+
+
pub fn encode(&mut self, item: &Item<T>) -> io::Result<()> {
+
if self.prev_timestamp == 0 {
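+
// first item of the block: its absolute timestamp lives in the block key,
+
// so only the item count and the payload are written here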
+
self.writer.write_varint(self.item_count)?;
+
// self.writer.write_varint(item.timestamp)?;
+
self.prev_timestamp = item.timestamp;
+
self.write_data(&item.data)?;
+
return Ok(());
+
}
+
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
+
+
self.writer.write_varint(delta - self.prev_delta)?;
+
self.prev_timestamp = item.timestamp;
+
self.prev_delta = delta;
+
+
self.write_data(&item.data)?;
+
+
Ok(())
+
}
+
+
fn write_data(&mut self, data: &[u8]) -> io::Result<()> {
+
self.writer.write_varint(data.len())?;
+
self.writer.write_all(data)?;
+
Ok(())
+
}
+
+
pub fn finish(mut self) -> io::Result<W> {
+
self.writer.flush()?;
+
Ok(self.writer)
+
}
+
}
+
+
pub struct ItemDecoder<R, T> {
+
reader: R,
+
current_timestamp: u64,
+
current_delta: i64,
+
items_read: usize,
+
expected: usize,
+
_item: PhantomData<T>,
+
}
+
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
+
pub fn new(mut reader: R, start_timestamp: u64) -> io::Result<Self> {
+
let expected = match reader.read_varint() {
+
Ok(expected) => expected,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => 0,
+
Err(e) => return Err(e.into()),
+
};
+
+
Ok(ItemDecoder {
+
reader,
+
current_timestamp: start_timestamp,
+
current_delta: 0,
+
items_read: 0,
+
expected,
+
_item: PhantomData,
+
})
+
}
+
+
pub fn item_count(&self) -> usize {
+
self.expected
+
}
+
+
pub fn decode(&mut self) -> io::Result<Option<Item<T>>> {
+
if self.items_read == 0 {
+
// read the first timestamp
+
// let timestamp = match self.reader.read_varint::<u64>() {
+
// Ok(timestamp) => timestamp,
+
// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
// Err(e) => return Err(e.into()),
+
// };
+
// self.current_timestamp = timestamp;
+
+
let Some(data_raw) = self.read_item()? else {
+
return Ok(None);
+
};
+
+
self.items_read += 1;
+
return Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}));
+
}
+
+
if self.items_read >= self.expected {
+
return Ok(None);
+
}
+
+
let Some(_delta) = self.read_timestamp()? else {
+
return Ok(None);
+
};
+
+
// read data
+
let data_raw = match self.read_item()? {
+
Some(data_raw) => data_raw,
+
None => {
+
return Err(io::Error::new(
+
io::ErrorKind::UnexpectedEof,
+
"expected data after delta",
+
)
+
.into());
+
}
+
};
+
+
self.items_read += 1;
+
Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}))
+
}
+
+
// timestamps [10, 11, 12, 14] -> deltas [1, 1, 2] -> deltas-of-deltas [0, 1]
+
fn read_timestamp(&mut self) -> io::Result<Option<u64>> {
+
let delta = match self.reader.read_varint::<i64>() {
+
Ok(delta) => delta,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
self.current_delta += delta;
+
self.current_timestamp =
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
+
Ok(Some(self.current_timestamp))
+
}
+
+
fn read_item(&mut self) -> io::Result<Option<AlignedVec>> {
+
let data_len = match self.reader.read_varint::<usize>() {
+
Ok(data_len) => data_len,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
let mut data_raw = AlignedVec::with_capacity(data_len);
+
for _ in 0..data_len {
+
data_raw.push(0);
+
}
+
self.reader.read_exact(data_raw.as_mut_slice())?;
+
Ok(Some(data_raw))
+
}
+
}
+
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+
type Item = io::Result<Item<T>>;
+
+
fn next(&mut self) -> Option<Self::Item> {
+
self.decode().transpose()
+
}
+
+
fn size_hint(&self) -> (usize, Option<usize>) {
+
(self.expected, Some(self.expected))
+
}
+
}
+
+
#[cfg(test)]
+
mod test {
+
use super::*;
+
use rkyv::{Archive, Deserialize, Serialize};
+
use std::io::Cursor;
+
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+
#[rkyv(compare(PartialEq))]
+
struct TestData {
+
id: u32,
+
value: String,
+
}
+
+
#[test]
+
fn test_encoder_decoder_single_item() {
+
let data = TestData {
+
id: 123,
+
value: "test".to_string(),
+
};
+
+
let item = Item::new(1000, &data);
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, 1);
+
encoder.encode(&item).unwrap();
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_item = decoder.decode().unwrap().unwrap();
+
assert_eq!(decoded_item.timestamp, 1000);
+
+
let decoded_data = decoded_item.deser().unwrap();
+
assert_eq!(decoded_data.id, 123);
+
assert_eq!(decoded_data.value.as_str(), "test");
+
}
+
+
#[test]
+
fn test_encoder_decoder_multiple_items() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
Item::new(
+
1015,
+
&TestData {
+
id: 3,
+
value: "third".to_string(),
+
},
+
),
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "fourth".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let mut decoded_items = Vec::new();
+
while let Some(item) = decoder.decode().unwrap() {
+
decoded_items.push(item);
+
}
+
+
assert_eq!(decoded_items.len(), 4);
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
+
assert_eq!(
+
original.deser().unwrap().value.as_str(),
+
decoded.deser().unwrap().value.as_str()
+
);
+
}
+
}
+
+
#[test]
+
fn test_encoder_decoder_with_iterator() {
+
let items = vec![
+
Item::new(
+
2000,
+
&TestData {
+
id: 10,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
2005,
+
&TestData {
+
id: 20,
+
value: "b".to_string(),
+
},
+
),
+
Item::new(
+
2012,
+
&TestData {
+
id: 30,
+
value: "c".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 3);
+
assert_eq!(decoded_items[0].timestamp, 2000);
+
assert_eq!(decoded_items[1].timestamp, 2005);
+
assert_eq!(decoded_items[2].timestamp, 2012);
+
+
assert_eq!(decoded_items[0].deser().unwrap().id, 10);
+
assert_eq!(decoded_items[1].deser().unwrap().id, 20);
+
assert_eq!(decoded_items[2].deser().unwrap().id, 30);
+
}
+
+
#[test]
+
fn test_delta_compression() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "b".to_string(),
+
},
+
), // delta = 10
+
Item::new(
+
1020,
+
&TestData {
+
id: 3,
+
value: "c".to_string(),
+
},
+
), // delta = 10, delta-of-delta = 0
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "d".to_string(),
+
},
+
), // delta = 5, delta-of-delta = -5
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode and verify
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.deser().unwrap().id, decoded.deser().unwrap().id);
+
}
+
}
+
+
#[test]
+
fn test_empty_decode() {
+
let buffer = Vec::new();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let result = decoder.decode().unwrap();
+
assert!(result.is_none());
+
}
+
+
#[test]
+
fn test_backwards_timestamp() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
900,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].timestamp, 1000);
+
assert_eq!(decoded_items[1].timestamp, 900);
+
}
+
+
#[test]
+
fn test_different_data_sizes() {
+
let small_data = TestData {
+
id: 1,
+
value: "x".to_string(),
+
};
+
let large_data = TestData {
+
id: 2,
+
value: "a".repeat(1000),
+
};
+
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer, items.len());
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].deser().unwrap().value.as_str(), "x");
+
assert_eq!(decoded_items[1].deser().unwrap().value.len(), 1000);
+
assert_eq!(
+
decoded_items[1].deser().unwrap().value.as_str(),
+
"a".repeat(1000)
+
);
+
}
+
}
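
(aside: the encoder above stores timestamps as delta-of-delta: the block's first timestamp travels in the block key, and each later item records how much the gap changed, so a steady event rate encodes as runs of tiny varints. a standalone sketch of just that arithmetic, mirroring ItemEncoder:)

// mirrors ItemEncoder's timestamp handling: emit `delta - prev_delta` per item.
fn delta_of_delta(timestamps: &[u64]) -> Vec<i64> {
    let mut prev_ts = timestamps[0]; // stored out-of-band, in the block key
    let mut prev_delta = 0i64;
    let mut out = Vec::new();
    for &ts in &timestamps[1..] {
        let delta = (ts as i128 - prev_ts as i128) as i64;
        out.push(delta - prev_delta); // ~0 when the rate is steady
        prev_ts = ts;
        prev_delta = delta;
    }
    out
}

fn main() {
    // timestamps [10, 11, 12, 14] -> deltas [1, 1, 2] -> written [1, 0, 1]
    assert_eq!(delta_of_delta(&[10, 11, 12, 14]), vec![1, 0, 1]);
}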
+232
server/src/db/handle.rs
···
+
use std::{
+
fmt::Debug,
+
io::Cursor,
+
ops::{Bound, Deref, RangeBounds},
+
sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
+
time::Duration,
+
};
+
+
use byteview::ByteView;
+
use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice};
+
use itertools::Itertools;
+
use parking_lot::Mutex;
+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
+
use smol_str::SmolStr;
+
+
use crate::{
+
db::{EventRecord, NsidHit, block},
+
error::AppResult,
+
utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
+
};
+
+
pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
+
pub type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+
pub type Item = block::Item<NsidHit>;
+
+
pub struct Block {
+
pub written: usize,
+
pub key: ByteView,
+
pub data: Vec<u8>,
+
}
+
+
pub struct LexiconHandle {
+
tree: Partition,
+
nsid: SmolStr,
+
buf: Arc<Mutex<Vec<EventRecord>>>,
+
last_insert: AtomicU64, // relaxed
+
eps: DefaultRateTracker,
+
}
+
+
impl Debug for LexiconHandle {
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
f.debug_struct("LexiconHandle")
+
.field("nsid", self.nsid())
+
.finish()
+
}
+
}
+
+
impl Deref for LexiconHandle {
+
type Target = Partition;
+
+
fn deref(&self) -> &Self::Target {
+
&self.tree
+
}
+
}
+
+
impl LexiconHandle {
+
pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
+
let opts = PartitionCreateOptions::default()
+
.block_size(1024 * 128)
+
.compression(fjall::CompressionType::Miniz(9));
+
Self {
+
tree: keyspace.open_partition(nsid, opts).unwrap(),
+
nsid: nsid.into(),
+
buf: Default::default(),
+
last_insert: AtomicU64::new(0),
+
eps: RateTracker::new(Duration::from_secs(10)),
+
}
+
}
+
+
pub fn nsid(&self) -> &SmolStr {
+
&self.nsid
+
}
+
+
pub fn item_count(&self) -> usize {
+
self.buf.lock().len()
+
}
+
+
pub fn since_last_activity(&self) -> u64 {
+
CLOCK.delta_as_nanos(self.last_insert.load(AtomicOrdering::Relaxed), CLOCK.raw())
+
}
+
+
pub fn suggested_block_size(&self) -> usize {
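+
// aim for roughly one minute of events per block, based on the recent event rate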
+
self.eps.rate() as usize * 60
+
}
+
+
pub fn queue(&self, events: impl IntoIterator<Item = EventRecord>) {
+
let mut count = 0;
+
self.buf.lock().extend(events.into_iter().inspect(|_| {
+
count += 1;
+
}));
+
self.last_insert.store(CLOCK.raw(), AtomicOrdering::Relaxed);
+
self.eps.observe(count);
+
}
+
+
pub fn compact(
+
&self,
+
compact_to: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
+
let start_limit = match range.start_bound().cloned() {
+
Bound::Included(start) => start,
+
Bound::Excluded(start) => start.saturating_add(1),
+
Bound::Unbounded => 0,
+
};
+
let end_limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
+
let start_key = varints_unsigned_encoded([start_limit]);
+
let end_key = varints_unsigned_encoded([end_limit]);
+
+
let blocks_to_compact = self
+
.tree
+
.range(start_key..end_key)
+
.collect::<Result<Vec<_>, _>>()?;
+
if blocks_to_compact.len() < 2 {
+
tracing::info!("{}: nothing to compact", self.nsid);
+
return Ok(());
+
}
+
+
let start_blocks_size = blocks_to_compact.len();
+
let keys_to_delete = blocks_to_compact.iter().map(|(key, _)| key);
+
let mut all_items =
+
blocks_to_compact
+
.iter()
+
.try_fold(Vec::new(), |mut acc, (key, value)| {
+
let mut timestamps = Cursor::new(key);
+
let start_timestamp = timestamps.read_varint()?;
+
let decoder = block::ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+
let mut items = decoder.collect::<Result<Vec<_>, _>>()?;
+
acc.append(&mut items);
+
AppResult::Ok(acc)
+
})?;
+
+
if sort {
+
all_items.sort_unstable_by_key(|e| e.timestamp);
+
}
+
+
let new_blocks = all_items
+
.into_iter()
+
.chunks(compact_to)
+
.into_iter()
+
.map(|chunk| chunk.collect_vec())
+
.collect_vec()
+
.into_par_iter()
+
.map(|chunk| {
+
let count = chunk.len();
+
Self::encode_block_from_items(chunk, count)
+
})
+
.collect::<Result<Vec<_>, _>>()?;
+
let end_blocks_size = new_blocks.len();
+
+
for key in keys_to_delete {
+
self.tree.remove(key.clone())?;
+
}
+
for block in new_blocks {
+
self.tree.insert(block.key, block.data)?;
+
}
+
+
tracing::info!(
+
"{}: compacted {} blocks to {} blocks ({}% reduction)",
+
self.nsid,
+
start_blocks_size,
+
end_blocks_size,
+
((start_blocks_size - end_blocks_size) as f64 / start_blocks_size as f64) * 100.0,
+
);
+
+
Ok(())
+
}
+
+
pub fn encode_block_from_items(
+
items: impl IntoIterator<Item = Item>,
+
count: usize,
+
) -> AppResult<Block> {
+
if count == 0 {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::InvalidInput,
+
"no items requested",
+
)
+
.into());
+
}
+
let mut writer =
+
ItemEncoder::new(Vec::with_capacity(ItemEncoder::encoded_len(count)), count);
+
let mut start_timestamp = None;
+
let mut end_timestamp = None;
+
let mut written = 0_usize;
+
for item in items.into_iter().take(count) {
+
writer.encode(&item)?;
+
if start_timestamp.is_none() {
+
start_timestamp = Some(item.timestamp);
+
}
+
end_timestamp = Some(item.timestamp);
+
written += 1;
+
}
+
if written != count {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::InvalidData,
+
"unexpected number of items, invalid data?",
+
)
+
.into());
+
}
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+
let value = writer.finish()?;
+
let key = varints_unsigned_encoded([start_timestamp, end_timestamp]);
+
return Ok(Block {
+
written,
+
key,
+
data: value,
+
});
+
}
+
Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items were encoded").into())
+
}
+
+
pub fn take_block_items(&self, item_count: usize) -> Vec<Item> {
+
let mut buf = self.buf.lock();
+
let end = item_count.min(buf.len());
+
buf.drain(..end)
+
.map(|event| {
+
Item::new(
+
event.timestamp,
+
&NsidHit {
+
deleted: event.deleted,
+
},
+
)
+
})
+
.collect()
+
}
+
}
+447
server/src/db/mod.rs
···
+
use std::{
+
collections::HashMap,
+
fmt::Debug,
+
io::Cursor,
+
ops::{Bound, Deref, RangeBounds},
+
path::{Path, PathBuf},
+
time::Duration,
+
};
+
+
use byteview::StrView;
+
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
+
use itertools::{Either, Itertools};
+
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
+
use rclite::Arc;
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
+
use smol_str::{SmolStr, ToSmolStr};
+
use tokio::sync::broadcast;
+
use tokio_util::sync::CancellationToken;
+
+
use crate::{
+
db::handle::{ItemDecoder, LexiconHandle},
+
error::{AppError, AppResult},
+
jetstream::JetstreamEvent,
+
utils::{RateTracker, ReadVariableExt, varints_unsigned_encoded},
+
};
+
+
mod block;
+
mod handle;
+
+
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidCounts {
+
pub count: u128,
+
pub deleted_count: u128,
+
pub last_seen: u64,
+
}
+
+
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidHit {
+
pub deleted: bool,
+
}
+
+
#[derive(Clone)]
+
pub struct EventRecord {
+
pub nsid: SmolStr,
+
pub timestamp: u64, // seconds
+
pub deleted: bool,
+
}
+
+
impl EventRecord {
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+
match event {
+
JetstreamEvent::Commit {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: false,
+
}),
+
JetstreamEvent::Delete {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: true,
+
}),
+
_ => None,
+
}
+
}
+
}
+
+
pub struct DbInfo {
+
pub nsids: HashMap<SmolStr, Vec<usize>>,
+
pub disk_size: u64,
+
}
+
+
pub struct DbConfig {
+
pub ks_config: fjall::Config,
+
pub min_block_size: usize,
+
pub max_block_size: usize,
+
pub max_last_activity: u64,
+
}
+
+
impl DbConfig {
+
pub fn path(mut self, path: impl AsRef<Path>) -> Self {
+
self.ks_config = fjall::Config::new(path);
+
self
+
}
+
+
pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
+
self.ks_config = f(self.ks_config);
+
self
+
}
+
}
+
+
impl Default for DbConfig {
+
fn default() -> Self {
+
Self {
+
ks_config: fjall::Config::default(),
+
min_block_size: 512,
+
max_block_size: 500_000,
+
max_last_activity: Duration::from_secs(10).as_nanos() as u64,
+
}
+
}
+
}
+
+
// counts is nsid -> NsidCounts
+
// hits is tree per nsid: varint start time + varint end time -> block of hits
+
pub struct Db {
+
pub cfg: DbConfig,
+
pub ks: Keyspace,
+
counts: Partition,
+
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
+
sync_pool: threadpool::ThreadPool,
+
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
+
eps: RateTracker<100>,
+
cancel_token: CancellationToken,
+
}
+
+
impl Db {
+
pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
+
tracing::info!("opening db...");
+
let ks = cfg.ks_config.clone().open()?;
+
Ok(Self {
+
cfg,
+
hits: Default::default(),
+
sync_pool: threadpool::Builder::new()
+
.num_threads(rayon::current_num_threads() * 2)
+
.build(),
+
counts: ks.open_partition(
+
"_counts",
+
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
+
)?,
+
ks,
+
event_broadcaster: broadcast::channel(1000).0,
+
eps: RateTracker::new(Duration::from_secs(1)),
+
cancel_token,
+
})
+
}
+
+
#[inline(always)]
+
pub fn shutting_down(&self) -> impl Future<Output = ()> {
+
self.cancel_token.cancelled()
+
}
+
+
#[inline(always)]
+
pub fn is_shutting_down(&self) -> bool {
+
self.cancel_token.is_cancelled()
+
}
+
+
#[inline(always)]
+
pub fn eps(&self) -> usize {
+
self.eps.rate() as usize
+
}
+
+
#[inline(always)]
+
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+
self.event_broadcaster.subscribe()
+
}
+
+
pub fn sync(&self, all: bool) -> AppResult<()> {
+
// prepare all the data
+
let mut data = Vec::with_capacity(self.hits.len());
+
let _guard = scc::ebr::Guard::new();
+
for (_, handle) in self.hits.iter(&_guard) {
+
let mut nsid_data = Vec::with_capacity(2);
+
let mut total_count = 0;
+
let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
+
// if we disconnect for a long time, we want to sync all of what we
+
// have to avoid having many small blocks (even if we run compaction
+
// later, it reduces work until we run compaction)
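+
// with the defaults this clamps per-nsid block sizes to [512, 500_000]
+
// items; e.g. ~100 events/s suggests ~6000 items (one minute of data)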
+
let block_size = (is_too_old || all)
+
.then_some(self.cfg.max_block_size)
+
.unwrap_or_else(|| {
+
self.cfg
+
.max_block_size
+
.min(self.cfg.min_block_size.max(handle.suggested_block_size()))
+
});
+
let count = handle.item_count();
+
let data_count = count / block_size;
+
if count > 0 && (all || data_count > 0 || is_too_old) {
+
for i in 0..data_count {
+
nsid_data.push((i, handle.clone(), block_size));
+
total_count += block_size;
+
}
+
// only sync remainder if we haven't met block size
+
let remainder = count % block_size;
+
if (all || data_count == 0) && remainder > 0 {
+
nsid_data.push((data_count, handle.clone(), remainder));
+
total_count += remainder;
+
}
+
}
+
tracing::info!(
+
"{}: will sync {} blocks ({} count)",
+
handle.nsid(),
+
nsid_data.len(),
+
total_count,
+
);
+
data.push(nsid_data);
+
}
+
drop(_guard);
+
+
// process the blocks
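+
// encode each nsid's chunks in parallel with rayon, then queue the
+
// resulting inserts on the dedicated sync thread pool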
+
data.into_par_iter()
+
.map(|chunk| {
+
chunk
+
.into_iter()
+
.map(|(i, handle, max_block_size)| {
+
(i, handle.take_block_items(max_block_size), handle)
+
})
+
.collect::<Vec<_>>()
+
.into_par_iter()
+
.map(|(i, items, handle)| {
+
let count = items.len();
+
let block = LexiconHandle::encode_block_from_items(items, count)?;
+
tracing::info!(
+
"{}: encoded block with {} items",
+
handle.nsid(),
+
block.written,
+
);
+
AppResult::Ok((i, block, handle))
+
})
+
.collect::<Result<Vec<_>, _>>()
+
})
+
.try_for_each(|chunk| {
+
let chunk = chunk?;
+
for (i, block, handle) in chunk {
+
self.sync_pool
+
.execute(move || match handle.insert(block.key, block.data) {
+
Ok(_) => {
+
tracing::info!("{}: [{i}] synced {}", block.written, handle.nsid())
+
}
+
Err(err) => tracing::error!("failed to sync block: {}", err),
+
});
+
}
+
AppResult::Ok(())
+
})?;
+
self.sync_pool.join();
+
+
Ok(())
+
}
+
+
pub fn compact(
+
&self,
+
nsid: impl AsRef<str>,
+
max_count: usize,
+
range: impl RangeBounds<u64>,
+
sort: bool,
+
) -> AppResult<()> {
+
let Some(handle) = self.get_handle(nsid) else {
+
return Ok(());
+
};
+
handle.compact(max_count, range, sort)
+
}
+
+
pub fn compact_all(
+
&self,
+
max_count: usize,
+
range: impl RangeBounds<u64> + Clone,
+
sort: bool,
+
) -> AppResult<()> {
+
for nsid in self.get_nsids() {
+
self.compact(nsid, max_count, range.clone(), sort)?;
+
}
+
Ok(())
+
}
+
+
pub fn major_compact(&self) -> AppResult<()> {
+
self.compact_all(self.cfg.max_block_size, .., true)?;
+
let _guard = scc::ebr::Guard::new();
+
for (_, handle) in self.hits.iter(&_guard) {
+
handle.deref().major_compact()?;
+
}
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
+
let _guard = scc::ebr::Guard::new();
+
let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
+
Some(handle) => handle.clone(),
+
None => {
+
if self.ks.partition_exists(nsid.as_ref()) {
+
let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
+
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+
handle
+
} else {
+
return None;
+
}
+
}
+
};
+
Some(handle)
+
}
+
+
#[inline(always)]
+
fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
+
self.hits
+
.entry(nsid.clone())
+
.or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
+
}
+
+
pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
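+
// chunk_by groups only consecutive events with the same nsid, so an
+
// interleaved stream still works, just in smaller chunks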
+
for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
+
let mut counts = self.get_count(&key)?;
+
let mut count = 0;
+
self.ensure_handle(&key).queue(chunk.inspect(|e| {
+
// increment count
+
counts.last_seen = e.timestamp;
+
if e.deleted {
+
counts.deleted_count += 1;
+
} else {
+
counts.count += 1;
+
}
+
count += 1;
+
}));
+
self.eps.observe(count);
+
self.insert_count(&key, &counts)?;
+
if self.event_broadcaster.receiver_count() > 0 {
+
let _ = self.event_broadcaster.send((key, counts));
+
}
+
}
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
+
self.counts
+
.insert(
+
nsid,
+
unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
+
)
+
.map_err(AppError::from)
+
}
+
+
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
+
let Some(raw) = self.counts.get(nsid)? else {
+
return Ok(NsidCounts::default());
+
};
+
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
+
}
+
+
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
+
self.counts.iter().map(|res| {
+
res.map_err(AppError::from).map(|(key, val)| {
+
(
+
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
+
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
+
)
+
})
+
})
+
}
+
+
pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
+
self.ks
+
.list_partitions()
+
.into_iter()
+
.filter(|k| k.deref() != "_counts")
+
}
+
+
pub fn info(&self) -> AppResult<DbInfo> {
+
let mut nsids = HashMap::new();
+
for nsid in self.get_nsids() {
+
let Some(handle) = self.get_handle(&nsid) else {
+
continue;
+
};
+
let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
+
let (key, value) = item?;
+
let mut timestamps = Cursor::new(key);
+
let start_timestamp = timestamps.read_varint()?;
+
let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+
acc.push(decoder.item_count());
+
AppResult::Ok(acc)
+
})?;
+
nsids.insert(nsid.to_smolstr(), block_lens);
+
}
+
Ok(DbInfo {
+
nsids,
+
disk_size: self.ks.disk_space(),
+
})
+
}
+
+
pub fn get_hits(
+
&self,
+
nsid: &str,
+
range: impl RangeBounds<u64> + std::fmt::Debug,
+
) -> impl Iterator<Item = AppResult<handle::Item>> {
+
let start_limit = match range.start_bound().cloned() {
+
Bound::Included(start) => start,
+
Bound::Excluded(start) => start.saturating_add(1),
+
Bound::Unbounded => 0,
+
};
+
let end_limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
let end_key = varints_unsigned_encoded([end_limit]);
+
+
let Some(handle) = self.get_handle(nsid) else {
+
return Either::Right(std::iter::empty());
+
};
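+
// walk blocks newest-first; a block that starts before the requested
+
// range yields Ok(None), which ends the scan via map_while below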
+
+
let map_block = move |(key, val)| {
+
let mut key_reader = Cursor::new(key);
+
let start_timestamp = key_reader.read_varint::<u64>()?;
+
if start_timestamp < start_limit {
+
return Ok(None);
+
}
+
let items = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?
+
.take_while(move |item| {
+
item.as_ref().map_or(true, |item| {
+
item.timestamp <= end_limit && item.timestamp >= start_limit
+
})
+
})
+
.map(|res| res.map_err(AppError::from));
+
Ok(Some(items))
+
};
+
+
Either::Left(
+
handle
+
.range(..end_key)
+
.rev()
+
.map_while(move |res| res.map_err(AppError::from).and_then(map_block).transpose())
+
.collect::<Vec<_>>()
+
.into_iter()
+
.rev()
+
.flatten()
+
.flatten(),
+
)
+
}
+
+
pub fn tracking_since(&self) -> AppResult<u64> {
+
// HACK: we should actually store when we started tracking, but i'm lazy;
+
// this should be accurate enough
+
let Some(handle) = self.get_handle("app.bsky.feed.like") else {
+
return Ok(0);
+
};
+
let Some((timestamps_raw, _)) = handle.first_key_value()? else {
+
return Ok(0);
+
};
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
+
timestamp_reader
+
.read_varint::<u64>()
+
.map_err(AppError::from)
+
}
+
}
-195
server/src/db.rs
···
-
use std::{ops::Deref, path::Path, time::Duration};
-
-
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions};
-
use pingora_limits::rate::Rate;
-
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
-
use smol_str::SmolStr;
-
use tokio::sync::broadcast;
-
-
use crate::{
-
error::{AppError, AppResult},
-
jetstream::JetstreamEvent,
-
};
-
-
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
-
#[rkyv(compare(PartialEq), derive(Debug))]
-
pub struct NsidCounts {
-
pub count: u128,
-
pub deleted_count: u128,
-
pub last_seen: u64,
-
}
-
-
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
-
#[rkyv(compare(PartialEq), derive(Debug))]
-
pub struct NsidHit {
-
pub deleted: bool,
-
}
-
-
pub struct EventRecord {
-
pub nsid: SmolStr,
-
pub timestamp: u64,
-
pub deleted: bool,
-
}
-
-
impl EventRecord {
-
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
-
match event {
-
JetstreamEvent::Commit {
-
time_us, commit, ..
-
} => Some(Self {
-
nsid: commit.collection.into(),
-
timestamp: time_us,
-
deleted: false,
-
}),
-
JetstreamEvent::Delete {
-
time_us, commit, ..
-
} => Some(Self {
-
nsid: commit.collection.into(),
-
timestamp: time_us,
-
deleted: true,
-
}),
-
_ => None,
-
}
-
}
-
}
-
-
// counts is nsid -> NsidCounts
-
// hits is tree per nsid: timestamp -> NsidHit
-
pub struct Db {
-
inner: Keyspace,
-
hits: papaya::HashMap<SmolStr, Partition>,
-
counts: Partition,
-
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
-
eps: Rate,
-
}
-
-
impl Db {
-
pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
-
tracing::info!("opening db...");
-
let ks = Config::new(path)
-
.cache_size(8 * 1024 * 1024) // from talna
-
.open()?;
-
Ok(Self {
-
hits: Default::default(),
-
counts: ks.open_partition(
-
"_counts",
-
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
-
)?,
-
inner: ks,
-
event_broadcaster: broadcast::channel(1000).0,
-
eps: Rate::new(Duration::from_secs(1)),
-
})
-
}
-
-
pub fn eps(&self) -> usize {
-
self.eps.rate(&()) as usize
-
}
-
-
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
-
self.event_broadcaster.subscribe()
-
}
-
-
#[inline(always)]
-
fn run_in_nsid_tree<T>(
-
&self,
-
nsid: &str,
-
f: impl FnOnce(&Partition) -> AppResult<T>,
-
) -> AppResult<T> {
-
f(self.hits.pin().get_or_insert_with(SmolStr::new(nsid), || {
-
let opts = PartitionCreateOptions::default()
-
.compression(fjall::CompressionType::Miniz(9))
-
.compaction_strategy(fjall::compaction::Strategy::Fifo(fjall::compaction::Fifo {
-
limit: 5 * 1024 * 1024 * 1024, // 5 gb
-
ttl_seconds: Some(60 * 60 * 24 * 30), // 30 days
-
}));
-
self.inner.open_partition(nsid, opts).unwrap()
-
}))
-
}
-
-
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
-
let EventRecord {
-
nsid,
-
timestamp,
-
deleted,
-
} = e;
-
-
self.insert_event(&nsid, timestamp, deleted)?;
-
// increment count
-
let mut counts = self.get_count(&nsid)?;
-
counts.last_seen = timestamp;
-
if deleted {
-
counts.deleted_count += 1;
-
} else {
-
counts.count += 1;
-
}
-
self.insert_count(&nsid, counts.clone())?;
-
if self.event_broadcaster.receiver_count() > 0 {
-
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
-
}
-
self.eps.observe(&(), 1);
-
Ok(())
-
}
-
-
#[inline(always)]
-
fn insert_event(&self, nsid: &str, timestamp: u64, deleted: bool) -> AppResult<()> {
-
self.run_in_nsid_tree(nsid, |tree| {
-
tree.insert(
-
timestamp.to_be_bytes(),
-
unsafe { rkyv::to_bytes::<Error>(&NsidHit { deleted }).unwrap_unchecked() }
-
.as_slice(),
-
)
-
.map_err(AppError::from)
-
})
-
}
-
-
#[inline(always)]
-
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
-
self.counts
-
.insert(
-
nsid,
-
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
-
)
-
.map_err(AppError::from)
-
}
-
-
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
-
let Some(raw) = self.counts.get(nsid)? else {
-
return Ok(NsidCounts::default());
-
};
-
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
-
}
-
-
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
-
self.counts.iter().map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
})
-
}
-
-
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str>> {
-
self.inner
-
.list_partitions()
-
.into_iter()
-
.filter(|k| k.deref() != "_counts")
-
}
-
-
pub fn get_hits(
-
&self,
-
nsid: &str,
-
) -> AppResult<impl Iterator<Item = AppResult<(u64, NsidHit)>>> {
-
self.run_in_nsid_tree(nsid, |tree| {
-
Ok(tree.iter().map(|res| {
-
res.map_err(AppError::from).map(|(key, val)| {
-
(
-
u64::from_be_bytes(key.as_ref().try_into().unwrap()),
-
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
-
)
-
})
-
}))
-
})
-
}
-
}
+501
server/src/db_old/block.rs
···
+
use ordered_varint::Variable;
+
use rkyv::{
+
Archive, Deserialize, Serialize,
+
api::high::{HighSerializer, HighValidator},
+
bytecheck::CheckBytes,
+
de::Pool,
+
rancor::{self, Strategy},
+
ser::allocator::ArenaHandle,
+
util::AlignedVec,
+
};
+
use std::{
+
io::{self, Read, Write},
+
marker::PhantomData,
+
};
+
+
use crate::error::{AppError, AppResult};
+
+
pub struct Item<T> {
+
pub timestamp: u64,
+
data: AlignedVec,
+
phantom: PhantomData<T>,
+
}
+
+
impl<T: Archive> Item<T> {
+
pub fn access(&self) -> &T::Archived {
+
unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) }
+
}
+
}
+
+
impl<T> Item<T>
+
where
+
T: Archive,
+
T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>>
+
+ Deserialize<T, Strategy<Pool, rancor::Error>>,
+
{
+
pub fn deser(&self) -> AppResult<T> {
+
rkyv::from_bytes(&self.data).map_err(AppError::from)
+
}
+
}
+
+
impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> {
+
pub fn new(timestamp: u64, data: &T) -> Self {
+
Item {
+
timestamp,
+
data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() },
+
phantom: PhantomData,
+
}
+
}
+
}
+
+
pub struct ItemEncoder<W: Write, T> {
+
writer: W,
+
prev_timestamp: u64,
+
prev_delta: i64,
+
_item: PhantomData<T>,
+
}
+
+
impl<W: Write, T> ItemEncoder<W, T> {
+
pub fn new(writer: W) -> Self {
+
ItemEncoder {
+
writer,
+
prev_timestamp: 0,
+
prev_delta: 0,
+
_item: PhantomData,
+
}
+
}
+
+
pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> {
+
if self.prev_timestamp == 0 {
+
// the first timestamp is stored in the block key, not in the stream
+
self.prev_timestamp = item.timestamp;
+
self.write_data(&item.data)?;
+
return Ok(());
+
}
+
+
let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64;
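+
// delta-of-delta: near-regular timestamp cadences yield values close to
+
// zero, which the varint encoding stores compactly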
+
+
self.writer.write_varint(delta - self.prev_delta)?;
+
self.prev_timestamp = item.timestamp;
+
self.prev_delta = delta;
+
+
self.write_data(&item.data)?;
+
+
Ok(())
+
}
+
+
fn write_data(&mut self, data: &[u8]) -> AppResult<()> {
+
self.writer.write_varint(data.len())?;
+
self.writer.write_all(data)?;
+
Ok(())
+
}
+
+
pub fn finish(mut self) -> AppResult<W> {
+
self.writer.flush()?;
+
Ok(self.writer)
+
}
+
}
+
+
pub struct ItemDecoder<R, T> {
+
reader: R,
+
current_timestamp: u64,
+
current_delta: i64,
+
first_item: bool,
+
_item: PhantomData<T>,
+
}
+
+
impl<R: Read, T: Archive> ItemDecoder<R, T> {
+
pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> {
+
Ok(ItemDecoder {
+
reader,
+
current_timestamp: start_timestamp,
+
current_delta: 0,
+
first_item: true,
+
_item: PhantomData,
+
})
+
}
+
+
pub fn decode(&mut self) -> AppResult<Option<Item<T>>> {
+
if self.first_item {
+
// the first timestamp comes from the block key (start_timestamp),
+
// so only the payload is read for the first item
+
+
let Some(data_raw) = self.read_item()? else {
+
return Ok(None);
+
};
+
self.first_item = false;
+
return Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}));
+
}
+
+
let Some(_delta) = self.read_timestamp()? else {
+
return Ok(None);
+
};
+
+
// read data
+
let data_raw = match self.read_item()? {
+
Some(data_raw) => data_raw,
+
None => {
+
return Err(io::Error::new(
+
io::ErrorKind::UnexpectedEof,
+
"expected data after delta",
+
)
+
.into());
+
}
+
};
+
+
Ok(Some(Item {
+
timestamp: self.current_timestamp,
+
data: data_raw,
+
phantom: PhantomData,
+
}))
+
}
+
+
// delta-of-delta: timestamps [10, 11, 12, 14] have deltas [1, 1, 2],
+
// whose deltas in turn are [0, 1]
+
fn read_timestamp(&mut self) -> AppResult<Option<u64>> {
+
let delta = match self.reader.read_varint::<i64>() {
+
Ok(delta) => delta,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
self.current_delta += delta;
+
self.current_timestamp =
+
(self.current_timestamp as i128 + self.current_delta as i128) as u64;
+
Ok(Some(self.current_timestamp))
+
}
+
+
fn read_item(&mut self) -> AppResult<Option<AlignedVec>> {
+
let data_len = match self.reader.read_varint::<usize>() {
+
Ok(data_len) => data_len,
+
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+
Err(e) => return Err(e.into()),
+
};
+
let mut data_raw = AlignedVec::with_capacity(data_len);
+
for _ in 0..data_len {
+
data_raw.push(0);
+
}
+
self.reader.read_exact(data_raw.as_mut_slice())?;
+
Ok(Some(data_raw))
+
}
+
}
+
+
impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> {
+
type Item = AppResult<Item<T>>;
+
+
fn next(&mut self) -> Option<Self::Item> {
+
self.decode().transpose()
+
}
+
}
+
+
pub trait WriteVariableExt: Write {
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+
value.encode_variable(self)
+
}
+
}
+
impl<W: Write> WriteVariableExt for W {}
+
+
pub trait ReadVariableExt: Read {
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+
T::decode_variable(self)
+
}
+
}
+
impl<R: Read> ReadVariableExt for R {}
+
+
#[cfg(test)]
+
mod test {
+
use super::*;
+
use rkyv::{Archive, Deserialize, Serialize};
+
use std::io::Cursor;
+
+
#[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
+
#[rkyv(compare(PartialEq))]
+
struct TestData {
+
id: u32,
+
value: String,
+
}
+
+
#[test]
+
fn test_encoder_decoder_single_item() {
+
let data = TestData {
+
id: 123,
+
value: "test".to_string(),
+
};
+
+
let item = Item::new(1000, &data);
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
encoder.encode(&item).unwrap();
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_item = decoder.decode().unwrap().unwrap();
+
assert_eq!(decoded_item.timestamp, 1000);
+
+
let decoded_data = decoded_item.access();
+
assert_eq!(decoded_data.id, 123);
+
assert_eq!(decoded_data.value.as_str(), "test");
+
}
+
+
#[test]
+
fn test_encoder_decoder_multiple_items() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
Item::new(
+
1015,
+
&TestData {
+
id: 3,
+
value: "third".to_string(),
+
},
+
),
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "fourth".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let mut decoded_items = Vec::new();
+
while let Some(item) = decoder.decode().unwrap() {
+
decoded_items.push(item);
+
}
+
+
assert_eq!(decoded_items.len(), 4);
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.access().id, decoded.access().id);
+
assert_eq!(
+
original.access().value.as_str(),
+
decoded.access().value.as_str()
+
);
+
}
+
}
+
+
#[test]
+
fn test_encoder_decoder_with_iterator() {
+
let items = vec![
+
Item::new(
+
2000,
+
&TestData {
+
id: 10,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
2005,
+
&TestData {
+
id: 20,
+
value: "b".to_string(),
+
},
+
),
+
Item::new(
+
2012,
+
&TestData {
+
id: 30,
+
value: "c".to_string(),
+
},
+
),
+
];
+
+
// encode
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 2000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 3);
+
assert_eq!(decoded_items[0].timestamp, 2000);
+
assert_eq!(decoded_items[1].timestamp, 2005);
+
assert_eq!(decoded_items[2].timestamp, 2012);
+
+
assert_eq!(decoded_items[0].access().id, 10);
+
assert_eq!(decoded_items[1].access().id, 20);
+
assert_eq!(decoded_items[2].access().id, 30);
+
}
+
+
#[test]
+
fn test_delta_compression() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "a".to_string(),
+
},
+
),
+
Item::new(
+
1010,
+
&TestData {
+
id: 2,
+
value: "b".to_string(),
+
},
+
), // delta = 10
+
Item::new(
+
1020,
+
&TestData {
+
id: 3,
+
value: "c".to_string(),
+
},
+
), // delta = 10, delta-of-delta = 0
+
Item::new(
+
1025,
+
&TestData {
+
id: 4,
+
value: "d".to_string(),
+
},
+
), // delta = 5, delta-of-delta = -5
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
// decode and verify
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
for (original, decoded) in items.iter().zip(decoded_items.iter()) {
+
assert_eq!(original.timestamp, decoded.timestamp);
+
assert_eq!(original.access().id, decoded.access().id);
+
}
+
}
+
+
#[test]
+
fn test_empty_decode() {
+
let buffer = Vec::new();
+
let cursor = Cursor::new(buffer);
+
let mut decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let result = decoder.decode().unwrap();
+
assert!(result.is_none());
+
}
+
+
#[test]
+
fn test_backwards_timestamp() {
+
let items = vec![
+
Item::new(
+
1000,
+
&TestData {
+
id: 1,
+
value: "first".to_string(),
+
},
+
),
+
Item::new(
+
900,
+
&TestData {
+
id: 2,
+
value: "second".to_string(),
+
},
+
),
+
];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].timestamp, 1000);
+
assert_eq!(decoded_items[1].timestamp, 900);
+
}
+
+
#[test]
+
fn test_different_data_sizes() {
+
let small_data = TestData {
+
id: 1,
+
value: "x".to_string(),
+
};
+
let large_data = TestData {
+
id: 2,
+
value: "a".repeat(1000),
+
};
+
+
let items = vec![Item::new(1000, &small_data), Item::new(1001, &large_data)];
+
+
let mut buffer = Vec::new();
+
let mut encoder = ItemEncoder::new(&mut buffer);
+
+
for item in &items {
+
encoder.encode(item).unwrap();
+
}
+
encoder.finish().unwrap();
+
+
let cursor = Cursor::new(buffer);
+
let decoder = ItemDecoder::<_, TestData>::new(cursor, 1000).unwrap();
+
+
let decoded_items: Result<Vec<_>, _> = decoder.collect();
+
let decoded_items = decoded_items.unwrap();
+
+
assert_eq!(decoded_items.len(), 2);
+
assert_eq!(decoded_items[0].access().value.as_str(), "x");
+
assert_eq!(decoded_items[1].access().value.len(), 1000);
+
assert_eq!(decoded_items[1].access().value.as_str(), "a".repeat(1000));
+
}
+
}
+424
server/src/db_old/mod.rs
···
+
use std::{
+
io::Cursor,
+
ops::{Bound, Deref, RangeBounds},
+
path::Path,
+
sync::{
+
Arc,
+
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
+
},
+
time::{Duration, Instant},
+
};
+
+
use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice};
+
use ordered_varint::Variable;
+
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
+
use smol_str::SmolStr;
+
use tokio::sync::broadcast;
+
+
use crate::{
+
db_old::block::{ReadVariableExt, WriteVariableExt},
+
error::{AppError, AppResult},
+
jetstream::JetstreamEvent,
+
utils::{DefaultRateTracker, get_time},
+
};
+
+
mod block;
+
+
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidCounts {
+
pub count: u128,
+
pub deleted_count: u128,
+
pub last_seen: u64,
+
}
+
+
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
+
#[rkyv(compare(PartialEq), derive(Debug))]
+
pub struct NsidHit {
+
pub deleted: bool,
+
}
+
+
#[derive(Clone)]
+
pub struct EventRecord {
+
pub nsid: SmolStr,
+
pub timestamp: u64, // seconds
+
pub deleted: bool,
+
}
+
+
impl EventRecord {
+
pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
+
match event {
+
JetstreamEvent::Commit {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: false,
+
}),
+
JetstreamEvent::Delete {
+
time_us, commit, ..
+
} => Some(Self {
+
nsid: commit.collection.into(),
+
timestamp: time_us / 1_000_000,
+
deleted: true,
+
}),
+
_ => None,
+
}
+
}
+
}
+
+
type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
+
type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>;
+
type Item = block::Item<NsidHit>;
+
+
pub struct LexiconHandle {
+
tree: Partition,
+
buf: Arc<scc::Queue<EventRecord>>,
+
buf_len: AtomicUsize,
+
last_insert: AtomicU64,
+
eps: DefaultRateTracker,
+
block_size: AtomicUsize,
+
}
+
+
impl LexiconHandle {
+
fn new(keyspace: &Keyspace, nsid: &str) -> Self {
+
let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9));
+
Self {
+
tree: keyspace.open_partition(nsid, opts).unwrap(),
+
buf: Default::default(),
+
buf_len: AtomicUsize::new(0),
+
last_insert: AtomicU64::new(0),
+
eps: DefaultRateTracker::new(Duration::from_secs(5)),
+
block_size: AtomicUsize::new(1000),
+
}
+
}
+
+
fn item_count(&self) -> usize {
+
self.buf_len.load(AtomicOrdering::Acquire)
+
}
+
+
fn last_insert(&self) -> u64 {
+
self.last_insert.load(AtomicOrdering::Acquire)
+
}
+
+
fn suggested_block_size(&self) -> usize {
+
self.block_size.load(AtomicOrdering::Relaxed)
+
}
+
+
fn insert(&self, event: EventRecord) {
+
self.buf.push(event);
+
self.buf_len.fetch_add(1, AtomicOrdering::Release);
+
self.last_insert
+
.store(get_time().as_millis() as u64, AtomicOrdering::Release);
+
self.eps.observe(1);
+
let rate = self.eps.rate() as usize;
+
if rate != 0 {
+
self.block_size.store(rate * 60, AtomicOrdering::Relaxed);
+
}
+
}
+
+
fn sync(&self, max_block_size: usize) -> AppResult<usize> {
+
let mut writer = ItemEncoder::new(Vec::with_capacity(
+
size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(),
+
));
+
let mut start_timestamp = None;
+
let mut end_timestamp = None;
+
let mut written = 0_usize;
+
while let Some(event) = self.buf.pop() {
+
let item = Item::new(
+
event.timestamp,
+
&NsidHit {
+
deleted: event.deleted,
+
},
+
);
+
writer.encode(&item)?;
+
if start_timestamp.is_none() {
+
start_timestamp = Some(event.timestamp);
+
}
+
end_timestamp = Some(event.timestamp);
+
if written >= max_block_size {
+
break;
+
}
+
written += 1;
+
}
+
if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) {
+
self.buf_len.store(0, AtomicOrdering::Release);
+
let value = writer.finish()?;
+
let mut key = Vec::with_capacity(size_of::<u64>() * 2);
+
key.write_varint(start_timestamp)?;
+
key.write_varint(end_timestamp)?;
+
self.tree.insert(key, value)?;
+
}
+
Ok(written)
+
}
+
}
+
+
type BoxedIter<T> = Box<dyn Iterator<Item = T>>;
+
+
// counts is nsid -> NsidCounts
+
// hits is tree per nsid: varint start time + varint end time -> block of hits
+
pub struct Db {
+
inner: Keyspace,
+
hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>,
+
counts: Partition,
+
event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
+
eps: DefaultRateTracker,
+
min_block_size: usize,
+
max_block_size: usize,
+
max_last_activity: Duration,
+
}
+
+
impl Db {
+
pub fn new(path: impl AsRef<Path>) -> AppResult<Self> {
+
tracing::info!("opening db...");
+
let ks = Config::new(path)
+
.cache_size(8 * 1024 * 1024) // from talna
+
.open()?;
+
Ok(Self {
+
hits: Default::default(),
+
counts: ks.open_partition(
+
"_counts",
+
PartitionCreateOptions::default().compression(fjall::CompressionType::None),
+
)?,
+
inner: ks,
+
event_broadcaster: broadcast::channel(1000).0,
+
eps: DefaultRateTracker::new(Duration::from_secs(1)),
+
min_block_size: 512,
+
max_block_size: 100_000,
+
max_last_activity: Duration::from_secs(10),
+
})
+
}
+
+
pub fn sync(&self, all: bool) -> AppResult<()> {
+
let _guard = scc::ebr::Guard::new();
+
for (nsid, tree) in self.hits.iter(&_guard) {
+
let count = tree.item_count();
+
let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size());
+
let is_too_old = (get_time().as_millis() as u64 - tree.last_insert())
+
> self.max_last_activity.as_millis() as u64;
+
if count > 0 && (all || is_max_block_size || is_too_old) {
+
loop {
+
let synced = tree.sync(self.max_block_size)?;
+
if synced == 0 {
+
break;
+
}
+
tracing::info!("synced {synced} of {nsid} to db");
+
}
+
}
+
}
+
Ok(())
+
}
+
+
#[inline(always)]
+
pub fn eps(&self) -> usize {
+
self.eps.rate() as usize
+
}
+
+
#[inline(always)]
+
pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
+
self.event_broadcaster.subscribe()
+
}
+
+
#[inline(always)]
+
fn maybe_run_in_nsid_tree<T>(
+
&self,
+
nsid: &str,
+
f: impl FnOnce(&LexiconHandle) -> T,
+
) -> Option<T> {
+
let _guard = scc::ebr::Guard::new();
+
let handle = match self.hits.peek(nsid, &_guard) {
+
Some(handle) => handle.clone(),
+
None => {
+
if self.inner.partition_exists(nsid) {
+
let handle = Arc::new(LexiconHandle::new(&self.inner, nsid));
+
let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
+
handle
+
} else {
+
return None;
+
}
+
}
+
};
+
Some(f(&handle))
+
}
+
+
#[inline(always)]
+
fn run_in_nsid_tree<T>(
+
&self,
+
nsid: SmolStr,
+
f: impl FnOnce(&LexiconHandle) -> AppResult<T>,
+
) -> AppResult<T> {
+
f(self
+
.hits
+
.entry(nsid.clone())
+
.or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid)))
+
.get())
+
}
+
+
pub fn record_event(&self, e: EventRecord) -> AppResult<()> {
+
let EventRecord {
+
nsid,
+
timestamp,
+
deleted,
+
} = e.clone();
+
+
// insert event
+
self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?;
+
// increment count
+
let mut counts = self.get_count(&nsid)?;
+
counts.last_seen = timestamp;
+
if deleted {
+
counts.deleted_count += 1;
+
} else {
+
counts.count += 1;
+
}
+
self.insert_count(&nsid, counts.clone())?;
+
if self.event_broadcaster.receiver_count() > 0 {
+
let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts));
+
}
+
self.eps.observe(1);
+
Ok(())
+
}
+
+
#[inline(always)]
+
fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> {
+
self.counts
+
.insert(
+
nsid,
+
unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(),
+
)
+
.map_err(AppError::from)
+
}
+
+
pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
+
let Some(raw) = self.counts.get(nsid)? else {
+
return Ok(NsidCounts::default());
+
};
+
Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
+
}
+
+
pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
+
self.counts.iter().map(|res| {
+
res.map_err(AppError::from).map(|(key, val)| {
+
(
+
SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
+
unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
+
)
+
})
+
})
+
}
+
+
pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> {
+
self.inner
+
.list_partitions()
+
.into_iter()
+
.filter(|k| k.deref() != "_counts")
+
}
+
+
pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> {
+
self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> {
+
Box::new(
+
handle
+
.tree
+
.iter()
+
.rev()
+
.map(|res| res.map_err(AppError::from)),
+
)
+
})
+
.unwrap_or_else(|| Box::new(std::iter::empty()))
+
}
+
+
pub fn get_hits(
+
&self,
+
nsid: &str,
+
range: impl RangeBounds<u64> + std::fmt::Debug,
+
) -> BoxedIter<AppResult<Item>> {
+
let start = range
+
.start_bound()
+
.cloned()
+
.map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
+
let end = range
+
.end_bound()
+
.cloned()
+
.map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() });
+
let limit = match range.end_bound().cloned() {
+
Bound::Included(end) => end,
+
Bound::Excluded(end) => end.saturating_sub(1),
+
Bound::Unbounded => u64::MAX,
+
};
+
+
self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> {
+
let map_block = move |(key, val)| {
+
let mut key_reader = Cursor::new(key);
+
let start_timestamp = key_reader.read_varint::<u64>()?;
+
let items =
+
ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| {
+
item.as_ref().map_or(true, |item| item.timestamp <= limit)
+
});
+
Ok(items)
+
};
+
+
Box::new(
+
handle
+
.tree
+
.range(TimestampRange { start, end })
+
.map(move |res| res.map_err(AppError::from).and_then(map_block))
+
.flatten()
+
.flatten(),
+
)
+
})
+
.unwrap_or_else(|| Box::new(std::iter::empty()))
+
}
+
+
pub fn tracking_since(&self) -> AppResult<u64> {
+
// HACK: we should actually store when we started tracking, but i'm lazy;
+
// should be accurate enough
+
self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| {
+
let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else {
+
return Ok(0);
+
};
+
let mut timestamp_reader = Cursor::new(timestamps_raw);
+
timestamp_reader
+
.read_varint::<u64>()
+
.map_err(AppError::from)
+
})
+
.unwrap_or(Ok(0))
+
}
+
}
+
+
type TimestampRepr = Vec<u8>;
+
+
struct TimestampRange {
+
start: Bound<TimestampRepr>,
+
end: Bound<TimestampRepr>,
+
}
+
+
impl RangeBounds<TimestampRepr> for TimestampRange {
+
#[inline(always)]
+
fn start_bound(&self) -> Bound<&TimestampRepr> {
+
self.start.as_ref()
+
}
+
+
#[inline(always)]
+
fn end_bound(&self) -> Bound<&TimestampRepr> {
+
self.end.as_ref()
+
}
+
}
+
+
type TimestampReprOld = [u8; 8];
+
+
struct TimestampRangeOld {
+
start: Bound<TimestampReprOld>,
+
end: Bound<TimestampReprOld>,
+
}
+
+
impl RangeBounds<TimestampReprOld> for TimestampRangeOld {
+
#[inline(always)]
+
fn start_bound(&self) -> Bound<&TimestampReprOld> {
+
self.start.as_ref()
+
}
+
+
#[inline(always)]
+
fn end_bound(&self) -> Bound<&TimestampReprOld> {
+
self.end.as_ref()
+
}
+
}
+230 -40
server/src/main.rs
···
-
use std::{ops::Deref, sync::Arc};
+
use std::{ops::Deref, time::Duration};
+
use itertools::Itertools;
+
use rclite::Arc;
use smol_str::ToSmolStr;
-
#[cfg(not(target_env = "msvc"))]
-
use tikv_jemallocator::Jemalloc;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing_subscriber::EnvFilter;
use crate::{
api::serve,
-
db::{Db, EventRecord},
+
db::{Db, DbConfig, EventRecord},
error::AppError,
jetstream::JetstreamClient,
+
utils::{CLOCK, RelativeDateTime, get_time},
};
mod api;
mod db;
+
mod db_old;
mod error;
mod jetstream;
+
mod utils;
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
-
static GLOBAL: Jemalloc = Jemalloc;
+
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
+
#[cfg(target_env = "msvc")]
+
#[global_allocator]
+
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[tokio::main]
async fn main() {
···
.compact()
.init();
-
if std::env::args()
-
.nth(1)
-
.map_or(false, |arg| arg == "migrate")
-
{
-
migrate_to_miniz();
-
return;
+
match std::env::args().nth(1).as_deref() {
+
Some("compact") => {
+
compact();
+
return;
+
}
+
Some("migrate") => {
+
migrate();
+
return;
+
}
+
Some("debug") => {
+
debug();
+
return;
+
}
+
Some(x) => {
+
tracing::error!("unknown command: {}", x);
+
return;
+
}
+
None => {}
}
-
let db = Arc::new(Db::new(".fjall_data").expect("couldnt create db"));
+
let cancel_token = CancellationToken::new();
+
+
let db = Arc::new(
+
Db::new(DbConfig::default(), cancel_token.child_token()).expect("couldnt create db"),
+
);
rustls::crypto::ring::default_provider()
.install_default()
···
}
};
-
let cancel_token = CancellationToken::new();
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(1000);
-
let consume_events = tokio::spawn({
let consume_cancel = cancel_token.child_token();
async move {
···
let Some(record) = EventRecord::from_jetstream(event) else {
continue;
};
-
let _ = event_tx.send(record).await;
+
event_tx.send(record).await?;
}
Err(err) => return Err(err),
},
···
let ingest_events = std::thread::spawn({
let db = db.clone();
move || {
-
tracing::info!("starting ingest events thread...");
-
while let Some(e) = event_rx.blocking_recv() {
-
if let Err(e) = db.record_event(e) {
-
tracing::error!("failed to record event: {}", e);
+
let mut buffer = Vec::new();
+
loop {
+
let read = event_rx.blocking_recv_many(&mut buffer, 100);
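+
// blocking_recv_many returns 0 only once the channel is closed and
+
// drained, so the loop also exits when all senders are dropped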
+
if let Err(err) = db.ingest_events(buffer.drain(..)) {
+
tracing::error!("failed to ingest events: {}", err);
+
}
+
if read == 0 || db.is_shutting_down() {
+
break;
+
}
+
}
+
}
+
});
+
+
let db_task = tokio::task::spawn({
+
let db = db.clone();
+
async move {
+
let sync_period = Duration::from_secs(10);
+
let mut sync_interval = tokio::time::interval(sync_period);
+
sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+
let compact_period = std::time::Duration::from_secs(60 * 30); // 30 mins
+
let mut compact_interval = tokio::time::interval(compact_period);
+
compact_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+
+
loop {
+
let sync_db = async || {
+
tokio::task::spawn_blocking({
+
let db = db.clone();
+
move || {
+
if db.is_shutting_down() {
+
return;
+
}
+
match db.sync(false) {
+
Ok(_) => (),
+
Err(e) => tracing::error!("failed to sync db: {}", e),
+
}
+
}
+
})
+
.await
+
.unwrap();
+
};
+
let compact_db = async || {
+
tokio::task::spawn_blocking({
+
let db = db.clone();
+
move || {
+
if db.is_shutting_down() {
+
return;
+
}
+
let end = get_time() - compact_period / 2;
+
let start = end - compact_period;
+
let range = start.as_secs()..end.as_secs();
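+
// the window lags by half a period, presumably so compaction never
+
// touches blocks that are still being filled with recent events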
+
tracing::info!(
+
{
+
start = %RelativeDateTime::from_now(start),
+
end = %RelativeDateTime::from_now(end),
+
},
+
"running compaction...",
+
);
+
match db.compact_all(db.cfg.max_block_size, range, false) {
+
Ok(_) => (),
+
Err(e) => tracing::error!("failed to compact db: {}", e),
+
}
+
}
+
})
+
.await
+
.unwrap();
+
};
+
tokio::select! {
+
_ = sync_interval.tick() => sync_db().await,
+
_ = compact_interval.tick() => compact_db().await,
+
_ = db.shutting_down() => break,
}
}
}
});
tokio::select! {
-
res = serve(db, cancel_token.child_token()) => {
+
res = serve(db.clone(), cancel_token.child_token()) => {
if let Err(e) = res {
tracing::error!("serve failed: {}", e);
}
···
}
tracing::info!("shutting down...");
-
ingest_events
-
.join()
-
.expect("failed to join ingest events thread");
+
cancel_token.cancel();
+
ingest_events.join().expect("failed to join ingest events");
+
db_task.await.expect("cant join db task");
+
db.sync(true).expect("cant sync db");
}
-
fn migrate_to_miniz() {
-
let from = Db::new(".fjall_data").expect("couldnt create db");
-
let to = Db::new(".fjall_data_miniz").expect("couldnt create db");
+
fn debug() {
+
let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db");
+
let info = db.info().expect("cant get db info");
+
println!("disk size: {}", info.disk_size);
+
for (nsid, blocks) in info.nsids {
+
print!("{nsid}:");
+
let mut last_size = 0;
+
let mut same_size_count = 0;
+
for item_count in blocks {
+
if item_count == last_size {
+
same_size_count += 1;
+
} else {
+
if same_size_count > 1 {
+
print!("x{}", same_size_count);
+
}
+
print!(" {item_count}");
+
same_size_count = 0;
+
}
+
last_size = item_count;
+
}
+
print!("\n");
+
}
+
}
+
+
fn compact() {
+
let db = Db::new(
+
DbConfig::default().ks(|c| {
+
c.max_journaling_size(u64::MAX)
+
.max_write_buffer_size(u64::MAX)
+
}),
+
CancellationToken::new(),
+
)
+
.expect("couldnt create db");
+
let info = db.info().expect("cant get db info");
+
db.major_compact().expect("cant compact");
+
std::thread::sleep(Duration::from_secs(5));
+
let compacted_info = db.info().expect("cant get db info");
+
println!(
+
"disk size: {} -> {}",
+
info.disk_size, compacted_info.disk_size
+
);
+
for (nsid, blocks) in info.nsids {
+
println!(
+
"{nsid}: {} -> {}",
+
blocks.len(),
+
compacted_info.nsids[&nsid].len()
+
)
+
}
+
}
+
+
fn migrate() {
+
let cancel_token = CancellationToken::new();
+
+
let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db"));
+
+
let to = Arc::new(
+
Db::new(
+
DbConfig::default().path(".fjall_data_to").ks(|c| {
+
c.max_journaling_size(u64::MAX)
+
.max_write_buffer_size(u64::MAX)
+
.compaction_workers(rayon::current_num_threads() * 4)
+
.flush_workers(rayon::current_num_threads() * 4)
+
}),
+
cancel_token.child_token(),
+
)
+
.expect("couldnt create db"),
+
);
+
let nsids = from.get_nsids().collect::<Vec<_>>();
+
let eps_thread = std::thread::spawn({
+
let to = to.clone();
+
move || {
+
loop {
+
std::thread::sleep(Duration::from_secs(3));
+
let eps = to.eps();
+
if eps > 0 {
+
tracing::info!("{} rps", eps);
+
}
+
}
+
}
+
});
+
let mut threads = Vec::with_capacity(nsids.len());
+
let start = CLOCK.now();
+
for nsid in nsids {
+
let from = from.clone();
+
let to = to.clone();
+
threads.push(std::thread::spawn(move || {
+
tracing::info!("{}: migrating...", nsid.deref());
+
let mut count = 0_u64;
+
for hits in from.get_hits(&nsid, ..).chunks(100000).into_iter() {
+
to.ingest_events(hits.map(|hit| {
+
count += 1;
+
let hit = hit.expect("cant decode hit");
+
EventRecord {
+
nsid: nsid.to_smolstr(),
+
timestamp: hit.timestamp,
+
deleted: hit.deser().unwrap().deleted,
+
}
+
}))
+
.expect("cant record event");
+
}
+
tracing::info!("{}: ingested {} events...", nsid.deref(), count);
+
count
+
}));
+
}
let mut total_count = 0_u64;
-
for nsid in from.get_nsids() {
-
tracing::info!("migrating {} ...", nsid.deref());
-
for hit in from.get_hits(&nsid).expect("cant read hits") {
-
let (timestamp, data) = hit.expect("cant read event");
-
to.record_event(EventRecord {
-
nsid: nsid.to_smolstr(),
-
timestamp,
-
deleted: data.deleted,
-
})
-
.expect("cant record event");
-
total_count += 1;
-
}
+
for thread in threads {
+
let count = thread.join().expect("thread panicked");
+
total_count += count;
}
-
-
tracing::info!("migrated {total_count} events!");
+
let read_time = start.elapsed();
+
let read_per_second = total_count as f64 / read_time.as_secs_f64();
+
drop(from);
+
tracing::info!("starting sync!!!");
+
to.sync(true).expect("cant sync");
+
tracing::info!("persisting...");
+
let total_time = start.elapsed();
+
let write_per_second = total_count as f64 / (total_time - read_time).as_secs_f64();
+
tracing::info!(
+
"migrated {total_count} events in {total_time:?} ({read_per_second:.2} rps, {write_per_second:.2} wps)"
+
);
}
+322
server/src/utils.rs
···
+
use std::io::{self, Read, Write};
+
use std::sync::atomic::{AtomicU64, Ordering};
+
use std::time::Duration;
+
+
use byteview::ByteView;
+
use ordered_varint::Variable;
+
+
pub fn get_time() -> Duration {
+
std::time::SystemTime::now()
+
.duration_since(std::time::UNIX_EPOCH)
+
.unwrap()
+
}
+
+
pub trait WriteVariableExt: Write {
+
fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> {
+
value.encode_variable(self)
+
}
+
}
+
impl<W: Write> WriteVariableExt for W {}
+
+
pub trait ReadVariableExt: Read {
+
fn read_varint<T: Variable>(&mut self) -> io::Result<T> {
+
T::decode_variable(self)
+
}
+
}
+
impl<R: Read> ReadVariableExt for R {}
+
+
pub struct WritableByteView {
+
view: ByteView,
+
written: usize,
+
}
+
+
impl WritableByteView {
+
// allocates a zeroed view of `capacity` bytes to be filled via Write
+
pub fn with_size(capacity: usize) -> Self {
+
Self {
+
view: ByteView::with_size(capacity),
+
written: 0,
+
}
+
}
+
+
#[inline(always)]
+
pub fn into_inner(self) -> ByteView {
+
self.view
+
}
+
}
+
+
impl Write for WritableByteView {
+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+
let len = buf.len();
+
if len > self.view.len() - self.written {
+
return Err(std::io::Error::new(
+
std::io::ErrorKind::StorageFull,
+
"buffer full",
+
));
+
}
+
// SAFETY: the bounds check above keeps the copy in-bounds, and we hold
+
// the only reference to the view, so get_mut cannot fail
+
unsafe {
+
std::ptr::copy_nonoverlapping(
+
buf.as_ptr(),
+
self.view
+
.get_mut()
+
.unwrap_unchecked()
+
.as_mut_ptr()
+
.add(self.written),
+
len,
+
);
+
self.written += len;
+
}
+
Ok(len)
+
}
+
+
#[inline(always)]
+
fn flush(&mut self) -> std::io::Result<()> {
+
Ok(())
+
}
+
}
+
+
pub fn varints_unsigned_encoded<const N: usize>(values: [u64; N]) -> ByteView {
+
let mut buf =
+
WritableByteView::with_size(values.into_iter().map(varint_unsigned_encoded_len).sum());
+
for value in values {
+
// cant fail
+
let _ = buf.write_varint(value);
+
}
+
buf.into_inner()
+
}
+
+
// gets the encoded length of a varint-encoded unsigned integer
+
// see ordered_varint
+
pub fn varint_unsigned_encoded_len(value: u64) -> usize {
+
let value = value.to_be_bytes();
+
value
+
.iter()
+
.enumerate()
+
.find_map(|(index, &byte)| {
+
(byte > 0).then(|| {
+
let extra_bytes = 7 - index;
+
(byte < 16)
+
.then(|| extra_bytes + 1)
+
.unwrap_or_else(|| extra_bytes + 2)
+
})
+
})
+
.unwrap_or(0)
+
.max(1)
+
}
+
+
pub static CLOCK: std::sync::LazyLock<quanta::Clock> =
+
std::sync::LazyLock::new(|| quanta::Clock::new());
+
+
/// simple thread-safe rate tracker using time buckets
+
/// divides time into fixed buckets and rotates through them
+
#[derive(Debug)]
+
pub struct RateTracker<const BUCKET_WINDOW: u64> {
+
buckets: Vec<AtomicU64>,
+
last_bucket_time: AtomicU64,
+
bucket_duration_nanos: u64,
+
window_duration: Duration,
+
start_time: u64, // raw time when tracker was created
+
}
+
+
pub type DefaultRateTracker = RateTracker<1000>;
+
+
impl<const BUCKET_WINDOW: u64> RateTracker<BUCKET_WINDOW> {
+
/// create a new rate tracker with the specified time window
+
pub fn new(window_duration: Duration) -> Self {
+
let bucket_duration_nanos = Duration::from_millis(BUCKET_WINDOW).as_nanos() as u64;
+
let num_buckets =
+
(window_duration.as_nanos() as u64 / bucket_duration_nanos).max(1) as usize;
+
+
let mut buckets = Vec::with_capacity(num_buckets);
+
for _ in 0..num_buckets {
+
buckets.push(AtomicU64::new(0));
+
}
+
+
let start_time = CLOCK.raw();
+
Self {
+
buckets,
+
bucket_duration_nanos,
+
window_duration,
+
last_bucket_time: AtomicU64::new(0),
+
start_time,
+
}
+
}
+
+
#[inline(always)]
+
fn elapsed(&self) -> u64 {
+
CLOCK.delta_as_nanos(self.start_time, CLOCK.raw())
+
}
+
+
/// record an event
+
pub fn observe(&self, count: u64) {
+
self.maybe_advance_buckets();
+
+
let bucket_index = self.get_current_bucket_index();
+
self.buckets[bucket_index].fetch_add(count, Ordering::Relaxed);
+
}
+
+
/// get the current rate in events per second
+
pub fn rate(&self) -> f64 {
+
self.maybe_advance_buckets();
+
+
let total_events: u64 = self
+
.buckets
+
.iter()
+
.map(|bucket| bucket.load(Ordering::Relaxed))
+
.sum();
+
+
total_events as f64 / self.window_duration.as_secs_f64()
+
}
+
+
fn get_current_bucket_index(&self) -> usize {
+
let bucket_number = self.elapsed() / self.bucket_duration_nanos;
+
(bucket_number as usize) % self.buckets.len()
+
}
+
+
fn maybe_advance_buckets(&self) {
+
let current_bucket_time =
+
(self.elapsed() / self.bucket_duration_nanos) * self.bucket_duration_nanos;
+
let last_bucket_time = self.last_bucket_time.load(Ordering::Relaxed);
+
+
if current_bucket_time > last_bucket_time {
+
// try to update the last bucket time
+
if self
+
.last_bucket_time
+
.compare_exchange_weak(
+
last_bucket_time,
+
current_bucket_time,
+
Ordering::Relaxed,
+
Ordering::Relaxed,
+
)
+
.is_ok()
+
{
+
// clear buckets that are now too old
+
let buckets_to_advance = ((current_bucket_time - last_bucket_time)
+
/ self.bucket_duration_nanos)
+
.min(self.buckets.len() as u64);
+
+
for i in 0..buckets_to_advance {
+
let bucket_time = last_bucket_time + (i + 1) * self.bucket_duration_nanos;
+
let bucket_index =
+
(bucket_time / self.bucket_duration_nanos) as usize % self.buckets.len();
+
self.buckets[bucket_index].store(0, Ordering::Relaxed);
+
}
+
}
+
}
+
}
+
}
+
+
#[cfg(test)]
+
mod tests {
+
use super::*;
+
use std::sync::Arc;
+
use std::thread;
+
+
#[test]
+
fn test_rate_tracker_basic() {
+
let tracker = DefaultRateTracker::new(Duration::from_secs(2));
+
+
// record some events
+
tracker.observe(3);
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 1.5); // 3 events over 2 seconds = 1.5 events/sec
+
}
+
+
#[test]
+
fn test_rate_tracker_burst() {
+
let tracker = DefaultRateTracker::new(Duration::from_secs(1));
+
+
// record a lot of events
+
tracker.observe(1000);
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 1000.0); // 1000 events in 1 second
+
}
+
+
#[test]
+
fn test_rate_tracker_threading() {
+
let tracker = Arc::new(DefaultRateTracker::new(Duration::from_secs(1)));
+
let mut handles = vec![];
+
+
for _ in 0..4 {
+
let tracker_clone = Arc::clone(&tracker);
+
let handle = thread::spawn(move || {
+
tracker_clone.observe(10);
+
});
+
handles.push(handle);
+
}
+
+
for handle in handles {
+
handle.join().unwrap();
+
}
+
+
let rate = tracker.rate();
+
assert_eq!(rate, 40.0); // 40 events in 1 second
+
}
+
}
+
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
pub enum TimeDirection {
+
Backwards, // Past (default)
+
Forwards, // Future
+
}
+
+
impl Default for TimeDirection {
+
fn default() -> Self {
+
TimeDirection::Backwards
+
}
+
}
+
+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct RelativeDateTime {
+
duration: Duration,
+
direction: TimeDirection,
+
}
+
+
impl RelativeDateTime {
+
pub fn new(duration: Duration, direction: TimeDirection) -> Self {
+
Self {
+
duration,
+
direction,
+
}
+
}
+
+
pub fn from_now(duration: Duration) -> Self {
+
let cur = get_time();
+
if duration > cur {
+
Self::new(duration - cur, TimeDirection::Forwards)
+
} else {
+
Self::new(cur - duration, TimeDirection::Backwards)
+
}
+
}
+
}
+
+
impl std::fmt::Display for RelativeDateTime {
+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
let secs = self.duration.as_secs();
+
+
if secs == 0 {
+
return write!(f, "now");
+
}
+
+
let (amount, unit) = match secs {
+
0 => unreachable!(), // handled above
+
1..=59 => (secs, "second"),
+
60..=3599 => (secs / 60, "minute"),
+
3600..=86399 => (secs / 3600, "hour"),
+
86400..=2591999 => (secs / 86400, "day"), // up to 29 days
+
2592000..=31535999 => (secs / 2592000, "month"), // 30 days to 364 days
+
_ => (secs / 31536000, "year"), // 365 days+
+
};
+
+
let plural = if amount != 1 { "s" } else { "" };
+
+
match self.direction {
+
TimeDirection::Forwards => write!(f, "in {} {}{}", amount, unit, plural),
+
TimeDirection::Backwards => write!(f, "{} {}{} ago", amount, unit, plural),
+
}
+
}
+
}