this repo has no description

add seattle feed

+20 -12
peruse/handle_baseball_feed.go
···
type BaseballFeed struct {
conn driver.Conn
logger *slog.Logger
-
cached []BaseballPost
cacheExpiresAt time.Time
mu sync.RWMutex
}
-
type BaseballPost struct {
LikeCt uint64 `ch:"like_ct"`
Uri string `ch:"uri"`
CreatedAt time.Time `ch:"created_at"`
···
func (f *BaseballFeed) HandleGetFeedSkeleton(e echo.Context, req FeedSkeletonRequest) error {
ctx := e.Request().Context()
-
cursor := time.Now()
-
if req.Cursor != "" {
-
maybeCursor, err := msToTime(req.Cursor)
-
if err != nil {
-
f.logger.Error("error getting time from cursor", "error", err)
-
return helpers.InputError(e, "InputError", "Invalid cursor for feed")
-
}
-
cursor = maybeCursor
}
posts, err := f.getPosts(ctx)
···
})
}
-
func (f *BaseballFeed) getPosts(ctx context.Context) ([]BaseballPost, error) {
f.mu.RLock()
f.mu.RUnlock()
if f.cached != nil && time.Now().Before(f.cacheExpiresAt) {
···
return f.cached, nil
}
-
var posts []BaseballPost
if err := f.conn.Select(ctx, &posts, baseballQuery); err != nil {
return nil, err
}
···
ORDER BY decay_score DESC
LIMIT 5000
`
···
type BaseballFeed struct {
conn driver.Conn
logger *slog.Logger
+
cached []RankedFeedPost
cacheExpiresAt time.Time
mu sync.RWMutex
}
+
type RankedFeedPost struct {
LikeCt uint64 `ch:"like_ct"`
Uri string `ch:"uri"`
CreatedAt time.Time `ch:"created_at"`
···
func (f *BaseballFeed) HandleGetFeedSkeleton(e echo.Context, req FeedSkeletonRequest) error {
ctx := e.Request().Context()
+
cursor, err := getTimeBasedCursor(req)
+
if err != nil {
+
f.logger.Error("error getting cursor", "error", err)
+
return helpers.InputError(e, "FeedError", "Invalid cursor for feed")
}
posts, err := f.getPosts(ctx)
···
})
}
+
func (f *BaseballFeed) getPosts(ctx context.Context) ([]RankedFeedPost, error) {
f.mu.RLock()
f.mu.RUnlock()
if f.cached != nil && time.Now().Before(f.cacheExpiresAt) {
···
return f.cached, nil
}
+
var posts []RankedFeedPost
if err := f.conn.Select(ctx, &posts, baseballQuery); err != nil {
return nil, err
}
···
ORDER BY decay_score DESC
LIMIT 5000
`
+
+
func getTimeBasedCursor(req FeedSkeletonRequest) (time.Time, error) {
+
cursor := time.Now()
+
if req.Cursor != "" {
+
maybeCursor, err := msToTime(req.Cursor)
+
if err != nil {
+
return time.Time{}, fmt.Errorf("error getting time from cursor: %w", err)
+
}
+
cursor = maybeCursor
+
}
+
return cursor, nil
+
}
+115
peruse/handle_seattle_feed.go
···
···
+
package peruse
+
+
import (
+
"context"
+
"fmt"
+
"log/slog"
+
"sync"
+
"time"
+
+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+
"github.com/haileyok/peruse/internal/helpers"
+
"github.com/labstack/echo/v4"
+
)
+
+
type SeattleFeed struct {
+
conn driver.Conn
+
logger *slog.Logger
+
cached []RankedFeedPost
+
cacheExpiresAt time.Time
+
mu sync.RWMutex
+
}
+
+
func NewSeattleFeed(s *Server) *SeattleFeed {
+
logger := s.logger.With("feed", "seattle-feed")
+
return &SeattleFeed{
+
conn: s.conn,
+
logger: logger,
+
}
+
}
+
+
func (f *SeattleFeed) Name() string {
+
return "seattle"
+
}
+
+
func (f *SeattleFeed) HandleGetFeedSkeleton(e echo.Context, req FeedSkeletonRequest) error {
+
ctx := e.Request().Context()
+
+
cursor, err := getTimeBasedCursor(req)
+
if err != nil {
+
f.logger.Error("error getting cursor", "error", err)
+
return helpers.InputError(e, "FeedError", "Invalid cursor for feed")
+
}
+
+
posts, err := f.getPosts(ctx)
+
if err != nil {
+
f.logger.Error("error getting posts", "error", err)
+
return helpers.ServerError(e, "FeedError", "Unable to get posts for feed")
+
}
+
+
for i, p := range posts {
+
if p.CreatedAt.Before(cursor) {
+
posts = posts[i:]
+
break
+
}
+
}
+
+
if len(posts) > 30 {
+
posts = posts[:30]
+
}
+
+
var items []FeedPostItem
+
for _, p := range posts {
+
items = append(items, FeedPostItem{
+
Post: p.Uri,
+
})
+
}
+
+
newCursor := fmt.Sprintf("%d", posts[len(posts)-1].CreatedAt.UnixMilli())
+
+
return e.JSON(200, FeedSkeletonResponse{
+
Feed: items,
+
Cursor: &newCursor,
+
})
+
}
+
+
func (f *SeattleFeed) getPosts(ctx context.Context) ([]RankedFeedPost, error) {
+
now := time.Now()
+
f.mu.RLock()
+
expiresAt := f.cacheExpiresAt
+
posts := f.cached
+
f.mu.RUnlock()
+
if posts != nil && now.Before(expiresAt) {
+
return posts, nil
+
}
+
+
f.mu.Lock()
+
defer f.mu.Unlock()
+
+
if f.cached != nil && now.Before(expiresAt) {
+
return f.cached, nil
+
}
+
+
if err := f.conn.Select(ctx, &posts, seattleQuery); err != nil {
+
return nil, err
+
}
+
f.cached = posts
+
f.cacheExpiresAt = now
+
+
return posts, nil
+
}
+
+
var seattleQuery = `
+
SELECT
+
count(*) as like_ct,
+
sp.uri,
+
sp.created_at,
+
dateDiff('hour', sp.created_at, now()) as hours_old,
+
count(*) * exp(-0.1 * dateDiff('hour', sp.created_at, now())) as decay_score
+
FROM seattle_post sp
+
LEFT JOIN default.like_by_subject i ON sp.uri = i.subject_uri
+
WHERE sp.created_at > now() - INTERVAL 1 DAY
+
GROUP BY sp.uri, sp.created_at
+
ORDER BY decay_score DESC
+
LIMIT 5000
+
`
+1
peruse/peruse.go
···
defer cancel()
s.addFeed(NewBaseballFeed(s))
s.addRoutes()
···
defer cancel()
s.addFeed(NewBaseballFeed(s))
+
s.addFeed(NewSeattleFeed(s))
s.addRoutes()