Feed generator written in Golang
1package main
2
3import (
4 "database/sql"
5 "fmt"
6 "log"
7 "sync"
8 "time"
9
10 // SQLite3 Database Driver
11 _ "modernc.org/sqlite"
12)
13
14const DB_DRIVER = "sqlite"
15const DB_PATH_DEFAULT = "feed.sqlite"
16
17type ManagedDatabaseConnection struct {
18 db *sql.DB
19 mutex *sync.Mutex
20}
21
22func NewDatabaseConnection() (*ManagedDatabaseConnection, error) {
23 env := GetEnvironmentVariables()
24 envDBName, isFound := env["FEEDGEN_SQLITE_LOCATION"]
25 if !isFound {
26 envDBName = DB_PATH_DEFAULT
27 }
28
29 log.Println(fmt.Sprintf("Opening database at %s", envDBName))
30 db, err := sql.Open(DB_DRIVER, envDBName)
31 if err != nil {
32 log.Fatalf("failed to open database connection", err)
33 return nil, err
34 }
35
36 mutex := &sync.Mutex{}
37
38 driver := ManagedDatabaseConnection{
39 db,
40 mutex,
41 }
42
43 return &driver, nil
44}
45
46func (mdbc *ManagedDatabaseConnection) ensureMigration(desc string, migration func() error) error {
47 // If you didn't grab the lock before running this,
48 // you're going to have a bad time
49
50 // If you try to grab the lock within this method,
51 // you're going to have a bad time
52
53 rows, err := mdbc.db.Query(
54 "SELECT runAt FROM migration WHERE description = ?",
55 desc,
56 )
57
58 if err != nil {
59 return err
60 }
61
62 var runAt string = ""
63 for rows.Next() {
64 err := rows.Scan(&runAt)
65 if err != nil {
66 return err
67 }
68 }
69
70 if runAt != "" {
71 log.Println(fmt.Sprintf("migration %s already ran at %s", desc, runAt))
72 return nil
73 }
74
75 err = migration()
76 if err != nil {
77 return err
78 }
79
80 now := time.Now().UTC().Format(time.RFC3339)
81 _, err = mdbc.db.Exec(
82 "INSERT INTO migration (description, runAt) VALUES (?, ?)",
83 desc,
84 now,
85 )
86 if err != nil {
87 return err
88 }
89
90 log.Println(fmt.Sprintf("migration %s run at %s", desc, now))
91
92 return nil
93}
94
95func (mdbc *ManagedDatabaseConnection) EnsureMigrations() error {
96 /*
97 sqlite> .schema post
98 CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "indexedAt" varchar not null);
99 */
100
101 mdbc.mutex.Lock()
102 defer mdbc.mutex.Unlock()
103
104 _, err := mdbc.db.Exec(
105 `CREATE TABLE IF NOT EXISTS "migration" ("description" varchar primary key, "runAt" varchar not null);`,
106 )
107 if err != nil {
108 return err
109 }
110
111 err = mdbc.ensureMigration("create posts table", func() error {
112 _, err = mdbc.db.Exec(
113 `CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "indexedAt" varchar not null);`,
114 )
115 return err
116 })
117 if err != nil {
118 return err
119 }
120
121 err = mdbc.ensureMigration("add topic to posts table", func() error {
122 _, err = mdbc.db.Exec(
123 `ALTER TABLE post RENAME TO post_old;`,
124 )
125 if err != nil {
126 return err
127 }
128 _, err = mdbc.db.Exec(
129 `CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "topic" varchar not null, "indexedAt" varchar not null);`,
130 )
131 if err != nil {
132 return err
133 }
134 _, err = mdbc.db.Exec(
135 `INSERT INTO post (uri, cid, indexedAt, topic) SELECT uri, cid, indexedAt, 'coffee' FROM post_old;`,
136 )
137 if err != nil {
138 return err
139 }
140
141 _, err = mdbc.db.Exec(
142 `DROP TABLE post_old;`,
143 )
144 return err
145 })
146 if err != nil {
147 return err
148 }
149
150 return err
151}
152
153func (mdbc *ManagedDatabaseConnection) WriteToPostTable(payload *DatabasePost) error {
154 /*
155 sqlite> INSERT INTO post (uri, cid, indexedAt) VALUES ('test', 'test', '2025-05-24T16:33:53.960Z');
156 sqlite>
157 sqlite> SELECT * FROM post WHERE uri = 'test';
158 test|test|2025-05-24T16:33:53.960Z
159 */
160
161 mdbc.mutex.Lock()
162 defer mdbc.mutex.Unlock()
163
164 stmt, err := mdbc.db.Prepare("INSERT INTO post (uri, cid, topic, indexedAt) VALUES (?, ?, ?, ?)")
165 if err != nil {
166 return err
167 }
168 defer stmt.Close()
169
170 _, err = stmt.Exec(payload.Uri, payload.Cid, payload.Topic, payload.IndexedAt)
171
172 return err
173}
174
175func (mdbc *ManagedDatabaseConnection) StreamPostsFromChannel(postsChannel chan *DatabasePost) {
176 defer close(postsChannel)
177 for {
178 select {
179 case post := <-postsChannel:
180 err := mdbc.WriteToPostTable(post)
181 if err != nil {
182 log.Fatalf("failed to write post: ", err)
183 }
184 }
185 }
186}
187
188func (mdbc *ManagedDatabaseConnection) ReadPostsFromDatabase(topic string, indexedBefore time.Time, limit int) (*[]DatabasePost, error) {
189 /*
190 SELECT uri, cid, indexedAt FROM post WHERE indexedAt < '2026' ORDER BY indexedAt DESC, cid DESC;
191 */
192
193 mdbc.mutex.Lock()
194 defer mdbc.mutex.Unlock()
195
196 rows, err := mdbc.db.Query(
197 "SELECT uri, cid, indexedAt FROM post WHERE topic = ? AND indexedAt < ? ORDER BY indexedAt DESC, cid DESC LIMIT ?",
198 topic,
199 indexedBefore.Format(time.RFC3339),
200 limit,
201 )
202
203 if err != nil {
204 return nil, err
205 }
206
207 posts := make([]DatabasePost, 0)
208 for rows.Next() {
209 var post DatabasePost
210 err := rows.Scan(&post.Uri, &post.Cid, &post.IndexedAt)
211 if err != nil {
212 return nil, err
213 }
214
215 posts = append(posts, post)
216 }
217
218 return &posts, nil
219}