social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

this is kind of better

zenfyr.dev 2da6655f 6c33c20a

verified
+1
.gitignore
···
# Virtual environments
.venv
+
.vscode/
data/
+52 -18
README.md
···
uv sync
```
-
print help message:
+
generate settings.json on first launch
```
-
uv run main.py run --help
+
uv run main.py
```
# Settings
-
## Bluesky
+
the tool allows you to specify an input and multiple outputs to post to.
-
in the bluesky block, you can configure who is allowed to reply to and quote the new posts.
+
some options accept an envvar syntax:
-
`quote_gate`:
+
```json
+
{
+
"token": "env:TOKEN"
+
}
+
```
-
prevent users from quoting the post. default: `false`
+
## Inputs
-
`thread_gate`:
+
### Mastodon WebSocket `mastodon-wss`
-
prevent users from replying to the post. leave empty to prevent replies completely.
+
listens to the user's home timeline for new posts, crossposts only the public/unlisted ones by the user.
-
accepted values:
-
- `following` followed users.
-
- `followers` users following the account.
-
- `mentioned` users mentioned in the post.
-
- `everybody` everybody is allowed to reply to the post. all other options will be skipped.
-
-
-
# Supported Software
+
```json5
+
{
+
"type": "mastodon-wss", // type
+
"instance": "env:MASTODON_INSTANCE", // mastodon instance
+
"token": "env:MASTODON_TOKEN", // user token (use webtools)
+
"options": {
+
"allowed_visibility": [
+
"public",
+
"unlisted"
+
]
+
}
+
}
+
```
any instance implementing `/api/v1/instance`, `/api/v1/accounts/verify_credentials` and `/api/v1/streaming?stream` will work fine.
confirmed supported:
- Mastodon
- Iceshrimp.NET
-
- Sharkey
- Akkoma
confirmed unsupported:
-
- Mitra
+
- Mitra
+
- Sharkey
+
+
## Outputs
+
+
### Bluesky
+
+
in the bluesky block, you can configure who is allowed to reply to and quote the new posts.
+
+
```json5
+
{
+
"type": "bluesky", // type
+
"handle": "env:BLUESKY_HANDLE", // handle (e.g. melontini.me)
+
"app_password": "env:BLUESKY_APP_PASSWORD", // https://bsky.app/settings/app-passwords
+
"did": "env:BLUESKY_DID", // use a DID instead of handle (avoids handle resolution)
+
"pds": "env:BLUESKY_PDS", // specify Your PDS directly (avoids DID doc lookup)
+
"options": {
+
"quote_gate": false, // block users from quoting the post
+
"thread_gate": [ // block replies. leave empty to disable replies
+
"mentioned",
+
"following",
+
"followers",
+
"everybody" // allow everybody to reply (ignores other options)
+
]
+
}
+
}
+
```
+286 -15
bluesky.py
···
-
from atproto import client_utils, Client, AtUri
+
from atproto import client_utils, Client, AtUri, IdResolver
from atproto_client import models
+
import json
+
import cross
+
import database
+
from database import DataBaseWorker
+
import util
+
import media_util
+
from util import LOGGER
+
+
# only for lexicon reference
# NOTE: used as the `service` column value in db rows, not as an endpoint
SERVICE = 'https://bsky.app'

# content-warning substrings that trigger the corresponding self-labels
ADULT_LABEL = ["sexual content", "nsfw"]
PORN_LABEL = ["porn", "yiff"]
+
+
class BlueskyOutput(cross.Output):
    """Crossposts posts from an input service to a Bluesky (ATProto) account.

    Fixes applied in review:
    - the DID-doc failure exception message is now actually formatted
    - the CW-prefixed token list is the one that gets split (was re-fetching
      the raw tokens, silently dropping the inserted "CW: ..." prefix)
    - replying to a top-level post now records that post as the thread root
      in the db (was re-inserting the reply as a brand-new root)
    - bails out before db insertion when nothing was posted (previously fell
      through and crashed on db_identifiers[0])
    """

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(input, settings, db)
        self.options = util.safe_get(settings, 'options', {})

        if not util.get_or_envvar(settings, 'app-password'):
            raise Exception("Account app password not provided!")

        resolver = IdResolver()
        # resolve handle -> DID unless a DID was configured directly
        did: str | None = util.get_or_envvar(settings, 'did')
        if not did:
            LOGGER.info("Resolving ATP identity for %s...", util.get_or_envvar(settings, 'handle'))
            did = resolver.handle.resolve(util.get_or_envvar(settings, 'handle'))
            if not did:
                raise Exception("Failed to resolve DID!")

        # resolve DID -> PDS endpoint unless a PDS was configured directly
        pds: str | None = util.get_or_envvar(settings, 'pds')
        if not pds:
            LOGGER.info("Resolving PDS from DID document...")
            did_doc = resolver.did.resolve(did)
            if not did_doc:
                # FIX: Exception() does not %-format its arguments
                raise Exception(f"Failed to resolve DID doc for '{did}'")
            pds = did_doc.get_pds_endpoint()
            if not pds:
                raise Exception("Failed to resolve PDS!")

        self.client = Client(pds)
        self.client.login(did, util.get_or_envvar(settings, 'app-password'))
        self.bsky = Bluesky(self.client)

    def accept_post(self, post: cross.Post):
        """Crosspost `post` to Bluesky, threading replies and recording the
        created records in the database."""
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        parent_id = post.get_parent_id()

        # internal db ids used for insertion at the end
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            # parentless posts are skipped by the input
            reply_data = database.find_post(self.db, parent_id, self.input.user_id, self.input.service)
            assert reply_data, "reply_data requested, but doesn't exist in db (should've been skipped bt firehose)"

            reply_mappings = [json.loads(data[0]) for data in database.find_mappings(self.db, reply_data['id'], SERVICE, login.did)]
            if not reply_mappings:
                LOGGER.error("Failed to find mappings for a post in the db!")
                return

            # reply to the parent's last record; root defaults to its first record
            reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_mappings[-1]['uri']), cid=str(reply_mappings[-1]['cid']))
            root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_mappings[0]['uri']), cid=str(reply_mappings[0]['cid']))
            if reply_data['root_id']:
                root_data = database.find_post_by_id(self.db, reply_data['root_id'])
                assert root_data, "root_data requested but doesn't exist in db"

                root_mappings = [json.loads(data[0]) for data in database.find_mappings(self.db, reply_data['root_id'], SERVICE, login.did)]
                if not root_mappings:
                    LOGGER.error("Failed to find mappings for a post in the db!")
                    return
                root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_mappings[0]['uri']), cid=str(root_mappings[0]['cid']))

            # FIX: when the parent is itself a root post (root_id is NULL),
            # the parent IS the thread root — previously new_root_id stayed
            # None and the branch below re-inserted this reply as a new root.
            new_root_id = reply_data['root_id'] if reply_data['root_id'] else reply_data['id']
            new_parent_id = reply_data['id']

            root_ref = models.create_strong_ref(root_record)
            reply_ref = models.create_strong_ref(reply_record)

        tokens = post.get_tokens()

        # self-labels derived from the content warning / sensitivity flag
        unique_labels: set[str] = set()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add('graphic-media')

            if any(tag in cw for tag in ADULT_LABEL):
                unique_labels.add('sexual')

            if any(tag in cw for tag in PORN_LABEL):
                unique_labels.add('porn')

        if post.is_sensitive():
            unique_labels.add('graphic-media')

        labels = models.ComAtprotoLabelDefs.SelfLabels(values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in unique_labels])

        # FIX: split `tokens` (which includes the CW prefix) — the original
        # re-fetched post.get_tokens(), dropping the inserted "CW: ..." text
        split_tokens: list[list[cross.Token]] = util.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if it contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text('')]

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
        attachments = post.get_attachments()
        if not attachments:
            # text-only: one record per 300-char chunk, threaded together
            for text in post_text:
                if reply_ref and root_ref:
                    new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
                        parent=reply_ref,
                        root=root_ref
                    ), labels=labels)
                else:
                    new_post = self.bsky.send_post(text, labels=labels)
                    root_ref = models.create_strong_ref(new_post)

                self.bsky.create_gates(self.options, new_post.uri)
                reply_ref = models.create_strong_ref(new_post)
                created_records.append(new_post)
        elif len(attachments) <= 4:
            if len(attachments) == 1 and attachments[0].get_type() == 'video':
                video_data = attachments[0]

                video_io = media_util.download_blob(video_data.get_url(), max_bytes=100_000_000)
                if not video_io:
                    LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                    return

                metadata = video_data.create_meta(video_io)
                if metadata.get_duration() > 180:
                    LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                    return

                aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
                    width=metadata.get_width(),
                    height=metadata.get_height()
                )

                new_post = self.bsky.send_video(
                    text=post_text[0],
                    video=video_io,
                    video_aspect_ratio=aspect_ratio,
                    video_alt=video_data.get_alt(),
                    reply_to=models.AppBskyFeedPost.ReplyRef(
                        parent=reply_ref,
                        root=root_ref
                    ) if root_ref and reply_ref else None,
                    labels=labels
                )
                if not root_ref:
                    root_ref = models.create_strong_ref(new_post)

                self.bsky.create_gates(self.options, new_post.uri)
                reply_ref = models.create_strong_ref(new_post)
            else:
                # bluesky galleries only support images (up to 4)
                for attachment in attachments:
                    if attachment.get_type() != 'image':
                        LOGGER.info("Skipping post_id '%s'. Attachment type mismatch. got: '%s' expected: 'image'", post.get_id(), attachment.get_type())
                        return

                images: list[bytes] = []
                image_alts: list[str] = []
                image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
                for attachment in attachments:
                    image_io = media_util.download_blob(attachment.get_url(), max_bytes=2_000_000)
                    if not image_io:
                        LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                        return
                    LOGGER.info("Converting %s to .webp...", attachment.get_url())
                    image_io = media_util.compress_image(image_io, quality=100)
                    metadata = attachment.create_meta(image_io)

                    # NOTE(review): this branch only logs — nothing recompresses
                    # the image below the 1MB blob limit; confirm intent
                    if len(image_io) > 1_000_000:
                        LOGGER.info("Compressing %s...", attachment.get_url())

                    images.append(image_io)
                    image_alts.append(attachment.get_alt())
                    image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
                        width=metadata.get_width(),
                        height=metadata.get_height()
                    ))

                new_post = self.bsky.send_images(
                    text=post_text[0],
                    images=images,
                    image_alts=image_alts,
                    image_aspect_ratios=image_aspect_ratios,
                    reply_to=models.AppBskyFeedPost.ReplyRef(
                        parent=reply_ref,
                        root=root_ref
                    ) if root_ref and reply_ref else None,
                    labels=labels
                )
                if not root_ref:
                    root_ref = models.create_strong_ref(new_post)

                self.bsky.create_gates(self.options, new_post.uri)
                reply_ref = models.create_strong_ref(new_post)

            created_records.append(new_post)
            # remaining text chunks become replies under the media post
            for text in post_text[1:]:
                new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
                    parent=reply_ref,
                    root=root_ref
                ), labels=labels)
                self.bsky.create_gates(self.options, new_post.uri)

                reply_ref = models.create_strong_ref(new_post)
                created_records.append(new_post)
        else:
            LOGGER.info("Skipping post_id '%s', too many attachments!", post.get_id())
            return

        if not created_records:
            # FIX: was missing both the log argument and the early return,
            # which crashed on db_identifiers[0] below
            LOGGER.info("Skipped post_id '%s', for some reason...", post.get_id())
            return

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        assert db_post, "ghghghhhhh"

        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        if new_root_id is None or new_parent_id is None:
            # fresh thread: the first created record becomes the root post
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                login.did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                login.did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky record mapped to `identifier` (newest first)
        and drop the corresponding rows from the database."""
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
            self.client.delete_post(json.loads(mapping[0])['uri'])
            database.delete_post(self.db, mapping[0], SERVICE, login.did)
+
class Bluesky():
def __init__(self, client: Client) -> None:
···
self.client.app.bsky.feed.threadgate.create(account.did, thread_gate, rkey)
-
if options['quote_gate']:
+
if options.get('quote_gate', False):
post_gate = models.AppBskyFeedPostgate.Record(
post=post_uri,
created_at=time,
···
self.client.app.bsky.feed.postgate.create(account.did, post_gate, rkey)
-
def tokens_to_richtext(tokens: list[dict]) -> client_utils.TextBuilder | None:
-
builder: client_utils.TextBuilder = client_utils.TextBuilder()
-
+
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
+
builder = client_utils.TextBuilder()
+
for token in tokens:
-
token_type = token['type']
-
-
if token_type == 'text':
-
builder.text(token['content'])
-
elif token_type == 'hashtag':
-
builder.tag('#' + token['tag'], token['tag'])
-
elif token_type == 'link':
-
builder.link(token['text'], token['url'])
+
if isinstance(token, cross.TextToken):
+
builder.text(token.text)
+
elif isinstance(token, cross.LinkToken):
+
builder.link(token.label, token.href)
+
elif isinstance(token, cross.TagToken):
+
builder.tag('#' + token.tag, token.tag)
else:
-
# Fail on mention!
+
# fail on unsupported tokens
return None
-
+
return builder
+109
cross.py
···
+
from typing import Callable, Any
+
from database import DataBaseWorker
+
+
# generic token
class Token():
    """Base class for tokenized post content.

    `type` is a short discriminator string set by each subclass
    ('text', 'link', 'tag', 'mention').
    """

    def __init__(self, type: str) -> None:
        self.type = type
+
+
class TextToken(Token):
    """A run of plain text."""

    def __init__(self, text: str) -> None:
        super().__init__('text')
        self.text = text
+
+
# token that represents a link to a website. e.g. [link](https://google.com/)
class LinkToken(Token):
    """A hyperlink with a display label."""

    def __init__(self, href: str, label: str) -> None:
        super().__init__('link')
        self.href = href    # target URL
        self.label = label  # visible link text
+
+
# token that represents a hashtag. e.g. #SocialMedia
class TagToken(Token):
    """A hashtag; `tag` is stored without the leading '#'."""

    def __init__(self, tag: str) -> None:
        super().__init__('tag')
        self.tag = tag
+
+
# token that represents a mention of a user.
# NOTE: not supported by the bluesky output — posts containing mentions are skipped
class MentionToken(Token):
    """A user mention with a resolvable URI."""

    def __init__(self, username: str, uri: str) -> None:
        super().__init__('mention')
        self.username = username
        self.uri = uri
+
+
class MediaMeta():
    """Dimension/duration metadata for a media attachment (-1 means unknown)."""

    def __init__(self, width: int, height: int, duration: float) -> None:
        self.width = width
        self.height = height
        self.duration = duration

    def get_width(self) -> int:
        """Pixel width, or -1 if unknown."""
        return self.width

    def get_height(self) -> int:
        """Pixel height, or -1 if unknown."""
        return self.height

    def get_duration(self) -> float:
        """Duration — presumably seconds (bluesky.py compares against 180 for
        a 3-minute cap); -1 if unknown."""
        return self.duration
+
+
class MediaAttachment():
    """Null-object base for a post's media attachment; concrete inputs are
    expected to override the accessors — confirm against mastodon.py."""

    def __init__(self) -> None:
        pass

    def create_meta(self, bytes: bytes) -> MediaMeta:
        """Build metadata for the downloaded blob; base returns all-unknown.

        NOTE(review): the parameter shadows the builtin `bytes`; kept as-is
        since the name is part of the override/caller interface.
        """
        return MediaMeta(-1, -1, -1)

    def get_url(self) -> str:
        """URL to download the media from ('' in the base class)."""
        return ''

    def get_type(self) -> str | None:
        """Attachment kind, e.g. 'image' or 'video' (None in the base class)."""
        return None

    def get_alt(self) -> str:
        """Alt text ('' when absent)."""
        return ''
+
+
class Post():
    """Null-object base for a post coming from an input service; concrete
    inputs override the accessors."""

    def __init__(self) -> None:
        pass

    def get_tokens(self) -> list[Token]:
        """Tokenized post body."""
        return []

    def get_parent_id(self) -> str:
        """Service-side id of the post this replies to; '' when not a reply."""
        return ''

    def get_attachments(self) -> list[MediaAttachment]:
        """Media attachments, in display order."""
        return []

    def get_id(self) -> str:
        """Service-side id of this post."""
        return ''

    def get_cw(self) -> str:
        """Content warning / spoiler text; '' when absent."""
        return ''

    def is_sensitive(self) -> bool:
        """Whether the post is flagged sensitive on the source service."""
        return False
+
+
# generic input service.
# user and service for db queries
class Input():
    """Source of posts to crosspost.

    `service` and `user_id` scope every database lookup performed by outputs.
    """

    def __init__(self, service: str, user_id: str, settings: dict, db: DataBaseWorker) -> None:
        self.service = service    # service identity stored in db rows
        self.user_id = user_id    # account id on that service
        self.settings = settings
        self.db = db

    async def listen(self, handler: Callable[[Post], Any]):
        """Run the input loop, invoking `handler` for each new post
        (no-op in the base class)."""
        pass
+
+
class Output():
    """Destination service; receives posts/deletions observed by `input`."""

    def __init__(self, input: Input, settings: dict, db: DataBaseWorker) -> None:
        self.input = input        # the source this output mirrors
        self.settings = settings
        self.db = db

    def accept_post(self, post: Post):
        """Crosspost `post` to this output (no-op in the base class)."""
        pass

    def delete_post(self, identifier: str):
        """Delete the crossposted copies of the source post `identifier`
        (no-op in the base class)."""
        pass
+118 -108
database.py
···
import sqlite3
-
import json
+
from concurrent.futures import Future
+
import threading
+
import queue
-
import sqlite3
-
import json
-
-
class DataBase():
-
-
def __init__(self, path: str) -> None:
-
self.path = path
-
connection = sqlite3.connect(self.path, autocommit=True)
-
cursor = connection.cursor()
-
cursor.execute('''
-
CREATE TABLE IF NOT EXISTS posts (
-
id TEXT,
-
user_id TEXT,
-
data TEXT,
-
PRIMARY KEY (id, user_id)
-
)
-
''')
-
cursor.close()
+
class DataBaseWorker():
    """Owns one sqlite connection and serializes all access through a single
    daemon thread.

    `execute` blocks the calling thread until the worker has run the
    statement and returns the fetched rows.
    """

    def __init__(self, database: str) -> None:
        super().__init__()
        self.database = database
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.shutdown_event = threading.Event()
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        self.lock = threading.Lock()
        self.thread.start()

    def _run(self):
        # worker loop: pop (callable, future) pairs until shutdown is requested;
        # the 1s timeout lets the loop notice the shutdown flag
        while not self.shutdown_event.is_set():
            try:
                job, fut = self.queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                with self.lock:
                    fut.set_result(job(self.conn))
            except Exception as err:
                fut.set_exception(err)
            finally:
                self.queue.task_done()

    def execute(self, sql: str, params = ()):
        """Run one statement on the worker thread and return cursor.fetchall().

        Commits after every statement; exceptions raised by sqlite are
        re-raised in the calling thread via the future.
        """
        def job(conn: sqlite3.Connection):
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.fetchall()

        fut = Future()
        self.queue.put((job, fut))
        return fut.result()

    def close(self):
        """Stop the worker thread and close the connection."""
        self.shutdown_event.set()
        self.thread.join()
        with self.lock:
            self.conn.close()
-
if not raw_data:
-
return {}
+
def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
    """Insert a top-level (rootless, parentless) post row; returns its id.

    NOTE(review): `serivce` is a typo for `service`, kept so keyword callers
    don't break. The rowid is fetched in a second worker task, so a
    concurrent insert could interleave — confirm single-writer usage.
    """
    row = (user_id, serivce, identifier)
    db.execute(
        """
        INSERT INTO posts (user_id, service, identifier)
        VALUES (?, ?, ?);
        """, row)
    return db.execute("SELECT last_insert_rowid();", ())[0][0]
-
data: dict[str, dict] = {}
-
for post_id, post_data in raw_data:
-
data[post_id] = json.loads(post_data)
+
def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int:
    """Insert a reply row linked to internal ids `parent` and `root`;
    returns the new row's id.

    NOTE(review): same `serivce` typo and last_insert_rowid race as
    insert_post — kept for interface stability.
    """
    row = (user_id, serivce, identifier, parent, root)
    db.execute(
        """
        INSERT INTO posts (user_id, service, identifier, parent_id, root_id)
        VALUES (?, ?, ?, ?, ?);
        """, row)
    return db.execute("SELECT last_insert_rowid();", ())[0][0]
-
return data
+
def insert_mapping(db: DataBaseWorker, original: int, mapped: int):
    """Record that internal post `mapped` is the crossposted copy of
    internal post `original`."""
    db.execute("""
    INSERT INTO mappings (original_post_id, mapped_post_id)
    VALUES (?, ?);
    """, (original, mapped))
-
class UserScopedDB:
-
def __init__(self, db: DataBase, user_id: str):
-
self.db = db
-
self.user_id = user_id
+
def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str):
    """Remove the post row matching (identifier, service, user_id).

    NOTE(review): `serivce` typo kept for interface stability.
    """
    key = (identifier, serivce, user_id)
    db.execute(
        """
        DELETE FROM posts
        WHERE identifier = ?
        AND service = ?
        AND user_id = ?
        """, key)
+
-
def connect(self) -> sqlite3.Connection:
-
return self.db.connect()
-
-
def put_post(self, db: sqlite3.Connection, id: str, data: dict):
-
return self.db.put_post(db, self.user_id, id, data)
-
-
def del_post(self, db: sqlite3.Connection, id: str):
-
return self.db.del_post(db, self.user_id, id)
-
-
def read_data(self, db: sqlite3.Connection, id: str) -> dict | None:
-
return self.db.read_data(db, self.user_id, id)
+
def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[tuple[str]]:
    """Return identifier rows for the crossposted copies of `original_post`
    on (`service`, `user_id`), ordered by internal id (creation order).

    FIX: the return annotation was `list[str]`, but sqlite returns a list of
    1-tuples — callers consistently unpack with `row[0]`.
    """
    return db.execute(
        """
        SELECT p.identifier
        FROM posts AS p
        JOIN mappings AS m
        ON p.id = m.mapped_post_id
        WHERE m.original_post_id = ?
        AND p.service = ?
        AND p.user_id = ?
        ORDER BY p.id;
        """,
        (original_post, service, user_id))
+
+
def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
    """Fetch a post row by internal id; returns a dict of its columns, or
    None when the row does not exist."""
    rows = db.execute(
        """
        SELECT user_id, service, identifier, parent_id, root_id
        FROM posts
        WHERE id = ?
        """, (id,))
    if not rows:
        return None
    columns = ('user_id', 'service', 'identifier', 'parent_id', 'root_id')
    return dict(zip(columns, rows[0]))
-
def get_all_children(self, db: sqlite3.Connection, id: str) -> dict[str, dict]:
-
return self.db.get_all_children(db, self.user_id, id)
+
def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None:
    """Look up a post by its service-side identifier, scoped to a user and
    service; returns {'id', 'parent_id', 'root_id'} or None when absent."""
    rows = db.execute(
        """
        SELECT id, parent_id, root_id
        FROM posts
        WHERE identifier = ?
        AND user_id = ?
        AND service = ?
        """, (identifier, user_id, service))
    if not rows:
        return None
    return dict(zip(('id', 'parent_id', 'root_id'), rows[0]))
+91 -378
main.py
···
-
import click
+
from util import LOGGER
+
import os
import json
-
import asyncio, threading, queue
-
from atproto import IdResolver, Client, client_utils
-
import atproto_client.models as models
-
import util, mastodon, bluesky, database
-
import os
-
import media_util
-
import traceback
+
import database
+
import mastodon, bluesky, cross
+
import asyncio, threading, queue, traceback
-
ADULT_LABEL = ["sexual content", "nsfw"]
-
PORN_LABEL = ["porn", "yiff"]
+
# settings template written to data/settings.json on first launch.
# "env:NAME" values appear to be resolved from environment variables at load
# time (see util.get_or_envvar) — confirm.
DEFAULT_SETTINGS: dict = {
    'input': {
        'type': 'mastodon-wss',  # key into INPUTS below
        'instance': 'env:MASTODON_INSTANCE',
        'token': 'env:MASTODON_TOKEN',
        "options": {
            # visibilities that get crossposted; f/o and direct posts are skipped
            "allowed_visibility": [
                "public",
                "unlisted"
            ]
        }
    },
    'output': [
        {
            'type': 'bluesky',  # key into OUTPUTS below
            'handle': 'env:BLUESKY_HANDLE',
            'app-password': 'env:BLUESKY_APP_PASSWORD',
            'options': {
                'quote_gate': False,  # block users from quoting the post
                'thread_gate': [      # who may reply; empty list disables replies
                    'everybody'
                ]
            }
        }
    ]
}
-
class SocketListener():
-
def __init__(self, user_id: str, atproto: Client, settings: dict, db_path: str) -> None:
-
self.user_id = user_id
-
self.atp = bluesky.Bluesky(atproto)
-
self.settings = settings
-
self.db = database.UserScopedDB(database.DataBase(db_path), user_id)
-
-
def create_post_records(self, status: dict) -> list[models.AppBskyFeedPost.CreateRecordResponse] | None:
-
tokens: list[dict] = util.tokenize_html(status['content'])
-
-
label_text: set[str] = set()
-
status_spoiler = status['spoiler_text']
-
if status_spoiler:
-
tokens.insert(0, {"type": "text", "content": "CW: " + status_spoiler + '\n\n'})
-
label_text.add('graphic-media')
-
-
if any(tag in status_spoiler for tag in ADULT_LABEL):
-
label_text.add('sexual')
-
-
if any(tag in status_spoiler for tag in PORN_LABEL):
-
label_text.add('porn')
-
-
if status['sensitive']:
-
label_text.add('graphic-media')
-
-
labels = models.ComAtprotoLabelDefs.SelfLabels(values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in label_text])
-
-
split_tokens: list[list[dict]] = util.split_tokens(tokens, 300)
-
-
post_text: list[client_utils.TextBuilder] = []
-
for funnel in split_tokens:
-
rich_text = bluesky.tokens_to_richtext(funnel)
-
-
if rich_text is None:
-
click.echo(f"Skipping '{status["id"]}' as it contains invalid rich text types!")
-
return None
-
post_text.append(rich_text)
-
-
if not post_text:
-
post_text = [client_utils.TextBuilder().text('')]
-
-
records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
-
-
in_reply_to_id: str = status['in_reply_to_id']
-
-
root_ref = None
-
reply_ref = None
-
if in_reply_to_id:
-
db = self.db.connect()
-
data: dict | None = self.db.read_data(db, in_reply_to_id)
-
db.close()
-
-
if data is not None:
-
root_data = data['root_ref']
-
if not root_data:
-
root_data = data['mapped_post_refs'][0]
-
-
reply_data = data['mapped_post_refs'][-1]
-
-
root_post = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_data['uri']), cid=str(root_data['cid']))
-
root_ref = models.create_strong_ref(root_post)
-
-
reply_post = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_data['uri']), cid=str(reply_data['cid']))
-
reply_ref = models.create_strong_ref(reply_post)
-
-
attachments: list[dict] = status['media_attachments']
-
if not attachments:
-
for post in post_text:
-
if reply_ref and root_ref:
-
new_post = self.atp.send_post(post, reply_to=models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
), labels=labels)
-
else:
-
new_post = self.atp.send_post(post, labels=labels)
-
root_ref = models.create_strong_ref(new_post)
-
-
self.atp.create_gates(self.settings.get('bluesky', {}), new_post.uri)
-
reply_ref = models.create_strong_ref(new_post)
-
records.append(new_post)
-
-
return records
-
elif len(attachments) <= 4:
-
if len(attachments) == 1 and attachments[0]['type'] == 'video':
-
video: dict = attachments[0]
-
-
video_io = media_util.download_blob(video['url'], max_bytes=100_000_000)
-
if not video_io:
-
click.echo(f"Skipping post_id '{status['id']}', failed to download attachment!")
-
return None
-
-
if len(video_io) > 100_000_000:
-
click.echo(f"Skipping post_id '{status['id']}'. Video file too large")
-
return None
-
-
# some mastodon api implementations don't seem to provide video meta
-
# try to probe it with ffmpeg
-
meta = media_util.get_video_meta(video_io)
-
if meta.get('duration', -1) > 180:
-
click.echo(f"Skipping post_id '{status["id"]}'. Video attachment too long!")
-
return None
-
-
aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(width=meta['width'], height=meta['height'])
-
-
new_post = self.atp.send_video(
-
text=post_text[0],
-
video=video_io,
-
video_aspect_ratio=aspect_ratio,
-
video_alt=video['description'] if video['description'] else '',
-
reply_to= models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
) if root_ref and reply_ref else None,
-
labels=labels
-
)
-
if not root_ref:
-
root_ref = models.create_strong_ref(new_post)
-
-
self.atp.create_gates(self.settings.get('bluesky', {}), new_post.uri)
-
reply_ref = models.create_strong_ref(new_post)
-
else:
-
# check if all attachments are images.
-
# bluesky doesn't support gifv and unknown (TODO link the file)
-
for attachment in attachments:
-
if attachment['type'] != 'image':
-
click.echo(f"Skipping post_id '{status['id']}'. Attachment type mismatch. got: '{attachment['type']}' expected: 'image'")
-
return None
-
-
images: list[bytes] = []
-
image_alts: list[str] = []
-
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
-
for attachment in attachments:
-
-
image_io = media_util.download_blob(attachment['url'], max_bytes=2_000_000)
-
if not image_io:
-
click.echo(f"Skipping post_id '{status['id']}', failed to download attachment!")
-
return None
-
-
# Try to compress image if it's too large
-
if len(image_io) > 1_000_000:
-
click.echo(f"Trying to compress {attachment['url']}..")
-
image_io = media_util.compress_image(image_io)
-
if len(image_io) > 1_000_000:
-
click.echo(f"Skipping post_id '{status['id']}', media attachment still too large after compression!")
-
return None
-
-
meta = util.safe_get(attachment, 'meta', {}).get('original')
-
-
# some mastodon api implementations don't seem to provide image meta
-
# try to probe it with ffmpeg
-
if not meta:
-
meta = media_util.get_image_meta(image_io)
-
-
images.append(image_io)
-
image_alts.append(attachment['description'] if attachment['description'] else '')
-
image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(width=meta['width'], height=meta['height']))
-
-
new_post = self.atp.send_images(
-
text=post_text[0],
-
images=images,
-
image_alts=image_alts,
-
image_aspect_ratios=image_aspect_ratios,
-
reply_to= models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
) if root_ref and reply_ref else None,
-
labels=labels
-
)
-
if not root_ref:
-
root_ref = models.create_strong_ref(new_post)
-
-
self.atp.create_gates(self.settings.get('bluesky', {}), new_post.uri)
-
reply_ref = models.create_strong_ref(new_post)
-
-
records.append(new_post)
-
for post in post_text[1:]:
-
new_post = self.atp.send_post(post, reply_to=models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
), labels=labels)
-
self.atp.create_gates(self.settings.get('bluesky', {}), new_post.uri)
-
-
reply_ref = models.create_strong_ref(new_post)
-
records.append(new_post)
-
-
return records
-
else:
-
click.echo(f"Skipping post_id '{status['id']}'. Too many attachments!")
-
return records if records else None
-
-
def on_update(self, status: dict):
-
if util.safe_get(status, 'account', {})['id'] != self.user_id:
-
return
-
-
if status['reblog'] or status['poll']:
-
# TODO polls not supported on bsky. maybe 3rd party? skip for now
-
# we don't handle reblogs. possible with bridgy(?) and self
-
return
+
# factory registry: settings['input']['type'] -> Input constructor
INPUTS = {
    "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db)
}
-
in_reply: str | None = status['in_reply_to_id']
-
in_reply_to: str | None = status['in_reply_to_account_id']
-
if in_reply_to and in_reply_to != self.user_id:
-
# We don't support replies. possible with bridgy(?)
-
return
-
-
if status['visibility'] not in ['public', 'unlisted']:
-
# Skip f/o and direct posts
-
return
-
-
click.echo(f"Got 'update' event for post '{status['id']}'")
-
-
db = self.db.connect()
-
if in_reply:
-
data: dict | None = self.db.read_data(db, in_reply)
-
if not data:
-
click.echo(f"Post '{status['id']}' is missing parent in the database!")
-
return
-
db.close()
-
-
records = self.create_post_records(status)
-
if records is None:
-
click.echo(f"Skipped crossposting '{status['id']}' due to above erros..")
-
return
-
-
refs: list[dict] = []
-
-
for record in records:
-
refs.append({'cid': record.cid, 'uri': record.uri})
-
-
db = self.db.connect()
-
if not in_reply:
-
self.db.put_post(db, status['id'], {
-
'parent_ref': None,
-
'root_ref': None,
-
'mapped_post_refs': refs
-
})
-
else:
-
self.db.put_post(db, status['id'], {
-
'parent_ref': data['mapped_post_refs'][-1],
-
'root_ref': data['mapped_post_refs'][-1],
-
'mapped_post_refs': refs
-
})
-
db.close()
-
-
def on_delete(self, id: str):
-
db = self.db.connect()
-
post_data = self.db.read_data(db, id)
-
-
if not post_data:
-
return
-
-
click.echo(f"Got 'delete' event for post '{id}'...")
-
-
for ref in post_data['mapped_post_refs']:
-
self.atp.client.delete_post(ref['uri'])
-
-
children: dict[str, dict] = self.db.get_all_children(db, id)
-
for id, data in children.items():
-
for ref in data['mapped_post_refs']:
-
self.atp.client.delete_post(ref['uri'])
-
self.db.del_post(db, id)
-
self.db.del_post(db, id)
-
-
db.close()
-
click.echo(f"Removed post '{id}' and {len(children.items())} replies")
-
-
# TODO Handle edits
-
# The issue is that since there are no edits on bluesky,
-
# we have to recreate the records while keeping the media in tact.
-
# also, since the db only stores post relations, we have to pull all the replies from masto and the pds.
-
def on_status_update(self, status: dict):
-
if status.get('account', {})['id'] != self.user_id:
-
return
-
if status.get('in_reply_to_account_id') != self.user_id:
-
return
-
-
click.echo(f"Got 'status.update' event for post '{status['id']}'")
+
# factory registry: each settings['output'][i]['type'] -> Output constructor
OUTPUTS = {
    "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db)
}
-
@click.group()
-
def main():
-
pass
-
-
@main.command('run')
-
@click.option(
-
"-I", "--instance",
-
envvar="MASTODON_INSTANCE",
-
required=True,
-
help="Mastodon compatible instance domain (e.g. https://mastodon.social)"
-
)
-
@click.option(
-
"-T", "--token",
-
envvar="MASTODON_TOKEN",
-
required=True,
-
help="Mastodon access token"
-
)
-
@click.option(
-
"-H", "--handle",
-
envvar="ATPROTO_HANDLE",
-
required=True,
-
help="ATProto handle (e.g. melontini.me)"
-
)
-
@click.option(
-
"-P", "--password",
-
envvar="ATPROTO_PASSWORD",
-
required=True,
-
help="ATProto/Bluesky app password (https://bsky.app/settings/app-passwords)"
-
)
-
@click.option('--data_dir', default='./data', type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True, writable=True))
-
def run(instance, token, handle, password, data_dir):
+
def execute(data_dir):
settings_path = os.path.join(data_dir, 'settings.json')
+
database_path = os.path.join(data_dir, 'data.db')
+
if not os.path.exists(settings_path):
-
click.echo(f"First launch detected! creating {settings_path} and exiting..")
+
LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
with open(settings_path, 'w') as f:
-
json.dump(util.DEFAULT_SETTINGS, f, indent=2)
+
json.dump(DEFAULT_SETTINGS, f, indent=2)
return 0
+
LOGGER.info('Loading settings...')
with open(settings_path, 'rb') as f:
settings = json.load(f)
-
click.echo(f"Connecting to {instance}...")
-
fedi = mastodon.Mastodon(instance, token)
+
LOGGER.info('Starting database worker...')
+
db_worker = database.DataBaseWorker(os.path.abspath(database_path))
+
+
db_worker.execute('PRAGMA foreign_keys = ON;')
+
+
# create the posts table
+
# id - internal id of the post
+
# user_id - user id on the service (e.g. a724sknj5y9ydk0w)
+
# service - the service (e.g. https://shrimp.melontini.me)
+
# identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
+
# parent_id - the internal id of the parent
+
db_worker.execute(
+
"""
+
CREATE TABLE IF NOT EXISTS posts (
+
id INTEGER PRIMARY KEY AUTOINCREMENT,
+
user_id TEXT NOT NULL,
+
service TEXT NOT NULL,
+
identifier TEXT NOT NULL UNIQUE,
+
parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
+
root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
+
);
+
"""
+
)
+
+
# create the mappings table
+
# original_post_id - the post this was mapped from
+
# mapped_post_id - the post this was mapped to
+
db_worker.execute(
+
"""
+
CREATE TABLE IF NOT EXISTS mappings (
+
original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
+
mapped_post_id INTEGER NOT NULL
+
);
+
"""
+
)
-
if not fedi.streaming:
-
click.echo(f"{fedi.instance} does not support streaming timelines!", err=True)
-
return -1
-
-
id = fedi.get_user_id()
-
if not id:
-
click.echo(f"Failed to get user id from token for {fedi.instance}", err=True)
-
return -1
-
click.echo(f"Got user ID '{id}'")
+
input_settings = settings.get('input')
+
if not input_settings:
+
raise Exception("No input specified!")
+
outputs_settings = settings.get('outputs', [])
-
click.echo(f"Resolving ATP identity for {handle}...")
-
resolver = IdResolver()
-
did: str | None = resolver.handle.resolve(handle)
-
if not did:
-
click.echo(f"Failed to resolve atproto did for handle {handle}!", err=True)
-
return -1
+
input = INPUTS[input_settings['type']](input_settings, db_worker)
-
did_doc = resolver.did.resolve(did)
-
if not did_doc:
-
click.echo(f"Failed to resolve did document from {did}")
-
return -1
+
outputs: list[cross.Output] = []
+
for output_settings in outputs_settings:
+
outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker))
-
pds = did_doc.get_pds_endpoint()
-
if not pds:
-
click.echo(f"Failed to resolve PDS endpoint for did {did}")
-
return -1
-
-
click.echo(f"Logging in to {handle} through {pds}...")
-
atp = Client(pds)
-
atp.login(handle, password)
-
-
click.echo("Starting worker thread...")
+
LOGGER.info('Starting task worker...')
task_queue = queue.Queue()
-
def worker():
while True:
task = task_queue.get()
···
try:
task()
except Exception as e:
-
click.echo(f"Exception in worker thread!\n{e}", err=True)
+
LOGGER.error(f"Exception in worker thread!\n{e}")
traceback.print_exc()
-
thread = threading.Thread(target=worker, daemon=True)
thread.start()
-
-
click.echo(f"Listening to {fedi.streaming}...")
-
listener = SocketListener(id, atp, settings, os.path.join(data_dir, 'data.db'))
+
LOGGER.info('Listening to %s...', input.service)
+
asyncio.run(input.listen(outputs, lambda x: task_queue.put(x)))
-
def handler(event_type, payload):
-
def handle_event():
-
try:
-
if event_type == 'update':
-
listener.on_update(json.loads(payload))
-
elif event_type == 'delete':
-
listener.on_delete(payload)
-
elif event_type == 'status.update':
-
listener.on_status_update(json.loads(payload))
-
except Exception as e:
-
click.echo(f"Error in event handler: {e}", err=True)
-
traceback.print_exc()
-
task_queue.put(handle_event)
-
-
asyncio.run(fedi.connect_websocket(handler))
-
-
task_queue.join()
-
-
task_queue.put(None)
-
thread.join()
-
return 0
if __name__ == "__main__":
-
main()
+
execute('./data')
+235 -23
mastodon.py
···
+
from util import LOGGER
import requests, websockets
-
import util, json
+
import util, media_util, json
+
import cross
+
import database
+
from database import DataBaseWorker
+
from typing import Callable, Any
+
+
from bs4 import BeautifulSoup, Tag
+
from bs4.element import NavigableString
+
from markdownify import markdownify as md
-
class Mastodon():
-
def __init__(self, instance: str, token: str) -> None:
-
self.token = token
-
self.instance = instance
-
self.streaming = self.get_streaming_url()
+
FORMATS = {
+
'video': 'video',
+
'image': 'image',
+
'gifv': 'gif',
+
'audio': 'audio',
+
'unknown': 'other'
+
}
-
def get_streaming_url(self):
-
response = requests.get(f"{self.instance}/api/v1/instance")
-
response.raise_for_status()
-
data: dict = response.json()
-
return util.safe_get(data, "urls", {}).get("streaming_api")
+
def tokenize_post(status: dict) -> list[cross.Token]:
+
soup = BeautifulSoup(status['content'], "html.parser")
+
tokens: list[cross.Token] = []
+
+
tags: list[dict] = status.get('tags', [])
+
mentions: list[dict] = status.get('mentions', [])
+
+
def recurse(node) -> None:
+
if isinstance(node, NavigableString):
+
tokens.append(cross.TextToken(str(node)))
+
return
+
+
if isinstance(node, Tag):
+
if node.name.lower() == "a":
+
href = node.get("href", "")
+
inner_html = "".join(str(c) for c in node.contents)
+
link_text_md = md(inner_html)
+
+
if link_text_md.startswith('@'):
+
as_mention = link_text_md[1:]
+
for block in mentions:
+
if href == block.get('url'):
+
tokens.append(cross.MentionToken(block['acct'], block['url']))
+
return
+
elif as_mention == block.get('acct') or as_mention == block.get('username'):
+
tokens.append(cross.MentionToken(block['acct'], block['url']))
+
return
+
+
if link_text_md.startswith('#'):
+
as_tag = link_text_md[1:].lower()
+
if any(as_tag == block.get('name') for block in tags):
+
tokens.append(cross.TagToken(link_text_md[1:]))
+
return
+
+
# idk if we can safely convert this to string
+
tokens.append(cross.LinkToken(str(href), link_text_md))
+
return
+
+
if node.find("a") is not None:
+
for child in node.contents:
+
recurse(child)
+
return
+
+
serialized = str(node)
+
markdownified = md(serialized)
+
if markdownified:
+
tokens.append(cross.TextToken(markdownified))
+
return
+
return
-
def get_user_id(self):
-
responce = requests.get(f"{self.instance}/api/v1/accounts/verify_credentials", headers={
+
for child in soup.contents:
+
recurse(child)
+
+
return tokens
+
+
class MastodonPost(cross.Post):
+
def __init__(self, status: dict) -> None:
+
super().__init__()
+
self.status = status
+
media_attachments: list[cross.MediaAttachment] = []
+
+
for attachment in status['media_attachments']:
+
media_attachments.append(MastodonAttachment(attachment))
+
+
self.media_attachments = media_attachments
+
+
self.tokens = util.tokenize_html(status['content'])
+
+
def get_tokens(self) -> list[cross.Token]:
+
return self.tokens
+
+
def get_parent_id(self) -> str:
+
return self.status['in_reply_to_id']
+
+
def get_cw(self) -> str:
+
return util.safe_get(self.status, 'spoiler_text', '')
+
+
def get_id(self) -> str:
+
return self.status['id']
+
+
def is_sensitive(self) -> bool:
+
return self.status['sensitive']
+
+
def get_attachments(self) -> list[cross.MediaAttachment]:
+
return self.media_attachments
+
+
class MastodonAttachment(cross.MediaAttachment):
+
def __init__(self, attachment: dict) -> None:
+
super().__init__()
+
self.attachment = attachment
+
+
if attachment.get('type') == 'video' or attachment.get('type') == 'image':
+
if attachment.get('meta') and attachment.get('meta', {}).get('original'):
+
def from_status(bytes: bytes) -> cross.MediaMeta:
+
o_meta = attachment.get('meta', {}).get('original')
+
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
+
self.meta_generator = from_status
+
else:
+
def from_bytes(bytes: bytes) -> cross.MediaMeta:
+
o_meta = media_util.get_media_meta(bytes)
+
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
+
self.meta_generator = from_bytes
+
+
# URL to download the attachment from
+
def get_url(self) -> str:
+
return self.attachment.get('url', '')
+
+
# type of attachment
+
def get_type(self) -> str | None:
+
return FORMATS[self.attachment.get('type', 'other')]
+
+
# create file metadata from bytes or other
+
def create_meta(self, bytes: bytes) -> cross.MediaMeta:
+
return self.meta_generator(bytes)
+
+
# get media description
+
def get_alt(self) -> str:
+
return util.safe_get(self.attachment, 'description', '')
+
+
class MastodonInput(cross.Input):
+
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
+
self.options = settings.get('options', {})
+
self.token = util.get_or_envvar(settings, 'token')
+
instance: str = util.get_or_envvar(settings, 'instance')
+
+
service = instance[:-1] if instance.endswith('/') else instance
+
+
LOGGER.info("Verifying %s credentials...", service)
+
responce = requests.get(f"{service}/api/v1/accounts/verify_credentials", headers={
'Authorization': f'Bearer {self.token}'
})
-
if responce.status_code == 401:
raise Exception("Invalid Mastodon API token provided!")
+
+
super().__init__(service, responce.json()["id"], settings, db)
+
self.streaming = self._get_streaming_url()
+
+
if not self.streaming:
+
raise Exception("Instance %s does not support streaming!", service)
+
+
def _get_streaming_url(self):
+
response = requests.get(f"{self.service}/api/v1/instance")
+
response.raise_for_status()
+
data: dict = response.json()
+
return util.safe_get(data, "urls", {}).get("streaming_api")
-
return responce.json()["id"]
+
def _on_create_post(self, outputs: list[cross.Output], status: dict):
+
# skip events from other users
+
if util.safe_get(status, 'account', {})['id'] != self.user_id:
+
return
+
+
if status.get('reblog') or status.get('poll'):
+
# TODO polls not supported on bsky. maybe 3rd party? skip for now
+
# we don't handle reblogs. possible with bridgy(?) and self
+
LOGGER.info("Skipping '%s'! Reblog or poll..", status['id'])
+
return
+
+
in_reply: str | None = status.get('in_reply_to_id')
+
in_reply_to: str | None = status.get('in_reply_to_account_id')
+
if in_reply_to and in_reply_to != self.user_id:
+
# We don't support replies.
+
LOGGER.info("Skipping '%s'! Reply to other user..", status['id'])
+
return
+
+
if status.get('visibility') not in self.options.get('allowed_visibility', []):
+
# Skip f/o and direct posts
+
LOGGER.info("Skipping '%s'! '%s' visibility..", status['id'], status.get('visibility'))
+
return
+
+
root_id = None
+
parent_id = None
+
if in_reply:
+
parent_post = database.find_post(self.db, in_reply, self.user_id, self.service)
+
if not parent_post:
+
LOGGER.info("Skipping '%s' as parent post was not found in db!", status['id'])
+
return
+
+
root_id = parent_post['id']
+
parent_id = root_id
+
if parent_post['root_id']:
+
root_id = parent_post['root_id']
+
+
if root_id and parent_id:
+
database.insert_reply(
+
self.db,
+
status['id'],
+
self.user_id,
+
self.service,
+
parent_id,
+
root_id
+
)
+
else:
+
database.insert_post(
+
self.db,
+
status['id'],
+
self.user_id,
+
self.service
+
)
+
+
cross_post = MastodonPost(status)
+
for output in outputs:
+
output.accept_post(cross_post)
-
async def connect_websocket(self, handler):
+
def _on_delete_post(self, outputs: list[cross.Output], identifier: str):
+
post = database.find_post(self.db, identifier, self.user_id, self.service)
+
if not post:
+
return
+
+
LOGGER.info("Deleting '%s'...", identifier)
+
for output in outputs:
+
output.delete_post(identifier)
+
database.delete_post(self.db, identifier, self.user_id, self.service)
+
+
def _on_post(self, outputs: list[cross.Output], event: str, payload: str):
+
if event == 'update':
+
self._on_create_post(outputs, json.loads(payload))
+
elif event == 'delete':
+
self._on_delete_post(outputs, payload)
+
+
+
async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
uri = f"{self.streaming}/api/v1/streaming?stream=user&access_token={self.token}"
async with websockets.connect(uri, extra_headers={
-
"User-Agent": "XPost/0.0.1"
-
}) as websocket:
+
"User-Agent": "XPost/0.0.2"
+
}) as ws:
while True:
-
message = await websocket.recv()
+
message = await ws.recv()
event: dict = json.loads(message)
-
-
event_type = event.get('event')
-
payload = event.get('payload')
-
handler(event_type, payload)
+
submit(lambda: self._on_post(outputs, str(event.get('event')), str(event.get('payload'))))
+12 -25
media_util.py
···
import requests
-
import click
import subprocess
import json
+
from util import LOGGER
def probe_bytes(bytes: bytes) -> dict:
cmd = [
···
return json.loads(proc.stdout)
-
def compress_image(image_bytes: bytes):
+
def compress_image(image_bytes: bytes, quality: int = 90):
cmd = [
'ffmpeg',
'-f', 'image2pipe',
'-i', 'pipe:0',
'-c:v', 'webp',
-
'-q:v', '90',
+
'-q:v', str(quality),
'-f', 'image2pipe',
'pipe:1'
]
···
def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
response = requests.get(url, stream=True, timeout=20)
if response.status_code != 200:
-
click.echo(f"Failed to download {url}! {response}")
+
LOGGER.info("Failed to download %s! %s", url, response)
return None
downloaded_bytes = b""
···
current_size += len(chunk)
if current_size > max_bytes:
-
click.echo(f"Failed to download {url}, file too large!")
response.close()
return None
···
return downloaded_bytes
-
def get_video_meta(video_bytes: bytes):
-
probe = probe_bytes(video_bytes)
-
video_streams = [s for s in probe['streams'] if s['codec_type'] == 'video']
-
if not video_streams:
+
def get_media_meta(bytes: bytes):
+
probe = probe_bytes(bytes)
+
streams = [s for s in probe['streams'] if s['codec_type'] == 'video']
+
if not streams:
raise ValueError("No video stream found")
-
video = video_streams[0]
+
media = streams[0]
return {
-
'width': int(video['width']),
-
'height': int(video['height']),
-
'duration': float(video.get('duration', probe['format'].get('duration', -1)))
-
}
-
-
def get_image_meta(image_bytes: bytes):
-
probe = probe_bytes(image_bytes)
-
stream = next((s for s in probe['streams'] if s['codec_type'] == 'video'), None)
-
-
if not stream:
-
raise ValueError("No video stream found")
-
-
return {
-
'width': int(stream['width']),
-
'height': int(stream['height'])
+
'width': int(media['width']),
+
'height': int(media['height']),
+
'duration': float(media.get('duration', probe['format'].get('duration', -1)))
}
+2 -1
pyproject.toml
···
[project]
name = "xpost"
-
version = "0.0.1"
+
version = "0.0.2"
description = "mastodon -> bluesky crossposting tool"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"atproto>=0.0.61",
+
"bs4>=0.0.2",
"click>=8.2.1",
"markdownify>=1.1.0",
"requests>=2.32.3",
+35 -26
util.py
···
import re
from markdownify import markdownify as md
+
import cross
+
import logging, sys, os
+
+
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
+
LOGGER = logging.getLogger("XPost")
ALTERNATE = re.compile(r'\S+|\s+')
···
def tokenize_html(content: str):
return tokenize_markdown(md(content, autolinks=False))
-
def tokenize_markdown(md):
+
def tokenize_markdown(md) -> list[cross.Token]:
tokens = []
i = 0
length = len(md)
···
url += md[i]
i += 1
i += 1 # skip )
-
tokens.append({'type': 'media', 'alt': alt_text, 'url': url})
+
#tokens.append({'type': 'media', 'alt': alt_text, 'url': url})
else:
-
tokens.append({'type': 'text', 'content': md[start:i]})
+
tokens.append(cross.TextToken(md[start:i]))
elif md[i] == '[':
# link or special
start = i
···
i += 1
i += 1 # skip )
if link_text.startswith('#'):
-
tokens.append({'type': 'hashtag', 'tag': link_text[1:], 'url': url})
+
tokens.append(cross.TagToken(link_text[1:]))
elif link_text.startswith('@'):
-
tokens.append({'type': 'mention', 'mention': link_text[1:], 'url': url})
+
tokens.append(cross.MentionToken(link_text[1:], url))
elif link_text.startswith('http://') or link_text.startswith('https://'):
-
tokens.append({'type': 'link', 'text': link_text, 'url': url})
+
tokens.append(cross.LinkToken(url, link_text))
else:
-
tokens.append({'type': 'link', 'text': link_text, 'url': url})
+
tokens.append(cross.LinkToken(url, link_text))
else:
-
tokens.append({'type': 'text', 'content': md[start:i]})
+
tokens.append(cross.TextToken(md[start:i]))
else:
# plain text
start = i
while i < length and md[i] != '[' and not (md[i] == '!' and i + 1 < length and md[i + 1] == '['):
i += 1
-
tokens.append({'type': 'text', 'content': md[start:i]})
+
tokens.append(cross.TextToken(md[start:i]))
return tokens
-
def split_tokens(tokens: list[dict], max_chars: int) -> list[list[dict]]:
+
def split_tokens(tokens: list[cross.Token], max_chars: int) -> list[list[cross.Token]]:
def start_new_block():
nonlocal current_block, blocks, current_length
if current_block:
···
def append_text_to_block(text_segment):
nonlocal current_block
# if the last element in the current block is also text, just append to it
-
if current_block and current_block[-1]['type'] == 'text':
-
current_block[-1]['content'] += text_segment
+
if current_block and isinstance(current_block[-1], cross.TextToken):
+
current_block[-1].text += text_segment
else:
-
current_block.append({'type': 'text', 'content': text_segment})
+
current_block.append(cross.TextToken(text_segment))
-
blocks: list[list[dict]] = []
-
current_block: list[dict] = []
+
blocks: list[list[cross.Token]] = []
+
current_block: list[cross.Token] = []
current_length: int = 0
for token in tokens:
-
ttype: str = token['type']
-
-
if ttype == 'text':
-
content: str = token['content']
+
if isinstance(token, cross.TextToken):
# split content into alternating “words” (\S+) and “whitespace” (\s+).
# this ensures every space/newline is treated as its own segment.
-
segments: list[str] = ALTERNATE.findall(content)
+
segments: list[str] = ALTERNATE.findall(token.text)
for seg in segments:
if seg.isspace():
···
append_text_to_block(word)
current_length = wlen
-
elif ttype == 'link':
-
url = token['url']
-
link_len = min(len(url), 35)
+
elif isinstance(token, cross.LinkToken):
+
link_len = min(len(token.label), 35)
if current_length + link_len <= max_chars:
current_block.append(token)
···
current_block.append(token)
current_length = link_len
-
elif ttype == 'hashtag':
+
elif isinstance(token, cross.TagToken):
# we treat a hashtag like “#tagname” for counting.
-
hashtag_len = 1 + len(token['tag'])
+
hashtag_len = 1 + len(token.tag)
if current_length + hashtag_len <= max_chars:
current_block.append(token)
current_length += hashtag_len
···
def safe_get(obj: dict, key: str, default):
val = obj.get(key, default)
-
return val if val else default
+
return val if val else default
+
+
def value_or_envvar(text: str) -> str:
+
if text.startswith('env:'):
+
return os.environ.get(text[4:], '')
+
return text
+
+
def get_or_envvar(obj: dict, key: str):
+
return value_or_envvar(obj.get(key, ''))
+14
uv.lock
···
]
[[package]]
+
name = "bs4"
+
version = "0.0.2"
+
source = { registry = "https://pypi.org/simple" }
+
dependencies = [
+
{ name = "beautifulsoup4" },
+
]
+
sdist = { url = "https://files.pythonhosted.org/packages/c9/aa/4acaf814ff901145da37332e05bb510452ebed97bc9602695059dd46ef39/bs4-0.0.2.tar.gz", hash = "sha256:a48685c58f50fe127722417bae83fe6badf500d54b55f7e39ffe43b798653925", size = 698, upload-time = "2024-01-17T18:15:47.371Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/51/bb/bf7aab772a159614954d84aa832c129624ba6c32faa559dfb200a534e50b/bs4-0.0.2-py2.py3-none-any.whl", hash = "sha256:abf8742c0805ef7f662dce4b51cca104cffe52b835238afc169142ab9b3fbccc", size = 1189, upload-time = "2024-01-17T18:15:48.613Z" },
+
]
+
+
[[package]]
name = "certifi"
version = "2025.4.26"
source = { registry = "https://pypi.org/simple" }
···
source = { virtual = "." }
dependencies = [
{ name = "atproto" },
+
{ name = "bs4" },
{ name = "click" },
{ name = "markdownify" },
{ name = "requests" },
···
[package.metadata]
requires-dist = [
{ name = "atproto", specifier = ">=0.0.61" },
+
{ name = "bs4", specifier = ">=0.0.2" },
{ name = "click", specifier = ">=8.2.1" },
{ name = "markdownify", specifier = ">=1.1.0" },
{ name = "requests", specifier = ">=2.32.3" },