Assorted shell and Python scripts

Merge admin-scripts

+3
README.md
···
# bin

Assorted Python and shell scripts.
-10
README.org
···
#+title: bin

Assorted shell and Python scripts. This is cloned into my user's home directory at ~~/bin~.

#+begin_src shell
cd ~
git clone https://codeberg.org/hyperreal/bin
#+end_src

A lot of these scripts use [[https://github.com/charmbracelet/gum][gum]].
+11
add_prebuilt_repo
···
#!/usr/bin/env bash

set -euxo pipefail

wget -qO - 'https://proget.makedeb.org/debian-feeds/prebuilt-mpr.pub' | gpg --dearmor | sudo tee /usr/share/keyrings/prebuilt-mpr-archive-keyring.gpg 1> /dev/null
echo "deb [arch=all,$(dpkg --print-architecture) signed-by=/usr/share/keyrings/prebuilt-mpr-archive-keyring.gpg] https://proget.makedeb.org prebuilt-mpr $(lsb_release -cs)" | sudo tee /etc/apt/sources.list.d/prebuilt-mpr.list
sudo apt update -t bookworm-backports
sudo apt dist-upgrade -t bookworm-backports -y
sudo apt install -y just
sudo apt autoremove -y
exit 0
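Once this has run, apt should resolve just from the new Prebuilt-MPR repository; a quick sanity check:

apt policy just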
+38
archive_index_template
···
#!/usr/bin/env bash

set -euo pipefail

# If the number of arguments is not equal to 2, exit and display usage info.
if [ "$#" -ne 2 ]; then
    echo "Usage: archive_index_template MINIO_INSTANCE BUCKET_NAME"
    exit 1
fi

# Create temporary directory.
TMP_DIR=$(mktemp -d)

# Check if temporary directory was created.
if ! test -d "$TMP_DIR"; then
    echo "Failed to create temp dir"
    exit 1
fi

# Cleanup temporary directory.
function cleanup() {
    rm -rf "$TMP_DIR"
    echo "Cleaned up temp dir at $TMP_DIR"
}

# Trigger cleanup trap on EXIT and SIGINT signals
trap cleanup EXIT SIGINT

# Download archive-index-template.html and save to temporary directory as
# index.html.
wget --quiet https://files.hyperreal.coffee/archive-index-template.html \
    -O "${TMP_DIR}/index.html"

# Replace "CHANGEME" with the BUCKET_NAME argument in index.html.
sed -i "s/CHANGEME/$2/g" "${TMP_DIR}/index.html"

# Put the new index.html into the root of the given bucket.
mc put "${TMP_DIR}/index.html" "${1}/${2}/"
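Example invocation, assuming a configured mc alias named "nas" and a bucket named "archive" (both names are placeholders):

./archive_index_template nas archive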
+30
archivebox_schedule
···
#!/usr/bin/env bash

set -euo pipefail

# Check if feed URL is supplied.
if (( "${#@}" == 0 )); then
    echo "No feed URL has been supplied."
    exit 1
fi

# Get Headnet IP address for desktop
DESKTOP_IP=$(sudo tailscale status | grep "desktop" | awk '{print $1}')

# Go to archivebox directory and run scheduled command for supplied
# feed URL. Send ntfy an error message if it fails.
cd /naspool/archivebox
if ! /home/jas/.local/bin/archivebox add --depth=1 "$1" \
    >> /naspool/archivebox/logs/schedule.log; then
    curl \
        -H prio:urgent \
        -H tags:warning \
        -d "Error running archivebox schedule for $1" \
        "http://${DESKTOP_IP}/archivebox_schedule"
else
    curl \
        -H prio:default \
        -H tags:incoming_envelope \
        -d "archivebox schedule succeeded: $1" \
        "http://${DESKTOP_IP}/archivebox_schedule"
fi
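A typical crontab entry for this script might look like the following; the schedule and feed URL are placeholders:

0 */6 * * * /home/jas/bin/archivebox_schedule "https://example.com/feed.xml"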
+4 -6
backup_podman_volumes
···
set -euo pipefail
-if [ "$#" -eq 0 ]; then
-    echo "Enter one or more Podman volume names."
-    exit 1
-fi
-
BACKUP_DIR="${HOME}/podman_volume_backups"
DATE=$(date '+%Y-%m-%d_%H%M%S')
-volumes=("$@")
+volumes=(
+    "shaarli-cache"
+    "shaarli-data"
+)
for vol in "${volumes[@]}"; do
    podman volume export "$vol" --output "${BACKUP_DIR}/${vol}-${DATE}.tar"
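To restore one of these backups, import the tarball back into a volume (the archive name below is illustrative):

podman volume create shaarli-data
podman volume import shaarli-data "${HOME}/podman_volume_backups/shaarli-data-2025-01-01_000000.tar"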
+47
fetch_combined_trackers_list.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "requests",
#     "docopt",
# ]
# ///

"""fetch_combined_trackers_list.py

Description:
    This script fetches a combined list of tracker URLs from plaintext lists hosted
    on the web and writes them to a file in the current working directory.

Usage:
    fetch_combined_trackers_list.py
    fetch_combined_trackers_list.py -h

Options:
    -h, --help    show this help message and exit
"""

from pathlib import Path

import requests
from docopt import docopt

if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore

    live_trackers_list_urls = [
        "https://newtrackon.com/api/stable",
        "https://trackerslist.com/best.txt",
        "https://trackerslist.com/http.txt",
        "https://raw.githubusercontent.com/ngosang/trackerslist/master/trackers_best.txt",
    ]

    combined_trackers_urls = []
    for url in live_trackers_list_urls:
        response = requests.get(url, timeout=60)
        tracker_urls = [x for x in response.text.splitlines() if x != ""]
        combined_trackers_urls.extend(tracker_urls)

    tracker_urls_filename = Path.cwd().joinpath("tracker_urls.txt")
    with open(tracker_urls_filename, "w") as tf:
        for url in combined_trackers_urls:
            tf.write(f"{url}\n")
+48
fetch_scihub_infohashes.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "requests",
#     "docopt",
# ]
# ///

"""fetch_scihub_infohashes.py

Description:
    This script fetches the infohashes of all Sci Hub torrents and writes them to a
    plaintext file. The plaintext file is intended to be appended to a bittorrent
    tracker whitelist. E.g., /etc/opentracker/whitelist.txt.

    Optionally set the TORRENT_JSON_URL for the Sci Hub torrent health checker, or
    run the script with no arguments to use the default.

    Default health check URL:
    https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json

Usage:
    fetch_scihub_infohashes.py [TORRENT_JSON_URL]
    fetch_scihub_infohashes.py -h

Options:
    -h, --help    show this help message and exit.
"""

import json
from pathlib import Path

import requests
from docopt import docopt

if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore
    url = (
        args["TORRENT_JSON_URL"]
        if args["TORRENT_JSON_URL"]
        else "https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json"
    )
    response = requests.get(url, timeout=60)
    json_data = json.loads(response.text)
    torrent_infohashes = [f"{x['infohash']}\n" for x in json_data]

    with open(Path.cwd().joinpath("scihub_torrent_infohashes.txt"), "w") as tf:
        tf.writelines(torrent_infohashes)
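The resulting file can then be appended to the opentracker whitelist mentioned in the docstring, e.g.:

./fetch_scihub_infohashes.py
sudo tee -a /etc/opentracker/whitelist.txt < scihub_torrent_infohashes.txt
sudo systemctl restart opentracker.service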
+6
gen_digital_archive_listing
···
#!/usr/bin/env bash

set -euo pipefail

cd /naspool/archives
fd | sort | tee /home/jas/digital_archive_listing.txt

exit 0
+7
install_just
···
#!/usr/bin/env bash

set -euxo pipefail

curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin

exit 0
+75
list_torrents.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "qbittorrent-api",
#     "docopt",
# ]
# ///

"""list_torrents.py

Description:
    Fetch a list of torrents from a qBittorrent instance running on localhost.
    The qBittorrent instance must be configured to allow login on localhost
    without authentication. The output is formatted into a plaintext table.

Usage:
    list_torrents.py
    list_torrents.py -h

Options:
    -h, --help    show this help message and exit
"""

import qbittorrentapi
from docopt import docopt


# convert byte units
def human_bytes(input_bytes: int) -> str:
    B = float(input_bytes)
    KiB = float(1024)
    MiB = float(KiB**2)
    GiB = float(KiB**3)
    TiB = float(KiB**4)

    match B:
        case B if B < KiB:
            return "{0} {1}".format(B, "byte" if B == 1 else "bytes")
        case B if KiB <= B < MiB:
            return "{0:.2f} KiB".format(B / KiB)
        case B if MiB <= B < GiB:
            return "{0:.2f} MiB".format(B / MiB)
        case B if GiB <= B < TiB:
            return "{0:.2f} GiB".format(B / GiB)
        case B if TiB <= B:
            return "{0:.2f} TiB".format(B / TiB)
        case _:
            return ""


def print_ssv():
    with qbittorrentapi.Client(
        host="localhost", port=8080, username="", password=""
    ) as qbt_client:
        try:
            qbt_client.auth_log_in()
        except qbittorrentapi.LoginFailed as e:
            print(e)

        sorted_torrents = sorted(
            qbt_client.torrents_info(), key=lambda d: d.ratio, reverse=True
        )
        # Separate columns with two spaces so downstream `from ssv -m 2`
        # parsing splits on them rather than on spaces inside torrent names.
        print("Name  Size  # of Trackers  Ratio  Uploaded")
        for torrent in sorted_torrents:
            name = torrent.name
            size = human_bytes(torrent.total_size)
            trackers = torrent.trackers_count
            ratio = torrent.ratio
            uploaded = human_bytes(torrent.uploaded)
            print(f"{name}  {size}  {trackers}  {ratio}  {uploaded}")


if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore
    print_ssv()
+13
natpmpcd
···
#!/bin/sh

port=$(/usr/local/bin/natpmpc -a 1 0 udp 60 -g 10.2.0.1 | grep "Mapped public port" | awk '{print $4}')
echo "$port" | tee /usr/local/etc/natvpn_port.txt

# Renew the UDP and TCP mappings every 45 seconds; bail out if either fails.
while true; do
    date
    if ! { /usr/local/bin/natpmpc -a 1 0 udp 60 -g 10.2.0.1 && /usr/local/bin/natpmpc -a 1 0 tcp 60 -g 10.2.0.1; }; then
        echo "ERROR: natpmpc failed at $(date)"
        break
    fi
    sleep 45
done
+48
publish_mastodon_archive.py
···
#!/usr/bin/env python3

import json
from html import unescape


def main():
    with open("/home/jas/downloads/mastodon-archive/outbox.json", "r") as jf:
        json_data = json.load(jf)

    print("#+TITLE: Mastodon posts archive from 2024-02-16 to 2025-01-31")
    print("#+DATE: 2025-02-02")
    print("#+TAGS[]: mastodon archives")
    print("#+AUTHOR: hyperreal")
    print("#+SLUG: mastodon_archive-20240216-20250131")
    print("#+LAYOUT: post")
    print()

    # Iterate over the ordered items from newest to oldest.
    for item in reversed(json_data["orderedItems"]):
        if type(item.get("object")) is dict:
            published = item.get("object").get("published")
            content = item.get("object").get("content")
            attachment = (
                item.get("object").get("attachment")
                if len(item.get("object").get("attachment")) >= 1
                else None
            )

            print(f"** {published}")
            print("#+BEGIN_EXPORT html")
            if type(content) is str:
                print(unescape(content))
            print("#+END_EXPORT")
            if attachment:
                for att in attachment:
                    if att.get("name"):
                        print(f"#+CAPTION: {att.get('name')}")
                    print(
                        f"[[https://files.hyperreal.coffee/mastodon_20240216-20250131/{att.get('url')}]]"
                    )
            print("-----")
            print()


if __name__ == "__main__":
    main()
+15
qbt_stats_html.nu
···
#!/usr/bin/env nu

let old_head = "<html><style>body { background-color:white;color:black; }</style><body>"
let new_head = (
    ['<html><head><title>Torrent Stats</title><link type="text/css" rel="stylesheet" href="https://files.hyperreal.coffee/css/style1.css"/></head><body><h4>Last updated:', (date now | format date "%F %T%:z"), "</h4>"]
    | str join ' '
)

(
    /home/jas/admin-scripts/python/list_torrents.py
    | from ssv -m 2
    | to html
    | str replace ($old_head) ($new_head)
    | save -f -r /home/jas/public/html/torrents.html
)
+79
qbt_sum_size.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "qbittorrent-api",
#     "docopt",
# ]
# ///

"""qbt_sum_size.py

Description:
    Get the total size of all added torrents and the total size of all completed
    torrents from a qBittorrent instance.

Usage:
    qbt_sum_size.py (HOSTNAME) (USERNAME) (PASSWORD)
    qbt_sum_size.py -h

Examples:
    qbt_sum_size.py "http://localhost:8080" "admin" "adminadmin"
    qbt_sum_size.py "https://cat.seedhost.eu/lol/qbittorrent" "lol" "supersecretpassword"

Options:
    -h, --help    show this help message and exit
"""

import qbittorrentapi
from docopt import docopt


# convert byte units
def human_bytes(bites: int) -> str:
    B = float(bites)
    KiB = float(1024)
    MiB = float(KiB**2)
    GiB = float(KiB**3)
    TiB = float(KiB**4)

    match B:
        case B if B < KiB:
            return "{0} {1}".format(B, "byte" if B == 1 else "bytes")
        case B if KiB <= B < MiB:
            return "{0:.2f} KiB".format(B / KiB)
        case B if MiB <= B < GiB:
            return "{0:.2f} MiB".format(B / MiB)
        case B if GiB <= B < TiB:
            return "{0:.2f} GiB".format(B / GiB)
        case B if TiB <= B:
            return "{0:.2f} TiB".format(B / TiB)
        case _:
            return ""


if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore

    completed_torrent_sizes = []
    total_added_bytes = int()

    with qbittorrentapi.Client(
        host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
    ) as qbt_client:
        try:
            qbt_client.auth_log_in()
        except qbittorrentapi.LoginFailed as e:
            print(e)

        for torrent in qbt_client.torrents_info():
            if torrent.completion_on != 0:
                completed_torrent_sizes.append(torrent.total_size)

        total_added_bytes = sum(
            [torrent.total_size for torrent in qbt_client.torrents_info()]
        )

    total_completed_bytes = sum(completed_torrent_sizes)

    print(f"\nTotal completed size: {human_bytes(total_completed_bytes)}")
    print(f"Total added size: {human_bytes(total_added_bytes)}\n")
+15
record_mastodon_media_size
···
#!/usr/bin/env bash

# /etc/cron.daily/record_mastodon_media_size.bash

set -euo pipefail

RECORD_FILE="/var/log/mastodon_media_size.log"

file_count=$(sudo /home/jas/.cargo/bin/dust -c -P -d 0 -b -f -R -p /home/mastodon/live/public/system | awk '{print $3}')

sudo /home/jas/.cargo/bin/dust \
    -c -P -d 0 -b -R -p \
    /home/mastodon/live/public/system |
    awk -v fc="$file_count" -v tstamp="$(date '+%Y-%m-%d-%H%M%S')" '{print tstamp,$1,$3,fc}' |
    tee -a "${RECORD_FILE}"
+35
resend_notify.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "resend",
# ]
# ///

import sys
from pathlib import Path

import resend


def main():
    resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n")

    if len(sys.argv) != 3:
        sys.exit("Usage: resend_notify.py SUBJECT MESSAGE")
    subject = sys.argv[1]
    message = sys.argv[2]

    params: resend.Emails.SendParams = {
        "from": "Admin <admin@hyperreal.coffee>",
        "to": ["hyperreal@moonshadow.dev"],
        "subject": subject,
        "text": message,
    }

    email = resend.Emails.send(params)
    print(email)


if __name__ == "__main__":
    main()
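Example, with an illustrative subject and message:

./resend_notify.py "[nas] zpool alert" "naspool is over 90% full"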
+3 -3
rofimaim
···
# This is lifted from https://gitlab.com/vahnrr/rofi-menus and modified by
# hyperreal <hyperreal64@pm.me> on 2023-09-06T15:09:58-05:00
-save_location="${HOME}/sync/pictures/screenshots"
-if ! test -d "${HOME}/sync/pictures/screenshots"; then
-    mkdir "${HOME}/sync/pictures/screenshots"
+save_location="${HOME}/Nextcloud/pictures/screenshots"
+if ! test -d "${HOME}/Nextcloud/pictures/screenshots"; then
+    mkdir -p "${HOME}/Nextcloud/pictures/screenshots"
fi
screenshot_path="$save_location/$(date +'%Y-%m-%d-%H%M%S').png"
+213
scihub_knapsack.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "qbittorrent-api",
#     "requests",
#     "docopt",
# ]
# ///

"""scihub_knapsack.py

Description:
    This script will add torrents to a qBittorrent instance until a specified size
    limit is reached.

    By default, the larger torrents are prioritized in descending order, but the
    script can be run with the --smaller flag to prioritize smaller torrents in
    ascending order.

    The script will select only torrents with less than or equal to <max_seeders>
    seeders.

Usage:
    scihub_knapsack.py [--smaller] [--dry-run] -H <hostname> -U <username> -P <password> -S <size> -s <max_seeders>
    scihub_knapsack.py -h

Examples:
    scihub_knapsack.py -H http://localhost:8080 -U admin -P adminadmin -S 42T -s 5
    scihub_knapsack.py --smaller -H https://qbt.hello.world -U admin -P adminadmin -S 2.2T -s 5

Options:
    --smaller           Prioritize from the smallest torrent sizes and work upward
                        to larger sizes. Default is to prioritize larger sizes.
    --dry-run           Only print the torrent names, total number of torrents, and
                        their total combined size instead of adding them to the
                        qBittorrent instance.
    -H <hostname>       Hostname of the server where the qBittorrent instance is
                        running.
    -U <username>       Username of the user to login to the qBittorrent instance.
    -P <password>       Password of the user to login to the qBittorrent instance.
    -S <size>           The maximum size, in GiB or TiB, of the knapsack to add Sci
                        Hub torrents to. Must be a positive integer or float. Must
                        have either G or T on the end, which represents GiB or TiB.
    -s <max_seeders>    Select torrents with less than or equal to <max_seeders>
                        seeders. <max_seeders> is a positive integer.
"""

import json

import qbittorrentapi
import requests
from docopt import docopt


def get_torrent_health_data() -> list[dict]:
    """
    Fetch Sci Hub torrent health checker data from the given URL. The URL
    should refer to a JSON-formatted file.
    """
    TORRENT_HEALTH_URL = (
        "https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json"
    )
    response = requests.get(TORRENT_HEALTH_URL, timeout=60)
    return json.loads(response.text)


def convert_size_to_bytes(size: str) -> int:
    """
    Convert the given size string to bytes.

    Example: 42G --> 45097156608 bytes
    """
    total_bytes = int()

    # Parse the numeric part as a float so sizes like "2.2T" work.
    if size.endswith("T"):
        total_bytes = int(float(size.split("T")[0]) * (1024**4))

    if size.endswith("G"):
        total_bytes = int(float(size.split("G")[0]) * (1024**3))

    return total_bytes


def human_bytes(bites: int) -> str:
    """
    Convert bytes to KiB, MiB, GiB, or TiB.

    Example: 45097156608 bytes -> 42 GiB
    """
    B = float(bites)
    KiB = float(1024)
    MiB = float(KiB**2)
    GiB = float(KiB**3)
    TiB = float(KiB**4)

    match B:
        case B if B < KiB:
            return "{0} {1}".format(B, "byte" if B == 1 else "bytes")
        case B if KiB <= B < MiB:
            return "{0:.2f} KiB".format(B / KiB)
        case B if MiB <= B < GiB:
            return "{0:.2f} MiB".format(B / MiB)
        case B if GiB <= B < TiB:
            return "{0:.2f} GiB".format(B / GiB)
        case B if TiB <= B:
            return "{0:.2f} TiB".format(B / TiB)
        case _:
            return ""


def get_knapsack_weight(knapsack: list[dict]) -> str:
    """
    Get the weight of the given knapsack in GiB or TiB.
    """
    return human_bytes(sum([torrent["size_bytes"] for torrent in knapsack]))


def fill_knapsack(
    max_seeders: int, knapsack_size: int, smaller: bool = False
) -> list[dict]:
    """
    Fill the knapsack.

    Arguments:
        max_seeders: int -- Select only torrents with less than or equal to
                            this number of seeders
        knapsack_size: int -- The size in bytes of the knapsack
        smaller: bool -- Prioritize smaller sized torrents (Default = False)

    Return value:
        A list of dictionaries that represent the torrents.
    """

    # List of torrents with less than or equal to <max_seeders> seeders
    torrents = [t for t in get_torrent_health_data() if t["seeders"] <= max_seeders]

    # Sorted list of those torrents. If smaller is True, sort them in
    # ascending order by size_bytes. Else sort them in descending order by
    # size_bytes.
    sorted_torrents = (
        sorted(torrents, key=lambda d: d["size_bytes"])
        if smaller
        else sorted(torrents, key=lambda d: d["size_bytes"], reverse=True)
    )

    # Sum the sizes of each torrent in sorted_torrents and add them to the
    # knapsack until it is filled, then return the knapsack.
    total = 0
    knapsack = []
    for torrent in sorted_torrents:
        if total + torrent["size_bytes"] >= knapsack_size:
            break
        total += torrent["size_bytes"]
        knapsack.append(torrent)

    return knapsack


if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore
    hostname = args["-H"]
    username = args["-U"]
    password = args["-P"]
    max_seeders = int(args["-s"])
    knapsack_size = convert_size_to_bytes(args["-S"])
    smaller = args["--smaller"]
    dry_run = args["--dry-run"]

    # Initialize client and login
    qbt_client = qbittorrentapi.Client(
        host=hostname, username=username, password=password
    )

    try:
        qbt_client.auth_log_in()
    except qbittorrentapi.LoginFailed as e:
        print(e)

    # Fill the knapsack
    knapsack = fill_knapsack(max_seeders, knapsack_size, smaller)

    # If it's a dry run, only print the knapsack's contents. Otherwise, add
    # the knapsack's contents to the qBittorrent instance. When finished,
    # print the number of items and the combined weight of all items in the
    # knapsack. Before attempting to add items to the qBittorrent instance,
    # check whether libgen.is is even up. If libgen.is is down, no torrents
    # can be added to the qBittorrent instance, so exit with a notice.
    if dry_run:
        for torrent in knapsack:
            print(torrent["link"])
    else:
        response = requests.get("https://libgen.is/", timeout=60)
        if not response.ok:
            exit(
                "It appears https://libgen.is is currently down. Please try again later."
            )
        for torrent in knapsack:
            if "gen.lib.rus.ec" in torrent["link"]:
                new_torrent = torrent["link"].replace("gen.lib.rus.ec", "libgen.is")
                qbt_client.torrents_add(new_torrent, category="scihub")

            if "libgen.rs" in torrent["link"]:
                new_torrent = torrent["link"].replace("libgen.rs", "libgen.is")
                qbt_client.torrents_add(new_torrent, category="scihub")
            # print(f"Added {torrent['name']}")

        qbt_client.auth_log_out()

    print("----------------")
    print(f"Count: {len(knapsack)} torrents")
    print(f"Total combined size: {get_knapsack_weight(knapsack)}")
    print("----------------")
+91
seed_armbian_torrents.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "qbittorrent-api",
#     "requests",
#     "bs4",
#     "docopt"
# ]
# ///

"""seed_armbian_torrents.py

Description:
    Armbian torrents seed script

    This script will scrape https://mirrors.jevincanders.net/armbian/dl/ for
    torrent files and add them to a qBittorrent instance. If there are already
    Armbian torrents in the qBittorrent instance, they will be removed, and new
    ones will be added in their place. This script is intended to be run under
    /etc/cron.weekly or used in a systemd timer.

Usage:
    seed_armbian_torrents.py (HOSTNAME) (USERNAME) (PASSWORD)
    seed_armbian_torrents.py -h

Examples:
    seed_armbian_torrents.py "http://localhost:8080" "admin" "adminadmin"
    seed_armbian_torrents.py "https://cat.seedhost.eu/lol/qbittorrent" "lol" "pw"

Options:
    -h, --help    show this help message and exit.
"""

import os

import qbittorrentapi
import requests
from bs4 import BeautifulSoup
from docopt import docopt


def add_torrents(args: dict):
    archive_dir_urls = [
        "https://mirrors.jevincanders.net/armbian/dl/orangepi5-plus/archive/",
        "https://mirrors.jevincanders.net/armbian/dl/rockpro64/archive/",
        "https://mirrors.jevincanders.net/armbian/dl/rpi4b/archive/",
    ]

    torrent_urls = []
    for url in archive_dir_urls:
        response = requests.get(url, timeout=60)
        soup = BeautifulSoup(response.content, "html.parser")
        links = soup.find_all("a")
        for link in links:
            if link.text.endswith(".torrent"):
                torrent_urls.append(url + link.text)

    try:
        qbt_client = qbittorrentapi.Client(
            host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
        )
        qbt_client.auth_log_in()

        for url in torrent_urls:
            qbt_client.torrents_add(url, category="distro")
            print(f"Added {os.path.basename(url)}")
        qbt_client.auth_log_out()
    except qbittorrentapi.LoginFailed as e:
        print(e)


def remove_torrents(args: dict):
    try:
        qbt_client = qbittorrentapi.Client(
            host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
        )
        qbt_client.auth_log_in()

        for torrent in qbt_client.torrents_info():
            if torrent.name.startswith("Armbian"):
                torrent.delete(delete_files=True)
                print(f"Removed {torrent.name}")
        qbt_client.auth_log_out()
    except qbittorrentapi.LoginFailed as e:
        print(e)


if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore
    remove_torrents(args)
    add_torrents(args)
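To run it weekly as the docstring suggests, one option is a small wrapper in /etc/cron.weekly (paths and credentials are placeholders):

sudo tee /etc/cron.weekly/seed-armbian <<'EOF'
#!/usr/bin/env bash
/home/jas/bin/seed_armbian_torrents.py "http://localhost:8080" "admin" "adminadmin"
EOF
sudo chmod +x /etc/cron.weekly/seed-armbian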
+16
start_debian_vm
···
#!/bin/sh
# Purpose: Simple script to start my Debian VM using bhyve on FreeBSD
# Original author: Vivek Gite (https://www.cyberciti.biz) under GPL v2.x+
# Modifications made by: hyperreal (https://hyperreal.coffee) under GPL v2.x+

if ! kldstat | grep -qw vmm.ko; then
    kldload -v vmm
fi

if ! kldstat | grep -qw nmdm.ko; then
    kldload -v nmdm
fi

if ! bhyve -c 4 -m 8G -w -H -s 0,hostbridge -s 4,virtio-blk,/dev/zvol/zroot/debianvm -s 5,virtio-net,tap0 -s 29,fbuf,tcp=0.0.0.0:5900,w=1024,h=768 -s 30,xhci,tablet -s 31,lpc -l com1,stdio -l bootrom,/usr/local/share/uefi-firmware/BHYVE_UEFI.fd debianvm 2>/tmp/start_debian_vm_error; then
    neomutt -s "[nas] start_debian_vm error" jas@nas </tmp/start_debian_vm_error
fi
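The fbuf device exposes the VM's framebuffer over VNC on port 5900, so once the VM is up it can be reached with any VNC client (hostname assumed to be the nas host):

vncviewer nas:5900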
+63
sync_from_remotes.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "resend",
# ]
# ///

import socket
import subprocess
from pathlib import Path

import resend


def send_email(program: str, log: str):
    resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n")

    match log:
        case "ok":
            subj = f"[{socket.getfqdn()}] {program} OK ✅"
            msg = f"{program} on {socket.getfqdn()} ran successfully!"
        case "err":
            subj = f"[{socket.getfqdn()}] {program} Error ❌"
            msg = f"There was an error running {program} on {socket.getfqdn()}. Please investigate."
        case _:
            subj = ""
            msg = ""

    params: resend.Emails.SendParams = {
        "from": "Admin <admin@hyperreal.coffee>",
        "to": ["hyperreal@moonshadow.dev"],
        "subject": subj,
        "text": msg,
    }

    email = resend.Emails.send(params)
    print(email)


def sync_from_remotes(src: str, dest: str):
    rsync_cmd = ["rsync", "-avz", "--delete", src, dest]

    try:
        subprocess.run(rsync_cmd, check=True, text=True)
        print(f"Successful sync from {src} to {dest}")
    except subprocess.CalledProcessError as e:
        print(f"Error during sync from {src} to {dest}: {e}")
        send_email("sync_from_remotes", "err")
        exit(1)


if __name__ == "__main__":
    remotes = [
        (
            "root@hyperreal.lyrebird-marlin.ts.net:/srv/borgbackup/hyperreal/",
            "/naspool/borgbackup/hyperreal",
        ),
    ]

    for remote in remotes:
        sync_from_remotes(remote[0], remote[1])

    send_email("sync_from_remotes", "ok")
+34
sync_wikimedia_xmldumps
···
#!/usr/bin/env bash

# Use rclone to sync the last two good Wikimedia XML data dumps.

set -euxo pipefail

# Get Headnet IP for desktop
DESKTOP_IP=$(sudo tailscale status | grep "desktop" | awk '{print $1}')

wget https://wikimedia.bringyour.com/rsync-filelist-last-2-good.txt \
    -O /home/jas/rsync-filelist-last-2-good.txt

cat /home/jas/rsync-filelist-last-2-good.txt \
    | grep "enwiki" \
    | grep -v "tenwiki" \
    | tee /home/jas/rsync-filelist-last-2-good-en.txt

rm -fv /home/jas/rsync-filelist-last-2-good.txt

rclone sync \
    --http-no-head \
    --transfers 8 \
    --include-from /home/jas/rsync-filelist-last-2-good-en.txt wikimedia.bringyour.com: \
    /naspool/archives/wikimedia-xmldatadumps-en

rm -fv /home/jas/rsync-filelist-last-2-good-en.txt

curl \
    -H prio:default \
    -H tags:incoming_envelope \
    -d "Syncing of wikimedia xml datadumps succeeded" \
    "http://${DESKTOP_IP}/wikimedia_xmldatadumps_en"

exit 0
+90
systemd_syscall_filter
···
#!/usr/bin/env bash

# Usage:
#   systemd_syscall_filter <absolute/path/to/binary> [-c]
#
# This script prints the syscalls the given binary executable uses, along
# with the systemd syscall-filter categories they belong to. This makes it
# easier to harden a systemd unit: it shows which categories you should NOT
# deny via SystemCallFilter=~ in the unit's .d overrides. If the binary uses
# a system call from a category, denying that category will likely keep the
# binary from working as expected.

syscall_categories=(
    "@default"
    "@aio"
    "@basic-io"
    "@chown"
    "@clock"
    "@cpu-emulation"
    "@debug"
    "@file-system"
    "@io-event"
    "@ipc"
    "@keyring"
    "@memlock"
    "@module"
    "@mount"
    "@network-io"
    "@obsolete"
    "@pkey"
    "@privileged"
    "@process"
    "@raw-io"
    "@reboot"
    "@resources"
    "@setuid"
    "@signal"
    "@swap"
    "@sync"
    "@system-service"
    "@timer"
)

get_used_syscalls() {
    for category in "${syscall_categories[@]}"; do
        readarray -t syscalls < <(sudo systemd-analyze syscall-filter --no-pager "$category" | awk '{print $1}' | tail -n+3)

        for sc in "${syscalls[@]}"; do
            if strings "$1" | grep --silent -w "$sc"; then
                echo "${category} : ${sc}"
            fi
        done
    done
}

get_unused_categories() {
    readarray -t used_syscalls < <(get_used_syscalls "$1" | awk '{print $1}' | uniq)
    readarray -t unused_categories < <(echo "${syscall_categories[@]}" "${used_syscalls[@]}" | tr ' ' '\n' | sort | uniq -u)
    for category in "${unused_categories[@]}"; do
        echo "SystemCallFilter=~${category}"
    done
}

if [ "$#" -eq 2 ]; then
    case "$2" in
    "-c")
        get_unused_categories "$1"
        ;;
    *)
        echo "Unknown option: ${2}"
        exit 1
        ;;
    esac
elif [ "$#" -eq 1 ]; then
    if ! test -x "$1"; then
        echo "${1} is not found or is not executable"
        exit 1
    else
        get_used_syscalls "$1"
    fi
else
    echo "Usage: systemd_syscall_filter <abs/path/to/binary> [-c]"
    echo ""
    echo "To get syscalls used by the binary:"
    echo "    systemd_syscall_filter /usr/sbin/auditd"
    echo ""
    echo "To get syscall categories not used by the binary, pass the -c (complement) flag:"
    echo "    systemd_syscall_filter /usr/sbin/auditd -c"
fi
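The -c output is ready to drop into a unit override; for example, applying it to the auditd unit from the usage text above (note the [Service] header a drop-in requires):

sudo mkdir -p /etc/systemd/system/auditd.service.d
{ echo "[Service]"; systemd_syscall_filter /usr/sbin/auditd -c; } | sudo tee /etc/systemd/system/auditd.service.d/syscall-filter.conf
sudo systemctl daemon-reload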
+129
update_tracker.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "qbittorrent-api",
#     "docopt",
#     "rich",
# ]
# ///

"""update_tracker.py

Description:
    This script collects infohashes of all torrents in each qBittorrent instance,
    updates opentracker, and reannounces all torrents to their trackers.

Expectations:
    - A JSON qBittorrent authentication file at ~/.config/qbittorrent_auth.json
    - SSH pubkey access to torrent tracker server
    - rsync installed on the host system running this script

Usage:
    update_tracker.py (--add-tracker DOMAIN)
    update_tracker.py -h

Options:
    --add-tracker DOMAIN    ensure the provided tracker domain is added to each torrent's tracker list
    -h, --help              show this help message and exit

Examples:
    update_tracker.py --add-tracker hyperreal.coffee
"""

import json
import subprocess
import tempfile
from pathlib import Path

import qbittorrentapi
from docopt import docopt
from rich.console import Console
from rich.text import Text

if __name__ == "__main__":
    args = docopt(__doc__)  # type: ignore

    tracker_domain = args["--add-tracker"]

    console = Console()
    with console.status("[bold green]Executing the tasks...") as status:
        # JSON file containing authentication info for each qBittorrent instance
        QBITTORRENT_AUTH_FILE = Path.home().joinpath(".config/qbittorrent_auth.json")

        # Open authentication file and load JSON data
        with open(QBITTORRENT_AUTH_FILE, "r") as qbt_auth:
            auth_data = json.load(qbt_auth)

        # Collect infohashes of all torrents in each qBittorrent instance
        console.log(
            "Collecting infohashes of all torrents in each qBittorrent instance."
        )
        torrent_infohashes = []
        for item in auth_data["instances"]:
            with qbittorrentapi.Client(
                host=item["hostname"],
                username=item["username"],
                password=item["password"],
            ) as qbt_client:
                try:
                    qbt_client.auth_log_in()
                except qbittorrentapi.LoginFailed as e:
                    print(e)

                for torrent in qbt_client.torrents_info():
                    torrent_infohashes.append(torrent.hash)

        # Format the infohashes to have a \n at the end
        console.log("Formatting infohashes to have a newline at the end.")
        format_infohashes = set([f"{infohash}\n" for infohash in torrent_infohashes])

        # Create a NamedTemporaryFile and write all infohashes to it, one per line
        console.log("Creating temporary file to write infohashes to.")
        with tempfile.NamedTemporaryFile() as ntf:
            with open(ntf.name, "w") as tf:
                tf.writelines(format_infohashes)

            # Use `sudo cp -f` to copy the infohashes file to the torrent tracker's
            # config directory, overwriting the whitelist.txt file.
            console.log(
                "Copying the temporary infohashes file to the torrent tracker's whitelist."
            )
            subprocess.run(
                ["sudo", "cp", "-f", ntf.name, "/etc/opentracker/whitelist.txt"]
            )

        # Run `sudo systemctl restart opentracker.service`
        console.log("Restarting opentracker.service")
        subprocess.run(["sudo", "systemctl", "restart", "opentracker.service"])

        # Reannounce all torrents in each qBittorrent instance to their trackers.
        # If a tracker domain was given, add its announce URL to each torrent
        # first. (The http scheme and port 6969 are assumed to match the
        # opentracker setup and the summary printed below.)
        console.log("Reannouncing all torrents to their trackers.")
        for item in auth_data["instances"]:
            with qbittorrentapi.Client(
                host=item["hostname"],
                username=item["username"],
                password=item["password"],
            ) as qbt_client:
                for torrent in qbt_client.torrents_info():
                    if tracker_domain:
                        torrent.add_trackers(
                            urls=[f"http://{tracker_domain}:6969/announce"]
                        )
                    torrent.reannounce()

        console.log("Done!")

    # Print output and make it look sexy ;)
    tasks = Text("\nTasks completed:\n")
    tasks.stylize("bold magenta")
    console.print(tasks)
    console.print(":white_check_mark: update the tracker's whitelist")

    if tracker_domain:
        console.print(
            f":white_check_mark: ensure {tracker_domain}:6969/announce is in each torrent's tracker list"
        )

    console.print(":white_check_mark: reannounce all torrents to their trackers")

    torrents = Text(str(len(torrent_infohashes)))
    torrents.stylize("bold green")
    console.print(torrents + " torrents were updated")
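The script expects ~/.config/qbittorrent_auth.json to exist; a minimal sketch of its shape, inferred from how the script reads it (hostname and credentials are placeholders):

cat > ~/.config/qbittorrent_auth.json <<'EOF'
{
  "instances": [
    {
      "hostname": "http://localhost:8080",
      "username": "admin",
      "password": "adminadmin"
    }
  ]
}
EOF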
+25
yaml2json.py
···
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "pyyaml",
# ]
# ///

# YAML to JSON conversion script
# Based on https://www.geeksforgeeks.org/convert-yaml-to-json/
#
# This script takes a YAML file as the first arg, converts the
# YAML content to JSON, and outputs the converted JSON content
# to stdout.

import json
import sys

import yaml

try:
    with open(sys.argv[1]) as yf:
        print(json.dumps(yaml.load(yf, Loader=yaml.FullLoader), indent=4))
except IndexError:
    print("YAML file must be supplied as first arg")
except FileNotFoundError:
    print("YAML file not found")