From f4dc75d20e7f5df28734968ec4432f1322da23b6 Mon Sep 17 00:00:00 2001 From: azomDev Date: Fri, 12 Sep 2025 02:29:18 -0400 Subject: [PATCH] added multiple terminal sync using global timestamps --- src/client.rs | 35 ++++++++------- src/jetstream.rs | 109 ++++++++++++++++++++++++++++------------------- src/tui.rs | 71 +++++++++++++++++++++--------- 3 files changed, 138 insertions(+), 77 deletions(-) diff --git a/src/client.rs b/src/client.rs index a604040..28f55d6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,9 @@ use anyhow::{Context, Result}; use chrono::Utc; -use reqwest::{Client as HttpClient, header::{HeaderMap, HeaderValue, AUTHORIZATION}}; +use reqwest::{ + header::{HeaderMap, HeaderValue, AUTHORIZATION}, + Client as HttpClient, +}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -62,13 +65,14 @@ impl AtProtoClient { pub async fn login(&mut self, credentials: &Credentials) -> Result<()> { let login_url = format!("{}/xrpc/com.atproto.server.createSession", self.base_url); - + let request = LoginRequest { identifier: credentials.username.clone(), password: credentials.password.clone(), }; - let response = self.http_client + let response = self + .http_client .post(&login_url) .header("Content-Type", "application/json") .json(&request) @@ -93,36 +97,37 @@ impl AtProtoClient { } pub async fn publish_blip(&self, content: &str) -> Result { - let session = self.session.as_ref() + let session = self + .session + .as_ref() .context("Not authenticated. Please run 'thought login' first.")?; + let timestamp = Utc::now().to_rfc3339().replace("+00:00", "Z"); + let record = BlipRecord { record_type: "stream.thought.blip".to_string(), content: content.to_string(), - created_at: Utc::now().to_rfc3339().replace("+00:00", "Z"), + created_at: timestamp.clone(), }; let request = CreateRecordRequest { repo: session.did.clone(), collection: "stream.thought.blip".to_string(), - record: serde_json::to_value(&record) - .context("Failed to serialize blip record")?, + record: serde_json::to_value(&record).context("Failed to serialize blip record")?, }; let create_url = format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url); - + let mut headers = HeaderMap::new(); headers.insert( AUTHORIZATION, HeaderValue::from_str(&format!("Bearer {}", session.access_jwt)) .context("Invalid authorization header")?, ); - headers.insert( - "Content-Type", - HeaderValue::from_static("application/json"), - ); + headers.insert("Content-Type", HeaderValue::from_static("application/json")); - let response = self.http_client + let response = self + .http_client .post(&create_url) .headers(headers) .json(&request) @@ -141,7 +146,7 @@ impl AtProtoClient { .await .context("Failed to parse create record response")?; - Ok(create_response.uri) + Ok(timestamp) } pub fn is_authenticated(&self) -> bool { @@ -151,4 +156,4 @@ impl AtProtoClient { pub fn get_user_did(&self) -> Option { self.session.as_ref().map(|s| s.did.clone()) } -} \ No newline at end of file +} diff --git a/src/jetstream.rs b/src/jetstream.rs index a6234d8..4086226 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -1,15 +1,12 @@ use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, time::Duration}; use tokio::sync::mpsc; use tokio_tungstenite::{ connect_async, - tungstenite::{ - client::IntoClientRequest, - http::HeaderValue, - Message, - }, + tungstenite::{client::IntoClientRequest, http::HeaderValue, Message}, }; use url::Url; @@ -46,7 +43,7 @@ pub struct BlipRecord { pub struct JetstreamClient { did_cache: HashMap, // DID -> handle cache - own_did: Option, // User's own DID to filter out + own_did: Option, // User's own DID to filter out } impl JetstreamClient { @@ -57,24 +54,31 @@ impl JetstreamClient { } } - pub async fn connect_and_listen(&mut self, message_tx: mpsc::UnboundedSender) -> Result<()> { + pub async fn connect_and_listen( + &mut self, + message_tx: mpsc::UnboundedSender, + ) -> Result<()> { // Try simple connection first, then with collection filter let urls = vec![ "wss://jetstream2.us-west.bsky.network/subscribe", - "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip" + "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=stream.thought.blip", ]; - + for (i, jetstream_url) in urls.iter().enumerate() { // Send status to TUI instead of console let status_msg = crate::tui::Message::new( "system".to_string(), format!("Trying connection {} of {}", i + 1, urls.len()), false, + None, ); let _ = message_tx.send(status_msg); - + loop { - match self.try_connect_and_listen(&message_tx, jetstream_url).await { + match self + .try_connect_and_listen(&message_tx, jetstream_url) + .await + { Ok(_) => { return Ok(()); } @@ -85,6 +89,7 @@ impl JetstreamClient { "system".to_string(), "Connection failed, retrying in 5s...".to_string(), false, + None, ); let _ = message_tx.send(retry_msg); tokio::time::sleep(Duration::from_secs(5)).await; @@ -96,7 +101,7 @@ impl JetstreamClient { } } } - + Ok(()) } @@ -108,19 +113,16 @@ impl JetstreamClient { // Parse URL and create request with headers let url = Url::parse(url_str)?; let mut request = url.into_client_request()?; - + // Add User-Agent header - request.headers_mut().insert( - "User-Agent", - HeaderValue::from_static("think-cli/0.1.0") - ); - + request + .headers_mut() + .insert("User-Agent", HeaderValue::from_static("think-cli/0.1.0")); + // Connect with timeout let connect_future = connect_async(request); - let (ws_stream, _response) = tokio::time::timeout( - Duration::from_secs(10), - connect_future - ).await + let (ws_stream, _response) = tokio::time::timeout(Duration::from_secs(10), connect_future) + .await .context("Connection timeout")? .context("Failed to connect to jetstream")?; @@ -129,6 +131,7 @@ impl JetstreamClient { "system".to_string(), "Connected to jetstream! Listening for blips...".to_string(), false, + None, ); let _ = message_tx.send(success_msg); @@ -162,19 +165,20 @@ impl JetstreamClient { ) -> Result<()> { // First, check if it's even a commit event using basic JSON parsing let event_value: serde_json::Value = serde_json::from_str(message)?; - + // Only process commit events if event_value.get("kind").and_then(|k| k.as_str()) != Some("commit") { return Ok(()); } - + // Check if it has a commit with the right collection let commit = event_value.get("commit"); if let Some(commit_obj) = commit { - if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") { + if commit_obj.get("collection").and_then(|c| c.as_str()) != Some("stream.thought.blip") + { return Ok(()); } - + // Skip delete operations if commit_obj.get("operation").and_then(|o| o.as_str()) == Some("delete") { return Ok(()); @@ -188,11 +192,11 @@ impl JetstreamClient { let commit = event.commit.as_ref().unwrap(); // Safe because we checked above // Skip messages from our own DID - if let Some(ref own_did) = self.own_did { - if &event.did == own_did { - return Ok(()); - } - } + // if let Some(ref own_did) = self.own_did { + // if &event.did == own_did { + // return Ok(()); + // } + // } // Parse the blip record let record_data = commit.record.as_ref(); @@ -206,13 +210,26 @@ impl JetstreamClient { }; // Get or resolve the handle - let handle = self.resolve_did(&event.did).await; + let mut handle = self.resolve_did(&event.did).await; + let mut is_own = false; + + if let Some(ref own_did) = self.own_did { + if &event.did == own_did { + is_own = true; + handle = String::from("you"); + } + } // Create TUI message let tui_message = TuiMessage::new( handle, blip_record.content, - false, // Not our own message + is_own, // Not our own message + Some( + DateTime::parse_from_rfc3339(&blip_record.created_at) + .unwrap() + .with_timezone(&Utc), + ), ); // Send to TUI @@ -252,25 +269,28 @@ impl JetstreamClient { async fn fetch_handle_for_did(&self, did: &str) -> Result { // Use the ATProto API to resolve DID to handle let client = reqwest::Client::new(); - let url = format!("https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", did); - + let url = format!( + "https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?did={}", + did + ); + #[derive(Deserialize)] struct ResolveResponse { handle: String, } // Try a simpler approach - resolve via profile - let profile_url = format!("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", did); - + let profile_url = format!( + "https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor={}", + did + ); + #[derive(Deserialize)] struct ProfileResponse { handle: String, } - let response = client - .get(&profile_url) - .send() - .await?; + let response = client.get(&profile_url).send().await?; if response.status().is_success() { let profile: ProfileResponse = response.json().await?; @@ -281,7 +301,10 @@ impl JetstreamClient { } } -pub async fn start_jetstream_listener(message_tx: mpsc::UnboundedSender, own_did: Option) -> Result<()> { +pub async fn start_jetstream_listener( + message_tx: mpsc::UnboundedSender, + own_did: Option, +) -> Result<()> { let mut client = JetstreamClient::new(own_did); client.connect_and_listen(message_tx).await -} \ No newline at end of file +} diff --git a/src/tui.rs b/src/tui.rs index 2aa3b5c..004319d 100644 --- a/src/tui.rs +++ b/src/tui.rs @@ -21,7 +21,7 @@ use tokio::sync::mpsc; use crate::client::AtProtoClient; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Message { pub handle: String, pub content: String, @@ -30,11 +30,16 @@ pub struct Message { } impl Message { - pub fn new(handle: String, content: String, is_own: bool) -> Self { + pub fn new( + handle: String, + content: String, + is_own: bool, + timestamp: Option>, + ) -> Self { Self { handle, content, - timestamp: Utc::now(), + timestamp: timestamp.unwrap_or_else(Utc::now), is_own, } } @@ -71,12 +76,12 @@ impl TuiApp { pub fn add_message(&mut self, message: Message) { self.messages.push(message); self.message_count += 1; - + // Keep only last 1000 messages if self.messages.len() > 1000 { self.messages.remove(0); } - + // Auto-scroll to bottom unless user is scrolling up if self.scroll_offset == 0 { self.scroll_offset = 0; // Stay at bottom @@ -139,26 +144,28 @@ impl TuiApp { let vertical = Layout::default() .direction(Direction::Vertical) .constraints([ - Constraint::Min(0), // Messages area - Constraint::Length(3), // Status area - Constraint::Length(3), // Input area + Constraint::Min(0), // Messages area + Constraint::Length(3), // Status area + Constraint::Length(3), // Input area ]) .split(frame.area()); // Render messages let mut message_lines = Vec::new(); - + // Convert messages to styled lines in reverse chronological order (newest first) for msg in self.messages.iter().rev() { let style = if msg.is_own { - Style::default().fg(Color::Green).add_modifier(Modifier::BOLD) + Style::default() + .fg(Color::Green) + .add_modifier(Modifier::BOLD) } else { Style::default().fg(Color::White) }; - + message_lines.push(Line::from(Span::styled(msg.format_display(), style))); } - + let messages_text = Text::from(message_lines); let messages_paragraph = Paragraph::new(messages_text) .block(Block::default().borders(Borders::ALL).title("Messages")) @@ -172,15 +179,18 @@ impl TuiApp { } else { Style::default().fg(Color::Yellow) }; - + let status_paragraph = Paragraph::new(self.status.clone()) .style(status_style) .block(Block::default().borders(Borders::ALL).title("Status")); frame.render_widget(status_paragraph, vertical[1]); // Render input - let input_paragraph = Paragraph::new(self.input.clone()) - .block(Block::default().borders(Borders::ALL).title("Input (Esc to quit)")); + let input_paragraph = Paragraph::new(self.input.clone()).block( + Block::default() + .borders(Borders::ALL) + .title("Input (Esc to quit)"), + ); frame.render_widget(input_paragraph, vertical[2]); } } @@ -197,12 +207,13 @@ pub async fn run_tui( let mut terminal = Terminal::new(backend)?; let mut app = TuiApp::new(); - + // Add welcome message app.add_message(Message::new( "system".to_string(), "Welcome to Think TUI! Connecting to jetstream...".to_string(), false, + None, )); let result = run_tui_loop(&mut terminal, &mut app, client, &mut message_rx).await; @@ -234,7 +245,11 @@ async fn run_tui_loop( if let Event::Key(key) = event::read()? { if key.kind == KeyEventKind::Press { // Handle Ctrl+C - if matches!(key.code, KeyCode::Char('c')) && key.modifiers.contains(crossterm::event::KeyModifiers::CONTROL) { + if matches!(key.code, KeyCode::Char('c')) + && key + .modifiers + .contains(crossterm::event::KeyModifiers::CONTROL) + { break; } @@ -242,12 +257,17 @@ async fn run_tui_loop( if let Some(message) = app.handle_input(key.code) { // Publish the message match client.publish_blip(&message).await { - Ok(_) => { + Ok(t) => { // Add our own message to the display app.add_message(Message::new( "you".to_string(), message, true, + Some( + DateTime::parse_from_rfc3339(&t) + .unwrap() + .with_timezone(&Utc), + ), )); } Err(e) => { @@ -256,6 +276,7 @@ async fn run_tui_loop( "error".to_string(), format!("Failed to publish: {}", e), false, + None, )); } } @@ -266,6 +287,18 @@ async fn run_tui_loop( // Check for new messages from jetstream while let Ok(message) = message_rx.try_recv() { + // find most recent is_own message and see if it's already there (you posted it) + let duplicate = app + .messages + .iter() + .rev() + .find(|m| m.is_own) + .is_some_and(|m| m.timestamp == message.timestamp); + // TODO: some messages getting flagged as duplicates if it's the same content as previous separate message from you + + if duplicate { + continue; // skip this while iteration + } app.add_message(message); app.set_connection_status(true); } @@ -277,4 +310,4 @@ async fn run_tui_loop( } Ok(()) -} \ No newline at end of file +} -- 2.43.0 From 038795818ef2f393390e19cadbb2a0dd833eae4e Mon Sep 17 00:00:00 2001 From: azomDev Date: Fri, 12 Sep 2025 03:03:18 -0400 Subject: [PATCH] cleanup --- src/jetstream.rs | 34 +++++++++++----------------------- src/tui.rs | 4 ++-- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/jetstream.rs b/src/jetstream.rs index 4086226..4968730 100644 --- a/src/jetstream.rs +++ b/src/jetstream.rs @@ -191,13 +191,6 @@ impl JetstreamClient { let event: JetstreamEvent = serde_json::from_str(message)?; let commit = event.commit.as_ref().unwrap(); // Safe because we checked above - // Skip messages from our own DID - // if let Some(ref own_did) = self.own_did { - // if &event.did == own_did { - // return Ok(()); - // } - // } - // Parse the blip record let record_data = commit.record.as_ref(); if record_data.is_none() { @@ -209,27 +202,22 @@ impl JetstreamClient { Err(_) => return Ok(()), // Silently skip unparseable records }; - // Get or resolve the handle - let mut handle = self.resolve_did(&event.did).await; - let mut is_own = false; - - if let Some(ref own_did) = self.own_did { - if &event.did == own_did { - is_own = true; - handle = String::from("you"); - } - } + let is_own = self.own_did.as_ref().is_some_and(|own| own == &event.did); + let handle = if is_own { + "you".into() + } else { + // Get or resolve the handle + self.resolve_did(&event.did).await + }; // Create TUI message let tui_message = TuiMessage::new( handle, blip_record.content, - is_own, // Not our own message - Some( - DateTime::parse_from_rfc3339(&blip_record.created_at) - .unwrap() - .with_timezone(&Utc), - ), + is_own, + DateTime::parse_from_rfc3339(&blip_record.created_at) + .map(|dt| dt.with_timezone(&Utc)) + .ok(), // Parse RFC3339 → UTC, None if invalid (so current timestamp instead) ); // Send to TUI diff --git a/src/tui.rs b/src/tui.rs index 004319d..059a730 100644 --- a/src/tui.rs +++ b/src/tui.rs @@ -294,11 +294,11 @@ async fn run_tui_loop( .rev() .find(|m| m.is_own) .is_some_and(|m| m.timestamp == message.timestamp); - // TODO: some messages getting flagged as duplicates if it's the same content as previous separate message from you if duplicate { - continue; // skip this while iteration + continue; } + app.add_message(message); app.set_connection_status(true); } -- 2.43.0