1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "strings"
8 "time"
9
10 "tangled.sh/tangled.sh/core/knotclient"
11)
12
13func main() {
14 knots := flag.String("knots", "", "list of knots to connect to")
15 retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
16 maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
17 workerCount := flag.Int("workers", 10, "goroutine pool size")
18
19 flag.Parse()
20
21 if *knots == "" {
22 fmt.Println("error: -knots is required")
23 flag.Usage()
24 return
25 }
26
27 ccfg := knotclient.ConsumerConfig{
28 ProcessFunc: processEvent,
29 RetryInterval: *retryFlag,
30 MaxRetryInterval: *maxRetryFlag,
31 WorkerCount: *workerCount,
32 Dev: true,
33 }
34 for k := range strings.SplitSeq(*knots, ",") {
35 ccfg.AddEventSource(knotclient.NewEventSource(k))
36 }
37
38 consumer := knotclient.NewEventConsumer(ccfg)
39
40 ctx, cancel := context.WithCancel(context.Background())
41 consumer.Start(ctx)
42 time.Sleep(1 * time.Hour)
43 cancel()
44 consumer.Stop()
45}
46
47func processEvent(_ context.Context, source knotclient.EventSource, msg knotclient.Message) error {
48 fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson))
49 return nil
50}