social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

refactors and fixes

zenfyr.dev c8f69581 0b35ccff

verified
+152
atproto2.py
···
+
from typing import Any
+
from atproto import client_utils, Client, AtUri
+
from atproto_client import models
+
+
class Client2(Client):
+
def __init__(self, base_url: str | None = None, *args: Any, **kwargs: Any) -> None:
+
super().__init__(base_url, *args, **kwargs)
+
+
def send_video(
+
self,
+
text: str | client_utils.TextBuilder,
+
video: bytes,
+
video_alt: str | None = None,
+
video_aspect_ratio: models.AppBskyEmbedDefs.AspectRatio | None = None,
+
reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
+
langs: list[str] | None = None,
+
facets: list[models.AppBskyRichtextFacet.Main] | None = None,
+
labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
+
) -> models.AppBskyFeedPost.CreateRecordResponse:
+
"""same as send_video, but with labels"""
+
+
if video_alt is None:
+
video_alt = ''
+
+
upload = self.upload_blob(video)
+
+
return self.send_post(
+
text,
+
reply_to=reply_to,
+
embed=models.AppBskyEmbedVideo.Main(video=upload.blob, alt=video_alt, aspect_ratio=video_aspect_ratio),
+
langs=langs,
+
facets=facets,
+
labels=labels
+
)
+
+
def send_images(
+
self,
+
text: str | client_utils.TextBuilder,
+
images: list[bytes],
+
image_alts: list[str] | None = None,
+
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] | None = None,
+
reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
+
langs: list[str] | None = None,
+
facets: list[models.AppBskyRichtextFacet.Main] | None = None,
+
labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
+
) -> models.AppBskyFeedPost.CreateRecordResponse:
+
"""same as send_images, but with labels"""
+
+
if image_alts is None:
+
image_alts = [''] * len(images)
+
else:
+
diff = len(images) - len(image_alts)
+
image_alts = image_alts + [''] * diff
+
+
if image_aspect_ratios is None:
+
aligned_image_aspect_ratios = [None] * len(images)
+
else:
+
diff = len(images) - len(image_aspect_ratios)
+
aligned_image_aspect_ratios = image_aspect_ratios + [None] * diff
+
+
uploads = [self.upload_blob(image) for image in images]
+
+
embed_images = [
+
models.AppBskyEmbedImages.Image(alt=alt, image=upload.blob, aspect_ratio=aspect_ratio)
+
for alt, upload, aspect_ratio in zip(image_alts, uploads, aligned_image_aspect_ratios)
+
]
+
+
return self.send_post(
+
text,
+
reply_to=reply_to,
+
embed=models.AppBskyEmbedImages.Main(images=embed_images),
+
langs=langs,
+
facets=facets,
+
labels=labels
+
)
+
+
def send_post(
+
self,
+
text: str | client_utils.TextBuilder,
+
reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
+
embed:
+
None |
+
models.AppBskyEmbedImages.Main |
+
models.AppBskyEmbedExternal.Main |
+
models.AppBskyEmbedRecord.Main |
+
models.AppBskyEmbedRecordWithMedia.Main |
+
models.AppBskyEmbedVideo.Main = None,
+
langs: list[str] | None = None,
+
facets: list[models.AppBskyRichtextFacet.Main] | None = None,
+
labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
+
) -> models.AppBskyFeedPost.CreateRecordResponse:
+
"""same as send_post, but with labels"""
+
+
if isinstance(text, client_utils.TextBuilder):
+
facets = text.build_facets()
+
text = text.build_text()
+
+
repo = self.me and self.me.did
+
if not repo:
+
raise Exception("Client not logged in!")
+
+
if not langs:
+
langs = ['en']
+
+
record = models.AppBskyFeedPost.Record(
+
created_at=self.get_current_time_iso(),
+
text=text,
+
reply=reply_to,
+
embed=embed,
+
langs=langs,
+
facets=facets,
+
labels=labels
+
)
+
return self.app.bsky.feed.post.create(repo, record)
+
+
def create_gates(self, options: dict, post_uri: str):
+
account = self.me
+
if not account:
+
raise Exception("Client not logged in!")
+
+
rkey = AtUri.from_str(post_uri).rkey
+
time = self.get_current_time_iso()
+
+
thread_gate_opts = options.get('thread_gate', [])
+
if 'everybody' not in thread_gate_opts:
+
allow = []
+
if thread_gate_opts:
+
if 'following' in thread_gate_opts:
+
allow.append(models.AppBskyFeedThreadgate.FollowingRule())
+
if 'followers' in thread_gate_opts:
+
allow.append(models.AppBskyFeedThreadgate.FollowerRule())
+
if 'mentioned' in thread_gate_opts:
+
allow.append(models.AppBskyFeedThreadgate.MentionRule())
+
+
thread_gate = models.AppBskyFeedThreadgate.Record(
+
post=post_uri,
+
created_at=time,
+
allow=allow
+
)
+
+
self.app.bsky.feed.threadgate.create(account.did, thread_gate, rkey)
+
+
if options.get('quote_gate', False):
+
post_gate = models.AppBskyFeedPostgate.Record(
+
post=post_uri,
+
created_at=time,
+
embedding_rules=[
+
models.AppBskyFeedPostgate.DisableRule()
+
]
+
)
+
+
self.app.bsky.feed.postgate.create(account.did, post_gate, rkey)
+22 -166
bluesky.py
···
-
from atproto import client_utils, Client, AtUri, IdResolver
+
from atproto import client_utils, IdResolver
from atproto_client import models
+
from atproto2 import Client2
import json
import cross
import database
···
import util
import media_util
from util import LOGGER
+
import re
# only for lexicon reference
SERVICE = 'https://bsky.app'
-
ADULT_LABEL = ["sexual content", "nsfw"]
-
PORN_LABEL = ["porn", "yiff"]
+
# TODO this is terrible and stupid
+
ADULT_PATTERN = re.compile(r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE)
+
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
class BlueskyOutput(cross.Output):
def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
super().__init__(input, settings, db)
-
self.options = util.safe_get(settings, 'options', {})
+
self.options = settings.get('options') or {}
-
if not util.get_or_envvar(settings, 'app-password'):
+
if not util.as_envvar(settings.get('app-password')):
raise Exception("Account app password not provided!")
resolver = IdResolver()
-
did: str | None = util.get_or_envvar(settings, 'did')
+
did: str | None = util.as_envvar(settings.get('did'))
if not did:
-
if not util.get_or_envvar(settings, 'handle'):
+
handle = util.as_envvar(settings.get('handle'))
+
if not handle:
raise Exception("ATP handle not specified!")
-
LOGGER.info("Resolving ATP identity for %s...", util.get_or_envvar(settings, 'handle'))
-
did = resolver.handle.resolve(util.get_or_envvar(settings, 'handle'))
+
LOGGER.info("Resolving ATP identity for %s...", handle)
+
did = resolver.handle.resolve(handle)
if not did:
raise Exception("Failed to resolve DID!")
-
pds: str | None = util.get_or_envvar(settings, 'pds')
+
pds: str | None = util.as_envvar(settings.get('pds'))
if not pds:
LOGGER.info("Resolving PDS from DID document...")
did_doc = resolver.did.resolve(did)
···
if not pds:
raise Exception("Failed to resolve PDS!")
-
self.client = Client(pds)
-
self.client.login(did, util.get_or_envvar(settings, 'app-password'))
-
self.bsky = Bluesky(self.client)
+
self.bsky = Client2(pds)
+
self.bsky.login(did, util.as_envvar(settings.get('app-password')))
def _find_parent(self, parent_id: str):
-
login = self.client.me
+
login = self.bsky.me
if not login:
raise Exception("Client not logged in!")
···
return result
def accept_post(self, post: cross.Post):
-
login = self.client.me
+
login = self.bsky.me
if not login:
raise Exception("Client not logged in!")
···
tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
unique_labels.add('graphic-media')
-
if any(tag in cw for tag in ADULT_LABEL):
+
if ADULT_PATTERN.search(cw):
unique_labels.add('sexual')
-
if any(tag in cw for tag in PORN_LABEL):
+
if PORN_PATTERN.search(cw):
unique_labels.add('porn')
if post.is_sensitive():
···
database.insert_mapping(self.db, db_post['id'], new_parent_id)
def delete_post(self, identifier: str):
-
login = self.client.me
+
login = self.bsky.me
if not login:
raise Exception("Client not logged in!")
···
mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
for mapping in mappings[::-1]:
-
self.client.delete_post(json.loads(mapping[0])['uri'])
+
self.bsky.delete_post(json.loads(mapping[0])['uri'])
database.delete_post(self.db, mapping[0], SERVICE, login.did)
-
-
-
class Bluesky():
-
def __init__(self, client: Client) -> None:
-
self.client = client
-
-
def send_video(
-
self,
-
text: str | client_utils.TextBuilder,
-
video: bytes,
-
video_alt: str | None = None,
-
video_aspect_ratio: models.AppBskyEmbedDefs.AspectRatio | None = None,
-
reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
-
langs: list[str] | None = None,
-
facets: list[models.AppBskyRichtextFacet.Main] | None = None,
-
labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
-
) -> models.AppBskyFeedPost.CreateRecordResponse:
-
-
if video_alt is None:
-
video_alt = ''
-
-
upload = self.client.upload_blob(video)
-
-
return self.send_post(
-
text,
-
reply_to=reply_to,
-
embed=models.AppBskyEmbedVideo.Main(video=upload.blob, alt=video_alt, aspect_ratio=video_aspect_ratio),
-
langs=langs,
-
facets=facets,
-
labels=labels
-
)
-
-
def send_images(
-
self,
-
text: str | client_utils.TextBuilder,
-
images: list[bytes],
-
image_alts: list[str] | None = None,
-
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] | None = None,
-
reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
-
langs: list[str] | None = None,
-
facets: list[models.AppBskyRichtextFacet.Main] | None = None,
-
labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
-
) -> models.AppBskyFeedPost.CreateRecordResponse:
-
-
if image_alts is None:
-
image_alts = [''] * len(images)
-
else:
-
diff = len(images) - len(image_alts)
-
image_alts = image_alts + [''] * diff
-
-
if image_aspect_ratios is None:
-
aligned_image_aspect_ratios = [None] * len(images)
-
else:
-
diff = len(images) - len(image_aspect_ratios)
-
aligned_image_aspect_ratios = image_aspect_ratios + [None] * diff
-
-
uploads = [self.client.upload_blob(image) for image in images]
-
-
embed_images = [
-
models.AppBskyEmbedImages.Image(alt=alt, image=upload.blob, aspect_ratio=aspect_ratio)
-
for alt, upload, aspect_ratio in zip(image_alts, uploads, aligned_image_aspect_ratios)
-
]
-
-
return self.send_post(
-
text,
-
reply_to=reply_to,
-
embed=models.AppBskyEmbedImages.Main(images=embed_images),
-
langs=langs,
-
facets=facets,
-
labels=labels
-
)
-
-
def send_post(
-
self,
-
text: str | client_utils.TextBuilder,
-
reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
-
embed:
-
None |
-
models.AppBskyEmbedImages.Main |
-
models.AppBskyEmbedExternal.Main |
-
models.AppBskyEmbedRecord.Main |
-
models.AppBskyEmbedRecordWithMedia.Main |
-
models.AppBskyEmbedVideo.Main = None,
-
langs: list[str] | None = None,
-
facets: list[models.AppBskyRichtextFacet.Main] | None = None,
-
labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
-
) -> models.AppBskyFeedPost.CreateRecordResponse:
-
-
if isinstance(text, client_utils.TextBuilder):
-
facets = text.build_facets()
-
text = text.build_text()
-
-
repo = self.client.me and self.client.me.did
-
if not repo:
-
raise Exception("Client not logged in!")
-
-
if not langs:
-
langs = ['en']
-
-
record = models.AppBskyFeedPost.Record(
-
created_at=self.client.get_current_time_iso(),
-
text=text,
-
reply=reply_to,
-
embed=embed,
-
langs=langs,
-
facets=facets,
-
labels=labels
-
)
-
return self.client.app.bsky.feed.post.create(repo, record)
-
-
def create_gates(self, options: dict, post_uri: str):
-
account = self.client.me
-
if not account:
-
raise Exception("Client not logged in!")
-
-
rkey = AtUri.from_str(post_uri).rkey
-
time = self.client.get_current_time_iso()
-
-
thread_gate_opts = options.get('thread_gate', [])
-
if 'everybody' not in thread_gate_opts:
-
allow = []
-
if thread_gate_opts:
-
if 'following' in thread_gate_opts:
-
allow.append(models.AppBskyFeedThreadgate.FollowingRule())
-
if 'followers' in thread_gate_opts:
-
allow.append(models.AppBskyFeedThreadgate.FollowerRule())
-
if 'mentioned' in thread_gate_opts:
-
allow.append(models.AppBskyFeedThreadgate.MentionRule())
-
-
thread_gate = models.AppBskyFeedThreadgate.Record(
-
post=post_uri,
-
created_at=time,
-
allow=allow
-
)
-
-
self.client.app.bsky.feed.threadgate.create(account.did, thread_gate, rkey)
-
-
if options.get('quote_gate', False):
-
post_gate = models.AppBskyFeedPostgate.Record(
-
post=post_uri,
-
created_at=time,
-
embedding_rules=[
-
models.AppBskyFeedPostgate.DisableRule()
-
]
-
)
-
-
self.client.app.bsky.feed.postgate.create(account.did, post_gate, rkey)
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
+6 -6
mastodon.py
···
return self.status.get('in_reply_to_id')
def get_cw(self) -> str:
-
return util.safe_get(self.status, 'spoiler_text', '')
+
return self.status.get('spoiler_text') or ''
def get_id(self) -> str:
return self.status['id']
···
# get media description
def get_alt(self) -> str:
-
return util.safe_get(self.attachment, 'description', '')
+
return self.attachment.get('description') or ''
class MastodonInput(cross.Input):
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
self.options = settings.get('options', {})
-
self.token = util.get_or_envvar(settings, 'token')
-
instance: str = util.get_or_envvar(settings, 'instance')
+
self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
+
instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
service = instance[:-1] if instance.endswith('/') else instance
···
response = requests.get(f"{self.service}/api/v1/instance")
response.raise_for_status()
data: dict = response.json()
-
return util.safe_get(data, "urls", {}).get("streaming_api")
+
return (data.get('urls') or {}).get('streaming_api')
def _on_create_post(self, outputs: list[cross.Output], status: dict):
# skip events from other users
-
if util.safe_get(status, 'account', {})['id'] != self.user_id:
+
if (status.get('account') or {})['id'] != self.user_id:
return
if status.get('reblog') or status.get('poll'):
+7 -8
misskey.py
···
if not text:
return []
-
mention_handles: dict = util.safe_get(note, 'mentionHandles', {})
+
mention_handles: dict = note.get('mentionHandles') or {}
+
tags: list[str] = note.get('tags') or []
+
handles: list[str] = []
-
for key, value in mention_handles.items():
handles.append(value)
-
-
tags: list[str] = util.safe_get(note, 'tags', [])
index: int = 0
total: int = len(text)
···
return self.note['id']
def get_cw(self) -> str:
-
return util.safe_get(self.note, 'cw', '')
+
return self.note.get('cw') or ''
def get_languages(self) -> list[str]:
return []
···
return get_image_common(self.attachment['type'])
def get_alt(self) -> str:
-
return util.safe_get(self.attachment, 'comment', '')
+
return self.attachment.get('comment') or ''
class MisskeyInput(cross.Input):
def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
self.options = settings.get('options', {})
-
self.token = util.get_or_envvar(settings, 'token')
-
instance: str = util.get_or_envvar(settings, 'instance')
+
self.token = util.as_envvar(settings.get('token')) or (_ for _ in ()).throw(ValueError("'token' is required"))
+
instance: str = util.as_envvar(settings.get('instance')) or (_ for _ in ()).throw(ValueError("'instance' is required"))
service = instance[:-1] if instance.endswith('/') else instance
+6 -5
util.py
···
val = obj.get(key, default)
return val if val else default
-
def value_or_envvar(text: str) -> str:
+
def as_envvar(text: str | None) -> str | None:
+
if not text:
+
return None
+
if text.startswith('env:'):
return os.environ.get(text[4:], '')
-
return text
-
-
def get_or_envvar(obj: dict, key: str):
-
return value_or_envvar(obj.get(key, ''))
+
+
return text