A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "Coves/internal/atproto/did" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/core/communities" 7 "Coves/internal/db/postgres" 8 "context" 9 "fmt" 10 "testing" 11 "time" 12) 13 14func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) { 15 db := setupTestDB(t) 16 defer func() { 17 if err := db.Close(); err != nil { 18 t.Logf("Failed to close database: %v", err) 19 } 20 }() 21 22 repo := postgres.NewCommunityRepository(db) 23 consumer := jetstream.NewCommunityEventConsumer(repo) 24 didGen := did.NewGenerator(true, "https://plc.directory") 25 ctx := context.Background() 26 27 t.Run("creates community from firehose event", func(t *testing.T) { 28 communityDID, err := didGen.GenerateCommunityDID() 29 if err != nil { 30 t.Fatalf("Failed to generate community DID: %v", err) 31 } 32 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 33 34 // Simulate a Jetstream commit event 35 event := &jetstream.JetstreamEvent{ 36 Did: communityDID, 37 TimeUS: time.Now().UnixMicro(), 38 Kind: "commit", 39 Commit: &jetstream.CommitEvent{ 40 Rev: "rev123", 41 Operation: "create", 42 Collection: "social.coves.community.profile", 43 RKey: "self", 44 CID: "bafy123abc", 45 Record: map[string]interface{}{ 46 "did": communityDID, // Community's unique DID 47 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix), 48 "name": "test-community", 49 "displayName": "Test Community", 50 "description": "A test community", 51 "owner": "did:web:coves.local", 52 "createdBy": "did:plc:user123", 53 "hostedBy": "did:web:coves.local", 54 "visibility": "public", 55 "federation": map[string]interface{}{ 56 "allowExternalDiscovery": true, 57 }, 58 "memberCount": 0, 59 "subscriberCount": 0, 60 "createdAt": time.Now().Format(time.RFC3339), 61 }, 62 }, 63 } 64 65 // Handle the event 66 if err := consumer.HandleEvent(ctx, event); err != nil { 67 t.Fatalf("Failed to handle event: %v", err) 68 } 69 70 // Verify community was indexed 71 community, err := repo.GetByDID(ctx, communityDID) 72 if err != nil { 73 t.Fatalf("Failed to get indexed community: %v", err) 74 } 75 76 if community.DID != communityDID { 77 t.Errorf("Expected DID %s, got %s", communityDID, community.DID) 78 } 79 if community.DisplayName != "Test Community" { 80 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName) 81 } 82 if community.Visibility != "public" { 83 t.Errorf("Expected Visibility 'public', got %s", community.Visibility) 84 } 85 }) 86 87 t.Run("updates existing community", func(t *testing.T) { 88 communityDID, err := didGen.GenerateCommunityDID() 89 if err != nil { 90 t.Fatalf("Failed to generate community DID: %v", err) 91 } 92 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 93 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix) 94 95 // Create initial community 96 initialCommunity := &communities.Community{ 97 DID: communityDID, 98 Handle: handle, 99 Name: "update-test", 100 DisplayName: "Original Name", 101 Description: "Original description", 102 OwnerDID: "did:web:coves.local", 103 CreatedByDID: "did:plc:user123", 104 HostedByDID: "did:web:coves.local", 105 Visibility: "public", 106 AllowExternalDiscovery: true, 107 CreatedAt: time.Now(), 108 UpdatedAt: time.Now(), 109 } 110 111 if _, err := repo.Create(ctx, initialCommunity); err != nil { 112 t.Fatalf("Failed to create initial community: %v", err) 113 } 114 115 // Simulate update event 116 updateEvent := &jetstream.JetstreamEvent{ 117 Did: communityDID, 118 TimeUS: time.Now().UnixMicro(), 119 Kind: "commit", 120 Commit: &jetstream.CommitEvent{ 121 Rev: "rev124", 122 Operation: "update", 123 Collection: "social.coves.community.profile", 124 RKey: "self", 125 CID: "bafy456def", 126 Record: map[string]interface{}{ 127 "did": communityDID, // Community's unique DID 128 "handle": handle, 129 "name": "update-test", 130 "displayName": "Updated Name", 131 "description": "Updated description", 132 "owner": "did:web:coves.local", 133 "createdBy": "did:plc:user123", 134 "hostedBy": "did:web:coves.local", 135 "visibility": "unlisted", 136 "federation": map[string]interface{}{ 137 "allowExternalDiscovery": false, 138 }, 139 "memberCount": 5, 140 "subscriberCount": 10, 141 "createdAt": time.Now().Format(time.RFC3339), 142 }, 143 }, 144 } 145 146 // Handle the update 147 if err := consumer.HandleEvent(ctx, updateEvent); err != nil { 148 t.Fatalf("Failed to handle update event: %v", err) 149 } 150 151 // Verify community was updated 152 updated, err := repo.GetByDID(ctx, communityDID) 153 if err != nil { 154 t.Fatalf("Failed to get updated community: %v", err) 155 } 156 157 if updated.DisplayName != "Updated Name" { 158 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName) 159 } 160 if updated.Description != "Updated description" { 161 t.Errorf("Expected Description 'Updated description', got %s", updated.Description) 162 } 163 if updated.Visibility != "unlisted" { 164 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility) 165 } 166 if updated.AllowExternalDiscovery { 167 t.Error("Expected AllowExternalDiscovery to be false") 168 } 169 }) 170 171 t.Run("deletes community", func(t *testing.T) { 172 communityDID, err := didGen.GenerateCommunityDID() 173 if err != nil { 174 t.Fatalf("Failed to generate community DID: %v", err) 175 } 176 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 177 178 // Create community to delete 179 community := &communities.Community{ 180 DID: communityDID, 181 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix), 182 Name: "delete-test", 183 OwnerDID: "did:web:coves.local", 184 CreatedByDID: "did:plc:user123", 185 HostedByDID: "did:web:coves.local", 186 Visibility: "public", 187 CreatedAt: time.Now(), 188 UpdatedAt: time.Now(), 189 } 190 191 if _, err := repo.Create(ctx, community); err != nil { 192 t.Fatalf("Failed to create community: %v", err) 193 } 194 195 // Simulate delete event 196 deleteEvent := &jetstream.JetstreamEvent{ 197 Did: communityDID, 198 TimeUS: time.Now().UnixMicro(), 199 Kind: "commit", 200 Commit: &jetstream.CommitEvent{ 201 Rev: "rev125", 202 Operation: "delete", 203 Collection: "social.coves.community.profile", 204 RKey: "self", 205 }, 206 } 207 208 // Handle the delete 209 if err := consumer.HandleEvent(ctx, deleteEvent); err != nil { 210 t.Fatalf("Failed to handle delete event: %v", err) 211 } 212 213 // Verify community was deleted 214 if _, err := repo.GetByDID(ctx, communityDID); err != communities.ErrCommunityNotFound { 215 t.Errorf("Expected ErrCommunityNotFound, got: %v", err) 216 } 217 }) 218} 219 220func TestCommunityConsumer_HandleSubscription(t *testing.T) { 221 db := setupTestDB(t) 222 defer func() { 223 if err := db.Close(); err != nil { 224 t.Logf("Failed to close database: %v", err) 225 } 226 }() 227 228 repo := postgres.NewCommunityRepository(db) 229 consumer := jetstream.NewCommunityEventConsumer(repo) 230 didGen := did.NewGenerator(true, "https://plc.directory") 231 ctx := context.Background() 232 233 t.Run("creates subscription from event", func(t *testing.T) { 234 // Create a community first 235 communityDID, err := didGen.GenerateCommunityDID() 236 if err != nil { 237 t.Fatalf("Failed to generate community DID: %v", err) 238 } 239 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 240 241 community := &communities.Community{ 242 DID: communityDID, 243 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix), 244 Name: "sub-test", 245 OwnerDID: "did:web:coves.local", 246 CreatedByDID: "did:plc:user123", 247 HostedByDID: "did:web:coves.local", 248 Visibility: "public", 249 CreatedAt: time.Now(), 250 UpdatedAt: time.Now(), 251 } 252 253 if _, err := repo.Create(ctx, community); err != nil { 254 t.Fatalf("Failed to create community: %v", err) 255 } 256 257 // Simulate subscription event 258 userDID := "did:plc:subscriber123" 259 subEvent := &jetstream.JetstreamEvent{ 260 Did: userDID, 261 TimeUS: time.Now().UnixMicro(), 262 Kind: "commit", 263 Commit: &jetstream.CommitEvent{ 264 Rev: "rev200", 265 Operation: "create", 266 Collection: "social.coves.community.subscribe", 267 RKey: "sub123", 268 CID: "bafy789ghi", 269 Record: map[string]interface{}{ 270 "community": communityDID, 271 }, 272 }, 273 } 274 275 // Handle the subscription 276 if err := consumer.HandleEvent(ctx, subEvent); err != nil { 277 t.Fatalf("Failed to handle subscription event: %v", err) 278 } 279 280 // Verify subscription was created 281 subscription, err := repo.GetSubscription(ctx, userDID, communityDID) 282 if err != nil { 283 t.Fatalf("Failed to get subscription: %v", err) 284 } 285 286 if subscription.UserDID != userDID { 287 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID) 288 } 289 if subscription.CommunityDID != communityDID { 290 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID) 291 } 292 293 // Verify subscriber count was incremented 294 updated, err := repo.GetByDID(ctx, communityDID) 295 if err != nil { 296 t.Fatalf("Failed to get community: %v", err) 297 } 298 299 if updated.SubscriberCount != 1 { 300 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount) 301 } 302 }) 303} 304 305func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) { 306 db := setupTestDB(t) 307 defer func() { 308 if err := db.Close(); err != nil { 309 t.Logf("Failed to close database: %v", err) 310 } 311 }() 312 313 repo := postgres.NewCommunityRepository(db) 314 consumer := jetstream.NewCommunityEventConsumer(repo) 315 ctx := context.Background() 316 317 t.Run("ignores identity events", func(t *testing.T) { 318 event := &jetstream.JetstreamEvent{ 319 Did: "did:plc:user123", 320 TimeUS: time.Now().UnixMicro(), 321 Kind: "identity", 322 Identity: &jetstream.IdentityEvent{ 323 Did: "did:plc:user123", 324 Handle: "alice.bsky.social", 325 }, 326 } 327 328 err := consumer.HandleEvent(ctx, event) 329 if err != nil { 330 t.Errorf("Expected no error for identity event, got: %v", err) 331 } 332 }) 333 334 t.Run("ignores non-community collections", func(t *testing.T) { 335 event := &jetstream.JetstreamEvent{ 336 Did: "did:plc:user123", 337 TimeUS: time.Now().UnixMicro(), 338 Kind: "commit", 339 Commit: &jetstream.CommitEvent{ 340 Rev: "rev300", 341 Operation: "create", 342 Collection: "app.bsky.feed.post", 343 RKey: "post123", 344 Record: map[string]interface{}{ 345 "text": "Hello world", 346 }, 347 }, 348 } 349 350 err := consumer.HandleEvent(ctx, event) 351 if err != nil { 352 t.Errorf("Expected no error for non-community event, got: %v", err) 353 } 354 }) 355}