Pronouns labels on Bluesky
1import { AppBskyFeedLike } from "@atproto/api";
2import { Firehose } from "@skyware/firehose";
3import { getAgent } from "./agent.js";
4import { label } from "./label.js";
5import { DID } from "./constants.js";
6import fs from "node:fs";
7
8const subscribe = async () => {
9 const agent = await getAgent();
10
11 let cursorFirehose = 0;
12 let oldCursor = 0;
13 let cursorFile = fs.readFileSync("cursor.txt", "utf8");
14
15 const firehose = new Firehose({ cursor: cursorFile ?? "" });
16 if (cursorFile) console.log(`Initiate firehose at cursor ${cursorFile}`);
17
18 firehose.on("error", ({ cursor, error }) => {
19 console.log(`Firehose errored on cursor: ${cursor}`, error);
20 });
21
22 firehose.on("open", () => {
23 setInterval(() => {
24 if (oldCursor && oldCursor == cursorFirehose) {
25 firehose.close();
26 subscribe();
27 }
28 oldCursor = cursorFirehose;
29 const timestamp = new Date().toISOString();
30 console.log(`${timestamp} cursor: ${cursorFirehose}`);
31 fs.writeFile("cursor.txt", cursorFirehose.toString(), (err) => {
32 if (err) console.error(err);
33 });
34 }, 60000);
35 });
36
37 firehose.on("commit", async (commit) => {
38 cursorFirehose = commit.seq;
39 commit.ops.forEach(async (op) => {
40 if (op.action !== "delete" && AppBskyFeedLike.isRecord(op.record)) {
41 if ((op.record.subject.uri ?? "").includes(DID)) {
42 if ((op.record.subject.uri ?? "").includes("app.bsky.feed.post")) {
43 await label(agent, commit.repo, op.record.subject.uri).catch(
44 (err) => console.error(err),
45 );
46 }
47 }
48 }
49 });
50 });
51
52 firehose.start();
53};
54
55subscribe();