A community-based topic aggregation platform built on atproto
at main 3.2 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "sync" 9 "time" 10 11 "github.com/gorilla/websocket" 12) 13 14// PostJetstreamConnector handles WebSocket connection to Jetstream for post events 15type PostJetstreamConnector struct { 16 consumer *PostEventConsumer 17 wsURL string 18} 19 20// NewPostJetstreamConnector creates a new Jetstream WebSocket connector for post events 21func NewPostJetstreamConnector(consumer *PostEventConsumer, wsURL string) *PostJetstreamConnector { 22 return &PostJetstreamConnector{ 23 consumer: consumer, 24 wsURL: wsURL, 25 } 26} 27 28// Start begins consuming events from Jetstream 29// Runs indefinitely, reconnecting on errors 30func (c *PostJetstreamConnector) Start(ctx context.Context) error { 31 log.Printf("Starting Jetstream post consumer: %s", c.wsURL) 32 33 for { 34 select { 35 case <-ctx.Done(): 36 log.Println("Jetstream post consumer shutting down") 37 return ctx.Err() 38 default: 39 if err := c.connect(ctx); err != nil { 40 log.Printf("Jetstream post connection error: %v. 
Retrying in 5s...", err) 41 time.Sleep(5 * time.Second) 42 continue 43 } 44 } 45 } 46} 47 48// connect establishes WebSocket connection and processes events 49func (c *PostJetstreamConnector) connect(ctx context.Context) error { 50 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 51 if err != nil { 52 return fmt.Errorf("failed to connect to Jetstream: %w", err) 53 } 54 defer func() { 55 if closeErr := conn.Close(); closeErr != nil { 56 log.Printf("Failed to close WebSocket connection: %v", closeErr) 57 } 58 }() 59 60 log.Println("Connected to Jetstream (post consumer)") 61 62 // Set read deadline to detect connection issues 63 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 64 log.Printf("Failed to set read deadline: %v", err) 65 } 66 67 // Set pong handler to keep connection alive 68 conn.SetPongHandler(func(string) error { 69 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 70 log.Printf("Failed to set read deadline in pong handler: %v", err) 71 } 72 return nil 73 }) 74 75 // Start ping ticker 76 ticker := time.NewTicker(30 * time.Second) 77 defer ticker.Stop() 78 79 done := make(chan struct{}) 80 var closeOnce sync.Once // Ensure done channel is only closed once 81 82 // Ping goroutine 83 go func() { 84 for { 85 select { 86 case <-ticker.C: 87 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { 88 log.Printf("Failed to send ping: %v", err) 89 closeOnce.Do(func() { close(done) }) 90 return 91 } 92 case <-done: 93 return 94 } 95 } 96 }() 97 98 // Read loop 99 for { 100 select { 101 case <-done: 102 return fmt.Errorf("connection closed by ping failure") 103 default: 104 } 105 106 _, message, err := conn.ReadMessage() 107 if err != nil { 108 closeOnce.Do(func() { close(done) }) 109 return fmt.Errorf("read error: %w", err) 110 } 111 112 // Parse Jetstream event 113 var event JetstreamEvent 114 if err := json.Unmarshal(message, 
&event); err != nil { 115 log.Printf("Failed to parse Jetstream event: %v", err) 116 continue 117 } 118 119 // Process event through consumer 120 if err := c.consumer.HandleEvent(ctx, &event); err != nil { 121 log.Printf("Failed to handle post event: %v", err) 122 // Continue processing other events even if one fails 123 } 124 } 125}