A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/aggregators"
5 "context"
6 "database/sql"
7 "fmt"
8 "strings"
9 "time"
10)
11
12type postgresAggregatorRepo struct {
13 db *sql.DB
14}
15
16// NewAggregatorRepository creates a new PostgreSQL aggregator repository
17func NewAggregatorRepository(db *sql.DB) aggregators.Repository {
18 return &postgresAggregatorRepo{db: db}
19}
20
21// ===== Aggregator CRUD Operations =====
22
23// CreateAggregator indexes a new aggregator service declaration from the firehose
24func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
25 query := `
26 INSERT INTO aggregators (
27 did, display_name, description, avatar_url, config_schema,
28 maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid
29 ) VALUES (
30 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
31 )
32 ON CONFLICT (did) DO UPDATE SET
33 display_name = EXCLUDED.display_name,
34 description = EXCLUDED.description,
35 avatar_url = EXCLUDED.avatar_url,
36 config_schema = EXCLUDED.config_schema,
37 maintainer_did = EXCLUDED.maintainer_did,
38 source_url = EXCLUDED.source_url,
39 created_at = EXCLUDED.created_at,
40 indexed_at = EXCLUDED.indexed_at,
41 record_uri = EXCLUDED.record_uri,
42 record_cid = EXCLUDED.record_cid`
43
44 var configSchema interface{}
45 if len(agg.ConfigSchema) > 0 {
46 configSchema = agg.ConfigSchema
47 } else {
48 configSchema = nil
49 }
50
51 _, err := r.db.ExecContext(ctx, query,
52 agg.DID,
53 agg.DisplayName,
54 nullString(agg.Description),
55 nullString(agg.AvatarURL),
56 configSchema,
57 nullString(agg.MaintainerDID),
58 nullString(agg.SourceURL),
59 agg.CreatedAt,
60 agg.IndexedAt,
61 nullString(agg.RecordURI),
62 nullString(agg.RecordCID),
63 )
64
65 if err != nil {
66 return fmt.Errorf("failed to create aggregator: %w", err)
67 }
68
69 return nil
70}
71
72// GetAggregator retrieves an aggregator by DID
73func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, did string) (*aggregators.Aggregator, error) {
74 query := `
75 SELECT
76 did, display_name, description, avatar_url, config_schema,
77 maintainer_did, source_url, communities_using, posts_created,
78 created_at, indexed_at, record_uri, record_cid
79 FROM aggregators
80 WHERE did = $1`
81
82 agg := &aggregators.Aggregator{}
83 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
84 var configSchema []byte
85
86 err := r.db.QueryRowContext(ctx, query, did).Scan(
87 &agg.DID,
88 &agg.DisplayName,
89 &description,
90 &avatarCID,
91 &configSchema,
92 &maintainerDID,
93 &homepageURL,
94 &agg.CommunitiesUsing,
95 &agg.PostsCreated,
96 &agg.CreatedAt,
97 &agg.IndexedAt,
98 &recordURI,
99 &recordCID,
100 )
101
102 if err == sql.ErrNoRows {
103 return nil, aggregators.ErrAggregatorNotFound
104 }
105 if err != nil {
106 return nil, fmt.Errorf("failed to get aggregator: %w", err)
107 }
108
109 // Map nullable fields
110 agg.Description = description.String
111 agg.AvatarURL = avatarCID.String
112 agg.MaintainerDID = maintainerDID.String
113 agg.SourceURL = homepageURL.String
114 agg.RecordURI = recordURI.String
115 agg.RecordCID = recordCID.String
116 if configSchema != nil {
117 agg.ConfigSchema = configSchema
118 }
119
120 return agg, nil
121}
122
123// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1)
124func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) {
125 if len(dids) == 0 {
126 return []*aggregators.Aggregator{}, nil
127 }
128
129 // Build IN clause with placeholders
130 placeholders := make([]string, len(dids))
131 args := make([]interface{}, len(dids))
132 for i, did := range dids {
133 placeholders[i] = fmt.Sprintf("$%d", i+1)
134 args[i] = did
135 }
136
137 query := fmt.Sprintf(`
138 SELECT
139 did, display_name, description, avatar_url, config_schema,
140 maintainer_did, source_url, communities_using, posts_created,
141 created_at, indexed_at, record_uri, record_cid
142 FROM aggregators
143 WHERE did IN (%s)`, strings.Join(placeholders, ", "))
144
145 rows, err := r.db.QueryContext(ctx, query, args...)
146 if err != nil {
147 return nil, fmt.Errorf("failed to get aggregators: %w", err)
148 }
149 defer rows.Close()
150
151 var results []*aggregators.Aggregator
152 for rows.Next() {
153 agg := &aggregators.Aggregator{}
154 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
155 var configSchema []byte
156
157 err := rows.Scan(
158 &agg.DID,
159 &agg.DisplayName,
160 &description,
161 &avatarCID,
162 &configSchema,
163 &maintainerDID,
164 &homepageURL,
165 &agg.CommunitiesUsing,
166 &agg.PostsCreated,
167 &agg.CreatedAt,
168 &agg.IndexedAt,
169 &recordURI,
170 &recordCID,
171 )
172 if err != nil {
173 return nil, fmt.Errorf("failed to scan aggregator: %w", err)
174 }
175
176 // Map nullable fields
177 agg.Description = description.String
178 agg.AvatarURL = avatarCID.String
179 agg.MaintainerDID = maintainerDID.String
180 agg.SourceURL = homepageURL.String
181 agg.RecordURI = recordURI.String
182 agg.RecordCID = recordCID.String
183 if configSchema != nil {
184 agg.ConfigSchema = configSchema
185 }
186
187 results = append(results, agg)
188 }
189
190 if err = rows.Err(); err != nil {
191 return nil, fmt.Errorf("error iterating aggregators: %w", err)
192 }
193
194 return results, nil
195}
196
197// UpdateAggregator updates an existing aggregator
198func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
199 query := `
200 UPDATE aggregators SET
201 display_name = $2,
202 description = $3,
203 avatar_url = $4,
204 config_schema = $5,
205 maintainer_did = $6,
206 source_url = $7,
207 created_at = $8,
208 indexed_at = $9,
209 record_uri = $10,
210 record_cid = $11
211 WHERE did = $1`
212
213 var configSchema interface{}
214 if len(agg.ConfigSchema) > 0 {
215 configSchema = agg.ConfigSchema
216 } else {
217 configSchema = nil
218 }
219
220 result, err := r.db.ExecContext(ctx, query,
221 agg.DID,
222 agg.DisplayName,
223 nullString(agg.Description),
224 nullString(agg.AvatarURL),
225 configSchema,
226 nullString(agg.MaintainerDID),
227 nullString(agg.SourceURL),
228 agg.CreatedAt,
229 agg.IndexedAt,
230 nullString(agg.RecordURI),
231 nullString(agg.RecordCID),
232 )
233
234 if err != nil {
235 return fmt.Errorf("failed to update aggregator: %w", err)
236 }
237
238 rows, err := result.RowsAffected()
239 if err != nil {
240 return fmt.Errorf("failed to get rows affected: %w", err)
241 }
242 if rows == 0 {
243 return aggregators.ErrAggregatorNotFound
244 }
245
246 return nil
247}
248
249// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK)
250func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error {
251 query := `DELETE FROM aggregators WHERE did = $1`
252
253 result, err := r.db.ExecContext(ctx, query, did)
254 if err != nil {
255 return fmt.Errorf("failed to delete aggregator: %w", err)
256 }
257
258 rows, err := result.RowsAffected()
259 if err != nil {
260 return fmt.Errorf("failed to get rows affected: %w", err)
261 }
262 if rows == 0 {
263 return aggregators.ErrAggregatorNotFound
264 }
265
266 return nil
267}
268
269// ListAggregators retrieves all aggregators with pagination
270func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) {
271 query := `
272 SELECT
273 did, display_name, description, avatar_url, config_schema,
274 maintainer_did, source_url, communities_using, posts_created,
275 created_at, indexed_at, record_uri, record_cid
276 FROM aggregators
277 ORDER BY communities_using DESC, display_name ASC
278 LIMIT $1 OFFSET $2`
279
280 rows, err := r.db.QueryContext(ctx, query, limit, offset)
281 if err != nil {
282 return nil, fmt.Errorf("failed to list aggregators: %w", err)
283 }
284 defer rows.Close()
285
286 var aggs []*aggregators.Aggregator
287 for rows.Next() {
288 agg := &aggregators.Aggregator{}
289 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
290 var configSchema []byte
291
292 err := rows.Scan(
293 &agg.DID,
294 &agg.DisplayName,
295 &description,
296 &avatarCID,
297 &configSchema,
298 &maintainerDID,
299 &homepageURL,
300 &agg.CommunitiesUsing,
301 &agg.PostsCreated,
302 &agg.CreatedAt,
303 &agg.IndexedAt,
304 &recordURI,
305 &recordCID,
306 )
307 if err != nil {
308 return nil, fmt.Errorf("failed to scan aggregator: %w", err)
309 }
310
311 // Map nullable fields
312 agg.Description = description.String
313 agg.AvatarURL = avatarCID.String
314 agg.MaintainerDID = maintainerDID.String
315 agg.SourceURL = homepageURL.String
316 agg.RecordURI = recordURI.String
317 agg.RecordCID = recordCID.String
318 if configSchema != nil {
319 agg.ConfigSchema = configSchema
320 }
321
322 aggs = append(aggs, agg)
323 }
324
325 if err = rows.Err(); err != nil {
326 return nil, fmt.Errorf("error iterating aggregators: %w", err)
327 }
328
329 return aggs, nil
330}
331
332// IsAggregator performs a fast existence check for post creation handler
333func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) {
334 query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)`
335
336 var exists bool
337 err := r.db.QueryRowContext(ctx, query, did).Scan(&exists)
338 if err != nil {
339 return false, fmt.Errorf("failed to check if aggregator exists: %w", err)
340 }
341
342 return exists, nil
343}
344
345// ===== Authorization CRUD Operations =====
346
347// CreateAuthorization indexes a new authorization from the firehose
348func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
349 query := `
350 INSERT INTO aggregator_authorizations (
351 aggregator_did, community_did, enabled, config,
352 created_at, created_by, disabled_at, disabled_by,
353 indexed_at, record_uri, record_cid
354 ) VALUES (
355 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
356 )
357 ON CONFLICT (aggregator_did, community_did) DO UPDATE SET
358 enabled = EXCLUDED.enabled,
359 config = EXCLUDED.config,
360 created_at = EXCLUDED.created_at,
361 created_by = EXCLUDED.created_by,
362 disabled_at = EXCLUDED.disabled_at,
363 disabled_by = EXCLUDED.disabled_by,
364 indexed_at = EXCLUDED.indexed_at,
365 record_uri = EXCLUDED.record_uri,
366 record_cid = EXCLUDED.record_cid
367 RETURNING id`
368
369 var config interface{}
370 if len(auth.Config) > 0 {
371 config = auth.Config
372 } else {
373 config = nil
374 }
375
376 var disabledAt interface{}
377 if auth.DisabledAt != nil {
378 disabledAt = *auth.DisabledAt
379 } else {
380 disabledAt = nil
381 }
382
383 err := r.db.QueryRowContext(ctx, query,
384 auth.AggregatorDID,
385 auth.CommunityDID,
386 auth.Enabled,
387 config,
388 auth.CreatedAt,
389 auth.CreatedBy, // Required field, no nullString needed
390 disabledAt,
391 nullString(auth.DisabledBy),
392 auth.IndexedAt,
393 nullString(auth.RecordURI),
394 nullString(auth.RecordCID),
395 ).Scan(&auth.ID)
396
397 if err != nil {
398 // Check for foreign key violations
399 if strings.Contains(err.Error(), "fk_aggregator") {
400 return aggregators.ErrAggregatorNotFound
401 }
402 return fmt.Errorf("failed to create authorization: %w", err)
403 }
404
405 return nil
406}
407
408// GetAuthorization retrieves an authorization by aggregator and community DID
409func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) {
410 query := `
411 SELECT
412 id, aggregator_did, community_did, enabled, config,
413 created_at, created_by, disabled_at, disabled_by,
414 indexed_at, record_uri, record_cid
415 FROM aggregator_authorizations
416 WHERE aggregator_did = $1 AND community_did = $2`
417
418 auth := &aggregators.Authorization{}
419 var config []byte
420 var createdBy, disabledBy, recordURI, recordCID sql.NullString
421 var disabledAt sql.NullTime
422
423 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(
424 &auth.ID,
425 &auth.AggregatorDID,
426 &auth.CommunityDID,
427 &auth.Enabled,
428 &config,
429 &auth.CreatedAt,
430 &createdBy,
431 &disabledAt,
432 &disabledBy,
433 &auth.IndexedAt,
434 &recordURI,
435 &recordCID,
436 )
437
438 if err == sql.ErrNoRows {
439 return nil, aggregators.ErrAuthorizationNotFound
440 }
441 if err != nil {
442 return nil, fmt.Errorf("failed to get authorization: %w", err)
443 }
444
445 // Map nullable fields
446 auth.CreatedBy = createdBy.String
447 auth.DisabledBy = disabledBy.String
448 if disabledAt.Valid {
449 disabledAtVal := disabledAt.Time
450 auth.DisabledAt = &disabledAtVal
451 }
452 auth.RecordURI = recordURI.String
453 auth.RecordCID = recordCID.String
454 if config != nil {
455 auth.Config = config
456 }
457
458 return auth, nil
459}
460
461// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations)
462func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) {
463 query := `
464 SELECT
465 id, aggregator_did, community_did, enabled, config,
466 created_at, created_by, disabled_at, disabled_by,
467 indexed_at, record_uri, record_cid
468 FROM aggregator_authorizations
469 WHERE record_uri = $1`
470
471 auth := &aggregators.Authorization{}
472 var config []byte
473 var createdBy, disabledBy, recordURIField, recordCID sql.NullString
474 var disabledAt sql.NullTime
475
476 err := r.db.QueryRowContext(ctx, query, recordURI).Scan(
477 &auth.ID,
478 &auth.AggregatorDID,
479 &auth.CommunityDID,
480 &auth.Enabled,
481 &config,
482 &auth.CreatedAt,
483 &createdBy,
484 &disabledAt,
485 &disabledBy,
486 &auth.IndexedAt,
487 &recordURIField,
488 &recordCID,
489 )
490
491 if err == sql.ErrNoRows {
492 return nil, aggregators.ErrAuthorizationNotFound
493 }
494 if err != nil {
495 return nil, fmt.Errorf("failed to get authorization by URI: %w", err)
496 }
497
498 // Map nullable fields
499 auth.CreatedBy = createdBy.String
500 auth.DisabledBy = disabledBy.String
501 if disabledAt.Valid {
502 disabledAtVal := disabledAt.Time
503 auth.DisabledAt = &disabledAtVal
504 }
505 auth.RecordURI = recordURIField.String
506 auth.RecordCID = recordCID.String
507 if config != nil {
508 auth.Config = config
509 }
510
511 return auth, nil
512}
513
514// UpdateAuthorization updates an existing authorization
515func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
516 query := `
517 UPDATE aggregator_authorizations SET
518 enabled = $3,
519 config = $4,
520 created_at = $5,
521 created_by = $6,
522 disabled_at = $7,
523 disabled_by = $8,
524 indexed_at = $9,
525 record_uri = $10,
526 record_cid = $11
527 WHERE aggregator_did = $1 AND community_did = $2`
528
529 var config interface{}
530 if len(auth.Config) > 0 {
531 config = auth.Config
532 } else {
533 config = nil
534 }
535
536 var disabledAt interface{}
537 if auth.DisabledAt != nil {
538 disabledAt = *auth.DisabledAt
539 } else {
540 disabledAt = nil
541 }
542
543 result, err := r.db.ExecContext(ctx, query,
544 auth.AggregatorDID,
545 auth.CommunityDID,
546 auth.Enabled,
547 config,
548 auth.CreatedAt,
549 nullString(auth.CreatedBy),
550 disabledAt,
551 nullString(auth.DisabledBy),
552 auth.IndexedAt,
553 nullString(auth.RecordURI),
554 nullString(auth.RecordCID),
555 )
556
557 if err != nil {
558 return fmt.Errorf("failed to update authorization: %w", err)
559 }
560
561 rows, err := result.RowsAffected()
562 if err != nil {
563 return fmt.Errorf("failed to get rows affected: %w", err)
564 }
565 if rows == 0 {
566 return aggregators.ErrAuthorizationNotFound
567 }
568
569 return nil
570}
571
572// DeleteAuthorization removes an authorization
573func (r *postgresAggregatorRepo) DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error {
574 query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2`
575
576 result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID)
577 if err != nil {
578 return fmt.Errorf("failed to delete authorization: %w", err)
579 }
580
581 rows, err := result.RowsAffected()
582 if err != nil {
583 return fmt.Errorf("failed to get rows affected: %w", err)
584 }
585 if rows == 0 {
586 return aggregators.ErrAuthorizationNotFound
587 }
588
589 return nil
590}
591
592// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations)
593func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error {
594 query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1`
595
596 result, err := r.db.ExecContext(ctx, query, recordURI)
597 if err != nil {
598 return fmt.Errorf("failed to delete authorization by URI: %w", err)
599 }
600
601 rows, err := result.RowsAffected()
602 if err != nil {
603 return fmt.Errorf("failed to get rows affected: %w", err)
604 }
605 if rows == 0 {
606 return aggregators.ErrAuthorizationNotFound
607 }
608
609 return nil
610}
611
612// ===== Authorization Query Operations =====
613
614// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator
615func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
616 baseQuery := `
617 SELECT
618 id, aggregator_did, community_did, enabled, config,
619 created_at, created_by, disabled_at, disabled_by,
620 indexed_at, record_uri, record_cid
621 FROM aggregator_authorizations
622 WHERE aggregator_did = $1`
623
624 var query string
625 var args []interface{}
626
627 if enabledOnly {
628 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
629 args = []interface{}{aggregatorDID, limit, offset}
630 } else {
631 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
632 args = []interface{}{aggregatorDID, limit, offset}
633 }
634
635 rows, err := r.db.QueryContext(ctx, query, args...)
636 if err != nil {
637 return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err)
638 }
639 defer rows.Close()
640
641 return scanAuthorizations(rows)
642}
643
644// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community
645func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
646 baseQuery := `
647 SELECT
648 id, aggregator_did, community_did, enabled, config,
649 created_at, created_by, disabled_at, disabled_by,
650 indexed_at, record_uri, record_cid
651 FROM aggregator_authorizations
652 WHERE community_did = $1`
653
654 var query string
655 var args []interface{}
656
657 if enabledOnly {
658 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
659 args = []interface{}{communityDID, limit, offset}
660 } else {
661 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
662 args = []interface{}{communityDID, limit, offset}
663 }
664
665 rows, err := r.db.QueryContext(ctx, query, args...)
666 if err != nil {
667 return nil, fmt.Errorf("failed to list authorizations for community: %w", err)
668 }
669 defer rows.Close()
670
671 return scanAuthorizations(rows)
672}
673
674// IsAuthorized performs a fast authorization check (enabled=true)
675// Uses the optimized partial index: idx_aggregator_auth_enabled
676func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) {
677 query := `
678 SELECT EXISTS(
679 SELECT 1 FROM aggregator_authorizations
680 WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true
681 )`
682
683 var authorized bool
684 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized)
685 if err != nil {
686 return false, fmt.Errorf("failed to check authorization: %w", err)
687 }
688
689 return authorized, nil
690}
691
692// ===== Post Tracking Operations =====
693
694// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats)
695func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error {
696 query := `
697 INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at)
698 VALUES ($1, $2, $3, $4, NOW())`
699
700 _, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID)
701 if err != nil {
702 return fmt.Errorf("failed to record aggregator post: %w", err)
703 }
704
705 return nil
706}
707
708// CountRecentPosts counts posts created by an aggregator in a community since a given time
709// Uses the optimized index: idx_aggregator_posts_rate_limit
710func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) {
711 query := `
712 SELECT COUNT(*)
713 FROM aggregator_posts
714 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3`
715
716 var count int
717 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count)
718 if err != nil {
719 return 0, fmt.Errorf("failed to count recent posts: %w", err)
720 }
721
722 return count, nil
723}
724
725// GetRecentPosts retrieves recent posts created by an aggregator in a community
726func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) {
727 query := `
728 SELECT id, aggregator_did, community_did, post_uri, created_at
729 FROM aggregator_posts
730 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3
731 ORDER BY created_at DESC`
732
733 rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since)
734 if err != nil {
735 return nil, fmt.Errorf("failed to get recent posts: %w", err)
736 }
737 defer rows.Close()
738
739 var posts []*aggregators.AggregatorPost
740 for rows.Next() {
741 post := &aggregators.AggregatorPost{}
742 err := rows.Scan(
743 &post.ID,
744 &post.AggregatorDID,
745 &post.CommunityDID,
746 &post.PostURI,
747 &post.CreatedAt,
748 )
749 if err != nil {
750 return nil, fmt.Errorf("failed to scan aggregator post: %w", err)
751 }
752 posts = append(posts, post)
753 }
754
755 if err = rows.Err(); err != nil {
756 return nil, fmt.Errorf("error iterating aggregator posts: %w", err)
757 }
758
759 return posts, nil
760}
761
762// ===== Helper Functions =====
763
764// scanAuthorizations is a helper to scan multiple authorization rows
765func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) {
766 var auths []*aggregators.Authorization
767
768 for rows.Next() {
769 auth := &aggregators.Authorization{}
770 var config []byte
771 var createdBy, disabledBy, recordURI, recordCID sql.NullString
772 var disabledAt sql.NullTime
773
774 err := rows.Scan(
775 &auth.ID,
776 &auth.AggregatorDID,
777 &auth.CommunityDID,
778 &auth.Enabled,
779 &config,
780 &auth.CreatedAt,
781 &createdBy,
782 &disabledAt,
783 &disabledBy,
784 &auth.IndexedAt,
785 &recordURI,
786 &recordCID,
787 )
788 if err != nil {
789 return nil, fmt.Errorf("failed to scan authorization: %w", err)
790 }
791
792 // Map nullable fields
793 auth.CreatedBy = createdBy.String
794 auth.DisabledBy = disabledBy.String
795 if disabledAt.Valid {
796 disabledAtVal := disabledAt.Time
797 auth.DisabledAt = &disabledAtVal
798 }
799 auth.RecordURI = recordURI.String
800 auth.RecordCID = recordCID.String
801 if config != nil {
802 auth.Config = config
803 }
804
805 auths = append(auths, auth)
806 }
807
808 if err := rows.Err(); err != nil {
809 return nil, fmt.Errorf("error iterating authorizations: %w", err)
810 }
811
812 return auths, nil
813}