A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "strings" 8 9 "Coves/internal/core/communities" 10) 11 12// Subscribe creates a new subscription record 13func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 14 query := ` 15 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid) 16 VALUES ($1, $2, $3, $4, $5) 17 RETURNING id, subscribed_at` 18 19 err := r.db.QueryRowContext(ctx, query, 20 subscription.UserDID, 21 subscription.CommunityDID, 22 subscription.SubscribedAt, 23 nullString(subscription.RecordURI), 24 nullString(subscription.RecordCID), 25 ).Scan(&subscription.ID, &subscription.SubscribedAt) 26 27 if err != nil { 28 if strings.Contains(err.Error(), "duplicate key") { 29 return nil, communities.ErrSubscriptionAlreadyExists 30 } 31 if strings.Contains(err.Error(), "foreign key") { 32 return nil, communities.ErrCommunityNotFound 33 } 34 return nil, fmt.Errorf("failed to create subscription: %w", err) 35 } 36 37 return subscription, nil 38} 39 40// SubscribeWithCount atomically creates subscription and increments subscriber count 41// This is idempotent - safe for Jetstream replays 42func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 43 tx, err := r.db.BeginTx(ctx, nil) 44 if err != nil { 45 return nil, fmt.Errorf("failed to begin transaction: %w", err) 46 } 47 defer tx.Rollback() 48 49 // Insert subscription with ON CONFLICT DO NOTHING for idempotency 50 query := ` 51 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid) 52 VALUES ($1, $2, $3, $4, $5) 53 ON CONFLICT (user_did, community_did) DO NOTHING 54 RETURNING id, subscribed_at` 55 56 err = tx.QueryRowContext(ctx, query, 57 subscription.UserDID, 58 subscription.CommunityDID, 59 subscription.SubscribedAt, 60 
nullString(subscription.RecordURI), 61 nullString(subscription.RecordCID), 62 ).Scan(&subscription.ID, &subscription.SubscribedAt) 63 64 // If no rows returned, subscription already existed (idempotent behavior) 65 if err == sql.ErrNoRows { 66 // Get existing subscription 67 query = `SELECT id, subscribed_at FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 68 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt) 69 if err != nil { 70 return nil, fmt.Errorf("failed to get existing subscription: %w", err) 71 } 72 // Don't increment count - subscription already existed 73 if err := tx.Commit(); err != nil { 74 return nil, fmt.Errorf("failed to commit transaction: %w", err) 75 } 76 return subscription, nil 77 } 78 79 if err != nil { 80 if strings.Contains(err.Error(), "foreign key") { 81 return nil, communities.ErrCommunityNotFound 82 } 83 return nil, fmt.Errorf("failed to create subscription: %w", err) 84 } 85 86 // Increment subscriber count only if insert succeeded 87 incrementQuery := ` 88 UPDATE communities 89 SET subscriber_count = subscriber_count + 1, updated_at = NOW() 90 WHERE did = $1` 91 92 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID) 93 if err != nil { 94 return nil, fmt.Errorf("failed to increment subscriber count: %w", err) 95 } 96 97 if err := tx.Commit(); err != nil { 98 return nil, fmt.Errorf("failed to commit transaction: %w", err) 99 } 100 101 return subscription, nil 102} 103 104// Unsubscribe removes a subscription record 105func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error { 106 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 107 108 result, err := r.db.ExecContext(ctx, query, userDID, communityDID) 109 if err != nil { 110 return fmt.Errorf("failed to unsubscribe: %w", err) 111 } 112 113 rowsAffected, err := 
result.RowsAffected() 114 if err != nil { 115 return fmt.Errorf("failed to check unsubscribe result: %w", err) 116 } 117 118 if rowsAffected == 0 { 119 return communities.ErrSubscriptionNotFound 120 } 121 122 return nil 123} 124 125// UnsubscribeWithCount atomically removes subscription and decrements subscriber count 126// This is idempotent - safe for Jetstream replays 127func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error { 128 tx, err := r.db.BeginTx(ctx, nil) 129 if err != nil { 130 return fmt.Errorf("failed to begin transaction: %w", err) 131 } 132 defer tx.Rollback() 133 134 // Delete subscription 135 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 136 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID) 137 if err != nil { 138 return fmt.Errorf("failed to unsubscribe: %w", err) 139 } 140 141 rowsAffected, err := result.RowsAffected() 142 if err != nil { 143 return fmt.Errorf("failed to check unsubscribe result: %w", err) 144 } 145 146 // If no rows deleted, subscription didn't exist (idempotent - not an error) 147 if rowsAffected == 0 { 148 if err := tx.Commit(); err != nil { 149 return fmt.Errorf("failed to commit transaction: %w", err) 150 } 151 return nil 152 } 153 154 // Decrement subscriber count only if delete succeeded 155 decrementQuery := ` 156 UPDATE communities 157 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW() 158 WHERE did = $1` 159 160 _, err = tx.ExecContext(ctx, decrementQuery, communityDID) 161 if err != nil { 162 return fmt.Errorf("failed to decrement subscriber count: %w", err) 163 } 164 165 if err := tx.Commit(); err != nil { 166 return fmt.Errorf("failed to commit transaction: %w", err) 167 } 168 169 return nil 170} 171 172// GetSubscription retrieves a specific subscription 173func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) 
(*communities.Subscription, error) { 174 subscription := &communities.Subscription{} 175 query := ` 176 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid 177 FROM community_subscriptions 178 WHERE user_did = $1 AND community_did = $2` 179 180 var recordURI, recordCID sql.NullString 181 182 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan( 183 &subscription.ID, 184 &subscription.UserDID, 185 &subscription.CommunityDID, 186 &subscription.SubscribedAt, 187 &recordURI, 188 &recordCID, 189 ) 190 191 if err == sql.ErrNoRows { 192 return nil, communities.ErrSubscriptionNotFound 193 } 194 if err != nil { 195 return nil, fmt.Errorf("failed to get subscription: %w", err) 196 } 197 198 subscription.RecordURI = recordURI.String 199 subscription.RecordCID = recordCID.String 200 201 return subscription, nil 202} 203 204// ListSubscriptions retrieves all subscriptions for a user 205func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) { 206 query := ` 207 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid 208 FROM community_subscriptions 209 WHERE user_did = $1 210 ORDER BY subscribed_at DESC 211 LIMIT $2 OFFSET $3` 212 213 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset) 214 if err != nil { 215 return nil, fmt.Errorf("failed to list subscriptions: %w", err) 216 } 217 defer rows.Close() 218 219 result := []*communities.Subscription{} 220 for rows.Next() { 221 subscription := &communities.Subscription{} 222 var recordURI, recordCID sql.NullString 223 224 err := rows.Scan( 225 &subscription.ID, 226 &subscription.UserDID, 227 &subscription.CommunityDID, 228 &subscription.SubscribedAt, 229 &recordURI, 230 &recordCID, 231 ) 232 if err != nil { 233 return nil, fmt.Errorf("failed to scan subscription: %w", err) 234 } 235 236 subscription.RecordURI = recordURI.String 237 subscription.RecordCID = recordCID.String 238 
239 result = append(result, subscription) 240 } 241 242 if err = rows.Err(); err != nil { 243 return nil, fmt.Errorf("error iterating subscriptions: %w", err) 244 } 245 246 return result, nil 247} 248 249// ListSubscribers retrieves all subscribers for a community 250func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) { 251 query := ` 252 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid 253 FROM community_subscriptions 254 WHERE community_did = $1 255 ORDER BY subscribed_at DESC 256 LIMIT $2 OFFSET $3` 257 258 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset) 259 if err != nil { 260 return nil, fmt.Errorf("failed to list subscribers: %w", err) 261 } 262 defer rows.Close() 263 264 result := []*communities.Subscription{} 265 for rows.Next() { 266 subscription := &communities.Subscription{} 267 var recordURI, recordCID sql.NullString 268 269 err := rows.Scan( 270 &subscription.ID, 271 &subscription.UserDID, 272 &subscription.CommunityDID, 273 &subscription.SubscribedAt, 274 &recordURI, 275 &recordCID, 276 ) 277 if err != nil { 278 return nil, fmt.Errorf("failed to scan subscriber: %w", err) 279 } 280 281 subscription.RecordURI = recordURI.String 282 subscription.RecordCID = recordCID.String 283 284 result = append(result, subscription) 285 } 286 287 if err = rows.Err(); err != nil { 288 return nil, fmt.Errorf("error iterating subscribers: %w", err) 289 } 290 291 return result, nil 292}