Feed generator written in Golang
1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log"
10 "net/http"
11 "os"
12 "strings"
13 "time"
14
15 "github.com/bluesky-social/indigo/repo"
16 "github.com/fxamacker/cbor/v2"
17 "github.com/ipfs/go-cid"
18)
19
20func JSONDecoder(messageChan chan *DatabasePost) OnMessageCB {
21 return func(message []byte) {
22
23 // log.Printf("recv: %s", message)
24
25 var data ComBskyFeedPost
26 err := json.Unmarshal(message, &data)
27 if err != nil {
28 log.Println(fmt.Errorf("failed to unmarshal data", err))
29 return
30 }
31
32 // log.Printf("parse: %s", data)
33
34 /// /// /// /// /// /// /// ///
35 /// Begin Filtering Logic ///
36 /// /// /// /// /// /// /// ///
37
38 if data.Commit.Operation != "create" {
39 return
40 }
41
42 topic := "unknown"
43 text := data.Commit.Record.Text
44 if MessageIsAboutCoffee(text) {
45 topic = "coffee"
46 } else if MessageSlippedOnBananaPeel(text) {
47 topic = "banana-slip"
48 } else {
49 return
50 }
51
52 /// /// /// /// /// /// /// ///
53 /// End Filtering Logic ///
54 /// /// /// /// /// /// /// ///
55
56 log.Printf("text (topic=%s): %s", topic, text)
57
58 uri := fmt.Sprintf("at://%s/%s/%s", data.Did, data.Commit.Collection, data.Commit.Rev)
59 cid := data.Commit.Cid
60
61 now := time.Now()
62 indexedAt := now.UTC().Format(time.RFC3339)
63
64 post := &DatabasePost{
65 Uri: uri,
66 Cid: cid,
67 Topic: topic,
68 IndexedAt: indexedAt,
69 }
70
71 // log.Printf("formed: %s", post)
72 messageChan <- post
73 }
74}
75
76func CBORDecoder(messageChan chan *DatabasePost) OnMessageCB {
77 return func(message []byte) {
78 var frame MessageFrameCBOR
79 var err error
80
81 remaining, err := cbor.UnmarshalFirst(message, &frame.Header)
82 if err != nil {
83 log.Println(fmt.Errorf("failed to unmarshal frame header data", err))
84 return
85 }
86
87 if frame.Header.Kind != "#commit" {
88 return
89 }
90
91 _, err = cbor.UnmarshalFirst(remaining, &frame.Commit)
92 if err != nil {
93 log.Println(fmt.Errorf("failed to unmarshal frame body as commit", err))
94 return
95 }
96
97 var car *repo.Repo = nil
98 var ctx *context.Context = nil
99 for _, op := range frame.Commit.Ops {
100 if op.Action != "create" || !strings.Contains(op.Path, "app.bsky.feed.post/") {
101 continue
102 }
103
104 // Why do we need to remove the first character? No idea, but it's garbage
105 opCID, err := cid.Cast(op.RawCID[1:])
106 if err != nil {
107 log.Println(fmt.Errorf("failed to unmarshal CID: ", err))
108 return
109 }
110
111 if car == nil {
112 blocks := bytes.NewReader(frame.Commit.Blocks)
113 newCtx := context.Background()
114 ctx = &newCtx
115 car, err = repo.ReadRepoFromCar(*ctx, blocks)
116 if err != nil {
117 log.Println(fmt.Errorf("failed to read CAR: ", err))
118 return
119 }
120 }
121
122 block, err := car.Blockstore().Get(*ctx, opCID)
123 if err != nil {
124 log.Println(fmt.Errorf("failed to retrieve from blockstore: ", err))
125 return
126 }
127
128 rawData := block.RawData()
129 var blockData BlockCBOR
130 _, err = cbor.UnmarshalFirst(rawData, &blockData)
131 if err != nil {
132 log.Println(fmt.Errorf("failed to unmarshall block data: ", err))
133 return
134 }
135
136 embedKind := blockData.Embed.Kind
137 if embedKind == "app.bsky.embed.images" && strings.Contains(strings.ToLower(blockData.Text), "coffee") {
138 for _, imageData := range blockData.Embed.Images {
139 // Why do we need to remove the first character? No idea, but it's garbage
140 imageCID, err := cid.Cast(imageData.Image.Ref[1:])
141 if err != nil {
142 log.Println(fmt.Errorf("failed to unmarshal image CID: ", err))
143 return
144 }
145
146 // what do we do with imageCID???
147 imageCDNURL := fmt.Sprintf(
148 "https://cdn.bsky.app/img/feed_thumbnail/plain/%s/%s",
149 frame.Commit.Did,
150 imageCID.String(),
151 )
152
153 res, err := http.Get(imageCDNURL)
154 if err != nil {
155 log.Println(fmt.Errorf("failed to download image: ", err))
156 return
157 }
158
159 defer res.Body.Close()
160 // body, err := io.ReadAll(res.Body)
161 // if err != nil {
162 // log.Println(fmt.Errorf("failed to read image: ", err))
163 // return
164 // }
165
166 file, err := os.Create(fmt.Sprintf("im/%s-%s.jpeg", frame.Commit.Did, imageCID.String()))
167 if err != nil {
168 log.Println(fmt.Errorf("failed to open image file: ", err))
169 return
170 }
171 defer file.Close()
172
173 _, err = io.Copy(file, res.Body)
174 if err != nil {
175 log.Println(fmt.Errorf("failed to copy to image file: ", err))
176 return
177 }
178 }
179 }
180
181 // FINALLY
182 text := blockData.Text
183
184 /// /// /// /// /// /// /// ///
185 /// Begin Filtering Logic ///
186 /// /// /// /// /// /// /// ///
187
188 topic := "unknown"
189 if MessageIsAboutCoffee(text) {
190 topic = "coffee"
191 } else if MessageSlippedOnBananaPeel(text) {
192 topic = "banana-slip"
193 } else {
194 return
195 }
196
197 /// /// /// /// /// /// /// ///
198 /// End Filtering Logic ///
199 /// /// /// /// /// /// /// ///
200
201 log.Printf("text (topic=%s): %s", topic, text)
202
203 did := frame.Commit.Did
204 path := op.Path
205
206 uri := fmt.Sprintf("at://%s/%s", did, path)
207
208 now := time.Now()
209 indexedAt := now.UTC().Format(time.RFC3339)
210
211 post := &DatabasePost{
212 Uri: uri,
213 Cid: opCID.String(),
214 Topic: topic,
215 IndexedAt: indexedAt,
216 }
217
218 // log.Printf("formed: %s", post)
219 messageChan <- post
220 }
221 }
222}