social media crossposting tool. 3rd time's the charm

add pds input and mastodon api output

zenfyr.dev 619099ff 713b411d

+31 -1
atproto2.py
···
from typing import Any
-from atproto import client_utils, Client, AtUri
+from atproto import client_utils, Client, AtUri, IdResolver
from atproto_client import models
+from util import LOGGER
+
+def resolve_identity(
+        handle: str | None = None,
+        did: str | None = None,
+        pds: str | None = None):
+    """helper to try and resolve identity from provided parameters, a valid handle is enough"""
+
+    if did and pds:
+        return did, pds[:-1] if pds.endswith('/') else pds
+
+    resolver = IdResolver()
+    if not did:
+        if not handle:
+            raise Exception("ATP handle not specified!")
+        LOGGER.info("Resolving ATP identity for %s...", handle)
+        did = resolver.handle.resolve(handle)
+        if not did:
+            raise Exception("Failed to resolve DID!")
+
+    if not pds:
+        LOGGER.info("Resolving PDS from DID document...")
+        did_doc = resolver.did.resolve(did)
+        if not did_doc:
+            raise Exception(f"Failed to resolve DID doc for '{did}'")
+        pds = did_doc.get_pds_endpoint()
+        if not pds:
+            raise Exception("Failed to resolve PDS!")
+
+    return did, pds[:-1] if pds.endswith('/') else pds

class Client2(Client):
    def __init__(self, base_url: str | None = None, *args: Any, **kwargs: Any) -> None:
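A minimal sketch of calling the new helper on its own, assuming the `atproto` package and network access; the handle is a placeholder:

```python
from atproto2 import resolve_identity

# a valid handle alone is enough; the DID and PDS are resolved on demand
did, pds = resolve_identity(handle='example.bsky.social')

# passing both skips network resolution; a trailing '/' on the PDS is stripped
did, pds = resolve_identity(did=did, pds=pds + '/')
```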
+344 -44
bluesky.py
···
-from atproto import client_utils, IdResolver, Request
+from atproto import client_utils, Request, AsyncFirehoseSubscribeReposClient, CAR, CID
from atproto_client import models
-from atproto2 import Client2
+from atproto_client.models.utils import get_or_create as get_model_or_create
+from atproto_client.models.blob_ref import BlobRef
+from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
+from atproto2 import Client2, resolve_identity
from httpx import Timeout
import json
import cross
···
import media_util
from util import LOGGER
import re
+from typing import Callable, Any
# only for lexicon reference
SERVICE = 'https://bsky.app'
···
ADULT_PATTERN = re.compile(r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
+def tokenize_post(post: dict) -> list[cross.Token]:
+    text: str = post.get('text', '')
+    if not text:
+        return []
+    text = text.encode(encoding='utf-8').decode(encoding='utf-8')
+
+    facets: list[dict] = post.get('facets', [])
+    if not facets:
+        return [cross.TextToken(text)]
+
+    slices: list[tuple[int, int, str, str]] = []
+
+    for facet in facets:
+        features: list[dict] = facet.get('features', [])
+        if not features:
+            continue
+
+        # we don't support overlapping facets/features
+        feature = features[0]
+        feature_type = feature['$type']
+        index = facet['index']
+        if feature_type == 'app.bsky.richtext.facet#tag':
+            slices.append((index['byteStart'], index['byteEnd'], 'tag', feature['tag']))
+        elif feature_type == 'app.bsky.richtext.facet#link':
+            slices.append((index['byteStart'], index['byteEnd'], 'link', feature['uri']))
+        elif feature_type == 'app.bsky.richtext.facet#mention':
+            slices.append((index['byteStart'], index['byteEnd'], 'mention', feature['did']))
+
+    if not slices:
+        return [cross.TextToken(text)]
+
+    slices.sort(key=lambda s: s[0])
+    unique: list[tuple[int, int, str, str]] = []
+    current_end = 0
+    for start, end, ttype, val in slices:
+        if start >= current_end:
+            unique.append((start, end, ttype, val))
+            current_end = end
+
+    if not unique:
+        return [cross.TextToken(text)]
+
+    tokens: list[cross.Token] = []
+    prev = 0
+
+    for start, end, ttype, val in unique:
+        if start > prev:
+            # text between facets
+            tokens.append(cross.TextToken(text[prev:start]))
+        # facet token
+        if ttype == 'link':
+            label = text[start:end]
+
+            # try to unflatten links
+            split = val.split('://')
+            if len(split) > 1:
+                if split[1].startswith(label):
+                    tokens.append(cross.LinkToken(val, ''))
+                elif label.endswith('...') and split[1].startswith(label[:-3]):
+                    tokens.append(cross.LinkToken(val, ''))
+                else:
+                    tokens.append(cross.LinkToken(val, label))
+            else:
+                # no scheme separator; keep the labeled link instead of dropping it
+                tokens.append(cross.LinkToken(val, label))
+        elif ttype == 'tag':
+            tokens.append(cross.TagToken(val))
+        elif ttype == 'mention':
+            tokens.append(cross.MentionToken(text[start:end], val))
+        prev = end
+
+    if prev < len(text):
+        tokens.append(cross.TextToken(text[prev:]))
+
+    for t in tokens:
+        LOGGER.debug(t.__dict__)
+
+    return tokens
+
+class BlueskyPost(cross.Post):
+    def __init__(self, pds_url: str, did: str, post: dict) -> None:
+        super().__init__()
+        self.post = post
+        self.tokens = tokenize_post(post)
+
+        self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)
+
+        self.parent_id = None
+        if self.post.get('reply'):
+            self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)
+
+        labels = self.post.get('labels', {}).get('values')
+        self.cw = ''
+        if labels:
+            self.cw = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
+
+        def get_blob_url(blob: str):
+            return f'{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={blob}'
+
+        attachments: list[cross.MediaAttachment] = []
+        embed = self.post.get('embed', {})
+        if embed.get('$type') == 'app.bsky.embed.images':
+            model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
+            assert isinstance(model, models.AppBskyEmbedImages.Main)
+
+            for image in model.images:
+                attachments.append(BlueskyAttachment(
+                    get_blob_url(image.image.cid.encode()),
+                    'image', image.alt
+                ))
+        elif embed.get('$type') == 'app.bsky.embed.video':
+            model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
+            assert isinstance(model, models.AppBskyEmbedVideo.Main)
+
+            attachments.append(BlueskyAttachment(
+                get_blob_url(model.video.cid.encode()),
+                'video', model.alt if model.alt else ''
+            ))
+        self.attachments = attachments
+
+    def get_tokens(self) -> list[cross.Token]:
+        return self.tokens
+
+    def get_parent_id(self) -> str | None:
+        return self.parent_id
+
+    def get_post_date_iso(self) -> str:
+        return self.post.get('createdAt') or super().get_post_date_iso()
+
+    def get_cw(self) -> str:
+        return self.cw or ''
+
+    def get_id(self) -> str:
+        return self.id
+
+    def get_languages(self) -> list[str]:
+        return self.post.get('langs', []) or []
+
+    def is_sensitive(self) -> bool:
+        return bool(self.post.get('labels', {}).get('values'))
+
+    def get_attachments(self) -> list[cross.MediaAttachment]:
+        return self.attachments or []
+
+class BlueskyAttachment(cross.MediaAttachment):
+    def __init__(self, url: str, type: str, alt: str) -> None:
+        super().__init__()
+        self.url = url
+        self.type = type
+        self.alt = alt
+
+    def get_url(self) -> str:
+        return self.url
+
+    def get_type(self) -> str | None:
+        return self.type
+
+    def create_meta(self, bytes: bytes) -> cross.MediaMeta:
+        o_meta = media_util.get_media_meta(bytes)
+        return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
+
+    def get_alt(self) -> str:
+        return self.alt
+
+class BlueskyInput(cross.Input):
+    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
+        self.options = settings.get('options', {})
+        did, pds = resolve_identity(
+            handle=util.as_envvar(settings.get('handle')),
+            did=util.as_envvar(settings.get('did')),
+            pds=util.as_envvar(settings.get('pds'))
+        )
+        self.pds = pds
+
+        # the PDS is not the service; the lexicon and record ids are the same across PDSes
+        super().__init__(SERVICE, did, settings, db)
+
+    def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
+        post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)
+
+        parent_ref = None
+        if post.get('reply'):
+            parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)
+
+        root_id = None
+        parent_id = None
+        if parent_ref:
+            parent_post = database.find_post(self.db, parent_ref, self.user_id, self.service)
+            if not parent_post:
+                LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
+                return
+
+            root_id = parent_post['id']
+            parent_id = root_id
+            if parent_post['root_id']:
+                root_id = parent_post['root_id']
+
+        LOGGER.info("Crossposting '%s'...", post_ref)
+        if root_id and parent_id:
+            database.insert_reply(
+                self.db,
+                post_ref,
+                self.user_id,
+                self.service,
+                parent_id,
+                root_id
+            )
+        else:
+            database.insert_post(
+                self.db,
+                post_ref,
+                self.user_id,
+                self.service
+            )
+
+        cross_post = BlueskyPost(self.pds, self.user_id, post)
+        for output in outputs:
+            output.accept_post(cross_post)
+
+    def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
+        identifier = json.dumps(post_id, sort_keys=True)
+        post = database.find_post(self.db, identifier, self.user_id, self.service)
+        if not post:
+            return
+
+        LOGGER.info("Deleting '%s'...", identifier)
+        for output in outputs:
+            output.delete_post(identifier)
+        database.delete_post(self.db, identifier, self.user_id, self.service)
+
+class BlueskyPdsInput(BlueskyInput):
+    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
+        super().__init__(settings, db)
+
+    def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
+        blocks = message.body.get('blocks')
+        if not blocks:
+            return
+
+        parsed = parse_firehose(message)
+        if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
+            return
+        blocks = parsed.blocks
+
+        car = None
+        def get_lazy_repo() -> CAR:
+            nonlocal car, blocks
+
+            if isinstance(blocks, str):
+                blocks = blocks.encode()
+            assert blocks
+
+            if car:
+                return car
+            car = CAR.from_bytes(blocks)
+            return car
+
+        for op in parsed.ops:
+            if op.action == 'delete':
+                if not op.prev:
+                    continue
+
+                if not op.path.startswith('app.bsky.feed.post'):
+                    continue
+
+                self._on_delete_post(outputs, {
+                    'cid': op.prev.encode(),
+                    'uri': f'at://{parsed.repo}/{op.path}'
+                })
+                continue
+
+            if op.action != 'create':
+                continue
+
+            if not op.cid:
+                continue
+
+            record_data = get_lazy_repo().blocks.get(op.cid)
+            if not record_data:
+                continue
+
+            record_dict = dict(record_data)
+            record_dict['$xpost.strongRef'] = {
+                'cid': op.cid.encode(),
+                'uri': f'at://{parsed.repo}/{op.path}'
+            }
+            if record_dict.get('$type') == 'app.bsky.feed.post':
+                self._on_post(outputs, record_dict)
+
+
+    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
+        streaming: str = f"wss://{self.pds.split('://', 1)[1]}/xrpc"
+
+        client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)
+
+        async def on_message(message: firehose_models.MessageFrame):
+            if message.header.t != '#commit':
+                return
+
+            if message.body.get('repo') != self.user_id:
+                return
+
+            if message.body.get('tooBig'):
+                LOGGER.error("#commit message is tooBig!")
+                return
+
+            submit(lambda: self.__on_commit(outputs, message))
+
+        LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
+        await client.start(on_message)
+
class BlueskyOutput(cross.Output):
    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(input, settings, db)
···
        if not util.as_envvar(settings.get('app-password')):
            raise Exception("Account app password not provided!")
-        resolver = IdResolver()
-        did: str | None = util.as_envvar(settings.get('did'))
-        if not did:
-            handle = util.as_envvar(settings.get('handle'))
-            if not handle:
-                raise Exception("ATP handle not specified!")
-            LOGGER.info("Resolving ATP identity for %s...", handle)
-            did = resolver.handle.resolve(handle)
-            if not did:
-                raise Exception("Failed to resolve DID!")
-
-        pds: str | None = util.as_envvar(settings.get('pds'))
-        if not pds:
-            LOGGER.info("Resolving PDS from DID document...")
-            did_doc = resolver.did.resolve(did)
-            if not did_doc:
-                raise Exception("Failed to resolve DID doc for '%s'", did)
-            pds = did_doc.get_pds_endpoint()
-            if not pds:
-                raise Exception("Failed to resolve PDS!")
+        did, pds = resolve_identity(
+            handle=util.as_envvar(settings.get('handle')),
+            did=util.as_envvar(settings.get('did')),
+            pds=util.as_envvar(settings.get('pds'))
+        )

        reqs = Request(timeout=Timeout(None, connect=30.0))
···
        if not login:
            raise Exception("Client not logged in!")
-        reply_data = database.find_post(self.db, parent_id, self.input.user_id, self.input.service)
-        assert reply_data, "reply_data requested, but doesn't exist in db (should've been skipped bt firehose)"
-
-        reply_mappings = [json.loads(data[0]) for data in database.find_mappings(self.db, reply_data['id'], SERVICE, login.did)]
-        if not reply_mappings:
-            LOGGER.error("Failed to find mappings for a post in the db!")
+        thread_tuple = database.find_mapped_thread(
+            self.db,
+            parent_id,
+            self.input.user_id,
+            self.input.service,
+            login.did,
+            SERVICE
+        )
+
+        if not thread_tuple:
+            LOGGER.error("Failed to find thread tuple in the database!")
            return None
-
-        reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_mappings[-1]['uri']), cid=str(reply_mappings[-1]['cid']))
-        root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_mappings[0]['uri']), cid=str(reply_mappings[0]['cid']))
-        if reply_data['root_id']:
-            root_data = database.find_post_by_id(self.db, reply_data['root_id'])
-            assert root_data, "root_data requested but doesn't exist in db"
-
-            root_mappings = [json.loads(data[0]) for data in database.find_mappings(self.db, reply_data['root_id'], SERVICE, login.did)]
-            if not root_mappings:
-                LOGGER.error("Failed to find mappings for a post in the db!")
-                return None
-            root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_mappings[0]['uri']), cid=str(root_mappings[0]['cid']))
+
+        root_ref = json.loads(thread_tuple[0])
+        reply_ref = json.loads(thread_tuple[1])
+
+        root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
+        reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))
        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
-            reply_data['root_id'],
-            reply_data['id']
+            thread_tuple[2],
+            thread_tuple[3]
        )

    def _split_attachments(self, attachments: list[cross.MediaAttachment]):
···
            return
        root_ref, reply_ref, new_root_id, new_parent_id = parents
-        tokens = post.get_tokens()
+        tokens = post.get_tokens().copy()
        unique_labels: set[str] = set()
        cw = post.get_cw()
···
            tokens.append(cross.TextToken(' '))
-        split_tokens: list[list[cross.Token]] = util.split_tokens(post.get_tokens(), 300)
+        split_tokens: list[list[cross.Token]] = util.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []
        # convert tokens into rich text. skip post if contains unsupported tokens
···
        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
+            LOGGER.info("Deleting '%s'...", mapping[0])
            self.bsky.delete_post(json.loads(mapping[0])['uri'])
            database.delete_post(self.db, mapping[0], SERVICE, login.did)
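A rough illustration of what `tokenize_post` yields for a hand-built record in the `app.bsky.feed.post` shape (facet indices count UTF-8 bytes; the values here are assumed for the example):

```python
from bluesky import tokenize_post

post = {
    'text': 'hello #bsky',
    'facets': [{
        'index': {'byteStart': 6, 'byteEnd': 11},
        'features': [{'$type': 'app.bsky.richtext.facet#tag', 'tag': 'bsky'}],
    }],
}

# roughly [TextToken('hello '), TagToken('bsky')]
tokens = tokenize_post(post)
```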
+1 -1
cross.py
···
        self.settings = settings
        self.db = db

-    async def listen(self, handler: Callable[[Post], Any]):
+    async def listen(self, outputs: list, handler: Callable[[Post], Any]):
        pass

class Output():
+36
database.py
···
        with self.lock:
            self.conn.close()

+def find_mapped_thread(
+        db: DataBaseWorker,
+        parent_id: str,
+        input_user: str,
+        input_service: str,
+        output_user: str,
+        output_service: str):
+
+    reply_data: dict | None = find_post(db, parent_id, input_user, input_service)
+    if not reply_data:
+        return None
+
+    reply_mappings = find_mappings(db, reply_data['id'], output_service, output_user)
+    if not reply_mappings:
+        return None
+
+    reply_identifier = reply_mappings[-1]
+    root_identifier = reply_mappings[0]
+    if reply_data['root_id']:
+        root_data = find_post_by_id(db, reply_data['root_id'])
+        if not root_data:
+            return None
+
+        root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user)
+        if not root_mappings:
+            return None
+        root_identifier = root_mappings[0]
+
+    return (
+        root_identifier[0],  # real ids
+        reply_identifier[0],
+        reply_data['root_id'],  # db ids
+        reply_data['id']
+    )


def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
    db.execute(
        """
+10 -4
main.py
···
INPUTS = {
    "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db),
-    "misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db)
+    "misskey-wss": lambda settings, db: misskey.MisskeyInput(settings, db),
+    "bluesky-pds-wss": lambda settings, db: bluesky.BlueskyPdsInput(settings, db)
}

OUTPUTS = {
-    "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db)
+    "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db),
+    "mastodon": lambda input, settings, db: mastodon.MastodonOutput(input, settings, db)
}

def execute(data_dir):
···
    input = INPUTS[input_settings['type']](input_settings, db_worker)
+    if not outputs_settings:
+        LOGGER.warning("No outputs specified! Check your config!")
+
    outputs: list[cross.Output] = []
    for output_settings in outputs_settings:
        outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker))
···
        try:
            task()
-            queue.task_done()
        except Exception as e:
            LOGGER.error(f"Exception in worker thread!\n{e}")
            traceback.print_exc()
+        finally:
+            queue.task_done()

    task_queue = queue.Queue()
-    thread = threading.Thread(target=worker, args=(task_queue,))
+    thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info('Connecting to %s...', input.service)
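A hypothetical config entry wiring the new input and output together; the field names are inferred from the `settings.get(...)` lookups above, not from a documented schema:

```python
pipeline = {
    "input": {"type": "bluesky-pds-wss", "handle": "example.bsky.social"},
    "outputs": [
        {"type": "mastodon", "instance": "https://example.social", "token": "$MASTODON_TOKEN"},
    ],
}
```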
+333 -1
mastodon.py
···
import database
from database import DataBaseWorker
from typing import Callable, Any
-import asyncio
+import asyncio, time
+import magic
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString
···
                LOGGER.error(e, stack_info=True, exc_info=True)
                LOGGER.info("Reconnecting to %s...", self.streaming)
                continue
+
+class MastodonOutput(cross.Output):
+    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
+        super().__init__(input, settings, db)
+        self.options = settings.get('options') or {}
+        self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
+        instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
+
+        self.service = instance[:-1] if instance.endswith('/') else instance
+
+        LOGGER.info("Verifying %s credentials...", self.service)
+        response = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={
+            'Authorization': f'Bearer {self.token}'
+        })
+        if response.status_code != 200:
+            LOGGER.error("Failed to validate user credentials!")
+            response.raise_for_status()
+            return
+        self.user_id: str = response.json()["id"]
+
+        LOGGER.info("Getting %s configuration...", self.service)
+        response = requests.get(f"{self.service}/api/v1/instance", headers={
+            'Authorization': f'Bearer {self.token}'
+        })
+        if response.status_code != 200:
+            LOGGER.error("Failed to get instance info!")
+            response.raise_for_status()
+            return
+
+        instance_info: dict = response.json()
+        configuration: dict = instance_info['configuration']
+
+        statuses_config: dict = configuration.get('statuses', {})
+        self.max_characters: int = statuses_config.get('max_characters', 500)
+        self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
+        self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)
+
+        media_config: dict = configuration.get('media_attachments', {})
+        self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
+        self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
+        self.supported_mime_types: list[str] = media_config.get('supported_mime_types', [
+            'audio/ogg',
+            'image/jpeg',
+            'image/png',
+            'video/mp4'
+        ])
+
+        # *oma max post chars
+        max_toot_chars = instance_info.get('max_toot_chars')
+        if max_toot_chars:
+            self.max_characters = max_toot_chars
+
+        # *oma max upload limit
+        upload_limit = instance_info.get('upload_limit')
+        if upload_limit:
+            self.image_size_limit = upload_limit
+            self.video_size_limit = upload_limit
+
+        self.text_format = 'text/plain'
+        pleroma = instance_info.get('pleroma')
+        if pleroma:
+            post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
+            if 'text/x.misskeymarkdown' in post_formats:
+                self.text_format = 'text/x.misskeymarkdown'
+            elif 'text/markdown' in post_formats:
+                self.text_format = 'text/markdown'
+
+    def upload_media(self, attachments: list[cross.MediaAttachment]) -> list[str] | None:
+        prepare: list[tuple[str, str, bytes]] = []
+
+        for attachment in attachments:
+            alt = attachment.get_alt()
+            mbytes: bytes | None
+
+            if attachment.get_type() == 'image':
+                mbytes = media_util.download_blob(attachment.get_url(), self.image_size_limit)
+            elif attachment.get_type() in {'video', 'gif'}:
+                mbytes = media_util.download_blob(attachment.get_url(), self.video_size_limit)
+            else:
+                mbytes = media_util.download_blob(attachment.get_url(), 7_000_000)
+
+            if not mbytes:
+                return None
+
+            filename = media_util.get_filename_from_url(attachment.get_url())
+            LOGGER.info("Downloaded %s", filename)
+            prepare.append((filename, alt, mbytes))
+
+        uploads: list[dict] = []
+
+        for name, desc, bbytes in prepare:
+            mime_type = magic.Magic(mime=True).from_buffer(bbytes)
+            if not mime_type:
+                mime_type = 'application/octet-stream'
+
+            files = {
+                'file': (name, bbytes, mime_type)
+            }
+            data = {}
+            if desc:
+                data['description'] = desc
+
+            req = requests.post(f"{self.service}/api/v2/media", headers={
+                'Authorization': f'Bearer {self.token}'
+            }, files=files, data=data)
+
+            if req.status_code == 200:
+                LOGGER.info("Uploaded %s! (%s)", name, req.json()['id'])
+                uploads.append({
+                    'done': True,
+                    'id': req.json()['id']
+                })
+            elif req.status_code == 202:
+                LOGGER.info("Waiting for %s to process!", name)
+                uploads.append({
+                    'done': False,
+                    'id': req.json()['id']
+                })
+            else:
+                LOGGER.error("Failed to upload %s! %s", name, req.text)
+                req.raise_for_status()
+
+        while any([not val['done'] for val in uploads]):
+            LOGGER.info("Waiting for media to process...")
+            time.sleep(3)
+            for media in uploads:
+                if media['done']:
+                    continue
+
+                reqs = requests.get(f"{self.service}/api/v1/media/{media['id']}", headers={
+                    'Authorization': f'Bearer {self.token}'
+                })
+
+                if reqs.status_code == 206:
+                    continue
+
+                if reqs.status_code == 200:
+                    media['done'] = True
+                    continue
+                reqs.raise_for_status()
+
+        return [val['id'] for val in uploads]
+
+    def token_to_string(self, tokens: list[cross.Token]) -> str | None:
+        p_text: str = ''
+
+        for token in tokens:
+            if isinstance(token, cross.TextToken):
+                p_text += token.text
+            elif isinstance(token, cross.TagToken):
+                p_text += '#' + token.tag
+            elif isinstance(token, cross.LinkToken):
+                if util.canonical_label(token.label, token.href):
+                    p_text += token.href
+                else:
+                    if self.text_format == 'text/plain':
+                        p_text += f'{token.label}: {token.href}'
+                    elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
+                        p_text += f'[{token.label}]({token.href})'
+                    else:
+                        return None
+
+        return p_text
+
+    def split_tokens_media(self, tokens: list[cross.Token], media: list[cross.MediaAttachment]):
+        split_tokens = util.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
+        post_text: list[str] = []
+
+        for block in split_tokens:
+            baked_text = self.token_to_string(block)
+
+            if baked_text is None:
+                return None
+            post_text.append(baked_text)
+
+        if not post_text:
+            post_text = ['']
+
+        posts: list[dict] = [{"text": text, "attachments": []} for text in post_text]
+        available_indices: list[int] = list(range(len(posts)))
+
+        current_image_post_idx: int | None = None
+
+        def make_blank_post() -> dict:
+            return {
+                "text": '',
+                "attachments": []
+            }
+
+        def pop_next_empty_index() -> int:
+            if available_indices:
+                return available_indices.pop(0)
+            else:
+                new_idx = len(posts)
+                posts.append(make_blank_post())
+                return new_idx
+
+        for att in media:
+            if (
+                current_image_post_idx is not None
+                and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
+            ):
+                posts[current_image_post_idx]["attachments"].append(att)
+            else:
+                idx = pop_next_empty_index()
+                posts[idx]["attachments"].append(att)
+                current_image_post_idx = idx
+
+        result: list[tuple[str, list[cross.MediaAttachment]]] = []
+
+        for p in posts:
+            result.append((p['text'], p["attachments"]))
+
+        return result
+
+    def accept_post(self, post: cross.Post):
+        parent_id = post.get_parent_id()
+
+        new_root_id: int | None = None
+        new_parent_id: int | None = None
+
+        reply_ref: str | None = None
+        if parent_id:
+            thread_tuple = database.find_mapped_thread(
+                self.db,
+                parent_id,
+                self.input.user_id,
+                self.input.service,
+                self.user_id,
+                self.service
+            )
+
+            if not thread_tuple:
+                LOGGER.error("Failed to find thread tuple in the database!")
+                return None
+
+            _, reply_ref, new_root_id, new_parent_id = thread_tuple
+
+        lang: str
+        if post.get_languages():
+            lang = post.get_languages()[0]
+        else:
+            lang = 'en'
+
+        raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
+        if not raw_statuses:
+            LOGGER.error("Failed to split post into statuses?")
+            return None
+        baked_statuses = []
+
+        for status, raw_media in raw_statuses:
+            media: list[str] | None = None
+            if raw_media:
+                media = self.upload_media(raw_media)
+                if not media:
+                    LOGGER.error("Failed to upload attachments!")
+                    return None
+                baked_statuses.append((status, media))
+                continue
+            baked_statuses.append((status, []))
+
+        created_statuses: list[str] = []
+
+        for status, media in baked_statuses:
+            payload = {
+                'status': status,
+                'media_ids': media or [],
+                'spoiler_text': post.get_cw(),
+                'visibility': 'unlisted',
+                'content_type': self.text_format,
+                'language': lang
+            }
+
+            if media:
+                payload['sensitive'] = post.is_sensitive()
+
+            if reply_ref:
+                payload['in_reply_to_id'] = reply_ref
+
+            reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
+                'Authorization': f'Bearer {self.token}',
+                'Content-Type': 'application/json'
+            }, json=payload)
+
+            if reqs.status_code != 200:
+                LOGGER.error("Failed to post status! %s - %s", reqs.status_code, reqs.text)
+                reqs.raise_for_status()
+
+            reply_ref = reqs.json()['id']
+            LOGGER.info("Created new status %s!", reply_ref)
+
+            created_statuses.append(reqs.json()['id'])
+
+        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
+        assert db_post, "input post missing from the db"
+
+        if new_root_id is None or new_parent_id is None:
+            new_root_id = database.insert_post(
+                self.db,
+                created_statuses[0],
+                self.user_id,
+                self.service
+            )
+            new_parent_id = new_root_id
+            database.insert_mapping(self.db, db_post['id'], new_parent_id)
+            created_statuses = created_statuses[1:]
+
+        for db_id in created_statuses:
+            new_parent_id = database.insert_reply(
+                self.db,
+                db_id,
+                self.user_id,
+                self.service,
+                new_parent_id,
+                new_root_id
+            )
+            database.insert_mapping(self.db, db_post['id'], new_parent_id)
+
+    def delete_post(self, identifier: str):
+        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
+        if not post:
+            return
+
+        mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
+        for mapping in mappings[::-1]:
+            LOGGER.info("Deleting '%s'...", mapping[0])
+            requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
+                'Authorization': f'Bearer {self.token}'
+            })
+            database.delete_post(self.db, mapping[0], self.service, self.user_id)
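A standalone sketch of the upload-then-poll flow `upload_media` relies on, using the documented Mastodon media endpoints; the instance, token, and filename are placeholders:

```python
import time
import requests

INSTANCE, TOKEN = 'https://example.social', 'app-token'  # placeholders
headers = {'Authorization': f'Bearer {TOKEN}'}

with open('photo.png', 'rb') as fh:
    resp = requests.post(f'{INSTANCE}/api/v2/media', headers=headers,
                         files={'file': ('photo.png', fh, 'image/png')})
media_id = resp.json()['id']

# 202 means the attachment is still processing; polling GET /api/v1/media/:id
# answers 206 until processing finishes, then 200
status = resp.status_code
while status in (202, 206):
    time.sleep(3)
    status = requests.get(f'{INSTANCE}/api/v1/media/{media_id}',
                          headers=headers).status_code
```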
+1 -1
media_util.py
···
def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
    response = requests.get(url, stream=True, timeout=20)
    if response.status_code != 200:
-        LOGGER.info("Failed to download %s! %s", url, response)
+        LOGGER.info("Failed to download %s! %s", url, response.text)
        return None

    downloaded_bytes = b""
pyproject.toml
···
"bs4>=0.0.2",
"click>=8.2.1",
"html-to-markdown>=1.3.3",
+
"python-magic>=0.4.27",
"requests>=2.32.3",
"websockets>=13.1",
]
+2 -2
util.py
···
    return False

-def split_tokens(tokens: list[cross.Token], max_chars: int) -> list[list[cross.Token]]:
+def split_tokens(tokens: list[cross.Token], max_chars: int, max_link_len: int = 35) -> list[list[cross.Token]]:
    def start_new_block():
        nonlocal current_block, blocks, current_length
        if current_block:
···
        elif isinstance(token, cross.LinkToken):
            link_len = len(token.label)
            if canonical_label(token.label, token.href):
-                link_len = min(link_len, 35)
+                link_len = min(link_len, max_link_len)
            if current_length + link_len <= max_chars:
                current_block.append(token)
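Illustrative call with assumed values: Mastodon counts every URL as a fixed number of characters, so `MastodonOutput` passes its instance's reserved URL length instead of the previous hardcoded 35:

```python
blocks = util.split_tokens(tokens, max_chars=500, max_link_len=23)
```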
+11
uv.lock
···
]

[[package]]
+name = "python-magic"
+version = "0.4.27"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/da/db/0b3e28ac047452d079d375ec6798bf76a036a08182dbb39ed38116a49130/python-magic-0.4.27.tar.gz", hash = "sha256:c1ba14b08e4a5f5c31a302b7721239695b2f0f058d125bd5ce1ee36b9d9d3c3b", size = 14677, upload-time = "2022-06-07T20:16:59.508Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/6c/73/9f872cb81fc5c3bb48f7227872c28975f998f3e7c2b1c16e95e6432bbb90/python_magic-0.4.27-py2.py3-none-any.whl", hash = "sha256:c212960ad306f700aa0d01e5d7a325d20548ff97eb9920dcd29513174f0294d3", size = 13840, upload-time = "2022-06-07T20:16:57.763Z" },
+]
+
+[[package]]
name = "requests"
version = "2.32.3"
source = { registry = "https://pypi.org/simple" }
···
    { name = "bs4" },
    { name = "click" },
    { name = "html-to-markdown" },
+    { name = "python-magic" },
    { name = "requests" },
    { name = "websockets" },
]
···
    { name = "bs4", specifier = ">=0.0.2" },
    { name = "click", specifier = ">=8.2.1" },
    { name = "html-to-markdown", specifier = ">=1.3.3" },
+    { name = "python-magic", specifier = ">=0.4.27" },
    { name = "requests", specifier = ">=2.32.3" },
    { name = "websockets", specifier = ">=13.1" },
]