Pronouns labels on Bluesky
1import { AppBskyFeedLike } from "@atproto/api";
2import { Firehose } from "@skyware/firehose";
3import { getAgent } from "./agent.js";
4import { label } from "./label.js";
5import { DID, SIGNING_KEY } from "./constants.js";
6import fs from "node:fs";
7import { LabelerServer } from "@skyware/labeler";
8
// Labeler server for this account, built from the DID and signing key
// exported by ./constants.js.
const server = new LabelerServer({ did: DID, signingKey: SIGNING_KEY });

/**
 * Reports the outcome of starting the labeler server.
 *
 * @param {Error | null | undefined} error - Set when the server could not start.
 * @param {string} address - Address the server is listening on.
 */
const reportServerStart = (error, address) => {
  if (error) {
    console.error(error);
    return;
  }
  console.log(`Labeler server listening on ${address}`);
};

// Listen on port 4001 and log either the failure or the bound address.
server.start(4001, reportServerStart);
18
/**
 * Reads the firehose cursor persisted by a previous run.
 *
 * A missing file is treated as "no saved cursor" so that a first run does not
 * fail; any other read error is rethrown.
 *
 * @param {string} [file="cursor.txt"] - Path of the cursor file.
 * @returns {string} The saved cursor with surrounding whitespace removed, or
 *   an empty string when nothing has been saved.
 * @throws {Error} If the file exists but cannot be read.
 */
const readSavedCursor = (file = "cursor.txt") => {
  try {
    return fs.readFileSync(file, "utf8").trim();
  } catch (error) {
    if (error.code === "ENOENT") return "";
    throw error;
  }
};

/**
 * Connects to the firehose, resuming from the saved cursor when there is one,
 * and calls `label` for every created/updated like record whose subject URI
 * contains both this labeler's DID and `app.bsky.feed.post`.
 *
 * The sequence number of the latest commit is written to `cursor.txt` once a
 * minute while the connection has been opened.
 *
 * @returns {Promise<void>} Resolves once the firehose has been started.
 * @throws {Error} If the agent cannot be obtained or the cursor file exists
 *   but cannot be read.
 */
const subscribe = async () => {
  const agent = await getAgent();

  const savedCursor = readSavedCursor();
  // Sequence number of the most recent commit; stays 0 until one arrives.
  let cursorFirehose = 0;
  // Handle of the cursor-persisting timer, so that it is only created once.
  let persistTimer;

  const firehose = new Firehose({ cursor: savedCursor });
  if (savedCursor) console.log(`Initiate firehose at cursor ${savedCursor}`);

  firehose.on("error", ({ cursor, error }) => {
    console.log(`Firehose errored on cursor: ${cursor}`, error);
  });

  firehose.on("open", () => {
    // Avoid stacking timers should "open" be emitted more than once
    // (e.g. after a reconnect).
    if (persistTimer !== undefined) return;

    persistTimer = setInterval(() => {
      const timestamp = new Date().toISOString();
      console.log(`${timestamp} cursor: ${cursorFirehose}`);
      // Never overwrite a previously saved cursor before a commit was seen.
      if (cursorFirehose === 0) return;
      fs.writeFile("cursor.txt", String(cursorFirehose), (err) => {
        if (err) console.error(err);
      });
    }, 60000);
  });

  /**
   * Labels the author of a like when the liked record matches this labeler.
   * Every failure is logged here so that it cannot surface as an unhandled
   * rejection from the event listener.
   *
   * @param {string} repo - Repository (DID) the commit belongs to.
   * @param {object} op - A single operation of the commit.
   * @returns {Promise<void>}
   */
  const handleOp = async (repo, op) => {
    try {
      if (op.action === "delete" || !AppBskyFeedLike.isRecord(op.record)) {
        return;
      }
      // The record comes from the network: do not assume `subject` is present.
      const uri = op.record.subject?.uri;
      if (typeof uri !== "string") return;
      if (!uri.includes(DID) || !uri.includes("app.bsky.feed.post")) return;
      await label(agent, repo, uri);
    } catch (err) {
      console.error(err);
    }
  };

  firehose.on("commit", async (commit) => {
    cursorFirehose = commit.seq;
    // Operations of one commit are handled concurrently, as before.
    await Promise.all(commit.ops.map((op) => handleOp(commit.repo, op)));
  });

  firehose.start();
};
59
// Start the firehose subscription. A start-up failure is logged with context
// and the process exits with a non-zero code instead of leaving the promise
// rejection unhandled.
subscribe().catch((error) => {
  console.error("Failed to start firehose subscription", error);
  process.exit(1);
});