A community-based topic aggregation platform built on atproto
at main 7.4 kB view raw
1// cmd/reindex-votes/main.go 2// Quick tool to reindex votes from PDS to AppView database 3package main 4 5import ( 6 "context" 7 "database/sql" 8 "encoding/json" 9 "fmt" 10 "log" 11 "net/http" 12 "net/url" 13 "os" 14 "strings" 15 "time" 16 17 _ "github.com/lib/pq" 18) 19 20type ListRecordsResponse struct { 21 Records []Record `json:"records"` 22 Cursor string `json:"cursor"` 23} 24 25type Record struct { 26 URI string `json:"uri"` 27 CID string `json:"cid"` 28 Value map[string]interface{} `json:"value"` 29} 30 31func main() { 32 // Get config from env 33 dbURL := os.Getenv("DATABASE_URL") 34 if dbURL == "" { 35 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 36 } 37 pdsURL := os.Getenv("PDS_URL") 38 if pdsURL == "" { 39 pdsURL = "http://localhost:3001" 40 } 41 42 log.Printf("Connecting to database...") 43 db, err := sql.Open("postgres", dbURL) 44 if err != nil { 45 log.Fatalf("Failed to connect to database: %v", err) 46 } 47 defer db.Close() 48 49 ctx := context.Background() 50 51 // Get all accounts directly from the PDS 52 log.Printf("Fetching accounts from PDS (%s)...", pdsURL) 53 dids, err := fetchAllAccountsFromPDS(pdsURL) 54 if err != nil { 55 log.Fatalf("Failed to fetch accounts from PDS: %v", err) 56 } 57 log.Printf("Found %d accounts on PDS to check for votes", len(dids)) 58 59 // Reset vote counts first 60 log.Printf("Resetting all vote counts...") 61 if _, err := db.ExecContext(ctx, "DELETE FROM votes"); err != nil { 62 log.Fatalf("Failed to clear votes table: %v", err) 63 } 64 if _, err := db.ExecContext(ctx, "UPDATE posts SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil { 65 log.Fatalf("Failed to reset post vote counts: %v", err) 66 } 67 if _, err := db.ExecContext(ctx, "UPDATE comments SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil { 68 log.Fatalf("Failed to reset comment vote counts: %v", err) 69 } 70 71 // For each user, fetch their votes from PDS 72 totalVotes := 0 73 for _, 
did := range dids { 74 votes, err := fetchVotesFromPDS(pdsURL, did) 75 if err != nil { 76 log.Printf("Warning: failed to fetch votes for %s: %v", did, err) 77 continue 78 } 79 80 if len(votes) == 0 { 81 continue 82 } 83 84 log.Printf("Found %d votes for %s", len(votes), did) 85 86 // Index each vote 87 for _, vote := range votes { 88 if err := indexVote(ctx, db, did, vote); err != nil { 89 log.Printf("Warning: failed to index vote %s: %v", vote.URI, err) 90 continue 91 } 92 totalVotes++ 93 } 94 } 95 96 log.Printf("✓ Reindexed %d votes from PDS", totalVotes) 97} 98 99// fetchAllAccountsFromPDS queries the PDS sync API to get all repo DIDs 100func fetchAllAccountsFromPDS(pdsURL string) ([]string, error) { 101 // Use com.atproto.sync.listRepos to get all repos on this PDS 102 var allDIDs []string 103 cursor := "" 104 105 for { 106 reqURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=100", pdsURL) 107 if cursor != "" { 108 reqURL += "&cursor=" + url.QueryEscape(cursor) 109 } 110 111 resp, err := http.Get(reqURL) 112 if err != nil { 113 return nil, fmt.Errorf("HTTP request failed: %w", err) 114 } 115 defer resp.Body.Close() 116 117 if resp.StatusCode != 200 { 118 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 119 } 120 121 var result struct { 122 Repos []struct { 123 DID string `json:"did"` 124 } `json:"repos"` 125 Cursor string `json:"cursor"` 126 } 127 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 128 return nil, fmt.Errorf("failed to decode response: %w", err) 129 } 130 131 for _, repo := range result.Repos { 132 allDIDs = append(allDIDs, repo.DID) 133 } 134 135 if result.Cursor == "" { 136 break 137 } 138 cursor = result.Cursor 139 } 140 141 return allDIDs, nil 142} 143 144func fetchVotesFromPDS(pdsURL, did string) ([]Record, error) { 145 var allRecords []Record 146 cursor := "" 147 collection := "social.coves.feed.vote" 148 149 for { 150 reqURL := 
fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", 151 pdsURL, url.QueryEscape(did), url.QueryEscape(collection)) 152 if cursor != "" { 153 reqURL += "&cursor=" + url.QueryEscape(cursor) 154 } 155 156 resp, err := http.Get(reqURL) 157 if err != nil { 158 return nil, fmt.Errorf("HTTP request failed: %w", err) 159 } 160 defer resp.Body.Close() 161 162 if resp.StatusCode == 400 { 163 // User doesn't exist on this PDS or has no records - that's OK 164 return nil, nil 165 } 166 if resp.StatusCode != 200 { 167 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 168 } 169 170 var result ListRecordsResponse 171 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 172 return nil, fmt.Errorf("failed to decode response: %w", err) 173 } 174 175 allRecords = append(allRecords, result.Records...) 176 177 if result.Cursor == "" { 178 break 179 } 180 cursor = result.Cursor 181 } 182 183 return allRecords, nil 184} 185 186func indexVote(ctx context.Context, db *sql.DB, voterDID string, record Record) error { 187 // Extract vote data from record 188 subject, ok := record.Value["subject"].(map[string]interface{}) 189 if !ok { 190 return fmt.Errorf("missing subject") 191 } 192 subjectURI, _ := subject["uri"].(string) 193 subjectCID, _ := subject["cid"].(string) 194 direction, _ := record.Value["direction"].(string) 195 createdAtStr, _ := record.Value["createdAt"].(string) 196 197 if subjectURI == "" || direction == "" { 198 return fmt.Errorf("invalid vote record: missing required fields") 199 } 200 201 // Parse created_at 202 createdAt, err := time.Parse(time.RFC3339, createdAtStr) 203 if err != nil { 204 createdAt = time.Now() 205 } 206 207 // Extract rkey from URI (at://did/collection/rkey) 208 parts := strings.Split(record.URI, "/") 209 if len(parts) < 5 { 210 return fmt.Errorf("invalid URI format: %s", record.URI) 211 } 212 rkey := parts[len(parts)-1] 213 214 // Start transaction 215 tx, err := db.BeginTx(ctx, nil) 
216 if err != nil { 217 return fmt.Errorf("failed to begin transaction: %w", err) 218 } 219 defer tx.Rollback() 220 221 // Insert vote 222 _, err = tx.ExecContext(ctx, ` 223 INSERT INTO votes (uri, cid, rkey, voter_did, subject_uri, subject_cid, direction, created_at, indexed_at) 224 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW()) 225 ON CONFLICT (uri) DO NOTHING 226 `, record.URI, record.CID, rkey, voterDID, subjectURI, subjectCID, direction, createdAt) 227 if err != nil { 228 return fmt.Errorf("failed to insert vote: %w", err) 229 } 230 231 // Update post/comment counts 232 collection := extractCollectionFromURI(subjectURI) 233 var updateQuery string 234 235 switch collection { 236 case "social.coves.community.post": 237 if direction == "up" { 238 updateQuery = `UPDATE posts SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL` 239 } else { 240 updateQuery = `UPDATE posts SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL` 241 } 242 case "social.coves.community.comment": 243 if direction == "up" { 244 updateQuery = `UPDATE comments SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL` 245 } else { 246 updateQuery = `UPDATE comments SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL` 247 } 248 default: 249 // Unknown collection, just index the vote 250 return tx.Commit() 251 } 252 253 if _, err := tx.ExecContext(ctx, updateQuery, subjectURI); err != nil { 254 return fmt.Errorf("failed to update vote counts: %w", err) 255 } 256 257 return tx.Commit() 258} 259 260func extractCollectionFromURI(uri string) string { 261 // at://did:plc:xxx/social.coves.community.post/rkey 262 parts := strings.Split(uri, "/") 263 if len(parts) >= 4 { 264 return parts[3] 265 } 266 return "" 267}