Feed generator written in Golang
at main 5.2 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "net/http" 11 "os" 12 "strings" 13 "time" 14 15 "github.com/bluesky-social/indigo/repo" 16 "github.com/fxamacker/cbor/v2" 17 "github.com/ipfs/go-cid" 18) 19 20func JSONDecoder(messageChan chan *DatabasePost) OnMessageCB { 21 return func(message []byte) { 22 23 // log.Printf("recv: %s", message) 24 25 var data ComBskyFeedPost 26 err := json.Unmarshal(message, &data) 27 if err != nil { 28 log.Println(fmt.Errorf("failed to unmarshal data", err)) 29 return 30 } 31 32 // log.Printf("parse: %s", data) 33 34 /// /// /// /// /// /// /// /// 35 /// Begin Filtering Logic /// 36 /// /// /// /// /// /// /// /// 37 38 if data.Commit.Operation != "create" { 39 return 40 } 41 42 topic := "unknown" 43 text := data.Commit.Record.Text 44 if MessageIsAboutCoffee(text) { 45 topic = "coffee" 46 } else if MessageSlippedOnBananaPeel(text) { 47 topic = "banana-slip" 48 } else { 49 return 50 } 51 52 /// /// /// /// /// /// /// /// 53 /// End Filtering Logic /// 54 /// /// /// /// /// /// /// /// 55 56 log.Printf("text (topic=%s): %s", topic, text) 57 58 uri := fmt.Sprintf("at://%s/%s/%s", data.Did, data.Commit.Collection, data.Commit.Rev) 59 cid := data.Commit.Cid 60 61 now := time.Now() 62 indexedAt := now.UTC().Format(time.RFC3339) 63 64 post := &DatabasePost{ 65 Uri: uri, 66 Cid: cid, 67 Topic: topic, 68 IndexedAt: indexedAt, 69 } 70 71 // log.Printf("formed: %s", post) 72 messageChan <- post 73 } 74} 75 76func CBORDecoder(messageChan chan *DatabasePost) OnMessageCB { 77 return func(message []byte) { 78 var frame MessageFrameCBOR 79 var err error 80 81 remaining, err := cbor.UnmarshalFirst(message, &frame.Header) 82 if err != nil { 83 log.Println(fmt.Errorf("failed to unmarshal frame header data", err)) 84 return 85 } 86 87 if frame.Header.Kind != "#commit" { 88 return 89 } 90 91 _, err = cbor.UnmarshalFirst(remaining, &frame.Commit) 92 if err != nil { 93 log.Println(fmt.Errorf("failed to unmarshal frame body as commit", err)) 94 return 95 } 96 97 var car *repo.Repo = nil 98 var ctx *context.Context = nil 99 for _, op := range frame.Commit.Ops { 100 if op.Action != "create" || !strings.Contains(op.Path, "app.bsky.feed.post/") { 101 continue 102 } 103 104 // Why do we need to remove the first character? No idea, but it's garbage 105 opCID, err := cid.Cast(op.RawCID[1:]) 106 if err != nil { 107 log.Println(fmt.Errorf("failed to unmarshal CID: ", err)) 108 return 109 } 110 111 if car == nil { 112 blocks := bytes.NewReader(frame.Commit.Blocks) 113 newCtx := context.Background() 114 ctx = &newCtx 115 car, err = repo.ReadRepoFromCar(*ctx, blocks) 116 if err != nil { 117 log.Println(fmt.Errorf("failed to read CAR: ", err)) 118 return 119 } 120 } 121 122 block, err := car.Blockstore().Get(*ctx, opCID) 123 if err != nil { 124 log.Println(fmt.Errorf("failed to retrieve from blockstore: ", err)) 125 return 126 } 127 128 rawData := block.RawData() 129 var blockData BlockCBOR 130 _, err = cbor.UnmarshalFirst(rawData, &blockData) 131 if err != nil { 132 log.Println(fmt.Errorf("failed to unmarshall block data: ", err)) 133 return 134 } 135 136 embedKind := blockData.Embed.Kind 137 if embedKind == "app.bsky.embed.images" && strings.Contains(strings.ToLower(blockData.Text), "coffee") { 138 for _, imageData := range blockData.Embed.Images { 139 // Why do we need to remove the first character? No idea, but it's garbage 140 imageCID, err := cid.Cast(imageData.Image.Ref[1:]) 141 if err != nil { 142 log.Println(fmt.Errorf("failed to unmarshal image CID: ", err)) 143 return 144 } 145 146 // what do we do with imageCID??? 147 imageCDNURL := fmt.Sprintf( 148 "https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s", 149 frame.Commit.Did, 150 imageCID.String(), 151 ) 152 153 res, err := http.Get(imageCDNURL) 154 if err != nil { 155 log.Println(fmt.Errorf("failed to download image: ", err)) 156 return 157 } 158 159 defer res.Body.Close() 160 // body, err := io.ReadAll(res.Body) 161 // if err != nil { 162 // log.Println(fmt.Errorf("failed to read image: ", err)) 163 // return 164 // } 165 166 file, err := os.Create(fmt.Sprintf("im/%s-%s.jpeg", frame.Commit.Did, imageCID.String())) 167 if err != nil { 168 log.Println(fmt.Errorf("failed to open image file: ", err)) 169 return 170 } 171 defer file.Close() 172 173 _, err = io.Copy(file, res.Body) 174 if err != nil { 175 log.Println(fmt.Errorf("failed to copy to image file: ", err)) 176 return 177 } 178 } 179 } 180 181 // FINALLY 182 text := blockData.Text 183 184 /// /// /// /// /// /// /// /// 185 /// Begin Filtering Logic /// 186 /// /// /// /// /// /// /// /// 187 188 topic := "unknown" 189 if MessageIsAboutCoffee(text) { 190 topic = "coffee" 191 } else if MessageSlippedOnBananaPeel(text) { 192 topic = "banana-slip" 193 } else { 194 return 195 } 196 197 /// /// /// /// /// /// /// /// 198 /// End Filtering Logic /// 199 /// /// /// /// /// /// /// /// 200 201 log.Printf("text (topic=%s): %s", topic, text) 202 203 did := frame.Commit.Did 204 path := op.Path 205 206 uri := fmt.Sprintf("at://%s/%s", did, path) 207 208 now := time.Now() 209 indexedAt := now.UTC().Format(time.RFC3339) 210 211 post := &DatabasePost{ 212 Uri: uri, 213 Cid: opCID.String(), 214 Topic: topic, 215 IndexedAt: indexedAt, 216 } 217 218 // log.Printf("formed: %s", post) 219 messageChan <- post 220 } 221 } 222}