forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

appview: ingester: process sh.tangled.repo.artifact records

Changed files
+87 -102
appview
+9 -9
appview/db/artifact.go
···
BlobCid cid.Cid
Name string
Size uint64
-
Mimetype string
}
func (a *Artifact) ArtifactAt() syntax.ATURI {
···
artifact.BlobCid.String(),
artifact.Name,
artifact.Size,
-
artifact.Mimetype,
)
return err
}
-
type Filter struct {
key string
arg any
}
-
func NewFilter(key string, arg any) Filter {
-
return Filter{
key: key,
arg: arg,
}
}
-
func (f Filter) Condition() string {
return fmt.Sprintf("%s = ?", f.key)
}
-
func GetArtifact(e Execer, filters ...Filter) ([]Artifact, error) {
var artifacts []Artifact
var conditions []string
···
&blobCid,
&artifact.Name,
&artifact.Size,
-
&artifact.Mimetype,
); err != nil {
return nil, err
}
···
return artifacts, nil
}
-
func RemoveArtifact(e Execer, filters ...Filter) error {
var conditions []string
var args []any
for _, filter := range filters {
···
BlobCid cid.Cid
Name string
Size uint64
+
MimeType string
}
func (a *Artifact) ArtifactAt() syntax.ATURI {
···
artifact.BlobCid.String(),
artifact.Name,
artifact.Size,
+
artifact.MimeType,
)
return err
}
+
type filter struct {
key string
arg any
}
+
func Filter(key string, arg any) filter {
+
return filter{
key: key,
arg: arg,
}
}
+
func (f filter) Condition() string {
return fmt.Sprintf("%s = ?", f.key)
}
+
func GetArtifact(e Execer, filters ...filter) ([]Artifact, error) {
var artifacts []Artifact
var conditions []string
···
&blobCid,
&artifact.Name,
&artifact.Size,
+
&artifact.MimeType,
); err != nil {
return nil, err
}
···
return artifacts, nil
}
+
func DeleteArtifact(e Execer, filters ...filter) error {
var conditions []string
var args []any
for _, filter := range filters {
+56 -1
appview/ingester.go
···
"encoding/json"
"fmt"
"log"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/jetstream/pkg/models"
-
tangled "tangled.sh/tangled.sh/core/api/tangled"
"tangled.sh/tangled.sh/core/appview/db"
)
···
ingestStar(&d, e)
case tangled.PublicKeyNSID:
ingestPublicKey(&d, e)
}
return err
···
return nil
}
···
"encoding/json"
"fmt"
"log"
+
"time"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/jetstream/pkg/models"
+
"github.com/go-git/go-git/v5/plumbing"
+
"github.com/ipfs/go-cid"
+
"tangled.sh/tangled.sh/core/api/tangled"
"tangled.sh/tangled.sh/core/appview/db"
)
···
ingestStar(&d, e)
case tangled.PublicKeyNSID:
ingestPublicKey(&d, e)
+
case tangled.RepoArtifactNSID:
+
ingestArtifact(&d, e)
}
return err
···
return nil
}
+
+
func ingestArtifact(d *db.DbWrapper, e *models.Event) error {
+
did := e.Did
+
var err error
+
+
switch e.Commit.Operation {
+
case models.CommitOperationCreate, models.CommitOperationUpdate:
+
log.Println("processing add of artifact")
+
raw := json.RawMessage(e.Commit.Record)
+
record := tangled.RepoArtifact{}
+
err = json.Unmarshal(raw, &record)
+
if err != nil {
+
log.Printf("invalid record: %s", err)
+
return err
+
}
+
+
repoAt, err := syntax.ParseATURI(record.Repo)
+
if err != nil {
+
return err
+
}
+
+
createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
+
if err != nil {
+
createdAt = time.Now()
+
}
+
+
artifact := db.Artifact{
+
Did: did,
+
Rkey: e.Commit.RKey,
+
RepoAt: repoAt,
+
Tag: plumbing.Hash(record.Tag),
+
CreatedAt: createdAt,
+
BlobCid: cid.Cid(record.Artifact.Ref),
+
Name: record.Name,
+
Size: uint64(record.Artifact.Size),
+
MimeType: record.Artifact.MimeType,
+
}
+
+
err = db.AddArtifact(d, artifact)
+
case models.CommitOperationDelete:
+
log.Println("processing delete of artifact")
+
err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey))
+
}
+
+
if err != nil {
+
return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
+
}
+
+
return nil
+
}
+1 -1
appview/pages/pages.go
···
type RepoArtifactParams struct {
LoggedInUser *auth.User
-
RepoInfo RepoInfo
Artifact db.Artifact
}
···
type RepoArtifactParams struct {
LoggedInUser *auth.User
+
RepoInfo repoinfo.RepoInfo
Artifact db.Artifact
}
+4 -4
appview/pages/templates/repo/fragments/artifact.html
···
<a href="/{{ .RepoInfo.FullName }}/tags/{{ .Artifact.Tag.String }}/download/{{ .Artifact.Name | urlquery }}" class="no-underline hover:no-underline">
{{ .Artifact.Name }}
</a>
-
<span class="text-gray-500 dark:text-gray-400 pl-2">{{ byteFmt .Artifact.Size }}</span>
</div>
-
<div id="right-side" class="text-gray-500 dark:text-gray-400 flex items-center flex-shrink-0 gap-2">
<span title="{{ longTimeFmt .Artifact.CreatedAt }}" class="hidden md:inline">{{ timeFmt .Artifact.CreatedAt }}</span>
<span title="{{ longTimeFmt .Artifact.CreatedAt }}" class=" md:hidden">{{ shortTimeFmt .Artifact.CreatedAt }}</span>
<span class="select-none after:content-['·'] hidden md:inline"></span>
-
<span class="truncate max-w-[100px] hidden md:inline">{{ .Artifact.Mimetype }}</span>
-
{{ if and (.LoggedInUser) (eq .LoggedInUser.Did .Artifact.Did) }}
<button
id="delete-{{ $unique }}"
class="btn text-red-500 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300 gap-2"
···
<a href="/{{ .RepoInfo.FullName }}/tags/{{ .Artifact.Tag.String }}/download/{{ .Artifact.Name | urlquery }}" class="no-underline hover:no-underline">
{{ .Artifact.Name }}
</a>
+
<span class="text-gray-500 dark:text-gray-400 pl-2 text-sm">{{ byteFmt .Artifact.Size }}</span>
</div>
+
<div id="right-side" class="text-gray-500 dark:text-gray-400 flex items-center flex-shrink-0 gap-2 text-sm">
<span title="{{ longTimeFmt .Artifact.CreatedAt }}" class="hidden md:inline">{{ timeFmt .Artifact.CreatedAt }}</span>
<span title="{{ longTimeFmt .Artifact.CreatedAt }}" class=" md:hidden">{{ shortTimeFmt .Artifact.CreatedAt }}</span>
<span class="select-none after:content-['·'] hidden md:inline"></span>
+
<span class="truncate max-w-[100px] hidden md:inline">{{ .Artifact.MimeType }}</span>
+
{{ if and .LoggedInUser (eq .LoggedInUser.Did .Artifact.Did) }}
<button
id="delete-{{ $unique }}"
class="btn text-red-500 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300 gap-2"
+14 -14
appview/state/artifact.go
···
func (s *State) AttachArtifact(w http.ResponseWriter, r *http.Request) {
user := s.auth.GetUser(r)
tagParam := chi.URLParam(r, "tag")
-
f, err := fullyResolvedRepo(r)
if err != nil {
log.Println("failed to get repo and knot", err)
s.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
···
BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
Name: handler.Filename,
Size: uint64(uploadBlobResp.Blob.Size),
-
Mimetype: uploadBlobResp.Blob.MimeType,
}
err = db.AddArtifact(tx, artifact)
···
func (s *State) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
tagParam := chi.URLParam(r, "tag")
filename := chi.URLParam(r, "file")
-
f, err := fullyResolvedRepo(r)
if err != nil {
log.Println("failed to get repo and knot", err)
return
···
artifacts, err := db.GetArtifact(
s.db,
-
db.NewFilter("repo_at", f.RepoAt),
-
db.NewFilter("tag", tag.Tag.Hash[:]),
-
db.NewFilter("name", filename),
)
if err != nil {
log.Println("failed to get artifacts", err)
···
user := s.auth.GetUser(r)
tagParam := chi.URLParam(r, "tag")
filename := chi.URLParam(r, "file")
-
f, err := fullyResolvedRepo(r)
if err != nil {
log.Println("failed to get repo and knot", err)
return
···
artifacts, err := db.GetArtifact(
s.db,
-
db.NewFilter("repo_at", f.RepoAt),
-
db.NewFilter("tag", tag[:]),
-
db.NewFilter("name", filename),
)
if err != nil {
log.Println("failed to get artifacts", err)
···
}
defer tx.Rollback()
-
err = db.RemoveArtifact(tx,
-
db.NewFilter("repo_at", f.RepoAt),
-
db.NewFilter("tag", artifact.Tag[:]),
-
db.NewFilter("name", filename),
)
if err != nil {
log.Println("failed to remove artifact record from db", err)
···
func (s *State) AttachArtifact(w http.ResponseWriter, r *http.Request) {
user := s.auth.GetUser(r)
tagParam := chi.URLParam(r, "tag")
+
f, err := s.fullyResolvedRepo(r)
if err != nil {
log.Println("failed to get repo and knot", err)
s.pages.Notice(w, "upload", "failed to upload artifact, error in repo resolution")
···
BlobCid: cid.Cid(uploadBlobResp.Blob.Ref),
Name: handler.Filename,
Size: uint64(uploadBlobResp.Blob.Size),
+
MimeType: uploadBlobResp.Blob.MimeType,
}
err = db.AddArtifact(tx, artifact)
···
func (s *State) DownloadArtifact(w http.ResponseWriter, r *http.Request) {
tagParam := chi.URLParam(r, "tag")
filename := chi.URLParam(r, "file")
+
f, err := s.fullyResolvedRepo(r)
if err != nil {
log.Println("failed to get repo and knot", err)
return
···
artifacts, err := db.GetArtifact(
s.db,
+
db.Filter("repo_at", f.RepoAt),
+
db.Filter("tag", tag.Tag.Hash[:]),
+
db.Filter("name", filename),
)
if err != nil {
log.Println("failed to get artifacts", err)
···
user := s.auth.GetUser(r)
tagParam := chi.URLParam(r, "tag")
filename := chi.URLParam(r, "file")
+
f, err := s.fullyResolvedRepo(r)
if err != nil {
log.Println("failed to get repo and knot", err)
return
···
artifacts, err := db.GetArtifact(
s.db,
+
db.Filter("repo_at", f.RepoAt),
+
db.Filter("tag", tag[:]),
+
db.Filter("name", filename),
)
if err != nil {
log.Println("failed to get artifacts", err)
···
}
defer tx.Rollback()
+
err = db.DeleteArtifact(tx,
+
db.Filter("repo_at", f.RepoAt),
+
db.Filter("tag", artifact.Tag[:]),
+
db.Filter("name", filename),
)
if err != nil {
log.Println("failed to remove artifact record from db", err)
-70
appview/state/jetstream.go
···
-
package state
-
-
import (
-
"context"
-
"encoding/json"
-
"fmt"
-
"log"
-
-
"github.com/bluesky-social/indigo/atproto/syntax"
-
"github.com/bluesky-social/jetstream/pkg/models"
-
"tangled.sh/tangled.sh/core/api/tangled"
-
"tangled.sh/tangled.sh/core/appview/db"
-
)
-
-
type Ingester func(ctx context.Context, e *models.Event) error
-
-
func jetstreamIngester(d db.DbWrapper) Ingester {
-
return func(ctx context.Context, e *models.Event) error {
-
var err error
-
defer func() {
-
eventTime := e.TimeUS
-
lastTimeUs := eventTime + 1
-
if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
-
err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
-
}
-
}()
-
-
if e.Kind != models.EventKindCommit {
-
return nil
-
}
-
-
did := e.Did
-
raw := json.RawMessage(e.Commit.Record)
-
-
switch e.Commit.Collection {
-
case tangled.GraphFollowNSID:
-
record := tangled.GraphFollow{}
-
err := json.Unmarshal(raw, &record)
-
if err != nil {
-
log.Println("invalid record")
-
return err
-
}
-
err = db.AddFollow(d, did, record.Subject, e.Commit.RKey)
-
if err != nil {
-
return fmt.Errorf("failed to add follow to db: %w", err)
-
}
-
case tangled.FeedStarNSID:
-
record := tangled.FeedStar{}
-
err := json.Unmarshal(raw, &record)
-
if err != nil {
-
log.Println("invalid record")
-
return err
-
}
-
-
subjectUri, err := syntax.ParseATURI(record.Subject)
-
-
if err != nil {
-
log.Println("invalid record")
-
return err
-
}
-
-
err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
-
if err != nil {
-
return fmt.Errorf("failed to add follow to db: %w", err)
-
}
-
}
-
-
return err
-
}
-
}
···
+1 -1
appview/state/repo.go
···
return
}
-
artifacts, err := db.GetArtifact(s.db, db.NewFilter("repo_at", f.RepoAt))
if err != nil {
log.Println("failed grab artifacts", err)
return
···
return
}
+
artifacts, err := db.GetArtifact(s.db, db.Filter("repo_at", f.RepoAt))
if err != nil {
log.Println("failed grab artifacts", err)
return
+1 -1
appview/state/state.go
···
jc, err := jetstream.NewJetstreamClient(
config.JetstreamEndpoint,
"appview",
-
[]string{tangled.GraphFollowNSID, tangled.FeedStarNSID, tangled.PublicKeyNSID},
nil,
slog.Default(),
wrapper,
···
jc, err := jetstream.NewJetstreamClient(
config.JetstreamEndpoint,
"appview",
+
[]string{tangled.GraphFollowNSID, tangled.FeedStarNSID, tangled.PublicKeyNSID, tangled.RepoArtifactNSID},
nil,
slog.Default(),
wrapper,
+1 -1
flake.nix
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
-
goModHash = "sha256-EilWxfqrcKDaSR5zA3ZuDSCq7V+/IfWpKPu8HWhpndA=";
buildCmdPackage = name:
final.buildGoModule {
pname = name;
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
+
goModHash = "sha256-CmBuvv3duQQoc8iTW4244w1rYLGeqMQS+qQ3wwReZZg=";
buildCmdPackage = name:
final.buildGoModule {
pname = name;