1package main
2
3import (
4 "context"
5 "flag"
6 "log/slog"
7 "os"
8 "os/signal"
9 "strings"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/models"
15 "tangled.sh/tangled.sh/core/jetstream"
16)
17
// MemoryDB is a process-local cursor store. It is handed to the jetstream
// client as its DB argument and remembers nothing beyond the lifetime of the
// process.
type MemoryDB struct {
	// lastTimeUs is the most recently recorded timestamp in microseconds.
	// Zero is treated as "nothing recorded yet".
	lastTimeUs int64
}
22
23func (m *MemoryDB) GetLastTimeUs() (int64, error) {
24 if m.lastTimeUs == 0 {
25 return time.Now().UnixMicro(), nil
26 }
27 return m.lastTimeUs, nil
28}
29
30func (m *MemoryDB) SaveLastTimeUs(ts int64) error {
31 m.lastTimeUs = ts
32 return nil
33}
34
35func (m *MemoryDB) UpdateLastTimeUs(ts int64) error {
36 m.lastTimeUs = ts
37 return nil
38}
39
40func main() {
41 // Setup logger
42 logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
43 Level: slog.LevelInfo,
44 }))
45
46 // Create in-memory DB
47 db := &MemoryDB{}
48
49 // Get query URL from flag
50 var queryURL string
51 flag.StringVar(&queryURL, "query-url", "", "Jetstream query URL containing DIDs")
52 flag.Parse()
53
54 if queryURL == "" {
55 logger.Error("No query URL provided, use --query-url flag")
56 os.Exit(1)
57 }
58
59 // Extract wantedDids parameters
60 didParams := strings.Split(queryURL, "&wantedDids=")
61 dids := make([]string, 0, len(didParams)-1)
62 for i, param := range didParams {
63 if i == 0 {
64 // Skip the first part (the base URL with cursor)
65 continue
66 }
67 dids = append(dids, param)
68 }
69
70 // Extract collections
71 collections := []string{"sh.tangled.publicKey", "sh.tangled.knot.member"}
72
73 // Create client configuration
74 cfg := client.DefaultClientConfig()
75 cfg.WebsocketURL = "wss://jetstream2.us-west.bsky.network/subscribe"
76 cfg.WantedCollections = collections
77
78 // Create jetstream client
79 jsClient, err := jetstream.NewJetstreamClient(
80 cfg.WebsocketURL,
81 "tangled-jetstream",
82 collections,
83 cfg,
84 logger,
85 db,
86 false,
87 )
88 if err != nil {
89 logger.Error("Failed to create jetstream client", "error", err)
90 os.Exit(1)
91 }
92
93 // Update DIDs
94 jsClient.UpdateDids(dids)
95
96 // Create a context that will be canceled on SIGINT or SIGTERM
97 ctx, cancel := context.WithCancel(context.Background())
98 defer cancel()
99
100 // Setup signal handling with a buffered channel
101 sigCh := make(chan os.Signal, 1)
102 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
103
104 // Process function for events
105 processFunc := func(ctx context.Context, event *models.Event) error {
106 // Log the event details
107 logger.Info("Received event",
108 "collection", event.Commit.Collection,
109 "did", event.Did,
110 "rkey", event.Commit.RKey,
111 "action", event.Kind,
112 "time_us", event.TimeUS,
113 )
114
115 // Save the last time_us
116 if err := db.UpdateLastTimeUs(event.TimeUS); err != nil {
117 logger.Error("Failed to update last time_us", "error", err)
118 }
119
120 return nil
121 }
122
123 // Start jetstream
124 if err := jsClient.StartJetstream(ctx, processFunc); err != nil {
125 logger.Error("Failed to start jetstream", "error", err)
126 os.Exit(1)
127 }
128
129 // Wait for signal instead of context.Done()
130 sig := <-sigCh
131 logger.Info("Received signal, shutting down", "signal", sig)
132 cancel() // Cancel context after receiving signal
133
134 // Shutdown gracefully with a timeout
135 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
136 defer shutdownCancel()
137
138 done := make(chan struct{})
139 go func() {
140 jsClient.Shutdown()
141 close(done)
142 }()
143
144 select {
145 case <-done:
146 logger.Info("Jetstream client shut down gracefully")
147 case <-shutdownCtx.Done():
148 logger.Warn("Shutdown timed out, forcing exit")
149 }
150}