Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
use bincode::config::Options;
use clap::Parser;
use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;

use tokio_util::sync::CancellationToken;

use constellation::storage::rocks_store::{
    Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts,
};
use constellation::storage::RocksStorage;
use constellation::Did;

use links::parse_any_link;
use rocksdb::IteratorMode;
use std::time;

/// Aggregate links in the at-mosphere
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// where is rocksdb's data
    #[arg(short, long)]
    data: PathBuf,
    /// slow down so we don't kill the firehose consumer, if running concurrently
    #[arg(short, long)]
    limit: Option<u64>,
}

type LinkType = String;

#[derive(Debug, Eq, Hash, PartialEq, Serialize)]
struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is the target collection, if it's an at-uri link with a collection

#[derive(Debug, Serialize)]
struct SourceSample {
    did: String,
    rkey: String,
}

#[derive(Debug, Default, Serialize)]
struct Bucket {
    count: u64,
    sum: u64,
    sample: Option<SourceSample>,
}

#[derive(Debug, Default, Serialize)]
struct Buckets([Bucket; 23]);

// bucket upper edges by backlink count:
// b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax
const BUCKETS: [u64; 23] = [
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535,
    262_144, 1_048_576,
];

static DID_IDS_CF: &str = "did_ids";
static TARGET_IDS_CF: &str = "target_ids";
static TARGET_LINKERS_CF: &str = "target_links";

const REPORT_INTERVAL: usize = 50_000;

type Stats = HashMap<SourceLink, Buckets>;

#[derive(Debug, Serialize)]
struct Printable {
    collection: String,
    path: String,
    link_type: String,
    target_collection: Option<String>,
    buckets: Buckets,
}

#[derive(Debug, Default)]
struct ErrStats {
    failed_to_get_sample: usize,
    failed_to_read_target_id: usize,
    failed_to_deserialize_target_key: usize,
    failed_to_parse_target_as_link: usize,
    failed_to_get_links: usize,
    failed_to_deserialize_linkers: usize,
}

/// Format a count with thousands separators, e.g. 1234567 -> "1,234,567".
fn thousands(n: usize) -> String {
    n.to_string()
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(std::str::from_utf8)
        .collect::<Result<Vec<&str>, _>>()
        .unwrap()
        .join(",")
}

/// Map a nonzero backlink count to its histogram bucket: the first BUCKETS
/// edge at or above `n`, clamping anything past the last edge into the final
/// ("max") bucket. (Factored out of the scan loop below so it can be tested.)
fn bucket_index(n: u64) -> usize {
    let mut bucket = 0;
    for edge in BUCKETS {
        if n <= edge || bucket == BUCKETS.len() - 1 {
            break;
        }
        bucket += 1;
    }
    bucket
}
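// A couple of sanity checks for the helpers above, as a sketch: the expected
// bucket indices below are derived from the BUCKETS edges in this file, not
// from any external spec, and `bucket_index` is a local helper factored out
// of the scan loop for testability.
#[cfg(test)]
mod helper_tests {
    use super::{bucket_index, thousands, BUCKETS};

    #[test]
    fn thousands_groups_digits() {
        assert_eq!(thousands(0), "0");
        assert_eq!(thousands(999), "999");
        assert_eq!(thousands(1000), "1,000");
        assert_eq!(thousands(1_234_567), "1,234,567");
    }

    #[test]
    fn bucket_index_picks_first_edge_at_or_above_n() {
        assert_eq!(bucket_index(1), 0); // smallest bucket: exactly one backlink
        assert_eq!(bucket_index(11), 10); // 10 < 11 <= 12, so the "12" bucket
        assert_eq!(bucket_index(1024), 17); // exact edge hit
        assert_eq!(bucket_index(2_000_000), BUCKETS.len() - 1); // clamped to max
    }
}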
fn main() {
    let args = Args::parse();

    // optional rate limiter: at most `limit` iterator steps per second
    let limit = args.limit.map(|amount| {
        ratelimit::Ratelimiter::builder(amount, time::Duration::from_secs(1))
            .max_tokens(amount)
            .initial_available(amount)
            .build()
            .unwrap()
    });

    eprintln!("starting rocksdb...");
    let rocks = RocksStorage::open_readonly(args.data).unwrap();
    eprintln!("rocks ready.");

    let RocksStorage { ref db, .. } = rocks;

    // first ctrl-c asks the scan loop to stop gracefully; a second one panics
    let stay_alive = CancellationToken::new();
    ctrlc::set_handler({
        let mut desperation: u8 = 0;
        let stay_alive = stay_alive.clone();
        move || match desperation {
            0 => {
                eprintln!("ok, shutting down...");
                stay_alive.cancel();
                desperation += 1;
            }
            1.. => panic!("fine, panicking!"),
        }
    })
    .unwrap();

    let mut stats = Stats::new();
    let mut err_stats: ErrStats = Default::default();

    let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap();
    let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap();
    let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap();

    let t0 = time::Instant::now();
    let mut t_prev = t0;

    let mut i = 0;
    for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) {
        if stay_alive.is_cancelled() {
            break;
        }

        if let Some(ref limiter) = limit {
            if let Err(dur) = limiter.try_wait() {
                std::thread::sleep(dur)
            }
        }

        // periodic progress report: items scanned, elapsed seconds, recent rate
        if i > 0 && i % REPORT_INTERVAL == 0 {
            let now = time::Instant::now();
            let rate = (REPORT_INTERVAL as f32) / now.duration_since(t_prev).as_secs_f32();
            eprintln!(
                "{i}\t({}k)\t{:.2}\t{rate:.1}/s",
                thousands(i / 1000),
                t0.elapsed().as_secs_f32()
            );
            t_prev = now;
        }
        i += 1;

        let Ok((target_key, target_id)) = item else {
            err_stats.failed_to_read_target_id += 1;
            continue;
        };

        let Ok(TargetKey(Target(target), collection, rpath)) =
            _bincode_opts().deserialize(&target_key)
        else {
            err_stats.failed_to_deserialize_target_key += 1;
            continue;
        };

        // classify the link source: collection, record path, link type, and,
        // for at-uri links, the target's collection
        let source = {
            let Some(parsed) = parse_any_link(&target) else {
                err_stats.failed_to_parse_target_as_link += 1;
                continue;
            };
            SourceLink(
                collection,
                rpath,
                parsed.name().into(),
                parsed.at_uri_collection().map(Collection),
            )
        };

        let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else {
            err_stats.failed_to_get_links += 1;
            continue;
        };
        let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else {
            err_stats.failed_to_deserialize_linkers += 1;
            continue;
        };
        let (n, _) = linkers.count();

        if n == 0 {
            continue;
        }

        let b = &mut stats.entry(source).or_default().0[bucket_index(n)];
        b.count += 1;
        b.sum += n;
        // keep one sample linker per bucket, resolving its did id back to the
        // full did string
        if b.sample.is_none() {
            let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
            if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
                if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
                    b.sample = Some(SourceSample {
                        did,
                        rkey: k.clone(),
                    });
                } else {
                    err_stats.failed_to_get_sample += 1;
                }
            } else {
                err_stats.failed_to_get_sample += 1;
            }
        }
    }

    let dt = t0.elapsed();

    eprintln!("gathering stats for output...");

    // flatten the map into serializable rows
    let itemified = stats
        .into_iter()
        .map(
            |(
                SourceLink(Collection(collection), RPath(path), link_type, target_collection),
                buckets,
            )| Printable {
                collection,
                path,
                link_type,
                target_collection: target_collection.map(|Collection(c)| c),
                buckets,
            },
        )
        .collect::<Vec<_>>();

    match serde_json::to_string(&itemified) {
        Ok(s) => println!("{s}"),
        Err(e) => eprintln!("failed to serialize results: {e:?}"),
    }

    eprintln!(
        "{} summarizing {} link targets in {:.1}s",
        if stay_alive.is_cancelled() {
            "STOPPED"
        } else {
            "FINISHED"
        },
        thousands(i),
        dt.as_secs_f32()
    );
    eprintln!("{err_stats:?}");
    eprintln!("bye.");
}
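// For reference, one element of the JSON array printed above might look like
// this. The field names follow `Printable`/`Bucket`; the values here are
// invented for illustration, not real output:
//
// {
//   "collection": "app.bsky.feed.like",
//   "path": ".subject.uri",
//   "link_type": "at-uri",
//   "target_collection": "app.bsky.feed.post",
//   "buckets": [
//     { "count": 41, "sum": 41, "sample": { "did": "did:plc:...", "rkey": "..." } },
//     ...
//   ]
// }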
// scan plan
//
// buckets (backlink count):
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+
// by:
// - collection
// - json path
// - link type
// samples for each bucket for each variation
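// Example invocation, as a sketch (the binary name here is an assumption;
// --data and --limit come from `Args` above):
//
//   cargo run --release --bin scan -- --data /path/to/rocksdb --limit 50000
//
// The JSON summary goes to stdout; progress lines and ErrStats go to stderr,
// so redirecting stdout captures just the data.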