forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package main 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "strings" 8 "time" 9 10 "tangled.sh/tangled.sh/core/knotclient" 11) 12 13func main() { 14 knots := flag.String("knots", "", "list of knots to connect to") 15 retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval") 16 maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval") 17 workerCount := flag.Int("workers", 10, "goroutine pool size") 18 19 flag.Parse() 20 21 if *knots == "" { 22 fmt.Println("error: -knots is required") 23 flag.Usage() 24 return 25 } 26 27 var srcs []knotclient.EventSource 28 for k := range strings.SplitSeq(*knots, ",") { 29 srcs = append(srcs, knotclient.EventSource{k}) 30 } 31 32 consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{ 33 Sources: srcs, 34 ProcessFunc: processEvent, 35 RetryInterval: *retryFlag, 36 MaxRetryInterval: *maxRetryFlag, 37 WorkerCount: *workerCount, 38 Dev: true, 39 }) 40 41 ctx, cancel := context.WithCancel(context.Background()) 42 consumer.Start(ctx) 43 time.Sleep(1 * time.Hour) 44 cancel() 45 consumer.Stop() 46} 47 48func processEvent(source knotclient.EventSource, msg knotclient.Message) error { 49 fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson)) 50 return nil 51}