A community-based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "strings"
8
9 "Coves/internal/core/communities"
10)
11
12// Subscribe creates a new subscription record
13func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
14 query := `
15 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid)
16 VALUES ($1, $2, $3, $4, $5)
17 RETURNING id, subscribed_at`
18
19 err := r.db.QueryRowContext(ctx, query,
20 subscription.UserDID,
21 subscription.CommunityDID,
22 subscription.SubscribedAt,
23 nullString(subscription.RecordURI),
24 nullString(subscription.RecordCID),
25 ).Scan(&subscription.ID, &subscription.SubscribedAt)
26
27 if err != nil {
28 if strings.Contains(err.Error(), "duplicate key") {
29 return nil, communities.ErrSubscriptionAlreadyExists
30 }
31 if strings.Contains(err.Error(), "foreign key") {
32 return nil, communities.ErrCommunityNotFound
33 }
34 return nil, fmt.Errorf("failed to create subscription: %w", err)
35 }
36
37 return subscription, nil
38}
39
40// SubscribeWithCount atomically creates subscription and increments subscriber count
41// This is idempotent - safe for Jetstream replays
42func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
43 tx, err := r.db.BeginTx(ctx, nil)
44 if err != nil {
45 return nil, fmt.Errorf("failed to begin transaction: %w", err)
46 }
47 defer tx.Rollback()
48
49 // Insert subscription with ON CONFLICT DO NOTHING for idempotency
50 query := `
51 INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid)
52 VALUES ($1, $2, $3, $4, $5)
53 ON CONFLICT (user_did, community_did) DO NOTHING
54 RETURNING id, subscribed_at`
55
56 err = tx.QueryRowContext(ctx, query,
57 subscription.UserDID,
58 subscription.CommunityDID,
59 subscription.SubscribedAt,
60 nullString(subscription.RecordURI),
61 nullString(subscription.RecordCID),
62 ).Scan(&subscription.ID, &subscription.SubscribedAt)
63
64 // If no rows returned, subscription already existed (idempotent behavior)
65 if err == sql.ErrNoRows {
66 // Get existing subscription
67 query = `SELECT id, subscribed_at FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
68 err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt)
69 if err != nil {
70 return nil, fmt.Errorf("failed to get existing subscription: %w", err)
71 }
72 // Don't increment count - subscription already existed
73 if err := tx.Commit(); err != nil {
74 return nil, fmt.Errorf("failed to commit transaction: %w", err)
75 }
76 return subscription, nil
77 }
78
79 if err != nil {
80 if strings.Contains(err.Error(), "foreign key") {
81 return nil, communities.ErrCommunityNotFound
82 }
83 return nil, fmt.Errorf("failed to create subscription: %w", err)
84 }
85
86 // Increment subscriber count only if insert succeeded
87 incrementQuery := `
88 UPDATE communities
89 SET subscriber_count = subscriber_count + 1, updated_at = NOW()
90 WHERE did = $1`
91
92 _, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID)
93 if err != nil {
94 return nil, fmt.Errorf("failed to increment subscriber count: %w", err)
95 }
96
97 if err := tx.Commit(); err != nil {
98 return nil, fmt.Errorf("failed to commit transaction: %w", err)
99 }
100
101 return subscription, nil
102}
103
104// Unsubscribe removes a subscription record
105func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error {
106 query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
107
108 result, err := r.db.ExecContext(ctx, query, userDID, communityDID)
109 if err != nil {
110 return fmt.Errorf("failed to unsubscribe: %w", err)
111 }
112
113 rowsAffected, err := result.RowsAffected()
114 if err != nil {
115 return fmt.Errorf("failed to check unsubscribe result: %w", err)
116 }
117
118 if rowsAffected == 0 {
119 return communities.ErrSubscriptionNotFound
120 }
121
122 return nil
123}
124
125// UnsubscribeWithCount atomically removes subscription and decrements subscriber count
126// This is idempotent - safe for Jetstream replays
127func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error {
128 tx, err := r.db.BeginTx(ctx, nil)
129 if err != nil {
130 return fmt.Errorf("failed to begin transaction: %w", err)
131 }
132 defer tx.Rollback()
133
134 // Delete subscription
135 deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
136 result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID)
137 if err != nil {
138 return fmt.Errorf("failed to unsubscribe: %w", err)
139 }
140
141 rowsAffected, err := result.RowsAffected()
142 if err != nil {
143 return fmt.Errorf("failed to check unsubscribe result: %w", err)
144 }
145
146 // If no rows deleted, subscription didn't exist (idempotent - not an error)
147 if rowsAffected == 0 {
148 if err := tx.Commit(); err != nil {
149 return fmt.Errorf("failed to commit transaction: %w", err)
150 }
151 return nil
152 }
153
154 // Decrement subscriber count only if delete succeeded
155 decrementQuery := `
156 UPDATE communities
157 SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW()
158 WHERE did = $1`
159
160 _, err = tx.ExecContext(ctx, decrementQuery, communityDID)
161 if err != nil {
162 return fmt.Errorf("failed to decrement subscriber count: %w", err)
163 }
164
165 if err := tx.Commit(); err != nil {
166 return fmt.Errorf("failed to commit transaction: %w", err)
167 }
168
169 return nil
170}
171
172// GetSubscription retrieves a specific subscription
173func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) {
174 subscription := &communities.Subscription{}
175 query := `
176 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
177 FROM community_subscriptions
178 WHERE user_did = $1 AND community_did = $2`
179
180 var recordURI, recordCID sql.NullString
181
182 err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(
183 &subscription.ID,
184 &subscription.UserDID,
185 &subscription.CommunityDID,
186 &subscription.SubscribedAt,
187 &recordURI,
188 &recordCID,
189 )
190
191 if err == sql.ErrNoRows {
192 return nil, communities.ErrSubscriptionNotFound
193 }
194 if err != nil {
195 return nil, fmt.Errorf("failed to get subscription: %w", err)
196 }
197
198 subscription.RecordURI = recordURI.String
199 subscription.RecordCID = recordCID.String
200
201 return subscription, nil
202}
203
204// ListSubscriptions retrieves all subscriptions for a user
205func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) {
206 query := `
207 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
208 FROM community_subscriptions
209 WHERE user_did = $1
210 ORDER BY subscribed_at DESC
211 LIMIT $2 OFFSET $3`
212
213 rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset)
214 if err != nil {
215 return nil, fmt.Errorf("failed to list subscriptions: %w", err)
216 }
217 defer rows.Close()
218
219 result := []*communities.Subscription{}
220 for rows.Next() {
221 subscription := &communities.Subscription{}
222 var recordURI, recordCID sql.NullString
223
224 err := rows.Scan(
225 &subscription.ID,
226 &subscription.UserDID,
227 &subscription.CommunityDID,
228 &subscription.SubscribedAt,
229 &recordURI,
230 &recordCID,
231 )
232 if err != nil {
233 return nil, fmt.Errorf("failed to scan subscription: %w", err)
234 }
235
236 subscription.RecordURI = recordURI.String
237 subscription.RecordCID = recordCID.String
238
239 result = append(result, subscription)
240 }
241
242 if err = rows.Err(); err != nil {
243 return nil, fmt.Errorf("error iterating subscriptions: %w", err)
244 }
245
246 return result, nil
247}
248
249// ListSubscribers retrieves all subscribers for a community
250func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) {
251 query := `
252 SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
253 FROM community_subscriptions
254 WHERE community_did = $1
255 ORDER BY subscribed_at DESC
256 LIMIT $2 OFFSET $3`
257
258 rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
259 if err != nil {
260 return nil, fmt.Errorf("failed to list subscribers: %w", err)
261 }
262 defer rows.Close()
263
264 result := []*communities.Subscription{}
265 for rows.Next() {
266 subscription := &communities.Subscription{}
267 var recordURI, recordCID sql.NullString
268
269 err := rows.Scan(
270 &subscription.ID,
271 &subscription.UserDID,
272 &subscription.CommunityDID,
273 &subscription.SubscribedAt,
274 &recordURI,
275 &recordCID,
276 )
277 if err != nil {
278 return nil, fmt.Errorf("failed to scan subscriber: %w", err)
279 }
280
281 subscription.RecordURI = recordURI.String
282 subscription.RecordCID = recordCID.String
283
284 result = append(result, subscription)
285 }
286
287 if err = rows.Err(); err != nil {
288 return nil, fmt.Errorf("error iterating subscribers: %w", err)
289 }
290
291 return result, nil
292}