A community-based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "strings"
9
10 "Coves/internal/core/communities"
11)
12
13// Subscribe creates a new subscription record
14func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
15 query := `
16 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility)
17 VALUES ($1, $2, $3, $4, $5, $6)
18 RETURNING id, subscribed_at`
19
20 err := r.db.QueryRowContext(ctx, query,
21 subscription.UserDID,
22 subscription.CommunityDID,
23 subscription.SubscribedAt,
24 nullString(subscription.RecordURI),
25 nullString(subscription.RecordCID),
26 subscription.ContentVisibility,
27 ).Scan(&subscription.ID, &subscription.SubscribedAt)
28 if err != nil {
29 if strings.Contains(err.Error(), "duplicate key") {
30 return nil, communities.ErrSubscriptionAlreadyExists
31 }
32 if strings.Contains(err.Error(), "foreign key") {
33 return nil, communities.ErrCommunityNotFound
34 }
35 return nil, fmt.Errorf("failed to create subscription: %w", err)
36 }
37
38 return subscription, nil
39}
40
41// SubscribeWithCount atomically creates subscription and increments subscriber count
42// This is idempotent - safe for Jetstream replays
43func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
44 tx, err := r.db.BeginTx(ctx, nil)
45 if err != nil {
46 return nil, fmt.Errorf("failed to begin transaction: %w", err)
47 }
48 defer func() {
49 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
50 log.Printf("Failed to rollback transaction: %v", rollbackErr)
51 }
52 }()
53
54 // Insert subscription with ON CONFLICT DO NOTHING for idempotency
55 query := `
56 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility)
57 VALUES ($1, $2, $3, $4, $5, $6)
58 ON CONFLICT (user_did, community_did) DO NOTHING
59 RETURNING id, subscribed_at, content_visibility`
60
61 err = tx.QueryRowContext(ctx, query,
62 subscription.UserDID,
63 subscription.CommunityDID,
64 subscription.SubscribedAt,
65 nullString(subscription.RecordURI),
66 nullString(subscription.RecordCID),
67 subscription.ContentVisibility,
68 ).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility)
69
70 // If no rows returned, subscription already existed (idempotent behavior)
71 if err == sql.ErrNoRows {
72 // Get existing subscription
73 query = `SELECT id, subscribed_at, content_visibility FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
74 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility)
75 if err != nil {
76 return nil, fmt.Errorf("failed to get existing subscription: %w", err)
77 }
78 // Don't increment count - subscription already existed
79 if commitErr := tx.Commit(); commitErr != nil {
80 return nil, fmt.Errorf("failed to commit transaction: %w", commitErr)
81 }
82 return subscription, nil
83 }
84
85 if err != nil {
86 if strings.Contains(err.Error(), "foreign key") {
87 return nil, communities.ErrCommunityNotFound
88 }
89 return nil, fmt.Errorf("failed to create subscription: %w", err)
90 }
91
92 // Increment subscriber count only if insert succeeded
93 incrementQuery := `
94 UPDATE communities
95 SET subscriber_count = subscriber_count + 1, updated_at = NOW()
96 WHERE did = $1`
97
98 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID)
99 if err != nil {
100 return nil, fmt.Errorf("failed to increment subscriber count: %w", err)
101 }
102
103 if err := tx.Commit(); err != nil {
104 return nil, fmt.Errorf("failed to commit transaction: %w", err)
105 }
106
107 return subscription, nil
108}
109
110// Unsubscribe removes a subscription record
111func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error {
112 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
113
114 result, err := r.db.ExecContext(ctx, query, userDID, communityDID)
115 if err != nil {
116 return fmt.Errorf("failed to unsubscribe: %w", err)
117 }
118
119 rowsAffected, err := result.RowsAffected()
120 if err != nil {
121 return fmt.Errorf("failed to check unsubscribe result: %w", err)
122 }
123
124 if rowsAffected == 0 {
125 return communities.ErrSubscriptionNotFound
126 }
127
128 return nil
129}
130
131// UnsubscribeWithCount atomically removes subscription and decrements subscriber count
132// This is idempotent - safe for Jetstream replays
133func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error {
134 tx, err := r.db.BeginTx(ctx, nil)
135 if err != nil {
136 return fmt.Errorf("failed to begin transaction: %w", err)
137 }
138 defer func() {
139 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
140 log.Printf("Failed to rollback transaction: %v", rollbackErr)
141 }
142 }()
143
144 // Delete subscription
145 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
146 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID)
147 if err != nil {
148 return fmt.Errorf("failed to unsubscribe: %w", err)
149 }
150
151 rowsAffected, err := result.RowsAffected()
152 if err != nil {
153 return fmt.Errorf("failed to check unsubscribe result: %w", err)
154 }
155
156 // If no rows deleted, subscription didn't exist (idempotent - not an error)
157 if rowsAffected == 0 {
158 if commitErr := tx.Commit(); commitErr != nil {
159 return fmt.Errorf("failed to commit transaction: %w", commitErr)
160 }
161 return nil
162 }
163
164 // Decrement subscriber count only if delete succeeded
165 decrementQuery := `
166 UPDATE communities
167 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW()
168 WHERE did = $1`
169
170 _, err = tx.ExecContext(ctx, decrementQuery, communityDID)
171 if err != nil {
172 return fmt.Errorf("failed to decrement subscriber count: %w", err)
173 }
174
175 if err := tx.Commit(); err != nil {
176 return fmt.Errorf("failed to commit transaction: %w", err)
177 }
178
179 return nil
180}
181
182// GetSubscription retrieves a specific subscription
183func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) {
184 subscription := &communities.Subscription{}
185 query := `
186 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
187 FROM community_subscriptions
188 WHERE user_did = $1 AND community_did = $2`
189
190 var recordURI, recordCID sql.NullString
191
192 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(
193 &subscription.ID,
194 &subscription.UserDID,
195 &subscription.CommunityDID,
196 &subscription.SubscribedAt,
197 &recordURI,
198 &recordCID,
199 &subscription.ContentVisibility,
200 )
201
202 if err == sql.ErrNoRows {
203 return nil, communities.ErrSubscriptionNotFound
204 }
205 if err != nil {
206 return nil, fmt.Errorf("failed to get subscription: %w", err)
207 }
208
209 subscription.RecordURI = recordURI.String
210 subscription.RecordCID = recordCID.String
211
212 return subscription, nil
213}
214
215// GetSubscriptionByURI retrieves a subscription by its AT-URI
216// This is used by Jetstream consumer for DELETE operations (which don't include record data)
217func (r *postgresCommunityRepo) GetSubscriptionByURI(ctx context.Context, recordURI string) (*communities.Subscription, error) {
218 subscription := &communities.Subscription{}
219 query := `
220 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
221 FROM community_subscriptions
222 WHERE record_uri = $1`
223
224 var uri, cid sql.NullString
225
226 err := r.db.QueryRowContext(ctx, query, recordURI).Scan(
227 &subscription.ID,
228 &subscription.UserDID,
229 &subscription.CommunityDID,
230 &subscription.SubscribedAt,
231 &uri,
232 &cid,
233 &subscription.ContentVisibility,
234 )
235
236 if err == sql.ErrNoRows {
237 return nil, communities.ErrSubscriptionNotFound
238 }
239 if err != nil {
240 return nil, fmt.Errorf("failed to get subscription by URI: %w", err)
241 }
242
243 subscription.RecordURI = uri.String
244 subscription.RecordCID = cid.String
245
246 return subscription, nil
247}
248
249// ListSubscriptions retrieves all subscriptions for a user
250func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) {
251 query := `
252 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
253 FROM community_subscriptions
254 WHERE user_did = $1
255 ORDER BY subscribed_at DESC
256 LIMIT $2 OFFSET $3`
257
258 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset)
259 if err != nil {
260 return nil, fmt.Errorf("failed to list subscriptions: %w", err)
261 }
262 defer func() {
263 if closeErr := rows.Close(); closeErr != nil {
264 log.Printf("Failed to close rows: %v", closeErr)
265 }
266 }()
267
268 result := []*communities.Subscription{}
269 for rows.Next() {
270 subscription := &communities.Subscription{}
271 var recordURI, recordCID sql.NullString
272
273 scanErr := rows.Scan(
274 &subscription.ID,
275 &subscription.UserDID,
276 &subscription.CommunityDID,
277 &subscription.SubscribedAt,
278 &recordURI,
279 &recordCID,
280 &subscription.ContentVisibility,
281 )
282 if scanErr != nil {
283 return nil, fmt.Errorf("failed to scan subscription: %w", scanErr)
284 }
285
286 subscription.RecordURI = recordURI.String
287 subscription.RecordCID = recordCID.String
288
289 result = append(result, subscription)
290 }
291
292 if err = rows.Err(); err != nil {
293 return nil, fmt.Errorf("error iterating subscriptions: %w", err)
294 }
295
296 return result, nil
297}
298
299// ListSubscribers retrieves all subscribers for a community
300func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) {
301 query := `
302 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
303 FROM community_subscriptions
304 WHERE community_did = $1
305 ORDER BY subscribed_at DESC
306 LIMIT $2 OFFSET $3`
307
308 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
309 if err != nil {
310 return nil, fmt.Errorf("failed to list subscribers: %w", err)
311 }
312 defer func() {
313 if closeErr := rows.Close(); closeErr != nil {
314 log.Printf("Failed to close rows: %v", closeErr)
315 }
316 }()
317
318 result := []*communities.Subscription{}
319 for rows.Next() {
320 subscription := &communities.Subscription{}
321 var recordURI, recordCID sql.NullString
322
323 scanErr := rows.Scan(
324 &subscription.ID,
325 &subscription.UserDID,
326 &subscription.CommunityDID,
327 &subscription.SubscribedAt,
328 &recordURI,
329 &recordCID,
330 &subscription.ContentVisibility,
331 )
332 if scanErr != nil {
333 return nil, fmt.Errorf("failed to scan subscriber: %w", scanErr)
334 }
335
336 subscription.RecordURI = recordURI.String
337 subscription.RecordCID = recordCID.String
338
339 result = append(result, subscription)
340 }
341
342 if err = rows.Err(); err != nil {
343 return nil, fmt.Errorf("error iterating subscribers: %w", err)
344 }
345
346 return result, nil
347}