A community-based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "Coves/internal/db/postgres" 7 "context" 8 "fmt" 9 "testing" 10 "time" 11) 12 13func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) { 14 db := setupTestDB(t) 15 defer func() { 16 if err := db.Close(); err != nil { 17 t.Logf("Failed to close database: %v", err) 18 } 19 }() 20 21 repo := postgres.NewCommunityRepository(db) 22 // Skip verification in tests 23 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 24 ctx := context.Background() 25 26 t.Run("creates community from firehose event", func(t *testing.T) { 27 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 28 communityDID := generateTestDID(uniqueSuffix) 29 30 // Simulate a Jetstream commit event 31 event := &jetstream.JetstreamEvent{ 32 Did: communityDID, 33 TimeUS: time.Now().UnixMicro(), 34 Kind: "commit", 35 Commit: &jetstream.CommitEvent{ 36 Rev: "rev123", 37 Operation: "create", 38 Collection: "social.coves.community.profile", 39 RKey: "self", 40 CID: "bafy123abc", 41 Record: map[string]interface{}{ 42 "did": communityDID, // Community's unique DID 43 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix), 44 "name": "test-community", 45 "displayName": "Test Community", 46 "description": "A test community", 47 "owner": "did:web:coves.local", 48 "createdBy": "did:plc:user123", 49 "hostedBy": "did:web:coves.local", 50 "visibility": "public", 51 "federation": map[string]interface{}{ 52 "allowExternalDiscovery": true, 53 }, 54 "memberCount": 0, 55 "subscriberCount": 0, 56 "createdAt": time.Now().Format(time.RFC3339), 57 }, 58 }, 59 } 60 61 // Handle the event 62 if err := consumer.HandleEvent(ctx, event); err != nil { 63 t.Fatalf("Failed to handle event: %v", err) 64 } 65 66 // Verify community was indexed 67 community, err := repo.GetByDID(ctx, communityDID) 68 if err != nil { 69 t.Fatalf("Failed to get indexed community: %v", err) 70 } 
71 72 if community.DID != communityDID { 73 t.Errorf("Expected DID %s, got %s", communityDID, community.DID) 74 } 75 if community.DisplayName != "Test Community" { 76 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName) 77 } 78 if community.Visibility != "public" { 79 t.Errorf("Expected Visibility 'public', got %s", community.Visibility) 80 } 81 }) 82 83 t.Run("updates existing community", func(t *testing.T) { 84 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 85 communityDID := generateTestDID(uniqueSuffix) 86 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix) 87 88 // Create initial community 89 initialCommunity := &communities.Community{ 90 DID: communityDID, 91 Handle: handle, 92 Name: "update-test", 93 DisplayName: "Original Name", 94 Description: "Original description", 95 OwnerDID: "did:web:coves.local", 96 CreatedByDID: "did:plc:user123", 97 HostedByDID: "did:web:coves.local", 98 Visibility: "public", 99 AllowExternalDiscovery: true, 100 CreatedAt: time.Now(), 101 UpdatedAt: time.Now(), 102 } 103 104 if _, err := repo.Create(ctx, initialCommunity); err != nil { 105 t.Fatalf("Failed to create initial community: %v", err) 106 } 107 108 // Simulate update event 109 updateEvent := &jetstream.JetstreamEvent{ 110 Did: communityDID, 111 TimeUS: time.Now().UnixMicro(), 112 Kind: "commit", 113 Commit: &jetstream.CommitEvent{ 114 Rev: "rev124", 115 Operation: "update", 116 Collection: "social.coves.community.profile", 117 RKey: "self", 118 CID: "bafy456def", 119 Record: map[string]interface{}{ 120 "did": communityDID, // Community's unique DID 121 "handle": handle, 122 "name": "update-test", 123 "displayName": "Updated Name", 124 "description": "Updated description", 125 "owner": "did:web:coves.local", 126 "createdBy": "did:plc:user123", 127 "hostedBy": "did:web:coves.local", 128 "visibility": "unlisted", 129 "federation": map[string]interface{}{ 130 "allowExternalDiscovery": false, 131 }, 132 "memberCount": 5, 133 
"subscriberCount": 10, 134 "createdAt": time.Now().Format(time.RFC3339), 135 }, 136 }, 137 } 138 139 // Handle the update 140 if err := consumer.HandleEvent(ctx, updateEvent); err != nil { 141 t.Fatalf("Failed to handle update event: %v", err) 142 } 143 144 // Verify community was updated 145 updated, err := repo.GetByDID(ctx, communityDID) 146 if err != nil { 147 t.Fatalf("Failed to get updated community: %v", err) 148 } 149 150 if updated.DisplayName != "Updated Name" { 151 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName) 152 } 153 if updated.Description != "Updated description" { 154 t.Errorf("Expected Description 'Updated description', got %s", updated.Description) 155 } 156 if updated.Visibility != "unlisted" { 157 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility) 158 } 159 if updated.AllowExternalDiscovery { 160 t.Error("Expected AllowExternalDiscovery to be false") 161 } 162 }) 163 164 t.Run("deletes community", func(t *testing.T) { 165 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 166 communityDID := generateTestDID(uniqueSuffix) 167 168 // Create community to delete 169 community := &communities.Community{ 170 DID: communityDID, 171 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix), 172 Name: "delete-test", 173 OwnerDID: "did:web:coves.local", 174 CreatedByDID: "did:plc:user123", 175 HostedByDID: "did:web:coves.local", 176 Visibility: "public", 177 CreatedAt: time.Now(), 178 UpdatedAt: time.Now(), 179 } 180 181 if _, err := repo.Create(ctx, community); err != nil { 182 t.Fatalf("Failed to create community: %v", err) 183 } 184 185 // Simulate delete event 186 deleteEvent := &jetstream.JetstreamEvent{ 187 Did: communityDID, 188 TimeUS: time.Now().UnixMicro(), 189 Kind: "commit", 190 Commit: &jetstream.CommitEvent{ 191 Rev: "rev125", 192 Operation: "delete", 193 Collection: "social.coves.community.profile", 194 RKey: "self", 195 }, 196 } 197 198 // Handle the delete 199 if err := 
consumer.HandleEvent(ctx, deleteEvent); err != nil { 200 t.Fatalf("Failed to handle delete event: %v", err) 201 } 202 203 // Verify community was deleted 204 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound { 205 t.Errorf("Expected ErrCommunityNotFound, got: %v", err) 206 } 207 }) 208} 209 210func TestCommunityConsumer_HandleSubscription(t *testing.T) { 211 db := setupTestDB(t) 212 defer func() { 213 if err := db.Close(); err != nil { 214 t.Logf("Failed to close database: %v", err) 215 } 216 }() 217 218 repo := postgres.NewCommunityRepository(db) 219 // Skip verification in tests 220 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 221 ctx := context.Background() 222 223 t.Run("creates subscription from event", func(t *testing.T) { 224 // Create a community first 225 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 226 communityDID := generateTestDID(uniqueSuffix) 227 228 community := &communities.Community{ 229 DID: communityDID, 230 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix), 231 Name: "sub-test", 232 OwnerDID: "did:web:coves.local", 233 CreatedByDID: "did:plc:user123", 234 HostedByDID: "did:web:coves.local", 235 Visibility: "public", 236 CreatedAt: time.Now(), 237 UpdatedAt: time.Now(), 238 } 239 240 if _, err := repo.Create(ctx, community); err != nil { 241 t.Fatalf("Failed to create community: %v", err) 242 } 243 244 // Simulate subscription event 245 // IMPORTANT: Use correct collection name (record type, not XRPC procedure) 246 userDID := "did:plc:subscriber123" 247 subEvent := &jetstream.JetstreamEvent{ 248 Did: userDID, 249 TimeUS: time.Now().UnixMicro(), 250 Kind: "commit", 251 Commit: &jetstream.CommitEvent{ 252 Rev: "rev200", 253 Operation: "create", 254 Collection: "social.coves.community.subscription", // Updated to communities namespace 255 RKey: "sub123", 256 CID: "bafy789ghi", 257 Record: map[string]interface{}{ 258 "subject": communityDID, // Using 
'subject' per atProto conventions 259 "contentVisibility": 3, 260 "createdAt": time.Now().Format(time.RFC3339), 261 }, 262 }, 263 } 264 265 // Handle the subscription 266 if err := consumer.HandleEvent(ctx, subEvent); err != nil { 267 t.Fatalf("Failed to handle subscription event: %v", err) 268 } 269 270 // Verify subscription was created 271 subscription, err := repo.GetSubscription(ctx, userDID, communityDID) 272 if err != nil { 273 t.Fatalf("Failed to get subscription: %v", err) 274 } 275 276 if subscription.UserDID != userDID { 277 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID) 278 } 279 if subscription.CommunityDID != communityDID { 280 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID) 281 } 282 283 // Verify subscriber count was incremented 284 updated, err := repo.GetByDID(ctx, communityDID) 285 if err != nil { 286 t.Fatalf("Failed to get community: %v", err) 287 } 288 289 if updated.SubscriberCount != 1 { 290 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount) 291 } 292 }) 293} 294 295func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) { 296 db := setupTestDB(t) 297 defer func() { 298 if err := db.Close(); err != nil { 299 t.Logf("Failed to close database: %v", err) 300 } 301 }() 302 303 repo := postgres.NewCommunityRepository(db) 304 // Skip verification in tests 305 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true) 306 ctx := context.Background() 307 308 t.Run("ignores identity events", func(t *testing.T) { 309 event := &jetstream.JetstreamEvent{ 310 Did: "did:plc:user123", 311 TimeUS: time.Now().UnixMicro(), 312 Kind: "identity", 313 Identity: &jetstream.IdentityEvent{ 314 Did: "did:plc:user123", 315 Handle: "alice.bsky.social", 316 }, 317 } 318 319 err := consumer.HandleEvent(ctx, event) 320 if err != nil { 321 t.Errorf("Expected no error for identity event, got: %v", err) 322 } 323 }) 324 325 t.Run("ignores non-community 
collections", func(t *testing.T) { 326 event := &jetstream.JetstreamEvent{ 327 Did: "did:plc:user123", 328 TimeUS: time.Now().UnixMicro(), 329 Kind: "commit", 330 Commit: &jetstream.CommitEvent{ 331 Rev: "rev300", 332 Operation: "create", 333 Collection: "app.bsky.communityFeed.post", 334 RKey: "post123", 335 Record: map[string]interface{}{ 336 "text": "Hello world", 337 }, 338 }, 339 } 340 341 err := consumer.HandleEvent(ctx, event) 342 if err != nil { 343 t.Errorf("Expected no error for non-community event, got: %v", err) 344 } 345 }) 346}