+3
.beads/beads.left.jsonl
+3
.beads/beads.left.jsonl
···+{"id":"Coves-95q","content_hash":"8ec99d598f067780436b985f9ad57f0fa19632026981038df4f65f192186620b","title":"Add comprehensive API documentation","description":"","status":"open","priority":2,"issue_type":"task","created_at":"2025-11-17T20:30:34.835721854-08:00","updated_at":"2025-11-17T20:30:34.835721854-08:00","source_repo":".","dependencies":[{"issue_id":"Coves-95q","depends_on_id":"Coves-e16","type":"blocks","created_at":"2025-11-17T20:30:46.273899399-08:00","created_by":"daemon"}]}+{"id":"Coves-e16","content_hash":"7c5d0fc8f0e7f626be3dad62af0e8412467330bad01a244e5a7e52ac5afff1c1","title":"Complete post creation and moderation features","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:12.885991306-08:00","updated_at":"2025-11-17T20:30:12.885991306-08:00","source_repo":"."}+{"id":"Coves-fce","content_hash":"26b3e16b99f827316ee0d741cc959464bd0c813446c95aef8105c7fd1e6b09ff","title":"Implement aggregator feed federation","description":"","status":"open","priority":1,"issue_type":"feature","created_at":"2025-11-17T20:30:21.453326012-08:00","updated_at":"2025-11-17T20:30:21.453326012-08:00","source_repo":"."}
+1
.beads/beads.left.meta.json
+1
.beads/beads.left.meta.json
···
+2
-8
internal/api/handlers/vote/delete_vote_test.go
+2
-8
internal/api/handlers/vote/delete_vote_test.go
······-expectedError: "SubjectNotFound", // Per lexicon: social.coves.feed.vote.create#SubjectNotFound
-3
internal/api/handlers/vote/errors.go
-3
internal/api/handlers/vote/errors.go
···-writeError(w, http.StatusNotFound, "SubjectNotFound", "The subject post or comment was not found")writeError(w, http.StatusBadRequest, "InvalidRequest", "Vote direction must be 'up' or 'down'")
+4
-4
internal/atproto/oauth/handlers_security.go
+4
-4
internal/atproto/oauth/handlers_security.go
···
-3
internal/core/votes/errors.go
-3
internal/core/votes/errors.go
···
-50
internal/core/votes/subject_validator.go
-50
internal/core/votes/subject_validator.go
···-func NewCompositeSubjectValidator(postExists, commentExists SubjectExistsFunc) *CompositeSubjectValidator {-// Determines type from the collection in the URI (e.g., social.coves.feed.post vs social.coves.feed.comment)-func (v *CompositeSubjectValidator) SubjectExists(ctx context.Context, uri string) (bool, error) {
-9
internal/core/votes/vote.go
-9
internal/core/votes/vote.go
···
+3
-2
internal/db/postgres/vote_repo.go
+3
-2
internal/db/postgres/vote_repo.go
······
+18
.env.dev
+18
.env.dev
···
+92
.env.dev.example
+92
.env.dev.example
···
+13
-2
internal/atproto/oauth/client.go
+13
-2
internal/atproto/oauth/client.go
············cacheDir := identity.NewCacheDirectory(baseDir, 100_000, time.Hour*24, time.Minute*2, time.Minute*5)+fmt.Printf("๐ OAuth client directory configured with PLC URL: %s (AllowPrivateIPs: %v)\n", config.PLCURL, config.AllowPrivateIPs)
+285
internal/atproto/oauth/dev_auth_resolver.go
+285
internal/atproto/oauth/dev_auth_resolver.go
···+// The standard indigo OAuth resolver requires HTTPS and no port numbers, which breaks local testing.+func (r *DevAuthResolver) ResolveAuthServerURL(ctx context.Context, hostURL string) (string, error) {+func (r *DevAuthResolver) ResolveAuthServerMetadataDev(ctx context.Context, serverURL string) (*oauthlib.AuthServerMetadata, error) {+metaURL = fmt.Sprintf("%s://%s:%s/.well-known/oauth-authorization-server", u.Scheme, u.Hostname(), u.Port())+metaURL = fmt.Sprintf("%s://%s/.well-known/oauth-authorization-server", u.Scheme, u.Hostname())+func (r *DevAuthResolver) StartDevAuthFlow(ctx context.Context, client *OAuthClient, identifier string, dir identity.Directory) (string, error) {+info, err := client.ClientApp.SendAuthRequest(ctx, authMeta, client.Config.Scopes, identifier)
+106
internal/atproto/oauth/dev_resolver.go
+106
internal/atproto/oauth/dev_resolver.go
···+func (r *DevHandleResolver) ResolveHandle(ctx context.Context, handle string) (string, error) {+// ResolveIdentifier attempts to resolve a handle to DID, or returns the DID if already provided+func (r *DevHandleResolver) ResolveIdentifier(ctx context.Context, identifier string) (string, error) {
+41
internal/atproto/oauth/dev_stubs.go
+41
internal/atproto/oauth/dev_stubs.go
···+func (r *DevHandleResolver) ResolveHandle(ctx context.Context, handle string) (string, error) {+func (r *DevAuthResolver) StartDevAuthFlow(ctx context.Context, client *OAuthClient, identifier string, dir identity.Directory) (string, error) {
+5
-1
scripts/dev-run.sh
+5
-1
scripts/dev-run.sh
······
+125
internal/atproto/pds/factory.go
+125
internal/atproto/pds/factory.go
···+func NewFromOAuthSession(ctx context.Context, oauthClient *oauth.ClientApp, sessionData *oauth.ClientSessionData) (Client, error) {+func (b *bearerAuth) DoWithAuth(c *http.Client, req *http.Request, _ syntax.NSID) (*http.Response, error) {
+18
tests/integration/helpers.go
+18
tests/integration/helpers.go
······+// PasswordAuthPDSClientFactory creates a PDSClientFactory that uses password-based Bearer auth.+return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
+267
cmd/reindex-votes/main.go
+267
cmd/reindex-votes/main.go
···+if _, err := db.ExecContext(ctx, "UPDATE posts SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil {+if _, err := db.ExecContext(ctx, "UPDATE comments SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil {+INSERT INTO votes (uri, cid, rkey, voter_did, subject_uri, subject_cid, direction, created_at, indexed_at)+updateQuery = `UPDATE posts SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`+updateQuery = `UPDATE posts SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL`+updateQuery = `UPDATE comments SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`+updateQuery = `UPDATE comments SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL`
+7
-5
internal/api/routes/communityFeed.go
+7
-5
internal/api/routes/communityFeed.go
······+r.With(authMiddleware.OptionalAuth).Get("/xrpc/social.coves.communityFeed.getCommunity", getCommunityHandler.HandleGetCommunity)
+3
-1
internal/api/routes/timeline.go
+3
-1
internal/api/routes/timeline.go
······
+221
internal/core/votes/cache.go
+221
internal/core/votes/cache.go
···+func (c *VoteCache) fetchAllVotesFromPDS(ctx context.Context, pdsClient pds.Client) (map[string]*CachedVote, error) {
+84
-2
internal/core/votes/service_impl.go
+84
-2
internal/core/votes/service_impl.go
···-func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, logger *slog.Logger) Service {+func NewService(repo Repository, oauthClient *oauthclient.OAuthClient, oauthStore oauth.ClientAuthStore, cache *VoteCache, logger *slog.Logger) Service {···-func NewServiceWithPDSFactory(repo Repository, logger *slog.Logger, factory PDSClientFactory) Service {+func NewServiceWithPDSFactory(repo Repository, cache *VoteCache, logger *slog.Logger, factory PDSClientFactory) Service {············+func (s *voteService) EnsureCachePopulated(ctx context.Context, session *oauth.ClientSessionData) error {+func (s *voteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*CachedVote {
+76
-16
internal/atproto/jetstream/vote_consumer.go
+76
-16
internal/atproto/jetstream/vote_consumer.go
······-func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) error {+// Returns (true, nil) if vote was newly inserted, (false, nil) if already existed (idempotent)+func (c *VoteEventConsumer) indexVoteAndUpdateCounts(ctx context.Context, vote *votes.Vote) (bool, error) {···+if err := tx.QueryRowContext(ctx, checkQuery, vote.VoterDID, vote.SubjectURI, vote.URI).Scan(&existingDirection); err != nil && err != sql.ErrNoRows {+if _, err := tx.ExecContext(ctx, softDeleteQuery, vote.VoterDID, vote.SubjectURI, vote.URI); err != nil {+decrementQuery = `UPDATE posts SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`+decrementQuery = `UPDATE comments SET upvote_count = GREATEST(0, upvote_count - 1), score = upvote_count - 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`+decrementQuery = `UPDATE posts SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`+decrementQuery = `UPDATE comments SET downvote_count = GREATEST(0, downvote_count - 1), score = upvote_count - (downvote_count - 1) WHERE uri = $1 AND deleted_at IS NULL`+log.Printf("Cleaned up stale vote for %s on %s (was %s)", vote.VoterDID, vote.SubjectURI, existingDirection.String)······log.Printf("Vote subject has unsupported collection: %s (vote indexed, counts not updated)", collection)···
+38
internal/core/comments/types.go
+38
internal/core/comments/types.go
···
+130
internal/api/handlers/comments/create_comment.go
+130
internal/api/handlers/comments/create_comment.go
···+// CreateCommentInput matches the lexicon input schema for social.coves.community.comment.create
+80
internal/api/handlers/comments/delete_comment.go
+80
internal/api/handlers/comments/delete_comment.go
···+// DeleteCommentInput matches the lexicon input schema for social.coves.community.comment.delete
+34
-2
internal/api/handlers/comments/errors.go
+34
-2
internal/api/handlers/comments/errors.go
······+writeError(w, http.StatusBadRequest, "InvalidReply", "The reply reference is invalid or malformed")+writeError(w, http.StatusBadRequest, "ContentTooLong", "Comment content exceeds 10000 graphemes")+writeError(w, http.StatusForbidden, "NotAuthorized", "User is not authorized to perform this action")
+112
internal/api/handlers/comments/update_comment.go
+112
internal/api/handlers/comments/update_comment.go
···+// UpdateCommentInput matches the lexicon input schema for social.coves.community.comment.update
+35
internal/api/routes/comment.go
+35
internal/api/routes/comment.go
···+func RegisterCommentRoutes(r chi.Router, service commentsCore.Service, authMiddleware *middleware.OAuthAuthMiddleware) {
+4
-2
tests/integration/comment_query_test.go
+4
-2
tests/integration/comment_query_test.go
···+// Use factory constructor with nil factory - these tests only use the read path (GetComments)+return comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)···+// Use factory constructor with nil factory - these tests only use the read path (GetComments)+service := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
+6
-3
tests/integration/comment_vote_test.go
+6
-3
tests/integration/comment_vote_test.go
···+commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)···+commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)···+commentService := comments.NewCommentServiceWithPDSFactory(commentRepo, userRepo, postRepo, communityRepo, nil, nil)
+1
-1
go.mod
+1
-1
go.mod
···
+2
go.sum
+2
go.sum
···github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+66
internal/db/migrations/021_add_comment_deletion_metadata.sql
+66
internal/db/migrations/021_add_comment_deletion_metadata.sql
···+COMMENT ON COLUMN comments.deletion_reason IS 'Reason for deletion: author (user deleted), moderator (community mod removed)';+CREATE INDEX idx_comments_root ON comments(root_uri, created_at DESC) WHERE deleted_at IS NULL;+CREATE INDEX idx_comments_parent ON comments(parent_uri, created_at DESC) WHERE deleted_at IS NULL;+CREATE INDEX idx_comments_parent_score ON comments(parent_uri, score DESC, created_at DESC) WHERE deleted_at IS NULL;
+17
-13
internal/core/comments/view_models.go
+17
-13
internal/core/comments/view_models.go
···
+23
-1
internal/core/comments/interfaces.go
+23
-1
internal/core/comments/interfaces.go
······+// SoftDeleteWithReason performs a soft delete that blanks content but preserves thread structure···+SoftDeleteWithReasonTx(ctx context.Context, tx *sql.Tx, uri, reason, deletedByDID string) (int64, error)
+5
-6
internal/core/comments/comment_service.go
+5
-6
internal/core/comments/comment_service.go
···-// TODO: Use PutRecord instead of CreateRecord for proper update semantics with optimistic locking.-// PutRecord should accept the existing CID (existingRecord.CID) to ensure concurrent updates are detected.+uri, cid, err := pdsClient.PutRecord(ctx, commentCollection, rkey, updatedRecord, existingRecord.CID)···
+73
internal/api/handlers/common/viewer_state.go
+73
internal/api/handlers/common/viewer_state.go
···+// This allows the helper to work with different feed post types (discover, timeline, communityFeed).
+11
-4
internal/api/handlers/discover/get_discover.go
+11
-4
internal/api/handlers/discover/get_discover.go
······+func NewGetDiscoverHandler(service discover.Service, voteService votes.Service) *GetDiscoverHandler {···
+9
-4
internal/api/routes/discover.go
+9
-4
internal/api/routes/discover.go
·········+r.With(authMiddleware.OptionalAuth).Get("/xrpc/social.coves.feed.getDiscover", getDiscoverHandler.HandleGetDiscover)
+5
internal/core/communityFeeds/types.go
+5
internal/core/communityFeeds/types.go
···
+5
internal/core/discover/types.go
+5
internal/core/discover/types.go
···
+5
internal/core/timeline/types.go
+5
internal/core/timeline/types.go
···
+193
-5
tests/integration/discover_test.go
+193
-5
tests/integration/discover_test.go
······+func (m *mockVoteService) CreateVote(_ context.Context, _ *oauthlib.ClientSessionData, _ votes.CreateVoteRequest) (*votes.CreateVoteResponse, error) {+func (m *mockVoteService) DeleteVote(_ context.Context, _ *oauthlib.ClientSessionData, _ votes.DeleteVoteRequest) error {+func (m *mockVoteService) EnsureCachePopulated(_ context.Context, _ *oauthlib.ClientSessionData) error {+func (m *mockVoteService) GetViewerVotesForSubjects(userDID string, subjectURIs []string) map[string]*votes.CachedVote {···+handler := discover.NewGetDiscoverHandler(discoverService, nil) // nil vote service - tests don't need vote state···+handler := discover.NewGetDiscoverHandler(discoverService, nil) // nil vote service - tests don't need vote state·········req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=100", nil)···+// TestGetDiscover_ViewerVoteState tests that authenticated users see their vote state on posts+communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("votes-%d", testID), fmt.Sprintf("alice-%d.test", testID))+post1URI := createTestPost(t, db, communityDID, "did:plc:author1", "Post with upvote", 10, time.Now().Add(-1*time.Hour))+post2URI := createTestPost(t, db, communityDID, "did:plc:author2", "Post with downvote", 5, time.Now().Add(-2*time.Hour))+_ = createTestPost(t, db, communityDID, "did:plc:author3", "Post without vote", 3, time.Now().Add(-3*time.Hour))+req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=50", nil)+assert.Contains(t, *feedPost.Post.Viewer.VoteURI, "vote1", "Post1 should have correct vote URI")+// TestGetDiscover_NoViewerStateWithoutAuth tests that unauthenticated users don't get viewer state+communityDID, err := createFeedTestCommunity(db, ctx, fmt.Sprintf("noauth-%d", testID), fmt.Sprintf("alice-%d.test", testID))+mockVotes.AddVote("did:plc:someuser", postURI, "up", "at://did:plc:someuser/social.coves.vote/vote1")+req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getDiscover?sort=new&limit=50", nil)
+11
-11
tests/integration/feed_test.go
+11
-11
tests/integration/feed_test.go
···············req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.communityFeed.getCommunity?community=did:plc:nonexistent&sort=hot&limit=10", nil)··················
+7
-7
tests/integration/timeline_test.go
+7
-7
tests/integration/timeline_test.go
···············req := httptest.NewRequest(http.MethodGet, "/xrpc/social.coves.feed.getTimeline?sort=new&limit=10", nil)······
+2
scripts/generate_test_comments.go
+2
scripts/generate_test_comments.go
+38
-6
internal/atproto/jetstream/user_consumer.go
+38
-6
internal/atproto/jetstream/user_consumer.go
······-func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string) *UserEventConsumer {+func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string, opts ...ConsumerOption) *UserEventConsumer {···+if sessionsUpdated, updateErr := c.sessionHandleUpdater.UpdateHandleByDID(ctx, did, handle); updateErr != nil {
+29
internal/atproto/oauth/store.go
+29
internal/atproto/oauth/store.go
···+func (s *PostgresOAuthStore) UpdateHandleByDID(ctx context.Context, did, newHandle string) (int64, error) {
+375
tests/integration/oauth_session_handle_sync_test.go
+375
tests/integration/oauth_session_handle_sync_test.go
···+// TEST_DATABASE_URL="postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" \+// TEST_DATABASE_URL="postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" \+t.Skip("Jetstream not available at localhost:6008 - run 'docker-compose --profile jetstream up -d' first")
+13
-6
aggregators/kagi-news/src/coves_client.py
+13
-6
aggregators/kagi-news/src/coves_client.py
······
+8
-2
aggregators/kagi-news/src/main.py
+8
-2
aggregators/kagi-news/src/main.py
···
+142
aggregators/kagi-news/tests/test_coves_client.py
+142
aggregators/kagi-news/tests/test_coves_client.py
···
+76
-1
aggregators/kagi-news/tests/test_e2e.py
+76
-1
aggregators/kagi-news/tests/test_e2e.py
···
+137
aggregators/kagi-news/tests/test_main.py
+137
aggregators/kagi-news/tests/test_main.py
···+def test_create_post_with_sources_in_embed(self, mock_config, mock_rss_feed, sample_story, tmp_path):
+1188
tests/integration/comment_e2e_test.go
+1188
tests/integration/comment_e2e_test.go
···+jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.comment", pdsHostname)+t.Skipf("Jetstream not running at %s: %v. Run 'docker-compose --profile jetstream up' to start.", jetstreamURL, err)+pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)+testCommunityDID, err := createFeedTestCommunity(db, ctx, "comment-e2e-community", "owner.test")+postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post for Comments", 0, time.Now())+commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {+return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)+subscribeErr := subscribeToJetstreamForComment(ctx, jetstreamURL, userDID, commentConsumer, eventChan, done)+t.Errorf("Expected collection social.coves.community.comment, got %s", event.Commit.Collection)+jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.comment", pdsHostname)+pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)+testCommunityDID, err := createFeedTestCommunity(db, ctx, "comment-upd-community", "owner.test")+postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post for Update", 0, time.Now())+commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {+return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)+subscribeErr := subscribeToJetstreamForComment(ctx, jetstreamURL, userDID, commentConsumer, eventChan, done)+subscribeErr := subscribeToJetstreamForCommentUpdate(ctx, jetstreamURL, userDID, commentConsumer, updateEventChan, updateDone)+pdsResp, httpErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=social.coves.community.comment&rkey=%s",+t.Fatalf("Failed to get record from PDS: status=%d body=%s", pdsResp.StatusCode, string(body))+jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.comment", pdsHostname)+pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)+testCommunityDID, err := createFeedTestCommunity(db, ctx, "comment-del-community", "owner.test")+postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post for Delete", 0, time.Now())+commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {+return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)+subscribeErr := subscribeToJetstreamForComment(ctx, jetstreamURL, userDID, commentConsumer, eventChan, done)+subscribeErr := subscribeToJetstreamForCommentDelete(ctx, jetstreamURL, userDID, commentConsumer, deleteEventChan, deleteDone)+err = commentService.DeleteComment(ctx, session, comments.DeleteCommentRequest{URI: commentResp.URI})+// subscribeToJetstreamForComment subscribes to real Jetstream firehose for comment create events+pdsAccessTokenA, userADID, err := createPDSAccount(pdsURL, userAHandle, userAEmail, userAPassword)+pdsAccessTokenB, userBDID, err := createPDSAccount(pdsURL, userBHandle, userBEmail, userBPassword)+testCommunityDID, err := createFeedTestCommunity(db, ctx, "auth-test-community", "owner.test")+postURI := createTestPost(t, db, testCommunityDID, testUserA.DID, "Auth Test Post", 0, time.Now())+commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {+return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)+pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)+testCommunityDID, err := createFeedTestCommunity(db, ctx, "val-test-community", "owner.test")+postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Validation Test Post", 0, time.Now())+commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {+return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
+381
tests/integration/community_update_e2e_test.go
+381
tests/integration/community_update_e2e_test.go
···+// TestCommunityUpdateE2E_WithJetstream tests the FULL community update flow with REAL Jetstream+// Flow: Service.UpdateCommunity() โ PDS putRecord โ REAL Jetstream Firehose โ Consumer โ AppView DB+jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.profile", pdsHostname)+t.Skipf("Jetstream not running at %s: %v. Run 'docker-compose --profile jetstream up' to start.", jetstreamURL, err)+consumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, true, identityResolver)+subscribeErr := subscribeToJetstreamForCommunityEvent(ctx, jetstreamURL, community.DID, "update", consumer, updateEventChan, updateDone)+subscribeErr := subscribeToJetstreamForCommunityEvent(ctx, jetstreamURL, community.DID, "update", consumer, updateEventChan, updateDone)+// subscribeToJetstreamForCommunityEvent subscribes to real Jetstream for specific community events
+1
-1
internal/api/handlers/community/create_test.go
+1
-1
internal/api/handlers/community/create_test.go
···
+43
-12
internal/atproto/jetstream/community_consumer.go
+43
-12
internal/atproto/jetstream/community_consumer.go
···+log.Printf("WARNING: Failed to create DID cache (size=1000), verification will be slower: %v", err)+panic(fmt.Sprintf("cannot create LRU cache: primary error=%v, fallback error=%v", err, fallbackErr))···+log.Printf("WARNING: Failed to marshal description facets for community %s: %v (facets will be omitted)", did, marshalErr)······+log.Printf("DEBUG: publicsuffix failed for @-format handle domain %q, using raw domain: %v", domain, err)···+log.Printf("DEBUG: publicsuffix failed for handle %q, using naive fallback: %q (error: %v)", handle, fallbackDomain, err)···+log.Printf("WARNING: constructHandleFromProfile: hostedBy %q is not did:web format, cannot construct handle for community %q",
+6
-6
internal/core/comments/comment_service_test.go
+6
-6
internal/core/comments/comment_service_test.go
···comment1 := createTestComment("at://did:plc:commenter123/comment/1", commenterDID, "commenter.test", postURI, postURI, 0)···commentRepo.listByParentWithHotRankFunc = func(ctx context.Context, parentURI, sort, timeframe string, limit int, cursor *string) ([]*Comment, *string, error) {······comment1 := createTestComment("at://did:plc:commenter123/comment/1", commenterDID, "commenter.test", postURI, postURI, 0)···comment1 := createTestComment("at://did:plc:commenter123/comment/1", commenterDID, "commenter.test", postURI, postURI, 0)···
+23
-13
internal/core/communities/community.go
+23
-13
internal/core/communities/community.go
······// Following Bluesky's pattern where client adds @ prefix for users, but for communities we use ! prefix+log.Printf("DEBUG: GetDisplayHandle: handle %q missing c- prefix, returning raw handle", c.Handle)+log.Printf("DEBUG: GetDisplayHandle: handle %q has no dot after c- prefix, returning raw handle", c.Handle)+log.Printf("DEBUG: GetDisplayHandle: handle %q has empty name after c- prefix, returning raw handle", c.Handle)
+5
-6
internal/core/communities/pds_provisioning.go
+5
-6
internal/core/communities/pds_provisioning.go
···-email := fmt.Sprintf("community-%s@community.%s", strings.ToLower(communityName), p.instanceDomain)
+49
internal/db/migrations/022_migrate_community_handles_to_c_prefix.sql
+49
internal/db/migrations/022_migrate_community_handles_to_c_prefix.sql
···+SET handle = 'c-' || SPLIT_PART(handle, '.community.', 1) || '.' || SPLIT_PART(handle, '.community.', 2)+SET pds_email = 'c-' || SUBSTRING(pds_email FROM 11 FOR POSITION('@' IN pds_email) - 11) || '@' || SUBSTRING(pds_email FROM POSITION('@community.' IN pds_email) + 11)
+2
-2
tests/integration/block_handle_resolution_test.go
+2
-2
tests/integration/block_handle_resolution_test.go
······
+5
-5
tests/integration/community_consumer_test.go
+5
-5
tests/integration/community_consumer_test.go
···············
+5
-5
tests/integration/community_e2e_test.go
+5
-5
tests/integration/community_e2e_test.go
·········
+9
-9
tests/integration/community_provisioning_test.go
+9
-9
tests/integration/community_provisioning_test.go
···························
+2
-2
tests/integration/community_service_integration_test.go
+2
-2
tests/integration/community_service_integration_test.go
···-// Use coves.social domain (configured in PDS_SERVICE_HANDLE_DOMAINS as .community.coves.social)···
+8
-8
tests/integration/community_v2_validation_test.go
+8
-8
tests/integration/community_v2_validation_test.go
·····················
+8
-7
tests/integration/post_creation_test.go
+8
-7
tests/integration/post_creation_test.go
···-Handle: "testcommunity.community.test.coves.social", // Canonical atProto handle (no ! prefix, .community. format)+Handle: "c-testcommunity.test.coves.social", // Canonical atProto handle (no ! prefix, c- format)······-Community: "nonexistent.community.test.coves.social", // Valid canonical handle format, but doesn't exist+Community: "c-nonexistent.test.coves.social", // Valid canonical handle format, but doesn't exist···
+7
-7
tests/integration/post_handler_test.go
+7
-7
tests/integration/post_handler_test.go
·········
+5
-5
tests/integration/post_unfurl_test.go
+5
-5
tests/integration/post_unfurl_test.go
···············
+2
-2
tests/integration/token_refresh_test.go
+2
-2
tests/integration/token_refresh_test.go
······
+2
-1
internal/atproto/oauth/handlers_test.go
+2
-1
internal/atproto/oauth/handlers_test.go
···
+2
internal/atproto/oauth/store_test.go
+2
internal/atproto/oauth/store_test.go
···
+5
-5
internal/db/postgres/vote_repo_test.go
+5
-5
internal/db/postgres/vote_repo_test.go
···assert.ErrorIs(t, err, votes.ErrVoteNotFound, "GetByVoterAndSubject should not return deleted votes")
+26
-3
tests/integration/oauth_e2e_test.go
+26
-3
tests/integration/oauth_e2e_test.go
···assert.Equal(t, oauth.ErrSessionNotFound, err, "Should return ErrSessionNotFound for expired session")······
+5
-5
tests/integration/vote_e2e_test.go
+5
-5
tests/integration/vote_e2e_test.go
···voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())···voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())···voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())···voteService := votes.NewServiceWithPDSFactory(voteRepo, nil, nil, PasswordAuthPDSClientFactory())···
+1
-1
cmd/server/main.go
+1
-1
cmd/server/main.go
···log.Println("Discover XRPC endpoints registered (public with optional auth for viewer vote state)")+routes.RegisterAggregatorRoutes(r, aggregatorService, communityService, userService, identityResolver)log.Println("Aggregator XRPC endpoints registered (query endpoints public, registration endpoint public)")
+7
internal/api/handlers/aggregator/errors.go
+7
internal/api/handlers/aggregator/errors.go
······
+13
-6
internal/api/handlers/aggregator/list_for_community.go
+13
-6
internal/api/handlers/aggregator/list_for_community.go
······+func NewListForCommunityHandler(service aggregators.Service, communityService communities.Service) *ListForCommunityHandler {···+communityDID, err := h.communityService.ResolveCommunityIdentifier(r.Context(), communityIdentifier)
+16
-21
internal/api/handlers/community/subscribe.go
+16
-21
internal/api/handlers/community/subscribe.go
············// Extract authenticated user DID and access token from request context (injected by auth middleware)···subscription, err := h.service.SubscribeToCommunity(r.Context(), userDID, userAccessToken, req.Community, req.ContentVisibility)·········// Extract authenticated user DID and access token from request context (injected by auth middleware)···err := h.service.UnsubscribeFromCommunity(r.Context(), userDID, userAccessToken, req.Community)
+496
internal/api/handlers/community/subscribe_test.go
+496
internal/api/handlers/community/subscribe_test.go
···+subscribeFunc func(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error)+unsubscribeFunc func(ctx context.Context, userDID, accessToken, communityIdentifier string) error+func (m *subscribeTestService) CreateCommunity(ctx context.Context, req communities.CreateCommunityRequest) (*communities.Community, error) {+func (m *subscribeTestService) GetCommunity(ctx context.Context, identifier string) (*communities.Community, error) {+func (m *subscribeTestService) UpdateCommunity(ctx context.Context, req communities.UpdateCommunityRequest) (*communities.Community, error) {+func (m *subscribeTestService) ListCommunities(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, error) {+func (m *subscribeTestService) SearchCommunities(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {+func (m *subscribeTestService) SubscribeToCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) {+func (m *subscribeTestService) UnsubscribeFromCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) error {+func (m *subscribeTestService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) {+func (m *subscribeTestService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*communities.Subscription, error) {+func (m *subscribeTestService) BlockCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) (*communities.CommunityBlock, error) {+func (m *subscribeTestService) UnblockCommunity(ctx context.Context, userDID, accessToken, communityIdentifier string) error {+func (m *subscribeTestService) GetBlockedCommunities(ctx context.Context, userDID string, limit, offset int) ([]*communities.CommunityBlock, error) {+func (m *subscribeTestService) IsBlocked(ctx context.Context, userDID, communityIdentifier string) (bool, error) {+func (m *subscribeTestService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*communities.Membership, error) {+func (m *subscribeTestService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*communities.Membership, error) {+func (m *subscribeTestService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {+func (m *subscribeTestService) EnsureFreshToken(ctx context.Context, community *communities.Community) (*communities.Community, error) {+func (m *subscribeTestService) GetByDID(ctx context.Context, did string) (*communities.Community, error) {+subscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) {+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes))+t.Errorf("Expected community %q to be passed to service, got %q", tc.expectedCommunity, receivedIdentifier)+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes))+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes))+subscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string, contentVisibility int) (*communities.Subscription, error) {+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes))+unsubscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string) error {+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.unsubscribe", bytes.NewBuffer(bodyBytes))+t.Errorf("Expected community %q to be passed to service, got %q", tc.expectedCommunity, receivedIdentifier)+unsubscribeFunc: func(ctx context.Context, userDID, accessToken, communityIdentifier string) error {+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.unsubscribe", bytes.NewBuffer(bodyBytes))+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBufferString("invalid json"))+req := httptest.NewRequest(http.MethodPost, "/xrpc/social.coves.community.subscribe", bytes.NewBuffer(bodyBytes))
+3
-1
internal/api/routes/aggregator.go
+3
-1
internal/api/routes/aggregator.go
······+listForCommunityHandler := aggregator.NewListForCommunityHandler(aggregatorService, communityService)