social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

formatting

zenfyr.dev bfd693d6 ada6f7dd

verified
+6 -1
cross/attachments.py
···
@dataclass
-
class Attachment():
+
class Attachment:
pass
+
@dataclass
class SpoilerAttachment(Attachment):
spoiler: str
+
@dataclass
class LanguagesAttachment(Attachment):
langs: list[str]
+
@dataclass
class SensitiveAttachment(Attachment):
sensitive: bool
+
@dataclass
class RemoteUrlAttachment(Attachment):
url: str
+
@dataclass
class QuoteAttachment(Attachment):
+4
cross/fragments.py
···
from dataclasses import dataclass
+
@dataclass
class Fragment:
start: int
end: int
+
@dataclass
class LinkFragment(Fragment):
url: str
+
@dataclass
class TagFragment(Fragment):
tag: str
+
@dataclass
class MentionFragment(Fragment):
+6 -3
cross/post.py
···
from dataclasses import dataclass, field
+
from typing import TypeVar
+
from cross.attachments import Attachment
from cross.fragments import Fragment
-
from typing import TypeVar
-
T = TypeVar('T', bound=Attachment)
+
T = TypeVar("T", bound=Attachment)
+
class AttachmentKeeper:
def __init__(self) -> None:
···
raise TypeError(f"Expected {cls.__name__}, got {type(instance).__name__}")
return instance
+
@dataclass
class Post:
id: str
parent_id: str | None
-
text: str # utf-8 text
+
text: str # utf-8 text
attachments: AttachmentKeeper
fragments: list[Fragment] = field(default_factory=list)
+15 -5
cross/service.py
···
-
from pathlib import Path
import sqlite3
+
from pathlib import Path
from typing import Callable, cast
+
from cross.post import Post
from database.connection import get_conn
from util.util import LOGGER
-
from cross.post import Post
class Service:
def __init__(self, url: str, db: Path) -> None:
···
def close(self):
self.conn.close()
+
class OutputService(Service):
def accept_post(self, post: Post):
LOGGER.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)
···
LOGGER.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)
def accept_repost(self, repost_id: str, reposted_id: str):
-
LOGGER.warning("NOT IMPLEMENTED (%s), accept_repost %s of %s", self.url, repost_id, reposted_id)
+
LOGGER.warning(
+
"NOT IMPLEMENTED (%s), accept_repost %s of %s",
+
self.url,
+
repost_id,
+
reposted_id,
+
)
def delete_repost(self, repost_id: str):
LOGGER.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
-
class InputService(Service):
-
async def listen(self, outputs: list[OutputService], submitter: Callable[[Callable[[], None]], None]): # pyright: ignore[reportUnusedParameter]
+
class InputService(Service):
+
async def listen(
+
self,
+
outputs: list[OutputService],
+
submitter: Callable[[Callable[[], None]], None],
+
): # pyright: ignore[reportUnusedParameter]
pass
+2 -1
database/connection.py
···
+
import sqlite3
from pathlib import Path
-
import sqlite3
+
def get_conn(db: Path) -> sqlite3.Connection:
conn = sqlite3.connect(db, autocommit=True, check_same_thread=False)
+1 -1
database/migrations.py
···
import sqlite3
from pathlib import Path
-
from util.util import LOGGER
from database.connection import get_conn
+
from util.util import LOGGER
class DatabaseMigrator:
+2 -2
env.py
···
import os
-
DATA_DIR = os.environ.get('DATA_DIR') or "./data"
-
MIGRATIONS_DIR = os.environ.get('MIGRATIONS_DIR') or "./migrations"
+
DATA_DIR = os.environ.get("DATA_DIR") or "./data"
+
MIGRATIONS_DIR = os.environ.get("MIGRATIONS_DIR") or "./migrations"
+3 -2
main.py
···
thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
thread.start()
-
LOGGER.info("Connecting to %s...", 'TODO') # TODO
+
LOGGER.info("Connecting to %s...", "TODO") # TODO
try:
task_queue.put(lambda: print("hi"))
-
sleep(10) # TODO
+
sleep(10) # TODO
except KeyboardInterrupt:
LOGGER.info("Stopping...")
task_queue.join()
task_queue.put(None)
thread.join()
+
if __name__ == "__main__":
main()
+2 -1
misskey/info.py
···
from abc import ABC, abstractmethod
-
from cross.service import Service
import requests
+
from cross.service import Service
from util.util import LOGGER
+
class MisskeyService(ABC, Service):
def verify_credentials(self):
+3 -4
misskey/input.py
···
import asyncio
-
from dataclasses import dataclass, field
import json
-
from pathlib import Path
import re
-
from typing import Any, Callable, override
import uuid
+
from dataclasses import dataclass, field
+
from pathlib import Path
+
from typing import Any, Callable, override
import websockets
···
async for msg in ws:
LOGGER.info(msg) # TODO
-
#keepalive = asyncio.create_task(self._send_keepalive(ws))
listen = asyncio.create_task(listen_for_messages())
_ = await asyncio.gather(listen)