package main import ( "database/sql" "fmt" "log" "sync" "time" // SQLite3 Database Driver _ "modernc.org/sqlite" ) const DB_DRIVER = "sqlite" const DB_PATH_DEFAULT = "feed.sqlite" type ManagedDatabaseConnection struct { db *sql.DB mutex *sync.Mutex } func NewDatabaseConnection() (*ManagedDatabaseConnection, error) { env := GetEnvironmentVariables() envDBName, isFound := env["FEEDGEN_SQLITE_LOCATION"] if !isFound { envDBName = DB_PATH_DEFAULT } log.Println(fmt.Sprintf("Opening database at %s", envDBName)) db, err := sql.Open(DB_DRIVER, envDBName) if err != nil { log.Fatalf("failed to open database connection", err) return nil, err } mutex := &sync.Mutex{} driver := ManagedDatabaseConnection{ db, mutex, } return &driver, nil } func (mdbc *ManagedDatabaseConnection) ensureMigration(desc string, migration func() error) error { // If you didn't grab the lock before running this, // you're going to have a bad time // If you try to grab the lock within this method, // you're going to have a bad time rows, err := mdbc.db.Query( "SELECT runAt FROM migration WHERE description = ?", desc, ) if err != nil { return err } var runAt string = "" for rows.Next() { err := rows.Scan(&runAt) if err != nil { return err } } if runAt != "" { log.Println(fmt.Sprintf("migration %s already ran at %s", desc, runAt)) return nil } err = migration() if err != nil { return err } now := time.Now().UTC().Format(time.RFC3339) _, err = mdbc.db.Exec( "INSERT INTO migration (description, runAt) VALUES (?, ?)", desc, now, ) if err != nil { return err } log.Println(fmt.Sprintf("migration %s run at %s", desc, now)) return nil } func (mdbc *ManagedDatabaseConnection) EnsureMigrations() error { /* sqlite> .schema post CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "indexedAt" varchar not null); */ mdbc.mutex.Lock() defer mdbc.mutex.Unlock() _, err := mdbc.db.Exec( `CREATE TABLE IF NOT EXISTS "migration" ("description" varchar primary key, "runAt" varchar not null);`, ) if err != nil { return err } err = mdbc.ensureMigration("create posts table", func() error { _, err = mdbc.db.Exec( `CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "indexedAt" varchar not null);`, ) return err }) if err != nil { return err } err = mdbc.ensureMigration("add topic to posts table", func() error { _, err = mdbc.db.Exec( `ALTER TABLE post RENAME TO post_old;`, ) if err != nil { return err } _, err = mdbc.db.Exec( `CREATE TABLE IF NOT EXISTS "post" ("uri" varchar primary key, "cid" varchar not null, "topic" varchar not null, "indexedAt" varchar not null);`, ) if err != nil { return err } _, err = mdbc.db.Exec( `INSERT INTO post (uri, cid, indexedAt, topic) SELECT uri, cid, indexedAt, 'coffee' FROM post_old;`, ) if err != nil { return err } _, err = mdbc.db.Exec( `DROP TABLE post_old;`, ) return err }) if err != nil { return err } return err } func (mdbc *ManagedDatabaseConnection) WriteToPostTable(payload *DatabasePost) error { /* sqlite> INSERT INTO post (uri, cid, indexedAt) VALUES ('test', 'test', '2025-05-24T16:33:53.960Z'); sqlite> sqlite> SELECT * FROM post WHERE uri = 'test'; test|test|2025-05-24T16:33:53.960Z */ mdbc.mutex.Lock() defer mdbc.mutex.Unlock() stmt, err := mdbc.db.Prepare("INSERT INTO post (uri, cid, topic, indexedAt) VALUES (?, ?, ?, ?)") if err != nil { return err } defer stmt.Close() _, err = stmt.Exec(payload.Uri, payload.Cid, payload.Topic, payload.IndexedAt) return err } func (mdbc *ManagedDatabaseConnection) StreamPostsFromChannel(postsChannel chan *DatabasePost) { defer close(postsChannel) for { select { case post := <-postsChannel: err := mdbc.WriteToPostTable(post) if err != nil { log.Fatalf("failed to write post: ", err) } } } } func (mdbc *ManagedDatabaseConnection) ReadPostsFromDatabase(topic string, indexedBefore time.Time, limit int) (*[]DatabasePost, error) { /* SELECT uri, cid, indexedAt FROM post WHERE indexedAt < '2026' ORDER BY indexedAt DESC, cid DESC; */ mdbc.mutex.Lock() defer mdbc.mutex.Unlock() rows, err := mdbc.db.Query( "SELECT uri, cid, indexedAt FROM post WHERE topic = ? AND indexedAt < ? ORDER BY indexedAt DESC, cid DESC LIMIT ?", topic, indexedBefore.Format(time.RFC3339), limit, ) if err != nil { return nil, err } posts := make([]DatabasePost, 0) for rows.Next() { var post DatabasePost err := rows.Scan(&post.Uri, &post.Cid, &post.IndexedAt) if err != nil { return nil, err } posts = append(posts, post) } return &posts, nil }