this repo has no description

Compare changes

Choose any two refs to compare.

Changed files
+116 -30
cmd
-2
cmd/main.go
···
if errors.Is(err, context.Canceled) {
return nil
}
-
slog.Error("consume loop", "error", err)
-
bugsnag.Notify(err)
return err
}
return nil
···
if errors.Is(err, context.Canceled) {
return nil
}
return err
}
return nil
+80 -20
consumer.go
···
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/bugsnag/bugsnag-go"
-
"tangled.sh/tangled.sh/core/api/tangled"
)
type Issue struct {
···
RKey string `json:"rkey"`
Body string `json:"body"`
Issue string `json:"issue" `
-
ReplyTo string `json:"replyTo"`
CreatedAt int64 `json:"createdAt"`
}
type Store interface {
CreateIssue(issue Issue) error
CreateComment(comment Comment) error
}
// JetstreamConsumer is responsible for consuming from a jetstream instance
···
switch event.Commit.Operation {
case models.CommitOperationCreate, models.CommitOperationUpdate:
-
return h.handleCreateEvent(ctx, event)
-
// TODO: handle deletes too
default:
return nil
}
}
-
func (h *Handler) handleCreateEvent(ctx context.Context, event *models.Event) error {
switch event.Commit.Collection {
case tangled.RepoIssueNSID:
-
h.handleIssueEvent(ctx, event)
case tangled.RepoIssueCommentNSID:
-
h.handleIssueCommentEvent(ctx, event)
default:
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
return nil
···
return nil
}
-
func (h *Handler) handleIssueEvent(ctx context.Context, event *models.Event) {
var issue tangled.RepoIssue
err := json.Unmarshal(event.Commit.Record, &issue)
···
slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
}
-
func (h *Handler) handleIssueCommentEvent(ctx context.Context, event *models.Event) {
var comment tangled.RepoIssueComment
err := json.Unmarshal(event.Commit.Record, &comment)
···
slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
createdAt = time.Now().UTC()
}
-
err = h.store.CreateComment(Comment{
-
AuthorDID: did,
-
RKey: rkey,
-
Body: comment.Body,
-
Issue: comment.Issue,
-
CreatedAt: createdAt.UnixMilli(),
-
//ReplyTo: comment, // TODO: there should be a ReplyTo field that can be used as well once the right type is imported
-
})
if err != nil {
bugsnag.Notify(err)
-
slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
return
}
-
// TODO: now send a notification to either the issue creator or whoever the comment was a reply to
-
slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
}
···
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/bugsnag/bugsnag-go"
+
"tangled.org/core/api/tangled"
)
type Issue struct {
···
RKey string `json:"rkey"`
Body string `json:"body"`
Issue string `json:"issue" `
CreatedAt int64 `json:"createdAt"`
}
type Store interface {
CreateIssue(issue Issue) error
CreateComment(comment Comment) error
+
DeleteIssue(did, rkey string) error
+
DeleteComment(did, rkey string) error
+
DeleteCommentsForIssue(issueURI string) error
}
// JetstreamConsumer is responsible for consuming from a jetstream instance
···
switch event.Commit.Operation {
case models.CommitOperationCreate, models.CommitOperationUpdate:
+
return h.handleCreateUpdateEvent(ctx, event)
+
case models.CommitOperationDelete:
+
return h.handleDeleteEvent(ctx, event)
default:
return nil
}
}
+
func (h *Handler) handleCreateUpdateEvent(ctx context.Context, event *models.Event) error {
switch event.Commit.Collection {
case tangled.RepoIssueNSID:
+
h.handleCreateUpdateIssueEvent(ctx, event)
case tangled.RepoIssueCommentNSID:
+
h.handleCreateUpdateIssueCommentEvent(ctx, event)
default:
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
return nil
···
return nil
}
+
func (h *Handler) handleDeleteEvent(ctx context.Context, event *models.Event) error {
+
switch event.Commit.Collection {
+
case tangled.RepoIssueNSID:
+
h.handleDeleteIssueEvent(ctx, event)
+
case tangled.RepoIssueCommentNSID:
+
h.handleDeleteIssueCommentEvent(ctx, event)
+
default:
+
slog.Info("create event was not for expected collection", "RKey", "did", event.Did, event.Commit.RKey, "collection", event.Commit.Collection)
+
return nil
+
}
+
+
return nil
+
}
+
+
func (h *Handler) handleCreateUpdateIssueEvent(ctx context.Context, event *models.Event) {
var issue tangled.RepoIssue
err := json.Unmarshal(event.Commit.Record, &issue)
···
slog.Info("created issue ", "value", issue, "did", did, "rkey", rkey)
}
+
func (h *Handler) handleCreateUpdateIssueCommentEvent(ctx context.Context, event *models.Event) {
var comment tangled.RepoIssueComment
err := json.Unmarshal(event.Commit.Record, &comment)
···
slog.Error("parsing createdAt time from comment", "error", err, "timestamp", comment.CreatedAt)
createdAt = time.Now().UTC()
}
+
+
// if there is a replyTo present, don't store the comment because replies can't be replied to so
+
// the reply comment doesn't need to be stored
+
if comment.ReplyTo == nil || *comment.ReplyTo == "" {
+
err = h.store.CreateComment(Comment{
+
AuthorDID: did,
+
RKey: rkey,
+
Body: comment.Body,
+
Issue: comment.Issue,
+
CreatedAt: createdAt.UnixMilli(),
+
})
+
if err != nil {
+
bugsnag.Notify(err)
+
slog.Error("create comment", "error", err, "did", did, "rkey", rkey)
+
return
+
}
+
}
+
+
// TODO: now send a notification to either the issue creator or whoever the comment was a reply to
+
+
slog.Info("created comment ", "value", comment, "did", did, "rkey", rkey)
+
}
+
+
func (h *Handler) handleDeleteIssueEvent(ctx context.Context, event *models.Event) {
+
did := event.Did
+
rkey := event.Commit.RKey
+
+
err := h.store.DeleteIssue(did, rkey)
if err != nil {
bugsnag.Notify(err)
+
slog.Error("delete issue", "error", err, "did", did, "rkey", rkey)
return
}
+
// now attempt to delete any comments on that issue since they can't be replied to now.
+
// Note: if unsuccessful it doesn't matter because a deleted issue and its comments are
+
// not visible on the UI and so no one will be able to reply to them so this is just a
+
// cleanup operation
+
issueURI := fmt.Sprintf("at://%s/%s/%s", did, tangled.RepoIssueNSID, rkey)
+
err = h.store.DeleteCommentsForIssue(issueURI)
+
if err != nil {
+
bugsnag.Notify(err)
+
slog.Error("delete comments for issue", "error", err, "issue URI", issueURI)
+
}
+
slog.Info("deleted issue ", "did", did, "rkey", rkey)
+
}
+
+
func (h *Handler) handleDeleteIssueCommentEvent(ctx context.Context, event *models.Event) {
+
did := event.Did
+
rkey := event.Commit.RKey
+
+
err := h.store.DeleteComment(did, rkey)
+
if err != nil {
+
bugsnag.Notify(err)
+
slog.Error("delete comment", "error", err, "did", did, "rkey", rkey)
+
return
+
}
+
+
slog.Info("deleted comment ", "did", did, "rkey", rkey)
}
+31 -5
database.go
···
"rkey" TEXT,
"body" TEXT,
"issue" TEXT,
-
"replyTo" TEXT,
"createdAt" integer NOT NULL,
UNIQUE(authorDid,rkey)
);`
···
// CreateComment will insert a comment into a database
func (d *Database) CreateComment(comment Comment) error {
-
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, replyTo, createdAt) VALUES (?, ?, ?, ?, ?, ?);`
-
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.ReplyTo, comment.CreatedAt)
if err != nil {
return fmt.Errorf("exec insert comment: %w", err)
}
···
}
func (d *Database) GetComments() ([]Comment, error) {
-
sql := "SELECT authorDid, rkey, body, issue, replyTo, createdAt FROM comments;"
rows, err := d.db.Query(sql)
if err != nil {
return nil, fmt.Errorf("run query to get comments: %w", err)
···
var results []Comment
for rows.Next() {
var comment Comment
-
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.ReplyTo, &comment.CreatedAt); err != nil {
return nil, fmt.Errorf("scan row: %w", err)
}
···
}
return results, nil
}
···
"rkey" TEXT,
"body" TEXT,
"issue" TEXT,
"createdAt" integer NOT NULL,
UNIQUE(authorDid,rkey)
);`
···
// CreateComment will insert a comment into a database
func (d *Database) CreateComment(comment Comment) error {
+
sql := `REPLACE INTO comments (authorDid, rkey, body, issue, createdAt) VALUES (?, ?, ?, ?, ?);`
+
_, err := d.db.Exec(sql, comment.AuthorDID, comment.RKey, comment.Body, comment.Issue, comment.CreatedAt)
if err != nil {
return fmt.Errorf("exec insert comment: %w", err)
}
···
}
func (d *Database) GetComments() ([]Comment, error) {
+
sql := "SELECT authorDid, rkey, body, issue, createdAt FROM comments;"
rows, err := d.db.Query(sql)
if err != nil {
return nil, fmt.Errorf("run query to get comments: %w", err)
···
var results []Comment
for rows.Next() {
var comment Comment
+
if err := rows.Scan(&comment.AuthorDID, &comment.RKey, &comment.Body, &comment.Issue, &comment.CreatedAt); err != nil {
return nil, fmt.Errorf("scan row: %w", err)
}
···
}
return results, nil
}
+
+
func (d *Database) DeleteIssue(did, rkey string) error {
+
sql := "DELETE FROM issues WHERE authorDid = ? AND rkey = ?;"
+
_, err := d.db.Exec(sql, did, rkey)
+
if err != nil {
+
return fmt.Errorf("exec delete issue: %w", err)
+
}
+
return nil
+
}
+
+
func (d *Database) DeleteComment(did, rkey string) error {
+
sql := "DELETE FROM comments WHERE authorDid = ? AND rkey = ?;"
+
_, err := d.db.Exec(sql, did, rkey)
+
if err != nil {
+
return fmt.Errorf("exec delete issue: %w", err)
+
}
+
return nil
+
}
+
+
func (d *Database) DeleteCommentsForIssue(issueURI string) error {
+
sql := "DELETE FROM comments WHERE issue = ?;"
+
_, err := d.db.Exec(sql, issueURI)
+
if err != nil {
+
return fmt.Errorf("exec delete comments for issue")
+
}
+
return nil
+
}
+1 -1
go.mod
···
github.com/bugsnag/bugsnag-go v2.6.2+incompatible
github.com/glebarez/go-sqlite v1.22.0
github.com/joho/godotenv v1.5.1
-
tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98
)
require (
···
github.com/bugsnag/bugsnag-go v2.6.2+incompatible
github.com/glebarez/go-sqlite v1.22.0
github.com/joho/godotenv v1.5.1
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2
)
require (
+4 -2
go.sum
···
modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ=
modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0=
-
tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98 h1:WovrwwBufU89zoSaStoc6+qyUTEB/LxhUCM1MqGEUwU=
-
tangled.sh/tangled.sh/core v1.8.1-alpha.0.20250828210137-07b009bd6b98/go.mod h1:zXmPB9VMsPWpJ6Y51PWnzB1fL3w69P0IhR9rTXIfGPY=
···
modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E=
modernc.org/sqlite v1.28.0 h1:Zx+LyDDmXczNnEQdvPuEfcFVA2ZPyaD7UCZDjef3BHQ=
modernc.org/sqlite v1.28.0/go.mod h1:Qxpazz0zH8Z1xCFyi5GSL3FzbtZ3fvbjmywNogldEW0=
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2 h1:4bcQewZPzb7WfCuUPf4MPVWb04JiTbjbShcg5ONi9co=
+
tangled.org/core v1.9.0-alpha.0.20250924195920-24d79d05e4d2/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=
+
tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d h1:DmdCyK+BZDYitJy6TdqTwvcci2EVYgDu2+LR853nyls=
+
tangled.org/core v1.9.0-alpha.0.20250924200730-b2d8a54abc3d/go.mod h1:tYTB3RkgkeDAOFE0qX/9tQB80fdlDPR+vz4CdTMar3Y=