package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/bluesky-social/indigo/repo"
	"github.com/fxamacker/cbor/v2"
	"github.com/ipfs/go-cid"
)

// postTopic classifies post text into one of the topics this feed indexes.
// It returns ok == false when the text matches no topic, in which case the
// post should be dropped. Both decoders share it so their filtering cannot
// drift apart.
func postTopic(text string) (topic string, ok bool) {
	/// /// /// /// /// /// /// /// /// Begin Filtering Logic /// /// /// /// /// /// /// /// ///
	switch {
	case MessageIsAboutCoffee(text):
		return "coffee", true
	case MessageSlippedOnBananaPeel(text):
		return "banana-slip", true
	}
	/// /// /// /// /// /// /// /// /// End Filtering Logic /// /// /// /// /// /// /// /// ///
	return "", false
}

// castPrefixedCID parses a CID from raw bytes that carry one leading byte
// which is not part of the CID itself.
//
// NOTE(review): the skipped byte is presumably the 0x00 multibase-identity
// prefix that DAG-CBOR places in front of CID bytes inside tag 42 — confirm
// against how the raw CID fields are decoded.
//
// An empty input yields an error instead of a slice-bounds panic.
func castPrefixedCID(raw []byte) (cid.Cid, error) {
	if len(raw) == 0 {
		return cid.Undef, fmt.Errorf("empty CID bytes")
	}
	return cid.Cast(raw[1:])
}

// downloadImage fetches url with client and streams the response body into a
// newly created file at path.
//
// It lives in its own function so the response body and the file are closed
// as soon as each download finishes, rather than piling up in defers until
// the caller's loop is done. A response status other than 200 is reported as
// an error so error pages are never written to disk as images.
func downloadImage(client *http.Client, url, path string) (err error) {
	res, err := client.Get(url)
	if err != nil {
		return fmt.Errorf("downloading image: %w", err)
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("downloading image: unexpected status %s", res.Status)
	}

	file, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("opening image file: %w", err)
	}
	defer func() {
		// A failed Close can mean the data never reached disk; surface it
		// unless an earlier error is already being returned.
		if cerr := file.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing image file: %w", cerr)
		}
	}()

	if _, err = io.Copy(file, res.Body); err != nil {
		return fmt.Errorf("copying to image file: %w", err)
	}
	return nil
}

// JSONDecoder returns a message callback that decodes a JSON-encoded event
// into a ComBskyFeedPost, keeps only "create" operations whose text matches a
// topic (see postTopic), and sends the resulting DatabasePost on messageChan.
//
// Messages that fail to decode are logged and dropped. The send on
// messageChan blocks until a receiver is ready or the channel has buffer
// space.
func JSONDecoder(messageChan chan *DatabasePost) OnMessageCB {
	return func(message []byte) {
		var data ComBskyFeedPost
		if err := json.Unmarshal(message, &data); err != nil {
			log.Printf("failed to unmarshal data: %v", err)
			return
		}

		if data.Commit.Operation != "create" {
			return
		}

		text := data.Commit.Record.Text
		topic, ok := postTopic(text)
		if !ok {
			return
		}
		log.Printf("text (topic=%s): %s", topic, text)

		// NOTE(review): the last URI segment is built from Commit.Rev. AT URIs
		// normally end in the record key, not the revision — confirm whether
		// ComBskyFeedPost exposes a record key that should be used here.
		messageChan <- &DatabasePost{
			Uri:       fmt.Sprintf("at://%s/%s/%s", data.Did, data.Commit.Collection, data.Commit.Rev),
			Cid:       data.Commit.Cid,
			Topic:     topic,
			IndexedAt: time.Now().UTC().Format(time.RFC3339),
		}
	}
}

// CBORDecoder returns a message callback that decodes a CBOR-encoded frame
// (a header followed by a body), and for "#commit" frames walks the commit's
// ops. For each created "app.bsky.feed.post/" record it loads the record
// block from the commit's CAR data, saves any embedded images of posts whose
// text mentions "coffee" under im/, and sends a DatabasePost on messageChan
// when the text matches a topic (see postTopic).
//
// A failure that affects a single op is logged and that op is skipped; the
// remaining ops of the commit are still processed. A commit whose CAR data
// cannot be read is abandoned as a whole, since no op could be resolved.
func CBORDecoder(messageChan chan *DatabasePost) OnMessageCB {
	// One client for all downloads, with a timeout so a stalled CDN
	// connection cannot block the decoder indefinitely.
	client := &http.Client{Timeout: 30 * time.Second}

	return func(message []byte) {
		var frame MessageFrameCBOR

		remaining, err := cbor.UnmarshalFirst(message, &frame.Header)
		if err != nil {
			log.Printf("failed to unmarshal frame header data: %v", err)
			return
		}
		if frame.Header.Kind != "#commit" {
			return
		}
		if _, err := cbor.UnmarshalFirst(remaining, &frame.Commit); err != nil {
			log.Printf("failed to unmarshal frame body as commit: %v", err)
			return
		}

		ctx := context.Background()
		// The CAR is parsed lazily: only once the first relevant op is seen.
		var car *repo.Repo

	ops:
		for _, op := range frame.Commit.Ops {
			if op.Action != "create" || !strings.Contains(op.Path, "app.bsky.feed.post/") {
				continue
			}

			opCID, err := castPrefixedCID(op.RawCID)
			if err != nil {
				log.Printf("failed to unmarshal CID: %v", err)
				continue
			}

			if car == nil {
				car, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(frame.Commit.Blocks))
				if err != nil {
					log.Printf("failed to read CAR: %v", err)
					return
				}
			}

			block, err := car.Blockstore().Get(ctx, opCID)
			if err != nil {
				log.Printf("failed to retrieve from blockstore: %v", err)
				continue
			}

			var blockData BlockCBOR
			if _, err := cbor.UnmarshalFirst(block.RawData(), &blockData); err != nil {
				log.Printf("failed to unmarshall block data: %v", err)
				continue
			}
			text := blockData.Text

			if blockData.Embed.Kind == "app.bsky.embed.images" &&
				strings.Contains(strings.ToLower(text), "coffee") {
				for _, imageData := range blockData.Embed.Images {
					imageCID, err := castPrefixedCID(imageData.Image.Ref)
					if err != nil {
						log.Printf("failed to unmarshal image CID: %v", err)
						continue ops
					}

					imageCDNURL := fmt.Sprintf(
						"https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s",
						frame.Commit.Did,
						imageCID.String(),
					)
					imagePath := fmt.Sprintf("im/%s-%s.jpeg", frame.Commit.Did, imageCID.String())

					// An image that cannot be saved skips this post.
					if err := downloadImage(client, imageCDNURL, imagePath); err != nil {
						log.Printf("failed to save image: %v", err)
						continue ops
					}
				}
			}

			topic, ok := postTopic(text)
			if !ok {
				continue
			}
			log.Printf("text (topic=%s): %s", topic, text)

			messageChan <- &DatabasePost{
				Uri:       fmt.Sprintf("at://%s/%s", frame.Commit.Did, op.Path),
				Cid:       opCID.String(),
				Topic:     topic,
				IndexedAt: time.Now().UTC().Format(time.RFC3339),
			}
		}
	}
}