forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/appview/models"
13 "tangled.org/core/appview/pagination"
14)
15
16func CreateNotification(e Execer, notification *models.Notification) error {
17 query := `
18 INSERT INTO notifications (recipient_did, actor_did, type, entity_type, entity_id, read, repo_id, issue_id, pull_id)
19 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
20 `
21
22 result, err := e.Exec(query,
23 notification.RecipientDid,
24 notification.ActorDid,
25 string(notification.Type),
26 notification.EntityType,
27 notification.EntityId,
28 notification.Read,
29 notification.RepoId,
30 notification.IssueId,
31 notification.PullId,
32 )
33 if err != nil {
34 return fmt.Errorf("failed to create notification: %w", err)
35 }
36
37 id, err := result.LastInsertId()
38 if err != nil {
39 return fmt.Errorf("failed to get notification ID: %w", err)
40 }
41
42 notification.ID = id
43 return nil
44}
45
46// GetNotificationsPaginated retrieves notifications with filters and pagination
47func GetNotificationsPaginated(e Execer, page pagination.Page, filters ...filter) ([]*models.Notification, error) {
48 var conditions []string
49 var args []any
50
51 for _, filter := range filters {
52 conditions = append(conditions, filter.Condition())
53 args = append(args, filter.Arg()...)
54 }
55
56 whereClause := ""
57 if len(conditions) > 0 {
58 whereClause = "WHERE " + conditions[0]
59 for _, condition := range conditions[1:] {
60 whereClause += " AND " + condition
61 }
62 }
63 pageClause := ""
64 if page.Limit > 0 {
65 pageClause = " limit ? offset ? "
66 args = append(args, page.Limit, page.Offset)
67 }
68
69 query := fmt.Sprintf(`
70 select id, recipient_did, actor_did, type, entity_type, entity_id, read, created, repo_id, issue_id, pull_id
71 from notifications
72 %s
73 order by created desc
74 %s
75 `, whereClause, pageClause)
76
77 rows, err := e.QueryContext(context.Background(), query, args...)
78 if err != nil {
79 return nil, fmt.Errorf("failed to query notifications: %w", err)
80 }
81 defer rows.Close()
82
83 var notifications []*models.Notification
84 for rows.Next() {
85 var n models.Notification
86 var typeStr string
87 var createdStr string
88 err := rows.Scan(
89 &n.ID,
90 &n.RecipientDid,
91 &n.ActorDid,
92 &typeStr,
93 &n.EntityType,
94 &n.EntityId,
95 &n.Read,
96 &createdStr,
97 &n.RepoId,
98 &n.IssueId,
99 &n.PullId,
100 )
101 if err != nil {
102 return nil, fmt.Errorf("failed to scan notification: %w", err)
103 }
104 n.Type = models.NotificationType(typeStr)
105 n.Created, err = time.Parse(time.RFC3339, createdStr)
106 if err != nil {
107 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
108 }
109 notifications = append(notifications, &n)
110 }
111
112 return notifications, nil
113}
114
115// GetNotificationsWithEntities retrieves notifications with their related entities
116func GetNotificationsWithEntities(e Execer, page pagination.Page, filters ...filter) ([]*models.NotificationWithEntity, error) {
117 var conditions []string
118 var args []any
119
120 for _, filter := range filters {
121 conditions = append(conditions, filter.Condition())
122 args = append(args, filter.Arg()...)
123 }
124
125 whereClause := ""
126 if len(conditions) > 0 {
127 whereClause = "WHERE " + conditions[0]
128 for _, condition := range conditions[1:] {
129 whereClause += " AND " + condition
130 }
131 }
132
133 query := fmt.Sprintf(`
134 select
135 n.id, n.recipient_did, n.actor_did, n.type, n.entity_type, n.entity_id,
136 n.read, n.created, n.repo_id, n.issue_id, n.pull_id,
137 r.id as r_id, r.did as r_did, r.name as r_name, r.description as r_description,
138 i.id as i_id, i.did as i_did, i.issue_id as i_issue_id, i.title as i_title, i.open as i_open,
139 p.id as p_id, p.owner_did as p_owner_did, p.pull_id as p_pull_id, p.title as p_title, p.state as p_state
140 from notifications n
141 left join repos r on n.repo_id = r.id
142 left join issues i on n.issue_id = i.id
143 left join pulls p on n.pull_id = p.id
144 %s
145 order by n.created desc
146 limit ? offset ?
147 `, whereClause)
148
149 args = append(args, page.Limit, page.Offset)
150
151 rows, err := e.QueryContext(context.Background(), query, args...)
152 if err != nil {
153 return nil, fmt.Errorf("failed to query notifications with entities: %w", err)
154 }
155 defer rows.Close()
156
157 var notifications []*models.NotificationWithEntity
158 for rows.Next() {
159 var n models.Notification
160 var typeStr string
161 var createdStr string
162 var repo models.Repo
163 var issue models.Issue
164 var pull models.Pull
165 var rId, iId, pId sql.NullInt64
166 var rDid, rName, rDescription sql.NullString
167 var iDid sql.NullString
168 var iIssueId sql.NullInt64
169 var iTitle sql.NullString
170 var iOpen sql.NullBool
171 var pOwnerDid sql.NullString
172 var pPullId sql.NullInt64
173 var pTitle sql.NullString
174 var pState sql.NullInt64
175
176 err := rows.Scan(
177 &n.ID, &n.RecipientDid, &n.ActorDid, &typeStr, &n.EntityType, &n.EntityId,
178 &n.Read, &createdStr, &n.RepoId, &n.IssueId, &n.PullId,
179 &rId, &rDid, &rName, &rDescription,
180 &iId, &iDid, &iIssueId, &iTitle, &iOpen,
181 &pId, &pOwnerDid, &pPullId, &pTitle, &pState,
182 )
183 if err != nil {
184 return nil, fmt.Errorf("failed to scan notification with entities: %w", err)
185 }
186
187 n.Type = models.NotificationType(typeStr)
188 n.Created, err = time.Parse(time.RFC3339, createdStr)
189 if err != nil {
190 return nil, fmt.Errorf("failed to parse created timestamp: %w", err)
191 }
192
193 nwe := &models.NotificationWithEntity{Notification: &n}
194
195 // populate repo if present
196 if rId.Valid {
197 repo.Id = rId.Int64
198 if rDid.Valid {
199 repo.Did = rDid.String
200 }
201 if rName.Valid {
202 repo.Name = rName.String
203 }
204 if rDescription.Valid {
205 repo.Description = rDescription.String
206 }
207 nwe.Repo = &repo
208 }
209
210 // populate issue if present
211 if iId.Valid {
212 issue.Id = iId.Int64
213 if iDid.Valid {
214 issue.Did = iDid.String
215 }
216 if iIssueId.Valid {
217 issue.IssueId = int(iIssueId.Int64)
218 }
219 if iTitle.Valid {
220 issue.Title = iTitle.String
221 }
222 if iOpen.Valid {
223 issue.Open = iOpen.Bool
224 }
225 nwe.Issue = &issue
226 }
227
228 // populate pull if present
229 if pId.Valid {
230 pull.ID = int(pId.Int64)
231 if pOwnerDid.Valid {
232 pull.OwnerDid = pOwnerDid.String
233 }
234 if pPullId.Valid {
235 pull.PullId = int(pPullId.Int64)
236 }
237 if pTitle.Valid {
238 pull.Title = pTitle.String
239 }
240 if pState.Valid {
241 pull.State = models.PullState(pState.Int64)
242 }
243 nwe.Pull = &pull
244 }
245
246 notifications = append(notifications, nwe)
247 }
248
249 return notifications, nil
250}
251
252// GetNotifications retrieves notifications with filters
253func GetNotifications(e Execer, filters ...filter) ([]*models.Notification, error) {
254 return GetNotificationsPaginated(e, pagination.FirstPage(), filters...)
255}
256
257func CountNotifications(e Execer, filters ...filter) (int64, error) {
258 var conditions []string
259 var args []any
260 for _, filter := range filters {
261 conditions = append(conditions, filter.Condition())
262 args = append(args, filter.Arg()...)
263 }
264
265 whereClause := ""
266 if conditions != nil {
267 whereClause = " where " + strings.Join(conditions, " and ")
268 }
269
270 query := fmt.Sprintf(`select count(1) from notifications %s`, whereClause)
271 var count int64
272 err := e.QueryRow(query, args...).Scan(&count)
273
274 if !errors.Is(err, sql.ErrNoRows) && err != nil {
275 return 0, err
276 }
277
278 return count, nil
279}
280
281func MarkNotificationRead(e Execer, notificationID int64, userDID string) error {
282 idFilter := FilterEq("id", notificationID)
283 recipientFilter := FilterEq("recipient_did", userDID)
284
285 query := fmt.Sprintf(`
286 UPDATE notifications
287 SET read = 1
288 WHERE %s AND %s
289 `, idFilter.Condition(), recipientFilter.Condition())
290
291 args := append(idFilter.Arg(), recipientFilter.Arg()...)
292
293 result, err := e.Exec(query, args...)
294 if err != nil {
295 return fmt.Errorf("failed to mark notification as read: %w", err)
296 }
297
298 rowsAffected, err := result.RowsAffected()
299 if err != nil {
300 return fmt.Errorf("failed to get rows affected: %w", err)
301 }
302
303 if rowsAffected == 0 {
304 return fmt.Errorf("notification not found or access denied")
305 }
306
307 return nil
308}
309
310func MarkAllNotificationsRead(e Execer, userDID string) error {
311 recipientFilter := FilterEq("recipient_did", userDID)
312 readFilter := FilterEq("read", 0)
313
314 query := fmt.Sprintf(`
315 UPDATE notifications
316 SET read = 1
317 WHERE %s AND %s
318 `, recipientFilter.Condition(), readFilter.Condition())
319
320 args := append(recipientFilter.Arg(), readFilter.Arg()...)
321
322 _, err := e.Exec(query, args...)
323 if err != nil {
324 return fmt.Errorf("failed to mark all notifications as read: %w", err)
325 }
326
327 return nil
328}
329
330func DeleteNotification(e Execer, notificationID int64, userDID string) error {
331 idFilter := FilterEq("id", notificationID)
332 recipientFilter := FilterEq("recipient_did", userDID)
333
334 query := fmt.Sprintf(`
335 DELETE FROM notifications
336 WHERE %s AND %s
337 `, idFilter.Condition(), recipientFilter.Condition())
338
339 args := append(idFilter.Arg(), recipientFilter.Arg()...)
340
341 result, err := e.Exec(query, args...)
342 if err != nil {
343 return fmt.Errorf("failed to delete notification: %w", err)
344 }
345
346 rowsAffected, err := result.RowsAffected()
347 if err != nil {
348 return fmt.Errorf("failed to get rows affected: %w", err)
349 }
350
351 if rowsAffected == 0 {
352 return fmt.Errorf("notification not found or access denied")
353 }
354
355 return nil
356}
357
358func GetNotificationPreference(e Execer, userDid string) (*models.NotificationPreferences, error) {
359 prefs, err := GetNotificationPreferences(e, FilterEq("user_did", userDid))
360 if err != nil {
361 return nil, err
362 }
363
364 p, ok := prefs[syntax.DID(userDid)]
365 if !ok {
366 return models.DefaultNotificationPreferences(syntax.DID(userDid)), nil
367 }
368
369 return p, nil
370}
371
372func GetNotificationPreferences(e Execer, filters ...filter) (map[syntax.DID]*models.NotificationPreferences, error) {
373 prefsMap := make(map[syntax.DID]*models.NotificationPreferences)
374
375 var conditions []string
376 var args []any
377 for _, filter := range filters {
378 conditions = append(conditions, filter.Condition())
379 args = append(args, filter.Arg()...)
380 }
381
382 whereClause := ""
383 if conditions != nil {
384 whereClause = " where " + strings.Join(conditions, " and ")
385 }
386
387 query := fmt.Sprintf(`
388 select
389 id,
390 user_did,
391 repo_starred,
392 issue_created,
393 issue_commented,
394 pull_created,
395 pull_commented,
396 followed,
397 pull_merged,
398 issue_closed,
399 email_notifications
400 from
401 notification_preferences
402 %s
403 `, whereClause)
404
405 rows, err := e.Query(query, args...)
406 if err != nil {
407 return nil, err
408 }
409 defer rows.Close()
410
411 for rows.Next() {
412 var prefs models.NotificationPreferences
413 if err := rows.Scan(
414 &prefs.ID,
415 &prefs.UserDid,
416 &prefs.RepoStarred,
417 &prefs.IssueCreated,
418 &prefs.IssueCommented,
419 &prefs.PullCreated,
420 &prefs.PullCommented,
421 &prefs.Followed,
422 &prefs.PullMerged,
423 &prefs.IssueClosed,
424 &prefs.EmailNotifications,
425 ); err != nil {
426 return nil, err
427 }
428
429 prefsMap[prefs.UserDid] = &prefs
430 }
431
432 if err := rows.Err(); err != nil {
433 return nil, err
434 }
435
436 return prefsMap, nil
437}
438
439func (d *DB) UpdateNotificationPreferences(ctx context.Context, prefs *models.NotificationPreferences) error {
440 query := `
441 INSERT OR REPLACE INTO notification_preferences
442 (user_did, repo_starred, issue_created, issue_commented, pull_created,
443 pull_commented, followed, pull_merged, issue_closed, email_notifications)
444 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
445 `
446
447 result, err := d.DB.ExecContext(ctx, query,
448 prefs.UserDid,
449 prefs.RepoStarred,
450 prefs.IssueCreated,
451 prefs.IssueCommented,
452 prefs.PullCreated,
453 prefs.PullCommented,
454 prefs.Followed,
455 prefs.PullMerged,
456 prefs.IssueClosed,
457 prefs.EmailNotifications,
458 )
459 if err != nil {
460 return fmt.Errorf("failed to update notification preferences: %w", err)
461 }
462
463 if prefs.ID == 0 {
464 id, err := result.LastInsertId()
465 if err != nil {
466 return fmt.Errorf("failed to get preferences ID: %w", err)
467 }
468 prefs.ID = id
469 }
470
471 return nil
472}
473
474func (d *DB) ClearOldNotifications(ctx context.Context, olderThan time.Duration) error {
475 cutoff := time.Now().Add(-olderThan)
476 createdFilter := FilterLte("created", cutoff)
477
478 query := fmt.Sprintf(`
479 DELETE FROM notifications
480 WHERE %s
481 `, createdFilter.Condition())
482
483 _, err := d.DB.ExecContext(ctx, query, createdFilter.Arg()...)
484 if err != nil {
485 return fmt.Errorf("failed to cleanup old notifications: %w", err)
486 }
487
488 return nil
489}