social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

change project structure (pls work still)

zenfyr.dev 30c2c81a 3fe345ab

verified
+1 -1
atproto2.py bluesky/atproto2.py
···
from typing import Any
from atproto import client_utils, Client, AtUri, IdResolver
from atproto_client import models
-
from util import LOGGER
+
from util.util import LOGGER
def resolve_identity(
handle: str | None = None,
-693
bluesky.py
···
-
from atproto import client_utils, Request, AsyncFirehoseSubscribeReposClient, CAR, CID
-
from atproto_client import models
-
from atproto_client.models.utils import get_or_create as get_model_or_create
-
from atproto_client.models.blob_ref import BlobRef
-
from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
-
from atproto2 import Client2, resolve_identity
-
from httpx import Timeout
-
import json
-
import cross
-
import database
-
from database import DataBaseWorker
-
import util
-
import media_util
-
from util import LOGGER
-
import re
-
from typing import Callable, Any
-
-
# only for lexicon reference
-
SERVICE = 'https://bsky.app'
-
-
# TODO this is terrible and stupid
-
ADULT_PATTERN = re.compile(r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE)
-
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
-
-
def tokenize_post(post: dict) -> list[cross.Token]:
-
text: str = post.get('text', '')
-
if not text:
-
return []
-
text = text.encode(encoding='utf-8').decode(encoding='utf-8')
-
-
facets: list[dict] = post.get('facets', [])
-
if not facets:
-
return [cross.TextToken(text)]
-
-
slices: list[tuple[int, int, str, str]] = []
-
-
for facet in facets:
-
features: list[dict] = facet.get('features', [])
-
if not features:
-
continue
-
-
# we don't support overlapping facets/features
-
feature = features[0]
-
feature_type = feature['$type']
-
index = facet['index']
-
if feature_type == 'app.bsky.richtext.facet#tag':
-
slices.append((index['byteStart'], index['byteEnd'], 'tag', feature['tag']))
-
elif feature_type == 'app.bsky.richtext.facet#link':
-
slices.append((index['byteStart'], index['byteEnd'], 'link', feature['uri']))
-
elif feature_type == 'app.bsky.richtext.facet#mention':
-
slices.append((index['byteStart'], index['byteEnd'], 'mention', feature['did']))
-
-
if not slices:
-
return [cross.TextToken(text)]
-
-
slices.sort(key=lambda s: s[0])
-
unique: list[tuple[int, int, str, str]] = []
-
current_end = 0
-
for start, end, ttype, val in slices:
-
if start >= current_end:
-
unique.append((start, end, ttype, val))
-
current_end = end
-
-
if not unique:
-
return [cross.TextToken(text)]
-
-
tokens: list[cross.Token] = []
-
prev = 0
-
-
for start, end, ttype, val in unique:
-
if start > prev:
-
# text between facets
-
tokens.append(cross.TextToken(text[prev:start]))
-
# facet token
-
if ttype == 'link':
-
label = text[start:end]
-
-
# try to unflatten links
-
split = val.split('://')
-
if len(split) > 1:
-
if split[1].startswith(label):
-
tokens.append(cross.LinkToken(val, ''))
-
elif label.endswith('...') and split[1].startswith(label[:-3]):
-
tokens.append(cross.LinkToken(val, ''))
-
else:
-
tokens.append(cross.LinkToken(val, label))
-
elif ttype == 'tag':
-
tokens.append(cross.TagToken(val))
-
elif ttype == 'mention':
-
tokens.append(cross.MentionToken(text[start:end], val))
-
prev = end
-
-
if prev < len(text):
-
tokens.append(cross.TextToken(text[prev:]))
-
-
for t in tokens:
-
print(t.__dict__)
-
-
return tokens
-
-
class BlueskyPost(cross.Post):
-
def __init__(self, post: dict, tokens: list[cross.Token], attachments: list[media_util.MediaInfo]) -> None:
-
super().__init__()
-
self.post = post
-
self.tokens = tokens
-
-
self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)
-
-
self.parent_id = None
-
if self.post.get('reply'):
-
self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)
-
-
labels = self.post.get('labels', {}).get('values')
-
self.cw = ''
-
if labels:
-
self.cw = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
-
self.attachments = attachments
-
-
def get_tokens(self) -> list[cross.Token]:
-
return self.tokens
-
-
def get_parent_id(self) -> str | None:
-
return self.parent_id
-
-
def get_post_date_iso(self) -> str:
-
return self.post.get('createdAt') or super().get_post_date_iso()
-
-
def get_cw(self) -> str:
-
return self.cw or ''
-
-
def get_id(self) -> str:
-
return self.id
-
-
def get_languages(self) -> list[str]:
-
return self.post.get('langs', []) or []
-
-
def is_sensitive(self) -> bool:
-
return self.post.get('labels', {}).get('values') or False
-
-
def get_attachments(self) -> list[media_util.MediaInfo]:
-
return self.attachments
-
-
class BlueskyInputOptions():
-
def __init__(self, o: dict) -> None:
-
self.filters = [re.compile(f) for f in o.get('regex_filters', [])]
-
-
class BlueskyInput(cross.Input):
-
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
-
self.options = BlueskyInputOptions(settings.get('options', {}))
-
did, pds = resolve_identity(
-
handle=util.as_envvar(settings.get('handle')),
-
did=util.as_envvar(settings.get('did')),
-
pds=util.as_envvar(settings.get('pds'))
-
)
-
self.pds = pds
-
-
# PDS is Not a service, the lexicon and rids are the same across pds
-
super().__init__(SERVICE, did, settings, db)
-
-
def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
-
post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)
-
-
parent_ref = None
-
if post.get('reply'):
-
parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)
-
-
success = database.try_insert_post(self.db, post_ref, parent_ref, self.user_id, self.service)
-
if not success:
-
LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
-
return
-
-
tokens = tokenize_post(post)
-
if not cross.test_filters(tokens, self.options.filters):
-
LOGGER.info("Skipping '%s'. Matched a filter!", post_ref)
-
return
-
-
LOGGER.info("Crossposting '%s'...", post_ref)
-
-
def get_blob_url(blob: str):
-
return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}'
-
-
attachments: list[media_util.MediaInfo] = []
-
embed = post.get('embed', {})
-
if embed.get('$type') == 'app.bsky.embed.images':
-
model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
-
assert isinstance(model, models.AppBskyEmbedImages.Main)
-
-
for image in model.images:
-
url = get_blob_url(image.image.cid.encode())
-
LOGGER.info("Downloading %s...", url)
-
io = media_util.download_media(url, image.alt)
-
if not io:
-
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
-
return
-
attachments.append(io)
-
elif embed.get('$type') == 'app.bsky.embed.video':
-
model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
-
assert isinstance(model, models.AppBskyEmbedVideo.Main)
-
url = get_blob_url(model.video.cid.encode())
-
LOGGER.info("Downloading %s...", url)
-
io = media_util.download_media(url, model.alt if model.alt else '')
-
if not io:
-
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
-
return
-
attachments.append(io)
-
-
cross_post = BlueskyPost(post, tokens, attachments)
-
for output in outputs:
-
output.accept_post(cross_post)
-
return
-
-
def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
-
identifier = json.dumps(post_id, sort_keys=True)
-
post = database.find_post(self.db, identifier, self.user_id, self.service)
-
if not post:
-
return
-
-
LOGGER.info("Deleting '%s'...", identifier)
-
for output in outputs:
-
output.delete_post(identifier)
-
database.delete_post(self.db, identifier, self.user_id, self.service)
-
-
class BlueskyPdsInput(BlueskyInput):
-
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
-
super().__init__(settings, db)
-
-
def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
-
blocks = message.body.get('blocks')
-
if not blocks:
-
return
-
-
parsed = parse_firehose(message)
-
if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
-
return
-
blocks = parsed.blocks
-
-
car = None
-
def get_lazy_repo() -> CAR:
-
nonlocal car, blocks
-
-
if isinstance(blocks, str):
-
blocks = blocks.encode()
-
assert blocks
-
-
if car:
-
return car
-
car = CAR.from_bytes(blocks)
-
return car
-
-
for op in parsed.ops:
-
if op.action == 'delete':
-
if not op.prev:
-
continue
-
-
if not op.path.startswith('app.bsky.feed.post'):
-
continue
-
-
self._on_delete_post(outputs, {
-
'cid': op.prev.encode(),
-
'uri': f'at://{parsed.repo}/{op.path}'
-
})
-
continue
-
-
if op.action != 'create':
-
continue
-
-
if not op.cid:
-
continue
-
-
record_data = get_lazy_repo().blocks.get(op.cid)
-
if not record_data:
-
continue
-
-
record_dict = dict(record_data)
-
record_dict['$xpost.strongRef'] = {
-
'cid': op.cid.encode(),
-
'uri': f'at://{parsed.repo}/{op.path}'
-
}
-
if record_dict['$type'] == 'app.bsky.feed.post':
-
self._on_post(outputs, record_dict)
-
-
-
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
-
streaming: str = f"wss://{self.pds.split("://", 1)[1]}/xrpc"
-
-
client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)
-
-
async def on_message(message: firehose_models.MessageFrame):
-
if message.header.t != '#commit':
-
return
-
-
if message.body.get('repo') != self.user_id:
-
return
-
-
if message.body.get('tooBig'):
-
LOGGER.error("#commit message is tooBig!")
-
return
-
-
submit(lambda: self.__on_commit(outputs, message))
-
return
-
-
LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
-
await client.start(on_message)
-
-
ALLOWED_GATES = ['mentioned', 'following', 'followers', 'everybody']
-
-
class BlueskyOutputOptions:
-
def __init__(self, o: dict) -> None:
-
self.quote_gate: bool = False
-
self.thread_gate: list[str] = ['everybody']
-
self.encode_videos: bool = True
-
-
quote_gate = o.get('quote_gate')
-
if quote_gate is not None:
-
self.quote_gate = bool(quote_gate)
-
-
thread_gate = o.get('thread_gate')
-
if thread_gate is not None:
-
if any([v not in ALLOWED_GATES for v in thread_gate]):
-
raise ValueError(f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)} or [], got: {thread_gate}")
-
self.thread_gate = thread_gate
-
-
encode_videos = o.get('encode_videos')
-
if encode_videos is not None:
-
self.encode_videos = bool(encode_videos)
-
-
class BlueskyOutput(cross.Output):
-
def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
-
super().__init__(input, settings, db)
-
self.options = BlueskyOutputOptions(settings.get('options') or {})
-
-
if not util.as_envvar(settings.get('app-password')):
-
raise Exception("Account app password not provided!")
-
-
did, pds = resolve_identity(
-
handle=util.as_envvar(settings.get('handle')),
-
did=util.as_envvar(settings.get('did')),
-
pds=util.as_envvar(settings.get('pds'))
-
)
-
-
reqs = Request(timeout=Timeout(None, connect=30.0))
-
-
self.bsky = Client2(pds, request=reqs)
-
self.bsky.login(did, util.as_envvar(settings.get('app-password')))
-
-
def _find_parent(self, parent_id: str):
-
login = self.bsky.me
-
if not login:
-
raise Exception("Client not logged in!")
-
-
thread_tuple = database.find_mapped_thread(
-
self.db,
-
parent_id,
-
self.input.user_id,
-
self.input.service,
-
login.did,
-
SERVICE
-
)
-
-
if not thread_tuple:
-
LOGGER.error("Failed to find thread tuple in the database!")
-
return None
-
-
root_ref = json.loads(thread_tuple[0])
-
reply_ref = json.loads(thread_tuple[1])
-
-
root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
-
reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))
-
-
return (
-
models.create_strong_ref(root_record),
-
models.create_strong_ref(reply_record),
-
thread_tuple[2],
-
thread_tuple[3]
-
)
-
-
def _split_attachments(self, attachments: list[media_util.MediaInfo]):
-
sup_media: list[media_util.MediaInfo] = []
-
unsup_media: list[media_util.MediaInfo] = []
-
-
for a in attachments:
-
if a.mime.startswith('image/') or a.mime.startswith('video/'): # TODO convert gifs to videos
-
sup_media.append(a)
-
else:
-
unsup_media.append(a)
-
-
return (sup_media, unsup_media)
-
-
def _split_media_per_post(
-
self,
-
tokens: list[client_utils.TextBuilder],
-
media: list[media_util.MediaInfo]):
-
-
posts: list[dict] = [{"tokens": tokens, "attachments": []} for tokens in tokens]
-
available_indices: list[int] = list(range(len(posts)))
-
-
current_image_post_idx: int | None = None
-
-
def make_blank_post() -> dict:
-
return {
-
"tokens": [client_utils.TextBuilder().text('')],
-
"attachments": []
-
}
-
-
def pop_next_empty_index() -> int:
-
if available_indices:
-
return available_indices.pop(0)
-
else:
-
new_idx = len(posts)
-
posts.append(make_blank_post())
-
return new_idx
-
-
for att in media:
-
if att.mime.startswith('video/'):
-
current_image_post_idx = None
-
idx = pop_next_empty_index()
-
posts[idx]["attachments"].append(att)
-
elif att.mime.startswith('image/'):
-
if (
-
current_image_post_idx is not None
-
and len(posts[current_image_post_idx]["attachments"]) < 4
-
):
-
posts[current_image_post_idx]["attachments"].append(att)
-
else:
-
idx = pop_next_empty_index()
-
posts[idx]["attachments"].append(att)
-
current_image_post_idx = idx
-
-
result: list[tuple[client_utils.TextBuilder, list[media_util.MediaInfo]]] = []
-
for p in posts:
-
result.append((p["tokens"], p["attachments"]))
-
return result
-
-
def accept_post(self, post: cross.Post):
-
login = self.bsky.me
-
if not login:
-
raise Exception("Client not logged in!")
-
-
parent_id = post.get_parent_id()
-
-
# used for db insertion
-
new_root_id = None
-
new_parent_id = None
-
-
root_ref = None
-
reply_ref = None
-
if parent_id:
-
parents = self._find_parent(parent_id)
-
if not parents:
-
return
-
root_ref, reply_ref, new_root_id, new_parent_id = parents
-
-
tokens = post.get_tokens().copy()
-
-
unique_labels: set[str] = set()
-
cw = post.get_cw()
-
if cw:
-
tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
-
unique_labels.add('graphic-media')
-
-
# from bsky.app, a post can only have one of those labels
-
if PORN_PATTERN.search(cw):
-
unique_labels.add('porn')
-
elif ADULT_PATTERN.search(cw):
-
unique_labels.add('sexual')
-
-
if post.is_sensitive():
-
unique_labels.add('graphic-media')
-
-
labels = models.ComAtprotoLabelDefs.SelfLabels(values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in unique_labels])
-
-
sup_media, unsup_media = self._split_attachments(post.get_attachments())
-
-
if unsup_media:
-
if tokens:
-
tokens.append(cross.TextToken('\n'))
-
for i, attachment in enumerate(unsup_media):
-
tokens.append(cross.LinkToken(
-
attachment.url,
-
f"[{media_util.get_filename_from_url(attachment.url)}]"
-
))
-
tokens.append(cross.TextToken(' '))
-
-
-
split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
-
post_text: list[client_utils.TextBuilder] = []
-
-
# convert tokens into rich text. skip post if contains unsupported tokens
-
for block in split_tokens:
-
rich_text = tokens_to_richtext(block)
-
-
if not rich_text:
-
LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
-
return
-
post_text.append(rich_text)
-
-
if not post_text:
-
post_text = [client_utils.TextBuilder().text('')]
-
-
for m in sup_media:
-
if m.mime.startswith('image/'):
-
if len(m.io) > 2_000_000:
-
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large.", post.get_id())
-
return
-
-
if m.mime.startswith('video/'):
-
if m.mime != 'video/mp4' and not self.options.encode_videos:
-
LOGGER.info("Video is not mp4, but encoding is disabled. Skipping '%s'...", post.get_id())
-
return
-
-
if len(m.io) > 100_000_000:
-
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
-
return
-
-
created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
-
baked_media = self._split_media_per_post(post_text, sup_media)
-
-
for text, attachments in baked_media:
-
if not attachments:
-
if reply_ref and root_ref:
-
new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
), labels=labels, time_iso=post.get_post_date_iso())
-
else:
-
new_post = self.bsky.send_post(text, labels=labels, time_iso=post.get_post_date_iso())
-
root_ref = models.create_strong_ref(new_post)
-
-
self.bsky.create_gates(
-
self.options.thread_gate,
-
self.options.quote_gate,
-
new_post.uri,
-
time_iso=post.get_post_date_iso()
-
)
-
reply_ref = models.create_strong_ref(new_post)
-
created_records.append(new_post)
-
else:
-
# if a single post is an image - everything else is an image
-
if attachments[0].mime.startswith('image/'):
-
images: list[bytes] = []
-
image_alts: list[str] = []
-
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
-
-
for attachment in attachments:
-
image_io = media_util.compress_image(attachment.io, quality=100)
-
metadata = media_util.get_media_meta(image_io)
-
-
if len(image_io) > 1_000_000:
-
LOGGER.info("Compressing %s...", attachment.name)
-
image_io = media_util.compress_image(image_io)
-
-
images.append(image_io)
-
image_alts.append(attachment.alt)
-
image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
-
width=metadata['width'],
-
height=metadata['height']
-
))
-
-
new_post = self.bsky.send_images(
-
text=post_text[0],
-
images=images,
-
image_alts=image_alts,
-
image_aspect_ratios=image_aspect_ratios,
-
reply_to= models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
) if root_ref and reply_ref else None,
-
labels=labels,
-
time_iso=post.get_post_date_iso()
-
)
-
if not root_ref:
-
root_ref = models.create_strong_ref(new_post)
-
-
self.bsky.create_gates(
-
self.options.thread_gate,
-
self.options.quote_gate,
-
new_post.uri,
-
time_iso=post.get_post_date_iso()
-
)
-
reply_ref = models.create_strong_ref(new_post)
-
created_records.append(new_post)
-
else: # video is guarantedd to be one
-
metadata = media_util.get_media_meta(attachments[0].io)
-
if metadata['duration'] > 180:
-
LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
-
return
-
-
video_io = attachments[0].io
-
if attachments[0].mime != 'video/mp4':
-
LOGGER.info("Converting %s to mp4...", attachments[0].name)
-
video_io = media_util.convert_to_mp4(video_io)
-
-
aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
-
width=metadata['width'],
-
height=metadata['height']
-
)
-
-
new_post = self.bsky.send_video(
-
text=post_text[0],
-
video=video_io,
-
video_aspect_ratio=aspect_ratio,
-
video_alt=attachments[0].alt,
-
reply_to= models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
) if root_ref and reply_ref else None,
-
labels=labels,
-
time_iso=post.get_post_date_iso()
-
)
-
if not root_ref:
-
root_ref = models.create_strong_ref(new_post)
-
-
self.bsky.create_gates(
-
self.options.thread_gate,
-
self.options.quote_gate,
-
new_post.uri,
-
time_iso=post.get_post_date_iso()
-
)
-
reply_ref = models.create_strong_ref(new_post)
-
created_records.append(new_post)
-
-
db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
-
assert db_post, "ghghghhhhh"
-
-
db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]
-
-
if new_root_id is None or new_parent_id is None:
-
new_root_id = database.insert_post(
-
self.db,
-
db_identifiers[0],
-
login.did,
-
SERVICE
-
)
-
new_parent_id = new_root_id
-
database.insert_mapping(self.db, db_post['id'], new_parent_id)
-
db_identifiers = db_identifiers[1:]
-
-
for db_id in db_identifiers:
-
new_parent_id = database.insert_reply(
-
self.db,
-
db_id,
-
login.did,
-
SERVICE,
-
new_parent_id,
-
new_root_id
-
)
-
database.insert_mapping(self.db, db_post['id'], new_parent_id)
-
-
def delete_post(self, identifier: str):
-
login = self.bsky.me
-
if not login:
-
raise Exception("Client not logged in!")
-
-
post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
-
if not post:
-
return
-
-
mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
-
for mapping in mappings[::-1]:
-
LOGGER.info("Deleting '%s'...", mapping[0])
-
self.bsky.delete_post(json.loads(mapping[0])['uri'])
-
database.delete_post(self.db, mapping[0], SERVICE, login.did)
-
-
-
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
-
builder = client_utils.TextBuilder()
-
-
def flatten_link(href: str):
-
split = href.split('://', 1)
-
if len(split) > 1:
-
href = split[1]
-
-
if len(href) > 32:
-
href = href[:32] + '...'
-
-
return href
-
-
for token in tokens:
-
if isinstance(token, cross.TextToken):
-
builder.text(token.text)
-
elif isinstance(token, cross.LinkToken):
-
if util.canonical_label(token.label, token.href):
-
builder.link(flatten_link(token.href), token.href)
-
continue
-
-
builder.link(token.label, token.href)
-
elif isinstance(token, cross.TagToken):
-
builder.tag('#' + token.tag, token.tag)
-
else:
-
# fail on unsupported tokens
-
return None
-
-
return builder
+163
bluesky/common.py
···
+
import re, json
+
+
from atproto import client_utils
+
+
import cross
+
from util.media import MediaInfo
+
from util.util import canonical_label
+
+
# only for lexicon reference
+
SERVICE = 'https://bsky.app'
+
+
# TODO this is terrible and stupid
+
ADULT_PATTERN = re.compile(r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE)
+
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
+
+
def tokenize_post(post: dict) -> list[cross.Token]:
+
text: str = post.get('text', '')
+
if not text:
+
return []
+
text = text.encode(encoding='utf-8').decode(encoding='utf-8')
+
+
facets: list[dict] = post.get('facets', [])
+
if not facets:
+
return [cross.TextToken(text)]
+
+
slices: list[tuple[int, int, str, str]] = []
+
+
for facet in facets:
+
features: list[dict] = facet.get('features', [])
+
if not features:
+
continue
+
+
# we don't support overlapping facets/features
+
feature = features[0]
+
feature_type = feature['$type']
+
index = facet['index']
+
if feature_type == 'app.bsky.richtext.facet#tag':
+
slices.append((index['byteStart'], index['byteEnd'], 'tag', feature['tag']))
+
elif feature_type == 'app.bsky.richtext.facet#link':
+
slices.append((index['byteStart'], index['byteEnd'], 'link', feature['uri']))
+
elif feature_type == 'app.bsky.richtext.facet#mention':
+
slices.append((index['byteStart'], index['byteEnd'], 'mention', feature['did']))
+
+
if not slices:
+
return [cross.TextToken(text)]
+
+
slices.sort(key=lambda s: s[0])
+
unique: list[tuple[int, int, str, str]] = []
+
current_end = 0
+
for start, end, ttype, val in slices:
+
if start >= current_end:
+
unique.append((start, end, ttype, val))
+
current_end = end
+
+
if not unique:
+
return [cross.TextToken(text)]
+
+
tokens: list[cross.Token] = []
+
prev = 0
+
+
for start, end, ttype, val in unique:
+
if start > prev:
+
# text between facets
+
tokens.append(cross.TextToken(text[prev:start]))
+
# facet token
+
if ttype == 'link':
+
label = text[start:end]
+
+
# try to unflatten links
+
split = val.split('://')
+
if len(split) > 1:
+
if split[1].startswith(label):
+
tokens.append(cross.LinkToken(val, ''))
+
elif label.endswith('...') and split[1].startswith(label[:-3]):
+
tokens.append(cross.LinkToken(val, ''))
+
else:
+
tokens.append(cross.LinkToken(val, label))
+
elif ttype == 'tag':
+
tokens.append(cross.TagToken(val))
+
elif ttype == 'mention':
+
tokens.append(cross.MentionToken(text[start:end], val))
+
prev = end
+
+
if prev < len(text):
+
tokens.append(cross.TextToken(text[prev:]))
+
+
for t in tokens:
+
print(t.__dict__)
+
+
return tokens
+
+
class BlueskyPost(cross.Post):
+
def __init__(self, post: dict, tokens: list[cross.Token], attachments: list[MediaInfo]) -> None:
+
super().__init__()
+
self.post = post
+
self.tokens = tokens
+
+
self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)
+
+
self.parent_id = None
+
if self.post.get('reply'):
+
self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)
+
+
labels = self.post.get('labels', {}).get('values')
+
self.cw = ''
+
if labels:
+
self.cw = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
+
self.attachments = attachments
+
+
def get_tokens(self) -> list[cross.Token]:
+
return self.tokens
+
+
def get_parent_id(self) -> str | None:
+
return self.parent_id
+
+
def get_post_date_iso(self) -> str:
+
return self.post.get('createdAt') or super().get_post_date_iso()
+
+
def get_cw(self) -> str:
+
return self.cw or ''
+
+
def get_id(self) -> str:
+
return self.id
+
+
def get_languages(self) -> list[str]:
+
return self.post.get('langs', []) or []
+
+
def is_sensitive(self) -> bool:
+
return self.post.get('labels', {}).get('values') or False
+
+
def get_attachments(self) -> list[MediaInfo]:
+
return self.attachments
+
+
+
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
+
builder = client_utils.TextBuilder()
+
+
def flatten_link(href: str):
+
split = href.split('://', 1)
+
if len(split) > 1:
+
href = split[1]
+
+
if len(href) > 32:
+
href = href[:32] + '...'
+
+
return href
+
+
for token in tokens:
+
if isinstance(token, cross.TextToken):
+
builder.text(token.text)
+
elif isinstance(token, cross.LinkToken):
+
if canonical_label(token.label, token.href):
+
builder.link(flatten_link(token.href), token.href)
+
continue
+
+
builder.link(token.label, token.href)
+
elif isinstance(token, cross.TagToken):
+
builder.tag('#' + token.tag, token.tag)
+
else:
+
# fail on unsupported tokens
+
return None
+
+
return builder
+178
bluesky/input.py
···
+
import re, json
+
+
from atproto import AsyncFirehoseSubscribeReposClient, CAR
+
from atproto_client import models
+
from atproto_client.models.utils import get_or_create as get_model_or_create
+
from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
+
from bluesky.atproto2 import resolve_identity
+
+
from bluesky.common import BlueskyPost, SERVICE, tokenize_post
+
+
import cross, util.database as database
+
from util.util import LOGGER, as_envvar
+
from util.media import MediaInfo, download_media
+
from util.database import DataBaseWorker
+
+
from typing import Callable, Any
+
+
class BlueskyInputOptions():
+
def __init__(self, o: dict) -> None:
+
self.filters = [re.compile(f) for f in o.get('regex_filters', [])]
+
+
class BlueskyInput(cross.Input):
+
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
+
self.options = BlueskyInputOptions(settings.get('options', {}))
+
did, pds = resolve_identity(
+
handle=as_envvar(settings.get('handle')),
+
did=as_envvar(settings.get('did')),
+
pds=as_envvar(settings.get('pds'))
+
)
+
self.pds = pds
+
+
# PDS is Not a service, the lexicon and rids are the same across pds
+
super().__init__(SERVICE, did, settings, db)
+
+
def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
+
post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)
+
+
parent_ref = None
+
if post.get('reply'):
+
parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)
+
+
success = database.try_insert_post(self.db, post_ref, parent_ref, self.user_id, self.service)
+
if not success:
+
LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
+
return
+
+
tokens = tokenize_post(post)
+
if not cross.test_filters(tokens, self.options.filters):
+
LOGGER.info("Skipping '%s'. Matched a filter!", post_ref)
+
return
+
+
LOGGER.info("Crossposting '%s'...", post_ref)
+
+
def get_blob_url(blob: str):
+
return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}'
+
+
attachments: list[MediaInfo] = []
+
embed = post.get('embed', {})
+
if embed.get('$type') == 'app.bsky.embed.images':
+
model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
+
assert isinstance(model, models.AppBskyEmbedImages.Main)
+
+
for image in model.images:
+
url = get_blob_url(image.image.cid.encode())
+
LOGGER.info("Downloading %s...", url)
+
io = download_media(url, image.alt)
+
if not io:
+
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
+
return
+
attachments.append(io)
+
elif embed.get('$type') == 'app.bsky.embed.video':
+
model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
+
assert isinstance(model, models.AppBskyEmbedVideo.Main)
+
url = get_blob_url(model.video.cid.encode())
+
LOGGER.info("Downloading %s...", url)
+
io = download_media(url, model.alt if model.alt else '')
+
if not io:
+
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
+
return
+
attachments.append(io)
+
+
cross_post = BlueskyPost(post, tokens, attachments)
+
for output in outputs:
+
output.accept_post(cross_post)
+
return
+
+
def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
+
identifier = json.dumps(post_id, sort_keys=True)
+
post = database.find_post(self.db, identifier, self.user_id, self.service)
+
if not post:
+
return
+
+
LOGGER.info("Deleting '%s'...", identifier)
+
for output in outputs:
+
output.delete_post(identifier)
+
database.delete_post(self.db, identifier, self.user_id, self.service)
+
+
class BlueskyPdsInput(BlueskyInput):
+
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
+
super().__init__(settings, db)
+
+
def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
+
blocks = message.body.get('blocks')
+
if not blocks:
+
return
+
+
parsed = parse_firehose(message)
+
if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
+
return
+
blocks = parsed.blocks
+
+
car = None
+
def get_lazy_repo() -> CAR:
+
nonlocal car, blocks
+
+
if isinstance(blocks, str):
+
blocks = blocks.encode()
+
assert blocks
+
+
if car:
+
return car
+
car = CAR.from_bytes(blocks)
+
return car
+
+
for op in parsed.ops:
+
if op.action == 'delete':
+
if not op.prev:
+
continue
+
+
if not op.path.startswith('app.bsky.feed.post'):
+
continue
+
+
self._on_delete_post(outputs, {
+
'cid': op.prev.encode(),
+
'uri': f'at://{parsed.repo}/{op.path}'
+
})
+
continue
+
+
if op.action != 'create':
+
continue
+
+
if not op.cid:
+
continue
+
+
record_data = get_lazy_repo().blocks.get(op.cid)
+
if not record_data:
+
continue
+
+
record_dict = dict(record_data)
+
record_dict['$xpost.strongRef'] = {
+
'cid': op.cid.encode(),
+
'uri': f'at://{parsed.repo}/{op.path}'
+
}
+
if record_dict['$type'] == 'app.bsky.feed.post':
+
self._on_post(outputs, record_dict)
+
+
+
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
+
streaming: str = f"wss://{self.pds.split("://", 1)[1]}/xrpc"
+
+
client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)
+
+
async def on_message(message: firehose_models.MessageFrame):
+
if message.header.t != '#commit':
+
return
+
+
if message.body.get('repo') != self.user_id:
+
return
+
+
if message.body.get('tooBig'):
+
LOGGER.error("#commit message is tooBig!")
+
return
+
+
submit(lambda: self.__on_commit(outputs, message))
+
return
+
+
LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
+
await client.start(on_message)
+372
bluesky/output.py
···
+
import json
+
from httpx import Timeout
+
+
from atproto import client_utils, Request
+
from atproto_client import models
+
from bluesky.atproto2 import Client2, resolve_identity
+
+
from bluesky.common import SERVICE, ADULT_PATTERN, PORN_PATTERN, tokens_to_richtext
+
+
import cross, util.database as database
+
from util.util import LOGGER, as_envvar
+
from util.media import MediaInfo, get_filename_from_url, get_media_meta, compress_image, convert_to_mp4
+
from util.database import DataBaseWorker
+
+
ALLOWED_GATES = ['mentioned', 'following', 'followers', 'everybody']
+
+
class BlueskyOutputOptions:
+
def __init__(self, o: dict) -> None:
+
self.quote_gate: bool = False
+
self.thread_gate: list[str] = ['everybody']
+
self.encode_videos: bool = True
+
+
quote_gate = o.get('quote_gate')
+
if quote_gate is not None:
+
self.quote_gate = bool(quote_gate)
+
+
thread_gate = o.get('thread_gate')
+
if thread_gate is not None:
+
if any([v not in ALLOWED_GATES for v in thread_gate]):
+
raise ValueError(f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)} or [], got: {thread_gate}")
+
self.thread_gate = thread_gate
+
+
encode_videos = o.get('encode_videos')
+
if encode_videos is not None:
+
self.encode_videos = bool(encode_videos)
+
+
class BlueskyOutput(cross.Output):
+
def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
+
super().__init__(input, settings, db)
+
self.options = BlueskyOutputOptions(settings.get('options') or {})
+
+
if not as_envvar(settings.get('app-password')):
+
raise Exception("Account app password not provided!")
+
+
did, pds = resolve_identity(
+
handle=as_envvar(settings.get('handle')),
+
did=as_envvar(settings.get('did')),
+
pds=as_envvar(settings.get('pds'))
+
)
+
+
reqs = Request(timeout=Timeout(None, connect=30.0))
+
+
self.bsky = Client2(pds, request=reqs)
+
self.bsky.login(did, as_envvar(settings.get('app-password')))
+
+
def _find_parent(self, parent_id: str):
+
login = self.bsky.me
+
if not login:
+
raise Exception("Client not logged in!")
+
+
thread_tuple = database.find_mapped_thread(
+
self.db,
+
parent_id,
+
self.input.user_id,
+
self.input.service,
+
login.did,
+
SERVICE
+
)
+
+
if not thread_tuple:
+
LOGGER.error("Failed to find thread tuple in the database!")
+
return None
+
+
root_ref = json.loads(thread_tuple[0])
+
reply_ref = json.loads(thread_tuple[1])
+
+
root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
+
reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))
+
+
return (
+
models.create_strong_ref(root_record),
+
models.create_strong_ref(reply_record),
+
thread_tuple[2],
+
thread_tuple[3]
+
)
+
+
def _split_attachments(self, attachments: list[MediaInfo]):
+
sup_media: list[MediaInfo] = []
+
unsup_media: list[MediaInfo] = []
+
+
for a in attachments:
+
if a.mime.startswith('image/') or a.mime.startswith('video/'): # TODO convert gifs to videos
+
sup_media.append(a)
+
else:
+
unsup_media.append(a)
+
+
return (sup_media, unsup_media)
+
+
def _split_media_per_post(
+
self,
+
tokens: list[client_utils.TextBuilder],
+
media: list[MediaInfo]):
+
+
posts: list[dict] = [{"tokens": tokens, "attachments": []} for tokens in tokens]
+
available_indices: list[int] = list(range(len(posts)))
+
+
current_image_post_idx: int | None = None
+
+
def make_blank_post() -> dict:
+
return {
+
"tokens": [client_utils.TextBuilder().text('')],
+
"attachments": []
+
}
+
+
def pop_next_empty_index() -> int:
+
if available_indices:
+
return available_indices.pop(0)
+
else:
+
new_idx = len(posts)
+
posts.append(make_blank_post())
+
return new_idx
+
+
for att in media:
+
if att.mime.startswith('video/'):
+
current_image_post_idx = None
+
idx = pop_next_empty_index()
+
posts[idx]["attachments"].append(att)
+
elif att.mime.startswith('image/'):
+
if (
+
current_image_post_idx is not None
+
and len(posts[current_image_post_idx]["attachments"]) < 4
+
):
+
posts[current_image_post_idx]["attachments"].append(att)
+
else:
+
idx = pop_next_empty_index()
+
posts[idx]["attachments"].append(att)
+
current_image_post_idx = idx
+
+
result: list[tuple[client_utils.TextBuilder, list[MediaInfo]]] = []
+
for p in posts:
+
result.append((p["tokens"], p["attachments"]))
+
return result
+
+
def accept_post(self, post: cross.Post):
+
login = self.bsky.me
+
if not login:
+
raise Exception("Client not logged in!")
+
+
parent_id = post.get_parent_id()
+
+
# used for db insertion
+
new_root_id = None
+
new_parent_id = None
+
+
root_ref = None
+
reply_ref = None
+
if parent_id:
+
parents = self._find_parent(parent_id)
+
if not parents:
+
return
+
root_ref, reply_ref, new_root_id, new_parent_id = parents
+
+
tokens = post.get_tokens().copy()
+
+
unique_labels: set[str] = set()
+
cw = post.get_cw()
+
if cw:
+
tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
+
unique_labels.add('graphic-media')
+
+
# from bsky.app, a post can only have one of those labels
+
if PORN_PATTERN.search(cw):
+
unique_labels.add('porn')
+
elif ADULT_PATTERN.search(cw):
+
unique_labels.add('sexual')
+
+
if post.is_sensitive():
+
unique_labels.add('graphic-media')
+
+
labels = models.ComAtprotoLabelDefs.SelfLabels(values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in unique_labels])
+
+
sup_media, unsup_media = self._split_attachments(post.get_attachments())
+
+
if unsup_media:
+
if tokens:
+
tokens.append(cross.TextToken('\n'))
+
for i, attachment in enumerate(unsup_media):
+
tokens.append(cross.LinkToken(
+
attachment.url,
+
f"[{get_filename_from_url(attachment.url)}]"
+
))
+
tokens.append(cross.TextToken(' '))
+
+
+
split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
+
post_text: list[client_utils.TextBuilder] = []
+
+
# convert tokens into rich text. skip post if contains unsupported tokens
+
for block in split_tokens:
+
rich_text = tokens_to_richtext(block)
+
+
if not rich_text:
+
LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
+
return
+
post_text.append(rich_text)
+
+
if not post_text:
+
post_text = [client_utils.TextBuilder().text('')]
+
+
for m in sup_media:
+
if m.mime.startswith('image/'):
+
if len(m.io) > 2_000_000:
+
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large.", post.get_id())
+
return
+
+
if m.mime.startswith('video/'):
+
if m.mime != 'video/mp4' and not self.options.encode_videos:
+
LOGGER.info("Video is not mp4, but encoding is disabled. Skipping '%s'...", post.get_id())
+
return
+
+
if len(m.io) > 100_000_000:
+
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
+
return
+
+
created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
+
baked_media = self._split_media_per_post(post_text, sup_media)
+
+
for text, attachments in baked_media:
+
if not attachments:
+
if reply_ref and root_ref:
+
new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
+
parent=reply_ref,
+
root=root_ref
+
), labels=labels, time_iso=post.get_post_date_iso())
+
else:
+
new_post = self.bsky.send_post(text, labels=labels, time_iso=post.get_post_date_iso())
+
root_ref = models.create_strong_ref(new_post)
+
+
self.bsky.create_gates(
+
self.options.thread_gate,
+
self.options.quote_gate,
+
new_post.uri,
+
time_iso=post.get_post_date_iso()
+
)
+
reply_ref = models.create_strong_ref(new_post)
+
created_records.append(new_post)
+
else:
+
# if a single post is an image - everything else is an image
+
if attachments[0].mime.startswith('image/'):
+
images: list[bytes] = []
+
image_alts: list[str] = []
+
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
+
+
for attachment in attachments:
+
image_io = compress_image(attachment.io, quality=100)
+
metadata = get_media_meta(image_io)
+
+
if len(image_io) > 1_000_000:
+
LOGGER.info("Compressing %s...", attachment.name)
+
image_io = compress_image(image_io)
+
+
images.append(image_io)
+
image_alts.append(attachment.alt)
+
image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
+
width=metadata['width'],
+
height=metadata['height']
+
))
+
+
new_post = self.bsky.send_images(
+
text=post_text[0],
+
images=images,
+
image_alts=image_alts,
+
image_aspect_ratios=image_aspect_ratios,
+
reply_to= models.AppBskyFeedPost.ReplyRef(
+
parent=reply_ref,
+
root=root_ref
+
) if root_ref and reply_ref else None,
+
labels=labels,
+
time_iso=post.get_post_date_iso()
+
)
+
if not root_ref:
+
root_ref = models.create_strong_ref(new_post)
+
+
self.bsky.create_gates(
+
self.options.thread_gate,
+
self.options.quote_gate,
+
new_post.uri,
+
time_iso=post.get_post_date_iso()
+
)
+
reply_ref = models.create_strong_ref(new_post)
+
created_records.append(new_post)
+
else: # video is guarantedd to be one
+
metadata = get_media_meta(attachments[0].io)
+
if metadata['duration'] > 180:
+
LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
+
return
+
+
video_io = attachments[0].io
+
if attachments[0].mime != 'video/mp4':
+
LOGGER.info("Converting %s to mp4...", attachments[0].name)
+
video_io = convert_to_mp4(video_io)
+
+
aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
+
width=metadata['width'],
+
height=metadata['height']
+
)
+
+
new_post = self.bsky.send_video(
+
text=post_text[0],
+
video=video_io,
+
video_aspect_ratio=aspect_ratio,
+
video_alt=attachments[0].alt,
+
reply_to= models.AppBskyFeedPost.ReplyRef(
+
parent=reply_ref,
+
root=root_ref
+
) if root_ref and reply_ref else None,
+
labels=labels,
+
time_iso=post.get_post_date_iso()
+
)
+
if not root_ref:
+
root_ref = models.create_strong_ref(new_post)
+
+
self.bsky.create_gates(
+
self.options.thread_gate,
+
self.options.quote_gate,
+
new_post.uri,
+
time_iso=post.get_post_date_iso()
+
)
+
reply_ref = models.create_strong_ref(new_post)
+
created_records.append(new_post)
+
+
db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
+
assert db_post, "ghghghhhhh"
+
+
db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]
+
+
if new_root_id is None or new_parent_id is None:
+
new_root_id = database.insert_post(
+
self.db,
+
db_identifiers[0],
+
login.did,
+
SERVICE
+
)
+
new_parent_id = new_root_id
+
database.insert_mapping(self.db, db_post['id'], new_parent_id)
+
db_identifiers = db_identifiers[1:]
+
+
for db_id in db_identifiers:
+
new_parent_id = database.insert_reply(
+
self.db,
+
db_id,
+
login.did,
+
SERVICE,
+
new_parent_id,
+
new_root_id
+
)
+
database.insert_mapping(self.db, db_post['id'], new_parent_id)
+
+
def delete_post(self, identifier: str):
+
login = self.bsky.me
+
if not login:
+
raise Exception("Client not logged in!")
+
+
post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
+
if not post:
+
return
+
+
mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
+
for mapping in mappings[::-1]:
+
LOGGER.info("Deleting '%s'...", mapping[0])
+
self.bsky.delete_post(json.loads(mapping[0])['uri'])
+
database.delete_post(self.db, mapping[0], SERVICE, login.did)
+4 -5
cross.py
···
from typing import Callable, Any
-
from database import DataBaseWorker
+
from util.database import DataBaseWorker
from datetime import datetime, timezone
-
from media_util import MediaInfo
-
from util import LOGGER
-
import util
+
from util.media import MediaInfo
+
from util.util import LOGGER, canonical_label
import re
ALTERNATE = re.compile(r'\S+|\s+')
···
elif isinstance(token, LinkToken):
link_len = len(token.label)
-
if util.canonical_label(token.label, token.href):
+
if canonical_label(token.label, token.href):
link_len = min(link_len, max_link_len)
if current_length + link_len <= max_chars:
database.py util/database.py
+19 -12
main.py
···
-
from util import LOGGER
import os
import json
-
import database
-
import mastodon, misskey, bluesky, cross
import asyncio, threading, queue, traceback
-
import util
+
+
from util.util import LOGGER, as_json
+
import cross, util.database as database
+
+
from bluesky.input import BlueskyPdsInput
+
from bluesky.output import BlueskyOutputOptions, BlueskyOutput
+
+
from mastodon.input import MastodonInputOptions, MastodonInput
+
from mastodon.output import MastodonOutput
+
+
from misskey.input import MisskeyInput
DEFAULT_SETTINGS: dict = {
'input': {
'type': 'mastodon-wss',
'instance': 'env:MASTODON_INSTANCE',
'token': 'env:MASTODON_TOKEN',
-
"options": mastodon.MastodonInputOptions({})
+
"options": MastodonInputOptions({})
},
'outputs': [
{
'type': 'bluesky',
'handle': 'env:BLUESKY_HANDLE',
'app-password': 'env:BLUESKY_APP_PASSWORD',
-
'options': bluesky.BlueskyOutputOptions({})
+
'options': BlueskyOutputOptions({})
}
]
}
INPUTS = {
-
"mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db),
-
"misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db),
-
"bluesky-pds-wss": lambda settings, db: bluesky.BlueskyPdsInput(settings, db)
+
"mastodon-wss": lambda settings, db: MastodonInput(settings, db),
+
"misskey-wss": lambda settigs, db: MisskeyInput(settigs, db),
+
"bluesky-pds-wss": lambda settings, db: BlueskyPdsInput(settings, db)
}
OUTPUTS = {
-
"bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db),
-
"mastodon": lambda input, settings, db: mastodon.MastodonOutput(input, settings, db)
+
"bluesky": lambda input, settings, db: BlueskyOutput(input, settings, db),
+
"mastodon": lambda input, settings, db: MastodonOutput(input, settings, db)
}
def execute(data_dir):
···
LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
with open(settings_path, 'w') as f:
-
f.write(util.as_json(DEFAULT_SETTINGS, indent=2))
+
f.write(as_json(DEFAULT_SETTINGS, indent=2))
return 0
LOGGER.info('Loading settings...')
markeddown.py mastodon/markeddown.py
-614
mastodon.py
···
-
from util import LOGGER
-
import requests, websockets
-
import util, media_util, json, cross
-
import database
-
from database import DataBaseWorker
-
from typing import Callable, Any
-
import asyncio, time
-
-
from bs4 import BeautifulSoup, Tag
-
from bs4.element import NavigableString
-
import markeddown
-
from html import unescape
-
import re
-
-
POSSIBLE_MIMES = [
-
'audio/ogg',
-
'audio/mp3',
-
'image/webp',
-
'image/jpeg',
-
'image/png',
-
'video/mp4',
-
'video/quicktime',
-
'video/webm'
-
]
-
-
md_parser = markeddown.HTMLToMarkdownParser()
-
md_parser.preserve_spaces = True
-
-
def tokenize_post(status: dict) -> list[cross.Token]:
-
if not status.get('content'):
-
return []
-
-
soup = BeautifulSoup(status['content'], "html.parser")
-
tokens: list[cross.Token] = []
-
-
tags: list[dict] = status.get('tags', [])
-
mentions: list[dict] = status.get('mentions', [])
-
-
def mdd(html):
-
md_parser.feed(unescape(html))
-
md = md_parser.get_markdown()
-
md_parser.reset()
-
return md
-
-
def recurse(node) -> None:
-
if isinstance(node, NavigableString):
-
tokens.append(cross.TextToken(str(node)))
-
return
-
-
if isinstance(node, Tag):
-
if node.name.lower() == "a":
-
href = node.get("href", "")
-
inner_html = "".join(str(c) for c in node.contents)
-
link_text_md = mdd(inner_html)
-
-
if link_text_md.startswith('@'):
-
as_mention = link_text_md[1:]
-
for block in mentions:
-
if href == block.get('url'):
-
tokens.append(cross.MentionToken(block['acct'], block['url']))
-
return
-
elif as_mention == block.get('acct') or as_mention == block.get('username'):
-
tokens.append(cross.MentionToken(block['acct'], block['url']))
-
return
-
-
if link_text_md.startswith('#'):
-
as_tag = link_text_md[1:].lower()
-
if any(as_tag == block.get('name') for block in tags):
-
tokens.append(cross.TagToken(link_text_md[1:]))
-
return
-
-
# idk if we can safely convert this to string
-
tokens.append(cross.LinkToken(str(href), link_text_md))
-
return
-
-
if node.find("a") is not None:
-
for child in node.contents:
-
recurse(child)
-
return
-
-
serialized = str(node)
-
markdownified = mdd(serialized)
-
if markdownified:
-
tokens.append(cross.TextToken(markdownified))
-
return
-
return
-
-
for child in soup.contents:
-
recurse(child)
-
-
if not tokens:
-
return []
-
-
last_token = tokens[-1]
-
if last_token and isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
-
tokens[-1] = cross.TextToken(last_token.text[:-2])
-
-
return tokens
-
-
MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain']
-
-
class MastodonPost(cross.Post):
-
def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[media_util.MediaInfo]) -> None:
-
super().__init__()
-
self.status = status
-
self.media_attachments = media_attachments
-
self.tokens = tokens
-
-
def get_tokens(self) -> list[cross.Token]:
-
return self.tokens
-
-
def get_parent_id(self) -> str | None:
-
return self.status.get('in_reply_to_id')
-
-
def get_post_date_iso(self) -> str:
-
date = self.status.get('created_at')
-
return date or super().get_post_date_iso()
-
-
def get_cw(self) -> str:
-
return self.status.get('spoiler_text') or ''
-
-
def get_id(self) -> str:
-
return self.status['id']
-
-
def get_languages(self) -> list[str]:
-
if self.status.get('language'):
-
return [self.status['language']]
-
return []
-
-
def is_sensitive(self) -> bool:
-
return self.status.get('sensitive', False)
-
-
def get_attachments(self) -> list[media_util.MediaInfo]:
-
return self.media_attachments
-
-
ALLOWED_VISIBILITY = ['public', 'unlisted']
-
-
class MastodonInputOptions():
-
def __init__(self, o: dict) -> None:
-
self.allowed_visibility = ALLOWED_VISIBILITY
-
self.filters = [re.compile(f) for f in o.get('regex_filters', [])]
-
-
allowed_visibility = o.get('allowed_visibility')
-
if allowed_visibility is not None:
-
if any([v not in ALLOWED_VISIBILITY for v in allowed_visibility]):
-
raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
-
self.allowed_visibility = allowed_visibility
-
-
class MastodonInput(cross.Input):
-
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
-
self.options = MastodonInputOptions(settings.get('options', {}))
-
self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
-
instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
-
-
service = instance[:-1] if instance.endswith('/') else instance
-
-
LOGGER.info("Verifying %s credentails...", service)
-
responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
-
'Authorization': f'Bearer {self.token}'
-
})
-
if responce.status_code != 200:
-
LOGGER.error("Failed to validate user credentials!")
-
responce.raise_for_status()
-
return
-
-
super().__init__(service, responce.json()["id"], settings, db)
-
self.streaming = self._get_streaming_url()
-
-
if not self.streaming:
-
raise Exception("Instance %s does not support streaming!", service)
-
-
def _get_streaming_url(self):
-
response = requests.get(f"{self.service}/api/v1/instance")
-
response.raise_for_status()
-
data: dict = response.json()
-
return (data.get('urls') or {}).get('streaming_api')
-
-
def __to_tokens(self, status: dict):
-
content_type = status.get('content_type', 'text/plain')
-
raw_text = status.get('text')
-
-
tags: list[str] = []
-
for tag in status.get('tags', []):
-
tags.append(tag['name'])
-
-
mentions: list[tuple[str, str]] = []
-
for mention in status.get('mentions', []):
-
mentions.append(('@' + mention['username'], '@' + mention['acct']))
-
-
if raw_text and content_type in MARKDOWNY:
-
return cross.tokenize_markdown(raw_text, tags, mentions)
-
-
akkoma_ext: dict | None = status.get('akkoma', {}).get('source')
-
if akkoma_ext:
-
if akkoma_ext.get('mediaType') in MARKDOWNY:
-
return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)
-
-
return tokenize_post(status)
-
-
def _on_create_post(self, outputs: list[cross.Output], status: dict):
-
# skip events from other users
-
if (status.get('account') or {})['id'] != self.user_id:
-
return
-
-
if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
-
# TODO polls not supported on bsky. maybe 3rd party? skip for now
-
# we don't handle reblogs. possible with bridgy(?) and self
-
# we don't handle quotes.
-
LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
-
return
-
-
in_reply: str | None = status.get('in_reply_to_id')
-
in_reply_to: str | None = status.get('in_reply_to_account_id')
-
if in_reply_to and in_reply_to != self.user_id:
-
# We don't support replies.
-
LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
-
return
-
-
if status.get('visibility') not in self.options.allowed_visibility:
-
# Skip f/o and direct posts
-
LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
-
return
-
-
success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
-
if not success:
-
LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
-
return
-
-
tokens = self.__to_tokens(status)
-
if not cross.test_filters(tokens, self.options.filters):
-
LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
-
return
-
-
LOGGER.info("Crossposting '%s'...", status['id'])
-
-
media_attachments: list[media_util.MediaInfo] = []
-
for attachment in status.get('media_attachments', []):
-
LOGGER.info("Downloading %s...", attachment['url'])
-
info = media_util.download_media(attachment['url'], attachment.get('description') or '')
-
if not info:
-
LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
-
return
-
media_attachments.append(info)
-
-
cross_post = MastodonPost(status, tokens, media_attachments)
-
for output in outputs:
-
output.accept_post(cross_post)
-
-
def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
-
post = database.find_post(self.db, identifier, self.user_id, self.service)
-
if not post:
-
return
-
-
LOGGER.info("Deleting '%s'...", identifier)
-
for output in outputs:
-
output.delete_post(identifier)
-
database.delete_post(self.db, identifier, self.user_id, self.service)
-
-
def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
-
if event == 'update':
-
self._on_create_post(outputs, json.loads(payload))
-
elif event == 'delete':
-
self._on_delete_post(outputs, payload)
-
-
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
-
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
-
-
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
-
try:
-
LOGGER.info("Listening to %s...", self.streaming)
-
-
async def listen_for_messages():
-
async for msg in ws:
-
data = json.loads(msg)
-
event: str = data.get('event')
-
payload: str = data.get('payload')
-
-
submit(lambda: self._on_post(outputs, str(event), str(payload)))
-
-
listen = asyncio.create_task(listen_for_messages())
-
-
await asyncio.gather(listen)
-
except websockets.ConnectionClosedError as e:
-
LOGGER.error(e, stack_info=True, exc_info=True)
-
LOGGER.info("Reconnecting to %s...", self.streaming)
-
continue
-
-
ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']
-
-
class MastodonOutputOptions():
-
def __init__(self, o: dict) -> None:
-
self.visibility = 'public'
-
-
visibility = o.get('visibility')
-
if visibility is not None:
-
if visibility not in ALLOWED_POSTING_VISIBILITY:
-
raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
-
self.visibility = visibility
-
-
class MastodonOutput(cross.Output):
-
def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
-
super().__init__(input, settings, db)
-
self.options = settings.get('options') or {}
-
self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
-
instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
-
-
self.service = instance[:-1] if instance.endswith('/') else instance
-
-
LOGGER.info("Verifying %s credentails...", self.service)
-
responce = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={
-
'Authorization': f'Bearer {self.token}'
-
})
-
if responce.status_code != 200:
-
LOGGER.error("Failed to validate user credentials!")
-
responce.raise_for_status()
-
return
-
self.user_id: str = responce.json()["id"]
-
-
LOGGER.info("Getting %s configuration...", self.service)
-
responce = requests.get(f"{self.service}/api/v1/instance", headers={
-
'Authorization': f'Bearer {self.token}'
-
})
-
if responce.status_code != 200:
-
LOGGER.error("Failed to get instance info!")
-
responce.raise_for_status()
-
return
-
-
instance_info: dict = responce.json()
-
configuration: dict = instance_info['configuration']
-
-
statuses_config: dict = configuration.get('statuses', {})
-
self.max_characters: int = statuses_config.get('max_characters', 500)
-
self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
-
self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)
-
-
media_config: dict = configuration.get('media_attachments', {})
-
self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
-
self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
-
self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)
-
-
# *oma: max post chars
-
max_toot_chars = instance_info.get('max_toot_chars')
-
if max_toot_chars:
-
self.max_characters: int = max_toot_chars
-
-
# *oma: max upload limit
-
upload_limit = instance_info.get('upload_limit')
-
if upload_limit:
-
self.image_size_limit: int = upload_limit
-
self.video_size_limit: int = upload_limit
-
-
# *oma ext: supported text types
-
self.text_format = 'text/plain'
-
pleroma = instance_info.get('pleroma')
-
if pleroma:
-
post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
-
if 'text/x.misskeymarkdown' in post_formats:
-
self.text_format = 'text/x.misskeymarkdown'
-
elif 'text/markdown' in post_formats:
-
self.text_format = 'text/markdown'
-
-
def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
-
for a in attachments:
-
if a.mime.startswith('image/') and len(a.io) > self.image_size_limit:
-
return None
-
-
if a.mime.startswith('video/') and len(a.io) > self.video_size_limit:
-
return None
-
-
if not a.mime.startswith('image/') and not a.mime.startswith('video/'):
-
if len(a.io) > 7_000_000:
-
return None
-
-
uploads: list[dict] = []
-
for a in attachments:
-
data = {}
-
if a.alt:
-
data['description'] = a.alt
-
-
req = requests.post(f"{self.service}/api/v2/media", headers= {
-
'Authorization': f'Bearer {self.token}'
-
}, files={'file': (a.name, a.io, a.mime)}, data=data)
-
-
if req.status_code == 200:
-
LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
-
uploads.append({
-
'done': True,
-
'id': req.json()['id']
-
})
-
elif req.status_code == 202:
-
LOGGER.info("Waiting for %s to process!", a.name)
-
uploads.append({
-
'done': False,
-
'id': req.json()['id']
-
})
-
else:
-
LOGGER.error("Failed to upload %s! %s", a.name, req.text)
-
req.raise_for_status()
-
-
while any([not val['done'] for val in uploads]):
-
LOGGER.info("Waiting for media to process...")
-
time.sleep(3)
-
for media in uploads:
-
if media['done']:
-
continue
-
-
reqs = requests.get(f'{self.service}/api/v1/media/{media['id']}', headers={
-
'Authorization': f'Bearer {self.token}'
-
})
-
-
if reqs.status_code == 206:
-
continue
-
-
if reqs.status_code == 200:
-
media['done'] = True
-
continue
-
reqs.raise_for_status()
-
-
return [val['id'] for val in uploads]
-
-
def token_to_string(self, tokens: list[cross.Token]) -> str | None:
-
p_text: str = ''
-
-
for token in tokens:
-
if isinstance(token, cross.TextToken):
-
p_text += token.text
-
elif isinstance(token, cross.TagToken):
-
p_text += '#' + token.tag
-
elif isinstance(token, cross.LinkToken):
-
if util.canonical_label(token.label, token.href):
-
p_text += token.href
-
else:
-
if self.text_format == 'text/plain':
-
p_text += f'{token.label}: {token.href}'
-
elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
-
p_text += f'[{token.label}]({token.href})'
-
else:
-
return None
-
-
return p_text
-
-
def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]):
-
split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
-
post_text: list[str] = []
-
-
for block in split_tokens:
-
baked_text = self.token_to_string(block)
-
-
if baked_text is None:
-
return None
-
post_text.append(baked_text)
-
-
if not post_text:
-
post_text = ['']
-
-
posts: list[dict] = [{"text": post_text, "attachments": []} for post_text in post_text]
-
available_indices: list[int] = list(range(len(posts)))
-
-
current_image_post_idx: int | None = None
-
-
def make_blank_post() -> dict:
-
return {
-
"text": '',
-
"attachments": []
-
}
-
-
def pop_next_empty_index() -> int:
-
if available_indices:
-
return available_indices.pop(0)
-
else:
-
new_idx = len(posts)
-
posts.append(make_blank_post())
-
return new_idx
-
-
for att in media:
-
if (
-
current_image_post_idx is not None
-
and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
-
):
-
posts[current_image_post_idx]["attachments"].append(att)
-
else:
-
idx = pop_next_empty_index()
-
posts[idx]["attachments"].append(att)
-
current_image_post_idx = idx
-
-
result: list[tuple[str, list[media_util.MediaInfo]]] = []
-
-
for p in posts:
-
result.append((p['text'], p["attachments"]))
-
-
return result
-
-
def accept_post(self, post: cross.Post):
-
parent_id = post.get_parent_id()
-
-
new_root_id: int | None = None
-
new_parent_id: int | None = None
-
-
reply_ref: str | None = None
-
if parent_id:
-
thread_tuple = database.find_mapped_thread(
-
self.db,
-
parent_id,
-
self.input.user_id,
-
self.input.service,
-
self.user_id,
-
self.service
-
)
-
-
if not thread_tuple:
-
LOGGER.error("Failed to find thread tuple in the database!")
-
return None
-
-
_, reply_ref, new_root_id, new_parent_id = thread_tuple
-
-
lang: str
-
if post.get_languages():
-
lang = post.get_languages()[0]
-
else:
-
lang = 'en'
-
-
raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
-
if not raw_statuses:
-
LOGGER.error("Failed to split post into statuses?")
-
return None
-
baked_statuses = []
-
-
for status, raw_media in raw_statuses:
-
media: list[str] | None = None
-
if raw_media:
-
media = self.upload_media(raw_media)
-
if not media:
-
LOGGER.error("Failed to upload attachments!")
-
return None
-
baked_statuses.append((status, media))
-
continue
-
baked_statuses.append((status,[]))
-
-
created_statuses: list[str] = []
-
-
for status, media in baked_statuses:
-
payload = {
-
'status': status,
-
'media_ids': media or [],
-
'spoiler_text': post.get_cw(),
-
'visibility': self.options.get('visibility', 'public'),
-
'content_type': self.text_format,
-
'language': lang
-
}
-
-
if media:
-
payload['sensitive'] = post.is_sensitive()
-
-
if post.get_cw():
-
payload['sensitive'] = True
-
-
if not status:
-
payload['status'] = '🖼️'
-
-
if reply_ref:
-
payload['in_reply_to_id'] = reply_ref
-
-
reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
-
'Authorization': f'Bearer {self.token}',
-
'Content-Type': 'application/json'
-
}, json=payload)
-
-
if reqs.status_code != 200:
-
LOGGER.info("Failed to post status! %s - %s", reqs.status_code, reqs.text)
-
reqs.raise_for_status()
-
-
reply_ref = reqs.json()['id']
-
LOGGER.info("Created new status %s!", reply_ref)
-
-
created_statuses.append(reqs.json()['id'])
-
-
db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
-
assert db_post, "ghghghhhhh"
-
-
if new_root_id is None or new_parent_id is None:
-
new_root_id = database.insert_post(
-
self.db,
-
created_statuses[0],
-
self.user_id,
-
self.service
-
)
-
new_parent_id = new_root_id
-
database.insert_mapping(self.db, db_post['id'], new_parent_id)
-
created_statuses = created_statuses[1:]
-
-
for db_id in created_statuses:
-
new_parent_id = database.insert_reply(
-
self.db,
-
db_id,
-
self.user_id,
-
self.service,
-
new_parent_id,
-
new_root_id
-
)
-
database.insert_mapping(self.db, db_post['id'], new_parent_id)
-
-
def delete_post(self, identifier: str):
-
post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
-
if not post:
-
return
-
-
mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
-
for mapping in mappings[::-1]:
-
LOGGER.info("Deleting '%s'...", mapping[0])
-
requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
-
'Authorization': f'Bearer {self.token}'
-
})
-
database.delete_post(self.db, mapping[0], self.service, self.user_id)
-
+116
mastodon/common.py
···
+
from bs4 import BeautifulSoup, Tag
+
from bs4.element import NavigableString
+
from html import unescape
+
+
import mastodon.markeddown as markeddown
+
+
import cross
+
from util.media import MediaInfo
+
+
md_parser = markeddown.HTMLToMarkdownParser()
+
md_parser.preserve_spaces = True
+
+
class MastodonPost(cross.Post):
+
def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[MediaInfo]) -> None:
+
super().__init__()
+
self.status = status
+
self.media_attachments = media_attachments
+
self.tokens = tokens
+
+
def get_tokens(self) -> list[cross.Token]:
+
return self.tokens
+
+
def get_parent_id(self) -> str | None:
+
return self.status.get('in_reply_to_id')
+
+
def get_post_date_iso(self) -> str:
+
date = self.status.get('created_at')
+
return date or super().get_post_date_iso()
+
+
def get_cw(self) -> str:
+
return self.status.get('spoiler_text') or ''
+
+
def get_id(self) -> str:
+
return self.status['id']
+
+
def get_languages(self) -> list[str]:
+
if self.status.get('language'):
+
return [self.status['language']]
+
return []
+
+
def is_sensitive(self) -> bool:
+
return self.status.get('sensitive', False)
+
+
def get_attachments(self) -> list[MediaInfo]:
+
return self.media_attachments
+
+
def tokenize_post(status: dict) -> list[cross.Token]:
+
if not status.get('content'):
+
return []
+
+
soup = BeautifulSoup(status['content'], "html.parser")
+
tokens: list[cross.Token] = []
+
+
tags: list[dict] = status.get('tags', [])
+
mentions: list[dict] = status.get('mentions', [])
+
+
def mdd(html):
+
md_parser.feed(unescape(html))
+
md = md_parser.get_markdown()
+
md_parser.reset()
+
return md
+
+
def recurse(node) -> None:
+
if isinstance(node, NavigableString):
+
tokens.append(cross.TextToken(str(node)))
+
return
+
+
if isinstance(node, Tag):
+
if node.name.lower() == "a":
+
href = node.get("href", "")
+
inner_html = "".join(str(c) for c in node.contents)
+
link_text_md = mdd(inner_html)
+
+
if link_text_md.startswith('@'):
+
as_mention = link_text_md[1:]
+
for block in mentions:
+
if href == block.get('url'):
+
tokens.append(cross.MentionToken(block['acct'], block['url']))
+
return
+
elif as_mention == block.get('acct') or as_mention == block.get('username'):
+
tokens.append(cross.MentionToken(block['acct'], block['url']))
+
return
+
+
if link_text_md.startswith('#'):
+
as_tag = link_text_md[1:].lower()
+
if any(as_tag == block.get('name') for block in tags):
+
tokens.append(cross.TagToken(link_text_md[1:]))
+
return
+
+
# idk if we can safely convert this to string
+
tokens.append(cross.LinkToken(str(href), link_text_md))
+
return
+
+
if node.find("a") is not None:
+
for child in node.contents:
+
recurse(child)
+
return
+
+
serialized = str(node)
+
markdownified = mdd(serialized)
+
if markdownified:
+
tokens.append(cross.TextToken(markdownified))
+
return
+
return
+
+
for child in soup.contents:
+
recurse(child)
+
+
if not tokens:
+
return []
+
+
last_token = tokens[-1]
+
if last_token and isinstance(last_token, cross.TextToken) and last_token.text.endswith('\n\n'):
+
tokens[-1] = cross.TextToken(last_token.text[:-2])
+
+
return tokens
+166
mastodon/input.py
···
+
import requests, websockets
+
import json
+
import re
+
import asyncio
+
+
from mastodon.common import MastodonPost, tokenize_post
+
+
import cross, util.database as database
+
from util.util import LOGGER, as_envvar
+
from util.media import MediaInfo, download_media
+
from util.database import DataBaseWorker
+
+
from typing import Callable, Any
+
+
ALLOWED_VISIBILITY = ['public', 'unlisted']
+
MARKDOWNY = ['text/x.misskeymarkdown', 'text/markdown', 'text/plain']
+
+
class MastodonInputOptions():
+
def __init__(self, o: dict) -> None:
+
self.allowed_visibility = ALLOWED_VISIBILITY
+
self.filters = [re.compile(f) for f in o.get('regex_filters', [])]
+
+
allowed_visibility = o.get('allowed_visibility')
+
if allowed_visibility is not None:
+
if any([v not in ALLOWED_VISIBILITY for v in allowed_visibility]):
+
raise ValueError(f"'allowed_visibility' only accepts {', '.join(ALLOWED_VISIBILITY)}, got: {allowed_visibility}")
+
self.allowed_visibility = allowed_visibility
+
+
class MastodonInput(cross.Input):
+
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
+
self.options = MastodonInputOptions(settings.get('options', {}))
+
self.token = as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
+
instance: str = as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
+
+
service = instance[:-1] if instance.endswith('/') else instance
+
+
LOGGER.info("Verifying %s credentails...", service)
+
responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
if responce.status_code != 200:
+
LOGGER.error("Failed to validate user credentials!")
+
responce.raise_for_status()
+
return
+
+
super().__init__(service, responce.json()["id"], settings, db)
+
self.streaming = self._get_streaming_url()
+
+
if not self.streaming:
+
raise Exception("Instance %s does not support streaming!", service)
+
+
def _get_streaming_url(self):
+
response = requests.get(f"{self.service}/api/v1/instance")
+
response.raise_for_status()
+
data: dict = response.json()
+
return (data.get('urls') or {}).get('streaming_api')
+
+
def __to_tokens(self, status: dict):
+
content_type = status.get('content_type', 'text/plain')
+
raw_text = status.get('text')
+
+
tags: list[str] = []
+
for tag in status.get('tags', []):
+
tags.append(tag['name'])
+
+
mentions: list[tuple[str, str]] = []
+
for mention in status.get('mentions', []):
+
mentions.append(('@' + mention['username'], '@' + mention['acct']))
+
+
if raw_text and content_type in MARKDOWNY:
+
return cross.tokenize_markdown(raw_text, tags, mentions)
+
+
akkoma_ext: dict | None = status.get('akkoma', {}).get('source')
+
if akkoma_ext:
+
if akkoma_ext.get('mediaType') in MARKDOWNY:
+
return cross.tokenize_markdown(akkoma_ext["content"], tags, mentions)
+
+
return tokenize_post(status)
+
+
def _on_create_post(self, outputs: list[cross.Output], status: dict):
+
# skip events from other users
+
if (status.get('account') or {})['id'] != self.user_id:
+
return
+
+
if status.get('reblog') or (status.get('quote_id') or status.get('quote')) or status.get('poll'):
+
# TODO polls not supported on bsky. maybe 3rd party? skip for now
+
# we don't handle reblogs. possible with bridgy(?) and self
+
# we don't handle quotes.
+
LOGGER.info("Skipping '%s'! Reblog, quote or poll..", status['id'])
+
return
+
+
in_reply: str | None = status.get('in_reply_to_id')
+
in_reply_to: str | None = status.get('in_reply_to_account_id')
+
if in_reply_to and in_reply_to != self.user_id:
+
# We don't support replies.
+
LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
+
return
+
+
if status.get('visibility') not in self.options.allowed_visibility:
+
# Skip f/o and direct posts
+
LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
+
return
+
+
success = database.try_insert_post(self.db, status['id'], in_reply, self.user_id, self.service)
+
if not success:
+
LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
+
return
+
+
tokens = self.__to_tokens(status)
+
if not cross.test_filters(tokens, self.options.filters):
+
LOGGER.info("Skipping '%s'. Matched a filter!", status['id'])
+
return
+
+
LOGGER.info("Crossposting '%s'...", status['id'])
+
+
media_attachments: list[MediaInfo] = []
+
for attachment in status.get('media_attachments', []):
+
LOGGER.info("Downloading %s...", attachment['url'])
+
info = download_media(attachment['url'], attachment.get('description') or '')
+
if not info:
+
LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
+
return
+
media_attachments.append(info)
+
+
cross_post = MastodonPost(status, tokens, media_attachments)
+
for output in outputs:
+
output.accept_post(cross_post)
+
+
def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
+
post = database.find_post(self.db, identifier, self.user_id, self.service)
+
if not post:
+
return
+
+
LOGGER.info("Deleting '%s'...", identifier)
+
for output in outputs:
+
output.delete_post(identifier)
+
database.delete_post(self.db, identifier, self.user_id, self.service)
+
+
def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
+
if event == 'update':
+
self._on_create_post(outputs, json.loads(payload))
+
elif event == 'delete':
+
self._on_delete_post(outputs, payload)
+
+
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
+
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
+
+
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.3"}):
+
try:
+
LOGGER.info("Listening to %s...", self.streaming)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
data = json.loads(msg)
+
event: str = data.get('event')
+
payload: str = data.get('payload')
+
+
submit(lambda: self._on_post(outputs, str(event), str(payload)))
+
+
listen = asyncio.create_task(listen_for_messages())
+
+
await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
LOGGER.error(e, stack_info=True, exc_info=True)
+
LOGGER.info("Reconnecting to %s...", self.streaming)
+
continue
+345
mastodon/output.py
···
+
import requests, time
+
+
import cross, util.database as database
+
from util.util import LOGGER, as_envvar, canonical_label
+
from util.media import MediaInfo
+
from util.database import DataBaseWorker
+
+
POSSIBLE_MIMES = [
+
'audio/ogg',
+
'audio/mp3',
+
'image/webp',
+
'image/jpeg',
+
'image/png',
+
'video/mp4',
+
'video/quicktime',
+
'video/webm'
+
]
+
+
ALLOWED_POSTING_VISIBILITY = ['public', 'unlisted', 'private']
+
+
class MastodonOutputOptions():
+
def __init__(self, o: dict) -> None:
+
self.visibility = 'public'
+
+
visibility = o.get('visibility')
+
if visibility is not None:
+
if visibility not in ALLOWED_POSTING_VISIBILITY:
+
raise ValueError(f"'visibility' only accepts {', '.join(ALLOWED_POSTING_VISIBILITY)}, got: {visibility}")
+
self.visibility = visibility
+
+
class MastodonOutput(cross.Output):
+
def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
+
super().__init__(input, settings, db)
+
self.options = settings.get('options') or {}
+
self.token = as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
+
instance: str = as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
+
+
self.service = instance[:-1] if instance.endswith('/') else instance
+
+
LOGGER.info("Verifying %s credentails...", self.service)
+
responce = requests.get(f"{self.service}/api/v1/accounts/verify_credentials", headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
if responce.status_code != 200:
+
LOGGER.error("Failed to validate user credentials!")
+
responce.raise_for_status()
+
return
+
self.user_id: str = responce.json()["id"]
+
+
LOGGER.info("Getting %s configuration...", self.service)
+
responce = requests.get(f"{self.service}/api/v1/instance", headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
if responce.status_code != 200:
+
LOGGER.error("Failed to get instance info!")
+
responce.raise_for_status()
+
return
+
+
instance_info: dict = responce.json()
+
configuration: dict = instance_info['configuration']
+
+
statuses_config: dict = configuration.get('statuses', {})
+
self.max_characters: int = statuses_config.get('max_characters', 500)
+
self.max_media_attachments: int = statuses_config.get('max_media_attachments', 4)
+
self.characters_reserved_per_url: int = statuses_config.get('characters_reserved_per_url', 23)
+
+
media_config: dict = configuration.get('media_attachments', {})
+
self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
+
self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
+
self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)
+
+
# *oma: max post chars
+
max_toot_chars = instance_info.get('max_toot_chars')
+
if max_toot_chars:
+
self.max_characters: int = max_toot_chars
+
+
# *oma: max upload limit
+
upload_limit = instance_info.get('upload_limit')
+
if upload_limit:
+
self.image_size_limit: int = upload_limit
+
self.video_size_limit: int = upload_limit
+
+
# *oma ext: supported text types
+
self.text_format = 'text/plain'
+
pleroma = instance_info.get('pleroma')
+
if pleroma:
+
post_formats: list[str] = pleroma.get('metadata', {}).get('post_formats', [])
+
if 'text/x.misskeymarkdown' in post_formats:
+
self.text_format = 'text/x.misskeymarkdown'
+
elif 'text/markdown' in post_formats:
+
self.text_format = 'text/markdown'
+
+
def upload_media(self, attachments: list[MediaInfo]) -> list[str] | None:
+
for a in attachments:
+
if a.mime.startswith('image/') and len(a.io) > self.image_size_limit:
+
return None
+
+
if a.mime.startswith('video/') and len(a.io) > self.video_size_limit:
+
return None
+
+
if not a.mime.startswith('image/') and not a.mime.startswith('video/'):
+
if len(a.io) > 7_000_000:
+
return None
+
+
uploads: list[dict] = []
+
for a in attachments:
+
data = {}
+
if a.alt:
+
data['description'] = a.alt
+
+
req = requests.post(f"{self.service}/api/v2/media", headers= {
+
'Authorization': f'Bearer {self.token}'
+
}, files={'file': (a.name, a.io, a.mime)}, data=data)
+
+
if req.status_code == 200:
+
LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
+
uploads.append({
+
'done': True,
+
'id': req.json()['id']
+
})
+
elif req.status_code == 202:
+
LOGGER.info("Waiting for %s to process!", a.name)
+
uploads.append({
+
'done': False,
+
'id': req.json()['id']
+
})
+
else:
+
LOGGER.error("Failed to upload %s! %s", a.name, req.text)
+
req.raise_for_status()
+
+
while any([not val['done'] for val in uploads]):
+
LOGGER.info("Waiting for media to process...")
+
time.sleep(3)
+
for media in uploads:
+
if media['done']:
+
continue
+
+
reqs = requests.get(f'{self.service}/api/v1/media/{media['id']}', headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
+
if reqs.status_code == 206:
+
continue
+
+
if reqs.status_code == 200:
+
media['done'] = True
+
continue
+
reqs.raise_for_status()
+
+
return [val['id'] for val in uploads]
+
+
def token_to_string(self, tokens: list[cross.Token]) -> str | None:
+
p_text: str = ''
+
+
for token in tokens:
+
if isinstance(token, cross.TextToken):
+
p_text += token.text
+
elif isinstance(token, cross.TagToken):
+
p_text += '#' + token.tag
+
elif isinstance(token, cross.LinkToken):
+
if canonical_label(token.label, token.href):
+
p_text += token.href
+
else:
+
if self.text_format == 'text/plain':
+
p_text += f'{token.label}: {token.href}'
+
elif self.text_format in {'text/x.misskeymarkdown', 'text/markdown'}:
+
p_text += f'[{token.label}]({token.href})'
+
else:
+
return None
+
+
return p_text
+
+
def split_tokens_media(self, tokens: list[cross.Token], media: list[MediaInfo]):
+
split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
+
post_text: list[str] = []
+
+
for block in split_tokens:
+
baked_text = self.token_to_string(block)
+
+
if baked_text is None:
+
return None
+
post_text.append(baked_text)
+
+
if not post_text:
+
post_text = ['']
+
+
posts: list[dict] = [{"text": post_text, "attachments": []} for post_text in post_text]
+
available_indices: list[int] = list(range(len(posts)))
+
+
current_image_post_idx: int | None = None
+
+
def make_blank_post() -> dict:
+
return {
+
"text": '',
+
"attachments": []
+
}
+
+
def pop_next_empty_index() -> int:
+
if available_indices:
+
return available_indices.pop(0)
+
else:
+
new_idx = len(posts)
+
posts.append(make_blank_post())
+
return new_idx
+
+
for att in media:
+
if (
+
current_image_post_idx is not None
+
and len(posts[current_image_post_idx]["attachments"]) < self.max_media_attachments
+
):
+
posts[current_image_post_idx]["attachments"].append(att)
+
else:
+
idx = pop_next_empty_index()
+
posts[idx]["attachments"].append(att)
+
current_image_post_idx = idx
+
+
result: list[tuple[str, list[MediaInfo]]] = []
+
+
for p in posts:
+
result.append((p['text'], p["attachments"]))
+
+
return result
+
+
def accept_post(self, post: cross.Post):
+
parent_id = post.get_parent_id()
+
+
new_root_id: int | None = None
+
new_parent_id: int | None = None
+
+
reply_ref: str | None = None
+
if parent_id:
+
thread_tuple = database.find_mapped_thread(
+
self.db,
+
parent_id,
+
self.input.user_id,
+
self.input.service,
+
self.user_id,
+
self.service
+
)
+
+
if not thread_tuple:
+
LOGGER.error("Failed to find thread tuple in the database!")
+
return None
+
+
_, reply_ref, new_root_id, new_parent_id = thread_tuple
+
+
lang: str
+
if post.get_languages():
+
lang = post.get_languages()[0]
+
else:
+
lang = 'en'
+
+
raw_statuses = self.split_tokens_media(post.get_tokens(), post.get_attachments())
+
if not raw_statuses:
+
LOGGER.error("Failed to split post into statuses?")
+
return None
+
baked_statuses = []
+
+
for status, raw_media in raw_statuses:
+
media: list[str] | None = None
+
if raw_media:
+
media = self.upload_media(raw_media)
+
if not media:
+
LOGGER.error("Failed to upload attachments!")
+
return None
+
baked_statuses.append((status, media))
+
continue
+
baked_statuses.append((status,[]))
+
+
created_statuses: list[str] = []
+
+
for status, media in baked_statuses:
+
payload = {
+
'status': status,
+
'media_ids': media or [],
+
'spoiler_text': post.get_cw(),
+
'visibility': self.options.get('visibility', 'public'),
+
'content_type': self.text_format,
+
'language': lang
+
}
+
+
if media:
+
payload['sensitive'] = post.is_sensitive()
+
+
if post.get_cw():
+
payload['sensitive'] = True
+
+
if not status:
+
payload['status'] = '🖼️'
+
+
if reply_ref:
+
payload['in_reply_to_id'] = reply_ref
+
+
reqs = requests.post(f'{self.service}/api/v1/statuses', headers={
+
'Authorization': f'Bearer {self.token}',
+
'Content-Type': 'application/json'
+
}, json=payload)
+
+
if reqs.status_code != 200:
+
LOGGER.info("Failed to post status! %s - %s", reqs.status_code, reqs.text)
+
reqs.raise_for_status()
+
+
reply_ref = reqs.json()['id']
+
LOGGER.info("Created new status %s!", reply_ref)
+
+
created_statuses.append(reqs.json()['id'])
+
+
db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
+
assert db_post, "ghghghhhhh"
+
+
if new_root_id is None or new_parent_id is None:
+
new_root_id = database.insert_post(
+
self.db,
+
created_statuses[0],
+
self.user_id,
+
self.service
+
)
+
new_parent_id = new_root_id
+
database.insert_mapping(self.db, db_post['id'], new_parent_id)
+
created_statuses = created_statuses[1:]
+
+
for db_id in created_statuses:
+
new_parent_id = database.insert_reply(
+
self.db,
+
db_id,
+
self.user_id,
+
self.service,
+
new_parent_id,
+
new_root_id
+
)
+
database.insert_mapping(self.db, db_post['id'], new_parent_id)
+
+
def delete_post(self, identifier: str):
+
post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
+
if not post:
+
return
+
+
mappings = database.find_mappings(self.db, post['id'], self.service, self.user_id)
+
for mapping in mappings[::-1]:
+
LOGGER.info("Deleting '%s'...", mapping[0])
+
requests.delete(f'{self.service}/api/v1/statuses/{mapping[0]}', headers={
+
'Authorization': f'Bearer {self.token}'
+
})
+
database.delete_post(self.db, mapping[0], self.service, self.user_id)
+
+1 -1
media_util.py util/media.py
···
import subprocess
import json
import re, urllib.parse, os
-
from util import LOGGER
+
from util.util import LOGGER
import magic
FILENAME = re.compile(r'filename="?([^\";]*)"?')
+10 -39
misskey.py misskey/input.py
···
-
import cross, media_util, util, database
-
from util import LOGGER
import requests, websockets
-
from typing import Callable, Any
import asyncio
import json, uuid
import re
+
from misskey.common import MisskeyPost
-
class MisskeyPost(cross.Post):
-
def __init__(self, note: dict, tokens: list[cross.Token], files: list[media_util.MediaInfo]) -> None:
-
super().__init__()
-
self.note = note
-
self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
-
self.media_attachments = files
-
self.tokens = tokens
-
-
def get_tokens(self) -> list[cross.Token]:
-
return self.tokens
-
-
def get_parent_id(self) -> str | None:
-
return self.note.get('replyId')
-
-
def get_post_date_iso(self) -> str:
-
date = self.note.get('createdAt')
-
return date or super().get_post_date_iso()
-
-
def get_attachments(self) -> list[media_util.MediaInfo]:
-
return self.media_attachments
-
-
def get_id(self) -> str:
-
return self.note['id']
-
-
def get_cw(self) -> str:
-
return self.note.get('cw') or ''
-
-
def get_languages(self) -> list[str]:
-
return []
-
-
def is_sensitive(self) -> bool:
-
return self.sensitive
+
import cross, util.database as database
+
from util.media import MediaInfo, download_media
+
from util.util import LOGGER, as_envvar
+
+
from typing import Callable, Any
ALLOWED_VISIBILITY = ['public', 'home']
···
class MisskeyInput(cross.Input):
def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
self.options = MisskeyInputOptions(settings.get('options', {}))
-
self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
-
instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
+
self.token = as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
+
instance: str = as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
service = instance[:-1] if instance.endswith('/') else instance
···
LOGGER.info("Crossposting '%s'...", note['id'])
-
media_attachments: list[media_util.MediaInfo] = []
+
media_attachments: list[MediaInfo] = []
for attachment in note.get('files', []):
LOGGER.info("Downloading %s...", attachment['url'])
-
info = media_util.download_media(attachment['url'], attachment.get('comment') or '')
+
info = download_media(attachment['url'], attachment.get('comment') or '')
if not info:
LOGGER.error("Skipping '%s'. Failed to download media!", note['id'])
return
+35
misskey/common.py
···
+
import cross
+
from util.media import MediaInfo
+
+
class MisskeyPost(cross.Post):
+
def __init__(self, note: dict, tokens: list[cross.Token], files: list[MediaInfo]) -> None:
+
super().__init__()
+
self.note = note
+
self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
+
self.media_attachments = files
+
self.tokens = tokens
+
+
def get_tokens(self) -> list[cross.Token]:
+
return self.tokens
+
+
def get_parent_id(self) -> str | None:
+
return self.note.get('replyId')
+
+
def get_post_date_iso(self) -> str:
+
date = self.note.get('createdAt')
+
return date or super().get_post_date_iso()
+
+
def get_attachments(self) -> list[MediaInfo]:
+
return self.media_attachments
+
+
def get_id(self) -> str:
+
return self.note['id']
+
+
def get_cw(self) -> str:
+
return self.note.get('cw') or ''
+
+
def get_languages(self) -> list[str]:
+
return []
+
+
def is_sensitive(self) -> bool:
+
return self.sensitive
util.py util/util.py