Opentelemetry tracing #1

merged
opened by bad-example.com targeting main from otel-tracing

Add tracing middleware and optionally configure an http-binary opentelemetry exporter (eg. for honeycomb.io)

Changed files
+269 -22
src
+162
Cargo.lock
···
"http-body-util",
"log",
"native-tls",
+
"opentelemetry",
+
"opentelemetry-otlp",
+
"opentelemetry_sdk",
"poem",
"postgres-native-tls",
"reqwest",
···
"tokio-postgres",
"tokio-stream",
"tokio-util",
+
"tracing",
+
"tracing-opentelemetry",
"tracing-subscriber",
]
···
"vcpkg",
+
[[package]]
+
name = "opentelemetry"
+
version = "0.30.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
+
dependencies = [
+
"futures-core",
+
"futures-sink",
+
"js-sys",
+
"pin-project-lite",
+
"thiserror 2.0.16",
+
"tracing",
+
]
+
+
[[package]]
+
name = "opentelemetry-http"
+
version = "0.30.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
+
dependencies = [
+
"async-trait",
+
"bytes",
+
"http",
+
"opentelemetry",
+
"reqwest",
+
]
+
+
[[package]]
+
name = "opentelemetry-otlp"
+
version = "0.30.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
+
dependencies = [
+
"http",
+
"opentelemetry",
+
"opentelemetry-http",
+
"opentelemetry-proto",
+
"opentelemetry_sdk",
+
"prost",
+
"reqwest",
+
"thiserror 2.0.16",
+
"tracing",
+
]
+
+
[[package]]
+
name = "opentelemetry-proto"
+
version = "0.30.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
+
dependencies = [
+
"opentelemetry",
+
"opentelemetry_sdk",
+
"prost",
+
"tonic",
+
]
+
+
[[package]]
+
name = "opentelemetry_sdk"
+
version = "0.30.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
+
dependencies = [
+
"futures-channel",
+
"futures-executor",
+
"futures-util",
+
"opentelemetry",
+
"percent-encoding",
+
"rand 0.9.2",
+
"serde_json",
+
"thiserror 2.0.16",
+
"tokio",
+
"tokio-stream",
+
]
+
[[package]]
name = "parking_lot"
version = "0.11.2"
···
"siphasher",
+
[[package]]
+
name = "pin-project"
+
version = "1.1.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
+
dependencies = [
+
"pin-project-internal",
+
]
+
+
[[package]]
+
name = "pin-project-internal"
+
version = "1.1.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
+
dependencies = [
+
"proc-macro2",
+
"quote",
+
"syn",
+
]
+
[[package]]
name = "pin-project-lite"
version = "0.2.16"
···
"unicode-ident",
+
[[package]]
+
name = "prost"
+
version = "0.13.5"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
+
dependencies = [
+
"bytes",
+
"prost-derive",
+
]
+
+
[[package]]
+
name = "prost-derive"
+
version = "0.13.5"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
+
dependencies = [
+
"anyhow",
+
"itertools",
+
"proc-macro2",
+
"quote",
+
"syn",
+
]
+
[[package]]
name = "quanta"
version = "0.12.6"
···
"base64",
"bytes",
"encoding_rs",
+
"futures-channel",
"futures-core",
"futures-util",
"h2",
···
"winnow",
+
[[package]]
+
name = "tonic"
+
version = "0.13.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
+
dependencies = [
+
"async-trait",
+
"base64",
+
"bytes",
+
"http",
+
"http-body",
+
"http-body-util",
+
"percent-encoding",
+
"pin-project",
+
"prost",
+
"tokio-stream",
+
"tower-layer",
+
"tower-service",
+
"tracing",
+
]
+
[[package]]
name = "tower"
version = "0.5.2"
···
"tracing-core",
+
[[package]]
+
name = "tracing-opentelemetry"
+
version = "0.31.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
+
dependencies = [
+
"js-sys",
+
"once_cell",
+
"opentelemetry",
+
"opentelemetry_sdk",
+
"smallvec",
+
"tracing",
+
"tracing-core",
+
"tracing-log",
+
"tracing-subscriber",
+
"web-time",
+
]
+
[[package]]
name = "tracing-subscriber"
version = "0.3.20"
+5
Cargo.toml
···
http-body-util = "0.1.3"
log = "0.4.28"
native-tls = "0.2.14"
+
opentelemetry = "0.30.0"
+
opentelemetry-otlp = { version = "0.30.0" }
+
opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
poem = { version = "3.1.12", features = ["acme", "compression"] }
postgres-native-tls = "0.5.1"
reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
···
tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
tokio-stream = { version = "0.1.17", features = ["io-util"] }
tokio-util = { version = "0.7.16", features = ["compat"] }
+
tracing = "0.1.41"
+
tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+24 -4
src/bin/allegedly.rs
···
-
use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
+
use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init};
+
use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream};
use clap::{CommandFactory, Parser, Subcommand};
use std::{path::PathBuf, time::Duration, time::Instant};
use tokio::fs::create_dir_all;
···
Mirror {
#[command(flatten)]
args: mirror::Args,
+
#[command(flatten)]
+
instrumentation: InstrumentationArgs,
},
/// Wrap any did-method-plc server, without syncing upstream (read-only)
Wrap {
#[command(flatten)]
args: mirror::Args,
+
#[command(flatten)]
+
instrumentation: InstrumentationArgs,
},
/// Poll an upstream PLC server and log new ops to stdout
Tail {
···
},
}
+
impl Commands {
+
fn enable_otel(&self) -> bool {
+
match self {
+
Commands::Mirror {
+
instrumentation, ..
+
}
+
| Commands::Wrap {
+
instrumentation, ..
+
} => instrumentation.enable_opentelemetry,
+
_ => false,
+
}
+
}
+
}
+
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Cli::parse();
let matches = Cli::command().get_matches();
let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
-
bin_init(name);
+
bin_init(args.command.enable_otel());
+
log::info!("{}", logo(name));
let globals = args.globals.clone();
···
.await
.expect("to write bundles to output files");
}
-
Commands::Mirror { args } => mirror::run(globals, args, true).await?,
-
Commands::Wrap { args } => mirror::run(globals, args, false).await?,
+
Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?,
+
Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?,
Commands::Tail { after } => {
let mut url = globals.upstream;
url.set_path("/export");
+5 -3
src/bin/backfill.rs
···
use allegedly::{
-
Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs,
-
bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
+
Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg,
+
bin::{GlobalArgs, bin_init},
+
full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream,
};
use clap::Parser;
use reqwest::Url;
···
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = CliArgs::parse();
-
bin_init("backfill");
+
bin_init(false);
+
log::info!("{}", logo("backfill"));
run(args.globals, args.args).await?;
Ok(())
}
+28
src/bin/instrumentation/mod.rs
···
+
use opentelemetry::trace::TracerProvider as _;
+
use opentelemetry_otlp::{Protocol, WithExportConfig};
+
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider};
+
use tracing::Subscriber;
+
use tracing_opentelemetry::OpenTelemetryLayer;
+
use tracing_subscriber::registry::LookupSpan;
+
+
pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer>
+
where
+
S: Subscriber + for<'span> LookupSpan<'span>,
+
{
+
let exporter = opentelemetry_otlp::SpanExporter::builder()
+
.with_http()
+
.with_protocol(Protocol::HttpBinary)
+
.build()
+
.expect("to build otel otlp exporter");
+
+
let provider = SdkTracerProvider::builder()
+
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
+
1.0,
+
))))
+
.with_id_generator(RandomIdGenerator::default())
+
.with_batch_exporter(exporter)
+
.build();
+
+
let tracer = provider.tracer("tracing-otel-subscriber");
+
tracing_opentelemetry::layer().with_tracer(tracer)
+
}
+7 -2
src/bin/mirror.rs
···
use allegedly::{
-
Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve,
+
Db, ExperimentalConf, ListenConf,
+
bin::{GlobalArgs, InstrumentationArgs, bin_init},
+
logo, pages_to_pg, poll_upstream, serve,
};
use clap::Parser;
use reqwest::Url;
···
#[command(flatten)]
globals: GlobalArgs,
#[command(flatten)]
+
instrumentation: InstrumentationArgs,
+
#[command(flatten)]
args: Args,
/// Run the mirror in wrap mode, no upstream synchronization (read-only)
#[arg(long, action)]
···
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = CliArgs::parse();
-
bin_init("mirror");
+
bin_init(args.instrumentation.enable_opentelemetry);
+
log::info!("{}", logo("mirror"));
run(args.globals, args.args, !args.wrap_mode).await?;
Ok(())
}
+38
src/bin/mod.rs
···
+
mod instrumentation;
+
use reqwest::Url;
+
use tracing_subscriber::layer::SubscriberExt;
#[derive(Debug, Clone, clap::Args)]
pub struct GlobalArgs {
···
pub upstream_throttle_ms: u64,
}
+
#[derive(Debug, Default, Clone, clap::Args)]
+
pub struct InstrumentationArgs {
+
/// Export traces to an OTLP collector
+
///
+
/// Configure the colletctor via standard env vars:
+
/// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/"
+
/// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret"
+
/// - `OTEL_SERVICE_NAME` eg "my-app"
+
#[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")]
+
pub enable_opentelemetry: bool,
+
}
+
+
pub fn bin_init(enable_otlp: bool) {
+
let filter = tracing_subscriber::EnvFilter::builder()
+
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
+
.from_env_lossy();
+
+
let stderr_log = tracing_subscriber::fmt::layer()
+
.with_writer(std::io::stderr)
+
.pretty();
+
+
let otel = if enable_otlp {
+
Some(instrumentation::otel_layer())
+
} else {
+
None
+
};
+
+
let subscriber = tracing_subscriber::Registry::default()
+
.with(filter)
+
.with(stderr_log)
+
.with(otel);
+
+
tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber");
+
}
+
#[allow(dead_code)]
fn main() {
panic!("this is not actually a module")
-13
src/lib.rs
···
env!("CARGO_PKG_VERSION"),
)
}
-
-
pub fn bin_init(name: &str) {
-
if std::env::var_os("RUST_LOG").is_none() {
-
unsafe { std::env::set_var("RUST_LOG", "info") };
-
}
-
let filter = tracing_subscriber::EnvFilter::from_default_env();
-
tracing_subscriber::fmt()
-
.with_writer(std::io::stderr)
-
.with_env_filter(filter)
-
.init();
-
-
log::info!("{}", logo(name));
-
}