package main import ( "fmt" "log" "net/url" "time" "github.com/gorilla/websocket" ) const FIREHOSE_QUERY = "wantedCollections=app.bsky.feed.post" const URL_RELAY_JETSTREAM2 = "wss://jetstream2.us-east.bsky.network/subscribe" const URL_RELAY_BLACKSKY = "wss://atproto.africa/xrpc/com.atproto.sync.subscribeRepos" type OnMessageCB = func(message []byte) type FirehoseConsumer struct { WebsocketConnection *websocket.Conn MessagesChan *chan []byte DoneSignal *chan struct{} } func MakeConsumer(baseUrl string) *FirehoseConsumer { firehoseUrl, err := url.Parse(fmt.Sprintf("%s?%s", baseUrl, FIREHOSE_QUERY)) MessagesChan := make(chan []byte) doneSignal := make(chan struct{}) conn, _, err := websocket.DefaultDialer.Dial(firehoseUrl.String(), nil) if err != nil { log.Fatal("failed to dial: ", err) } consumer := FirehoseConsumer{ WebsocketConnection: conn, MessagesChan: &MessagesChan, DoneSignal: &doneSignal, } return &consumer } func (consumer *FirehoseConsumer) Close() error { return consumer.WebsocketConnection.Close() } func (consumer *FirehoseConsumer) StartConsumer() { defer close(*consumer.DoneSignal) for { _, message, err := consumer.WebsocketConnection.ReadMessage() if err != nil { log.Println(err) return } // onMessage(message) *consumer.MessagesChan <- message } } func (consumer *FirehoseConsumer) CloseWithTimeout(timeoutDuration time.Duration) error { err := consumer.WebsocketConnection.WriteMessage( websocket.CloseMessage, websocket.FormatCloseMessage( websocket.CloseNormalClosure, "", ), ) if err != nil { return fmt.Errorf("failed to close ws: ", err) } select { case <-*consumer.DoneSignal: return nil case <-time.After(timeoutDuration): return fmt.Errorf("websocket close timeout") } }