A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"Coves/internal/core/aggregators"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"
)
11
12type postgresAggregatorRepo struct {
13 db *sql.DB
14}
15
16// NewAggregatorRepository creates a new PostgreSQL aggregator repository
17func NewAggregatorRepository(db *sql.DB) aggregators.Repository {
18 return &postgresAggregatorRepo{db: db}
19}
20
21// ===== Aggregator CRUD Operations =====
22
23// CreateAggregator indexes a new aggregator service declaration from the firehose
24func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
25 query := `
26 INSERT INTO aggregators (
27 did, display_name, description, avatar_url, config_schema,
28 maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid
29 ) VALUES (
30 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
31 )
32 ON CONFLICT (did) DO UPDATE SET
33 display_name = EXCLUDED.display_name,
34 description = EXCLUDED.description,
35 avatar_url = EXCLUDED.avatar_url,
36 config_schema = EXCLUDED.config_schema,
37 maintainer_did = EXCLUDED.maintainer_did,
38 source_url = EXCLUDED.source_url,
39 created_at = EXCLUDED.created_at,
40 indexed_at = EXCLUDED.indexed_at,
41 record_uri = EXCLUDED.record_uri,
42 record_cid = EXCLUDED.record_cid`
43
44 var configSchema interface{}
45 if len(agg.ConfigSchema) > 0 {
46 configSchema = agg.ConfigSchema
47 } else {
48 configSchema = nil
49 }
50
51 _, err := r.db.ExecContext(ctx, query,
52 agg.DID,
53 agg.DisplayName,
54 nullString(agg.Description),
55 nullString(agg.AvatarURL),
56 configSchema,
57 nullString(agg.MaintainerDID),
58 nullString(agg.SourceURL),
59 agg.CreatedAt,
60 agg.IndexedAt,
61 nullString(agg.RecordURI),
62 nullString(agg.RecordCID),
63 )
64 if err != nil {
65 return fmt.Errorf("failed to create aggregator: %w", err)
66 }
67
68 return nil
69}
70
71// GetAggregator retrieves an aggregator by DID
72func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, did string) (*aggregators.Aggregator, error) {
73 query := `
74 SELECT
75 did, display_name, description, avatar_url, config_schema,
76 maintainer_did, source_url, communities_using, posts_created,
77 created_at, indexed_at, record_uri, record_cid
78 FROM aggregators
79 WHERE did = $1`
80
81 agg := &aggregators.Aggregator{}
82 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
83 var configSchema []byte
84
85 err := r.db.QueryRowContext(ctx, query, did).Scan(
86 &agg.DID,
87 &agg.DisplayName,
88 &description,
89 &avatarCID,
90 &configSchema,
91 &maintainerDID,
92 &homepageURL,
93 &agg.CommunitiesUsing,
94 &agg.PostsCreated,
95 &agg.CreatedAt,
96 &agg.IndexedAt,
97 &recordURI,
98 &recordCID,
99 )
100
101 if err == sql.ErrNoRows {
102 return nil, aggregators.ErrAggregatorNotFound
103 }
104 if err != nil {
105 return nil, fmt.Errorf("failed to get aggregator: %w", err)
106 }
107
108 // Map nullable fields
109 agg.Description = description.String
110 agg.AvatarURL = avatarCID.String
111 agg.MaintainerDID = maintainerDID.String
112 agg.SourceURL = homepageURL.String
113 agg.RecordURI = recordURI.String
114 agg.RecordCID = recordCID.String
115 if configSchema != nil {
116 agg.ConfigSchema = configSchema
117 }
118
119 return agg, nil
120}
121
122// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1)
123func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) {
124 if len(dids) == 0 {
125 return []*aggregators.Aggregator{}, nil
126 }
127
128 // Build IN clause with placeholders
129 placeholders := make([]string, len(dids))
130 args := make([]interface{}, len(dids))
131 for i, did := range dids {
132 placeholders[i] = fmt.Sprintf("$%d", i+1)
133 args[i] = did
134 }
135
136 query := fmt.Sprintf(`
137 SELECT
138 did, display_name, description, avatar_url, config_schema,
139 maintainer_did, source_url, communities_using, posts_created,
140 created_at, indexed_at, record_uri, record_cid
141 FROM aggregators
142 WHERE did IN (%s)`, strings.Join(placeholders, ", "))
143
144 rows, err := r.db.QueryContext(ctx, query, args...)
145 if err != nil {
146 return nil, fmt.Errorf("failed to get aggregators: %w", err)
147 }
148 defer func() { _ = rows.Close() }()
149
150 var results []*aggregators.Aggregator
151 for rows.Next() {
152 agg := &aggregators.Aggregator{}
153 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
154 var configSchema []byte
155
156 err := rows.Scan(
157 &agg.DID,
158 &agg.DisplayName,
159 &description,
160 &avatarCID,
161 &configSchema,
162 &maintainerDID,
163 &homepageURL,
164 &agg.CommunitiesUsing,
165 &agg.PostsCreated,
166 &agg.CreatedAt,
167 &agg.IndexedAt,
168 &recordURI,
169 &recordCID,
170 )
171 if err != nil {
172 return nil, fmt.Errorf("failed to scan aggregator: %w", err)
173 }
174
175 // Map nullable fields
176 agg.Description = description.String
177 agg.AvatarURL = avatarCID.String
178 agg.MaintainerDID = maintainerDID.String
179 agg.SourceURL = homepageURL.String
180 agg.RecordURI = recordURI.String
181 agg.RecordCID = recordCID.String
182 if configSchema != nil {
183 agg.ConfigSchema = configSchema
184 }
185
186 results = append(results, agg)
187 }
188
189 if err = rows.Err(); err != nil {
190 return nil, fmt.Errorf("error iterating aggregators: %w", err)
191 }
192
193 return results, nil
194}
195
196// UpdateAggregator updates an existing aggregator
197func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
198 query := `
199 UPDATE aggregators SET
200 display_name = $2,
201 description = $3,
202 avatar_url = $4,
203 config_schema = $5,
204 maintainer_did = $6,
205 source_url = $7,
206 created_at = $8,
207 indexed_at = $9,
208 record_uri = $10,
209 record_cid = $11
210 WHERE did = $1`
211
212 var configSchema interface{}
213 if len(agg.ConfigSchema) > 0 {
214 configSchema = agg.ConfigSchema
215 } else {
216 configSchema = nil
217 }
218
219 result, err := r.db.ExecContext(ctx, query,
220 agg.DID,
221 agg.DisplayName,
222 nullString(agg.Description),
223 nullString(agg.AvatarURL),
224 configSchema,
225 nullString(agg.MaintainerDID),
226 nullString(agg.SourceURL),
227 agg.CreatedAt,
228 agg.IndexedAt,
229 nullString(agg.RecordURI),
230 nullString(agg.RecordCID),
231 )
232 if err != nil {
233 return fmt.Errorf("failed to update aggregator: %w", err)
234 }
235
236 rows, err := result.RowsAffected()
237 if err != nil {
238 return fmt.Errorf("failed to get rows affected: %w", err)
239 }
240 if rows == 0 {
241 return aggregators.ErrAggregatorNotFound
242 }
243
244 return nil
245}
246
247// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK)
248func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error {
249 query := `DELETE FROM aggregators WHERE did = $1`
250
251 result, err := r.db.ExecContext(ctx, query, did)
252 if err != nil {
253 return fmt.Errorf("failed to delete aggregator: %w", err)
254 }
255
256 rows, err := result.RowsAffected()
257 if err != nil {
258 return fmt.Errorf("failed to get rows affected: %w", err)
259 }
260 if rows == 0 {
261 return aggregators.ErrAggregatorNotFound
262 }
263
264 return nil
265}
266
267// ListAggregators retrieves all aggregators with pagination
268func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) {
269 query := `
270 SELECT
271 did, display_name, description, avatar_url, config_schema,
272 maintainer_did, source_url, communities_using, posts_created,
273 created_at, indexed_at, record_uri, record_cid
274 FROM aggregators
275 ORDER BY communities_using DESC, display_name ASC
276 LIMIT $1 OFFSET $2`
277
278 rows, err := r.db.QueryContext(ctx, query, limit, offset)
279 if err != nil {
280 return nil, fmt.Errorf("failed to list aggregators: %w", err)
281 }
282 defer func() { _ = rows.Close() }()
283
284 var aggs []*aggregators.Aggregator
285 for rows.Next() {
286 agg := &aggregators.Aggregator{}
287 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
288 var configSchema []byte
289
290 err := rows.Scan(
291 &agg.DID,
292 &agg.DisplayName,
293 &description,
294 &avatarCID,
295 &configSchema,
296 &maintainerDID,
297 &homepageURL,
298 &agg.CommunitiesUsing,
299 &agg.PostsCreated,
300 &agg.CreatedAt,
301 &agg.IndexedAt,
302 &recordURI,
303 &recordCID,
304 )
305 if err != nil {
306 return nil, fmt.Errorf("failed to scan aggregator: %w", err)
307 }
308
309 // Map nullable fields
310 agg.Description = description.String
311 agg.AvatarURL = avatarCID.String
312 agg.MaintainerDID = maintainerDID.String
313 agg.SourceURL = homepageURL.String
314 agg.RecordURI = recordURI.String
315 agg.RecordCID = recordCID.String
316 if configSchema != nil {
317 agg.ConfigSchema = configSchema
318 }
319
320 aggs = append(aggs, agg)
321 }
322
323 if err = rows.Err(); err != nil {
324 return nil, fmt.Errorf("error iterating aggregators: %w", err)
325 }
326
327 return aggs, nil
328}
329
330// IsAggregator performs a fast existence check for post creation handler
331func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) {
332 query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)`
333
334 var exists bool
335 err := r.db.QueryRowContext(ctx, query, did).Scan(&exists)
336 if err != nil {
337 return false, fmt.Errorf("failed to check if aggregator exists: %w", err)
338 }
339
340 return exists, nil
341}
342
343// ===== Authorization CRUD Operations =====
344
345// CreateAuthorization indexes a new authorization from the firehose
346func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
347 query := `
348 INSERT INTO aggregator_authorizations (
349 aggregator_did, community_did, enabled, config,
350 created_at, created_by, disabled_at, disabled_by,
351 indexed_at, record_uri, record_cid
352 ) VALUES (
353 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
354 )
355 ON CONFLICT (aggregator_did, community_did) DO UPDATE SET
356 enabled = EXCLUDED.enabled,
357 config = EXCLUDED.config,
358 created_at = EXCLUDED.created_at,
359 created_by = EXCLUDED.created_by,
360 disabled_at = EXCLUDED.disabled_at,
361 disabled_by = EXCLUDED.disabled_by,
362 indexed_at = EXCLUDED.indexed_at,
363 record_uri = EXCLUDED.record_uri,
364 record_cid = EXCLUDED.record_cid
365 RETURNING id`
366
367 var config interface{}
368 if len(auth.Config) > 0 {
369 config = auth.Config
370 } else {
371 config = nil
372 }
373
374 var disabledAt interface{}
375 if auth.DisabledAt != nil {
376 disabledAt = *auth.DisabledAt
377 } else {
378 disabledAt = nil
379 }
380
381 err := r.db.QueryRowContext(ctx, query,
382 auth.AggregatorDID,
383 auth.CommunityDID,
384 auth.Enabled,
385 config,
386 auth.CreatedAt,
387 auth.CreatedBy, // Required field, no nullString needed
388 disabledAt,
389 nullString(auth.DisabledBy),
390 auth.IndexedAt,
391 nullString(auth.RecordURI),
392 nullString(auth.RecordCID),
393 ).Scan(&auth.ID)
394 if err != nil {
395 // Check for foreign key violations
396 if strings.Contains(err.Error(), "fk_aggregator") {
397 return aggregators.ErrAggregatorNotFound
398 }
399 return fmt.Errorf("failed to create authorization: %w", err)
400 }
401
402 return nil
403}
404
405// GetAuthorization retrieves an authorization by aggregator and community DID
406func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) {
407 query := `
408 SELECT
409 id, aggregator_did, community_did, enabled, config,
410 created_at, created_by, disabled_at, disabled_by,
411 indexed_at, record_uri, record_cid
412 FROM aggregator_authorizations
413 WHERE aggregator_did = $1 AND community_did = $2`
414
415 auth := &aggregators.Authorization{}
416 var config []byte
417 var createdBy, disabledBy, recordURI, recordCID sql.NullString
418 var disabledAt sql.NullTime
419
420 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(
421 &auth.ID,
422 &auth.AggregatorDID,
423 &auth.CommunityDID,
424 &auth.Enabled,
425 &config,
426 &auth.CreatedAt,
427 &createdBy,
428 &disabledAt,
429 &disabledBy,
430 &auth.IndexedAt,
431 &recordURI,
432 &recordCID,
433 )
434
435 if err == sql.ErrNoRows {
436 return nil, aggregators.ErrAuthorizationNotFound
437 }
438 if err != nil {
439 return nil, fmt.Errorf("failed to get authorization: %w", err)
440 }
441
442 // Map nullable fields
443 auth.CreatedBy = createdBy.String
444 auth.DisabledBy = disabledBy.String
445 if disabledAt.Valid {
446 disabledAtVal := disabledAt.Time
447 auth.DisabledAt = &disabledAtVal
448 }
449 auth.RecordURI = recordURI.String
450 auth.RecordCID = recordCID.String
451 if config != nil {
452 auth.Config = config
453 }
454
455 return auth, nil
456}
457
458// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations)
459func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) {
460 query := `
461 SELECT
462 id, aggregator_did, community_did, enabled, config,
463 created_at, created_by, disabled_at, disabled_by,
464 indexed_at, record_uri, record_cid
465 FROM aggregator_authorizations
466 WHERE record_uri = $1`
467
468 auth := &aggregators.Authorization{}
469 var config []byte
470 var createdBy, disabledBy, recordURIField, recordCID sql.NullString
471 var disabledAt sql.NullTime
472
473 err := r.db.QueryRowContext(ctx, query, recordURI).Scan(
474 &auth.ID,
475 &auth.AggregatorDID,
476 &auth.CommunityDID,
477 &auth.Enabled,
478 &config,
479 &auth.CreatedAt,
480 &createdBy,
481 &disabledAt,
482 &disabledBy,
483 &auth.IndexedAt,
484 &recordURIField,
485 &recordCID,
486 )
487
488 if err == sql.ErrNoRows {
489 return nil, aggregators.ErrAuthorizationNotFound
490 }
491 if err != nil {
492 return nil, fmt.Errorf("failed to get authorization by URI: %w", err)
493 }
494
495 // Map nullable fields
496 auth.CreatedBy = createdBy.String
497 auth.DisabledBy = disabledBy.String
498 if disabledAt.Valid {
499 disabledAtVal := disabledAt.Time
500 auth.DisabledAt = &disabledAtVal
501 }
502 auth.RecordURI = recordURIField.String
503 auth.RecordCID = recordCID.String
504 if config != nil {
505 auth.Config = config
506 }
507
508 return auth, nil
509}
510
511// UpdateAuthorization updates an existing authorization
512func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
513 query := `
514 UPDATE aggregator_authorizations SET
515 enabled = $3,
516 config = $4,
517 created_at = $5,
518 created_by = $6,
519 disabled_at = $7,
520 disabled_by = $8,
521 indexed_at = $9,
522 record_uri = $10,
523 record_cid = $11
524 WHERE aggregator_did = $1 AND community_did = $2`
525
526 var config interface{}
527 if len(auth.Config) > 0 {
528 config = auth.Config
529 } else {
530 config = nil
531 }
532
533 var disabledAt interface{}
534 if auth.DisabledAt != nil {
535 disabledAt = *auth.DisabledAt
536 } else {
537 disabledAt = nil
538 }
539
540 result, err := r.db.ExecContext(ctx, query,
541 auth.AggregatorDID,
542 auth.CommunityDID,
543 auth.Enabled,
544 config,
545 auth.CreatedAt,
546 nullString(auth.CreatedBy),
547 disabledAt,
548 nullString(auth.DisabledBy),
549 auth.IndexedAt,
550 nullString(auth.RecordURI),
551 nullString(auth.RecordCID),
552 )
553 if err != nil {
554 return fmt.Errorf("failed to update authorization: %w", err)
555 }
556
557 rows, err := result.RowsAffected()
558 if err != nil {
559 return fmt.Errorf("failed to get rows affected: %w", err)
560 }
561 if rows == 0 {
562 return aggregators.ErrAuthorizationNotFound
563 }
564
565 return nil
566}
567
568// DeleteAuthorization removes an authorization
569func (r *postgresAggregatorRepo) DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error {
570 query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2`
571
572 result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID)
573 if err != nil {
574 return fmt.Errorf("failed to delete authorization: %w", err)
575 }
576
577 rows, err := result.RowsAffected()
578 if err != nil {
579 return fmt.Errorf("failed to get rows affected: %w", err)
580 }
581 if rows == 0 {
582 return aggregators.ErrAuthorizationNotFound
583 }
584
585 return nil
586}
587
588// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations)
589func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error {
590 query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1`
591
592 result, err := r.db.ExecContext(ctx, query, recordURI)
593 if err != nil {
594 return fmt.Errorf("failed to delete authorization by URI: %w", err)
595 }
596
597 rows, err := result.RowsAffected()
598 if err != nil {
599 return fmt.Errorf("failed to get rows affected: %w", err)
600 }
601 if rows == 0 {
602 return aggregators.ErrAuthorizationNotFound
603 }
604
605 return nil
606}
607
608// ===== Authorization Query Operations =====
609
610// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator
611func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
612 baseQuery := `
613 SELECT
614 id, aggregator_did, community_did, enabled, config,
615 created_at, created_by, disabled_at, disabled_by,
616 indexed_at, record_uri, record_cid
617 FROM aggregator_authorizations
618 WHERE aggregator_did = $1`
619
620 var query string
621 var args []interface{}
622
623 if enabledOnly {
624 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
625 args = []interface{}{aggregatorDID, limit, offset}
626 } else {
627 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
628 args = []interface{}{aggregatorDID, limit, offset}
629 }
630
631 rows, err := r.db.QueryContext(ctx, query, args...)
632 if err != nil {
633 return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err)
634 }
635 defer func() { _ = rows.Close() }()
636
637 return scanAuthorizations(rows)
638}
639
640// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community
641func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
642 baseQuery := `
643 SELECT
644 id, aggregator_did, community_did, enabled, config,
645 created_at, created_by, disabled_at, disabled_by,
646 indexed_at, record_uri, record_cid
647 FROM aggregator_authorizations
648 WHERE community_did = $1`
649
650 var query string
651 var args []interface{}
652
653 if enabledOnly {
654 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
655 args = []interface{}{communityDID, limit, offset}
656 } else {
657 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
658 args = []interface{}{communityDID, limit, offset}
659 }
660
661 rows, err := r.db.QueryContext(ctx, query, args...)
662 if err != nil {
663 return nil, fmt.Errorf("failed to list authorizations for community: %w", err)
664 }
665 defer func() { _ = rows.Close() }()
666
667 return scanAuthorizations(rows)
668}
669
670// IsAuthorized performs a fast authorization check (enabled=true)
671// Uses the optimized partial index: idx_aggregator_auth_enabled
672func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) {
673 query := `
674 SELECT EXISTS(
675 SELECT 1 FROM aggregator_authorizations
676 WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true
677 )`
678
679 var authorized bool
680 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized)
681 if err != nil {
682 return false, fmt.Errorf("failed to check authorization: %w", err)
683 }
684
685 return authorized, nil
686}
687
688// ===== Post Tracking Operations =====
689
690// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats)
691func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error {
692 query := `
693 INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at)
694 VALUES ($1, $2, $3, $4, NOW())`
695
696 _, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID)
697 if err != nil {
698 return fmt.Errorf("failed to record aggregator post: %w", err)
699 }
700
701 return nil
702}
703
704// CountRecentPosts counts posts created by an aggregator in a community since a given time
705// Uses the optimized index: idx_aggregator_posts_rate_limit
706func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) {
707 query := `
708 SELECT COUNT(*)
709 FROM aggregator_posts
710 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3`
711
712 var count int
713 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count)
714 if err != nil {
715 return 0, fmt.Errorf("failed to count recent posts: %w", err)
716 }
717
718 return count, nil
719}
720
721// GetRecentPosts retrieves recent posts created by an aggregator in a community
722func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) {
723 query := `
724 SELECT id, aggregator_did, community_did, post_uri, created_at
725 FROM aggregator_posts
726 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3
727 ORDER BY created_at DESC`
728
729 rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since)
730 if err != nil {
731 return nil, fmt.Errorf("failed to get recent posts: %w", err)
732 }
733 defer func() { _ = rows.Close() }()
734
735 var posts []*aggregators.AggregatorPost
736 for rows.Next() {
737 post := &aggregators.AggregatorPost{}
738 err := rows.Scan(
739 &post.ID,
740 &post.AggregatorDID,
741 &post.CommunityDID,
742 &post.PostURI,
743 &post.CreatedAt,
744 )
745 if err != nil {
746 return nil, fmt.Errorf("failed to scan aggregator post: %w", err)
747 }
748 posts = append(posts, post)
749 }
750
751 if err = rows.Err(); err != nil {
752 return nil, fmt.Errorf("error iterating aggregator posts: %w", err)
753 }
754
755 return posts, nil
756}
757
758// ===== Helper Functions =====
759
760// scanAuthorizations is a helper to scan multiple authorization rows
761func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) {
762 var auths []*aggregators.Authorization
763
764 for rows.Next() {
765 auth := &aggregators.Authorization{}
766 var config []byte
767 var createdBy, disabledBy, recordURI, recordCID sql.NullString
768 var disabledAt sql.NullTime
769
770 err := rows.Scan(
771 &auth.ID,
772 &auth.AggregatorDID,
773 &auth.CommunityDID,
774 &auth.Enabled,
775 &config,
776 &auth.CreatedAt,
777 &createdBy,
778 &disabledAt,
779 &disabledBy,
780 &auth.IndexedAt,
781 &recordURI,
782 &recordCID,
783 )
784 if err != nil {
785 return nil, fmt.Errorf("failed to scan authorization: %w", err)
786 }
787
788 // Map nullable fields
789 auth.CreatedBy = createdBy.String
790 auth.DisabledBy = disabledBy.String
791 if disabledAt.Valid {
792 disabledAtVal := disabledAt.Time
793 auth.DisabledAt = &disabledAtVal
794 }
795 auth.RecordURI = recordURI.String
796 auth.RecordCID = recordCID.String
797 if config != nil {
798 auth.Config = config
799 }
800
801 auths = append(auths, auth)
802 }
803
804 if err := rows.Err(); err != nil {
805 return nil, fmt.Errorf("error iterating authorizations: %w", err)
806 }
807
808 return auths, nil
809}