Feed generator written in Golang
at main 1.8 kB view raw
1package main 2 3import ( 4 "fmt" 5 "log" 6 "net/url" 7 "time" 8 9 "github.com/gorilla/websocket" 10) 11 12const FIREHOSE_QUERY = "wantedCollections=app.bsky.feed.post" 13 14const URL_RELAY_JETSTREAM2 = "wss://jetstream2.us-east.bsky.network/subscribe" 15const URL_RELAY_BLACKSKY = "wss://atproto.africa/xrpc/com.atproto.sync.subscribeRepos" 16 17type OnMessageCB = func(message []byte) 18 19type FirehoseConsumer struct { 20 WebsocketConnection *websocket.Conn 21 MessagesChan *chan []byte 22 DoneSignal *chan struct{} 23} 24 25func MakeConsumer(baseUrl string) *FirehoseConsumer { 26 firehoseUrl, err := url.Parse(fmt.Sprintf("%s?%s", baseUrl, FIREHOSE_QUERY)) 27 28 MessagesChan := make(chan []byte) 29 doneSignal := make(chan struct{}) 30 conn, _, err := websocket.DefaultDialer.Dial(firehoseUrl.String(), nil) 31 if err != nil { 32 log.Fatal("failed to dial: ", err) 33 } 34 35 consumer := FirehoseConsumer{ 36 WebsocketConnection: conn, 37 MessagesChan: &MessagesChan, 38 DoneSignal: &doneSignal, 39 } 40 41 return &consumer 42} 43 44func (consumer *FirehoseConsumer) Close() error { 45 return consumer.WebsocketConnection.Close() 46} 47 48func (consumer *FirehoseConsumer) StartConsumer() { 49 defer close(*consumer.DoneSignal) 50 for { 51 _, message, err := consumer.WebsocketConnection.ReadMessage() 52 if err != nil { 53 log.Println(err) 54 return 55 } 56 57 // onMessage(message) 58 *consumer.MessagesChan <- message 59 } 60} 61 62func (consumer *FirehoseConsumer) CloseWithTimeout(timeoutDuration time.Duration) error { 63 err := consumer.WebsocketConnection.WriteMessage( 64 websocket.CloseMessage, 65 websocket.FormatCloseMessage( 66 websocket.CloseNormalClosure, 67 "", 68 ), 69 ) 70 71 if err != nil { 72 return fmt.Errorf("failed to close ws: ", err) 73 } 74 75 select { 76 case <-*consumer.DoneSignal: 77 return nil 78 case <-time.After(timeoutDuration): 79 return fmt.Errorf("websocket close timeout") 80 } 81}