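//! One-shot stats scan over Constellation's backlink index in RocksDB
//! (part of microcosm's atproto services; forked from microcosm.blue/microcosm-rs).
//!
//! The whole implementation below is commented out; the stub `fn main() {}` at
//! the bottom keeps the file compiling while the tool stays disabled. When
//! enabled, it walks the `target_ids` column family, buckets every link target
//! by backlink count, keeps a sample linker per bucket, and prints the
//! aggregated stats as JSON on stdout.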
// use bincode::config::Options;
// use clap::Parser;
// use serde::Serialize;
// use std::collections::HashMap;
// use std::path::PathBuf;

// use tokio_util::sync::CancellationToken;

// use constellation::storage::rocks_store::{
//     Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts,
// };
// use constellation::storage::RocksStorage;
// use constellation::Did;

// use links::parse_any_link;
// use rocksdb::IteratorMode;
// use std::time;

// /// Aggregate links in the at-mosphere
// #[derive(Parser, Debug)]
// #[command(version, about, long_about = None)]
// struct Args {
//     /// where is rocksdb's data
//     #[arg(short, long)]
//     data: PathBuf,
//     /// slow down so we don't kill the firehose consumer, if running concurrently
//     #[arg(short, long)]
//     limit: Option<u64>,
// }

// type LinkType = String;

// #[derive(Debug, Eq, Hash, PartialEq, Serialize)]
// struct SourceLink(Collection, RPath, LinkType, Option<Collection>); // last is target collection, if it's an at-uri link with a collection

// #[derive(Debug, Serialize)]
// struct SourceSample {
//     did: String,
//     rkey: String,
// }

// #[derive(Debug, Default, Serialize)]
// struct Bucket {
//     count: u64,
//     sum: u64,
//     sample: Option<SourceSample>,
// }

// #[derive(Debug, Default, Serialize)]
// struct Buckets([Bucket; 23]);

// const BUCKETS: [u64; 23] = [
//     1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16_384, 65_535,
//     262_144, 1_048_576,
// ];

// b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax
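// (each BUCKETS entry is an inclusive upper edge; the selection loop in main()
// also routes anything above the last edge into the final `bmax` bucket)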

// static DID_IDS_CF: &str = "did_ids";
// static TARGET_IDS_CF: &str = "target_ids";
// static TARGET_LINKERS_CF: &str = "target_links";

// const REPORT_INTERVAL: usize = 50_000;

// type Stats = HashMap<SourceLink, Buckets>;

// #[derive(Debug, Serialize)]
// struct Printable {
//     collection: String,
//     path: String,
//     link_type: String,
//     target_collection: Option<String>,
//     buckets: Buckets,
// }

// #[derive(Debug, Default)]
// struct ErrStats {
//     failed_to_get_sample: usize,
//     failed_to_read_target_id: usize,
//     failed_to_deserialize_target_key: usize,
//     failed_to_parse_target_as_link: usize,
//     failed_to_get_links: usize,
//     failed_to_deserialize_linkers: usize,
// }

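// // format a count with thousands separators, e.g. thousands(1234567) == "1,234,567"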
// fn thousands(n: usize) -> String {
//     n.to_string()
//         .as_bytes()
//         .rchunks(3)
//         .rev()
//         .map(std::str::from_utf8)
//         .collect::<Result<Vec<&str>, _>>()
//         .unwrap()
//         .join(",")
// }

// fn main() {
//     let args = Args::parse();

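//     // optional token bucket: `--limit N` refills N tokens each second, with burst capped at N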
//     let limit = args.limit.map(|amount| {
//         ratelimit::Ratelimiter::builder(amount, time::Duration::from_secs(1))
//             .max_tokens(amount)
//             .initial_available(amount)
//             .build()
//             .unwrap()
//     });

//     eprintln!("starting rocksdb...");
//     let rocks = RocksStorage::open_readonly(args.data).unwrap();
//     eprintln!("rocks ready.");

//     let RocksStorage { ref db, .. } = rocks;

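//     // two-stage ctrl-c: the first press cancels the token so the scan loop can
//     // exit cleanly and still print results; a second press panics to force quit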
//     let stay_alive = CancellationToken::new();
//     ctrlc::set_handler({
//         let mut desperation: u8 = 0;
//         let stay_alive = stay_alive.clone();
//         move || match desperation {
//             0 => {
//                 eprintln!("ok, shutting down...");
//                 stay_alive.cancel();
//                 desperation += 1;
//             }
//             1.. => panic!("fine, panicking!"),
//         }
//     })
//     .unwrap();

//     let mut stats = Stats::new();
//     let mut err_stats: ErrStats = Default::default();

//     let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap();
//     let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap();
//     let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap();

//     let t0 = time::Instant::now();
//     let mut t_prev = t0;

//     let mut i = 0;
//     for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) {
//         if stay_alive.is_cancelled() {
//             break;
//         }

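//         // best-effort pacing: when no token is available, try_wait() returns how
//         // long to sleep; sleep once and carry on rather than retrying for a token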
//         if let Some(ref limiter) = limit {
//             if let Err(dur) = limiter.try_wait() {
//                 std::thread::sleep(dur)
//             }
//         }

//         if i > 0 && i % REPORT_INTERVAL == 0 {
//             let now = time::Instant::now();
//             let rate = (REPORT_INTERVAL as f32) / (now.duration_since(t_prev).as_secs_f32());
//             eprintln!(
//                 "{i}\t({}k)\t{:.2}\t{rate:.1}/s",
//                 thousands(i / 1000),
//                 t0.elapsed().as_secs_f32()
//             );
//             t_prev = now;
//         }
//         i += 1;

//         let Ok((target_key, target_id)) = item else {
//             err_stats.failed_to_read_target_id += 1;
//             continue;
//         };

//         let Ok(TargetKey(Target(target), collection, rpath)) =
//             _bincode_opts().deserialize(&target_key)
//         else {
//             err_stats.failed_to_deserialize_target_key += 1;
//             continue;
//         };

//         let source = {
//             let Some(parsed) = parse_any_link(&target) else {
//                 err_stats.failed_to_parse_target_as_link += 1;
//                 continue;
//             };
//             SourceLink(
//                 collection,
//                 rpath,
//                 parsed.name().into(),
//                 parsed.at_uri_collection().map(Collection),
//             )
//         };

//         let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else {
//             err_stats.failed_to_get_links += 1;
//             continue;
//         };
//         let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else {
//             err_stats.failed_to_deserialize_linkers += 1;
//             continue;
//         };
//         let (n, _) = linkers.count();

//         if n == 0 {
//             continue;
//         }

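//         // linear scan over the 23 edges to find this backlink count's bucket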
//         let mut bucket = 0;
//         for edge in BUCKETS {
//             if n <= edge || bucket == 22 {
//                 break;
//             }
//             bucket += 1;
//         }

//         let b = &mut stats.entry(source).or_default().0[bucket];
//         b.count += 1;
//         b.sum += n;
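//         // keep one sample per bucket: the last linker in the list, resolving its
//         // DidId back to the full did through the did_ids column family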
//         if b.sample.is_none() {
//             let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize];
//             if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) {
//                 if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) {
//                     b.sample = Some(SourceSample {
//                         did,
//                         rkey: k.clone(),
//                     });
//                 } else {
//                     err_stats.failed_to_get_sample += 1;
//                 }
//             } else {
//                 err_stats.failed_to_get_sample += 1;
//             }
//         }

//         // if i >= 40_000 {
//         //     break;
//         // }
//     }

//     let dt = t0.elapsed();

//     eprintln!("gathering stats for output...");

//     let itemified = stats
//         .into_iter()
//         .map(
//             |(
//                 SourceLink(Collection(collection), RPath(path), link_type, target_collection),
//                 buckets,
//             )| Printable {
//                 collection,
//                 path,
//                 link_type,
//                 target_collection: target_collection.map(|Collection(c)| c),
//                 buckets,
//             },
//         )
//         .collect::<Vec<_>>();

//     match serde_json::to_string(&itemified) {
//         Ok(s) => println!("{s}"),
//         Err(e) => eprintln!("failed to serialize results: {e:?}"),
//     }

//     eprintln!(
//         "{} summarizing {} link targets in {:.1}s",
//         if stay_alive.is_cancelled() {
//             "STOPPED"
//         } else {
//             "FINISHED"
//         },
//         thousands(i),
//         dt.as_secs_f32()
//     );
//     eprintln!("{err_stats:?}");
//     eprintln!("bye.");
// }

// scan plan

// buckets (backlink count)
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+
// by
// - collection
// - json path
// - link type
// samples for each bucket for each variation
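// Roughly the JSON shape that plan yields with the Serialize derives above;
// values here are illustrative, not real output. `Buckets` serializes as a
// plain 23-element array because it is a newtype struct:
//
// [
//   {
//     "collection": "app.bsky.feed.like",
//     "path": ".subject.uri",
//     "link_type": "...",
//     "target_collection": "app.bsky.feed.post",
//     "buckets": [
//       { "count": 0, "sum": 0, "sample": null },
//       { "count": 12, "sum": 24, "sample": { "did": "did:plc:...", "rkey": "..." } },
//       ...
//     ]
//   }
// ]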
fn main() {}