use std::sync::Arc; use axum::Router; use futures::FutureExt; use sqlx::postgres::PgPoolOptions; use tokio::sync::broadcast; use tower_http::compression::CompressionLayer; use tower_http::cors::{Any, CorsLayer}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use prism::config::Config; use prism::db::Database; use prism::ws::Broadcaster; use prism::AppState; #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| "prism=debug,tower_http=debug".into()), ) .with(tracing_subscriber::fmt::layer()) .init(); let config = Config::from_env(); tracing::info!("Starting Prism with config: {:?}", config); let pool = PgPoolOptions::new() .max_connections(10) .connect(&config.database_url) .await?; tracing::info!("Connected to database"); sqlx::migrate!("./migrations").run(&pool).await?; tracing::info!("Migrations complete"); let (tx, _rx) = broadcast::channel::(1024); let broadcaster = Broadcaster::new(tx.clone()); let state = Arc::new(AppState { db: Database::new(pool.clone()), broadcaster, }); let tap_state = state.clone(); let tap_url = config.tap_ws_url.clone(); let tap_tx = tx.clone(); tokio::spawn(async move { loop { let result = std::panic::AssertUnwindSafe(prism::tap::consumer::run( tap_url.clone(), tap_state.clone(), tap_tx.clone(), )); if let Err(e) = result.catch_unwind().await { let panic_msg = if let Some(s) = e.downcast_ref::<&str>() { s.to_string() } else if let Some(s) = e.downcast_ref::() { s.clone() } else { "Unknown panic".to_string() }; tracing::error!(panic = %panic_msg, "TAP consumer panicked, restarting in 5 seconds..."); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } else { // run() returned normally (shouldn't happen as it loops forever) tracing::warn!("TAP consumer exited unexpectedly, restarting..."); } } }); let app = Router::new() .merge(prism::api::router()) .merge(prism::ws::router()) .layer(CompressionLayer::new()) .layer( CorsLayer::new() .allow_origin(Any) .allow_methods(Any) .allow_headers(Any), ) .with_state(state); let listener = tokio::net::TcpListener::bind(format!("{}:{}", config.host, config.port)).await?; tracing::info!("Listening on {}:{}", config.host, config.port); axum::serve(listener, app) .with_graceful_shutdown(shutdown_signal()) .await?; tracing::info!("Shutdown complete"); Ok(()) } async fn shutdown_signal() { let ctrl_c = async { tokio::signal::ctrl_c() .await .expect("Failed to install Ctrl+C handler"); }; #[cfg(unix)] let terminate = async { tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) .expect("Failed to install SIGTERM handler") .recv() .await; }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::select! { _ = ctrl_c => { tracing::info!("Received Ctrl+C, shutting down gracefully..."); }, _ = terminate => { tracing::info!("Received SIGTERM, shutting down gracefully..."); }, } }