this repo has no description

Compare changes

Choose any two refs to compare.

-56
feeds/popular.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
-
from . import BaseFeed
-
-
class PopularFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/popular'
-
-
def __init__(self):
-
# use the posts from the most-liked feed for this
-
self.db_cnx = apsw.Connection('db/mostliked.db')
-
self.db_cnx.pragma('foreign_keys', True)
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
-
def process_commit(self, commit):
-
pass
-
-
def commit_changes(self):
-
pass
-
-
def generate_sql(self, limit, offset, langs):
-
bindings = []
-
sql = """
-
select posts.uri, create_ts, likes, lang, unixepoch('now') - create_ts as age_seconds,
-
exp( -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) ) as decay,
-
likes * exp( -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) ) as score
-
from posts
-
left join langs on posts.uri = langs.uri
-
where
-
"""
-
if not '*' in langs:
-
lang_values = list(langs.values())
-
bindings.extend(lang_values)
-
sql += " OR ".join(['lang = ?'] * len(lang_values))
-
else:
-
sql += " 1=1 "
-
sql += """
-
order by score desc
-
limit ? offset ?
-
"""
-
bindings.extend([limit, offset])
-
return sql, bindings
-
-
def serve_feed(self, limit, offset, langs):
-
sql, bindings = self.generate_sql(limit, offset, langs)
-
cur = self.db_cnx.execute(sql, bindings)
-
return [row[0] for row in cur]
-
-
def serve_feed_debug(self, limit, offset, langs):
-
sql, bindings = self.generate_sql(limit, offset, langs)
-
return apsw.ext.format_query_table(
-
self.db_cnx, sql, bindings,
-
string_sanitize=2, text_width=9999, use_unicode=True
-
)
···
-16
service/feedgen.service
···
-
[Unit]
-
Description=Bsky Feedgen
-
After=network.target syslog.target
-
-
[Service]
-
Type=simple
-
User=eric
-
WorkingDirectory=/home/eric/bsky-tools
-
ExecStart=/home/eric/.local/bin/pipenv run ./feedgen.py
-
TimeoutSec=15
-
Restart=on-failure
-
RestartSec=60
-
StandardOutput=journal
-
-
[Install]
-
WantedBy=multi-user.target
···
-16
service/feedweb.service
···
-
[Unit]
-
Description=Bsky Feedweb
-
After=network.target syslog.target
-
-
[Service]
-
Type=simple
-
User=eric
-
WorkingDirectory=/home/eric/bsky-tools
-
ExecStart=/home/eric/.local/bin/pipenv run gunicorn -w 1 -b 127.0.0.1:9060 feedweb:app
-
TimeoutSec=15
-
Restart=on-failure
-
RestartSec=1
-
StandardOutput=journal
-
-
[Install]
-
WantedBy=multi-user.target
···
+2 -1
.gitignore
···
db/
-
firehose.db*
···
db/
+
bin/
+
data/
+3
cmd/bsky-users/schema.sql
···
···
+
CREATE TABLE IF NOT EXISTS users (did TEXT, ts TIMESTAMP);
+
CREATE UNIQUE INDEX IF NOT EXISTS did_idx on users(did);
+
CREATE INDEX IF NOT EXISTS ts_idx on users(ts);
-53
feed_manager.py
···
-
from fnmatch import fnmatchcase
-
-
from feeds.battle import BattleFeed
-
from feeds.rapidfire import RapidFireFeed
-
from feeds.homeruns import HomeRunsTeamFeed
-
from feeds.norazone_interesting import NoraZoneInteresting
-
from feeds.sevendirtywords import SevenDirtyWordsFeed
-
from feeds.ratio import RatioFeed
-
from feeds.outlinetags import OutlineTagsFeed
-
from feeds.popqp import PopularQuotePostsFeed
-
-
class FeedManager:
-
def __init__(self):
-
self.feeds = {}
-
-
def register(self, feed):
-
self.feeds[feed.FEED_URI] = feed()
-
-
def process_commit(self, commit):
-
for feed in self.feeds.values():
-
feed.process_commit(commit)
-
-
def serve_feed(self, feed_uri, limit, offset, langs, debug=False):
-
for pattern, feed in self.feeds.items():
-
if fnmatchcase(feed_uri, pattern):
-
break
-
else:
-
raise Exception('no matching feed pattern found')
-
-
if '*' in pattern and debug:
-
return feed.serve_wildcard_feed_debug(feed_uri, limit, offset, langs)
-
-
elif '*' in pattern and not debug:
-
return feed.serve_wildcard_feed(feed_uri, limit, offset, langs)
-
-
elif '*' not in pattern and debug:
-
return feed.serve_feed_debug(limit, offset, langs)
-
-
elif '*' not in pattern and not debug:
-
return feed.serve_feed(limit, offset, langs)
-
-
def commit_changes(self):
-
for feed in self.feeds.values():
-
feed.commit_changes()
-
-
def stop_all(self):
-
for feed in self.feeds.values():
-
try:
-
feed.stop_db_worker()
-
except AttributeError:
-
pass
-
-
feed_manager = FeedManager()
···
-76
feedgen.py
···
-
#!/usr/bin/env python3
-
-
import asyncio
-
from io import BytesIO
-
import json
-
import logging
-
import signal
-
-
from atproto import CAR
-
import dag_cbor
-
import websockets
-
-
from feed_manager import feed_manager
-
-
logging.basicConfig(
-
format='%(asctime)s - %(levelname)-5s - %(name)-20s - %(message)s',
-
level=logging.DEBUG
-
)
-
logging.getLogger('').setLevel(logging.WARNING)
-
logging.getLogger('feeds').setLevel(logging.DEBUG)
-
logging.getLogger('firehose').setLevel(logging.DEBUG)
-
logging.getLogger('feedgen').setLevel(logging.DEBUG)
-
-
logger = logging.getLogger('feedgen')
-
-
async def firehose_events():
-
relay_url = 'ws://localhost:6008/subscribe'
-
-
logger = logging.getLogger('feeds.events')
-
logger.info(f'opening websocket connection to {relay_url}')
-
-
async with websockets.connect(relay_url, ping_timeout=60) as firehose:
-
while True:
-
payload = BytesIO(await firehose.recv())
-
yield json.load(payload)
-
-
async def main():
-
event_count = 0
-
-
async for commit in firehose_events():
-
feed_manager.process_commit(commit)
-
event_count += 1
-
if event_count % 2500 == 0:
-
feed_manager.commit_changes()
-
-
def handle_exception(loop, context):
-
msg = context.get("exception", context["message"])
-
logger.error(f"Caught exception: {msg}")
-
logger.info("Shutting down...")
-
asyncio.create_task(shutdown(loop))
-
-
async def shutdown(loop, signal=None):
-
if signal:
-
logger.info(f'received exit signal {signal.name}')
-
feed_manager.stop_all()
-
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
-
[task.cancel() for task in tasks]
-
logger.info(f'cancelling {len(tasks)} outstanding tasks')
-
await asyncio.gather(*tasks, return_exceptions=True)
-
loop.stop()
-
-
if __name__ == '__main__':
-
loop = asyncio.get_event_loop()
-
catch_signals = (signal.SIGTERM, signal.SIGINT)
-
for sig in catch_signals:
-
loop.add_signal_handler(
-
sig,
-
lambda s=sig: asyncio.create_task(shutdown(loop, signal=s))
-
)
-
loop.set_exception_handler(handle_exception)
-
-
try:
-
loop.create_task(main())
-
loop.run_forever()
-
finally:
-
loop.close()
···
-65
feeds/__init__.py
···
-
from datetime import datetime, timezone, timedelta
-
-
class BaseFeed:
-
def process_commit(self, commit):
-
raise NotImplementedError
-
-
def serve_feed(self, limit, offset, langs):
-
raise NotImplementedError
-
-
def serve_wildcard_feed(self, feed_uri, limit, offset, langs):
-
raise NotImplementedError
-
-
def commit_changes(self):
-
raise NotImplementedError
-
-
def parse_timestamp(self, timestamp):
-
# https://atproto.com/specs/lexicon#datetime
-
formats = {
-
# preferred
-
'1985-04-12T23:20:50.123Z': '%Y-%m-%dT%H:%M:%S.%f%z',
-
# '1985-04-12T23:20:50.123456Z': '%Y-%m-%dT%H:%M:%S.%f%z',
-
# '1985-04-12T23:20:50.120Z': '%Y-%m-%dT%H:%M:%S.%f%z',
-
# '1985-04-12T23:20:50.120000Z': '%Y-%m-%dT%H:%M:%S.%f%z',
-
-
# supported
-
# '1985-04-12T23:20:50.12345678912345Z': '',
-
'1985-04-12T23:20:50Z': '%Y-%m-%dT%H:%M:%S%z',
-
# '1985-04-12T23:20:50.0Z': '%Y-%m-%dT%H:%M:%S.%f%z',
-
# '1985-04-12T23:20:50.123+00:00': '%Y-%m-%dT%H:%M:%S.%f%z',
-
# '1985-04-12T23:20:50.123-07:00': '%Y-%m-%dT%H:%M:%S.%f%z',
-
}
-
-
for format in formats.values():
-
try:
-
ts = datetime.strptime(timestamp, format)
-
except ValueError:
-
continue
-
else:
-
return ts
-
-
return datetime.now(timezone.utc)
-
-
def safe_timestamp(self, timestamp):
-
utc_now = datetime.now(timezone.utc)
-
if timestamp is None:
-
return utc_now
-
-
parsed = self.parse_timestamp(timestamp)
-
if parsed.timestamp() <= 0:
-
return utc_now
-
elif parsed - timedelta(minutes=2) < utc_now:
-
return parsed
-
elif parsed > utc_now:
-
return utc_now
-
-
def transaction_begin(self, db):
-
if not db.in_transaction:
-
db.execute('BEGIN')
-
-
def transaction_commit(self, db):
-
if db.in_transaction:
-
db.execute('COMMIT')
-
-
def wal_checkpoint(self, db, mode='PASSIVE'):
-
return db.execute(f'PRAGMA wal_checkpoint({mode})')
···
-100
feeds/battle.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
import grapheme
-
-
from . import BaseFeed
-
-
class BattleFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle'
-
-
def __init__(self):
-
self.db_cnx = apsw.Connection('db/battle.db')
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
self.db_cnx.pragma('wal_autocheckpoint', '0')
-
-
with self.db_cnx:
-
self.db_cnx.execute("""
-
create table if not exists posts (
-
uri text,
-
grapheme_length integer,
-
create_ts timestamp,
-
lang text
-
);
-
create unique index if not exists ll_idx on posts(grapheme_length, lang);
-
""")
-
-
self.logger = logging.getLogger('feeds.battle')
-
-
def process_commit(self, commit):
-
if commit['opType'] != 'c':
-
return
-
-
if commit['collection'] != 'app.bsky.feed.post':
-
return
-
-
record = commit.get('record')
-
if record is None:
-
return
-
-
repo = commit['did']
-
rkey = commit['rkey']
-
post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
-
length = grapheme.length(record.get('text', ''))
-
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
-
-
self.transaction_begin(self.db_cnx)
-
-
langs = record.get('langs') or ['']
-
for lang in langs:
-
self.db_cnx.execute("""
-
insert into posts(uri, grapheme_length, create_ts, lang)
-
values(:uri, :length, :ts, :lang)
-
on conflict do update set uri = :uri, create_ts = :ts
-
""", dict(uri=post_uri, length=length, ts=ts, lang=lang))
-
-
def commit_changes(self):
-
self.logger.debug('committing changes')
-
self.transaction_commit(self.db_cnx)
-
self.wal_checkpoint(self.db_cnx, 'RESTART')
-
-
def serve_feed(self, limit, offset, langs):
-
if '*' in langs:
-
cur = self.db_cnx.execute("""
-
select uri
-
from posts
-
order by grapheme_length asc
-
limit :limit offset :offset
-
""", dict(limit=limit, offset=offset))
-
return [uri for (uri,) in cur]
-
else:
-
lang_values = list(langs.values())
-
lang_selects = ['select uri, grapheme_length from posts where lang = ?'] * len(lang_values)
-
lang_clause = ' union '.join(lang_selects)
-
cur = self.db_cnx.execute(
-
lang_clause + ' order by grapheme_length asc limit ? offset ?',
-
[*lang_values, limit, offset]
-
)
-
return [uri for (uri, grapheme_length) in cur]
-
-
def serve_feed_debug(self, limit, offset, langs):
-
if '*' in langs:
-
query = """
-
select *, unixepoch('now') - create_ts as age_seconds
-
from posts
-
order by grapheme_length asc
-
limit :limit offset :offset
-
"""
-
bindings = [limit, offset]
-
else:
-
lang_values = list(langs.values())
-
lang_selects = ["select *, unixepoch('now') - create_ts as age_seconds from posts where lang = ?"] * len(lang_values)
-
lang_clause = ' union '.join(lang_selects)
-
query = lang_clause + ' order by grapheme_length asc limit ? offset ?'
-
bindings = [*lang_values, limit, offset]
-
-
return apsw.ext.format_query_table(
-
self.db_cnx, query, bindings,
-
string_sanitize=2, text_width=9999, use_unicode=True
-
)
···
-107
feeds/homeruns.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
-
from . import BaseFeed
-
-
MLBHRS_DID = 'did:plc:pnksqegntq5t3o7pusp2idx3'
-
-
TEAM_ABBR_LOOKUP = {
-
"OAK":"OaklandAthletics",
-
"PIT":"PittsburghPirates",
-
"SDN":"SanDiegoPadres",
-
"SEA":"SeattleMariners",
-
"SFN":"SanFranciscoGiants",
-
"SLN":"StLouisCardinals",
-
"TBA":"TampaBayRays",
-
"TEX":"TexasRangers",
-
"TOR":"TorontoBlueJays",
-
"MIN":"MinnesotaTwins",
-
"PHI":"PhiladelphiaPhillies",
-
"ATL":"AtlantaBraves",
-
"CHA":"ChicagoWhiteSox",
-
"MIA":"MiamiMarlins",
-
"NYA":"NewYorkYankees",
-
"MIL":"MilwaukeeBrewers",
-
"LAA":"LosAngelesAngels",
-
"ARI":"ArizonaDiamondbacks",
-
"BAL":"BaltimoreOrioles",
-
"BOS":"BostonRedSox",
-
"CHN":"ChicagoCubs",
-
"CIN":"CincinnatiReds",
-
"CLE":"ClevelandGuardians",
-
"COL":"ColoradoRockies",
-
"DET":"DetroitTigers",
-
"HOU":"HoustonAstros",
-
"KCA":"KansasCityRoyals",
-
"LAN":"LosAngelesDodgers",
-
"WAS":"WashingtonNationals",
-
"NYN":"NewYorkMets",
-
}
-
-
class HomeRunsTeamFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:pnksqegntq5t3o7pusp2idx3/app.bsky.feed.generator/team:*'
-
-
def __init__(self):
-
self.db_cnx = apsw.Connection('db/homeruns.db')
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
self.db_cnx.pragma('wal_autocheckpoint', '0')
-
-
with self.db_cnx:
-
self.db_cnx.execute("""
-
create table if not exists posts (uri text, tag text);
-
create index if not exists tag_idx on posts(tag);
-
""")
-
-
self.logger = logging.getLogger('feeds.homeruns')
-
-
def process_commit(self, commit):
-
if commit['did'] != MLBHRS_DID:
-
return
-
-
if commit['opType'] != 'c':
-
return
-
-
if commit['collection'] != 'app.bsky.feed.post':
-
return
-
-
record = commit.get('record')
-
if record is None:
-
return
-
-
uri = 'at://{repo}/app.bsky.feed.post/{rkey}'.format(
-
repo = commit['did'],
-
rkey = commit['rkey']
-
)
-
tags = record.get('tags', [])
-
-
self.logger.debug(f'adding {uri!r} under {tags!r}')
-
-
with self.db_cnx:
-
for tag in tags:
-
self.db_cnx.execute(
-
"insert into posts (uri, tag) values (:uri, :tag)",
-
dict(uri=uri, tag=tag)
-
)
-
-
def commit_changes(self):
-
self.logger.debug('committing changes')
-
self.wal_checkpoint(self.db_cnx, 'RESTART')
-
-
def serve_wildcard_feed(self, feed_uri, limit, offset, langs):
-
prefix, sep, team_abbr = feed_uri.rpartition(':')
-
team_tag = TEAM_ABBR_LOOKUP[team_abbr]
-
-
cur = self.db_cnx.execute("""
-
select uri
-
from posts
-
where tag = :tag
-
order by uri desc
-
limit :limit offset :offset
-
""", dict(tag=team_tag, limit=limit, offset=offset))
-
-
return [uri for (uri,) in cur]
-
-
def serve_wildcard_feed_debug(self, feed_uri, limit, offset, langs):
-
pass
···
-35
feeds/norazone_interesting.py
···
-
import logging
-
-
from atproto import Client, models
-
import apsw
-
import apsw.ext
-
-
from . import BaseFeed
-
-
# https://bsky.app/profile/nora.zone/post/3kv35hqi4a22b
-
TARGET_QUOTE_URI = 'at://did:plc:4qqizocrnriintskkh6trnzv/app.bsky.feed.post/3kv35hqi4a22b'
-
-
class NoraZoneInteresting(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/nz-interesting'
-
-
def __init__(self):
-
self.client = Client('https://public.api.bsky.app')
-
-
def process_commit(self, commit):
-
pass
-
-
def commit_changes(self):
-
pass
-
-
def serve_feed(self, limit, cursor, langs):
-
quotes = self.client.app.bsky.feed.get_quotes(
-
models.AppBskyFeedGetQuotes.Params(
-
uri = TARGET_QUOTE_URI,
-
limit = limit,
-
cursor = cursor,
-
)
-
)
-
return {
-
'cursor': quotes.cursor,
-
'feed': [dict(post=post.uri) for post in quotes.posts],
-
}
···
-68
feeds/outlinetags.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
-
from . import BaseFeed
-
-
class OutlineTagsFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/outline'
-
SERVE_FEED_QUERY = """
-
select uri, create_ts
-
from posts
-
order by create_ts desc
-
limit :limit offset :offset
-
"""
-
-
def __init__(self):
-
self.db_cnx = apsw.Connection('db/outlinetags.db')
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
self.db_cnx.pragma('wal_autocheckpoint', '0')
-
-
with self.db_cnx:
-
self.db_cnx.execute("""
-
create table if not exists posts (uri text, create_ts timestamp);
-
create unique index if not exists create_ts_idx on posts(create_ts);
-
""")
-
-
self.logger = logging.getLogger('feeds.outlinetags')
-
-
def process_commit(self, commit):
-
if commit['opType'] != 'c':
-
return
-
-
if commit['collection'] != 'app.bsky.feed.post':
-
return
-
-
record = commit.get('record')
-
if record is None:
-
return
-
-
if not record.get('tags', []):
-
return
-
-
repo = commit['did']
-
rkey = commit['rkey']
-
post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
-
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
-
self.transaction_begin(self.db_cnx)
-
self.db_cnx.execute(
-
'insert into posts (uri, create_ts) values (:uri, :ts)',
-
dict(uri=post_uri, ts=ts)
-
)
-
-
def commit_changes(self):
-
self.logger.debug('committing changes')
-
self.transaction_commit(self.db_cnx)
-
self.wal_checkpoint(self.db_cnx, 'RESTART')
-
-
def serve_feed(self, limit, offset, langs):
-
cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset))
-
return [row[0] for row in cur]
-
-
def serve_feed_debug(self, limit, offset, langs):
-
bindings = dict(limit=limit, offset=offset)
-
return apsw.ext.format_query_table(
-
self.db_cnx, self.SERVE_FEED_QUERY, bindings,
-
string_sanitize=2, text_width=9999, use_unicode=True
-
)
···
-90
feeds/popqp.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
-
from . import BaseFeed
-
-
class PopularQuotePostsFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/popqp'
-
SERVE_FEED_QUERY = """
-
select uri, create_ts, update_ts, quote_count, exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) as decay,
-
quote_count * exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) as score
-
from posts
-
order by quote_count * exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) desc
-
limit :limit offset :offset
-
"""
-
DELETE_OLD_POSTS_QUERY = """
-
delete from posts where
-
quote_count * exp( -1 * ( ( unixepoch('now') - create_ts ) / 10800.0 ) ) < 1.0
-
and create_ts < unixepoch('now', '-24 hours')
-
"""
-
-
def __init__(self):
-
self.db_cnx = apsw.Connection('db/popqp.db')
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
self.db_cnx.pragma('wal_autocheckpoint', '0')
-
-
with self.db_cnx:
-
self.db_cnx.execute("""
-
create table if not exists posts (
-
uri text, create_ts timestamp, update_ts timestamp, quote_count int
-
);
-
create unique index if not exists uri_idx on posts(uri);
-
""")
-
-
self.logger = logging.getLogger('feeds.popqp')
-
-
def process_commit(self, commit):
-
if commit['opType'] != 'c':
-
return
-
-
if commit['collection'] != 'app.bsky.feed.post':
-
return
-
-
record = commit.get('record')
-
if record is None:
-
return
-
-
embed = record.get('embed')
-
if embed is None:
-
return
-
-
embed_type = embed.get('$type')
-
if embed_type == 'app.bsky.embed.record':
-
quote_post_uri = embed['record']['uri']
-
elif embed_type == 'app.bsky.embed.recordWithMedia':
-
quote_post_uri = embed['record']['record']['uri']
-
else:
-
return
-
-
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
-
self.transaction_begin(self.db_cnx)
-
-
self.db_cnx.execute("""
-
insert into posts (uri, create_ts, update_ts, quote_count)
-
values (:uri, :ts, :ts, 1)
-
on conflict (uri) do
-
update set quote_count = quote_count + 1, update_ts = :ts
-
""", dict(uri=quote_post_uri, ts=ts))
-
-
def delete_old_posts(self):
-
self.db_cnx.execute(self.DELETE_OLD_POSTS_QUERY)
-
self.logger.debug('deleted {} old posts'.format(self.db_cnx.changes()))
-
-
def commit_changes(self):
-
self.delete_old_posts()
-
self.logger.debug('committing changes')
-
self.transaction_commit(self.db_cnx)
-
self.wal_checkpoint(self.db_cnx, 'RESTART')
-
-
def serve_feed(self, limit, offset, langs):
-
cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset))
-
return [row[0] for row in cur]
-
-
def serve_feed_debug(self, limit, offset, langs):
-
bindings = dict(limit=limit, offset=offset)
-
return apsw.ext.format_query_table(
-
self.db_cnx, self.SERVE_FEED_QUERY, bindings,
-
string_sanitize=2, text_width=9999, use_unicode=True
-
)
···
-98
feeds/rapidfire.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
import grapheme
-
-
from . import BaseFeed
-
-
MAX_TEXT_LENGTH = 140
-
-
class RapidFireFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire'
-
-
def __init__(self):
-
self.db_cnx = apsw.Connection('db/rapidfire.db')
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
self.db_cnx.pragma('wal_autocheckpoint', '0')
-
-
with self.db_cnx:
-
self.db_cnx.execute("""
-
create table if not exists posts (uri text, create_ts timestamp, lang text);
-
create index if not exists create_ts_idx on posts(create_ts);
-
""")
-
-
self.logger = logging.getLogger('feeds.rapidfire')
-
-
def process_commit(self, commit):
-
if commit['opType'] != 'c':
-
return
-
-
if commit['collection'] != 'app.bsky.feed.post':
-
return
-
-
record = commit.get('record')
-
if record is None:
-
return
-
-
if all([
-
grapheme.length(record.get('text', '')) <= MAX_TEXT_LENGTH,
-
record.get('reply') is None,
-
record.get('embed') is None,
-
record.get('facets') is None
-
]):
-
repo = commit['did']
-
rkey = commit['rkey']
-
post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}'
-
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
-
-
self.transaction_begin(self.db_cnx)
-
-
langs = record.get('langs') or ['']
-
for lang in langs:
-
self.db_cnx.execute(
-
'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)',
-
dict(uri=post_uri, ts=ts, lang=lang)
-
)
-
-
def delete_old_posts(self):
-
self.db_cnx.execute(
-
"delete from posts where create_ts < unixepoch('now', '-15 minutes')"
-
)
-
self.logger.debug('deleted {} old posts'.format(self.db_cnx.changes()))
-
-
def commit_changes(self):
-
self.delete_old_posts()
-
self.logger.debug('committing changes')
-
self.transaction_commit(self.db_cnx)
-
self.wal_checkpoint(self.db_cnx, 'RESTART')
-
-
def serve_feed(self, limit, offset, langs):
-
if '*' in langs:
-
cur = self.db_cnx.execute(
-
"select uri from posts order by create_ts desc limit :limit offset :offset",
-
dict(limit=limit, offset=offset)
-
)
-
return [uri for (uri,) in cur]
-
else:
-
lang_values = list(langs.values())
-
lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values)
-
lang_clause = ' union '.join(lang_selects)
-
cur = self.db_cnx.execute(
-
lang_clause + ' order by create_ts desc limit ? offset ?',
-
[*lang_values, limit, offset]
-
)
-
return [uri for (uri, create_ts) in cur]
-
-
def serve_feed_debug(self, limit, offset, langs):
-
query = """
-
select *, unixepoch('now') - create_ts as age_seconds
-
from posts
-
order by create_ts desc
-
limit :limit offset :offset
-
"""
-
bindings = dict(limit=limit, offset=offset)
-
return apsw.ext.format_query_table(
-
self.db_cnx, query, bindings,
-
string_sanitize=2, text_width=9999, use_unicode=True
-
)
···
-144
feeds/ratio.py
···
-
import logging
-
-
import apsw
-
import apsw.ext
-
-
from . import BaseFeed
-
-
class RatioFeed(BaseFeed):
-
FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/ratio'
-
SERVE_FEED_QUERY = """
-
with served as (
-
select
-
uri,
-
create_ts,
-
( unixepoch('now') - create_ts ) as age_seconds,
-
replies,
-
quoteposts,
-
likes,
-
reposts,
-
( replies + quoteposts ) / ( likes + reposts + 1 ) as ratio,
-
exp( -1 * ( ( unixepoch('now') - create_ts ) / ( 3600.0 * 16 ) ) ) as decay
-
from posts
-
)
-
select
-
*,
-
( ratio * decay ) as score
-
from served
-
where replies > 15 and ratio > 2.5
-
order by score desc
-
limit :limit offset :offset
-
"""
-
DELETE_OLD_POSTS_QUERY = """
-
delete from posts
-
where
-
create_ts < unixepoch('now', '-5 days')
-
"""
-
-
def __init__(self):
-
self.db_cnx = apsw.Connection('db/ratio.db')
-
self.db_cnx.pragma('journal_mode', 'WAL')
-
self.db_cnx.pragma('wal_autocheckpoint', '0')
-
-
with self.db_cnx:
-
self.db_cnx.execute("""
-
create table if not exists posts (
-
uri text, create_ts timestamp,
-
replies float, likes float, reposts float, quoteposts float
-
);
-
create unique index if not exists uri_idx on posts(uri);
-
""")
-
-
self.logger = logging.getLogger('feeds.ratio')
-
-
def process_commit(self, commit):
-
if commit['opType'] != 'c':
-
return
-
-
subject_uri = None
-
is_reply = False
-
is_quotepost = False
-
-
if commit['collection'] in {'app.bsky.feed.like', 'app.bsky.feed.repost'}:
-
record = commit.get('record')
-
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
-
try:
-
subject_uri = record['subject']['uri']
-
except KeyError:
-
return
-
elif commit['collection'] == 'app.bsky.feed.post':
-
record = commit.get('record')
-
ts = self.safe_timestamp(record.get('createdAt')).timestamp()
-
if record.get('reply') is not None:
-
is_reply = True
-
try:
-
subject_uri = record['reply']['parent']['uri']
-
except KeyError:
-
return
-
-
# only count non-OP replies
-
if subject_uri.startswith('at://' + commit['did']):
-
return
-
-
elif record.get('embed') is not None:
-
is_quotepost = True
-
t = record['embed']['$type']
-
if t == 'app.bsky.embed.record':
-
try:
-
subject_uri = record['embed']['record']['uri']
-
except KeyError:
-
return
-
elif t == 'app.bsky.embed.recordWithMedia':
-
try:
-
subject_uri = record['embed']['record']['record']['uri']
-
except KeyError:
-
return
-
-
if subject_uri is None:
-
return
-
-
params = {
-
'uri': subject_uri,
-
'ts': ts,
-
'is_reply': int(is_reply),
-
'is_like': int(commit['collection'] == 'app.bsky.feed.like'),
-
'is_repost': int(commit['collection'] == 'app.bsky.feed.repost'),
-
'is_quotepost': int(is_quotepost),
-
}
-
-
self.transaction_begin(self.db_cnx)
-
-
self.db_cnx.execute("""
-
insert into posts(uri, create_ts, replies, likes, reposts, quoteposts)
-
values (:uri, :ts,
-
case when :is_reply then 1 else 0 end,
-
case when :is_like then 1 else 0 end,
-
case when :is_repost then 1 else 0 end,
-
case when :is_quotepost then 1 else 0 end)
-
on conflict(uri)
-
do update set
-
replies = replies + case when :is_reply then 1 else 0 end,
-
likes = likes + case when :is_like then 1 else 0 end,
-
reposts = reposts + case when :is_repost then 1 else 0 end,
-
quoteposts = quoteposts + case when :is_quotepost then 1 else 0 end
-
""", params)
-
-
def delete_old_posts(self):
-
self.db_cnx.execute(self.DELETE_OLD_POSTS_QUERY)
-
-
def commit_changes(self):
-
self.logger.debug('committing changes')
-
self.delete_old_posts()
-
self.transaction_commit(self.db_cnx)
-
self.wal_checkpoint(self.db_cnx, 'RESTART')
-
-
def serve_feed(self, limit, offset, langs):
-
cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset))
-
return [row[0] for row in cur]
-
-
def serve_feed_debug(self, limit, offset, langs):
-
bindings = dict(limit=limit, offset=offset)
-
return apsw.ext.format_query_table(
-
self.db_cnx, self.SERVE_FEED_QUERY, bindings,
-
string_sanitize=2, text_width=9999, use_unicode=True
-
)
···
-65
feedweb.py
···
-
#!/usr/bin/env python3
-
-
from flask import Flask, request, jsonify
-
from prometheus_client import Counter, make_wsgi_app
-
from werkzeug.middleware.dispatcher import DispatcherMiddleware
-
from werkzeug.datastructures import LanguageAccept
-
-
from feed_manager import feed_manager
-
-
feed_requests = Counter('feed_requests', 'requests by feed URI', ['feed'])
-
-
app = Flask(__name__)
-
-
@app.route('/xrpc/app.bsky.feed.getFeedSkeleton')
-
def get_feed_skeleton():
-
try:
-
limit = int(request.args.get('limit', 50))
-
except ValueError:
-
limit = 50
-
-
if 'nz-interesting' in request.args['feed']:
-
offset = request.args.get('cursor')
-
else:
-
try:
-
offset = int(request.args.get('cursor', 0))
-
except ValueError:
-
offset = 0
-
-
feed_uri = request.args['feed']
-
if feed_uri.endswith('-dev'):
-
feed_uri = feed_uri.replace('-dev', '')
-
else:
-
(prefix, sep, rkey) = feed_uri.rpartition('/')
-
feed_requests.labels(rkey).inc()
-
-
if request.args.getlist('langs'):
-
req_langs = request.args.getlist('langs')
-
langs = LanguageAccept([(l, 1) for l in req_langs])
-
else:
-
langs = request.accept_languages
-
-
if request.args.get('debug', '0') == '1':
-
headers = {'Content-Type': 'text/plain; charset=utf-8'}
-
debug = feed_manager.serve_feed(feed_uri, limit, offset, langs, debug=True)
-
return debug, headers
-
-
posts = feed_manager.serve_feed(feed_uri, limit, offset, langs, debug=False)
-
if isinstance(posts, dict):
-
return posts
-
-
if len(posts) < limit:
-
return dict(feed=[dict(post=uri) for uri in posts])
-
else:
-
offset += len(posts)
-
return dict(cursor=str(offset), feed=[dict(post=uri) for uri in posts])
-
-
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {
-
'/metrics': make_wsgi_app()
-
})
-
-
if __name__ == '__main__':
-
from feedweb_utils import did_doc
-
app.add_url_rule('/.well-known/did.json', view_func=did_doc)
-
-
app.run(debug=True)
···
-14
feedweb_utils.py
···
-
NGROK_HOSTNAME = 'routinely-right-barnacle.ngrok-free.app'
-
-
def did_doc():
-
return {
-
'@context': ['https://www.w3.org/ns/did/v1'],
-
'id': f'did:web:{NGROK_HOSTNAME}',
-
'service': [
-
{
-
'id': '#bsky_fg',
-
'type': 'BskyFeedGenerator',
-
'serviceEndpoint': f'https://{NGROK_HOSTNAME}',
-
},
-
],
-
}
···
+102
cmd/plc-activity/main.go
···
···
+
package main
+
+
import (
+
"context"
+
"encoding/json"
+
"io"
+
"log"
+
"net/http"
+
"strings"
+
"time"
+
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/redis/go-redis/v9"
+
)
+
+
const PlcExportUrl = `https://plc.directory/export`
+
const PlcOpsCountKey = `dev.edavis.muninsky.plc_ops`
+
+
func main() {
+
ctx := context.Background()
+
client := http.DefaultClient
+
+
rdb := redis.NewClient(&redis.Options{
+
Addr: "localhost:6379",
+
Password: "",
+
DB: 0,
+
})
+
+
req, err := http.NewRequestWithContext(ctx, "GET", PlcExportUrl, nil)
+
if err != nil {
+
panic(err)
+
}
+
+
var lastCursor string
+
var cursor string
+
cursor = syntax.DatetimeNow().String()
+
+
q := req.URL.Query()
+
q.Add("count", "1000")
+
req.URL.RawQuery = q.Encode()
+
+
for {
+
q := req.URL.Query()
+
if cursor != "" {
+
q.Set("after", cursor)
+
}
+
req.URL.RawQuery = q.Encode()
+
+
log.Printf("requesting %s\n", req.URL.String())
+
resp, err := client.Do(req)
+
if err != nil {
+
log.Printf("error doing PLC request: %v\n", err)
+
}
+
if resp.StatusCode != http.StatusOK {
+
log.Printf("PLC request failed status=%d\n", resp.StatusCode)
+
}
+
+
respBytes, err := io.ReadAll(resp.Body)
+
if err != nil {
+
log.Printf("error reading response body: %v\n", err)
+
}
+
+
lines := strings.Split(string(respBytes), "\n")
+
if len(lines) == 0 || (len(lines) == 1 && len(lines[0]) == 0) {
+
time.Sleep(5 * time.Second)
+
continue
+
}
+
+
var opCount int64
+
for _, l := range lines {
+
if len(l) < 2 {
+
break
+
}
+
+
var op map[string]interface{}
+
err = json.Unmarshal([]byte(l), &op)
+
if err != nil {
+
log.Printf("error decoding JSON: %v\n", err)
+
}
+
+
var ok bool
+
cursor, ok = op["createdAt"].(string)
+
if !ok {
+
log.Printf("missing createdAt")
+
}
+
+
if cursor == lastCursor {
+
continue
+
}
+
+
opCount += 1
+
lastCursor = cursor
+
}
+
+
log.Printf("fetched %d operations", opCount)
+
if _, err := rdb.IncrBy(ctx, PlcOpsCountKey, opCount).Result(); err != nil {
+
log.Printf("error incrementing op count in redis: %v\n", err)
+
}
+
+
time.Sleep(5 * time.Second)
+
}
+
}
+1 -1
autoposters/bskycharts.py
···
def main():
-
client = atproto.Client()
client.login(BSKY_HANDLE, BSKY_APP_PASSWORD)
resp = requests.get(BSKY_ACTIVITY_IMAGE_URL)
···
def main():
+
client = atproto.Client('https://pds.merklehost.xyz')
client.login(BSKY_HANDLE, BSKY_APP_PASSWORD)
resp = requests.get(BSKY_ACTIVITY_IMAGE_URL)
+6 -6
service/bsky-activity.service
···
[Unit]
-
Description=Bsky Activity
After=network.target syslog.target
[Service]
Type=simple
-
User=eric
-
WorkingDirectory=/home/eric/bsky-tools
-
ExecStart=/home/eric/.local/bin/pipenv run ./bsky-activity.py
TimeoutSec=15
-
Restart=on-failure
-
RestartSec=1
StandardOutput=journal
[Install]
···
[Unit]
+
Description=bsky activity
After=network.target syslog.target
[Service]
Type=simple
+
User=ubuntu
+
WorkingDirectory=/home/ubuntu/bsky-tools
+
ExecStart=/home/ubuntu/bsky-tools/bin/bsky-activity
TimeoutSec=15
+
Restart=always
+
RestartSec=30
StandardOutput=journal
[Install]
+55 -8
cmd/bsky-activity/main.go
···
"os"
"os/signal"
"strings"
"syscall"
appbsky "github.com/bluesky-social/indigo/api/bsky"
jetstream "github.com/bluesky-social/jetstream/pkg/models"
···
"github.com/redis/go-redis/v9"
)
const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe`
var AppBskyAllowlist = map[string]bool{
···
return false
}
-
func handler(ctx context.Context, events <-chan jetstream.Event) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
···
var eventCount int
eventLoop:
-
for event := range events {
select {
case <-ctx.Done():
break eventLoop
default:
}
if event.Kind != jetstream.EventKindCommit {
continue
}
···
if _, err := pipe.Exec(ctx); err != nil {
log.Printf("failed to exec pipe\n")
}
}
}
}
···
log.Printf("websocket closed\n")
}()
-
jetstreamEvents := make(chan jetstream.Event)
-
go handler(ctx, jetstreamEvents)
log.Printf("starting up\n")
-
var event jetstream.Event
go func() {
for {
-
event = jetstream.Event{}
err := conn.ReadJSON(&event)
if err != nil {
log.Printf("ReadJSON error: %v\n", err)
stop()
break
-
} else {
-
jetstreamEvents <- event
}
}
}()
···
"os"
"os/signal"
"strings"
+
"sync"
"syscall"
+
"time"
appbsky "github.com/bluesky-social/indigo/api/bsky"
jetstream "github.com/bluesky-social/jetstream/pkg/models"
···
"github.com/redis/go-redis/v9"
)
+
type Queue struct {
+
lk sync.Mutex
+
events []jetstream.Event
+
}
+
+
func NewQueue(capacity int) *Queue {
+
return &Queue{
+
events: make([]jetstream.Event, 0, capacity),
+
}
+
}
+
+
func (q *Queue) Enqueue(event jetstream.Event) {
+
q.lk.Lock()
+
defer q.lk.Unlock()
+
+
q.events = append(q.events, event)
+
}
+
+
func (q *Queue) Dequeue() (jetstream.Event, bool) {
+
q.lk.Lock()
+
defer q.lk.Unlock()
+
+
var event jetstream.Event
+
+
if len(q.events) == 0 {
+
return event, false
+
}
+
+
event = q.events[0]
+
q.events = q.events[1:]
+
return event, true
+
}
+
+
func (q *Queue) Size() int {
+
q.lk.Lock()
+
defer q.lk.Unlock()
+
+
return len(q.events)
+
}
+
const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe`
var AppBskyAllowlist = map[string]bool{
···
return false
}
+
func handler(ctx context.Context, queue *Queue) {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
···
var eventCount int
eventLoop:
+
for {
select {
case <-ctx.Done():
break eventLoop
default:
}
+
event, ok := queue.Dequeue()
+
if !ok {
+
time.Sleep(100 * time.Millisecond)
+
continue
+
}
+
if event.Kind != jetstream.EventKindCommit {
continue
}
···
if _, err := pipe.Exec(ctx); err != nil {
log.Printf("failed to exec pipe\n")
}
+
log.Printf("queue size: %d\n", queue.Size())
}
}
}
···
log.Printf("websocket closed\n")
}()
+
queue := NewQueue(100_000)
+
go handler(ctx, queue)
log.Printf("starting up\n")
go func() {
for {
+
var event jetstream.Event
err := conn.ReadJSON(&event)
if err != nil {
log.Printf("ReadJSON error: %v\n", err)
stop()
break
}
+
queue.Enqueue(event)
}
}()
+11 -3
go.mod
···
module github.com/edavis/bsky-tools
-
go 1.23.0
require (
-
github.com/bluesky-social/indigo v0.0.0-20240905024844-a4f38639767f
github.com/bluesky-social/jetstream v0.0.0-20241020000921-dcd43344c716
github.com/gorilla/websocket v1.5.1
github.com/mattn/go-sqlite3 v1.14.22
github.com/pemistahl/lingua-go v1.4.0
github.com/redis/go-redis/v9 v9.3.0
)
require (
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
···
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
-
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
···
module github.com/edavis/bsky-tools
+
go 1.24
+
+
toolchain go1.24.7
require (
+
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f
github.com/bluesky-social/jetstream v0.0.0-20241020000921-dcd43344c716
+
github.com/fxamacker/cbor/v2 v2.9.0
github.com/gorilla/websocket v1.5.1
github.com/mattn/go-sqlite3 v1.14.22
github.com/pemistahl/lingua-go v1.4.0
github.com/redis/go-redis/v9 v9.3.0
+
github.com/urfave/cli/v2 v2.26.0
)
require (
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
+
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
···
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect
+
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
+
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e // indirect
+
github.com/x448/float16 v0.8.4 // indirect
+
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
+16 -4
go.sum
···
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
-
github.com/bluesky-social/indigo v0.0.0-20240905024844-a4f38639767f h1:Q9cfCAlYWIWPsSDhg5w6qcutQ7YaJtfTjiRLP/mw+pc=
-
github.com/bluesky-social/indigo v0.0.0-20240905024844-a4f38639767f/go.mod h1:Zx9nSWgd/FxMenkJW07VKnzspxpHBdPrPmS+Fspl2I0=
github.com/bluesky-social/jetstream v0.0.0-20241020000921-dcd43344c716 h1:I8+VaZKaNIGCPGXE2/VXzJGlPFEZgiFLjnge+OWFl5w=
github.com/bluesky-social/jetstream v0.0.0-20241020000921-dcd43344c716/go.mod h1:/dE2dmFell/m4zxgIbH3fkiqZ1obzr/ETj4RpgomgMs=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
···
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
···
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
···
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
···
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
-
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c h1:UsxJNcLPfyLyVaA4iusIrsLAqJn/xh36Qgb8emqtXzk=
-
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
···
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f h1:FugOoTzh0nCMTWGqNGsjttFWVPcwxaaGD3p/nE9V8qY=
+
github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8=
github.com/bluesky-social/jetstream v0.0.0-20241020000921-dcd43344c716 h1:I8+VaZKaNIGCPGXE2/VXzJGlPFEZgiFLjnge+OWFl5w=
github.com/bluesky-social/jetstream v0.0.0-20241020000921-dcd43344c716/go.mod h1:/dE2dmFell/m4zxgIbH3fkiqZ1obzr/ETj4RpgomgMs=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
···
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
+
github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM=
+
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
···
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+
github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM=
+
github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
···
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
+
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
···
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
+
github.com/urfave/cli/v2 v2.26.0 h1:3f3AMg3HpThFNT4I++TKOejZO8yU55t3JnnSr4S4QEI=
+
github.com/urfave/cli/v2 v2.26.0/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
+
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e h1:28X54ciEwwUxyHn9yrZfl5ojgF4CBNLWX7LR0rvBkf4=
+
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so=
+
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
+
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
+
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e h1:+SOyEddqYF09QP7vr7CgJ1eti3pY9Fn3LHO1M1r/0sI=
+
github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+24
cmd/bsky-modactions/main.go
···
"net/http"
"os"
"os/signal"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/fxamacker/cbor/v2"
···
}
}()
<-ctx.Done()
stop()
slog.Info("shutting down")
return nil
}
···
"net/http"
"os"
"os/signal"
+
"time"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/fxamacker/cbor/v2"
···
}
}()
+
mux := http.NewServeMux()
+
mux.HandleFunc("/config", configHandler)
+
mux.HandleFunc("/", valueHandler)
+
+
srv := &http.Server{
+
Addr: "127.0.0.1:4456",
+
Handler: mux,
+
}
+
+
go func() {
+
if err := srv.ListenAndServe(); err != nil {
+
slog.Error("error starting HTTP server", "err", err)
+
return
+
}
+
}()
+
<-ctx.Done()
stop()
slog.Info("shutting down")
+
endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
+
defer cancel()
+
+
if err := srv.Shutdown(endctx); err != nil {
+
slog.Error("error shutting down server", "err", err)
+
}
+
return nil
}