A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/communities"
5 "context"
6 "database/sql"
7 "fmt"
8 "log"
9 "strings"
10)
11
12// Subscribe creates a new subscription record
13func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
14 query := `
15 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility)
16 VALUES ($1, $2, $3, $4, $5, $6)
17 RETURNING id, subscribed_at`
18
19 err := r.db.QueryRowContext(ctx, query,
20 subscription.UserDID,
21 subscription.CommunityDID,
22 subscription.SubscribedAt,
23 nullString(subscription.RecordURI),
24 nullString(subscription.RecordCID),
25 subscription.ContentVisibility,
26 ).Scan(&subscription.ID, &subscription.SubscribedAt)
27 if err != nil {
28 if strings.Contains(err.Error(), "duplicate key") {
29 return nil, communities.ErrSubscriptionAlreadyExists
30 }
31 if strings.Contains(err.Error(), "foreign key") {
32 return nil, communities.ErrCommunityNotFound
33 }
34 return nil, fmt.Errorf("failed to create subscription: %w", err)
35 }
36
37 return subscription, nil
38}
39
40// SubscribeWithCount atomically creates subscription and increments subscriber count
41// This is idempotent - safe for Jetstream replays
42func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
43 tx, err := r.db.BeginTx(ctx, nil)
44 if err != nil {
45 return nil, fmt.Errorf("failed to begin transaction: %w", err)
46 }
47 defer func() {
48 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
49 log.Printf("Failed to rollback transaction: %v", rollbackErr)
50 }
51 }()
52
53 // Insert subscription with ON CONFLICT DO NOTHING for idempotency
54 query := `
55 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility)
56 VALUES ($1, $2, $3, $4, $5, $6)
57 ON CONFLICT (user_did, community_did) DO NOTHING
58 RETURNING id, subscribed_at, content_visibility`
59
60 err = tx.QueryRowContext(ctx, query,
61 subscription.UserDID,
62 subscription.CommunityDID,
63 subscription.SubscribedAt,
64 nullString(subscription.RecordURI),
65 nullString(subscription.RecordCID),
66 subscription.ContentVisibility,
67 ).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility)
68
69 // If no rows returned, subscription already existed (idempotent behavior)
70 if err == sql.ErrNoRows {
71 // Get existing subscription
72 query = `SELECT id, subscribed_at, content_visibility FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
73 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt, &subscription.ContentVisibility)
74 if err != nil {
75 return nil, fmt.Errorf("failed to get existing subscription: %w", err)
76 }
77 // Don't increment count - subscription already existed
78 if commitErr := tx.Commit(); commitErr != nil {
79 return nil, fmt.Errorf("failed to commit transaction: %w", commitErr)
80 }
81 return subscription, nil
82 }
83
84 if err != nil {
85 if strings.Contains(err.Error(), "foreign key") {
86 return nil, communities.ErrCommunityNotFound
87 }
88 return nil, fmt.Errorf("failed to create subscription: %w", err)
89 }
90
91 // Increment subscriber count only if insert succeeded
92 incrementQuery := `
93 UPDATE communities
94 SET subscriber_count = subscriber_count + 1, updated_at = NOW()
95 WHERE did = $1`
96
97 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID)
98 if err != nil {
99 return nil, fmt.Errorf("failed to increment subscriber count: %w", err)
100 }
101
102 if err := tx.Commit(); err != nil {
103 return nil, fmt.Errorf("failed to commit transaction: %w", err)
104 }
105
106 return subscription, nil
107}
108
109// Unsubscribe removes a subscription record
110func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error {
111 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
112
113 result, err := r.db.ExecContext(ctx, query, userDID, communityDID)
114 if err != nil {
115 return fmt.Errorf("failed to unsubscribe: %w", err)
116 }
117
118 rowsAffected, err := result.RowsAffected()
119 if err != nil {
120 return fmt.Errorf("failed to check unsubscribe result: %w", err)
121 }
122
123 if rowsAffected == 0 {
124 return communities.ErrSubscriptionNotFound
125 }
126
127 return nil
128}
129
130// UnsubscribeWithCount atomically removes subscription and decrements subscriber count
131// This is idempotent - safe for Jetstream replays
132func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error {
133 tx, err := r.db.BeginTx(ctx, nil)
134 if err != nil {
135 return fmt.Errorf("failed to begin transaction: %w", err)
136 }
137 defer func() {
138 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
139 log.Printf("Failed to rollback transaction: %v", rollbackErr)
140 }
141 }()
142
143 // Delete subscription
144 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
145 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID)
146 if err != nil {
147 return fmt.Errorf("failed to unsubscribe: %w", err)
148 }
149
150 rowsAffected, err := result.RowsAffected()
151 if err != nil {
152 return fmt.Errorf("failed to check unsubscribe result: %w", err)
153 }
154
155 // If no rows deleted, subscription didn't exist (idempotent - not an error)
156 if rowsAffected == 0 {
157 if commitErr := tx.Commit(); commitErr != nil {
158 return fmt.Errorf("failed to commit transaction: %w", commitErr)
159 }
160 return nil
161 }
162
163 // Decrement subscriber count only if delete succeeded
164 decrementQuery := `
165 UPDATE communities
166 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW()
167 WHERE did = $1`
168
169 _, err = tx.ExecContext(ctx, decrementQuery, communityDID)
170 if err != nil {
171 return fmt.Errorf("failed to decrement subscriber count: %w", err)
172 }
173
174 if err := tx.Commit(); err != nil {
175 return fmt.Errorf("failed to commit transaction: %w", err)
176 }
177
178 return nil
179}
180
181// GetSubscription retrieves a specific subscription
182func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) {
183 subscription := &communities.Subscription{}
184 query := `
185 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
186 FROM community_subscriptions
187 WHERE user_did = $1 AND community_did = $2`
188
189 var recordURI, recordCID sql.NullString
190
191 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(
192 &subscription.ID,
193 &subscription.UserDID,
194 &subscription.CommunityDID,
195 &subscription.SubscribedAt,
196 &recordURI,
197 &recordCID,
198 &subscription.ContentVisibility,
199 )
200
201 if err == sql.ErrNoRows {
202 return nil, communities.ErrSubscriptionNotFound
203 }
204 if err != nil {
205 return nil, fmt.Errorf("failed to get subscription: %w", err)
206 }
207
208 subscription.RecordURI = recordURI.String
209 subscription.RecordCID = recordCID.String
210
211 return subscription, nil
212}
213
214// GetSubscriptionByURI retrieves a subscription by its AT-URI
215// This is used by Jetstream consumer for DELETE operations (which don't include record data)
216func (r *postgresCommunityRepo) GetSubscriptionByURI(ctx context.Context, recordURI string) (*communities.Subscription, error) {
217 subscription := &communities.Subscription{}
218 query := `
219 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
220 FROM community_subscriptions
221 WHERE record_uri = $1`
222
223 var uri, cid sql.NullString
224
225 err := r.db.QueryRowContext(ctx, query, recordURI).Scan(
226 &subscription.ID,
227 &subscription.UserDID,
228 &subscription.CommunityDID,
229 &subscription.SubscribedAt,
230 &uri,
231 &cid,
232 &subscription.ContentVisibility,
233 )
234
235 if err == sql.ErrNoRows {
236 return nil, communities.ErrSubscriptionNotFound
237 }
238 if err != nil {
239 return nil, fmt.Errorf("failed to get subscription by URI: %w", err)
240 }
241
242 subscription.RecordURI = uri.String
243 subscription.RecordCID = cid.String
244
245 return subscription, nil
246}
247
248// ListSubscriptions retrieves all subscriptions for a user
249func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) {
250 query := `
251 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
252 FROM community_subscriptions
253 WHERE user_did = $1
254 ORDER BY subscribed_at DESC
255 LIMIT $2 OFFSET $3`
256
257 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset)
258 if err != nil {
259 return nil, fmt.Errorf("failed to list subscriptions: %w", err)
260 }
261 defer func() {
262 if closeErr := rows.Close(); closeErr != nil {
263 log.Printf("Failed to close rows: %v", closeErr)
264 }
265 }()
266
267 result := []*communities.Subscription{}
268 for rows.Next() {
269 subscription := &communities.Subscription{}
270 var recordURI, recordCID sql.NullString
271
272 scanErr := rows.Scan(
273 &subscription.ID,
274 &subscription.UserDID,
275 &subscription.CommunityDID,
276 &subscription.SubscribedAt,
277 &recordURI,
278 &recordCID,
279 &subscription.ContentVisibility,
280 )
281 if scanErr != nil {
282 return nil, fmt.Errorf("failed to scan subscription: %w", scanErr)
283 }
284
285 subscription.RecordURI = recordURI.String
286 subscription.RecordCID = recordCID.String
287
288 result = append(result, subscription)
289 }
290
291 if err = rows.Err(); err != nil {
292 return nil, fmt.Errorf("error iterating subscriptions: %w", err)
293 }
294
295 return result, nil
296}
297
298// ListSubscribers retrieves all subscribers for a community
299func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) {
300 query := `
301 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid, content_visibility
302 FROM community_subscriptions
303 WHERE community_did = $1
304 ORDER BY subscribed_at DESC
305 LIMIT $2 OFFSET $3`
306
307 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
308 if err != nil {
309 return nil, fmt.Errorf("failed to list subscribers: %w", err)
310 }
311 defer func() {
312 if closeErr := rows.Close(); closeErr != nil {
313 log.Printf("Failed to close rows: %v", closeErr)
314 }
315 }()
316
317 result := []*communities.Subscription{}
318 for rows.Next() {
319 subscription := &communities.Subscription{}
320 var recordURI, recordCID sql.NullString
321
322 scanErr := rows.Scan(
323 &subscription.ID,
324 &subscription.UserDID,
325 &subscription.CommunityDID,
326 &subscription.SubscribedAt,
327 &recordURI,
328 &recordCID,
329 &subscription.ContentVisibility,
330 )
331 if scanErr != nil {
332 return nil, fmt.Errorf("failed to scan subscriber: %w", scanErr)
333 }
334
335 subscription.RecordURI = recordURI.String
336 subscription.RecordCID = recordCID.String
337
338 result = append(result, subscription)
339 }
340
341 if err = rows.Err(); err != nil {
342 return nil, fmt.Errorf("error iterating subscribers: %w", err)
343 }
344
345 return result, nil
346}