···
-use crate::db_types::{db_complete, DbBytes, DbStaticStr, StaticStr};
+use crate::db_types::{db_complete, DbBytes, DbStaticStr, EncodingResult, StaticStr};
use crate::error::StorageError;
use crate::storage::{StorageResult, StorageWhatever, StoreBackground, StoreReader, StoreWriter};
use crate::store_types::{
···
    NewRollupCursorKey, NewRollupCursorValue, NsidRecordFeedKey, NsidRecordFeedVal,
    RecordLocationKey, RecordLocationMeta, RecordLocationVal, RecordRawValue, SketchSecretKey,
    SketchSecretPrefix, TakeoffKey, TakeoffValue, TrimCollectionCursorKey, WeekTruncatedCursor,
-    WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection,
+    WeeklyDidsKey, WeeklyRecordsKey, WeeklyRollupKey, WithCollection, WithRank,
···
    CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord,
···
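+// Shared helper for the ranked lookups below: walk a rank-index key range in
+// descending order, yielding each entry's NSID plus a lazy `GetCounts` closure
+// that loads the full `CountsValue` from the corresponding count rollup.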
+type GetRollupKey = Arc<dyn Fn(&Nsid) -> EncodingResult<Vec<u8>>>;
+fn get_lookup_iter<T: WithCollection + WithRank + DbBytes + 'static>(
+    snapshot: lsm_tree::Snapshot,
+    start: Bound<Vec<u8>>,
+    end: Bound<Vec<u8>>,
+    get_rollup_key: GetRollupKey,
+) -> StorageResult<NsidCounter> {
+    Ok(Box::new(snapshot.range((start, end)).rev().map(
+        move |kv| {
+            let (k_bytes, _) = kv?;
+            let key = db_complete::<T>(&k_bytes)?;
+            let nsid = key.collection().clone();
+            let get_counts: GetCounts = Box::new({
+                let nsid = nsid.clone();
+                let snapshot = snapshot.clone();
+                let get_rollup_key = get_rollup_key.clone();
+                move || {
+                    let db_count_bytes = snapshot.get(get_rollup_key(&nsid)?)?.expect(
+                        "integrity: all-time rank rollup must have corresponding all-time count rollup",
+                    );
+                    Ok(db_complete::<CountsValue>(&db_count_bytes)?)
+                }
+            });
+            Ok((nsid, get_counts))
+        },
+    )))
+}
···
    fn get_storage_stats(&self) -> StorageResult<serde_json::Value> {
···
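+    // Lexicographic listing: build one iterator per time bucket, each resuming
+    // just after the page cursor's NSID (when present), so results can be
+    // merged across buckets in collection-name order.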
-    fn get_collections(
+    fn get_lexi_collections(
        &self,
+        snapshot: Snapshot,
        limit: usize,
-        order: OrderCollectionsBy,
-        since: Option<HourTruncatedCursor>,
-        until: Option<HourTruncatedCursor>,
+        cursor: Option<Vec<u8>>,
+        buckets: Vec<CursorBucket>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
-        let snapshot = self.rollups.snapshot();
-        let buckets = if let (None, None) = (since, until) {
-            vec![CursorBucket::AllTime]
-        } else {
-            let mut lower = self.get_earliest_hour(Some(&snapshot))?;
-            if let Some(specified) = since {
-                if specified > lower {
-                    lower = specified;
-                }
-            }
-            let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
-            CursorBucket::buckets_spanning(lower, upper)
-        };
+        let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
        let mut iters: Vec<Peekable<NsidCounter>> = Vec::with_capacity(buckets.len());
-        match order {
-            OrderCollectionsBy::Lexi { cursor } => {
-                let cursor_nsid = cursor.as_deref().map(db_complete::<Nsid>).transpose()?;
-                for bucket in &buckets {
-                    let it: NsidCounter = match bucket {
-                        CursorBucket::Hour(t) => {
-                            let start = cursor_nsid
-                                .as_ref()
-                                .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
-                                .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
-                            let end = HourlyRollupKey::end(*t)?;
-                            get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
-                        }
-                        CursorBucket::Week(t) => {
-                            let start = cursor_nsid
-                                .as_ref()
-                                .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
-                                .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
-                            let end = WeeklyRollupKey::end(*t)?;
-                            get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
-                        }
-                        CursorBucket::AllTime => {
-                            let start = cursor_nsid
-                                .as_ref()
-                                .map(AllTimeRollupKey::after_nsid)
-                                .unwrap_or_else(AllTimeRollupKey::start)?;
-                            let end = AllTimeRollupKey::end()?;
-                            get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
-                        }
-                    };
-                    iters.push(it.peekable());
-                }
-            }
-            OrderCollectionsBy::RecordsCreated => todo!(),
-            OrderCollectionsBy::DidsEstimate => todo!(),
-        }
+        for bucket in &buckets {
+            let it: NsidCounter = match bucket {
+                CursorBucket::Hour(t) => {
+                    let start = cursor_nsid
+                        .as_ref()
+                        .map(|nsid| HourlyRollupKey::after_nsid(*t, nsid))
+                        .unwrap_or_else(|| HourlyRollupKey::start(*t))?;
+                    let end = HourlyRollupKey::end(*t)?;
+                    get_lexi_iter::<HourlyRollupKey>(&snapshot, start, end)?
+                }
+                CursorBucket::Week(t) => {
+                    let start = cursor_nsid
+                        .as_ref()
+                        .map(|nsid| WeeklyRollupKey::after_nsid(*t, nsid))
+                        .unwrap_or_else(|| WeeklyRollupKey::start(*t))?;
+                    let end = WeeklyRollupKey::end(*t)?;
+                    get_lexi_iter::<WeeklyRollupKey>(&snapshot, start, end)?
+                }
+                CursorBucket::AllTime => {
+                    let start = cursor_nsid
+                        .as_ref()
+                        .map(AllTimeRollupKey::after_nsid)
+                        .unwrap_or_else(AllTimeRollupKey::start)?;
+                    let end = AllTimeRollupKey::end()?;
+                    get_lexi_iter::<AllTimeRollupKey>(&snapshot, start, end)?
+                }
+            };
+            iters.push(it.peekable());
+        }
        let mut out = Vec::new();
···
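+    // Ranked listing: for each time bucket, scan the rank index that matches
+    // the requested order (records created vs. estimated distinct DIDs), then
+    // merge and re-sort the per-bucket leaders below.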
-    fn get_top_collections_by_count(
+    fn get_ordered_collections(
        &self,
+        snapshot: Snapshot,
        limit: usize,
-        since: Option<HourTruncatedCursor>,
-        until: Option<HourTruncatedCursor>,
+        order: OrderCollectionsBy,
+        buckets: Vec<CursorBucket>,
    ) -> StorageResult<Vec<NsidCount>> {
-        Ok(if since.is_none() && until.is_none() {
-            let snapshot = self.rollups.snapshot();
-            let mut out = Vec::with_capacity(limit);
-            let prefix = AllTimeRecordsKey::from_prefix_to_db_bytes(&Default::default())?;
-            for kv in snapshot.prefix(prefix).rev().take(limit) {
-                let (key_bytes, _) = kv?;
-                let key = db_complete::<AllTimeRecordsKey>(&key_bytes)?;
-                let rollup_key = AllTimeRollupKey::new(key.collection());
-                let db_count_bytes = snapshot.get(rollup_key.to_db_bytes()?)?.expect(
-                    "integrity: all-time rank rollup must have corresponding all-time count rollup",
-                );
-                let db_counts = db_complete::<CountsValue>(&db_count_bytes)?;
-                assert_eq!(db_counts.records(), key.count());
-                out.push(NsidCount {
-                    nsid: key.collection().to_string(),
-                    records: db_counts.records(),
-                    dids_estimate: db_counts.dids().estimate() as u64,
···
+        let mut iters: Vec<NsidCounter> = Vec::with_capacity(buckets.len());
+        for bucket in buckets {
+            let it: NsidCounter = match (&order, bucket) {
+                (OrderCollectionsBy::RecordsCreated, CursorBucket::Hour(t)) => {
+                    get_lookup_iter::<HourlyRecordsKey>(
+                        snapshot.clone(),
+                        HourlyRecordsKey::start(t)?,
+                        HourlyRecordsKey::end(t)?,
+                        Arc::new(
+                            move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
+                        ),
+                    )?
+                }
+                (OrderCollectionsBy::DidsEstimate, CursorBucket::Hour(t)) => {
+                    get_lookup_iter::<HourlyDidsKey>(
+                        snapshot.clone(),
+                        HourlyDidsKey::start(t)?,
+                        HourlyDidsKey::end(t)?,
+                        Arc::new(
+                            move |collection| HourlyRollupKey::new(t, collection).to_db_bytes()
+                        ),
+                    )?
+                }
+                (OrderCollectionsBy::RecordsCreated, CursorBucket::Week(t)) => {
+                    get_lookup_iter::<WeeklyRecordsKey>(
+                        snapshot.clone(),
+                        WeeklyRecordsKey::start(t)?,
+                        WeeklyRecordsKey::end(t)?,
+                        Arc::new(
+                            move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
+                        ),
+                    )?
+                }
+                (OrderCollectionsBy::DidsEstimate, CursorBucket::Week(t)) => {
+                    get_lookup_iter::<WeeklyDidsKey>(
+                        snapshot.clone(),
+                        WeeklyDidsKey::start(t)?,
+                        WeeklyDidsKey::end(t)?,
+                        Arc::new(
+                            move |collection| WeeklyRollupKey::new(t, collection).to_db_bytes()
+                        ),
+                    )?
+                }
+                (OrderCollectionsBy::RecordsCreated, CursorBucket::AllTime) => {
+                    get_lookup_iter::<AllTimeRecordsKey>(
+                        snapshot.clone(),
+                        AllTimeRecordsKey::start()?,
+                        AllTimeRecordsKey::end()?,
+                        Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
+                    )?
+                }
+                (OrderCollectionsBy::DidsEstimate, CursorBucket::AllTime) => {
+                    get_lookup_iter::<AllTimeDidsKey>(
+                        snapshot.clone(),
+                        AllTimeDidsKey::start()?,
+                        AllTimeDidsKey::end()?,
+                        Arc::new(|collection| AllTimeRollupKey::new(collection).to_db_bytes()),
+                    )?
+                }
+                (OrderCollectionsBy::Lexi { .. }, _) => unreachable!(),
+            };
+            iters.push(it);
+        }
+        // overfetch by taking a bit more than the limit
+        // merge by collection
+        // sort by requested order, take limit, discard all remaining
+        //
+        // this isn't guaranteed to be correct, but it will hopefully be close most of the time:
+        // - it's possible that some NSIDs might score low during some time-buckets, and miss being merged
+        // - overfetching hopefully helps a bit by catching nsids near the threshold more often, but. yeah.
+        //
+        // this thing is heavy, there's probably a better way
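+        //
+        // e.g. with limit = 50 over 3 buckets: take ceil(50 * 1.3) = 65 leaders
+        // from each bucket (at most 195 candidates), merge counts per NSID,
+        // sort once, and keep the top 50.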
+        let mut ranked: HashMap<Nsid, CountsValue> = HashMap::with_capacity(limit * 2);
+        for iter in iters {
+            for pair in iter.take((limit as f64 * 1.3).ceil() as usize) {
+                let (nsid, get_counts) = pair?;
+                let counts = get_counts()?;
+                ranked.entry(nsid).or_default().merge(&counts);
+            }
+        }
+        let mut ranked: Vec<(Nsid, CountsValue)> = ranked.into_iter().collect();
+        match order {
+            OrderCollectionsBy::RecordsCreated => ranked.sort_by_key(|(_, c)| c.records()),
+            OrderCollectionsBy::DidsEstimate => ranked.sort_by_key(|(_, c)| c.dids().estimate()),
+            OrderCollectionsBy::Lexi { .. } => unreachable!(),
+        }
+        let counts = ranked
+            .into_iter()
+            .rev()
+            .take(limit)
+            .map(|(nsid, cv)| NsidCount {
+                nsid: nsid.to_string(),
+                records: cv.records(),
+                dids_estimate: cv.dids().estimate() as u64,
+            })
+            .collect();
+        Ok(counts)
+    }
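+    // Public entry point: resolve the requested time range into rollup
+    // buckets, then dispatch to the lexicographic or ranked listing above.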
-    fn get_top_collections_by_dids(
+    fn get_collections(
        &self,
        limit: usize,
+        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
-    ) -> StorageResult<Vec<NsidCount>> {
-        Ok(if since.is_none() && until.is_none() {
-            let snapshot = self.rollups.snapshot();
-            let mut out = Vec::with_capacity(limit);
-            let prefix = AllTimeDidsKey::from_prefix_to_db_bytes(&Default::default())?;
-            for kv in snapshot.prefix(prefix).rev().take(limit) {
-                let (key_bytes, _) = kv?;
-                let key = db_complete::<AllTimeDidsKey>(&key_bytes)?;
-                let rollup_key = AllTimeRollupKey::new(key.collection());
-                let db_count_bytes = snapshot.get(rollup_key.to_db_bytes()?)?.expect(
-                    "integrity: all-time rank rollup must have corresponding all-time count rollup",
-                );
-                let db_counts = db_complete::<CountsValue>(&db_count_bytes)?;
-                assert_eq!(db_counts.dids().estimate() as u64, key.count());
-                out.push(NsidCount {
-                    nsid: key.collection().to_string(),
-                    records: db_counts.records(),
-                    dids_estimate: db_counts.dids().estimate() as u64,
···
+    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)> {
+        let snapshot = self.rollups.snapshot();
+        let buckets = if let (None, None) = (since, until) {
+            vec![CursorBucket::AllTime]
+        } else {
+            let mut lower = self.get_earliest_hour(Some(&snapshot))?;
+            if let Some(specified) = since {
+                if specified > lower {
+                    lower = specified;
+                }
+            }
+            let upper = until.unwrap_or_else(|| Cursor::at(SystemTime::now()).into());
+            CursorBucket::buckets_spanning(lower, upper)
+        };
+        match order {
+            OrderCollectionsBy::Lexi { cursor } => {
+                self.get_lexi_collections(snapshot, limit, cursor, buckets)
+            }
+            _ => Ok((
+                self.get_ordered_collections(snapshot, limit, order, buckets)?,
+                None,
+            )),
+        }
+    }
    fn get_counts_by_collection(&self, collection: &Nsid) -> StorageResult<(u64, u64)> {
···
        tokio::task::spawn_blocking(move || {
            FjallReader::get_collections(&s, limit, order, since, until)
        })
        .await?
    }
-
-    async fn get_top_collections_by_count(
-        &self,
-        limit: usize,
-        since: Option<HourTruncatedCursor>,
-        until: Option<HourTruncatedCursor>,
-    ) -> StorageResult<Vec<NsidCount>> {
-        let s = self.clone();
-        tokio::task::spawn_blocking(move || {
-            FjallReader::get_top_collections_by_count(&s, limit, since, until)
-        })
-        .await?
-    }
-
-    async fn get_top_collections_by_dids(
-        &self,
-        limit: usize,
-        since: Option<HourTruncatedCursor>,
-        until: Option<HourTruncatedCursor>,
-    ) -> StorageResult<Vec<NsidCount>> {
-        let s = self.clone();
-        tokio::task::spawn_blocking(move || {
-            FjallReader::get_top_collections_by_dids(&s, limit, since, until)
-        })
-        .await?
-    }