A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/aggregators" 5 "context" 6 "database/sql" 7 "fmt" 8 "strings" 9 "time" 10) 11 12type postgresAggregatorRepo struct { 13 db *sql.DB 14} 15 16// NewAggregatorRepository creates a new PostgreSQL aggregator repository 17func NewAggregatorRepository(db *sql.DB) aggregators.Repository { 18 return &postgresAggregatorRepo{db: db} 19} 20 21// ===== Aggregator CRUD Operations ===== 22 23// CreateAggregator indexes a new aggregator service declaration from the firehose 24func (r *postgresAggregatorRepo) CreateAggregator(ctx context.Context, agg *aggregators.Aggregator) error { 25 query := ` 26 INSERT INTO aggregators ( 27 did, display_name, description, avatar_url, config_schema, 28 maintainer_did, source_url, created_at, indexed_at, record_uri, record_cid 29 ) VALUES ( 30 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 31 ) 32 ON CONFLICT (did) DO UPDATE SET 33 display_name = EXCLUDED.display_name, 34 description = EXCLUDED.description, 35 avatar_url = EXCLUDED.avatar_url, 36 config_schema = EXCLUDED.config_schema, 37 maintainer_did = EXCLUDED.maintainer_did, 38 source_url = EXCLUDED.source_url, 39 created_at = EXCLUDED.created_at, 40 indexed_at = EXCLUDED.indexed_at, 41 record_uri = EXCLUDED.record_uri, 42 record_cid = EXCLUDED.record_cid` 43 44 var configSchema interface{} 45 if len(agg.ConfigSchema) > 0 { 46 configSchema = agg.ConfigSchema 47 } else { 48 configSchema = nil 49 } 50 51 _, err := r.db.ExecContext(ctx, query, 52 agg.DID, 53 agg.DisplayName, 54 nullString(agg.Description), 55 nullString(agg.AvatarURL), 56 configSchema, 57 nullString(agg.MaintainerDID), 58 nullString(agg.SourceURL), 59 agg.CreatedAt, 60 agg.IndexedAt, 61 nullString(agg.RecordURI), 62 nullString(agg.RecordCID), 63 ) 64 if err != nil { 65 return fmt.Errorf("failed to create aggregator: %w", err) 66 } 67 68 return nil 69} 70 71// GetAggregator retrieves an aggregator by DID 72func (r *postgresAggregatorRepo) GetAggregator(ctx context.Context, did string) (*aggregators.Aggregator, error) { 73 query := ` 74 SELECT 75 did, display_name, description, avatar_url, config_schema, 76 maintainer_did, source_url, communities_using, posts_created, 77 created_at, indexed_at, record_uri, record_cid 78 FROM aggregators 79 WHERE did = $1` 80 81 agg := &aggregators.Aggregator{} 82 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 83 var configSchema []byte 84 85 err := r.db.QueryRowContext(ctx, query, did).Scan( 86 &agg.DID, 87 &agg.DisplayName, 88 &description, 89 &avatarCID, 90 &configSchema, 91 &maintainerDID, 92 &homepageURL, 93 &agg.CommunitiesUsing, 94 &agg.PostsCreated, 95 &agg.CreatedAt, 96 &agg.IndexedAt, 97 &recordURI, 98 &recordCID, 99 ) 100 101 if err == sql.ErrNoRows { 102 return nil, aggregators.ErrAggregatorNotFound 103 } 104 if err != nil { 105 return nil, fmt.Errorf("failed to get aggregator: %w", err) 106 } 107 108 // Map nullable fields 109 agg.Description = description.String 110 agg.AvatarURL = avatarCID.String 111 agg.MaintainerDID = maintainerDID.String 112 agg.SourceURL = homepageURL.String 113 agg.RecordURI = recordURI.String 114 agg.RecordCID = recordCID.String 115 if configSchema != nil { 116 agg.ConfigSchema = configSchema 117 } 118 119 return agg, nil 120} 121 122// GetAggregatorsByDIDs retrieves multiple aggregators by DIDs in a single query (avoids N+1) 123func (r *postgresAggregatorRepo) GetAggregatorsByDIDs(ctx context.Context, dids []string) ([]*aggregators.Aggregator, error) { 124 if len(dids) == 0 { 125 return []*aggregators.Aggregator{}, nil 126 } 127 128 // Build IN clause with placeholders 129 placeholders := make([]string, len(dids)) 130 args := make([]interface{}, len(dids)) 131 for i, did := range dids { 132 placeholders[i] = fmt.Sprintf("$%d", i+1) 133 args[i] = did 134 } 135 136 query := fmt.Sprintf(` 137 SELECT 138 did, display_name, description, avatar_url, config_schema, 139 maintainer_did, source_url, communities_using, posts_created, 140 created_at, indexed_at, record_uri, record_cid 141 FROM aggregators 142 WHERE did IN (%s)`, strings.Join(placeholders, ", ")) 143 144 rows, err := r.db.QueryContext(ctx, query, args...) 145 if err != nil { 146 return nil, fmt.Errorf("failed to get aggregators: %w", err) 147 } 148 defer func() { _ = rows.Close() }() 149 150 var results []*aggregators.Aggregator 151 for rows.Next() { 152 agg := &aggregators.Aggregator{} 153 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 154 var configSchema []byte 155 156 err := rows.Scan( 157 &agg.DID, 158 &agg.DisplayName, 159 &description, 160 &avatarCID, 161 &configSchema, 162 &maintainerDID, 163 &homepageURL, 164 &agg.CommunitiesUsing, 165 &agg.PostsCreated, 166 &agg.CreatedAt, 167 &agg.IndexedAt, 168 &recordURI, 169 &recordCID, 170 ) 171 if err != nil { 172 return nil, fmt.Errorf("failed to scan aggregator: %w", err) 173 } 174 175 // Map nullable fields 176 agg.Description = description.String 177 agg.AvatarURL = avatarCID.String 178 agg.MaintainerDID = maintainerDID.String 179 agg.SourceURL = homepageURL.String 180 agg.RecordURI = recordURI.String 181 agg.RecordCID = recordCID.String 182 if configSchema != nil { 183 agg.ConfigSchema = configSchema 184 } 185 186 results = append(results, agg) 187 } 188 189 if err = rows.Err(); err != nil { 190 return nil, fmt.Errorf("error iterating aggregators: %w", err) 191 } 192 193 return results, nil 194} 195 196// UpdateAggregator updates an existing aggregator 197func (r *postgresAggregatorRepo) UpdateAggregator(ctx context.Context, agg *aggregators.Aggregator) error { 198 query := ` 199 UPDATE aggregators SET 200 display_name = $2, 201 description = $3, 202 avatar_url = $4, 203 config_schema = $5, 204 maintainer_did = $6, 205 source_url = $7, 206 created_at = $8, 207 indexed_at = $9, 208 record_uri = $10, 209 record_cid = $11 210 WHERE did = $1` 211 212 var configSchema interface{} 213 if len(agg.ConfigSchema) > 0 { 214 configSchema = agg.ConfigSchema 215 } else { 216 configSchema = nil 217 } 218 219 result, err := r.db.ExecContext(ctx, query, 220 agg.DID, 221 agg.DisplayName, 222 nullString(agg.Description), 223 nullString(agg.AvatarURL), 224 configSchema, 225 nullString(agg.MaintainerDID), 226 nullString(agg.SourceURL), 227 agg.CreatedAt, 228 agg.IndexedAt, 229 nullString(agg.RecordURI), 230 nullString(agg.RecordCID), 231 ) 232 if err != nil { 233 return fmt.Errorf("failed to update aggregator: %w", err) 234 } 235 236 rows, err := result.RowsAffected() 237 if err != nil { 238 return fmt.Errorf("failed to get rows affected: %w", err) 239 } 240 if rows == 0 { 241 return aggregators.ErrAggregatorNotFound 242 } 243 244 return nil 245} 246 247// DeleteAggregator removes an aggregator (cascade deletes authorizations and posts via FK) 248func (r *postgresAggregatorRepo) DeleteAggregator(ctx context.Context, did string) error { 249 query := `DELETE FROM aggregators WHERE did = $1` 250 251 result, err := r.db.ExecContext(ctx, query, did) 252 if err != nil { 253 return fmt.Errorf("failed to delete aggregator: %w", err) 254 } 255 256 rows, err := result.RowsAffected() 257 if err != nil { 258 return fmt.Errorf("failed to get rows affected: %w", err) 259 } 260 if rows == 0 { 261 return aggregators.ErrAggregatorNotFound 262 } 263 264 return nil 265} 266 267// ListAggregators retrieves all aggregators with pagination 268func (r *postgresAggregatorRepo) ListAggregators(ctx context.Context, limit, offset int) ([]*aggregators.Aggregator, error) { 269 query := ` 270 SELECT 271 did, display_name, description, avatar_url, config_schema, 272 maintainer_did, source_url, communities_using, posts_created, 273 created_at, indexed_at, record_uri, record_cid 274 FROM aggregators 275 ORDER BY communities_using DESC, display_name ASC 276 LIMIT $1 OFFSET $2` 277 278 rows, err := r.db.QueryContext(ctx, query, limit, offset) 279 if err != nil { 280 return nil, fmt.Errorf("failed to list aggregators: %w", err) 281 } 282 defer func() { _ = rows.Close() }() 283 284 var aggs []*aggregators.Aggregator 285 for rows.Next() { 286 agg := &aggregators.Aggregator{} 287 var description, avatarCID, maintainerDID, homepageURL, recordURI, recordCID sql.NullString 288 var configSchema []byte 289 290 err := rows.Scan( 291 &agg.DID, 292 &agg.DisplayName, 293 &description, 294 &avatarCID, 295 &configSchema, 296 &maintainerDID, 297 &homepageURL, 298 &agg.CommunitiesUsing, 299 &agg.PostsCreated, 300 &agg.CreatedAt, 301 &agg.IndexedAt, 302 &recordURI, 303 &recordCID, 304 ) 305 if err != nil { 306 return nil, fmt.Errorf("failed to scan aggregator: %w", err) 307 } 308 309 // Map nullable fields 310 agg.Description = description.String 311 agg.AvatarURL = avatarCID.String 312 agg.MaintainerDID = maintainerDID.String 313 agg.SourceURL = homepageURL.String 314 agg.RecordURI = recordURI.String 315 agg.RecordCID = recordCID.String 316 if configSchema != nil { 317 agg.ConfigSchema = configSchema 318 } 319 320 aggs = append(aggs, agg) 321 } 322 323 if err = rows.Err(); err != nil { 324 return nil, fmt.Errorf("error iterating aggregators: %w", err) 325 } 326 327 return aggs, nil 328} 329 330// IsAggregator performs a fast existence check for post creation handler 331func (r *postgresAggregatorRepo) IsAggregator(ctx context.Context, did string) (bool, error) { 332 query := `SELECT EXISTS(SELECT 1 FROM aggregators WHERE did = $1)` 333 334 var exists bool 335 err := r.db.QueryRowContext(ctx, query, did).Scan(&exists) 336 if err != nil { 337 return false, fmt.Errorf("failed to check if aggregator exists: %w", err) 338 } 339 340 return exists, nil 341} 342 343// ===== Authorization CRUD Operations ===== 344 345// CreateAuthorization indexes a new authorization from the firehose 346func (r *postgresAggregatorRepo) CreateAuthorization(ctx context.Context, auth *aggregators.Authorization) error { 347 query := ` 348 INSERT INTO aggregator_authorizations ( 349 aggregator_did, community_did, enabled, config, 350 created_at, created_by, disabled_at, disabled_by, 351 indexed_at, record_uri, record_cid 352 ) VALUES ( 353 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 354 ) 355 ON CONFLICT (aggregator_did, community_did) DO UPDATE SET 356 enabled = EXCLUDED.enabled, 357 config = EXCLUDED.config, 358 created_at = EXCLUDED.created_at, 359 created_by = EXCLUDED.created_by, 360 disabled_at = EXCLUDED.disabled_at, 361 disabled_by = EXCLUDED.disabled_by, 362 indexed_at = EXCLUDED.indexed_at, 363 record_uri = EXCLUDED.record_uri, 364 record_cid = EXCLUDED.record_cid 365 RETURNING id` 366 367 var config interface{} 368 if len(auth.Config) > 0 { 369 config = auth.Config 370 } else { 371 config = nil 372 } 373 374 var disabledAt interface{} 375 if auth.DisabledAt != nil { 376 disabledAt = *auth.DisabledAt 377 } else { 378 disabledAt = nil 379 } 380 381 err := r.db.QueryRowContext(ctx, query, 382 auth.AggregatorDID, 383 auth.CommunityDID, 384 auth.Enabled, 385 config, 386 auth.CreatedAt, 387 auth.CreatedBy, // Required field, no nullString needed 388 disabledAt, 389 nullString(auth.DisabledBy), 390 auth.IndexedAt, 391 nullString(auth.RecordURI), 392 nullString(auth.RecordCID), 393 ).Scan(&auth.ID) 394 if err != nil { 395 // Check for foreign key violations 396 if strings.Contains(err.Error(), "fk_aggregator") { 397 return aggregators.ErrAggregatorNotFound 398 } 399 return fmt.Errorf("failed to create authorization: %w", err) 400 } 401 402 return nil 403} 404 405// GetAuthorization retrieves an authorization by aggregator and community DID 406func (r *postgresAggregatorRepo) GetAuthorization(ctx context.Context, aggregatorDID, communityDID string) (*aggregators.Authorization, error) { 407 query := ` 408 SELECT 409 id, aggregator_did, community_did, enabled, config, 410 created_at, created_by, disabled_at, disabled_by, 411 indexed_at, record_uri, record_cid 412 FROM aggregator_authorizations 413 WHERE aggregator_did = $1 AND community_did = $2` 414 415 auth := &aggregators.Authorization{} 416 var config []byte 417 var createdBy, disabledBy, recordURI, recordCID sql.NullString 418 var disabledAt sql.NullTime 419 420 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan( 421 &auth.ID, 422 &auth.AggregatorDID, 423 &auth.CommunityDID, 424 &auth.Enabled, 425 &config, 426 &auth.CreatedAt, 427 &createdBy, 428 &disabledAt, 429 &disabledBy, 430 &auth.IndexedAt, 431 &recordURI, 432 &recordCID, 433 ) 434 435 if err == sql.ErrNoRows { 436 return nil, aggregators.ErrAuthorizationNotFound 437 } 438 if err != nil { 439 return nil, fmt.Errorf("failed to get authorization: %w", err) 440 } 441 442 // Map nullable fields 443 auth.CreatedBy = createdBy.String 444 auth.DisabledBy = disabledBy.String 445 if disabledAt.Valid { 446 disabledAtVal := disabledAt.Time 447 auth.DisabledAt = &disabledAtVal 448 } 449 auth.RecordURI = recordURI.String 450 auth.RecordCID = recordCID.String 451 if config != nil { 452 auth.Config = config 453 } 454 455 return auth, nil 456} 457 458// GetAuthorizationByURI retrieves an authorization by record URI (for Jetstream delete operations) 459func (r *postgresAggregatorRepo) GetAuthorizationByURI(ctx context.Context, recordURI string) (*aggregators.Authorization, error) { 460 query := ` 461 SELECT 462 id, aggregator_did, community_did, enabled, config, 463 created_at, created_by, disabled_at, disabled_by, 464 indexed_at, record_uri, record_cid 465 FROM aggregator_authorizations 466 WHERE record_uri = $1` 467 468 auth := &aggregators.Authorization{} 469 var config []byte 470 var createdBy, disabledBy, recordURIField, recordCID sql.NullString 471 var disabledAt sql.NullTime 472 473 err := r.db.QueryRowContext(ctx, query, recordURI).Scan( 474 &auth.ID, 475 &auth.AggregatorDID, 476 &auth.CommunityDID, 477 &auth.Enabled, 478 &config, 479 &auth.CreatedAt, 480 &createdBy, 481 &disabledAt, 482 &disabledBy, 483 &auth.IndexedAt, 484 &recordURIField, 485 &recordCID, 486 ) 487 488 if err == sql.ErrNoRows { 489 return nil, aggregators.ErrAuthorizationNotFound 490 } 491 if err != nil { 492 return nil, fmt.Errorf("failed to get authorization by URI: %w", err) 493 } 494 495 // Map nullable fields 496 auth.CreatedBy = createdBy.String 497 auth.DisabledBy = disabledBy.String 498 if disabledAt.Valid { 499 disabledAtVal := disabledAt.Time 500 auth.DisabledAt = &disabledAtVal 501 } 502 auth.RecordURI = recordURIField.String 503 auth.RecordCID = recordCID.String 504 if config != nil { 505 auth.Config = config 506 } 507 508 return auth, nil 509} 510 511// UpdateAuthorization updates an existing authorization 512func (r *postgresAggregatorRepo) UpdateAuthorization(ctx context.Context, auth *aggregators.Authorization) error { 513 query := ` 514 UPDATE aggregator_authorizations SET 515 enabled = $3, 516 config = $4, 517 created_at = $5, 518 created_by = $6, 519 disabled_at = $7, 520 disabled_by = $8, 521 indexed_at = $9, 522 record_uri = $10, 523 record_cid = $11 524 WHERE aggregator_did = $1 AND community_did = $2` 525 526 var config interface{} 527 if len(auth.Config) > 0 { 528 config = auth.Config 529 } else { 530 config = nil 531 } 532 533 var disabledAt interface{} 534 if auth.DisabledAt != nil { 535 disabledAt = *auth.DisabledAt 536 } else { 537 disabledAt = nil 538 } 539 540 result, err := r.db.ExecContext(ctx, query, 541 auth.AggregatorDID, 542 auth.CommunityDID, 543 auth.Enabled, 544 config, 545 auth.CreatedAt, 546 nullString(auth.CreatedBy), 547 disabledAt, 548 nullString(auth.DisabledBy), 549 auth.IndexedAt, 550 nullString(auth.RecordURI), 551 nullString(auth.RecordCID), 552 ) 553 if err != nil { 554 return fmt.Errorf("failed to update authorization: %w", err) 555 } 556 557 rows, err := result.RowsAffected() 558 if err != nil { 559 return fmt.Errorf("failed to get rows affected: %w", err) 560 } 561 if rows == 0 { 562 return aggregators.ErrAuthorizationNotFound 563 } 564 565 return nil 566} 567 568// DeleteAuthorization removes an authorization 569func (r *postgresAggregatorRepo) DeleteAuthorization(ctx context.Context, aggregatorDID, communityDID string) error { 570 query := `DELETE FROM aggregator_authorizations WHERE aggregator_did = $1 AND community_did = $2` 571 572 result, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID) 573 if err != nil { 574 return fmt.Errorf("failed to delete authorization: %w", err) 575 } 576 577 rows, err := result.RowsAffected() 578 if err != nil { 579 return fmt.Errorf("failed to get rows affected: %w", err) 580 } 581 if rows == 0 { 582 return aggregators.ErrAuthorizationNotFound 583 } 584 585 return nil 586} 587 588// DeleteAuthorizationByURI removes an authorization by record URI (for Jetstream delete operations) 589func (r *postgresAggregatorRepo) DeleteAuthorizationByURI(ctx context.Context, recordURI string) error { 590 query := `DELETE FROM aggregator_authorizations WHERE record_uri = $1` 591 592 result, err := r.db.ExecContext(ctx, query, recordURI) 593 if err != nil { 594 return fmt.Errorf("failed to delete authorization by URI: %w", err) 595 } 596 597 rows, err := result.RowsAffected() 598 if err != nil { 599 return fmt.Errorf("failed to get rows affected: %w", err) 600 } 601 if rows == 0 { 602 return aggregators.ErrAuthorizationNotFound 603 } 604 605 return nil 606} 607 608// ===== Authorization Query Operations ===== 609 610// ListAuthorizationsForAggregator retrieves all communities that authorized an aggregator 611func (r *postgresAggregatorRepo) ListAuthorizationsForAggregator(ctx context.Context, aggregatorDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) { 612 baseQuery := ` 613 SELECT 614 id, aggregator_did, community_did, enabled, config, 615 created_at, created_by, disabled_at, disabled_by, 616 indexed_at, record_uri, record_cid 617 FROM aggregator_authorizations 618 WHERE aggregator_did = $1` 619 620 var query string 621 var args []interface{} 622 623 if enabledOnly { 624 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3` 625 args = []interface{}{aggregatorDID, limit, offset} 626 } else { 627 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3` 628 args = []interface{}{aggregatorDID, limit, offset} 629 } 630 631 rows, err := r.db.QueryContext(ctx, query, args...) 632 if err != nil { 633 return nil, fmt.Errorf("failed to list authorizations for aggregator: %w", err) 634 } 635 defer func() { _ = rows.Close() }() 636 637 return scanAuthorizations(rows) 638} 639 640// ListAuthorizationsForCommunity retrieves all aggregators authorized by a community 641func (r *postgresAggregatorRepo) ListAuthorizationsForCommunity(ctx context.Context, communityDID string, enabledOnly bool, limit, offset int) ([]*aggregators.Authorization, error) { 642 baseQuery := ` 643 SELECT 644 id, aggregator_did, community_did, enabled, config, 645 created_at, created_by, disabled_at, disabled_by, 646 indexed_at, record_uri, record_cid 647 FROM aggregator_authorizations 648 WHERE community_did = $1` 649 650 var query string 651 var args []interface{} 652 653 if enabledOnly { 654 query = baseQuery + ` AND enabled = true ORDER BY created_at DESC LIMIT $2 OFFSET $3` 655 args = []interface{}{communityDID, limit, offset} 656 } else { 657 query = baseQuery + ` ORDER BY created_at DESC LIMIT $2 OFFSET $3` 658 args = []interface{}{communityDID, limit, offset} 659 } 660 661 rows, err := r.db.QueryContext(ctx, query, args...) 662 if err != nil { 663 return nil, fmt.Errorf("failed to list authorizations for community: %w", err) 664 } 665 defer func() { _ = rows.Close() }() 666 667 return scanAuthorizations(rows) 668} 669 670// IsAuthorized performs a fast authorization check (enabled=true) 671// Uses the optimized partial index: idx_aggregator_auth_enabled 672func (r *postgresAggregatorRepo) IsAuthorized(ctx context.Context, aggregatorDID, communityDID string) (bool, error) { 673 query := ` 674 SELECT EXISTS( 675 SELECT 1 FROM aggregator_authorizations 676 WHERE aggregator_did = $1 AND community_did = $2 AND enabled = true 677 )` 678 679 var authorized bool 680 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID).Scan(&authorized) 681 if err != nil { 682 return false, fmt.Errorf("failed to check authorization: %w", err) 683 } 684 685 return authorized, nil 686} 687 688// ===== Post Tracking Operations ===== 689 690// RecordAggregatorPost tracks a post created by an aggregator (for rate limiting and stats) 691func (r *postgresAggregatorRepo) RecordAggregatorPost(ctx context.Context, aggregatorDID, communityDID, postURI, postCID string) error { 692 query := ` 693 INSERT INTO aggregator_posts (aggregator_did, community_did, post_uri, post_cid, created_at) 694 VALUES ($1, $2, $3, $4, NOW())` 695 696 _, err := r.db.ExecContext(ctx, query, aggregatorDID, communityDID, postURI, postCID) 697 if err != nil { 698 return fmt.Errorf("failed to record aggregator post: %w", err) 699 } 700 701 return nil 702} 703 704// CountRecentPosts counts posts created by an aggregator in a community since a given time 705// Uses the optimized index: idx_aggregator_posts_rate_limit 706func (r *postgresAggregatorRepo) CountRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) (int, error) { 707 query := ` 708 SELECT COUNT(*) 709 FROM aggregator_posts 710 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3` 711 712 var count int 713 err := r.db.QueryRowContext(ctx, query, aggregatorDID, communityDID, since).Scan(&count) 714 if err != nil { 715 return 0, fmt.Errorf("failed to count recent posts: %w", err) 716 } 717 718 return count, nil 719} 720 721// GetRecentPosts retrieves recent posts created by an aggregator in a community 722func (r *postgresAggregatorRepo) GetRecentPosts(ctx context.Context, aggregatorDID, communityDID string, since time.Time) ([]*aggregators.AggregatorPost, error) { 723 query := ` 724 SELECT id, aggregator_did, community_did, post_uri, created_at 725 FROM aggregator_posts 726 WHERE aggregator_did = $1 AND community_did = $2 AND created_at >= $3 727 ORDER BY created_at DESC` 728 729 rows, err := r.db.QueryContext(ctx, query, aggregatorDID, communityDID, since) 730 if err != nil { 731 return nil, fmt.Errorf("failed to get recent posts: %w", err) 732 } 733 defer func() { _ = rows.Close() }() 734 735 var posts []*aggregators.AggregatorPost 736 for rows.Next() { 737 post := &aggregators.AggregatorPost{} 738 err := rows.Scan( 739 &post.ID, 740 &post.AggregatorDID, 741 &post.CommunityDID, 742 &post.PostURI, 743 &post.CreatedAt, 744 ) 745 if err != nil { 746 return nil, fmt.Errorf("failed to scan aggregator post: %w", err) 747 } 748 posts = append(posts, post) 749 } 750 751 if err = rows.Err(); err != nil { 752 return nil, fmt.Errorf("error iterating aggregator posts: %w", err) 753 } 754 755 return posts, nil 756} 757 758// ===== Helper Functions ===== 759 760// scanAuthorizations is a helper to scan multiple authorization rows 761func scanAuthorizations(rows *sql.Rows) ([]*aggregators.Authorization, error) { 762 var auths []*aggregators.Authorization 763 764 for rows.Next() { 765 auth := &aggregators.Authorization{} 766 var config []byte 767 var createdBy, disabledBy, recordURI, recordCID sql.NullString 768 var disabledAt sql.NullTime 769 770 err := rows.Scan( 771 &auth.ID, 772 &auth.AggregatorDID, 773 &auth.CommunityDID, 774 &auth.Enabled, 775 &config, 776 &auth.CreatedAt, 777 &createdBy, 778 &disabledAt, 779 &disabledBy, 780 &auth.IndexedAt, 781 &recordURI, 782 &recordCID, 783 ) 784 if err != nil { 785 return nil, fmt.Errorf("failed to scan authorization: %w", err) 786 } 787 788 // Map nullable fields 789 auth.CreatedBy = createdBy.String 790 auth.DisabledBy = disabledBy.String 791 if disabledAt.Valid { 792 disabledAtVal := disabledAt.Time 793 auth.DisabledAt = &disabledAtVal 794 } 795 auth.RecordURI = recordURI.String 796 auth.RecordCID = recordCID.String 797 if config != nil { 798 auth.Config = config 799 } 800 801 auths = append(auths, auth) 802 } 803 804 if err := rows.Err(); err != nil { 805 return nil, fmt.Errorf("error iterating authorizations: %w", err) 806 } 807 808 return auths, nil 809}