forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/appview/models"
13 "tangled.org/core/appview/pagination"
14)
15
16func CreateNotification(e Execer, notification *models.Notification) error {
17 query := `
18 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id)
19 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
20 `
21
22 result, err := e.Exec(query,
23 notification.RecipientDid,
24 notification.ActorDid,
25 string(notification.Type),
26 notification.EntityType,
27 notification.EntityId,
28 notification.Read,
29 notification.RepoId,
30 notification.IssueId,
31 notification.PullId,
32 )
33 if err != nil {
34 return fmt.Errorf("failed to create notification: %w", err)
35 }
36
37 id, err := result.LastInsertId()
38 if err != nil {
39 return fmt.Errorf("failed to get notification ID: %w", err)
40 }
41
42 notification.ID = id
43 return nil
44}
45
46// GetNotificationsPaginated retrieves notifications with filters and pagination
47func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) {
48 var conditions []string
49 var args []any
50
51 for _, filter := range filters {
52 conditions = append(conditions, filter.Condition())
53 args = append(args, filter.Arg()...)
54 }
55
56 whereClause := ""
57 if len(conditions) > 0 {
58 whereClause = "WHERE " + conditions[0]
59 for _, condition := range conditions[1:] {
60 whereClause += " AND " + condition
61 }
62 }
63
64 query := fmt.Sprintf(`
65 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id
66 from notifications
67 %s
68 order by created desc
69 limit ? offset ?
70 `, whereClause)
71
72 args = append(args, page.Limit, page.Offset)
73
74 rows, err := e.QueryContext(context.Background(), query, args...)
75 if err != nil {
76 return nil, fmt.Errorf("failed to query notifications: %w", err)
77 }
78 defer rows.Close()
79
80 var notifications []*models.Notification
81 for rows.Next() {
82 var n models.Notification
83 var typeStr string
84 var createdStr string
85 err := rows.Scan(
86 &n.ID,
87 &n.RecipientDid,
88 &n.ActorDid,
89 &typeStr,
90 &n.EntityType,
91 &n.EntityId,
92 &n.Read,
93 &createdStr,
94 &n.RepoId,
95 &n.IssueId,
96 &n.PullId,
97 )
98 if err != nil {
99 return nil, fmt.Errorf("failed to scan notification: %w", err)
100 }
101 n.Type = models.NotificationType(typeStr)
102 n.Created, err = time.Parse(time.RFC3339, createdStr)
103 if err != nil {
104 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
105 }
106 notifications = append(notifications, &n)
107 }
108
109 return notifications, nil
110}
111
112// GetNotificationsWithEntities retrieves notifications with their related entities
113func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) {
114 var conditions []string
115 var args []any
116
117 for _, filter := range filters {
118 conditions = append(conditions, filter.Condition())
119 args = append(args, filter.Arg()...)
120 }
121
122 whereClause := ""
123 if len(conditions) > 0 {
124 whereClause = "WHERE " + conditions[0]
125 for _, condition := range conditions[1:] {
126 whereClause += " AND " + condition
127 }
128 }
129
130 query := fmt.Sprintf(`
131 select
132 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id,
133 n.read, n.created, n.repo_id, n.issue_id, n.pull_id,
134 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description,
135 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open,
136 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state
137 from notifications n
138 left join repos r on n.repo_id = r.id
139 left join issues i on n.issue_id = i.id
140 left join pulls p on n.pull_id = p.id
141 %s
142 order by n.created desc
143 limit ? offset ?
144 `, whereClause)
145
146 args = append(args, page.Limit, page.Offset)
147
148 rows, err := e.QueryContext(context.Background(), query, args...)
149 if err != nil {
150 return nil, fmt.Errorf("failed to query notifications with entities: %w", err)
151 }
152 defer rows.Close()
153
154 var notifications []*models.NotificationWithEntity
155 for rows.Next() {
156 var n models.Notification
157 var typeStr string
158 var createdStr string
159 var repo models.Repo
160 var issue models.Issue
161 var pull models.Pull
162 var rId, iId, pId sql.NullInt64
163 var rDid, rName, rDescription sql.NullString
164 var iDid sql.NullString
165 var iIssueId sql.NullInt64
166 var iTitle sql.NullString
167 var iOpen sql.NullBool
168 var pOwnerDid sql.NullString
169 var pPullId sql.NullInt64
170 var pTitle sql.NullString
171 var pState sql.NullInt64
172
173 err := rows.Scan(
174 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId,
175 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId,
176 &rId, &rDid, &rName, &rDescription,
177 &iId, &iDid, &iIssueId, &iTitle, &iOpen,
178 &pId, &pOwnerDid, &pPullId, &pTitle, &pState,
179 )
180 if err != nil {
181 return nil, fmt.Errorf("failed to scan notification with entities: %w", err)
182 }
183
184 n.Type = models.NotificationType(typeStr)
185 n.Created, err = time.Parse(time.RFC3339, createdStr)
186 if err != nil {
187 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
188 }
189
190 nwe := &models.NotificationWithEntity{Notification: &n}
191
192 // populate repo if present
193 if rId.Valid {
194 repo.Id = rId.Int64
195 if rDid.Valid {
196 repo.Did = rDid.String
197 }
198 if rName.Valid {
199 repo.Name = rName.String
200 }
201 if rDescription.Valid {
202 repo.Description = rDescription.String
203 }
204 nwe.Repo = &repo
205 }
206
207 // populate issue if present
208 if iId.Valid {
209 issue.Id = iId.Int64
210 if iDid.Valid {
211 issue.Did = iDid.String
212 }
213 if iIssueId.Valid {
214 issue.IssueId = int(iIssueId.Int64)
215 }
216 if iTitle.Valid {
217 issue.Title = iTitle.String
218 }
219 if iOpen.Valid {
220 issue.Open = iOpen.Bool
221 }
222 nwe.Issue = &issue
223 }
224
225 // populate pull if present
226 if pId.Valid {
227 pull.ID = int(pId.Int64)
228 if pOwnerDid.Valid {
229 pull.OwnerDid = pOwnerDid.String
230 }
231 if pPullId.Valid {
232 pull.PullId = int(pPullId.Int64)
233 }
234 if pTitle.Valid {
235 pull.Title = pTitle.String
236 }
237 if pState.Valid {
238 pull.State = models.PullState(pState.Int64)
239 }
240 nwe.Pull = &pull
241 }
242
243 notifications = append(notifications, nwe)
244 }
245
246 return notifications, nil
247}
248
249// GetNotifications retrieves notifications with filters
250func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) {
251 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...)
252}
253
254func CountNotifications(e Execer, filters ...filter) (int64, error) {
255 var conditions []string
256 var args []any
257 for _, filter := range filters {
258 conditions = append(conditions, filter.Condition())
259 args = append(args, filter.Arg()...)
260 }
261
262 whereClause := ""
263 if conditions != nil {
264 whereClause = " where " + strings.Join(conditions, " and ")
265 }
266
267 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause)
268 var count int64
269 err := e.QueryRow(query, args...).Scan(&count)
270
271 if !errors.Is(err, sql.ErrNoRows) && err != nil {
272 return 0, err
273 }
274
275 return count, nil
276}
277
278func MarkNotificationRead(e Execer, notificationID int64, userDID string) error {
279 idFilter := FilterEq("id", notificationID)
280 recipientFilter := FilterEq("recipient_did", userDID)
281
282 query := fmt.Sprintf(`
283 UPDATE notifications
284 SET read = 1
285 WHERE %s AND %s
286 `, idFilter.Condition(), recipientFilter.Condition())
287
288 args := append(idFilter.Arg(), recipientFilter.Arg()...)
289
290 result, err := e.Exec(query, args...)
291 if err != nil {
292 return fmt.Errorf("failed to mark notification as read: %w", err)
293 }
294
295 rowsAffected, err := result.RowsAffected()
296 if err != nil {
297 return fmt.Errorf("failed to get rows affected: %w", err)
298 }
299
300 if rowsAffected == 0 {
301 return fmt.Errorf("notification not found or access denied")
302 }
303
304 return nil
305}
306
307func MarkAllNotificationsRead(e Execer, userDID string) error {
308 recipientFilter := FilterEq("recipient_did", userDID)
309 readFilter := FilterEq("read", 0)
310
311 query := fmt.Sprintf(`
312 UPDATE notifications
313 SET read = 1
314 WHERE %s AND %s
315 `, recipientFilter.Condition(), readFilter.Condition())
316
317 args := append(recipientFilter.Arg(), readFilter.Arg()...)
318
319 _, err := e.Exec(query, args...)
320 if err != nil {
321 return fmt.Errorf("failed to mark all notifications as read: %w", err)
322 }
323
324 return nil
325}
326
327func DeleteNotification(e Execer, notificationID int64, userDID string) error {
328 idFilter := FilterEq("id", notificationID)
329 recipientFilter := FilterEq("recipient_did", userDID)
330
331 query := fmt.Sprintf(`
332 DELETE FROM notifications
333 WHERE %s AND %s
334 `, idFilter.Condition(), recipientFilter.Condition())
335
336 args := append(idFilter.Arg(), recipientFilter.Arg()...)
337
338 result, err := e.Exec(query, args...)
339 if err != nil {
340 return fmt.Errorf("failed to delete notification: %w", err)
341 }
342
343 rowsAffected, err := result.RowsAffected()
344 if err != nil {
345 return fmt.Errorf("failed to get rows affected: %w", err)
346 }
347
348 if rowsAffected == 0 {
349 return fmt.Errorf("notification not found or access denied")
350 }
351
352 return nil
353}
354
355func GetNotificationPreference(e Execer, userDid string) (*models.NotificationPreferences, error) {
356 prefs, err := GetNotificationPreferences(e, FilterEq("user_did", userDid))
357 if err != nil {
358 return nil, err
359 }
360
361 p, ok := prefs[syntax.DID(userDid)]
362 if !ok {
363 return models.DefaultNotificationPreferences(syntax.DID(userDid)), nil
364 }
365
366 return p, nil
367}
368
369func GetNotificationPreferences(e Execer, filters ...filter) (map[syntax.DID]*models.NotificationPreferences, error) {
370 prefsMap := make(map[syntax.DID]*models.NotificationPreferences)
371
372 var conditions []string
373 var args []any
374 for _, filter := range filters {
375 conditions = append(conditions, filter.Condition())
376 args = append(args, filter.Arg()...)
377 }
378
379 whereClause := ""
380 if conditions != nil {
381 whereClause = " where " + strings.Join(conditions, " and ")
382 }
383
384 query := fmt.Sprintf(`
385 select
386 id,
387 user_did,
388 repo_starred,
389 issue_created,
390 issue_commented,
391 pull_created,
392 pull_commented,
393 followed,
394 pull_merged,
395 issue_closed,
396 email_notifications
397 from
398 notification_preferences
399 %s
400 `, whereClause)
401
402 rows, err := e.Query(query, args...)
403 if err != nil {
404 return nil, err
405 }
406 defer rows.Close()
407
408 for rows.Next() {
409 var prefs models.NotificationPreferences
410 if err := rows.Scan(
411 &prefs.ID,
412 &prefs.UserDid,
413 &prefs.RepoStarred,
414 &prefs.IssueCreated,
415 &prefs.IssueCommented,
416 &prefs.PullCreated,
417 &prefs.PullCommented,
418 &prefs.Followed,
419 &prefs.PullMerged,
420 &prefs.IssueClosed,
421 &prefs.EmailNotifications,
422 ); err != nil {
423 return nil, err
424 }
425
426 prefsMap[prefs.UserDid] = &prefs
427 }
428
429 if err := rows.Err(); err != nil {
430 return nil, err
431 }
432
433 return prefsMap, nil
434}
435
436func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error {
437 query := `
438 INSERT OR REPLACE INTO notification_preferences
439 (user_did, repo_starred, issue_created, issue_commented, pull_created,
440 pull_commented, followed, pull_merged, issue_closed, email_notifications)
441 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
442 `
443
444 result, err := d.DB.ExecContext(ctx, query,
445 prefs.UserDid,
446 prefs.RepoStarred,
447 prefs.IssueCreated,
448 prefs.IssueCommented,
449 prefs.PullCreated,
450 prefs.PullCommented,
451 prefs.Followed,
452 prefs.PullMerged,
453 prefs.IssueClosed,
454 prefs.EmailNotifications,
455 )
456 if err != nil {
457 return fmt.Errorf("failed to update notification preferences: %w", err)
458 }
459
460 if prefs.ID == 0 {
461 id, err := result.LastInsertId()
462 if err != nil {
463 return fmt.Errorf("failed to get preferences ID: %w", err)
464 }
465 prefs.ID = id
466 }
467
468 return nil
469}
470
471func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error {
472 cutoff := time.Now().Add(-olderThan)
473 createdFilter := FilterLte("created", cutoff)
474
475 query := fmt.Sprintf(`
476 DELETE FROM notifications
477 WHERE %s
478 `, createdFilter.Condition())
479
480 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...)
481 if err != nil {
482 return fmt.Errorf("failed to cleanup old notifications: %w", err)
483 }
484
485 return nil
486}