forked from tangled.org/core
this repo has no description
at drop-at 3.5 kB view raw
1package main 2 3import ( 4 "context" 5 "flag" 6 "log/slog" 7 "os" 8 "os/signal" 9 "strings" 10 "syscall" 11 "time" 12 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/models" 15 "tangled.sh/tangled.sh/core/jetstream" 16) 17 18// Simple in-memory implementation of DB interface 19type MemoryDB struct { 20 lastTimeUs int64 21} 22 23func (m *MemoryDB) GetLastTimeUs() (int64, error) { 24 if m.lastTimeUs == 0 { 25 return time.Now().UnixMicro(), nil 26 } 27 return m.lastTimeUs, nil 28} 29 30func (m *MemoryDB) SaveLastTimeUs(ts int64) error { 31 m.lastTimeUs = ts 32 return nil 33} 34 35func (m *MemoryDB) UpdateLastTimeUs(ts int64) error { 36 m.lastTimeUs = ts 37 return nil 38} 39 40func main() { 41 // Setup logger 42 logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 43 Level: slog.LevelInfo, 44 })) 45 46 // Create in-memory DB 47 db := &MemoryDB{} 48 49 // Get query URL from flag 50 var queryURL string 51 flag.StringVar(&queryURL, "query-url", "", "Jetstream query URL containing DIDs") 52 flag.Parse() 53 54 if queryURL == "" { 55 logger.Error("No query URL provided, use --query-url flag") 56 os.Exit(1) 57 } 58 59 // Extract wantedDids parameters 60 didParams := strings.Split(queryURL, "&wantedDids=") 61 dids := make([]string, 0, len(didParams)-1) 62 for i, param := range didParams { 63 if i == 0 { 64 // Skip the first part (the base URL with cursor) 65 continue 66 } 67 dids = append(dids, param) 68 } 69 70 // Extract collections 71 collections := []string{"sh.tangled.publicKey", "sh.tangled.knot.member"} 72 73 // Create client configuration 74 cfg := client.DefaultClientConfig() 75 cfg.WebsocketURL = "wss://jetstream2.us-west.bsky.network/subscribe" 76 cfg.WantedCollections = collections 77 78 // Create jetstream client 79 jsClient, err := jetstream.NewJetstreamClient( 80 cfg.WebsocketURL, 81 "tangled-jetstream", 82 collections, 83 cfg, 84 logger, 85 db, 86 false, 87 ) 88 if err != nil { 89 logger.Error("Failed to create jetstream client", "error", err) 90 os.Exit(1) 91 } 92 93 // Update DIDs 94 jsClient.UpdateDids(dids) 95 96 // Create a context that will be canceled on SIGINT or SIGTERM 97 ctx, cancel := context.WithCancel(context.Background()) 98 defer cancel() 99 100 // Setup signal handling with a buffered channel 101 sigCh := make(chan os.Signal, 1) 102 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) 103 104 // Process function for events 105 processFunc := func(ctx context.Context, event *models.Event) error { 106 // Log the event details 107 logger.Info("Received event", 108 "collection", event.Commit.Collection, 109 "did", event.Did, 110 "rkey", event.Commit.RKey, 111 "action", event.Kind, 112 "time_us", event.TimeUS, 113 ) 114 115 // Save the last time_us 116 if err := db.UpdateLastTimeUs(event.TimeUS); err != nil { 117 logger.Error("Failed to update last time_us", "error", err) 118 } 119 120 return nil 121 } 122 123 // Start jetstream 124 if err := jsClient.StartJetstream(ctx, processFunc); err != nil { 125 logger.Error("Failed to start jetstream", "error", err) 126 os.Exit(1) 127 } 128 129 // Wait for signal instead of context.Done() 130 sig := <-sigCh 131 logger.Info("Received signal, shutting down", "signal", sig) 132 cancel() // Cancel context after receiving signal 133 134 // Shutdown gracefully with a timeout 135 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) 136 defer shutdownCancel() 137 138 done := make(chan struct{}) 139 go func() { 140 jsClient.Shutdown() 141 close(done) 142 }() 143 144 select { 145 case <-done: 146 logger.Info("Jetstream client shut down gracefully") 147 case <-shutdownCtx.Done(): 148 logger.Warn("Shutdown timed out, forcing exit") 149 } 150}