social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

cleanup: post classses and types

zenfyr.dev 6bfd85e1 92636ae6

verified
Changed files
+140 -128
bluesky
mastodon
misskey
+52 -53
bluesky/common.py
···
ADULT_PATTERN = re.compile(r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
+
class BlueskyPost(cross.Post):
+
def __init__(self, record: dict, tokens: list[cross.Token], attachments: list[MediaInfo]) -> None:
+
super().__init__()
+
self.uri = record['$xpost.strongRef']['uri']
+
self.parent_uri = None
+
if record.get('reply'):
+
self.parent_uri = record['reply']['parent']['uri']
+
+
self.tokens = tokens
+
self.timestamp = record['createdAt']
+
labels = record.get('labels', {}).get('values')
+
if labels:
+
self.spoiler = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
+
+
self.attachments = attachments
+
self.languages = record.get('langs', [])
+
+
# at:// of the post record
+
def get_id(self) -> str:
+
return self.uri
+
+
def get_parent_id(self) -> str | None:
+
return self.parent_uri
+
+
def get_tokens(self) -> list[cross.Token]:
+
return self.tokens
+
+
def get_text_type(self) -> str:
+
return "text/plain"
+
+
def get_timestamp(self) -> str:
+
return self.timestamp
+
+
def get_attachments(self) -> list[MediaInfo]:
+
return self.attachments
+
+
def get_spoiler(self) -> str | None:
+
return self.spoiler
+
+
def get_languages(self) -> list[str]:
+
return self.languages
+
+
def is_sensitive(self) -> bool:
+
return self.spoiler is not None
+
+
def get_post_url(self) -> str | None:
+
parts = str(self.uri[len("at://"):]).split("/")
+
did, _, post_id = parts
+
+
return f"https://bsky.app/profile/{did}/post/{post_id}"
+
def tokenize_post(post: dict) -> list[cross.Token]:
text: str = post.get('text', '')
if not text:
···
if prev < len(text):
tokens.append(cross.TextToken(text[prev:]))
-
return tokens
-
-
class BlueskyPost(cross.Post):
-
def __init__(self, post: dict, tokens: list[cross.Token], attachments: list[MediaInfo]) -> None:
-
super().__init__()
-
self.post = post
-
self.tokens = tokens
-
-
self.id = post['$xpost.strongRef']['uri']
-
-
self.parent_id = None
-
if self.post.get('reply'):
-
self.parent_id = self.post['reply']['parent']['uri']
-
-
labels = self.post.get('labels', {}).get('values')
-
self.cw = ''
-
if labels:
-
self.cw = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
-
self.attachments = attachments
-
-
def get_tokens(self) -> list[cross.Token]:
-
return self.tokens
-
-
def get_parent_id(self) -> str | None:
-
return self.parent_id
-
-
def get_post_date_iso(self) -> str:
-
return self.post.get('createdAt') or super().get_post_date_iso()
-
-
def get_cw(self) -> str:
-
return self.cw or ''
-
-
def get_id(self) -> str:
-
return self.id
-
-
def get_languages(self) -> list[str]:
-
return self.post.get('langs', []) or []
-
-
def is_sensitive(self) -> bool:
-
return self.post.get('labels', {}).get('values') or False
-
-
def get_attachments(self) -> list[MediaInfo]:
-
return self.attachments
-
-
def get_text_type(self) -> str:
-
return "text/plain"
-
-
def get_post_url(self) -> str | None:
-
parts = str(self.id[len("at://"):]).split("/")
-
did, _, post_id = parts
-
-
return f"https://bsky.app/profile/{did}/post/{post_id}"
-
+
return tokens
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
builder = client_utils.TextBuilder()
+14 -14
bluesky/output.py
···
tokens = post.get_tokens().copy()
unique_labels: set[str] = set()
-
cw = post.get_cw()
+
cw = post.get_spoiler()
if cw:
tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
unique_labels.add('graphic-media')
-
-
# from bsky.app, a post can only have one of those labels
-
if PORN_PATTERN.search(cw):
-
unique_labels.add('porn')
-
elif ADULT_PATTERN.search(cw):
-
unique_labels.add('sexual')
+
+
# from bsky.app, a post can only have one of those labels
+
if PORN_PATTERN.search(cw):
+
unique_labels.add('porn')
+
elif ADULT_PATTERN.search(cw):
+
unique_labels.add('sexual')
if post.is_sensitive():
unique_labels.add('graphic-media')
···
new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
parent=reply_ref,
root=root_ref
-
), labels=labels, time_iso=post.get_post_date_iso())
+
), labels=labels, time_iso=post.get_timestamp())
else:
-
new_post = self.bsky.send_post(text, labels=labels, time_iso=post.get_post_date_iso())
+
new_post = self.bsky.send_post(text, labels=labels, time_iso=post.get_timestamp())
root_ref = models.create_strong_ref(new_post)
self.bsky.create_gates(
self.options.thread_gate,
self.options.quote_gate,
new_post.uri,
-
time_iso=post.get_post_date_iso()
+
time_iso=post.get_timestamp()
)
reply_ref = models.create_strong_ref(new_post)
created_records.append(new_post)
···
root=root_ref
) if root_ref and reply_ref else None,
labels=labels,
-
time_iso=post.get_post_date_iso()
+
time_iso=post.get_timestamp()
)
if not root_ref:
root_ref = models.create_strong_ref(new_post)
···
self.options.thread_gate,
self.options.quote_gate,
new_post.uri,
-
time_iso=post.get_post_date_iso()
+
time_iso=post.get_timestamp()
)
reply_ref = models.create_strong_ref(new_post)
created_records.append(new_post)
···
root=root_ref
) if root_ref and reply_ref else None,
labels=labels,
-
time_iso=post.get_post_date_iso()
+
time_iso=post.get_timestamp()
)
if not root_ref:
root_ref = models.create_strong_ref(new_post)
···
self.options.thread_gate,
self.options.quote_gate,
new_post.uri,
-
time_iso=post.get_post_date_iso()
+
time_iso=post.get_timestamp()
)
reply_ref = models.create_strong_ref(new_post)
created_records.append(new_post)
+22 -19
cross.py
···
+
from abc import ABC, abstractmethod
from typing import Callable, Any
from util.database import DataBaseWorker
from datetime import datetime, timezone
···
def get_duration(self) -> float:
return self.duration
-
class Post():
-
def __init__(self) -> None:
-
self.now_timestamp = datetime.now(timezone.utc).isoformat()
+
class Post(ABC):
+
@abstractmethod
+
def get_id(self) -> str:
+
return ''
+
+
@abstractmethod
+
def get_parent_id(self) -> str | None:
pass
+
@abstractmethod
def get_tokens(self) -> list[Token]:
-
return []
-
-
def get_parent_id(self) -> str | None:
-
return None
+
pass
+
+
# returns input text type.
+
# text/plain, text/markdown, text/x.misskeymarkdown
+
@abstractmethod
+
def get_text_type(self) -> str:
+
pass
-
def get_post_date_iso(self) -> str:
-
return self.now_timestamp
+
# post iso timestamp
+
@abstractmethod
+
def get_timestamp(self) -> str:
+
pass
def get_attachments(self) -> list[MediaInfo]:
return []
-
def get_id(self) -> str:
-
return ''
-
-
def get_cw(self) -> str:
-
return ''
+
def get_spoiler(self) -> str | None:
+
return None
def get_languages(self) -> list[str]:
return []
def is_sensitive(self) -> bool:
return False
-
-
# returns input text type.
-
# text/plain, text/markdown, text/x.misskeymarkdown
-
def get_text_type(self) -> str:
-
return 'text/plain'
def get_post_url(self) -> str | None:
return None
+25 -19
mastodon/common.py
···
class MastodonPost(cross.Post):
def __init__(self, status: dict, tokens: list[cross.Token], media_attachments: list[MediaInfo]) -> None:
super().__init__()
-
self.status = status
-
self.media_attachments = media_attachments
+
self.id = status['id']
+
self.parent_id = status.get('in_reply_to_id')
self.tokens = tokens
self.content_type = status.get('content_type', 'text/plain')
+
self.timestamp = status['created_at']
+
self.media_attachments = media_attachments
+
self.spoiler = status.get('spoiler_text')
+
self.language = [status['language']] if status.get('language') else []
+
self.sensitive = status.get('sensitive', False)
+
self.url = status.get('url')
+
+
def get_id(self) -> str:
+
return self.id
+
+
def get_parent_id(self) -> str | None:
+
return self.parent_id
def get_tokens(self) -> list[cross.Token]:
return self.tokens
-
def get_parent_id(self) -> str | None:
-
return self.status.get('in_reply_to_id')
+
def get_text_type(self) -> str:
+
return self.content_type
-
def get_post_date_iso(self) -> str:
-
return self.status.get('created_at') or self.now_timestamp
+
def get_timestamp(self) -> str:
+
return self.timestamp
-
def get_cw(self) -> str:
-
return self.status.get('spoiler_text') or ''
+
def get_attachments(self) -> list[MediaInfo]:
+
return self.media_attachments
-
def get_id(self) -> str:
-
return self.status['id']
+
def get_spoiler(self) -> str | None:
+
return self.spoiler
def get_languages(self) -> list[str]:
-
return [self.status['language']] if self.status.get('language') else []
+
return self.language
def is_sensitive(self) -> bool:
-
return self.status.get('sensitive', False)
-
-
def get_attachments(self) -> list[MediaInfo]:
-
return self.media_attachments
-
-
def get_text_type(self) -> str:
-
return self.content_type
+
return self.sensitive or (self.spoiler is not None)
def get_post_url(self) -> str | None:
-
return self.status.get('url')
+
return self.url
+5 -4
mastodon/input.py
···
database.delete_post(self.db, identifier, self.user_id, self.service)
def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
-
if event == 'update':
-
self._on_create_post(outputs, json.loads(payload))
-
elif event == 'delete':
-
self._on_delete_post(outputs, payload)
+
match event:
+
case 'update':
+
self._on_create_post(outputs, json.loads(payload))
+
case 'delete':
+
self._on_delete_post(outputs, payload)
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
+2 -2
mastodon/output.py
···
payload = {
'status': status,
'media_ids': media or [],
-
'spoiler_text': post.get_cw(),
+
'spoiler_text': post.get_spoiler() or '',
'visibility': self.options.get('visibility', 'public'),
'content_type': self.text_format,
'language': lang
···
if media:
payload['sensitive'] = post.is_sensitive()
-
if post.get_cw():
+
if post.get_spoiler():
payload['sensitive'] = True
if not status:
+20 -17
misskey/common.py
···
def __init__(self, instance_url: str, note: dict, tokens: list[cross.Token], files: list[MediaInfo]) -> None:
super().__init__()
self.note = note
-
self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
-
self.media_attachments = files
+
self.id = note['id']
+
self.parent_id = note.get('replyId')
self.tokens = tokens
+
self.timestamp = note['createdAt']
+
self.media_attachments = files
+
self.spoiler = note.get('cw')
+
self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
self.url = instance_url + '/notes/' + note['id']
+
def get_id(self) -> str:
+
return self.id
+
+
def get_parent_id(self) -> str | None:
+
return self.parent_id
+
def get_tokens(self) -> list[cross.Token]:
return self.tokens
+
+
def get_text_type(self) -> str:
+
return "text/x.misskeymarkdown"
-
def get_parent_id(self) -> str | None:
-
return self.note.get('replyId')
-
-
def get_post_date_iso(self) -> str:
-
date = self.note.get('createdAt')
-
return date or super().get_post_date_iso()
+
def get_timestamp(self) -> str:
+
return self.timestamp
def get_attachments(self) -> list[MediaInfo]:
return self.media_attachments
-
def get_id(self) -> str:
-
return self.note['id']
-
-
def get_cw(self) -> str:
-
return self.note.get('cw') or ''
+
def get_spoiler(self) -> str | None:
+
return self.spoiler
def get_languages(self) -> list[str]:
return []
def is_sensitive(self) -> bool:
-
return self.sensitive
-
-
def get_text_type(self) -> str:
-
return "text/x.misskeymarkdown"
+
return self.sensitive or (self.spoiler is not None)
def get_post_url(self) -> str | None:
return self.url