A community based topic aggregation platform built on atproto

refactor: Simplify architecture - remove repository/CAR storage code

Major cleanup now that PDS handles all repository operations:

**Removed:**
- internal/core/repository/ - Repository domain logic (PDS handles this)
- internal/db/postgres/repository_repo.go - Repository database operations
- internal/api/handlers/repository_handler.go - Repository API handlers
- internal/api/routes/repository.go - Repository routes
- tests/integration/repository_test.go - Repository integration tests
- Migrations 002, 003, 004 - Repository/CAR storage tables
- internal/db/local_dev_db_compose/ - Separate dev database setup
- internal/db/test_db_compose/ - Separate test database setup

**Unified:**
- docker-compose.dev.yml now includes PostgreSQL + PDS + optional test DB
- All database management moved to Makefile commands
- Consistent use of .env.dev variables throughout

**Updated:**
- cmd/server/main.go - Simplified to only use user service
- Makefile - All-in-one commands (dev-up starts both PostgreSQL + PDS)
- Added db-migrate, db-reset, test commands using Docker profiles

**Architecture:**
- PDS: Self-contained with SQLite + CAR files (port 3001)
- PostgreSQL: Only for Coves AppView indexing (port 5433)
- Test DB: Available via --profile test (port 5434)
- Single source of truth: docker-compose.dev.yml + .env.dev

Commands:
- `make dev-up` - Start PostgreSQL + PDS
- `make test` - Start test DB + run tests
- `make db-migrate` - Run migrations
- `make db-shell` - Open psql shell

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+48 -38
Makefile
···
/^##@/ { printf "\n$(YELLOW)%s$(RESET)\n", substr($$0, 5) }' $(MAKEFILE_LIST)
@echo ""
-
##@ Local Development (atProto Stack)
-
dev-up: ## Start PDS for local development
@echo "$(GREEN)Starting Coves development stack...$(RESET)"
-
@echo "$(YELLOW)Note: Make sure PostgreSQL is running on port 5433$(RESET)"
-
@echo "Run 'make dev-db-up' if database is not running"
-
@docker-compose -f docker-compose.dev.yml --env-file .env.dev up -d pds
@echo ""
@echo "$(GREEN)✓ Development stack started!$(RESET)"
@echo ""
@echo "Services available at:"
@echo " - PDS (XRPC): http://localhost:3001"
@echo " - PDS Firehose (WS): ws://localhost:3001/xrpc/com.atproto.sync.subscribeRepos"
@echo " - AppView (API): http://localhost:8081 (when uncommented)"
@echo ""
@echo "Run 'make dev-logs' to view logs"
-
dev-down: ## Stop the atProto development stack
@echo "$(YELLOW)Stopping Coves development stack...$(RESET)"
-
@docker-compose -f docker-compose.dev.yml down
@echo "$(GREEN)✓ Development stack stopped$(RESET)"
dev-logs: ## Tail logs from all development services
-
@docker-compose -f docker-compose.dev.yml logs -f
dev-status: ## Show status of all development containers
@echo "$(CYAN)Development Stack Status:$(RESET)"
-
@docker-compose -f docker-compose.dev.yml ps
-
@echo ""
-
@echo "$(CYAN)Database Status:$(RESET)"
-
@cd internal/db/local_dev_db_compose && docker-compose ps
dev-reset: ## Nuclear option - stop everything and remove all volumes
-
@echo "$(YELLOW)⚠️ WARNING: This will delete all PDS data and volumes!$(RESET)"
@read -p "Are you sure? (y/N): " confirm && [ "$$confirm" = "y" ] || exit 1
@echo "$(YELLOW)Stopping and removing containers and volumes...$(RESET)"
-
@docker-compose -f docker-compose.dev.yml down -v
@echo "$(GREEN)✓ Reset complete - all data removed$(RESET)"
@echo "Run 'make dev-up' to start fresh"
##@ Database Management
-
dev-db-up: ## Start local PostgreSQL database (port 5433)
-
@echo "$(GREEN)Starting local PostgreSQL database...$(RESET)"
-
@cd internal/db/local_dev_db_compose && docker-compose up -d
-
@echo "$(GREEN)✓ Database started on port 5433$(RESET)"
-
@echo "Connection: postgresql://dev_user:dev_password@localhost:5433/coves_dev"
-
dev-db-down: ## Stop local PostgreSQL database
-
@echo "$(YELLOW)Stopping local PostgreSQL database...$(RESET)"
-
@cd internal/db/local_dev_db_compose && docker-compose down
-
@echo "$(GREEN)✓ Database stopped$(RESET)"
-
dev-db-reset: ## Reset database (delete all data and restart)
@echo "$(YELLOW)⚠️ WARNING: This will delete all database data!$(RESET)"
@read -p "Are you sure? (y/N): " confirm && [ "$$confirm" = "y" ] || exit 1
@echo "$(YELLOW)Resetting database...$(RESET)"
-
@cd internal/db/local_dev_db_compose && docker-compose down -v
-
@cd internal/db/local_dev_db_compose && docker-compose up -d
@echo "$(GREEN)✓ Database reset complete$(RESET)"
##@ Testing
test: ## Run all tests with test database
@echo "$(GREEN)Starting test database...$(RESET)"
-
@cd internal/db/test_db_compose && ./start-test-db.sh
@echo "$(GREEN)Running tests...$(RESET)"
-
@./run-tests.sh
@echo "$(GREEN)✓ Tests complete$(RESET)"
test-db-reset: ## Reset test database
@echo "$(GREEN)Resetting test database...$(RESET)"
-
@cd internal/db/test_db_compose && ./reset-test-db.sh
@echo "$(GREEN)✓ Test database reset$(RESET)"
##@ Build & Run
···
@echo "$(YELLOW)⚠️ WARNING: This will remove ALL Docker volumes!$(RESET)"
@read -p "Are you sure? (y/N): " confirm && [ "$$confirm" = "y" ] || exit 1
@make dev-reset
-
@make dev-db-reset
@echo "$(GREEN)✓ All clean$(RESET)"
##@ Workflows (Common Tasks)
-
fresh-start: ## Complete fresh start (reset DB, reset stack, start everything)
@echo "$(CYAN)Starting fresh development environment...$(RESET)"
-
@make dev-db-reset
@make dev-reset || true
@sleep 2
-
@make dev-db-up
-
@sleep 2
@make dev-up
@echo ""
@echo "$(GREEN)✓ Fresh environment ready!$(RESET)"
@make dev-status
···
@echo "$(GREEN)Validating Lexicon schemas...$(RESET)"
@./validate-lexicon
@echo "$(GREEN)✓ Lexicon validation complete$(RESET)"
-
-
db-shell: ## Open PostgreSQL shell for local database
-
@echo "$(CYAN)Connecting to local database...$(RESET)"
-
@PGPASSWORD=dev_password psql -h localhost -p 5433 -U dev_user -d coves_dev
##@ Documentation
···
/^##@/ { printf "\n$(YELLOW)%s$(RESET)\n", substr($$0, 5) }' $(MAKEFILE_LIST)
@echo ""
+
##@ Local Development (All-in-One)
+
dev-up: ## Start PDS + PostgreSQL for local development
@echo "$(GREEN)Starting Coves development stack...$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev up -d postgres pds
@echo ""
@echo "$(GREEN)✓ Development stack started!$(RESET)"
@echo ""
@echo "Services available at:"
+
@echo " - PostgreSQL: localhost:5433"
@echo " - PDS (XRPC): http://localhost:3001"
@echo " - PDS Firehose (WS): ws://localhost:3001/xrpc/com.atproto.sync.subscribeRepos"
@echo " - AppView (API): http://localhost:8081 (when uncommented)"
@echo ""
@echo "Run 'make dev-logs' to view logs"
+
dev-down: ## Stop all development services
@echo "$(YELLOW)Stopping Coves development stack...$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev down
@echo "$(GREEN)✓ Development stack stopped$(RESET)"
dev-logs: ## Tail logs from all development services
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev logs -f
dev-status: ## Show status of all development containers
@echo "$(CYAN)Development Stack Status:$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev ps
dev-reset: ## Nuclear option - stop everything and remove all volumes
+
@echo "$(YELLOW)⚠️ WARNING: This will delete ALL data (PostgreSQL + PDS)!$(RESET)"
@read -p "Are you sure? (y/N): " confirm && [ "$$confirm" = "y" ] || exit 1
@echo "$(YELLOW)Stopping and removing containers and volumes...$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev down -v
@echo "$(GREEN)✓ Reset complete - all data removed$(RESET)"
@echo "Run 'make dev-up' to start fresh"
##@ Database Management
+
db-shell: ## Open PostgreSQL shell for development database
+
@echo "$(CYAN)Connecting to development database...$(RESET)"
+
@docker exec -it coves-dev-postgres psql -U dev_user -d coves_dev
+
db-migrate: ## Run database migrations
+
@echo "$(GREEN)Running database migrations...$(RESET)"
+
@goose -dir internal/db/migrations postgres "postgresql://dev_user:dev_password@localhost:5433/coves_dev?sslmode=disable" up
+
@echo "$(GREEN)✓ Migrations complete$(RESET)"
+
db-migrate-down: ## Rollback last migration
+
@echo "$(YELLOW)Rolling back last migration...$(RESET)"
+
@goose -dir internal/db/migrations postgres "postgresql://dev_user:dev_password@localhost:5433/coves_dev?sslmode=disable" down
+
@echo "$(GREEN)✓ Rollback complete$(RESET)"
+
+
db-reset: ## Reset database (delete all data and re-run migrations)
@echo "$(YELLOW)⚠️ WARNING: This will delete all database data!$(RESET)"
@read -p "Are you sure? (y/N): " confirm && [ "$$confirm" = "y" ] || exit 1
@echo "$(YELLOW)Resetting database...$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev rm -sf postgres
+
@docker volume rm coves-dev-postgres-data || true
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev up -d postgres
+
@echo "Waiting for PostgreSQL to be ready..."
+
@sleep 3
+
@make db-migrate
@echo "$(GREEN)✓ Database reset complete$(RESET)"
##@ Testing
test: ## Run all tests with test database
@echo "$(GREEN)Starting test database...$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev --profile test up -d postgres-test
+
@echo "Waiting for test database to be ready..."
+
@sleep 3
+
@echo "$(GREEN)Running migrations on test database...$(RESET)"
+
@goose -dir internal/db/migrations postgres "postgresql://test_user:test_password@localhost:5434/coves_test?sslmode=disable" up || true
@echo "$(GREEN)Running tests...$(RESET)"
+
@TEST_DATABASE_URL="postgresql://test_user:test_password@localhost:5434/coves_test?sslmode=disable" go test ./... -v
@echo "$(GREEN)✓ Tests complete$(RESET)"
test-db-reset: ## Reset test database
@echo "$(GREEN)Resetting test database...$(RESET)"
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev --profile test rm -sf postgres-test
+
@docker volume rm coves-test-postgres-data || true
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev --profile test up -d postgres-test
+
@echo "Waiting for PostgreSQL to be ready..."
+
@sleep 3
+
@goose -dir internal/db/migrations postgres "postgresql://test_user:test_password@localhost:5434/coves_test?sslmode=disable" up || true
@echo "$(GREEN)✓ Test database reset$(RESET)"
+
+
test-db-stop: ## Stop test database
+
@docker-compose -f docker-compose.dev.yml --env-file .env.dev --profile test stop postgres-test
+
@echo "$(GREEN)✓ Test database stopped$(RESET)"
##@ Build & Run
···
@echo "$(YELLOW)⚠️ WARNING: This will remove ALL Docker volumes!$(RESET)"
@read -p "Are you sure? (y/N): " confirm && [ "$$confirm" = "y" ] || exit 1
@make dev-reset
@echo "$(GREEN)✓ All clean$(RESET)"
##@ Workflows (Common Tasks)
+
fresh-start: ## Complete fresh start (reset everything, start clean)
@echo "$(CYAN)Starting fresh development environment...$(RESET)"
@make dev-reset || true
@sleep 2
@make dev-up
+
@sleep 3
+
@make db-migrate
@echo ""
@echo "$(GREEN)✓ Fresh environment ready!$(RESET)"
@make dev-status
···
@echo "$(GREEN)Validating Lexicon schemas...$(RESET)"
@./validate-lexicon
@echo "$(GREEN)✓ Lexicon validation complete$(RESET)"
##@ Documentation
+342
PRD.md
···
···
+
# Coves Product Requirements Document (PRD)
+
## Digital Community Infrastructure Platform
+
+
---
+
+
## 1. Executive Summary
+
+
**Vision**: A federated forum platform that ensures a solid framework for digital community building and meaningful information discussion.
+
+
**Core Values**:
+
- User sovereignty through data portability
+
- Community autonomy and self-governance
+
- Cross-platform interoperability
+
+
---
+
+
## 2. MVP Scope Definition
+
+
### Phase 1: Core Forum Platform (Web)
+
+
#### Must Have:
+
- **Indigo PDS Integration** - Use existing atProto infrastructure (no CAR file reimplementation!)
+
- User registration with phone verification (verified badge)
+
- Community creation, subscription, and discovery
+
- Post creation (text initially, then image/video/article)
+
- Threaded comments with upvoting
+
- Three feed types:
+
- **Home**: Personalized based on subscriptions with 1-5 visibility slider per community
+
- **All**: Global content discovery
+
- **Community**: Standard forum sorting (hot/new/top)
+
- Read state tracking (hide read posts, history tab)
+
- Content tagging system (helpful/spam/hostile)
+
- User blocking capabilities
+
+
#### Defer to Post-MVP:
+
- Community wikis
+
- Advanced reputation system
+
- Sortition/tribunal moderation
+
- Edit history tracking
+
- Plugin/bot system
+
+
### Phase 2: Federation (MVP+)
+
- Bluesky post display (read-only via Indigo firehose)
+
- ActivityPub bidirectional support
+
- Federation source indicators
+
+
### Phase 3: Mobile Apps (Separate Repo)
+
- iOS/Android native apps
+
- TikTok-style horizontal scrolling feed
+
- Offline-first architecture
+
- Push notifications
+
+
---
+
+
## 3. Technical Architecture
+
+
### Backend Stack:
+
- **PDS Layer**: Indigo PDS implementation (handles atProto, CAR files, firehose)
+
- **Application Layer**: Go services for Coves-specific features
+
- **Databases**:
+
- Indigo PDS's PostgreSQL (user repos, DID management)
+
- Separate PostgreSQL for AppView (queries, read states, community data)
+
- **APIs**: XRPC protocol (leverage Indigo's implementation)
+
- **Identity**: atProto DIDs with phone verification
+
+
### Frontend Stack:
+
- **Web**: React/Next.js
+
- **Mobile**: React Native or Flutter (separate repository)
+
- **State Management**: Zustand/Redux with offline sync
+
+
---
+
+
## 4. Development Roadmap
+
+
### Week 1-2: Foundation
+
- Deploy Indigo PDS instance
+
- Set up AppView database
+
- Create core Lexicon schemas (community, post, comment)
+
- Implement phone verification flow
+
+
### Week 3-4: User & Identity
+
- User registration/login via Indigo PDS
+
- Profile management
+
- DID resolution and handle system
+
- Basic user settings
+
+
### Week 5-6: Communities
+
- Community CRUD operations
+
- Subscription management
+
- Community discovery page
+
- Basic moderation tools
+
+
### Week 7-8: Content Creation
+
- Post creation (text, then multimedia)
+
- Comment threads
+
- Upvoting system
+
- Content tagging
+
+
### Week 9-10: Feed System
+
- Home feed with subscription slider
+
- Community feeds with sorting
+
- All/global feed
+
- Read state tracking and history
+
+
### Week 11-12: Polish & Testing
+
- Search functionality
+
- User blocking
+
- Performance optimization
+
- Security audit
+
- Alpha deployment
+
+
### Post-MVP: Federation
+
- Subscribe to Indigo firehose for Bluesky content
+
- ActivityPub adapter service
+
- Cross-platform user mapping
+
+
---
+
+
## 5. Mobile Application Strategy
+
+
**Approach**: Separate repository, API-first design
+
+
### MVP Features:
+
- Horizontal swipe navigation between posts
+
- Offline caching with background sync
+
- Push notifications for interactions
+
- Native performance optimization
+
+
**Timeline**: Begin after web MVP validation (Month 4-6)
+
+
---
+
+
## 6. Key Technical Decisions
+
+
### Leveraging Indigo:
+
- Use Indigo PDS for all atProto infrastructure
+
- Subscribe to Indigo firehose instead of building our own
+
- Focus engineering on Coves-specific features
+
+
### Data Architecture:
+
- User content → Indigo PDS (portable)
+
- Read states/analytics → AppView (non-portable, privacy-focused)
+
- No ads, tracking, or user data monetization
+
+
### Open Source:
+
- MIT license
+
- Public repository
+
- Community contributions welcome
+
+
---
+
+
## 7. Success Metrics
+
+
### MVP Launch (3 months):
+
- 50+ active communities
+
- 500+ registered users
+
- <2 second page loads
+
- 99.5% uptime
+
- Successful Bluesky content display
+
+
### 6 Month Goals:
+
- 500+ communities
+
- 5,000+ MAU
+
- Mobile app launched
+
- Federation with 3+ platforms
+
+
---
+
+
## 8. Resource Requirements
+
+
### Team:
+
- 2 Backend Engineers (Go, atProto experience)
+
- 1 Frontend Engineer (React)
+
- 1 DevOps (part-time, Indigo deployment)
+
- 1 Mobile Developer (post-MVP)
+
+
### Infrastructure:
+
- Indigo PDS instance: ~$200/month
+
- AppView database: ~$100/month
+
- CDN/Storage: ~$100/month
+
- **Total**: ~$400-500/month initially
+
+
---
+
+
## 9. Risk Mitigation
+
+
| Risk | Mitigation |
+
|------|------------|
+
| Indigo PDS complexity | Start with vanilla deployment, customize gradually |
+
| Moderation at scale | Launch with simple tagging, evolve based on community needs |
+
| Federation conflicts | Phased rollout, Bluesky read-only first |
+
| Mobile development | Consider PWA if native timeline slips |
+
| Community adoption | Focus on unique features (read states, visibility slider) |
+
+
---
+
+
## 10. Immediate Next Steps
+
+
### Week 1:
+
- Deploy Indigo PDS test instance
+
- Design Coves-specific Lexicon schemas
+
- Set up development environment
+
+
### Week 2:
+
- Implement phone verification service
+
- Create first XRPC endpoints for communities
+
- Begin frontend scaffolding
+
+
### Week 3:
+
- User registration flow
+
- Community creation
+
- Basic post functionality
+
+
---
+
+
## 11. Long-term Vision (Post-MVP)
+
+
- Sortition-based moderation experiments
+
- Community wikis and documentation
+
- Plugin system for community bots
+
- Advanced reputation mechanics
+
- Full ActivityPub federation
+
- Possible Mastodon integration
+
- Community forking via DIDs
+
- Incognito browsing mode
+
+
---
+
+
## 12. Differentiators
+
+
**Why Coves vs Reddit/Lemmy/Discourse?**
+
- True data portability via atProto
+
- Subscription visibility slider (unique feed control)
+
- Read state tracking across devices
+
- Phone verification for trust
+
- Federation-first, not an afterthought
+
- Community-driven moderation models
+
- No corporate ownership or ads
+
+
---
+
+
## Feature Details from Domain Knowledge
+
+
### Feed System Specifications
+
+
#### Home Feed:
+
- **UI**: TikTok-style scrollable feed (horizontal swipe)
+
- **Personalization**:
+
- Subscription slider (1-5 scale per community)
+
- 1 = Only best/most popular content
+
- 5 = Show all content
+
- Read state tracking with history tab
+
- Read history stored in AppView (not user repos)
+
+
#### Community Feed:
+
- Standard sorting: hot, top (day/month/year), new
+
- Respects read state
+
- Community-specific rules application
+
+
### Community Features
+
+
#### Core Capabilities:
+
- Creation and blocking
+
- Wiki maintenance
+
- NSFW toggling
+
- Subscription management
+
+
#### Reputation System:
+
- Gained through posts, comments, positive tags
+
- Affects member access levels
+
- Influences comment ordering
+
- Voting weight based on reputation
+
+
#### Rules System:
+
- Democratic rule voting
+
- Post type restrictions (e.g., text-only)
+
- Website blocklists
+
- Geolocation restrictions
+
+
#### Moderation Models:
+
1. **Sortition-Based**:
+
- Tag-based removal with tribunal review
+
- Minimum reputation for tribunal service
+
+
2. **Traditional**:
+
- Moderator hierarchy
+
- Community override capability
+
- Hybrid approach with user feedback
+
+
### Post System
+
+
#### Types:
+
- Text (MVP)
+
- Video
+
- Image
+
- Article
+
- Microblog (for Bluesky posts)
+
+
#### Features:
+
- Upvoting
+
- Tagging (helpful/spam/hostile)
+
- Share tracking
+
- Comment ownership
+
- Federation source indicators
+
+
### User System
+
+
#### Identity:
+
- Phone verification (grants verified status)
+
- atProto DID system
+
- Platform federation tracking
+
+
#### Username System:
+
- Random generation patterns:
+
- "AdjectiveNoun" (e.g., "BraveEagle")
+
- "AdjectiveAdjectiveNoun" (e.g., "SmallQuietMouse")
+
+
#### Features:
+
- User blocking
+
- Notification muting
+
- Post saving
+
- View history
+
+
### Federation Approach
+
+
#### atProto (Bluesky):
+
- Display posts inline as references
+
- Posts-only initially (comments later)
+
+
#### ActivityPub:
+
- Two-way compatibility
+
- Instance → Hub mapping
+
- Community mapping
+
- User DID assignment
+
- Action translation to Coves lexicon
+
+
---
+
+
*Last Updated: January 2025*
+
*Version: 1.0*
+
+
This PRD focuses on shipping a working MVP in 3 months by leveraging existing Indigo infrastructure, then iterating based on real community feedback.
+2 -29
cmd/server/main.go
···
"github.com/go-chi/chi/v5/middleware"
_ "github.com/lib/pq"
"github.com/pressly/goose/v3"
-
"gorm.io/driver/postgres"
-
"gorm.io/gorm"
"Coves/internal/api/routes"
-
"Coves/internal/atproto/carstore"
-
"Coves/internal/core/repository"
"Coves/internal/core/users"
postgresRepo "Coves/internal/db/postgres"
)
···
r.Use(middleware.Recoverer)
r.Use(middleware.RequestID)
-
// Initialize GORM
-
gormDB, err := gorm.Open(postgres.New(postgres.Config{
-
Conn: db,
-
}), &gorm.Config{
-
DisableForeignKeyConstraintWhenMigrating: true,
-
PrepareStmt: true, // Enable prepared statements for better performance
-
})
-
if err != nil {
-
log.Fatal("Failed to initialize GORM:", err)
-
}
-
// Initialize repositories
userRepo := postgresRepo.NewUserRepository(db)
-
_ = users.NewUserService(userRepo) // TODO: Use when UserRoutes is fixed
-
-
// Initialize carstore for ATProto repository storage
-
carDirs := []string{"./data/carstore"}
-
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
-
if err != nil {
-
log.Fatal("Failed to initialize repo store:", err)
-
}
-
-
repositoryRepo := postgresRepo.NewRepositoryRepo(db)
-
repositoryService := repository.NewService(repositoryRepo, repoStore)
// Mount routes
-
// TODO: Fix UserRoutes to accept *UserService
-
// r.Mount("/api/users", routes.UserRoutes(userService))
-
r.Mount("/", routes.RepositoryRoutes(repositoryService))
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
···
"github.com/go-chi/chi/v5/middleware"
_ "github.com/lib/pq"
"github.com/pressly/goose/v3"
"Coves/internal/api/routes"
"Coves/internal/core/users"
postgresRepo "Coves/internal/db/postgres"
)
···
r.Use(middleware.Recoverer)
r.Use(middleware.RequestID)
// Initialize repositories
userRepo := postgresRepo.NewUserRepository(db)
+
userService := users.NewUserService(userRepo)
// Mount routes
+
r.Mount("/api/users", routes.UserRoutes(userService))
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
+68 -14
docker-compose.dev.yml
···
version: '3.8'
# Coves Local Development Stack
-
# Simple setup: PDS + AppView (no relay needed for local dev)
-
# AppView subscribes directly to PDS firehose at ws://localhost:3001/xrpc/com.atproto.sync.subscribeRepos
-
# Ports configured to avoid conflicts with production PDS on :3000
services:
# Bluesky Personal Data Server (PDS)
# Handles user repositories, DIDs, and CAR files
pds:
···
# IMPORTANT: Allow insecure WebSocket for local PDS (ws:// instead of wss://)
BGS_CRAWL_INSECURE_WS: "true"
-
# Database connection (uses shared PostgreSQL for relay state)
-
DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@host.docker.internal:${POSTGRES_PORT}/${POSTGRES_DB}?sslmode=disable
# Relay will discover PDSs automatically - use admin API to restrict!
# See comments above for how to configure allowlist
···
LOG_LEVEL: ${LOG_LEVEL:-debug}
networks:
- coves-dev
-
extra_hosts:
-
- "host.docker.internal:host-gateway"
depends_on:
pds:
condition: service_healthy
healthcheck:
···
# - "8081:8080" # AppView API (avoiding conflicts)
# environment:
# # Database connection
-
# DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@host.docker.internal:${POSTGRES_PORT}/${POSTGRES_DB}?sslmode=disable
#
# # PDS Firehose subscription (direct, no relay)
# FIREHOSE_URL: ws://pds:3000/xrpc/com.atproto.sync.subscribeRepos
···
# LOG_LEVEL: ${LOG_LEVEL:-debug}
# networks:
# - coves-dev
-
# extra_hosts:
-
# - "host.docker.internal:host-gateway"
# depends_on:
-
# - pds
-
-
# Note: PostgreSQL runs separately via internal/db/local_dev_db_compose/
-
# This stack connects to it via host.docker.internal:5433
networks:
coves-dev:
···
name: coves-dev-network
volumes:
pds-data:
name: coves-dev-pds-data
···
version: '3.8'
# Coves Local Development Stack
+
# All-in-one setup: PDS + PostgreSQL + optional Relay
+
#
+
# Usage:
+
# make dev-up # Start PDS + PostgreSQL
+
# make dev-down # Stop everything
+
# docker-compose up relay # Optional: start with relay
+
#
+
# Profiles:
+
# - default: PDS + PostgreSQL (dev database on port 5433)
+
# - test: PostgreSQL test database (port 5434)
+
# - relay: BigSky relay (optional, will crawl entire network!)
services:
+
# PostgreSQL Database (Port 5433)
+
# Used by Coves AppView for indexing data from firehose
+
postgres:
+
image: postgres:15
+
container_name: coves-dev-postgres
+
ports:
+
- "5433:5432"
+
environment:
+
POSTGRES_DB: ${POSTGRES_DB:-coves_dev}
+
POSTGRES_USER: ${POSTGRES_USER:-dev_user}
+
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-dev_password}
+
volumes:
+
- postgres-data:/var/lib/postgresql/data
+
networks:
+
- coves-dev
+
healthcheck:
+
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-dev_user} -d ${POSTGRES_DB:-coves_dev}"]
+
interval: 5s
+
timeout: 5s
+
retries: 5
+
+
# PostgreSQL Test Database (Port 5434) - Optional
+
# Use with: docker-compose --profile test up postgres-test
+
postgres-test:
+
image: postgres:15
+
container_name: coves-test-postgres
+
ports:
+
- "5434:5432"
+
environment:
+
POSTGRES_DB: coves_test
+
POSTGRES_USER: test_user
+
POSTGRES_PASSWORD: test_password
+
volumes:
+
- postgres-test-data:/var/lib/postgresql/data
+
networks:
+
- coves-dev
+
healthcheck:
+
test: ["CMD-SHELL", "pg_isready -U test_user -d coves_test"]
+
interval: 5s
+
timeout: 5s
+
retries: 5
+
profiles:
+
- test
+
# Bluesky Personal Data Server (PDS)
# Handles user repositories, DIDs, and CAR files
pds:
···
# IMPORTANT: Allow insecure WebSocket for local PDS (ws:// instead of wss://)
BGS_CRAWL_INSECURE_WS: "true"
+
# Database connection (uses PostgreSQL for relay state)
+
DATABASE_URL: postgresql://${POSTGRES_USER:-dev_user}:${POSTGRES_PASSWORD:-dev_password}@postgres:5432/${POSTGRES_DB:-coves_dev}?sslmode=disable
# Relay will discover PDSs automatically - use admin API to restrict!
# See comments above for how to configure allowlist
···
LOG_LEVEL: ${LOG_LEVEL:-debug}
networks:
- coves-dev
depends_on:
+
postgres:
+
condition: service_healthy
pds:
condition: service_healthy
healthcheck:
···
# - "8081:8080" # AppView API (avoiding conflicts)
# environment:
# # Database connection
+
# DATABASE_URL: postgresql://${POSTGRES_USER:-dev_user}:${POSTGRES_PASSWORD:-dev_password}@postgres:5432/${POSTGRES_DB:-coves_dev}?sslmode=disable
#
# # PDS Firehose subscription (direct, no relay)
# FIREHOSE_URL: ws://pds:3000/xrpc/com.atproto.sync.subscribeRepos
···
# LOG_LEVEL: ${LOG_LEVEL:-debug}
# networks:
# - coves-dev
# depends_on:
+
# postgres:
+
# condition: service_healthy
+
# pds:
+
# condition: service_healthy
networks:
coves-dev:
···
name: coves-dev-network
volumes:
+
postgres-data:
+
name: coves-dev-postgres-data
+
postgres-test-data:
+
name: coves-test-postgres-data
pds-data:
name: coves-dev-pds-data
-469
internal/api/handlers/repository_handler.go
···
-
package handlers
-
-
import (
-
"encoding/json"
-
"fmt"
-
"io"
-
"net/http"
-
"strings"
-
-
"Coves/internal/core/repository"
-
"github.com/ipfs/go-cid"
-
cbornode "github.com/ipfs/go-ipld-cbor"
-
)
-
-
// RepositoryHandler handles HTTP requests for repository operations
-
type RepositoryHandler struct {
-
service repository.RepositoryService
-
}
-
-
// NewRepositoryHandler creates a new repository handler
-
func NewRepositoryHandler(service repository.RepositoryService) *RepositoryHandler {
-
return &RepositoryHandler{
-
service: service,
-
}
-
}
-
-
// AT Protocol XRPC request/response types
-
-
// CreateRecordRequest represents a request to create a record
-
type CreateRecordRequest struct {
-
Repo string `json:"repo"` // DID of the repository
-
Collection string `json:"collection"` // NSID of the collection
-
RKey string `json:"rkey,omitempty"` // Optional record key
-
Validate bool `json:"validate"` // Whether to validate against lexicon
-
Record json.RawMessage `json:"record"` // The record data
-
}
-
-
// CreateRecordResponse represents the response after creating a record
-
type CreateRecordResponse struct {
-
URI string `json:"uri"` // AT-URI of the created record
-
CID string `json:"cid"` // CID of the record
-
}
-
-
// GetRecordRequest represents a request to get a record
-
type GetRecordRequest struct {
-
Repo string `json:"repo"` // DID of the repository
-
Collection string `json:"collection"` // NSID of the collection
-
RKey string `json:"rkey"` // Record key
-
}
-
-
// GetRecordResponse represents the response when getting a record
-
type GetRecordResponse struct {
-
URI string `json:"uri"` // AT-URI of the record
-
CID string `json:"cid"` // CID of the record
-
Value json.RawMessage `json:"value"` // The record data
-
}
-
-
// PutRecordRequest represents a request to update a record
-
type PutRecordRequest struct {
-
Repo string `json:"repo"` // DID of the repository
-
Collection string `json:"collection"` // NSID of the collection
-
RKey string `json:"rkey"` // Record key
-
Validate bool `json:"validate"` // Whether to validate against lexicon
-
Record json.RawMessage `json:"record"` // The record data
-
}
-
-
// PutRecordResponse represents the response after updating a record
-
type PutRecordResponse struct {
-
URI string `json:"uri"` // AT-URI of the updated record
-
CID string `json:"cid"` // CID of the record
-
}
-
-
// DeleteRecordRequest represents a request to delete a record
-
type DeleteRecordRequest struct {
-
Repo string `json:"repo"` // DID of the repository
-
Collection string `json:"collection"` // NSID of the collection
-
RKey string `json:"rkey"` // Record key
-
}
-
-
// ListRecordsRequest represents a request to list records
-
type ListRecordsRequest struct {
-
Repo string `json:"repo"` // DID of the repository
-
Collection string `json:"collection"` // NSID of the collection
-
Limit int `json:"limit,omitempty"`
-
Cursor string `json:"cursor,omitempty"`
-
}
-
-
// ListRecordsResponse represents the response when listing records
-
type ListRecordsResponse struct {
-
Cursor string `json:"cursor,omitempty"`
-
Records []RecordOutput `json:"records"`
-
}
-
-
// RecordOutput represents a record in list responses
-
type RecordOutput struct {
-
URI string `json:"uri"`
-
CID string `json:"cid"`
-
Value json.RawMessage `json:"value"`
-
}
-
-
// Handler methods
-
-
// CreateRecord handles POST /xrpc/com.atproto.repo.createRecord
-
func (h *RepositoryHandler) CreateRecord(w http.ResponseWriter, r *http.Request) {
-
var req CreateRecordRequest
-
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
-
writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request: %v", err))
-
return
-
}
-
-
// Validate required fields
-
if req.Repo == "" || req.Collection == "" || len(req.Record) == 0 {
-
writeError(w, http.StatusBadRequest, "missing required fields")
-
return
-
}
-
-
// Create a generic record structure for CBOR encoding
-
// In a real implementation, you would unmarshal to the specific lexicon type
-
recordData := &GenericRecord{
-
Data: req.Record,
-
}
-
-
input := repository.CreateRecordInput{
-
DID: req.Repo,
-
Collection: req.Collection,
-
RecordKey: req.RKey,
-
Record: recordData,
-
Validate: req.Validate,
-
}
-
-
record, err := h.service.CreateRecord(input)
-
if err != nil {
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create record: %v", err))
-
return
-
}
-
-
resp := CreateRecordResponse{
-
URI: record.URI,
-
CID: record.CID.String(),
-
}
-
-
writeJSON(w, http.StatusOK, resp)
-
}
-
-
// GetRecord handles GET /xrpc/com.atproto.repo.getRecord
-
func (h *RepositoryHandler) GetRecord(w http.ResponseWriter, r *http.Request) {
-
// Parse query parameters
-
repo := r.URL.Query().Get("repo")
-
collection := r.URL.Query().Get("collection")
-
rkey := r.URL.Query().Get("rkey")
-
-
if repo == "" || collection == "" || rkey == "" {
-
writeError(w, http.StatusBadRequest, "missing required parameters")
-
return
-
}
-
-
input := repository.GetRecordInput{
-
DID: repo,
-
Collection: collection,
-
RecordKey: rkey,
-
}
-
-
record, err := h.service.GetRecord(input)
-
if err != nil {
-
if strings.Contains(err.Error(), "not found") {
-
writeError(w, http.StatusNotFound, "record not found")
-
return
-
}
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to get record: %v", err))
-
return
-
}
-
-
resp := GetRecordResponse{
-
URI: record.URI,
-
CID: record.CID.String(),
-
Value: json.RawMessage(record.Value),
-
}
-
-
writeJSON(w, http.StatusOK, resp)
-
}
-
-
// PutRecord handles POST /xrpc/com.atproto.repo.putRecord
-
func (h *RepositoryHandler) PutRecord(w http.ResponseWriter, r *http.Request) {
-
var req PutRecordRequest
-
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
-
writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request: %v", err))
-
return
-
}
-
-
// Validate required fields
-
if req.Repo == "" || req.Collection == "" || req.RKey == "" || len(req.Record) == 0 {
-
writeError(w, http.StatusBadRequest, "missing required fields")
-
return
-
}
-
-
// Create a generic record structure for CBOR encoding
-
recordData := &GenericRecord{
-
Data: req.Record,
-
}
-
-
input := repository.UpdateRecordInput{
-
DID: req.Repo,
-
Collection: req.Collection,
-
RecordKey: req.RKey,
-
Record: recordData,
-
Validate: req.Validate,
-
}
-
-
record, err := h.service.UpdateRecord(input)
-
if err != nil {
-
if strings.Contains(err.Error(), "not found") {
-
writeError(w, http.StatusNotFound, "record not found")
-
return
-
}
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to update record: %v", err))
-
return
-
}
-
-
resp := PutRecordResponse{
-
URI: record.URI,
-
CID: record.CID.String(),
-
}
-
-
writeJSON(w, http.StatusOK, resp)
-
}
-
-
// DeleteRecord handles POST /xrpc/com.atproto.repo.deleteRecord
-
func (h *RepositoryHandler) DeleteRecord(w http.ResponseWriter, r *http.Request) {
-
var req DeleteRecordRequest
-
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
-
writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request: %v", err))
-
return
-
}
-
-
// Validate required fields
-
if req.Repo == "" || req.Collection == "" || req.RKey == "" {
-
writeError(w, http.StatusBadRequest, "missing required fields")
-
return
-
}
-
-
input := repository.DeleteRecordInput{
-
DID: req.Repo,
-
Collection: req.Collection,
-
RecordKey: req.RKey,
-
}
-
-
err := h.service.DeleteRecord(input)
-
if err != nil {
-
if strings.Contains(err.Error(), "not found") {
-
writeError(w, http.StatusNotFound, "record not found")
-
return
-
}
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to delete record: %v", err))
-
return
-
}
-
-
w.WriteHeader(http.StatusOK)
-
w.Write([]byte("{}"))
-
}
-
-
// ListRecords handles GET /xrpc/com.atproto.repo.listRecords
-
func (h *RepositoryHandler) ListRecords(w http.ResponseWriter, r *http.Request) {
-
// Parse query parameters
-
repo := r.URL.Query().Get("repo")
-
collection := r.URL.Query().Get("collection")
-
limit := 50 // Default limit
-
cursor := r.URL.Query().Get("cursor")
-
-
if repo == "" || collection == "" {
-
writeError(w, http.StatusBadRequest, "missing required parameters")
-
return
-
}
-
-
// Parse limit if provided
-
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
-
fmt.Sscanf(limitStr, "%d", &limit)
-
if limit > 100 {
-
limit = 100 // Max limit
-
}
-
}
-
-
records, nextCursor, err := h.service.ListRecords(repo, collection, limit, cursor)
-
if err != nil {
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to list records: %v", err))
-
return
-
}
-
-
// Convert to output format
-
recordOutputs := make([]RecordOutput, len(records))
-
for i, record := range records {
-
recordOutputs[i] = RecordOutput{
-
URI: record.URI,
-
CID: record.CID.String(),
-
Value: json.RawMessage(record.Value),
-
}
-
}
-
-
resp := ListRecordsResponse{
-
Cursor: nextCursor,
-
Records: recordOutputs,
-
}
-
-
writeJSON(w, http.StatusOK, resp)
-
}
-
-
// GetRepo handles GET /xrpc/com.atproto.sync.getRepo
-
func (h *RepositoryHandler) GetRepo(w http.ResponseWriter, r *http.Request) {
-
// Parse query parameters
-
did := r.URL.Query().Get("did")
-
if did == "" {
-
writeError(w, http.StatusBadRequest, "missing did parameter")
-
return
-
}
-
-
// Export repository as CAR file
-
carData, err := h.service.ExportRepository(did)
-
if err != nil {
-
if strings.Contains(err.Error(), "not found") {
-
writeError(w, http.StatusNotFound, "repository not found")
-
return
-
}
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to export repository: %v", err))
-
return
-
}
-
-
// Set appropriate headers for CAR file
-
w.Header().Set("Content-Type", "application/vnd.ipld.car")
-
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(carData)))
-
w.WriteHeader(http.StatusOK)
-
w.Write(carData)
-
}
-
-
// Additional repository management endpoints
-
-
// CreateRepository handles POST /xrpc/com.atproto.repo.createRepo
-
func (h *RepositoryHandler) CreateRepository(w http.ResponseWriter, r *http.Request) {
-
var req struct {
-
DID string `json:"did"`
-
}
-
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
-
writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request: %v", err))
-
return
-
}
-
-
if req.DID == "" {
-
writeError(w, http.StatusBadRequest, "missing did")
-
return
-
}
-
-
repo, err := h.service.CreateRepository(req.DID)
-
if err != nil {
-
if strings.Contains(err.Error(), "already exists") {
-
writeError(w, http.StatusConflict, "repository already exists")
-
return
-
}
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to create repository: %v", err))
-
return
-
}
-
-
resp := struct {
-
DID string `json:"did"`
-
HeadCID string `json:"head"`
-
}{
-
DID: repo.DID,
-
HeadCID: repo.HeadCID.String(),
-
}
-
-
writeJSON(w, http.StatusOK, resp)
-
}
-
-
// GetCommit handles GET /xrpc/com.atproto.sync.getCommit
-
func (h *RepositoryHandler) GetCommit(w http.ResponseWriter, r *http.Request) {
-
// Parse query parameters
-
did := r.URL.Query().Get("did")
-
commitCIDStr := r.URL.Query().Get("cid")
-
-
if did == "" || commitCIDStr == "" {
-
writeError(w, http.StatusBadRequest, "missing required parameters")
-
return
-
}
-
-
// Parse CID
-
commitCID, err := cid.Parse(commitCIDStr)
-
if err != nil {
-
writeError(w, http.StatusBadRequest, "invalid cid")
-
return
-
}
-
-
commit, err := h.service.GetCommit(did, commitCID)
-
if err != nil {
-
if strings.Contains(err.Error(), "not found") {
-
writeError(w, http.StatusNotFound, "commit not found")
-
return
-
}
-
writeError(w, http.StatusInternalServerError, fmt.Sprintf("failed to get commit: %v", err))
-
return
-
}
-
-
resp := struct {
-
CID string `json:"cid"`
-
DID string `json:"did"`
-
Version int `json:"version"`
-
PrevCID *string `json:"prev,omitempty"`
-
DataCID string `json:"data"`
-
Revision string `json:"rev"`
-
Signature string `json:"sig"`
-
CreatedAt string `json:"createdAt"`
-
}{
-
CID: commit.CID.String(),
-
DID: commit.DID,
-
Version: commit.Version,
-
DataCID: commit.DataCID.String(),
-
Revision: commit.Revision,
-
Signature: fmt.Sprintf("%x", commit.Signature),
-
CreatedAt: commit.CreatedAt.Format("2006-01-02T15:04:05Z"),
-
}
-
-
if commit.PrevCID != nil {
-
prev := commit.PrevCID.String()
-
resp.PrevCID = &prev
-
}
-
-
writeJSON(w, http.StatusOK, resp)
-
}
-
-
// Helper functions
-
-
func writeJSON(w http.ResponseWriter, status int, data interface{}) {
-
w.Header().Set("Content-Type", "application/json")
-
w.WriteHeader(status)
-
json.NewEncoder(w).Encode(data)
-
}
-
-
func writeError(w http.ResponseWriter, status int, message string) {
-
w.Header().Set("Content-Type", "application/json")
-
w.WriteHeader(status)
-
json.NewEncoder(w).Encode(map[string]interface{}{
-
"error": http.StatusText(status),
-
"message": message,
-
})
-
}
-
-
// GenericRecord is a temporary structure for CBOR encoding
-
// In a real implementation, you would have specific types for each lexicon
-
type GenericRecord struct {
-
Data json.RawMessage
-
}
-
-
// MarshalCBOR implements the CBORMarshaler interface
-
func (g *GenericRecord) MarshalCBOR(w io.Writer) error {
-
// Parse JSON data into a generic map for proper CBOR encoding
-
var data map[string]interface{}
-
if err := json.Unmarshal(g.Data, &data); err != nil {
-
return fmt.Errorf("failed to unmarshal JSON data: %w", err)
-
}
-
-
// Use IPFS CBOR encoding to properly encode the data
-
cborData, err := cbornode.DumpObject(data)
-
if err != nil {
-
return fmt.Errorf("failed to marshal as CBOR: %w", err)
-
}
-
-
_, err = w.Write(cborData)
-
if err != nil {
-
return fmt.Errorf("failed to write CBOR data: %w", err)
-
}
-
-
return nil
-
}
···
-191
internal/api/handlers/repository_handler_test.go
···
-
package handlers
-
-
import (
-
"bytes"
-
"encoding/json"
-
"net/http"
-
"net/http/httptest"
-
"testing"
-
-
"Coves/internal/core/repository"
-
"github.com/ipfs/go-cid"
-
)
-
-
// MockRepositoryService is a mock implementation for testing
-
type MockRepositoryService struct {
-
repositories map[string]*repository.Repository
-
records map[string]*repository.Record
-
}
-
-
func NewMockRepositoryService() *MockRepositoryService {
-
return &MockRepositoryService{
-
repositories: make(map[string]*repository.Repository),
-
records: make(map[string]*repository.Record),
-
}
-
}
-
-
func (m *MockRepositoryService) CreateRepository(did string) (*repository.Repository, error) {
-
repo := &repository.Repository{
-
DID: did,
-
HeadCID: cid.Undef,
-
}
-
m.repositories[did] = repo
-
return repo, nil
-
}
-
-
func (m *MockRepositoryService) GetRepository(did string) (*repository.Repository, error) {
-
repo, exists := m.repositories[did]
-
if !exists {
-
return nil, nil
-
}
-
return repo, nil
-
}
-
-
func (m *MockRepositoryService) DeleteRepository(did string) error {
-
delete(m.repositories, did)
-
return nil
-
}
-
-
func (m *MockRepositoryService) CreateRecord(input repository.CreateRecordInput) (*repository.Record, error) {
-
uri := "at://" + input.DID + "/" + input.Collection + "/" + input.RecordKey
-
record := &repository.Record{
-
URI: uri,
-
CID: cid.Undef,
-
Collection: input.Collection,
-
RecordKey: input.RecordKey,
-
Value: []byte(`{"test": "data"}`),
-
}
-
m.records[uri] = record
-
return record, nil
-
}
-
-
func (m *MockRepositoryService) GetRecord(input repository.GetRecordInput) (*repository.Record, error) {
-
uri := "at://" + input.DID + "/" + input.Collection + "/" + input.RecordKey
-
record, exists := m.records[uri]
-
if !exists {
-
return nil, nil
-
}
-
return record, nil
-
}
-
-
func (m *MockRepositoryService) UpdateRecord(input repository.UpdateRecordInput) (*repository.Record, error) {
-
uri := "at://" + input.DID + "/" + input.Collection + "/" + input.RecordKey
-
record := &repository.Record{
-
URI: uri,
-
CID: cid.Undef,
-
Collection: input.Collection,
-
RecordKey: input.RecordKey,
-
Value: []byte(`{"test": "updated"}`),
-
}
-
m.records[uri] = record
-
return record, nil
-
}
-
-
func (m *MockRepositoryService) DeleteRecord(input repository.DeleteRecordInput) error {
-
uri := "at://" + input.DID + "/" + input.Collection + "/" + input.RecordKey
-
delete(m.records, uri)
-
return nil
-
}
-
-
func (m *MockRepositoryService) ListRecords(did string, collection string, limit int, cursor string) ([]*repository.Record, string, error) {
-
var records []*repository.Record
-
for _, record := range m.records {
-
if record.Collection == collection {
-
records = append(records, record)
-
}
-
}
-
return records, "", nil
-
}
-
-
func (m *MockRepositoryService) GetCommit(did string, cid cid.Cid) (*repository.Commit, error) {
-
return nil, nil
-
}
-
-
func (m *MockRepositoryService) ListCommits(did string, limit int, cursor string) ([]*repository.Commit, string, error) {
-
return []*repository.Commit{}, "", nil
-
}
-
-
func (m *MockRepositoryService) ExportRepository(did string) ([]byte, error) {
-
return []byte("mock-car-data"), nil
-
}
-
-
func (m *MockRepositoryService) ImportRepository(did string, carData []byte) error {
-
return nil
-
}
-
-
func TestCreateRecordHandler(t *testing.T) {
-
mockService := NewMockRepositoryService()
-
handler := NewRepositoryHandler(mockService)
-
-
// Create test request
-
reqData := CreateRecordRequest{
-
Repo: "did:plc:test123",
-
Collection: "app.bsky.feed.post",
-
RKey: "testkey",
-
Record: json.RawMessage(`{"text": "Hello, world!"}`),
-
}
-
-
reqBody, err := json.Marshal(reqData)
-
if err != nil {
-
t.Fatalf("Failed to marshal request: %v", err)
-
}
-
-
req := httptest.NewRequest("POST", "/xrpc/com.atproto.repo.createRecord", bytes.NewReader(reqBody))
-
req.Header.Set("Content-Type", "application/json")
-
w := httptest.NewRecorder()
-
-
// Call handler
-
handler.CreateRecord(w, req)
-
-
// Check response
-
if w.Code != http.StatusOK {
-
t.Errorf("Expected status 200, got %d", w.Code)
-
}
-
-
var resp CreateRecordResponse
-
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
-
t.Fatalf("Failed to decode response: %v", err)
-
}
-
-
expectedURI := "at://did:plc:test123/app.bsky.feed.post/testkey"
-
if resp.URI != expectedURI {
-
t.Errorf("Expected URI %s, got %s", expectedURI, resp.URI)
-
}
-
}
-
-
func TestGetRecordHandler(t *testing.T) {
-
mockService := NewMockRepositoryService()
-
handler := NewRepositoryHandler(mockService)
-
-
// Create a test record first
-
uri := "at://did:plc:test123/app.bsky.feed.post/testkey"
-
testRecord := &repository.Record{
-
URI: uri,
-
CID: cid.Undef,
-
Collection: "app.bsky.feed.post",
-
RecordKey: "testkey",
-
Value: []byte(`{"text": "Hello, world!"}`),
-
}
-
mockService.records[uri] = testRecord
-
-
// Create test request
-
req := httptest.NewRequest("GET", "/xrpc/com.atproto.repo.getRecord?repo=did:plc:test123&collection=app.bsky.feed.post&rkey=testkey", nil)
-
w := httptest.NewRecorder()
-
-
// Call handler
-
handler.GetRecord(w, req)
-
-
// Check response
-
if w.Code != http.StatusOK {
-
t.Errorf("Expected status 200, got %d", w.Code)
-
}
-
-
var resp GetRecordResponse
-
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
-
t.Fatalf("Failed to decode response: %v", err)
-
}
-
-
if resp.URI != uri {
-
t.Errorf("Expected URI %s, got %s", uri, resp.URI)
-
}
-
}
···
-33
internal/api/routes/repository.go
···
-
package routes
-
-
import (
-
"Coves/internal/api/handlers"
-
"Coves/internal/core/repository"
-
"github.com/go-chi/chi/v5"
-
)
-
-
// RepositoryRoutes returns repository-related routes
-
func RepositoryRoutes(service repository.RepositoryService) chi.Router {
-
handler := handlers.NewRepositoryHandler(service)
-
-
r := chi.NewRouter()
-
-
// AT Protocol XRPC endpoints for repository operations
-
r.Route("/xrpc", func(r chi.Router) {
-
// Record operations
-
r.Post("/com.atproto.repo.createRecord", handler.CreateRecord)
-
r.Get("/com.atproto.repo.getRecord", handler.GetRecord)
-
r.Post("/com.atproto.repo.putRecord", handler.PutRecord)
-
r.Post("/com.atproto.repo.deleteRecord", handler.DeleteRecord)
-
r.Get("/com.atproto.repo.listRecords", handler.ListRecords)
-
-
// Repository operations
-
r.Post("/com.atproto.repo.createRepo", handler.CreateRepository)
-
-
// Sync operations
-
r.Get("/com.atproto.sync.getRepo", handler.GetRepo)
-
r.Get("/com.atproto.sync.getCommit", handler.GetCommit)
-
})
-
-
return r
-
}
···
-20
internal/core/repository/constants.go
···
-
package repository
-
-
import (
-
"github.com/ipfs/go-cid"
-
"github.com/multiformats/go-multihash"
-
)
-
-
var (
-
// PlaceholderCID is used for empty repositories that have no content yet
-
// This allows us to maintain consistency in the repository record
-
// while the actual CAR data is created when records are added
-
PlaceholderCID cid.Cid
-
)
-
-
func init() {
-
// Initialize the placeholder CID once at startup
-
emptyData := []byte("empty")
-
mh, _ := multihash.Sum(emptyData, multihash.SHA2_256, -1)
-
PlaceholderCID = cid.NewCidV1(cid.Raw, mh)
-
}
···
-123
internal/core/repository/repository.go
···
-
package repository
-
-
import (
-
"time"
-
-
"github.com/ipfs/go-cid"
-
)
-
-
// Repository represents an AT Protocol data repository
-
type Repository struct {
-
DID string // Decentralized identifier of the repository owner
-
HeadCID cid.Cid // CID of the latest commit
-
Revision string // Current revision identifier
-
RecordCount int // Number of records in the repository
-
StorageSize int64 // Total storage size in bytes
-
CreatedAt time.Time
-
UpdatedAt time.Time
-
}
-
-
// Commit represents a signed repository commit
-
type Commit struct {
-
CID cid.Cid // Content identifier of this commit
-
DID string // DID of the committer
-
Version int // Repository version
-
PrevCID *cid.Cid // CID of the previous commit (nil for first commit)
-
DataCID cid.Cid // CID of the MST root
-
Revision string // Revision identifier
-
Signature []byte // Cryptographic signature
-
SigningKeyID string // Key ID used for signing
-
CreatedAt time.Time
-
}
-
-
// Record represents a record in the repository
-
type Record struct {
-
URI string // AT-URI of the record (e.g., at://did:plc:123/app.bsky.feed.post/abc)
-
CID cid.Cid // Content identifier
-
Collection string // Collection name (e.g., app.bsky.feed.post)
-
RecordKey string // Record key within collection
-
Value []byte // The actual record data (typically CBOR)
-
CreatedAt time.Time
-
UpdatedAt time.Time
-
}
-
-
-
// CreateRecordInput represents input for creating a record
-
type CreateRecordInput struct {
-
DID string
-
Collection string
-
RecordKey string // Optional - will be generated if not provided
-
Record interface{}
-
Validate bool // Whether to validate against lexicon
-
}
-
-
// UpdateRecordInput represents input for updating a record
-
type UpdateRecordInput struct {
-
DID string
-
Collection string
-
RecordKey string
-
Record interface{}
-
Validate bool
-
}
-
-
// GetRecordInput represents input for retrieving a record
-
type GetRecordInput struct {
-
DID string
-
Collection string
-
RecordKey string
-
}
-
-
// DeleteRecordInput represents input for deleting a record
-
type DeleteRecordInput struct {
-
DID string
-
Collection string
-
RecordKey string
-
}
-
-
// RepositoryService defines the business logic for repository operations
-
type RepositoryService interface {
-
// Repository operations
-
CreateRepository(did string) (*Repository, error)
-
GetRepository(did string) (*Repository, error)
-
DeleteRepository(did string) error
-
-
// Record operations
-
CreateRecord(input CreateRecordInput) (*Record, error)
-
GetRecord(input GetRecordInput) (*Record, error)
-
UpdateRecord(input UpdateRecordInput) (*Record, error)
-
DeleteRecord(input DeleteRecordInput) error
-
-
// Collection operations
-
ListRecords(did string, collection string, limit int, cursor string) ([]*Record, string, error)
-
-
// Commit operations
-
GetCommit(did string, cid cid.Cid) (*Commit, error)
-
ListCommits(did string, limit int, cursor string) ([]*Commit, string, error)
-
-
// Export operations
-
ExportRepository(did string) ([]byte, error) // Returns CAR file
-
ImportRepository(did string, carData []byte) error
-
}
-
-
// RepositoryRepository defines the data access interface for repositories
-
type RepositoryRepository interface {
-
// Repository operations
-
Create(repo *Repository) error
-
GetByDID(did string) (*Repository, error)
-
Update(repo *Repository) error
-
Delete(did string) error
-
-
// Commit operations
-
CreateCommit(commit *Commit) error
-
GetCommit(did string, cid cid.Cid) (*Commit, error)
-
GetLatestCommit(did string) (*Commit, error)
-
ListCommits(did string, limit int, offset int) ([]*Commit, error)
-
-
// Record operations
-
CreateRecord(record *Record) error
-
GetRecord(did string, collection string, recordKey string) (*Record, error)
-
UpdateRecord(record *Record) error
-
DeleteRecord(did string, collection string, recordKey string) error
-
ListRecords(did string, collection string, limit int, offset int) ([]*Record, error)
-
-
}
···
-243
internal/core/repository/service.go
···
-
package repository
-
-
import (
-
"bytes"
-
"context"
-
"fmt"
-
"strings"
-
"time"
-
-
"Coves/internal/atproto/carstore"
-
"github.com/ipfs/go-cid"
-
)
-
-
// Service implements the RepositoryService interface using Indigo's carstore
-
type Service struct {
-
repo RepositoryRepository
-
repoStore *carstore.RepoStore
-
signingKeys map[string]interface{} // DID -> signing key
-
}
-
-
// NewService creates a new repository service using carstore
-
func NewService(repo RepositoryRepository, repoStore *carstore.RepoStore) *Service {
-
return &Service{
-
repo: repo,
-
repoStore: repoStore,
-
signingKeys: make(map[string]interface{}),
-
}
-
}
-
-
// SetSigningKey sets the signing key for a DID
-
func (s *Service) SetSigningKey(did string, signingKey interface{}) {
-
s.signingKeys[did] = signingKey
-
}
-
-
// CreateRepository creates a new repository
-
func (s *Service) CreateRepository(did string) (*Repository, error) {
-
// Check if repository already exists
-
existing, err := s.repo.GetByDID(did)
-
if err != nil {
-
return nil, fmt.Errorf("checking existing repository: %w", err)
-
}
-
if existing != nil {
-
return nil, fmt.Errorf("repository already exists for DID: %s", did)
-
}
-
-
// For now, just create the user mapping without importing CAR data
-
// The actual repository data will be created when records are added
-
ctx := context.Background()
-
-
// Ensure user mapping exists
-
_, err = s.repoStore.GetOrCreateUID(ctx, did)
-
if err != nil {
-
return nil, fmt.Errorf("creating user mapping: %w", err)
-
}
-
-
// Use placeholder CID for the empty repository
-
-
// Create repository record
-
repository := &Repository{
-
DID: did,
-
HeadCID: PlaceholderCID,
-
Revision: "rev-0",
-
RecordCount: 0,
-
StorageSize: 0,
-
CreatedAt: time.Now(),
-
UpdatedAt: time.Now(),
-
}
-
-
// Save to database
-
if err := s.repo.Create(repository); err != nil {
-
return nil, fmt.Errorf("saving repository: %w", err)
-
}
-
-
return repository, nil
-
}
-
-
// GetRepository retrieves a repository by DID
-
func (s *Service) GetRepository(did string) (*Repository, error) {
-
repo, err := s.repo.GetByDID(did)
-
if err != nil {
-
return nil, fmt.Errorf("getting repository: %w", err)
-
}
-
if repo == nil {
-
return nil, fmt.Errorf("repository not found for DID: %s", did)
-
}
-
-
// Update head CID from carstore
-
headCID, err := s.repoStore.GetRepoHead(context.Background(), did)
-
if err == nil && headCID.Defined() {
-
repo.HeadCID = headCID
-
}
-
-
return repo, nil
-
}
-
-
// DeleteRepository deletes a repository
-
func (s *Service) DeleteRepository(did string) error {
-
// Delete from carstore
-
if err := s.repoStore.DeleteRepo(context.Background(), did); err != nil {
-
return fmt.Errorf("deleting repo from carstore: %w", err)
-
}
-
-
// Delete from database
-
if err := s.repo.Delete(did); err != nil {
-
return fmt.Errorf("deleting repository from database: %w", err)
-
}
-
-
return nil
-
}
-
-
// ExportRepository exports a repository as a CAR file
-
func (s *Service) ExportRepository(did string) ([]byte, error) {
-
// First check if repository exists in our database
-
repo, err := s.repo.GetByDID(did)
-
if err != nil {
-
return nil, fmt.Errorf("getting repository: %w", err)
-
}
-
if repo == nil {
-
return nil, fmt.Errorf("repository not found for DID: %s", did)
-
}
-
-
// Try to read from carstore
-
carData, err := s.repoStore.ReadRepo(context.Background(), did, "")
-
if err != nil {
-
// If no data in carstore yet, return empty CAR
-
// This happens when a repo is created but no records added yet
-
// Check for the specific error pattern from Indigo's carstore
-
errMsg := err.Error()
-
if strings.Contains(errMsg, "no data found for user") ||
-
strings.Contains(errMsg, "user not found") {
-
return []byte{}, nil
-
}
-
return nil, fmt.Errorf("exporting repository: %w", err)
-
}
-
-
return carData, nil
-
}
-
-
// ImportRepository imports a repository from a CAR file
-
func (s *Service) ImportRepository(did string, carData []byte) error {
-
ctx := context.Background()
-
-
// If empty CAR data, just create user mapping
-
if len(carData) == 0 {
-
_, err := s.repoStore.GetOrCreateUID(ctx, did)
-
if err != nil {
-
return fmt.Errorf("creating user mapping: %w", err)
-
}
-
-
// Use placeholder CID for empty repository
-
headCID := PlaceholderCID
-
-
// Create repository record
-
repo := &Repository{
-
DID: did,
-
HeadCID: headCID,
-
Revision: "imported-empty",
-
RecordCount: 0,
-
StorageSize: 0,
-
CreatedAt: time.Now(),
-
UpdatedAt: time.Now(),
-
}
-
if err := s.repo.Create(repo); err != nil {
-
return fmt.Errorf("creating repository: %w", err)
-
}
-
return nil
-
}
-
-
// Import non-empty CAR into carstore
-
headCID, err := s.repoStore.ImportRepo(ctx, did, bytes.NewReader(carData))
-
if err != nil {
-
return fmt.Errorf("importing repository: %w", err)
-
}
-
-
// Create or update repository record
-
repo, err := s.repo.GetByDID(did)
-
if err != nil {
-
return fmt.Errorf("getting repository: %w", err)
-
}
-
-
if repo == nil {
-
// Create new repository
-
repo = &Repository{
-
DID: did,
-
HeadCID: headCID,
-
Revision: "imported",
-
RecordCount: 0, // TODO: Count records in CAR
-
StorageSize: int64(len(carData)),
-
CreatedAt: time.Now(),
-
UpdatedAt: time.Now(),
-
}
-
if err := s.repo.Create(repo); err != nil {
-
return fmt.Errorf("creating repository: %w", err)
-
}
-
} else {
-
// Update existing repository
-
repo.HeadCID = headCID
-
repo.UpdatedAt = time.Now()
-
if err := s.repo.Update(repo); err != nil {
-
return fmt.Errorf("updating repository: %w", err)
-
}
-
}
-
-
return nil
-
}
-
-
// CompactRepository runs garbage collection on a repository
-
func (s *Service) CompactRepository(did string) error {
-
return s.repoStore.CompactRepo(context.Background(), did)
-
}
-
-
// Note: Record-level operations would require more complex implementation
-
// to work with the carstore. For now, these are placeholder implementations
-
// that would need to be expanded to properly handle record CRUD operations
-
// by reading the CAR, modifying the repo structure, and writing back.
-
-
func (s *Service) CreateRecord(input CreateRecordInput) (*Record, error) {
-
return nil, fmt.Errorf("record operations not yet implemented for carstore")
-
}
-
-
func (s *Service) GetRecord(input GetRecordInput) (*Record, error) {
-
return nil, fmt.Errorf("record operations not yet implemented for carstore")
-
}
-
-
func (s *Service) UpdateRecord(input UpdateRecordInput) (*Record, error) {
-
return nil, fmt.Errorf("record operations not yet implemented for carstore")
-
}
-
-
func (s *Service) DeleteRecord(input DeleteRecordInput) error {
-
return fmt.Errorf("record operations not yet implemented for carstore")
-
}
-
-
func (s *Service) ListRecords(did string, collection string, limit int, cursor string) ([]*Record, string, error) {
-
return nil, "", fmt.Errorf("record operations not yet implemented for carstore")
-
}
-
-
func (s *Service) GetCommit(did string, commitCID cid.Cid) (*Commit, error) {
-
return nil, fmt.Errorf("commit operations not yet implemented for carstore")
-
}
-
-
func (s *Service) ListCommits(did string, limit int, cursor string) ([]*Commit, string, error) {
-
return nil, "", fmt.Errorf("commit operations not yet implemented for carstore")
-
}
···
-513
internal/core/repository/service_test.go
···
-
package repository_test
-
-
import (
-
"context"
-
"database/sql"
-
"fmt"
-
"os"
-
"testing"
-
-
"Coves/internal/atproto/carstore"
-
"Coves/internal/core/repository"
-
"Coves/internal/db/postgres"
-
-
"github.com/ipfs/go-cid"
-
_ "github.com/lib/pq"
-
"github.com/pressly/goose/v3"
-
postgresDriver "gorm.io/driver/postgres"
-
"gorm.io/gorm"
-
)
-
-
// Mock signing key for testing
-
type mockSigningKey struct{}
-
-
// Test database connection
-
func setupTestDB(t *testing.T) (*sql.DB, *gorm.DB, func()) {
-
// Use test database URL from environment or default
-
dbURL := os.Getenv("TEST_DATABASE_URL")
-
if dbURL == "" {
-
// Skip test if no database configured
-
t.Skip("TEST_DATABASE_URL not set, skipping database tests")
-
}
-
-
// Connect with sql.DB for migrations
-
sqlDB, err := sql.Open("postgres", dbURL)
-
if err != nil {
-
t.Fatalf("Failed to connect to test database: %v", err)
-
}
-
-
// Run migrations
-
if err := goose.Up(sqlDB, "../../db/migrations"); err != nil {
-
t.Fatalf("Failed to run migrations: %v", err)
-
}
-
-
// Connect with GORM using a fresh connection
-
gormDB, err := gorm.Open(postgresDriver.Open(dbURL), &gorm.Config{
-
DisableForeignKeyConstraintWhenMigrating: true,
-
PrepareStmt: false,
-
})
-
if err != nil {
-
t.Fatalf("Failed to create GORM connection: %v", err)
-
}
-
-
// Cleanup function
-
cleanup := func() {
-
// Clean up test data
-
gormDB.Exec("DELETE FROM repositories")
-
gormDB.Exec("DELETE FROM commits")
-
gormDB.Exec("DELETE FROM records")
-
gormDB.Exec("DELETE FROM user_maps")
-
gormDB.Exec("DELETE FROM car_shards")
-
gormDB.Exec("DELETE FROM block_refs")
-
-
// Close GORM connection
-
if sqlGormDB, err := gormDB.DB(); err == nil {
-
sqlGormDB.Close()
-
}
-
-
// Close original SQL connection
-
sqlDB.Close()
-
}
-
-
return sqlDB, gormDB, cleanup
-
}
-
-
func TestRepositoryService_CreateRepository(t *testing.T) {
-
sqlDB, gormDB, cleanup := setupTestDB(t)
-
defer cleanup()
-
-
// Create temporary directory for carstore
-
tempDir, err := os.MkdirTemp("", "carstore_test")
-
if err != nil {
-
t.Fatalf("Failed to create temp dir: %v", err)
-
}
-
defer os.RemoveAll(tempDir)
-
-
// Initialize carstore
-
carDirs := []string{tempDir}
-
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
-
if err != nil {
-
t.Fatalf("Failed to create repo store: %v", err)
-
}
-
-
// Initialize repository service
-
repoRepo := postgres.NewRepositoryRepo(sqlDB)
-
service := repository.NewService(repoRepo, repoStore)
-
-
// Test DID
-
testDID := "did:plc:testuser123"
-
-
// Set signing key
-
service.SetSigningKey(testDID, &mockSigningKey{})
-
-
// Create repository
-
repo, err := service.CreateRepository(testDID)
-
if err != nil {
-
t.Fatalf("Failed to create repository: %v", err)
-
}
-
-
// Verify repository was created
-
if repo.DID != testDID {
-
t.Errorf("Expected DID %s, got %s", testDID, repo.DID)
-
}
-
if !repo.HeadCID.Defined() {
-
t.Error("Expected HeadCID to be defined")
-
}
-
if repo.RecordCount != 0 {
-
t.Errorf("Expected RecordCount 0, got %d", repo.RecordCount)
-
}
-
-
// Verify repository exists in database
-
fetchedRepo, err := service.GetRepository(testDID)
-
if err != nil {
-
t.Fatalf("Failed to get repository: %v", err)
-
}
-
if fetchedRepo.DID != testDID {
-
t.Errorf("Expected fetched DID %s, got %s", testDID, fetchedRepo.DID)
-
}
-
-
// Test duplicate creation should fail
-
_, err = service.CreateRepository(testDID)
-
if err == nil {
-
t.Error("Expected error creating duplicate repository")
-
}
-
}
-
-
func TestRepositoryService_ImportExport(t *testing.T) {
-
sqlDB, gormDB, cleanup := setupTestDB(t)
-
defer cleanup()
-
-
// Create temporary directory for carstore
-
tempDir, err := os.MkdirTemp("", "carstore_test")
-
if err != nil {
-
t.Fatalf("Failed to create temp dir: %v", err)
-
}
-
defer os.RemoveAll(tempDir)
-
-
// Log the temp directory for debugging
-
t.Logf("Using carstore directory: %s", tempDir)
-
-
// Initialize carstore
-
carDirs := []string{tempDir}
-
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
-
if err != nil {
-
t.Fatalf("Failed to create repo store: %v", err)
-
}
-
-
// Initialize repository service
-
repoRepo := postgres.NewRepositoryRepo(sqlDB)
-
service := repository.NewService(repoRepo, repoStore)
-
-
// Create first repository
-
did1 := "did:plc:user1"
-
service.SetSigningKey(did1, &mockSigningKey{})
-
repo1, err := service.CreateRepository(did1)
-
if err != nil {
-
t.Fatalf("Failed to create repository 1: %v", err)
-
}
-
t.Logf("Created repository with HeadCID: %s", repo1.HeadCID)
-
-
// Check what's in the database
-
var userMapCount int
-
gormDB.Raw("SELECT COUNT(*) FROM user_maps").Scan(&userMapCount)
-
t.Logf("User maps count: %d", userMapCount)
-
-
var carShardCount int
-
gormDB.Raw("SELECT COUNT(*) FROM car_shards").Scan(&carShardCount)
-
t.Logf("Car shards count: %d", carShardCount)
-
-
// Check block_refs too
-
var blockRefCount int
-
gormDB.Raw("SELECT COUNT(*) FROM block_refs").Scan(&blockRefCount)
-
t.Logf("Block refs count: %d", blockRefCount)
-
-
// Export repository
-
carData, err := service.ExportRepository(did1)
-
if err != nil {
-
t.Fatalf("Failed to export repository: %v", err)
-
}
-
// For now, empty repositories return empty CAR data
-
t.Logf("Exported CAR data size: %d bytes", len(carData))
-
-
// Import to new DID
-
did2 := "did:plc:user2"
-
err = service.ImportRepository(did2, carData)
-
if err != nil {
-
t.Fatalf("Failed to import repository: %v", err)
-
}
-
-
// Verify imported repository
-
repo2, err := service.GetRepository(did2)
-
if err != nil {
-
t.Fatalf("Failed to get imported repository: %v", err)
-
}
-
if repo2.DID != did2 {
-
t.Errorf("Expected DID %s, got %s", did2, repo2.DID)
-
}
-
// Note: HeadCID might differ due to new import
-
}
-
-
func TestRepositoryService_DeleteRepository(t *testing.T) {
-
sqlDB, gormDB, cleanup := setupTestDB(t)
-
defer cleanup()
-
-
// Create temporary directory for carstore
-
tempDir, err := os.MkdirTemp("", "carstore_test")
-
if err != nil {
-
t.Fatalf("Failed to create temp dir: %v", err)
-
}
-
defer os.RemoveAll(tempDir)
-
-
// Initialize carstore
-
carDirs := []string{tempDir}
-
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
-
if err != nil {
-
t.Fatalf("Failed to create repo store: %v", err)
-
}
-
-
// Initialize repository service
-
repoRepo := postgres.NewRepositoryRepo(sqlDB)
-
service := repository.NewService(repoRepo, repoStore)
-
-
// Create repository
-
testDID := "did:plc:deletetest"
-
service.SetSigningKey(testDID, &mockSigningKey{})
-
_, err = service.CreateRepository(testDID)
-
if err != nil {
-
t.Fatalf("Failed to create repository: %v", err)
-
}
-
-
// Delete repository
-
err = service.DeleteRepository(testDID)
-
if err != nil {
-
t.Fatalf("Failed to delete repository: %v", err)
-
}
-
-
// Verify repository is deleted
-
_, err = service.GetRepository(testDID)
-
if err == nil {
-
t.Error("Expected error getting deleted repository")
-
}
-
}
-
-
func TestRepositoryService_CompactRepository(t *testing.T) {
-
sqlDB, gormDB, cleanup := setupTestDB(t)
-
defer cleanup()
-
-
// Create temporary directory for carstore
-
tempDir, err := os.MkdirTemp("", "carstore_test")
-
if err != nil {
-
t.Fatalf("Failed to create temp dir: %v", err)
-
}
-
defer os.RemoveAll(tempDir)
-
-
// Initialize carstore
-
carDirs := []string{tempDir}
-
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
-
if err != nil {
-
t.Fatalf("Failed to create repo store: %v", err)
-
}
-
-
// Initialize repository service
-
repoRepo := postgres.NewRepositoryRepo(sqlDB)
-
service := repository.NewService(repoRepo, repoStore)
-
-
// Create repository
-
testDID := "did:plc:compacttest"
-
service.SetSigningKey(testDID, &mockSigningKey{})
-
_, err = service.CreateRepository(testDID)
-
if err != nil {
-
t.Fatalf("Failed to create repository: %v", err)
-
}
-
-
// Run compaction (should not error even with minimal data)
-
err = service.CompactRepository(testDID)
-
if err != nil {
-
t.Errorf("Failed to compact repository: %v", err)
-
}
-
}
-
-
// Test UserMapping functionality
-
func TestUserMapping(t *testing.T) {
-
_, gormDB, cleanup := setupTestDB(t)
-
defer cleanup()
-
-
// Create user mapping
-
mapping, err := carstore.NewUserMapping(gormDB)
-
if err != nil {
-
t.Fatalf("Failed to create user mapping: %v", err)
-
}
-
-
// Test creating new mapping
-
did1 := "did:plc:mapping1"
-
uid1, err := mapping.GetOrCreateUID(context.Background(), did1)
-
if err != nil {
-
t.Fatalf("Failed to create UID for %s: %v", did1, err)
-
}
-
if uid1 == 0 {
-
t.Error("Expected non-zero UID")
-
}
-
-
// Test getting existing mapping
-
uid1Again, err := mapping.GetOrCreateUID(context.Background(), did1)
-
if err != nil {
-
t.Fatalf("Failed to get UID for %s: %v", did1, err)
-
}
-
if uid1 != uid1Again {
-
t.Errorf("Expected same UID, got %d and %d", uid1, uid1Again)
-
}
-
-
// Test reverse lookup
-
didLookup, err := mapping.GetDID(uid1)
-
if err != nil {
-
t.Fatalf("Failed to get DID for UID %d: %v", uid1, err)
-
}
-
if didLookup != did1 {
-
t.Errorf("Expected DID %s, got %s", did1, didLookup)
-
}
-
-
// Test second user gets different UID
-
did2 := "did:plc:mapping2"
-
uid2, err := mapping.GetOrCreateUID(context.Background(), did2)
-
if err != nil {
-
t.Fatalf("Failed to create UID for %s: %v", did2, err)
-
}
-
if uid2 == uid1 {
-
t.Error("Expected different UIDs for different DIDs")
-
}
-
}
-
-
// Test with mock repository and carstore
-
func TestRepositoryService_MockedComponents(t *testing.T) {
-
// Use the existing mock repository from the old test file
-
_ = NewMockRepositoryRepository()
-
-
// For unit testing without real carstore, we would need to mock RepoStore
-
// For now, this demonstrates the structure
-
t.Skip("Mocked carstore tests would require creating mock RepoStore interface")
-
}
-
-
// Benchmark repository creation
-
func BenchmarkRepositoryCreation(b *testing.B) {
-
sqlDB, gormDB, cleanup := setupTestDB(&testing.T{})
-
defer cleanup()
-
-
tempDir, _ := os.MkdirTemp("", "carstore_bench")
-
defer os.RemoveAll(tempDir)
-
-
carDirs := []string{tempDir}
-
repoStore, _ := carstore.NewRepoStore(gormDB, carDirs)
-
repoRepo := postgres.NewRepositoryRepo(sqlDB)
-
service := repository.NewService(repoRepo, repoStore)
-
-
b.ResetTimer()
-
for i := 0; i < b.N; i++ {
-
did := fmt.Sprintf("did:plc:bench%d", i)
-
service.SetSigningKey(did, &mockSigningKey{})
-
_, _ = service.CreateRepository(did)
-
}
-
}
-
-
// MockRepositoryRepository is a mock implementation of repository.RepositoryRepository
-
type MockRepositoryRepository struct {
-
repositories map[string]*repository.Repository
-
commits map[string][]*repository.Commit
-
records map[string]*repository.Record
-
}
-
-
func NewMockRepositoryRepository() *MockRepositoryRepository {
-
return &MockRepositoryRepository{
-
repositories: make(map[string]*repository.Repository),
-
commits: make(map[string][]*repository.Commit),
-
records: make(map[string]*repository.Record),
-
}
-
}
-
-
// Repository operations
-
func (m *MockRepositoryRepository) Create(repo *repository.Repository) error {
-
m.repositories[repo.DID] = repo
-
return nil
-
}
-
-
func (m *MockRepositoryRepository) GetByDID(did string) (*repository.Repository, error) {
-
repo, exists := m.repositories[did]
-
if !exists {
-
return nil, nil
-
}
-
return repo, nil
-
}
-
-
func (m *MockRepositoryRepository) Update(repo *repository.Repository) error {
-
if _, exists := m.repositories[repo.DID]; !exists {
-
return nil
-
}
-
m.repositories[repo.DID] = repo
-
return nil
-
}
-
-
func (m *MockRepositoryRepository) Delete(did string) error {
-
delete(m.repositories, did)
-
return nil
-
}
-
-
// Commit operations
-
func (m *MockRepositoryRepository) CreateCommit(commit *repository.Commit) error {
-
m.commits[commit.DID] = append(m.commits[commit.DID], commit)
-
return nil
-
}
-
-
func (m *MockRepositoryRepository) GetCommit(did string, commitCID cid.Cid) (*repository.Commit, error) {
-
commits, exists := m.commits[did]
-
if !exists {
-
return nil, nil
-
}
-
-
for _, c := range commits {
-
if c.CID.Equals(commitCID) {
-
return c, nil
-
}
-
}
-
return nil, nil
-
}
-
-
func (m *MockRepositoryRepository) GetLatestCommit(did string) (*repository.Commit, error) {
-
commits, exists := m.commits[did]
-
if !exists || len(commits) == 0 {
-
return nil, nil
-
}
-
return commits[len(commits)-1], nil
-
}
-
-
func (m *MockRepositoryRepository) ListCommits(did string, limit int, offset int) ([]*repository.Commit, error) {
-
commits, exists := m.commits[did]
-
if !exists {
-
return []*repository.Commit{}, nil
-
}
-
-
start := offset
-
if start >= len(commits) {
-
return []*repository.Commit{}, nil
-
}
-
-
end := start + limit
-
if end > len(commits) {
-
end = len(commits)
-
}
-
-
return commits[start:end], nil
-
}
-
-
// Record operations
-
func (m *MockRepositoryRepository) CreateRecord(record *repository.Record) error {
-
key := record.URI
-
m.records[key] = record
-
return nil
-
}
-
-
func (m *MockRepositoryRepository) GetRecord(did string, collection string, recordKey string) (*repository.Record, error) {
-
uri := "at://" + did + "/" + collection + "/" + recordKey
-
record, exists := m.records[uri]
-
if !exists {
-
return nil, nil
-
}
-
return record, nil
-
}
-
-
func (m *MockRepositoryRepository) UpdateRecord(record *repository.Record) error {
-
key := record.URI
-
if _, exists := m.records[key]; !exists {
-
return nil
-
}
-
m.records[key] = record
-
return nil
-
}
-
-
func (m *MockRepositoryRepository) DeleteRecord(did string, collection string, recordKey string) error {
-
uri := "at://" + did + "/" + collection + "/" + recordKey
-
delete(m.records, uri)
-
return nil
-
}
-
-
func (m *MockRepositoryRepository) ListRecords(did string, collection string, limit int, offset int) ([]*repository.Record, error) {
-
var records []*repository.Record
-
prefix := "at://" + did + "/" + collection + "/"
-
-
for uri, record := range m.records {
-
if len(uri) > len(prefix) && uri[:len(prefix)] == prefix {
-
records = append(records, record)
-
}
-
}
-
-
// Simple pagination
-
start := offset
-
if start >= len(records) {
-
return []*repository.Record{}, nil
-
}
-
-
end := start + limit
-
if end > len(records) {
-
end = len(records)
-
}
-
-
return records[start:end], nil
-
}
···
-12
internal/db/local_dev_db_compose/docker-compose.yml
···
-
# docker-compose.yml
-
services:
-
postgres:
-
image: postgres:15
-
network_mode: host # Add this line
-
environment:
-
POSTGRES_DB: coves_dev
-
POSTGRES_USER: dev_user
-
POSTGRES_PASSWORD: dev_password
-
PGPORT: 5433
-
volumes:
-
- ~/Code/Coves/local_dev_data:/var/lib/postgresql/data
···
-88
internal/db/migrations/002_create_repository_tables.sql
···
-
-- +goose Up
-
-- +goose StatementBegin
-
-
-- Repositories table stores metadata about each user's repository
-
CREATE TABLE repositories (
-
did VARCHAR(256) PRIMARY KEY,
-
head_cid VARCHAR(256) NOT NULL,
-
revision VARCHAR(64) NOT NULL,
-
record_count INTEGER NOT NULL DEFAULT 0,
-
storage_size BIGINT NOT NULL DEFAULT 0,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
-
);
-
-
CREATE INDEX idx_repositories_updated_at ON repositories(updated_at);
-
-
-- Commits table stores the commit history
-
CREATE TABLE commits (
-
cid VARCHAR(256) PRIMARY KEY,
-
did VARCHAR(256) NOT NULL,
-
version INTEGER NOT NULL,
-
prev_cid VARCHAR(256),
-
data_cid VARCHAR(256) NOT NULL,
-
revision VARCHAR(64) NOT NULL,
-
signature BYTEA NOT NULL,
-
signing_key_id VARCHAR(256) NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
FOREIGN KEY (did) REFERENCES repositories(did) ON DELETE CASCADE
-
);
-
-
CREATE INDEX idx_commits_did ON commits(did);
-
CREATE INDEX idx_commits_created_at ON commits(created_at);
-
-
-- Records table stores record metadata (actual data is in MST)
-
CREATE TABLE records (
-
id SERIAL PRIMARY KEY,
-
did VARCHAR(256) NOT NULL,
-
uri VARCHAR(512) NOT NULL,
-
cid VARCHAR(256) NOT NULL,
-
collection VARCHAR(256) NOT NULL,
-
record_key VARCHAR(256) NOT NULL,
-
value BYTEA NOT NULL, -- CBOR-encoded record data
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
UNIQUE(did, collection, record_key),
-
FOREIGN KEY (did) REFERENCES repositories(did) ON DELETE CASCADE
-
);
-
-
CREATE INDEX idx_records_did_collection ON records(did, collection);
-
CREATE INDEX idx_records_uri ON records(uri);
-
CREATE INDEX idx_records_updated_at ON records(updated_at);
-
-
-- Blobs table stores binary large objects
-
CREATE TABLE blobs (
-
cid VARCHAR(256) PRIMARY KEY,
-
mime_type VARCHAR(256) NOT NULL,
-
size BIGINT NOT NULL,
-
ref_count INTEGER NOT NULL DEFAULT 0,
-
data BYTEA NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
-
);
-
-
CREATE INDEX idx_blobs_ref_count ON blobs(ref_count);
-
CREATE INDEX idx_blobs_created_at ON blobs(created_at);
-
-
-- Blob references table tracks which records reference which blobs
-
CREATE TABLE blob_refs (
-
id SERIAL PRIMARY KEY,
-
record_id INTEGER NOT NULL,
-
blob_cid VARCHAR(256) NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
FOREIGN KEY (record_id) REFERENCES records(id) ON DELETE CASCADE,
-
FOREIGN KEY (blob_cid) REFERENCES blobs(cid) ON DELETE RESTRICT,
-
UNIQUE(record_id, blob_cid)
-
);
-
-
CREATE INDEX idx_blob_refs_blob_cid ON blob_refs(blob_cid);
-
-
-- +goose StatementEnd
-
-
-- +goose Down
-
-- +goose StatementBegin
-
DROP TABLE IF EXISTS blob_refs;
-
DROP TABLE IF EXISTS blobs;
-
DROP TABLE IF EXISTS records;
-
DROP TABLE IF EXISTS commits;
-
DROP TABLE IF EXISTS repositories;
-
-- +goose StatementEnd
···
-65
internal/db/migrations/003_update_for_carstore.sql
···
-
-- +goose Up
-
-- +goose StatementBegin
-
-
-- WARNING: This migration removes blob storage tables.
-
-- Ensure all blob data has been migrated to carstore before running this migration.
-
-- This migration is NOT reversible if blob data exists!
-
-
-- Remove the value column from records table since blocks are now stored in filesystem
-
ALTER TABLE records DROP COLUMN IF EXISTS value;
-
-
-- Drop blob-related tables since FileCarStore handles block storage
-
-- WARNING: This will permanently delete all blob data!
-
DROP TABLE IF EXISTS blob_refs;
-
DROP TABLE IF EXISTS blobs;
-
-
-- Create block_refs table for garbage collection tracking
-
CREATE TABLE block_refs (
-
cid VARCHAR(256) NOT NULL,
-
did VARCHAR(256) NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
PRIMARY KEY (cid, did),
-
FOREIGN KEY (did) REFERENCES repositories(did) ON DELETE CASCADE
-
);
-
-
CREATE INDEX idx_block_refs_did ON block_refs(did);
-
CREATE INDEX idx_block_refs_created_at ON block_refs(created_at);
-
-
-- +goose StatementEnd
-
-
-- +goose Down
-
-- +goose StatementBegin
-
-
-- Recreate the original schema for rollback
-
DROP TABLE IF EXISTS block_refs;
-
-
-- Add back the value column to records table
-
ALTER TABLE records ADD COLUMN value BYTEA;
-
-
-- Recreate blobs table
-
CREATE TABLE blobs (
-
cid VARCHAR(256) PRIMARY KEY,
-
mime_type VARCHAR(256) NOT NULL,
-
size BIGINT NOT NULL,
-
ref_count INTEGER NOT NULL DEFAULT 0,
-
data BYTEA NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW()
-
);
-
-
CREATE INDEX idx_blobs_ref_count ON blobs(ref_count);
-
CREATE INDEX idx_blobs_created_at ON blobs(created_at);
-
-
-- Recreate blob_refs table
-
CREATE TABLE blob_refs (
-
id SERIAL PRIMARY KEY,
-
record_id INTEGER NOT NULL,
-
blob_cid VARCHAR(256) NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
FOREIGN KEY (record_id) REFERENCES records(id) ON DELETE CASCADE,
-
FOREIGN KEY (blob_cid) REFERENCES blobs(cid) ON DELETE RESTRICT,
-
UNIQUE(record_id, blob_cid)
-
);
-
-
CREATE INDEX idx_blob_refs_blob_cid ON blob_refs(blob_cid);
-
-
-- +goose StatementEnd
···
-20
internal/db/migrations/004_remove_block_refs_for_indigo.sql
···
-
-- +goose Up
-
-- +goose StatementBegin
-
-- Drop our block_refs table since Indigo's carstore will create its own
-
DROP TABLE IF EXISTS block_refs;
-
-- +goose StatementEnd
-
-
-- +goose Down
-
-- +goose StatementBegin
-
-- Recreate block_refs table
-
CREATE TABLE block_refs (
-
cid VARCHAR(256) NOT NULL,
-
did VARCHAR(256) NOT NULL,
-
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
-
PRIMARY KEY (cid, did),
-
FOREIGN KEY (did) REFERENCES repositories(did) ON DELETE CASCADE
-
);
-
-
CREATE INDEX idx_block_refs_did ON block_refs(did);
-
CREATE INDEX idx_block_refs_created_at ON block_refs(created_at);
-
-- +goose StatementEnd
···
-465
internal/db/postgres/repository_repo.go
···
-
package postgres
-
-
import (
-
"database/sql"
-
"fmt"
-
"time"
-
-
"Coves/internal/core/repository"
-
"github.com/ipfs/go-cid"
-
"github.com/lib/pq"
-
)
-
-
// RepositoryRepo implements repository.RepositoryRepository using PostgreSQL
-
type RepositoryRepo struct {
-
db *sql.DB
-
}
-
-
// NewRepositoryRepo creates a new PostgreSQL repository implementation
-
func NewRepositoryRepo(db *sql.DB) *RepositoryRepo {
-
return &RepositoryRepo{db: db}
-
}
-
-
// Repository operations
-
-
func (r *RepositoryRepo) Create(repo *repository.Repository) error {
-
query := `
-
INSERT INTO repositories (did, head_cid, revision, record_count, storage_size, created_at, updated_at)
-
VALUES ($1, $2, $3, $4, $5, $6, $7)`
-
-
_, err := r.db.Exec(query,
-
repo.DID,
-
repo.HeadCID.String(),
-
repo.Revision,
-
repo.RecordCount,
-
repo.StorageSize,
-
repo.CreatedAt,
-
repo.UpdatedAt,
-
)
-
if err != nil {
-
return fmt.Errorf("failed to create repository: %w", err)
-
}
-
-
return nil
-
}
-
-
func (r *RepositoryRepo) GetByDID(did string) (*repository.Repository, error) {
-
query := `
-
SELECT did, head_cid, revision, record_count, storage_size, created_at, updated_at
-
FROM repositories
-
WHERE did = $1`
-
-
var repo repository.Repository
-
var headCIDStr string
-
-
err := r.db.QueryRow(query, did).Scan(
-
&repo.DID,
-
&headCIDStr,
-
&repo.Revision,
-
&repo.RecordCount,
-
&repo.StorageSize,
-
&repo.CreatedAt,
-
&repo.UpdatedAt,
-
)
-
if err == sql.ErrNoRows {
-
return nil, nil
-
}
-
if err != nil {
-
return nil, fmt.Errorf("failed to get repository: %w", err)
-
}
-
-
repo.HeadCID, err = cid.Parse(headCIDStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse head CID: %w", err)
-
}
-
-
return &repo, nil
-
}
-
-
func (r *RepositoryRepo) Update(repo *repository.Repository) error {
-
query := `
-
UPDATE repositories
-
SET head_cid = $2, revision = $3, record_count = $4, storage_size = $5, updated_at = $6
-
WHERE did = $1`
-
-
result, err := r.db.Exec(query,
-
repo.DID,
-
repo.HeadCID.String(),
-
repo.Revision,
-
repo.RecordCount,
-
repo.StorageSize,
-
time.Now(),
-
)
-
if err != nil {
-
return fmt.Errorf("failed to update repository: %w", err)
-
}
-
-
rowsAffected, err := result.RowsAffected()
-
if err != nil {
-
return fmt.Errorf("failed to get rows affected: %w", err)
-
}
-
if rowsAffected == 0 {
-
return fmt.Errorf("repository not found: %s", repo.DID)
-
}
-
-
return nil
-
}
-
-
func (r *RepositoryRepo) Delete(did string) error {
-
query := `DELETE FROM repositories WHERE did = $1`
-
-
result, err := r.db.Exec(query, did)
-
if err != nil {
-
return fmt.Errorf("failed to delete repository: %w", err)
-
}
-
-
rowsAffected, err := result.RowsAffected()
-
if err != nil {
-
return fmt.Errorf("failed to get rows affected: %w", err)
-
}
-
if rowsAffected == 0 {
-
return fmt.Errorf("repository not found: %s", did)
-
}
-
-
return nil
-
}
-
-
// Commit operations
-
-
func (r *RepositoryRepo) CreateCommit(commit *repository.Commit) error {
-
query := `
-
INSERT INTO commits (cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at)
-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
-
-
var prevCID *string
-
if commit.PrevCID != nil {
-
s := commit.PrevCID.String()
-
prevCID = &s
-
}
-
-
_, err := r.db.Exec(query,
-
commit.CID.String(),
-
commit.DID,
-
commit.Version,
-
prevCID,
-
commit.DataCID.String(),
-
commit.Revision,
-
commit.Signature,
-
commit.SigningKeyID,
-
commit.CreatedAt,
-
)
-
if err != nil {
-
return fmt.Errorf("failed to create commit: %w", err)
-
}
-
-
return nil
-
}
-
-
func (r *RepositoryRepo) GetCommit(did string, commitCID cid.Cid) (*repository.Commit, error) {
-
query := `
-
SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at
-
FROM commits
-
WHERE did = $1 AND cid = $2`
-
-
var commit repository.Commit
-
var cidStr, dataCIDStr string
-
var prevCIDStr sql.NullString
-
-
err := r.db.QueryRow(query, did, commitCID.String()).Scan(
-
&cidStr,
-
&commit.DID,
-
&commit.Version,
-
&prevCIDStr,
-
&dataCIDStr,
-
&commit.Revision,
-
&commit.Signature,
-
&commit.SigningKeyID,
-
&commit.CreatedAt,
-
)
-
if err == sql.ErrNoRows {
-
return nil, nil
-
}
-
if err != nil {
-
return nil, fmt.Errorf("failed to get commit: %w", err)
-
}
-
-
commit.CID, err = cid.Parse(cidStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse commit CID: %w", err)
-
}
-
-
commit.DataCID, err = cid.Parse(dataCIDStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse data CID: %w", err)
-
}
-
-
if prevCIDStr.Valid {
-
prevCID, err := cid.Parse(prevCIDStr.String)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse prev CID: %w", err)
-
}
-
commit.PrevCID = &prevCID
-
}
-
-
return &commit, nil
-
}
-
-
func (r *RepositoryRepo) GetLatestCommit(did string) (*repository.Commit, error) {
-
query := `
-
SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at
-
FROM commits
-
WHERE did = $1
-
ORDER BY created_at DESC
-
LIMIT 1`
-
-
var commit repository.Commit
-
var cidStr, dataCIDStr string
-
var prevCIDStr sql.NullString
-
-
err := r.db.QueryRow(query, did).Scan(
-
&cidStr,
-
&commit.DID,
-
&commit.Version,
-
&prevCIDStr,
-
&dataCIDStr,
-
&commit.Revision,
-
&commit.Signature,
-
&commit.SigningKeyID,
-
&commit.CreatedAt,
-
)
-
if err == sql.ErrNoRows {
-
return nil, nil
-
}
-
if err != nil {
-
return nil, fmt.Errorf("failed to get latest commit: %w", err)
-
}
-
-
commit.CID, err = cid.Parse(cidStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse commit CID: %w", err)
-
}
-
-
commit.DataCID, err = cid.Parse(dataCIDStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse data CID: %w", err)
-
}
-
-
if prevCIDStr.Valid {
-
prevCID, err := cid.Parse(prevCIDStr.String)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse prev CID: %w", err)
-
}
-
commit.PrevCID = &prevCID
-
}
-
-
return &commit, nil
-
}
-
-
func (r *RepositoryRepo) ListCommits(did string, limit int, offset int) ([]*repository.Commit, error) {
-
query := `
-
SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at
-
FROM commits
-
WHERE did = $1
-
ORDER BY created_at DESC
-
LIMIT $2 OFFSET $3`
-
-
rows, err := r.db.Query(query, did, limit, offset)
-
if err != nil {
-
return nil, fmt.Errorf("failed to list commits: %w", err)
-
}
-
defer rows.Close()
-
-
var commits []*repository.Commit
-
for rows.Next() {
-
var commit repository.Commit
-
var cidStr, dataCIDStr string
-
var prevCIDStr sql.NullString
-
-
err := rows.Scan(
-
&cidStr,
-
&commit.DID,
-
&commit.Version,
-
&prevCIDStr,
-
&dataCIDStr,
-
&commit.Revision,
-
&commit.Signature,
-
&commit.SigningKeyID,
-
&commit.CreatedAt,
-
)
-
if err != nil {
-
return nil, fmt.Errorf("failed to scan commit: %w", err)
-
}
-
-
commit.CID, err = cid.Parse(cidStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse commit CID: %w", err)
-
}
-
-
commit.DataCID, err = cid.Parse(dataCIDStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse data CID: %w", err)
-
}
-
-
if prevCIDStr.Valid {
-
prevCID, err := cid.Parse(prevCIDStr.String)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse prev CID: %w", err)
-
}
-
commit.PrevCID = &prevCID
-
}
-
-
commits = append(commits, &commit)
-
}
-
-
return commits, nil
-
}
-
-
// Record operations
-
-
func (r *RepositoryRepo) CreateRecord(record *repository.Record) error {
-
query := `
-
INSERT INTO records (did, uri, cid, collection, record_key, created_at, updated_at)
-
VALUES ($1, $2, $3, $4, $5, $6, $7)`
-
-
_, err := r.db.Exec(query,
-
record.URI[:len("at://")+len(record.URI[len("at://"):])-len(record.Collection)-len(record.RecordKey)-2], // Extract DID from URI
-
record.URI,
-
record.CID.String(),
-
record.Collection,
-
record.RecordKey,
-
record.CreatedAt,
-
record.UpdatedAt,
-
)
-
if err != nil {
-
if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == "23505" { // unique_violation
-
return fmt.Errorf("record already exists: %s", record.URI)
-
}
-
return fmt.Errorf("failed to create record: %w", err)
-
}
-
-
return nil
-
}
-
-
func (r *RepositoryRepo) GetRecord(did string, collection string, recordKey string) (*repository.Record, error) {
-
query := `
-
SELECT uri, cid, collection, record_key, created_at, updated_at
-
FROM records
-
WHERE did = $1 AND collection = $2 AND record_key = $3`
-
-
var record repository.Record
-
var cidStr string
-
-
err := r.db.QueryRow(query, did, collection, recordKey).Scan(
-
&record.URI,
-
&cidStr,
-
&record.Collection,
-
&record.RecordKey,
-
&record.CreatedAt,
-
&record.UpdatedAt,
-
)
-
if err == sql.ErrNoRows {
-
return nil, nil
-
}
-
if err != nil {
-
return nil, fmt.Errorf("failed to get record: %w", err)
-
}
-
-
record.CID, err = cid.Parse(cidStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse record CID: %w", err)
-
}
-
-
return &record, nil
-
}
-
-
func (r *RepositoryRepo) UpdateRecord(record *repository.Record) error {
-
did := record.URI[:len("at://")+len(record.URI[len("at://"):])-len(record.Collection)-len(record.RecordKey)-2]
-
-
query := `
-
UPDATE records
-
SET cid = $4, updated_at = $5
-
WHERE did = $1 AND collection = $2 AND record_key = $3`
-
-
result, err := r.db.Exec(query,
-
did,
-
record.Collection,
-
record.RecordKey,
-
record.CID.String(),
-
time.Now(),
-
)
-
if err != nil {
-
return fmt.Errorf("failed to update record: %w", err)
-
}
-
-
rowsAffected, err := result.RowsAffected()
-
if err != nil {
-
return fmt.Errorf("failed to get rows affected: %w", err)
-
}
-
if rowsAffected == 0 {
-
return fmt.Errorf("record not found: %s", record.URI)
-
}
-
-
return nil
-
}
-
-
func (r *RepositoryRepo) DeleteRecord(did string, collection string, recordKey string) error {
-
query := `DELETE FROM records WHERE did = $1 AND collection = $2 AND record_key = $3`
-
-
result, err := r.db.Exec(query, did, collection, recordKey)
-
if err != nil {
-
return fmt.Errorf("failed to delete record: %w", err)
-
}
-
-
rowsAffected, err := result.RowsAffected()
-
if err != nil {
-
return fmt.Errorf("failed to get rows affected: %w", err)
-
}
-
if rowsAffected == 0 {
-
return fmt.Errorf("record not found")
-
}
-
-
return nil
-
}
-
-
func (r *RepositoryRepo) ListRecords(did string, collection string, limit int, offset int) ([]*repository.Record, error) {
-
query := `
-
SELECT uri, cid, collection, record_key, created_at, updated_at
-
FROM records
-
WHERE did = $1 AND collection = $2
-
ORDER BY created_at DESC
-
LIMIT $3 OFFSET $4`
-
-
rows, err := r.db.Query(query, did, collection, limit, offset)
-
if err != nil {
-
return nil, fmt.Errorf("failed to list records: %w", err)
-
}
-
defer rows.Close()
-
-
var records []*repository.Record
-
for rows.Next() {
-
var record repository.Record
-
var cidStr string
-
-
err := rows.Scan(
-
&record.URI,
-
&cidStr,
-
&record.Collection,
-
&record.RecordKey,
-
&record.CreatedAt,
-
&record.UpdatedAt,
-
)
-
if err != nil {
-
return nil, fmt.Errorf("failed to scan record: %w", err)
-
}
-
-
record.CID, err = cid.Parse(cidStr)
-
if err != nil {
-
return nil, fmt.Errorf("failed to parse record CID: %w", err)
-
}
-
-
records = append(records, &record)
-
}
-
-
return records, nil
-
}
-
···
-84
internal/db/test_db_compose/README.md
···
-
# Test Database Setup
-
-
This directory contains the Docker Compose configuration for the Coves test database.
-
-
## Overview
-
-
The test database is a PostgreSQL instance specifically for running automated tests. It's completely isolated from development and production databases.
-
-
### Configuration
-
-
- **Port**: 5434 (different from dev: 5433, prod: 5432)
-
- **Database**: coves_test
-
- **User**: test_user
-
- **Password**: test_password
-
- **Data Volume**: ~/Code/Coves/test_db_data
-
-
## Usage
-
-
### Starting the Test Database
-
-
```bash
-
cd internal/db/test_db_compose
-
./start-test-db.sh
-
```
-
-
This will:
-
1. Start the PostgreSQL container
-
2. Wait for it to be ready
-
3. Display the connection string
-
-
### Running Tests
-
-
Once the database is running, you can run tests with:
-
-
```bash
-
TEST_DATABASE_URL=postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable go test -v ./...
-
```
-
-
Or set the environment variable:
-
-
```bash
-
export TEST_DATABASE_URL=postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable
-
go test -v ./...
-
```
-
-
### Stopping the Test Database
-
-
```bash
-
./stop-test-db.sh
-
```
-
-
### Resetting Test Data
-
-
To completely reset the test database (removes all data):
-
-
```bash
-
./reset-test-db.sh
-
```
-
-
## Test Isolation
-
-
The test database is isolated from other environments:
-
-
| Environment | Port | Database Name | User |
-
|------------|------|--------------|------|
-
| Test | 5434 | coves_test | test_user |
-
| Development | 5433 | coves_dev | dev_user |
-
| Production | 5432 | coves | (varies) |
-
-
## What Gets Tested
-
-
When tests run against this database, they will:
-
-
1. Run all migrations from `internal/db/migrations/`
-
2. Create Indigo carstore tables (via GORM auto-migration)
-
3. Test the full integration including:
-
- Repository CRUD operations
-
- CAR file metadata storage
-
- User DID to UID mapping
-
- Carstore operations
-
-
## CI/CD Integration
-
-
For CI/CD pipelines, you can use the same Docker Compose setup or connect to a dedicated test database instance.
···
-19
internal/db/test_db_compose/docker-compose.yml
···
-
# Test Database Docker Compose Configuration
-
# This database is specifically for running tests and is isolated from dev/prod
-
services:
-
postgres_test:
-
image: postgres:15
-
container_name: coves_test_db
-
network_mode: host
-
environment:
-
POSTGRES_DB: coves_test
-
POSTGRES_USER: test_user
-
POSTGRES_PASSWORD: test_password
-
PGPORT: 5434 # Different port from dev (5433) and prod (5432)
-
volumes:
-
- ~/Code/Coves/test_db_data:/var/lib/postgresql/data
-
healthcheck:
-
test: ["CMD-SHELL", "pg_isready -U test_user -d coves_test -p 5434"]
-
interval: 5s
-
timeout: 5s
-
retries: 5
···
-15
internal/db/test_db_compose/reset-test-db.sh
···
-
#!/bin/bash
-
# Reset the test database by removing all data
-
-
echo "WARNING: This will delete all test database data!"
-
echo "Press Ctrl+C to cancel, or Enter to continue..."
-
read
-
-
echo "Stopping test database..."
-
docker-compose -f docker-compose.yml down
-
-
echo "Removing test data volume..."
-
rm -rf ~/Code/Coves/test_db_data
-
-
echo "Starting fresh test database..."
-
./start-test-db.sh
···
-25
internal/db/test_db_compose/start-test-db.sh
···
-
#!/bin/bash
-
# Start the test database
-
-
echo "Starting Coves test database on port 5434..."
-
docker-compose -f docker-compose.yml up -d
-
-
# Wait for database to be ready
-
echo "Waiting for database to be ready..."
-
for i in {1..30}; do
-
if docker-compose -f docker-compose.yml exec -T postgres_test pg_isready -U test_user -d coves_test -p 5434 &>/dev/null; then
-
echo "Test database is ready!"
-
echo ""
-
echo "Connection string:"
-
echo "TEST_DATABASE_URL=postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
-
echo ""
-
echo "To run tests:"
-
echo "TEST_DATABASE_URL=postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable go test -v ./..."
-
exit 0
-
fi
-
echo -n "."
-
sleep 1
-
done
-
-
echo "Failed to start test database"
-
exit 1
···
-7
internal/db/test_db_compose/stop-test-db.sh
···
-
#!/bin/bash
-
# Stop the test database
-
-
echo "Stopping Coves test database..."
-
docker-compose -f docker-compose.yml down
-
-
echo "Test database stopped."
···
-103
tests/integration/repository_test.go
···
-
package integration_test
-
-
import (
-
"os"
-
"testing"
-
-
"Coves/internal/atproto/carstore"
-
"Coves/internal/core/repository"
-
"Coves/internal/db/postgres"
-
"database/sql"
-
_ "github.com/lib/pq"
-
"github.com/pressly/goose/v3"
-
postgresDriver "gorm.io/driver/postgres"
-
"gorm.io/gorm"
-
)
-
-
func TestRepositoryIntegration(t *testing.T) {
-
// Skip if not running integration tests
-
if testing.Short() {
-
t.Skip("Skipping integration test")
-
}
-
-
// Use test database URL from environment or default
-
dbURL := os.Getenv("TEST_DATABASE_URL")
-
if dbURL == "" {
-
dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
-
}
-
-
// Connect to test database with sql.DB for migrations
-
sqlDB, err := sql.Open("postgres", dbURL)
-
if err != nil {
-
t.Fatalf("Failed to connect to test database: %v", err)
-
}
-
defer sqlDB.Close()
-
-
// Run migrations
-
if err := goose.Up(sqlDB, "../../internal/db/migrations"); err != nil {
-
t.Fatalf("Failed to run migrations: %v", err)
-
}
-
-
// Connect with GORM for carstore
-
gormDB, err := gorm.Open(postgresDriver.Open(dbURL), &gorm.Config{
-
DisableForeignKeyConstraintWhenMigrating: true,
-
PrepareStmt: false,
-
})
-
if err != nil {
-
t.Fatalf("Failed to create GORM connection: %v", err)
-
}
-
-
// Create temporary directory for carstore
-
tempDir, err := os.MkdirTemp("", "carstore_integration_test")
-
if err != nil {
-
t.Fatalf("Failed to create temp dir: %v", err)
-
}
-
defer os.RemoveAll(tempDir)
-
-
// Initialize carstore
-
carDirs := []string{tempDir}
-
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
-
if err != nil {
-
t.Fatalf("Failed to create repo store: %v", err)
-
}
-
-
// Create repository repo
-
repoRepo := postgres.NewRepositoryRepo(sqlDB)
-
-
// Create service with both repo and repoStore
-
service := repository.NewService(repoRepo, repoStore)
-
-
// Test creating a repository
-
did := "did:plc:testuser123"
-
service.SetSigningKey(did, "mock-signing-key")
-
-
repo, err := service.CreateRepository(did)
-
if err != nil {
-
t.Fatalf("Failed to create repository: %v", err)
-
}
-
-
if repo.DID != did {
-
t.Errorf("Expected DID %s, got %s", did, repo.DID)
-
}
-
-
// Test getting the repository
-
fetchedRepo, err := service.GetRepository(did)
-
if err != nil {
-
t.Fatalf("Failed to get repository: %v", err)
-
}
-
-
if fetchedRepo.DID != did {
-
t.Errorf("Expected DID %s, got %s", did, fetchedRepo.DID)
-
}
-
-
// Clean up
-
err = service.DeleteRepository(did)
-
if err != nil {
-
t.Fatalf("Failed to delete repository: %v", err)
-
}
-
-
// Clean up test data
-
gormDB.Exec("DELETE FROM repositories")
-
gormDB.Exec("DELETE FROM user_maps")
-
gormDB.Exec("DELETE FROM car_shards")
-
}
···