social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

breaking: drop pds input in favor of jetstream. use plain uris as ids instead of uri + cid json, attach cid as extra data

zenfyr.dev 31a048ee 9d485c2e

verified
Changed files
+145 -127
bluesky
util
+3 -3
bluesky/common.py
···
self.post = post
self.tokens = tokens
-
self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)
+
self.id = post['$xpost.strongRef']['uri']
self.parent_id = None
if self.post.get('reply'):
-
self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)
+
self.parent_id = self.post['reply']['parent']['uri']
labels = self.post.get('labels', {}).get('values')
self.cw = ''
···
return "text/plain"
def get_post_url(self) -> str | None:
-
at_uri: str = self.post['$xpost.strongRef']['uri'][len("at://"):]
+
at_uri: str = self.post['$xpost.uri'][len("at://"):]
parts = at_uri.split("/")
did, _, post_id = parts
+72 -107
bluesky/input.py
···
-
import re, json
+
import re, json, websockets, asyncio
-
from atproto import AsyncFirehoseSubscribeReposClient, CAR
from atproto_client import models
from atproto_client.models.utils import get_or_create as get_model_or_create
-
from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
from bluesky.atproto2 import resolve_identity
from bluesky.common import BlueskyPost, SERVICE, tokenize_post
···
super().__init__(SERVICE, did, settings, db)
def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
-
post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)
+
post_uri = post['$xpost.strongRef']['uri']
+
post_cid = post['$xpost.strongRef']['cid']
-
parent_ref = None
+
parent_uri = None
if post.get('reply'):
-
parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)
+
parent_uri = post['reply']['parent']['uri']
embed = post.get('embed', {})
if embed.get('$type') in ('app.bsky.embed.record', 'app.bsky.embed.recordWithMedia'):
did, collection, rid = str(embed['record']['uri'][len('at://'):]).split('/')
if collection == 'app.bsky.feed.post':
-
LOGGER.info("Skipping '%s'! Quote..", post_ref)
+
LOGGER.info("Skipping '%s'! Quote..", post_uri)
return
-
success = database.try_insert_post(self.db, post_ref, parent_ref, self.user_id, self.service)
+
success = database.try_insert_post(self.db, post_uri, parent_uri, self.user_id, self.service)
if not success:
-
LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
+
LOGGER.info("Skipping '%s' as parent post was not found in db!", post_uri)
return
+
database.store_data(self.db, post_uri, self.user_id, self.service, {'cid': post_cid})
tokens = tokenize_post(post)
if not cross.test_filters(tokens, self.options.filters):
-
LOGGER.info("Skipping '%s'. Matched a filter!", post_ref)
+
LOGGER.info("Skipping '%s'. Matched a filter!", post_uri)
return
-
LOGGER.info("Crossposting '%s'...", post_ref)
+
LOGGER.info("Crossposting '%s'...", post_uri)
def get_blob_url(blob: str):
return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}'
···
LOGGER.info("Downloading %s...", url)
io = download_media(url, image.alt)
if not io:
-
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
+
LOGGER.error("Skipping '%s'. Failed to download media!", post_uri)
return
attachments.append(io)
elif embed.get('$type') == 'app.bsky.embed.video':
···
LOGGER.info("Downloading %s...", url)
io = download_media(url, model.alt if model.alt else '')
if not io:
-
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
+
LOGGER.error("Skipping '%s'. Failed to download media!", post_uri)
return
attachments.append(io)
···
for output in outputs:
output.accept_post(cross_post)
-
def _on_delete_post(self, outputs: list[cross.Output], post_id: dict, repost: bool):
-
identifier = json.dumps(post_id, sort_keys=True)
-
post = database.find_post(self.db, identifier, self.user_id, self.service)
+
def _on_delete_post(self, outputs: list[cross.Output], post_id: str, repost: bool):
+
post = database.find_post(self.db, post_id, self.user_id, self.service)
if not post:
return
-
LOGGER.info("Deleting '%s'...", identifier)
+
LOGGER.info("Deleting '%s'...", post_id)
if repost:
for output in outputs:
-
output.delete_repost(identifier)
+
output.delete_repost(post_id)
else:
for output in outputs:
-
output.delete_post(identifier)
-
database.delete_post(self.db, identifier, self.user_id, self.service)
+
output.delete_post(post_id)
+
database.delete_post(self.db, post_id, self.user_id, self.service)
def _on_repost(self, outputs: list[cross.Output], post: dict[str, Any]):
-
post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)
+
post_uri = post['$xpost.strongRef']['uri']
+
post_cid = post['$xpost.strongRef']['cid']
-
reposted_ref = {
-
'cid': post['subject']['cid'],
-
'uri': post['subject']['uri']
-
}
-
reposted_ref = json.dumps(reposted_ref, sort_keys=True)
+
reposted_uri = post['subject']['uri']
-
success = database.try_insert_repost(self.db, post_ref, reposted_ref, self.user_id, self.service)
+
success = database.try_insert_repost(self.db, post_uri, reposted_uri, self.user_id, self.service)
if not success:
-
LOGGER.info("Skipping '%s' as reposted post was not found in db!", post_ref)
+
LOGGER.info("Skipping '%s' as reposted post was not found in db!", post_uri)
return
+
database.store_data(self.db, post_uri, self.user_id, self.service, {'cid': post_cid})
-
LOGGER.info("Crossposting '%s'...", post_ref)
+
LOGGER.info("Crossposting '%s'...", post_uri)
for output in outputs:
-
output.accept_repost(post_ref, reposted_ref)
-
+
output.accept_repost(post_uri, reposted_uri)
-
class BlueskyPdsInput(BlueskyInput):
+
class BlueskyJetstreamInput(BlueskyInput):
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
super().__init__(settings, db)
+
self.jetstream = settings.get("jetstream", "wss://jetstream2.us-east.bsky.network/subscribe")
-
def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
-
blocks = message.body.get('blocks')
-
if not blocks:
+
def __on_commit(self, outputs: list[cross.Output], msg: dict):
+
if msg.get('did') != self.user_id:
return
-
parsed = parse_firehose(message)
-
if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
+
commit: dict = msg.get('commit', {})
+
if not commit:
return
-
blocks = parsed.blocks
-
-
car = None
-
def get_lazy_repo() -> CAR:
-
nonlocal car, blocks
+
+
commit_type = commit['operation']
+
match commit_type:
+
case 'create':
+
record = dict(commit.get('record', {}))
+
record['$xpost.strongRef'] = {
+
'cid': commit['cid'],
+
'uri': f'at://{self.user_id}/{commit['collection']}/{commit['rkey']}'
+
}
-
if isinstance(blocks, str):
-
blocks = blocks.encode()
-
assert blocks
-
-
if car:
-
return car
-
car = CAR.from_bytes(blocks)
-
return car
-
-
for op in parsed.ops:
-
if op.action == 'delete':
-
if not op.prev:
-
continue
-
-
if op.path.startswith('app.bsky.feed.post'):
-
self._on_delete_post(outputs, {
-
'cid': op.prev.encode(),
-
'uri': f'at://{parsed.repo}/{op.path}'
-
}, False)
-
elif op.path.startswith('app.bsky.feed.repost'):
-
self._on_delete_post(outputs, {
-
'cid': op.prev.encode(),
-
'uri': f'at://{parsed.repo}/{op.path}'
-
}, True)
-
-
continue
-
-
if op.action != 'create':
-
continue
-
-
if not op.cid:
-
continue
-
-
record_data = get_lazy_repo().blocks.get(op.cid)
-
if not record_data:
-
continue
-
-
record_dict = dict(record_data)
-
record_dict['$xpost.strongRef'] = {
-
'cid': op.cid.encode(),
-
'uri': f'at://{parsed.repo}/{op.path}'
-
}
-
if record_dict['$type'] == 'app.bsky.feed.post':
-
self._on_post(outputs, record_dict)
-
elif record_dict['$type'] == 'app.bsky.feed.repost':
-
self._on_repost(outputs, record_dict)
-
+
match commit['collection']:
+
case 'app.bsky.feed.post':
+
self._on_post(outputs, record)
+
case 'app.bsky.feed.repost':
+
self._on_repost(outputs, record)
+
case 'delete':
+
post_id: str = f'at://{self.user_id}/{commit['collection']}/{commit['rkey']}'
+
match commit['collection']:
+
case 'app.bsky.feed.post':
+
self._on_delete_post(outputs, post_id, False)
+
case 'app.bsky.feed.repost':
+
self._on_delete_post(outputs, post_id, True)
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
-
streaming: str = f"wss://{self.pds.split("://", 1)[1]}/xrpc"
-
-
client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)
+
uri = self.jetstream + '?'
+
uri += "wantedCollections=app.bsky.feed.post"
+
uri += "&wantedCollections=app.bsky.feed.repost"
+
uri += f"&wantedDids={self.user_id}"
-
async def on_message(message: firehose_models.MessageFrame):
-
if message.header.t != '#commit':
-
return
-
-
if message.body.get('repo') != self.user_id:
-
return
+
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
+
try:
+
LOGGER.info("Listening to %s...", self.jetstream)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
submit(lambda msg=msg: self.__on_commit(outputs, json.loads(msg)))
-
if message.body.get('tooBig'):
-
LOGGER.error("#commit message is tooBig!")
-
return
-
-
submit(lambda: self.__on_commit(outputs, message))
-
return
-
-
LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
-
await client.start(on_message)
+
listen = asyncio.create_task(listen_for_messages())
+
+
await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
LOGGER.error(e, stack_info=True, exc_info=True)
+
LOGGER.info("Reconnecting to %s...", self.jetstream)
+
continue
+38 -15
bluesky/output.py
···
LOGGER.error("Failed to find thread tuple in the database!")
return None
-
root_ref = json.loads(thread_tuple[0])
-
reply_ref = json.loads(thread_tuple[1])
+
root_uri: str = thread_tuple[0]
+
reply_uri: str = thread_tuple[1]
+
+
root_cid = database.fetch_data(self.db, root_uri, login.did, SERVICE)['cid']
+
reply_cid = database.fetch_data(self.db, reply_uri, login.did, SERVICE)['cid']
-
root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
-
reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))
+
root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=root_uri, cid=root_cid)
+
reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=reply_uri, cid=reply_cid)
return (
models.create_strong_ref(root_record),
···
db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
assert db_post, "ghghghhhhh"
-
db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]
-
if new_root_id is None or new_parent_id is None:
new_root_id = database.insert_post(
self.db,
-
db_identifiers[0],
+
created_records[0].uri,
login.did,
SERVICE
)
+
database.store_data(
+
self.db,
+
created_records[0].uri,
+
login.did,
+
SERVICE,
+
{'cid': created_records[0].cid}
+
)
+
new_parent_id = new_root_id
database.insert_mapping(self.db, db_post['id'], new_parent_id)
-
db_identifiers = db_identifiers[1:]
+
created_records = created_records[1:]
-
for db_id in db_identifiers:
+
for record in created_records:
new_parent_id = database.insert_reply(
self.db,
-
db_id,
+
record.uri,
login.did,
SERVICE,
new_parent_id,
new_root_id
)
+
database.store_data(
+
self.db,
+
record.uri,
+
login.did,
+
SERVICE,
+
{'cid': record.cid}
+
)
database.insert_mapping(self.db, db_post['id'], new_parent_id)
def delete_post(self, identifier: str):
···
mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
for mapping in mappings[::-1]:
LOGGER.info("Deleting '%s'...", mapping[0])
-
self.bsky.delete_post(json.loads(mapping[0])['uri'])
+
self.bsky.delete_post(mapping[0])
database.delete_post(self.db, mapping[0], SERVICE, login.did)
def accept_repost(self, repost_id: str, reposted_id: str):
···
# mappings of the reposted post
mappings = database.find_mappings(self.db, reposted['id'], SERVICE, login.did)
if mappings:
-
id = json.loads(mappings[0][0])
-
rsp = self.bsky.repost(id['uri'], id['cid'])
+
cid = database.fetch_data(self.db, mappings[0][0], login.did, SERVICE)['cid']
+
rsp = self.bsky.repost(mappings[0][0], cid)
internal_id = database.insert_repost(
self.db,
-
json.dumps(rsp.model_dump(), sort_keys=True),
+
rsp.uri,
reposted['id'],
login.did,
SERVICE)
+
database.store_data(
+
self.db,
+
rsp.uri,
+
login.did,
+
SERVICE,
+
{'cid': rsp.cid}
+
)
database.insert_mapping(self.db, repost['id'], internal_id)
def __delete_repost(self, repost_id: str) -> tuple[models.AppBskyActorDefs.ProfileViewDetailed | None, dict | None]:
···
mappings = database.find_mappings(self.db, repost['id'], SERVICE, login.did)
if mappings:
LOGGER.info("Deleting '%s'...", mappings[0][0])
-
self.bsky.unrepost(json.loads(mappings[0][0])['uri'])
+
self.bsky.unrepost(mappings[0][0])
database.delete_post(self.db, mappings[0][0], login.did, SERVICE)
return login, repost
+7 -2
main.py
···
from util.util import LOGGER, as_json
import cross, util.database as database
-
from bluesky.input import BlueskyPdsInput
+
from bluesky.input import BlueskyJetstreamInput
from bluesky.output import BlueskyOutputOptions, BlueskyOutput
from mastodon.input import MastodonInputOptions, MastodonInput
···
INPUTS = {
"mastodon-wss": lambda settings, db: MastodonInput(settings, db),
"misskey-wss": lambda settigs, db: MisskeyInput(settigs, db),
-
"bluesky-pds-wss": lambda settings, db: BlueskyPdsInput(settings, db)
+
"bluesky-jetstream-wss": lambda settings, db: BlueskyJetstreamInput(settings, db)
}
OUTPUTS = {
···
db_worker.execute("""
ALTER TABLE posts
ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
+
""")
+
if "extra_data" not in column_names:
+
db_worker.execute("""
+
ALTER TABLE posts
+
ADD COLUMN extra_data TEXT NULL
""")
# create the mappings table
+25
util/database.py
···
from concurrent.futures import Future
import threading
import queue
+
import json
class DataBaseWorker():
def __init__(self, database: str) -> None:
···
AND user_id = ?
""", (identifier, serivce, user_id))
+
def fetch_data(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict:
+
result = db.execute(
+
"""
+
SELECT extra_data
+
FROM posts
+
WHERE identifier = ?
+
AND user_id = ?
+
AND service = ?
+
""", (identifier, user_id, service))
+
if not result or not result[0]:
+
return {}
+
return json.loads(result[0][0])
+
+
def store_data(db: DataBaseWorker, identifier: str, user_id: str, service: str, extra_data: dict) -> None:
+
db.execute(
+
"""
+
UPDATE posts
+
SET extra_data = ?
+
WHERE identifier = ?
+
AND user_id = ?
+
AND service = ?
+
""",
+
(json.dumps(extra_data), identifier, user_id, service)
+
)
def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]:
return db.execute(