Feed generator written in Golang
at main 4.8 kB view raw
1package main 2 3import ( 4 "database/sql" 5 "fmt" 6 "log" 7 "sync" 8 "time" 9 10 // SQLite3 Database Driver 11 _ "modernc.org/sqlite" 12) 13 14const DB_DRIVER = "sqlite" 15const DB_PATH_DEFAULT = "feed.sqlite" 16 17type ManagedDatabaseConnection struct { 18 db *sql.DB 19 mutex *sync.Mutex 20} 21 22func NewDatabaseConnection() (*ManagedDatabaseConnection, error) { 23 env := GetEnvironmentVariables() 24 envDBName, isFound := env["FEEDGEN_SQLITE_LOCATION"] 25 if !isFound { 26 envDBName = DB_PATH_DEFAULT 27 } 28 29 log.Println(fmt.Sprintf("Opening database at %s", envDBName)) 30 db, err := sql.Open(DB_DRIVER, envDBName) 31 if err != nil { 32 log.Fatalf("failed to open database connection", err) 33 return nil, err 34 } 35 36 mutex := &sync.Mutex{} 37 38 driver := ManagedDatabaseConnection{ 39 db, 40 mutex, 41 } 42 43 return &driver, nil 44} 45 46func (mdbc *ManagedDatabaseConnection) ensureMigration(desc string, migration func() error) error { 47 // If you didn't grab the lock before running this, 48 // you're going to have a bad time 49 50 // If you try to grab the lock within this method, 51 // you're going to have a bad time 52 53 rows, err := mdbc.db.Query( 54 "SELECT runAt FROM migration WHERE description = ?", 55 desc, 56 ) 57 58 if err != nil { 59 return err 60 } 61 62 var runAt string = "" 63 for rows.Next() { 64 err := rows.Scan(&runAt) 65 if err != nil { 66 return err 67 } 68 } 69 70 if runAt != "" { 71 log.Println(fmt.Sprintf("migration %s already ran at %s", desc, runAt)) 72 return nil 73 } 74 75 err = migration() 76 if err != nil { 77 return err 78 } 79 80 now := time.Now().UTC().Format(time.RFC3339) 81 _, err = mdbc.db.Exec( 82 "INSERT INTO migration (description, runAt) VALUES (?, ?)", 83 desc, 84 now, 85 ) 86 if err != nil { 87 return err 88 } 89 90 log.Println(fmt.Sprintf("migration %s run at %s", desc, now)) 91 92 return nil 93} 94 95func (mdbc *ManagedDatabaseConnection) EnsureMigrations() error { 96 /* 97 sqlite> .schema post 98 CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "indexedAt" varchar not null); 99 */ 100 101 mdbc.mutex.Lock() 102 defer mdbc.mutex.Unlock() 103 104 _, err := mdbc.db.Exec( 105 `CREATE TABLE IF NOT EXISTS "migration" ("description" varchar primary key, "runAt" varchar not null);`, 106 ) 107 if err != nil { 108 return err 109 } 110 111 err = mdbc.ensureMigration("create posts table", func() error { 112 _, err = mdbc.db.Exec( 113 `CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "indexedAt" varchar not null);`, 114 ) 115 return err 116 }) 117 if err != nil { 118 return err 119 } 120 121 err = mdbc.ensureMigration("add topic to posts table", func() error { 122 _, err = mdbc.db.Exec( 123 `ALTER TABLE post RENAME TO post_old;`, 124 ) 125 if err != nil { 126 return err 127 } 128 _, err = mdbc.db.Exec( 129 `CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "topic" varchar not null, "indexedAt" varchar not null);`, 130 ) 131 if err != nil { 132 return err 133 } 134 _, err = mdbc.db.Exec( 135 `INSERT INTO post (uri, cid, indexedAt, topic) SELECT uri, cid, indexedAt, 'coffee' FROM post_old;`, 136 ) 137 if err != nil { 138 return err 139 } 140 141 _, err = mdbc.db.Exec( 142 `DROP TABLE post_old;`, 143 ) 144 return err 145 }) 146 if err != nil { 147 return err 148 } 149 150 return err 151} 152 153func (mdbc *ManagedDatabaseConnection) WriteToPostTable(payload *DatabasePost) error { 154 /* 155 sqlite> INSERT INTO post (uri, cid, indexedAt) VALUES ('test', 'test', '2025-05-24T16:33:53.960Z'); 156 sqlite> 157 sqlite> SELECT * FROM post WHERE uri = 'test'; 158 test|test|2025-05-24T16:33:53.960Z 159 */ 160 161 mdbc.mutex.Lock() 162 defer mdbc.mutex.Unlock() 163 164 stmt, err := mdbc.db.Prepare("INSERT INTO post (uri, cid, topic, indexedAt) VALUES (?, ?, ?, ?)") 165 if err != nil { 166 return err 167 } 168 defer stmt.Close() 169 170 _, err = stmt.Exec(payload.Uri, payload.Cid, payload.Topic, payload.IndexedAt) 171 172 return err 173} 174 175func (mdbc *ManagedDatabaseConnection) StreamPostsFromChannel(postsChannel chan *DatabasePost) { 176 defer close(postsChannel) 177 for { 178 select { 179 case post := <-postsChannel: 180 err := mdbc.WriteToPostTable(post) 181 if err != nil { 182 log.Fatalf("failed to write post: ", err) 183 } 184 } 185 } 186} 187 188func (mdbc *ManagedDatabaseConnection) ReadPostsFromDatabase(topic string, indexedBefore time.Time, limit int) (*[]DatabasePost, error) { 189 /* 190 SELECT uri, cid, indexedAt FROM post WHERE indexedAt < '2026' ORDER BY indexedAt DESC, cid DESC; 191 */ 192 193 mdbc.mutex.Lock() 194 defer mdbc.mutex.Unlock() 195 196 rows, err := mdbc.db.Query( 197 "SELECT uri, cid, indexedAt FROM post WHERE topic = ? AND indexedAt < ? ORDER BY indexedAt DESC, cid DESC LIMIT ?", 198 topic, 199 indexedBefore.Format(time.RFC3339), 200 limit, 201 ) 202 203 if err != nil { 204 return nil, err 205 } 206 207 posts := make([]DatabasePost, 0) 208 for rows.Next() { 209 var post DatabasePost 210 err := rows.Scan(&post.Uri, &post.Cid, &post.IndexedAt) 211 if err != nil { 212 return nil, err 213 } 214 215 posts = append(posts, post) 216 } 217 218 return &posts, nil 219}