forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "strings"
8 "time"
9
10 "tangled.sh/tangled.sh/core/knotclient"
11)
12
13func main() {
14 sourcesFlag := flag.String("sources", "", "list of wss sources")
15 retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
16 maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
17 workerCount := flag.Int("workers", 10, "goroutine pool size")
18
19 flag.Parse()
20
21 if *sourcesFlag == "" {
22 fmt.Println("error: -sources is required")
23 flag.Usage()
24 return
25 }
26
27 sources := strings.Split(*sourcesFlag, ",")
28
29 consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
30 Sources: sources,
31 ProcessFunc: processEvent,
32 RetryInterval: *retryFlag,
33 MaxRetryInterval: *maxRetryFlag,
34 WorkerCount: *workerCount,
35 })
36
37 ctx, cancel := context.WithCancel(context.Background())
38 consumer.Start(ctx)
39 time.Sleep(1 * time.Hour)
40 cancel()
41 consumer.Stop()
42}
43
44func processEvent(source string, msg []byte) error {
45 fmt.Printf("From %s: %s\n", source, string(msg))
46 return nil
47}