A community based topic aggregation platform built on atproto

feat(api): add aggregator XRPC query handlers

Implement XRPC query endpoints for aggregator discovery and
authorization management.

Handlers (internal/api/handlers/aggregator/):
- get_services.go: Fetch aggregator details by DIDs
* Supports detailed=true for stats (communities_using, posts_created)
* Returns aggregatorView or aggregatorViewDetailed union type
* Bulk query optimization for multiple DIDs

- get_authorizations.go: List communities using an aggregator
* Nested aggregatorView in response (lexicon compliance)
* Supports enabledOnly filter and pagination

- list_for_community.go: List aggregators for a community
* Accepts at-identifier (DID or handle) for community
* Returns authorizationView with config
* Supports enabledOnly filter and pagination

- errors.go: Error handling with domain error mapping
* Maps domain errors to appropriate HTTP status codes
* 404 for not found, 400 for validation, 501 for not implemented

Routes (internal/api/routes/aggregator.go):
- GET /xrpc/social.coves.aggregator.getServices?dids=...
- GET /xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=...
- GET /xrpc/social.coves.aggregator.listForCommunity?community=...

Features:
- Query parameter parsing with validation
- Domain model to API view conversion
- JSON response formatting matching lexicon
- Proper HTTP status codes (404, 400, 500, 501)
- Config unmarshal from JSONB to interface{}

Deferred to Phase 2:
- Write endpoints (enable, disable, updateConfig) return 501 Not Implemented

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+603
internal
+54
internal/api/handlers/aggregator/errors.go
···
+
package aggregator
+
+
import (
+
"Coves/internal/core/aggregators"
+
"encoding/json"
+
"log"
+
"net/http"
+
)
+
+
// ErrorResponse represents an XRPC error response
+
type ErrorResponse struct {
+
Error string `json:"error"`
+
Message string `json:"message"`
+
}
+
+
// writeError writes a JSON error response
+
func writeError(w http.ResponseWriter, statusCode int, errorType, message string) {
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(statusCode)
+
if err := json.NewEncoder(w).Encode(ErrorResponse{
+
Error: errorType,
+
Message: message,
+
}); err != nil {
+
log.Printf("ERROR: Failed to encode error response: %v", err)
+
}
+
}
+
+
// handleServiceError maps service errors to HTTP responses
+
func handleServiceError(w http.ResponseWriter, err error) {
+
if err == nil {
+
return
+
}
+
+
// Map domain errors to HTTP status codes
+
switch {
+
case aggregators.IsNotFound(err):
+
writeError(w, http.StatusNotFound, "NotFound", err.Error())
+
case aggregators.IsValidationError(err):
+
writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error())
+
case aggregators.IsUnauthorized(err):
+
writeError(w, http.StatusForbidden, "Forbidden", err.Error())
+
case aggregators.IsConflict(err):
+
writeError(w, http.StatusConflict, "Conflict", err.Error())
+
case aggregators.IsRateLimited(err):
+
writeError(w, http.StatusTooManyRequests, "RateLimitExceeded", err.Error())
+
case aggregators.IsNotImplemented(err):
+
writeError(w, http.StatusNotImplemented, "NotImplemented", "This feature is not yet available (Phase 2)")
+
default:
+
// Internal errors - don't leak details
+
log.Printf("ERROR: Aggregator service error: %v", err)
+
writeError(w, http.StatusInternalServerError, "InternalServerError",
+
"An internal error occurred")
+
}
+
}
+143
internal/api/handlers/aggregator/get_authorizations.go
···
+
package aggregator
+
+
import (
+
"Coves/internal/core/aggregators"
+
"encoding/json"
+
"log"
+
"net/http"
+
"strconv"
+
)
+
+
// GetAuthorizationsHandler handles listing authorizations for an aggregator
+
type GetAuthorizationsHandler struct {
+
service aggregators.Service
+
}
+
+
// NewGetAuthorizationsHandler creates a new get authorizations handler
+
func NewGetAuthorizationsHandler(service aggregators.Service) *GetAuthorizationsHandler {
+
return &GetAuthorizationsHandler{
+
service: service,
+
}
+
}
+
+
// HandleGetAuthorizations lists all communities that authorized an aggregator
+
// GET /xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=did:plc:abc123&enabledOnly=true&limit=50&cursor=xyz
+
// Following Bluesky's pattern for listing feed subscribers
+
func (h *GetAuthorizationsHandler) HandleGetAuthorizations(w http.ResponseWriter, r *http.Request) {
+
if r.Method != http.MethodGet {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// Parse request
+
req, err := h.parseRequest(r)
+
if err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error())
+
return
+
}
+
+
// Get aggregator details first (needed for nested aggregator object in response)
+
agg, err := h.service.GetAggregator(r.Context(), req.AggregatorDID)
+
if err != nil {
+
if aggregators.IsNotFound(err) {
+
writeError(w, http.StatusNotFound, "AggregatorNotFound", "Aggregator DID does not exist or has no service declaration")
+
return
+
}
+
handleServiceError(w, err)
+
return
+
}
+
+
// Get authorizations from service
+
auths, err := h.service.GetAuthorizationsForAggregator(r.Context(), req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// Build response
+
response := GetAuthorizationsResponse{
+
Authorizations: make([]CommunityAuthView, 0, len(auths)),
+
}
+
+
// Convert aggregator to view for nesting in each authorization
+
aggregatorView := toAggregatorView(agg)
+
+
for _, auth := range auths {
+
response.Authorizations = append(response.Authorizations, toCommunityAuthView(auth, aggregatorView))
+
}
+
+
// Return response
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(response); err != nil {
+
log.Printf("ERROR: Failed to encode getAuthorizations response: %v", err)
+
}
+
}
+
+
// parseRequest parses query parameters
+
func (h *GetAuthorizationsHandler) parseRequest(r *http.Request) (aggregators.GetAuthorizationsRequest, error) {
+
req := aggregators.GetAuthorizationsRequest{}
+
+
// Required: aggregatorDid
+
req.AggregatorDID = r.URL.Query().Get("aggregatorDid")
+
+
// Optional: enabledOnly (default: false)
+
if enabledOnlyStr := r.URL.Query().Get("enabledOnly"); enabledOnlyStr == "true" {
+
req.EnabledOnly = true
+
}
+
+
// Optional: limit (default: 50, set by service)
+
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
+
if limit, err := strconv.Atoi(limitStr); err == nil {
+
req.Limit = limit
+
}
+
}
+
+
// Optional: offset (default: 0)
+
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
+
if offset, err := strconv.Atoi(offsetStr); err == nil {
+
req.Offset = offset
+
}
+
}
+
+
return req, nil
+
}
+
+
// GetAuthorizationsResponse matches the lexicon output
+
type GetAuthorizationsResponse struct {
+
Authorizations []CommunityAuthView `json:"authorizations"`
+
Cursor *string `json:"cursor,omitempty"` // Pagination cursor
+
}
+
+
// CommunityAuthView matches social.coves.aggregator.defs#communityAuthView
+
// Shows authorization from aggregator's perspective with nested aggregator details
+
type CommunityAuthView struct {
+
Aggregator AggregatorView `json:"aggregator"` // REQUIRED: Nested full aggregator object
+
Enabled bool `json:"enabled"` // REQUIRED
+
Config interface{} `json:"config,omitempty"`
+
CreatedAt string `json:"createdAt"` // REQUIRED
+
RecordUri string `json:"recordUri,omitempty"`
+
}
+
+
// toCommunityAuthView converts domain model to API view
+
func toCommunityAuthView(auth *aggregators.Authorization, aggregatorView AggregatorView) CommunityAuthView {
+
view := CommunityAuthView{
+
Aggregator: aggregatorView, // Nested aggregator object
+
Enabled: auth.Enabled,
+
CreatedAt: auth.CreatedAt.Format("2006-01-02T15:04:05.000Z"),
+
}
+
+
// Add optional fields
+
if len(auth.Config) > 0 {
+
// Config is JSONB, unmarshal it
+
var config interface{}
+
if err := json.Unmarshal(auth.Config, &config); err == nil {
+
view.Config = config
+
}
+
}
+
if auth.RecordURI != "" {
+
view.RecordUri = auth.RecordURI
+
}
+
+
return view
+
}
+194
internal/api/handlers/aggregator/get_services.go
···
+
package aggregator
+
+
import (
+
"Coves/internal/core/aggregators"
+
"encoding/json"
+
"log"
+
"net/http"
+
"strings"
+
)
+
+
// GetServicesHandler handles aggregator service details retrieval
+
type GetServicesHandler struct {
+
service aggregators.Service
+
}
+
+
// NewGetServicesHandler creates a new get services handler
+
func NewGetServicesHandler(service aggregators.Service) *GetServicesHandler {
+
return &GetServicesHandler{
+
service: service,
+
}
+
}
+
+
// HandleGetServices retrieves aggregator details by DID(s)
+
// GET /xrpc/social.coves.aggregator.getServices?dids=did:plc:abc123,did:plc:def456&detailed=true
+
// Following Bluesky's pattern: app.bsky.feed.getFeedGenerators
+
func (h *GetServicesHandler) HandleGetServices(w http.ResponseWriter, r *http.Request) {
+
if r.Method != http.MethodGet {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// Parse DIDs from query parameter
+
didsParam := r.URL.Query().Get("dids")
+
if didsParam == "" {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "dids parameter is required")
+
return
+
}
+
+
// Parse detailed flag (default: false)
+
detailed := r.URL.Query().Get("detailed") == "true"
+
+
// Split comma-separated DIDs
+
rawDIDs := strings.Split(didsParam, ",")
+
+
// Trim whitespace and filter out empty DIDs (handles double commas, trailing commas, etc.)
+
dids := make([]string, 0, len(rawDIDs))
+
for _, did := range rawDIDs {
+
trimmed := strings.TrimSpace(did)
+
if trimmed != "" {
+
dids = append(dids, trimmed)
+
}
+
}
+
+
// Validate we have at least one valid DID
+
if len(dids) == 0 {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", "at least one valid DID is required")
+
return
+
}
+
+
// Get aggregators from service
+
aggs, err := h.service.GetAggregators(r.Context(), dids)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// Build response with appropriate view type based on detailed flag
+
response := GetServicesResponse{
+
Views: make([]interface{}, 0, len(aggs)),
+
}
+
+
for _, agg := range aggs {
+
if detailed {
+
response.Views = append(response.Views, toAggregatorViewDetailed(agg))
+
} else {
+
response.Views = append(response.Views, toAggregatorView(agg))
+
}
+
}
+
+
// Return response
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(response); err != nil {
+
log.Printf("ERROR: Failed to encode getServices response: %v", err)
+
}
+
}
+
+
// GetServicesResponse matches the lexicon output
+
type GetServicesResponse struct {
+
Views []interface{} `json:"views"` // Union of aggregatorView | aggregatorViewDetailed
+
}
+
+
// AggregatorView matches social.coves.aggregator.defs#aggregatorView (without stats)
+
type AggregatorView struct {
+
DID string `json:"did"`
+
DisplayName string `json:"displayName"`
+
Description *string `json:"description,omitempty"`
+
Avatar *string `json:"avatar,omitempty"`
+
ConfigSchema interface{} `json:"configSchema,omitempty"`
+
SourceURL *string `json:"sourceUrl,omitempty"`
+
MaintainerDID *string `json:"maintainer,omitempty"`
+
CreatedAt string `json:"createdAt"`
+
RecordUri string `json:"recordUri"`
+
}
+
+
// AggregatorViewDetailed matches social.coves.aggregator.defs#aggregatorViewDetailed (with stats)
+
type AggregatorViewDetailed struct {
+
DID string `json:"did"`
+
DisplayName string `json:"displayName"`
+
Description *string `json:"description,omitempty"`
+
Avatar *string `json:"avatar,omitempty"`
+
ConfigSchema interface{} `json:"configSchema,omitempty"`
+
SourceURL *string `json:"sourceUrl,omitempty"`
+
MaintainerDID *string `json:"maintainer,omitempty"`
+
CreatedAt string `json:"createdAt"`
+
RecordUri string `json:"recordUri"`
+
Stats AggregatorStats `json:"stats"`
+
}
+
+
// AggregatorStats matches social.coves.aggregator.defs#aggregatorStats
+
type AggregatorStats struct {
+
CommunitiesUsing int `json:"communitiesUsing"`
+
PostsCreated int `json:"postsCreated"`
+
}
+
+
// toAggregatorView converts domain model to basic aggregatorView (no stats)
+
func toAggregatorView(agg *aggregators.Aggregator) AggregatorView {
+
view := AggregatorView{
+
DID: agg.DID,
+
DisplayName: agg.DisplayName,
+
CreatedAt: agg.CreatedAt.Format("2006-01-02T15:04:05.000Z"),
+
RecordUri: agg.RecordURI,
+
}
+
+
// Add optional fields
+
if agg.Description != "" {
+
view.Description = &agg.Description
+
}
+
if agg.AvatarURL != "" {
+
view.Avatar = &agg.AvatarURL
+
}
+
if agg.MaintainerDID != "" {
+
view.MaintainerDID = &agg.MaintainerDID
+
}
+
if agg.SourceURL != "" {
+
view.SourceURL = &agg.SourceURL
+
}
+
if len(agg.ConfigSchema) > 0 {
+
// ConfigSchema is already JSON, unmarshal it for the view
+
var schema interface{}
+
if err := json.Unmarshal(agg.ConfigSchema, &schema); err == nil {
+
view.ConfigSchema = schema
+
}
+
}
+
+
return view
+
}
+
+
// toAggregatorViewDetailed converts domain model to detailed aggregatorViewDetailed (with stats)
+
func toAggregatorViewDetailed(agg *aggregators.Aggregator) AggregatorViewDetailed {
+
view := AggregatorViewDetailed{
+
DID: agg.DID,
+
DisplayName: agg.DisplayName,
+
CreatedAt: agg.CreatedAt.Format("2006-01-02T15:04:05.000Z"),
+
RecordUri: agg.RecordURI,
+
Stats: AggregatorStats{
+
CommunitiesUsing: agg.CommunitiesUsing,
+
PostsCreated: agg.PostsCreated,
+
},
+
}
+
+
// Add optional fields
+
if agg.Description != "" {
+
view.Description = &agg.Description
+
}
+
if agg.AvatarURL != "" {
+
view.Avatar = &agg.AvatarURL
+
}
+
if agg.MaintainerDID != "" {
+
view.MaintainerDID = &agg.MaintainerDID
+
}
+
if agg.SourceURL != "" {
+
view.SourceURL = &agg.SourceURL
+
}
+
if len(agg.ConfigSchema) > 0 {
+
// ConfigSchema is already JSON, unmarshal it for the view
+
var schema interface{}
+
if err := json.Unmarshal(agg.ConfigSchema, &schema); err == nil {
+
view.ConfigSchema = schema
+
}
+
}
+
+
return view
+
}
+173
internal/api/handlers/aggregator/list_for_community.go
···
+
package aggregator
+
+
import (
+
"Coves/internal/core/aggregators"
+
"encoding/json"
+
"log"
+
"net/http"
+
"strconv"
+
)
+
+
// ListForCommunityHandler handles listing aggregators for a community
+
type ListForCommunityHandler struct {
+
service aggregators.Service
+
}
+
+
// NewListForCommunityHandler creates a new list for community handler
+
func NewListForCommunityHandler(service aggregators.Service) *ListForCommunityHandler {
+
return &ListForCommunityHandler{
+
service: service,
+
}
+
}
+
+
// HandleListForCommunity lists all aggregators authorized by a community
+
// GET /xrpc/social.coves.aggregator.listForCommunity?community=did:plc:xyz789&enabledOnly=true&limit=50&cursor=xyz
+
// Used by community settings UI to manage aggregators
+
func (h *ListForCommunityHandler) HandleListForCommunity(w http.ResponseWriter, r *http.Request) {
+
if r.Method != http.MethodGet {
+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+
return
+
}
+
+
// Parse request
+
req, communityIdentifier, err := h.parseRequest(r)
+
if err != nil {
+
writeError(w, http.StatusBadRequest, "InvalidRequest", err.Error())
+
return
+
}
+
+
// Resolve community identifier to DID (handles both DIDs and handles)
+
// TODO: Implement identifier resolution service - for now, assume it's a DID
+
req.CommunityDID = communityIdentifier
+
+
// Get authorizations from service
+
// Note: Community handle/name fields will be empty until we integrate with communities service
+
// This is acceptable for alpha - clients can resolve community details separately if needed
+
auths, err := h.service.ListAggregatorsForCommunity(r.Context(), req)
+
if err != nil {
+
handleServiceError(w, err)
+
return
+
}
+
+
// Build response
+
response := ListForCommunityResponse{
+
Aggregators: make([]AuthorizationView, 0, len(auths)),
+
}
+
+
for _, auth := range auths {
+
response.Aggregators = append(response.Aggregators, toAuthorizationView(auth, req.CommunityDID))
+
}
+
+
// Return response
+
w.Header().Set("Content-Type", "application/json")
+
w.WriteHeader(http.StatusOK)
+
if err := json.NewEncoder(w).Encode(response); err != nil {
+
log.Printf("ERROR: Failed to encode listForCommunity response: %v", err)
+
}
+
}
+
+
// parseRequest parses query parameters and returns request + community identifier
+
func (h *ListForCommunityHandler) parseRequest(r *http.Request) (aggregators.ListForCommunityRequest, string, error) {
+
req := aggregators.ListForCommunityRequest{}
+
+
// Required: community (at-identifier: DID or handle)
+
communityIdentifier := r.URL.Query().Get("community")
+
if communityIdentifier == "" {
+
return req, "", writeErrorMsg("community parameter is required")
+
}
+
+
// Optional: enabledOnly (default: false per lexicon)
+
if enabledOnlyStr := r.URL.Query().Get("enabledOnly"); enabledOnlyStr == "true" {
+
req.EnabledOnly = true
+
}
+
+
// Optional: limit (default: 50, set by service)
+
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
+
if limit, err := strconv.Atoi(limitStr); err == nil {
+
req.Limit = limit
+
}
+
}
+
+
// TODO: Add cursor-based pagination support
+
// if cursor := r.URL.Query().Get("cursor"); cursor != "" {
+
// req.Cursor = cursor
+
// }
+
+
return req, communityIdentifier, nil
+
}
+
+
// writeErrorMsg creates an error for returning
+
func writeErrorMsg(msg string) error {
+
return &requestError{Message: msg}
+
}
+
+
type requestError struct {
+
Message string
+
}
+
+
func (e *requestError) Error() string {
+
return e.Message
+
}
+
+
// ListForCommunityResponse matches the lexicon output
+
type ListForCommunityResponse struct {
+
Aggregators []AuthorizationView `json:"aggregators"`
+
Cursor *string `json:"cursor,omitempty"` // Pagination cursor
+
}
+
+
// AuthorizationView matches social.coves.aggregator.defs#authorizationView
+
// Shows authorization from community's perspective
+
type AuthorizationView struct {
+
AggregatorDID string `json:"aggregatorDid"`
+
CommunityDID string `json:"communityDid"`
+
CommunityHandle *string `json:"communityHandle,omitempty"` // Optional: populated when communities service integration is complete
+
CommunityName *string `json:"communityName,omitempty"` // Optional: populated when communities service integration is complete
+
Enabled bool `json:"enabled"`
+
Config interface{} `json:"config,omitempty"`
+
CreatedAt string `json:"createdAt"` // REQUIRED
+
CreatedBy *string `json:"createdBy,omitempty"`
+
DisabledAt *string `json:"disabledAt,omitempty"`
+
DisabledBy *string `json:"disabledBy,omitempty"`
+
RecordUri string `json:"recordUri,omitempty"`
+
}
+
+
// toAuthorizationView converts domain model to API view
+
// communityHandle and communityName are left nil until communities service integration is complete
+
func toAuthorizationView(auth *aggregators.Authorization, communityDID string) AuthorizationView {
+
// Safety check for nil authorization
+
if auth == nil {
+
return AuthorizationView{}
+
}
+
+
view := AuthorizationView{
+
AggregatorDID: auth.AggregatorDID,
+
CommunityDID: communityDID,
+
// CommunityHandle and CommunityName left nil - TODO: fetch from communities service
+
Enabled: auth.Enabled,
+
CreatedAt: auth.CreatedAt.Format("2006-01-02T15:04:05.000Z"),
+
}
+
+
// Add optional fields
+
if len(auth.Config) > 0 {
+
// Config is JSONB, unmarshal it
+
var config interface{}
+
if err := json.Unmarshal(auth.Config, &config); err == nil {
+
view.Config = config
+
}
+
}
+
if auth.CreatedBy != "" {
+
view.CreatedBy = &auth.CreatedBy
+
}
+
if auth.DisabledAt != nil && !auth.DisabledAt.IsZero() {
+
disabledAt := auth.DisabledAt.Format("2006-01-02T15:04:05.000Z")
+
view.DisabledAt = &disabledAt
+
}
+
if auth.DisabledBy != "" {
+
view.DisabledBy = &auth.DisabledBy
+
}
+
if auth.RecordURI != "" {
+
view.RecordUri = auth.RecordURI
+
}
+
+
return view
+
}
+39
internal/api/routes/aggregator.go
···
+
package routes
+
+
import (
+
"Coves/internal/api/handlers/aggregator"
+
"Coves/internal/core/aggregators"
+
+
"github.com/go-chi/chi/v5"
+
)
+
+
// RegisterAggregatorRoutes registers aggregator-related XRPC endpoints
+
// Following Bluesky's pattern for feed generators and labelers
+
func RegisterAggregatorRoutes(
+
r chi.Router,
+
aggregatorService aggregators.Service,
+
) {
+
// Create query handlers
+
getServicesHandler := aggregator.NewGetServicesHandler(aggregatorService)
+
getAuthorizationsHandler := aggregator.NewGetAuthorizationsHandler(aggregatorService)
+
listForCommunityHandler := aggregator.NewListForCommunityHandler(aggregatorService)
+
+
// Query endpoints (public - no auth required)
+
// GET /xrpc/social.coves.aggregator.getServices?dids=did:plc:abc,did:plc:def
+
// Following app.bsky.feed.getFeedGenerators pattern
+
r.Get("/xrpc/social.coves.aggregator.getServices", getServicesHandler.HandleGetServices)
+
+
// GET /xrpc/social.coves.aggregator.getAuthorizations?aggregatorDid=did:plc:abc&enabledOnly=true
+
// Lists communities that authorized an aggregator
+
r.Get("/xrpc/social.coves.aggregator.getAuthorizations", getAuthorizationsHandler.HandleGetAuthorizations)
+
+
// GET /xrpc/social.coves.aggregator.listForCommunity?communityDid=did:plc:xyz&enabledOnly=true
+
// Lists aggregators authorized by a community
+
r.Get("/xrpc/social.coves.aggregator.listForCommunity", listForCommunityHandler.HandleListForCommunity)
+
+
// Write endpoints (Phase 2 - require authentication and moderator permissions)
+
// TODO: Implement after Jetstream consumer is ready
+
// POST /xrpc/social.coves.aggregator.enable (requires auth + moderator)
+
// POST /xrpc/social.coves.aggregator.disable (requires auth + moderator)
+
// POST /xrpc/social.coves.aggregator.updateConfig (requires auth + moderator)
+
}