A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "strings" 9 10 "Coves/internal/core/communities" 11) 12 13// Subscribe creates a new subscription record 14func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 15 query := ` 16 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility) 17 VALUES ($1, $2, $3, $4, $5, $6) 18 RETURNING id, subscribed_at` 19 20 err := r.db.QueryRowContext(ctx, query, 21 subscription.UserDID, 22 subscription.CommunityDID, 23 subscription.SubscribedAt, 24 nullString(subscription.RecordURI), 25 nullString(subscription.RecordCID), 26 subscription.ContentVisibility, 27 ).Scan(&subscription.ID, &subscription.SubscribedAt) 28 if err != nil { 29 if strings.Contains(err.Error(), "duplicate key") { 30 return nil, communities.ErrSubscriptionAlreadyExists 31 } 32 if strings.Contains(err.Error(), "foreign key") { 33 return nil, communities.ErrCommunityNotFound 34 } 35 return nil, fmt.Errorf("failed to create subscription: %w", err) 36 } 37 38 return subscription, nil 39} 40 41// SubscribeWithCount atomically creates subscription and increments subscriber count 42// This is idempotent - safe for Jetstream replays 43func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 44 tx, err := r.db.BeginTx(ctx, nil) 45 if err != nil { 46 return nil, fmt.Errorf("failed to begin transaction: %w", err) 47 } 48 defer func() { 49 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 50 log.Printf("Failed to rollback transaction: %v", rollbackErr) 51 } 52 }() 53 54 // Insert subscription with ON CONFLICT DO NOTHING for idempotency 55 query := ` 56 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility) 57 VALUES ($1, $2, $3, $4, $5, $6) 58 ON CONFLICT (user_did, community_did) DO NOTHING 59 RETURNING id, subscribed_at, content_visibility` 60 61 err = tx.QueryRowContext(ctx, query, 62 subscription.UserDID, 63 subscription.CommunityDID, 64 subscription.SubscribedAt, 65 nullString(subscription.RecordURI), 66 nullString(subscription.RecordCID), 67 subscription.ContentVisibility, 68 ).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility) 69 70 // If no rows returned, subscription already existed (idempotent behavior) 71 if err == sql.ErrNoRows { 72 // Get existing subscription 73 query = `SELECT id, subscribed_at, content_visibility FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 74 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility) 75 if err != nil { 76 return nil, fmt.Errorf("failed to get existing subscription: %w", err) 77 } 78 // Don't increment count - subscription already existed 79 if commitErr := tx.Commit(); commitErr != nil { 80 return nil, fmt.Errorf("failed to commit transaction: %w", commitErr) 81 } 82 return subscription, nil 83 } 84 85 if err != nil { 86 if strings.Contains(err.Error(), "foreign key") { 87 return nil, communities.ErrCommunityNotFound 88 } 89 return nil, fmt.Errorf("failed to create subscription: %w", err) 90 } 91 92 // Increment subscriber count only if insert succeeded 93 incrementQuery := ` 94 UPDATE communities 95 SET subscriber_count = subscriber_count + 1, updated_at = NOW() 96 WHERE did = $1` 97 98 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID) 99 if err != nil { 100 return nil, fmt.Errorf("failed to increment subscriber count: %w", err) 101 } 102 103 if err := tx.Commit(); err != nil { 104 return nil, fmt.Errorf("failed to commit transaction: %w", err) 105 } 106 107 return subscription, nil 108} 109 110// Unsubscribe removes a subscription record 111func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error { 112 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 113 114 result, err := r.db.ExecContext(ctx, query, userDID, communityDID) 115 if err != nil { 116 return fmt.Errorf("failed to unsubscribe: %w", err) 117 } 118 119 rowsAffected, err := result.RowsAffected() 120 if err != nil { 121 return fmt.Errorf("failed to check unsubscribe result: %w", err) 122 } 123 124 if rowsAffected == 0 { 125 return communities.ErrSubscriptionNotFound 126 } 127 128 return nil 129} 130 131// UnsubscribeWithCount atomically removes subscription and decrements subscriber count 132// This is idempotent - safe for Jetstream replays 133func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error { 134 tx, err := r.db.BeginTx(ctx, nil) 135 if err != nil { 136 return fmt.Errorf("failed to begin transaction: %w", err) 137 } 138 defer func() { 139 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 140 log.Printf("Failed to rollback transaction: %v", rollbackErr) 141 } 142 }() 143 144 // Delete subscription 145 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 146 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID) 147 if err != nil { 148 return fmt.Errorf("failed to unsubscribe: %w", err) 149 } 150 151 rowsAffected, err := result.RowsAffected() 152 if err != nil { 153 return fmt.Errorf("failed to check unsubscribe result: %w", err) 154 } 155 156 // If no rows deleted, subscription didn't exist (idempotent - not an error) 157 if rowsAffected == 0 { 158 if commitErr := tx.Commit(); commitErr != nil { 159 return fmt.Errorf("failed to commit transaction: %w", commitErr) 160 } 161 return nil 162 } 163 164 // Decrement subscriber count only if delete succeeded 165 decrementQuery := ` 166 UPDATE communities 167 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW() 168 WHERE did = $1` 169 170 _, err = tx.ExecContext(ctx, decrementQuery, communityDID) 171 if err != nil { 172 return fmt.Errorf("failed to decrement subscriber count: %w", err) 173 } 174 175 if err := tx.Commit(); err != nil { 176 return fmt.Errorf("failed to commit transaction: %w", err) 177 } 178 179 return nil 180} 181 182// GetSubscription retrieves a specific subscription 183func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) { 184 subscription := &communities.Subscription{} 185 query := ` 186 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 187 FROM community_subscriptions 188 WHERE user_did = $1 AND community_did = $2` 189 190 var recordURI, recordCID sql.NullString 191 192 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan( 193 &subscription.ID, 194 &subscription.UserDID, 195 &subscription.CommunityDID, 196 &subscription.SubscribedAt, 197 &recordURI, 198 &recordCID, 199 &subscription.ContentVisibility, 200 ) 201 202 if err == sql.ErrNoRows { 203 return nil, communities.ErrSubscriptionNotFound 204 } 205 if err != nil { 206 return nil, fmt.Errorf("failed to get subscription: %w", err) 207 } 208 209 subscription.RecordURI = recordURI.String 210 subscription.RecordCID = recordCID.String 211 212 return subscription, nil 213} 214 215// GetSubscriptionByURI retrieves a subscription by its AT-URI 216// This is used by Jetstream consumer for DELETE operations (which don't include record data) 217func (r *postgresCommunityRepo) GetSubscriptionByURI(ctx context.Context, recordURI string) (*communities.Subscription, error) { 218 subscription := &communities.Subscription{} 219 query := ` 220 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 221 FROM community_subscriptions 222 WHERE record_uri = $1` 223 224 var uri, cid sql.NullString 225 226 err := r.db.QueryRowContext(ctx, query, recordURI).Scan( 227 &subscription.ID, 228 &subscription.UserDID, 229 &subscription.CommunityDID, 230 &subscription.SubscribedAt, 231 &uri, 232 &cid, 233 &subscription.ContentVisibility, 234 ) 235 236 if err == sql.ErrNoRows { 237 return nil, communities.ErrSubscriptionNotFound 238 } 239 if err != nil { 240 return nil, fmt.Errorf("failed to get subscription by URI: %w", err) 241 } 242 243 subscription.RecordURI = uri.String 244 subscription.RecordCID = cid.String 245 246 return subscription, nil 247} 248 249// ListSubscriptions retrieves all subscriptions for a user 250func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) { 251 query := ` 252 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 253 FROM community_subscriptions 254 WHERE user_did = $1 255 ORDER BY subscribed_at DESC 256 LIMIT $2 OFFSET $3` 257 258 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset) 259 if err != nil { 260 return nil, fmt.Errorf("failed to list subscriptions: %w", err) 261 } 262 defer func() { 263 if closeErr := rows.Close(); closeErr != nil { 264 log.Printf("Failed to close rows: %v", closeErr) 265 } 266 }() 267 268 result := []*communities.Subscription{} 269 for rows.Next() { 270 subscription := &communities.Subscription{} 271 var recordURI, recordCID sql.NullString 272 273 scanErr := rows.Scan( 274 &subscription.ID, 275 &subscription.UserDID, 276 &subscription.CommunityDID, 277 &subscription.SubscribedAt, 278 &recordURI, 279 &recordCID, 280 &subscription.ContentVisibility, 281 ) 282 if scanErr != nil { 283 return nil, fmt.Errorf("failed to scan subscription: %w", scanErr) 284 } 285 286 subscription.RecordURI = recordURI.String 287 subscription.RecordCID = recordCID.String 288 289 result = append(result, subscription) 290 } 291 292 if err = rows.Err(); err != nil { 293 return nil, fmt.Errorf("error iterating subscriptions: %w", err) 294 } 295 296 return result, nil 297} 298 299// ListSubscribers retrieves all subscribers for a community 300func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) { 301 query := ` 302 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 303 FROM community_subscriptions 304 WHERE community_did = $1 305 ORDER BY subscribed_at DESC 306 LIMIT $2 OFFSET $3` 307 308 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset) 309 if err != nil { 310 return nil, fmt.Errorf("failed to list subscribers: %w", err) 311 } 312 defer func() { 313 if closeErr := rows.Close(); closeErr != nil { 314 log.Printf("Failed to close rows: %v", closeErr) 315 } 316 }() 317 318 result := []*communities.Subscription{} 319 for rows.Next() { 320 subscription := &communities.Subscription{} 321 var recordURI, recordCID sql.NullString 322 323 scanErr := rows.Scan( 324 &subscription.ID, 325 &subscription.UserDID, 326 &subscription.CommunityDID, 327 &subscription.SubscribedAt, 328 &recordURI, 329 &recordCID, 330 &subscription.ContentVisibility, 331 ) 332 if scanErr != nil { 333 return nil, fmt.Errorf("failed to scan subscriber: %w", scanErr) 334 } 335 336 subscription.RecordURI = recordURI.String 337 subscription.RecordCID = recordCID.String 338 339 result = append(result, subscription) 340 } 341 342 if err = rows.Err(); err != nil { 343 return nil, fmt.Errorf("error iterating subscribers: %w", err) 344 } 345 346 return result, nil 347}