1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "strings"
8 "time"
9
10 "tangled.sh/tangled.sh/core/knotclient"
11)
12
13func main() {
14 knots := flag.String("knots", "", "list of knots to connect to")
15 retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
16 maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
17 workerCount := flag.Int("workers", 10, "goroutine pool size")
18
19 flag.Parse()
20
21 if *knots == "" {
22 fmt.Println("error: -knots is required")
23 flag.Usage()
24 return
25 }
26
27 var srcs []knotclient.EventSource
28 for k := range strings.SplitSeq(*knots, ",") {
29 srcs = append(srcs, knotclient.EventSource{k})
30 }
31
32 consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
33 Sources: srcs,
34 ProcessFunc: processEvent,
35 RetryInterval: *retryFlag,
36 MaxRetryInterval: *maxRetryFlag,
37 WorkerCount: *workerCount,
38 Dev: true,
39 })
40
41 ctx, cancel := context.WithCancel(context.Background())
42 consumer.Start(ctx)
43 time.Sleep(1 * time.Hour)
44 cancel()
45 consumer.Stop()
46}
47
48func processEvent(source knotclient.EventSource, msg knotclient.Message) error {
49 fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson))
50 return nil
51}