A community based topic aggregation platform built on atproto
1// cmd/reindex-votes/main.go
2// Quick tool to reindex votes from PDS to AppView database
3package main
4
5import (
6 "context"
7 "database/sql"
8 "encoding/json"
9 "fmt"
10 "log"
11 "net/http"
12 "net/url"
13 "os"
14 "strings"
15 "time"
16
17 _ "github.com/lib/pq"
18)
19
20type ListRecordsResponse struct {
21 Records []Record `json:"records"`
22 Cursor string `json:"cursor"`
23}
24
25type Record struct {
26 URI string `json:"uri"`
27 CID string `json:"cid"`
28 Value map[string]interface{} `json:"value"`
29}
30
31func main() {
32 // Get config from env
33 dbURL := os.Getenv("DATABASE_URL")
34 if dbURL == "" {
35 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
36 }
37 pdsURL := os.Getenv("PDS_URL")
38 if pdsURL == "" {
39 pdsURL = "http://localhost:3001"
40 }
41
42 log.Printf("Connecting to database...")
43 db, err := sql.Open("postgres", dbURL)
44 if err != nil {
45 log.Fatalf("Failed to connect to database: %v", err)
46 }
47 defer db.Close()
48
49 ctx := context.Background()
50
51 // Get all accounts directly from the PDS
52 log.Printf("Fetching accounts from PDS (%s)...", pdsURL)
53 dids, err := fetchAllAccountsFromPDS(pdsURL)
54 if err != nil {
55 log.Fatalf("Failed to fetch accounts from PDS: %v", err)
56 }
57 log.Printf("Found %d accounts on PDS to check for votes", len(dids))
58
59 // Reset vote counts first
60 log.Printf("Resetting all vote counts...")
61 if _, err := db.ExecContext(ctx, "DELETE FROM votes"); err != nil {
62 log.Fatalf("Failed to clear votes table: %v", err)
63 }
64 if _, err := db.ExecContext(ctx, "UPDATE posts SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil {
65 log.Fatalf("Failed to reset post vote counts: %v", err)
66 }
67 if _, err := db.ExecContext(ctx, "UPDATE comments SET upvote_count = 0, downvote_count = 0, score = 0"); err != nil {
68 log.Fatalf("Failed to reset comment vote counts: %v", err)
69 }
70
71 // For each user, fetch their votes from PDS
72 totalVotes := 0
73 for _, did := range dids {
74 votes, err := fetchVotesFromPDS(pdsURL, did)
75 if err != nil {
76 log.Printf("Warning: failed to fetch votes for %s: %v", did, err)
77 continue
78 }
79
80 if len(votes) == 0 {
81 continue
82 }
83
84 log.Printf("Found %d votes for %s", len(votes), did)
85
86 // Index each vote
87 for _, vote := range votes {
88 if err := indexVote(ctx, db, did, vote); err != nil {
89 log.Printf("Warning: failed to index vote %s: %v", vote.URI, err)
90 continue
91 }
92 totalVotes++
93 }
94 }
95
96 log.Printf("✓ Reindexed %d votes from PDS", totalVotes)
97}
98
99// fetchAllAccountsFromPDS queries the PDS sync API to get all repo DIDs
100func fetchAllAccountsFromPDS(pdsURL string) ([]string, error) {
101 // Use com.atproto.sync.listRepos to get all repos on this PDS
102 var allDIDs []string
103 cursor := ""
104
105 for {
106 reqURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=100", pdsURL)
107 if cursor != "" {
108 reqURL += "&cursor=" + url.QueryEscape(cursor)
109 }
110
111 resp, err := http.Get(reqURL)
112 if err != nil {
113 return nil, fmt.Errorf("HTTP request failed: %w", err)
114 }
115 defer resp.Body.Close()
116
117 if resp.StatusCode != 200 {
118 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
119 }
120
121 var result struct {
122 Repos []struct {
123 DID string `json:"did"`
124 } `json:"repos"`
125 Cursor string `json:"cursor"`
126 }
127 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
128 return nil, fmt.Errorf("failed to decode response: %w", err)
129 }
130
131 for _, repo := range result.Repos {
132 allDIDs = append(allDIDs, repo.DID)
133 }
134
135 if result.Cursor == "" {
136 break
137 }
138 cursor = result.Cursor
139 }
140
141 return allDIDs, nil
142}
143
144func fetchVotesFromPDS(pdsURL, did string) ([]Record, error) {
145 var allRecords []Record
146 cursor := ""
147 collection := "social.coves.feed.vote"
148
149 for {
150 reqURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100",
151 pdsURL, url.QueryEscape(did), url.QueryEscape(collection))
152 if cursor != "" {
153 reqURL += "&cursor=" + url.QueryEscape(cursor)
154 }
155
156 resp, err := http.Get(reqURL)
157 if err != nil {
158 return nil, fmt.Errorf("HTTP request failed: %w", err)
159 }
160 defer resp.Body.Close()
161
162 if resp.StatusCode == 400 {
163 // User doesn't exist on this PDS or has no records - that's OK
164 return nil, nil
165 }
166 if resp.StatusCode != 200 {
167 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
168 }
169
170 var result ListRecordsResponse
171 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
172 return nil, fmt.Errorf("failed to decode response: %w", err)
173 }
174
175 allRecords = append(allRecords, result.Records...)
176
177 if result.Cursor == "" {
178 break
179 }
180 cursor = result.Cursor
181 }
182
183 return allRecords, nil
184}
185
186func indexVote(ctx context.Context, db *sql.DB, voterDID string, record Record) error {
187 // Extract vote data from record
188 subject, ok := record.Value["subject"].(map[string]interface{})
189 if !ok {
190 return fmt.Errorf("missing subject")
191 }
192 subjectURI, _ := subject["uri"].(string)
193 subjectCID, _ := subject["cid"].(string)
194 direction, _ := record.Value["direction"].(string)
195 createdAtStr, _ := record.Value["createdAt"].(string)
196
197 if subjectURI == "" || direction == "" {
198 return fmt.Errorf("invalid vote record: missing required fields")
199 }
200
201 // Parse created_at
202 createdAt, err := time.Parse(time.RFC3339, createdAtStr)
203 if err != nil {
204 createdAt = time.Now()
205 }
206
207 // Extract rkey from URI (at://did/collection/rkey)
208 parts := strings.Split(record.URI, "/")
209 if len(parts) < 5 {
210 return fmt.Errorf("invalid URI format: %s", record.URI)
211 }
212 rkey := parts[len(parts)-1]
213
214 // Start transaction
215 tx, err := db.BeginTx(ctx, nil)
216 if err != nil {
217 return fmt.Errorf("failed to begin transaction: %w", err)
218 }
219 defer tx.Rollback()
220
221 // Insert vote
222 _, err = tx.ExecContext(ctx, `
223 INSERT INTO votes (uri, cid, rkey, voter_did, subject_uri, subject_cid, direction, created_at, indexed_at)
224 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
225 ON CONFLICT (uri) DO NOTHING
226 `, record.URI, record.CID, rkey, voterDID, subjectURI, subjectCID, direction, createdAt)
227 if err != nil {
228 return fmt.Errorf("failed to insert vote: %w", err)
229 }
230
231 // Update post/comment counts
232 collection := extractCollectionFromURI(subjectURI)
233 var updateQuery string
234
235 switch collection {
236 case "social.coves.community.post":
237 if direction == "up" {
238 updateQuery = `UPDATE posts SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
239 } else {
240 updateQuery = `UPDATE posts SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL`
241 }
242 case "social.coves.community.comment":
243 if direction == "up" {
244 updateQuery = `UPDATE comments SET upvote_count = upvote_count + 1, score = upvote_count + 1 - downvote_count WHERE uri = $1 AND deleted_at IS NULL`
245 } else {
246 updateQuery = `UPDATE comments SET downvote_count = downvote_count + 1, score = upvote_count - (downvote_count + 1) WHERE uri = $1 AND deleted_at IS NULL`
247 }
248 default:
249 // Unknown collection, just index the vote
250 return tx.Commit()
251 }
252
253 if _, err := tx.ExecContext(ctx, updateQuery, subjectURI); err != nil {
254 return fmt.Errorf("failed to update vote counts: %w", err)
255 }
256
257 return tx.Commit()
258}
259
260func extractCollectionFromURI(uri string) string {
261 // at://did:plc:xxx/social.coves.community.post/rkey
262 parts := strings.Split(uri, "/")
263 if len(parts) >= 4 {
264 return parts[3]
265 }
266 return ""
267}