Assorted shell and Python scripts

+11
.archived/add_prebuilt_repo
···
+
#!/usr/bin/env bash
+
+
set -euxo pipefail
+
+
wget -qO - 'https://proget.makedeb.org/debian-feeds/prebuilt-mpr.pub' | gpg --dearmor | sudo tee /usr/share/keyrings/prebuilt-mpr-archive-keyring.gpg 1> /dev/null
+
echo "deb [arch=all,$(dpkg --print-architecture) signed-by=/usr/share/keyrings/prebuilt-mpr-archive-keyring.gpg] https://proget.makedeb.org prebuilt-mpr $(lsb_release -cs)" | sudo tee /etc/apt/sources.list.d/prebuilt-mpr.list
+
sudo apt update -t bookworm-backports
+
sudo apt dist-upgrade -t bookworm-backports -y
+
sudo apt install -y just
+
sudo apt autoremove -y
+
exit 0
+36
.archived/amimullvad
···
+
#!/usr/bin/env zsh
+
+
# Check for dependencies
+
if ! test -x "$(command -v curl)"; then
+
echo "Missing dependency: curl"
+
exit 1
+
fi
+
+
if ! test -x "$(command -v gum)"; then
+
echo "Missing dependency: gum"
+
echo "See github.com/charmbracelet/gum"
+
exit 1
+
fi
+
+
if ! test -x "$(command -v jq)"; then
+
echo "Missing dependency: jq"
+
exit 1
+
fi
+
+
MV_API=$(curl -sSL https://am.i.mullvad.net/json)
+
IP=$(echo $MV_API | jq ."ip" | tr -d '"')
+
CITY=$(echo $MV_API | jq ."city" | tr -d '"')
+
COUNTRY=$(echo $MV_API | jq ."country" | tr -d '"')
+
MV_EXIT_IP_HN=$(echo $MV_API | jq ."mullvad_exit_ip_hostname" | tr -d '"')
+
MV_SERVER_TYPE=$(echo $MV_API | jq ."mullvad_server_type" | tr -d '"')
+
BLACKLISTED=$(echo $MV_API | jq ."blacklisted"."blacklisted")
+
+
LEFT_COL=$(printf "%s\n%s\n%s\n%s\n%s\n%s\n" "IP Address" "City" "Country" "Exit IP Hostname" "Server Type" "Blacklisted")
+
RIGHT_COL=$(printf "%s\n%s\n%s\n%s\n%s\n%s\n" "$IP" "$CITY" "$COUNTRY" "$MV_EXIT_IP_HN" "$MV_SERVER_TYPE" "$BLACKLISTED")
+
GUM_LEFT=$(gum style --foreground "#73F59F" --border-foreground 57 --border none --width 20 --margin "1 2" --padding "0 1" --align right "$LEFT_COL")
+
GUM_RIGHT=$(gum style --foreground "#F1F1F1" --border-foreground 57 --border none --width 20 --margin "1 0" --align left "$RIGHT_COL")
+
+
GUM_TOP=$(gum style --bold --foreground 212 --border-foreground 57 --border rounded --width 50 --align center --padding "0 1" "Am I Mullvad?")
+
GUM_BOTTOM=$(gum join --horizontal --align right "$GUM_LEFT" "$GUM_RIGHT")
+
BOTTOM=$(gum style --border-foreground 57 --border rounded --width 50 --align center --padding "0 1" $GUM_BOTTOM)
+
gum join --vertical "$GUM_TOP" "$BOTTOM"
+38
.archived/archive_index_template
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
+
+
# If the number of arguments is not equal to 2, exit and display usage info.
+
if [ "$#" -ne 2 ]; then
+
echo "Usage: archive_index_template MINIO_INSTANCE BUCKET_NAME"
+
exit 1
+
fi
+
+
# Create temporary directory.
+
TMP_DIR=$(mktemp -d)
+
+
# Check if temporary directory was created.
+
if ! test -d "$TMP_DIR"; then
+
echo "Failed to create temp dir"
+
exit 1
+
fi
+
+
# Cleanup temporary directory.
+
function cleanup() {
+
rm -rf "$TMP_DIR"
+
echo "Cleaned up temp dir at $TMP_DIR"
+
}
+
+
# Trigger cleanup trap on EXIT and SIGINT signals
+
trap cleanup EXIT SIGINT
+
+
# Download archive-index-template.html and save to temporary directory as
+
# index.html.
+
wget --quiet https://files.hyperreal.coffee/archive-index-template.html \
+
-O "${TMP_DIR}/index.html"
+
+
# Replace "CHANGEME" with the the BUCKET_NAME argument in index.html.
+
sed -i "s/CHANGEME/$2/g" "${TMP_DIR}/index.html"
+
+
# Put the new index.html into the root of the given bucket.
+
mc put "${TMP_DIR}/index.html" "${1}/${2}/"
+21
.archived/backup_podvol
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
+
+
BACKUP_DIR="${HOME}/podman_volume_backups"
+
DATE=$(date '+%Y-%m-%d_%H%M%S')
+
+
volumes=("$@")
+
+
if [ ! -d "$BACKUP_DIR" ]; then
+
mkdir -p "$BACKUP_DIR"
+
fi
+
+
for vol in "${volumes[@]}"; do
+
podman volume export "$vol" --output "${BACKUP_DIR}/${vol}-${DATE}.tar"
+
gzip "${BACKUP_DIR}/${vol}-${DATE}.tar"
+
done
+
+
find "$BACKUP_DIR" -maxdepth 1 -mtime +3 -type f -delete
+
+
exit 0
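
To restore one of these backups, a minimal sketch (the volume name and date below are hypothetical, and the target volume must exist before the import):

gunzip "${HOME}/podman_volume_backups/myvolume-2024-01-01_000000.tar.gz"
podman volume create myvolume   # only if the volume does not already exist
podman volume import myvolume "${HOME}/podman_volume_backups/myvolume-2024-01-01_000000.tar"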
+24
.archived/bluebuild_iso
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
+
+
OUTPUT_DIR="/home/jas/bluebuild-isos"
+
DATE=$(date '+%Y%m%d')
+
+
/usr/local/bin/bluebuild generate-iso \
+
--verbose \
+
--iso-name "bluebuild-bazzite-${DATE}.iso" \
+
--output-dir "$OUTPUT_DIR" \
+
image aux-remote.carp-wyvern.ts.net/bluebuild-bazzite:latest
+
+
/usr/local/bin/bluebuild generate-iso \
+
--verbose \
+
--iso-name "bluebuild-bluefin-${DATE}.iso" \
+
--output-dir "$OUTPUT_DIR" \
+
image aux-remote.carp-wyvern.ts.net/bluebuild-bluefin:latest
+
+
chown -R jas:jas "$OUTPUT_DIR"
+
find "$OUTPUT_DIR" -maxdepth 1 -mtime +6 -type f -delete
+
sudo -u jas rsync -avz --delete "$OUTPUT_DIR"/ jas@hyperreal.carp-wyvern.ts.net:/home/jas/public/files/bluebuild-isos
+
+
exit 0
+36
.archived/check_http
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
+
+
domains=(
+
"hyperreal.coffee"
+
"annas-archive.hyperreal.coffee"
+
"anonoverflow.hyperreal.coffee"
+
"bash-hackers.hyperreal.coffee"
+
"bookmarks.hyperreal.coffee"
+
"breezewiki.hyperreal.coffee"
+
"dumb.hyperreal.coffee"
+
"feels.hyperreal.coffee"
+
"files.hyperreal.coffee"
+
"gitlab.hyperreal.coffee"
+
"jargonfile.hyperreal.coffee"
+
"lingva.hyperreal.coffee"
+
"opengist.hyperreal.coffee"
+
"pb.hyperreal.coffee"
+
"searxng.hyperreal.coffee"
+
"techne.hyperreal.coffee"
+
"txtdot.hyperreal.coffee"
+
"zsh-guide.hyperreal.coffee"
+
"zsh-manual.hyperreal.coffee"
+
)
+
+
for domain in "${domains[@]}"; do
+
HTTP_STATUS=$(curl -o /dev/null -s -w "%{http_code}\n" "https://${domain}")
+
if [ "$HTTP_STATUS" -ge 400 ]; then
+
curl \
+
-H prio:urgent \
+
-H tags:warning \
+
-d "HTTP Status for $domain: $HTTP_STATUS" \
+
"http://localhost:8080/http_status"
+
fi
+
done
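
To watch the alerts this script publishes, one option is to subscribe to the same ntfy topic; a minimal sketch, assuming the local ntfy server used above:

curl -s http://localhost:8080/http_status/json   # streams notifications for the http_status topic as JSON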
+19
.archived/check_updates
···
+
#!/usr/bin/env bash
+
+
updates=()
+
+
if [[ -f /etc/debian_version ]]; then
+
APT_UPDATES=$(sudo apt update 2>/dev/null | grep package | tail -1 | cut -d '.' -f 1 | awk '{print $1}')
+
if [ "$APT_UPDATES" = "All" ]; then
+
NUM_UPDATES=0
+
else
+
NUM_UPDATES="$APT_UPDATES"
+
fi
+
updates+=("APT: ${NUM_UPDATES}")
+
fi
+
+
if command -v flatpak >/dev/null; then
+
updates+=("Flatpak: $(flatpak remote-ls --updates | wc -l)")
+
fi
+
+
echo "${updates[*]}"
+19
.archived/dayofweek
···
+
#!/usr/bin/env -S uv run --script
+
#
+
# Usage: dayofweek <year> <month> <day>
+
#
+
# Example: dayofweek 2003 11 6
+
+
import sys
+
from datetime import datetime
+
+
if __name__ == "__main__":
+
if len(sys.argv) != 4:
+
print("Usage: dayofweek <year> <month> <day>")
+
exit(1)
+
else:
+
print(
+
datetime(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])).strftime(
+
"%A"
+
)
+
)
+16
.archived/delete_yum_repo
···
+
#!/usr/bin/env zsh
+
+
selection=$(find /etc/yum.repos.d -type f -name "*.repo" | gum choose --no-limit)
+
+
format_string_array=(
+
"# You selected the following repo file(s):\n"
+
)
+
+
echo "$selection" | while read -r line; do format_string_array+=("- $line\n"); done
+
echo "${format_string_array[@]}" | gum format
+
echo ""
+
if gum confirm "Are you sure you want to delete?"; then
+
sudo rm -v $(echo "$selection")
+
else
+
echo ":raised_eyebrow: Oh, okay then. Carry on." | gum format -t emoji
+
fi
+28
.archived/encrypt_mail
···
+
#!/usr/bin/env bash
+
+
set -euxo pipefail
+
+
if test -f "${HOME}/.env_common"; then
+
source "${HOME}/.env_common"
+
fi
+
+
cleanup() {
+
echo "Cleaning up"
+
rm -rfv /tmp/mail*
+
}
+
+
trap cleanup 0 1 2 3 6
+
+
DATE=$(date '+%Y-%m-%d')
+
tar czf "/tmp/mail-${DATE}.tar.gz" /naspool/mail
+
age --recipient 'age12pcwr6d8w6wfh5ymarphypzlyqxza3c3xj7cseturzyu70s02umske6mt6' --output "/tmp/mail-${DATE}.tar.gz.age" "/tmp/mail-${DATE}.tar.gz"
+
scp "/tmp/mail-${DATE}.tar.gz.age" root@aux-remote.carp-wyvern.ts.net:/bpool/encrypted_mail
+
ssh root@aux-remote.carp-wyvern.ts.net -- find /bpool/encrypted_mail -maxdepth 1 -type f -mtime +7 -delete
+
+
curl \
+
-H prio:default \
+
-H tags:incoming_envelope \
+
-d "encrypt_mail: success" \
+
"${NTFY_SERVER}/backups"
+
+
# vim: ts=4 sts=4 sw=4 et ai ft=bash
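
Restoring is the reverse of the pipeline above; a minimal sketch, assuming the age identity matching the recipient key is at ~/.config/age/key.txt (the path and date are placeholders):

age --decrypt --identity ~/.config/age/key.txt --output mail-2024-01-01.tar.gz mail-2024-01-01.tar.gz.age
tar xzf mail-2024-01-01.tar.gz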
+17
.archived/fedora_rm_old_kernels
···
+
#!/usr/bin/env bash
+
+
# Source: https://docs.fedoraproject.org/en-US/quick-docs/upgrading-fedora-offline/
+
+
old_kernels=($(dnf repoquery --installonly --latest-limit=-1 -q))
+
if [ "${#old_kernels[@]}" -eq 0 ]; then
+
echo "No old kernels found"
+
exit 0
+
fi
+
+
if ! sudo dnf remove "${old_kernels[@]}"; then
+
echo "Failed to remove old kernels"
+
exit 1
+
fi
+
+
echo "Removed old kernels"
+
exit 0
+47
.archived/fetch_combined_trackers_list.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "requests",
+
# "docopt",
+
# ]
+
# ///
+
+
"""fetch_combined_trackers_list.py
+
+
Description:
+
This script fetches a combined list of tracker URLs from plaintext lists hosted
+
on the web and writes them to a file in the current working directory.
+
+
Usage:
+
fetch_combined_trackers_list.py
+
fetch_combined_trackers_list.py -h
+
+
Options:
+
-h, --help show this help message and exit
+
"""
+
+
from pathlib import Path
+
+
import requests
+
from docopt import docopt
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
+
live_trackers_list_urls = [
+
"https://newtrackon.com/api/stable",
+
"https://trackerslist.com/best.txt",
+
"https://trackerslist.com/http.txt",
+
"https://raw.githubusercontent.com/ngosang/trackerslist/master/trackers_best.txt",
+
]
+
+
combined_trackers_urls = []
+
for url in live_trackers_list_urls:
+
response = requests.get(url, timeout=60)
+
tracker_urls = [x for x in response.text.splitlines() if x != ""]
+
combined_trackers_urls.extend(tracker_urls)
+
+
tracker_urls_filename = Path.cwd().joinpath("tracker_urls.txt")
+
with open(tracker_urls_filename, "w") as tf:
+
for url in combined_trackers_urls:
+
tf.write(f"{url}\n")
+48
.archived/fetch_scihub_infohashes.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "requests",
+
# "docopt",
+
# ]
+
# ///
+
+
"""fetch_scihub_infohashes.py
+
+
Description:
+
This script fetches the infohashes of all Sci Hub torrents and writes them to a
+
plaintext file. The plaintext file is intended to be appended to a bittorrent
+
tracker whitelist. E.g., /etc/opentracker/whitelist.txt.
+
+
Optionally set the TORRENT_JSON_URL for the Sci Hub torrent health checker, or
+
run the script with no arguments to use the default.
+
+
Default health check URL:
+
https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json
+
+
Usage:
+
fetch_scihub_infohashes.py [TORRENT_JSON_URL]
+
fetch_scihub_infohashes.py -h
+
+
Options:
+
-h, --help show this help message and exit.
+
"""
+
+
import json
+
from pathlib import Path
+
+
import requests
+
from docopt import docopt
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
url = (
+
args["TORRENT_JSON_URL"]
+
if args["TORRENT_JSON_URL"]
+
else "https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json"
+
)
+
response = requests.get(url, timeout=60)
+
json_data = json.loads(response.text)
+
torrent_infohashes = [f"{x["infohash"]}\n" for x in json_data]
+
+
with open(Path.cwd().joinpath("scihub_torrent_infohashes.txt"), "w") as tf:
+
tf.writelines(torrent_infohashes)
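
The output file can then be appended to the whitelist mentioned in the docstring; a minimal sketch, assuming opentracker runs on the same host as a systemd service:

sudo tee -a /etc/opentracker/whitelist.txt < scihub_torrent_infohashes.txt > /dev/null
sudo systemctl restart opentracker.service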
+30
.archived/gemma3
···
+
#!/usr/bin/env bash
+
+
# This script is for testing the quality of generated responses to basic
+
# knowledge questions from the Gemma3:27b LLM model.
+
#
+
# It requires an Ollama server with the gemma3:27b model available, either
+
# defined by `export OLLAMA_SERVER=` or it defaults to localhost:11434.
+
#
+
# Dependencies:
+
# - curl
+
# - gum
+
# - glow
+
# - jq
+
# - ollama
+
+
set -euo pipefail
+
+
OLLAMA_SERVER="${OLLAMA_SERVER:=localhost:11434}"
+
+
MESSAGE=$(gum write --placeholder="Ask Gemma3...")
+
+
gum style --padding="1 2" --border-foreground="#a6e3a1" --border="normal" "$MESSAGE"
+
+
RESPONSE=$(gum spin -s "dot" --title="Generating response..." --show-output -- curl -s -S -H "Content-Type: application/json" -X POST -d '{
+
"model": "gemma3:27b",
+
"messages": [{"role": "user", "content": "'"$MESSAGE"'"}],
+
"stream": false
+
}' "http://${OLLAMA_SERVER}/api/chat")
+
+
printf '%s' "$RESPONSE" | jq -r ".message.content" | glow -p
+7
.archived/install_just
···
+
#!/usr/bin/env bash
+
+
set -euxo pipefail
+
+
curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin
+
+
exit 0
+75
.archived/list_torrents.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "qbittorrent-api",
+
# "docopt",
+
# ]
+
# ///
+
+
"""list_torrents.py
+
+
Description:
+
Fetch a list of torrents from a qBittorrent instance running on localhost.
+
The qBittorrent instance must be configured to allow login on localhost
+
without authentication. The output is formatted into a plaintext table.
+
+
Usage:
+
list_torrents.py
+
list_torrents.py -h
+
+
Options:
+
-h, --help show this help message and exit
+
"""
+
+
import qbittorrentapi
+
from docopt import docopt
+
+
+
# convert byte units
+
def human_bytes(input_bytes: int) -> str:
+
B = float(input_bytes)
+
KiB = float(1024)
+
MiB = float(KiB**2)
+
GiB = float(KiB**3)
+
TiB = float(KiB**4)
+
+
match B:
+
case B if B < KiB:
+
return "{0} {1}".format(B, "bytes" if 0 == B > 1 else "byte")
+
case B if KiB <= B < MiB:
+
return "{0:.2f} KiB".format(B / KiB)
+
case B if MiB <= B < GiB:
+
return "{0:.2f} MiB".format(B / MiB)
+
case B if GiB <= B < TiB:
+
return "{0:.2f} GiB".format(B / GiB)
+
case B if TiB <= B:
+
return "{0:.2f} TiB".format(B / TiB)
+
case _:
+
return ""
+
+
+
def print_ssv():
+
with qbittorrentapi.Client(
+
host="localhost", port=8080, username="", password=""
+
) as qbt_client:
+
try:
+
qbt_client.auth_log_in()
+
except qbittorrentapi.LoginFailed as e:
+
print(e)
+
+
sorted_torrents = sorted(
+
qbt_client.torrents_info(), key=lambda d: d.ratio, reverse=True
+
)
+
print("Name Size # of Trackers Ratio Uploaded")
+
for torrent in sorted_torrents:
+
name = torrent.name
+
size = human_bytes(torrent.total_size)
+
trackers = torrent.trackers_count
+
ratio = torrent.ratio
+
uploaded = human_bytes(torrent.uploaded)
+
print(f"{name} {size} {trackers} {ratio} {uploaded}")
+
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
print_ssv()
+35
.archived/mediawiki_backup
···
+
#!/usr/bin/env bash
+
+
# To be run on the same host as mediawiki
+
+
set -euxo pipefail
+
+
DATE=$(date '+%Y-%m-%d')
+
TEMP=$(mktemp -d)
+
MWDIR="${TEMP}/mediawiki-${DATE}"
+
+
cleanup() {
+
sudo rm -rf "$TEMP"
+
sudo rm -fv "${PWD}/mediawiki-${DATE}.tar"
+
}
+
+
trap cleanup 0 1 2 3 6
+
+
sudo mkdir "$MWDIR"
+
sudo cp -rf /var/www/mediawiki "$MWDIR"/
+
sudo find \
+
/var/lib/automysqlbackup/daily/mw1523532 \
+
-maxdepth 1 \
+
-type f \
+
-mtime -1 \
+
-exec cp -fv {} "$MWDIR" \;
+
sudo tar cf "mediawiki-${DATE}.tar" -C "$MWDIR" .
+
sudo cp -fv "${PWD}/mediawiki-${DATE}.tar" /mnt/mediawiki_backups/
+
sudo find \
+
/mnt/mediawiki_backups \
+
-maxdepth 1 \
+
-type f \
+
-mtime +7 \
+
-exec rm -fv {} \;
+
+
# vim: ts=4 sts=4 sw=4 et ai ft=bash
+13
.archived/natpmpcd
···
+
#!/bin/sh
+
+
port=$(/usr/local/bin/natpmpc -a 1 0 udp 60 -g 10.2.0.1 | grep "Mapped public port" | awk '{print $4}')
+
echo $port | tee /usr/local/etc/natvpn_port.txt
+
+
while true; do
+
date
+
if ! /usr/local/bin/natpmpc -a 1 0 udp 60 -g 10.2.0.1 || ! /usr/local/bin/natpmpc -a 1 0 tcp 60 -g 10.2.0.1; then
+
echo "error Failure natpmpc $(date)"
+
break
+
fi
+
sleep 45
+
done
+26
.archived/nc_snap_create
···
+
#!/usr/bin/env nu
+
+
let refresh_token = (open ($env.HOME | path join .netcup_refresh_token.json) | get refresh_token)
+
+
def get_access_token [refresh_token] {
+
(curl -s "https://www.servercontrolpanel.de/realms/scp/protocol/openid-connect/token" -d "client_id=scp" -d $"refresh_token=($refresh_token)" -d "grant_type=refresh_token")
+
| from json
+
| get access_token
+
}
+
+
def get_servers [access_token] {
+
(curl -s -X "GET" "https://www.servercontrolpanel.de/scp-core/api/v1/servers" -H $"Authorization: Bearer ($access_token)" -H "accept: application/hal+json")
+
| from json
+
}
+
+
let access_token = (get_access_token ($refresh_token))
+
let servers = (get_servers ($access_token))
+
+
let date_now = (date now | format date "%Y%m%d")
+
+
($servers | get id) | each {
+
(curl -s -X "POST" $"https://www.servercontrolpanel.de/scp-core/api/v1/servers/($in)/snapshots" -H $"Authorization: Bearer ($access_token)" -H "accept: application/hal+json" -H "Content-Type: application/json" -d $'{"name": "($date_now)", "description": "via script", "diskName": "vda", "onlineSnapshot": false}')
+
| from json
+
}
+
+
# vim: sw=4 sts=4 ts=4 ai et ft=nu
+21
.archived/nc_snap_optimize
···
+
#!/usr/bin/env nu
+
+
let refresh_token = (open ($env.HOME | path join .netcup_refresh_token.json) | get refresh_token)
+
+
def get_access_token [refresh_token] {
+
(curl -s "https://www.servercontrolpanel.de/realms/scp/protocol/openid-connect/token" -d "client_id=scp" -d $"refresh_token=($refresh_token)" -d "grant_type=refresh_token")
+
| from json
+
| get access_token
+
}
+
+
def get_servers [access_token] {
+
(curl -s -X "GET" "https://www.servercontrolpanel.de/scp-core/api/v1/servers" -H $"Authorization: Bearer ($access_token)" -H "accept: application/hal+json")
+
| from json
+
}
+
+
let access_token = (get_access_token ($refresh_token))
+
let servers = (get_servers ($access_token))
+
+
($servers | get id) | par-each { |p| (curl -s -X "POST" $"https://www.servercontrolpanel.de/scp-core/api/v1/servers/($p)/storageoptimization?startAfterOptimization=true" -H $"Authorization: Bearer ($access_token)" -H "accept: application/hal+json") | from json }
+
+
# vim: sw=4 sts=4 ts=4 ai et ft=nu
+362
.archived/oci_reg_helper
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "docopt",
+
# "rich",
+
# ]
+
# ///
+
+
"""OCI Registry Helper
+
+
Usage:
+
ocirh <subcommand> [<args>...]
+
+
Subcommands:
+
repos Lists repositories in the registry. Repos correspond to images
+
pushed to the registry.
+
tags Lists tags of the given repository.
+
manifests Lists manifests of the given repository for the given tag.
+
rmi Removes a tag from an image. If given tag is the only tag,
+
removes the image.
+
gc Runs garbage collection on the registry. Requires SSH public key
+
access to registry server.
+
rmr Removes given repository from the registry. Requires SSH public
+
key access to registry server.
+
+
Examples:
+
Suppose we have an image called 'fedora-toolbox' tagged with 'latest'.
+
+
ocirh repos
+
ocirh tags fedora-toolbox
+
ocirh manifests fedora-toolbox latest
+
ocirh rmi fedora-toolbox latest
+
ocirh gc
+
ocirh rmr fedora-toolbox
+
"""
+
import http.client
+
import json
+
import logging
+
import math
+
import subprocess
+
+
from docopt import docopt
+
from rich import print
+
from rich.console import Group
+
from rich.logging import RichHandler
+
from rich.panel import Panel
+
from rich.table import Table
+
from rich.text import Text
+
from rich.traceback import install
+
from rich.tree import Tree
+
+
install(show_locals=True)
+
+
# Rich logging handler
+
FORMAT = "%(message)s"
+
logging.basicConfig(
+
level="NOTSET",
+
format=FORMAT,
+
datefmt="[%X]",
+
handlers=[RichHandler(rich_tracebacks=True)],
+
)
+
log = logging.getLogger("rich")
+
+
+
# Taken from https://stackoverflow.com/a/14822210
+
#
+
# How this function works:
+
# If size_bytes == 0, returns 0 B.
+
# size_name is a tuple containing binary prefixes for bytes.
+
#
+
# math.log takes the logarithm of size_bytes to base 1024.
+
# math.floor rounds down the result of math.log to the nearest integer.
+
# int ensures the result of math.floor is of type int, and stores it in i.
+
# The value of i is used to determine which binary prefix to use from
+
# size_name.
+
#
+
# math.pow returns the value of 1024 raised to the power of i, stores it in p.
+
#
+
# round takes the value of size_bytes, divides it by p, and stores the result
+
# in s at precision of 2 decimal places.
+
#
+
# A formatted string with size s and binary prefix size_name[i] is returned.
+
def convert_size(size_bytes: int) -> str:
+
"""
+
Converts a decimal integer of bytes to its respective binary-prefixed size.
+
+
Parameters:
+
size_bytes (int): A decimal integer.
+
+
Returns:
+
(str): Binary-prefixed size of size_bytes formatted as a string.
+
"""
+
if size_bytes == 0:
+
return "0 B"
+
size_name = ("B", "KiB", "MiB", "GiB")
+
i = int(math.floor(math.log(size_bytes, 1024)))
+
p = math.pow(1024, i)
+
s = round(size_bytes / p, 2)
+
return "%s %s" % (s, size_name[i])
+
+
+
REGISTRY_URL = "registry.hyperreal.coffee"
+
+
+
def get_auth() -> str:
+
"""
+
Get the base64 encoded password for registry authentication.
+
+
Returns:
+
auth (str): A string containing the base64 encoded password.
+
"""
+
try:
+
with open("/run/user/1000/containers/auth.json", "r") as authfile:
+
json_data = json.loads(authfile.read())
+
except Exception as ex:
+
log.exception(ex)
+
+
auth = json_data["auths"][REGISTRY_URL]["auth"]
+
return auth
+
+
+
def get_headers() -> dict:
+
"""
+
Returns headers for HTTP request authentication to the registry server.
+
+
Returns:
+
headers (dict): A dict of HTTP headers
+
"""
+
return {
+
"Accept": "application/vnd.oci.image.manifest.v1+json",
+
"Authorization": "Basic " + get_auth(),
+
}
+
+
+
def get_json_response(request: str, url: str) -> dict:
+
"""
+
Connects to registry and returns response data as JSON.
+
+
Parameters:
+
request (str): A string like "GET" or "DELETE"
+
url (str) : A string containing the URL of the requested data
+
+
Returns:
+
json_data (dict): JSON data as a dict object
+
"""
+
conn = http.client.HTTPSConnection(REGISTRY_URL)
+
headers = get_headers()
+
try:
+
conn.request(request, url, "", headers)
+
res = conn.getresponse()
+
data = res.read()
+
json_data = json.loads(data.decode("utf-8"))
+
except Exception as ex:
+
log.exception(ex)
+
+
return json_data
+
+
+
def get_repositories():
+
"""
+
Prints a Rich Tree that lists the repositories of the registry.
+
"""
+
+
json_data = get_json_response("GET", "/v2/_catalog")
+
repo_tree = Tree("[green]Repositories")
+
for repo in json_data["repositories"]:
+
repo_tree.add("[blue]%s" % repo)
+
+
print(repo_tree)
+
+
+
def get_tags(repo: str):
+
"""
+
Prints a Rich Tree that lists the tags for the given repository.
+
+
Parameters:
+
repo (str): A string containing the name of the repo
+
"""
+
json_data = get_json_response("GET", "/v2/" + repo + "/tags/list")
+
tags_tree = Tree("[green]%s tags" % repo)
+
for tag in json_data["tags"]:
+
tags_tree.add("[cyan]:%s" % tag)
+
+
print(tags_tree)
+
+
+
def get_manifests(repo: str, tag: str):
+
"""
+
Prints a Rich grid table that displays the manifests and metadata of the
+
image repository.
+
+
Parameters:
+
repo (str): A string containing the name of the repo
+
tag (str) : A string containing the tag of the desired image
+
"""
+
json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag)
+
+
grid_meta = Table.grid(expand=True)
+
grid_meta.add_column()
+
grid_meta.add_column()
+
meta_schema_version_key = Text("Schema version")
+
meta_schema_version_key.stylize("bold green", 0)
+
meta_schema_version_value = Text(str(json_data["schemaVersion"]))
+
meta_media_type_key = Text("Media type")
+
meta_media_type_key.stylize("bold green", 0)
+
meta_media_type_value = Text(json_data["mediaType"])
+
grid_meta.add_row(meta_schema_version_key, meta_schema_version_value)
+
grid_meta.add_row(meta_media_type_key, meta_media_type_value)
+
+
grid_config = Table.grid(expand=True)
+
grid_config.add_column()
+
grid_config.add_column()
+
config_media_type_key = Text("Media type")
+
config_media_type_key.stylize("bold green", 0)
+
config_media_type_value = Text(json_data["config"]["mediaType"])
+
config_digest_key = Text("Digest")
+
config_digest_key.stylize("bold green", 0)
+
config_digest_value = Text(json_data["config"]["digest"])
+
config_size_key = Text("Size")
+
config_size_key.stylize("bold green", 0)
+
config_size_value = Text(convert_size(json_data["config"]["size"]))
+
grid_config.add_row(config_media_type_key, config_media_type_value)
+
grid_config.add_row(config_digest_key, config_digest_value)
+
grid_config.add_row(config_size_key, config_size_value)
+
+
grid_annotations = Table.grid(expand=True)
+
grid_annotations.add_column()
+
grid_annotations.add_column()
+
for item in json_data["annotations"].items():
+
annotations_item_key = Text(item[0])
+
annotations_item_key.stylize("bold green", 0)
+
annotations_item_value = Text(item[1])
+
grid_annotations.add_row(annotations_item_key, annotations_item_value)
+
+
total_size = sum(layer.get("size") for layer in json_data["layers"])
+
table_layers = Table(box=None, show_footer=True)
+
table_layers.add_column(
+
"Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:"
+
)
+
table_layers.add_column(
+
"Size",
+
justify="left",
+
style="cyan",
+
no_wrap=True,
+
footer=convert_size(total_size),
+
)
+
for layer in json_data["layers"]:
+
table_layers.add_row(layer.get("digest"), convert_size(layer.get("size")))
+
+
panel_group = Group(
+
Panel(grid_meta, title="[bold blue]Metadata"),
+
Panel(grid_config, title="[bold blue]Config"),
+
Panel(grid_annotations, title="Annotations"),
+
Panel(
+
table_layers,
+
title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"),
+
),
+
)
+
print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag)))
+
+
+
def delete_image(repo: str, tag: str):
+
"""
+
Removes the given tag from the image. If the given tag is the only tag,
+
removes the image.
+
+
Parameters:
+
repo (str): A string containing the name of the repo
+
tag (str) : A string containing the tag to be removed
+
"""
+
try:
+
conn = http.client.HTTPSConnection(REGISTRY_URL)
+
headers = get_headers()
+
conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers)
+
res = conn.getresponse()
+
docker_content_digest = res.getheader("Docker-Content-Digest")
+
except Exception as ex:
+
log.exception(ex)
+
+
try:
+
conn.request(
+
"DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers
+
)
+
except Exception as ex:
+
log.exception(ex)
+
+
print("Untagged %s:%s successfully" % (repo, tag))
+
+
+
def garbage_collection():
+
"""
+
Runs garbage collection command on the remote registry server. Requires
+
SSH public key access.
+
"""
+
command = "/usr/local/bin/registry-gc"
+
+
try:
+
ssh = subprocess.Popen(
+
["ssh", "%s" % REGISTRY_URL, command],
+
shell=False,
+
stdout=subprocess.PIPE,
+
stderr=subprocess.PIPE,
+
text=True,
+
)
+
result = ssh.stdout.readlines()
+
if result == []:
+
log.error(ssh.stderr.readlines())
+
else:
+
print(result)
+
except Exception as ex:
+
log.exception(ex)
+
+
+
def remove_repo(repo: str):
+
"""
+
Runs command on remote registry server to remove the given repo.
+
+
Parameters:
+
repo (str): A string containing the name of the repo.
+
"""
+
command = "/usr/local/bin/registry-rm-repo " + repo
+
+
try:
+
ssh = subprocess.Popen(
+
["ssh", "%s" % REGISTRY_URL, command],
+
shell=False,
+
stdout=subprocess.PIPE,
+
stderr=subprocess.PIPE,
+
text=True,
+
)
+
result = ssh.stdout.readlines()
+
if result == []:
+
log.error(ssh.stderr.readlines())
+
else:
+
print(result)
+
except Exception as ex:
+
log.exception(ex)
+
+
+
if __name__ == "__main__":
+
args = docopt(__doc__, options_first=True)
+
match args["<subcommand>"]:
+
case "repos":
+
get_repositories()
+
case "tags":
+
get_tags(args["<args>"][0])
+
case "manifests":
+
get_manifests(args["<args>"][0], args["<args>"][1])
+
case "rmi":
+
delete_image(args["<args>"][0], args["<args>"][1])
+
case "gc":
+
garbage_collection()
+
case "rmr":
+
remove_repo(args["<args>"])
+
case _:
+
if args["<subcommand>"] in ["help", None]:
+
exit(subprocess.call(["python3", "ocirh", "--help"]))
+
else:
+
exit(
+
"%r is not a ocirh subcommand. See 'ocirh --help."
+
% args["<subcommand>"]
+
)
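
get_auth() reads the credential store that rootless podman writes after a registry login; a minimal sketch of producing that file, assuming podman is the container tool in use:

podman login registry.hyperreal.coffee   # writes /run/user/1000/containers/auth.json for UID 1000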
+48
.archived/publish_mastodon_archive.py
···
+
#!/usr/bin/env python3
+
+
import json
+
from html import unescape
+
+
+
def main():
+
with open("/home/jas/downloads/mastodon-archive/outbox.json", "r") as jf:
+
json_data = json.load(jf)
+
+
print("#+TITLE: Mastodon posts archive from 2024-02-16 to 2025-01-31")
+
print("#+DATE: 2025-02-02")
+
print("#+TAGS[]: mastodon archives")
+
print("#+AUTHOR: hyperreal")
+
print("#+SLUG: mastodon_archive-20240216-20250131")
+
print("#+LAYOUT: post")
+
print()
+
+
for item in sorted(
+
json_data["orderedItems"], key=json_data["orderedItems"].index, reverse=True
+
):
+
if type(item.get("object")) is dict:
+
published = item.get("object").get("published")
+
content = item.get("object").get("content")
+
attachment = (
+
item.get("object").get("attachment")
+
if len(item.get("object").get("attachment")) >= 1
+
else None
+
)
+
+
print(f"** {published}")
+
print("#+BEGIN_EXPORT html")
+
if type(content) is str:
+
print(unescape(content))
+
print("#+END_EXPORT")
+
if attachment:
+
for item in attachment:
+
if item.get("name"):
+
print(f"#+CAPTION: {item.get('name')}")
+
print(
+
f"[[https://files.hyperreal.coffee/mastodon_20240216-20250131/{item.get('url')}]]"
+
)
+
print("-----")
+
print()
+
+
+
if __name__ == "__main__":
+
main()
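
The script writes Org markup to stdout, so a typical invocation redirects it into a post file; the filename is only an example:

./publish_mastodon_archive.py > mastodon_archive-20240216-20250131.org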
+6
.archived/qbittorrent_state
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
+
+
DHT_NODES=$(curl -s "http://${1}:8080/api/v2/transfer/info" | jq '.dht_nodes')
+
curl -d "${DHT_NODES} nodes" http://localhost:8080/qbittorrent_state
+15
.archived/qbt_stats_html.nu
···
+
#!/usr/bin/env nu
+
+
let old_head = "<html><style>body { background-color:white;color:black; }</style><body>"
+
let new_head = (
+
["<html><head><title>Torrent Stats</title><link type="text/css" rel="stylesheet" href="https://files.hyperreal.coffee/css/style1.css"/></head><body><h4>Last updated:", (date now | format date "%F %T%:z"), "</h4>"]
+
| str join ' '
+
)
+
+
(
+
/home/jas/admin-scripts/python/list_torrents.py
+
| from ssv -m 2
+
| to html
+
| str replace ($old_head) ($new_head)
+
| save -f -r /home/jas/public/html/torrents.html
+
)
+79
.archived/qbt_sum_size.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "qbittorrent-api",
+
# "docopt",
+
# ]
+
# ///
+
+
"""qbt_sum_size.py
+
+
Description:
+
Get the total size of all added torrents and the total size of all completed
+
torrents from a qBittorrent instance.
+
+
Usage:
+
qbt_sum_size.py (HOSTNAME) (USERNAME) (PASSWORD)
+
qbt_sum_size.py -h
+
+
Examples:
+
qbt_sum_size.py "http://localhost:8080" "admin" "adminadmin"
+
qbt_sum_size.py "https://cat.seedhost.eu/lol/qbittorrent" "lol" "supersecretpassword"
+
+
Options:
+
-h, --help show this help message and exit
+
"""
+
+
import qbittorrentapi
+
from docopt import docopt
+
+
+
# convert byte units
+
def human_bytes(bites: int) -> str:
+
B = float(bites)
+
KiB = float(1024)
+
MiB = float(KiB**2)
+
GiB = float(KiB**3)
+
TiB = float(KiB**4)
+
+
match B:
+
case B if B < KiB:
+
return "{0} {1}".format(B, "bytes" if 0 == B > 1 else "byte")
+
case B if KiB <= B < MiB:
+
return "{0:.2f} KiB".format(B / KiB)
+
case B if MiB <= B < GiB:
+
return "{0:.2f} MiB".format(B / MiB)
+
case B if GiB <= B < TiB:
+
return "{0:.2f} GiB".format(B / GiB)
+
case B if TiB <= B:
+
return "{0:.2f} TiB".format(B / TiB)
+
case _:
+
return ""
+
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
+
completed_torrent_sizes = []
+
total_added_bytes = int()
+
+
with qbittorrentapi.Client(
+
host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
+
) as qbt_client:
+
try:
+
qbt_client.auth_log_in()
+
except qbittorrentapi.LoginFailed as e:
+
print(e)
+
+
for torrent in qbt_client.torrents_info():
+
if torrent.completion_on != 0:
+
completed_torrent_sizes.append(torrent.total_size)
+
+
total_added_bytes = sum(
+
[torrent.total_size for torrent in qbt_client.torrents_info()]
+
)
+
+
total_completed_bytes = sum(completed_torrent_sizes)
+
+
print(f"\nTotal completed size: {human_bytes(total_completed_bytes)}")
+
print(f"Total added size: {human_bytes(total_added_bytes)}\n")
+15
.archived/record_mastodon_media_size
···
+
#!/usr/bin/env bash
+
+
# /etc/cron.daily/record_mastodon_media_size.bash
+
+
set -euo pipefail
+
+
RECORD_FILE="/var/log/mastodon_media_size.log"
+
+
file_count=$(sudo /home/jas/.cargo/bin/dust -c -P -d 0 -b -f -R -p /home/mastodon/live/public/system | awk '{print $3}')
+
+
sudo /home/jas/.cargo/bin/dust \
+
-c -P -d 0 -b -R -p \
+
/home/mastodon/live/public/system |
+
awk -v fc="$file_count" -v tstamp="$(date '+%Y-%m-%d-%H%M%S')" '{print tstamp,$1,$3,fc}' |
+
tee -a "${RECORD_FILE}"
+20
.archived/registry_gc
···
+
#!/usr/bin/env bash
+
# registry-gc
+
# description: run garbage collection on registry
+
# name: registry
+
+
set -eu
+
+
if ! sudo podman container exists registry; then
+
echo "registry container does not exist"
+
exit 1
+
fi
+
+
if sudo podman container exec -it registry bin/registry garbage-collect /etc/docker/registry/config.yml -m; then
+
echo "Registry garbage collection ran successfully"
+
exit 0
+
else
+
echo "Error running registry garbage collection"
+
exit 1
+
fi
+
+18
.archived/registry_rm_repo
···
+
#!/usr/bin/env bash
+
# registry-rm-repo
+
# description: remove repository directory from registry data directory
+
+
set -eu
+
+
REPO_DIR="/mnt/registry_data/data/docker/registry/v2/repositories/"
+
REPO_TO_DELETE="$1"
+
+
if [ -d "${REPO_DIR}/${REPO_TO_DELETE}" ]; then
+
sudo rm -rf "${REPO_DIR}/${REPO_TO_DELETE}"
+
echo "${REPO_TO_DELETE} repo successfully deleted"
+
exit 0
+
else
+
echo "${REPO_TO_DELETE} repo not found"
+
exit 1
+
fi
+
+35
.archived/resend_notify.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "resend",
+
# ]
+
# ///
+
+
import subprocess
+
import sys
+
from pathlib import Path
+
+
import resend
+
+
+
def main():
+
resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n")
+
+
if len(sys.argv) != 3:
+
exit("Usage: resend_notify.py SUBJECT MESSAGE")
+
subject = sys.argv[1]
+
message = sys.argv[2]
+
+
params: resend.Emails.SendParams = {
+
"from": "Admin <admin@hyperreal.coffee>",
+
"to": ["hyperreal@moonshadow.dev"],
+
"subject": subject,
+
"text": message,
+
}
+
+
email = resend.Emails.send(params)
+
print(email)
+
+
+
if __name__ == "__main__":
+
main()
+38
.archived/rofimaim
···
+
#!/usr/bin/env bash
+
#
+
# Rofi powered menu to take a screenshot of the whole screen, a selected area or
+
# the active window. The image is then saved and copied to the clipboard.
+
# Uses: date maim notify-send rofi xclip xdotool
+
#
+
# This is lifted from https://gitlab.com/vahnrr/rofi-menus and modified by
+
# hyperreal <hyperreal64@pm.me> on 2023-09-06T15:09:58-05:00
+
+
save_location="${HOME}/Nextcloud/pictures/screenshots"
+
if ! test -d "${HOME}/Nextcloud/pictures/screenshots"; then
+
mkdir "${HOME}/Nextcloud/pictures/screenshots"
+
fi
+
screenshot_path="$save_location/$(date +'%Y-%m-%d-%H%M%S').png"
+
+
screen='๏’ฉ Screen'
+
area='๏„ฅ Select area'
+
window='๏’ˆ Window'
+
+
chosen=$(printf '%s;%s;%s' "$screen" "$area" "$window" |
+
rofi -theme-str 'window { width: 10em; height: 10em; }' \
+
-P 'Screenshot' \
+
-dmenu \
+
-sep ';' \
+
-selected-row 1)
+
+
case "$chosen" in
+
"$screen") extra_args='--delay=1' ;;
+
"$area") extra_args='--delay=0.1 --select --highlight --color=0.85,0.87,0.91,0.2' ;;
+
"$window") extra_args="--delay=1 --window=$(xdotool getactivewindow)" ;;
+
*) exit 1 ;;
+
esac
+
+
# The variable is used as a command's options, so it shouldn't be quoted.
+
# shellcheck disable=2086
+
maim --hidecursor --quiet --quality=10 --format='png' $extra_args "$screenshot_path" && {
+
notify-send --icon=org.xfce.screenshooter "Screenshot saved as $screenshot_path"
+
}
+213
.archived/scihub_knapsack.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "qbittorrent-api",
+
# "requests",
+
# "docopt",
+
# ]
+
# ///
+
+
"""scihub_knapsack.py
+
+
Description:
+
This script will add torrents to a qBittorrent instance until a specified size
+
limit is reached.
+
+
By default, the larger torrents are prioritized in descending order, but the
+
script can be run with the --smaller flag to prioritize smaller torrents in
+
ascending order.
+
+
The script will select only torrents with at most <max_seeders> seeders.
+
+
Usage:
+
scihub_knapsack.py [--smaller] [--dry-run] -H <hostname> -U <username> -P <password> -S <size> -s <max_seeders>
+
scihub_knapsack.py -h
+
+
Examples:
+
scihub_knapsack.py -H http://localhost:8080 -U admin -P adminadmin -S 42T
+
scihub_knapsack.py --smaller -H https://qbt.hello.world -U admin -P adminadmin -S 2.2T
+
+
Options:
+
--smaller Prioritize from the smallest torrent sizes and work upward
+
to larger sizes. Default is to prioritize larger sizes.
+
--dry-run Only print the torrent names, total number of torrents, and
+
their total combined size instead of adding them to the
+
qBittorrent instance.
+
-H <hostname> Hostname of the server where the qBittorrent instance is
+
running.
+
-U <username> Username of the user to login to the qBittorrent instance.
+
-P <password> Password of the user to login to the qBittorrent instance.
+
-S <size> The maximum size, in GiB or TiB, of the knapsack to add Sci
+
Hub torrents to. Must be a positive integer or float. Must
+
have either G or T on the end, which represents GiB or TiB.
+
-s <max_seeders> Select torrents with less than or equal to <max_seeders>
+
seeders. <max_seeders> is a positive integer.
+
"""
+
+
import json
+
+
import qbittorrentapi
+
import requests
+
from docopt import docopt
+
+
+
def get_torrent_health_data() -> list[dict]:
+
"""
+
Fetch Sci Hub torrent health checker data from the given URL. The URL
+
should refer to a JSON-formatted file.
+
"""
+
TORRENT_HEALTH_URL = (
+
"https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json"
+
)
+
response = requests.get(TORRENT_HEALTH_URL, timeout=60)
+
return json.loads(response.text)
+
+
+
def convert_size_to_bytes(size: str) -> int:
+
"""
+
Convert the given size string to bytes.
+
+
Example: 42G --> 45097156608 bytes
+
"""
+
total_bytes = int()
+
+
if size.endswith("T"):
+
total_bytes = int(size.split("T")[0]) * (1024**4)
+
+
if size.endswith("G"):
+
total_bytes = int(size.split("G")[0]) * (1024**3)
+
+
return total_bytes
+
+
+
def human_bytes(bites: int) -> str:
+
"""
+
Convert bytes to KiB, MiB, GiB, or TiB.
+
+
Example: 45097156608 bytes -> 42 GiB
+
"""
+
B = float(bites)
+
KiB = float(1024)
+
MiB = float(KiB**2)
+
GiB = float(KiB**3)
+
TiB = float(KiB**4)
+
+
match B:
+
case B if B < KiB:
+
return "{0} {1}".format(B, "bytes" if 0 == B > 1 else "byte")
+
case B if KiB <= B < MiB:
+
return "{0:.2f} KiB".format(B / KiB)
+
case B if MiB <= B < GiB:
+
return "{0:.2f} MiB".format(B / MiB)
+
case B if GiB <= B < TiB:
+
return "{0:.2f} GiB".format(B / GiB)
+
case B if TiB <= B:
+
return "{0:.2f} TiB".format(B / TiB)
+
case _:
+
return ""
+
+
+
def get_knapsack_weight(knapsack: list[dict]) -> str:
+
"""
+
Get the weight of the given knapsack in GiB or TiB.
+
"""
+
return human_bytes(sum([torrent["size_bytes"] for torrent in knapsack]))
+
+
+
def fill_knapsack(
+
max_seeders: int, knapsack_size: int, smaller: bool = False
+
) -> list[dict]:
+
"""
+
Fill the knapsack.
+
+
Arguments:
+
max_seeders: int -- Select only torrents with less than or equal to
+
this number of seeders
+
knapsack_size: int -- The size in bytes of the knapsack
+
smaller: bool -- Prioritize smaller sized torrents (Default = False)
+
+
Return value:
+
A list of dictionaries that represent the torrents.
+
"""
+
+
# List of torrents with less than or equal to <max_seeders>
+
torrents = [t for t in get_torrent_health_data() if t["seeders"] <= max_seeders]
+
+
# Sorted list of torrents with <max_seeders>. If smaller == True, sort them
+
# in ascending order by size_bytes. Else sort them in descending order by
+
# size_bytes.
+
sorted_torrents = (
+
sorted(torrents, key=lambda d: d["size_bytes"])
+
if smaller
+
else sorted(torrents, key=lambda d: d["size_bytes"], reverse=True)
+
)
+
+
# Sum the sizes of each torrent in sorted_torrents and add them to the
+
# knapsack until it is filled, then return the knapsack.
+
sum = 0
+
knapsack = []
+
for torrent in sorted_torrents:
+
if sum + torrent["size_bytes"] >= knapsack_size:
+
break
+
sum += torrent["size_bytes"]
+
knapsack.append(torrent)
+
+
return knapsack
+
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
hostname = args["-H"]
+
username = args["-U"]
+
password = args["-P"]
+
max_seeders = int(args["-s"])
+
knapsack_size = convert_size_to_bytes(args["-S"])
+
smaller = args["--smaller"]
+
dry_run = args["--dry-run"]
+
+
# Initialize client and login
+
qbt_client = qbittorrentapi.Client(
+
host=hostname, username=username, password=password
+
)
+
+
try:
+
qbt_client.auth_log_in()
+
except qbittorrentapi.LoginFailed as e:
+
print(e)
+
+
# Fill the knapsack
+
knapsack = fill_knapsack(max_seeders, knapsack_size, smaller)
+
+
# If it's a dry run, only print the knapsack's contents. Otherwise,
+
# add the knapsack's contents to the qBittorrent instance.
+
# When finished, print the number of items and the combined weight of all
+
# items in the knapsack. Before attempting to add items to the qBittorrent
+
# instance, check to see if libgen.rs is even working. If libgen.rs is down
+
# no torrents can be added to the qBittorrent instance, so exit with an
+
# notice.
+
if dry_run:
+
for torrent in knapsack:
+
print(torrent["link"])
+
else:
+
response = requests.get("https://libgen.is/")
+
if not response.ok:
+
exit(
+
"It appears https://libgen.is is currently down. Please try again later."
+
)
+
for torrent in knapsack:
+
if "gen.lib.rus.ec" in torrent["link"]:
+
new_torrent = torrent["link"].replace("gen.lib.rus.ec", "libgen.is")
+
qbt_client.torrents_add(new_torrent, category="scihub")
+
+
if "libgen.rs" in torrent["link"]:
+
new_torrent = torrent["link"].replace("libgen.rs", "libgen.is")
+
qbt_client.torrents_add(new_torrent, category="scihub")
+
# print(f"Added {torrent['name']}")
+
+
qbt_client.auth_log_out()
+
+
print("----------------")
+
print(f"Count: {len(knapsack)} torrents")
+
print(f"Total combined size: {get_knapsack_weight(knapsack)}")
+
print("----------------")
+104
.archived/seed_armbian_torrents
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "qbittorrent-api",
+
# "requests",
+
# "bs4",
+
# "docopt"
+
# ]
+
# ///
+
+
"""seed_armbian_torrents.py
+
+
Description:
+
Armbian torrents seed script
+
+
This script will scrape https://mirrors.jevincanders.net/armbian/dl/ for
+
torrent files and add them to a qBittorrent instance. If there are already
+
Armbian torrents in the qBittorrent instance, they will be removed, and new
+
ones will be added in their place. This script is intended to be run under
+
/etc/cron.weekly or used in a systemd timer.
+
+
Usage:
+
seed_armbian_torrents.py (HOSTNAME) (USERNAME) (PASSWORD)
+
seed_armbian_torrents.py -h
+
+
Examples:
+
seed_armbian_torrents.py "http://localhost:8080" "admin" "adminadmin"
+
seed_armbian_torrents.py "https://cat.seedhost.eu/lol/qbittorrent" "lol" "pw"
+
+
Options:
+
-h, --help show this help message and exit.
+
"""
+
+
import os
+
+
import qbittorrentapi
+
import requests
+
from bs4 import BeautifulSoup
+
from docopt import docopt
+
+
+
def add_torrents(args: dict):
+
base_url = "https://mirrors.jevincanders.net/armbian/dl"
+
ignore_dirs = ["/armbian/", "_patch/", "_toolchain/"]
+
archive_dir_urls = []
+
+
page = requests.get(base_url).text
+
soup = BeautifulSoup(page, "html.parser")
+
for node in soup.find_all("a"):
+
if node.get("href") is not None:
+
if node.get("href").endswith("/") and node.get("href") not in ignore_dirs:
+
archive_dir_urls.append(f"{base_url}/{node.get("href")}archive/")
+
+
torrent_urls = []
+
for url in archive_dir_urls:
+
response = requests.get(url, timeout=60)
+
soup = BeautifulSoup(response.content, "html.parser")
+
links = soup.find_all("a")
+
for link in links:
+
if link.text.endswith(".torrent"):
+
torrent_urls.append(url + link.text)
+
+
try:
+
qbt_client = qbittorrentapi.Client(
+
host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
+
)
+
qbt_client.auth_log_in()
+
+
torrent_count = len(torrent_urls)
+
+
print(f"There are {torrent_count} torrents to add. This gonna take a while...")
+
+
for url in torrent_urls:
+
qbt_client.torrents_add(url, category="distro")
+
print(f"Added {os.path.basename(url)}")
+
qbt_client.auth_log_out()
+
except qbittorrentapi.LoginFailed as e:
+
print(e)
+
+
+
def remove_torrents(args: dict):
+
try:
+
qbt_client = qbittorrentapi.Client(
+
host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
+
)
+
qbt_client.auth_log_in()
+
+
for torrent in qbt_client.torrents_info():
+
if torrent.name.startswith("Armbian"):
+
torrent.delete(delete_files=True)
+
print(f"Removed {torrent.name}")
+
qbt_client.auth_log_out()
+
except qbittorrentapi.LoginFailed as e:
+
print(e)
+
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
remove_torrents(args)
+
add_torrents(args)
+
+
# vim: ts=4 sts=4 sw=4 et ai ft=python
+22
.archived/speedcheck
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
+
+
LOG_DIR="${HOME}/speedtest-logs"
+
DAY="$(date '+%Y-%m-%d')"
+
+
if [ ! -d "${LOG_DIR}" ]; then
+
mkdir -p "${LOG_DIR}"
+
fi
+
+
print_speed() {
+
_time=$(date '+%H:%M:%S')
+
_speedtest=$(speedtest++ --output text | tail -n 2)
+
_dl_speed=$(echo "$_speedtest" | head -n 1 | awk -F= '{print $2}')
+
_ul_speed=$(echo "$_speedtest" | tail -n 1 | awk -F= '{print $2}')
+
echo "${_time} [D: ${_dl_speed} MB/s] [U: ${_ul_speed} MB/s]"
+
}
+
+
print_speed >>"${LOG_DIR}/${DAY}.log"
+
+
# vim: sw=4 ts=4 sts=4 ai et ft=bash
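
One way to build up a daily log is to run the script from cron; a minimal sketch, assuming it is installed at /home/jas/bin/speedcheck and should run hourly:

0 * * * * /home/jas/bin/speedcheck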
+16
.archived/start_debian_vm
···
+
#!/bin/sh
+
# Purpose: Simple script to start my Debian VM using bhyve on FreeBSD
+
# Original author: Vivek Gite (https://www.cyberciti.biz) under GPL v2.x+
+
# Modifications made by: hyperreal (https://hyperreal.coffee) under GPL v2.x+
+
+
if ! kldstat | grep -w vmm.ko; then
+
kldload -v vmm
+
fi
+
+
if ! kldstat | grep -w nmdm.ko; then
+
kldload -v nmdm
+
fi
+
+
if ! bhyve -c 4 -m 8G -w -H -s 0,hostbridge -s 4,virtio-blk,/dev/zvol/zroot/debianvm -s 5,virtio-net,tap0 -s 29,fbuf,tcp=0.0.0.0:5900,w=1024,h=768 -s 30,xhci,tablet -s 31,lpc -l com1,stdio -l bootrom,/usr/local/share/uefi-firmware/BHYVE_UEFI.fd debianvm 2>/tmp/start_debian_vm_error; then
+
neomutt -s "[nas] start_debian_vm error" jas@nas </tmp/start_debian_vm_error
+
fi
+31
.archived/swivel
···
+
#!/usr/bin/env bash
+
+
# swivel - Easily switch between running glances servers.
+
#
+
# Usage:
+
# swivel 10.0.0.10 10.0.0.11 10.0.0.12
+
+
set -euo pipefail
+
+
# Dependency check
+
missing_deps=()
+
command -v gum >/dev/null || missing_deps+=(gum)
+
command -v glances >/dev/null || missing_deps+=(glances)
+
+
if (( "${#missing_deps[@]}" != 0 )); then
+
echo "Missing dependencies:" "${missing_deps[@]}"
+
exit 1
+
fi
+
+
# Check number of args supplied at cli
+
if (( "${#@}" == 0 )); then
+
echo "At least one IP address or hostname must be supplied."
+
echo ""
+
echo "Usage: swivel <ip_addr0> <ip_addr1> ... <ip addrN>"
+
exit 1
+
fi
+
+
while
+
selection=$(gum choose "${@}" "Exit" --limit=1 --header="Selection:")
+
[[ "$selection" != "Exit" ]] && glances -c "@$selection" -p 61209
+
do true; done
+63
.archived/sync_from_remotes.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "resend",
+
# ]
+
# ///
+
+
import socket
+
import subprocess
+
from pathlib import Path
+
+
import resend
+
+
+
def send_email(program: str, log: str):
+
resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n")
+
+
match log:
+
case "ok":
+
subj = f"[{socket.getfqdn()}] {program} OK โœ…"
+
msg = f"{program} on {socket.getfqdn()} ran successfully!"
+
case "err":
+
subj = f"[{socket.getfqdn()}] {program} Error โŒ"
+
msg = f"There was an error running {program} on {socket.getfqdn()}. Please investigate."
+
case _:
+
subj = ""
+
msg = ""
+
+
params: resend.Emails.SendParams = {
+
"from": "Admin <admin@hyperreal.coffee>",
+
"to": ["hyperreal@moonshadow.dev"],
+
"subject": subj,
+
"text": msg,
+
}
+
+
email = resend.Emails.send(params)
+
print(email)
+
+
+
def sync_from_remotes(src: str, dest: str):
+
rsync_cmd = ["rsync", "-avz", "--delete", src, dest]
+
+
try:
+
subprocess.run(rsync_cmd, check=True, text=True)
+
print(f"Successful sync from {src} to {dest}")
+
except subprocess.CalledProcessError as e:
+
print(f"Error during sync from {src} to {dest}: {e}")
+
send_email("sync_from_remotes", "err")
+
exit(1)
+
+
+
if __name__ == "__main__":
+
remotes = [
+
(
+
"root@hyperreal.lyrebird-marlin.ts.net:/srv/borgbackup/hyperreal/",
+
"/naspool/borgbackup/hyperreal",
+
),
+
]
+
+
for remote in remotes:
+
sync_from_remotes(remote[0], remote[1])
+
+
send_email("sync_from_remotes", "ok")
+45
.archived/sync_wikimedia_xmldumps
···
+
#!/usr/bin/env bash
+
+
# Use rclone to sync the last two good Wikimedia XML data dumps.
+
+
set -euxo pipefail
+
+
if [ -z "$1" ]; then
+
echo "Please supply an rclone remote"
+
exit 1
+
fi
+
+
RCLONE_REMOTE="$1"
+
MIRROR_URL=$(rclone config show "$RCLONE_REMOTE" | grep "url" | awk '{print $3}')
+
NTFY_IP=$(sudo tailscale status | grep "dietpi" | awk '{print $1}')
+
+
cleanup() {
+
echo "Cleaning up"
+
rm -fv "${HOME}/rsync-filelist-last-2-good.txt"
+
rm -fv "${HOME}/rsync-filelist-last-2-good-en.txt"
+
exit
+
}
+
+
trap cleanup 0 1 2 3 6
+
+
wget "${MIRROR_URL}/rsync-filelist-last-2-good.txt" \
+
-O "${HOME}/rsync-filelist-last-2-good.txt"
+
+
grep "enwiki" "${HOME}/rsync-filelist-last-2-good.txt" |
+
grep -v "tenwiki" |
+
tee "${HOME}/rsync-filelist-last-2-good-en.txt"
+
+
rclone sync \
+
--http-no-head \
+
-P \
+
--transfers 16 \
+
--include-from "${HOME}/rsync-filelist-last-2-good-en.txt" "${RCLONE_REMOTE}:" \
+
/naspool/archives/wikimedia-xmldatadumps-en
+
+
curl \
+
-H prio:default \
+
-H tags:incoming_envelope \
+
-d "Syncing of wikimedia xml datadumps succeeded" \
+
"http://${NTFY_IP}:8080/wikimedia_xmldatadumps_en"
+
+
exit 0
+90
.archived/systemd_syscall_filter
···
+
#!/usr/bin/env bash
+
+
# Usage:
+
# systemd_syscall_filter <absolute/path/to/binary> [-c]
+
#
+
# This script will print the syscalls the given binary executable uses
+
# along with the systemd syscall-filter categories they are in.
+
# This makes it easier to harden a systemd unit because you can see which
+
# categories you shouldn't add to the systemd unit's .d overrides for the
+
# SystemCallFilter= directive. If the given binary executable uses a
+
# particular system call, you probably don't want to keep that system call
+
# out of the sandbox, or the binary executable might not work as expected.
+
+
syscall_categories=(
+
"@default"
+
"@aio"
+
"@basic-io"
+
"@chown"
+
"@clock"
+
"@cpu-emulation"
+
"@debug"
+
"@file-system"
+
"@io-event"
+
"@ipc"
+
"@keyring"
+
"@memlock"
+
"@module"
+
"@mount"
+
"@network-io"
+
"@obsolete"
+
"@pkey"
+
"@privileged"
+
"@process"
+
"@raw-io"
+
"@reboot"
+
"@resources"
+
"@setuid"
+
"@signal"
+
"@swap"
+
"@sync"
+
"@system-service"
+
"@timer"
+
)
+
+
get_used_syscalls() {
+
for category in "${syscall_categories[@]}"; do
+
readarray -t syscalls < <(sudo systemd-analyze syscall-filter --no-pager "$category" | awk '{print $1}' | tail -n+3)
+
+
for sc in "${syscalls[@]}"; do
+
if strings "$1" | grep --silent -w "$sc"; then
+
echo "${category} : ${sc}"
+
fi
+
done
+
done
+
}
+
+
get_unused_categories() {
+
readarray -t used_syscalls < <(get_used_syscalls "$1" | awk '{print $1}' | uniq)
+
readarray -t unused_categories < <(echo "${syscall_categories[@]}" "${used_syscalls[@]}" | tr ' ' '\n' | sort | uniq -u)
+
for category in "${unused_categories[@]}"; do
+
echo "SystemCallFilter=~${category}"
+
done
+
}
+
+
if [ "$#" -eq 2 ]; then
+
case "$2" in
+
"-c")
+
get_unused_categories "$1"
+
;;
+
*)
+
echo "Unknown option: ${2}"
+
exit 1
+
;;
+
esac
+
elif [ "$#" -eq 1 ]; then
+
if ! test -x "$1"; then
+
echo "${1} is not found or is not executable"
+
exit 1
+
else
+
get_used_syscalls "$1"
+
fi
+
else
+
echo "Usage: systemd_syscall_filter <abs/path/to/binary> [-c]"
+
echo ""
+
echo "To get syscalls used by the binary:"
+
echo " systemd_syscall_filter /usr/sbin/auditd"
+
echo ""
+
echo "To get syscall categories not used by the binary, pass the -c (complement) flag:"
+
echo " systemd_syscall_filter /usr/sbin/auditd -c"
+
fi
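
The -c output is already shaped for a unit override, so it can be written straight into a drop-in; a minimal sketch, assuming auditd.service is the unit being hardened and the script is on PATH as systemd_syscall_filter:

sudo mkdir -p /etc/systemd/system/auditd.service.d
{ echo "[Service]"; systemd_syscall_filter /usr/sbin/auditd -c; } | sudo tee /etc/systemd/system/auditd.service.d/10-syscall-filter.conf > /dev/null
sudo systemctl daemon-reload
sudo systemctl restart auditd.service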
+43
.archived/to_snake_case
···
+
#!/usr/bin/env bash
+
+
# I shamefully used ChatGPT to generate this. My brain just is not suited to
+
# coming up with that regex on my own and I didn't have much luck searching
+
# the web for helpful material.
+
+
# Function to convert a string to snake_case
+
to_snake_case() {
+
local input="$1"
+
local snake_case
+
snake_case=$(echo "$input" | sed -E 's/[[:space:]]+/_/g; s/([a-z])([A-Z])/\1_\2/g; s/[^a-zA-Z0-9_]+/_/g; s/__+/_/g; s/^_+|_+$//g' | tr '[:upper:]' '[:lower:]')
+
echo "$snake_case"
+
}
+
+
# Check if the file name is provided as an argument
+
if [ -z "$1" ]; then
+
echo "Usage: $0 <file-name>"
+
exit 1
+
fi
+
+
# Get the file name from the argument
+
file_name="$1"
+
+
# Extract the directory, base name, and extension
+
dir=$(dirname "$file_name")
+
base_name=$(basename "$file_name")
+
extension="${base_name##*.}"
+
base_name="${base_name%.*}"
+
+
# Convert the base name to snake_case
+
snake_case_base_name=$(to_snake_case "$base_name")
+
+
# Construct the new file name
+
if [ "$base_name" == "$extension" ]; then
+
new_file_name="$dir/$snake_case_base_name"
+
else
+
new_file_name="$dir/$snake_case_base_name.$extension"
+
fi
+
+
# Rename the file
+
mv "$file_name" "$new_file_name"
+
+
echo "File renamed to: $new_file_name"
+129
.archived/update_tracker.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "qbittorrent-api",
+
# "docopt",
+
# "rich",
+
# ]
+
# ///
+
+
"""update_tracker.py
+
+
Description:
+
This script collects infohashes of all torrents in each qBittorrent instance,
+
updates opentracker, and reannounces all torrents to their trackers.
+
+
Expectations:
+
- A JSON qBittorrent authentication file at ~/.config/qbittorrent_auth.json
+
- SSH pubkey access to torrent tracker server
+
- rsync installed on the host system running this script
+
+
Usage:
+
update_tracker.py (--add-tracker DOMAIN)
+
update_tracker.py -h
+
+
Options:
+
--add-tracker DOMAIN ensure the provided tracker domain is added to each torrent's tracker list
+
-h, --help show this help message and exit
+
+
Examples:
+
update_tracker.py --add-tracker hyperreal.coffee
+
"""
+
+
import json
+
import subprocess
+
import tempfile
+
from pathlib import Path
+
+
import qbittorrentapi
+
from docopt import docopt
+
from rich.console import Console
+
from rich.text import Text
+
+
if __name__ == "__main__":
+
args = docopt(__doc__) # type: ignore
+
+
tracker_domain = args["--add-tracker"]
+
+
console = Console()
+
with console.status("[bold green]Executing the tasks...") as status:
+
# JSON file containing authentication info for each qBittorrent instance
+
QBITTORRENT_AUTH_FILE = Path.home().joinpath(".config/qbittorrent_auth.json")
+
+
# Open authentication file and load JSON data
+
with open(QBITTORRENT_AUTH_FILE, "r") as qbt_auth:
+
auth_data = json.load(qbt_auth)
+
+
# Collect infohashes of all torrents in each qBittorrent instance
+
console.log(
+
"Collecting infohashes of all torrents in each qBittorrent instance."
+
)
+
torrent_infohashes = []
+
for item in auth_data["instances"]:
+
with qbittorrentapi.Client(
+
host=item["hostname"],
+
username=item["username"],
+
password=item["password"],
+
) as qbt_client:
+
try:
+
qbt_client.auth_log_in()
+
except qbittorrentapi.LoginFailed as e:
+
print(e)
+
+
for torrent in qbt_client.torrents_info():
+
torrent_infohashes.append(torrent.hash)
+
+
# Format the infohashes to have a \n at the end
+
console.log("Formatting infohashes to have a newline at the end.")
+
format_infohashes = set([f"{infohash}\n" for infohash in torrent_infohashes])
+
+
# Create a NamedTemporaryFile and write all infohashes to it, one per line
+
console.log("Creating temporary file to write infohashes to.")
+
+
with tempfile.NamedTemporaryFile() as ntf:
+
with open(ntf.name, "w") as tf:
+
tf.writelines(format_infohashes)
+
+
# Use `sudo cp -f` to copy the infohashes file to the torrent tracker's config
+
# directory, overwriting the whitelist.txt file.
+
console.log(
+
"Copying the temporary infohashes file to the torrent tracker's whitelist."
+
)
+
subprocess.run(
+
["sudo", "cp", "-f", ntf.name, "/etc/opentracker/whitelist.txt"]
+
)
+
+
# Run `sudo systemctl restart opentracker.service`
+
console.log("Restarting opentracker.service")
+
subprocess.run(["sudo", "systemctl", "restart", "opentracker.service"])
+
+
# Reannounce all torrents in each qBittorrent instance to their trackers
+
console.log("Reannouncing all torrents to their trackers.")
+
for item in auth_data["instances"]:
+
with qbittorrentapi.Client(
+
host=item["hostname"],
+
username=item["username"],
+
password=item["password"],
+
) as qbt_client:
+
for torrent in qbt_client.torrents_info():
+
torrent.reannounce()
+
+
console.log("Done!")
+
+
# Print output and make it look sexy ;)
+
console = Console()
+
tasks = Text("\nTasks completed:\n")
+
tasks.stylize("bold magenta")
+
console.print(tasks)
+
console.print(":white_check_mark: update the tracker's whitelist")
+
+
if tracker_domain:
+
console.print(
+
f":white_check_mark: ensure {tracker_domain}:6969/announce is in each torrent's tracker list"
+
)
+
+
console.print(":white_check_mark: reannounce all torrents to their trackers")
+
+
torrents = Text(str(len(torrent_infohashes)))
+
torrents.stylize("bold green")
+
console.print(torrents + " torrents were updated")
+25
.archived/yaml2json.py
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "yaml",
+
# ]
+
# ///
+
+
# YAML to JSON conversion script
+
# Based on https://www.geeksforgeeks.org/convert-yaml-to-json/
+
#
+
# This script takes a YAML file as the first arg, converts the
+
# YAML content to JSON, and outputs the converted JSON content
+
# to stdout.
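+
#
+
# Example invocation (hypothetical file names):
+
#   yaml2json.py config.yaml > config.json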
+
+
import json
+
import sys
+
+
import yaml
+
+
try:
+
print(json.dumps(yaml.load(open(sys.argv[1]), Loader=yaml.FullLoader), indent=4))
+
except IndexError:
+
print("YAML file must be supplied as first arg")
+
except FileNotFoundError:
+
print("YAML file not found")
-11
add_prebuilt_repo
···
-
#!/usr/bin/env bash
-
-
set -euxo pipefail
-
-
wget -qO - 'https://proget.makedeb.org/debian-feeds/prebuilt-mpr.pub' | gpg --dearmor | sudo tee /usr/share/keyrings/prebuilt-mpr-archive-keyring.gpg 1> /dev/null
-
echo "deb [arch=all,$(dpkg --print-architecture) signed-by=/usr/share/keyrings/prebuilt-mpr-archive-keyring.gpg] https://proget.makedeb.org prebuilt-mpr $(lsb_release -cs)" | sudo tee /etc/apt/sources.list.d/prebuilt-mpr.list
-
sudo apt update -t bookworm-backports
-
sudo apt dist-upgrade -t bookworm-backports -y
-
sudo apt install -y just
-
sudo apt autoremove -y
-
exit 0
-36
amimullvad
···
-
#!/usr/bin/env zsh
-
-
# Check for dependencies
-
if ! test -x "$(command -v curl)"; then
-
echo "Missing dependency: curl"
-
exit 1
-
fi
-
-
if ! test -x "$(command -v gum)"; then
-
echo "Missing dependency: gum"
-
echo "See github.com/charmbracelet/gum"
-
exit 1
-
fi
-
-
if ! test -x "$(command -v jq)"; then
-
echo "Missing dependency: jq"
-
exit 1
-
fi
-
-
MV_API=$(curl -sSL https://am.i.mullvad.net/json)
-
IP=$(echo $MV_API | jq ."ip" | tr -d '"')
-
CITY=$(echo $MV_API | jq ."city" | tr -d '"')
-
COUNTRY=$(echo $MV_API | jq ."country" | tr -d '"')
-
MV_EXIT_IP_HN=$(echo $MV_API | jq ."mullvad_exit_ip_hostname" | tr -d '"')
-
MV_SERVER_TYPE=$(echo $MV_API | jq ."mullvad_server_type" | tr -d '"')
-
BLACKLISTED=$(echo $MV_API | jq ."blacklisted"."blacklisted")
-
-
LEFT_COL=$(printf "%s\n%s\n%s\n%s\n%s\n%s\n" "IP Address" "City" "Country" "Exit IP Hostname" "Server Type" "Blacklisted")
-
RIGHT_COL=$(printf "%s\n%s\n%s\n%s\n%s\n%s\n" "$IP" "$CITY" "$COUNTRY" "$MV_EXIT_IP_HN" "$MV_SERVER_TYPE" "$BLACKLISTED")
-
GUM_LEFT=$(gum style --foreground "#73F59F" --border-foreground 57 --border none --width 20 --margin "1 2" --padding "0 1" --align right "$LEFT_COL")
-
GUM_RIGHT=$(gum style --foreground "#F1F1F1" --border-foreground 57 --border none --width 20 --margin "1 0" --align left "$RIGHT_COL")
-
-
GUM_TOP=$(gum style --bold --foreground 212 --border-foreground 57 --border rounded --width 50 --align center --padding "0 1" "Am I Mullvad?")
-
GUM_BOTTOM=$(gum join --horizontal --align right "$GUM_LEFT" "$GUM_RIGHT")
-
BOTTOM=$(gum style --border-foreground 57 --border rounded --width 50 --align center --padding "0 1" $GUM_BOTTOM)
-
gum join --vertical "$GUM_TOP" "$BOTTOM"
-38
archive_index_template
···
-
#!/usr/bin/env bash
-
-
set -euo pipefail
-
-
# If the number of arguments is not equal to 1, exit and display usage info.
-
if [ "$#" -ne 2 ]; then
-
echo "Usage: archive_index_template MINIO_INSTANCE BUCKET_NAME"
-
exit 1
-
fi
-
-
# Create temporary directory.
-
TMP_DIR=$(mktemp -d)
-
-
# Check if temporary directory was created.
-
if ! test -d "$TMP_DIR"; then
-
echo "Failed to create temp dir"
-
exit 1
-
fi
-
-
# Cleanup temporary directory.
-
function cleanup() {
-
rm -rf "$TMP_DIR"
-
echo "Cleaned up temp dir at $TMP_DIR"
-
}
-
-
# Trigger cleanup trap on EXIT and SIGINT signals
-
trap cleanup EXIT SIGINT
-
-
# Download archive-index-template.html and save to temporary directory as
-
# index.html.
-
wget --quiet https://files.hyperreal.coffee/archive-index-template.html \
-
-O "${TMP_DIR}/index.html"
-
-
# Replace "CHANGEME" with the the BUCKET_NAME argument in index.html.
-
sed -i "s/CHANGEME/$2/g" "${TMP_DIR}/index.html"
-
-
# Put the new index.html into the root of the given bucket.
-
mc put "${TMP_DIR}/index.html" "${1}/${2}/"
+4 -17
archivebox_schedule
···
set -euo pipefail
# Check if feed URL is supplied.
-
if (( "${#@}" == 0 )); then
+
if (("${#@}" == 0)); then
echo "No feed URL has been supplied."
exit 1
fi
-
# Get Headnet IP address for desktop
-
SERVER_IP=$(sudo tailscale status | grep "dietpi" | awk '{print $1}')
-
# Go to archivebox directory and run scheduled command for supplied
-
# feed URL. Send ntfy an error message if it fails.
+
# Go to the archivebox directory and add the supplied feed URL.
cd /naspool/archivebox
if ! /home/jas/.local/bin/archivebox add --depth=1 "$1" \
-
>> /naspool/archivebox/logs/schedule.log; then
-
curl \
-
-H prio:urgent \
-
-H tags:warning \
-
-d "Error running archivebox schedule for $1" \
-
"http://${SERVER_IP}:8080/archivebox_schedule"
-
else
-
curl \
-
-H prio:default \
-
-H tags:incoming_envelope \
-
-d "archivebox schedule succeeded: $1" \
-
"http://${SERVER_IP}:8080/archivebox_schedule"
+
>>/naspool/archivebox/logs/schedule.log; then
+
echo "$(date '+%Y-%m-%d %H:%M:%S') ERROR. Exiting."
fi
+4
awkuptime
···
+
#!/usr/bin/env sh
+
+
# Lifted from https://superuser.com/a/1783477/3091052
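+
# The first field of /proc/uptime is seconds since boot; output looks like "3d 4h 27m".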
+
awk '{m=int($1/60%60);h=int($1/3600%24);d=int($1/86400);printf "%sd %sh %sm\n", d, h, m}' /proc/uptime
-20
backup_podman_volumes
···
-
#!/usr/bin/env bash
-
-
set -euo pipefail
-
-
BACKUP_DIR="${HOME}/podman_volume_backups"
-
DATE=$(date '+%Y-%m-%d_%H%M%S')
-
-
volumes=(
-
"shaarli-cache"
-
"shaarli-data"
-
)
-
-
for vol in "${volumes[@]}"; do
-
podman volume export "$vol" --output "${BACKUP_DIR}/${vol}-${DATE}.tar"
-
gzip "${BACKUP_DIR}/${vol}-${DATE}.tar"
-
done
-
-
find "$BACKUP_DIR" -maxdepth 1 -mtime +3 -type f -delete
-
-
exit 0
+146
blog2gemlog
···
+
#!/usr/bin/env -S uv run --script
+
# /// script
+
# dependencies = [
+
# "feedgen",
+
# "feedparser",
+
# "md2gemini",
+
# ]
+
# ///
+
+
# 1. Take a markdown blog post as input, convert it to gemtext.
+
# 2. Update gemlog index.
+
# 3. Update the gemlog Atom feed.
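+
#
+
# Example invocation (hypothetical post path and title):
+
#   blog2gemlog /path/to/blog/example-post.md "Example Post"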
+
+
import sys
+
from datetime import datetime
+
from pathlib import Path
+
from zoneinfo import ZoneInfo
+
+
import feedparser
+
from feedgen.feed import FeedGenerator
+
from md2gemini import md2gemini
+
+
# This is so that Python doesn't yell at me if I forget an argument. Not
+
# likely to happen, but still.
+
if len(sys.argv) != 3:
+
print('Usage: blog2gemlog /path/to/blog/post.md "Blog Post Title"')
+
exit(1)
+
+
# Set the absolute path to the gemini content directory
+
gemini_dir = Path.home().joinpath(
+
"repos/tildegit.org/hyperreal/hyperreal.coffee/gemini"
+
)
+
+
# Get the current date in YYYY-MM-DD format
+
date_now = datetime.now().strftime("%Y-%m-%d")
+
+
# Read blog post path from sys.argv[1] and ensure it is an absolute path
+
blog_post_path = Path(sys.argv[1])
+
if not blog_post_path.is_absolute():
+
print("Supply absolute path to blog post.")
+
exit(1)
+
+
# Convert the markdown blog post to gemtext
+
with open(blog_post_path, "r") as md_f:
+
content = md2gemini(md_f.read(), frontmatter=True, links="paragraph", md_links=True)
+
+
# Set the absolute path to the gemlog post
+
gemlog_post_path = gemini_dir.joinpath(f"gemlog/{blog_post_path.stem}.gmi")
+
+
# Write the gemtext content to the gemlog post path
+
with open(gemlog_post_path, "w") as gmi_f:
+
gmi_f.write(content)
+
+
# Set the string for the END section of the gemlog post
+
gemlog_end = f"\n\n## END\nLast updated: {date_now}\n\n=> ../gemlog Gemlog archive\n=> ../ hyperreal.coffee"
+
+
# Append gemlog_end to the end of the gemlog post
+
with open(gemlog_post_path, "a") as gmi_f:
+
gmi_f.write(gemlog_end)
+
+
# Read the gemlog post file lines into a list
+
with open(gemlog_post_path, "r") as gmi_f:
+
contents = gmi_f.readlines()
+
+
# Get the gemlog post title from sys.argv[2]
+
gemlog_post_title = str(sys.argv[2])
+
+
# Insert the gemlog post title as the level 1 heading on line 1
+
contents.insert(0, f"# {gemlog_post_title}\n\n")
+
+
# Write the new contents as a string to the gemlog file
+
with open(gemlog_post_path, "w") as gmi_f:
+
contents = "".join(contents)
+
gmi_f.write(contents)
+
+
# Read the lines of the gemlog index into a list
+
with open(gemini_dir.joinpath("gemlog/index.gmi"), "r") as index_f:
+
contents = index_f.readlines()
+
+
# Set the content of the gemlog index entry line
+
gemlog_index_line = f"=> ./{gemlog_post_path.name} {date_now} {gemlog_post_title}\n"
+
+
# Insert the new gemlog index line into the list on line 6
+
contents.insert(5, gemlog_index_line)
+
+
# Write the new contents as a string to the gemlog index file
+
with open(gemini_dir.joinpath("gemlog/index.gmi"), "w") as index_f:
+
contents = "".join(contents)
+
index_f.write(contents)
+
+
# Get a timezone-aware datetime object from a timestamp of the present moment
+
aware_ts = datetime.fromtimestamp(
+
datetime.timestamp(datetime.now()), tz=ZoneInfo("America/Chicago")
+
)
+
+
# Format the timezone-aware datetime object for the <updated> element of the
+
# Atom feed
+
updated_ts = aware_ts.strftime("%Y-%m-%dT%H:%M:%S%z")
+
+
# Instantiate a FeedParserDict object
+
d = feedparser.parse(gemini_dir.joinpath("gemlog/atom.xml"))
+
+
# Update the <updated> element's value to the current timestamp
+
d["updated"] = updated_ts
+
+
# Define a dictionary for the new Atom feed entry
+
new_entry_dict = {
+
"id": f"gemini://hyperreal.coffee/gemlog/{gemlog_post_path.name}",
+
"title": gemlog_post_title,
+
"updated": updated_ts,
+
"links": [
+
{
+
"href": f"gemini://hyperreal.coffee/gemlog/{gemlog_post_path.name}",
+
"rel": "alternate",
+
"type": "text/gemini",
+
}
+
],
+
}
+
+
# Insert the new Atom feed entry into the FeedParserDict
+
d["entries"].insert(0, new_entry_dict)
+
+
# Instantiate a FeedGenerator object and set the methods for the feed
+
fg = FeedGenerator()
+
fg.id(d["feed"]["id"])
+
fg.title(d["feed"]["title"])
+
fg.updated(d["feed"]["updated"])
+
fg.link(d["feed"]["links"])
+
+
# Reverse the order of d["entries"] so that they are written to the file in
+
# the correct order
+
d["entries"].reverse()
+
+
# For each entry, add a new entry to the FeedGenerator object
+
for entry in d["entries"]:
+
fe = fg.add_entry()
+
fe.id(entry["id"])
+
fe.title(entry["title"])
+
fe.updated(entry["updated"])
+
fe.link(entry["links"])
+
+
# Finally, render the FeedGenerator object as an Atom feed and write it to
+
# the atom.xml file
+
fg.atom_file(gemini_dir.joinpath("gemlog/atom.xml"), pretty=True)
+
+
# vim: ai et ft=python sts=4 sw=4 ts=4
-23
check_updates
···
-
#!/usr/bin/env bash
-
-
updates=()
-
-
if [[ -f /etc/debian_version ]]; then
-
APT_UPDATES=$(sudo apt update 2>/dev/null | grep package | tail -1 | cut -d '.' -f 1 | awk '{print $1}')
-
if [ "$APT_UPDATES" = "All" ]; then
-
NUM_UPDATES=0
-
else
-
NUM_UPDATES="$APT_UPDATES"
-
fi
-
updates+=("[ APT: ${NUM_UPDATES} ]")
-
fi
-
-
if [[ -f /etc/redhat-release ]]; then
-
updates+=("[ DNF: $(sudo dnf check-update | wc -l) ]")
-
fi
-
-
if command -v flatpak >/dev/null; then
-
updates+=("[ Flatpak: $(flatpak remote-ls --updates | wc -l) ]")
-
fi
-
-
echo "${updates[*]}"
-8
create_archive
···
-
#!/usr/bin/env zsh
-
-
# Create a tarball from given directory.
-
-
dir_name="$1"
-
archive_name="${dir_name}-$(date '+%Y%m%d').tar.gz"
-
tar cvfz "$archive_name" "$1" && \
-
echo "Created archive $archive_name"
-19
dayofweek
···
-
#!/usr/bin/env -S uv run --script
-
#
-
# Usage: dayofweek <year> <month> <day>
-
#
-
# Example: dayofweek 2003 11 6
-
-
import sys
-
from datetime import datetime
-
-
if __name__ == "__main__":
-
if len(sys.argv) != 4:
-
print("Usage: dayofweek <year> <month> <day>")
-
exit(1)
-
else:
-
print(
-
datetime(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3])).strftime(
-
"%A"
-
)
-
)
-16
delete_yum_repo
···
-
#!/usr/bin/env zsh
-
-
selection=$(find /etc/yum.repos.d -type f -name "*.repo" | gum choose --no-limit)
-
-
format_string_array=(
-
"# You selected the following repo file(s):\n"
-
)
-
-
echo "$selection" | while read -r line; do format_string_array+=("- $line\n"); done
-
echo "${format_string_array[@]}" | gum format
-
echo ""
-
if gum confirm "Are you sure you want to delete?"; then
-
sudo rm -v $(echo "$selection")
-
else
-
echo ":raised_eyebrow: Oh, okay then. Carry on." | gum format -t emoji
-
fi
-17
fedora_rm_old_kernels
···
-
#!/usr/bin/env bash
-
-
# Source: https://docs.fedoraproject.org/en-US/quick-docs/upgrading-fedora-offline/
-
-
old_kernels=($(dnf repoquery --installonly --latest-limit=-1 -q))
-
if [ "${#old_kernels[@]}" -eq 0 ]; then
-
echo "No old kernels found"
-
exit 0
-
fi
-
-
if ! sudo dnf remove "${old_kernels[@]}"; then
-
echo "Failed to remove old kernels"
-
exit 1
-
fi
-
-
echo "Removed old kernels"
-
exit 0
+26
feed_count
···
+
#!/usr/bin/env nu
+
+
let mf_auth_token = (secret-tool lookup miniflux-auth-token hyperreal)
+
let mf_password = (secret-tool lookup miniflux-password hyperreal)
+
let mf_api_url = "http://moonshadow.carp-wyvern.ts.net:8080/v1/feeds/counters"
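+
+
# The counters endpoint is expected to return JSON shaped roughly like
+
# {"reads": {"1": 10}, "unreads": {"1": 5, "2": 3}} (feed ID -> count), so
+
# `get unreads | values | math sum` below yields the total unread count.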
+
+
let unreads = (
+
( curl \
+
-s \
+
-X GET \
+
-H "Content-Type: application/json" \
+
-H $"X-Auth-Token: ($mf_auth_token)" \
+
-u $"hyperreal:($mf_password)" \
+
($mf_api_url)
+
)
+
| from json
+
| get unreads
+
| values
+
)
+
+
if ($unreads | is-empty) {
+
"0"
+
} else {
+
$unreads | math sum
+
}
+
# vim: sw=4 sts=4 ts=4 ai et ft=nu
-47
fetch_combined_trackers_list.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "requests",
-
# "docopt",
-
# ]
-
# ///
-
-
"""fetch_combined_trackers_list.py
-
-
Description:
-
This script fetches a combined list of tracker URLs from plaintext lists hosted
-
on the web and writes them to a file in the current working directory.
-
-
Usage:
-
fetch_combined_trackers_list.py
-
fetch_combined_trackers_list.py -h
-
-
Options:
-
-h, --help show this help message and exit
-
"""
-
-
from pathlib import Path
-
-
import requests
-
from docopt import docopt
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
-
live_trackers_list_urls = [
-
"https://newtrackon.com/api/stable",
-
"https://trackerslist.com/best.txt",
-
"https://trackerslist.com/http.txt",
-
"https://raw.githubusercontent.com/ngosang/trackerslist/master/trackers_best.txt",
-
]
-
-
combined_trackers_urls = []
-
for url in live_trackers_list_urls:
-
response = requests.get(url, timeout=60)
-
tracker_urls = [x for x in response.text.splitlines() if x != ""]
-
combined_trackers_urls.extend(tracker_urls)
-
-
tracker_urls_filename = Path.cwd().joinpath("tracker_urls.txt")
-
with open(tracker_urls_filename, "w") as tf:
-
for url in combined_trackers_urls:
-
tf.write(f"{url}\n")
-48
fetch_scihub_infohashes.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "requests",
-
# "docopt",
-
# ]
-
# ///
-
-
"""fetch_scihub_infohashes.py
-
-
Description:
-
This script fetches the infohashes of all Sci Hub torrents and writes them to a
-
plaintext file. The plaintext file is intended to be appended to a bittorrent
-
tracker whitelist. E.g., /etc/opentracker/whitelist.txt.
-
-
Optionally set the TORRENT_JSON_URL for the Sci Hub torrent health checker, or
-
run the script with no arguments to use the default.
-
-
Default health check URL:
-
https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json
-
-
Usage:
-
fetch_scihub_infohashes.py [TORRENT_JSON_URL]
-
fetch_scihub_infohashes.py -h
-
-
Options:
-
-h, --help show this help message and exit.
-
"""
-
-
import json
-
from pathlib import Path
-
-
import requests
-
from docopt import docopt
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
url = (
-
args["TORRENT_JSON_URL"]
-
if args["TORRENT_JSON_URL"]
-
else "https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json"
-
)
-
response = requests.get(url, timeout=60)
-
json_data = json.loads(response.text)
-
torrent_infohashes = [f"{x["infohash"]}\n" for x in json_data]
-
-
with open(Path.cwd().joinpath("scihub_torrent_infohashes.txt"), "w") as tf:
-
tf.writelines(torrent_infohashes)
-6
gen_digital_archive_listing
···
-
#!/usr/bin/env bash
-
-
cd /naspool/archives
-
fd | sort | tee /home/jas/digital_archive_listing.txt
-
-
exit 0
+55
git_backup
···
+
#!/usr/bin/env bash
+
+
set -euxo pipefail
+
+
if [ -f "${HOME}/.env_common" ]; then
+
source "${HOME}/.env_common"
+
else
+
echo ".env_common not found"
+
exit 1
+
fi
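+
+
# .env_common is expected to provide GITEA_TOKEN, which authenticates the
+
# tildegit API request below.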
+
+
TILDEGIT_URL="https://tildegit.org"
+
TILDEGIT_CLONE_URL="git@tildegit.org:hyperreal"
+
TILDEGIT_BACKUP_DIR="/naspool/tildegit-backup"
+
KNOT_BACKUP_DIR="/naspool/knot-backup"
+
KNOT_CLONE_URL="git@knot.moonshadow.dev:hyperreal.bsky.moonshadow.dev"
+
+
curl -s -k \
+
-u "hyperreal:${GITEA_TOKEN}" \
+
"${TILDEGIT_URL}/api/v1/user/repos?limit=100&page=1" |
+
jq '.[].name | select(.!="keyoxide_proof")' |
+
tr -d '"' |
+
tee "${TILDEGIT_BACKUP_DIR}/repos.txt"
+
+
while read -r line; do
+
if [ -d "${TILDEGIT_BACKUP_DIR}/${line}" ]; then
+
cd "${TILDEGIT_BACKUP_DIR}/${line}"
+
git pull
+
else
+
cd "${TILDEGIT_BACKUP_DIR}"
+
git clone "${TILDEGIT_CLONE_URL}/${line}.git"
+
fi
+
sleep 30
+
done <"${TILDEGIT_BACKUP_DIR}/repos.txt"
+
+
knot_repos=(
+
"ansible-homelab"
+
"bin"
+
"dotfiles"
+
"hyperreal.coffee"
+
"justfiles"
+
)
+
+
for repo in "${knot_repos[@]}"; do
+
if [ -d "${KNOT_BACKUP_DIR}/${repo}" ]; then
+
cd "${KNOT_BACKUP_DIR}/${repo}"
+
git pull
+
else
+
cd "${KNOT_BACKUP_DIR}"
+
git clone "${KNOT_CLONE_URL}/${repo}"
+
fi
+
sleep 30
+
done
+
+
# vim: ts=4 sw=4 sts=4 ai et ft=bash
+32
hyperreal_backup
···
+
#!/usr/bin/env bash
+
+
set -euxo pipefail
+
+
if [ ! -f "${HOME}/.env_common" ]; then
+
echo "ERROR: .env_common not found"
+
exit 1
+
else
+
source "${HOME}/.env_common"
+
fi
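+
+
# HC_PING_URL (sourced from .env_common) appears to be a healthchecks.io-style
+
# check URL: ping /start before the backup, /fail on any error, and the bare
+
# URL on success.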
+
+
curl --retry 3 "${HC_PING_URL}/start"
+
+
BORG_ARCHIVE=$(borg list ssh://u511927@u511927.your-storagebox.de:23/home/borgbackup/hyperreal | tail -n 1 | awk '{print $1}')
+
ARCHIVE_BASENAME=$(echo "$BORG_ARCHIVE" | cut -d "T" -f 1)
+
+
if ! borg export-tar \
+
"ssh://u511927@u511927.your-storagebox.de:23/home/borgbackup/hyperreal::${BORG_ARCHIVE}" \
+
"/naspool/hyperreal_backup/${ARCHIVE_BASENAME}.tar"; then
+
curl --retry 3 "${HC_PING_URL}/fail"
+
fi
+
+
find /naspool/hyperreal_backup \
+
-maxdepth 1 \
+
-type f \
+
-mtime +7 \
+
-exec rm -fv {} \; ||
+
curl --retry 3 "${HC_PING_URL}/fail"
+
+
curl --retry 3 "$HC_PING_URL"
+
+
# vim: ts=4 sts=4 sw=4 et ai ft=bash
-7
install_just
···
-
#!/usr/bin/env bash
-
-
set -euxo pipefail
-
-
curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin
-
-
exit 0
-18
license
···
-
#!/usr/bin/env bash
-
-
set -euo pipefail
-
-
# Fetch an open source license
-
-
base_url="https://api.github.com/licenses"
-
headers="Accept: application/vnd.github.drax-preview+json"
-
-
if (( $# == 0 )); then
-
res=$(curl --silent --header $headers $base_url)
-
selection=$(echo "$res" | jq ".[].key" | tr -d '"' | gum choose --limit=1)
-
else
-
selection="$argv[1]"
-
fi
-
-
res=$(curl --silent --header $headers $base_url/$selection | jq ."body")
-
echo -e $res | tr -d '"'
-75
list_torrents.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "qbittorrent-api",
-
# "docopt",
-
# ]
-
# ///
-
-
"""list_torrents.py
-
-
Description:
-
Fetch a list of torrents from a qBittorrent instance running on localhost.
-
The qBittorrent instance must be configured to allow login on localhost
-
without authentication. The output is formatted into a plaintext table.
-
-
Usage:
-
list_torrents.py
-
list_torrents.py -h
-
-
Options:
-
-h, --help show this help message and exit
-
"""
-
-
import qbittorrentapi
-
from docopt import docopt
-
-
-
# convert byte units
-
def human_bytes(input_bytes: int) -> str:
-
B = float(input_bytes)
-
KiB = float(1024)
-
MiB = float(KiB**2)
-
GiB = float(KiB**3)
-
TiB = float(KiB**4)
-
-
match B:
-
case B if B < KiB:
-
return "{0} {1}".format(B, "bytes" if 0 == B > 1 else "byte")
-
case B if KiB <= B <= MiB:
-
return "{0:.2f} KiB".format(B / KiB)
-
case B if MiB <= B <= GiB:
-
return "{0:.2f} MiB".format(B / MiB)
-
case B if GiB <= B <= TiB:
-
return "{0:.2f} GiB".format(B / GiB)
-
case B if TiB <= B:
-
return "{0:.2f} TiB".format(B / TiB)
-
case _:
-
return ""
-
-
-
def print_ssv():
-
with qbittorrentapi.Client(
-
host="localhost", port=8080, username="", password=""
-
) as qbt_client:
-
try:
-
qbt_client.auth_log_in()
-
except qbittorrentapi.LoginFailed as e:
-
print(e)
-
-
sorted_torrents = sorted(
-
qbt_client.torrents_info(), key=lambda d: d.ratio, reverse=True
-
)
-
print("Name Size # of Trackers Ratio Uploaded")
-
for torrent in sorted_torrents:
-
name = torrent.name
-
size = human_bytes(torrent.total_size)
-
trackers = torrent.trackers_count
-
ratio = torrent.ratio
-
uploaded = human_bytes(torrent.uploaded)
-
print(f"{name} {size} {trackers} {ratio} {uploaded}")
-
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
print_ssv()
-25
myip
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "requests",
-
# ]
-
# ///
-
-
"""
-
myip - Fetch and display public IP information from ipinfo.io
-
"""
-
-
import json
-
-
import requests
-
-
if __name__ == "__main__":
-
KEY_COLOR = "\033[92m"
-
END_COLOR = "\033[0m"
-
-
response = requests.get("https://ipinfo.io", timeout=60)
-
json_data = json.loads(response.text)
-
-
print()
-
for item in json_data:
-
print(f"- {KEY_COLOR}{item.title():<16}{END_COLOR} {json_data[item]}")
-13
natpmpcd
···
-
#!/bin/sh
-
-
port=$(/usr/local/bin/natpmpc -a 1 0 udp 60 -g 10.2.0.1 | grep "Mapped public port" | awk '{print $4}')
-
echo $port | tee /usr/local/etc/natvpn_port.txt
-
-
while true; do
-
date
-
if ! /usr/local/bin/natpmpc -a 1 0 udp 60 -g 10.2.0.1 && /usr/local/bin/natpmpc -a 1 0 tcp 60 -g 10.2.0.1; then
-
echo "error Failure natpmpc $(date)"
-
break
-
fi
-
sleep 45
-
done
-362
oci_reg_helper
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "docopt",
-
# "rich",
-
# ]
-
# ///
-
-
"""OCI Registry Helper
-
-
Usage:
-
ocirh <subcommand> [<args>...]
-
-
Subcommands:
-
repos Lists repositories in the registry. Repos correspond to images
-
pushed to the registry.
-
tags Lists tags of the given repository.
-
manifests Lists manifests of the given repository for the given tag.
-
rmi Removes a tag from an image. If given tag is the only tag,
-
removes the image.
-
gc Runs garbage collection on the registry. Requires SSH public key
-
access to registry server.
-
rmr Removes given repository from the registry. Requires SSH public
-
key access to registry server.
-
-
Examples:
-
Suppose we have an image called 'fedora-toolbox' tagged with 'latest'.
-
-
ocirh repos
-
ocirh tags fedora-toolbox
-
ocirh manifests fedora-toolbox latest
-
ocirh rmi fedora-toolbox latest
-
ocirh gc
-
ocirh rmr fedora-toolbox
-
"""
-
import http.client
-
import json
-
import logging
-
import math
-
import subprocess
-
-
from docopt import docopt
-
from rich import print
-
from rich.console import Group
-
from rich.logging import RichHandler
-
from rich.panel import Panel
-
from rich.table import Table
-
from rich.text import Text
-
from rich.traceback import install
-
from rich.tree import Tree
-
-
install(show_locals=True)
-
-
# Rich logging handler
-
FORMAT = "%(message)s"
-
logging.basicConfig(
-
level="NOTSET",
-
format=FORMAT,
-
datefmt="[%X]",
-
handlers=[RichHandler(rich_tracebacks=True)],
-
)
-
log = logging.getLogger("rich")
-
-
-
# Taken from https://stackoverflow.com/a/14822210
-
#
-
# How this function works:
-
# If size_bytes == 0, returns 0 B.
-
# size_name is a tuple containing binary prefixes for bytes.
-
#
-
# math.log takes the logarithm of size_bytes to base 1024.
-
# math.floor rounds down the result of math.log to the nearest integer.
-
# int ensures the result of math.floor is of type int, and stores it in i.
-
# The value of i is used to determine which binary prefix to use from
-
# size_name.
-
#
-
# math.pow returns the value of 1024 raised to the power of i, stores it in p.
-
#
-
# round takes the value of size_bytes, divides it by p, and stores the result
-
# in s at precision of 2 decimal places.
-
#
-
# A formatted string with size s and binary prefix size_name[i] is returned.
-
def convert_size(size_bytes: int) -> str:
-
"""
-
Converts a decimal integer of bytes to its respective binary-prefixed size.
-
-
Parameters:
-
size_bytes (int): A decimal integer.
-
-
Returns:
-
(str): Binary-prefixed size of size_bytes formatted as a string.
-
"""
-
if size_bytes == 0:
-
return "0 B"
-
size_name = ("B", "KiB", "MiB", "GiB")
-
i = int(math.floor(math.log(size_bytes, 1024)))
-
p = math.pow(1024, i)
-
s = round(size_bytes / p, 2)
-
return "%s %s" % (s, size_name[i])
-
-
-
REGISTRY_URL = "registry.hyperreal.coffee"
-
-
-
def get_auth() -> str:
-
"""
-
Get the base64 encoded password for registry authentication.
-
-
Returns:
-
auth (str): A string containing the base64 encoded password.
-
"""
-
try:
-
with open("/run/user/1000/containers/auth.json", "r") as authfile:
-
json_data = json.loads(authfile.read())
-
except Exception as ex:
-
log.exception(ex)
-
-
auth = json_data["auths"][REGISTRY_URL]["auth"]
-
return auth
-
-
-
def get_headers() -> dict:
-
"""
-
Returns headers for HTTP request authentication to the registry server.
-
-
Returns:
-
headers (dict): A dict of HTTP headers
-
"""
-
return {
-
"Accept": "application/vnd.oci.image.manifest.v1+json",
-
"Authorization": "Basic " + get_auth(),
-
}
-
-
-
def get_json_response(request: str, url: str) -> dict:
-
"""
-
Connects to registry and returns response data as JSON.
-
-
Parameters:
-
request (str): A string like "GET" or "DELETE"
-
url (str) : A string containing the URL of the requested data
-
-
Returns:
-
json_data (dict): JSON data as a dict object
-
"""
-
conn = http.client.HTTPSConnection(REGISTRY_URL)
-
headers = get_headers()
-
try:
-
conn.request(request, url, "", headers)
-
res = conn.getresponse()
-
data = res.read()
-
json_data = json.loads(data.decode("utf-8"))
-
except Exception as ex:
-
log.exception(ex)
-
-
return json_data
-
-
-
def get_repositories():
-
"""
-
Prints a Rich Tree that lists the repositories of the registry.
-
"""
-
-
json_data = get_json_response("GET", "/v2/_catalog")
-
repo_tree = Tree("[green]Repositories")
-
for repo in json_data["repositories"]:
-
repo_tree.add("[blue]%s" % repo)
-
-
print(repo_tree)
-
-
-
def get_tags(repo: str):
-
"""
-
Prints a Rich Tree that lists the tags for the given repository.
-
-
Parameters:
-
repo (str): A string containing the name of the repo
-
"""
-
json_data = get_json_response("GET", "/v2/" + repo + "/tags/list")
-
tags_tree = Tree("[green]%s tags" % repo)
-
for tag in json_data["tags"]:
-
tags_tree.add("[cyan]:%s" % tag)
-
-
print(tags_tree)
-
-
-
def get_manifests(repo: str, tag: str):
-
"""
-
Prints a Rich grid table that displays the manifests and metadata of the
-
image repository.
-
-
Parameters:
-
repo (str): A string containing the name of the repo
-
tag (str) : A string containing the tag of the desired image
-
"""
-
json_data = get_json_response("GET", "/v2/" + repo + "/manifests/" + tag)
-
-
grid_meta = Table.grid(expand=True)
-
grid_meta.add_column()
-
grid_meta.add_column()
-
meta_schema_version_key = Text("Schema version")
-
meta_schema_version_key.stylize("bold green", 0)
-
meta_schema_version_value = Text(str(json_data["schemaVersion"]))
-
meta_media_type_key = Text("Media type")
-
meta_media_type_key.stylize("bold green", 0)
-
meta_media_type_value = Text(json_data["mediaType"])
-
grid_meta.add_row(meta_schema_version_key, meta_schema_version_value)
-
grid_meta.add_row(meta_media_type_key, meta_media_type_value)
-
-
grid_config = Table.grid(expand=True)
-
grid_config.add_column()
-
grid_config.add_column()
-
config_media_type_key = Text("Media type")
-
config_media_type_key.stylize("bold green", 0)
-
config_media_type_value = Text(json_data["config"]["mediaType"])
-
config_digest_key = Text("Digest")
-
config_digest_key.stylize("bold green", 0)
-
config_digest_value = Text(json_data["config"]["digest"])
-
config_size_key = Text("Size")
-
config_size_key.stylize("bold green", 0)
-
config_size_value = Text(convert_size(json_data["config"]["size"]))
-
grid_config.add_row(config_media_type_key, config_media_type_value)
-
grid_config.add_row(config_digest_key, config_digest_value)
-
grid_config.add_row(config_size_key, config_size_value)
-
-
grid_annotations = Table.grid(expand=True)
-
grid_annotations.add_column()
-
grid_annotations.add_column()
-
for item in json_data["annotations"].items():
-
annotations_item_key = Text(item[0])
-
annotations_item_key.stylize("bold green", 0)
-
annotations_item_value = Text(item[1])
-
grid_annotations.add_row(annotations_item_key, annotations_item_value)
-
-
total_size = sum(layer.get("size") for layer in json_data["layers"])
-
table_layers = Table(box=None, show_footer=True)
-
table_layers.add_column(
-
"Digest", justify="right", style="yellow", no_wrap=True, footer="Total size:"
-
)
-
table_layers.add_column(
-
"Size",
-
justify="left",
-
style="cyan",
-
no_wrap=True,
-
footer=convert_size(total_size),
-
)
-
for layer in json_data["layers"]:
-
table_layers.add_row(layer.get("digest"), convert_size(layer.get("size")))
-
-
panel_group = Group(
-
Panel(grid_meta, title="[bold blue]Metadata"),
-
Panel(grid_config, title="[bold blue]Config"),
-
Panel(grid_annotations, title="Annotations"),
-
Panel(
-
table_layers,
-
title="[bold blue]Layers: %s" % json_data["layers"][0].get("mediaType"),
-
),
-
)
-
print(Panel(panel_group, title="[bold blue]%s:%s" % (repo, tag)))
-
-
-
def delete_image(repo: str, tag: str):
-
"""
-
Removes the given tag from the image. If the given tag is the only tag,
-
removes the image.
-
-
Parameters:
-
repo (str): A string containing the name of the repo
-
tag (str) : A string containing the tag to be removed
-
"""
-
try:
-
conn = http.client.HTTPSConnection(REGISTRY_URL)
-
headers = get_headers()
-
conn.request("GET", "/v2/" + repo + "/manifests/" + tag, "", headers)
-
res = conn.getresponse()
-
docker_content_digest = res.getheader("Docker-Content-Digest")
-
except Exception as ex:
-
log.exception(ex)
-
-
try:
-
conn.request(
-
"DELETE", "/v2/" + repo + "/manifests/" + docker_content_digest, "", headers
-
)
-
except Exception as ex:
-
log.exception(ex)
-
-
print("Untagged %s:%s successfully" % (repo, tag))
-
-
-
def garbage_collection():
-
"""
-
Runs garbage collection command on the remote registry server. Requires
-
SSH public key access.
-
"""
-
command = "/usr/local/bin/registry-gc"
-
-
try:
-
ssh = subprocess.Popen(
-
["ssh", "%s" % REGISTRY_URL, command],
-
shell=False,
-
stdout=subprocess.PIPE,
-
stderr=subprocess.PIPE,
-
text=True,
-
)
-
result = ssh.stdout.readlines()
-
if result == []:
-
log.error(ssh.stderr.readlines())
-
else:
-
print(result)
-
except Exception as ex:
-
log.exception(ex)
-
-
-
def remove_repo(repo: str):
-
"""
-
Runs command on remote registry server to remove the given repo.
-
-
Parameters:
-
repo (str): A string containing the name of the repo.
-
"""
-
command = "/usr/local/bin/registry-rm-repo " + repo
-
-
try:
-
ssh = subprocess.Popen(
-
["ssh", "%s" % REGISTRY_URL, command],
-
shell=False,
-
stdout=subprocess.PIPE,
-
stderr=subprocess.PIPE,
-
text=True,
-
)
-
result = ssh.stdout.readlines()
-
if result == []:
-
log.error(ssh.stderr.readlines())
-
else:
-
print(result)
-
except Exception as ex:
-
log.exception(ex)
-
-
-
if __name__ == "__main__":
-
args = docopt(__doc__, options_first=True)
-
match args["<subcommand>"]:
-
case "repos":
-
get_repositories()
-
case "tags":
-
get_tags(args["<args>"][0])
-
case "manifests":
-
get_manifests(args["<args>"][0], args["<args>"][1])
-
case "rmi":
-
delete_image(args["<args>"][0], args["<args>"][1])
-
case "gc":
-
garbage_collection()
-
case "rmr":
-
remove_repo(args["<args>"])
-
case _:
-
if args["<subcommand>"] in ["help", None]:
-
exit(subprocess.call(["python3", "ocirh", "--help"]))
-
else:
-
exit(
-
"%r is not a ocirh subcommand. See 'ocirh --help."
-
% args["<subcommand>"]
-
)
-48
publish_mastodon_archive.py
···
-
#!/usr/bin/env python3
-
-
import json
-
from html import unescape
-
-
-
def main():
-
with open("/home/jas/downloads/mastodon-archive/outbox.json", "r") as jf:
-
json_data = json.load(jf)
-
-
print("#+TITLE: Mastodon posts archive from 2024-02-16 to 2025-01-31")
-
print("#+DATE: 2025-02-02")
-
print("#+TAGS[]: mastodon archives")
-
print("#+AUTHOR: hyperreal")
-
print("#+SLUG: mastodon_archive-20240216-20250131")
-
print("#+LAYOUT: post")
-
print()
-
-
for item in sorted(
-
json_data["orderedItems"], key=json_data["orderedItems"].index, reverse=True
-
):
-
if type(item.get("object")) is dict:
-
published = item.get("object").get("published")
-
content = item.get("object").get("content")
-
attachment = (
-
item.get("object").get("attachment")
-
if len(item.get("object").get("attachment")) >= 1
-
else None
-
)
-
-
print(f"** {published}")
-
print("#+BEGIN_EXPORT html")
-
if type(content) is str:
-
print(unescape(content))
-
print("#+END_EXPORT")
-
if attachment:
-
for item in attachment:
-
if item.get("name"):
-
print(f"#+CAPTION: {item.get('name')}")
-
print(
-
f"[[https://files.hyperreal.coffee/mastodon_20240216-20250131/{item.get('url')}]]"
-
)
-
print("-----")
-
print()
-
-
-
if __name__ == "__main__":
-
main()
-15
qbt_stats_html.nu
···
-
#!/usr/bin/env nu
-
-
let old_head = "<html><style>body { background-color:white;color:black; }</style><body>"
-
let new_head = (
-
["<html><head><title>Torrent Stats</title><link type="text/css" rel="stylesheet" href="https://files.hyperreal.coffee/css/style1.css"/></head><body><h4>Last updated:", (date now | format date "%F %T%:z"), "</h4>"]
-
| str join ' '
-
)
-
-
(
-
/home/jas/admin-scripts/python/list_torrents.py
-
| from ssv -m 2
-
| to html
-
| str replace ($old_head) ($new_head)
-
| save -f -r /home/jas/public/html/torrents.html
-
)
-79
qbt_sum_size.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "qbittorrent-api",
-
# "docopt",
-
# ]
-
# ///
-
-
"""qbt_sum_size.py
-
-
Description:
-
Get the total size of all added torrents and the total size of all completed
-
torrents from a qBittorrent instance.
-
-
Usage:
-
qbt_sum_size.py (HOSTNAME) (USERNAME) (PASSWORD)
-
qbt_sum_size.py -h
-
-
Examples:
-
qbt_sum_size.py "http://localhost:8080" "admin" "adminadmin"
-
qbt_sum_size.py "https://cat.seedhost.eu/lol/qbittorrent" "lol" "supersecretpassword"
-
-
Options:
-
-h, --help show this help message and exit
-
"""
-
-
import qbittorrentapi
-
from docopt import docopt
-
-
-
# convert byte units
-
def human_bytes(bites: int) -> str:
-
B = float(bites)
-
KiB = float(1024)
-
MiB = float(KiB**2)
-
GiB = float(KiB**3)
-
TiB = float(KiB**4)
-
-
match B:
-
case B if B < KiB:
-
return "{0} {1}".format(B, "bytes" if 0 == B > 1 else "byte")
-
case B if KiB <= B < MiB:
-
return "{0:.2f} KiB".format(B / KiB)
-
case B if MiB <= B < GiB:
-
return "{0:.2f} MiB".format(B / MiB)
-
case B if GiB <= B < TiB:
-
return "{0:.2f} GiB".format(B / GiB)
-
case B if TiB <= B:
-
return "{0:.2f} TiB".format(B / TiB)
-
case _:
-
return ""
-
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
-
completed_torrent_sizes = []
-
total_added_bytes = int()
-
-
with qbittorrentapi.Client(
-
host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
-
) as qbt_client:
-
try:
-
qbt_client.auth_log_in()
-
except qbittorrentapi.LoginFailed as e:
-
print(e)
-
-
for torrent in qbt_client.torrents_info():
-
if torrent.completion_on != 0:
-
completed_torrent_sizes.append(torrent.total_size)
-
-
total_added_bytes = sum(
-
[torrent.total_size for torrent in qbt_client.torrents_info()]
-
)
-
-
total_completed_bytes = sum(completed_torrent_sizes)
-
-
print(f"\nTotal completed size: {human_bytes(total_completed_bytes)}")
-
print(f"Total added size: {human_bytes(total_added_bytes)}\n")
+23 -34
quickinfo
···
#!/usr/bin/env bash
-
if ! command -v gum >/dev/null; then
-
echo "Gum command not found"
-
exit 0
-
fi
+
set -euo pipefail
-
function gumstyle() {
-
GUMSTYLE=$(gum style --foreground="#cba6f7" "$1")
-
echo "${GUMSTYLE} : ${2}"
-
}
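+
# fastfetch module lists, one per host class, passed to fastfetch via -s below.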
+
desktop_info="separator:os:kernel:uptime:packages:memory:initsystem:btrfs:separator"
+
laptop_info="separator:os:kernel:uptime:packages:memory:initsystem:btrfs:battery:separator"
+
server_info="separator:os:kernel:uptime:packages:memory:initsystem:separator"
+
fbsd_info="separator:os:kernel:uptime:packages:memory:initsystem:zpool:separator"
+
styled_unread=$(gum style --foreground="#f2cdcd" --bold "Unread feeds: ")
-
echo
-
source /etc/os-release
-
gum style --foreground="#f38ba8" "$(echo $PRETTY_NAME)"
-
echo
-
gumstyle "Kernel" "$(uname -sr)"
-
gumstyle "Uptime" "$(uptime -p)"
-
echo
-
-
if [[ -f /etc/debian_version ]]; then
-
APT_PACKAGES=$(sudo apt update 2>/dev/null | grep packages | cut -d '.' -f 1 | awk '{print $1}')
-
gumstyle "APT updates" "$APT_PACKAGES"
-
fi
-
-
if [[ -f /etc/redhat-release ]]; then
-
DNF_PACKAGES=$(sudo dnf check-update | wc -l)
-
gumstyle "DNF updates" "$DNF_PACKAGES"
-
fi
-
-
if command -v flatpak >/dev/null; then
-
FLATPAK_PACKAGES=$(flatpak remote-ls --updates | wc -l)
-
gumstyle "Flatpak updates" "$FLATPAK_PACKAGES"
-
fi
+
case "$(hostname)" in
+
"desktop")
+
fastfetch -s "$desktop_info" -l none
+
gum join "$styled_unread" "$("${HOME}"/bin/feed_count)"
+
;;
+
"laptop")
+
fastfetch -s "$laptop_info" -l none
+
gum join "$styled_unread" "$("${HOME}"/bin/feed_count)"
+
;;
+
"nas")
+
fastfetch -s "$fbsd_info" -l none
+
;;
+
*)
+
fastfetch -s "$server_info" -l none
+
;;
+
esac
-
if [[ -d /home/linuxbrew/.linuxbrew ]]; then
-
BREW_PACKAGES=$(cat /home/jas/.homebrew_updates)
-
gumstyle "Homebrew updates" "$BREW_PACKAGES"
-
fi
+
# vim: sw=4 sts=4 ts=4 ai et ft=bash
-15
record_mastodon_media_size
···
-
#!/usr/bin/env bash
-
-
# /etc/cron.daily/record_mastodon_media_size.bash
-
-
set -euo pipefail
-
-
RECORD_FILE="/var/log/mastodon_media_size.log"
-
-
file_count=$(sudo /home/jas/.cargo/bin/dust -c -P -d 0 -b -f -R -p /home/mastodon/live/public/system | awk '{print $3}')
-
-
sudo /home/jas/.cargo/bin/dust \
-
-c -P -d 0 -b -R -p \
-
/home/mastodon/live/public/system |
-
awk -v fc="$file_count" -v tstamp="$(date '+%Y-%m-%d-%H%M%S')" '{print tstamp,$1,$3,fc}' |
-
tee -a "${RECORD_FILE}"
-20
registry_gc
···
-
#!/usr/bin/env bash
-
# registry-gc
-
# description: run garbage collection on registry
-
# name: registry
-
-
set -eu
-
-
if ! sudo podman container exists registry; then
-
echo "registry container does not exist"
-
exit 1
-
fi
-
-
if sudo podman container exec -it registry bin/registry garbage-collect /etc/docker/registry/config.yml -m; then
-
echo "Registry garbage collection ran successfully"
-
exit 0
-
else
-
echo "Error running registry garbage collection"
-
exit 1
-
fi
-
-18
registry_rm_repo
···
-
#!/usr/bin/env bash
-
# registry-rm-repo
-
# description: remove repository directory from registry data directory
-
-
set -eu
-
-
REPO_DIR="/mnt/registry_data/data/docker/registry/v2/repositories/"
-
REPO_TO_DELETE="$1"
-
-
if [ -d "${REPO_DIR}/${REPO_TO_DELETE}" ]; then
-
sudo rm -rf "${REPO_DIR}/${REPO_TO_DELETE}"
-
echo "${REPO_TO_DELETE} repo successfully deleted"
-
exit 0
-
else
-
echo "${REPO_TO_DELETE} repo not found"
-
exit 1
-
fi
-
-35
resend_notify.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "resend",
-
# ]
-
# ///
-
-
import subprocess
-
import sys
-
from pathlib import Path
-
-
import resend
-
-
-
def main():
-
resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n")
-
-
if len(sys.argv) != 3:
-
exit("Usage: resend_notify.py SUBJECT MESSAGE")
-
subject = sys.argv[1]
-
message = sys.argv[2]
-
-
params: resend.Emails.SendParams = {
-
"from": "Admin <admin@hyperreal.coffee>",
-
"to": ["hyperreal@moonshadow.dev"],
-
"subject": subject,
-
"text": message,
-
}
-
-
email = resend.Emails.send(params)
-
print(email)
-
-
-
if __name__ == "__main__":
-
main()
-38
rofimaim
···
-
#!/usr/bin/env bash
-
#
-
# Rofi powered menu to take a screenshot of the whole screen, a selected area or
-
# the active window. The image is then saved and copied to the clipboard.
-
# Uses: date maim notify-send rofi xclip xdotool
-
#
-
# This is lifted from https://gitlab.com/vahnrr/rofi-menus and modified by
-
# hyperreal <hyperreal64@pm.me> on 2023-09-06T15:09:58-05:00
-
-
save_location="${HOME}/Nextcloud/pictures/screenshots"
-
if ! test -d "${HOME}/Nextcloud/pictures/screenshots"; then
-
mkdir "${HOME}/Nextcloud/pictures/screenshots"
-
fi
-
screenshot_path="$save_location/$(date +'%Y-%m-%d-%H%M%S').png"
-
-
screen='๏’ฉ Screen'
-
area='๏„ฅ Select area'
-
window='๏’ˆ Window'
-
-
chosen=$(printf '%s;%s;%s' "$screen" "$area" "$window" |
-
rofi -theme-str 'window { width: 10em; height: 10em; }' \
-
-P 'Screenshot' \
-
-dmenu \
-
-sep ';' \
-
-selected-row 1)
-
-
case "$chosen" in
-
"$screen") extra_args='--delay=1' ;;
-
"$area") extra_args='--delay=0.1 --select --highlight --color=0.85,0.87,0.91,0.2' ;;
-
"$window") extra_args="--delay=1 --window=$(xdotool getactivewindow)" ;;
-
*) exit 1 ;;
-
esac
-
-
# The variable is used as a command's options, so it shouldn't be quoted.
-
# shellcheck disable=2086
-
maim --hidecursor --quiet --quality=10 --format='png' $extra_args "$screenshot_path" && {
-
notify-send --icon=org.xfce.screenshooter "Screenshot saved as $screenshot_path"
-
}
-213
scihub_knapsack.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "qbittorrent-api",
-
# "requests",
-
# "docopt",
-
# ]
-
# ///
-
-
"""scihub_knapsack.py
-
-
Description:
-
This script will add torrents to a qBittorrent instance until a specified size
-
limit is reached.
-
-
By default, the larger torrents are prioritized in descending order, but the
-
script can be run with the --smaller flag to prioritize smaller torrents in
-
ascending order.
-
-
The script will select only torrents with less than or equal to <max_seeders>.
-
-
Usage:
-
scihub_knapsack.py [--smaller] [--dry-run] -H <hostname> -U <username> -P <password> -S <size> -s <max_seeders>
-
scihub_knapsack.py -h
-
-
Examples:
-
scihub_knapsack.py -H http://localhost:8080 -U admin -P adminadmin -S 42T
-
scihub_knapsack.py --smaller -H https://qbt.hello.world -U admin -P adminadmin -S 2.2T
-
-
Options:
-
--smaller Prioritize from the smallest torrent sizes and work upward
-
to larger sizes. Default is to prioritize larger sizes.
-
--dry-run Only print the torrent names, total number of torrents, and
-
their total combined size instead of adding them to the
-
qBittorrent instance.
-
-H <hostname> Hostname of the server where the qBittorrent instance is
-
running.
-
-U <username> Username of the user to login to the qBittorrent instance.
-
-P <password> Password of the user to login to the qBittorrent instance.
-
-S <size> The maximum size, in GiB or TiB, of the knapsack to add Sci
-
Hub torrents to. Must be a positive integer or float. Must
-
have either G or T on the end, which represents GiB or TiB.
-
-s <max_seeders> Select torrents with less than or equal to <max_seeders>
-
seeders. <max_seeders> is a positive integer.
-
"""
-
-
import json
-
-
import qbittorrentapi
-
import requests
-
from docopt import docopt
-
-
-
def get_torrent_health_data() -> list[dict]:
-
"""
-
Fetch Sci Hub torrent health checker data from the given URL. The URL
-
should refer to a JSON-formatted file.
-
"""
-
TORRENT_HEALTH_URL = (
-
"https://zrthstr.github.io/libgen_torrent_cardiography/torrent.json"
-
)
-
response = requests.get(TORRENT_HEALTH_URL, timeout=60)
-
return json.loads(response.text)
-
-
-
def convert_size_to_bytes(size: str) -> int:
-
"""
-
Convert the given size string to bytes.
-
-
Example: 42G --> 45097156608 bytes
-
"""
-
total_bytes = int()
-
-
if size.endswith("T"):
-
total_bytes = int(size.split("T")[0]) * (1024**4)
-
-
if size.endswith("G"):
-
total_bytes = int(size.split("G")[0]) * (1024**3)
-
-
return total_bytes
-
-
-
def human_bytes(bites: int) -> str:
-
"""
-
Convert bytes to KiB, MiB, GiB, or TiB.
-
-
Example: 45097156608 bytes -> 42 GiB
-
"""
-
B = float(bites)
-
KiB = float(1024)
-
MiB = float(KiB**2)
-
GiB = float(KiB**3)
-
TiB = float(KiB**4)
-
-
match B:
-
case B if B < KiB:
-
return "{0} {1}".format(B, "bytes" if 0 == B > 1 else "byte")
-
case B if KiB <= B < MiB:
-
return "{0:.2f} KiB".format(B / KiB)
-
case B if MiB <= B < GiB:
-
return "{0:.2f} MiB".format(B / MiB)
-
case B if GiB <= B < TiB:
-
return "{0:.2f} GiB".format(B / GiB)
-
case B if TiB <= B:
-
return "{0:.2f} TiB".format(B / TiB)
-
case _:
-
return ""
-
-
-
def get_knapsack_weight(knapsack: list[dict]) -> str:
-
"""
-
Get the weight of the given knapsack in GiB or TiB.
-
"""
-
return human_bytes(sum([torrent["size_bytes"] for torrent in knapsack]))
-
-
-
def fill_knapsack(
-
max_seeders: int, knapsack_size: int, smaller: bool = False
-
) -> list[dict]:
-
"""
-
Fill the knapsack.
-
-
Arguments:
-
max_seeders: int -- Select only torrents with less than or equal to
-
this number of seeders
-
knapsack_size: int -- The size in bytes of the knapsack
-
smaller: bool -- Prioritize smaller sized torrents (Default = False)
-
-
Return value:
-
A list of dictionaries that represent the torrents.
-
"""
-
-
# List of torrents with less than or equal to <max_seeders>
-
torrents = [t for t in get_torrent_health_data() if t["seeders"] <= max_seeders]
-
-
# Sorted list of torrents with <max_seeders>. If smaller == True, sort them
-
# in ascending order by size_bytes. Else sort them in descending order by
-
# size_bytes.
-
sorted_torrents = (
-
sorted(torrents, key=lambda d: d["size_bytes"])
-
if smaller == True
-
else sorted(torrents, key=lambda d: d["size_bytes"], reverse=True)
-
)
-
-
# Sum the sizes of each torrent in sorted_torrents and add them to the
-
# knapsack until it is filled, then return the knapsack.
-
sum = 0
-
knapsack = []
-
for torrent in sorted_torrents:
-
if sum + torrent["size_bytes"] >= knapsack_size:
-
break
-
sum += torrent["size_bytes"]
-
knapsack.append(torrent)
-
-
return knapsack
-
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
hostname = args["-H"]
-
username = args["-U"]
-
password = args["-P"]
-
max_seeders = int(args["-s"])
-
knapsack_size = convert_size_to_bytes(args["-S"])
-
smaller = args["--smaller"]
-
dry_run = args["--dry-run"]
-
-
# Initialize client and login
-
qbt_client = qbittorrentapi.Client(
-
host=hostname, username=username, password=password
-
)
-
-
try:
-
qbt_client.auth_log_in()
-
except qbittorrentapi.LoginFailed as e:
-
print(e)
-
-
# Fill the knapsack
-
knapsack = fill_knapsack(max_seeders, knapsack_size, smaller)
-
-
# If it's a dry run, only print the knapsack's contents. Otherwise,
-
# add the knapsack's contents to the qBittorrent instance.
-
# When finished, print the number of items and the combined weight of all
-
# items in the knapsack. Before attempting to add items to the qBittorrent
-
# instance, check to see if libgen.rs is even working. If libgen.rs is down
-
# no torrents can be added to the qBittorrent instance, so exit with an
-
# notice.
-
if dry_run:
-
for torrent in knapsack:
-
print(torrent["link"])
-
else:
-
response = requests.get("https://libgen.is/")
-
if not response.ok:
-
exit(
-
"It appears https://libgen.is is currently down. Please try again later."
-
)
-
for torrent in knapsack:
-
for torrent in knapsack:
-
if "gen.lib.rus.ec" in torrent["link"]:
-
new_torrent = torrent["link"].replace("gen.lib.rus.ec", "libgen.is")
-
qbt_client.torrents_add(new_torrent, category="scihub")
-
-
if "libgen.rs" in torrent["link"]:
-
new_torrent = torrent["link"].replace("libgen.rs", "libgen.is")
-
qbt_client.torrents_add(new_torrent, category="scihub")
-
# print(f"Added {torrent['name']}")
-
-
qbt_client.auth_log_out()
-
-
print("----------------")
-
print(f"Count: {len(knapsack)} torrents")
-
print(f"Total combined size: {get_knapsack_weight(knapsack)}")
-
print("----------------")
-91
seed_armbian_torrents.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "qbittorrent-api",
-
# "requests",
-
# "bs4",
-
# "docopt"
-
# ]
-
# ///
-
-
"""seed_armbian_torrents.py
-
-
Description:
-
Armbian torrents seed script
-
-
This script will scrape https://mirrors.jevincanders.net/armbian/dl/ for
-
torrent files and add them to a qBittorrent instance. If there are already
-
Armbian torrents in the qBittorrent instance, they will be removed, and new
-
ones will be added in their place. This script is intended to be run under
-
/etc/cron.weekly or used in a systemd timer.
-
-
Usage:
-
seed_armbian_torrents.py (HOSTNAME) (USERNAME) (PASSWORD)
-
seed_armbian_torrents.py -h
-
-
Examples:
-
seed_armbian_torrents.py "http://localhost:8080" "admin" "adminadmin"
-
seed_armbian_torrents.py "https://cat.seedhost.eu/lol/qbittorrent" "lol" "pw"
-
-
Options:
-
-h, --help show this help message and exit.
-
"""
-
-
import os
-
-
import qbittorrentapi
-
import requests
-
from bs4 import BeautifulSoup
-
from docopt import docopt
-
-
-
def add_torrents(args: dict):
-
archive_dir_urls = [
-
"https://mirrors.jevincanders.net/armbian/dl/orangepi5-plus/archive/",
-
"https://mirrors.jevincanders.net/armbian/dl/rockpro64/archive/",
-
"https://mirrors.jevincanders.net/armbian/dl/rpi4b/archive/",
-
]
-
-
torrent_urls = []
-
for url in archive_dir_urls:
-
response = requests.get(url, timeout=60)
-
soup = BeautifulSoup(response.content, "html.parser")
-
links = soup.find_all("a")
-
for link in links:
-
if link.text.endswith(".torrent"):
-
torrent_urls.append(url + link.text)
-
-
try:
-
qbt_client = qbittorrentapi.Client(
-
host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
-
)
-
qbt_client.auth_log_in()
-
-
for url in torrent_urls:
-
qbt_client.torrents_add(url, category="distro")
-
print(f"Added {os.path.basename(url)}")
-
qbt_client.auth_log_out()
-
except qbittorrentapi.LoginFailed as e:
-
print(e)
-
-
-
def remove_torrents(args: dict):
-
try:
-
qbt_client = qbittorrentapi.Client(
-
host=args["HOSTNAME"], username=args["USERNAME"], password=args["PASSWORD"]
-
)
-
qbt_client.auth_log_in()
-
-
for torrent in qbt_client.torrents_info():
-
if torrent.name.startswith("Armbian"):
-
torrent.delete(delete_files=True)
-
print(f"Removed {torrent.name}")
-
qbt_client.auth_log_out()
-
except qbittorrentapi.LoginFailed as e:
-
print(e)
-
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
remove_torrents(args)
-
add_torrents(args)
+24
split_dict
···
+
#!/usr/bin/env bash
+
+
# Check if gum is available
+
if ! command -v gum >/dev/null; then
+
echo "Missing dependency: gum"
+
echo "See https://github.com/charmbracelet/gum"
+
exit 1
+
fi
+
+
# Check if get-def is available
+
if ! command -v get-def >/dev/null; then
+
echo "Missing dependency: get-def"
+
echo "Run pipx install get-def"
+
exit 1
+
fi
+
+
WORD=$(gum input --placeholder="word")
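+
+
# $ZELLIJ is set by Zellij inside a session; the definition opens in a new pane.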
+
+
if [[ -n "$ZELLIJ" ]]; then
+
zellij action new-pane -- get-def "$WORD"
+
else
+
echo "No Zellij sessions detected."
+
exit 1
+
fi
+1 -1
split_man
···
#!/usr/bin/env bash
# Check if gum is available
-
if ! test -x "$(command -v gum)"; then
+
if ! command -v gum >/dev/null; then
echo "Missing dependency: gum"
echo "See https://github.com/charmbracelet/gum"
exit 1
-16
start_debian_vm
···
-
#!/bin/sh
-
# Purpose: Simple script to start my Debian VM using bhyve on FreeBSD
-
# Original author: Vivek Gite (https://www.cyberciti.biz) under GPL v2.x+
-
# Modifications made by: hyperreal (https://hyperreal.coffee) under GPL v2.x+
-
-
if ! kldstat | grep -w vmm.ko; then
-
kldload -v vmm
-
fi
-
-
if ! kldstat | grep -w nmdm.ko; then
-
kldload -v nmdm
-
fi
-
-
if ! bhyve -c 4 -m 8G -w -H -s 0,hostbridge -s 4,virtio-blk,/dev/zvol/zroot/debianvm -s 5,virtio-net,tap0 -s 29,fbuf,tcp=0.0.0.0:5900,w=1024,h=768 -s 30,xhci,tablet -s 31,lpc -l com1,stdio -l bootrom,/usr/local/share/uefi-firmware/BHYVE_UEFI.fd debianvm 2>/tmp/start_debian_vm_error; then
-
neomutt -s "[nas] start_debian_vm error" jas@nas </tmp/start_debian_vm_error
-
fi
-31
swivel
···
-
#!/usr/bin/env bash
-
-
# swivel - Easily switch between running glances servers.
-
#
-
# Usage:
-
# swivel 10.0.0.10 10.0.0.11 10.0.0.12
-
-
set -euo pipefail
-
-
# Dependency check
-
missing_deps=()
-
command -v gum >/dev/null || missing_deps+=(gum)
-
command -v glances >/dev/null || missing_deps+=(glances)
-
-
if (( "${#missing_deps[@]}" != 0 )); then
-
echo "Missing dependencies:" "${missing_deps[@]}"
-
exit 1
-
fi
-
-
# Check number of args supplied at cli
-
if (( "${#@}" == 0 )); then
-
echo "At least one IP address or hostname must be supplied."
-
echo ""
-
echo "Usage: swivel <ip_addr0> <ip_addr1> ... <ip addrN>"
-
exit 1
-
fi
-
-
while
-
selection=$(gum choose "${@}" "Exit" --limit=1 --header="Selection:")
-
[[ "$selection" != "Exit" ]] && glances -c "@$selection" -p 61209
-
do true; done
-63
sync_from_remotes.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "resend",
-
# ]
-
# ///
-
-
import socket
-
import subprocess
-
from pathlib import Path
-
-
import resend
-
-
-
def send_email(program: str, log: str):
-
resend.api_key = Path("/usr/local/etc/resend_api_key.txt").read_text().strip("\n")
-
-
match log:
-
case "ok":
-
subj = f"[{socket.getfqdn()}] {program} OK โœ…"
-
msg = f"{program} on {socket.getfqdn()} ran successfully!"
-
case "err":
-
subj = f"[{socket.getfqdn()}] {program} Error โŒ"
-
msg = f"There was an error running {program} on {socket.getfqdn()}. Please investigate."
-
case _:
-
subj = ""
-
msg = ""
-
-
params: resend.Emails.SendParams = {
-
"from": "Admin <admin@hyperreal.coffee>",
-
"to": ["hyperreal@moonshadow.dev"],
-
"subject": subj,
-
"text": msg,
-
}
-
-
email = resend.Emails.send(params)
-
print(email)
-
-
-
def sync_from_remotes(src: str, dest: str):
-
rsync_cmd = ["rsync", "-avz", "--delete", src, dest]
-
-
try:
-
subprocess.run(rsync_cmd, check=True, text=True)
-
print(f"Successful sync from {src} to {dest}")
-
except subprocess.CalledProcessError as e:
-
print(f"Error during sync from {src} to {dest}: {e}")
-
send_email("sync_from_remotes", "err")
-
exit(1)
-
-
-
if __name__ == "__main__":
-
remotes = [
-
(
-
"root@hyperreal.lyrebird-marlin.ts.net:/srv/borgbackup/hyperreal/",
-
"/naspool/borgbackup/hyperreal",
-
),
-
]
-
-
for remote in remotes:
-
sync_from_remotes(remote[0], remote[1])
-
-
send_email("sync_from_remotes", "ok")
-33
sync_wikimedia_xmldumps
···
-
#!/usr/bin/env bash
-
-
# Use rclone to sync the last two good Wikimedia XML data dumps.
-
-
set -euxo pipefail
-
-
# Get Headnet IP for desktop
-
NTFY_IP=$(sudo tailscale status | grep "dietpi" | awk '{print $1}')
-
-
wget https://dumps.wikimedia.your.org/rsync-filelist-last-2-good.txt \
-
-O /home/jas/rsync-filelist-last-2-good.txt
-
-
grep "enwiki" /home/jas/rsync-filelist-last-2-good.txt |
-
grep -v "tenwiki" |
-
tee /home/jas/rsync-filelist-last-2-good-en.txt
-
-
rm -fv /home/jas/rsync-filelist-last-2-good.txt
-
-
rclone sync \
-
--http-no-head \
-
--transfers 8 \
-
--include-from /home/jas/rsync-filelist-last-2-good-en.txt dumps.wikimedia.your.org: \
-
/naspool/archives/wikimedia-xmldatadumps-en
-
-
rm -fv /home/jas/rsync-filelist-last-2-good-en.txt
-
-
curl \
-
-H prio:default \
-
-H tags:incoming_envelope \
-
-d "Syncing of wikimedia xml datadumps succeeded" \
-
"http://${NTFY_IP}:8080/wikimedia_xmldatadumps_en"
-
-
exit 0
-90
systemd_syscall_filter
···
-
#!/usr/bin/env bash
-
-
# Usage:
-
# systemd_syscall_filter <absolute/path/to/binary> [-c]
-
#
-
# This script will print the syscalls the given binary executable uses
-
# along with the systemd syscall-filter categories they are in.
-
# This makes it easier to harden a systemd unit because you can see which
-
# categories you shouldn't add to the systemd unit's .d overrides for the
-
# SystemCallFilter= directive. If the given binary executable uses a
-
# particular system call, you probably don't want to keep that system call
-
# out of the sandbox, or the binary executable might not work as expected.
-
-
syscall_categories=(
-
"@default"
-
"@aio"
-
"@basic-io"
-
"@chown"
-
"@clock"
-
"@cpu-emulation"
-
"@debug"
-
"@file-system"
-
"@io-event"
-
"@ipc"
-
"@keyring"
-
"@memlock"
-
"@module"
-
"@mount"
-
"@network-io"
-
"@obsolete"
-
"@pkey"
-
"@privileged"
-
"@process"
-
"@raw-io"
-
"@reboot"
-
"@resources"
-
"@setuid"
-
"@signal"
-
"@swap"
-
"@sync"
-
"@system-service"
-
"@timer"
-
)
-
-
get_used_syscalls() {
-
for category in "${syscall_categories[@]}"; do
-
readarray -t syscalls < <(sudo systemd-analyze syscall-filter --no-pager "$category" | awk '{print $1}' | tail -n+3)
-
-
for sc in "${syscalls[@]}"; do
-
if strings "$1" | grep --silent -w "$sc"; then
-
echo "${category} : ${sc}"
-
fi
-
done
-
done
-
}
-
-
get_unused_categories() {
-
readarray -t used_syscalls < <(get_used_syscalls "$1" | awk '{print $1}' | uniq)
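-
# `sort | uniq -u` keeps only categories that appear exactly once across
-
# both lists, i.e. the categories the binary does not use.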
-
readarray -t unused_categories < <(echo "${syscall_categories[@]}" "${used_syscalls[@]}" | tr ' ' '\n' | sort | uniq -u)
-
for category in "${unused_categories[@]}"; do
-
echo "SystemCallFilter=~${category}"
-
done
-
}
-
-
if [ "$#" -eq 2 ]; then
-
case "$2" in
-
"-c")
-
get_unused_categories "$1"
-
;;
-
*)
-
echo "Unknown option: ${2}"
-
exit 1
-
;;
-
esac
-
elif [ "$#" -eq 1 ]; then
-
if ! test -x "$1"; then
-
echo "${1} is not found or is not executable"
-
exit 1
-
else
-
get_used_syscalls "$1"
-
fi
-
else
-
echo "Usage: systemd_syscall_filter <abs/path/to/binary> [-c]"
-
echo ""
-
echo "To get syscalls used by the binary:"
-
echo " systemd_syscall_filter /usr/sbin/auditd"
-
echo ""
-
echo "To get syscall categories not used by the binary, pass the -c (complement) flag:"
-
echo " systemd_syscall_filter /usr/sbin/auditd -c"
-
fi
+9
techdirt2gmi
···
+
#!/usr/bin/env bash
+
+
set -euo pipefail
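+
+
# Fetch the Techdirt RSS feed, convert it to Markdown with markitdown, then
+
# to Gemtext with md2gemini, and publish it to the Gemini capsule.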
+
+
wget -q -O- https://techdirt.com/feed/ \
+
| markitdown \
+
| md2gemini -l paragraph \
+
| tee /home/jas/public/gemini/hyperreal.coffee/techdirt.gmi
+
-43
to_snake_case
···
-
#!/usr/bin/env bash
-
-
# I shamefully used ChatGPT to generate this. My brain just is not suited to
-
# coming up with that regex on my own and I didn't have much luck searching
-
# the web for helpful material.
-
-
# Function to convert a string to snake_case
-
to_snake_case() {
-
local input="$1"
-
local snake_case
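-
# The sed expression below: collapses whitespace to "_", inserts "_" at
-
# lowercase-to-uppercase boundaries, replaces remaining non-alphanumeric
-
# runs with "_", squeezes repeated "_", and strips leading/trailing "_";
-
# tr then lowercases the result.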
-
snake_case=$(echo "$input" | sed -E 's/[[:space:]]+/_/g; s/([a-z])([A-Z])/\1_\2/g; s/[^a-zA-Z0-9_]+/_/g; s/__+/_/g; s/^_+|_+$//g' | tr '[:upper:]' '[:lower:]')
-
echo "$snake_case"
-
}
-
-
# Check if the file name is provided as an argument
-
if [ -z "$1" ]; then
-
echo "Usage: $0 <file-name>"
-
exit 1
-
fi
-
-
# Get the file name from the argument
-
file_name="$1"
-
-
# Extract the directory, base name, and extension
-
dir=$(dirname "$file_name")
-
base_name=$(basename "$file_name")
-
extension="${base_name##*.}"
-
base_name="${base_name%.*}"
-
-
# Convert the base name to snake_case
-
snake_case_base_name=$(to_snake_case "$base_name")
-
-
# Construct the new file name
-
if [ "$base_name" == "$extension" ]; then
-
new_file_name="$dir/$snake_case_base_name"
-
else
-
new_file_name="$dir/$snake_case_base_name.$extension"
-
fi
-
-
# Rename the file
-
mv "$file_name" "$new_file_name"
-
-
echo "File renamed to: $new_file_name"
-129
update_tracker.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "qbittorrent-api",
-
# "docopt",
-
# "rich",
-
# ]
-
# ///
-
-
"""update_tracker.py
-
-
Description:
-
This script collects infohashes of all torrents in each qBittorrent instance,
-
updates opentracker, and reannounces all torrents to their trackers.
-
-
Expectations:
-
- A JSON qBittorrent authentication file at ~/.config/qbittorrent_auth.json
-
- SSH pubkey access to torrent tracker server
-
- rsync installed on the host system running this script
-
-
Usage:
-
update_tracker.py (--add-tracker DOMAIN)
-
update_tracker.py -h
-
-
Options:
-
--add-tracker DOMAIN ensure the provided tracker domain is added to each torrent's tracker list
-
-h, --help show this help message and exit
-
-
Examples:
-
update_tracker.py --add-tracker hyperreal.coffee
-
"""
-
-
import json
-
import subprocess
-
import tempfile
-
from pathlib import Path
-
-
import qbittorrentapi
-
from docopt import docopt
-
from rich.console import Console
-
from rich.text import Text
-
-
if __name__ == "__main__":
-
args = docopt(__doc__) # type: ignore
-
-
tracker_domain = args["--add-tracker"]
-
-
console = Console()
-
with console.status("[bold green]Executing the tasks...") as status:
-
# JSON file containing authentication info for each qBittorrent instance
-
QBITTORRENT_AUTH_FILE = Path.home().joinpath(".config/qbittorrent_auth.json")
-
-
# Open authentication file and load JSON data
-
with open(QBITTORRENT_AUTH_FILE, "r") as qbt_auth:
-
auth_data = json.load(qbt_auth)
-
-
# Collect infohashes of all torrents in each qBittorrent instance
-
console.log(
-
"Collecting infohashes of all torrents in each qBittorrent instance."
-
)
-
torrent_infohashes = []
-
for item in auth_data["instances"]:
-
with qbittorrentapi.Client(
-
host=item["hostname"],
-
username=item["username"],
-
password=item["password"],
-
) as qbt_client:
-
try:
-
qbt_client.auth_log_in()
-
except qbittorrentapi.LoginFailed as e:
-
print(e)
-
-
for torrent in qbt_client.torrents_info():
-
torrent_infohashes.append(torrent.hash)
-
-
# Format the infohashes to have a \n at the end
-
console.log("Formatting infohashes to have a newline at the end.")
-
format_infohashes = set([f"{infohash}\n" for infohash in torrent_infohashes])
-
-
# Create a NamedTemporaryFile and write all infohashes to it, one per line
-
console.log("Creating temporary file to write infohashes to.")
-
-
with tempfile.NamedTemporaryFile() as ntf:
-
with open(ntf.name, "w") as tf:
-
tf.writelines(format_infohashes)
-
-
# Use `sudo cp -f` to copy the infohashes file to the torrent tracker's config
-
# directory, overwriting the whitelist.txt file.
-
console.log(
-
"Copying the temporary infohashes file to the torrent tracker's whitelist."
-
)
-
subprocess.run(
-
["sudo", "cp", "-f", ntf.name, "/etc/opentracker/whitelist.txt"]
-
)
-
-
# Run `sudo systemctl restart opentracker.service`
-
console.log("Restarting opentracker.service")
-
subprocess.run(["sudo", "systemctl", "restart", "opentracker.service"])
-
-
# Reannounce all torrents in each qBittorrent instance to their trackers
-
console.log("Reannouncing all torrents to their trackers.")
-
for item in auth_data["instances"]:
-
with qbittorrentapi.Client(
-
host=item["hostname"],
-
username=item["username"],
-
password=item["password"],
-
) as qbt_client:
-
for torrent in qbt_client.torrents_info():
-
torrent.reannounce()
-
-
console.log("Done!")
-
-
# Print output and make it look sexy ;)
-
console = Console()
-
tasks = Text("\nTasks completed:\n")
-
tasks.stylize("bold magenta")
-
console.print(tasks)
-
console.print(":white_check_mark: update the tracker's whitelist")
-
-
if tracker_domain:
-
console.print(
-
f":white_check_mark: ensure {tracker_domain}:6969/announce is in each torrent's tracker list"
-
)
-
-
console.print(":white_check_mark: reannounce all torrents to their trackers")
-
-
torrents = Text(str(len(torrent_infohashes)))
-
torrents.stylize("bold green")
-
console.print(torrents + " torrents were updated")
-25
yaml2json.py
···
-
#!/usr/bin/env -S uv run --script
-
# /// script
-
# dependencies = [
-
# "yaml",
-
# ]
-
# ///
-
-
# YAML to JSON conversion script
-
# Based on https://www.geeksforgeeks.org/convert-yaml-to-json/
-
#
-
# This script takes a YAML file as the first arg, converts the
-
# YAML content to JSON, and outputs the converted JSON content
-
# to stdout.
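-
#
-
# Example (hypothetical file name):
-
#   ./yaml2json.py config.yaml > config.json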
-
-
import json
-
import sys
-
-
import yaml
-
-
try:
-
print(json.dumps(yaml.load(open(sys.argv[1]), Loader=yaml.FullLoader), indent=4))
-
except IndexError:
-
print("YAML file must be supplied as first arg")
-
except FileNotFoundError:
-
print("YAML file not found")