A community based topic aggregation platform built on atproto

feat: Implement identity resolution with cache and handle update system

## Core Features

### Identity Resolution System (internal/atproto/identity/)
- Implement DNS/HTTPS handle resolution using Bluesky Indigo library
- Add DID document resolution via PLC directory
- Create PostgreSQL-backed caching layer (24h TTL)
- Support bidirectional caching (handle ↔ DID)
- Add atomic cache purge with single-query CTE optimization

### Database Schema
- Add identity_cache table with timezone-aware timestamps
- Support handle normalization via database trigger
- Enable automatic expiry checking

### Handle Update System
- Add UpdateHandle method to UserService and UserRepository
- Implement handle change detection in Jetstream consumer
- Update database BEFORE purging cache (prevents race condition)
- Purge BOTH old handle and DID entries on handle changes
- Add structured logging for cache operations

### Bug Fixes
- Fix timezone bugs throughout codebase (use UTC consistently)
- Fix rate limiter timestamp handling
- Resolve pre-existing test isolation bug in identity cache tests
- Fix Makefile test command to exclude restricted directories

### Testing
- Add comprehensive identity resolution test suite (450+ lines)
- Add handle change integration test with cache verification
- All 46+ integration test subtests passing
- Test both local and real atProto handle resolution

### Configuration
- Add IDENTITY_PLC_URL and IDENTITY_CACHE_TTL env vars
- Add golangci-lint configuration
- Update Makefile to avoid permission denied errors

## Architecture Decisions

- Use decorator pattern for caching resolver
- Maintain layer separation (no SQL in handlers)
- Reject database triggers for cache invalidation (keeps logic in app layer)
- Follow atProto best practices from QuickDID

## Files Changed
- 7 new files (identity system + migration + tests)
- 12 modified files (integration + bug fixes)
- ~800 lines of production code
- ~450 lines of tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+9
.env.dev
···
# JETSTREAM_PDS_FILTER=http://localhost:3001
# =============================================================================
+
# Identity Resolution Configuration
+
# =============================================================================
+
# PLC Directory URL for DID resolution
+
IDENTITY_PLC_URL=https://plc.directory
+
+
# Cache TTL for resolved identities (Go duration format: 24h, 1h30m, etc.)
+
IDENTITY_CACHE_TTL=24h
+
+
# =============================================================================
# Development Settings
# =============================================================================
# Environment
+29
.golangci.yml
···
+
linters:
+
enable:
+
- gofmt # Enforce standard Go formatting
+
- govet # Examine Go source code and report suspicious constructs
+
- errcheck # Check for unchecked errors
+
- staticcheck # Advanced static analysis
+
- unused # Check for unused code
+
- gosimple # Suggest code simplifications
+
- ineffassign # Detect ineffectual assignments
+
- typecheck # Standard Go type checker
+
+
linters-settings:
+
errcheck:
+
check-blank: true # Check for blank error assignments (x, _ = f())
+
+
govet:
+
enable-all: true
+
+
staticcheck:
+
checks: ["all"]
+
+
run:
+
timeout: 5m
+
tests: true
+
+
issues:
+
exclude-use-default: false
+
max-issues-per-linter: 0
+
max-same-issues: 0
+65 -92
CLAUDE.md
···
+
# [CLAUDE-BUILD.md](http://claude-build.md/)
-
Project: Coves PR Reviewer
-
You are a distinguished senior architect conducting a thorough code review for Coves, a forum-like atProto social media platform.
+
Project: Coves Builder You are a distinguished developer actively building Coves, a forum-like atProto social media platform. Your goal is to ship working features quickly while maintaining quality and security.
-
## Review Mindset
-
- Be constructive but thorough - catch issues before they reach production
-
- Question assumptions and look for edge cases
-
- Prioritize security, performance, and maintainability concerns
-
- Suggest alternatives when identifying problems
+
## Builder Mindset
+
- Ship working code today, refactor tomorrow
+
- Security is built-in, not bolted-on
+
- Test-driven: write the test, then make it pass
+
- When stuck, check Context7 for patterns and examples
+
- ASK QUESTIONS if you need context surrounding the product DONT ASSUME
-
## Special Attention Areas for Coves
-
- **atProto Integration**: Verify proper use of indigo packages
-
- **atProto architecture**: Ensure architecture follows atProto recommendations
-
- **Federation**: Check for proper DID resolution and identity verification
-
- **PostgreSQL**: Verify migrations are reversible and indexes are appropriate
+
#### Human & LLM Readability Guidelines:
+
+
- Descriptive Naming: Use full words over abbreviations (e.g., CommunityGovernance not CommGov)
+
+
## atProto Essentials for Coves
+
+
### Architecture
+
+
- **PDS is Self-Contained**: Uses internal SQLite + CAR files (in Docker volume)
+
- **PostgreSQL for AppView Only**: One database for Coves AppView indexing
+
- **Don't Touch PDS Internals**: PDS manages its own storage, we just read from firehose
+
- **Data Flow**: Client → PDS → Firehose → AppView → PostgreSQL
+
+
### Always Consider:
+
+
- [ ]  **Identity**: Every action needs DID verification
+
- [ ]  **Record Types**: Define custom lexicons (e.g., `social.coves.post`, `social.coves.community`)
+
- [ ]  **Is it federated-friendly?** (Can other PDSs interact with it?)
+
- [ ]  **Does the Lexicon make sense?** (Would it work for other forums?)
+
- [ ]  **AppView only indexes**: We don't write to CAR files, only read from firehose
-
## Review Checklist
+
## Security-First Building
-
### 1. Architecture Compliance
-
**MUST VERIFY:**
-
- [ ] NO SQL queries in handlers (automatic rejection if found)
-
- [ ] Proper layer separation: Handler → Service → Repository → Database
-
- [ ] Services use repository interfaces, not concrete implementations
-
- [ ] Dependencies injected via constructors, not globals
-
- [ ] No database packages imported in handlers
+
### Every Feature MUST:
-
### 2. Security Review
-
**CHECK FOR:**
-
- SQL injection vulnerabilities (even with prepared statements, verify)
-
- Proper input validation and sanitization
-
- Authentication/authorization checks on all protected endpoints
-
- No sensitive data in logs or error messages
-
- Rate limiting on public endpoints
-
- CSRF protection where applicable
-
- Proper atProto identity verification
+
- [ ]  **Validate all inputs** at the handler level
+
- [ ]  **Use parameterized queries** (never string concatenation)
+
- [ ]  **Check authorization** before any operation
+
- [ ]  **Limit resource access** (pagination, rate limits)
+
- [ ]  **Log security events** (failed auth, invalid inputs)
+
- [ ]  **Never log sensitive data** (passwords, tokens, PII)
-
### 3. Error Handling Audit
-
**VERIFY:**
-
- All errors are handled, not ignored
-
- Error wrapping provides context: `fmt.Errorf("service: %w", err)`
-
- Domain errors defined in core/errors/
-
- HTTP status codes correctly map to error types
-
- No internal error details exposed to API consumers
-
- Nil pointer checks before dereferencing
+
### Red Flags to Avoid:
-
### 4. Performance Considerations
-
**LOOK FOR:**
-
- N+1 query problems
-
- Missing database indexes for frequently queried fields
-
- Unnecessary database round trips
-
- Large unbounded queries without pagination
-
- Memory leaks in goroutines
-
- Proper connection pool usage
-
- Efficient atProto federation calls
+
- `fmt.Sprintf` in SQL queries → Use parameterized queries
+
- Missing `context.Context` → Need it for timeouts/cancellation
+
- No input validation → Add it immediately
+
- Error messages with internal details → Wrap errors properly
+
- Unbounded queries → Add limits/pagination
-
### 5. Testing Coverage
-
**REQUIRE:**
-
- Unit tests for all new service methods
-
- Integration tests for new API endpoints
-
- Edge case coverage (empty inputs, max values, special characters)
-
- Error path testing
-
- Mock verification in unit tests
-
- No flaky tests (check for time dependencies, random values)
+
### "How should I structure this?"
-
### 6. Code Quality
-
**ASSESS:**
-
- Naming follows conventions (full words, not abbreviations)
-
- Functions do one thing well
-
- No code duplication (DRY principle)
-
- Consistent error handling patterns
-
- Proper use of Go idioms
-
- No commented-out code
+
1. One domain, one package
+
2. Interfaces for testability
+
3. Services coordinate repos
+
4. Handlers only handle XRPC
-
### 7. Breaking Changes
-
**IDENTIFY:**
-
- API contract changes
-
- Database schema modifications affecting existing data
-
- Changes to core interfaces
-
- Modified error codes or response formats
+
## Pre-Production Advantages
-
### 8. Documentation
-
**ENSURE:**
-
- API endpoints have example requests/responses
-
- Complex business logic is explained
-
- Database migrations include rollback scripts
-
- README updated if setup process changes
-
- Swagger/OpenAPI specs updated if applicable
+
Since we're pre-production:
-
## Review Process
+
- **Break things**: Delete and rebuild rather than complex migrations
+
- **Experiment**: Try approaches, keep what works
+
- **Simplify**: Remove unused code aggressively
+
- **But never compromise security basics**
-
1. **First Pass - Automatic Rejections**
-
- SQL in handlers
-
- Missing tests
-
- Security vulnerabilities
-
- Broken layer separation
+
## Success Metrics
-
2. **Second Pass - Deep Dive**
-
- Business logic correctness
-
- Edge case handling
-
- Performance implications
-
- Code maintainability
+
Your code is ready when:
-
3. **Third Pass - Suggestions**
-
- Better patterns or approaches
-
- Refactoring opportunities
-
- Future considerations
+
- [ ]  Tests pass (including security tests)
+
- [ ]  Follows atProto patterns
+
- [ ]  Handles errors gracefully
+
- [ ]  Works end-to-end with auth
-
Then provide detailed feedback organized by: 1. 🚨 **Critical Issues** (must fix) 2. ⚠️ **Important Issues** (should fix) 3. 💡 **Suggestions** (consider for improvement) 4. ✅ **Good Practices Observed** (reinforce positive patterns)
+
## Quick Checks Before Committing
+
1. **Will it work?** (Integration test proves it)
+
2. **Is it secure?** (Auth, validation, parameterized queries)
+
3. **Is it simple?** (Could you explain to a junior?)
+
4. **Is it complete?** (Test, implementation, documentation)
-
Remember: The goal is to ship quality code quickly. Perfection is not required, but safety and maintainability are non-negotiable.
+
Remember: We're building a working product. Perfect is the enemy of shipped.
+13 -1
Makefile
···
@echo "$(GREEN)Running migrations on test database...$(RESET)"
@goose -dir internal/db/migrations postgres "postgresql://$(POSTGRES_TEST_USER):$(POSTGRES_TEST_PASSWORD)@localhost:$(POSTGRES_TEST_PORT)/$(POSTGRES_TEST_DB)?sslmode=disable" up || true
@echo "$(GREEN)Running fast tests (use 'make e2e-test' for E2E tests)...$(RESET)"
-
@go test ./... -short -v
+
@go test ./cmd/... ./internal/... ./tests/... -short -v
@echo "$(GREEN)✓ Tests complete$(RESET)"
e2e-test: ## Run automated E2E tests (requires: make dev-up + make run in another terminal)
···
test-db-stop: ## Stop test database
@docker-compose -f docker-compose.dev.yml --env-file .env.dev --profile test stop postgres-test
@echo "$(GREEN)✓ Test database stopped$(RESET)"
+
+
##@ Code Quality
+
+
lint: ## Run golangci-lint on the codebase
+
@echo "$(GREEN)Running linter...$(RESET)"
+
@golangci-lint run
+
@echo "$(GREEN)✓ Linting complete$(RESET)"
+
+
lint-fix: ## Run golangci-lint and auto-fix issues
+
@echo "$(GREEN)Running linter with auto-fix...$(RESET)"
+
@golangci-lint run --fix
+
@echo "$(GREEN)✓ Linting complete$(RESET)"
##@ Build & Run
+18 -2
cmd/server/main.go
···
"Coves/internal/api/middleware"
"Coves/internal/api/routes"
+
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
"Coves/internal/core/users"
postgresRepo "Coves/internal/db/postgres"
···
rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
r.Use(rateLimiter.Middleware)
+
// Initialize identity resolver
+
identityConfig := identity.DefaultConfig()
+
// Override from environment if set
+
if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" {
+
identityConfig.PLCURL = plcURL
+
}
+
if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
+
if duration, err := time.ParseDuration(cacheTTL); err == nil {
+
identityConfig.CacheTTL = duration
+
}
+
}
+
+
identityResolver := identity.NewResolver(db, identityConfig)
+
log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL)
+
// Initialize repositories and services
userRepo := postgresRepo.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, defaultPDS)
+
userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
// Start Jetstream consumer for read-forward user indexing
jetstreamURL := os.Getenv("JETSTREAM_URL")
···
pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
-
userConsumer := jetstream.NewUserEventConsumer(userService, jetstreamURL, pdsFilter)
+
userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
ctx := context.Background()
go func() {
if err := userConsumer.Start(ctx); err != nil {
+2 -2
internal/api/middleware/ratelimit.go
···
rl.mu.Lock()
defer rl.mu.Unlock()
-
now := time.Now()
+
now := time.Now().UTC()
// Get or create client limit
client, exists := rl.clients[clientID]
···
for range ticker.C {
rl.mu.Lock()
-
now := time.Now()
+
now := time.Now().UTC()
for clientID, client := range rl.clients {
if now.After(client.resetTime) {
delete(rl.clients, clientID)
+137
internal/atproto/identity/base_resolver.go
···
+
package identity
+
+
import (
+
"context"
+
"fmt"
+
"net/http"
+
"strings"
+
"time"
+
+
indigoIdentity "github.com/bluesky-social/indigo/atproto/identity"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
)
+
+
// baseResolver implements Resolver using Indigo's identity resolution
+
type baseResolver struct {
+
directory indigoIdentity.Directory
+
}
+
+
// newBaseResolver creates a new base resolver using Indigo
+
func newBaseResolver(plcURL string, httpClient *http.Client) Resolver {
+
// Create Indigo's BaseDirectory which handles DNS and HTTPS resolution
+
dir := &indigoIdentity.BaseDirectory{
+
PLCURL: plcURL,
+
HTTPClient: *httpClient,
+
// Indigo will use default DNS resolver if not specified
+
}
+
+
return &baseResolver{
+
directory: dir,
+
}
+
}
+
+
// Resolve resolves a handle or DID to complete identity information
+
func (r *baseResolver) Resolve(ctx context.Context, identifier string) (*Identity, error) {
+
identifier = strings.TrimSpace(identifier)
+
+
if identifier == "" {
+
return nil, &ErrInvalidIdentifier{
+
Identifier: identifier,
+
Reason: "identifier cannot be empty",
+
}
+
}
+
+
// Parse the identifier (could be handle or DID)
+
atID, err := syntax.ParseAtIdentifier(identifier)
+
if err != nil {
+
return nil, &ErrInvalidIdentifier{
+
Identifier: identifier,
+
Reason: fmt.Sprintf("invalid identifier format: %v", err),
+
}
+
}
+
+
// Resolve using Indigo's directory
+
ident, err := r.directory.Lookup(ctx, *atID)
+
+
if err != nil {
+
// Check if it's a "not found" error
+
errStr := err.Error()
+
if strings.Contains(errStr, "not found") ||
+
strings.Contains(errStr, "NoRecordsFound") ||
+
strings.Contains(errStr, "404") {
+
return nil, &ErrNotFound{
+
Identifier: identifier,
+
Reason: errStr,
+
}
+
}
+
+
return nil, &ErrResolutionFailed{
+
Identifier: identifier,
+
Reason: errStr,
+
}
+
}
+
+
// Extract PDS URL from identity
+
pdsURL := ident.PDSEndpoint()
+
+
return &Identity{
+
DID: ident.DID.String(),
+
Handle: ident.Handle.String(),
+
PDSURL: pdsURL,
+
ResolvedAt: time.Now().UTC(),
+
Method: MethodHTTPS, // Default - Indigo doesn't expose which method was used
+
}, nil
+
}
+
+
// ResolveHandle specifically resolves a handle to DID and PDS URL
+
func (r *baseResolver) ResolveHandle(ctx context.Context, handle string) (did, pdsURL string, err error) {
+
ident, err := r.Resolve(ctx, handle)
+
if err != nil {
+
return "", "", err
+
}
+
+
return ident.DID, ident.PDSURL, nil
+
}
+
+
// ResolveDID retrieves a DID document and extracts the PDS endpoint
+
func (r *baseResolver) ResolveDID(ctx context.Context, didStr string) (*DIDDocument, error) {
+
did, err := syntax.ParseDID(didStr)
+
if err != nil {
+
return nil, &ErrInvalidIdentifier{
+
Identifier: didStr,
+
Reason: fmt.Sprintf("invalid DID format: %v", err),
+
}
+
}
+
+
ident, err := r.directory.LookupDID(ctx, did)
+
if err != nil {
+
return nil, &ErrResolutionFailed{
+
Identifier: didStr,
+
Reason: err.Error(),
+
}
+
}
+
+
// Construct our DID document from Indigo's identity
+
doc := &DIDDocument{
+
DID: ident.DID.String(),
+
Service: []Service{},
+
}
+
+
// Extract PDS service endpoint
+
pdsURL := ident.PDSEndpoint()
+
if pdsURL != "" {
+
doc.Service = append(doc.Service, Service{
+
ID: "#atproto_pds",
+
Type: "AtprotoPersonalDataServer",
+
ServiceEndpoint: pdsURL,
+
})
+
}
+
+
return doc, nil
+
}
+
+
// Purge is a no-op for base resolver (no caching)
+
func (r *baseResolver) Purge(ctx context.Context, identifier string) error {
+
// Base resolver doesn't cache, so nothing to purge
+
return nil
+
}
+88
internal/atproto/identity/caching_resolver.go
···
+
package identity
+
+
import (
+
"context"
+
"log"
+
)
+
+
// cachingResolver wraps a base resolver with caching
+
type cachingResolver struct {
+
base Resolver
+
cache IdentityCache
+
}
+
+
// newCachingResolver creates a new caching resolver
+
func newCachingResolver(base Resolver, cache IdentityCache) Resolver {
+
return &cachingResolver{
+
base: base,
+
cache: cache,
+
}
+
}
+
+
// Resolve resolves a handle or DID to complete identity information
+
// First checks cache, then falls back to base resolver
+
func (r *cachingResolver) Resolve(ctx context.Context, identifier string) (*Identity, error) {
+
// Try cache first
+
cached, err := r.cache.Get(ctx, identifier)
+
if err == nil {
+
// Cache hit - mark it as from cache
+
cached.Method = MethodCache
+
return cached, nil
+
}
+
+
// Cache miss - resolve using base resolver
+
identity, err := r.base.Resolve(ctx, identifier)
+
if err != nil {
+
return nil, err
+
}
+
+
// Cache the resolved identity (ignore cache errors, just log them)
+
if cacheErr := r.cache.Set(ctx, identity); cacheErr != nil {
+
log.Printf("Warning: failed to cache identity for %s: %v", identifier, cacheErr)
+
}
+
+
return identity, nil
+
}
+
+
// ResolveHandle specifically resolves a handle to DID and PDS URL
+
func (r *cachingResolver) ResolveHandle(ctx context.Context, handle string) (did, pdsURL string, err error) {
+
identity, err := r.Resolve(ctx, handle)
+
if err != nil {
+
return "", "", err
+
}
+
+
return identity.DID, identity.PDSURL, nil
+
}
+
+
// ResolveDID retrieves a DID document and extracts the PDS endpoint
+
func (r *cachingResolver) ResolveDID(ctx context.Context, did string) (*DIDDocument, error) {
+
// Try to get from cache first
+
cached, err := r.cache.Get(ctx, did)
+
if err == nil {
+
// We have cached identity, construct a simple DID document
+
return &DIDDocument{
+
DID: cached.DID,
+
Service: []Service{
+
{
+
ID: "#atproto_pds",
+
Type: "AtprotoPersonalDataServer",
+
ServiceEndpoint: cached.PDSURL,
+
},
+
},
+
}, nil
+
}
+
+
// Cache miss - use base resolver
+
return r.base.ResolveDID(ctx, did)
+
}
+
+
// Purge removes an identifier from the cache and propagates to base
+
func (r *cachingResolver) Purge(ctx context.Context, identifier string) error {
+
// Purge from cache
+
if err := r.cache.Purge(ctx, identifier); err != nil {
+
return err
+
}
+
+
// Propagate to base resolver (though it typically won't cache)
+
return r.base.Purge(ctx, identifier)
+
}
+45
internal/atproto/identity/errors.go
···
+
package identity
+
+
import "fmt"
+
+
// ErrNotFound is returned when an identity cannot be resolved
+
type ErrNotFound struct {
+
Identifier string
+
Reason string
+
}
+
+
func (e *ErrNotFound) Error() string {
+
if e.Reason != "" {
+
return fmt.Sprintf("identity not found: %s (%s)", e.Identifier, e.Reason)
+
}
+
return fmt.Sprintf("identity not found: %s", e.Identifier)
+
}
+
+
// ErrInvalidIdentifier is returned for malformed handles or DIDs
+
type ErrInvalidIdentifier struct {
+
Identifier string
+
Reason string
+
}
+
+
func (e *ErrInvalidIdentifier) Error() string {
+
return fmt.Sprintf("invalid identifier %s: %s", e.Identifier, e.Reason)
+
}
+
+
// ErrCacheMiss is returned when an identifier is not in the cache
+
type ErrCacheMiss struct {
+
Identifier string
+
}
+
+
func (e *ErrCacheMiss) Error() string {
+
return fmt.Sprintf("cache miss: %s", e.Identifier)
+
}
+
+
// ErrResolutionFailed is returned when resolution fails for reasons other than not found
+
type ErrResolutionFailed struct {
+
Identifier string
+
Reason string
+
}
+
+
func (e *ErrResolutionFailed) Error() string {
+
return fmt.Sprintf("resolution failed for %s: %s", e.Identifier, e.Reason)
+
}
+56
internal/atproto/identity/factory.go
···
+
package identity
+
+
import (
+
"database/sql"
+
"net/http"
+
"time"
+
)
+
+
// Config holds configuration for the identity resolver
+
type Config struct {
+
// PLCURL is the URL of the PLC directory (default: https://plc.directory)
+
PLCURL string
+
+
// CacheTTL is how long to cache resolved identities
+
CacheTTL time.Duration
+
+
// HTTPClient for making HTTP requests (optional, will use default if nil)
+
HTTPClient *http.Client
+
}
+
+
// DefaultConfig returns a configuration with sensible defaults
+
func DefaultConfig() Config {
+
return Config{
+
PLCURL: "https://plc.directory",
+
CacheTTL: 24 * time.Hour, // Cache for 24 hours
+
HTTPClient: &http.Client{Timeout: 10 * time.Second},
+
}
+
}
+
+
// NewResolver creates a new identity resolver with caching
+
func NewResolver(db *sql.DB, config Config) Resolver {
+
// Apply defaults if not set
+
if config.PLCURL == "" {
+
config.PLCURL = "https://plc.directory"
+
}
+
if config.CacheTTL == 0 {
+
config.CacheTTL = 24 * time.Hour
+
}
+
if config.HTTPClient == nil {
+
config.HTTPClient = &http.Client{Timeout: 10 * time.Second}
+
}
+
+
// Create base resolver using Indigo
+
base := newBaseResolver(config.PLCURL, config.HTTPClient)
+
+
// Wrap with caching using PostgreSQL
+
cache := NewPostgresCache(db, config.CacheTTL)
+
caching := newCachingResolver(base, cache)
+
+
// Future: could add rate limiting here if needed
+
// if config.MaxConcurrent > 0 {
+
// return newRateLimitedResolver(caching, config.MaxConcurrent)
+
// }
+
+
return caching
+
}
+165
internal/atproto/identity/postgres_cache.go
···
+
package identity
+
+
import (
+
"context"
+
"database/sql"
+
"fmt"
+
"log"
+
"strings"
+
"time"
+
)
+
+
// postgresCache implements IdentityCache using PostgreSQL
+
type postgresCache struct {
+
db *sql.DB
+
ttl time.Duration
+
}
+
+
// NewPostgresCache creates a new PostgreSQL-backed identity cache
+
func NewPostgresCache(db *sql.DB, ttl time.Duration) IdentityCache {
+
return &postgresCache{
+
db: db,
+
ttl: ttl,
+
}
+
}
+
+
// Get retrieves a cached identity by handle or DID
+
func (r *postgresCache) Get(ctx context.Context, identifier string) (*Identity, error) {
+
identifier = normalizeIdentifier(identifier)
+
+
query := `
+
SELECT did, handle, pds_url, resolved_at, resolution_method, expires_at
+
FROM identity_cache
+
WHERE identifier = $1 AND expires_at > NOW()
+
`
+
+
var i Identity
+
var method string
+
var expiresAt time.Time
+
+
err := r.db.QueryRowContext(ctx, query, identifier).Scan(
+
&i.DID,
+
&i.Handle,
+
&i.PDSURL,
+
&i.ResolvedAt,
+
&method,
+
&expiresAt,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, &ErrCacheMiss{Identifier: identifier}
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to query identity cache: %w", err)
+
}
+
+
// Convert string method to ResolutionMethod type
+
i.Method = MethodCache // It's from cache now
+
+
return &i, nil
+
}
+
+
// Set caches an identity bidirectionally (by handle and by DID)
+
func (r *postgresCache) Set(ctx context.Context, i *Identity) error {
+
expiresAt := time.Now().UTC().Add(r.ttl)
+
+
// Debug logging for cache operations (helps diagnose TTL issues)
+
log.Printf("[identity-cache] Caching: handle=%s, did=%s, expires=%s (TTL=%s)",
+
i.Handle, i.DID, expiresAt.Format(time.RFC3339), r.ttl)
+
+
query := `
+
INSERT INTO identity_cache (identifier, did, handle, pds_url, resolved_at, resolution_method, expires_at)
+
VALUES ($1, $2, $3, $4, $5, $6, $7)
+
ON CONFLICT (identifier)
+
DO UPDATE SET
+
did = EXCLUDED.did,
+
handle = EXCLUDED.handle,
+
pds_url = EXCLUDED.pds_url,
+
resolved_at = EXCLUDED.resolved_at,
+
resolution_method = EXCLUDED.resolution_method,
+
expires_at = EXCLUDED.expires_at,
+
updated_at = NOW()
+
`
+
+
// Cache by handle if present
+
if i.Handle != "" {
+
normalizedHandle := normalizeIdentifier(i.Handle)
+
_, err := r.db.ExecContext(ctx, query,
+
normalizedHandle, i.DID, i.Handle, i.PDSURL,
+
i.ResolvedAt, string(i.Method), expiresAt,
+
)
+
if err != nil {
+
return fmt.Errorf("failed to cache identity by handle: %w", err)
+
}
+
}
+
+
// Cache by DID
+
_, err := r.db.ExecContext(ctx, query,
+
i.DID, i.DID, i.Handle, i.PDSURL,
+
i.ResolvedAt, string(i.Method), expiresAt,
+
)
+
if err != nil {
+
return fmt.Errorf("failed to cache identity by DID: %w", err)
+
}
+
+
return nil
+
}
+
+
// Delete removes a cached identity by identifier
+
func (r *postgresCache) Delete(ctx context.Context, identifier string) error {
+
identifier = normalizeIdentifier(identifier)
+
+
query := `DELETE FROM identity_cache WHERE identifier = $1`
+
_, err := r.db.ExecContext(ctx, query, identifier)
+
if err != nil {
+
return fmt.Errorf("failed to delete from identity cache: %w", err)
+
}
+
+
return nil
+
}
+
+
// Purge removes all cache entries associated with an identifier
+
// This removes both handle and DID entries in a single atomic query
+
func (r *postgresCache) Purge(ctx context.Context, identifier string) error {
+
identifier = normalizeIdentifier(identifier)
+
+
// Single atomic query: find related entries and delete all at once
+
// This prevents race conditions and is more efficient than multiple queries
+
query := `
+
WITH related AS (
+
SELECT did, handle
+
FROM identity_cache
+
WHERE identifier = $1
+
LIMIT 1
+
)
+
DELETE FROM identity_cache
+
WHERE identifier = $1
+
OR identifier IN (SELECT did FROM related WHERE did IS NOT NULL)
+
OR identifier IN (SELECT handle FROM related WHERE handle IS NOT NULL AND handle != '')
+
`
+
+
result, err := r.db.ExecContext(ctx, query, identifier)
+
if err != nil {
+
return fmt.Errorf("failed to purge identity cache: %w", err)
+
}
+
+
rowsAffected, _ := result.RowsAffected()
+
if rowsAffected > 0 {
+
log.Printf("[identity-cache] Purged %d entries for: %s", rowsAffected, identifier)
+
}
+
+
return nil
+
}
+
+
// normalizeIdentifier normalizes handles to lowercase, leaves DIDs as-is
+
func normalizeIdentifier(identifier string) string {
+
identifier = strings.TrimSpace(identifier)
+
+
// DIDs are case-sensitive, handles are not
+
if strings.HasPrefix(identifier, "did:") {
+
return identifier
+
}
+
+
// It's a handle, normalize to lowercase
+
return strings.ToLower(identifier)
+
}
+40
internal/atproto/identity/resolver.go
···
+
package identity
+
+
import "context"
+
+
// Resolver provides methods for resolving atProto identities
+
type Resolver interface {
+
// Resolve resolves a handle or DID to complete identity information
+
// The identifier can be either:
+
// - A handle (e.g., "alice.bsky.social")
+
// - A DID (e.g., "did:plc:abc123")
+
Resolve(ctx context.Context, identifier string) (*Identity, error)
+
+
// ResolveHandle specifically resolves a handle to DID and PDS URL
+
// This is a convenience method for handle-only resolution
+
ResolveHandle(ctx context.Context, handle string) (did, pdsURL string, err error)
+
+
// ResolveDID retrieves a DID document and extracts the PDS endpoint
+
ResolveDID(ctx context.Context, did string) (*DIDDocument, error)
+
+
// Purge removes an identifier from the cache
+
// The identifier can be either a handle or DID
+
Purge(ctx context.Context, identifier string) error
+
}
+
+
// IdentityCache provides caching for resolved identities
+
type IdentityCache interface {
+
// Get retrieves a cached identity by handle or DID
+
Get(ctx context.Context, identifier string) (*Identity, error)
+
+
// Set caches an identity with the given TTL
+
// This should cache bidirectionally (both handle and DID as keys)
+
Set(ctx context.Context, identity *Identity) error
+
+
// Delete removes a cached identity by identifier
+
Delete(ctx context.Context, identifier string) error
+
+
// Purge removes all cache entries associated with an identifier
+
// (both handle and DID if applicable)
+
Purge(ctx context.Context, identifier string) error
+
}
+35
internal/atproto/identity/types.go
···
+
package identity
+
+
import "time"
+
+
// ResolutionMethod indicates how an identity was resolved
+
type ResolutionMethod string
+
+
const (
+
MethodCache ResolutionMethod = "cache"
+
MethodDNS ResolutionMethod = "dns"
+
MethodHTTPS ResolutionMethod = "https"
+
)
+
+
// Identity represents a fully resolved atProto identity
+
type Identity struct {
+
DID string // Decentralized Identifier (e.g., "did:plc:abc123")
+
Handle string // Human-readable handle (e.g., "alice.bsky.social")
+
PDSURL string // Personal Data Server URL
+
ResolvedAt time.Time // When this identity was resolved
+
Method ResolutionMethod // How it was resolved (cache, DNS, HTTPS)
+
}
+
+
// DIDDocument represents an AT Protocol DID document
+
// For now, we only extract the PDS service endpoint
+
type DIDDocument struct {
+
DID string
+
Service []Service
+
}
+
+
// Service represents a service entry in a DID document
+
type Service struct {
+
ID string
+
Type string
+
ServiceEndpoint string
+
}
+52 -31
internal/atproto/jetstream/user_consumer.go
···
"log"
"time"
+
"Coves/internal/atproto/identity"
"Coves/internal/core/users"
"github.com/gorilla/websocket"
)
···
// UserEventConsumer consumes user-related events from Jetstream
type UserEventConsumer struct {
-
userService users.UserService
-
wsURL string
-
pdsFilter string // Optional: only index users from specific PDS
+
userService users.UserService
+
identityResolver identity.Resolver
+
wsURL string
+
pdsFilter string // Optional: only index users from specific PDS
}
// NewUserEventConsumer creates a new Jetstream consumer for user events
-
func NewUserEventConsumer(userService users.UserService, wsURL string, pdsFilter string) *UserEventConsumer {
+
func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL string, pdsFilter string) *UserEventConsumer {
return &UserEventConsumer{
-
userService: userService,
-
wsURL: wsURL,
-
pdsFilter: pdsFilter,
+
userService: userService,
+
identityResolver: identityResolver,
+
wsURL: wsURL,
+
pdsFilter: pdsFilter,
}
}
···
log.Printf("Identity event: %s → %s", did, handle)
-
// For now, we'll create/update user on identity events
-
// In a full implementation, you'd want to:
-
// 1. Check if user exists
-
// 2. Update handle if changed
-
// 3. Resolve PDS URL from DID document
+
// Get existing user to check if handle changed
+
existingUser, err := c.userService.GetUserByDID(ctx, did)
+
if err != nil {
+
// User doesn't exist - create new user
+
pdsURL := "https://bsky.social" // Default Bluesky PDS
+
// TODO: Resolve PDS URL from DID document via PLC directory
-
// Simplified: just try to create user (will be idempotent)
-
// We need PDS URL - for now use a placeholder
-
// TODO: Implement DID→PDS resolution via PLC directory (https://plc.directory/{did})
-
// For production federation support, resolve PDS endpoint from DID document
-
// For local dev, this works fine since we filter to our own PDS
-
// See PR review issue #2
-
pdsURL := "https://bsky.social" // Default Bluesky PDS
+
_, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{
+
DID: did,
+
Handle: handle,
+
PDSURL: pdsURL,
+
})
-
_, err := c.userService.CreateUser(ctx, users.CreateUserRequest{
-
DID: did,
-
Handle: handle,
-
PDSURL: pdsURL,
-
})
+
if createErr != nil && !isDuplicateError(createErr) {
+
return fmt.Errorf("failed to create user: %w", createErr)
+
}
-
if err != nil {
-
// Check if it's a duplicate error (expected for idempotency)
-
if isDuplicateError(err) {
-
log.Printf("User already indexed: %s (%s)", handle, did)
-
return nil
+
log.Printf("Indexed new user: %s (%s)", handle, did)
+
return nil
+
}
+
+
// User exists - check if handle changed
+
if existingUser.Handle != handle {
+
log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did)
+
+
// CRITICAL: Update database FIRST, then purge cache
+
// This prevents race condition where cache gets refilled with stale data
+
_, updateErr := c.userService.UpdateHandle(ctx, did, handle)
+
if updateErr != nil {
+
return fmt.Errorf("failed to update handle: %w", updateErr)
+
}
+
+
// CRITICAL: Purge BOTH old handle and DID from cache
+
// Old handle: alice.bsky.social → did:plc:abc123 (must be removed)
+
if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil {
+
log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr)
}
-
return fmt.Errorf("failed to create user: %w", err)
+
+
// DID: did:plc:abc123 → alice.bsky.social (must be removed)
+
if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil {
+
log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr)
+
}
+
+
log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle)
+
} else {
+
log.Printf("Handle unchanged for %s (%s)", handle, did)
}
-
log.Printf("Indexed new user: %s (%s)", handle, did)
return nil
}
+2
internal/core/users/interfaces.go
···
Create(ctx context.Context, user *User) (*User, error)
GetByDID(ctx context.Context, did string) (*User, error)
GetByHandle(ctx context.Context, handle string) (*User, error)
+
UpdateHandle(ctx context.Context, did, newHandle string) (*User, error)
}
// UserService defines the interface for user business logic
···
CreateUser(ctx context.Context, req CreateUserRequest) (*User, error)
GetUserByDID(ctx context.Context, did string) (*User, error)
GetUserByHandle(ctx context.Context, handle string) (*User, error)
+
UpdateHandle(ctx context.Context, did, newHandle string) (*User, error)
ResolveHandleToDID(ctx context.Context, handle string) (string, error)
RegisterAccount(ctx context.Context, req RegisterAccountRequest) (*RegisterAccountResponse, error)
}
+33 -10
internal/core/users/service.go
···
"regexp"
"strings"
"time"
+
+
"Coves/internal/atproto/identity"
)
// atProto handle validation regex (per official atProto spec: https://atproto.com/specs/handle)
···
)
type userService struct {
-
userRepo UserRepository
-
defaultPDS string // Default PDS URL for this Coves instance (used when creating new local users via registration API)
+
userRepo UserRepository
+
identityResolver identity.Resolver
+
defaultPDS string // Default PDS URL for this Coves instance (used when creating new local users via registration API)
}
// NewUserService creates a new user service
-
func NewUserService(userRepo UserRepository, defaultPDS string) UserService {
+
func NewUserService(userRepo UserRepository, identityResolver identity.Resolver, defaultPDS string) UserService {
return &userService{
-
userRepo: userRepo,
-
defaultPDS: defaultPDS,
+
userRepo: userRepo,
+
identityResolver: identityResolver,
+
defaultPDS: defaultPDS,
}
}
···
return s.userRepo.GetByHandle(ctx, handle)
}
+
// UpdateHandle updates the handle for a user with the given DID
+
func (s *userService) UpdateHandle(ctx context.Context, did, newHandle string) (*User, error) {
+
did = strings.TrimSpace(did)
+
newHandle = strings.TrimSpace(strings.ToLower(newHandle))
+
+
if did == "" {
+
return nil, fmt.Errorf("DID is required")
+
}
+
if newHandle == "" {
+
return nil, fmt.Errorf("handle is required")
+
}
+
+
// Validate new handle format
+
if err := validateHandle(newHandle); err != nil {
+
return nil, err
+
}
+
+
return s.userRepo.UpdateHandle(ctx, did, newHandle)
+
}
+
// ResolveHandleToDID resolves a handle to a DID
// This is critical for login: users enter their handle, we resolve to DID
-
// TODO: Implement actual DNS/HTTPS resolution via atProto
+
// Uses DNS TXT record lookup and HTTPS .well-known/atproto-did resolution
func (s *userService) ResolveHandleToDID(ctx context.Context, handle string) (string, error) {
handle = strings.TrimSpace(strings.ToLower(handle))
if handle == "" {
return "", fmt.Errorf("handle is required")
}
-
// For now, check if user exists in our AppView database
-
// Later: implement DNS TXT record lookup or HTTPS .well-known/atproto-did
-
user, err := s.userRepo.GetByHandle(ctx, handle)
+
// Use identity resolver to resolve handle to DID
+
did, _, err := s.identityResolver.ResolveHandle(ctx, handle)
if err != nil {
return "", fmt.Errorf("failed to resolve handle %s: %w", handle, err)
}
-
return user.DID, nil
+
return did, nil
}
// RegisterAccount creates a new account on the PDS via XRPC
+58
internal/db/migrations/002_create_identity_cache_table.sql
···
+
-- +goose Up
+
-- +goose StatementBegin
+
CREATE TABLE identity_cache (
+
-- Can lookup by either handle or DID
+
identifier TEXT PRIMARY KEY,
+
+
-- Cached resolution data
+
did TEXT NOT NULL,
+
handle TEXT,
+
pds_url TEXT,
+
+
-- Resolution metadata
+
resolved_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
resolution_method TEXT NOT NULL, -- 'dns', 'https', 'cache'
+
+
-- Cache management
+
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
+
);
+
+
-- Index for reverse lookup (DID → handle)
+
CREATE INDEX idx_identity_cache_did ON identity_cache(did);
+
+
-- Index for expiry cleanup
+
CREATE INDEX idx_identity_cache_expires ON identity_cache(expires_at);
+
+
-- Function to normalize handles to lowercase
+
CREATE OR REPLACE FUNCTION normalize_handle() RETURNS TRIGGER AS $$
+
BEGIN
+
IF NEW.handle IS NOT NULL THEN
+
NEW.handle = LOWER(TRIM(NEW.handle));
+
END IF;
+
IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
+
-- Normalize identifier if it looks like a handle (contains a dot)
+
IF NEW.identifier LIKE '%.%' THEN
+
NEW.identifier = LOWER(TRIM(NEW.identifier));
+
END IF;
+
END IF;
+
NEW.updated_at = CURRENT_TIMESTAMP;
+
RETURN NEW;
+
END;
+
$$ LANGUAGE plpgsql;
+
+
-- Trigger to normalize handles automatically
+
CREATE TRIGGER normalize_handle_trigger
+
BEFORE INSERT OR UPDATE ON identity_cache
+
FOR EACH ROW
+
EXECUTE FUNCTION normalize_handle();
+
+
-- +goose StatementEnd
+
+
-- +goose Down
+
-- +goose StatementBegin
+
DROP TRIGGER IF EXISTS normalize_handle_trigger ON identity_cache;
+
DROP FUNCTION IF EXISTS normalize_handle();
+
DROP TABLE IF EXISTS identity_cache;
+
-- +goose StatementEnd
+26
internal/db/postgres/user_repo.go
···
return user, nil
}
+
+
// UpdateHandle updates the handle for a user with the given DID
+
func (r *postgresUserRepo) UpdateHandle(ctx context.Context, did, newHandle string) (*users.User, error) {
+
user := &users.User{}
+
query := `
+
UPDATE users
+
SET handle = $2, updated_at = NOW()
+
WHERE did = $1
+
RETURNING did, handle, pds_url, created_at, updated_at`
+
+
err := r.db.QueryRowContext(ctx, query, did, newHandle).
+
Scan(&user.DID, &user.Handle, &user.PDSURL, &user.CreatedAt, &user.UpdatedAt)
+
+
if err == sql.ErrNoRows {
+
return nil, fmt.Errorf("user not found")
+
}
+
if err != nil {
+
// Check for unique constraint violation on handle
+
if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "users_handle_key") {
+
return nil, fmt.Errorf("handle already taken")
+
}
+
return nil, fmt.Errorf("failed to update handle: %w", err)
+
}
+
+
return user, nil
+
}
+4 -1
tests/e2e/user_signup_test.go
···
"testing"
"time"
+
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
"Coves/internal/core/users"
"Coves/internal/db/postgres"
···
// Set up services
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
// Start Jetstream consumer
consumer := jetstream.NewUserEventConsumer(
userService,
+
resolver,
"ws://localhost:6008/subscribe",
"", // No PDS filter
)
+489
tests/integration/identity_resolution_test.go
···
+
package integration
+
+
import (
+
"context"
+
"fmt"
+
"os"
+
"testing"
+
"time"
+
+
"Coves/internal/atproto/identity"
+
)
+
+
// uniqueID generates a unique identifier for test isolation
+
func uniqueID() string {
+
return fmt.Sprintf("test-%d", time.Now().UnixNano())
+
}
+
+
// TestIdentityCache tests the PostgreSQL identity cache operations
+
func TestIdentityCache(t *testing.T) {
+
db := setupTestDB(t)
+
defer db.Close()
+
+
cache := identity.NewPostgresCache(db, 5*time.Minute)
+
ctx := context.Background()
+
+
// Generate unique test prefix for parallel safety
+
testID := fmt.Sprintf("test-%d", time.Now().UnixNano())
+
+
t.Run("Cache Miss on Empty Cache", func(t *testing.T) {
+
_, err := cache.Get(ctx, testID+"-nonexistent.test")
+
if err == nil {
+
t.Error("Expected cache miss error, got nil")
+
}
+
})
+
+
t.Run("Set and Get Identity by Handle", func(t *testing.T) {
+
ident := &identity.Identity{
+
DID: "did:plc:" + testID + "-test123abc",
+
Handle: testID + "-alice.test",
+
PDSURL: "https://pds.alice.test",
+
ResolvedAt: time.Now().UTC(),
+
Method: identity.MethodHTTPS,
+
}
+
+
// Set identity in cache
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to cache identity: %v", err)
+
}
+
+
// Get by handle
+
cached, err := cache.Get(ctx, ident.Handle)
+
if err != nil {
+
t.Fatalf("Failed to get cached identity by handle: %v", err)
+
}
+
+
if cached.DID != ident.DID {
+
t.Errorf("Expected DID %s, got %s", ident.DID, cached.DID)
+
}
+
if cached.Handle != ident.Handle {
+
t.Errorf("Expected handle %s, got %s", ident.Handle, cached.Handle)
+
}
+
if cached.PDSURL != ident.PDSURL {
+
t.Errorf("Expected PDS URL %s, got %s", ident.PDSURL, cached.PDSURL)
+
}
+
})
+
+
t.Run("Get Identity by DID", func(t *testing.T) {
+
// Should be able to retrieve by DID as well (bidirectional cache)
+
expectedDID := "did:plc:" + testID + "-test123abc"
+
expectedHandle := testID + "-alice.test"
+
+
cached, err := cache.Get(ctx, expectedDID)
+
if err != nil {
+
t.Fatalf("Failed to get cached identity by DID: %v", err)
+
}
+
+
if cached.Handle != expectedHandle {
+
t.Errorf("Expected handle %s, got %s", expectedHandle, cached.Handle)
+
}
+
})
+
+
t.Run("Update Existing Cache Entry", func(t *testing.T) {
+
// Update with new PDS URL
+
updated := &identity.Identity{
+
DID: "did:plc:test123abc",
+
Handle: "alice.test",
+
PDSURL: "https://new-pds.alice.test",
+
ResolvedAt: time.Now(),
+
Method: identity.MethodHTTPS,
+
}
+
+
if err := cache.Set(ctx, updated); err != nil {
+
t.Fatalf("Failed to update cached identity: %v", err)
+
}
+
+
cached, err := cache.Get(ctx, "alice.test")
+
if err != nil {
+
t.Fatalf("Failed to get updated identity: %v", err)
+
}
+
+
if cached.PDSURL != "https://new-pds.alice.test" {
+
t.Errorf("Expected updated PDS URL, got %s", cached.PDSURL)
+
}
+
})
+
+
t.Run("Delete Cache Entry", func(t *testing.T) {
+
if err := cache.Delete(ctx, "alice.test"); err != nil {
+
t.Fatalf("Failed to delete cache entry: %v", err)
+
}
+
+
// Should now be a cache miss
+
_, err := cache.Get(ctx, "alice.test")
+
if err == nil {
+
t.Error("Expected cache miss after deletion, got nil error")
+
}
+
})
+
+
t.Run("Purge Removes Both Handle and DID Entries", func(t *testing.T) {
+
ident := &identity.Identity{
+
DID: "did:plc:purgetest",
+
Handle: "purge.test",
+
PDSURL: "https://pds.purge.test",
+
ResolvedAt: time.Now(),
+
Method: identity.MethodDNS,
+
}
+
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to cache identity: %v", err)
+
}
+
+
// Verify both entries exist
+
if _, err := cache.Get(ctx, "purge.test"); err != nil {
+
t.Errorf("Handle entry should exist: %v", err)
+
}
+
if _, err := cache.Get(ctx, "did:plc:purgetest"); err != nil {
+
t.Errorf("DID entry should exist: %v", err)
+
}
+
+
// Purge by handle
+
if err := cache.Purge(ctx, "purge.test"); err != nil {
+
t.Fatalf("Failed to purge: %v", err)
+
}
+
+
// Both should be gone
+
if _, err := cache.Get(ctx, "purge.test"); err == nil {
+
t.Error("Handle entry should be purged")
+
}
+
if _, err := cache.Get(ctx, "did:plc:purgetest"); err == nil {
+
t.Error("DID entry should be purged")
+
}
+
})
+
+
t.Run("Handle Normalization - Case Insensitive", func(t *testing.T) {
+
ident := &identity.Identity{
+
DID: "did:plc:casetest",
+
Handle: "Alice.Test",
+
PDSURL: "https://pds.alice.test",
+
ResolvedAt: time.Now(),
+
Method: identity.MethodHTTPS,
+
}
+
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to cache identity: %v", err)
+
}
+
+
// Should be retrievable with different casing
+
cached, err := cache.Get(ctx, "ALICE.TEST")
+
if err != nil {
+
t.Fatalf("Failed to get identity with different casing: %v", err)
+
}
+
+
if cached.DID != "did:plc:casetest" {
+
t.Errorf("Expected DID did:plc:casetest, got %s", cached.DID)
+
}
+
+
// Cleanup
+
cache.Delete(ctx, "alice.test")
+
})
+
+
t.Run("DID is Case Sensitive", func(t *testing.T) {
+
ident := &identity.Identity{
+
DID: "did:plc:CaseSensitive",
+
Handle: "sensitive.test",
+
PDSURL: "https://pds.test",
+
ResolvedAt: time.Now(),
+
Method: identity.MethodHTTPS,
+
}
+
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to cache identity: %v", err)
+
}
+
+
// Should retrieve with exact case
+
if _, err := cache.Get(ctx, "did:plc:CaseSensitive"); err != nil {
+
t.Errorf("Should retrieve DID with exact case: %v", err)
+
}
+
+
// Different case should miss (DIDs are case-sensitive)
+
if _, err := cache.Get(ctx, "did:plc:casesensitive"); err == nil {
+
t.Error("Should NOT retrieve DID with different case")
+
}
+
+
// Cleanup
+
cache.Delete(ctx, "did:plc:CaseSensitive")
+
})
+
}
+
+
// TestIdentityCacheTTL tests that expired cache entries are not returned
+
func TestIdentityCacheTTL(t *testing.T) {
+
db := setupTestDB(t)
+
defer db.Close()
+
+
// Create cache with very short TTL (reduced from 1s to 100ms for faster, less flaky tests)
+
ttl := 100 * time.Millisecond
+
cache := identity.NewPostgresCache(db, ttl)
+
ctx := context.Background()
+
+
// Use unique ID for test isolation
+
testID := uniqueID()
+
+
ident := &identity.Identity{
+
DID: "did:plc:" + testID,
+
Handle: testID + ".ttl.test",
+
PDSURL: "https://pds.ttl.test",
+
ResolvedAt: time.Now().UTC(),
+
Method: identity.MethodHTTPS,
+
}
+
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to cache identity: %v", err)
+
}
+
+
// Should be retrievable immediately
+
if _, err := cache.Get(ctx, ident.Handle); err != nil {
+
t.Errorf("Should retrieve fresh cache entry: %v", err)
+
}
+
+
// Wait for TTL to expire (1.5x TTL for safety margin on slow systems)
+
waitTime := time.Duration(float64(ttl) * 1.5)
+
t.Logf("Waiting %s for cache entry to expire (TTL=%s)...", waitTime, ttl)
+
time.Sleep(waitTime)
+
+
// Should now be a cache miss
+
_, err := cache.Get(ctx, ident.Handle)
+
if err == nil {
+
t.Error("Expected cache miss after TTL expiration, got nil error")
+
}
+
}
+
+
// TestIdentityResolverWithCache tests the caching resolver behavior
+
func TestIdentityResolverWithCache(t *testing.T) {
+
db := setupTestDB(t)
+
defer db.Close()
+
+
cache := identity.NewPostgresCache(db, 5*time.Minute)
+
+
// Clean slate
+
_, _ = db.Exec("TRUNCATE identity_cache")
+
+
// Create resolver with caching
+
resolver := identity.NewResolver(db, identity.Config{
+
PLCURL: "https://plc.directory",
+
CacheTTL: 5 * time.Minute,
+
})
+
+
ctx := context.Background()
+
+
t.Run("Resolve Invalid Identifier", func(t *testing.T) {
+
_, err := resolver.Resolve(ctx, "")
+
if err == nil {
+
t.Error("Expected error for empty identifier")
+
}
+
+
_, err = resolver.Resolve(ctx, "invalid format")
+
if err == nil {
+
t.Error("Expected error for invalid identifier format")
+
}
+
})
+
+
t.Run("ResolveHandle Returns DID and PDS URL", func(t *testing.T) {
+
// Pre-populate cache with known identity
+
ident := &identity.Identity{
+
DID: "did:plc:resolvetest",
+
Handle: "resolve.test",
+
PDSURL: "https://pds.resolve.test",
+
ResolvedAt: time.Now(),
+
Method: identity.MethodDNS,
+
}
+
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to pre-populate cache: %v", err)
+
}
+
+
did, pdsURL, err := resolver.ResolveHandle(ctx, "resolve.test")
+
if err != nil {
+
t.Fatalf("Failed to resolve handle: %v", err)
+
}
+
+
if did != "did:plc:resolvetest" {
+
t.Errorf("Expected DID did:plc:resolvetest, got %s", did)
+
}
+
if pdsURL != "https://pds.resolve.test" {
+
t.Errorf("Expected PDS URL https://pds.resolve.test, got %s", pdsURL)
+
}
+
})
+
+
t.Run("Purge Removes from Cache", func(t *testing.T) {
+
// Pre-populate cache
+
ident := &identity.Identity{
+
DID: "did:plc:purge123",
+
Handle: "purgetest.test",
+
PDSURL: "https://pds.test",
+
ResolvedAt: time.Now(),
+
Method: identity.MethodHTTPS,
+
}
+
+
if err := cache.Set(ctx, ident); err != nil {
+
t.Fatalf("Failed to cache identity: %v", err)
+
}
+
+
// Verify it's cached
+
if _, err := cache.Get(ctx, "purgetest.test"); err != nil {
+
t.Fatalf("Identity should be cached: %v", err)
+
}
+
+
// Purge via resolver
+
if err := resolver.Purge(ctx, "purgetest.test"); err != nil {
+
t.Fatalf("Failed to purge: %v", err)
+
}
+
+
// Should be gone from cache
+
if _, err := cache.Get(ctx, "purgetest.test"); err == nil {
+
t.Error("Identity should be purged from cache")
+
}
+
})
+
}
+
+
// TestIdentityResolverRealHandles tests resolution with real atProto handles
+
// This is an optional integration test that requires network access
+
func TestIdentityResolverRealHandles(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping real handle resolution test in short mode")
+
}
+
+
// Skip if environment variable is not set (opt-in for real network tests)
+
if os.Getenv("TEST_REAL_HANDLES") != "1" {
+
t.Skip("Skipping real handle resolution - set TEST_REAL_HANDLES=1 to enable")
+
}
+
+
db := setupTestDB(t)
+
defer db.Close()
+
+
resolver := identity.NewResolver(db, identity.Config{
+
PLCURL: "https://plc.directory",
+
CacheTTL: 10 * time.Minute,
+
})
+
+
ctx := context.Background()
+
+
testCases := []struct {
+
name string
+
handle string
+
expectError bool
+
expectedMethod identity.ResolutionMethod
+
}{
+
{
+
name: "Resolve bsky.app (well-known handle)",
+
handle: "bsky.app",
+
expectError: false,
+
expectedMethod: identity.MethodHTTPS,
+
},
+
{
+
name: "Resolve nonexistent handle",
+
handle: "this-handle-definitely-does-not-exist-12345.bsky.social",
+
expectError: true,
+
},
+
}
+
+
for _, tc := range testCases {
+
t.Run(tc.name, func(t *testing.T) {
+
ident, err := resolver.Resolve(ctx, tc.handle)
+
+
if tc.expectError {
+
if err == nil {
+
t.Error("Expected error for nonexistent handle")
+
}
+
return
+
}
+
+
if err != nil {
+
t.Fatalf("Failed to resolve handle %s: %v", tc.handle, err)
+
}
+
+
if ident.Handle != tc.handle {
+
t.Errorf("Expected handle %s, got %s", tc.handle, ident.Handle)
+
}
+
+
if ident.DID == "" {
+
t.Error("Expected non-empty DID")
+
}
+
+
if ident.PDSURL == "" {
+
t.Error("Expected non-empty PDS URL")
+
}
+
+
t.Logf("✅ Resolved %s → %s (PDS: %s, Method: %s)",
+
ident.Handle, ident.DID, ident.PDSURL, ident.Method)
+
+
// Second resolution should hit cache
+
ident2, err := resolver.Resolve(ctx, tc.handle)
+
if err != nil {
+
t.Fatalf("Failed second resolution: %v", err)
+
}
+
+
if ident2.Method != identity.MethodCache {
+
t.Errorf("Second resolution should be from cache, got method: %s", ident2.Method)
+
}
+
+
t.Logf("✅ Second resolution from cache: %s (Method: %s)", tc.handle, ident2.Method)
+
})
+
}
+
}
+
+
// TestResolveDID tests DID document resolution
+
func TestResolveDID(t *testing.T) {
+
if testing.Short() {
+
t.Skip("Skipping DID resolution test in short mode")
+
}
+
+
if os.Getenv("TEST_REAL_HANDLES") != "1" {
+
t.Skip("Skipping DID resolution - set TEST_REAL_HANDLES=1 to enable")
+
}
+
+
db := setupTestDB(t)
+
defer db.Close()
+
+
resolver := identity.NewResolver(db, identity.Config{
+
PLCURL: "https://plc.directory",
+
CacheTTL: 10 * time.Minute,
+
})
+
+
ctx := context.Background()
+
+
t.Run("Resolve Real DID Document", func(t *testing.T) {
+
// First resolve a handle to get a real DID
+
ident, err := resolver.Resolve(ctx, "bsky.app")
+
if err != nil {
+
t.Skipf("Failed to resolve handle for DID test: %v", err)
+
}
+
+
// Now resolve the DID document
+
doc, err := resolver.ResolveDID(ctx, ident.DID)
+
if err != nil {
+
t.Fatalf("Failed to resolve DID document: %v", err)
+
}
+
+
if doc.DID != ident.DID {
+
t.Errorf("Expected DID %s, got %s", ident.DID, doc.DID)
+
}
+
+
// Should have at least PDS service
+
if len(doc.Service) == 0 {
+
t.Error("Expected at least one service in DID document")
+
}
+
+
// Find PDS service
+
foundPDS := false
+
for _, svc := range doc.Service {
+
if svc.Type == "AtprotoPersonalDataServer" {
+
foundPDS = true
+
if svc.ServiceEndpoint == "" {
+
t.Error("PDS service endpoint should not be empty")
+
}
+
t.Logf("✅ PDS Service: %s", svc.ServiceEndpoint)
+
}
+
}
+
+
if !foundPDS {
+
t.Error("Expected to find AtprotoPersonalDataServer service in DID document")
+
}
+
})
+
+
t.Run("Resolve Invalid DID", func(t *testing.T) {
+
_, err := resolver.ResolveDID(ctx, "not-a-did")
+
if err == nil {
+
t.Error("Expected error for invalid DID format")
+
}
+
})
+
}
+110 -6
tests/integration/jetstream_consumer_test.go
···
"testing"
"time"
+
"Coves/internal/atproto/identity"
"Coves/internal/atproto/jetstream"
"Coves/internal/core/users"
"Coves/internal/db/postgres"
···
// Wire up dependencies
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
ctx := context.Background()
···
},
}
-
consumer := jetstream.NewUserEventConsumer(userService, "", "")
+
consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
// Handle the event
err := consumer.HandleIdentityEventPublic(ctx, &event)
···
},
}
-
consumer := jetstream.NewUserEventConsumer(userService, "", "")
+
consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
// Handle duplicate event - should not error
err = consumer.HandleIdentityEventPublic(ctx, &event)
···
})
t.Run("Index multiple users", func(t *testing.T) {
-
consumer := jetstream.NewUserEventConsumer(userService, "", "")
+
consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
users := []struct {
did string
···
})
t.Run("Skip invalid events", func(t *testing.T) {
-
consumer := jetstream.NewUserEventConsumer(userService, "", "")
+
consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
// Missing DID
invalidEvent1 := jetstream.JetstreamEvent{
···
t.Error("expected error for nil identity data, got nil")
}
})
+
+
t.Run("Handle change updates database and purges cache", func(t *testing.T) {
+
testID := "handlechange"
+
oldHandle := "old." + testID + ".test"
+
newHandle := "new." + testID + ".test"
+
did := "did:plc:" + testID
+
+
// 1. Create user with old handle
+
_, err := userService.CreateUser(ctx, users.CreateUserRequest{
+
DID: did,
+
Handle: oldHandle,
+
PDSURL: "https://bsky.social",
+
})
+
if err != nil {
+
t.Fatalf("failed to create initial user: %v", err)
+
}
+
+
// 2. Manually cache the identity (simulate a previous resolution)
+
cache := identity.NewPostgresCache(db, 24*time.Hour)
+
err = cache.Set(ctx, &identity.Identity{
+
DID: did,
+
Handle: oldHandle,
+
PDSURL: "https://bsky.social",
+
Method: identity.MethodDNS,
+
ResolvedAt: time.Now().UTC(),
+
})
+
if err != nil {
+
t.Fatalf("failed to cache identity: %v", err)
+
}
+
+
// 3. Verify cached (both handle and DID should be cached)
+
cachedByHandle, err := cache.Get(ctx, oldHandle)
+
if err != nil {
+
t.Fatalf("expected old handle to be cached, got error: %v", err)
+
}
+
if cachedByHandle.DID != did {
+
t.Errorf("expected cached DID %s, got %s", did, cachedByHandle.DID)
+
}
+
+
cachedByDID, err := cache.Get(ctx, did)
+
if err != nil {
+
t.Fatalf("expected DID to be cached, got error: %v", err)
+
}
+
if cachedByDID.Handle != oldHandle {
+
t.Errorf("expected cached handle %s, got %s", oldHandle, cachedByDID.Handle)
+
}
+
+
// 4. Send identity event with NEW handle
+
event := jetstream.JetstreamEvent{
+
Did: did,
+
Kind: "identity",
+
Identity: &jetstream.IdentityEvent{
+
Did: did,
+
Handle: newHandle,
+
Seq: 99999,
+
Time: time.Now().Format(time.RFC3339),
+
},
+
}
+
+
consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "")
+
err = consumer.HandleIdentityEventPublic(ctx, &event)
+
if err != nil {
+
t.Fatalf("failed to handle handle change event: %v", err)
+
}
+
+
// 5. Verify database updated
+
user, err := userService.GetUserByDID(ctx, did)
+
if err != nil {
+
t.Fatalf("failed to get user by DID: %v", err)
+
}
+
if user.Handle != newHandle {
+
t.Errorf("expected database to have new handle %s, got %s", newHandle, user.Handle)
+
}
+
+
// 6. Verify old handle purged from cache
+
_, err = cache.Get(ctx, oldHandle)
+
if err == nil {
+
t.Error("expected old handle to be purged from cache, but it's still cached")
+
}
+
if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
+
t.Errorf("expected ErrCacheMiss for old handle, got: %v", err)
+
}
+
+
// 7. Verify DID cache purged
+
_, err = cache.Get(ctx, did)
+
if err == nil {
+
t.Error("expected DID to be purged from cache, but it's still cached")
+
}
+
if _, isCacheMiss := err.(*identity.ErrCacheMiss); !isCacheMiss {
+
t.Errorf("expected ErrCacheMiss for DID, got: %v", err)
+
}
+
+
// 8. Verify user can be found by new handle
+
userByHandle, err := userService.GetUserByHandle(ctx, newHandle)
+
if err != nil {
+
t.Fatalf("failed to get user by new handle: %v", err)
+
}
+
if userByHandle.DID != did {
+
t.Errorf("expected DID %s when looking up by new handle, got %s", did, userByHandle.DID)
+
}
+
})
}
func TestUserServiceIdempotency(t *testing.T) {
···
defer db.Close()
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
ctx := context.Background()
t.Run("CreateUser is idempotent for duplicate DID", func(t *testing.T) {
+17 -9
tests/integration/user_test.go
···
"github.com/pressly/goose/v3"
"Coves/internal/api/routes"
+
"Coves/internal/atproto/identity"
"Coves/internal/core/users"
"Coves/internal/db/postgres"
)
···
// Wire up dependencies
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
ctx := context.Background()
···
}
})
-
// Test 4: Resolve handle to DID
+
// Test 4: Resolve handle to DID (using real handle)
t.Run("Resolve Handle to DID", func(t *testing.T) {
-
did, err := userService.ResolveHandleToDID(ctx, "alice.test")
+
// Test with a real atProto handle
+
did, err := userService.ResolveHandleToDID(ctx, "bretton.dev")
if err != nil {
-
t.Fatalf("Failed to resolve handle: %v", err)
+
t.Fatalf("Failed to resolve handle bretton.dev: %v", err)
}
-
if did != "did:plc:test123456" {
-
t.Errorf("Expected DID did:plc:test123456, got %s", did)
+
if did == "" {
+
t.Error("Expected non-empty DID")
}
+
+
t.Logf("✅ Resolved bretton.dev → %s", did)
})
}
···
// Wire up dependencies
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
// Create test user directly in service
ctx := context.Background()
···
defer db.Close()
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
ctx := context.Background()
// Create first user
···
defer db.Close()
userRepo := postgres.NewUserRepository(db)
-
userService := users.NewUserService(userRepo, "http://localhost:3001")
+
resolver := identity.NewResolver(db, identity.DefaultConfig())
+
userService := users.NewUserService(userRepo, resolver, "http://localhost:3001")
ctx := context.Background()
testCases := []struct {