+5
-3
link_aggregator/readme.md
+5
-3
link_aggregator/readme.md
···-- [~] handle jetstream restart: don't miss events (currently sketch: rewinds cursor by 1us so we will always double-count at least one event)-- [ ] especially: figure out what the risk is to rotating to another jetstream server in terms of gap/overlap from a different jetstream instance's cursor+- [x] handle jetstream restart: don't miss events (currently sketch: rewinds cursor by 1us so we will always double-count at least one event)+- [x] especially: figure out what the risk is to rotating to another jetstream server in terms of gap/overlap from a different jetstream instance's cursor (follow up separately)···+- [ ] persist the jetstream server url, error if started with a different one (maybe with --switch-streams or something)
+36
-37
link_aggregator/src/consumer/jetstream.rs
+36
-37
link_aggregator/src/consumer/jetstream.rs
·········-counter!("jetstream_connect", "url" => url, "is_retry" => (connect_retries > 0).to_string()).increment(1);+counter!("jetstream_connect", "url" => stream.clone(), "is_retry" => (connect_retries > 0).to_string()).increment(1);println!("jetstream connecting, attempt #{connect_retries}, {stream_url:?} with user-agent: {ua:?}");···eprintln!("jetstream: unexpected text message, should be binary for compressed (ignoring)");-counter!("jetstream_read_fail", "url" => url, "reason" => "unexpected message", "message" => format!("{m:?}")).increment(1);+counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "unexpected message", "message" => format!("{m:?}")).increment(1);······+counter!("jetstream_read_fail", "url" => stream.clone(), "reason" => "clean close").increment(1);···············
+2
-1
link_aggregator/src/consumer/mod.rs
+2
-1
link_aggregator/src/consumer/mod.rs
······
+22
-3
link_aggregator/src/main.rs
+22
-3
link_aggregator/src/main.rs
···+/// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:············
+1
-1
link_aggregator/src/storage/rocks_store.rs
+1
-1
link_aggregator/src/storage/rocks_store.rs
···-self.delete_record_link(&mut mini_batch, &record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better+self.delete_record_link(&mut mini_batch, record_link_key); // _could_ use delete range here instead of individual deletes, but since we have to scan anyway it's not obvious if it's better