relay filter/appview bootstrap
1use std::sync::Arc;
2
3use axum::Router;
4use futures::FutureExt;
5use sqlx::postgres::PgPoolOptions;
6use tokio::sync::broadcast;
7use tower_http::compression::CompressionLayer;
8use tower_http::cors::{Any, CorsLayer};
9use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
10
11use prism::config::Config;
12use prism::db::Database;
13use prism::ws::Broadcaster;
14use prism::AppState;
15
16#[tokio::main]
17async fn main() -> anyhow::Result<()> {
18 tracing_subscriber::registry()
19 .with(
20 tracing_subscriber::EnvFilter::try_from_default_env()
21 .unwrap_or_else(|_| "prism=debug,tower_http=debug".into()),
22 )
23 .with(tracing_subscriber::fmt::layer())
24 .init();
25
26 let config = Config::from_env();
27 tracing::info!("Starting Prism with config: {:?}", config);
28
29 let pool = PgPoolOptions::new()
30 .max_connections(10)
31 .connect(&config.database_url)
32 .await?;
33
34 tracing::info!("Connected to database");
35
36 sqlx::migrate!("./migrations").run(&pool).await?;
37 tracing::info!("Migrations complete");
38
39 let (tx, _rx) = broadcast::channel::<String>(1024);
40 let broadcaster = Broadcaster::new(tx.clone());
41
42 let state = Arc::new(AppState {
43 db: Database::new(pool.clone()),
44 broadcaster,
45 });
46
47 let tap_state = state.clone();
48 let tap_url = config.tap_ws_url.clone();
49 let tap_tx = tx.clone();
50 tokio::spawn(async move {
51 loop {
52 let result = std::panic::AssertUnwindSafe(prism::tap::consumer::run(
53 tap_url.clone(),
54 tap_state.clone(),
55 tap_tx.clone(),
56 ));
57
58 if let Err(e) = result.catch_unwind().await {
59 let panic_msg = if let Some(s) = e.downcast_ref::<&str>() {
60 s.to_string()
61 } else if let Some(s) = e.downcast_ref::<String>() {
62 s.clone()
63 } else {
64 "Unknown panic".to_string()
65 };
66 tracing::error!(panic = %panic_msg, "TAP consumer panicked, restarting in 5 seconds...");
67 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
68 } else {
69 // run() returned normally (shouldn't happen as it loops forever)
70 tracing::warn!("TAP consumer exited unexpectedly, restarting...");
71 }
72 }
73 });
74
75 let app = Router::new()
76 .merge(prism::api::router())
77 .merge(prism::ws::router())
78 .layer(CompressionLayer::new())
79 .layer(
80 CorsLayer::new()
81 .allow_origin(Any)
82 .allow_methods(Any)
83 .allow_headers(Any),
84 )
85 .with_state(state);
86
87 let listener = tokio::net::TcpListener::bind(format!("{}:{}", config.host, config.port)).await?;
88 tracing::info!("Listening on {}:{}", config.host, config.port);
89
90 axum::serve(listener, app)
91 .with_graceful_shutdown(shutdown_signal())
92 .await?;
93
94 tracing::info!("Shutdown complete");
95 Ok(())
96}
97
98async fn shutdown_signal() {
99 let ctrl_c = async {
100 tokio::signal::ctrl_c()
101 .await
102 .expect("Failed to install Ctrl+C handler");
103 };
104
105 #[cfg(unix)]
106 let terminate = async {
107 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
108 .expect("Failed to install SIGTERM handler")
109 .recv()
110 .await;
111 };
112
113 #[cfg(not(unix))]
114 let terminate = std::future::pending::<()>();
115
116 tokio::select! {
117 _ = ctrl_c => {
118 tracing::info!("Received Ctrl+C, shutting down gracefully...");
119 },
120 _ = terminate => {
121 tracing::info!("Received SIGTERM, shutting down gracefully...");
122 },
123 }
124}