A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "database/sql" 7 "fmt" 8 "log" 9 "strings" 10) 11 12// Subscribe creates a new subscription record 13func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 14 query := ` 15 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility) 16 VALUES ($1, $2, $3, $4, $5, $6) 17 RETURNING id, subscribed_at` 18 19 err := r.db.QueryRowContext(ctx, query, 20 subscription.UserDID, 21 subscription.CommunityDID, 22 subscription.SubscribedAt, 23 nullString(subscription.RecordURI), 24 nullString(subscription.RecordCID), 25 subscription.ContentVisibility, 26 ).Scan(&subscription.ID, &subscription.SubscribedAt) 27 if err != nil { 28 if strings.Contains(err.Error(), "duplicate key") { 29 return nil, communities.ErrSubscriptionAlreadyExists 30 } 31 if strings.Contains(err.Error(), "foreign key") { 32 return nil, communities.ErrCommunityNotFound 33 } 34 return nil, fmt.Errorf("failed to create subscription: %w", err) 35 } 36 37 return subscription, nil 38} 39 40// SubscribeWithCount atomically creates subscription and increments subscriber count 41// This is idempotent - safe for Jetstream replays 42func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) { 43 tx, err := r.db.BeginTx(ctx, nil) 44 if err != nil { 45 return nil, fmt.Errorf("failed to begin transaction: %w", err) 46 } 47 defer func() { 48 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 49 log.Printf("Failed to rollback transaction: %v", rollbackErr) 50 } 51 }() 52 53 // Insert subscription with ON CONFLICT DO NOTHING for idempotency 54 query := ` 55 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility) 56 
VALUES ($1, $2, $3, $4, $5, $6) 57 ON CONFLICT (user_did, community_did) DO NOTHING 58 RETURNING id, subscribed_at, content_visibility` 59 60 err = tx.QueryRowContext(ctx, query, 61 subscription.UserDID, 62 subscription.CommunityDID, 63 subscription.SubscribedAt, 64 nullString(subscription.RecordURI), 65 nullString(subscription.RecordCID), 66 subscription.ContentVisibility, 67 ).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility) 68 69 // If no rows returned, subscription already existed (idempotent behavior) 70 if err == sql.ErrNoRows { 71 // Get existing subscription 72 query = `SELECT id, subscribed_at, content_visibility FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 73 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility) 74 if err != nil { 75 return nil, fmt.Errorf("failed to get existing subscription: %w", err) 76 } 77 // Don't increment count - subscription already existed 78 if commitErr := tx.Commit(); commitErr != nil { 79 return nil, fmt.Errorf("failed to commit transaction: %w", commitErr) 80 } 81 return subscription, nil 82 } 83 84 if err != nil { 85 if strings.Contains(err.Error(), "foreign key") { 86 return nil, communities.ErrCommunityNotFound 87 } 88 return nil, fmt.Errorf("failed to create subscription: %w", err) 89 } 90 91 // Increment subscriber count only if insert succeeded 92 incrementQuery := ` 93 UPDATE communities 94 SET subscriber_count = subscriber_count + 1, updated_at = NOW() 95 WHERE did = $1` 96 97 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID) 98 if err != nil { 99 return nil, fmt.Errorf("failed to increment subscriber count: %w", err) 100 } 101 102 if err := tx.Commit(); err != nil { 103 return nil, fmt.Errorf("failed to commit transaction: %w", err) 104 } 105 106 return subscription, nil 107} 108 109// Unsubscribe removes a 
subscription record 110func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error { 111 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 112 113 result, err := r.db.ExecContext(ctx, query, userDID, communityDID) 114 if err != nil { 115 return fmt.Errorf("failed to unsubscribe: %w", err) 116 } 117 118 rowsAffected, err := result.RowsAffected() 119 if err != nil { 120 return fmt.Errorf("failed to check unsubscribe result: %w", err) 121 } 122 123 if rowsAffected == 0 { 124 return communities.ErrSubscriptionNotFound 125 } 126 127 return nil 128} 129 130// UnsubscribeWithCount atomically removes subscription and decrements subscriber count 131// This is idempotent - safe for Jetstream replays 132func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error { 133 tx, err := r.db.BeginTx(ctx, nil) 134 if err != nil { 135 return fmt.Errorf("failed to begin transaction: %w", err) 136 } 137 defer func() { 138 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 139 log.Printf("Failed to rollback transaction: %v", rollbackErr) 140 } 141 }() 142 143 // Delete subscription 144 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2` 145 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID) 146 if err != nil { 147 return fmt.Errorf("failed to unsubscribe: %w", err) 148 } 149 150 rowsAffected, err := result.RowsAffected() 151 if err != nil { 152 return fmt.Errorf("failed to check unsubscribe result: %w", err) 153 } 154 155 // If no rows deleted, subscription didn't exist (idempotent - not an error) 156 if rowsAffected == 0 { 157 if commitErr := tx.Commit(); commitErr != nil { 158 return fmt.Errorf("failed to commit transaction: %w", commitErr) 159 } 160 return nil 161 } 162 163 // Decrement subscriber count only if delete succeeded 164 decrementQuery := ` 165 
UPDATE communities 166 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW() 167 WHERE did = $1` 168 169 _, err = tx.ExecContext(ctx, decrementQuery, communityDID) 170 if err != nil { 171 return fmt.Errorf("failed to decrement subscriber count: %w", err) 172 } 173 174 if err := tx.Commit(); err != nil { 175 return fmt.Errorf("failed to commit transaction: %w", err) 176 } 177 178 return nil 179} 180 181// GetSubscription retrieves a specific subscription 182func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) { 183 subscription := &communities.Subscription{} 184 query := ` 185 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 186 FROM community_subscriptions 187 WHERE user_did = $1 AND community_did = $2` 188 189 var recordURI, recordCID sql.NullString 190 191 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan( 192 &subscription.ID, 193 &subscription.UserDID, 194 &subscription.CommunityDID, 195 &subscription.SubscribedAt, 196 &recordURI, 197 &recordCID, 198 &subscription.ContentVisibility, 199 ) 200 201 if err == sql.ErrNoRows { 202 return nil, communities.ErrSubscriptionNotFound 203 } 204 if err != nil { 205 return nil, fmt.Errorf("failed to get subscription: %w", err) 206 } 207 208 subscription.RecordURI = recordURI.String 209 subscription.RecordCID = recordCID.String 210 211 return subscription, nil 212} 213 214// GetSubscriptionByURI retrieves a subscription by its AT-URI 215// This is used by Jetstream consumer for DELETE operations (which don't include record data) 216func (r *postgresCommunityRepo) GetSubscriptionByURI(ctx context.Context, recordURI string) (*communities.Subscription, error) { 217 subscription := &communities.Subscription{} 218 query := ` 219 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 220 FROM community_subscriptions 221 WHERE 
record_uri = $1` 222 223 var uri, cid sql.NullString 224 225 err := r.db.QueryRowContext(ctx, query, recordURI).Scan( 226 &subscription.ID, 227 &subscription.UserDID, 228 &subscription.CommunityDID, 229 &subscription.SubscribedAt, 230 &uri, 231 &cid, 232 &subscription.ContentVisibility, 233 ) 234 235 if err == sql.ErrNoRows { 236 return nil, communities.ErrSubscriptionNotFound 237 } 238 if err != nil { 239 return nil, fmt.Errorf("failed to get subscription by URI: %w", err) 240 } 241 242 subscription.RecordURI = uri.String 243 subscription.RecordCID = cid.String 244 245 return subscription, nil 246} 247 248// ListSubscriptions retrieves all subscriptions for a user 249func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) { 250 query := ` 251 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 252 FROM community_subscriptions 253 WHERE user_did = $1 254 ORDER BY subscribed_at DESC 255 LIMIT $2 OFFSET $3` 256 257 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset) 258 if err != nil { 259 return nil, fmt.Errorf("failed to list subscriptions: %w", err) 260 } 261 defer func() { 262 if closeErr := rows.Close(); closeErr != nil { 263 log.Printf("Failed to close rows: %v", closeErr) 264 } 265 }() 266 267 result := []*communities.Subscription{} 268 for rows.Next() { 269 subscription := &communities.Subscription{} 270 var recordURI, recordCID sql.NullString 271 272 scanErr := rows.Scan( 273 &subscription.ID, 274 &subscription.UserDID, 275 &subscription.CommunityDID, 276 &subscription.SubscribedAt, 277 &recordURI, 278 &recordCID, 279 &subscription.ContentVisibility, 280 ) 281 if scanErr != nil { 282 return nil, fmt.Errorf("failed to scan subscription: %w", scanErr) 283 } 284 285 subscription.RecordURI = recordURI.String 286 subscription.RecordCID = recordCID.String 287 288 result = append(result, subscription) 289 } 290 291 if err = 
rows.Err(); err != nil { 292 return nil, fmt.Errorf("error iterating subscriptions: %w", err) 293 } 294 295 return result, nil 296} 297 298// ListSubscribers retrieves all subscribers for a community 299func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) { 300 query := ` 301 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility 302 FROM community_subscriptions 303 WHERE community_did = $1 304 ORDER BY subscribed_at DESC 305 LIMIT $2 OFFSET $3` 306 307 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset) 308 if err != nil { 309 return nil, fmt.Errorf("failed to list subscribers: %w", err) 310 } 311 defer func() { 312 if closeErr := rows.Close(); closeErr != nil { 313 log.Printf("Failed to close rows: %v", closeErr) 314 } 315 }() 316 317 result := []*communities.Subscription{} 318 for rows.Next() { 319 subscription := &communities.Subscription{} 320 var recordURI, recordCID sql.NullString 321 322 scanErr := rows.Scan( 323 &subscription.ID, 324 &subscription.UserDID, 325 &subscription.CommunityDID, 326 &subscription.SubscribedAt, 327 &recordURI, 328 &recordCID, 329 &subscription.ContentVisibility, 330 ) 331 if scanErr != nil { 332 return nil, fmt.Errorf("failed to scan subscriber: %w", scanErr) 333 } 334 335 subscription.RecordURI = recordURI.String 336 subscription.RecordCID = recordCID.String 337 338 result = append(result, subscription) 339 } 340 341 if err = rows.Err(); err != nil { 342 return nil, fmt.Errorf("error iterating subscribers: %w", err) 343 } 344 345 return result, nil 346}