A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/aggregators" 5 "context" 6 "database/sql" 7 "fmt" 8 "strings" 9 "time" 10) 11 12type postgresAggregatorRepo struct { 13 db *sql.DB 14} 15 16// NewAggregatorRepository creates a new PostgreSQL aggregator repository 17func NewAggregatorRepository(db *sql.DB) aggregators.Repository { 18 return &postgresAggregatorRepo{db: db} 19} 20 21// ===== Aggregator CRUD Operations ===== 22 23// CreateAggregator indexes a new aggregator service declaration from the firehose 24func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error { 25 query := ` 26 INSERT INTO aggregators ( 27 did, display_name, description, avatar_url, config_schema, 28 maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid 29 ) VALUES ( 30 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 31 ) 32 ON CONFLICT (did) DO UPDATE SET 33 display_name = EXCLUDED.display_name, 34 description = EXCLUDED.description, 35 avatar_url = EXCLUDED.avatar_url, 36 config_schema = EXCLUDED.config_schema, 37 maintainer_did = EXCLUDED.maintainer_did, 38 source_url = EXCLUDED.source_url, 39 created_at = EXCLUDED.created_at, 40 indexed_at = EXCLUDED.indexed_at, 41 record_uri = EXCLUDED.record_uri, 42 record_cid = EXCLUDED.record_cid` 43 44 var configSchema interface{} 45 if len(agg.ConfigSchema) > 0 { 46 configSchema = agg.ConfigSchema 47 } else { 48 configSchema = nil 49 } 50 51 _, err := r.db.ExecContext(ctx, query, 52 agg.DID, 53 agg.DisplayName, 54 nullString(agg.Description), 55 nullString(agg.AvatarURL), 56 configSchema, 57 nullString(agg.MaintainerDID), 58 nullString(agg.SourceURL), 59 agg.CreatedAt, 60 agg.IndexedAt, 61 nullString(agg.RecordURI), 62 nullString(agg.RecordCID), 63 ) 64 65 if err != nil { 66 return fmt.Errorf("failed to create aggregator: %w", err) 67 } 68 69 return nil 70} 71 72// GetAggregator retrieves an aggregator by DID 73func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, did string) (*aggregators.Aggregator, error) { 74 query := ` 75 SELECT 76 did, display_name, description, avatar_url, config_schema, 77 maintainer_did, source_url, communities_using, posts_created, 78 created_at, indexed_at, record_uri, record_cid 79 FROM aggregators 80 WHERE did = $1` 81 82 agg := &aggregators.Aggregator{} 83 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 84 var configSchema []byte 85 86 err := r.db.QueryRowContext(ctx, query, did).Scan( 87 &agg.DID, 88 &agg.DisplayName, 89 &description, 90 &avatarCID, 91 &configSchema, 92 &maintainerDID, 93 &homepageURL, 94 &agg.CommunitiesUsing, 95 &agg.PostsCreated, 96 &agg.CreatedAt, 97 &agg.IndexedAt, 98 &recordURI, 99 &recordCID, 100 ) 101 102 if err == sql.ErrNoRows { 103 return nil, aggregators.ErrAggregatorNotFound 104 } 105 if err != nil { 106 return nil, fmt.Errorf("failed to get aggregator: %w", err) 107 } 108 109 // Map nullable fields 110 agg.Description = description.String 111 agg.AvatarURL = avatarCID.String 112 agg.MaintainerDID = maintainerDID.String 113 agg.SourceURL = homepageURL.String 114 agg.RecordURI = recordURI.String 115 agg.RecordCID = recordCID.String 116 if configSchema != nil { 117 agg.ConfigSchema = configSchema 118 } 119 120 return agg, nil 121} 122 123// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1) 124func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) { 125 if len(dids) == 0 { 126 return []*aggregators.Aggregator{}, nil 127 } 128 129 // Build IN clause with placeholders 130 placeholders := make([]string, len(dids)) 131 args := make([]interface{}, len(dids)) 132 for i, did := range dids { 133 placeholders[i] = fmt.Sprintf("$%d", i+1) 134 args[i] = did 135 } 136 137 query := fmt.Sprintf(` 138 SELECT 139 did, display_name, description, avatar_url, config_schema, 140 maintainer_did, source_url, communities_using, posts_created, 141 created_at, indexed_at, record_uri, record_cid 142 FROM aggregators 143 WHERE did IN (%s)`, strings.Join(placeholders, ", ")) 144 145 rows, err := r.db.QueryContext(ctx, query, args...) 146 if err != nil { 147 return nil, fmt.Errorf("failed to get aggregators: %w", err) 148 } 149 defer rows.Close() 150 151 var results []*aggregators.Aggregator 152 for rows.Next() { 153 agg := &aggregators.Aggregator{} 154 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 155 var configSchema []byte 156 157 err := rows.Scan( 158 &agg.DID, 159 &agg.DisplayName, 160 &description, 161 &avatarCID, 162 &configSchema, 163 &maintainerDID, 164 &homepageURL, 165 &agg.CommunitiesUsing, 166 &agg.PostsCreated, 167 &agg.CreatedAt, 168 &agg.IndexedAt, 169 &recordURI, 170 &recordCID, 171 ) 172 if err != nil { 173 return nil, fmt.Errorf("failed to scan aggregator: %w", err) 174 } 175 176 // Map nullable fields 177 agg.Description = description.String 178 agg.AvatarURL = avatarCID.String 179 agg.MaintainerDID = maintainerDID.String 180 agg.SourceURL = homepageURL.String 181 agg.RecordURI = recordURI.String 182 agg.RecordCID = recordCID.String 183 if configSchema != nil { 184 agg.ConfigSchema = configSchema 185 } 186 187 results = append(results, agg) 188 } 189 190 if err = rows.Err(); err != nil { 191 return nil, fmt.Errorf("error iterating aggregators: %w", err) 192 } 193 194 return results, nil 195} 196 197// UpdateAggregator updates an existing aggregator 198func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error { 199 query := ` 200 UPDATE aggregators SET 201 display_name = $2, 202 description = $3, 203 avatar_url = $4, 204 config_schema = $5, 205 maintainer_did = $6, 206 source_url = $7, 207 created_at = $8, 208 indexed_at = $9, 209 record_uri = $10, 210 record_cid = $11 211 WHERE did = $1` 212 213 var configSchema interface{} 214 if len(agg.ConfigSchema) > 0 { 215 configSchema = agg.ConfigSchema 216 } else { 217 configSchema = nil 218 } 219 220 result, err := r.db.ExecContext(ctx, query, 221 agg.DID, 222 agg.DisplayName, 223 nullString(agg.Description), 224 nullString(agg.AvatarURL), 225 configSchema, 226 nullString(agg.MaintainerDID), 227 nullString(agg.SourceURL), 228 agg.CreatedAt, 229 agg.IndexedAt, 230 nullString(agg.RecordURI), 231 nullString(agg.RecordCID), 232 ) 233 234 if err != nil { 235 return fmt.Errorf("failed to update aggregator: %w", err) 236 } 237 238 rows, err := result.RowsAffected() 239 if err != nil { 240 return fmt.Errorf("failed to get rows affected: %w", err) 241 } 242 if rows == 0 { 243 return aggregators.ErrAggregatorNotFound 244 } 245 246 return nil 247} 248 249// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK) 250func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error { 251 query := `DELETE FROM aggregators WHERE did = $1` 252 253 result, err := r.db.ExecContext(ctx, query, did) 254 if err != nil { 255 return fmt.Errorf("failed to delete aggregator: %w", err) 256 } 257 258 rows, err := result.RowsAffected() 259 if err != nil { 260 return fmt.Errorf("failed to get rows affected: %w", err) 261 } 262 if rows == 0 { 263 return aggregators.ErrAggregatorNotFound 264 } 265 266 return nil 267} 268 269// ListAggregators retrieves all aggregators with pagination 270func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) { 271 query := ` 272 SELECT 273 did, display_name, description, avatar_url, config_schema, 274 maintainer_did, source_url, communities_using, posts_created, 275 created_at, indexed_at, record_uri, record_cid 276 FROM aggregators 277 ORDER BY communities_using DESC, display_name ASC 278 LIMIT $1 OFFSET $2` 279 280 rows, err := r.db.QueryContext(ctx, query, limit, offset) 281 if err != nil { 282 return nil, fmt.Errorf("failed to list aggregators: %w", err) 283 } 284 defer rows.Close() 285 286 var aggs []*aggregators.Aggregator 287 for rows.Next() { 288 agg := &aggregators.Aggregator{} 289 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 290 var configSchema []byte 291 292 err := rows.Scan( 293 &agg.DID, 294 &agg.DisplayName, 295 &description, 296 &avatarCID, 297 &configSchema, 298 &maintainerDID, 299 &homepageURL, 300 &agg.CommunitiesUsing, 301 &agg.PostsCreated, 302 &agg.CreatedAt, 303 &agg.IndexedAt, 304 &recordURI, 305 &recordCID, 306 ) 307 if err != nil { 308 return nil, fmt.Errorf("failed to scan aggregator: %w", err) 309 } 310 311 // Map nullable fields 312 agg.Description = description.String 313 agg.AvatarURL = avatarCID.String 314 agg.MaintainerDID = maintainerDID.String 315 agg.SourceURL = homepageURL.String 316 agg.RecordURI = recordURI.String 317 agg.RecordCID = recordCID.String 318 if configSchema != nil { 319 agg.ConfigSchema = configSchema 320 } 321 322 aggs = append(aggs, agg) 323 } 324 325 if err = rows.Err(); err != nil { 326 return nil, fmt.Errorf("error iterating aggregators: %w", err) 327 } 328 329 return aggs, nil 330} 331 332// IsAggregator performs a fast existence check for post creation handler 333func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) { 334 query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)` 335 336 var exists bool 337 err := r.db.QueryRowContext(ctx, query, did).Scan(&exists) 338 if err != nil { 339 return false, fmt.Errorf("failed to check if aggregator exists: %w", err) 340 } 341 342 return exists, nil 343} 344 345// ===== Authorization CRUD Operations ===== 346 347// CreateAuthorization indexes a new authorization from the firehose 348func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error { 349 query := ` 350 INSERT INTO aggregator_authorizations ( 351 aggregator_did, community_did, enabled, config, 352 created_at, created_by, disabled_at, disabled_by, 353 indexed_at, record_uri, record_cid 354 ) VALUES ( 355 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 356 ) 357 ON CONFLICT (aggregator_did, community_did) DO UPDATE SET 358 enabled = EXCLUDED.enabled, 359 config = EXCLUDED.config, 360 created_at = EXCLUDED.created_at, 361 created_by = EXCLUDED.created_by, 362 disabled_at = EXCLUDED.disabled_at, 363 disabled_by = EXCLUDED.disabled_by, 364 indexed_at = EXCLUDED.indexed_at, 365 record_uri = EXCLUDED.record_uri, 366 record_cid = EXCLUDED.record_cid 367 RETURNING id` 368 369 var config interface{} 370 if len(auth.Config) > 0 { 371 config = auth.Config 372 } else { 373 config = nil 374 } 375 376 var disabledAt interface{} 377 if auth.DisabledAt != nil { 378 disabledAt = *auth.DisabledAt 379 } else { 380 disabledAt = nil 381 } 382 383 err := r.db.QueryRowContext(ctx, query, 384 auth.AggregatorDID, 385 auth.CommunityDID, 386 auth.Enabled, 387 config, 388 auth.CreatedAt, 389 auth.CreatedBy, // Required field, no nullString needed 390 disabledAt, 391 nullString(auth.DisabledBy), 392 auth.IndexedAt, 393 nullString(auth.RecordURI), 394 nullString(auth.RecordCID), 395 ).Scan(&auth.ID) 396 397 if err != nil { 398 // Check for foreign key violations 399 if strings.Contains(err.Error(), "fk_aggregator") { 400 return aggregators.ErrAggregatorNotFound 401 } 402 return fmt.Errorf("failed to create authorization: %w", err) 403 } 404 405 return nil 406} 407 408// GetAuthorization retrieves an authorization by aggregator and community DID 409func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) { 410 query := ` 411 SELECT 412 id, aggregator_did, community_did, enabled, config, 413 created_at, created_by, disabled_at, disabled_by, 414 indexed_at, record_uri, record_cid 415 FROM aggregator_authorizations 416 WHERE aggregator_did = $1 AND community_did = $2` 417 418 auth := &aggregators.Authorization{} 419 var config []byte 420 var createdBy, disabledBy, recordURI, recordCID sql.NullString 421 var disabledAt sql.NullTime 422 423 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan( 424 &auth.ID, 425 &auth.AggregatorDID, 426 &auth.CommunityDID, 427 &auth.Enabled, 428 &config, 429 &auth.CreatedAt, 430 &createdBy, 431 &disabledAt, 432 &disabledBy, 433 &auth.IndexedAt, 434 &recordURI, 435 &recordCID, 436 ) 437 438 if err == sql.ErrNoRows { 439 return nil, aggregators.ErrAuthorizationNotFound 440 } 441 if err != nil { 442 return nil, fmt.Errorf("failed to get authorization: %w", err) 443 } 444 445 // Map nullable fields 446 auth.CreatedBy = createdBy.String 447 auth.DisabledBy = disabledBy.String 448 if disabledAt.Valid { 449 disabledAtVal := disabledAt.Time 450 auth.DisabledAt = &disabledAtVal 451 } 452 auth.RecordURI = recordURI.String 453 auth.RecordCID = recordCID.String 454 if config != nil { 455 auth.Config = config 456 } 457 458 return auth, nil 459} 460 461// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations) 462func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) { 463 query := ` 464 SELECT 465 id, aggregator_did, community_did, enabled, config, 466 created_at, created_by, disabled_at, disabled_by, 467 indexed_at, record_uri, record_cid 468 FROM aggregator_authorizations 469 WHERE record_uri = $1` 470 471 auth := &aggregators.Authorization{} 472 var config []byte 473 var createdBy, disabledBy, recordURIField, recordCID sql.NullString 474 var disabledAt sql.NullTime 475 476 err := r.db.QueryRowContext(ctx, query, recordURI).Scan( 477 &auth.ID, 478 &auth.AggregatorDID, 479 &auth.CommunityDID, 480 &auth.Enabled, 481 &config, 482 &auth.CreatedAt, 483 &createdBy, 484 &disabledAt, 485 &disabledBy, 486 &auth.IndexedAt, 487 &recordURIField, 488 &recordCID, 489 ) 490 491 if err == sql.ErrNoRows { 492 return nil, aggregators.ErrAuthorizationNotFound 493 } 494 if err != nil { 495 return nil, fmt.Errorf("failed to get authorization by URI: %w", err) 496 } 497 498 // Map nullable fields 499 auth.CreatedBy = createdBy.String 500 auth.DisabledBy = disabledBy.String 501 if disabledAt.Valid { 502 disabledAtVal := disabledAt.Time 503 auth.DisabledAt = &disabledAtVal 504 } 505 auth.RecordURI = recordURIField.String 506 auth.RecordCID = recordCID.String 507 if config != nil { 508 auth.Config = config 509 } 510 511 return auth, nil 512} 513 514// UpdateAuthorization updates an existing authorization 515func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error { 516 query := ` 517 UPDATE aggregator_authorizations SET 518 enabled = $3, 519 config = $4, 520 created_at = $5, 521 created_by = $6, 522 disabled_at = $7, 523 disabled_by = $8, 524 indexed_at = $9, 525 record_uri = $10, 526 record_cid = $11 527 WHERE aggregator_did = $1 AND community_did = $2` 528 529 var config interface{} 530 if len(auth.Config) > 0 { 531 config = auth.Config 532 } else { 533 config = nil 534 } 535 536 var disabledAt interface{} 537 if auth.DisabledAt != nil { 538 disabledAt = *auth.DisabledAt 539 } else { 540 disabledAt = nil 541 } 542 543 result, err := r.db.ExecContext(ctx, query, 544 auth.AggregatorDID, 545 auth.CommunityDID, 546 auth.Enabled, 547 config, 548 auth.CreatedAt, 549 nullString(auth.CreatedBy), 550 disabledAt, 551 nullString(auth.DisabledBy), 552 auth.IndexedAt, 553 nullString(auth.RecordURI), 554 nullString(auth.RecordCID), 555 ) 556 557 if err != nil { 558 return fmt.Errorf("failed to update authorization: %w", err) 559 } 560 561 rows, err := result.RowsAffected() 562 if err != nil { 563 return fmt.Errorf("failed to get rows affected: %w", err) 564 } 565 if rows == 0 { 566 return aggregators.ErrAuthorizationNotFound 567 } 568 569 return nil 570} 571 572// DeleteAuthorization removes an authorization 573func (r *postgresAggregatorRepo) DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error { 574 query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2` 575 576 result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID) 577 if err != nil { 578 return fmt.Errorf("failed to delete authorization: %w", err) 579 } 580 581 rows, err := result.RowsAffected() 582 if err != nil { 583 return fmt.Errorf("failed to get rows affected: %w", err) 584 } 585 if rows == 0 { 586 return aggregators.ErrAuthorizationNotFound 587 } 588 589 return nil 590} 591 592// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations) 593func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error { 594 query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1` 595 596 result, err := r.db.ExecContext(ctx, query, recordURI) 597 if err != nil { 598 return fmt.Errorf("failed to delete authorization by URI: %w", err) 599 } 600 601 rows, err := result.RowsAffected() 602 if err != nil { 603 return fmt.Errorf("failed to get rows affected: %w", err) 604 } 605 if rows == 0 { 606 return aggregators.ErrAuthorizationNotFound 607 } 608 609 return nil 610} 611 612// ===== Authorization Query Operations ===== 613 614// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator 615func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) { 616 baseQuery := ` 617 SELECT 618 id, aggregator_did, community_did, enabled, config, 619 created_at, created_by, disabled_at, disabled_by, 620 indexed_at, record_uri, record_cid 621 FROM aggregator_authorizations 622 WHERE aggregator_did = $1` 623 624 var query string 625 var args []interface{} 626 627 if enabledOnly { 628 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3` 629 args = []interface{}{aggregatorDID, limit, offset} 630 } else { 631 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3` 632 args = []interface{}{aggregatorDID, limit, offset} 633 } 634 635 rows, err := r.db.QueryContext(ctx, query, args...) 636 if err != nil { 637 return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err) 638 } 639 defer rows.Close() 640 641 return scanAuthorizations(rows) 642} 643 644// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community 645func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) { 646 baseQuery := ` 647 SELECT 648 id, aggregator_did, community_did, enabled, config, 649 created_at, created_by, disabled_at, disabled_by, 650 indexed_at, record_uri, record_cid 651 FROM aggregator_authorizations 652 WHERE community_did = $1` 653 654 var query string 655 var args []interface{} 656 657 if enabledOnly { 658 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3` 659 args = []interface{}{communityDID, limit, offset} 660 } else { 661 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3` 662 args = []interface{}{communityDID, limit, offset} 663 } 664 665 rows, err := r.db.QueryContext(ctx, query, args...) 666 if err != nil { 667 return nil, fmt.Errorf("failed to list authorizations for community: %w", err) 668 } 669 defer rows.Close() 670 671 return scanAuthorizations(rows) 672} 673 674// IsAuthorized performs a fast authorization check (enabled=true) 675// Uses the optimized partial index: idx_aggregator_auth_enabled 676func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) { 677 query := ` 678 SELECT EXISTS( 679 SELECT 1 FROM aggregator_authorizations 680 WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true 681 )` 682 683 var authorized bool 684 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized) 685 if err != nil { 686 return false, fmt.Errorf("failed to check authorization: %w", err) 687 } 688 689 return authorized, nil 690} 691 692// ===== Post Tracking Operations ===== 693 694// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats) 695func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error { 696 query := ` 697 INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at) 698 VALUES ($1, $2, $3, $4, NOW())` 699 700 _, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID) 701 if err != nil { 702 return fmt.Errorf("failed to record aggregator post: %w", err) 703 } 704 705 return nil 706} 707 708// CountRecentPosts counts posts created by an aggregator in a community since a given time 709// Uses the optimized index: idx_aggregator_posts_rate_limit 710func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) { 711 query := ` 712 SELECT COUNT(*) 713 FROM aggregator_posts 714 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3` 715 716 var count int 717 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count) 718 if err != nil { 719 return 0, fmt.Errorf("failed to count recent posts: %w", err) 720 } 721 722 return count, nil 723} 724 725// GetRecentPosts retrieves recent posts created by an aggregator in a community 726func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) { 727 query := ` 728 SELECT id, aggregator_did, community_did, post_uri, created_at 729 FROM aggregator_posts 730 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3 731 ORDER BY created_at DESC` 732 733 rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since) 734 if err != nil { 735 return nil, fmt.Errorf("failed to get recent posts: %w", err) 736 } 737 defer rows.Close() 738 739 var posts []*aggregators.AggregatorPost 740 for rows.Next() { 741 post := &aggregators.AggregatorPost{} 742 err := rows.Scan( 743 &post.ID, 744 &post.AggregatorDID, 745 &post.CommunityDID, 746 &post.PostURI, 747 &post.CreatedAt, 748 ) 749 if err != nil { 750 return nil, fmt.Errorf("failed to scan aggregator post: %w", err) 751 } 752 posts = append(posts, post) 753 } 754 755 if err = rows.Err(); err != nil { 756 return nil, fmt.Errorf("error iterating aggregator posts: %w", err) 757 } 758 759 return posts, nil 760} 761 762// ===== Helper Functions ===== 763 764// scanAuthorizations is a helper to scan multiple authorization rows 765func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) { 766 var auths []*aggregators.Authorization 767 768 for rows.Next() { 769 auth := &aggregators.Authorization{} 770 var config []byte 771 var createdBy, disabledBy, recordURI, recordCID sql.NullString 772 var disabledAt sql.NullTime 773 774 err := rows.Scan( 775 &auth.ID, 776 &auth.AggregatorDID, 777 &auth.CommunityDID, 778 &auth.Enabled, 779 &config, 780 &auth.CreatedAt, 781 &createdBy, 782 &disabledAt, 783 &disabledBy, 784 &auth.IndexedAt, 785 &recordURI, 786 &recordCID, 787 ) 788 if err != nil { 789 return nil, fmt.Errorf("failed to scan authorization: %w", err) 790 } 791 792 // Map nullable fields 793 auth.CreatedBy = createdBy.String 794 auth.DisabledBy = disabledBy.String 795 if disabledAt.Valid { 796 disabledAtVal := disabledAt.Time 797 auth.DisabledAt = &disabledAtVal 798 } 799 auth.RecordURI = recordURI.String 800 auth.RecordCID = recordCID.String 801 if config != nil { 802 auth.Config = config 803 } 804 805 auths = append(auths, auth) 806 } 807 808 if err := rows.Err(); err != nil { 809 return nil, fmt.Errorf("error iterating authorizations: %w", err) 810 } 811 812 return auths, nil 813}