From 30992fa9126ec97335a32975ee73ff3ba7738588 Mon Sep 17 00:00:00 2001 From: phil Date: Fri, 31 Oct 2025 12:52:57 -0400 Subject: [PATCH] add tracing opentelemetry exporter for web reqs --- Cargo.lock | 162 +++++++++++++++++++++++++++++++++ Cargo.toml | 5 + src/bin/allegedly.rs | 28 +++++- src/bin/backfill.rs | 8 +- src/bin/instrumentation/mod.rs | 28 ++++++ src/bin/mirror.rs | 9 +- src/bin/mod.rs | 38 ++++++++ src/lib.rs | 13 --- 8 files changed, 269 insertions(+), 22 deletions(-) create mode 100644 src/bin/instrumentation/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e1a121b..c8ba013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,6 +39,9 @@ dependencies = [ "http-body-util", "log", "native-tls", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "poem", "postgres-native-tls", "reqwest", @@ -52,6 +55,8 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-util", + "tracing", + "tracing-opentelemetry", "tracing-subscriber", ] @@ -1583,6 +1588,80 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.16", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +dependencies = [ + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + 
"thiserror 2.0.16", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.2", + "serde_json", + "thiserror 2.0.16", + "tokio", + "tokio-stream", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -1665,6 +1744,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1839,6 +1938,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + 
"quote", + "syn", +] + [[package]] name = "quanta" version = "0.12.6" @@ -2061,6 +2183,7 @@ dependencies = [ "base64", "bytes", "encoding_rs", + "futures-channel", "futures-core", "futures-util", "h2", @@ -2802,6 +2925,27 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "base64", + "bytes", + "http", + "http-body", + "http-body-util", + "percent-encoding", + "pin-project", + "prost", + "tokio-stream", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -2890,6 +3034,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.20" diff --git a/Cargo.toml b/Cargo.toml index ef2a063..277bc81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ governor = "0.10.1" http-body-util = "0.1.3" log = "0.4.28" native-tls = "0.2.14" +opentelemetry = "0.30.0" +opentelemetry-otlp = { version = "0.30.0" } +opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } poem = { version = "3.1.12", features = ["acme", "compression"] } postgres-native-tls = "0.5.1" reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } @@ -29,4 +32,6 @@ tokio = { version = "1.47.1", features = ["full"] } tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } tokio-stream = { version = "0.1.17", features = ["io-util"] } tokio-util = { version = 
"0.7.16", features = ["compat"] } +tracing = "0.1.41" +tracing-opentelemetry = "0.31.0" tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } diff --git a/src/bin/allegedly.rs b/src/bin/allegedly.rs index 1c9eed7..7989231 100644 --- a/src/bin/allegedly.rs +++ b/src/bin/allegedly.rs @@ -1,4 +1,5 @@ -use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; +use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; +use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; use clap::{CommandFactory, Parser, Subcommand}; use std::{path::PathBuf, time::Duration, time::Instant}; use tokio::fs::create_dir_all; @@ -48,11 +49,15 @@ enum Commands { Mirror { #[command(flatten)] args: mirror::Args, + #[command(flatten)] + instrumentation: InstrumentationArgs, }, /// Wrap any did-method-plc server, without syncing upstream (read-only) Wrap { #[command(flatten)] args: mirror::Args, + #[command(flatten)] + instrumentation: InstrumentationArgs, }, /// Poll an upstream PLC server and log new ops to stdout Tail { @@ -62,12 +67,27 @@ enum Commands { }, } +impl Commands { + fn enable_otel(&self) -> bool { + match self { + Commands::Mirror { + instrumentation, .. + } + | Commands::Wrap { + instrumentation, .. + } => instrumentation.enable_opentelemetry, + _ => false, + } + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Cli::parse(); let matches = Cli::command().get_matches(); let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); - bin_init(name); + bin_init(args.command.enable_otel()); + log::info!("{}", logo(name)); let globals = args.globals.clone(); @@ -96,8 +116,8 @@ async fn main() -> anyhow::Result<()> { .await .expect("to write bundles to output files"); } - Commands::Mirror { args } => mirror::run(globals, args, true).await?, - Commands::Wrap { args } => mirror::run(globals, args, false).await?, + Commands::Mirror { args, .. 
} => mirror::run(globals, args, true).await?, + Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, Commands::Tail { after } => { let mut url = globals.upstream; url.set_path("/export"); diff --git a/src/bin/backfill.rs b/src/bin/backfill.rs index 36256cb..1328f71 100644 --- a/src/bin/backfill.rs +++ b/src/bin/backfill.rs @@ -1,6 +1,7 @@ use allegedly::{ - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, + bin::{GlobalArgs, bin_init}, + full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, }; use clap::Parser; use reqwest::Url; @@ -199,7 +200,8 @@ struct CliArgs { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = CliArgs::parse(); - bin_init("backfill"); + bin_init(false); + log::info!("{}", logo("backfill")); run(args.globals, args.args).await?; Ok(()) } diff --git a/src/bin/instrumentation/mod.rs b/src/bin/instrumentation/mod.rs new file mode 100644 index 0000000..1f8af0e --- /dev/null +++ b/src/bin/instrumentation/mod.rs @@ -0,0 +1,28 @@ +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::{Protocol, WithExportConfig}; +use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider}; +use tracing::Subscriber; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::registry::LookupSpan; + +pub fn otel_layer() -> OpenTelemetryLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .build() + .expect("to build otel otlp exporter"); + + let provider = SdkTracerProvider::builder() + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + .with_id_generator(RandomIdGenerator::default()) + .with_batch_exporter(exporter) + 
.build(); + + let tracer = provider.tracer("tracing-otel-subscriber"); + tracing_opentelemetry::layer().with_tracer(tracer) +} diff --git a/src/bin/mirror.rs b/src/bin/mirror.rs index adb5f59..a383104 100644 --- a/src/bin/mirror.rs +++ b/src/bin/mirror.rs @@ -1,5 +1,7 @@ use allegedly::{ - Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve, + Db, ExperimentalConf, ListenConf, + bin::{GlobalArgs, InstrumentationArgs, bin_init}, + logo, pages_to_pg, poll_upstream, serve, }; use clap::Parser; use reqwest::Url; @@ -166,6 +168,8 @@ struct CliArgs { #[command(flatten)] globals: GlobalArgs, #[command(flatten)] + instrumentation: InstrumentationArgs, + #[command(flatten)] args: Args, /// Run the mirror in wrap mode, no upstream synchronization (read-only) #[arg(long, action)] @@ -176,7 +180,8 @@ struct CliArgs { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = CliArgs::parse(); - bin_init("mirror"); + bin_init(args.instrumentation.enable_opentelemetry); + log::info!("{}", logo("mirror")); run(args.globals, args.args, !args.wrap_mode).await?; Ok(()) } diff --git a/src/bin/mod.rs b/src/bin/mod.rs index 1ad965d..b770c67 100644 --- a/src/bin/mod.rs +++ b/src/bin/mod.rs @@ -1,4 +1,7 @@ +mod instrumentation; + use reqwest::Url; +use tracing_subscriber::layer::SubscriberExt; #[derive(Debug, Clone, clap::Args)] pub struct GlobalArgs { @@ -14,6 +17,41 @@ pub struct GlobalArgs { pub upstream_throttle_ms: u64, } +#[derive(Debug, Default, Clone, clap::Args)] +pub struct InstrumentationArgs { + /// Export traces to an OTLP collector + /// + /// Configure the collector via standard env vars: + /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/" + /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret" + /// - `OTEL_SERVICE_NAME` eg "my-app" + #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")] + pub enable_opentelemetry: bool, +} + +pub fn bin_init(enable_otlp: bool) { + let filter = 
tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + .from_env_lossy(); + + let stderr_log = tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .pretty(); + + let otel = if enable_otlp { + Some(instrumentation::otel_layer()) + } else { + None + }; + + let subscriber = tracing_subscriber::Registry::default() + .with(filter) + .with(stderr_log) + .with(otel); + + tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber"); +} + #[allow(dead_code)] fn main() { panic!("this is not actually a module") diff --git a/src/lib.rs b/src/lib.rs index 72a3098..8d304f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -145,16 +145,3 @@ pub fn logo(name: &str) -> String { env!("CARGO_PKG_VERSION"), ) } - -pub fn bin_init(name: &str) { - if std::env::var_os("RUST_LOG").is_none() { - unsafe { std::env::set_var("RUST_LOG", "info") }; - } - let filter = tracing_subscriber::EnvFilter::from_default_env(); - tracing_subscriber::fmt() - .with_writer(std::io::stderr) - .with_env_filter(filter) - .init(); - - log::info!("{}", logo(name)); -} -- 2.43.0