···
···
// Block exists in our index - return it
return existingBlock, nil
568
-
// Race condition: PDS has the block but Jetstream hasn't indexed it yet
569
-
// Return typed conflict error so handler can return 409 instead of 500
570
-
// This is normal in eventually-consistent systems
571
-
return nil, ErrBlockAlreadyExists
569
+
// Only treat as "already exists" if the error is ErrBlockNotFound (race condition)
570
+
// Any other error (DB outage, connection failure, etc.) should bubble up
571
+
if errors.Is(getErr, ErrBlockNotFound) {
572
+
// Race condition: PDS has the block but Jetstream hasn't indexed it yet
573
+
// Return typed conflict error so handler can return 409 instead of 500
574
+
// This is normal in eventually-consistent systems
575
+
return nil, ErrBlockAlreadyExists
577
+
// Real datastore error - bubble it up so operators see the failure
578
+
return nil, fmt.Errorf("PDS reported duplicate block but failed to fetch from index: %w", getErr)
return nil, fmt.Errorf("failed to create block on PDS: %w", err)
···
// PDS write-forward helpers
727
-
func (s *communityService) createRecordOnPDS(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}) (string, string, error) {
728
-
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
730
-
payload := map[string]interface{}{
732
-
"collection": collection,
737
-
payload["rkey"] = rkey
740
-
return s.callPDS(ctx, "POST", endpoint, payload)
// createRecordOnPDSAs creates a record with a specific access token (for V2 community auth)
func (s *communityService) createRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, record map[string]interface{}, accessToken string) (string, string, error) {
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", strings.TrimSuffix(s.pdsURL, "/"))
···
return s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
774
-
func (s *communityService) deleteRecordOnPDS(ctx context.Context, repoDID, collection, rkey string) error {
775
-
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
777
-
payload := map[string]interface{}{
779
-
"collection": collection,
783
-
_, _, err := s.callPDS(ctx, "POST", endpoint, payload)
// deleteRecordOnPDSAs deletes a record with a specific access token (for user-scoped deletions)
788
-
func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey string, accessToken string) error {
766
+
func (s *communityService) deleteRecordOnPDSAs(ctx context.Context, repoDID, collection, rkey, accessToken string) error {
endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", strings.TrimSuffix(s.pdsURL, "/"))
payload := map[string]interface{}{
···
_, _, err := s.callPDSWithAuth(ctx, "POST", endpoint, payload, accessToken)
801
-
func (s *communityService) callPDS(ctx context.Context, method, endpoint string, payload map[string]interface{}) (string, string, error) {
802
-
// Use instance's access token
803
-
return s.callPDSWithAuth(ctx, method, endpoint, payload, s.pdsAccessToken)
// callPDSWithAuth makes a PDS call with a specific access token (V2: for community authentication)
···