A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"strings"

	"Coves/internal/core/communities"
)
11
12// Subscribe creates a new subscription record
13func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
14 query := `
15 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid)
16 VALUES ($1, $2, $3, $4, $5)
17 RETURNING id, subscribed_at`
18
19 err := r.db.QueryRowContext(ctx, query,
20 subscription.UserDID,
21 subscription.CommunityDID,
22 subscription.SubscribedAt,
23 nullString(subscription.RecordURI),
24 nullString(subscription.RecordCID),
25 ).Scan(&subscription.ID, &subscription.SubscribedAt)
26 if err != nil {
27 if strings.Contains(err.Error(), "duplicate key") {
28 return nil, communities.ErrSubscriptionAlreadyExists
29 }
30 if strings.Contains(err.Error(), "foreign key") {
31 return nil, communities.ErrCommunityNotFound
32 }
33 return nil, fmt.Errorf("failed to create subscription: %w", err)
34 }
35
36 return subscription, nil
37}
38
39// SubscribeWithCount atomically creates subscription and increments subscriber count
40// This is idempotent - safe for Jetstream replays
41func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
42 tx, err := r.db.BeginTx(ctx, nil)
43 if err != nil {
44 return nil, fmt.Errorf("failed to begin transaction: %w", err)
45 }
46 defer func() {
47 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
48 log.Printf("Failed to rollback transaction: %v", rollbackErr)
49 }
50 }()
51
52 // Insert subscription with ON CONFLICT DO NOTHING for idempotency
53 query := `
54 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid)
55 VALUES ($1, $2, $3, $4, $5)
56 ON CONFLICT (user_did, community_did) DO NOTHING
57 RETURNING id, subscribed_at`
58
59 err = tx.QueryRowContext(ctx, query,
60 subscription.UserDID,
61 subscription.CommunityDID,
62 subscription.SubscribedAt,
63 nullString(subscription.RecordURI),
64 nullString(subscription.RecordCID),
65 ).Scan(&subscription.ID, &subscription.SubscribedAt)
66
67 // If no rows returned, subscription already existed (idempotent behavior)
68 if err == sql.ErrNoRows {
69 // Get existing subscription
70 query = `SELECT id, subscribed_at FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
71 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt)
72 if err != nil {
73 return nil, fmt.Errorf("failed to get existing subscription: %w", err)
74 }
75 // Don't increment count - subscription already existed
76 if commitErr := tx.Commit(); commitErr != nil {
77 return nil, fmt.Errorf("failed to commit transaction: %w", commitErr)
78 }
79 return subscription, nil
80 }
81
82 if err != nil {
83 if strings.Contains(err.Error(), "foreign key") {
84 return nil, communities.ErrCommunityNotFound
85 }
86 return nil, fmt.Errorf("failed to create subscription: %w", err)
87 }
88
89 // Increment subscriber count only if insert succeeded
90 incrementQuery := `
91 UPDATE communities
92 SET subscriber_count = subscriber_count + 1, updated_at = NOW()
93 WHERE did = $1`
94
95 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID)
96 if err != nil {
97 return nil, fmt.Errorf("failed to increment subscriber count: %w", err)
98 }
99
100 if err := tx.Commit(); err != nil {
101 return nil, fmt.Errorf("failed to commit transaction: %w", err)
102 }
103
104 return subscription, nil
105}
106
107// Unsubscribe removes a subscription record
108func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error {
109 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
110
111 result, err := r.db.ExecContext(ctx, query, userDID, communityDID)
112 if err != nil {
113 return fmt.Errorf("failed to unsubscribe: %w", err)
114 }
115
116 rowsAffected, err := result.RowsAffected()
117 if err != nil {
118 return fmt.Errorf("failed to check unsubscribe result: %w", err)
119 }
120
121 if rowsAffected == 0 {
122 return communities.ErrSubscriptionNotFound
123 }
124
125 return nil
126}
127
128// UnsubscribeWithCount atomically removes subscription and decrements subscriber count
129// This is idempotent - safe for Jetstream replays
130func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error {
131 tx, err := r.db.BeginTx(ctx, nil)
132 if err != nil {
133 return fmt.Errorf("failed to begin transaction: %w", err)
134 }
135 defer func() {
136 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
137 log.Printf("Failed to rollback transaction: %v", rollbackErr)
138 }
139 }()
140
141 // Delete subscription
142 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
143 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID)
144 if err != nil {
145 return fmt.Errorf("failed to unsubscribe: %w", err)
146 }
147
148 rowsAffected, err := result.RowsAffected()
149 if err != nil {
150 return fmt.Errorf("failed to check unsubscribe result: %w", err)
151 }
152
153 // If no rows deleted, subscription didn't exist (idempotent - not an error)
154 if rowsAffected == 0 {
155 if commitErr := tx.Commit(); commitErr != nil {
156 return fmt.Errorf("failed to commit transaction: %w", commitErr)
157 }
158 return nil
159 }
160
161 // Decrement subscriber count only if delete succeeded
162 decrementQuery := `
163 UPDATE communities
164 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW()
165 WHERE did = $1`
166
167 _, err = tx.ExecContext(ctx, decrementQuery, communityDID)
168 if err != nil {
169 return fmt.Errorf("failed to decrement subscriber count: %w", err)
170 }
171
172 if err := tx.Commit(); err != nil {
173 return fmt.Errorf("failed to commit transaction: %w", err)
174 }
175
176 return nil
177}
178
179// GetSubscription retrieves a specific subscription
180func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) {
181 subscription := &communities.Subscription{}
182 query := `
183 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
184 FROM community_subscriptions
185 WHERE user_did = $1 AND community_did = $2`
186
187 var recordURI, recordCID sql.NullString
188
189 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(
190 &subscription.ID,
191 &subscription.UserDID,
192 &subscription.CommunityDID,
193 &subscription.SubscribedAt,
194 &recordURI,
195 &recordCID,
196 )
197
198 if err == sql.ErrNoRows {
199 return nil, communities.ErrSubscriptionNotFound
200 }
201 if err != nil {
202 return nil, fmt.Errorf("failed to get subscription: %w", err)
203 }
204
205 subscription.RecordURI = recordURI.String
206 subscription.RecordCID = recordCID.String
207
208 return subscription, nil
209}
210
211// ListSubscriptions retrieves all subscriptions for a user
212func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) {
213 query := `
214 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
215 FROM community_subscriptions
216 WHERE user_did = $1
217 ORDER BY subscribed_at DESC
218 LIMIT $2 OFFSET $3`
219
220 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset)
221 if err != nil {
222 return nil, fmt.Errorf("failed to list subscriptions: %w", err)
223 }
224 defer func() {
225 if closeErr := rows.Close(); closeErr != nil {
226 log.Printf("Failed to close rows: %v", closeErr)
227 }
228 }()
229
230 result := []*communities.Subscription{}
231 for rows.Next() {
232 subscription := &communities.Subscription{}
233 var recordURI, recordCID sql.NullString
234
235 scanErr := rows.Scan(
236 &subscription.ID,
237 &subscription.UserDID,
238 &subscription.CommunityDID,
239 &subscription.SubscribedAt,
240 &recordURI,
241 &recordCID,
242 )
243 if scanErr != nil {
244 return nil, fmt.Errorf("failed to scan subscription: %w", scanErr)
245 }
246
247 subscription.RecordURI = recordURI.String
248 subscription.RecordCID = recordCID.String
249
250 result = append(result, subscription)
251 }
252
253 if err = rows.Err(); err != nil {
254 return nil, fmt.Errorf("error iterating subscriptions: %w", err)
255 }
256
257 return result, nil
258}
259
260// ListSubscribers retrieves all subscribers for a community
261func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) {
262 query := `
263 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
264 FROM community_subscriptions
265 WHERE community_did = $1
266 ORDER BY subscribed_at DESC
267 LIMIT $2 OFFSET $3`
268
269 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
270 if err != nil {
271 return nil, fmt.Errorf("failed to list subscribers: %w", err)
272 }
273 defer func() {
274 if closeErr := rows.Close(); closeErr != nil {
275 log.Printf("Failed to close rows: %v", closeErr)
276 }
277 }()
278
279 result := []*communities.Subscription{}
280 for rows.Next() {
281 subscription := &communities.Subscription{}
282 var recordURI, recordCID sql.NullString
283
284 scanErr := rows.Scan(
285 &subscription.ID,
286 &subscription.UserDID,
287 &subscription.CommunityDID,
288 &subscription.SubscribedAt,
289 &recordURI,
290 &recordCID,
291 )
292 if scanErr != nil {
293 return nil, fmt.Errorf("failed to scan subscriber: %w", scanErr)
294 }
295
296 subscription.RecordURI = recordURI.String
297 subscription.RecordCID = recordCID.String
298
299 result = append(result, subscription)
300 }
301
302 if err = rows.Err(); err != nil {
303 return nil, fmt.Errorf("error iterating subscribers: %w", err)
304 }
305
306 return result, nil
307}