A community based topic aggregation platform built on atproto
1package communities
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "net/http"
10 "regexp"
11 "strings"
12 "time"
13
14 "Coves/internal/atproto/did"
15)
16
17// Community handle validation regex (!name@instance)
18var communityHandleRegex = regexp.MustCompile(`^?@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$`)
19
20type communityService struct {
21 repo Repository
22 didGen *did.Generator
23 pdsURL string // PDS URL for write-forward operations
24 instanceDID string // DID of this Coves instance
25 pdsAccessToken string // Access token for authenticating to PDS as the instance
26}
27
28// NewCommunityService creates a new community service
29func NewCommunityService(repo Repository, didGen *did.Generator, pdsURL string, instanceDID string) Service {
30 return &communityService{
31 repo: repo,
32 didGen: didGen,
33 pdsURL: pdsURL,
34 instanceDID: instanceDID,
35 }
36}
37
38// SetPDSAccessToken sets the PDS access token for authentication
39// This should be called after creating a session for the Coves instance DID on the PDS
40func (s *communityService) SetPDSAccessToken(token string) {
41 s.pdsAccessToken = token
42}
43
44// CreateCommunity creates a new community via write-forward to PDS
45// Flow: Service -> PDS (creates record) -> Firehose -> Consumer -> AppView DB
46func (s *communityService) CreateCommunity(ctx context.Context, req CreateCommunityRequest) (*Community, error) {
47 // Apply defaults before validation
48 if req.Visibility == "" {
49 req.Visibility = "public"
50 }
51
52 // Validate request
53 if err := s.validateCreateRequest(req); err != nil {
54 return nil, err
55 }
56
57 // Generate a unique DID for the community
58 communityDID, err := s.didGen.GenerateCommunityDID()
59 if err != nil {
60 return nil, fmt.Errorf("failed to generate community DID: %w", err)
61 }
62
63 // Build scoped handle: !{name}@{instance}
64 instanceDomain := extractDomain(s.instanceDID)
65 if instanceDomain == "" {
66 instanceDomain = "coves.local" // Fallback for testing
67 }
68 handle := fmt.Sprintf("!%s@%s", req.Name, instanceDomain)
69
70 // Validate the generated handle
71 if err := s.ValidateHandle(handle); err != nil {
72 return nil, fmt.Errorf("generated handle is invalid: %w", err)
73 }
74
75 // Build community profile record
76 profile := map[string]interface{}{
77 "$type": "social.coves.community.profile",
78 "did": communityDID, // Unique identifier for this community
79 "handle": handle,
80 "name": req.Name,
81 "visibility": req.Visibility,
82 "owner": s.instanceDID, // V1: instance owns the community
83 "createdBy": req.CreatedByDID,
84 "hostedBy": req.HostedByDID,
85 "createdAt": time.Now().Format(time.RFC3339),
86 "federation": map[string]interface{}{
87 "allowExternalDiscovery": req.AllowExternalDiscovery,
88 },
89 }
90
91 // Add optional fields
92 if req.DisplayName != "" {
93 profile["displayName"] = req.DisplayName
94 }
95 if req.Description != "" {
96 profile["description"] = req.Description
97 }
98 if len(req.Rules) > 0 {
99 profile["rules"] = req.Rules
100 }
101 if len(req.Categories) > 0 {
102 profile["categories"] = req.Categories
103 }
104 if req.Language != "" {
105 profile["language"] = req.Language
106 }
107
108 // Initialize counts
109 profile["memberCount"] = 0
110 profile["subscriberCount"] = 0
111
112 // TODO: Handle avatar and banner blobs
113 // For now, we'll skip blob uploads. This would require:
114 // 1. Upload blob to PDS via com.atproto.repo.uploadBlob
115 // 2. Get blob ref (CID)
116 // 3. Add to profile record
117
118 // Write-forward to PDS: create the community profile record in the INSTANCE's repository
119 // The instance owns all community records, community DID is just metadata in the record
120 // Record will be at: at://INSTANCE_DID/social.coves.community.profile/COMMUNITY_RKEY
121 recordURI, recordCID, err := s.createRecordOnPDS(ctx, s.instanceDID, "social.coves.community.profile", "", profile)
122 if err != nil {
123 return nil, fmt.Errorf("failed to create community on PDS: %w", err)
124 }
125
126 // Return a Community object representing what was created
127 // Note: This won't be in AppView DB until the Jetstream consumer processes it
128 community := &Community{
129 DID: communityDID,
130 Handle: handle,
131 Name: req.Name,
132 DisplayName: req.DisplayName,
133 Description: req.Description,
134 OwnerDID: s.instanceDID,
135 CreatedByDID: req.CreatedByDID,
136 HostedByDID: req.HostedByDID,
137 Visibility: req.Visibility,
138 AllowExternalDiscovery: req.AllowExternalDiscovery,
139 MemberCount: 0,
140 SubscriberCount: 0,
141 CreatedAt: time.Now(),
142 UpdatedAt: time.Now(),
143 RecordURI: recordURI,
144 RecordCID: recordCID,
145 }
146
147 return community, nil
148}
149
150// GetCommunity retrieves a community from AppView DB
151// identifier can be either a DID or handle
152func (s *communityService) GetCommunity(ctx context.Context, identifier string) (*Community, error) {
153 if identifier == "" {
154 return nil, ErrInvalidInput
155 }
156
157 // Determine if identifier is DID or handle
158 if strings.HasPrefix(identifier, "did:") {
159 return s.repo.GetByDID(ctx, identifier)
160 }
161
162 if strings.HasPrefix(identifier, "!") {
163 return s.repo.GetByHandle(ctx, identifier)
164 }
165
166 return nil, NewValidationError("identifier", "must be a DID or handle")
167}
168
169// UpdateCommunity updates a community via write-forward to PDS
170func (s *communityService) UpdateCommunity(ctx context.Context, req UpdateCommunityRequest) (*Community, error) {
171 if req.CommunityDID == "" {
172 return nil, NewValidationError("communityDid", "required")
173 }
174
175 if req.UpdatedByDID == "" {
176 return nil, NewValidationError("updatedByDid", "required")
177 }
178
179 // Get existing community
180 existing, err := s.repo.GetByDID(ctx, req.CommunityDID)
181 if err != nil {
182 return nil, err
183 }
184
185 // Authorization: verify user is the creator
186 // TODO(Communities-Auth): Add moderator check when moderation system is implemented
187 if existing.CreatedByDID != req.UpdatedByDID {
188 return nil, ErrUnauthorized
189 }
190
191 // Build updated profile record (start with existing)
192 profile := map[string]interface{}{
193 "$type": "social.coves.community.profile",
194 "handle": existing.Handle,
195 "name": existing.Name,
196 "owner": existing.OwnerDID,
197 "createdBy": existing.CreatedByDID,
198 "hostedBy": existing.HostedByDID,
199 "createdAt": existing.CreatedAt.Format(time.RFC3339),
200 }
201
202 // Apply updates
203 if req.DisplayName != nil {
204 profile["displayName"] = *req.DisplayName
205 } else {
206 profile["displayName"] = existing.DisplayName
207 }
208
209 if req.Description != nil {
210 profile["description"] = *req.Description
211 } else {
212 profile["description"] = existing.Description
213 }
214
215 if req.Visibility != nil {
216 profile["visibility"] = *req.Visibility
217 } else {
218 profile["visibility"] = existing.Visibility
219 }
220
221 if req.AllowExternalDiscovery != nil {
222 profile["federation"] = map[string]interface{}{
223 "allowExternalDiscovery": *req.AllowExternalDiscovery,
224 }
225 } else {
226 profile["federation"] = map[string]interface{}{
227 "allowExternalDiscovery": existing.AllowExternalDiscovery,
228 }
229 }
230
231 if req.ModerationType != nil {
232 profile["moderationType"] = *req.ModerationType
233 }
234
235 if len(req.ContentWarnings) > 0 {
236 profile["contentWarnings"] = req.ContentWarnings
237 }
238
239 // Preserve counts
240 profile["memberCount"] = existing.MemberCount
241 profile["subscriberCount"] = existing.SubscriberCount
242
243 // Extract rkey from existing record URI (communities live in instance's repo)
244 rkey := extractRKeyFromURI(existing.RecordURI)
245 if rkey == "" {
246 return nil, fmt.Errorf("invalid community record URI: %s", existing.RecordURI)
247 }
248
249 // Write-forward: update record on PDS using INSTANCE DID (communities are stored in instance repo)
250 recordURI, recordCID, err := s.putRecordOnPDS(ctx, s.instanceDID, "social.coves.community.profile", rkey, profile)
251 if err != nil {
252 return nil, fmt.Errorf("failed to update community on PDS: %w", err)
253 }
254
255 // Return updated community representation
256 // Actual AppView DB update happens via Jetstream consumer
257 updated := *existing
258 if req.DisplayName != nil {
259 updated.DisplayName = *req.DisplayName
260 }
261 if req.Description != nil {
262 updated.Description = *req.Description
263 }
264 if req.Visibility != nil {
265 updated.Visibility = *req.Visibility
266 }
267 if req.AllowExternalDiscovery != nil {
268 updated.AllowExternalDiscovery = *req.AllowExternalDiscovery
269 }
270 if req.ModerationType != nil {
271 updated.ModerationType = *req.ModerationType
272 }
273 if len(req.ContentWarnings) > 0 {
274 updated.ContentWarnings = req.ContentWarnings
275 }
276 updated.RecordURI = recordURI
277 updated.RecordCID = recordCID
278 updated.UpdatedAt = time.Now()
279
280 return &updated, nil
281}
282
283// ListCommunities queries AppView DB for communities with filters
284func (s *communityService) ListCommunities(ctx context.Context, req ListCommunitiesRequest) ([]*Community, int, error) {
285 // Set defaults
286 if req.Limit <= 0 || req.Limit > 100 {
287 req.Limit = 50
288 }
289
290 return s.repo.List(ctx, req)
291}
292
293// SearchCommunities performs fuzzy search in AppView DB
294func (s *communityService) SearchCommunities(ctx context.Context, req SearchCommunitiesRequest) ([]*Community, int, error) {
295 if req.Query == "" {
296 return nil, 0, NewValidationError("query", "search query is required")
297 }
298
299 // Set defaults
300 if req.Limit <= 0 || req.Limit > 100 {
301 req.Limit = 50
302 }
303
304 return s.repo.Search(ctx, req)
305}
306
307// SubscribeToCommunity creates a subscription via write-forward to PDS
308func (s *communityService) SubscribeToCommunity(ctx context.Context, userDID, communityIdentifier string) (*Subscription, error) {
309 if userDID == "" {
310 return nil, NewValidationError("userDid", "required")
311 }
312
313 // Resolve community identifier to DID
314 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
315 if err != nil {
316 return nil, err
317 }
318
319 // Verify community exists
320 community, err := s.repo.GetByDID(ctx, communityDID)
321 if err != nil {
322 return nil, err
323 }
324
325 // Check visibility - can't subscribe to private communities without invitation (TODO)
326 if community.Visibility == "private" {
327 return nil, ErrUnauthorized
328 }
329
330 // Build subscription record
331 subRecord := map[string]interface{}{
332 "$type": "social.coves.community.subscribe",
333 "community": communityDID,
334 }
335
336 // Write-forward: create subscription record in user's repo
337 recordURI, recordCID, err := s.createRecordOnPDS(ctx, userDID, "social.coves.community.subscribe", "", subRecord)
338 if err != nil {
339 return nil, fmt.Errorf("failed to create subscription on PDS: %w", err)
340 }
341
342 // Return subscription representation
343 subscription := &Subscription{
344 UserDID: userDID,
345 CommunityDID: communityDID,
346 SubscribedAt: time.Now(),
347 RecordURI: recordURI,
348 RecordCID: recordCID,
349 }
350
351 return subscription, nil
352}
353
354// UnsubscribeFromCommunity removes a subscription via PDS delete
355func (s *communityService) UnsubscribeFromCommunity(ctx context.Context, userDID, communityIdentifier string) error {
356 if userDID == "" {
357 return NewValidationError("userDid", "required")
358 }
359
360 // Resolve community identifier
361 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
362 if err != nil {
363 return err
364 }
365
366 // Get the subscription from AppView to find the record key
367 subscription, err := s.repo.GetSubscription(ctx, userDID, communityDID)
368 if err != nil {
369 return err
370 }
371
372 // Extract rkey from record URI (at://did/collection/rkey)
373 rkey := extractRKeyFromURI(subscription.RecordURI)
374 if rkey == "" {
375 return fmt.Errorf("invalid subscription record URI")
376 }
377
378 // Write-forward: delete record from PDS
379 if err := s.deleteRecordOnPDS(ctx, userDID, "social.coves.community.subscribe", rkey); err != nil {
380 return fmt.Errorf("failed to delete subscription on PDS: %w", err)
381 }
382
383 return nil
384}
385
386// GetUserSubscriptions queries AppView DB for user's subscriptions
387func (s *communityService) GetUserSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*Subscription, error) {
388 if limit <= 0 || limit > 100 {
389 limit = 50
390 }
391
392 return s.repo.ListSubscriptions(ctx, userDID, limit, offset)
393}
394
395// GetCommunitySubscribers queries AppView DB for community subscribers
396func (s *communityService) GetCommunitySubscribers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Subscription, error) {
397 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
398 if err != nil {
399 return nil, err
400 }
401
402 if limit <= 0 || limit > 100 {
403 limit = 50
404 }
405
406 return s.repo.ListSubscribers(ctx, communityDID, limit, offset)
407}
408
409// GetMembership retrieves membership info from AppView DB
410func (s *communityService) GetMembership(ctx context.Context, userDID, communityIdentifier string) (*Membership, error) {
411 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
412 if err != nil {
413 return nil, err
414 }
415
416 return s.repo.GetMembership(ctx, userDID, communityDID)
417}
418
419// ListCommunityMembers queries AppView DB for members
420func (s *communityService) ListCommunityMembers(ctx context.Context, communityIdentifier string, limit, offset int) ([]*Membership, error) {
421 communityDID, err := s.ResolveCommunityIdentifier(ctx, communityIdentifier)
422 if err != nil {
423 return nil, err
424 }
425
426 if limit <= 0 || limit > 100 {
427 limit = 50
428 }
429
430 return s.repo.ListMembers(ctx, communityDID, limit, offset)
431}
432
433// ValidateHandle checks if a community handle is valid
434func (s *communityService) ValidateHandle(handle string) error {
435 if handle == "" {
436 return NewValidationError("handle", "required")
437 }
438
439 if !communityHandleRegex.MatchString(handle) {
440 return ErrInvalidHandle
441 }
442
443 return nil
444}
445
446// ResolveCommunityIdentifier converts a handle or DID to a DID
447func (s *communityService) ResolveCommunityIdentifier(ctx context.Context, identifier string) (string, error) {
448 if identifier == "" {
449 return "", ErrInvalidInput
450 }
451
452 // If it's already a DID, return it
453 if strings.HasPrefix(identifier, "did:") {
454 return identifier, nil
455 }
456
457 // If it's a handle, look it up in AppView DB
458 if strings.HasPrefix(identifier, "!") {
459 community, err := s.repo.GetByHandle(ctx, identifier)
460 if err != nil {
461 return "", err
462 }
463 return community.DID, nil
464 }
465
466 return "", NewValidationError("identifier", "must be a DID or handle")
467}
468
469// Validation helpers
470
471func (s *communityService) validateCreateRequest(req CreateCommunityRequest) error {
472 if req.Name == "" {
473 return NewValidationError("name", "required")
474 }
475
476 if len(req.Name) > 64 {
477 return NewValidationError("name", "must be 64 characters or less")
478 }
479
480 // Name can only contain alphanumeric and hyphens
481 nameRegex := regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9-]{0,62}[a-zA-Z0-9])?$`)
482 if !nameRegex.MatchString(req.Name) {
483 return NewValidationError("name", "must contain only alphanumeric characters and hyphens")
484 }
485
486 if req.Description != "" && len(req.Description) > 3000 {
487 return NewValidationError("description", "must be 3000 characters or less")
488 }
489
490 // Visibility should already be set with default in CreateCommunity
491 if req.Visibility != "public" && req.Visibility != "unlisted" && req.Visibility != "private" {
492 return ErrInvalidVisibility
493 }
494
495 if req.CreatedByDID == "" {
496 return NewValidationError("createdByDid", "required")
497 }
498
499 if req.HostedByDID == "" {
500 return NewValidationError("hostedByDid", "required")
501 }
502
503 return nil
504}
505
506// PDS write-forward helpers
507
508func (s *communityService) createRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) {
509 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
510
511 payload := map[string]interface{}{
512 "repo": repoDID,
513 "collection": collection,
514 "record": record,
515 }
516
517 if rkey != "" {
518 payload["rkey"] = rkey
519 }
520
521 return s.callPDS(ctx, "POST", endpoint, payload)
522}
523
524func (s *communityService) putRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) {
525 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", strings.TrimSuffix(s.pdsURL, "/"))
526
527 payload := map[string]interface{}{
528 "repo": repoDID,
529 "collection": collection,
530 "rkey": rkey,
531 "record": record,
532 }
533
534 return s.callPDS(ctx, "POST", endpoint, payload)
535}
536
537func (s *communityService) deleteRecordOnPDS(ctx context.Context, repoDID, collection, rkey string) error {
538 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
539
540 payload := map[string]interface{}{
541 "repo": repoDID,
542 "collection": collection,
543 "rkey": rkey,
544 }
545
546 _, _, err := s.callPDS(ctx, "POST", endpoint, payload)
547 return err
548}
549
550func (s *communityService) callPDS(ctx context.Context, method, endpoint string, payload map[string]interface{}) (string, string, error) {
551 jsonData, err := json.Marshal(payload)
552 if err != nil {
553 return "", "", fmt.Errorf("failed to marshal payload: %w", err)
554 }
555
556 req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewBuffer(jsonData))
557 if err != nil {
558 return "", "", fmt.Errorf("failed to create request: %w", err)
559 }
560 req.Header.Set("Content-Type", "application/json")
561
562 // Add authentication if we have an access token
563 if s.pdsAccessToken != "" {
564 req.Header.Set("Authorization", "Bearer "+s.pdsAccessToken)
565 }
566
567 client := &http.Client{Timeout: 10 * time.Second}
568 resp, err := client.Do(req)
569 if err != nil {
570 return "", "", fmt.Errorf("failed to call PDS: %w", err)
571 }
572 defer resp.Body.Close()
573
574 body, err := io.ReadAll(resp.Body)
575 if err != nil {
576 return "", "", fmt.Errorf("failed to read response: %w", err)
577 }
578
579 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
580 return "", "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
581 }
582
583 // Parse response to extract URI and CID
584 var result struct {
585 URI string `json:"uri"`
586 CID string `json:"cid"`
587 }
588 if err := json.Unmarshal(body, &result); err != nil {
589 // For delete operations, there might not be a response body
590 if method == "POST" && strings.Contains(endpoint, "deleteRecord") {
591 return "", "", nil
592 }
593 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
594 }
595
596 return result.URI, result.CID, nil
597}
598
599// Helper functions
600
601func extractDomain(didOrURL string) string {
602 // For did:web:example.com -> example.com
603 if strings.HasPrefix(didOrURL, "did:web:") {
604 parts := strings.Split(didOrURL, ":")
605 if len(parts) >= 3 {
606 return parts[2]
607 }
608 }
609
610 // For URLs, extract domain
611 if strings.Contains(didOrURL, "://") {
612 parts := strings.Split(didOrURL, "://")
613 if len(parts) >= 2 {
614 domain := strings.Split(parts[1], "/")[0]
615 domain = strings.Split(domain, ":")[0] // Remove port
616 return domain
617 }
618 }
619
620 return ""
621}
622
623func extractRKeyFromURI(uri string) string {
624 // at://did/collection/rkey -> rkey
625 parts := strings.Split(uri, "/")
626 if len(parts) >= 4 {
627 return parts[len(parts)-1]
628 }
629 return ""
630}