A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "sync"
9 "time"
10
11 "github.com/gorilla/websocket"
12)
13
14// VoteJetstreamConnector handles WebSocket connection to Jetstream for vote events
15type VoteJetstreamConnector struct {
16 consumer *VoteEventConsumer
17 wsURL string
18}
19
20// NewVoteJetstreamConnector creates a new Jetstream WebSocket connector for vote events
21func NewVoteJetstreamConnector(consumer *VoteEventConsumer, wsURL string) *VoteJetstreamConnector {
22 return &VoteJetstreamConnector{
23 consumer: consumer,
24 wsURL: wsURL,
25 }
26}
27
28// Start begins consuming events from Jetstream
29// Runs indefinitely, reconnecting on errors
30func (c *VoteJetstreamConnector) Start(ctx context.Context) error {
31 log.Printf("Starting Jetstream vote consumer: %s", c.wsURL)
32
33 for {
34 select {
35 case <-ctx.Done():
36 log.Println("Jetstream vote consumer shutting down")
37 return ctx.Err()
38 default:
39 if err := c.connect(ctx); err != nil {
40 log.Printf("Jetstream vote connection error: %v. Retrying in 5s...", err)
41 time.Sleep(5 * time.Second)
42 continue
43 }
44 }
45 }
46}
47
48// connect establishes WebSocket connection and processes events
49func (c *VoteJetstreamConnector) connect(ctx context.Context) error {
50 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
51 if err != nil {
52 return fmt.Errorf("failed to connect to Jetstream: %w", err)
53 }
54 defer func() {
55 if closeErr := conn.Close(); closeErr != nil {
56 log.Printf("Failed to close WebSocket connection: %v", closeErr)
57 }
58 }()
59
60 log.Println("Connected to Jetstream (vote consumer)")
61
62 // Set read deadline to detect connection issues
63 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
64 log.Printf("Failed to set read deadline: %v", err)
65 }
66
67 // Set pong handler to keep connection alive
68 conn.SetPongHandler(func(string) error {
69 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
70 log.Printf("Failed to set read deadline in pong handler: %v", err)
71 }
72 return nil
73 })
74
75 // Start ping ticker
76 ticker := time.NewTicker(30 * time.Second)
77 defer ticker.Stop()
78
79 done := make(chan struct{})
80 var closeOnce sync.Once // Ensure done channel is only closed once
81
82 // Ping goroutine
83 go func() {
84 for {
85 select {
86 case <-ticker.C:
87 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil {
88 log.Printf("Failed to send ping: %v", err)
89 closeOnce.Do(func() { close(done) })
90 return
91 }
92 case <-done:
93 return
94 }
95 }
96 }()
97
98 // Read loop
99 for {
100 select {
101 case <-done:
102 return fmt.Errorf("connection closed by ping failure")
103 default:
104 }
105
106 _, message, err := conn.ReadMessage()
107 if err != nil {
108 closeOnce.Do(func() { close(done) })
109 return fmt.Errorf("read error: %w", err)
110 }
111
112 // Parse Jetstream event
113 var event JetstreamEvent
114 if err := json.Unmarshal(message, &event); err != nil {
115 log.Printf("Failed to parse Jetstream event: %v", err)
116 continue
117 }
118
119 // Process event through consumer
120 if err := c.consumer.HandleEvent(ctx, &event); err != nil {
121 log.Printf("Failed to handle vote event: %v", err)
122 // Continue processing other events even if one fails
123 }
124 }
125}