···
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Receiver;
19
-
use tokio::time::sleep;
19
+
use tokio::time::{interval_at, sleep};
/// Commit the RW batch immediately if this number of events have been read off the mod queue
const MAX_BATCHED_RW_EVENTS: usize = 18;
···
/// Read-write loop reads from the queue for record-modifying events and does rollups
pub async fn rw_loop(&self) -> anyhow::Result<()> {
// TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time.
159
-
sleep(Duration::from_secs_f64(0.1)).await; // todo: interval rate-limit instead
162
-
let keyspace = db.keyspace.clone();
163
-
let partition = db.partition.clone();
159
+
let now = tokio::time::Instant::now();
160
+
let mut time_to_update_events = interval_at(now, Duration::from_secs_f64(0.051));
161
+
let mut time_to_trim_surplus = interval_at(
162
+
now + Duration::from_secs_f64(1.0),
163
+
Duration::from_secs_f64(3.3),
165
+
let mut time_to_roll_up = interval_at(
166
+
now + Duration::from_secs_f64(0.4),
167
+
Duration::from_secs_f64(0.9),
165
-
log::trace!("rw: spawn blocking for batch...");
166
-
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
167
-
log::trace!("rw: getting rw cursor...");
168
-
let mod_cursor = get_static::<ModCursorKey, ModCursorValue>(&partition)?
169
-
.unwrap_or(Cursor::from_start());
170
-
let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?;
170
+
time_to_update_events.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
171
+
time_to_trim_surplus.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
172
+
time_to_roll_up.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
172
-
let mut db_batch = keyspace.batch();
173
-
let mut batched_rw_items = 0;
174
-
let mut any_tasks_found = false;
175
+
let keyspace = self.db.keyspace.clone();
176
+
let partition = self.db.partition.clone();
178
+
_ = time_to_update_events.tick() => {
179
+
log::debug!("beginning event update task");
180
+
tokio::task::spawn_blocking(move || Self::update_events(keyspace, partition)).await??;
181
+
log::debug!("finished event update task");
183
+
_ = time_to_trim_surplus.tick() => {
184
+
log::debug!("beginning record trim task");
185
+
tokio::task::spawn_blocking(move || Self::trim_old_events(keyspace, partition)).await??;
186
+
log::debug!("finished record trim task");
188
+
_ = time_to_roll_up.tick() => {
189
+
log::debug!("beginning rollup task");
190
+
tokio::task::spawn_blocking(move || Self::roll_up_counts(keyspace, partition)).await??;
191
+
log::debug!("finished rollup task");
176
-
log::trace!("rw: iterating newer rw items...");
197
+
fn update_events(keyspace: Keyspace, partition: PartitionHandle) -> anyhow::Result<()> {
198
+
// TODO: lock this to prevent concurrent rw
178
-
for (i, pair) in partition.range(range.clone()).enumerate() {
179
-
log::trace!("rw: iterating {i}");
180
-
any_tasks_found = true;
200
+
log::trace!("rw: getting rw cursor...");
202
+
get_static::<ModCursorKey, ModCursorValue>(&partition)?.unwrap_or(Cursor::from_start());
203
+
let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?;
182
-
if i >= MAX_BATCHED_RW_EVENTS {
205
+
let mut db_batch = keyspace.batch();
206
+
let mut batched_rw_items = 0;
207
+
let mut any_tasks_found = false;
186
-
let (key_bytes, val_bytes) = pair?;
187
-
let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
189
-
Err(EncodingError::WrongStaticPrefix(_, _)) => {
190
-
panic!("wsp: mod queue empty.");
192
-
otherwise => otherwise?,
209
+
log::trace!("rw: iterating newer rw items...");
195
-
let mod_value: ModQueueItemValue =
196
-
db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
211
+
for (i, pair) in partition.range(range.clone()).enumerate() {
212
+
log::trace!("rw: iterating {i}");
213
+
any_tasks_found = true;
198
-
log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
199
-
batched_rw_items += DBWriter {
200
-
keyspace: keyspace.clone(),
201
-
partition: partition.clone(),
203
-
.write_rw(&mut db_batch, mod_key, mod_value)?;
204
-
log::trace!("rw: iterating {i}: back from batcher.");
215
+
if i >= MAX_BATCHED_RW_EVENTS {
206
-
if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
207
-
log::trace!("rw: iterating {i}: batch big enough, breaking out.");
219
+
let (key_bytes, val_bytes) = pair?;
220
+
let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
222
+
Err(EncodingError::WrongStaticPrefix(_, _)) => {
223
+
panic!("wsp: mod queue empty.");
225
+
otherwise => otherwise?,
228
+
let mod_value: ModQueueItemValue =
229
+
db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
212
-
if !any_tasks_found {
213
-
log::trace!("rw: skipping batch commit since apparently no items were added (this is normal, skipping is new)");
231
+
log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
232
+
batched_rw_items += DBWriter {
233
+
keyspace: keyspace.clone(),
234
+
partition: partition.clone(),
236
+
.write_rw(&mut db_batch, mod_key, mod_value)?;
237
+
log::trace!("rw: iterating {i}: back from batcher.");
217
-
log::info!("rw: committing rw batch with {batched_rw_items} items (items != total inserts/deletes)...");
218
-
let r = db_batch.commit();
219
-
log::info!("rw: commit result: {r:?}");
224
-
log::trace!("rw: back from blocking for rw...");
239
+
if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
240
+
log::trace!("rw: iterating {i}: batch big enough, breaking out.");
226
-
// log::warn!("exited rw loop (rw task)");
245
+
if !any_tasks_found {
246
+
log::trace!("rw: skipping batch commit since apparently no items were added (this is normal, skipping is new)");
247
+
// TODO: is this missing a chance to update the cursor?
251
+
log::info!("rw: committing rw batch with {batched_rw_items} items (items != total inserts/deletes)...");
252
+
let r = db_batch.commit();
253
+
log::info!("rw: commit result: {r:?}");
258
+
fn trim_old_events(_keyspace: Keyspace, _partition: PartitionHandle) -> anyhow::Result<()> {
259
+
// we *could* keep a collection dirty list in memory to reduce the amount of searching here
260
+
// actually can we use seen_by_js_cursor_collection??
261
+
// * ["seen_by_js_cursor_collection"|js_cursor|collection] => u64
262
+
// -> the rollup cursor could handle trims.
265
+
// * ["by_collection"|collection|js_cursor] => [did|rkey|record]
268
+
// 1. collect `collection`s seen during rollup
269
+
// 2. for each collected collection:
270
+
// 3. set up prefix iterator
271
+
// 4. reverse and try to walk back MAX_RETAINED steps
272
+
// 5. if we didn't end iteration yet, start deleting records (and their forward links) until we get to the end
274
+
// ... we can probably do even better with cursor ranges too, since we'll have a cursor range from rollup and it's in the by_collection key
279
+
fn roll_up_counts(_keyspace: Keyspace, _partition: PartitionHandle) -> anyhow::Result<()> {
pub async fn get_collection_records(