social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

maybe handle missing values, try reconnect ws, post languages

zenfyr.dev 2867f353 2da6655f

verified
Changed files
+26 -14
+5 -2
cross.py
···
def get_tokens(self) -> list[Token]:
return []
-
def get_parent_id(self) -> str:
-
return ''
+
def get_parent_id(self) -> str | None:
+
return None
def get_attachments(self) -> list[MediaAttachment]:
return []
···
def get_cw(self) -> str:
return ''
+
+
def get_languages(self) -> list[str]:
+
return []
def is_sensitive(self) -> bool:
return False
+21 -12
mastodon.py
···
self.status = status
media_attachments: list[cross.MediaAttachment] = []
-
for attachment in status['media_attachments']:
+
for attachment in status.get('media_attachments', []):
media_attachments.append(MastodonAttachment(attachment))
self.media_attachments = media_attachments
-
self.tokens = util.tokenize_html(status['content'])
+
self.tokens = util.tokenize_html(status.get('content', ''))
def get_tokens(self) -> list[cross.Token]:
return self.tokens
-
def get_parent_id(self) -> str:
-
return self.status['in_reply_to_id']
+
def get_parent_id(self) -> str | None:
+
return self.status.get('in_reply_to_id')
def get_cw(self) -> str:
return util.safe_get(self.status, 'spoiler_text', '')
···
def get_id(self) -> str:
return self.status['id']
+
def get_languages(self) -> list[str]:
+
if self.status.get('language'):
+
return [self.status['language']]
+
return []
+
def is_sensitive(self) -> bool:
-
return self.status['sensitive']
+
return self.status.get('sensitive', False)
def get_attachments(self) -> list[cross.MediaAttachment]:
return self.media_attachments
···
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
-
async with websockets.connect(uri, extra_headers={
-
"User-Agent": "XPost/0.0.2"
-
}) as ws:
-
while True:
-
message = await ws.recv()
-
event: dict = json.loads(message)
-
submit(lambda: self._on_post(outputs, str(event.get('event')), str(event.get('payload'))))
+
+
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.2"}):
+
try:
+
while True:
+
message = await ws.recv()
+
event: dict = json.loads(message)
+
submit(lambda: self._on_post(outputs, str(event.get('event')), str(event.get('payload'))))
+
except websockets.ConnectionClosedError as e:
+
LOGGER.error(e, stack_info=True, exc_info=True)
+
LOGGER.info("Reconnecting to %s...", self.streaming)
+
continue