A community-based topic aggregation platform built on atproto
1package communities 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "net/http" 10 "regexp" 11 "strings" 12 "time" 13 14 "Coves/internal/atproto/did" 15) 16 17// Community handle validation regex (!name@instance) 18var communityHandleRegex = regexp.MustCompile(`^![a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`) 19 20type communityService struct { 21 repo Repository 22 didGen *did.Generator 23 pdsURL string // PDS URL for write-forward operations 24 instanceDID string // DID of this Coves instance 25 pdsAccessToken string // Access token for authenticating to PDS as the instance 26} 27 28// NewCommunityService creates a new community service 29func NewCommunityService(repo Repository, didGen *did.Generator, pdsURL string, instanceDID string) Service { 30 return &communityService{ 31 repo: repo, 32 didGen: didGen, 33 pdsURL: pdsURL, 34 instanceDID: instanceDID, 35 } 36} 37 38// SetPDSAccessToken sets the PDS access token for authentication 39// This should be called after creating a session for the Coves instance DID on the PDS 40func (s *communityService) SetPDSAccessToken(token string) { 41 s.pdsAccessToken = token 42} 43 44// CreateCommunity creates a new community via write-forward to PDS 45// Flow: Service -> PDS (creates record) -> Firehose -> Consumer -> AppView DB 46func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) { 47 // Apply defaults before validation 48 if req.Visibility == "" { 49 req.Visibility = "public" 50 } 51 52 // Validate request 53 if err := s.validateCreateRequest(req); err != nil { 54 return nil, err 55 } 56 57 // Generate a unique DID for the community 58 communityDID, err := s.didGen.GenerateCommunityDID() 59 if err != nil { 60 return nil, fmt.Errorf("failed to generate community DID: %w", err) 61 } 62 63 // Build scoped handle: !{name}@{instance} 64 instanceDomain := 
extractDomain(s.instanceDID) 65 if instanceDomain == "" { 66 instanceDomain = "coves.local" // Fallback for testing 67 } 68 handle := fmt.Sprintf("!%s@%s", req.Name, instanceDomain) 69 70 // Validate the generated handle 71 if err := s.ValidateHandle(handle); err != nil { 72 return nil, fmt.Errorf("generated handle is invalid: %w", err) 73 } 74 75 // Build community profile record 76 profile := map[string]interface{}{ 77 "$type": "social.coves.community.profile", 78 "did": communityDID, // Unique identifier for this community 79 "handle": handle, 80 "name": req.Name, 81 "visibility": req.Visibility, 82 "owner": s.instanceDID, // V1: instance owns the community 83 "createdBy": req.CreatedByDID, 84 "hostedBy": req.HostedByDID, 85 "createdAt": time.Now().Format(time.RFC3339), 86 "federation": map[string]interface{}{ 87 "allowExternalDiscovery": req.AllowExternalDiscovery, 88 }, 89 } 90 91 // Add optional fields 92 if req.DisplayName != "" { 93 profile["displayName"] = req.DisplayName 94 } 95 if req.Description != "" { 96 profile["description"] = req.Description 97 } 98 if len(req.Rules) > 0 { 99 profile["rules"] = req.Rules 100 } 101 if len(req.Categories) > 0 { 102 profile["categories"] = req.Categories 103 } 104 if req.Language != "" { 105 profile["language"] = req.Language 106 } 107 108 // Initialize counts 109 profile["memberCount"] = 0 110 profile["subscriberCount"] = 0 111 112 // TODO: Handle avatar and banner blobs 113 // For now, we'll skip blob uploads. This would require: 114 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob 115 // 2. Get blob ref (CID) 116 // 3. 
Add to profile record 117 118 // Write-forward to PDS: create the community profile record in the INSTANCE's repository 119 // The instance owns all community records, community DID is just metadata in the record 120 // Record will be at: at://INSTANCE_DID/social.coves.community.profile/COMMUNITY_RKEY 121 recordURI, recordCID, err := s.createRecordOnPDS(ctx, s.instanceDID, "social.coves.community.profile", "", profile) 122 if err != nil { 123 return nil, fmt.Errorf("failed to create community on PDS: %w", err) 124 } 125 126 // Return a Community object representing what was created 127 // Note: This won't be in AppView DB until the Jetstream consumer processes it 128 community := &Community{ 129 DID: communityDID, 130 Handle: handle, 131 Name: req.Name, 132 DisplayName: req.DisplayName, 133 Description: req.Description, 134 OwnerDID: s.instanceDID, 135 CreatedByDID: req.CreatedByDID, 136 HostedByDID: req.HostedByDID, 137 Visibility: req.Visibility, 138 AllowExternalDiscovery: req.AllowExternalDiscovery, 139 MemberCount: 0, 140 SubscriberCount: 0, 141 CreatedAt: time.Now(), 142 UpdatedAt: time.Now(), 143 RecordURI: recordURI, 144 RecordCID: recordCID, 145 } 146 147 return community, nil 148} 149 150// GetCommunity retrieves a community from AppView DB 151// identifier can be either a DID or handle 152func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) { 153 if identifier == "" { 154 return nil, ErrInvalidInput 155 } 156 157 // Determine if identifier is DID or handle 158 if strings.HasPrefix(identifier, "did:") { 159 return s.repo.GetByDID(ctx, identifier) 160 } 161 162 if strings.HasPrefix(identifier, "!") { 163 return s.repo.GetByHandle(ctx, identifier) 164 } 165 166 return nil, NewValidationError("identifier", "must be a DID or handle") 167} 168 169// UpdateCommunity updates a community via write-forward to PDS 170func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) 
(*Community, error) { 171 if req.CommunityDID == "" { 172 return nil, NewValidationError("communityDid", "required") 173 } 174 175 if req.UpdatedByDID == "" { 176 return nil, NewValidationError("updatedByDid", "required") 177 } 178 179 // Get existing community 180 existing, err := s.repo.GetByDID(ctx, req.CommunityDID) 181 if err != nil { 182 return nil, err 183 } 184 185 // Authorization: verify user is the creator 186 // TODO(Communities-Auth): Add moderator check when moderation system is implemented 187 if existing.CreatedByDID != req.UpdatedByDID { 188 return nil, ErrUnauthorized 189 } 190 191 // Build updated profile record (start with existing) 192 profile := map[string]interface{}{ 193 "$type": "social.coves.community.profile", 194 "handle": existing.Handle, 195 "name": existing.Name, 196 "owner": existing.OwnerDID, 197 "createdBy": existing.CreatedByDID, 198 "hostedBy": existing.HostedByDID, 199 "createdAt": existing.CreatedAt.Format(time.RFC3339), 200 } 201 202 // Apply updates 203 if req.DisplayName != nil { 204 profile["displayName"] = *req.DisplayName 205 } else { 206 profile["displayName"] = existing.DisplayName 207 } 208 209 if req.Description != nil { 210 profile["description"] = *req.Description 211 } else { 212 profile["description"] = existing.Description 213 } 214 215 if req.Visibility != nil { 216 profile["visibility"] = *req.Visibility 217 } else { 218 profile["visibility"] = existing.Visibility 219 } 220 221 if req.AllowExternalDiscovery != nil { 222 profile["federation"] = map[string]interface{}{ 223 "allowExternalDiscovery": *req.AllowExternalDiscovery, 224 } 225 } else { 226 profile["federation"] = map[string]interface{}{ 227 "allowExternalDiscovery": existing.AllowExternalDiscovery, 228 } 229 } 230 231 if req.ModerationType != nil { 232 profile["moderationType"] = *req.ModerationType 233 } 234 235 if len(req.ContentWarnings) > 0 { 236 profile["contentWarnings"] = req.ContentWarnings 237 } 238 239 // Preserve counts 240 
profile["memberCount"] = existing.MemberCount 241 profile["subscriberCount"] = existing.SubscriberCount 242 243 // Extract rkey from existing record URI (communities live in instance's repo) 244 rkey := extractRKeyFromURI(existing.RecordURI) 245 if rkey == "" { 246 return nil, fmt.Errorf("invalid community record URI: %s", existing.RecordURI) 247 } 248 249 // Write-forward: update record on PDS using INSTANCE DID (communities are stored in instance repo) 250 recordURI, recordCID, err := s.putRecordOnPDS(ctx, s.instanceDID, "social.coves.community.profile", rkey, profile) 251 if err != nil { 252 return nil, fmt.Errorf("failed to update community on PDS: %w", err) 253 } 254 255 // Return updated community representation 256 // Actual AppView DB update happens via Jetstream consumer 257 updated := *existing 258 if req.DisplayName != nil { 259 updated.DisplayName = *req.DisplayName 260 } 261 if req.Description != nil { 262 updated.Description = *req.Description 263 } 264 if req.Visibility != nil { 265 updated.Visibility = *req.Visibility 266 } 267 if req.AllowExternalDiscovery != nil { 268 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery 269 } 270 if req.ModerationType != nil { 271 updated.ModerationType = *req.ModerationType 272 } 273 if len(req.ContentWarnings) > 0 { 274 updated.ContentWarnings = req.ContentWarnings 275 } 276 updated.RecordURI = recordURI 277 updated.RecordCID = recordCID 278 updated.UpdatedAt = time.Now() 279 280 return &updated, nil 281} 282 283// ListCommunities queries AppView DB for communities with filters 284func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) { 285 // Set defaults 286 if req.Limit <= 0 || req.Limit > 100 { 287 req.Limit = 50 288 } 289 290 return s.repo.List(ctx, req) 291} 292 293// SearchCommunities performs fuzzy search in AppView DB 294func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, 
int, error) { 295 if req.Query == "" { 296 return nil, 0, NewValidationError("query", "search query is required") 297 } 298 299 // Set defaults 300 if req.Limit <= 0 || req.Limit > 100 { 301 req.Limit = 50 302 } 303 304 return s.repo.Search(ctx, req) 305} 306 307// SubscribeToCommunity creates a subscription via write-forward to PDS 308func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, communityIdentifier string) (*Subscription, error) { 309 if userDID == "" { 310 return nil, NewValidationError("userDid", "required") 311 } 312 313 // Resolve community identifier to DID 314 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 315 if err != nil { 316 return nil, err 317 } 318 319 // Verify community exists 320 community, err := s.repo.GetByDID(ctx, communityDID) 321 if err != nil { 322 return nil, err 323 } 324 325 // Check visibility - can't subscribe to private communities without invitation (TODO) 326 if community.Visibility == "private" { 327 return nil, ErrUnauthorized 328 } 329 330 // Build subscription record 331 subRecord := map[string]interface{}{ 332 "$type": "social.coves.community.subscribe", 333 "community": communityDID, 334 } 335 336 // Write-forward: create subscription record in user's repo 337 recordURI, recordCID, err := s.createRecordOnPDS(ctx, userDID, "social.coves.community.subscribe", "", subRecord) 338 if err != nil { 339 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err) 340 } 341 342 // Return subscription representation 343 subscription := &Subscription{ 344 UserDID: userDID, 345 CommunityDID: communityDID, 346 SubscribedAt: time.Now(), 347 RecordURI: recordURI, 348 RecordCID: recordCID, 349 } 350 351 return subscription, nil 352} 353 354// UnsubscribeFromCommunity removes a subscription via PDS delete 355func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, communityIdentifier string) error { 356 if userDID == "" { 357 return 
NewValidationError("userDid", "required") 358 } 359 360 // Resolve community identifier 361 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 362 if err != nil { 363 return err 364 } 365 366 // Get the subscription from AppView to find the record key 367 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID) 368 if err != nil { 369 return err 370 } 371 372 // Extract rkey from record URI (at://did/collection/rkey) 373 rkey := extractRKeyFromURI(subscription.RecordURI) 374 if rkey == "" { 375 return fmt.Errorf("invalid subscription record URI") 376 } 377 378 // Write-forward: delete record from PDS 379 if err := s.deleteRecordOnPDS(ctx, userDID, "social.coves.community.subscribe", rkey); err != nil { 380 return fmt.Errorf("failed to delete subscription on PDS: %w", err) 381 } 382 383 return nil 384} 385 386// GetUserSubscriptions queries AppView DB for user's subscriptions 387func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) { 388 if limit <= 0 || limit > 100 { 389 limit = 50 390 } 391 392 return s.repo.ListSubscriptions(ctx, userDID, limit, offset) 393} 394 395// GetCommunitySubscribers queries AppView DB for community subscribers 396func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) { 397 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 398 if err != nil { 399 return nil, err 400 } 401 402 if limit <= 0 || limit > 100 { 403 limit = 50 404 } 405 406 return s.repo.ListSubscribers(ctx, communityDID, limit, offset) 407} 408 409// GetMembership retrieves membership info from AppView DB 410func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) { 411 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 412 if err != nil { 413 return nil, err 414 } 
415 416 return s.repo.GetMembership(ctx, userDID, communityDID) 417} 418 419// ListCommunityMembers queries AppView DB for members 420func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) { 421 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier) 422 if err != nil { 423 return nil, err 424 } 425 426 if limit <= 0 || limit > 100 { 427 limit = 50 428 } 429 430 return s.repo.ListMembers(ctx, communityDID, limit, offset) 431} 432 433// ValidateHandle checks if a community handle is valid 434func (s *communityService) ValidateHandle(handle string) error { 435 if handle == "" { 436 return NewValidationError("handle", "required") 437 } 438 439 if !communityHandleRegex.MatchString(handle) { 440 return ErrInvalidHandle 441 } 442 443 return nil 444} 445 446// ResolveCommunityIdentifier converts a handle or DID to a DID 447func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) { 448 if identifier == "" { 449 return "", ErrInvalidInput 450 } 451 452 // If it's already a DID, return it 453 if strings.HasPrefix(identifier, "did:") { 454 return identifier, nil 455 } 456 457 // If it's a handle, look it up in AppView DB 458 if strings.HasPrefix(identifier, "!") { 459 community, err := s.repo.GetByHandle(ctx, identifier) 460 if err != nil { 461 return "", err 462 } 463 return community.DID, nil 464 } 465 466 return "", NewValidationError("identifier", "must be a DID or handle") 467} 468 469// Validation helpers 470 471func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error { 472 if req.Name == "" { 473 return NewValidationError("name", "required") 474 } 475 476 if len(req.Name) > 64 { 477 return NewValidationError("name", "must be 64 characters or less") 478 } 479 480 // Name can only contain alphanumeric and hyphens 481 nameRegex := 
regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,62}[a-zA-Z0-9])?$`) 482 if !nameRegex.MatchString(req.Name) { 483 return NewValidationError("name", "must contain only alphanumeric characters and hyphens") 484 } 485 486 if req.Description != "" && len(req.Description) > 3000 { 487 return NewValidationError("description", "must be 3000 characters or less") 488 } 489 490 // Visibility should already be set with default in CreateCommunity 491 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" { 492 return ErrInvalidVisibility 493 } 494 495 if req.CreatedByDID == "" { 496 return NewValidationError("createdByDid", "required") 497 } 498 499 if req.HostedByDID == "" { 500 return NewValidationError("hostedByDid", "required") 501 } 502 503 return nil 504} 505 506// PDS write-forward helpers 507 508func (s *communityService) createRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) { 509 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/")) 510 511 payload := map[string]interface{}{ 512 "repo": repoDID, 513 "collection": collection, 514 "record": record, 515 } 516 517 if rkey != "" { 518 payload["rkey"] = rkey 519 } 520 521 return s.callPDS(ctx, "POST", endpoint, payload) 522} 523 524func (s *communityService) putRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) { 525 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/")) 526 527 payload := map[string]interface{}{ 528 "repo": repoDID, 529 "collection": collection, 530 "rkey": rkey, 531 "record": record, 532 } 533 534 return s.callPDS(ctx, "POST", endpoint, payload) 535} 536 537func (s *communityService) deleteRecordOnPDS(ctx context.Context, repoDID, collection, rkey string) error { 538 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", 
strings.TrimSuffix(s.pdsURL, "/")) 539 540 payload := map[string]interface{}{ 541 "repo": repoDID, 542 "collection": collection, 543 "rkey": rkey, 544 } 545 546 _, _, err := s.callPDS(ctx, "POST", endpoint, payload) 547 return err 548} 549 550func (s *communityService) callPDS(ctx context.Context, method, endpoint string, payload map[string]interface{}) (string, string, error) { 551 jsonData, err := json.Marshal(payload) 552 if err != nil { 553 return "", "", fmt.Errorf("failed to marshal payload: %w", err) 554 } 555 556 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData)) 557 if err != nil { 558 return "", "", fmt.Errorf("failed to create request: %w", err) 559 } 560 req.Header.Set("Content-Type", "application/json") 561 562 // Add authentication if we have an access token 563 if s.pdsAccessToken != "" { 564 req.Header.Set("Authorization", "Bearer "+s.pdsAccessToken) 565 } 566 567 client := &http.Client{Timeout: 10 * time.Second} 568 resp, err := client.Do(req) 569 if err != nil { 570 return "", "", fmt.Errorf("failed to call PDS: %w", err) 571 } 572 defer resp.Body.Close() 573 574 body, err := io.ReadAll(resp.Body) 575 if err != nil { 576 return "", "", fmt.Errorf("failed to read response: %w", err) 577 } 578 579 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 580 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 581 } 582 583 // Parse response to extract URI and CID 584 var result struct { 585 URI string `json:"uri"` 586 CID string `json:"cid"` 587 } 588 if err := json.Unmarshal(body, &result); err != nil { 589 // For delete operations, there might not be a response body 590 if method == "POST" && strings.Contains(endpoint, "deleteRecord") { 591 return "", "", nil 592 } 593 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 594 } 595 596 return result.URI, result.CID, nil 597} 598 599// Helper functions 600 601func extractDomain(didOrURL 
string) string { 602 // For did:web:example.com -> example.com 603 if strings.HasPrefix(didOrURL, "did:web:") { 604 parts := strings.Split(didOrURL, ":") 605 if len(parts) >= 3 { 606 return parts[2] 607 } 608 } 609 610 // For URLs, extract domain 611 if strings.Contains(didOrURL, "://") { 612 parts := strings.Split(didOrURL, "://") 613 if len(parts) >= 2 { 614 domain := strings.Split(parts[1], "/")[0] 615 domain = strings.Split(domain, ":")[0] // Remove port 616 return domain 617 } 618 } 619 620 return "" 621} 622 623func extractRKeyFromURI(uri string) string { 624 // at://did/collection/rkey -> rkey 625 parts := strings.Split(uri, "/") 626 if len(parts) >= 4 { 627 return parts[len(parts)-1] 628 } 629 return "" 630}