A community based topic aggregation platform built on atproto
1package identity 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "strings" 9 "time" 10) 11 12// postgresCache implements IdentityCache using PostgreSQL 13type postgresCache struct { 14 db *sql.DB 15 ttl time.Duration 16} 17 18// NewPostgresCache creates a new PostgreSQL-backed identity cache 19func NewPostgresCache(db *sql.DB, ttl time.Duration) IdentityCache { 20 return &postgresCache{ 21 db: db, 22 ttl: ttl, 23 } 24} 25 26// Get retrieves a cached identity by handle or DID 27func (r *postgresCache) Get(ctx context.Context, identifier string) (*Identity, error) { 28 identifier = normalizeIdentifier(identifier) 29 30 query := ` 31 SELECT did, handle, pds_url, resolved_at, resolution_method, expires_at 32 FROM identity_cache 33 WHERE identifier = $1 AND expires_at > NOW() 34 ` 35 36 var i Identity 37 var method string 38 var expiresAt time.Time 39 40 err := r.db.QueryRowContext(ctx, query, identifier).Scan( 41 &i.DID, 42 &i.Handle, 43 &i.PDSURL, 44 &i.ResolvedAt, 45 &method, 46 &expiresAt, 47 ) 48 49 if err == sql.ErrNoRows { 50 return nil, &ErrCacheMiss{Identifier: identifier} 51 } 52 if err != nil { 53 return nil, fmt.Errorf("failed to query identity cache: %w", err) 54 } 55 56 // Convert string method to ResolutionMethod type 57 i.Method = MethodCache // It's from cache now 58 59 return &i, nil 60} 61 62// Set caches an identity bidirectionally (by handle and by DID) 63func (r *postgresCache) Set(ctx context.Context, i *Identity) error { 64 expiresAt := time.Now().UTC().Add(r.ttl) 65 66 // Debug logging for cache operations (helps diagnose TTL issues) 67 log.Printf("[identity-cache] Caching: handle=%s, did=%s, expires=%s (TTL=%s)", 68 i.Handle, i.DID, expiresAt.Format(time.RFC3339), r.ttl) 69 70 query := ` 71 INSERT INTO identity_cache (identifier, did, handle, pds_url, resolved_at, resolution_method, expires_at) 72 VALUES ($1, $2, $3, $4, $5, $6, $7) 73 ON CONFLICT (identifier) 74 DO UPDATE SET 75 did = EXCLUDED.did, 76 handle = EXCLUDED.handle, 77 pds_url = EXCLUDED.pds_url, 78 resolved_at = EXCLUDED.resolved_at, 79 resolution_method = EXCLUDED.resolution_method, 80 expires_at = EXCLUDED.expires_at, 81 updated_at = NOW() 82 ` 83 84 // Cache by handle if present 85 if i.Handle != "" { 86 normalizedHandle := normalizeIdentifier(i.Handle) 87 _, err := r.db.ExecContext(ctx, query, 88 normalizedHandle, i.DID, i.Handle, i.PDSURL, 89 i.ResolvedAt, string(i.Method), expiresAt, 90 ) 91 if err != nil { 92 return fmt.Errorf("failed to cache identity by handle: %w", err) 93 } 94 } 95 96 // Cache by DID 97 _, err := r.db.ExecContext(ctx, query, 98 i.DID, i.DID, i.Handle, i.PDSURL, 99 i.ResolvedAt, string(i.Method), expiresAt, 100 ) 101 if err != nil { 102 return fmt.Errorf("failed to cache identity by DID: %w", err) 103 } 104 105 return nil 106} 107 108// Delete removes a cached identity by identifier 109func (r *postgresCache) Delete(ctx context.Context, identifier string) error { 110 identifier = normalizeIdentifier(identifier) 111 112 query := `DELETE FROM identity_cache WHERE identifier = $1` 113 _, err := r.db.ExecContext(ctx, query, identifier) 114 if err != nil { 115 return fmt.Errorf("failed to delete from identity cache: %w", err) 116 } 117 118 return nil 119} 120 121// Purge removes all cache entries associated with an identifier 122// This removes both handle and DID entries in a single atomic query 123func (r *postgresCache) Purge(ctx context.Context, identifier string) error { 124 identifier = normalizeIdentifier(identifier) 125 126 // Single atomic query: find related entries and delete all at once 127 // This prevents race conditions and is more efficient than multiple queries 128 query := ` 129 WITH related AS ( 130 SELECT did, handle 131 FROM identity_cache 132 WHERE identifier = $1 133 LIMIT 1 134 ) 135 DELETE FROM identity_cache 136 WHERE identifier = $1 137 OR identifier IN (SELECT did FROM related WHERE did IS NOT NULL) 138 OR identifier IN (SELECT handle FROM related WHERE handle IS NOT NULL AND handle != '') 139 ` 140 141 result, err := r.db.ExecContext(ctx, query, identifier) 142 if err != nil { 143 return fmt.Errorf("failed to purge identity cache: %w", err) 144 } 145 146 rowsAffected, err := result.RowsAffected() 147 if err == nil && rowsAffected > 0 { 148 log.Printf("[identity-cache] Purged %d entries for: %s", rowsAffected, identifier) 149 } 150 151 return nil 152} 153 154// normalizeIdentifier normalizes handles to lowercase, leaves DIDs as-is 155func normalizeIdentifier(identifier string) string { 156 identifier = strings.TrimSpace(identifier) 157 158 // DIDs are case-sensitive, handles are not 159 if strings.HasPrefix(identifier, "did:") { 160 return identifier 161 } 162 163 // It's a handle, normalize to lowercase 164 return strings.ToLower(identifier) 165}