A community based topic aggregation platform built on atproto
1package integration 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 "time" 8 9 "Coves/internal/atproto/did" 10 "Coves/internal/atproto/jetstream" 11 "Coves/internal/core/communities" 12 "Coves/internal/db/postgres" 13) 14 15func TestCommunityConsumer_HandleCommunityProfile(t *testing.T) { 16 db := setupTestDB(t) 17 defer db.Close() 18 19 repo := postgres.NewCommunityRepository(db) 20 consumer := jetstream.NewCommunityEventConsumer(repo) 21 didGen := did.NewGenerator(true, "https://plc.directory") 22 ctx := context.Background() 23 24 t.Run("creates community from firehose event", func(t *testing.T) { 25 communityDID, _ := didGen.GenerateCommunityDID() 26 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 27 28 // Simulate a Jetstream commit event 29 event := &jetstream.JetstreamEvent{ 30 Did: communityDID, 31 TimeUS: time.Now().UnixMicro(), 32 Kind: "commit", 33 Commit: &jetstream.CommitEvent{ 34 Rev: "rev123", 35 Operation: "create", 36 Collection: "social.coves.community.profile", 37 RKey: "self", 38 CID: "bafy123abc", 39 Record: map[string]interface{}{ 40 "did": communityDID, // Community's unique DID 41 "handle": fmt.Sprintf("!test-community-%s@coves.local", uniqueSuffix), 42 "name": "test-community", 43 "displayName": "Test Community", 44 "description": "A test community", 45 "owner": "did:web:coves.local", 46 "createdBy": "did:plc:user123", 47 "hostedBy": "did:web:coves.local", 48 "visibility": "public", 49 "federation": map[string]interface{}{ 50 "allowExternalDiscovery": true, 51 }, 52 "memberCount": 0, 53 "subscriberCount": 0, 54 "createdAt": time.Now().Format(time.RFC3339), 55 }, 56 }, 57 } 58 59 // Handle the event 60 err := consumer.HandleEvent(ctx, event) 61 if err != nil { 62 t.Fatalf("Failed to handle event: %v", err) 63 } 64 65 // Verify community was indexed 66 community, err := repo.GetByDID(ctx, communityDID) 67 if err != nil { 68 t.Fatalf("Failed to get indexed community: %v", err) 69 } 70 71 if community.DID != communityDID { 72 t.Errorf("Expected DID %s, got %s", communityDID, community.DID) 73 } 74 if community.DisplayName != "Test Community" { 75 t.Errorf("Expected DisplayName 'Test Community', got %s", community.DisplayName) 76 } 77 if community.Visibility != "public" { 78 t.Errorf("Expected Visibility 'public', got %s", community.Visibility) 79 } 80 }) 81 82 t.Run("updates existing community", func(t *testing.T) { 83 communityDID, _ := didGen.GenerateCommunityDID() 84 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 85 handle := fmt.Sprintf("!update-test-%s@coves.local", uniqueSuffix) 86 87 // Create initial community 88 initialCommunity := &communities.Community{ 89 DID: communityDID, 90 Handle: handle, 91 Name: "update-test", 92 DisplayName: "Original Name", 93 Description: "Original description", 94 OwnerDID: "did:web:coves.local", 95 CreatedByDID: "did:plc:user123", 96 HostedByDID: "did:web:coves.local", 97 Visibility: "public", 98 AllowExternalDiscovery: true, 99 CreatedAt: time.Now(), 100 UpdatedAt: time.Now(), 101 } 102 103 _, err := repo.Create(ctx, initialCommunity) 104 if err != nil { 105 t.Fatalf("Failed to create initial community: %v", err) 106 } 107 108 // Simulate update event 109 updateEvent := &jetstream.JetstreamEvent{ 110 Did: communityDID, 111 TimeUS: time.Now().UnixMicro(), 112 Kind: "commit", 113 Commit: &jetstream.CommitEvent{ 114 Rev: "rev124", 115 Operation: "update", 116 Collection: "social.coves.community.profile", 117 RKey: "self", 118 CID: "bafy456def", 119 Record: map[string]interface{}{ 120 "did": communityDID, // Community's unique DID 121 "handle": handle, 122 "name": "update-test", 123 "displayName": "Updated Name", 124 "description": "Updated description", 125 "owner": "did:web:coves.local", 126 "createdBy": "did:plc:user123", 127 "hostedBy": "did:web:coves.local", 128 "visibility": "unlisted", 129 "federation": map[string]interface{}{ 130 "allowExternalDiscovery": false, 131 }, 132 "memberCount": 5, 133 "subscriberCount": 10, 134 "createdAt": time.Now().Format(time.RFC3339), 135 }, 136 }, 137 } 138 139 // Handle the update 140 err = consumer.HandleEvent(ctx, updateEvent) 141 if err != nil { 142 t.Fatalf("Failed to handle update event: %v", err) 143 } 144 145 // Verify community was updated 146 updated, err := repo.GetByDID(ctx, communityDID) 147 if err != nil { 148 t.Fatalf("Failed to get updated community: %v", err) 149 } 150 151 if updated.DisplayName != "Updated Name" { 152 t.Errorf("Expected DisplayName 'Updated Name', got %s", updated.DisplayName) 153 } 154 if updated.Description != "Updated description" { 155 t.Errorf("Expected Description 'Updated description', got %s", updated.Description) 156 } 157 if updated.Visibility != "unlisted" { 158 t.Errorf("Expected Visibility 'unlisted', got %s", updated.Visibility) 159 } 160 if updated.AllowExternalDiscovery { 161 t.Error("Expected AllowExternalDiscovery to be false") 162 } 163 }) 164 165 t.Run("deletes community", func(t *testing.T) { 166 communityDID, _ := didGen.GenerateCommunityDID() 167 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 168 169 // Create community to delete 170 community := &communities.Community{ 171 DID: communityDID, 172 Handle: fmt.Sprintf("!delete-test-%s@coves.local", uniqueSuffix), 173 Name: "delete-test", 174 OwnerDID: "did:web:coves.local", 175 CreatedByDID: "did:plc:user123", 176 HostedByDID: "did:web:coves.local", 177 Visibility: "public", 178 CreatedAt: time.Now(), 179 UpdatedAt: time.Now(), 180 } 181 182 _, err := repo.Create(ctx, community) 183 if err != nil { 184 t.Fatalf("Failed to create community: %v", err) 185 } 186 187 // Simulate delete event 188 deleteEvent := &jetstream.JetstreamEvent{ 189 Did: communityDID, 190 TimeUS: time.Now().UnixMicro(), 191 Kind: "commit", 192 Commit: &jetstream.CommitEvent{ 193 Rev: "rev125", 194 Operation: "delete", 195 Collection: "social.coves.community.profile", 196 RKey: "self", 197 }, 198 } 199 200 // Handle the delete 201 err = consumer.HandleEvent(ctx, deleteEvent) 202 if err != nil { 203 t.Fatalf("Failed to handle delete event: %v", err) 204 } 205 206 // Verify community was deleted 207 _, err = repo.GetByDID(ctx, communityDID) 208 if err != communities.ErrCommunityNotFound { 209 t.Errorf("Expected ErrCommunityNotFound, got: %v", err) 210 } 211 }) 212} 213 214func TestCommunityConsumer_HandleSubscription(t *testing.T) { 215 db := setupTestDB(t) 216 defer db.Close() 217 218 repo := postgres.NewCommunityRepository(db) 219 consumer := jetstream.NewCommunityEventConsumer(repo) 220 didGen := did.NewGenerator(true, "https://plc.directory") 221 ctx := context.Background() 222 223 t.Run("creates subscription from event", func(t *testing.T) { 224 // Create a community first 225 communityDID, _ := didGen.GenerateCommunityDID() 226 uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano()) 227 228 community := &communities.Community{ 229 DID: communityDID, 230 Handle: fmt.Sprintf("!sub-test-%s@coves.local", uniqueSuffix), 231 Name: "sub-test", 232 OwnerDID: "did:web:coves.local", 233 CreatedByDID: "did:plc:user123", 234 HostedByDID: "did:web:coves.local", 235 Visibility: "public", 236 CreatedAt: time.Now(), 237 UpdatedAt: time.Now(), 238 } 239 240 _, err := repo.Create(ctx, community) 241 if err != nil { 242 t.Fatalf("Failed to create community: %v", err) 243 } 244 245 // Simulate subscription event 246 userDID := "did:plc:subscriber123" 247 subEvent := &jetstream.JetstreamEvent{ 248 Did: userDID, 249 TimeUS: time.Now().UnixMicro(), 250 Kind: "commit", 251 Commit: &jetstream.CommitEvent{ 252 Rev: "rev200", 253 Operation: "create", 254 Collection: "social.coves.community.subscribe", 255 RKey: "sub123", 256 CID: "bafy789ghi", 257 Record: map[string]interface{}{ 258 "community": communityDID, 259 }, 260 }, 261 } 262 263 // Handle the subscription 264 err = consumer.HandleEvent(ctx, subEvent) 265 if err != nil { 266 t.Fatalf("Failed to handle subscription event: %v", err) 267 } 268 269 // Verify subscription was created 270 subscription, err := repo.GetSubscription(ctx, userDID, communityDID) 271 if err != nil { 272 t.Fatalf("Failed to get subscription: %v", err) 273 } 274 275 if subscription.UserDID != userDID { 276 t.Errorf("Expected UserDID %s, got %s", userDID, subscription.UserDID) 277 } 278 if subscription.CommunityDID != communityDID { 279 t.Errorf("Expected CommunityDID %s, got %s", communityDID, subscription.CommunityDID) 280 } 281 282 // Verify subscriber count was incremented 283 updated, err := repo.GetByDID(ctx, communityDID) 284 if err != nil { 285 t.Fatalf("Failed to get community: %v", err) 286 } 287 288 if updated.SubscriberCount != 1 { 289 t.Errorf("Expected SubscriberCount 1, got %d", updated.SubscriberCount) 290 } 291 }) 292} 293 294func TestCommunityConsumer_IgnoresNonCommunityEvents(t *testing.T) { 295 db := setupTestDB(t) 296 defer db.Close() 297 298 repo := postgres.NewCommunityRepository(db) 299 consumer := jetstream.NewCommunityEventConsumer(repo) 300 ctx := context.Background() 301 302 t.Run("ignores identity events", func(t *testing.T) { 303 event := &jetstream.JetstreamEvent{ 304 Did: "did:plc:user123", 305 TimeUS: time.Now().UnixMicro(), 306 Kind: "identity", 307 Identity: &jetstream.IdentityEvent{ 308 Did: "did:plc:user123", 309 Handle: "alice.bsky.social", 310 }, 311 } 312 313 err := consumer.HandleEvent(ctx, event) 314 if err != nil { 315 t.Errorf("Expected no error for identity event, got: %v", err) 316 } 317 }) 318 319 t.Run("ignores non-community collections", func(t *testing.T) { 320 event := &jetstream.JetstreamEvent{ 321 Did: "did:plc:user123", 322 TimeUS: time.Now().UnixMicro(), 323 Kind: "commit", 324 Commit: &jetstream.CommitEvent{ 325 Rev: "rev300", 326 Operation: "create", 327 Collection: "app.bsky.feed.post", 328 RKey: "post123", 329 Record: map[string]interface{}{ 330 "text": "Hello world", 331 }, 332 }, 333 } 334 335 err := consumer.HandleEvent(ctx, event) 336 if err != nil { 337 t.Errorf("Expected no error for non-community event, got: %v", err) 338 } 339 }) 340}