social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

feat: reposts of own posts; fix: skip quotes from bluesky.

zenfyr.dev 5fa9521f 1d3d8b27

verified
Changed files
+277 -83
bluesky
mastodon
misskey
util
+45 -11
bluesky/input.py
···
parent_ref = None
if post.get('reply'):
parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)
+
+
embed = post.get('embed', {})
+
if embed.get('$type') in ('app.bsky.embed.record', 'app.bsky.embed.recordWithMedia'):
+
did, collection, rid = str(embed['record']['uri'][len('at://'):]).split('/')
+
if collection == 'app.bsky.feed.post':
+
LOGGER.info("Skipping '%s'! Quote..", post_ref)
+
return
success = database.try_insert_post(self.db, post_ref, parent_ref, self.user_id, self.service)
if not success:
···
return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}'
attachments: list[MediaInfo] = []
-
embed = post.get('embed', {})
if embed.get('$type') == 'app.bsky.embed.images':
model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
assert isinstance(model, models.AppBskyEmbedImages.Main)
···
cross_post = BlueskyPost(post, tokens, attachments)
for output in outputs:
output.accept_post(cross_post)
-
return
-
def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
+
def _on_delete_post(self, outputs: list[cross.Output], post_id: dict, repost: bool):
identifier = json.dumps(post_id, sort_keys=True)
post = database.find_post(self.db, identifier, self.user_id, self.service)
if not post:
return
LOGGER.info("Deleting '%s'...", identifier)
-
for output in outputs:
-
output.delete_post(identifier)
+
if repost:
+
for output in outputs:
+
output.delete_repost(identifier)
+
else:
+
for output in outputs:
+
output.delete_post(identifier)
database.delete_post(self.db, identifier, self.user_id, self.service)
+
+
def _on_repost(self, outputs: list[cross.Output], post: dict[str, Any]):
+
post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)
+
+
reposted_ref = {
+
'cid': post['subject']['cid'],
+
'uri': post['subject']['uri']
+
}
+
reposted_ref = json.dumps(reposted_ref, sort_keys=True)
+
+
success = database.try_insert_repost(self.db, post_ref, reposted_ref, self.user_id, self.service)
+
if not success:
+
LOGGER.info("Skipping '%s' as reposted post was not found in db!", post_ref)
+
return
+
+
LOGGER.info("Crossposting '%s'...", post_ref)
+
for output in outputs:
+
output.accept_repost(post_ref, reposted_ref)
+
class BlueskyPdsInput(BlueskyInput):
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
···
if not op.prev:
continue
-
if not op.path.startswith('app.bsky.feed.post'):
-
continue
+
if op.path.startswith('app.bsky.feed.post'):
+
self._on_delete_post(outputs, {
+
'cid': op.prev.encode(),
+
'uri': f'at://{parsed.repo}/{op.path}'
+
}, False)
+
elif op.path.startswith('app.bsky.feed.repost'):
+
self._on_delete_post(outputs, {
+
'cid': op.prev.encode(),
+
'uri': f'at://{parsed.repo}/{op.path}'
+
}, True)
-
self._on_delete_post(outputs, {
-
'cid': op.prev.encode(),
-
'uri': f'at://{parsed.repo}/{op.path}'
-
})
continue
if op.action != 'create':
···
}
if record_dict['$type'] == 'app.bsky.feed.post':
self._on_post(outputs, record_dict)
+
elif record_dict['$type'] == 'app.bsky.feed.repost':
+
self._on_repost(outputs, record_dict)
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
+50 -8
bluesky/output.py
···
)
self.bsky.login(did, as_envvar(settings.get('app-password')))
-
def _find_parent(self, parent_id: str):
+
def __check_login(self):
login = self.bsky.me
if not login:
raise Exception("Client not logged in!")
+
return login
+
+
def _find_parent(self, parent_id: str):
+
login = self.__check_login()
thread_tuple = database.find_mapped_thread(
self.db,
···
return result
def accept_post(self, post: cross.Post):
-
login = self.bsky.me
-
if not login:
-
raise Exception("Client not logged in!")
+
login = self.__check_login()
parent_id = post.get_parent_id()
···
database.insert_mapping(self.db, db_post['id'], new_parent_id)
def delete_post(self, identifier: str):
-
login = self.bsky.me
-
if not login:
-
raise Exception("Client not logged in!")
+
login = self.__check_login()
post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
if not post:
···
for mapping in mappings[::-1]:
LOGGER.info("Deleting '%s'...", mapping[0])
self.bsky.delete_post(json.loads(mapping[0])['uri'])
-
database.delete_post(self.db, mapping[0], SERVICE, login.did)
+
database.delete_post(self.db, mapping[0], SERVICE, login.did)
+
+
def accept_repost(self, repost_id: str, reposted_id: str):
+
login, repost = self.__delete_repost(repost_id)
+
if not (login and repost):
+
return
+
+
reposted = database.find_post(self.db, reposted_id, self.input.user_id, self.input.service)
+
if not reposted:
+
return
+
+
# mappings of the reposted post
+
mappings = database.find_mappings(self.db, reposted['id'], SERVICE, login.did)
+
if mappings:
+
id = json.loads(mappings[0][0])
+
rsp = self.bsky.repost(id['uri'], id['cid'])
+
+
internal_id = database.insert_repost(
+
self.db,
+
json.dumps(rsp.model_dump(), sort_keys=True),
+
reposted['id'],
+
login.did,
+
SERVICE)
+
database.insert_mapping(self.db, repost['id'], internal_id)
+
+
def __delete_repost(self, repost_id: str) -> tuple[models.AppBskyActorDefs.ProfileViewDetailed | None, dict | None]:
+
login = self.__check_login()
+
+
repost = database.find_post(self.db, repost_id, self.input.user_id, self.input.service)
+
if not repost:
+
return None, None
+
+
mappings = database.find_mappings(self.db, repost['id'], SERVICE, login.did)
+
if mappings:
+
LOGGER.info("Deleting '%s'...", mappings[0][0])
+
self.bsky.unrepost(json.loads(mappings[0][0])['uri'])
+
database.delete_post(self.db, mappings[0][0], login.did, SERVICE)
+
return login, repost
+
+
def delete_repost(self, repost_id: str):
+
self.__delete_repost(repost_id)
+
+
+8
main.py
···
"""
)
+
columns = db_worker.execute("PRAGMA table_info(posts)")
+
column_names = [col[1] for col in columns]
+
if "reposted_id" not in column_names:
+
db_worker.execute("""
+
ALTER TABLE posts
+
ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
+
""")
+
# create the mappings table
# original_post_id - the post this was mapped from
# mapped_post_id - the post this was mapped to
+36 -12
mastodon/input.py
···
if (status.get('account') or {})['id'] != self.user_id:
return
-
if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
-
# TODO polls not supported on bsky. maybe 3rd party? skip for now
-
# we don't handle reblogs. possible with bridgy(?) and self
-
# we don't handle quotes.
-
LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
+
if status.get('visibility') not in self.options.allowed_visibility:
+
# Skip f/o and direct posts
+
LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
+
return
+
+
# TODO polls not supported on bsky. maybe 3rd party? skip for now
+
# we don't handle reblogs. possible with bridgy(?) and self
+
# we don't handle quotes.
+
if status.get('poll'):
+
LOGGER.info("Skipping '%s'! Contains a poll..", status['id'])
+
return
+
+
if status.get('quote_id') or status.get('quote'):
+
LOGGER.info("Skipping '%s'! Quote..", status['id'])
+
return
+
+
reblog: dict | None = status.get('reblog')
+
if reblog:
+
if (reblog.get('account') or {})['id'] != self.user_id:
+
LOGGER.info("Skipping '%s'! Reblog of other user..", status['id'])
+
return
+
+
success = database.try_insert_repost(self.db, status['id'], reblog['id'], self.user_id, self.service)
+
if not success:
+
LOGGER.info("Skipping '%s' as reblogged post was not found in db!", status['id'])
+
return
+
+
for output in outputs:
+
output.accept_repost(status['id'], reblog['id'])
return
in_reply: str | None = status.get('in_reply_to_id')
···
if in_reply_to and in_reply_to != self.user_id:
# We don't support replies.
LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
-
return
-
-
if status.get('visibility') not in self.options.allowed_visibility:
-
# Skip f/o and direct posts
-
LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
return
success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
···
return
LOGGER.info("Deleting '%s'...", identifier)
-
for output in outputs:
-
output.delete_post(identifier)
+
if post['reposted_id']:
+
for output in outputs:
+
output.delete_repost(identifier)
+
else:
+
for output in outputs:
+
output.delete_post(identifier)
+
database.delete_post(self.db, identifier, self.user_id, self.service)
def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
+45 -1
mastodon/output.py
···
'Authorization': f'Bearer {self.token}'
})
database.delete_post(self.db, mapping[0], self.service, self.user_id)
-
+
+
def accept_repost(self, repost_id: str, reposted_id: str):
+
repost = self.__delete_repost(repost_id)
+
if not repost:
+
return None
+
+
reposted = database.find_post(self.db, reposted_id, self.input.user_id, self.input.service)
+
if not reposted:
+
return
+
+
mappings = database.find_mappings(self.db, reposted['id'], self.service, self.user_id)
+
if mappings:
+
rsp = requests.post(f'{self.service}/api/v1/statuses/{mappings[0][0]}/reblog', headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
+
if rsp.status_code != 200:
+
LOGGER.error("Failed to boost status! status_code: %s, msg: %s", rsp.status_code, rsp.content)
+
return
+
+
internal_id = database.insert_repost(
+
self.db,
+
rsp.json()['id'],
+
reposted['id'],
+
self.user_id,
+
self.service)
+
database.insert_mapping(self.db, repost['id'], internal_id)
+
+
def __delete_repost(self, repost_id: str) -> dict | None:
+
repost = database.find_post(self.db, repost_id, self.input.user_id, self.input.service)
+
if not repost:
+
return None
+
+
mappings = database.find_mappings(self.db, repost['id'], self.service, self.user_id)
+
reposted_mappings = database.find_mappings(self.db, repost['reposted_id'], self.service, self.user_id)
+
if mappings and reposted_mappings:
+
LOGGER.info("Deleting '%s'...", mappings[0][0])
+
requests.post(f'{self.service}/api/v1/statuses/{reposted_mappings[0][0]}/unreblog', headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
database.delete_post(self.db, mappings[0][0], self.user_id, self.service)
+
return repost
+
+
def delete_repost(self, repost_id: str):
+
self.__delete_repost(repost_id)
+27 -8
misskey/input.py
···
if note['userId'] != self.user_id:
return
-
if note.get('renoteId') or note.get('poll'):
-
# TODO polls not supported on bsky. maybe 3rd party? skip for now
-
# we don't handle reblogs. possible with bridgy(?) and self
-
LOGGER.info("Skipping '%s'! Renote or poll..", note['id'])
+
if note.get('visibility') not in self.options.allowed_visibility:
+
LOGGER.info("Skipping '%s'! '%s' visibility..", note['id'], note.get('visibility'))
+
return
+
+
# TODO polls not supported on bsky. maybe 3rd party? skip for now
+
# we don't handle reblogs. possible with bridgy(?) and self
+
if note.get('poll'):
+
LOGGER.info("Skipping '%s'! Contains a poll..", note['id'])
+
return
+
+
renote: dict | None = note.get('renote')
+
if renote:
+
if note.get('text') is not None:
+
LOGGER.info("Skipping '%s'! Quote..", note['id'])
+
return
+
+
if renote.get('userId') != self.user_id:
+
LOGGER.info("Skipping '%s'! Reblog of other user..", note['id'])
+
return
+
+
success = database.try_insert_repost(self.db, note['id'], renote['id'], self.user_id, self.service)
+
if not success:
+
LOGGER.info("Skipping '%s' as renoted note was not found in db!", note['id'])
+
return
+
+
for output in outputs:
+
output.accept_repost(note['id'], renote['id'])
return
reply_id: str | None = note.get('replyId')
···
if note.get('reply', {}).get('userId') != self.user_id:
LOGGER.info("Skipping '%s'! Reply to other user..", note['id'])
return
-
-
if note.get('visibility') not in self.options.allowed_visibility:
-
LOGGER.info("Skipping '%s'! '%s' visibility..", note['id'], note.get('visibility'))
-
return
success = database.try_insert_post(self.db, note['id'], reply_id, self.user_id, self.service)
if not success:
+66 -43
util/database.py
···
with self.lock:
self.conn.close()
+
def try_insert_repost(
+
db: DataBaseWorker,
+
post_id: str,
+
reposted_id: str,
+
input_user: str,
+
input_service: str) -> bool:
+
+
reposted = find_post(db, reposted_id, input_user, input_service)
+
if not reposted:
+
return False
+
+
insert_repost(db, post_id, reposted['id'], input_user, input_service)
+
return True
+
+
def try_insert_post(
db: DataBaseWorker,
post_id: str,
···
insert_post(db, post_id, input_user, input_service)
return True
-
-
def find_mapped_thread(
-
db: DataBaseWorker,
-
parent_id: str,
-
input_user: str,
-
input_service: str,
-
output_user: str,
-
output_service: str):
-
-
reply_data: dict | None = find_post(db, parent_id, input_user, input_service)
-
if not reply_data:
-
return None
-
-
reply_mappings: list[str] | None = find_mappings(db, reply_data['id'], output_service, output_user)
-
if not reply_mappings:
-
return None
-
-
reply_identifier: str = reply_mappings[-1]
-
root_identifier: str = reply_mappings[0]
-
if reply_data['root_id']:
-
root_data = find_post_by_id(db, reply_data['root_id'])
-
if not root_data:
-
return None
-
-
root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user)
-
if not root_mappings:
-
return None
-
root_identifier = root_mappings[0]
-
-
return (
-
root_identifier[0], # real ids
-
reply_identifier[0],
-
reply_data['root_id'], # db ids
-
reply_data['id']
-
)
-
+
def insert_repost(db: DataBaseWorker, identifier: str, reposted_id: int, user_id: str, serivce: str) -> int:
+
db.execute(
+
"""
+
INSERT INTO posts (user_id, service, identifier, reposted_id)
+
VALUES (?, ?, ?, ?);
+
""", (user_id, serivce, identifier, reposted_id))
+
return db.execute("SELECT last_insert_rowid();", ())[0][0]
def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
db.execute(
···
def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
result = db.execute(
"""
-
SELECT user_id, service, identifier, parent_id, root_id
+
SELECT user_id, service, identifier, parent_id, root_id, reposted_id
FROM posts
WHERE id = ?
""", (id,))
if not result:
return None
-
user_id, service, identifier, parent_id, root_id = result[0]
+
user_id, service, identifier, parent_id, root_id, reposted_id = result[0]
return {
'user_id': user_id,
'service': service,
'identifier': identifier,
'parent_id': parent_id,
-
'root_id': root_id
+
'root_id': root_id,
+
'reposted_id': reposted_id
}
def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None:
result = db.execute(
"""
-
SELECT id, parent_id, root_id
+
SELECT id, parent_id, root_id, reposted_id
FROM posts
WHERE identifier = ?
AND user_id = ?
···
""", (identifier, user_id, service))
if not result:
return None
-
id, parent_id, root_id = result[0]
+
id, parent_id, root_id, reposted_id = result[0]
return {
'id': id,
'parent_id': parent_id,
-
'root_id': root_id
-
}
+
'root_id': root_id,
+
'reposted_id': reposted_id
+
}
+
+
def find_mapped_thread(
+
db: DataBaseWorker,
+
parent_id: str,
+
input_user: str,
+
input_service: str,
+
output_user: str,
+
output_service: str):
+
+
reply_data: dict | None = find_post(db, parent_id, input_user, input_service)
+
if not reply_data:
+
return None
+
+
reply_mappings: list[str] | None = find_mappings(db, reply_data['id'], output_service, output_user)
+
if not reply_mappings:
+
return None
+
+
reply_identifier: str = reply_mappings[-1]
+
root_identifier: str = reply_mappings[0]
+
if reply_data['root_id']:
+
root_data = find_post_by_id(db, reply_data['root_id'])
+
if not root_data:
+
return None
+
+
root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user)
+
if not root_mappings:
+
return None
+
root_identifier = root_mappings[0]
+
+
return (
+
root_identifier[0], # real ids
+
reply_identifier[0],
+
reply_data['root_id'], # db ids
+
reply_data['id']
+
)