A community based topic aggregation platform built on atproto

feat(communities): Wire up Communities service in main server

- Initialize DID generator with PLC directory config
- Create Communities service with PDS connection
- Authenticate instance DID with PDS for write-forward
- Register XRPC HTTP routes
- Add graceful handling for PDS auth failures

Environment variables:
- IS_DEV_ENV: Enable dev mode (mock DID generation)
- PLC_DIRECTORY_URL: PLC directory endpoint
- PDS_URL: Personal Data Server URL
- PDS_INSTANCE_HANDLE: Instance handle for auth
- PDS_INSTANCE_PASSWORD: Instance password for auth

Changed files
+119 -1
cmd
server
+21
.env.dev
···
APPVIEW_PUBLIC_URL=http://127.0.0.1:8081
# =============================================================================
# Development Settings
# =============================================================================
# Environment
ENV=development
NODE_ENV=development
# Logging
LOG_LEVEL=debug
LOG_ENABLED=true
# =============================================================================
# Notes
···
APPVIEW_PUBLIC_URL=http://127.0.0.1:8081
# =============================================================================
+
# Coves Instance PDS Authentication
+
# =============================================================================
+
# The Coves instance needs a PDS account to write community records
+
# Create this account once: curl -X POST http://localhost:3001/xrpc/com.atproto.server.createAccount
+
PDS_INSTANCE_HANDLE=testuser123.local.coves.dev
+
PDS_INSTANCE_PASSWORD=test-password-123
+
+
# =============================================================================
# Development Settings
# =============================================================================
# Environment
ENV=development
NODE_ENV=development
+
IS_DEV_ENV=true
# Logging
LOG_LEVEL=debug
LOG_ENABLED=true
+
+
# =============================================================================
+
# PLC Directory Configuration
+
# =============================================================================
+
# URL for PLC (Public Ledger of Credentials) directory
+
# Only used when IS_DEV_ENV=false (production)
+
#
+
# When IS_DEV_ENV=true: Generate did:plc:xxx locally WITHOUT registering (no PLC needed)
+
# When IS_DEV_ENV=false: Generate did:plc:xxx AND register with PLC_DIRECTORY_URL
+
#
+
# Production: https://plc.directory (currently Bluesky's, will transfer to third party)
+
PLC_DIRECTORY_URL=https://plc.directory
# =============================================================================
# Notes
+98 -1
cmd/server/main.go
···
package main
import (
"context"
"database/sql"
"fmt"
"log"
"net/http"
"os"
···
"Coves/internal/api/handlers/oauth"
"Coves/internal/api/middleware"
"Coves/internal/api/routes"
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
oauthCore "Coves/internal/core/oauth"
"Coves/internal/core/users"
postgresRepo "Coves/internal/db/postgres"
···
userRepo := postgresRepo.NewUserRepository(db)
userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
// Start Jetstream consumer for read-forward user indexing
jetstreamURL := os.Getenv("JETSTREAM_URL")
if jetstreamURL == "" {
···
}
}()
-
log.Printf("Started Jetstream consumer: %s", jetstreamURL)
// Start OAuth cleanup background job
go func() {
···
// Register XRPC routes
routes.RegisterUserRoutes(r, userService)
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
···
fmt.Printf("Default PDS: %s\n", defaultPDS)
log.Fatal(http.ListenAndServe(":"+port, r))
}
···
package main
import (
+
"bytes"
"context"
"database/sql"
+
"encoding/json"
"fmt"
+
"io"
"log"
"net/http"
"os"
···
"Coves/internal/api/handlers/oauth"
"Coves/internal/api/middleware"
"Coves/internal/api/routes"
+
"Coves/internal/atproto/did"
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
+
"Coves/internal/core/communities"
oauthCore "Coves/internal/core/oauth"
"Coves/internal/core/users"
postgresRepo "Coves/internal/db/postgres"
···
userRepo := postgresRepo.NewUserRepository(db)
userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
+
communityRepo := postgresRepo.NewCommunityRepository(db)
+
+
// Initialize DID generator for communities
+
// IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory
+
// IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL
+
isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
+
plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
+
if plcDirectoryURL == "" {
+
plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC
+
}
+
didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL)
+
log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL)
+
+
instanceDID := os.Getenv("INSTANCE_DID")
+
if instanceDID == "" {
+
instanceDID = "did:web:coves.local" // Default for development
+
}
+
communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID)
+
+
// Authenticate Coves instance with PDS to enable community record writes
+
// The instance needs a PDS account to write community records it owns
+
pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
+
pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
+
if pdsHandle != "" && pdsPassword != "" {
+
log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
+
accessToken, err := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
+
if err != nil {
+
log.Printf("Warning: Failed to authenticate with PDS: %v", err)
+
log.Println("Community creation will fail until PDS authentication is configured")
+
} else {
+
if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
+
svc.SetPDSAccessToken(accessToken)
+
log.Println("✓ Coves instance authenticated with PDS")
+
}
+
}
+
} else {
+
log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
+
log.Println("Community creation via write-forward is disabled")
+
}
+
// Start Jetstream consumer for read-forward user indexing
jetstreamURL := os.Getenv("JETSTREAM_URL")
if jetstreamURL == "" {
···
}
}()
+
log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
+
+
// Note: Community indexing happens through the same Jetstream firehose
+
// The CommunityEventConsumer is used by handlers when processing community-related events
+
// For now, community records are created via write-forward to PDS, then indexed when
+
// they appear in the firehose. A dedicated consumer can be added later if needed.
+
log.Println("Community event consumer initialized (processes events from firehose)")
// Start OAuth cleanup background job
go func() {
···
// Register XRPC routes
routes.RegisterUserRoutes(r, userService)
+
routes.RegisterCommunityRoutes(r, communityService)
+
log.Println("Community XRPC endpoints registered")
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
···
fmt.Printf("Default PDS: %s\n", defaultPDS)
log.Fatal(http.ListenAndServe(":"+port, r))
}
+
+
// authenticateWithPDS creates a session on the PDS and returns an access token
+
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
+
type CreateSessionRequest struct {
+
Identifier string `json:"identifier"`
+
Password string `json:"password"`
+
}
+
+
type CreateSessionResponse struct {
+
DID string `json:"did"`
+
Handle string `json:"handle"`
+
AccessJwt string `json:"accessJwt"`
+
}
+
+
reqBody, err := json.Marshal(CreateSessionRequest{
+
Identifier: handle,
+
Password: password,
+
})
+
if err != nil {
+
return "", fmt.Errorf("failed to marshal request: %w", err)
+
}
+
+
resp, err := http.Post(
+
pdsURL+"/xrpc/com.atproto.server.createSession",
+
"application/json",
+
bytes.NewReader(reqBody),
+
)
+
if err != nil {
+
return "", fmt.Errorf("failed to call PDS: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != http.StatusOK {
+
body, _ := io.ReadAll(resp.Body)
+
return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
+
}
+
+
var session CreateSessionResponse
+
if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
+
return "", fmt.Errorf("failed to decode response: %w", err)
+
}
+
+
return session.AccessJwt, nil
+
}