social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

add license and refactor websockets a bit

zenfyr.dev d101a855 79423332

verified
Changed files
+45 -38
+21
LICENSE
···
···
+
MIT License
+
+
Copyright (c) 2025
+
+
Permission is hereby granted, free of charge, to any person obtaining a copy
+
of this software and associated documentation files (the "Software"), to deal
+
in the Software without restriction, including without limitation the rights
+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+
copies of the Software, and to permit persons to whom the Software is
+
furnished to do so, subject to the following conditions:
+
+
The above copyright notice and this permission notice shall be included in all
+
copies or substantial portions of the Software.
+
+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+
SOFTWARE.
+15 -8
mastodon.py
···
from util import LOGGER
import requests, websockets
-
import util, media_util, json
-
import cross
import database
from database import DataBaseWorker
from typing import Callable, Any
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString
···
if event == 'update':
self._on_create_post(outputs, json.loads(payload))
elif event == 'delete':
-
self._on_delete_post(outputs, payload)
-
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
···
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.2"}):
try:
LOGGER.info("Listening to %s...", self.streaming)
-
while True:
-
message = await ws.recv()
-
event: dict = json.loads(message)
-
submit(lambda: self._on_post(outputs, str(event.get('event')), str(event.get('payload'))))
except websockets.ConnectionClosedError as e:
LOGGER.error(e, stack_info=True, exc_info=True)
LOGGER.info("Reconnecting to %s...", self.streaming)
···
from util import LOGGER
import requests, websockets
+
import util, media_util, json, cross
import database
from database import DataBaseWorker
from typing import Callable, Any
+
import asyncio
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString
···
if event == 'update':
self._on_create_post(outputs, json.loads(payload))
elif event == 'delete':
+
self._on_delete_post(outputs, payload)
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
···
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.2"}):
try:
LOGGER.info("Listening to %s...", self.streaming)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
data = json.loads(msg)
+
event: str = data.get('event')
+
payload: str = data.get('payload')
+
+
submit(lambda: self._on_post(outputs, str(event), str(payload)))
+
+
listen = asyncio.create_task(listen_for_messages())
+
+
await asyncio.gather(listen)
except websockets.ConnectionClosedError as e:
LOGGER.error(e, stack_info=True, exc_info=True)
LOGGER.info("Reconnecting to %s...", self.streaming)
+9 -30
misskey.py
···
output.accept_post(cross_post)
def _on_delete(self, outputs: list[cross.Output], note: dict):
pass
def _on_message(self, outputs: list[cross.Output], data: dict):
···
LOGGER.error(f"Error sending keepalive: {e}")
break
-
async def _listen_for_messages(
-
self,
-
ws: websockets.WebSocketClientProtocol,
-
submit: Callable[[Callable[[], Any]], Any],
-
outputs: list[cross.Output]):
-
-
async for msg in ws:
-
data = json.loads(msg)
-
-
# TODO listen to deletes somehow
-
if False and data['type'] == 'channel':
-
payload_type = data['body']['type']
-
if payload_type == 'reply' or payload_type == 'note':
-
user_id = data['body']['body']['userId']
-
if self.user_id == user_id:
-
note_id = data['body']['body']['id']
-
await ws.send(json.dumps({
-
'type': 's',
-
'body': {
-
'id': note_id
-
}
-
}))
-
LOGGER.info('Subscribed to note %s updates.', note_id)
-
-
submit(lambda: self._on_message(outputs, data))
-
async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol):
-
home_message = json.dumps({
"type": "connect",
"body": {
"channel": "homeTimeline",
"id": str(uuid.uuid4())
}
-
})
-
await ws.send(home_message)
LOGGER.info("Subscribed to 'homeTimeline' channel...")
···
LOGGER.info("Listening to %s...", streaming)
await self._subscribe_to_home(ws)
keepalive = asyncio.create_task(self._send_keepalive(ws))
-
listen = asyncio.create_task(self._listen_for_messages(ws, submit, outputs))
await asyncio.gather(keepalive, listen)
except websockets.ConnectionClosedError as e:
···
output.accept_post(cross_post)
def _on_delete(self, outputs: list[cross.Output], note: dict):
+
# TODO handle deletes
pass
def _on_message(self, outputs: list[cross.Output], data: dict):
···
LOGGER.error(f"Error sending keepalive: {e}")
break
async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol):
+
await ws.send(json.dumps({
"type": "connect",
"body": {
"channel": "homeTimeline",
"id": str(uuid.uuid4())
}
+
}))
LOGGER.info("Subscribed to 'homeTimeline' channel...")
···
LOGGER.info("Listening to %s...", streaming)
await self._subscribe_to_home(ws)
+
async def listen_for_messages():
+
async for msg in ws:
+
# TODO listen to deletes somehow
+
submit(lambda: self._on_message(outputs, json.loads(msg)))
+
keepalive = asyncio.create_task(self._send_keepalive(ws))
+
listen = asyncio.create_task(listen_for_messages())
await asyncio.gather(keepalive, listen)
except websockets.ConnectionClosedError as e: