eventconsumer: extract knotclient/events into its own package #248

merged
opened by oppi.li targeting master from push-mwkwusmyymno
+25 -48
knotclient/events.go → eventconsumer/consumer.go
···
-
package knotclient
+
package eventconsumer
import (
"context"
"encoding/json"
-
"fmt"
"log/slog"
"math/rand"
"net/url"
"sync"
"time"
-
"tangled.sh/tangled.sh/core/knotclient/cursor"
+
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
)
-
type ProcessFunc func(ctx context.Context, source EventSource, message Message) error
+
type ProcessFunc func(ctx context.Context, source Source, message Message) error
type Message struct {
Rkey string
···
}
type ConsumerConfig struct {
-
Sources map[EventSource]struct{}
+
Sources map[Source]struct{}
ProcessFunc ProcessFunc
RetryInterval time.Duration
MaxRetryInterval time.Duration
···
func NewConsumerConfig() *ConsumerConfig {
return &ConsumerConfig{
-
Sources: make(map[EventSource]struct{}),
+
Sources: make(map[Source]struct{}),
}
}
-
type EventSource struct {
-
Knot string
+
type Source interface {
+
// url to start streaming events from
+
Url(cursor int64, dev bool) (*url.URL, error)
+
// cache key for cursor storage
+
Key() string
}
-
func NewEventSource(knot string) EventSource {
-
return EventSource{
-
Knot: knot,
-
}
-
}
-
-
type EventConsumer struct {
+
type Consumer struct {
wg sync.WaitGroup
dialer *websocket.Dialer
connMap sync.Map
···
cfg ConsumerConfig
}
-
func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) {
-
scheme := "wss"
-
if e.cfg.Dev {
-
scheme = "ws"
-
}
-
-
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
-
if err != nil {
-
return nil, err
-
}
-
-
if cursor != 0 {
-
query := url.Values{}
-
query.Add("cursor", fmt.Sprintf("%d", cursor))
-
u.RawQuery = query.Encode()
-
}
-
return u, nil
-
}
-
type job struct {
-
source EventSource
+
source Source
message []byte
}
-
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
+
func NewConsumer(cfg ConsumerConfig) *Consumer {
if cfg.RetryInterval == 0 {
cfg.RetryInterval = 15 * time.Minute
}
···
cfg.MaxRetryInterval = 1 * time.Hour
}
if cfg.Logger == nil {
-
cfg.Logger = log.New("eventconsumer")
+
cfg.Logger = log.New("consumer")
}
if cfg.QueueSize == 0 {
cfg.QueueSize = 100
···
if cfg.CursorStore == nil {
cfg.CursorStore = &cursor.MemoryStore{}
}
-
return &EventConsumer{
+
return &Consumer{
cfg: cfg,
dialer: websocket.DefaultDialer,
jobQueue: make(chan job, cfg.QueueSize), // buffered job queue
···
}
}
-
func (c *EventConsumer) Start(ctx context.Context) {
+
func (c *Consumer) Start(ctx context.Context) {
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
// start workers
···
}
}
-
func (c *EventConsumer) Stop() {
+
func (c *Consumer) Stop() {
c.connMap.Range(func(_, val any) bool {
if conn, ok := val.(*websocket.Conn); ok {
conn.Close()
···
close(c.jobQueue)
}
-
func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
+
func (c *Consumer) AddSource(ctx context.Context, s Source) {
// we are already listening to this source
if _, ok := c.cfg.Sources[s]; ok {
c.logger.Info("source already present", "source", s)
···
c.cfgMu.Unlock()
}
-
func (c *EventConsumer) worker(ctx context.Context) {
+
func (c *Consumer) worker(ctx context.Context) {
defer c.wg.Done()
for {
select {
···
var msg Message
err := json.Unmarshal(j.message, &msg)
if err != nil {
-
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
+
c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err)
return
}
// update cursor
-
c.cfg.CursorStore.Set(j.source.Knot, time.Now().UnixNano())
+
c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano())
if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
···
}
}
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
+
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
defer c.wg.Done()
retryInterval := c.cfg.RetryInterval
for {
···
}
}
-
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
+
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
defer cancel()
-
cursor := c.cfg.CursorStore.Get(source.Knot)
+
cursor := c.cfg.CursorStore.Get(source.Key())
-
u, err := c.buildUrl(source, cursor)
+
u, err := source.Url(cursor, c.cfg.Dev)
if err != nil {
return err
}
knotclient/cursor/memory.go → eventconsumer/cursor/memory.go
knotclient/cursor/redis.go → eventconsumer/cursor/redis.go
knotclient/cursor/sqlite.go → eventconsumer/cursor/sqlite.go
knotclient/cursor/store.go → eventconsumer/cursor/store.go
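With EventSource replaced by the Source interface, wiring a consumer stays the same apart from the constructor names. A minimal sketch of the intended usage, mirroring the spindle/server.go changes below and using the KnotSource/SpindleSource types added in this PR; the hostnames are placeholders and NewConsumer's defaults are assumed to cover the options not set here:

package main

import (
	"context"
	"os"
	"os/signal"

	"tangled.sh/tangled.sh/core/eventconsumer"
	"tangled.sh/tangled.sh/core/log"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	cfg := eventconsumer.NewConsumerConfig()
	cfg.Logger = log.New("consumer")
	cfg.ProcessFunc = func(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
		// handle one event; Key() identifies the upstream host
		cfg.Logger.Info("event", "source", src.Key(), "rkey", msg.Rkey)
		return nil
	}

	// sources known up front go straight into the config
	cfg.Sources[eventconsumer.NewKnotSource("knot.example.com")] = struct{}{}

	consumer := eventconsumer.NewConsumer(*cfg)
	go consumer.Start(ctx)

	// sources can also be added at runtime, as spindle/ingester.go does
	consumer.AddSource(ctx, eventconsumer.NewSpindleSource("spindle.example.com"))

	<-ctx.Done()
	consumer.Stop()
}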
+39
eventconsumer/knot.go
···
+
package eventconsumer
+
+
import (
+
"fmt"
+
"net/url"
+
)
+
+
type KnotSource struct {
+
Knot string
+
}
+
+
func (k KnotSource) Key() string {
+
return k.Knot
+
}
+
+
func (k KnotSource) Url(cursor int64, dev bool) (*url.URL, error) {
+
scheme := "wss"
+
if dev {
+
scheme = "ws"
+
}
+
+
u, err := url.Parse(scheme + "://" + k.Knot + "/events")
+
if err != nil {
+
return nil, err
+
}
+
+
if cursor != 0 {
+
query := url.Values{}
+
query.Add("cursor", fmt.Sprintf("%d", cursor))
+
u.RawQuery = query.Encode()
+
}
+
return u, nil
+
}
+
+
func NewKnotSource(knot string) KnotSource {
+
return KnotSource{
+
Knot: knot,
+
}
+
}
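Since the URL construction moved verbatim from the old buildUrl, a KnotSource still yields a /events websocket URL, with the cursor appended as a query parameter only when it is non-zero. A small illustration (the host is a placeholder):

package main

import (
	"fmt"

	"tangled.sh/tangled.sh/core/eventconsumer"
)

func main() {
	src := eventconsumer.NewKnotSource("knot.example.com")

	u, err := src.Url(0, false)
	if err != nil {
		panic(err)
	}
	fmt.Println(u) // wss://knot.example.com/events

	u, err = src.Url(1700000000000000000, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(u) // ws://knot.example.com/events?cursor=1700000000000000000
}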
+39
eventconsumer/spindle.go
···
+
package eventconsumer
+
+
import (
+
"fmt"
+
"net/url"
+
)
+
+
type SpindleSource struct {
+
Spindle string
+
}
+
+
func (s SpindleSource) Key() string {
+
return s.Spindle
+
}
+
+
func (s SpindleSource) Url(cursor int64, dev bool) (*url.URL, error) {
+
scheme := "wss"
+
if dev {
+
scheme = "ws"
+
}
+
+
u, err := url.Parse(scheme + "://" + s.Spindle + "/events")
+
if err != nil {
+
return nil, err
+
}
+
+
if cursor != 0 {
+
query := url.Values{}
+
query.Add("cursor", fmt.Sprintf("%d", cursor))
+
u.RawQuery = query.Encode()
+
}
+
return u, nil
+
}
+
+
func NewSpindleSource(spindle string) SpindleSource {
+
return SpindleSource{
+
Spindle: spindle,
+
}
+
}
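SpindleSource is a field-for-field copy of KnotSource, keyed by the spindle host instead of the knot host. The point of the interface is that any other event producer can be consumed the same way by supplying the two methods; a purely hypothetical sketch (RelaySource does not exist in this PR):

package eventconsumer

import (
	"net/url"
	"strconv"
)

// RelaySource is hypothetical and only illustrates the Source interface.
type RelaySource struct {
	Host string
}

// Key is the cursor-store key for this source.
func (r RelaySource) Key() string {
	return r.Host
}

// Url builds the websocket endpoint to stream events from.
func (r RelaySource) Url(cursor int64, dev bool) (*url.URL, error) {
	scheme := "wss"
	if dev {
		scheme = "ws"
	}
	u := &url.URL{Scheme: scheme, Host: r.Host, Path: "/events"}
	if cursor != 0 {
		q := url.Values{}
		q.Add("cursor", strconv.FormatInt(cursor, 10))
		u.RawQuery = q.Encode()
	}
	return u, nil
}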
+2 -2
spindle/ingester.go
···
"fmt"
"tangled.sh/tangled.sh/core/api/tangled"
-
"tangled.sh/tangled.sh/core/knotclient"
+
"tangled.sh/tangled.sh/core/eventconsumer"
"github.com/bluesky-social/jetstream/pkg/models"
)
···
}
// add this knot to the event consumer
-
src := knotclient.NewEventSource(record.Knot)
+
src := eventconsumer.NewKnotSource(record.Knot)
s.ks.AddSource(context.Background(), src)
return nil
+8 -8
spindle/server.go
···
"github.com/go-chi/chi/v5"
"tangled.sh/tangled.sh/core/api/tangled"
+
"tangled.sh/tangled.sh/core/eventconsumer"
+
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/jetstream"
-
"tangled.sh/tangled.sh/core/knotclient"
-
"tangled.sh/tangled.sh/core/knotclient/cursor"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/notifier"
"tangled.sh/tangled.sh/core/rbac"
···
eng *engine.Engine
jq *queue.Queue
cfg *config.Config
-
ks *knotclient.EventConsumer
+
ks *eventconsumer.Consumer
}
func Run(ctx context.Context) error {
···
// for each incoming sh.tangled.pipeline, we execute
// spindle.processPipeline, which in turn enqueues the pipeline
// job in the above registered queue.
-
ccfg := knotclient.NewConsumerConfig()
+
ccfg := eventconsumer.NewConsumerConfig()
ccfg.Logger = logger
ccfg.Dev = cfg.Server.Dev
ccfg.ProcessFunc = spindle.processPipeline
···
}
for _, knot := range knownKnots {
logger.Info("adding source start", "knot", knot)
-
ccfg.Sources[knotclient.EventSource{knot}] = struct{}{}
+
ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
}
-
spindle.ks = knotclient.NewEventConsumer(*ccfg)
+
spindle.ks = eventconsumer.NewConsumer(*ccfg)
go func() {
logger.Info("starting knot event consumer")
···
return mux
}
-
func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
+
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
if msg.Nsid == tangled.PipelineNSID {
pipeline := tangled.Pipeline{}
err := json.Unmarshal(msg.EventJson, &pipeline)
···
}
pipelineId := models.PipelineId{
-
Knot: src.Knot,
+
Knot: src.Key(),
Rkey: msg.Rkey,
}
+24 -5
spindle/stream.go
···
import (
"context"
+
"encoding/json"
"fmt"
"net/http"
"strconv"
···
}
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
-
ops, err := s.db.GetEvents(*cursor)
+
events, err := s.db.GetEvents(*cursor)
if err != nil {
s.l.Debug("err", "err", err)
return err
}
-
s.l.Debug("ops", "ops", ops)
+
s.l.Debug("ops", "ops", events)
+
+
for _, event := range events {
+
// first extract the inner json into a map
+
var eventJson map[string]any
+
err := json.Unmarshal([]byte(event.EventJson), &eventJson)
+
if err != nil {
+
s.l.Error("failed to unmarshal event", "err", err)
+
return err
+
}
+
+
jsonMsg, err := json.Marshal(map[string]any{
+
"rkey": event.Rkey,
+
"nsid": event.Nsid,
+
"event": eventJson,
+
})
+
if err != nil {
+
s.l.Error("failed to marshal record", "err", err)
+
return err
+
}
-
for _, op := range ops {
-
if err := conn.WriteJSON(op); err != nil {
+
if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
s.l.Debug("err", "err", err)
return err
}
-
*cursor = op.Created
+
*cursor = event.Created
}
return nil
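The streaming change above makes the spindle emit, per event, the same envelope the consumer decodes into Message: the record key, the NSID, and the re-parsed inner record under "event". Only Rkey, Nsid and EventJson are visible on Message in this diff, so the field tags in the sketch below are an assumption:

package main

import (
	"encoding/json"
	"fmt"
)

// message mirrors eventconsumer.Message as far as this diff shows it;
// the json tags match the envelope written by streamPipelines and are
// assumed, not taken from the real struct.
type message struct {
	Rkey      string          `json:"rkey"`
	Nsid      string          `json:"nsid"`
	EventJson json.RawMessage `json:"event"`
}

func main() {
	// envelope shape written per event (values illustrative)
	raw := []byte(`{"rkey":"3kabc","nsid":"sh.tangled.pipeline","event":{"example":"record"}}`)

	var m message
	if err := json.Unmarshal(raw, &m); err != nil {
		panic(err)
	}
	fmt.Println(m.Rkey, m.Nsid, string(m.EventJson))
}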