forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package main 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "strings" 8 "time" 9 10 "tangled.sh/tangled.sh/core/knotclient" 11) 12 13func main() { 14 knots := flag.String("knots", "", "list of knots to connect to") 15 retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval") 16 maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval") 17 workerCount := flag.Int("workers", 10, "goroutine pool size") 18 19 flag.Parse() 20 21 if *knots == "" { 22 fmt.Println("error: -knots is required") 23 flag.Usage() 24 return 25 } 26 27 ccfg := knotclient.ConsumerConfig{ 28 ProcessFunc: processEvent, 29 RetryInterval: *retryFlag, 30 MaxRetryInterval: *maxRetryFlag, 31 WorkerCount: *workerCount, 32 Dev: true, 33 } 34 for k := range strings.SplitSeq(*knots, ",") { 35 ccfg.AddEventSource(knotclient.NewEventSource(k)) 36 } 37 38 consumer := knotclient.NewEventConsumer(ccfg) 39 40 ctx, cancel := context.WithCancel(context.Background()) 41 consumer.Start(ctx) 42 time.Sleep(1 * time.Hour) 43 cancel() 44 consumer.Stop() 45} 46 47func processEvent(_ context.Context, source knotclient.EventSource, msg knotclient.Message) error { 48 fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson)) 49 return nil 50}