forked from tangled.org/core
this repo has no description

defer last event time in appview ingester

Changed files
+51 -31
appview
knotserver
-2
appview/db/timeline.go
···
package db
import (
-
"log"
"sort"
"time"
)
···
}
for _, repo := range repos {
-
log.Println(repo.Created)
events = append(events, TimelineEvent{
Repo: &repo,
Follow: nil,
+50
appview/state/jetstream.go
···
+
package state
+
+
import (
+
"context"
+
"encoding/json"
+
"fmt"
+
"log"
+
+
"github.com/bluesky-social/jetstream/pkg/models"
+
tangled "github.com/sotangled/tangled/api/tangled"
+
"github.com/sotangled/tangled/appview/db"
+
)
+
+
type Ingester func(ctx context.Context, e *models.Event) error
+
+
func jetstreamIngester(db *db.DB) Ingester {
+
return func(ctx context.Context, e *models.Event) error {
+
var err error
+
defer func() {
+
eventTime := e.TimeUS
+
lastTimeUs := eventTime + 1
+
if err := db.UpdateLastTimeUs(lastTimeUs); err != nil {
+
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
+
}
+
}()
+
+
if e.Kind != models.EventKindCommit {
+
return nil
+
}
+
+
did := e.Did
+
raw := json.RawMessage(e.Commit.Record)
+
+
switch e.Commit.Collection {
+
case tangled.GraphFollowNSID:
+
record := tangled.GraphFollow{}
+
err := json.Unmarshal(raw, &record)
+
if err != nil {
+
log.Println("invalid record")
+
return err
+
}
+
err = db.AddFollow(did, record.Subject, e.Commit.RKey)
+
if err != nil {
+
return fmt.Errorf("failed to add follow to db: %w", err)
+
}
+
}
+
+
return err
+
}
+
}
+1 -27
appview/state/state.go
···
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
-
"encoding/json"
"fmt"
"log"
"log/slog"
···
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/syntax"
lexutil "github.com/bluesky-social/indigo/lex/util"
-
"github.com/bluesky-social/jetstream/pkg/models"
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/go-chi/chi/v5"
tangled "github.com/sotangled/tangled/api/tangled"
···
if err != nil {
return nil, fmt.Errorf("failed to create jetstream client: %w", err)
}
-
err = jc.StartJetstream(context.Background(), func(ctx context.Context, e *models.Event) error {
-
if e.Kind != models.EventKindCommit {
-
return nil
-
}
-
-
did := e.Did
-
raw := json.RawMessage(e.Commit.Record)
-
-
switch e.Commit.Collection {
-
case tangled.GraphFollowNSID:
-
record := tangled.GraphFollow{}
-
err := json.Unmarshal(raw, &record)
-
if err != nil {
-
log.Println("invalid record")
-
return err
-
}
-
err = db.AddFollow(did, record.Subject, e.Commit.RKey)
-
if err != nil {
-
return fmt.Errorf("failed to add follow to db: %w", err)
-
}
-
return db.UpdateLastTimeUs(e.TimeUS)
-
}
-
-
return nil
-
})
+
err = jc.StartJetstream(context.Background(), jetstreamIngester(db))
if err != nil {
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
}
-2
knotserver/middleware.go
···
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
-
"log"
"net/http"
"time"
)
···
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
signature := r.Header.Get("X-Signature")
-
log.Println(signature)
if signature == "" || !h.verifyHMAC(signature, r) {
writeError(w, "signature verification failed", http.StatusForbidden)
return