Feed generator written in Golang
1package main
2
3import (
4 "fmt"
5 "log"
6 "net/url"
7 "time"
8
9 "github.com/gorilla/websocket"
10)
11
const (
	// FIREHOSE_QUERY is the raw query string that MakeConsumer appends to the
	// base URL before dialing.
	FIREHOSE_QUERY = "wantedCollections=app.bsky.feed.post"

	// URL_RELAY_JETSTREAM2 and URL_RELAY_BLACKSKY are websocket endpoint URLs
	// intended to be passed to MakeConsumer as the base URL.
	URL_RELAY_JETSTREAM2 = "wss://jetstream2.us-east.bsky.network/subscribe"
	URL_RELAY_BLACKSKY   = "wss://atproto.africa/xrpc/com.atproto.sync.subscribeRepos"
)
16
// OnMessageCB is an alias for a callback that receives the raw bytes of a
// single message.
type OnMessageCB = func(message []byte)
18
19type FirehoseConsumer struct {
20 WebsocketConnection *websocket.Conn
21 MessagesChan *chan []byte
22 DoneSignal *chan struct{}
23}
24
25func MakeConsumer(baseUrl string) *FirehoseConsumer {
26 firehoseUrl, err := url.Parse(fmt.Sprintf("%s?%s", baseUrl, FIREHOSE_QUERY))
27
28 MessagesChan := make(chan []byte)
29 doneSignal := make(chan struct{})
30 conn, _, err := websocket.DefaultDialer.Dial(firehoseUrl.String(), nil)
31 if err != nil {
32 log.Fatal("failed to dial: ", err)
33 }
34
35 consumer := FirehoseConsumer{
36 WebsocketConnection: conn,
37 MessagesChan: &MessagesChan,
38 DoneSignal: &doneSignal,
39 }
40
41 return &consumer
42}
43
44func (consumer *FirehoseConsumer) Close() error {
45 return consumer.WebsocketConnection.Close()
46}
47
48func (consumer *FirehoseConsumer) StartConsumer() {
49 defer close(*consumer.DoneSignal)
50 for {
51 _, message, err := consumer.WebsocketConnection.ReadMessage()
52 if err != nil {
53 log.Println(err)
54 return
55 }
56
57 // onMessage(message)
58 *consumer.MessagesChan <- message
59 }
60}
61
62func (consumer *FirehoseConsumer) CloseWithTimeout(timeoutDuration time.Duration) error {
63 err := consumer.WebsocketConnection.WriteMessage(
64 websocket.CloseMessage,
65 websocket.FormatCloseMessage(
66 websocket.CloseNormalClosure,
67 "",
68 ),
69 )
70
71 if err != nil {
72 return fmt.Errorf("failed to close ws: ", err)
73 }
74
75 select {
76 case <-*consumer.DoneSignal:
77 return nil
78 case <-time.After(timeoutDuration):
79 return fmt.Errorf("websocket close timeout")
80 }
81}