relay filter/appview bootstrap
at main 3.8 kB view raw
1use std::sync::Arc; 2 3use axum::Router; 4use futures::FutureExt; 5use sqlx::postgres::PgPoolOptions; 6use tokio::sync::broadcast; 7use tower_http::compression::CompressionLayer; 8use tower_http::cors::{Any, CorsLayer}; 9use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; 10 11use prism::config::Config; 12use prism::db::Database; 13use prism::ws::Broadcaster; 14use prism::AppState; 15 16#[tokio::main] 17async fn main() -> anyhow::Result<()> { 18 tracing_subscriber::registry() 19 .with( 20 tracing_subscriber::EnvFilter::try_from_default_env() 21 .unwrap_or_else(|_| "prism=debug,tower_http=debug".into()), 22 ) 23 .with(tracing_subscriber::fmt::layer()) 24 .init(); 25 26 let config = Config::from_env(); 27 tracing::info!("Starting Prism with config: {:?}", config); 28 29 let pool = PgPoolOptions::new() 30 .max_connections(10) 31 .connect(&config.database_url) 32 .await?; 33 34 tracing::info!("Connected to database"); 35 36 sqlx::migrate!("./migrations").run(&pool).await?; 37 tracing::info!("Migrations complete"); 38 39 let (tx, _rx) = broadcast::channel::<String>(1024); 40 let broadcaster = Broadcaster::new(tx.clone()); 41 42 let state = Arc::new(AppState { 43 db: Database::new(pool.clone()), 44 broadcaster, 45 }); 46 47 let tap_state = state.clone(); 48 let tap_url = config.tap_ws_url.clone(); 49 let tap_tx = tx.clone(); 50 tokio::spawn(async move { 51 loop { 52 let result = std::panic::AssertUnwindSafe(prism::tap::consumer::run( 53 tap_url.clone(), 54 tap_state.clone(), 55 tap_tx.clone(), 56 )); 57 58 if let Err(e) = result.catch_unwind().await { 59 let panic_msg = if let Some(s) = e.downcast_ref::<&str>() { 60 s.to_string() 61 } else if let Some(s) = e.downcast_ref::<String>() { 62 s.clone() 63 } else { 64 "Unknown panic".to_string() 65 }; 66 tracing::error!(panic = %panic_msg, "TAP consumer panicked, restarting in 5 seconds..."); 67 tokio::time::sleep(std::time::Duration::from_secs(5)).await; 68 } else { 69 // run() returned normally (shouldn't happen as it loops forever) 70 tracing::warn!("TAP consumer exited unexpectedly, restarting..."); 71 } 72 } 73 }); 74 75 let app = Router::new() 76 .merge(prism::api::router()) 77 .merge(prism::ws::router()) 78 .layer(CompressionLayer::new()) 79 .layer( 80 CorsLayer::new() 81 .allow_origin(Any) 82 .allow_methods(Any) 83 .allow_headers(Any), 84 ) 85 .with_state(state); 86 87 let listener = tokio::net::TcpListener::bind(format!("{}:{}", config.host, config.port)).await?; 88 tracing::info!("Listening on {}:{}", config.host, config.port); 89 90 axum::serve(listener, app) 91 .with_graceful_shutdown(shutdown_signal()) 92 .await?; 93 94 tracing::info!("Shutdown complete"); 95 Ok(()) 96} 97 98async fn shutdown_signal() { 99 let ctrl_c = async { 100 tokio::signal::ctrl_c() 101 .await 102 .expect("Failed to install Ctrl+C handler"); 103 }; 104 105 #[cfg(unix)] 106 let terminate = async { 107 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) 108 .expect("Failed to install SIGTERM handler") 109 .recv() 110 .await; 111 }; 112 113 #[cfg(not(unix))] 114 let terminate = std::future::pending::<()>(); 115 116 tokio::select! { 117 _ = ctrl_c => { 118 tracing::info!("Received Ctrl+C, shutting down gracefully..."); 119 }, 120 _ = terminate => { 121 tracing::info!("Received SIGTERM, shutting down gracefully..."); 122 }, 123 } 124}