A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "strings"
8 "time"
9
10 "Coves/internal/core/aggregators"
11)
12
// postgresAggregatorRepo is the database/sql-backed repository whose methods
// are defined in this file. NewAggregatorRepository returns it as an
// aggregators.Repository.
type postgresAggregatorRepo struct {
	db *sql.DB // handle through which every method issues its queries
}
16
17// NewAggregatorRepository creates a new PostgreSQL aggregator repository
18func NewAggregatorRepository(db *sql.DB) aggregators.Repository {
19 return &postgresAggregatorRepo{db: db}
20}
21
22// ===== Aggregator CRUD Operations =====
23
24// CreateAggregator indexes a new aggregator service declaration from the firehose
25func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
26 query := `
27 INSERT INTO aggregators (
28 did, display_name, description, avatar_url, config_schema,
29 maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid
30 ) VALUES (
31 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
32 )
33 ON CONFLICT (did) DO UPDATE SET
34 display_name = EXCLUDED.display_name,
35 description = EXCLUDED.description,
36 avatar_url = EXCLUDED.avatar_url,
37 config_schema = EXCLUDED.config_schema,
38 maintainer_did = EXCLUDED.maintainer_did,
39 source_url = EXCLUDED.source_url,
40 created_at = EXCLUDED.created_at,
41 indexed_at = EXCLUDED.indexed_at,
42 record_uri = EXCLUDED.record_uri,
43 record_cid = EXCLUDED.record_cid`
44
45 var configSchema interface{}
46 if len(agg.ConfigSchema) > 0 {
47 configSchema = agg.ConfigSchema
48 } else {
49 configSchema = nil
50 }
51
52 _, err := r.db.ExecContext(ctx, query,
53 agg.DID,
54 agg.DisplayName,
55 nullString(agg.Description),
56 nullString(agg.AvatarURL),
57 configSchema,
58 nullString(agg.MaintainerDID),
59 nullString(agg.SourceURL),
60 agg.CreatedAt,
61 agg.IndexedAt,
62 nullString(agg.RecordURI),
63 nullString(agg.RecordCID),
64 )
65 if err != nil {
66 return fmt.Errorf("failed to create aggregator: %w", err)
67 }
68
69 return nil
70}
71
72// GetAggregator retrieves an aggregator by DID
73func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, did string) (*aggregators.Aggregator, error) {
74 query := `
75 SELECT
76 did, display_name, description, avatar_url, config_schema,
77 maintainer_did, source_url, communities_using, posts_created,
78 created_at, indexed_at, record_uri, record_cid
79 FROM aggregators
80 WHERE did = $1`
81
82 agg := &aggregators.Aggregator{}
83 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
84 var configSchema []byte
85
86 err := r.db.QueryRowContext(ctx, query, did).Scan(
87 &agg.DID,
88 &agg.DisplayName,
89 &description,
90 &avatarCID,
91 &configSchema,
92 &maintainerDID,
93 &homepageURL,
94 &agg.CommunitiesUsing,
95 &agg.PostsCreated,
96 &agg.CreatedAt,
97 &agg.IndexedAt,
98 &recordURI,
99 &recordCID,
100 )
101
102 if err == sql.ErrNoRows {
103 return nil, aggregators.ErrAggregatorNotFound
104 }
105 if err != nil {
106 return nil, fmt.Errorf("failed to get aggregator: %w", err)
107 }
108
109 // Map nullable fields
110 agg.Description = description.String
111 agg.AvatarURL = avatarCID.String
112 agg.MaintainerDID = maintainerDID.String
113 agg.SourceURL = homepageURL.String
114 agg.RecordURI = recordURI.String
115 agg.RecordCID = recordCID.String
116 if configSchema != nil {
117 agg.ConfigSchema = configSchema
118 }
119
120 return agg, nil
121}
122
123// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1)
124func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) {
125 if len(dids) == 0 {
126 return []*aggregators.Aggregator{}, nil
127 }
128
129 // Build IN clause with placeholders
130 placeholders := make([]string, len(dids))
131 args := make([]interface{}, len(dids))
132 for i, did := range dids {
133 placeholders[i] = fmt.Sprintf("$%d", i+1)
134 args[i] = did
135 }
136
137 query := fmt.Sprintf(`
138 SELECT
139 did, display_name, description, avatar_url, config_schema,
140 maintainer_did, source_url, communities_using, posts_created,
141 created_at, indexed_at, record_uri, record_cid
142 FROM aggregators
143 WHERE did IN (%s)`, strings.Join(placeholders, ", "))
144
145 rows, err := r.db.QueryContext(ctx, query, args...)
146 if err != nil {
147 return nil, fmt.Errorf("failed to get aggregators: %w", err)
148 }
149 defer func() { _ = rows.Close() }()
150
151 var results []*aggregators.Aggregator
152 for rows.Next() {
153 agg := &aggregators.Aggregator{}
154 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
155 var configSchema []byte
156
157 err := rows.Scan(
158 &agg.DID,
159 &agg.DisplayName,
160 &description,
161 &avatarCID,
162 &configSchema,
163 &maintainerDID,
164 &homepageURL,
165 &agg.CommunitiesUsing,
166 &agg.PostsCreated,
167 &agg.CreatedAt,
168 &agg.IndexedAt,
169 &recordURI,
170 &recordCID,
171 )
172 if err != nil {
173 return nil, fmt.Errorf("failed to scan aggregator: %w", err)
174 }
175
176 // Map nullable fields
177 agg.Description = description.String
178 agg.AvatarURL = avatarCID.String
179 agg.MaintainerDID = maintainerDID.String
180 agg.SourceURL = homepageURL.String
181 agg.RecordURI = recordURI.String
182 agg.RecordCID = recordCID.String
183 if configSchema != nil {
184 agg.ConfigSchema = configSchema
185 }
186
187 results = append(results, agg)
188 }
189
190 if err = rows.Err(); err != nil {
191 return nil, fmt.Errorf("error iterating aggregators: %w", err)
192 }
193
194 return results, nil
195}
196
197// UpdateAggregator updates an existing aggregator
198func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
199 query := `
200 UPDATE aggregators SET
201 display_name = $2,
202 description = $3,
203 avatar_url = $4,
204 config_schema = $5,
205 maintainer_did = $6,
206 source_url = $7,
207 created_at = $8,
208 indexed_at = $9,
209 record_uri = $10,
210 record_cid = $11
211 WHERE did = $1`
212
213 var configSchema interface{}
214 if len(agg.ConfigSchema) > 0 {
215 configSchema = agg.ConfigSchema
216 } else {
217 configSchema = nil
218 }
219
220 result, err := r.db.ExecContext(ctx, query,
221 agg.DID,
222 agg.DisplayName,
223 nullString(agg.Description),
224 nullString(agg.AvatarURL),
225 configSchema,
226 nullString(agg.MaintainerDID),
227 nullString(agg.SourceURL),
228 agg.CreatedAt,
229 agg.IndexedAt,
230 nullString(agg.RecordURI),
231 nullString(agg.RecordCID),
232 )
233 if err != nil {
234 return fmt.Errorf("failed to update aggregator: %w", err)
235 }
236
237 rows, err := result.RowsAffected()
238 if err != nil {
239 return fmt.Errorf("failed to get rows affected: %w", err)
240 }
241 if rows == 0 {
242 return aggregators.ErrAggregatorNotFound
243 }
244
245 return nil
246}
247
248// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK)
249func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error {
250 query := `DELETE FROM aggregators WHERE did = $1`
251
252 result, err := r.db.ExecContext(ctx, query, did)
253 if err != nil {
254 return fmt.Errorf("failed to delete aggregator: %w", err)
255 }
256
257 rows, err := result.RowsAffected()
258 if err != nil {
259 return fmt.Errorf("failed to get rows affected: %w", err)
260 }
261 if rows == 0 {
262 return aggregators.ErrAggregatorNotFound
263 }
264
265 return nil
266}
267
268// ListAggregators retrieves all aggregators with pagination
269func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) {
270 query := `
271 SELECT
272 did, display_name, description, avatar_url, config_schema,
273 maintainer_did, source_url, communities_using, posts_created,
274 created_at, indexed_at, record_uri, record_cid
275 FROM aggregators
276 ORDER BY communities_using DESC, display_name ASC
277 LIMIT $1 OFFSET $2`
278
279 rows, err := r.db.QueryContext(ctx, query, limit, offset)
280 if err != nil {
281 return nil, fmt.Errorf("failed to list aggregators: %w", err)
282 }
283 defer func() { _ = rows.Close() }()
284
285 var aggs []*aggregators.Aggregator
286 for rows.Next() {
287 agg := &aggregators.Aggregator{}
288 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
289 var configSchema []byte
290
291 err := rows.Scan(
292 &agg.DID,
293 &agg.DisplayName,
294 &description,
295 &avatarCID,
296 &configSchema,
297 &maintainerDID,
298 &homepageURL,
299 &agg.CommunitiesUsing,
300 &agg.PostsCreated,
301 &agg.CreatedAt,
302 &agg.IndexedAt,
303 &recordURI,
304 &recordCID,
305 )
306 if err != nil {
307 return nil, fmt.Errorf("failed to scan aggregator: %w", err)
308 }
309
310 // Map nullable fields
311 agg.Description = description.String
312 agg.AvatarURL = avatarCID.String
313 agg.MaintainerDID = maintainerDID.String
314 agg.SourceURL = homepageURL.String
315 agg.RecordURI = recordURI.String
316 agg.RecordCID = recordCID.String
317 if configSchema != nil {
318 agg.ConfigSchema = configSchema
319 }
320
321 aggs = append(aggs, agg)
322 }
323
324 if err = rows.Err(); err != nil {
325 return nil, fmt.Errorf("error iterating aggregators: %w", err)
326 }
327
328 return aggs, nil
329}
330
331// IsAggregator performs a fast existence check for post creation handler
332func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) {
333 query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)`
334
335 var exists bool
336 err := r.db.QueryRowContext(ctx, query, did).Scan(&exists)
337 if err != nil {
338 return false, fmt.Errorf("failed to check if aggregator exists: %w", err)
339 }
340
341 return exists, nil
342}
343
344// ===== Authorization CRUD Operations =====
345
346// CreateAuthorization indexes a new authorization from the firehose
347func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
348 query := `
349 INSERT INTO aggregator_authorizations (
350 aggregator_did, community_did, enabled, config,
351 created_at, created_by, disabled_at, disabled_by,
352 indexed_at, record_uri, record_cid
353 ) VALUES (
354 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
355 )
356 ON CONFLICT (aggregator_did, community_did) DO UPDATE SET
357 enabled = EXCLUDED.enabled,
358 config = EXCLUDED.config,
359 created_at = EXCLUDED.created_at,
360 created_by = EXCLUDED.created_by,
361 disabled_at = EXCLUDED.disabled_at,
362 disabled_by = EXCLUDED.disabled_by,
363 indexed_at = EXCLUDED.indexed_at,
364 record_uri = EXCLUDED.record_uri,
365 record_cid = EXCLUDED.record_cid
366 RETURNING id`
367
368 var config interface{}
369 if len(auth.Config) > 0 {
370 config = auth.Config
371 } else {
372 config = nil
373 }
374
375 var disabledAt interface{}
376 if auth.DisabledAt != nil {
377 disabledAt = *auth.DisabledAt
378 } else {
379 disabledAt = nil
380 }
381
382 err := r.db.QueryRowContext(ctx, query,
383 auth.AggregatorDID,
384 auth.CommunityDID,
385 auth.Enabled,
386 config,
387 auth.CreatedAt,
388 auth.CreatedBy, // Required field, no nullString needed
389 disabledAt,
390 nullString(auth.DisabledBy),
391 auth.IndexedAt,
392 nullString(auth.RecordURI),
393 nullString(auth.RecordCID),
394 ).Scan(&auth.ID)
395 if err != nil {
396 // Check for foreign key violations
397 if strings.Contains(err.Error(), "fk_aggregator") {
398 return aggregators.ErrAggregatorNotFound
399 }
400 return fmt.Errorf("failed to create authorization: %w", err)
401 }
402
403 return nil
404}
405
406// GetAuthorization retrieves an authorization by aggregator and community DID
407func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) {
408 query := `
409 SELECT
410 id, aggregator_did, community_did, enabled, config,
411 created_at, created_by, disabled_at, disabled_by,
412 indexed_at, record_uri, record_cid
413 FROM aggregator_authorizations
414 WHERE aggregator_did = $1 AND community_did = $2`
415
416 auth := &aggregators.Authorization{}
417 var config []byte
418 var createdBy, disabledBy, recordURI, recordCID sql.NullString
419 var disabledAt sql.NullTime
420
421 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(
422 &auth.ID,
423 &auth.AggregatorDID,
424 &auth.CommunityDID,
425 &auth.Enabled,
426 &config,
427 &auth.CreatedAt,
428 &createdBy,
429 &disabledAt,
430 &disabledBy,
431 &auth.IndexedAt,
432 &recordURI,
433 &recordCID,
434 )
435
436 if err == sql.ErrNoRows {
437 return nil, aggregators.ErrAuthorizationNotFound
438 }
439 if err != nil {
440 return nil, fmt.Errorf("failed to get authorization: %w", err)
441 }
442
443 // Map nullable fields
444 auth.CreatedBy = createdBy.String
445 auth.DisabledBy = disabledBy.String
446 if disabledAt.Valid {
447 disabledAtVal := disabledAt.Time
448 auth.DisabledAt = &disabledAtVal
449 }
450 auth.RecordURI = recordURI.String
451 auth.RecordCID = recordCID.String
452 if config != nil {
453 auth.Config = config
454 }
455
456 return auth, nil
457}
458
459// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations)
460func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) {
461 query := `
462 SELECT
463 id, aggregator_did, community_did, enabled, config,
464 created_at, created_by, disabled_at, disabled_by,
465 indexed_at, record_uri, record_cid
466 FROM aggregator_authorizations
467 WHERE record_uri = $1`
468
469 auth := &aggregators.Authorization{}
470 var config []byte
471 var createdBy, disabledBy, recordURIField, recordCID sql.NullString
472 var disabledAt sql.NullTime
473
474 err := r.db.QueryRowContext(ctx, query, recordURI).Scan(
475 &auth.ID,
476 &auth.AggregatorDID,
477 &auth.CommunityDID,
478 &auth.Enabled,
479 &config,
480 &auth.CreatedAt,
481 &createdBy,
482 &disabledAt,
483 &disabledBy,
484 &auth.IndexedAt,
485 &recordURIField,
486 &recordCID,
487 )
488
489 if err == sql.ErrNoRows {
490 return nil, aggregators.ErrAuthorizationNotFound
491 }
492 if err != nil {
493 return nil, fmt.Errorf("failed to get authorization by URI: %w", err)
494 }
495
496 // Map nullable fields
497 auth.CreatedBy = createdBy.String
498 auth.DisabledBy = disabledBy.String
499 if disabledAt.Valid {
500 disabledAtVal := disabledAt.Time
501 auth.DisabledAt = &disabledAtVal
502 }
503 auth.RecordURI = recordURIField.String
504 auth.RecordCID = recordCID.String
505 if config != nil {
506 auth.Config = config
507 }
508
509 return auth, nil
510}
511
512// UpdateAuthorization updates an existing authorization
513func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
514 query := `
515 UPDATE aggregator_authorizations SET
516 enabled = $3,
517 config = $4,
518 created_at = $5,
519 created_by = $6,
520 disabled_at = $7,
521 disabled_by = $8,
522 indexed_at = $9,
523 record_uri = $10,
524 record_cid = $11
525 WHERE aggregator_did = $1 AND community_did = $2`
526
527 var config interface{}
528 if len(auth.Config) > 0 {
529 config = auth.Config
530 } else {
531 config = nil
532 }
533
534 var disabledAt interface{}
535 if auth.DisabledAt != nil {
536 disabledAt = *auth.DisabledAt
537 } else {
538 disabledAt = nil
539 }
540
541 result, err := r.db.ExecContext(ctx, query,
542 auth.AggregatorDID,
543 auth.CommunityDID,
544 auth.Enabled,
545 config,
546 auth.CreatedAt,
547 nullString(auth.CreatedBy),
548 disabledAt,
549 nullString(auth.DisabledBy),
550 auth.IndexedAt,
551 nullString(auth.RecordURI),
552 nullString(auth.RecordCID),
553 )
554 if err != nil {
555 return fmt.Errorf("failed to update authorization: %w", err)
556 }
557
558 rows, err := result.RowsAffected()
559 if err != nil {
560 return fmt.Errorf("failed to get rows affected: %w", err)
561 }
562 if rows == 0 {
563 return aggregators.ErrAuthorizationNotFound
564 }
565
566 return nil
567}
568
569// DeleteAuthorization removes an authorization
570func (r *postgresAggregatorRepo) DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error {
571 query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2`
572
573 result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID)
574 if err != nil {
575 return fmt.Errorf("failed to delete authorization: %w", err)
576 }
577
578 rows, err := result.RowsAffected()
579 if err != nil {
580 return fmt.Errorf("failed to get rows affected: %w", err)
581 }
582 if rows == 0 {
583 return aggregators.ErrAuthorizationNotFound
584 }
585
586 return nil
587}
588
589// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations)
590func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error {
591 query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1`
592
593 result, err := r.db.ExecContext(ctx, query, recordURI)
594 if err != nil {
595 return fmt.Errorf("failed to delete authorization by URI: %w", err)
596 }
597
598 rows, err := result.RowsAffected()
599 if err != nil {
600 return fmt.Errorf("failed to get rows affected: %w", err)
601 }
602 if rows == 0 {
603 return aggregators.ErrAuthorizationNotFound
604 }
605
606 return nil
607}
608
609// ===== Authorization Query Operations =====
610
611// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator
612func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
613 baseQuery := `
614 SELECT
615 id, aggregator_did, community_did, enabled, config,
616 created_at, created_by, disabled_at, disabled_by,
617 indexed_at, record_uri, record_cid
618 FROM aggregator_authorizations
619 WHERE aggregator_did = $1`
620
621 var query string
622 var args []interface{}
623
624 if enabledOnly {
625 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
626 args = []interface{}{aggregatorDID, limit, offset}
627 } else {
628 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
629 args = []interface{}{aggregatorDID, limit, offset}
630 }
631
632 rows, err := r.db.QueryContext(ctx, query, args...)
633 if err != nil {
634 return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err)
635 }
636 defer func() { _ = rows.Close() }()
637
638 return scanAuthorizations(rows)
639}
640
641// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community
642func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
643 baseQuery := `
644 SELECT
645 id, aggregator_did, community_did, enabled, config,
646 created_at, created_by, disabled_at, disabled_by,
647 indexed_at, record_uri, record_cid
648 FROM aggregator_authorizations
649 WHERE community_did = $1`
650
651 var query string
652 var args []interface{}
653
654 if enabledOnly {
655 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
656 args = []interface{}{communityDID, limit, offset}
657 } else {
658 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
659 args = []interface{}{communityDID, limit, offset}
660 }
661
662 rows, err := r.db.QueryContext(ctx, query, args...)
663 if err != nil {
664 return nil, fmt.Errorf("failed to list authorizations for community: %w", err)
665 }
666 defer func() { _ = rows.Close() }()
667
668 return scanAuthorizations(rows)
669}
670
671// IsAuthorized performs a fast authorization check (enabled=true)
672// Uses the optimized partial index: idx_aggregator_auth_enabled
673func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) {
674 query := `
675 SELECT EXISTS(
676 SELECT 1 FROM aggregator_authorizations
677 WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true
678 )`
679
680 var authorized bool
681 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized)
682 if err != nil {
683 return false, fmt.Errorf("failed to check authorization: %w", err)
684 }
685
686 return authorized, nil
687}
688
689// ===== Post Tracking Operations =====
690
691// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats)
692func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error {
693 query := `
694 INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at)
695 VALUES ($1, $2, $3, $4, NOW())`
696
697 _, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID)
698 if err != nil {
699 return fmt.Errorf("failed to record aggregator post: %w", err)
700 }
701
702 return nil
703}
704
705// CountRecentPosts counts posts created by an aggregator in a community since a given time
706// Uses the optimized index: idx_aggregator_posts_rate_limit
707func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) {
708 query := `
709 SELECT COUNT(*)
710 FROM aggregator_posts
711 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3`
712
713 var count int
714 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count)
715 if err != nil {
716 return 0, fmt.Errorf("failed to count recent posts: %w", err)
717 }
718
719 return count, nil
720}
721
722// GetRecentPosts retrieves recent posts created by an aggregator in a community
723func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) {
724 query := `
725 SELECT id, aggregator_did, community_did, post_uri, created_at
726 FROM aggregator_posts
727 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3
728 ORDER BY created_at DESC`
729
730 rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since)
731 if err != nil {
732 return nil, fmt.Errorf("failed to get recent posts: %w", err)
733 }
734 defer func() { _ = rows.Close() }()
735
736 var posts []*aggregators.AggregatorPost
737 for rows.Next() {
738 post := &aggregators.AggregatorPost{}
739 err := rows.Scan(
740 &post.ID,
741 &post.AggregatorDID,
742 &post.CommunityDID,
743 &post.PostURI,
744 &post.CreatedAt,
745 )
746 if err != nil {
747 return nil, fmt.Errorf("failed to scan aggregator post: %w", err)
748 }
749 posts = append(posts, post)
750 }
751
752 if err = rows.Err(); err != nil {
753 return nil, fmt.Errorf("error iterating aggregator posts: %w", err)
754 }
755
756 return posts, nil
757}
758
759// ===== Helper Functions =====
760
761// scanAuthorizations is a helper to scan multiple authorization rows
762func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) {
763 var auths []*aggregators.Authorization
764
765 for rows.Next() {
766 auth := &aggregators.Authorization{}
767 var config []byte
768 var createdBy, disabledBy, recordURI, recordCID sql.NullString
769 var disabledAt sql.NullTime
770
771 err := rows.Scan(
772 &auth.ID,
773 &auth.AggregatorDID,
774 &auth.CommunityDID,
775 &auth.Enabled,
776 &config,
777 &auth.CreatedAt,
778 &createdBy,
779 &disabledAt,
780 &disabledBy,
781 &auth.IndexedAt,
782 &recordURI,
783 &recordCID,
784 )
785 if err != nil {
786 return nil, fmt.Errorf("failed to scan authorization: %w", err)
787 }
788
789 // Map nullable fields
790 auth.CreatedBy = createdBy.String
791 auth.DisabledBy = disabledBy.String
792 if disabledAt.Valid {
793 disabledAtVal := disabledAt.Time
794 auth.DisabledAt = &disabledAtVal
795 }
796 auth.RecordURI = recordURI.String
797 auth.RecordCID = recordCID.String
798 if config != nil {
799 auth.Config = config
800 }
801
802 auths = append(auths, auth)
803 }
804
805 if err := rows.Err(); err != nil {
806 return nil, fmt.Errorf("error iterating authorizations: %w", err)
807 }
808
809 return auths, nil
810}