A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "database/sql" 7 "fmt" 8 "log" 9 "strings" 10) 11 12// Subscribe creates a new subscription record 13func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 14 query := ` 15 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid) 16 VALUES ($1, $2, $3, $4, $5) 17 RETURNING id, subscribed_at` 18 19 err := r.db.QueryRowContext(ctx, query, 20 subscription.UserDID, 21 subscription.CommunityDID, 22 subscription.SubscribedAt, 23 nullString(subscription.RecordURI), 24 nullString(subscription.RecordCID), 25 ).Scan(&subscription.ID, &subscription.SubscribedAt) 26 if err != nil { 27 if strings.Contains(err.Error(), "duplicate key") { 28 return nil, communities.ErrSubscriptionAlreadyExists 29 } 30 if strings.Contains(err.Error(), "foreign key") { 31 return nil, communities.ErrCommunityNotFound 32 } 33 return nil, fmt.Errorf("failed to create subscription: %w", err) 34 } 35 36 return subscription, nil 37} 38 39// SubscribeWithCount atomically creates subscription and increments subscriber count 40// This is idempotent - safe for Jetstream replays 41func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 42 tx, err := r.db.BeginTx(ctx, nil) 43 if err != nil { 44 return nil, fmt.Errorf("failed to begin transaction: %w", err) 45 } 46 defer func() { 47 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 48 log.Printf("Failed to rollback transaction: %v", rollbackErr) 49 } 50 }() 51 52 // Insert subscription with ON CONFLICT DO NOTHING for idempotency 53 query := ` 54 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid) 55 VALUES ($1, $2, $3, $4, $5) 56 ON CONFLICT (user_did, community_did) DO NOTHING 57 
RETURNING id, subscribed_at` 58 59 err = tx.QueryRowContext(ctx, query, 60 subscription.UserDID, 61 subscription.CommunityDID, 62 subscription.SubscribedAt, 63 nullString(subscription.RecordURI), 64 nullString(subscription.RecordCID), 65 ).Scan(&subscription.ID, &subscription.SubscribedAt) 66 67 // If no rows returned, subscription already existed (idempotent behavior) 68 if err == sql.ErrNoRows { 69 // Get existing subscription 70 query = `SELECT id, subscribed_at FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 71 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt) 72 if err != nil { 73 return nil, fmt.Errorf("failed to get existing subscription: %w", err) 74 } 75 // Don't increment count - subscription already existed 76 if commitErr := tx.Commit(); commitErr != nil { 77 return nil, fmt.Errorf("failed to commit transaction: %w", commitErr) 78 } 79 return subscription, nil 80 } 81 82 if err != nil { 83 if strings.Contains(err.Error(), "foreign key") { 84 return nil, communities.ErrCommunityNotFound 85 } 86 return nil, fmt.Errorf("failed to create subscription: %w", err) 87 } 88 89 // Increment subscriber count only if insert succeeded 90 incrementQuery := ` 91 UPDATE communities 92 SET subscriber_count = subscriber_count + 1, updated_at = NOW() 93 WHERE did = $1` 94 95 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID) 96 if err != nil { 97 return nil, fmt.Errorf("failed to increment subscriber count: %w", err) 98 } 99 100 if err := tx.Commit(); err != nil { 101 return nil, fmt.Errorf("failed to commit transaction: %w", err) 102 } 103 104 return subscription, nil 105} 106 107// Unsubscribe removes a subscription record 108func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error { 109 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 110 111 result, err 
:= r.db.ExecContext(ctx, query, userDID, communityDID) 112 if err != nil { 113 return fmt.Errorf("failed to unsubscribe: %w", err) 114 } 115 116 rowsAffected, err := result.RowsAffected() 117 if err != nil { 118 return fmt.Errorf("failed to check unsubscribe result: %w", err) 119 } 120 121 if rowsAffected == 0 { 122 return communities.ErrSubscriptionNotFound 123 } 124 125 return nil 126} 127 128// UnsubscribeWithCount atomically removes subscription and decrements subscriber count 129// This is idempotent - safe for Jetstream replays 130func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error { 131 tx, err := r.db.BeginTx(ctx, nil) 132 if err != nil { 133 return fmt.Errorf("failed to begin transaction: %w", err) 134 } 135 defer func() { 136 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 137 log.Printf("Failed to rollback transaction: %v", rollbackErr) 138 } 139 }() 140 141 // Delete subscription 142 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 143 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID) 144 if err != nil { 145 return fmt.Errorf("failed to unsubscribe: %w", err) 146 } 147 148 rowsAffected, err := result.RowsAffected() 149 if err != nil { 150 return fmt.Errorf("failed to check unsubscribe result: %w", err) 151 } 152 153 // If no rows deleted, subscription didn't exist (idempotent - not an error) 154 if rowsAffected == 0 { 155 if commitErr := tx.Commit(); commitErr != nil { 156 return fmt.Errorf("failed to commit transaction: %w", commitErr) 157 } 158 return nil 159 } 160 161 // Decrement subscriber count only if delete succeeded 162 decrementQuery := ` 163 UPDATE communities 164 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW() 165 WHERE did = $1` 166 167 _, err = tx.ExecContext(ctx, decrementQuery, communityDID) 168 if err != nil { 169 return fmt.Errorf("failed to 
decrement subscriber count: %w", err) 170 } 171 172 if err := tx.Commit(); err != nil { 173 return fmt.Errorf("failed to commit transaction: %w", err) 174 } 175 176 return nil 177} 178 179// GetSubscription retrieves a specific subscription 180func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) { 181 subscription := &communities.Subscription{} 182 query := ` 183 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid 184 FROM community_subscriptions 185 WHERE user_did = $1 AND community_did = $2` 186 187 var recordURI, recordCID sql.NullString 188 189 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan( 190 &subscription.ID, 191 &subscription.UserDID, 192 &subscription.CommunityDID, 193 &subscription.SubscribedAt, 194 &recordURI, 195 &recordCID, 196 ) 197 198 if err == sql.ErrNoRows { 199 return nil, communities.ErrSubscriptionNotFound 200 } 201 if err != nil { 202 return nil, fmt.Errorf("failed to get subscription: %w", err) 203 } 204 205 subscription.RecordURI = recordURI.String 206 subscription.RecordCID = recordCID.String 207 208 return subscription, nil 209} 210 211// ListSubscriptions retrieves all subscriptions for a user 212func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) { 213 query := ` 214 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid 215 FROM community_subscriptions 216 WHERE user_did = $1 217 ORDER BY subscribed_at DESC 218 LIMIT $2 OFFSET $3` 219 220 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset) 221 if err != nil { 222 return nil, fmt.Errorf("failed to list subscriptions: %w", err) 223 } 224 defer func() { 225 if closeErr := rows.Close(); closeErr != nil { 226 log.Printf("Failed to close rows: %v", closeErr) 227 } 228 }() 229 230 result := []*communities.Subscription{} 231 for rows.Next() { 232 
subscription := &communities.Subscription{} 233 var recordURI, recordCID sql.NullString 234 235 scanErr := rows.Scan( 236 &subscription.ID, 237 &subscription.UserDID, 238 &subscription.CommunityDID, 239 &subscription.SubscribedAt, 240 &recordURI, 241 &recordCID, 242 ) 243 if scanErr != nil { 244 return nil, fmt.Errorf("failed to scan subscription: %w", scanErr) 245 } 246 247 subscription.RecordURI = recordURI.String 248 subscription.RecordCID = recordCID.String 249 250 result = append(result, subscription) 251 } 252 253 if err = rows.Err(); err != nil { 254 return nil, fmt.Errorf("error iterating subscriptions: %w", err) 255 } 256 257 return result, nil 258} 259 260// ListSubscribers retrieves all subscribers for a community 261func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) { 262 query := ` 263 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid 264 FROM community_subscriptions 265 WHERE community_did = $1 266 ORDER BY subscribed_at DESC 267 LIMIT $2 OFFSET $3` 268 269 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset) 270 if err != nil { 271 return nil, fmt.Errorf("failed to list subscribers: %w", err) 272 } 273 defer func() { 274 if closeErr := rows.Close(); closeErr != nil { 275 log.Printf("Failed to close rows: %v", closeErr) 276 } 277 }() 278 279 result := []*communities.Subscription{} 280 for rows.Next() { 281 subscription := &communities.Subscription{} 282 var recordURI, recordCID sql.NullString 283 284 scanErr := rows.Scan( 285 &subscription.ID, 286 &subscription.UserDID, 287 &subscription.CommunityDID, 288 &subscription.SubscribedAt, 289 &recordURI, 290 &recordCID, 291 ) 292 if scanErr != nil { 293 return nil, fmt.Errorf("failed to scan subscriber: %w", scanErr) 294 } 295 296 subscription.RecordURI = recordURI.String 297 subscription.RecordCID = recordCID.String 298 299 result = append(result, subscription) 300 } 301 
302 if err = rows.Err(); err != nil { 303 return nil, fmt.Errorf("error iterating subscribers: %w", err) 304 } 305 306 return result, nil 307}