Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Compare changes

Choose any two refs to compare.

+300 -73
Cargo.lock
···
[[package]]
name = "anyhow"
-
version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
[[package]]
name = "arbitrary"
···
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arrayvec"
···
"nom",
"num-traits",
"rusticata-macros",
-
"thiserror 2.0.16",
"time",
]
···
"axum",
"handlebars",
"serde",
-
"thiserror 2.0.16",
]
[[package]]
···
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
[[package]]
name = "base64"
···
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5"
[[package]]
name = "bzip2-sys"
version = "0.1.13+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"enum_dispatch",
"serde",
]
[[package]]
name = "cc"
···
"multihash",
"serde",
"serde_bytes",
-
"unsigned-varint",
]
[[package]]
···
[[package]]
name = "clap"
-
version = "4.5.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931"
dependencies = [
"clap_builder",
"clap_derive",
···
[[package]]
name = "clap_builder"
-
version = "4.5.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6"
dependencies = [
"anstream",
"anstyle",
···
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "constellation"
···
]
[[package]]
name = "data-encoding"
-
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
[[package]]
name = "data-encoding-macro"
-
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "9f9724adfcf41f45bf652b3995837669d73c4d49a1b5ac1ff82905ac7d9b5558"
dependencies = [
"data-encoding",
"data-encoding-macro-internal",
···
[[package]]
name = "data-encoding-macro-internal"
-
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f"
dependencies = [
"data-encoding",
-
"syn 1.0.109",
]
[[package]]
···
"slog-bunyan",
"slog-json",
"slog-term",
-
"thiserror 2.0.16",
"tokio",
"tokio-rustls 0.25.0",
"toml 0.9.7",
···
checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc"
dependencies = [
"byteorder",
-
"byteview",
"dashmap",
"log",
-
"lsm-tree",
"path-absolutize",
"std-semaphore",
"tempfile",
···
source = "git+https://github.com/fjall-rs/fjall.git#42d811f7c8cc9004407d520d37d2a1d8d246c03d"
dependencies = [
"byteorder",
-
"byteview",
"dashmap",
"log",
-
"lsm-tree",
"path-absolutize",
"std-semaphore",
"tempfile",
···
]
[[package]]
name = "flate2"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"mixtrics",
"pin-project",
"serde",
-
"thiserror 2.0.16",
"tokio",
"tracing",
]
···
"parking_lot",
"pin-project",
"serde",
-
"thiserror 2.0.16",
"tokio",
"twox-hash",
]
···
"parking_lot",
"pin-project",
"serde",
-
"thiserror 2.0.16",
"tokio",
"tracing",
]
···
"pin-project",
"rand 0.9.1",
"serde",
-
"thiserror 2.0.16",
"tokio",
"tracing",
"twox-hash",
···
"pest_derive",
"serde",
"serde_json",
-
"thiserror 2.0.16",
"walkdir",
]
···
"once_cell",
"rand 0.9.1",
"ring",
-
"thiserror 2.0.16",
"tinyvec",
"tokio",
"tracing",
···
"rand 0.9.1",
"resolv-conf",
"smallvec",
-
"thiserror 2.0.16",
"tokio",
"tracing",
]
···
]
[[package]]
name = "is-terminal"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"metrics",
"serde",
"serde_json",
-
"thiserror 2.0.16",
"tokio",
"tokio-tungstenite 0.26.2",
"url",
···
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
-
"windows-targets 0.48.5",
]
[[package]]
···
version = "0.1.0"
dependencies = [
"anyhow",
"fluent-uri",
"nom",
-
"thiserror 2.0.16",
"tinyjson",
]
···
[[package]]
name = "lsm-tree"
-
version = "2.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "55b6d7475a8dd22e749186968daacf8e2a77932b061b1bd263157987bbfc0c6c"
dependencies = [
"byteorder",
"crossbeam-skiplist",
···
]
[[package]]
name = "lz4"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
name = "lz4_flex"
-
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
[[package]]
name = "mach2"
···
]
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"metrics",
"metrics-util 0.20.0",
"quanta",
-
"thiserror 2.0.16",
"tokio",
"tracing",
]
···
[[package]]
name = "multibase"
-
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
dependencies = [
"base-x",
"data-encoding",
"data-encoding-macro",
]
···
dependencies = [
"core2",
"serde",
-
"unsigned-varint",
]
[[package]]
···
checksum = "1db05f56d34358a8b1066f67cbb203ee3e7ed2ba674a6263a1d5ec6db2204323"
dependencies = [
"memchr",
-
"thiserror 2.0.16",
"ucd-trie",
]
···
"rusqlite",
"serde",
"serde_json",
-
"thiserror 2.0.16",
"tokio",
"tracing-subscriber",
]
···
"smallvec",
"sync_wrapper",
"tempfile",
-
"thiserror 2.0.16",
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
···
"serde_json",
"serde_urlencoded",
"serde_yaml",
-
"thiserror 2.0.16",
"tokio",
]
···
"quote",
"regex",
"syn 2.0.106",
-
"thiserror 2.0.16",
]
[[package]]
···
[[package]]
name = "quick_cache"
-
version = "0.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8f8ed0655cbaf18a26966142ad23b95d8ab47221c50c4f73a1db7d0d2d6e3da8"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
···
"rustc-hash 2.1.1",
"rustls 0.23.31",
"socket2 0.5.9",
-
"thiserror 2.0.16",
"tokio",
"tracing",
"web-time",
···
"rustls 0.23.31",
"rustls-pki-types",
"slab",
-
"thiserror 2.0.16",
"tinyvec",
"tracing",
"web-time",
···
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "reqwest"
-
version = "0.12.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "cbc931937e6ca3a06e3b6c0aa7841849b160a90351d6ab467a8b9b9959767531"
dependencies = [
"async-compression",
"base64 0.22.1",
···
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
···
[[package]]
name = "self_cell"
-
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "c2fdfc24bc566f839a2da4c4295b82db7d25a24253867d5c64355abb5799bdbe"
[[package]]
name = "semver"
···
[[package]]
name = "serde_bytes"
-
version = "0.11.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96"
dependencies = [
"serde",
]
[[package]]
···
]
[[package]]
name = "serde_json"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"percent-encoding",
"ryu",
"serde",
-
"thiserror 2.0.16",
]
[[package]]
···
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
dependencies = [
"num-bigint",
"num-traits",
-
"thiserror 2.0.16",
"time",
]
···
"rustls 0.23.31",
"serde",
"serde_json",
-
"thiserror 2.0.16",
"time",
"tokio",
"tokio-util",
···
name = "spacedust"
version = "0.1.0"
dependencies = [
"async-trait",
"clap",
"ctrlc",
"dropshot",
"env_logger",
"futures",
"http",
"jetstream",
"links",
"log",
"metrics",
"metrics-exporter-prometheus 0.17.2",
"rand 0.9.1",
"schemars",
"semver",
"serde",
"serde_json",
"serde_qs",
-
"thiserror 2.0.16",
"tinyjson",
"tokio",
"tokio-tungstenite 0.27.0",
···
[[package]]
name = "tempfile"
-
version = "3.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf"
dependencies = [
"fastrand",
"getrandom 0.3.3",
···
[[package]]
name = "thiserror"
-
version = "2.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0"
dependencies = [
-
"thiserror-impl 2.0.16",
]
[[package]]
···
[[package]]
name = "thiserror-impl"
-
version = "2.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960"
dependencies = [
"proc-macro2",
"quote",
···
"native-tls",
"rand 0.9.1",
"sha1",
-
"thiserror 2.0.16",
"url",
"utf-8",
]
···
"log",
"rand 0.9.1",
"sha1",
-
"thiserror 2.0.16",
"utf-8",
]
···
"http",
"jetstream",
"log",
-
"lsm-tree",
"metrics",
"metrics-exporter-prometheus 0.17.2",
"schemars",
···
"serde_qs",
"sha2",
"tempfile",
-
"thiserror 2.0.16",
"tikv-jemallocator",
"tokio",
"tokio-util",
···
[[package]]
name = "unsigned-varint"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
···
checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c"
dependencies = [
"byteorder",
-
"byteview",
"interval-heap",
"log",
"path-absolutize",
···
]
[[package]]
name = "web-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"reqwest",
"serde",
"serde_json",
-
"thiserror 2.0.16",
"tokio",
"tokio-util",
"url",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
-
"windows-sys 0.48.0",
]
[[package]]
···
"nom",
"oid-registry",
"rusticata-macros",
-
"thiserror 2.0.16",
"time",
]
···
[[package]]
name = "anyhow"
+
version = "1.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
[[package]]
name = "arbitrary"
···
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+
+
[[package]]
+
name = "arrayref"
+
version = "0.3.9"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
[[package]]
name = "arrayvec"
···
"nom",
"num-traits",
"rusticata-macros",
+
"thiserror 2.0.17",
"time",
]
···
"axum",
"handlebars",
"serde",
+
"thiserror 2.0.17",
]
[[package]]
···
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
+
+
[[package]]
+
name = "base256emoji"
+
version = "1.0.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b5e9430d9a245a77c92176e649af6e275f20839a48389859d1661e9a128d077c"
+
dependencies = [
+
"const-str",
+
"match-lookup",
+
]
[[package]]
name = "base64"
···
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
[[package]]
+
name = "blake3"
+
version = "1.8.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
+
dependencies = [
+
"arrayref",
+
"arrayvec",
+
"cc",
+
"cfg-if",
+
"constant_time_eq",
+
]
+
+
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
+
name = "byteorder-lite"
+
version = "0.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
+
+
[[package]]
name = "bytes"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5"
[[package]]
+
name = "byteview"
+
version = "0.8.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "1e6b0e42e210b794e14b152c6fe1a55831e30ef4a0f5dc39d73d714fb5f1906c"
+
+
[[package]]
name = "bzip2-sys"
version = "0.1.13+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"enum_dispatch",
"serde",
]
+
+
[[package]]
+
name = "cbor4ii"
+
version = "0.2.14"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4"
+
dependencies = [
+
"serde",
+
]
+
+
[[package]]
+
name = "cbor4ii"
+
version = "1.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b28d2802395e3bccd95cc4ae984bff7444b6c1f5981da46a41360c42a2c7e2d9"
[[package]]
name = "cc"
···
"multihash",
"serde",
"serde_bytes",
+
"unsigned-varint 0.8.0",
]
[[package]]
···
[[package]]
name = "clap"
+
version = "4.5.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae"
dependencies = [
"clap_builder",
"clap_derive",
···
[[package]]
name = "clap_builder"
+
version = "4.5.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9"
dependencies = [
"anstream",
"anstyle",
···
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
+
+
[[package]]
+
name = "const-str"
+
version = "0.4.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3"
+
+
[[package]]
+
name = "constant_time_eq"
+
version = "0.3.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
[[package]]
name = "constellation"
···
]
[[package]]
+
name = "dasl"
+
version = "0.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b59666035a4386b0fd272bd78da4cbc3ccb558941e97579ab00f0eb4639f2a49"
+
dependencies = [
+
"blake3",
+
"cbor4ii 1.2.0",
+
"data-encoding",
+
"data-encoding-macro",
+
"scopeguard",
+
"serde",
+
"serde_bytes",
+
"sha2",
+
"thiserror 2.0.17",
+
]
+
+
[[package]]
name = "data-encoding"
+
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "data-encoding-macro"
+
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d"
dependencies = [
"data-encoding",
"data-encoding-macro-internal",
···
[[package]]
name = "data-encoding-macro-internal"
+
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
dependencies = [
"data-encoding",
+
"syn 2.0.106",
]
[[package]]
···
"slog-bunyan",
"slog-json",
"slog-term",
+
"thiserror 2.0.17",
"tokio",
"tokio-rustls 0.25.0",
"toml 0.9.7",
···
checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc"
dependencies = [
"byteorder",
+
"byteview 0.6.1",
"dashmap",
"log",
+
"lsm-tree 2.10.4",
"path-absolutize",
"std-semaphore",
"tempfile",
···
source = "git+https://github.com/fjall-rs/fjall.git#42d811f7c8cc9004407d520d37d2a1d8d246c03d"
dependencies = [
"byteorder",
+
"byteview 0.6.1",
"dashmap",
"log",
+
"lsm-tree 2.10.4",
"path-absolutize",
"std-semaphore",
"tempfile",
···
]
[[package]]
+
name = "fjall"
+
version = "3.0.0-pre.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "467588c1f15d1cfa9e43f02a45cf55d82fa1f12a6ae961b848c520458525600c"
+
dependencies = [
+
"byteorder-lite",
+
"byteview 0.8.0",
+
"dashmap",
+
"log",
+
"lsm-tree 3.0.0-pre.0",
+
"std-semaphore",
+
"tempfile",
+
"xxhash-rust",
+
]
+
+
[[package]]
name = "flate2"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"mixtrics",
"pin-project",
"serde",
+
"thiserror 2.0.17",
"tokio",
"tracing",
]
···
"parking_lot",
"pin-project",
"serde",
+
"thiserror 2.0.17",
"tokio",
"twox-hash",
]
···
"parking_lot",
"pin-project",
"serde",
+
"thiserror 2.0.17",
"tokio",
"tracing",
]
···
"pin-project",
"rand 0.9.1",
"serde",
+
"thiserror 2.0.17",
"tokio",
"tracing",
"twox-hash",
···
"pest_derive",
"serde",
"serde_json",
+
"thiserror 2.0.17",
"walkdir",
]
···
"once_cell",
"rand 0.9.1",
"ring",
+
"thiserror 2.0.17",
"tinyvec",
"tokio",
"tracing",
···
"rand 0.9.1",
"resolv-conf",
"smallvec",
+
"thiserror 2.0.17",
"tokio",
"tracing",
]
···
]
[[package]]
+
name = "iroh-car"
+
version = "0.5.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "cb7f8cd4cb9aa083fba8b52e921764252d0b4dcb1cd6d120b809dbfe1106e81a"
+
dependencies = [
+
"anyhow",
+
"cid",
+
"futures",
+
"serde",
+
"serde_ipld_dagcbor",
+
"thiserror 1.0.69",
+
"tokio",
+
"unsigned-varint 0.7.2",
+
]
+
+
[[package]]
name = "is-terminal"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"metrics",
"serde",
"serde_json",
+
"thiserror 2.0.17",
"tokio",
"tokio-tungstenite 0.26.2",
"url",
···
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
+
"windows-targets 0.52.6",
]
[[package]]
···
version = "0.1.0"
dependencies = [
"anyhow",
+
"dasl",
"fluent-uri",
"nom",
+
"serde",
+
"thiserror 2.0.17",
"tinyjson",
]
···
[[package]]
name = "lsm-tree"
+
version = "2.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15"
dependencies = [
"byteorder",
"crossbeam-skiplist",
···
]
[[package]]
+
name = "lsm-tree"
+
version = "3.0.0-pre.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "be375d45e348328e78582dffbda4f1709dd52fca27c1a81c7bf6ca134e6335f7"
+
dependencies = [
+
"byteorder-lite",
+
"byteview 0.8.0",
+
"crossbeam-skiplist",
+
"enum_dispatch",
+
"interval-heap",
+
"log",
+
"lz4_flex",
+
"quick_cache",
+
"rustc-hash 2.1.1",
+
"self_cell",
+
"sfa",
+
"tempfile",
+
"varint-rs",
+
"xxhash-rust",
+
]
+
+
[[package]]
name = "lz4"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
[[package]]
name = "lz4_flex"
+
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a"
[[package]]
name = "mach2"
···
]
[[package]]
+
name = "match-lookup"
+
version = "0.1.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "1265724d8cb29dbbc2b0f06fffb8bf1a8c0cf73a78eede9ba73a4a66c52a981e"
+
dependencies = [
+
"proc-macro2",
+
"quote",
+
"syn 1.0.109",
+
]
+
+
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"metrics",
"metrics-util 0.20.0",
"quanta",
+
"thiserror 2.0.17",
"tokio",
"tracing",
]
···
[[package]]
name = "multibase"
+
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8694bb4835f452b0e3bb06dbebb1d6fc5385b6ca1caf2e55fd165c042390ec77"
dependencies = [
"base-x",
+
"base256emoji",
"data-encoding",
"data-encoding-macro",
]
···
dependencies = [
"core2",
"serde",
+
"unsigned-varint 0.8.0",
]
[[package]]
···
checksum = "1db05f56d34358a8b1066f67cbb203ee3e7ed2ba674a6263a1d5ec6db2204323"
dependencies = [
"memchr",
+
"thiserror 2.0.17",
"ucd-trie",
]
···
"rusqlite",
"serde",
"serde_json",
+
"thiserror 2.0.17",
"tokio",
"tracing-subscriber",
]
···
"smallvec",
"sync_wrapper",
"tempfile",
+
"thiserror 2.0.17",
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
···
"serde_json",
"serde_urlencoded",
"serde_yaml",
+
"thiserror 2.0.17",
"tokio",
]
···
"quote",
"regex",
"syn 2.0.106",
+
"thiserror 2.0.17",
]
[[package]]
···
[[package]]
name = "quick_cache"
+
version = "0.6.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9ad6644cb07b7f3488b9f3d2fde3b4c0a7fa367cafefb39dff93a659f76eb786"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
···
"rustc-hash 2.1.1",
"rustls 0.23.31",
"socket2 0.5.9",
+
"thiserror 2.0.17",
"tokio",
"tracing",
"web-time",
···
"rustls 0.23.31",
"rustls-pki-types",
"slab",
+
"thiserror 2.0.17",
"tinyvec",
"tracing",
"web-time",
···
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
+
name = "repo-stream"
+
version = "0.2.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "093b48e604c138949bf3d4a1a9bc1165feb1db28a73af0101c84eb703d279f43"
+
dependencies = [
+
"bincode 2.0.1",
+
"futures",
+
"futures-core",
+
"ipld-core",
+
"iroh-car",
+
"log",
+
"multibase",
+
"rusqlite",
+
"serde",
+
"serde_bytes",
+
"serde_ipld_dagcbor",
+
"sha2",
+
"thiserror 2.0.17",
+
"tokio",
+
]
+
+
[[package]]
name = "reqwest"
+
version = "0.12.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f"
dependencies = [
"async-compression",
"base64 0.22.1",
···
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
+
"wasm-streams",
"web-sys",
]
···
[[package]]
name = "self_cell"
+
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "0f7d95a54511e0c7be3f51e8867aa8cf35148d7b9445d44de2f943e2b206e749"
[[package]]
name = "semver"
···
[[package]]
name = "serde_bytes"
+
version = "0.11.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8"
dependencies = [
"serde",
+
"serde_core",
]
[[package]]
···
]
[[package]]
+
name = "serde_ipld_dagcbor"
+
version = "0.6.4"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "46182f4f08349a02b45c998ba3215d3f9de826246ba02bb9dddfe9a2a2100778"
+
dependencies = [
+
"cbor4ii 0.2.14",
+
"ipld-core",
+
"scopeguard",
+
"serde",
+
]
+
+
[[package]]
name = "serde_json"
version = "1.0.145"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"percent-encoding",
"ryu",
"serde",
+
"thiserror 2.0.17",
]
[[package]]
···
]
[[package]]
+
name = "sfa"
+
version = "0.0.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e5f5f9dc21f55409f15103d5a7e7601b804935923c7fe4746dc806c3a422a038"
+
dependencies = [
+
"byteorder-lite",
+
"log",
+
"xxhash-rust",
+
]
+
+
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
dependencies = [
"num-bigint",
"num-traits",
+
"thiserror 2.0.17",
"time",
]
···
"rustls 0.23.31",
"serde",
"serde_json",
+
"thiserror 2.0.17",
"time",
"tokio",
"tokio-util",
···
name = "spacedust"
version = "0.1.0"
dependencies = [
+
"anyhow",
+
"async-channel",
"async-trait",
"clap",
"ctrlc",
+
"dasl",
"dropshot",
"env_logger",
+
"fjall 3.0.0-pre.0",
"futures",
"http",
+
"ipld-core",
"jetstream",
"links",
"log",
"metrics",
"metrics-exporter-prometheus 0.17.2",
"rand 0.9.1",
+
"repo-stream",
+
"reqwest",
"schemars",
"semver",
"serde",
+
"serde_ipld_dagcbor",
"serde_json",
"serde_qs",
+
"thiserror 2.0.17",
"tinyjson",
"tokio",
"tokio-tungstenite 0.27.0",
···
[[package]]
name = "tempfile"
+
version = "3.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
dependencies = [
"fastrand",
"getrandom 0.3.3",
···
[[package]]
name = "thiserror"
+
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
+
"thiserror-impl 2.0.17",
]
[[package]]
···
[[package]]
name = "thiserror-impl"
+
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
···
"native-tls",
"rand 0.9.1",
"sha1",
+
"thiserror 2.0.17",
"url",
"utf-8",
]
···
"log",
"rand 0.9.1",
"sha1",
+
"thiserror 2.0.17",
"utf-8",
]
···
"http",
"jetstream",
"log",
+
"lsm-tree 2.10.4",
"metrics",
"metrics-exporter-prometheus 0.17.2",
"schemars",
···
"serde_qs",
"sha2",
"tempfile",
+
"thiserror 2.0.17",
"tikv-jemallocator",
"tokio",
"tokio-util",
···
[[package]]
name = "unsigned-varint"
+
version = "0.7.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105"
+
+
[[package]]
+
name = "unsigned-varint"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
···
checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c"
dependencies = [
"byteorder",
+
"byteview 0.6.1",
"interval-heap",
"log",
"path-absolutize",
···
]
[[package]]
+
name = "wasm-streams"
+
version = "0.4.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
+
dependencies = [
+
"futures-util",
+
"js-sys",
+
"wasm-bindgen",
+
"wasm-bindgen-futures",
+
"web-sys",
+
]
+
+
[[package]]
name = "web-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
···
"reqwest",
"serde",
"serde_json",
+
"thiserror 2.0.17",
"tokio",
"tokio-util",
"url",
···
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
+
"windows-sys 0.59.0",
]
[[package]]
···
"nom",
"oid-registry",
"rusticata-macros",
+
"thiserror 2.0.17",
"time",
]
+29 -10
constellation/src/bin/main.rs
···
/// Saved jsonl from jetstream to use instead of a live subscription
#[arg(short, long)]
fixture: Option<PathBuf>,
}
#[derive(Debug, Clone, ValueEnum)]
···
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
}
println!("rocks ready.");
-
run(
-
rocks,
-
fixture,
-
args.data,
-
stream,
-
bind,
-
metrics_bind,
-
stay_alive,
-
)
}
}
}
···
'monitor: loop {
match readable.get_stats() {
-
Ok(StorageStats { dids, targetables, linking_records }) => {
metrics::gauge!("storage.stats.dids").set(dids as f64);
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
···
/// Saved jsonl from jetstream to use instead of a live subscription
#[arg(short, long)]
fixture: Option<PathBuf>,
+
/// run a scan across the target id table and write all key -> ids to id -> keys
+
#[arg(long, action)]
+
repair_target_ids: bool,
}
#[derive(Debug, Clone, ValueEnum)]
···
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
}
println!("rocks ready.");
+
std::thread::scope(|s| {
+
if args.repair_target_ids {
+
let rocks = rocks.clone();
+
let stay_alive = stay_alive.clone();
+
s.spawn(move || {
+
let rep = rocks.run_repair(time::Duration::from_millis(0), stay_alive);
+
eprintln!("repair finished: {rep:?}");
+
rep
+
});
+
}
+
s.spawn(|| {
+
let r = run(
+
rocks,
+
fixture,
+
args.data,
+
stream,
+
bind,
+
metrics_bind,
+
stay_alive,
+
);
+
eprintln!("run finished: {r:?}");
+
r
+
});
+
});
+
Ok(())
}
}
}
···
'monitor: loop {
match readable.get_stats() {
+
Ok(StorageStats { dids, targetables, linking_records, .. }) => {
metrics::gauge!("storage.stats.dids").set(dids as f64);
metrics::gauge!("storage.stats.targetables").set(targetables as f64);
metrics::gauge!("storage.stats.linking_records").set(linking_records as f64);
+8 -6
constellation/src/server/filters.rs
···
Ok({
if let Some(link) = parse_any_link(s) {
match link {
-
Link::AtUri(at_uri) => at_uri.strip_prefix("at://").map(|noproto| {
-
format!("https://atproto-browser-plus-links.vercel.app/at/{noproto}")
-
}),
-
Link::Did(did) => Some(format!(
-
"https://atproto-browser-plus-links.vercel.app/at/{did}"
-
)),
Link::Uri(uri) => Some(uri),
}
} else {
···
/// Template helper that renders `n` as a human-readable string.
///
/// Formatting is delegated to `to_formatted_string` with `Locale::en`
/// (presumably `num_format`'s `ToFormattedString`, i.e. English digit
/// grouping — confirm against this module's imports).
///
/// Never fails: the result is always `Ok`.
pub fn human_number(n: &u64) -> askama::Result<String> {
    let rendered = n.to_formatted_string(&Locale::en);
    Ok(rendered)
}
···
Ok({
if let Some(link) = parse_any_link(s) {
match link {
+
Link::AtUri(at_uri) => at_uri
+
.strip_prefix("at://")
+
.map(|noproto| format!("https://pdsls.dev/at://{noproto}")),
+
Link::Did(did) => Some(format!("https://pdsls.dev/at://{did}")),
Link::Uri(uri) => Some(uri),
}
} else {
···
/// Template helper that renders `n` as a human-readable string.
///
/// Formatting is delegated to `to_formatted_string` with `Locale::en`
/// (presumably `num_format`'s `ToFormattedString`, i.e. English digit
/// grouping — confirm against this module's imports).
///
/// Never fails: the result is always `Ok`.
pub fn human_number(n: &u64) -> askama::Result<String> {
    let rendered = n.to_formatted_string(&Locale::en);
    Ok(rendered)
}
+
+
/// Converts a `usize` into a `u64` with a plain `as` cast.
///
/// The value is wrapped in `askama::Result`, matching `human_number` in this
/// module; this function itself always returns `Ok`.
pub fn to_u64(n: usize) -> askama::Result<u64> {
+
Ok(n as u64)
+
}
+9 -8
constellation/src/server/mod.rs
···
DEFAULT_CURSOR_LIMIT
}
-
// Unix timestamp in seconds (1738083600 = 2025-01-28T17:00:00Z); it is fed to
// `Duration::from_secs` to compute `days_indexed`. Presumably the moment
// indexing began, going by the name. Hard-coded placeholder, per the TODO.
const INDEX_BEGAN_AT_TS: u64 = 1738083600; // TODO: not this
-
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
eprintln!("handler error: {e}");
http::StatusCode::INTERNAL_SERVER_ERROR
···
/// Response payload built by the `hello` handler; its template is `hello.html.j2`.
// NOTE(review): the type name is misspelled ("Reponse"); fixing it means renaming every use.
#[template(path = "hello.html.j2")]
struct HelloReponse {
/// Static help text included in the response.
help: &'static str,
/// Whole days (seconds / 86400) elapsed since `INDEX_BEGAN_AT_TS`.
-
days_indexed: u64,
/// Statistics obtained from the store's `get_stats()`.
stats: StorageStats,
}
fn hello(
···
let stats = store
.get_stats()
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
-
let days_indexed = (UNIX_EPOCH + Duration::from_secs(INDEX_BEGAN_AT_TS))
-
.elapsed()
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
-
.as_secs()
-
/ 86400;
Ok(acceptable(accept, HelloReponse {
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
days_indexed,
···
};
let path = format!(".{path}");
let paged = store
.get_many_to_many_counts(
&query.subject,
collection,
&path,
-
&query.path_to_other,
limit,
cursor_key,
&filter_dids,
···
DEFAULT_CURSOR_LIMIT
}
fn to500(e: tokio::task::JoinError) -> http::StatusCode {
eprintln!("handler error: {e}");
http::StatusCode::INTERNAL_SERVER_ERROR
···
/// Response payload built by the `hello` handler; its template is `hello.html.j2`.
// NOTE(review): the type name is misspelled ("Reponse"); fixing it means renaming every use.
#[template(path = "hello.html.j2")]
struct HelloReponse {
/// Static help text included in the response.
help: &'static str,
/// Whole days (seconds / 86_400) elapsed since `stats.started_at`;
/// `None` when `started_at` is not set.
+
days_indexed: Option<u64>,
/// Statistics obtained from the store's `get_stats()`.
stats: StorageStats,
}
fn hello(
···
let stats = store
.get_stats()
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?;
+
let days_indexed = stats
+
.started_at
+
.map(|c| (UNIX_EPOCH + Duration::from_micros(c)).elapsed())
+
.transpose()
.map_err(|_| http::StatusCode::INTERNAL_SERVER_ERROR)?
+
.map(|d| d.as_secs() / 86_400);
Ok(acceptable(accept, HelloReponse {
help: "open this URL in a web browser (or request with Accept: text/html) for information about this API.",
days_indexed,
···
};
let path = format!(".{path}");
+
let path_to_other = format!(".{}", query.path_to_other);
+
let paged = store
.get_many_to_many_counts(
&query.subject,
collection,
&path,
+
&path_to_other,
limit,
cursor_key,
&filter_dids,
+2
constellation/src/storage/mem_store.rs
···
dids,
targetables,
linking_records,
})
}
}
···
dids,
targetables,
linking_records,
+
started_at: None,
+
other_data: Default::default(),
})
}
}
+6
constellation/src/storage/mod.rs
···
/// records with multiple links are single-counted.
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
pub linking_records: u64,
}
pub trait LinkStorage: Send + Sync {
···
/// records with multiple links are single-counted.
/// for LSM stores, deleted links don't decrement this, and updated records with any links will likely increment it.
pub linking_records: u64,
+
+
/// first jetstream cursor when this instance first started
+
pub started_at: Option<u64>,
+
+
/// anything else we want to throw in
+
pub other_data: HashMap<String, u64>,
}
pub trait LinkStorage: Send + Sync {
+168 -3
constellation/src/storage/rocks_store.rs
···
Arc,
};
use std::thread;
-
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
static DID_IDS_CF: &str = "did_ids";
···
static LINK_TARGETS_CF: &str = "link_targets";
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
// todo: actually understand and set these options probably better
fn rocks_opts_base() -> Options {
···
_key_marker: PhantomData,
_val_marker: PhantomData,
name: name.into(),
-
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninint", first seq num will be 1
}
}
fn get_id_val(
···
}
}
impl RocksStorage {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
Self::describe_metrics();
-
RocksStorage::open_readmode(path, false)
}
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
let did_id_table = IdTable::setup(DID_IDS_CF);
let target_id_table = IdTable::setup(TARGET_IDS_CF);
let cfs = vec![
// id reference tables
did_id_table.cf_descriptor(),
···
})
}
pub fn start_backup(
&mut self,
path: PathBuf,
···
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
return Ok(Default::default());
};
···
.take(1)
.next()
else {
continue;
};
···
.map(|s| s.parse::<u64>())
.transpose()?
.unwrap_or(0);
Ok(StorageStats {
dids,
targetables,
linking_records,
})
}
}
···
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}
// target_links table
impl AsRocksKey for &TargetId {}
···
Arc,
};
use std::thread;
+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
static DID_IDS_CF: &str = "did_ids";
···
static LINK_TARGETS_CF: &str = "link_targets";
static JETSTREAM_CURSOR_KEY: &str = "jetstream_cursor";
+
static STARTED_AT_KEY: &str = "jetstream_first_cursor";
+
// add reverse mappings for targets if this db was running before that was a thing
+
static TARGET_ID_REPAIR_STATE_KEY: &str = "target_id_table_repair_state";
+
+
static COZY_FIRST_CURSOR: u64 = 1_738_083_600_000_000; // constellation.microcosm.blue started
+
+
// Checkpoint for the target-id reverse-mapping repair. It is stored in the
// db under TARGET_ID_REPAIR_STATE_KEY and its three fields are also surfaced
// through StorageStats::other_data.
#[derive(Debug, Clone, Serialize, Deserialize)]
+
struct TargetIdRepairState {
+
/// wall-clock time the repair was first started, in microseconds since the unix epoch (set from `now()`)
+
current_us_started_at: u64,
+
/// id table's latest id when repair started
/// NOTE(review): run_repair fills this from `did_id_table`, not the target id table -- confirm that is intended
+
id_when_started: u64,
+
/// number of iterator steps taken over the target ids column family so far;
/// run_repair skips this many entries when it resumes
+
latest_repaired_i: u64,
+
}
+
impl AsRocksValue for TargetIdRepairState {}
+
impl ValueFromRocks for TargetIdRepairState {}
// todo: actually understand and set these options probably better
fn rocks_opts_base() -> Options {
···
_key_marker: PhantomData,
_val_marker: PhantomData,
name: name.into(),
+
id_seq: Arc::new(AtomicU64::new(0)), // zero is "uninit", first seq num will be 1
}
}
fn get_id_val(
···
}
}
+
/// Current wall-clock time as microseconds since the unix epoch.
///
/// Panics if the system clock is set before the unix epoch.
fn now() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_micros() as u64
}
+
impl RocksStorage {
/// Open (or create) a writable store at `path`.
///
/// Registers metric descriptions, opens the database in read-write mode and
/// then runs the one-time global initialisation before handing it back.
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
    Self::describe_metrics();
    let storage = Self::open_readmode(path, false)?;
    storage.global_init()?;
    Ok(storage)
}
pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> {
···
let did_id_table = IdTable::setup(DID_IDS_CF);
let target_id_table = IdTable::setup(TARGET_IDS_CF);
+
// note: global stuff like jetstream cursor goes in the default cf
+
// these are bonus extra cfs
let cfs = vec![
// id reference tables
did_id_table.cf_descriptor(),
···
})
}
+
fn global_init(&self) -> Result<()> {
+
let first_run = self.db.get(JETSTREAM_CURSOR_KEY)?.is_some();
+
if first_run {
+
self.db.put(STARTED_AT_KEY, _rv(now()))?;
+
+
// hack / temporary: if we're a new db, put in a completed repair
+
// state so we don't run repairs (repairs are for old-code dbs)
+
let completed = TargetIdRepairState {
+
id_when_started: 0,
+
current_us_started_at: 0,
+
latest_repaired_i: 0,
+
};
+
self.db.put(TARGET_ID_REPAIR_STATE_KEY, _rv(completed))?;
+
}
+
Ok(())
+
}
+
+
/// Walk the target ids column family and, for every entry whose key is not
/// exactly 8 bytes long, write the reverse mapping (big-endian id bytes ->
/// target key) back into the same column family.
///
/// Progress is checkpointed under `TARGET_ID_REPAIR_STATE_KEY` after each
/// batch of up to 1000 iterator steps that wrote something, followed by a
/// sleep of `breather`. The loop ends when `stay_alive` is cancelled or the
/// iterator becomes invalid.
///
/// NOTE(review): every successful path currently returns `Ok(false)`, so the
/// bool carries no information -- confirm what callers expect it to mean.
pub fn run_repair(&self, breather: Duration, stay_alive: CancellationToken) -> Result<bool> {
+
// load the saved checkpoint, or start a fresh one if none exists yet
let mut state = match self
+
.db
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
+
.map(|s| _vr(&s))
+
.transpose()?
+
{
+
Some(s) => s,
+
None => TargetIdRepairState {
+
// NOTE(review): reads the *did* id table although this repairs target ids -- confirm
id_when_started: self.did_id_table.priv_id_seq,
+
current_us_started_at: now(),
+
latest_repaired_i: 0,
+
},
+
};
+
+
eprintln!("initial repair state: {state:?}");
+
+
let cf = self.db.cf_handle(TARGET_IDS_CF).unwrap();
+
+
let mut iter = self.db.raw_iterator_cf(&cf);
+
iter.seek_to_first();
+
+
eprintln!("repair iterator sent to first key");
+
+
// resume: step past the entries a previous run already covered
// (takes no steps at all when latest_repaired_i is 0)
+
for _ in 0..state.latest_repaired_i {
+
iter.next();
+
}
+
+
eprintln!(
+
"repair iterator skipped to {}th key",
+
state.latest_repaired_i
+
);
+
+
let mut maybe_done = false;
+
+
// reverse-mapping writes are issued without sync and with the WAL disabled
let mut write_fast = rocksdb::WriteOptions::default();
+
write_fast.set_sync(false);
+
write_fast.disable_wal(true);
+
+
while !stay_alive.is_cancelled() && !maybe_done {
+
// let mut batch = WriteBatch::default();
+
+
let mut any_written = false;
+
+
// one batch: at most 1000 iterator steps between checkpoints
for _ in 0..1000 {
+
if state.latest_repaired_i % 1_000_000 == 0 {
+
eprintln!("target iter at {}", state.latest_repaired_i);
+
}
+
// counted before the validity check, so the step that finds the
// iterator exhausted is counted too
state.latest_repaired_i += 1;
+
+
if !iter.valid() {
+
eprintln!("invalid iter, are we done repairing?");
+
maybe_done = true;
+
break;
+
};
+
+
// eprintln!("iterator seems to be valid! getting the key...");
+
let raw_key = iter.key().unwrap();
+
// 8-byte keys are taken to be id -> target entries (the reverse
// mappings written below use 8-byte keys), so they are skipped
if raw_key.len() == 8 {
+
// eprintln!("found an 8-byte key, skipping it since it's probably an id...");
+
iter.next();
+
continue;
+
}
+
let target: TargetKey = _kr::<TargetKey>(raw_key)?;
+
let target_id: TargetId = _vr(iter.value().unwrap())?;
+
+
// reverse mapping: big-endian id bytes -> serialized target key
self.db
+
.put_cf_opt(&cf, target_id.id().to_be_bytes(), _rv(&target), &write_fast)?;
+
any_written = true;
+
iter.next();
+
}
+
+
// only checkpoint (and pause) when this batch wrote at least one mapping
if any_written {
+
self.db
+
.put(TARGET_ID_REPAIR_STATE_KEY, _rv(state.clone()))?;
+
std::thread::sleep(breather);
+
}
+
}
+
+
eprintln!("repair iterator done.");
+
+
// NOTE(review): returned regardless of whether the walk finished or was cancelled
Ok(false)
+
}
+
pub fn start_backup(
&mut self,
path: PathBuf,
···
let after = after.map(|s| s.parse::<u64>().map(TargetId)).transpose()?;
let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
+
eprintln!("nothin doin for this target, {target_key:?}");
return Ok(Default::default());
};
···
.take(1)
.next()
else {
+
eprintln!("no forward match");
continue;
};
···
.map(|s| s.parse::<u64>())
.transpose()?
.unwrap_or(0);
+
let started_at = self
+
.db
+
.get(STARTED_AT_KEY)?
+
.map(|c| _vr(&c))
+
.transpose()?
+
.unwrap_or(COZY_FIRST_CURSOR);
+
+
let other_data = self
+
.db
+
.get(TARGET_ID_REPAIR_STATE_KEY)?
+
.map(|s| _vr(&s))
+
.transpose()?
+
.map(
+
|TargetIdRepairState {
+
current_us_started_at,
+
id_when_started,
+
latest_repaired_i,
+
}| {
+
HashMap::from([
+
("current_us_started_at".to_string(), current_us_started_at),
+
("id_when_started".to_string(), id_when_started),
+
("latest_repaired_i".to_string(), latest_repaired_i),
+
])
+
},
+
)
+
.unwrap_or(HashMap::default());
+
Ok(StorageStats {
dids,
targetables,
linking_records,
+
started_at: Some(started_at),
+
other_data,
})
}
}
···
impl AsRocksValue for &TargetId {}
impl KeyFromRocks for TargetKey {}
impl ValueFromRocks for TargetId {}
+
+
// temp?
+
impl KeyFromRocks for TargetId {}
+
impl AsRocksValue for &TargetKey {}
// target_links table
impl AsRocksKey for &TargetId {}
+1 -1
constellation/templates/dids.html.j2
···
{% for did in linking_dids %}
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
-> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
-
-> browse <a href="https://atproto-browser-plus-links.vercel.app/at/{{ did.0|urlencode }}">this DID record</a></pre>
{% endfor %}
{% if let Some(c) = cursor %}
···
{% for did in linking_dids %}
<pre style="display: block; margin: 1em 2em" class="code"><strong>DID</strong>: {{ did.0 }}
-> see <a href="/links/all?target={{ did.0|urlencode }}">links to this DID</a>
+
-> browse <a href="https://pdsls.dev/at://{{ did.0|urlencode }}">this DID record</a></pre>
{% endfor %}
{% if let Some(c) = cursor %}
+1 -1
constellation/templates/get-backlinks.html.j2
···
{% extends "base.html.j2" %}
{% import "try-it-macros.html.j2" as try_it %}
-
{% block title %}Links{% endblock %}
{% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %}
{% block content %}
···
{% extends "base.html.j2" %}
{% import "try-it-macros.html.j2" as try_it %}
+
{% block title %}Backlinks{% endblock %}
{% block description %}All {{ query.source }} records with links to {{ query.subject }}{% endblock %}
{% block content %}
+67
constellation/templates/get-many-to-many-counts.html.j2
···
···
+
{% extends "base.html.j2" %}
+
{% import "try-it-macros.html.j2" as try_it %}
+
+
{% block title %}Many to Many counts{% endblock %}
+
{% block description %}Counts of many-to-many {{ query.source }} join records with links to {{ query.subject }} and a secondary target at {{ query.path_to_other }}{% endblock %}
+
+
{% block content %}
+
+
{% call try_it::get_many_to_many_counts(
+
query.subject,
+
query.source,
+
query.path_to_other,
+
query.did,
+
query.other_subject,
+
query.limit,
+
) %}
+
+
<h2>
+
Many-to-many links to <code>{{ query.subject }}</code> joining through <code>{{ query.path_to_other }}</code>
+
{% if let Some(browseable_uri) = query.subject|to_browseable %}
+
<small style="font-weight: normal; font-size: 1rem"><a href="{{ browseable_uri }}">browse record</a></small>
+
{% endif %}
+
</h2>
+
+
<p><strong>{% if cursor.is_some() || query.cursor.is_some() %}more than {% endif %}{{ counts_by_other_subject.len()|to_u64|human_number }} joins</strong> <code>{{ query.source }}→{{ query.path_to_other }}</code></p>
+
+
<ul>
+
<li>See direct backlinks at <code>/xrpc/blue.microcosm.links.getBacklinks</code>: <a href="/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject|urlencode }}&source={{ query.source|urlencode }}">/xrpc/blue.microcosm.links.getBacklinks?subject={{ query.subject }}&source={{ query.source }}</a></li>
+
<li>See all links to this target at <code>/links/all</code>: <a href="/links/all?target={{ query.subject|urlencode }}">/links/all?target={{ query.subject }}</a></li>
+
</ul>
+
+
<h3>Counts by other subject:</h3>
+
+
{% for counts in counts_by_other_subject %}
+
<pre style="display: block; margin: 1em 2em" class="code"><strong>Joined subject</strong>: {{ counts.subject }}
+
<strong>Joining records</strong>: {{ counts.total }}
+
<strong>Unique joiner ids</strong>: {{ counts.distinct }}
+
-> {% if let Some(browseable_uri) = counts.subject|to_browseable -%}
+
<a href="{{ browseable_uri }}">browse record</a>
+
{%- endif %}</pre>
+
{% endfor %}
+
+
{% if let Some(c) = cursor %}
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
+
<input type="hidden" name="subject" value="{{ query.subject }}" />
+
<input type="hidden" name="source" value="{{ query.source }}" />
+
<input type="hidden" name="pathToOther" value="{{ query.path_to_other }}" />
+
{% for did in query.did %}
+
<input type="hidden" name="did" value="{{ did }}" />
+
{% endfor %}
+
{% for otherSubject in query.other_subject %}
+
<input type="hidden" name="otherSubject" value="{{ otherSubject }}" />
+
{% endfor %}
+
<input type="hidden" name="limit" value="{{ query.limit }}" />
+
<input type="hidden" name="cursor" value={{ c|json|safe }} />
+
<button type="submit">next page&hellip;</button>
+
</form>
+
{% else %}
+
<button disabled><em>end of results</em></button>
+
{% endif %}
+
+
<details>
+
<summary>Raw JSON response</summary>
+
<pre class="code">{{ self|tojson }}</pre>
+
</details>
+
+
{% endblock %}
+38 -2
constellation/templates/hello.html.j2
···
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
<p>
-
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">{{ days_indexed|human_number }}</span> days.<br/>
<small>(indexing new records in real time, backfill coming soon!)</small>
</p>
-
<p>But feel free to use it! If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
<h2>API Endpoints</h2>
···
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
<h3 class="route"><code>GET /links</code></h3>
···
<p>It works by recursively walking <em>all</em> records coming through the firehose, searching for anything that looks like a link. Links are indexed by the target they point at, the collection the record came from, and the JSON path to the link in that record.</p>
<p>
+
This server has indexed <span class="stat">{{ stats.linking_records|human_number }}</span> links between <span class="stat">{{ stats.targetables|human_number }}</span> targets and sources from <span class="stat">{{ stats.dids|human_number }}</span> identities over <span class="stat">
+
{%- if let Some(days) = days_indexed %}
+
{{ days|human_number }}
+
{% else %}
+
???
+
{% endif -%}
+
</span> days.<br/>
<small>(indexing new records in real time, backfill coming soon!)</small>
</p>
+
{# {% for k, v in stats.other_data.iter() %}
+
<p><strong>{{ k }}</strong>: {{ v }}</p>
+
{% endfor %} #}
+
+
<p>You're welcome to use this public instance! Please do not build the torment nexus. If you want to be nice, put your project name and bsky username (or email) in your user-agent header for api requests.</p>
<h2>API Endpoints</h2>
···
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
{% call try_it::get_backlinks("at://did:plc:a4pqq234yw7fqbddawjo7y35/app.bsky.feed.post/3m237ilwc372e", "app.bsky.feed.like:subject.uri", [""], 16) %}
+
+
+
<h3 class="route"><code>GET /xrpc/blue.microcosm.links.getManyToManyCounts</code></h3>
+
+
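<p>Count the many-to-many join records that link to a subject, grouped by the secondary target each join record also links to.</p>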
+
+
<h4>Query parameters:</h4>
+
+
<ul>
+
<li><p><code>subject</code>: required, must url-encode. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
+
<li><p><code>source</code>: required. Example: <code>app.bsky.feed.like:subject.uri</code></p></li>
+
<li><p><code>pathToOther</code>: required. Path to the secondary link in the many-to-many record. Example: <code>otherThing.uri</code></p></li>
+
<li><p><code>did</code>: optional, filter links to those from specific users. Include multiple times to filter by multiple users. Example: <code>did=did:plc:vc7f4oafdgxsihk4cry2xpze&did=did:plc:vc7f4oafdgxsihk4cry2xpze</code></p></li>
+
<li><p><code>otherSubject</code>: optional, filter secondary links to specific subjects. Include multiple times to filter by multiple subjects. Example: <code>at://did:plc:vc7f4oafdgxsihk4cry2xpze/app.bsky.feed.post/3lgwdn7vd722r</code></p></li>
+
<li><p><code>limit</code>: optional. Default: <code>16</code>. Maximum: <code>100</code></p></li>
+
</ul>
+
+
<p style="margin-bottom: 0"><strong>Try it:</strong></p>
+
{% call try_it::get_many_to_many_counts(
+
"at://did:plc:wshs7t2adsemcrrd4snkeqli/sh.tangled.label.definition/good-first-issue",
+
"sh.tangled.label.op:add[].key",
+
"subject",
+
[""],
+
[""],
+
25,
+
) %}
<h3 class="route"><code>GET /links</code></h3>
+43 -1
constellation/templates/try-it-macros.html.j2
···
{% macro get_backlinks(subject, source, dids, limit) %}
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
-
<pre class="code"><strong>GET</strong> /links
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
{%- for did in dids %}{% if !did.is_empty() %}
···
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
p.insertBefore(i, didPlaceholder);
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
});
</script>
{% endmacro %}
···
{% macro get_backlinks(subject, source, dids, limit) %}
<form method="get" action="/xrpc/blue.microcosm.links.getBacklinks">
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getBacklinks
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
{%- for did in dids %}{% if !did.is_empty() %}
···
p.insertBefore(document.createTextNode('&did= '), didPlaceholder);
p.insertBefore(i, didPlaceholder);
p.insertBefore(document.createTextNode('\n '), didPlaceholder);
+
});
+
</script>
+
{% endmacro %}
+
+
{% macro get_many_to_many_counts(subject, source, pathToOther, dids, otherSubjects, limit) %}
+
<form method="get" action="/xrpc/blue.microcosm.links.getManyToManyCounts">
+
<pre class="code"><strong>GET</strong> /xrpc/blue.microcosm.links.getManyToManyCounts
+
?subject= <input type="text" name="subject" value="{{ subject }}" placeholder="at-uri, did, uri..." />
+
&source= <input type="text" name="source" value="{{ source }}" placeholder="app.bsky.feed.like:subject.uri" />
+
&pathToOther= <input type="text" name="pathToOther" value="{{ pathToOther }}" placeholder="otherThing.uri" />
+
{%- for did in dids %}{% if !did.is_empty() %}
+
&did= <input type="text" name="did" value="{{ did }}" placeholder="did:plc:..." />{% endif %}{% endfor %}
+
<span id="m2m-subject-placeholder"></span> <button id="m2m-add-subject">+ other subject filter</button>
+
{%- for otherSubject in otherSubjects %}{% if !otherSubject.is_empty() %}
+
&otherSubject= <input type="text" name="otherSubject" value="{{ otherSubject }}" placeholder="at-uri, did, uri..." />{% endif %}{% endfor %}
+
<span id="m2m-did-placeholder"></span> <button id="m2m-add-did">+ did filter</button>
+
&limit= <input type="number" name="limit" value="{{ limit }}" max="100" placeholder="100" /> <button type="submit">get links</button></pre>
+
</form>
+
<script>
+
const m2mAddDidButton = document.getElementById('m2m-add-did');
+
const m2mDidPlaceholder = document.getElementById('m2m-did-placeholder');
+
m2mAddDidButton.addEventListener('click', e => {
+
e.preventDefault();
+
const i = document.createElement('input');
+
i.placeholder = 'did:plc:...';
+
i.name = "did"
+
const p = m2mAddDidButton.parentNode;
+
p.insertBefore(document.createTextNode('&did= '), m2mDidPlaceholder);
+
p.insertBefore(i, m2mDidPlaceholder);
+
p.insertBefore(document.createTextNode('\n '), m2mDidPlaceholder);
+
});
+
const m2mAddSubjectButton = document.getElementById('m2m-add-subject');
+
const m2mSubjectPlaceholder = document.getElementById('m2m-subject-placeholder');
+
m2mAddSubjectButton.addEventListener('click', e => {
+
e.preventDefault();
+
const i = document.createElement('input');
+
i.placeholder = 'at-uri, did, uri...';
+
i.name = "otherSubject"
+
const p = m2mAddSubjectButton.parentNode;
+
p.insertBefore(document.createTextNode('&otherSubject= '), m2mSubjectPlaceholder);
+
p.insertBefore(i, m2mSubjectPlaceholder);
+
p.insertBefore(document.createTextNode('\n '), m2mSubjectPlaceholder);
});
</script>
{% endmacro %}
+2
links/Cargo.toml
···
[dependencies]
anyhow = "1.0.95"
fluent-uri = "0.3.2"
nom = "7.1.3"
thiserror = "2.0.9"
tinyjson = "2.5.1"
···
[dependencies]
anyhow = "1.0.95"
+
dasl = "0.2.0"
fluent-uri = "0.3.2"
nom = "7.1.3"
+
serde = { version = "1.0.228", features = ["derive"] }
thiserror = "2.0.9"
tinyjson = "2.5.1"
+3 -2
links/src/lib.rs
···
use fluent_uri::Uri;
pub mod at_uri;
pub mod did;
···
pub use record::collect_links;
-
#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq)]
pub enum Link {
AtUri(String),
Uri(String),
···
}
}
-
#[derive(Debug, PartialEq)]
pub struct CollectedLink {
pub path: String,
pub target: Link,
···
use fluent_uri::Uri;
+
use serde::{Deserialize, Serialize};
pub mod at_uri;
pub mod did;
···
pub use record::collect_links;
+
#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq, Serialize, Deserialize)]
pub enum Link {
AtUri(String),
Uri(String),
···
}
}
+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CollectedLink {
pub path: String,
pub target: Link,
+41
links/src/record.rs
···
use tinyjson::JsonValue;
use crate::{parse_any_link, CollectedLink};
···
}
}
pub fn collect_links(v: &JsonValue) -> Vec<CollectedLink> {
let mut found = vec![];
walk_record("", v, &mut found);
found
}
···
+
use dasl::drisl::Value as DrislValue;
use tinyjson::JsonValue;
use crate::{parse_any_link, CollectedLink};
···
}
}
+
pub fn walk_drisl(path: &str, v: &DrislValue, found: &mut Vec<CollectedLink>) {
+
match v {
+
DrislValue::Map(o) => {
+
for (key, child) in o {
+
walk_drisl(&format!("{path}.{key}"), child, found)
+
}
+
}
+
DrislValue::Array(a) => {
+
for child in a {
+
let child_p = match child {
+
DrislValue::Map(o) => {
+
if let Some(DrislValue::Text(t)) = o.get("$type") {
+
format!("{path}[{t}]")
+
} else {
+
format!("{path}[]")
+
}
+
}
+
_ => format!("{path}[]"),
+
};
+
walk_drisl(&child_p, child, found)
+
}
+
}
+
DrislValue::Text(s) => {
+
if let Some(link) = parse_any_link(s) {
+
found.push(CollectedLink {
+
path: path.to_string(),
+
target: link,
+
});
+
}
+
}
+
_ => {}
+
}
+
}
+
/// Collect every link found anywhere inside a JSON record, starting from an
/// empty path.
pub fn collect_links(v: &JsonValue) -> Vec<CollectedLink> {
    let mut links = Vec::new();
    walk_record("", v, &mut links);
    links
}
+
+
pub fn collect_links_drisl(v: &DrislValue) -> Vec<CollectedLink> {
+
let mut found = vec![];
+
walk_drisl("", v, &mut found);
found
}
+8
spacedust/Cargo.toml
···
edition = "2024"
[dependencies]
async-trait = "0.1.88"
clap = { version = "4.5.40", features = ["derive"] }
ctrlc = "3.4.7"
dropshot = "0.16.2"
env_logger = "0.11.8"
futures = "0.3.31"
http = "1.3.1"
jetstream = { path = "../jetstream", features = ["metrics"] }
links = { path = "../links" }
log = "0.4.27"
metrics = "0.24.2"
metrics-exporter-prometheus = { version = "0.17.1", features = ["http-listener"] }
rand = "0.9.1"
schemars = "0.8.22"
semver = "1.0.26"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_qs = "1.0.0-rc.3"
thiserror = "2.0.12"
···
edition = "2024"
[dependencies]
+
anyhow = "1.0.100"
+
async-channel = "2.5.0"
async-trait = "0.1.88"
clap = { version = "4.5.40", features = ["derive"] }
ctrlc = "3.4.7"
+
dasl = "0.2.0"
dropshot = "0.16.2"
env_logger = "0.11.8"
+
fjall = "3.0.0-pre.0"
futures = "0.3.31"
http = "1.3.1"
+
ipld-core = { version = "0.4.2", features = ["serde"] }
jetstream = { path = "../jetstream", features = ["metrics"] }
links = { path = "../links" }
log = "0.4.27"
metrics = "0.24.2"
metrics-exporter-prometheus = { version = "0.17.1", features = ["http-listener"] }
rand = "0.9.1"
+
repo-stream = "0.2.2"
+
reqwest = { version = "0.12.24", features = ["json", "stream"] }
schemars = "0.8.22"
semver = "1.0.26"
serde = { version = "1.0.219", features = ["derive"] }
+
serde_ipld_dagcbor = "0.6.4"
serde_json = "1.0.140"
serde_qs = "1.0.0-rc.3"
thiserror = "2.0.12"
+21
spacedust/src/bin/import_car_file.rs
···
···
+
use clap::Parser;
+
use std::path::PathBuf;
+
+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+
+
#[derive(Debug, Parser)]
+
struct Args {
+
#[arg()]
+
file: PathBuf,
+
}
+
+
#[tokio::main]
+
async fn main() -> Result<()> {
+
env_logger::init();
+
+
let Args { file } = Args::parse();
+
+
let _reader = tokio::fs::File::open(file).await?;
+
+
Ok(())
+
}
+258
spacedust/src/bin/import_scraped.rs
···
···
+
use clap::Parser;
+
use links::CollectedLink;
+
use repo_stream::{
+
DiskBuilder, DiskStore, Driver, DriverBuilder, Processable, drive::DriverBuilderWithProcessor,
+
drive::NeedDisk,
+
};
+
use std::path::PathBuf;
+
use std::sync::{
+
Arc,
+
atomic::{AtomicUsize, Ordering},
+
};
+
use tokio::{io::AsyncRead, task::JoinSet};
+
+
type Result<T> = anyhow::Result<T>; //std::result::Result<T, Box<dyn std::error::Error>>;
+
+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+
struct CollectedProcessed(CollectedLink);
+
+
impl Processable for CollectedProcessed {
+
fn get_size(&self) -> usize {
+
self.0.path.capacity() + self.0.target.as_str().len()
+
}
+
}
+
+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+
struct ErrString(String);
+
+
impl Processable for ErrString {
+
fn get_size(&self) -> usize {
+
self.0.capacity()
+
}
+
}
+
+
type Processed = std::result::Result<Vec<CollectedProcessed>, ErrString>;
+
+
/// hacky for now: put errors in strings ๐Ÿคทโ€โ™€๏ธ
+
fn process(block: Vec<u8>) -> Processed {
+
let value: dasl::drisl::Value = dasl::drisl::from_slice(&block)
+
.map_err(|e| ErrString(format!("failed to parse block with drisl: {e:?}")))?;
+
let links = links::record::collect_links_drisl(&value)
+
.into_iter()
+
.map(CollectedProcessed)
+
.collect();
+
Ok(links)
+
}
+
+
#[derive(Debug, Parser)]
+
struct Args {
+
#[arg(long)]
+
cars_folder: PathBuf,
+
#[arg(long)]
+
mem_workers: usize,
+
#[arg(long)]
+
disk_workers: usize,
+
#[arg(long)]
+
disk_folder: PathBuf,
+
}
+
+
async fn get_cars(
+
cars_folder: PathBuf,
+
tx: async_channel::Sender<tokio::io::BufReader<tokio::fs::File>>,
+
) -> Result<()> {
+
let mut dir = tokio::fs::read_dir(cars_folder).await?;
+
while let Some(entry) = dir.next_entry().await? {
+
if !entry.file_type().await?.is_file() {
+
continue;
+
}
+
let reader = tokio::fs::File::open(&entry.path()).await?;
+
let reader = tokio::io::BufReader::new(reader);
+
tx.send(reader).await?;
+
}
+
Ok(())
+
}
+
+
async fn drive_mem<R: AsyncRead + Unpin + Send + Sync + 'static>(
+
f: R,
+
builder: &DriverBuilderWithProcessor<Processed>,
+
disk_tx: &async_channel::Sender<NeedDisk<R, Processed>>,
+
) -> Result<Option<(usize, usize)>> {
+
let mut n = 0;
+
let mut n_records = 0;
+
match builder.load_car(f).await? {
+
Driver::Memory(_commit, mut driver) => {
+
while let Some(chunk) = driver.next_chunk(512).await? {
+
n_records += chunk.len();
+
for (_key, links) in chunk {
+
match links {
+
Ok(links) => n += links.len(),
+
Err(e) => eprintln!("wat: {e:?}"),
+
}
+
}
+
}
+
Ok(Some((n, n_records)))
+
}
+
Driver::Disk(need_disk) => {
+
disk_tx.send(need_disk).await?;
+
Ok(None)
+
}
+
}
+
}
+
+
async fn mem_worker<R: AsyncRead + Unpin + Send + Sync + 'static>(
+
car_rx: async_channel::Receiver<R>,
+
disk_tx: async_channel::Sender<NeedDisk<R, Processed>>,
+
n: Arc<AtomicUsize>,
+
n_records: Arc<AtomicUsize>,
+
) -> Result<()> {
+
let builder = DriverBuilder::new()
+
.with_block_processor(process) // don't care just counting records
+
.with_mem_limit_mb(128);
+
while let Ok(f) = car_rx.recv().await {
+
let driven = match drive_mem(f, &builder, &disk_tx).await {
+
Ok(d) => d,
+
Err(e) => {
+
eprintln!("failed to drive mem: {e:?}. skipping...");
+
continue;
+
}
+
};
+
if let Some((drove, recs)) = driven {
+
n.fetch_add(drove, Ordering::Relaxed);
+
n_records.fetch_add(recs, Ordering::Relaxed);
+
}
+
}
+
Ok(())
+
}
+
+
async fn drive_disk<R: AsyncRead + Unpin>(
+
needed: NeedDisk<R, Processed>,
+
store: DiskStore,
+
) -> Result<(usize, usize, DiskStore)> {
+
let (_commit, mut driver) = needed.finish_loading(store).await?;
+
let mut n = 0;
+
let mut n_records = 0;
+
while let Some(chunk) = driver.next_chunk(512).await? {
+
n_records += chunk.len();
+
for (_key, links) in chunk {
+
match links {
+
Ok(links) => n += links.len(),
+
Err(e) => eprintln!("wat: {e:?}"),
+
}
+
}
+
}
+
let store = driver.reset_store().await?;
+
Ok((n, n_records, store))
+
}
+
+
async fn disk_worker<R: AsyncRead + Unpin>(
+
worker_id: usize,
+
disk_rx: async_channel::Receiver<NeedDisk<R, Processed>>,
+
folder: PathBuf,
+
n: Arc<AtomicUsize>,
+
n_records: Arc<AtomicUsize>,
+
disk_workers_active: Arc<AtomicUsize>,
+
) -> Result<()> {
+
let mut file = folder;
+
file.push(format!("disk-worker-{worker_id}.sqlite"));
+
let builder = DiskBuilder::new().with_cache_size_mb(128);
+
let mut store = builder.open(file.clone()).await?;
+
while let Ok(needed) = disk_rx.recv().await {
+
let active = disk_workers_active.fetch_add(1, Ordering::AcqRel);
+
println!("-> disk workers active: {}", active + 1);
+
let (drove, records) = match drive_disk(needed, store).await {
+
Ok((d, r, s)) => {
+
store = s;
+
(d, r)
+
}
+
Err(e) => {
+
eprintln!("failed to drive disk: {e:?}. skipping...");
+
store = builder.open(file.clone()).await?;
+
continue;
+
}
+
};
+
n.fetch_add(drove, Ordering::Relaxed);
+
n_records.fetch_add(records, Ordering::Relaxed);
+
let were_active = disk_workers_active.fetch_sub(1, Ordering::AcqRel);
+
println!("<- disk workers active: {}", were_active - 1);
+
}
+
Ok(())
+
}
+
+
#[tokio::main]
+
async fn main() -> Result<()> {
+
env_logger::init();
+
+
let Args {
+
cars_folder,
+
disk_folder,
+
disk_workers,
+
mem_workers,
+
} = Args::parse();
+
+
let mut set = JoinSet::<Result<()>>::new();
+
+
let (cars_tx, cars_rx) = async_channel::bounded(2);
+
set.spawn(get_cars(cars_folder, cars_tx));
+
+
let n: Arc<AtomicUsize> = Arc::new(0.into());
+
let n_records: Arc<AtomicUsize> = Arc::new(0.into());
+
let disk_workers_active: Arc<AtomicUsize> = Arc::new(0.into());
+
+
set.spawn({
+
let n = n.clone();
+
let n_records = n_records.clone();
+
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
+
async move {
+
let mut last_n = n.load(Ordering::Relaxed);
+
let mut last_n_records = n.load(Ordering::Relaxed);
+
loop {
+
interval.tick().await;
+
let n = n.load(Ordering::Relaxed);
+
let n_records = n_records.load(Ordering::Relaxed);
+
let diff_n = n - last_n;
+
let diff_records = n_records - last_n_records;
+
println!("rate: {} rec/sec; {} n/sec", diff_records / 10, diff_n / 10);
+
if n_records > 0 && diff_records == 0 {
+
println!("zero encountered, stopping rate calculation polling.");
+
break Ok(());
+
}
+
last_n = n;
+
last_n_records = n_records;
+
}
+
}
+
});
+
+
let (needs_disk_tx, needs_disk_rx) = async_channel::bounded(disk_workers);
+
+
for _ in 0..mem_workers {
+
set.spawn(mem_worker(
+
cars_rx.clone(),
+
needs_disk_tx.clone(),
+
n.clone(),
+
n_records.clone(),
+
));
+
}
+
drop(cars_rx);
+
drop(needs_disk_tx);
+
+
tokio::fs::create_dir_all(disk_folder.clone()).await?;
+
for id in 0..disk_workers {
+
set.spawn(disk_worker(
+
id,
+
needs_disk_rx.clone(),
+
disk_folder.clone(),
+
n.clone(),
+
n_records.clone(),
+
disk_workers_active.clone(),
+
));
+
}
+
drop(needs_disk_rx);
+
+
while let Some(res) = set.join_next().await {
+
println!("task from set joined: {res:?}");
+
}
+
+
eprintln!("total records processed: {n_records:?}; total n: {n:?}");
+
+
Ok(())
+
}
+137
spacedust/src/bin/scrape_pds.rs
···
···
+
use clap::Parser;
+
use reqwest::Url;
+
use serde::Deserialize;
+
use std::path::PathBuf;
+
use tokio::io::AsyncWriteExt;
+
use tokio::{sync::mpsc, time};
+
+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+
+
use futures::StreamExt;
+
+
#[derive(Debug, Parser)]
+
struct Args {
+
#[arg(long)]
+
pds: Url,
+
#[arg(long)]
+
throttle_ms: u64, // 100ms per pds?
+
#[arg(long)]
+
folder: PathBuf,
+
}
+
+
async fn download_repo(
+
client: &reqwest::Client,
+
mut pds: Url,
+
did: String,
+
mut path: PathBuf,
+
) -> Result<()> {
+
path.push(format!("{did}.car"));
+
let f = tokio::fs::File::create(path).await?;
+
let mut w = tokio::io::BufWriter::new(f);
+
+
pds.set_path("/xrpc/com.atproto.sync.getRepo");
+
pds.set_query(Some(&format!("did={did}")));
+
let mut byte_stream = client.get(pds).send().await?.bytes_stream();
+
+
while let Some(stuff) = byte_stream.next().await {
+
tokio::io::copy(&mut stuff?.as_ref(), &mut w).await?;
+
}
+
w.flush().await?;
+
+
Ok(())
+
}
+
+
#[derive(Debug, Deserialize)]
+
struct RepoInfo {
+
did: String,
+
active: bool,
+
}
+
+
#[derive(Debug, Deserialize)]
+
struct ListReposResponse {
+
cursor: Option<String>,
+
repos: Vec<RepoInfo>,
+
}
+
+
fn get_pds_dids(client: reqwest::Client, mut pds: Url) -> mpsc::Receiver<String> {
+
let (tx, rx) = mpsc::channel(2);
+
tokio::task::spawn(async move {
+
pds.set_path("/xrpc/com.atproto.sync.listRepos");
+
let mut cursor = None;
+
+
loop {
+
if let Some(c) = cursor {
+
pds.set_query(Some(&format!("cursor={c}")));
+
}
+
let res: ListReposResponse = client
+
.get(pds.clone())
+
.send()
+
.await
+
.expect("to send request")
+
.error_for_status()
+
.expect("to be ok")
+
.json()
+
.await
+
.expect("json response");
+
for repo in res.repos {
+
if repo.active {
+
tx.send(repo.did)
+
.await
+
.expect("to be able to send on the channel");
+
}
+
}
+
cursor = res.cursor;
+
if cursor.is_none() {
+
break;
+
}
+
}
+
});
+
rx
+
}
+
+
#[tokio::main]
+
async fn main() -> Result<()> {
+
env_logger::init();
+
+
let Args {
+
pds,
+
throttle_ms,
+
folder,
+
} = Args::parse();
+
+
tokio::fs::create_dir_all(folder.clone()).await?;
+
+
let client = reqwest::Client::builder()
+
.user_agent("microcosm/spacedust-testing")
+
.build()?;
+
+
let mut dids = get_pds_dids(client.clone(), pds.clone());
+
+
let mut interval = time::interval(time::Duration::from_millis(throttle_ms));
+
let mut oks = 0;
+
let mut single_fails = 0;
+
let mut double_fails = 0;
+
+
while let Some(did) = dids.recv().await {
+
interval.tick().await;
+
println!("did: {did:?}");
+
if let Err(e) = download_repo(&client, pds.clone(), did.clone(), folder.clone()).await {
+
single_fails += 1;
+
eprintln!("failed to download repo for did: {did:?}: {e:?}. retrying in a moment...");
+
tokio::time::sleep(time::Duration::from_secs(3)).await;
+
interval.reset();
+
if let Err(e) = download_repo(&client, pds.clone(), did.clone(), folder.clone()).await {
+
double_fails += 1;
+
eprintln!("failed again: {e:?}. moving on in a moment...");
+
tokio::time::sleep(time::Duration::from_secs(1)).await;
+
continue;
+
}
+
}
+
oks += 1;
+
println!(" -> done. did: {did:?}");
+
}
+
+
eprintln!("got {oks} repos. single fails: {single_fails}; doubles: {double_fails}.");
+
+
Ok(())
+
}
+1
spacedust/src/lib.rs
···
pub mod error;
pub mod removable_delay_queue;
pub mod server;
pub mod subscriber;
use jetstream::events::CommitEvent;
···
pub mod error;
pub mod removable_delay_queue;
pub mod server;
+
pub mod storage;
pub mod subscriber;
use jetstream::events::CommitEvent;
spacedust/src/storage/car/drive.rs

This is a binary file and will not be displayed.

+1
spacedust/src/storage/car/mod.rs
···
···
+
spacedust/src/storage/car/walk.rs

This is a binary file and will not be displayed.

+9
spacedust/src/storage/fjall/mod.rs
···
···
+
use crate::storage::Storage;
+
+
pub struct FjallStorage {}
+
+
impl Storage for FjallStorage {
+
fn import_car() {
+
todo!()
+
}
+
}
+6
spacedust/src/storage/mod.rs
···
···
+
pub mod car;
+
pub mod fjall;
+
+
/// Interface implemented by the storage backends of this module.
pub trait Storage {
    /// Stub with an empty default body; it takes no arguments, returns
    /// nothing, and does nothing unless an implementor overrides it.
    fn import_car() {}
}