A community based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "sync" 9 "time" 10 11 "github.com/gorilla/websocket" 12) 13 14// CommunityJetstreamConnector handles WebSocket connection to Jetstream for community events 15type CommunityJetstreamConnector struct { 16 consumer *CommunityEventConsumer 17 wsURL string 18} 19 20// NewCommunityJetstreamConnector creates a new Jetstream WebSocket connector for community events 21func NewCommunityJetstreamConnector(consumer *CommunityEventConsumer, wsURL string) *CommunityJetstreamConnector { 22 return &CommunityJetstreamConnector{ 23 consumer: consumer, 24 wsURL: wsURL, 25 } 26} 27 28// Start begins consuming events from Jetstream 29// Runs indefinitely, reconnecting on errors 30func (c *CommunityJetstreamConnector) Start(ctx context.Context) error { 31 log.Printf("Starting Jetstream community consumer: %s", c.wsURL) 32 33 for { 34 select { 35 case <-ctx.Done(): 36 log.Println("Jetstream community consumer shutting down") 37 return ctx.Err() 38 default: 39 if err := c.connect(ctx); err != nil { 40 log.Printf("Jetstream community connection error: %v. Retrying in 5s...", err) 41 time.Sleep(5 * time.Second) 42 continue 43 } 44 } 45 } 46} 47 48// connect establishes WebSocket connection and processes events 49func (c *CommunityJetstreamConnector) connect(ctx context.Context) error { 50 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 51 if err != nil { 52 return fmt.Errorf("failed to connect to Jetstream: %w", err) 53 } 54 defer func() { 55 if closeErr := conn.Close(); closeErr != nil { 56 log.Printf("Failed to close WebSocket connection: %v", closeErr) 57 } 58 }() 59 60 log.Println("Connected to Jetstream (community consumer)") 61 62 // Set read deadline to detect connection issues 63 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 64 log.Printf("Failed to set read deadline: %v", err) 65 } 66 67 // Set pong handler to keep connection alive 68 conn.SetPongHandler(func(string) error { 69 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 70 log.Printf("Failed to set read deadline in pong handler: %v", err) 71 } 72 return nil 73 }) 74 75 // Start ping ticker 76 ticker := time.NewTicker(30 * time.Second) 77 defer ticker.Stop() 78 79 done := make(chan struct{}) 80 var closeOnce sync.Once // Ensure done channel is only closed once 81 82 // Goroutine to send pings 83 go func() { 84 for { 85 select { 86 case <-ticker.C: 87 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 88 log.Printf("Ping error: %v", err) 89 closeOnce.Do(func() { close(done) }) 90 return 91 } 92 case <-done: 93 return 94 case <-ctx.Done(): 95 return 96 } 97 } 98 }() 99 100 // Read messages 101 for { 102 select { 103 case <-ctx.Done(): 104 return ctx.Err() 105 case <-done: 106 return fmt.Errorf("connection closed") 107 default: 108 _, message, err := conn.ReadMessage() 109 if err != nil { 110 closeOnce.Do(func() { close(done) }) 111 return fmt.Errorf("read error: %w", err) 112 } 113 114 // Reset read deadline on successful read 115 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 116 log.Printf("Failed to set read deadline: %v", err) 117 } 118 119 if err := c.handleEvent(ctx, message); err != nil { 120 log.Printf("Error handling community event: %v", err) 121 // Continue processing other events 122 } 123 } 124 } 125} 126 127// handleEvent processes a single Jetstream event 128func (c *CommunityJetstreamConnector) handleEvent(ctx context.Context, data []byte) error { 129 var event JetstreamEvent 130 if err := json.Unmarshal(data, &event); err != nil { 131 return fmt.Errorf("failed to parse event: %w", err) 132 } 133 134 // Pass to consumer's HandleEvent method 135 return c.consumer.HandleEvent(ctx, &event) 136}