···
4
+
"Coves/internal/core/aggregators"
12
+
type postgresAggregatorRepo struct {
16
+
// NewAggregatorRepository creates a new PostgreSQL aggregator repository
17
+
func NewAggregatorRepository(db *sql.DB) aggregators.Repository {
18
+
return &postgresAggregatorRepo{db: db}
21
+
// ===== Aggregator CRUD Operations =====
23
+
// CreateAggregator indexes a new aggregator service declaration from the firehose
24
+
func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
26
+
INSERT INTO aggregators (
27
+
did, display_name, description, avatar_url, config_schema,
28
+
maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid
30
+
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
32
+
ON CONFLICT (did) DO UPDATE SET
33
+
display_name = EXCLUDED.display_name,
34
+
description = EXCLUDED.description,
35
+
avatar_url = EXCLUDED.avatar_url,
36
+
config_schema = EXCLUDED.config_schema,
37
+
maintainer_did = EXCLUDED.maintainer_did,
38
+
source_url = EXCLUDED.source_url,
39
+
created_at = EXCLUDED.created_at,
40
+
indexed_at = EXCLUDED.indexed_at,
41
+
record_uri = EXCLUDED.record_uri,
42
+
record_cid = EXCLUDED.record_cid`
44
+
var configSchema interface{}
45
+
if len(agg.ConfigSchema) > 0 {
46
+
configSchema = agg.ConfigSchema
51
+
_, err := r.db.ExecContext(ctx, query,
54
+
nullString(agg.Description),
55
+
nullString(agg.AvatarURL),
57
+
nullString(agg.MaintainerDID),
58
+
nullString(agg.SourceURL),
61
+
nullString(agg.RecordURI),
62
+
nullString(agg.RecordCID),
66
+
return fmt.Errorf("failed to create aggregator: %w", err)
72
+
// GetAggregator retrieves an aggregator by DID
73
+
func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, did string) (*aggregators.Aggregator, error) {
76
+
did, display_name, description, avatar_url, config_schema,
77
+
maintainer_did, source_url, communities_using, posts_created,
78
+
created_at, indexed_at, record_uri, record_cid
82
+
agg := &aggregators.Aggregator{}
83
+
var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
84
+
var configSchema []byte
86
+
err := r.db.QueryRowContext(ctx, query, did).Scan(
94
+
&agg.CommunitiesUsing,
102
+
if err == sql.ErrNoRows {
103
+
return nil, aggregators.ErrAggregatorNotFound
106
+
return nil, fmt.Errorf("failed to get aggregator: %w", err)
109
+
// Map nullable fields
110
+
agg.Description = description.String
111
+
agg.AvatarURL = avatarCID.String
112
+
agg.MaintainerDID = maintainerDID.String
113
+
agg.SourceURL = homepageURL.String
114
+
agg.RecordURI = recordURI.String
115
+
agg.RecordCID = recordCID.String
116
+
if configSchema != nil {
117
+
agg.ConfigSchema = configSchema
123
+
// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1)
124
+
func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) {
125
+
if len(dids) == 0 {
126
+
return []*aggregators.Aggregator{}, nil
129
+
// Build IN clause with placeholders
130
+
placeholders := make([]string, len(dids))
131
+
args := make([]interface{}, len(dids))
132
+
for i, did := range dids {
133
+
placeholders[i] = fmt.Sprintf("$%d", i+1)
137
+
query := fmt.Sprintf(`
139
+
did, display_name, description, avatar_url, config_schema,
140
+
maintainer_did, source_url, communities_using, posts_created,
141
+
created_at, indexed_at, record_uri, record_cid
143
+
WHERE did IN (%s)`, strings.Join(placeholders, ", "))
145
+
rows, err := r.db.QueryContext(ctx, query, args...)
147
+
return nil, fmt.Errorf("failed to get aggregators: %w", err)
151
+
var results []*aggregators.Aggregator
153
+
agg := &aggregators.Aggregator{}
154
+
var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
155
+
var configSchema []byte
165
+
&agg.CommunitiesUsing,
173
+
return nil, fmt.Errorf("failed to scan aggregator: %w", err)
176
+
// Map nullable fields
177
+
agg.Description = description.String
178
+
agg.AvatarURL = avatarCID.String
179
+
agg.MaintainerDID = maintainerDID.String
180
+
agg.SourceURL = homepageURL.String
181
+
agg.RecordURI = recordURI.String
182
+
agg.RecordCID = recordCID.String
183
+
if configSchema != nil {
184
+
agg.ConfigSchema = configSchema
187
+
results = append(results, agg)
190
+
if err = rows.Err(); err != nil {
191
+
return nil, fmt.Errorf("error iterating aggregators: %w", err)
194
+
return results, nil
197
+
// UpdateAggregator updates an existing aggregator
198
+
func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error {
200
+
UPDATE aggregators SET
204
+
config_schema = $5,
205
+
maintainer_did = $6,
213
+
var configSchema interface{}
214
+
if len(agg.ConfigSchema) > 0 {
215
+
configSchema = agg.ConfigSchema
220
+
result, err := r.db.ExecContext(ctx, query,
223
+
nullString(agg.Description),
224
+
nullString(agg.AvatarURL),
226
+
nullString(agg.MaintainerDID),
227
+
nullString(agg.SourceURL),
230
+
nullString(agg.RecordURI),
231
+
nullString(agg.RecordCID),
235
+
return fmt.Errorf("failed to update aggregator: %w", err)
238
+
rows, err := result.RowsAffected()
240
+
return fmt.Errorf("failed to get rows affected: %w", err)
243
+
return aggregators.ErrAggregatorNotFound
249
+
// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK)
250
+
func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error {
251
+
query := `DELETE FROM aggregators WHERE did = $1`
253
+
result, err := r.db.ExecContext(ctx, query, did)
255
+
return fmt.Errorf("failed to delete aggregator: %w", err)
258
+
rows, err := result.RowsAffected()
260
+
return fmt.Errorf("failed to get rows affected: %w", err)
263
+
return aggregators.ErrAggregatorNotFound
269
+
// ListAggregators retrieves all aggregators with pagination
270
+
func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) {
273
+
did, display_name, description, avatar_url, config_schema,
274
+
maintainer_did, source_url, communities_using, posts_created,
275
+
created_at, indexed_at, record_uri, record_cid
277
+
ORDER BY communities_using DESC, display_name ASC
278
+
LIMIT $1 OFFSET $2`
280
+
rows, err := r.db.QueryContext(ctx, query, limit, offset)
282
+
return nil, fmt.Errorf("failed to list aggregators: %w", err)
286
+
var aggs []*aggregators.Aggregator
288
+
agg := &aggregators.Aggregator{}
289
+
var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString
290
+
var configSchema []byte
300
+
&agg.CommunitiesUsing,
308
+
return nil, fmt.Errorf("failed to scan aggregator: %w", err)
311
+
// Map nullable fields
312
+
agg.Description = description.String
313
+
agg.AvatarURL = avatarCID.String
314
+
agg.MaintainerDID = maintainerDID.String
315
+
agg.SourceURL = homepageURL.String
316
+
agg.RecordURI = recordURI.String
317
+
agg.RecordCID = recordCID.String
318
+
if configSchema != nil {
319
+
agg.ConfigSchema = configSchema
322
+
aggs = append(aggs, agg)
325
+
if err = rows.Err(); err != nil {
326
+
return nil, fmt.Errorf("error iterating aggregators: %w", err)
332
+
// IsAggregator performs a fast existence check for post creation handler
333
+
func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) {
334
+
query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)`
337
+
err := r.db.QueryRowContext(ctx, query, did).Scan(&exists)
339
+
return false, fmt.Errorf("failed to check if aggregator exists: %w", err)
345
+
// ===== Authorization CRUD Operations =====
347
+
// CreateAuthorization indexes a new authorization from the firehose
348
+
func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
350
+
INSERT INTO aggregator_authorizations (
351
+
aggregator_did, community_did, enabled, config,
352
+
created_at, created_by, disabled_at, disabled_by,
353
+
indexed_at, record_uri, record_cid
355
+
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
357
+
ON CONFLICT (aggregator_did, community_did) DO UPDATE SET
358
+
enabled = EXCLUDED.enabled,
359
+
config = EXCLUDED.config,
360
+
created_at = EXCLUDED.created_at,
361
+
created_by = EXCLUDED.created_by,
362
+
disabled_at = EXCLUDED.disabled_at,
363
+
disabled_by = EXCLUDED.disabled_by,
364
+
indexed_at = EXCLUDED.indexed_at,
365
+
record_uri = EXCLUDED.record_uri,
366
+
record_cid = EXCLUDED.record_cid
369
+
var config interface{}
370
+
if len(auth.Config) > 0 {
371
+
config = auth.Config
376
+
var disabledAt interface{}
377
+
if auth.DisabledAt != nil {
378
+
disabledAt = *auth.DisabledAt
383
+
err := r.db.QueryRowContext(ctx, query,
384
+
auth.AggregatorDID,
389
+
auth.CreatedBy, // Required field, no nullString needed
391
+
nullString(auth.DisabledBy),
393
+
nullString(auth.RecordURI),
394
+
nullString(auth.RecordCID),
398
+
// Check for foreign key violations
399
+
if strings.Contains(err.Error(), "fk_aggregator") {
400
+
return aggregators.ErrAggregatorNotFound
402
+
return fmt.Errorf("failed to create authorization: %w", err)
408
+
// GetAuthorization retrieves an authorization by aggregator and community DID
409
+
func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) {
412
+
id, aggregator_did, community_did, enabled, config,
413
+
created_at, created_by, disabled_at, disabled_by,
414
+
indexed_at, record_uri, record_cid
415
+
FROM aggregator_authorizations
416
+
WHERE aggregator_did = $1 AND community_did = $2`
418
+
auth := &aggregators.Authorization{}
420
+
var createdBy, disabledBy, recordURI, recordCID sql.NullString
421
+
var disabledAt sql.NullTime
423
+
err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(
425
+
&auth.AggregatorDID,
426
+
&auth.CommunityDID,
438
+
if err == sql.ErrNoRows {
439
+
return nil, aggregators.ErrAuthorizationNotFound
442
+
return nil, fmt.Errorf("failed to get authorization: %w", err)
445
+
// Map nullable fields
446
+
auth.CreatedBy = createdBy.String
447
+
auth.DisabledBy = disabledBy.String
448
+
if disabledAt.Valid {
449
+
disabledAtVal := disabledAt.Time
450
+
auth.DisabledAt = &disabledAtVal
452
+
auth.RecordURI = recordURI.String
453
+
auth.RecordCID = recordCID.String
455
+
auth.Config = config
461
+
// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations)
462
+
func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) {
465
+
id, aggregator_did, community_did, enabled, config,
466
+
created_at, created_by, disabled_at, disabled_by,
467
+
indexed_at, record_uri, record_cid
468
+
FROM aggregator_authorizations
469
+
WHERE record_uri = $1`
471
+
auth := &aggregators.Authorization{}
473
+
var createdBy, disabledBy, recordURIField, recordCID sql.NullString
474
+
var disabledAt sql.NullTime
476
+
err := r.db.QueryRowContext(ctx, query, recordURI).Scan(
478
+
&auth.AggregatorDID,
479
+
&auth.CommunityDID,
491
+
if err == sql.ErrNoRows {
492
+
return nil, aggregators.ErrAuthorizationNotFound
495
+
return nil, fmt.Errorf("failed to get authorization by URI: %w", err)
498
+
// Map nullable fields
499
+
auth.CreatedBy = createdBy.String
500
+
auth.DisabledBy = disabledBy.String
501
+
if disabledAt.Valid {
502
+
disabledAtVal := disabledAt.Time
503
+
auth.DisabledAt = &disabledAtVal
505
+
auth.RecordURI = recordURIField.String
506
+
auth.RecordCID = recordCID.String
508
+
auth.Config = config
514
+
// UpdateAuthorization updates an existing authorization
515
+
func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error {
517
+
UPDATE aggregator_authorizations SET
527
+
WHERE aggregator_did = $1 AND community_did = $2`
529
+
var config interface{}
530
+
if len(auth.Config) > 0 {
531
+
config = auth.Config
536
+
var disabledAt interface{}
537
+
if auth.DisabledAt != nil {
538
+
disabledAt = *auth.DisabledAt
543
+
result, err := r.db.ExecContext(ctx, query,
544
+
auth.AggregatorDID,
549
+
nullString(auth.CreatedBy),
551
+
nullString(auth.DisabledBy),
553
+
nullString(auth.RecordURI),
554
+
nullString(auth.RecordCID),
558
+
return fmt.Errorf("failed to update authorization: %w", err)
561
+
rows, err := result.RowsAffected()
563
+
return fmt.Errorf("failed to get rows affected: %w", err)
566
+
return aggregators.ErrAuthorizationNotFound
572
+
// DeleteAuthorization removes an authorization
573
+
func (r *postgresAggregatorRepo) DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error {
574
+
query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2`
576
+
result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID)
578
+
return fmt.Errorf("failed to delete authorization: %w", err)
581
+
rows, err := result.RowsAffected()
583
+
return fmt.Errorf("failed to get rows affected: %w", err)
586
+
return aggregators.ErrAuthorizationNotFound
592
+
// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations)
593
+
func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error {
594
+
query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1`
596
+
result, err := r.db.ExecContext(ctx, query, recordURI)
598
+
return fmt.Errorf("failed to delete authorization by URI: %w", err)
601
+
rows, err := result.RowsAffected()
603
+
return fmt.Errorf("failed to get rows affected: %w", err)
606
+
return aggregators.ErrAuthorizationNotFound
612
+
// ===== Authorization Query Operations =====
614
+
// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator
615
+
func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
618
+
id, aggregator_did, community_did, enabled, config,
619
+
created_at, created_by, disabled_at, disabled_by,
620
+
indexed_at, record_uri, record_cid
621
+
FROM aggregator_authorizations
622
+
WHERE aggregator_did = $1`
625
+
var args []interface{}
628
+
query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
629
+
args = []interface{}{aggregatorDID, limit, offset}
631
+
query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
632
+
args = []interface{}{aggregatorDID, limit, offset}
635
+
rows, err := r.db.QueryContext(ctx, query, args...)
637
+
return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err)
641
+
return scanAuthorizations(rows)
644
+
// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community
645
+
func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) {
648
+
id, aggregator_did, community_did, enabled, config,
649
+
created_at, created_by, disabled_at, disabled_by,
650
+
indexed_at, record_uri, record_cid
651
+
FROM aggregator_authorizations
652
+
WHERE community_did = $1`
655
+
var args []interface{}
658
+
query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3`
659
+
args = []interface{}{communityDID, limit, offset}
661
+
query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3`
662
+
args = []interface{}{communityDID, limit, offset}
665
+
rows, err := r.db.QueryContext(ctx, query, args...)
667
+
return nil, fmt.Errorf("failed to list authorizations for community: %w", err)
671
+
return scanAuthorizations(rows)
674
+
// IsAuthorized performs a fast authorization check (enabled=true)
675
+
// Uses the optimized partial index: idx_aggregator_auth_enabled
676
+
func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) {
679
+
SELECT 1 FROM aggregator_authorizations
680
+
WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true
683
+
var authorized bool
684
+
err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized)
686
+
return false, fmt.Errorf("failed to check authorization: %w", err)
689
+
return authorized, nil
692
+
// ===== Post Tracking Operations =====
694
+
// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats)
695
+
func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error {
697
+
INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at)
698
+
VALUES ($1, $2, $3, $4, NOW())`
700
+
_, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID)
702
+
return fmt.Errorf("failed to record aggregator post: %w", err)
708
+
// CountRecentPosts counts posts created by an aggregator in a community since a given time
709
+
// Uses the optimized index: idx_aggregator_posts_rate_limit
710
+
func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) {
713
+
FROM aggregator_posts
714
+
WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3`
717
+
err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count)
719
+
return 0, fmt.Errorf("failed to count recent posts: %w", err)
725
+
// GetRecentPosts retrieves recent posts created by an aggregator in a community
726
+
func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) {
728
+
SELECT id, aggregator_did, community_did, post_uri, created_at
729
+
FROM aggregator_posts
730
+
WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3
731
+
ORDER BY created_at DESC`
733
+
rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since)
735
+
return nil, fmt.Errorf("failed to get recent posts: %w", err)
739
+
var posts []*aggregators.AggregatorPost
741
+
post := &aggregators.AggregatorPost{}
744
+
&post.AggregatorDID,
745
+
&post.CommunityDID,
750
+
return nil, fmt.Errorf("failed to scan aggregator post: %w", err)
752
+
posts = append(posts, post)
755
+
if err = rows.Err(); err != nil {
756
+
return nil, fmt.Errorf("error iterating aggregator posts: %w", err)
762
+
// ===== Helper Functions =====
764
+
// scanAuthorizations is a helper to scan multiple authorization rows
765
+
func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) {
766
+
var auths []*aggregators.Authorization
769
+
auth := &aggregators.Authorization{}
771
+
var createdBy, disabledBy, recordURI, recordCID sql.NullString
772
+
var disabledAt sql.NullTime
776
+
&auth.AggregatorDID,
777
+
&auth.CommunityDID,
789
+
return nil, fmt.Errorf("failed to scan authorization: %w", err)
792
+
// Map nullable fields
793
+
auth.CreatedBy = createdBy.String
794
+
auth.DisabledBy = disabledBy.String
795
+
if disabledAt.Valid {
796
+
disabledAtVal := disabledAt.Time
797
+
auth.DisabledAt = &disabledAtVal
799
+
auth.RecordURI = recordURI.String
800
+
auth.RecordCID = recordCID.String
802
+
auth.Config = config
805
+
auths = append(auths, auth)
808
+
if err := rows.Err(); err != nil {
809
+
return nil, fmt.Errorf("error iterating authorizations: %w", err)