A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "Coves/internal/db/postgres" 7 "context" 8 "fmt" 9 "testing" 10 "time" 11) 12 13func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) { 14 db := setupTestDB(t) 15 defer func() { 16 if err := db.Close(); err != nil { 17 t.Logf("Failed to close database: %v", err) 18 } 19 }() 20 21 repo := postgres.NewCommunityRepository(db) 22 consumer := jetstream.NewCommunityEventConsumer(repo) 23 ctx := context.Background() 24 25 t.Run("creates community from firehose event", func(t *testing.T) { 26 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 27 communityDID := generateTestDID(uniqueSuffix) 28 29 // Simulate a Jetstream commit event 30 event := &jetstream.JetstreamEvent{ 31 Did: communityDID, 32 TimeUS: time.Now().UnixMicro(), 33 Kind: "commit", 34 Commit: &jetstream.CommitEvent{ 35 Rev: "rev123", 36 Operation: "create", 37 Collection: "social.coves.community.profile", 38 RKey: "self", 39 CID: "bafy123abc", 40 Record: map[string]interface{}{ 41 "did": communityDID, // Community's unique DID 42 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix), 43 "name": "test-community", 44 "displayName": "Test Community", 45 "description": "A test community", 46 "owner": "did:web:coves.local", 47 "createdBy": "did:plc:user123", 48 "hostedBy": "did:web:coves.local", 49 "visibility": "public", 50 "federation": map[string]interface{}{ 51 "allowExternalDiscovery": true, 52 }, 53 "memberCount": 0, 54 "subscriberCount": 0, 55 "createdAt": time.Now().Format(time.RFC3339), 56 }, 57 }, 58 } 59 60 // Handle the event 61 if err := consumer.HandleEvent(ctx, event); err != nil { 62 t.Fatalf("Failed to handle event: %v", err) 63 } 64 65 // Verify community was indexed 66 community, err := repo.GetByDID(ctx, communityDID) 67 if err != nil { 68 t.Fatalf("Failed to get indexed community: %v", err) 69 } 70 71 if community.DID != communityDID { 72 t.Errorf("Expected DID %s, got %s", communityDID, community.DID) 73 } 74 if community.DisplayName != "Test Community" { 75 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName) 76 } 77 if community.Visibility != "public" { 78 t.Errorf("Expected Visibility 'public', got %s", community.Visibility) 79 } 80 }) 81 82 t.Run("updates existing community", func(t *testing.T) { 83 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 84 communityDID := generateTestDID(uniqueSuffix) 85 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix) 86 87 // Create initial community 88 initialCommunity := &communities.Community{ 89 DID: communityDID, 90 Handle: handle, 91 Name: "update-test", 92 DisplayName: "Original Name", 93 Description: "Original description", 94 OwnerDID: "did:web:coves.local", 95 CreatedByDID: "did:plc:user123", 96 HostedByDID: "did:web:coves.local", 97 Visibility: "public", 98 AllowExternalDiscovery: true, 99 CreatedAt: time.Now(), 100 UpdatedAt: time.Now(), 101 } 102 103 if _, err := repo.Create(ctx, initialCommunity); err != nil { 104 t.Fatalf("Failed to create initial community: %v", err) 105 } 106 107 // Simulate update event 108 updateEvent := &jetstream.JetstreamEvent{ 109 Did: communityDID, 110 TimeUS: time.Now().UnixMicro(), 111 Kind: "commit", 112 Commit: &jetstream.CommitEvent{ 113 Rev: "rev124", 114 Operation: "update", 115 Collection: "social.coves.community.profile", 116 RKey: "self", 117 CID: "bafy456def", 118 Record: map[string]interface{}{ 119 "did": communityDID, // Community's unique DID 120 "handle": handle, 121 "name": "update-test", 122 "displayName": "Updated Name", 123 "description": "Updated description", 124 "owner": "did:web:coves.local", 125 "createdBy": "did:plc:user123", 126 "hostedBy": "did:web:coves.local", 127 "visibility": "unlisted", 128 "federation": map[string]interface{}{ 129 "allowExternalDiscovery": false, 130 }, 131 "memberCount": 5, 132 "subscriberCount": 10, 133 "createdAt": time.Now().Format(time.RFC3339), 134 }, 135 }, 136 } 137 138 // Handle the update 139 if err := consumer.HandleEvent(ctx, updateEvent); err != nil { 140 t.Fatalf("Failed to handle update event: %v", err) 141 } 142 143 // Verify community was updated 144 updated, err := repo.GetByDID(ctx, communityDID) 145 if err != nil { 146 t.Fatalf("Failed to get updated community: %v", err) 147 } 148 149 if updated.DisplayName != "Updated Name" { 150 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName) 151 } 152 if updated.Description != "Updated description" { 153 t.Errorf("Expected Description 'Updated description', got %s", updated.Description) 154 } 155 if updated.Visibility != "unlisted" { 156 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility) 157 } 158 if updated.AllowExternalDiscovery { 159 t.Error("Expected AllowExternalDiscovery to be false") 160 } 161 }) 162 163 t.Run("deletes community", func(t *testing.T) { 164 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 165 communityDID := generateTestDID(uniqueSuffix) 166 167 // Create community to delete 168 community := &communities.Community{ 169 DID: communityDID, 170 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix), 171 Name: "delete-test", 172 OwnerDID: "did:web:coves.local", 173 CreatedByDID: "did:plc:user123", 174 HostedByDID: "did:web:coves.local", 175 Visibility: "public", 176 CreatedAt: time.Now(), 177 UpdatedAt: time.Now(), 178 } 179 180 if _, err := repo.Create(ctx, community); err != nil { 181 t.Fatalf("Failed to create community: %v", err) 182 } 183 184 // Simulate delete event 185 deleteEvent := &jetstream.JetstreamEvent{ 186 Did: communityDID, 187 TimeUS: time.Now().UnixMicro(), 188 Kind: "commit", 189 Commit: &jetstream.CommitEvent{ 190 Rev: "rev125", 191 Operation: "delete", 192 Collection: "social.coves.community.profile", 193 RKey: "self", 194 }, 195 } 196 197 // Handle the delete 198 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil { 199 t.Fatalf("Failed to handle delete event: %v", err) 200 } 201 202 // Verify community was deleted 203 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound { 204 t.Errorf("Expected ErrCommunityNotFound, got: %v", err) 205 } 206 }) 207} 208 209func TestCommunityConsumer_HandleSubscription(t *testing.T) { 210 db := setupTestDB(t) 211 defer func() { 212 if err := db.Close(); err != nil { 213 t.Logf("Failed to close database: %v", err) 214 } 215 }() 216 217 repo := postgres.NewCommunityRepository(db) 218 consumer := jetstream.NewCommunityEventConsumer(repo) 219 ctx := context.Background() 220 221 t.Run("creates subscription from event", func(t *testing.T) { 222 // Create a community first 223 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 224 communityDID := generateTestDID(uniqueSuffix) 225 226 community := &communities.Community{ 227 DID: communityDID, 228 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix), 229 Name: "sub-test", 230 OwnerDID: "did:web:coves.local", 231 CreatedByDID: "did:plc:user123", 232 HostedByDID: "did:web:coves.local", 233 Visibility: "public", 234 CreatedAt: time.Now(), 235 UpdatedAt: time.Now(), 236 } 237 238 if _, err := repo.Create(ctx, community); err != nil { 239 t.Fatalf("Failed to create community: %v", err) 240 } 241 242 // Simulate subscription event 243 userDID := "did:plc:subscriber123" 244 subEvent := &jetstream.JetstreamEvent{ 245 Did: userDID, 246 TimeUS: time.Now().UnixMicro(), 247 Kind: "commit", 248 Commit: &jetstream.CommitEvent{ 249 Rev: "rev200", 250 Operation: "create", 251 Collection: "social.coves.community.subscribe", 252 RKey: "sub123", 253 CID: "bafy789ghi", 254 Record: map[string]interface{}{ 255 "community": communityDID, 256 }, 257 }, 258 } 259 260 // Handle the subscription 261 if err := consumer.HandleEvent(ctx, subEvent); err != nil { 262 t.Fatalf("Failed to handle subscription event: %v", err) 263 } 264 265 // Verify subscription was created 266 subscription, err := repo.GetSubscription(ctx, userDID, communityDID) 267 if err != nil { 268 t.Fatalf("Failed to get subscription: %v", err) 269 } 270 271 if subscription.UserDID != userDID { 272 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID) 273 } 274 if subscription.CommunityDID != communityDID { 275 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID) 276 } 277 278 // Verify subscriber count was incremented 279 updated, err := repo.GetByDID(ctx, communityDID) 280 if err != nil { 281 t.Fatalf("Failed to get community: %v", err) 282 } 283 284 if updated.SubscriberCount != 1 { 285 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount) 286 } 287 }) 288} 289 290func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) { 291 db := setupTestDB(t) 292 defer func() { 293 if err := db.Close(); err != nil { 294 t.Logf("Failed to close database: %v", err) 295 } 296 }() 297 298 repo := postgres.NewCommunityRepository(db) 299 consumer := jetstream.NewCommunityEventConsumer(repo) 300 ctx := context.Background() 301 302 t.Run("ignores identity events", func(t *testing.T) { 303 event := &jetstream.JetstreamEvent{ 304 Did: "did:plc:user123", 305 TimeUS: time.Now().UnixMicro(), 306 Kind: "identity", 307 Identity: &jetstream.IdentityEvent{ 308 Did: "did:plc:user123", 309 Handle: "alice.bsky.social", 310 }, 311 } 312 313 err := consumer.HandleEvent(ctx, event) 314 if err != nil { 315 t.Errorf("Expected no error for identity event, got: %v", err) 316 } 317 }) 318 319 t.Run("ignores non-community collections", func(t *testing.T) { 320 event := &jetstream.JetstreamEvent{ 321 Did: "did:plc:user123", 322 TimeUS: time.Now().UnixMicro(), 323 Kind: "commit", 324 Commit: &jetstream.CommitEvent{ 325 Rev: "rev300", 326 Operation: "create", 327 Collection: "app.bsky.feed.post", 328 RKey: "post123", 329 Record: map[string]interface{}{ 330 "text": "Hello world", 331 }, 332 }, 333 } 334 335 err := consumer.HandleEvent(ctx, event) 336 if err != nil { 337 t.Errorf("Expected no error for non-community event, got: %v", err) 338 } 339 }) 340}