social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

add misskey input

zenfyr.dev 79423332 afeeef7a

verified
+27
README.md
···
- Mitra
- Sharkey
+
### Misskey WebSocket
+
+
listens to the homeTimeline channel for new posts, crossposts only the public/home ones by the user.
+
+
**IMPORTANT**: Misskey WSS does Not support deletes, you must delete posts manually. if you know how i can listen to all note events, i would appreciate your help.
+
+
Generate an access token with the `View your account information` scope.
+
+
```json5
+
{
+
"type": "misskey-wss", // type
+
"instance": "env:MISSKEY_INSTANCE", // misskey instance
+
"token": "env:MISSKEY_TOKEN",
+
"options": {
+
"allowed_visibility": [
+
"public",
+
"home"
+
]
+
}
+
}
+
```
+
+
Misskey API is not very good, this also wasn't tested on vanilla misskey.
+
+
confirmed supported:
+
- Sharkey
+
## Outputs
### Bluesky
+4 -3
main.py
···
import os
import json
import database
-
import mastodon, bluesky, cross
+
import mastodon, misskey, bluesky, cross
import asyncio, threading, queue, traceback
DEFAULT_SETTINGS: dict = {
···
}
INPUTS = {
-
"mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db)
+
"mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db),
+
"misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db)
}
OUTPUTS = {
···
traceback.print_exc()
thread = threading.Thread(target=worker, daemon=True)
thread.start()
-
LOGGER.info('Listening to %s...', input.service)
+
LOGGER.info('Connecting to %s...', input.service)
asyncio.run(input.listen(outputs, lambda x: task_queue.put(x)))
+6 -2
mastodon.py
···
responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
'Authorization': f'Bearer {self.token}'
})
-
if responce.status_code == 401:
-
raise Exception("Invalid Mastodon API token provided!")
+
if responce.status_code != 200:
+
LOGGER.error("Failed to validate user credentials!")
+
responce.raise_for_status()
+
return
super().__init__(service, responce.json()["id"], settings, db)
self.streaming = self._get_streaming_url()
···
if parent_post['root_id']:
root_id = parent_post['root_id']
+
LOGGER.info("Crossposting '%s'...", status['id'])
if root_id and parent_id:
database.insert_reply(
self.db,
···
async for ws in websockets.connect(uri, extra_headers={"User-Agent": "XPost/0.0.2"}):
try:
+
LOGGER.info("Listening to %s...", self.streaming)
while True:
message = await ws.recv()
event: dict = json.loads(message)
+321
misskey.py
···
+
import cross, media_util, util, database
+
from util import LOGGER
+
import requests, websockets
+
import re
+
from typing import Callable, Any
+
import asyncio
+
import json, uuid
+
+
URL = re.compile(r"(?:https?://|mailto:|localhost\b)[\w\-\._~:/\?#\[\]@!\$&'\(\)\*\+,;=%]+")
+
MD_INLINE_LINK = re.compile(r"\[([^\]]+)\]\(([^\)]+)\)")
+
MD_AUTOLINK = re.compile(r"<((?:https?://[^\s>]+|mailto:[^\s>]+))>")
+
HASHTAG = re.compile(r'(?<!\w)\#([\w]+)')
+
FEDIVERSE_HANDLE = re.compile(r'(?<![\w@])@([\w-]+)(?:@([\w\.-]+\.[\w\.-]+))?')
+
+
def get_image_common(mime: str):
+
if mime.startswith('image/'):
+
if mime == 'image/gif':
+
return 'gif'
+
return 'image'
+
elif mime.startswith('video/'):
+
return 'video'
+
elif mime.startswith('audio/'):
+
return 'audio'
+
else:
+
return 'other'
+
+
def tokenize_note(note: dict) -> list[cross.Token]:
+
text: str = note.get('text', '')
+
if not text:
+
return []
+
+
mention_handles: dict = util.safe_get(note, 'mentionHandles', {})
+
handles: list[str] = []
+
+
for key, value in mention_handles.items():
+
handles.append(value)
+
+
tags: list[str] = util.safe_get(note, 'tags', [])
+
+
index: int = 0
+
total: int = len(text)
+
buffer: list[str] = []
+
+
tokens: list[cross.Token] = []
+
+
def flush():
+
nonlocal buffer
+
if buffer:
+
tokens.append(cross.TextToken(''.join(buffer)))
+
buffer = []
+
+
while index < total:
+
if text[index] == '[':
+
md_inline = MD_INLINE_LINK.match(text, index)
+
if md_inline:
+
flush()
+
label = md_inline.group(1)
+
href = md_inline.group(2)
+
tokens.append(cross.LinkToken(href, label))
+
index = md_inline.end()
+
continue
+
+
if text[index] == '<':
+
md_auto = MD_AUTOLINK.match(text, index)
+
if md_auto:
+
flush()
+
href = md_auto.group(1)
+
tokens.append(cross.LinkToken(href, href))
+
index = md_auto.end()
+
continue
+
+
if text[index] == '#':
+
tag = HASHTAG.match(text, index)
+
if tag:
+
tag_text = tag.group(1)
+
if tag_text.lower() in tags:
+
flush()
+
tokens.append(cross.TagToken(tag_text))
+
index = tag.end()
+
continue
+
+
if text[index] == '@':
+
handle = FEDIVERSE_HANDLE.match(text, index)
+
if handle:
+
handle_text = handle.group(0)
+
if handle_text.strip() in handles:
+
flush()
+
tokens.append(cross.MentionToken(handle_text, '')) # TODO misskey doesn't provide a uri
+
index = handle.end()
+
continue
+
+
url = URL.match(text, index)
+
if url:
+
flush()
+
href = url.group(0)
+
tokens.append(cross.LinkToken(href, href))
+
index = url.end()
+
continue
+
+
buffer.append(text[index])
+
index += 1
+
+
flush()
+
return tokens
+
+
class MisskeyPost(cross.Post):
+
def __init__(self, note: dict) -> None:
+
super().__init__()
+
self.note = note
+
+
media_attachments: list[cross.MediaAttachment] = []
+
+
sensitive = False
+
for attachment in note.get('files', []):
+
media_attachments.append(MisskeyAttachment(attachment))
+
sensitive |= attachment.get('isSensitive', False)
+
+
self.sensitive = sensitive
+
self.media_attachments = media_attachments
+
+
self.tokens = tokenize_note(self.note)
+
+
def get_tokens(self) -> list[cross.Token]:
+
return self.tokens
+
+
def get_parent_id(self) -> str | None:
+
return self.note.get('replyId')
+
+
def get_attachments(self) -> list[cross.MediaAttachment]:
+
return self.media_attachments
+
+
def get_id(self) -> str:
+
return self.note['id']
+
+
def get_cw(self) -> str:
+
return util.safe_get(self.note, 'cw', '')
+
+
def get_languages(self) -> list[str]:
+
return []
+
+
def is_sensitive(self) -> bool:
+
return self.sensitive
+
+
class MisskeyAttachment(cross.MediaAttachment):
+
def __init__(self, attachment: dict) -> None:
+
super().__init__()
+
self.attachment = attachment
+
+
def create_meta(self, bytes: bytes) -> cross.MediaMeta:
+
# it's nort worth it
+
if get_image_common(self.attachment['type']):
+
o_meta = media_util.get_media_meta(bytes)
+
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
+
return cross.MediaMeta(-1, -1, -1)
+
+
def get_url(self) -> str:
+
return self.attachment.get('url', '')
+
+
def get_type(self) -> str | None:
+
return get_image_common(self.attachment['type'])
+
+
def get_alt(self) -> str:
+
return util.safe_get(self.attachment, 'comment', '')
+
+
class MisskeyInput(cross.Input):
+
def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
+
self.options = settings.get('options', {})
+
self.token = util.get_or_envvar(settings, 'token')
+
instance: str = util.get_or_envvar(settings, 'instance')
+
+
service = instance[:-1] if instance.endswith('/') else instance
+
+
LOGGER.info("Verifying %s credentails...", service)
+
responce = requests.post(f"{instance}/api/i", json={ 'i': self.token }, headers={
+
"Content-Type": "application/json"
+
})
+
if responce.status_code != 200:
+
LOGGER.error("Failed to validate user credentials!")
+
responce.raise_for_status()
+
return
+
+
super().__init__(service, responce.json()["id"], settings, db)
+
+
def _on_note(self, outputs: list[cross.Output], note: dict):
+
if note['userId'] != self.user_id:
+
return
+
+
if note.get('renoteId') or note.get('poll'):
+
# TODO polls not supported on bsky. maybe 3rd party? skip for now
+
# we don't handle reblogs. possible with bridgy(?) and self
+
LOGGER.info("Skipping '%s'! Renote or poll..", note['id'])
+
return
+
+
reply_id: str | None = note.get('replyId')
+
if reply_id:
+
if note.get('reply', {}).get('userId') != self.user_id:
+
LOGGER.info("Skipping '%s'! Reply to other user..", note['id'])
+
return
+
+
if note.get('visibility') not in self.options.get('allowed_visibility', []):
+
LOGGER.info("Skipping '%s'! '%s' visibility..", note['id'], note.get('visibility'))
+
return
+
+
root_id = None
+
parent_id = None
+
if reply_id:
+
parent_post = database.find_post(self.db, reply_id, self.user_id, self.service)
+
if not parent_post:
+
LOGGER.info("Skipping '%s' as parent post was not found in db!", note['id'])
+
return
+
+
root_id = parent_post['id']
+
parent_id = root_id
+
if parent_post['root_id']:
+
root_id = parent_post['root_id']
+
+
LOGGER.info("Crossposting '%s'...", note['id'])
+
if root_id and parent_id:
+
database.insert_reply(
+
self.db,
+
note['id'],
+
self.user_id,
+
self.service,
+
parent_id,
+
root_id
+
)
+
else:
+
database.insert_post(
+
self.db,
+
note['id'],
+
self.user_id,
+
self.service
+
)
+
+
cross_post = MisskeyPost(note)
+
for output in outputs:
+
output.accept_post(cross_post)
+
+
def _on_delete(self, outputs: list[cross.Output], note: dict):
+
pass
+
+
def _on_message(self, outputs: list[cross.Output], data: dict):
+
+
if data['type'] == 'channel':
+
type: str = data['body']['type']
+
if type == 'note' or type == 'reply':
+
note_body = data['body']['body']
+
self._on_note(outputs, note_body)
+
return
+
+
pass
+
+
async def _send_keepalive(self, ws: websockets.WebSocketClientProtocol):
+
while ws.open:
+
try:
+
await asyncio.sleep(120)
+
if ws.open:
+
await ws.send("h")
+
LOGGER.debug("Sent keepalive h..")
+
else:
+
LOGGER.info("WebSocket is closed, stopping keepalive task.")
+
break
+
except Exception as e:
+
LOGGER.error(f"Error sending keepalive: {e}")
+
break
+
+
async def _listen_for_messages(
+
self,
+
ws: websockets.WebSocketClientProtocol,
+
submit: Callable[[Callable[[], Any]], Any],
+
outputs: list[cross.Output]):
+
+
async for msg in ws:
+
data = json.loads(msg)
+
+
# TODO listen to deletes somehow
+
if False and data['type'] == 'channel':
+
payload_type = data['body']['type']
+
if payload_type == 'reply' or payload_type == 'note':
+
user_id = data['body']['body']['userId']
+
if self.user_id == user_id:
+
note_id = data['body']['body']['id']
+
await ws.send(json.dumps({
+
'type': 's',
+
'body': {
+
'id': note_id
+
}
+
}))
+
LOGGER.info('Subscribed to note %s updates.', note_id)
+
+
submit(lambda: self._on_message(outputs, data))
+
+
async def _subscribe_to_home(self, ws: websockets.WebSocketClientProtocol):
+
home_message = json.dumps({
+
"type": "connect",
+
"body": {
+
"channel": "homeTimeline",
+
"id": str(uuid.uuid4())
+
}
+
})
+
await ws.send(home_message)
+
LOGGER.info("Subscribed to 'homeTimeline' channel...")
+
+
+
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
+
streaming: str = f"wss://{self.service.split("://", 1)[1]}"
+
url: str = f"{streaming}/streaming?i={self.token}"
+
+
async for ws in websockets.connect(url, extra_headers={"User-Agent": "XPost/0.0.2"}):
+
try:
+
LOGGER.info("Listening to %s...", streaming)
+
await self._subscribe_to_home(ws)
+
+
keepalive = asyncio.create_task(self._send_keepalive(ws))
+
listen = asyncio.create_task(self._listen_for_messages(ws, submit, outputs))
+
+
await asyncio.gather(keepalive, listen)
+
except websockets.ConnectionClosedError as e:
+
LOGGER.error(e, stack_info=True, exc_info=True)
+
LOGGER.info("Reconnecting to %s...", streaming)
+
continue
+1 -1
uv.lock
···
[[package]]
name = "xpost"
-
version = "0.0.1"
+
version = "0.0.2"
source = { virtual = "." }
dependencies = [
{ name = "atproto" },