A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "strings" 8 "time" 9 10 "Coves/internal/core/aggregators" 11) 12 13type postgresAggregatorRepo struct { 14 db *sql.DB 15} 16 17// NewAggregatorRepository creates a new PostgreSQL aggregator repository 18func NewAggregatorRepository(db *sql.DB) aggregators.Repository { 19 return &postgresAggregatorRepo{db: db} 20} 21 22// ===== Aggregator CRUD Operations ===== 23 24// CreateAggregator indexes a new aggregator service declaration from the firehose 25func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error { 26 query := ` 27 INSERT INTO aggregators ( 28 did, display_name, description, avatar_url, config_schema, 29 maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid 30 ) VALUES ( 31 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 32 ) 33 ON CONFLICT (did) DO UPDATE SET 34 display_name = EXCLUDED.display_name, 35 description = EXCLUDED.description, 36 avatar_url = EXCLUDED.avatar_url, 37 config_schema = EXCLUDED.config_schema, 38 maintainer_did = EXCLUDED.maintainer_did, 39 source_url = EXCLUDED.source_url, 40 created_at = EXCLUDED.created_at, 41 indexed_at = EXCLUDED.indexed_at, 42 record_uri = EXCLUDED.record_uri, 43 record_cid = EXCLUDED.record_cid` 44 45 var configSchema interface{} 46 if len(agg.ConfigSchema) > 0 { 47 configSchema = agg.ConfigSchema 48 } else { 49 configSchema = nil 50 } 51 52 _, err := r.db.ExecContext(ctx, query, 53 agg.DID, 54 agg.DisplayName, 55 nullString(agg.Description), 56 nullString(agg.AvatarURL), 57 configSchema, 58 nullString(agg.MaintainerDID), 59 nullString(agg.SourceURL), 60 agg.CreatedAt, 61 agg.IndexedAt, 62 nullString(agg.RecordURI), 63 nullString(agg.RecordCID), 64 ) 65 if err != nil { 66 return fmt.Errorf("failed to create aggregator: %w", err) 67 } 68 69 return nil 70} 71 72// GetAggregator retrieves an aggregator by DID 73func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, 
did string) (*aggregators.Aggregator, error) { 74 query := ` 75 SELECT 76 did, display_name, description, avatar_url, config_schema, 77 maintainer_did, source_url, communities_using, posts_created, 78 created_at, indexed_at, record_uri, record_cid 79 FROM aggregators 80 WHERE did = $1` 81 82 agg := &aggregators.Aggregator{} 83 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 84 var configSchema []byte 85 86 err := r.db.QueryRowContext(ctx, query, did).Scan( 87 &agg.DID, 88 &agg.DisplayName, 89 &description, 90 &avatarCID, 91 &configSchema, 92 &maintainerDID, 93 &homepageURL, 94 &agg.CommunitiesUsing, 95 &agg.PostsCreated, 96 &agg.CreatedAt, 97 &agg.IndexedAt, 98 &recordURI, 99 &recordCID, 100 ) 101 102 if err == sql.ErrNoRows { 103 return nil, aggregators.ErrAggregatorNotFound 104 } 105 if err != nil { 106 return nil, fmt.Errorf("failed to get aggregator: %w", err) 107 } 108 109 // Map nullable fields 110 agg.Description = description.String 111 agg.AvatarURL = avatarCID.String 112 agg.MaintainerDID = maintainerDID.String 113 agg.SourceURL = homepageURL.String 114 agg.RecordURI = recordURI.String 115 agg.RecordCID = recordCID.String 116 if configSchema != nil { 117 agg.ConfigSchema = configSchema 118 } 119 120 return agg, nil 121} 122 123// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1) 124func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) { 125 if len(dids) == 0 { 126 return []*aggregators.Aggregator{}, nil 127 } 128 129 // Build IN clause with placeholders 130 placeholders := make([]string, len(dids)) 131 args := make([]interface{}, len(dids)) 132 for i, did := range dids { 133 placeholders[i] = fmt.Sprintf("$%d", i+1) 134 args[i] = did 135 } 136 137 query := fmt.Sprintf(` 138 SELECT 139 did, display_name, description, avatar_url, config_schema, 140 maintainer_did, source_url, communities_using, 
posts_created, 141 created_at, indexed_at, record_uri, record_cid 142 FROM aggregators 143 WHERE did IN (%s)`, strings.Join(placeholders, ", ")) 144 145 rows, err := r.db.QueryContext(ctx, query, args...) 146 if err != nil { 147 return nil, fmt.Errorf("failed to get aggregators: %w", err) 148 } 149 defer func() { _ = rows.Close() }() 150 151 var results []*aggregators.Aggregator 152 for rows.Next() { 153 agg := &aggregators.Aggregator{} 154 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 155 var configSchema []byte 156 157 err := rows.Scan( 158 &agg.DID, 159 &agg.DisplayName, 160 &description, 161 &avatarCID, 162 &configSchema, 163 &maintainerDID, 164 &homepageURL, 165 &agg.CommunitiesUsing, 166 &agg.PostsCreated, 167 &agg.CreatedAt, 168 &agg.IndexedAt, 169 &recordURI, 170 &recordCID, 171 ) 172 if err != nil { 173 return nil, fmt.Errorf("failed to scan aggregator: %w", err) 174 } 175 176 // Map nullable fields 177 agg.Description = description.String 178 agg.AvatarURL = avatarCID.String 179 agg.MaintainerDID = maintainerDID.String 180 agg.SourceURL = homepageURL.String 181 agg.RecordURI = recordURI.String 182 agg.RecordCID = recordCID.String 183 if configSchema != nil { 184 agg.ConfigSchema = configSchema 185 } 186 187 results = append(results, agg) 188 } 189 190 if err = rows.Err(); err != nil { 191 return nil, fmt.Errorf("error iterating aggregators: %w", err) 192 } 193 194 return results, nil 195} 196 197// UpdateAggregator updates an existing aggregator 198func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error { 199 query := ` 200 UPDATE aggregators SET 201 display_name = $2, 202 description = $3, 203 avatar_url = $4, 204 config_schema = $5, 205 maintainer_did = $6, 206 source_url = $7, 207 created_at = $8, 208 indexed_at = $9, 209 record_uri = $10, 210 record_cid = $11 211 WHERE did = $1` 212 213 var configSchema interface{} 214 if len(agg.ConfigSchema) > 0 { 215 
configSchema = agg.ConfigSchema 216 } else { 217 configSchema = nil 218 } 219 220 result, err := r.db.ExecContext(ctx, query, 221 agg.DID, 222 agg.DisplayName, 223 nullString(agg.Description), 224 nullString(agg.AvatarURL), 225 configSchema, 226 nullString(agg.MaintainerDID), 227 nullString(agg.SourceURL), 228 agg.CreatedAt, 229 agg.IndexedAt, 230 nullString(agg.RecordURI), 231 nullString(agg.RecordCID), 232 ) 233 if err != nil { 234 return fmt.Errorf("failed to update aggregator: %w", err) 235 } 236 237 rows, err := result.RowsAffected() 238 if err != nil { 239 return fmt.Errorf("failed to get rows affected: %w", err) 240 } 241 if rows == 0 { 242 return aggregators.ErrAggregatorNotFound 243 } 244 245 return nil 246} 247 248// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK) 249func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error { 250 query := `DELETE FROM aggregators WHERE did = $1` 251 252 result, err := r.db.ExecContext(ctx, query, did) 253 if err != nil { 254 return fmt.Errorf("failed to delete aggregator: %w", err) 255 } 256 257 rows, err := result.RowsAffected() 258 if err != nil { 259 return fmt.Errorf("failed to get rows affected: %w", err) 260 } 261 if rows == 0 { 262 return aggregators.ErrAggregatorNotFound 263 } 264 265 return nil 266} 267 268// ListAggregators retrieves all aggregators with pagination 269func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) { 270 query := ` 271 SELECT 272 did, display_name, description, avatar_url, config_schema, 273 maintainer_did, source_url, communities_using, posts_created, 274 created_at, indexed_at, record_uri, record_cid 275 FROM aggregators 276 ORDER BY communities_using DESC, display_name ASC 277 LIMIT $1 OFFSET $2` 278 279 rows, err := r.db.QueryContext(ctx, query, limit, offset) 280 if err != nil { 281 return nil, fmt.Errorf("failed to list aggregators: %w", err) 
282 } 283 defer func() { _ = rows.Close() }() 284 285 var aggs []*aggregators.Aggregator 286 for rows.Next() { 287 agg := &aggregators.Aggregator{} 288 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 289 var configSchema []byte 290 291 err := rows.Scan( 292 &agg.DID, 293 &agg.DisplayName, 294 &description, 295 &avatarCID, 296 &configSchema, 297 &maintainerDID, 298 &homepageURL, 299 &agg.CommunitiesUsing, 300 &agg.PostsCreated, 301 &agg.CreatedAt, 302 &agg.IndexedAt, 303 &recordURI, 304 &recordCID, 305 ) 306 if err != nil { 307 return nil, fmt.Errorf("failed to scan aggregator: %w", err) 308 } 309 310 // Map nullable fields 311 agg.Description = description.String 312 agg.AvatarURL = avatarCID.String 313 agg.MaintainerDID = maintainerDID.String 314 agg.SourceURL = homepageURL.String 315 agg.RecordURI = recordURI.String 316 agg.RecordCID = recordCID.String 317 if configSchema != nil { 318 agg.ConfigSchema = configSchema 319 } 320 321 aggs = append(aggs, agg) 322 } 323 324 if err = rows.Err(); err != nil { 325 return nil, fmt.Errorf("error iterating aggregators: %w", err) 326 } 327 328 return aggs, nil 329} 330 331// IsAggregator performs a fast existence check for post creation handler 332func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) { 333 query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)` 334 335 var exists bool 336 err := r.db.QueryRowContext(ctx, query, did).Scan(&exists) 337 if err != nil { 338 return false, fmt.Errorf("failed to check if aggregator exists: %w", err) 339 } 340 341 return exists, nil 342} 343 344// ===== Authorization CRUD Operations ===== 345 346// CreateAuthorization indexes a new authorization from the firehose 347func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error { 348 query := ` 349 INSERT INTO aggregator_authorizations ( 350 aggregator_did, community_did, enabled, config, 351 
created_at, created_by, disabled_at, disabled_by, 352 indexed_at, record_uri, record_cid 353 ) VALUES ( 354 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 355 ) 356 ON CONFLICT (aggregator_did, community_did) DO UPDATE SET 357 enabled = EXCLUDED.enabled, 358 config = EXCLUDED.config, 359 created_at = EXCLUDED.created_at, 360 created_by = EXCLUDED.created_by, 361 disabled_at = EXCLUDED.disabled_at, 362 disabled_by = EXCLUDED.disabled_by, 363 indexed_at = EXCLUDED.indexed_at, 364 record_uri = EXCLUDED.record_uri, 365 record_cid = EXCLUDED.record_cid 366 RETURNING id` 367 368 var config interface{} 369 if len(auth.Config) > 0 { 370 config = auth.Config 371 } else { 372 config = nil 373 } 374 375 var disabledAt interface{} 376 if auth.DisabledAt != nil { 377 disabledAt = *auth.DisabledAt 378 } else { 379 disabledAt = nil 380 } 381 382 err := r.db.QueryRowContext(ctx, query, 383 auth.AggregatorDID, 384 auth.CommunityDID, 385 auth.Enabled, 386 config, 387 auth.CreatedAt, 388 auth.CreatedBy, // Required field, no nullString needed 389 disabledAt, 390 nullString(auth.DisabledBy), 391 auth.IndexedAt, 392 nullString(auth.RecordURI), 393 nullString(auth.RecordCID), 394 ).Scan(&auth.ID) 395 if err != nil { 396 // Check for foreign key violations 397 if strings.Contains(err.Error(), "fk_aggregator") { 398 return aggregators.ErrAggregatorNotFound 399 } 400 return fmt.Errorf("failed to create authorization: %w", err) 401 } 402 403 return nil 404} 405 406// GetAuthorization retrieves an authorization by aggregator and community DID 407func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) { 408 query := ` 409 SELECT 410 id, aggregator_did, community_did, enabled, config, 411 created_at, created_by, disabled_at, disabled_by, 412 indexed_at, record_uri, record_cid 413 FROM aggregator_authorizations 414 WHERE aggregator_did = $1 AND community_did = $2` 415 416 auth := &aggregators.Authorization{} 
417 var config []byte 418 var createdBy, disabledBy, recordURI, recordCID sql.NullString 419 var disabledAt sql.NullTime 420 421 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan( 422 &auth.ID, 423 &auth.AggregatorDID, 424 &auth.CommunityDID, 425 &auth.Enabled, 426 &config, 427 &auth.CreatedAt, 428 &createdBy, 429 &disabledAt, 430 &disabledBy, 431 &auth.IndexedAt, 432 &recordURI, 433 &recordCID, 434 ) 435 436 if err == sql.ErrNoRows { 437 return nil, aggregators.ErrAuthorizationNotFound 438 } 439 if err != nil { 440 return nil, fmt.Errorf("failed to get authorization: %w", err) 441 } 442 443 // Map nullable fields 444 auth.CreatedBy = createdBy.String 445 auth.DisabledBy = disabledBy.String 446 if disabledAt.Valid { 447 disabledAtVal := disabledAt.Time 448 auth.DisabledAt = &disabledAtVal 449 } 450 auth.RecordURI = recordURI.String 451 auth.RecordCID = recordCID.String 452 if config != nil { 453 auth.Config = config 454 } 455 456 return auth, nil 457} 458 459// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations) 460func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) { 461 query := ` 462 SELECT 463 id, aggregator_did, community_did, enabled, config, 464 created_at, created_by, disabled_at, disabled_by, 465 indexed_at, record_uri, record_cid 466 FROM aggregator_authorizations 467 WHERE record_uri = $1` 468 469 auth := &aggregators.Authorization{} 470 var config []byte 471 var createdBy, disabledBy, recordURIField, recordCID sql.NullString 472 var disabledAt sql.NullTime 473 474 err := r.db.QueryRowContext(ctx, query, recordURI).Scan( 475 &auth.ID, 476 &auth.AggregatorDID, 477 &auth.CommunityDID, 478 &auth.Enabled, 479 &config, 480 &auth.CreatedAt, 481 &createdBy, 482 &disabledAt, 483 &disabledBy, 484 &auth.IndexedAt, 485 &recordURIField, 486 &recordCID, 487 ) 488 489 if err == sql.ErrNoRows { 490 return nil, 
aggregators.ErrAuthorizationNotFound 491 } 492 if err != nil { 493 return nil, fmt.Errorf("failed to get authorization by URI: %w", err) 494 } 495 496 // Map nullable fields 497 auth.CreatedBy = createdBy.String 498 auth.DisabledBy = disabledBy.String 499 if disabledAt.Valid { 500 disabledAtVal := disabledAt.Time 501 auth.DisabledAt = &disabledAtVal 502 } 503 auth.RecordURI = recordURIField.String 504 auth.RecordCID = recordCID.String 505 if config != nil { 506 auth.Config = config 507 } 508 509 return auth, nil 510} 511 512// UpdateAuthorization updates an existing authorization 513func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error { 514 query := ` 515 UPDATE aggregator_authorizations SET 516 enabled = $3, 517 config = $4, 518 created_at = $5, 519 created_by = $6, 520 disabled_at = $7, 521 disabled_by = $8, 522 indexed_at = $9, 523 record_uri = $10, 524 record_cid = $11 525 WHERE aggregator_did = $1 AND community_did = $2` 526 527 var config interface{} 528 if len(auth.Config) > 0 { 529 config = auth.Config 530 } else { 531 config = nil 532 } 533 534 var disabledAt interface{} 535 if auth.DisabledAt != nil { 536 disabledAt = *auth.DisabledAt 537 } else { 538 disabledAt = nil 539 } 540 541 result, err := r.db.ExecContext(ctx, query, 542 auth.AggregatorDID, 543 auth.CommunityDID, 544 auth.Enabled, 545 config, 546 auth.CreatedAt, 547 nullString(auth.CreatedBy), 548 disabledAt, 549 nullString(auth.DisabledBy), 550 auth.IndexedAt, 551 nullString(auth.RecordURI), 552 nullString(auth.RecordCID), 553 ) 554 if err != nil { 555 return fmt.Errorf("failed to update authorization: %w", err) 556 } 557 558 rows, err := result.RowsAffected() 559 if err != nil { 560 return fmt.Errorf("failed to get rows affected: %w", err) 561 } 562 if rows == 0 { 563 return aggregators.ErrAuthorizationNotFound 564 } 565 566 return nil 567} 568 569// DeleteAuthorization removes an authorization 570func (r *postgresAggregatorRepo) 
DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error { 571 query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2` 572 573 result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID) 574 if err != nil { 575 return fmt.Errorf("failed to delete authorization: %w", err) 576 } 577 578 rows, err := result.RowsAffected() 579 if err != nil { 580 return fmt.Errorf("failed to get rows affected: %w", err) 581 } 582 if rows == 0 { 583 return aggregators.ErrAuthorizationNotFound 584 } 585 586 return nil 587} 588 589// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations) 590func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error { 591 query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1` 592 593 result, err := r.db.ExecContext(ctx, query, recordURI) 594 if err != nil { 595 return fmt.Errorf("failed to delete authorization by URI: %w", err) 596 } 597 598 rows, err := result.RowsAffected() 599 if err != nil { 600 return fmt.Errorf("failed to get rows affected: %w", err) 601 } 602 if rows == 0 { 603 return aggregators.ErrAuthorizationNotFound 604 } 605 606 return nil 607} 608 609// ===== Authorization Query Operations ===== 610 611// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator 612func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) { 613 baseQuery := ` 614 SELECT 615 id, aggregator_did, community_did, enabled, config, 616 created_at, created_by, disabled_at, disabled_by, 617 indexed_at, record_uri, record_cid 618 FROM aggregator_authorizations 619 WHERE aggregator_did = $1` 620 621 var query string 622 var args []interface{} 623 624 if enabledOnly { 625 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC 
LIMIT $2 OFFSET $3` 626 args = []interface{}{aggregatorDID, limit, offset} 627 } else { 628 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3` 629 args = []interface{}{aggregatorDID, limit, offset} 630 } 631 632 rows, err := r.db.QueryContext(ctx, query, args...) 633 if err != nil { 634 return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err) 635 } 636 defer func() { _ = rows.Close() }() 637 638 return scanAuthorizations(rows) 639} 640 641// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community 642func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) { 643 baseQuery := ` 644 SELECT 645 id, aggregator_did, community_did, enabled, config, 646 created_at, created_by, disabled_at, disabled_by, 647 indexed_at, record_uri, record_cid 648 FROM aggregator_authorizations 649 WHERE community_did = $1` 650 651 var query string 652 var args []interface{} 653 654 if enabledOnly { 655 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3` 656 args = []interface{}{communityDID, limit, offset} 657 } else { 658 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3` 659 args = []interface{}{communityDID, limit, offset} 660 } 661 662 rows, err := r.db.QueryContext(ctx, query, args...) 
663 if err != nil { 664 return nil, fmt.Errorf("failed to list authorizations for community: %w", err) 665 } 666 defer func() { _ = rows.Close() }() 667 668 return scanAuthorizations(rows) 669} 670 671// IsAuthorized performs a fast authorization check (enabled=true) 672// Uses the optimized partial index: idx_aggregator_auth_enabled 673func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) { 674 query := ` 675 SELECT EXISTS( 676 SELECT 1 FROM aggregator_authorizations 677 WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true 678 )` 679 680 var authorized bool 681 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized) 682 if err != nil { 683 return false, fmt.Errorf("failed to check authorization: %w", err) 684 } 685 686 return authorized, nil 687} 688 689// ===== Post Tracking Operations ===== 690 691// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats) 692func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error { 693 query := ` 694 INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at) 695 VALUES ($1, $2, $3, $4, NOW())` 696 697 _, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID) 698 if err != nil { 699 return fmt.Errorf("failed to record aggregator post: %w", err) 700 } 701 702 return nil 703} 704 705// CountRecentPosts counts posts created by an aggregator in a community since a given time 706// Uses the optimized index: idx_aggregator_posts_rate_limit 707func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) { 708 query := ` 709 SELECT COUNT(*) 710 FROM aggregator_posts 711 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3` 712 713 var count int 714 err := 
r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count) 715 if err != nil { 716 return 0, fmt.Errorf("failed to count recent posts: %w", err) 717 } 718 719 return count, nil 720} 721 722// GetRecentPosts retrieves recent posts created by an aggregator in a community 723func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) { 724 query := ` 725 SELECT id, aggregator_did, community_did, post_uri, created_at 726 FROM aggregator_posts 727 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3 728 ORDER BY created_at DESC` 729 730 rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since) 731 if err != nil { 732 return nil, fmt.Errorf("failed to get recent posts: %w", err) 733 } 734 defer func() { _ = rows.Close() }() 735 736 var posts []*aggregators.AggregatorPost 737 for rows.Next() { 738 post := &aggregators.AggregatorPost{} 739 err := rows.Scan( 740 &post.ID, 741 &post.AggregatorDID, 742 &post.CommunityDID, 743 &post.PostURI, 744 &post.CreatedAt, 745 ) 746 if err != nil { 747 return nil, fmt.Errorf("failed to scan aggregator post: %w", err) 748 } 749 posts = append(posts, post) 750 } 751 752 if err = rows.Err(); err != nil { 753 return nil, fmt.Errorf("error iterating aggregator posts: %w", err) 754 } 755 756 return posts, nil 757} 758 759// ===== Helper Functions ===== 760 761// scanAuthorizations is a helper to scan multiple authorization rows 762func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) { 763 var auths []*aggregators.Authorization 764 765 for rows.Next() { 766 auth := &aggregators.Authorization{} 767 var config []byte 768 var createdBy, disabledBy, recordURI, recordCID sql.NullString 769 var disabledAt sql.NullTime 770 771 err := rows.Scan( 772 &auth.ID, 773 &auth.AggregatorDID, 774 &auth.CommunityDID, 775 &auth.Enabled, 776 &config, 777 &auth.CreatedAt, 778 
&createdBy, 779 &disabledAt, 780 &disabledBy, 781 &auth.IndexedAt, 782 &recordURI, 783 &recordCID, 784 ) 785 if err != nil { 786 return nil, fmt.Errorf("failed to scan authorization: %w", err) 787 } 788 789 // Map nullable fields 790 auth.CreatedBy = createdBy.String 791 auth.DisabledBy = disabledBy.String 792 if disabledAt.Valid { 793 disabledAtVal := disabledAt.Time 794 auth.DisabledAt = &disabledAtVal 795 } 796 auth.RecordURI = recordURI.String 797 auth.RecordCID = recordCID.String 798 if config != nil { 799 auth.Config = config 800 } 801 802 auths = append(auths, auth) 803 } 804 805 if err := rows.Err(); err != nil { 806 return nil, fmt.Errorf("error iterating authorizations: %w", err) 807 } 808 809 return auths, nil 810}