this repo has no description

Compare changes

Choose any two refs to compare.

+1
Dockerfile
···
RUN go mod download
COPY . .
+
#compiling for Pi at the moment
RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=5 go build -a -installsuffix cgo -o tangled-alert-bot ./cmd/.
FROM alpine:latest
+93 -1
cmd/main.go
···
import (
"context"
+
"encoding/json"
"errors"
"fmt"
"log"
"log/slog"
+
"net/http"
"os"
"os/signal"
"path"
"syscall"
+
"time"
tangledalertbot "tangled.sh/willdot.net/tangled-alert-bot"
"github.com/avast/retry-go/v4"
+
"github.com/bugsnag/bugsnag-go"
"github.com/joho/godotenv"
)
···
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
+
bugsnag.Configure(bugsnag.Configuration{
+
APIKey: os.Getenv("BUGSNAG"),
+
})
+
dbPath := os.Getenv("DATABASE_PATH")
if dbPath == "" {
dbPath = "./"
···
}
defer database.Close()
+
dmService, err := tangledalertbot.NewDmService(database, time.Second*30)
+
if err != nil {
+
return fmt.Errorf("create dm service: %w", err)
+
}
+
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go consumeLoop(ctx, database)
+
+
go startHttpServer(ctx, database)
+
+
go dmService.Start(ctx)
<-signals
cancel()
···
if errors.Is(err, context.Canceled) {
return nil
}
-
slog.Error("consume loop", "error", err)
return err
}
return nil
···
slog.Warn("exiting consume loop")
}
+
+
func startHttpServer(ctx context.Context, db *tangledalertbot.Database) {
+
srv := server{
+
db: db,
+
}
+
mux := http.NewServeMux()
+
mux.HandleFunc("/issues", srv.handleListIssues)
+
mux.HandleFunc("/comments", srv.handleListComments)
+
mux.HandleFunc("/users", srv.handleListUsers)
+
+
err := http.ListenAndServe(":3000", mux)
+
if err != nil {
+
slog.Error("http listen and serve", "error", err)
+
}
+
}
+
+
type server struct {
+
db *tangledalertbot.Database
+
}
+
+
func (s *server) handleListIssues(w http.ResponseWriter, r *http.Request) {
+
issues, err := s.db.GetIssues()
+
if err != nil {
+
slog.Error("getting issues from DB", "error", err)
+
http.Error(w, "error getting issues from DB", http.StatusInternalServerError)
+
return
+
}
+
+
b, err := json.Marshal(issues)
+
if err != nil {
+
slog.Error("marshalling issues from DB", "error", err)
+
http.Error(w, "marshalling issues from DB", http.StatusInternalServerError)
+
return
+
}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.Write(b)
+
}
+
+
func (s *server) handleListComments(w http.ResponseWriter, r *http.Request) {
+
comments, err := s.db.GetComments()
+
if err != nil {
+
slog.Error("getting comments from DB", "error", err)
+
http.Error(w, "error getting comments from DB", http.StatusInternalServerError)
+
return
+
}
+
+
b, err := json.Marshal(comments)
+
if err != nil {
+
slog.Error("marshalling comments from DB", "error", err)
+
http.Error(w, "marshalling comments from DB", http.StatusInternalServerError)
+
return
+
}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.Write(b)
+
}
+
+
func (s *server) handleListUsers(w http.ResponseWriter, r *http.Request) {
+
users, err := s.db.GetUsers()
+
if err != nil {
+
slog.Error("getting users from DB", "error", err)
+
http.Error(w, "error getting users from DB", http.StatusInternalServerError)
+
return
+
}
+
+
b, err := json.Marshal(users)
+
if err != nil {
+
slog.Error("marshalling users from DB", "error", err)
+
http.Error(w, "marshalling users from DB", http.StatusInternalServerError)
+
return
+
}
+
+
w.Header().Set("Content-Type", "application/json")
+
w.Write(b)
+
}
+137 -22
consumer.go
···
import (
"context"
"encoding/json"
+
"strings"
"fmt"
"log/slog"
···
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
"github.com/bluesky-social/jetstream/pkg/models"
-
"tangled.sh/tangled.sh/core/api/tangled"
+
"github.com/bugsnag/bugsnag-go"
+
"tangled.org/core/api/tangled"
)
type Issue struct {
···
RKey string `json:"rkey"`
Body string `json:"body"`
Issue string `json:"issue" `
-
ReplyTo string `json:"replyTo"`
CreatedAt int64 `json:"createdAt"`
}
type Store interface {
CreateIssue(issue Issue) error
CreateComment(comment Comment) error
+
DeleteIssue(did, rkey string) error
+
DeleteComment(did, rkey string) error
+
DeleteCommentsForIssue(issueURI string) error
+
GetUser(did string) (User, error)
+
CreateUser(user User) error
}
// JetstreamConsumer is responsible for consuming from a jetstream instance
···
}
switch event.Commit.Operation {
-
case models.CommitOperationCreate:
-
return h.handleCreateEvent(ctx, event)
-
// TODO: handle deletes too
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
+
return h.handleCreateUpdateEvent(ctx, event)
+
case models.CommitOperationDelete:
+
return h.handleDeleteEvent(ctx, event)
default:
return nil
}
}
-
func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error {
+
func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
switch event.Commit.Collection {
case tangled.RepoIssueNSID:
-
h.handleIssueEvent(ctx, event)
+
h.handleCreateUpdateIssueEvent(ctx, event)
case tangled.RepoIssueCommentNSID:
-
h.handleIssueCommentEvent(ctx, event)
+
h.handleCreateUpdateIssueCommentEvent(ctx, event)
default:
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
return nil
···
return nil
}
-
func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) {
+
func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
+
switch event.Commit.Collection {
+
case tangled.RepoIssueNSID:
+
h.handleDeleteIssueEvent(ctx, event)
+
case tangled.RepoIssueCommentNSID:
+
h.handleDeleteIssueCommentEvent(ctx, event)
+
default:
+
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
+
return nil
+
}
+
+
return nil
+
}
+
+
func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
var issue tangled.RepoIssue
err := json.Unmarshal(event.Commit.Record, &issue)
if err != nil {
+
bugsnag.Notify(err)
slog.Error("error unmarshalling event record to issue", "error", err)
return
}
···
createdAt, err := time.Parse(time.RFC3339, issue.CreatedAt)
if err != nil {
+
bugsnag.Notify(err)
slog.Error("parsing createdAt time from issue", "error", err, "timestamp", issue.CreatedAt)
createdAt = time.Now().UTC()
}
body := ""
if issue.Body != nil {
-
body = *&body
+
body = *issue.Body
}
err = h.store.CreateIssue(Issue{
AuthorDID: did,
···
Repo: issue.Repo,
})
if err != nil {
+
bugsnag.Notify(err)
slog.Error("create issue", "error", err, "did", did, "rkey", rkey)
return
}
slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
}
-
func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) {
+
func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
var comment tangled.RepoIssueComment
err := json.Unmarshal(event.Commit.Record, &comment)
if err != nil {
+
bugsnag.Notify(err)
slog.Error("error unmarshalling event record to comment", "error", err)
return
}
···
createdAt, err := time.Parse(time.RFC3339, comment.CreatedAt)
if err != nil {
+
bugsnag.Notify(err)
slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
createdAt = time.Now().UTC()
}
-
err = h.store.CreateComment(Comment{
-
AuthorDID: did,
-
RKey: rkey,
-
Body: comment.Body,
-
Issue: comment.Issue,
-
CreatedAt: createdAt.UnixMilli(),
-
//ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported
-
})
+
+
// if there is a replyTo present, don't store the comment because replies can't be replied to so
+
// the reply comment doesn't need to be stored
+
if comment.ReplyTo == nil || *comment.ReplyTo == "" {
+
err = h.store.CreateComment(Comment{
+
AuthorDID: did,
+
RKey: rkey,
+
Body: comment.Body,
+
Issue: comment.Issue,
+
CreatedAt: createdAt.UnixMilli(),
+
})
+
if err != nil {
+
bugsnag.Notify(err)
+
slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
+
return
+
}
+
}
+
+
// TODO: now send a notification to either the issue creator or whoever the comment was a reply to
+
didToNotify := getUserToAlert(comment)
+
if didToNotify == "" {
+
slog.Info("could not work out did to send alert to", "comment", comment)
+
return
+
}
+
+
user, err := h.store.GetUser(didToNotify)
+
if err != nil {
+
slog.Error("getting user to send alert to", "error", err, "did", didToNotify)
+
return
+
}
+
+
slog.Info("sending alert to user", "value", comment, "did", didToNotify, "convo", user.ConvoID)
+
}
+
+
func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
+
did := event.Did
+
rkey := event.Commit.RKey
+
+
err := h.store.DeleteIssue(did, rkey)
+
if err != nil {
+
bugsnag.Notify(err)
+
slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
+
return
+
}
+
+
// now attempt to delete any comments on that issue since they can't be replied to now.
+
// Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
+
// not visible on the UI and so no one will be able to reply to them so this is just a
+
// cleanup operation
+
issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
+
err = h.store.DeleteCommentsForIssue(issueURI)
+
if err != nil {
+
bugsnag.Notify(err)
+
slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
+
}
+
+
slog.Info("deleted issue ", "did", did, "rkey", rkey)
+
}
+
+
func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
+
did := event.Did
+
rkey := event.Commit.RKey
+
+
err := h.store.DeleteComment(did, rkey)
if err != nil {
-
slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
+
bugsnag.Notify(err)
+
slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
return
}
-
// TODO: now send a notification to either the issue creator or whoever the comment was a reply to
+
slog.Info("deleted comment ", "did", did, "rkey", rkey)
+
}
+
+
// at://did:plc:dadhhalkfcq3gucaq25hjqon/sh.tangled.repo.issue.comment/3lzkp4va62m22
+
func getUserToAlert(comment tangled.RepoIssueComment) string {
+
if comment.ReplyTo != nil {
+
return getDidFromCommentURI(*comment.ReplyTo)
+
}
+
return getDidFromIssueURI(comment.Issue)
+
}
+
+
func getDidFromCommentURI(uri string) string {
+
split := strings.Split(uri, tangled.RepoIssueCommentNSID)
+
if len(split) != 2 {
+
slog.Error("invalid comment URI received", "uri", uri)
+
return ""
+
}
+
+
did := strings.TrimPrefix(split[0], "at://")
+
did = strings.TrimSuffix(did, "/")
+
+
return did
+
}
+
+
func getDidFromIssueURI(uri string) string {
+
split := strings.Split(uri, tangled.RepoIssueNSID)
+
if len(split) != 2 {
+
slog.Error("invalid issue URI received", "uri", uri)
+
return ""
+
}
+
+
did := strings.TrimPrefix(split[0], "at://")
+
did = strings.TrimSuffix(did, "/")
-
slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
+
return did
}
+156 -4
database.go
···
return nil, fmt.Errorf("creating comments table: %w", err)
}
+
err = createUsersTable(db)
+
if err != nil {
+
return nil, fmt.Errorf("creating users table: %w", err)
+
}
+
return &Database{db: db}, nil
}
···
"rkey" TEXT,
"body" TEXT,
"issue" TEXT,
-
"replyTo" TEXT,
"createdAt" integer NOT NULL,
UNIQUE(authorDid,rkey)
);`
···
return nil
}
+
func createUsersTable(db *sql.DB) error {
+
createTableSQL := `CREATE TABLE IF NOT EXISTS users (
+
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
+
"did" TEXT,
+
"handle" TEXT,
+
"convoId" TEXT,
+
"createdAt" integer NOT NULL,
+
UNIQUE(did)
+
);`
+
+
slog.Info("Create users table...")
+
statement, err := db.Prepare(createTableSQL)
+
if err != nil {
+
return fmt.Errorf("prepare DB statement to create users table: %w", err)
+
}
+
_, err = statement.Exec()
+
if err != nil {
+
return fmt.Errorf("exec sql statement to create users table: %w", err)
+
}
+
slog.Info("users table created")
+
+
return nil
+
}
+
// CreateIssue will insert a issue into a database
func (d *Database) CreateIssue(issue Issue) error {
-
sql := `INSERT INTO issues (authorDid, rkey, title, body, repo, createdAt) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(authorDid, rkey) DO NOTHING;`
+
sql := `REPLACE INTO issues (authorDid, rkey, title, body, repo, createdAt) VALUES (?, ?, ?, ?, ?, ?);`
_, err := d.db.Exec(sql, issue.AuthorDID, issue.RKey, issue.Title, issue.Body, issue.Repo, issue.CreatedAt)
if err != nil {
return fmt.Errorf("exec insert issue: %w", err)
···
// CreateComment will insert a comment into a database
func (d *Database) CreateComment(comment Comment) error {
-
sql := `INSERT INTO comments (authorDid, rkey, body, issue, replyTo, createdAt) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(authorDid, rkey) DO NOTHING;`
-
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.ReplyTo, comment.CreatedAt)
+
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, createdAt) VALUES (?, ?, ?, ?, ?);`
+
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.CreatedAt)
if err != nil {
return fmt.Errorf("exec insert comment: %w", err)
}
return nil
}
+
+
// CreateUser will insert a user into a database
+
func (d *Database) CreateUser(user User) error {
+
sql := `REPLACE INTO users (did, handle, convoId, createdAt) VALUES (?, ?, ?, ?);`
+
_, err := d.db.Exec(sql, user.DID, user.Handle, user.ConvoID, user.CreatedAt)
+
if err != nil {
+
return fmt.Errorf("exec insert user: %w", err)
+
}
+
return nil
+
}
+
+
func (d *Database) GetIssues() ([]Issue, error) {
+
sql := "SELECT authorDid, rkey, title, body, repo, createdAt FROM issues;"
+
rows, err := d.db.Query(sql)
+
if err != nil {
+
return nil, fmt.Errorf("run query to get issues: %w", err)
+
}
+
defer rows.Close()
+
+
var results []Issue
+
for rows.Next() {
+
var issue Issue
+
if err := rows.Scan(&issue.AuthorDID, &issue.RKey, &issue.Title, &issue.Body, &issue.Repo, &issue.CreatedAt); err != nil {
+
return nil, fmt.Errorf("scan row: %w", err)
+
}
+
+
results = append(results, issue)
+
}
+
return results, nil
+
}
+
+
func (d *Database) GetComments() ([]Comment, error) {
+
sql := "SELECT authorDid, rkey, body, issue, createdAt FROM comments;"
+
rows, err := d.db.Query(sql)
+
if err != nil {
+
return nil, fmt.Errorf("run query to get comments: %w", err)
+
}
+
defer rows.Close()
+
+
var results []Comment
+
for rows.Next() {
+
var comment Comment
+
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.CreatedAt); err != nil {
+
return nil, fmt.Errorf("scan row: %w", err)
+
}
+
+
results = append(results, comment)
+
}
+
return results, nil
+
}
+
+
func (d *Database) GetUser(did string) (User, error) {
+
sql := "SELECT did, handle, convoId, createdAt FROM users WHERE did = ?;"
+
rows, err := d.db.Query(sql, did)
+
if err != nil {
+
return User{}, fmt.Errorf("run query to get user: %w", err)
+
}
+
defer rows.Close()
+
+
for rows.Next() {
+
var user User
+
if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil {
+
return User{}, fmt.Errorf("scan row: %w", err)
+
}
+
+
return user, nil
+
}
+
return User{}, fmt.Errorf("user not found")
+
}
+
+
func (d *Database) GetUsers() ([]User, error) {
+
sql := "SELECT did, handle, convoId, createdAt FROM users;"
+
rows, err := d.db.Query(sql)
+
if err != nil {
+
return nil, fmt.Errorf("run query to get user: %w", err)
+
}
+
defer rows.Close()
+
+
var results []User
+
for rows.Next() {
+
var user User
+
if err := rows.Scan(&user.DID, &user.Handle, &user.ConvoID, &user.CreatedAt); err != nil {
+
return nil, fmt.Errorf("scan row: %w", err)
+
}
+
results = append(results, user)
+
}
+
return results, nil
+
}
+
+
func (d *Database) DeleteIssue(did, rkey string) error {
+
sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;"
+
_, err := d.db.Exec(sql, did, rkey)
+
if err != nil {
+
return fmt.Errorf("exec delete issue: %w", err)
+
}
+
return nil
+
}
+
+
func (d *Database) DeleteComment(did, rkey string) error {
+
sql := "DELETE FROM comments WHERE authorDid = ? AND rkey = ?;"
+
_, err := d.db.Exec(sql, did, rkey)
+
if err != nil {
+
return fmt.Errorf("exec delete issue: %w", err)
+
}
+
return nil
+
}
+
+
func (d *Database) DeleteCommentsForIssue(issueURI string) error {
+
sql := "DELETE FROM comments WHERE issue = ?;"
+
_, err := d.db.Exec(sql, issueURI)
+
if err != nil {
+
return fmt.Errorf("exec delete comments for issue")
+
}
+
return nil
+
}
+
+
func (d *Database) DeleteUser(did string) error {
+
sql := "DELETE FROM users WHERE did = ?;"
+
_, err := d.db.Exec(sql, did)
+
if err != nil {
+
return fmt.Errorf("exec delete user")
+
}
+
return nil
+
}
+441
dm_handler.go
···
+
package tangledalertbot
+
+
import (
+
"bytes"
+
"context"
+
"encoding/json"
+
"fmt"
+
"io"
+
"log/slog"
+
"net/http"
+
"os"
+
"strings"
+
"time"
+
+
"github.com/pkg/errors"
+
)
+
+
const (
+
httpClientTimeoutDuration = time.Second * 5
+
transportIdleConnTimeoutDuration = time.Second * 90
+
baseBskyURL = "https://bsky.social/xrpc"
+
)
+
+
type auth struct {
+
AccessJwt string `json:"accessJwt"`
+
RefershJWT string `json:"refreshJwt"`
+
Did string `json:"did"`
+
}
+
+
type accessData struct {
+
handle string
+
appPassword string
+
}
+
+
type ListConvosResponse struct {
+
Cursor string `json:"cursor"`
+
Convos []Convo `json:"convos"`
+
}
+
+
type Convo struct {
+
ID string `json:"id"`
+
Members []ConvoMember `json:"members"`
+
UnreadCount int `json:"unreadCount"`
+
}
+
+
type ConvoMember struct {
+
Did string `json:"did"`
+
Handle string `json:"handle"`
+
}
+
+
type ErrorResponse struct {
+
Error string `json:"error"`
+
}
+
+
type MessageResp struct {
+
Messages []Message `json:"messages"`
+
Cursor string `json:"cursor"`
+
}
+
+
type Message struct {
+
ID string `json:"id"`
+
Sender MessageSender `json:"sender"`
+
Text string `json:"text"`
+
}
+
+
type MessageSender struct {
+
Did string `json:"did"`
+
}
+
+
type UpdateMessageReadRequest struct {
+
ConvoID string `json:"convoId"`
+
MessageID string `json:"messageId"`
+
}
+
+
type User struct {
+
DID string
+
Handle string
+
ConvoID string
+
CreatedAt int
+
}
+
+
type DmService struct {
+
httpClient *http.Client
+
accessData accessData
+
auth auth
+
timerDuration time.Duration
+
pdsURL string
+
store Store
+
}
+
+
func NewDmService(store Store, timerDuration time.Duration) (*DmService, error) {
+
httpClient := http.Client{
+
Timeout: httpClientTimeoutDuration,
+
Transport: &http.Transport{
+
IdleConnTimeout: transportIdleConnTimeoutDuration,
+
},
+
}
+
+
accessHandle := os.Getenv("MESSAGING_ACCESS_HANDLE")
+
accessAppPassword := os.Getenv("MESSAGING_ACCESS_APP_PASSWORD")
+
pdsURL := os.Getenv("MESSAGING_PDS_URL")
+
+
service := DmService{
+
httpClient: &httpClient,
+
accessData: accessData{
+
handle: accessHandle,
+
appPassword: accessAppPassword,
+
},
+
timerDuration: timerDuration,
+
pdsURL: pdsURL,
+
store: store,
+
}
+
+
auth, err := service.Authenicate()
+
if err != nil {
+
return nil, fmt.Errorf("authenticating: %w", err)
+
}
+
+
service.auth = auth
+
+
return &service, nil
+
}
+
+
func (d *DmService) Start(ctx context.Context) {
+
go d.RefreshTask(ctx)
+
+
timer := time.NewTimer(d.timerDuration)
+
defer timer.Stop()
+
+
for {
+
select {
+
case <-ctx.Done():
+
slog.Warn("context canceled - stopping dm task")
+
return
+
case <-timer.C:
+
err := d.HandleMessageTimer(ctx)
+
if err != nil {
+
slog.Error("handle message timer", "error", err)
+
}
+
timer.Reset(d.timerDuration)
+
}
+
}
+
}
+
+
func (d *DmService) RefreshTask(ctx context.Context) {
+
timer := time.NewTimer(time.Hour)
+
defer timer.Stop()
+
+
for {
+
select {
+
case <-ctx.Done():
+
return
+
case <-timer.C:
+
err := d.RefreshAuthenication(ctx)
+
if err != nil {
+
slog.Error("handle refresh auth timer", "error", err)
+
// TODO: better retry with backoff probably
+
timer.Reset(time.Minute)
+
continue
+
}
+
timer.Reset(time.Hour)
+
}
+
}
+
}
+
+
func (d *DmService) HandleMessageTimer(ctx context.Context) error {
+
convoResp, err := d.GetUnreadMessages()
+
if err != nil {
+
return fmt.Errorf("get unread messages: %w", err)
+
}
+
+
// TODO: handle the cursor pagination
+
+
for _, convo := range convoResp.Convos {
+
if convo.UnreadCount == 0 {
+
continue
+
}
+
+
messageResp, err := d.GetMessages(ctx, convo.ID)
+
if err != nil {
+
slog.Error("failed to get messages for convo", "error", err, "convo id", convo.ID)
+
continue
+
}
+
+
unreadCount := convo.UnreadCount
+
unreadMessages := make([]Message, 0, convo.UnreadCount)
+
// TODO: handle cursor pagination
+
for _, msg := range messageResp.Messages {
+
// TODO: techincally if I get to a message that's from the bot account, then there shouldn't be
+
// an more unread messages?
+
if msg.Sender.Did == d.auth.Did {
+
continue
+
}
+
+
unreadMessages = append(unreadMessages, msg)
+
unreadCount--
+
if unreadCount == 0 {
+
break
+
}
+
}
+
+
for _, msg := range unreadMessages {
+
d.handleMessage(msg, convo)
+
+
err = d.MarkMessageRead(msg.ID, convo.ID)
+
if err != nil {
+
slog.Error("marking message read", "error", err)
+
continue
+
}
+
}
+
}
+
+
return nil
+
}
+
+
func (d *DmService) handleMessage(msg Message, convo Convo) {
+
// TODO: add or remote user the list of "subsribed" users
+
if strings.ToLower(msg.Text) == "subscribe" {
+
userHandle := ""
+
for _, member := range convo.Members {
+
if member.Did == msg.Sender.Did {
+
userHandle = member.Handle
+
break
+
}
+
}
+
+
if userHandle == "" {
+
slog.Error("user handle for sent message not found", "sender did", msg.Sender.Did, "convo members", convo.Members)
+
return
+
}
+
+
user := User{
+
DID: msg.Sender.Did,
+
ConvoID: convo.ID,
+
Handle: userHandle,
+
CreatedAt: int(time.Now().UnixMilli()),
+
}
+
+
err := d.store.CreateUser(user)
+
if err != nil {
+
slog.Error("error creating user", "error", err, "user", user)
+
return
+
}
+
}
+
}
+
+
func (d *DmService) GetUnreadMessages() (ListConvosResponse, error) {
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.listConvos?readState=unread", d.pdsURL)
+
request, err := http.NewRequest("GET", url, nil)
+
if err != nil {
+
return ListConvosResponse{}, fmt.Errorf("create new list convos http request: %w", err)
+
}
+
+
request.Header.Add("Content-Type", "application/json")
+
request.Header.Add("Accept", "application/json")
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
+
+
resp, err := d.httpClient.Do(request)
+
if err != nil {
+
return ListConvosResponse{}, fmt.Errorf("do http request to list convos: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != http.StatusOK {
+
var errorResp ErrorResponse
+
err = decodeResp(resp.Body, &errorResp)
+
if err != nil {
+
return ListConvosResponse{}, err
+
}
+
+
return ListConvosResponse{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
+
}
+
+
var listConvoResp ListConvosResponse
+
err = decodeResp(resp.Body, &listConvoResp)
+
if err != nil {
+
return ListConvosResponse{}, err
+
}
+
+
return listConvoResp, nil
+
}
+
+
func (d *DmService) MarkMessageRead(messageID, convoID string) error {
+
bodyReq := UpdateMessageReadRequest{
+
ConvoID: convoID,
+
MessageID: messageID,
+
}
+
+
bodyB, err := json.Marshal(bodyReq)
+
if err != nil {
+
return fmt.Errorf("marshal update message request body: %w", err)
+
}
+
+
r := bytes.NewReader(bodyB)
+
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.updateRead", d.pdsURL)
+
request, err := http.NewRequest("POST", url, r)
+
if err != nil {
+
return fmt.Errorf("create new list convos http request: %w", err)
+
}
+
+
request.Header.Add("Content-Type", "application/json")
+
request.Header.Add("Accept", "application/json")
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
+
+
resp, err := d.httpClient.Do(request)
+
if err != nil {
+
return fmt.Errorf("do http request to update message read: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode == http.StatusOK {
+
return nil
+
}
+
+
var errorResp ErrorResponse
+
err = decodeResp(resp.Body, &errorResp)
+
if err != nil {
+
return err
+
}
+
+
return fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
+
+
}
+
+
func (d *DmService) Authenicate() (auth, error) {
+
url := fmt.Sprintf("%s/com.atproto.server.createSession", baseBskyURL)
+
+
requestData := map[string]interface{}{
+
"identifier": d.accessData.handle,
+
"password": d.accessData.appPassword,
+
}
+
+
data, err := json.Marshal(requestData)
+
if err != nil {
+
return auth{}, errors.Wrap(err, "failed to marshal request")
+
}
+
+
r := bytes.NewReader(data)
+
+
request, err := http.NewRequest("POST", url, r)
+
if err != nil {
+
return auth{}, errors.Wrap(err, "failed to create request")
+
}
+
+
request.Header.Add("Content-Type", "application/json")
+
+
resp, err := d.httpClient.Do(request)
+
if err != nil {
+
return auth{}, errors.Wrap(err, "failed to make request")
+
}
+
defer resp.Body.Close()
+
+
var loginResp auth
+
err = decodeResp(resp.Body, &loginResp)
+
if err != nil {
+
return auth{}, err
+
}
+
+
return loginResp, nil
+
}
+
+
func (d *DmService) RefreshAuthenication(ctx context.Context) error {
+
url := fmt.Sprintf("%s/com.atproto.server.refreshSession", baseBskyURL)
+
+
request, err := http.NewRequest("POST", url, nil)
+
if err != nil {
+
return errors.Wrap(err, "failed to create request")
+
}
+
+
request.Header.Add("Content-Type", "application/json")
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.RefershJWT))
+
+
resp, err := d.httpClient.Do(request)
+
if err != nil {
+
return errors.Wrap(err, "failed to make request")
+
}
+
defer resp.Body.Close()
+
+
var loginResp auth
+
err = decodeResp(resp.Body, &loginResp)
+
if err != nil {
+
return err
+
}
+
+
d.auth = loginResp
+
+
return nil
+
}
+
+
func (d *DmService) GetMessages(ctx context.Context, convoID string) (MessageResp, error) {
+
url := fmt.Sprintf("%s/xrpc/chat.bsky.convo.getMessages?convoId=%s", d.pdsURL, convoID)
+
request, err := http.NewRequest("GET", url, nil)
+
if err != nil {
+
return MessageResp{}, fmt.Errorf("create new get messages http request: %w", err)
+
}
+
+
request.Header.Add("Content-Type", "application/json")
+
request.Header.Add("Accept", "application/json")
+
request.Header.Add("Atproto-Proxy", "did:web:api.bsky.chat#bsky_chat")
+
request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", d.auth.AccessJwt))
+
+
resp, err := d.httpClient.Do(request)
+
if err != nil {
+
return MessageResp{}, fmt.Errorf("do http request to get messages: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != http.StatusOK {
+
var errorResp ErrorResponse
+
err = decodeResp(resp.Body, &errorResp)
+
if err != nil {
+
return MessageResp{}, err
+
}
+
+
return MessageResp{}, fmt.Errorf("listing convos responded with code %d: %s", resp.StatusCode, errorResp.Error)
+
}
+
+
var messageResp MessageResp
+
err = decodeResp(resp.Body, &messageResp)
+
if err != nil {
+
return MessageResp{}, err
+
}
+
+
return messageResp, nil
+
}
+
+
func decodeResp(body io.Reader, result any) error {
+
resBody, err := io.ReadAll(body)
+
if err != nil {
+
return errors.Wrap(err, "failed to read response")
+
}
+
+
err = json.Unmarshal(resBody, result)
+
if err != nil {
+
return errors.Wrap(err, "failed to unmarshal response")
+
}
+
return nil
+
}
+2
docker-compose.yaml
···
tangled-alert-bot:
container_name: tangled-alert-bot
image: willdot/tangled-alert-bot
+
ports:
+
- "3000:3000"
volumes:
- ./data:/app/data
environment:
+6 -1
go.mod
···
require (
github.com/avast/retry-go/v4 v4.6.1
github.com/bluesky-social/jetstream v0.0.0-20250815235753-306e46369336
+
github.com/bugsnag/bugsnag-go v2.6.2+incompatible
github.com/glebarez/go-sqlite v1.22.0
github.com/joho/godotenv v1.5.1
-
tangled.sh/tangled.sh/core v1.8.1-alpha
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2
)
require (
github.com/beorn7/perks v1.0.1 // indirect
+
github.com/bitly/go-simplejson v0.5.1 // indirect
github.com/bluesky-social/indigo v0.0.0-20250808182429-6f0837c2d12b // indirect
+
github.com/bugsnag/panicwrap v1.3.4 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/ipfs/go-cid v0.5.0 // indirect
+
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
···
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.64.0 // indirect
+14 -2
go.sum
···
github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+
github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow=
+
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/bluesky-social/indigo v0.0.0-20250808182429-6f0837c2d12b h1:bJTlFwMhB9sluuqZxVXtv2yFcaWOC/PZokz9mcwb4u4=
github.com/bluesky-social/indigo v0.0.0-20250808182429-6f0837c2d12b/go.mod h1:0XUyOCRtL4/OiyeqMTmr6RlVHQMDgw3LS7CfibuZR5Q=
github.com/bluesky-social/jetstream v0.0.0-20250815235753-306e46369336 h1:NM3wfeFUrdjCE/xHLXQorwQvEKlI9uqnWl7L0Y9KA8U=
github.com/bluesky-social/jetstream v0.0.0-20250815235753-306e46369336/go.mod h1:3ihWQCbXeayg41G8lQ5DfB/3NnEhl0XX24eZ3mLpf7Q=
+
github.com/bugsnag/bugsnag-go v2.6.2+incompatible h1:6R/uadVvhrciRbPXFmCY7sZ7ElbGKsxxOvG78HcGwj8=
+
github.com/bugsnag/bugsnag-go v2.6.2+incompatible/go.mod h1:2oa8nejYd4cQ/b0hMIopN0lCRxU0bueqREvZLWFrtK8=
+
github.com/bugsnag/panicwrap v1.3.4 h1:A6sXFtDGsgU/4BLf5JT0o5uYg3EeKgGx3Sfs+/uk3pU=
+
github.com/bugsnag/panicwrap v1.3.4/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
···
github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
+
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
+
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
···
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
···
modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ=
modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0=
-
tangled.sh/tangled.sh/core v1.8.1-alpha h1:mCBXOUfzNCT1AnbMnaBrc/AgvfnxOIf5rSIescecpko=
-
tangled.sh/tangled.sh/core v1.8.1-alpha/go.mod h1:9kSVXCu9DMszZoQ5P4Rgdpz+RHLMjbHy++53qE7EBoU=
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2 h1:4bcQewZPzb7WfCuUPf4MPVWb04JiTbjbShcg5ONi9co=
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=
+
tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d h1:DmdCyK+BZDYitJy6TdqTwvcci2EVYgDu2+LR853nyls=
+
tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=