Pronouns labels on Bluesky
import { AppBskyFeedLike } from "@atproto/api";
import { Firehose } from "@skyware/firehose";
import { label } from "./label.js";
import { DID } from "./constants.js";
import fs from "node:fs";

/** File in which the most recently seen firehose sequence number is persisted. */
const CURSOR_FILE = "cursor.txt";

/** Delay, in milliseconds, between two cursor log/persist ticks. */
const CURSOR_SAVE_INTERVAL_MS = 60000;

/**
 * Reads the cursor persisted by a previous run.
 *
 * @returns The trimmed contents of the cursor file, or an empty string when
 *   the file does not exist yet (e.g. on the very first run).
 * @throws Re-throws any read error other than "file not found".
 */
const readSavedCursor = (): string => {
  try {
    return fs.readFileSync(CURSOR_FILE, "utf8").trim();
  } catch (err) {
    // A missing file only means there is nothing to resume from.
    if ((err as NodeJS.ErrnoException).code === "ENOENT") return "";
    throw err;
  }
};

/**
 * Opens the firehose (resuming from the saved cursor when there is one) and
 * calls `label` for every created/updated like record whose subject URI
 * contains both `DID` and `app.bsky.feed.post`.
 *
 * While the connection is open, the latest commit sequence number is logged
 * and written to the cursor file once per interval.
 */
const subscribe = async (): Promise<void> => {
  const savedCursor = readSavedCursor();

  // Seed the in-memory cursor from disk so that a periodic save happening
  // before the first commit arrives does not replace the saved position by 0.
  const parsedCursor = Number.parseInt(savedCursor, 10);
  let cursorFirehose = Number.isNaN(parsedCursor) ? 0 : parsedCursor;
  let intervalID: NodeJS.Timeout | undefined;

  const firehose = new Firehose({ cursor: savedCursor });
  if (savedCursor) console.log(`Initiate firehose at cursor ${savedCursor}`);

  firehose.on("error", ({ cursor, error }) => {
    console.log(`Firehose errored on cursor: ${cursor}`, error);
  });

  firehose.on("open", () => {
    // Never keep two timers alive if "open" fires again without a "close".
    clearInterval(intervalID);
    intervalID = setInterval(() => {
      const timestamp = new Date().toISOString();
      console.log(`${timestamp} cursor: ${cursorFirehose}`);

      // Nothing meaningful to persist until a cursor is actually known.
      if (cursorFirehose <= 0) return;

      fs.writeFile(CURSOR_FILE, cursorFirehose.toString(), (err) => {
        if (err) console.error(err);
      });
    }, CURSOR_SAVE_INTERVAL_MS);
  });

  firehose.on("close", () => {
    clearInterval(intervalID);
    intervalID = undefined;
  });

  firehose.on("commit", async (commit) => {
    cursorFirehose = commit.seq;

    // The ops are still handled concurrently, but every promise is awaited and
    // each op has its own try/catch, so no failure escapes as an unhandled
    // rejection and one failing op does not affect the others.
    await Promise.all(
      commit.ops.map(async (op) => {
        try {
          if (op.action === "delete") return;
          if (!AppBskyFeedLike.isRecord(op.record)) return;

          const { uri } = op.record.subject;
          if (!uri.includes(DID)) return;
          if (!uri.includes("app.bsky.feed.post")) return;

          // The last path segment of the subject URI is handed to `label`.
          const rkey = uri.split("/").pop();
          if (rkey === undefined) return;

          await label(commit.repo, rkey);
        } catch (err) {
          console.error(err);
        }
      }),
    );
  });

  firehose.start();
};

subscribe().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});