A community-based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "sync"
9 "time"
10
11 "github.com/gorilla/websocket"
12)
13
14// AggregatorJetstreamConnector handles WebSocket connection to Jetstream for aggregator events
15type AggregatorJetstreamConnector struct {
16 consumer *AggregatorEventConsumer
17 wsURL string
18}
19
20// NewAggregatorJetstreamConnector creates a new Jetstream WebSocket connector for aggregator events
21func NewAggregatorJetstreamConnector(consumer *AggregatorEventConsumer, wsURL string) *AggregatorJetstreamConnector {
22 return &AggregatorJetstreamConnector{
23 consumer: consumer,
24 wsURL: wsURL,
25 }
26}
27
28// Start begins consuming events from Jetstream
29// Runs indefinitely, reconnecting on errors
30func (c *AggregatorJetstreamConnector) Start(ctx context.Context) error {
31 log.Printf("Starting Jetstream aggregator consumer: %s", c.wsURL)
32
33 for {
34 select {
35 case <-ctx.Done():
36 log.Println("Jetstream aggregator consumer shutting down")
37 return ctx.Err()
38 default:
39 if err := c.connect(ctx); err != nil {
40 log.Printf("Jetstream aggregator connection error: %v. Retrying in 5s...", err)
41 time.Sleep(5 * time.Second)
42 continue
43 }
44 }
45 }
46}
47
48// connect establishes WebSocket connection and processes events
49func (c *AggregatorJetstreamConnector) connect(ctx context.Context) error {
50 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
51 if err != nil {
52 return fmt.Errorf("failed to connect to Jetstream: %w", err)
53 }
54 defer func() {
55 if closeErr := conn.Close(); closeErr != nil {
56 log.Printf("Failed to close WebSocket connection: %v", closeErr)
57 }
58 }()
59
60 log.Println("Connected to Jetstream (aggregator consumer)")
61
62 // Set read deadline to detect connection issues
63 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
64 log.Printf("Failed to set read deadline: %v", err)
65 }
66
67 // Set pong handler to keep connection alive
68 conn.SetPongHandler(func(string) error {
69 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
70 log.Printf("Failed to set read deadline in pong handler: %v", err)
71 }
72 return nil
73 })
74
75 // Start ping ticker
76 ticker := time.NewTicker(30 * time.Second)
77 defer ticker.Stop()
78
79 done := make(chan struct{})
80 var closeOnce sync.Once // Ensure done channel is only closed once
81
82 // Goroutine to send pings
83 go func() {
84 for {
85 select {
86 case <-ticker.C:
87 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
88 log.Printf("Ping error: %v", err)
89 closeOnce.Do(func() { close(done) })
90 return
91 }
92 case <-done:
93 return
94 case <-ctx.Done():
95 return
96 }
97 }
98 }()
99
100 // Read messages
101 for {
102 select {
103 case <-ctx.Done():
104 return ctx.Err()
105 case <-done:
106 return fmt.Errorf("connection closed")
107 default:
108 _, message, err := conn.ReadMessage()
109 if err != nil {
110 closeOnce.Do(func() { close(done) })
111 return fmt.Errorf("read error: %w", err)
112 }
113
114 // Reset read deadline on successful read
115 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
116 log.Printf("Failed to set read deadline: %v", err)
117 }
118
119 if err := c.handleEvent(ctx, message); err != nil {
120 log.Printf("Error handling aggregator event: %v", err)
121 // Continue processing other events
122 }
123 }
124 }
125}
126
127// handleEvent processes a single Jetstream event
128func (c *AggregatorJetstreamConnector) handleEvent(ctx context.Context, data []byte) error {
129 var event JetstreamEvent
130 if err := json.Unmarshal(data, &event); err != nil {
131 return fmt.Errorf("failed to parse event: %w", err)
132 }
133
134 // Pass to consumer's HandleEvent method
135 return c.consumer.HandleEvent(ctx, &event)
136}