Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

metrics! a metrics server, plus the previously-logged stats exported as metrics

The logs are still there, but they've moved to `trace` level.
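
Quick check once it's running (the port 8765 and the flags come from the diffs below):

```bash
# scrape the new Prometheus endpoint
curl -s http://localhost:8765/metrics | head

# the old per-batch log lines are still available at trace level
RUST_LOG=trace ./ufos --jetstream us-west-2 --data /mnt/ufos-db/
```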

+25 -3
Cargo.lock
···
"links",
"mediatype",
"metrics",
- "metrics-exporter-prometheus",
"metrics-process",
"num-format",
"ratelimit",
···
[[package]]
name = "metrics"
- version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "7a7deb012b3b2767169ff203fadb4c6b0b82b947512e5eb9e0b78c2e186ad9e3"
dependencies = [
"ahash",
"portable-atomic",
···
"metrics-util",
"quanta",
"thiserror 1.0.69",
"tokio",
"tracing",
]
···
"jetstream",
"log",
"lsm-tree",
"schemars",
"semver",
"serde",
···
"links",
"mediatype",
"metrics",
+ "metrics-exporter-prometheus 0.16.2",
"metrics-process",
"num-format",
"ratelimit",
···
[[package]]
name = "metrics"
+ version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5"
dependencies = [
"ahash",
"portable-atomic",
···
"metrics-util",
"quanta",
"thiserror 1.0.69",
"tokio",
"tracing",
]
+
+ [[package]]
+ name = "metrics-exporter-prometheus"
+ version = "0.17.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "df88858cd28baaaf2cfc894e37789ed4184be0e1351157aec7bf3c2266c793fd"
+ dependencies = [
+  "base64 0.22.1",
+  "http-body-util",
+  "hyper",
+  "hyper-util",
+  "indexmap 2.9.0",
+  "ipnet",
+  "metrics",
+  "metrics-util",
+  "quanta",
+  "thiserror 2.0.12",
+  "tokio",
+  "tracing",
+ ]
···
"jetstream",
"log",
"lsm-tree",
+ "metrics",
+ "metrics-exporter-prometheus 0.17.0",
"schemars",
"semver",
"serde",
+1 -1
cozy-setup (move to another repo).md
···
- systemd unit for running: `sudo nano /etc/systemd/system/constellation.service`
- ```toml
[Unit]
Description=Constellation backlinks index
After=network.target
···
- systemd unit for running: `sudo nano /etc/systemd/system/constellation.service`
+ ```ini
[Unit]
Description=Constellation backlinks index
After=network.target
+11
jetstream/src/events.rs
···
let t: SystemTime = self.into();
t.elapsed()
}
/// Get the immediate next cursor value
///
/// This is possible for the implementation of jetstream cursors
···
let t: SystemTime = self.into();
t.elapsed()
}
+ /// Compute the age of the cursor vs the local clock
+ ///
+ /// Converts the resulting duration into an f64, which can be negative!
+ ///
+ /// Warning: this exploits the internal implementation detail of jetstream cursors
+ pub fn elapsed_micros_f64(&self) -> f64 {
+     match self.elapsed() {
+         Ok(d) => d.as_micros() as f64,
+         Err(e) => -(e.duration().as_micros() as f64),
+     }
+ }
/// Get the immediate next cursor value
///
/// This is possible for the implementation of jetstream cursors
+196
ufos ops (move to micro-ops).md
···
···
ufos ops

btrfs snapshots: snapper

```bash
sudo apt install snapper
sudo snapper -c ufos-db create-config /mnt/ufos-db

# edit /etc/snapper/configs/ufos-db
# change
TIMELINE_MIN_AGE="1800"
TIMELINE_LIMIT_HOURLY="10"
TIMELINE_LIMIT_DAILY="10"
TIMELINE_LIMIT_WEEKLY="0"
TIMELINE_LIMIT_MONTHLY="10"
TIMELINE_LIMIT_YEARLY="10"
# to
TIMELINE_MIN_AGE="1800"
TIMELINE_LIMIT_HOURLY="22"
TIMELINE_LIMIT_DAILY="4"
TIMELINE_LIMIT_WEEKLY="0"
TIMELINE_LIMIT_MONTHLY="0"
TIMELINE_LIMIT_YEARLY="0"
```

this should be enough? (22 hourly + 4 daily snapshots, nothing longer-lived)

list snapshots:

```bash
sudo snapper -c ufos-db list
```
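
to confirm the timeline snapshots are actually being taken, check the systemd timers (assuming the Debian snapper package ships the standard `snapper-timeline.timer` and `snapper-cleanup.timer` units):

```bash
# snapper's periodic snapshots run from these timers
systemctl list-timers 'snapper-*'
# or check one directly
systemctl status snapper-timeline.timer
```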

systemd

create file: `/etc/systemd/system/ufos.service`

```ini
[Unit]
Description=UFOs-API
After=network.target

[Service]
User=pi
WorkingDirectory=/home/pi/
ExecStart=/home/pi/ufos --jetstream us-west-2 --data /mnt/ufos-db/
Environment="RUST_LOG=info"
LimitNOFILE=16384
Restart=always

[Install]
WantedBy=multi-user.target
```

then

```bash
sudo systemctl daemon-reload
sudo systemctl enable ufos
sudo systemctl start ufos
```

monitor with

```bash
journalctl -u ufos -f
```

make sure a backup dir exists

```bash
mkdir /home/pi/backup
```

mount the NAS

```bash
sudo mount.cifs "//truenas.local/folks data" /home/pi/backup -o user=phil,uid=pi
```
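
that mount won't survive a reboot; a sketch of a persistent `/etc/fstab` entry, assuming credentials go in a (hypothetical) `/home/pi/.smbcreds` file:

```bash
# hypothetical fstab entry -- spaces in the share name must be escaped as \040
echo '//truenas.local/folks\040data /home/pi/backup cifs credentials=/home/pi/.smbcreds,uid=pi,_netdev 0 0' | sudo tee -a /etc/fstab
sudo mount -a
```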

manual rsync

```bash
sudo rsync -ahP --delete /mnt/ufos-db/.snapshots/1/snapshot/ backup/ufos/
```

backup script sketch

```bash
NUM=$(sudo snapper --csvout -c ufos-db list --type single --columns number | tail -n1)
sudo rsync -ahP --delete "/mnt/ufos-db/.snapshots/${NUM}/snapshot/" backup/ufos/
```

just crontab it?

`sudo crontab -e`

```bash
# root's crontab: no sudo needed, and use an absolute destination since cron won't start in /home/pi
0 1-23/6 * * * rsync -ahP --delete "/mnt/ufos-db/.snapshots/$(snapper --csvout -c ufos-db list --type single --columns number | tail -n1)/snapshot/" /home/pi/backup/ufos/
```
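
the initial sync may well run longer than six hours; wrapping the rsync in `flock` (util-linux) would keep runs from overlapping. A sketch of the same crontab line:

```bash
0 1-23/6 * * * flock -n /tmp/ufos-backup.lock rsync -ahP --delete "/mnt/ufos-db/.snapshots/$(snapper --csvout -c ufos-db list --type single --columns number | tail -n1)/snapshot/" /home/pi/backup/ufos/
```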

^^ try once the initial backup is done

(`--columns subvolume,number` prints both the subvolume and number columns)

gateway: follow constellation for the nginx->prom thing

config at `/etc/prometheus-nginxlog-exporter.hcl`

before:

```hcl
listen {
  port = 4044
}

namespace "nginx" {
  source = {
    files = [
      "/var/log/nginx/constellation-access.log"
    ]
  }

  format = "$remote_addr - $remote_user [$time_local] \"$request\" $status $upstream_cache_status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\""

  labels {
    app = "constellation"
  }

  relabel "cache_status" {
    from = "upstream_cache_status"
  }
}
```

after:

```hcl
listen {
  port = 4044
}

namespace "constellation" {
  source = {
    files = [
      "/var/log/nginx/constellation-access.log"
    ]
  }

  format = "$remote_addr - $remote_user [$time_local] \"$request\" $status $upstream_cache_status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\""

  labels {
    app = "constellation"
  }

  relabel "cache_status" {
    from = "upstream_cache_status"
  }

  namespace_label = "vhost"
  metrics_override = { prefix = "nginx" }
}

namespace "ufos" {
  source = {
    files = [
      "/var/log/nginx/ufos-access.log"
    ]
  }

  format = "$remote_addr - $remote_user [$time_local] \"$request\" $status $upstream_cache_status $body_bytes_sent \"$http_referer\" \"$http_user_agent\" \"$http_x_forwarded_for\""

  labels {
    app = "ufos"
  }

  relabel "cache_status" {
    from = "upstream_cache_status"
  }

  namespace_label = "vhost"
  metrics_override = { prefix = "nginx" }
}
```

then restart to pick up the new config:

```bash
sudo systemctl restart prometheus-nginxlog-exporter.service
```
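
verify it's exporting (it listens on port 4044 per the config above):

```bash
curl -s localhost:4044/metrics | grep '^nginx_' | head
```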
+2
ufos/Cargo.toml
···
jetstream = { path = "../jetstream" }
log = "0.4.26"
lsm-tree = "2.6.6"
schemars = { version = "0.8.22", features = ["raw_value", "chrono"] }
semver = "1.0.26"
serde = "1.0.219"
···
jetstream = { path = "../jetstream" }
log = "0.4.26"
lsm-tree = "2.6.6"
+ metrics = "0.24.2"
+ metrics-exporter-prometheus = { version = "0.17.0", default-features = false, features = ["http-listener"] }
schemars = { version = "0.8.22", features = ["raw_value", "chrono"] }
semver = "1.0.26"
serde = "1.0.219"
+37 -4
ufos/src/consumer.rs
···
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
JetstreamReceiver,
};
use std::mem;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
···
batch_sender: Sender<LimitedBatch>,
sketch_secret: SketchSecretPrefix,
) -> Self {
let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(3));
rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Self {
···
// holds up all consumer progress until it can send to the channel
// use this when the current batch is too full to add more to it
async fn send_current_batch_now(&mut self, small: bool, referrer: &str) -> anyhow::Result<()> {
let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
None => "unknown".to_string(),
Some(Ok(t)) => format!("{:?}", t),
Some(Err(e)) => format!("+{:?}", e.duration()),
};
- log::info!(
-     "sending batch now from {beginning}, {}, queue capacity: {}, referrer: {referrer}",
-     if small { "small" } else { "full" },
-     self.batch_sender.capacity(),
);
let current = mem::take(&mut self.current_batch);
self.rate_limit.tick().await;
self.batch_sender
.send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S))
.await?;
Ok(())
}
}
···
DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
JetstreamReceiver,
};
+ use metrics::{
+     counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, Unit,
+ };
use std::mem;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
···
batch_sender: Sender<LimitedBatch>,
sketch_secret: SketchSecretPrefix,
) -> Self {
+ describe_counter!(
+     "batcher_batches_sent",
+     Unit::Count,
+     "how many batches of events were sent from Batcher to storage"
+ );
+ describe_gauge!(
+     "batcher_batch_age",
+     Unit::Microseconds,
+     "how old the last-sent batch was"
+ );
+ describe_gauge!(
+     "batcher_send_queue_capacity",
+     Unit::Count,
+     "how many spaces are available for batches in the send queue"
+ );
+ describe_histogram!(
+     "batcher_total_collections",
+     Unit::Count,
+     "how many collections are in this batch"
+ );
let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(3));
rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
Self {
···
// holds up all consumer progress until it can send to the channel
// use this when the current batch is too full to add more to it
async fn send_current_batch_now(&mut self, small: bool, referrer: &str) -> anyhow::Result<()> {
+ let size_label = if small { "small" } else { "full" };
+ let queue_cap = self.batch_sender.capacity();
+
+ if let Some(cursor) = self.current_batch.initial_cursor {
+     gauge!("batcher_batch_age", "size" => size_label).set(cursor.elapsed_micros_f64());
+ }
+ histogram!("batcher_total_collections", "size" => size_label)
+     .record(self.current_batch.batch.total_collections() as f64);
+ gauge!("batcher_send_queue_capacity").set(queue_cap as f64);
+
let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
None => "unknown".to_string(),
Some(Ok(t)) => format!("{:?}", t),
Some(Err(e)) => format!("+{:?}", e.duration()),
};
+ log::trace!(
+     "sending batch now from {beginning}, {size_label}, queue capacity: {queue_cap}, referrer: {referrer}",
);
let current = mem::take(&mut self.current_batch);
self.rate_limit.tick().await;
self.batch_sender
.send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S))
.await?;
+ counter!("batcher_batches_sent", "size" => size_label, "referrer" => referrer.to_string())
+     .increment(1);
Ok(())
}
}
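
once deployed, the new batcher series can be spot-checked on the exporter (assuming the default port from main.rs below):

```bash
curl -s localhost:8765/metrics | grep '^batcher_'
```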
+43 -1
ufos/src/main.rs
···
use clap::Parser;
use jetstream::events::Cursor;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use ufos::consumer;
···
let stating = do_update_stuff(read_store);
tokio::select! {
z = serving => log::warn!("serve task ended: {z:?}"),
z = rolling => log::warn!("rollup task ended: {z:?}"),
···
Ok(())
}
async fn do_update_stuff(read_store: impl StoreReader) {
let started_at = std::time::SystemTime::now();
let mut first_cursor = None;
let mut first_rollup = None;
···
started_at: SystemTime,
now: SystemTime,
) {
let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
{
(Some(earlier), Some(later)) => match later.duration_since(&earlier) {
···
let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);
- log::info!(
"cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
nice_dt_two_maybes(last_cursor, latest_cursor),
···
use clap::Parser;
use jetstream::events::Cursor;
+ use metrics::{describe_gauge, gauge, Unit};
+ use metrics_exporter_prometheus::PrometheusBuilder;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use ufos::consumer;
···
let stating = do_update_stuff(read_store);
+ install_metrics_server()?;
+
tokio::select! {
z = serving => log::warn!("serve task ended: {z:?}"),
z = rolling => log::warn!("rollup task ended: {z:?}"),
···
Ok(())
}
+ fn install_metrics_server() -> anyhow::Result<()> {
+     log::info!("installing metrics server...");
+     let host = [0, 0, 0, 0];
+     let port = 8765;
+     PrometheusBuilder::new()
+         .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
+         .set_bucket_duration(Duration::from_secs(60))?
+         .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here.
+         .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
+         .with_http_listener((host, port))
+         .install()?;
+     log::info!(
+         "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
+         host[0],
+         host[1],
+         host[2],
+         host[3]
+     );
+     Ok(())
+ }
+
async fn do_update_stuff(read_store: impl StoreReader) {
+ describe_gauge!(
+     "persisted_cursor_age",
+     Unit::Microseconds,
+     "microseconds between our clock and the latest persisted event's cursor"
+ );
+ describe_gauge!(
+     "rollup_cursor_age",
+     Unit::Microseconds,
+     "microseconds between our clock and the latest rollup cursor"
+ );
let started_at = std::time::SystemTime::now();
let mut first_cursor = None;
let mut first_rollup = None;
···
started_at: SystemTime,
now: SystemTime,
) {
+ if let Some(cursor) = latest_cursor {
+     gauge!("persisted_cursor_age").set(cursor.elapsed_micros_f64());
+ }
+ if let Some(cursor) = rollup_cursor {
+     gauge!("rollup_cursor_age").set(cursor.elapsed_micros_f64());
+ }
+
let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later)
{
(Some(earlier), Some(later)) => match later.duration_since(&earlier) {
···
let rollup_rate = rate(rollup_cursor, last_rollup, dt_real);
let rollup_avg = rate(rollup_cursor, first_rollup, dt_real_total);
+ log::trace!(
"cursor: {} behind (→{}, {cursor_rate}x, {cursor_avg}x avg). rollup: {} behind (→{}, {rollup_rate}x, {rollup_avg}x avg).",
latest_cursor.map(|c| c.elapsed().map(nice_duration).unwrap_or("++".to_string())).unwrap_or("?".to_string()),
nice_dt_two_maybes(last_cursor, latest_cursor),
+41 -21
ufos/src/storage_fjall.rs
···
Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
};
use jetstream::events::Cursor;
use std::collections::{HashMap, HashSet};
use std::iter::Peekable;
use std::ops::Bound;
···
impl StoreWriter<FjallBackground> for FjallWriter {
fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
if self.bg_taken.swap(true, Ordering::SeqCst) {
-     Err(StorageError::BackgroundAlreadyStarted)
- } else {
-     if reroll {
-         log::info!("reroll: resetting rollup cursor...");
-         insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
-         log::info!("reroll: clearing trim cursors...");
-         let mut batch = self.keyspace.batch();
-         for kv in self
-             .global
-             .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
-                 &Default::default(),
-             )?)
-         {
-             let (k, _) = kv?;
-             batch.remove(&self.global, k);
-         }
-         let n = batch.len();
-         batch.commit()?;
-         log::info!("reroll: cleared {n} trim cursors.");
}
-     Ok(FjallBackground(self.clone()))
}
}
fn insert_batch<const LIMIT: usize>(
···
break;
}
}
for c in completed {
dirty_nsids.remove(&c);
}
- log::info!("finished trimming {n} nsids in {:?}: {total_danglers} dangling and {total_deleted} total removed.", t0.elapsed());
},
};
}
···
Batch as FjallBatch, Config, Keyspace, PartitionCreateOptions, PartitionHandle, Snapshot,
};
use jetstream::events::Cursor;
+ use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
use std::collections::{HashMap, HashSet};
use std::iter::Peekable;
use std::ops::Bound;
···
impl StoreWriter<FjallBackground> for FjallWriter {
fn background_tasks(&mut self, reroll: bool) -> StorageResult<FjallBackground> {
if self.bg_taken.swap(true, Ordering::SeqCst) {
+     return Err(StorageError::BackgroundAlreadyStarted);
+ }
+ describe_histogram!(
+     "storage_trim_dirty_nsids",
+     Unit::Count,
+     "number of NSIDs trimmed"
+ );
+ describe_histogram!(
+     "storage_trim_duration",
+     Unit::Microseconds,
+     "how long it took to trim the dirty NSIDs"
+ );
+ describe_counter!(
+     "storage_trim_removed",
+     Unit::Count,
+     "how many records were removed during trim"
+ );
+ if reroll {
+     log::info!("reroll: resetting rollup cursor...");
+     insert_static_neu::<NewRollupCursorKey>(&self.global, Cursor::from_start())?;
+     log::info!("reroll: clearing trim cursors...");
+     let mut batch = self.keyspace.batch();
+     for kv in self
+         .global
+         .prefix(TrimCollectionCursorKey::from_prefix_to_db_bytes(
+             &Default::default(),
+         )?)
+     {
+         let (k, _) = kv?;
+         batch.remove(&self.global, k);
+     }
+     let n = batch.len();
+     batch.commit()?;
+     log::info!("reroll: cleared {n} trim cursors.");
}
+ Ok(FjallBackground(self.clone()))
}
fn insert_batch<const LIMIT: usize>(
···
break;
}
}
+ let dt = t0.elapsed();
+ log::trace!("finished trimming {n} nsids in {:?}: {total_danglers} dangling and {total_deleted} total removed.", dt);
+ histogram!("storage_trim_dirty_nsids").record(completed.len() as f64);
+ histogram!("storage_trim_duration").record(dt.as_micros() as f64);
+ counter!("storage_trim_removed", "dangling" => "true").increment(total_danglers as u64);
+ counter!("storage_trim_removed", "dangling" => "false").increment((total_deleted - total_danglers) as u64);
for c in completed {
dirty_nsids.remove(&c);
}
},
};
}