this repo has no description

add a baseball feed

+136
peruse/handle_baseball_feed.go
···
+
package peruse
+
+
import (
+
"context"
+
"fmt"
+
"log/slog"
+
"strconv"
+
"sync"
+
"time"
+
+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+
"github.com/haileyok/peruse/internal/helpers"
+
"github.com/labstack/echo/v4"
+
)
+
+
type BaseballFeed struct {
+
conn driver.Conn
+
logger *slog.Logger
+
cached []BaseballPost
+
cacheExpiresAt time.Time
+
mu sync.RWMutex
+
}
+
+
type BaseballPost struct {
+
LikeCt uint64 `ch:"like_ct"`
+
Uri string `ch:"uri"`
+
CreatedAt time.Time `ch:"created_at"`
+
HoursOld uint64 `ch:"hours_old"`
+
DecayScore float64 `ch:"decay_score"`
+
}
+
+
func NewBaseballFeed(s *Server) *BaseballFeed {
+
logger := s.logger.With("feed", "baseball-feed")
+
return &BaseballFeed{
+
conn: s.conn,
+
logger: logger,
+
cached: nil,
+
}
+
}
+
+
func (f *BaseballFeed) Name() string {
+
return "baseball"
+
}
+
+
func (f *BaseballFeed) HandleGetFeedSkeleton(e echo.Context, req FeedSkeletonRequest) error {
+
ctx := e.Request().Context()
+
+
cursor := time.Now()
+
if req.Cursor != "" {
+
maybeCursor, err := msToTime(req.Cursor)
+
if err != nil {
+
f.logger.Error("error getting time from cursor", "error", err)
+
return helpers.InputError(e, "InputError", "Invalid cursor for feed")
+
}
+
cursor = maybeCursor
+
}
+
+
posts, err := f.getPosts(ctx)
+
if err != nil {
+
f.logger.Error("error getting posts", "error", err)
+
return helpers.ServerError(e, "FeedError", "Unable to get posts for feed")
+
}
+
+
if len(posts) == 0 {
+
return helpers.ServerError(e, "FeedError", "Not enough posts")
+
}
+
+
for i, p := range posts {
+
if p.CreatedAt.Before(cursor) {
+
posts = posts[i:]
+
break
+
}
+
}
+
+
if len(posts) > 30 {
+
posts = posts[:30]
+
}
+
+
var items []FeedPostItem
+
for _, p := range posts {
+
items = append(items, FeedPostItem{Post: p.Uri})
+
}
+
+
newCursor := fmt.Sprintf("%d", posts[len(posts)-1].CreatedAt.UnixMilli())
+
+
return e.JSON(200, FeedSkeletonResponse{
+
Feed: items,
+
Cursor: &newCursor,
+
})
+
}
+
+
func (f *BaseballFeed) getPosts(ctx context.Context) ([]BaseballPost, error) {
+
f.mu.RLock()
+
f.mu.RUnlock()
+
if f.cached != nil && time.Now().Before(f.cacheExpiresAt) {
+
return f.cached, nil
+
}
+
+
f.mu.Lock()
+
defer f.mu.Unlock()
+
+
if f.cached != nil && time.Now().Before(f.cacheExpiresAt) {
+
return f.cached, nil
+
}
+
+
var posts []BaseballPost
+
if err := f.conn.Select(ctx, &posts, baseballQuery); err != nil {
+
return nil, err
+
}
+
f.cached = posts
+
f.cacheExpiresAt = time.Now().Add(1 * time.Minute)
+
return posts, nil
+
}
+
+
func msToTime(ms string) (time.Time, error) {
+
msInt, err := strconv.ParseInt(ms, 10, 64)
+
if err != nil {
+
return time.Time{}, err
+
}
+
return time.Unix(0, msInt*int64(time.Millisecond)), nil
+
}
+
+
var baseballQuery = `
+
SELECT
+
count(*) as like_ct,
+
bp.uri,
+
bp.created_at,
+
dateDiff('hour', bp.created_at, now()) as hours_old,
+
count(*) * exp(-0.1 * dateDiff('hour', bp.created_at, now())) as decay_score
+
FROM baseball_post bp
+
LEFT JOIN default.like_by_subject i ON bp.uri = i.subject_uri
+
WHERE bp.created_at > now() - INTERVAL 1 DAY
+
GROUP BY bp.uri, bp.created_at
+
ORDER BY decay_score DESC
+
LIMIT 5000
+
`
+1 -1
peruse/handle_get_suggested_follows.go
···
u := NewUser(req.Handle)
suggs, err := u.getSuggestedFollows(ctx, s, req.ShowHandles)
if err != nil {
-
return e.String(400, fmt.Sprintf("error getting suggested follows: %v"))
+
return e.String(400, fmt.Sprintf("error getting suggested follows: %v", err))
}
html := "<html><table><tr><th>suggested did</th><th>bsky profile</th></tr>"
+29
peruse/peruse.go
···
import (
"context"
"crypto"
+
"fmt"
"log/slog"
"net/http"
"os"
···
directory identity.Directory
userManager *UserManager
xrpc *xrpc.Client
+
feeds map[string]Feed
}
type ServerArgs struct {
···
ServiceEndpoint string
ChronoFeedRkey string
SuggestedFollowsRkey string
+
}
+
+
type Feed interface {
+
Name() string
+
HandleGetFeedSkeleton(e echo.Context, req FeedSkeletonRequest) error
}
func NewServer(args ServerArgs) (*Server, error) {
···
xrpc: &xrpc.Client{
Host: "https://public.api.bsky.app",
},
+
feeds: map[string]Feed{},
}, nil
}
···
ctx, cancel := context.WithCancel(ctx)
defer cancel()
+
s.addFeed(NewBaseballFeed(s))
+
s.addRoutes()
go func() {
···
return nil
}
+
func (s *Server) addFeed(f Feed) error {
+
_, exists := s.feeds[f.Name()]
+
if exists {
+
return fmt.Errorf("feed %s already exists", f.Name())
+
}
+
s.feeds[f.Name()] = f
+
return nil
+
}
+
func (s *Server) addRoutes() {
s.echo.GET("/xrpc/app.bsky.feed.getFeedSkeleton", s.handleFeedSkeleton, s.handleAuthMiddleware)
s.echo.GET("/xrpc/app.bsky.feed.describeFeedGenerator", s.handleDescribeFeedGenerator)
···
return next(e)
}
}
+
+
func urisToFeedPostItems(uris []string) []FeedPostItem {
+
var pis []FeedPostItem
+
for _, u := range uris {
+
pis = append(pis, FeedPostItem{
+
Post: u,
+
})
+
}
+
return pis
+
}