A community based topic aggregation platform built on atproto

feat(jetstream): add block event indexing to consumer

Implement Jetstream consumer support for community block records:

- handleBlock: Routes CREATE/DELETE operations for social.coves.community.block
- createBlock: Indexes block CREATE events from firehose
- Extracts community DID from "subject" field (atProto convention)
- Builds AT-URI: at://user_did/social.coves.community.block/rkey
- Preserves createdAt timestamp for chronological ordering during replays
- Idempotent: handles duplicate events via ON CONFLICT

- deleteBlock: Processes block DELETE events from firehose
- Looks up block by URI (DELETE events don't include record data)
- Removes from AppView index
- Gracefully handles deletion of non-existent blocks

Completes the write-forward flow:
Client → PDS → Jetstream Firehose → Consumer → AppView DB

Changed files
+99 -1
internal
atproto
+99 -1
internal/atproto/jetstream/community_consumer.go
···
package jetstream
import (
"Coves/internal/core/communities"
"context"
"encoding/json"
···
// IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
// - social.coves.community.profile: Community profile records (in community's own repo)
// - social.coves.community.subscription: Subscription records (in user's repo)
//
// XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
// that CREATE or DELETE records in these collections
···
case "social.coves.community.subscription":
// Handle both create (subscribe) and delete (unsubscribe) operations
return c.handleSubscription(ctx, event.Did, commit)
default:
// Not a community-related collection
return nil
···
uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
// Create subscription entity
subscription := &communities.Subscription{
UserDID: userDID,
CommunityDID: communityDID,
ContentVisibility: contentVisibility,
-
SubscribedAt: time.Now(),
RecordURI: uri,
RecordCID: commit.CID,
}
···
}
log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
return nil
}
···
package jetstream
import (
+
"Coves/internal/atproto/utils"
"Coves/internal/core/communities"
"context"
"encoding/json"
···
// IMPORTANT: Collection names refer to RECORD TYPES in repositories, not XRPC procedures
// - social.coves.community.profile: Community profile records (in community's own repo)
// - social.coves.community.subscription: Subscription records (in user's repo)
+
// - social.coves.community.block: Block records (in user's repo)
//
// XRPC procedures (social.coves.community.subscribe/unsubscribe) are just HTTP endpoints
// that CREATE or DELETE records in these collections
···
case "social.coves.community.subscription":
// Handle both create (subscribe) and delete (unsubscribe) operations
return c.handleSubscription(ctx, event.Did, commit)
+
case "social.coves.community.block":
+
// Handle both create (block) and delete (unblock) operations
+
return c.handleBlock(ctx, event.Did, commit)
default:
// Not a community-related collection
return nil
···
uri := fmt.Sprintf("at://%s/social.coves.community.subscription/%s", userDID, commit.RKey)
// Create subscription entity
+
// Parse createdAt from record to preserve chronological ordering during replays
subscription := &communities.Subscription{
UserDID: userDID,
CommunityDID: communityDID,
ContentVisibility: contentVisibility,
+
SubscribedAt: utils.ParseCreatedAt(commit.Record),
RecordURI: uri,
RecordCID: commit.CID,
}
···
}
log.Printf("✓ Removed subscription: %s -> %s", userDID, subscription.CommunityDID)
+
return nil
+
}
+
+
// handleBlock processes block create/delete events
+
// CREATE operation = user blocked a community
+
// DELETE operation = user unblocked a community
+
func (c *CommunityEventConsumer) handleBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
+
switch commit.Operation {
+
case "create":
+
return c.createBlock(ctx, userDID, commit)
+
case "delete":
+
return c.deleteBlock(ctx, userDID, commit)
+
default:
+
// Update operations shouldn't happen on blocks, but ignore gracefully
+
log.Printf("Ignoring unexpected operation on block: %s (userDID=%s, rkey=%s)",
+
commit.Operation, userDID, commit.RKey)
+
return nil
+
}
+
}
+
+
// createBlock indexes a new block
+
func (c *CommunityEventConsumer) createBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
+
if commit.Record == nil {
+
return fmt.Errorf("block create event missing record data")
+
}
+
+
// Extract community DID from record's subject field (following atProto conventions)
+
communityDID, ok := commit.Record["subject"].(string)
+
if !ok {
+
return fmt.Errorf("block record missing subject field")
+
}
+
+
// Build AT-URI for block record
+
// The record lives in the USER's repository
+
uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
+
+
// Create block entity
+
// Parse createdAt from record to preserve chronological ordering during replays
+
block := &communities.CommunityBlock{
+
UserDID: userDID,
+
CommunityDID: communityDID,
+
BlockedAt: utils.ParseCreatedAt(commit.Record),
+
RecordURI: uri,
+
RecordCID: commit.CID,
+
}
+
+
// Index the block
+
// This is idempotent - safe for Jetstream replays
+
_, err := c.repo.BlockCommunity(ctx, block)
+
if err != nil {
+
// If already exists, that's fine (idempotency)
+
if communities.IsConflict(err) {
+
log.Printf("Block already indexed: %s -> %s", userDID, communityDID)
+
return nil
+
}
+
return fmt.Errorf("failed to index block: %w", err)
+
}
+
+
log.Printf("✓ Indexed block: %s -> %s", userDID, communityDID)
+
return nil
+
}
+
+
// deleteBlock removes a block from the index
+
// DELETE operations don't include record data, so we need to look up the block
+
// by its URI to find which community the user unblocked
+
func (c *CommunityEventConsumer) deleteBlock(ctx context.Context, userDID string, commit *CommitEvent) error {
+
// Build AT-URI from the rkey
+
uri := fmt.Sprintf("at://%s/social.coves.community.block/%s", userDID, commit.RKey)
+
+
// Look up the block to get the community DID
+
// (DELETE operations don't include record data in Jetstream)
+
block, err := c.repo.GetBlockByURI(ctx, uri)
+
if err != nil {
+
if communities.IsNotFound(err) {
+
// Already deleted - this is fine (idempotency)
+
log.Printf("Block already deleted: %s", uri)
+
return nil
+
}
+
return fmt.Errorf("failed to find block for deletion: %w", err)
+
}
+
+
// Remove the block from the index
+
err = c.repo.UnblockCommunity(ctx, userDID, block.CommunityDID)
+
if err != nil {
+
if communities.IsNotFound(err) {
+
log.Printf("Block already removed: %s -> %s", userDID, block.CommunityDID)
+
return nil
+
}
+
return fmt.Errorf("failed to remove block: %w", err)
+
}
+
+
log.Printf("✓ Removed block: %s -> %s", userDID, block.CommunityDID)
return nil
}