WIP library that stores cold objects in S3, warm objects on disk, and hot objects in memory.
Node.js · TypeScript
at main 18 kB view raw
import {
  S3Client,
  PutObjectCommand,
  GetObjectCommand,
  DeleteObjectCommand,
  HeadObjectCommand,
  ListObjectsV2Command,
  DeleteObjectsCommand,
  CopyObjectCommand,
  type S3ClientConfig,
} from '@aws-sdk/client-s3';
import type { Readable } from 'node:stream';
import type { StorageTier, StorageMetadata, TierStats, TierGetResult } from '../types/index.js';

/**
 * Maximum number of keys sent in a single DeleteObjects request
 * (the S3 API limit).
 */
const DELETE_BATCH_SIZE = 1000;

/**
 * Suffix appended to the prefixed data key to form the key of the metadata
 * JSON object when a separate metadata bucket is configured.
 */
const METADATA_KEY_SUFFIX = '.meta';

/**
 * Value written to the `valueencoding` metadata field to record that the
 * free-text fields (key, checksum, mimetype, encoding, custom) were stored
 * URI-encoded. Objects written without this marker are read back verbatim,
 * so data written by earlier versions of this class stays readable.
 */
const HEADER_VALUE_ENCODING = 'uri';

/**
 * Configuration for S3StorageTier.
 */
export interface S3StorageTierConfig {
  /**
   * S3 bucket name.
   */
  bucket: string;

  /**
   * AWS region.
   */
  region: string;

  /**
   * Optional S3-compatible endpoint (for R2, MinIO, etc.).
   *
   * @example 'https://s3.us-east-1.amazonaws.com'
   * @example 'https://account-id.r2.cloudflarestorage.com'
   */
  endpoint?: string;

  /**
   * Optional AWS credentials.
   *
   * @remarks
   * If not provided, the SDK's default AWS credential chain is used
   * (environment variables, ~/.aws/credentials, IAM roles, etc.).
   */
  credentials?: {
    accessKeyId: string;
    secretAccessKey: string;
  };

  /**
   * Optional key prefix for namespacing.
   *
   * @remarks
   * All keys are prefixed with this value; it is stripped again by `listKeys`.
   * Useful for multi-tenant scenarios or organizing data.
   *
   * @example 'tiered-storage/'
   */
  prefix?: string;

  /**
   * Force path-style addressing for S3-compatible services.
   *
   * @defaultValue true
   *
   * @remarks
   * Most S3-compatible services (MinIO, R2, etc.) require path-style URLs.
   * AWS S3 uses virtual-hosted-style by default, but also accepts path-style.
   *
   * - true: `https://endpoint.com/bucket/key` (path-style)
   * - false: `https://bucket.endpoint.com/key` (virtual-hosted-style)
   */
  forcePathStyle?: boolean;

  /**
   * Optional separate bucket for storing metadata.
   *
   * @remarks
   * **RECOMMENDED for production use!**
   *
   * By default, metadata is stored in S3 object metadata fields. However,
   * updating that metadata requires copying the entire object onto itself,
   * which is slow and expensive for large files.
   *
   * When `metadataBucket` is specified, metadata is stored as a separate JSON
   * object in this bucket, under `<prefix><key>.meta`. This allows fast, cheap
   * metadata updates without copying data.
   *
   * **Benefits:**
   * - Fast metadata updates (no object copying)
   * - Much cheaper for large objects
   * - No impact on data object performance
   *
   * **Trade-offs:**
   * - Requires managing two buckets (use a bucket distinct from `bucket`,
   *   otherwise `listKeys` will also yield the `.meta` objects)
   * - Metadata and data could become out of sync if not handled carefully
   * - Additional S3 API calls for metadata operations
   *
   * @example
   * ```typescript
   * const tier = new S3StorageTier({
   *   bucket: 'my-data-bucket',
   *   metadataBucket: 'my-metadata-bucket', // Separate bucket for metadata
   *   region: 'us-east-1',
   * });
   * ```
   */
  metadataBucket?: string;
}

/**
 * AWS S3 (or compatible) storage tier.
 *
 * @remarks
 * - Supports AWS S3, Cloudflare R2, MinIO, and other S3-compatible services
 * - Requires `@aws-sdk/client-s3` peer dependency
 * - Typically used as the cold tier (source of truth)
 *
 * **Metadata Storage:**
 * Without `metadataBucket`, every StorageMetadata field is stored as S3
 * user-defined object metadata (the SDK's `Metadata` map, sent as
 * `x-amz-meta-<name>` headers) under lowercase names: `key`, `size`,
 * `createdat`, `lastaccessed`, `accesscount`, `compressed`, `checksum`, and
 * optionally `ttl`, `mimetype`, `encoding`, `custom` (JSON). Free-text values
 * are URI-encoded so that non-ASCII keys and custom metadata survive the trip
 * through HTTP headers. S3 caps the total size of user-defined metadata
 * (2 KB on AWS), so large `customMetadata` should use `metadataBucket`.
 *
 * With `metadataBucket`, metadata is stored as a JSON document in that bucket.
 *
 * @example
 * ```typescript
 * const tier = new S3StorageTier({
 *   bucket: 'my-bucket',
 *   region: 'us-east-1',
 *   credentials: {
 *     accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
 *     secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
 *   },
 *   prefix: 'cache/',
 * });
 * ```
 *
 * @example Cloudflare R2
 * ```typescript
 * const tier = new S3StorageTier({
 *   bucket: 'my-bucket',
 *   region: 'auto',
 *   endpoint: 'https://account-id.r2.cloudflarestorage.com',
 *   credentials: {
 *     accessKeyId: process.env.R2_ACCESS_KEY_ID!,
 *     secretAccessKey: process.env.R2_SECRET_ACCESS_KEY!,
 *   },
 * });
 * ```
 */
export class S3StorageTier implements StorageTier {
  private readonly client: S3Client;
  private readonly prefix: string;
  private readonly metadataBucket?: string;

  constructor(private readonly config: S3StorageTierConfig) {
    const clientConfig: S3ClientConfig = {
      region: config.region,
      // Most S3-compatible services need path-style URLs
      forcePathStyle: config.forcePathStyle ?? true,
      ...(config.endpoint && { endpoint: config.endpoint }),
      ...(config.credentials && { credentials: config.credentials }),
    };

    this.client = new S3Client(clientConfig);
    this.prefix = config.prefix ?? '';
    if (config.metadataBucket) {
      this.metadataBucket = config.metadataBucket;
    }
  }

  /**
   * Fetch the raw bytes stored under `key`.
   *
   * @returns The object body, or `null` when the object does not exist or the
   *   response carried no body.
   * @throws Any S3/network error other than "not found".
   */
  async get(key: string): Promise<Uint8Array | null> {
    try {
      const response = await this.client.send(
        new GetObjectCommand({
          Bucket: this.config.bucket,
          Key: this.getS3Key(key),
        }),
      );

      if (!response.Body) {
        return null;
      }

      return await this.streamToUint8Array(response.Body as Readable);
    } catch (error) {
      if (this.isNoSuchKeyError(error)) {
        return null;
      }
      throw error;
    }
  }

  /**
   * Retrieve data and metadata together in a single operation.
   *
   * @param key - The key to retrieve
   * @returns The data and metadata, or null if either is not found
   * @throws Any S3/network error other than "not found".
   *
   * @remarks
   * When using a separate metadata bucket, fetches data and metadata in
   * parallel. Otherwise, uses the data object's embedded metadata.
   */
  async getWithMetadata(key: string): Promise<TierGetResult | null> {
    try {
      return this.metadataBucket
        ? await this.getWithSeparateMetadata(key, this.metadataBucket)
        : await this.getWithEmbeddedMetadata(key);
    } catch (error) {
      if (this.isNoSuchKeyError(error)) {
        return null;
      }
      throw error;
    }
  }

  /**
   * Fetch the data object and its `.meta` JSON object in parallel.
   *
   * If either request fails, the body of the other (successful) response is
   * destroyed so its connection is released. Then the error is rethrown,
   * preferring a non-"not found" error so that e.g. an AccessDenied on one
   * bucket is not hidden behind a missing object in the other.
   */
  private async getWithSeparateMetadata(
    key: string,
    metadataBucket: string,
  ): Promise<TierGetResult | null> {
    const [dataResult, metadataResult] = await Promise.allSettled([
      this.client.send(
        new GetObjectCommand({
          Bucket: this.config.bucket,
          Key: this.getS3Key(key),
        }),
      ),
      this.client.send(
        new GetObjectCommand({
          Bucket: metadataBucket,
          Key: this.getMetadataKey(key),
        }),
      ),
    ]);

    if (dataResult.status === 'rejected' || metadataResult.status === 'rejected') {
      const errors: unknown[] = [];
      if (dataResult.status === 'rejected') {
        errors.push(dataResult.reason);
      } else {
        this.discardBody(dataResult.value.Body);
      }
      if (metadataResult.status === 'rejected') {
        errors.push(metadataResult.reason);
      } else {
        this.discardBody(metadataResult.value.Body);
      }
      throw errors.find((error) => !this.isNoSuchKeyError(error)) ?? errors[0];
    }

    const dataResponse = dataResult.value;
    const metadataResponse = metadataResult.value;

    if (!dataResponse.Body || !metadataResponse.Body) {
      // Release whichever body did arrive before giving up.
      this.discardBody(dataResponse.Body);
      this.discardBody(metadataResponse.Body);
      return null;
    }

    const [data, metadataBytes] = await Promise.all([
      this.streamToUint8Array(dataResponse.Body as Readable),
      this.streamToUint8Array(metadataResponse.Body as Readable),
    ]);

    return { data, metadata: this.parseMetadataJson(metadataBytes) };
  }

  /**
   * Fetch the data object and decode the metadata embedded in its
   * user-defined metadata fields.
   */
  private async getWithEmbeddedMetadata(key: string): Promise<TierGetResult | null> {
    const response = await this.client.send(
      new GetObjectCommand({
        Bucket: this.config.bucket,
        Key: this.getS3Key(key),
      }),
    );

    if (!response.Body || !response.Metadata) {
      this.discardBody(response.Body);
      return null;
    }

    const data = await this.streamToUint8Array(response.Body as Readable);
    return { data, metadata: this.s3ToMetadata(response.Metadata) };
  }

  /**
   * Read a byte stream to completion and return its contents as one
   * contiguous Uint8Array.
   *
   * @throws Error if the stream yields something other than binary chunks
   *   (e.g. strings from a stream with an encoding set).
   */
  private async streamToUint8Array(stream: Readable): Promise<Uint8Array> {
    const chunks: Uint8Array[] = [];
    let totalLength = 0;

    for await (const chunk of stream) {
      // Buffer is a Uint8Array subclass, so this accepts both without copying.
      if (!(chunk instanceof Uint8Array)) {
        throw new Error('Unexpected chunk type in S3 stream');
      }
      chunks.push(chunk);
      totalLength += chunk.byteLength;
    }

    // Always copy into a fresh, exactly-sized array so callers never receive
    // a view into a pooled Buffer.
    const result = new Uint8Array(totalLength);
    let offset = 0;
    for (const chunk of chunks) {
      result.set(chunk, offset);
      offset += chunk.byteLength;
    }

    return result;
  }

  /**
   * Best-effort release of a response body that will not be read, so the
   * underlying connection is not held open. Bodies without a `destroy`
   * method are left alone.
   */
  private discardBody(body: unknown): void {
    if (
      typeof body === 'object' &&
      body !== null &&
      'destroy' in body &&
      typeof body.destroy === 'function'
    ) {
      body.destroy();
    }
  }

  /**
   * True for the SDK errors that mean "object does not exist":
   * `NoSuchKey` (GetObject) and `NotFound` (HeadObject).
   */
  private isNoSuchKeyError(error: unknown): boolean {
    return (
      typeof error === 'object' &&
      error !== null &&
      'name' in error &&
      (error.name === 'NoSuchKey' || error.name === 'NotFound')
    );
  }

  /**
   * Store `data` under `key` together with its metadata.
   *
   * @remarks
   * With a metadata bucket, the data object and the metadata JSON are written
   * in parallel; if one write fails the other may still have succeeded.
   */
  async set(key: string, data: Uint8Array, metadata: StorageMetadata): Promise<void> {
    const s3Key = this.getS3Key(key);

    if (!this.metadataBucket) {
      await this.client.send(
        new PutObjectCommand({
          Bucket: this.config.bucket,
          Key: s3Key,
          Body: data,
          ContentLength: data.byteLength,
          Metadata: this.metadataToS3(metadata),
        }),
      );
      return;
    }

    await Promise.all([
      this.client.send(
        new PutObjectCommand({
          Bucket: this.config.bucket,
          Key: s3Key,
          Body: data,
          ContentLength: data.byteLength,
        }),
      ),
      this.putMetadataJson(key, metadata, this.metadataBucket),
    ]);
  }

  /**
   * Delete the object stored under `key` (and its metadata object, if a
   * metadata bucket is configured). Missing objects are not an error.
   */
  async delete(key: string): Promise<void> {
    try {
      const dataCommand = new DeleteObjectCommand({
        Bucket: this.config.bucket,
        Key: this.getS3Key(key),
      });

      if (this.metadataBucket) {
        const metadataCommand = new DeleteObjectCommand({
          Bucket: this.metadataBucket,
          Key: this.getMetadataKey(key),
        });

        await Promise.all([
          this.client.send(dataCommand),
          this.client.send(metadataCommand).catch((error: unknown) => {
            if (!this.isNoSuchKeyError(error)) throw error;
          }),
        ]);
      } else {
        await this.client.send(dataCommand);
      }
    } catch (error) {
      if (!this.isNoSuchKeyError(error)) {
        throw error;
      }
    }
  }

  /**
   * Check whether a data object exists for `key` (HeadObject on the data
   * bucket; the metadata bucket is not consulted).
   */
  async exists(key: string): Promise<boolean> {
    try {
      await this.client.send(
        new HeadObjectCommand({
          Bucket: this.config.bucket,
          Key: this.getS3Key(key),
        }),
      );
      return true;
    } catch (error) {
      if (this.isNoSuchKeyError(error)) {
        return false;
      }
      throw error;
    }
  }

  /**
   * Yield every key in the data bucket under the configured prefix (further
   * narrowed by `prefix` if given), with the configured prefix stripped.
   * Pages through ListObjectsV2 results lazily.
   */
  async *listKeys(prefix?: string): AsyncIterableIterator<string> {
    const s3Prefix = prefix ? this.getS3Key(prefix) : this.prefix;
    let continuationToken: string | undefined;

    do {
      const response = await this.client.send(
        new ListObjectsV2Command({
          Bucket: this.config.bucket,
          Prefix: s3Prefix,
          ContinuationToken: continuationToken,
        }),
      );

      for (const object of response.Contents ?? []) {
        if (object.Key) {
          // Remove prefix to get original key
          yield this.removePrefix(object.Key);
        }
      }

      continuationToken = response.NextContinuationToken;
    } while (continuationToken);
  }

  /**
   * Delete many keys using DeleteObjects in batches of up to 1000.
   *
   * @throws Error if S3 reports that any individual key could not be deleted.
   *   DeleteObjects returns per-key failures in the response body rather than
   *   as an HTTP error, so they are checked explicitly.
   */
  async deleteMany(keys: string[]): Promise<void> {
    if (keys.length === 0) return;

    for (let start = 0; start < keys.length; start += DELETE_BATCH_SIZE) {
      const batch = keys.slice(start, start + DELETE_BATCH_SIZE);

      const pending: Promise<void>[] = [
        this.deleteBatch(
          this.config.bucket,
          batch.map((key) => this.getS3Key(key)),
        ),
      ];

      if (this.metadataBucket) {
        // Same tolerance as delete(): only "not found" is ignored.
        pending.push(
          this.deleteBatch(
            this.metadataBucket,
            batch.map((key) => this.getMetadataKey(key)),
          ).catch((error: unknown) => {
            if (!this.isNoSuchKeyError(error)) throw error;
          }),
        );
      }

      await Promise.all(pending);
    }
  }

  /**
   * Issue one DeleteObjects request for already-prefixed S3 keys and throw if
   * the response lists any per-key failure other than NoSuchKey.
   */
  private async deleteBatch(bucket: string, s3Keys: string[]): Promise<void> {
    const response = await this.client.send(
      new DeleteObjectsCommand({
        Bucket: bucket,
        Delete: {
          Objects: s3Keys.map((s3Key) => ({ Key: s3Key })),
          // Quiet mode: the response only lists failures.
          Quiet: true,
        },
      }),
    );

    const failures = (response.Errors ?? []).filter((failure) => failure.Code !== 'NoSuchKey');
    const first = failures[0];
    if (first) {
      throw new Error(
        `Failed to delete ${failures.length} object(s) from bucket "${bucket}" ` +
          `(first: "${first.Key ?? '?'}": ${first.Code ?? 'UnknownError'} ${first.Message ?? ''})`,
      );
    }
  }

  /**
   * Read the metadata for `key` without downloading the data object.
   *
   * @returns The metadata, or `null` if it does not exist.
   */
  async getMetadata(key: string): Promise<StorageMetadata | null> {
    try {
      if (this.metadataBucket) {
        const metadataResponse = await this.client.send(
          new GetObjectCommand({
            Bucket: this.metadataBucket,
            Key: this.getMetadataKey(key),
          }),
        );

        if (!metadataResponse.Body) {
          return null;
        }

        const bytes = await this.streamToUint8Array(metadataResponse.Body as Readable);
        return this.parseMetadataJson(bytes);
      }

      const headResponse = await this.client.send(
        new HeadObjectCommand({
          Bucket: this.config.bucket,
          Key: this.getS3Key(key),
        }),
      );

      return headResponse.Metadata ? this.s3ToMetadata(headResponse.Metadata) : null;
    } catch (error) {
      if (this.isNoSuchKeyError(error)) {
        return null;
      }
      throw error;
    }
  }

  /**
   * Replace the metadata for `key`.
   *
   * @remarks
   * With a metadata bucket this rewrites the small `.meta` JSON object.
   * Without one, S3 can only change user metadata by copying the object onto
   * itself with `MetadataDirective: 'REPLACE'`, which re-copies the data.
   * NOTE: a single CopyObject call is size-limited by S3 (5 GB on AWS).
   */
  async setMetadata(key: string, metadata: StorageMetadata): Promise<void> {
    if (this.metadataBucket) {
      await this.putMetadataJson(key, metadata, this.metadataBucket);
      return;
    }

    const s3Key = this.getS3Key(key);
    await this.client.send(
      new CopyObjectCommand({
        Bucket: this.config.bucket,
        Key: s3Key,
        // The copy-source header value must be URL-encoded; the SDK sends
        // CopySource as given, so keys with spaces, '+', '?', '#' or
        // non-ASCII characters would otherwise fail or hit the wrong object.
        CopySource: `${this.config.bucket}/${this.encodeCopySourceKey(s3Key)}`,
        Metadata: this.metadataToS3(metadata),
        MetadataDirective: 'REPLACE',
      }),
    );
  }

  /**
   * Count objects and total bytes in the data bucket under the configured
   * prefix. Lists every object, so cost grows with the number of objects;
   * metadata objects in a separate bucket are not counted.
   */
  async getStats(): Promise<TierStats> {
    let bytes = 0;
    let items = 0;
    let continuationToken: string | undefined;

    do {
      const response = await this.client.send(
        new ListObjectsV2Command({
          Bucket: this.config.bucket,
          Prefix: this.prefix,
          ContinuationToken: continuationToken,
        }),
      );

      for (const object of response.Contents ?? []) {
        items++;
        bytes += object.Size ?? 0;
      }

      continuationToken = response.NextContinuationToken;
    } while (continuationToken);

    return { bytes, items };
  }

  /**
   * Delete every object listed by `listKeys()` (and the matching metadata
   * objects). All keys are collected in memory before deletion starts.
   */
  async clear(): Promise<void> {
    const keys: string[] = [];

    for await (const key of this.listKeys()) {
      keys.push(key);
    }

    await this.deleteMany(keys);
  }

  /**
   * Get the full S3 key including prefix.
   */
  private getS3Key(key: string): string {
    return this.prefix + key;
  }

  /**
   * Get the key of the metadata JSON object for `key` in the metadata bucket.
   */
  private getMetadataKey(key: string): string {
    return this.getS3Key(key) + METADATA_KEY_SUFFIX;
  }

  /**
   * URL-encode an S3 key for the CopyObject copy-source header, encoding each
   * path segment but keeping the '/' separators literal.
   */
  private encodeCopySourceKey(s3Key: string): string {
    return s3Key.split('/').map(encodeURIComponent).join('/');
  }

  /**
   * Remove the prefix from an S3 key to get the original key.
   */
  private removePrefix(s3Key: string): string {
    if (this.prefix && s3Key.startsWith(this.prefix)) {
      return s3Key.slice(this.prefix.length);
    }
    return s3Key;
  }

  /**
   * Write `metadata` as a JSON object into the metadata bucket.
   */
  private async putMetadataJson(
    key: string,
    metadata: StorageMetadata,
    metadataBucket: string,
  ): Promise<void> {
    await this.client.send(
      new PutObjectCommand({
        Bucket: metadataBucket,
        Key: this.getMetadataKey(key),
        Body: new TextEncoder().encode(JSON.stringify(metadata)),
        ContentType: 'application/json',
      }),
    );
  }

  /**
   * Decode a metadata JSON document, reviving the date fields that
   * JSON.stringify turned into ISO strings.
   *
   * @throws SyntaxError if the document is not valid JSON.
   */
  private parseMetadataJson(bytes: Uint8Array): StorageMetadata {
    const metadata = JSON.parse(new TextDecoder().decode(bytes)) as StorageMetadata;
    metadata.createdAt = new Date(metadata.createdAt);
    metadata.lastAccessed = new Date(metadata.lastAccessed);
    if (metadata.ttl) {
      metadata.ttl = new Date(metadata.ttl);
    }
    return metadata;
  }

  /**
   * Convert StorageMetadata to S3 metadata format.
   *
   * @remarks
   * S3 metadata keys are lowercase and values must be strings. Free-text
   * values are URI-encoded because they travel as HTTP header values, and
   * Node's HTTP client rejects characters outside Latin-1. The `valueencoding`
   * marker tells `s3ToMetadata` to decode them.
   */
  private metadataToS3(metadata: StorageMetadata): Record<string, string> {
    const encode = encodeURIComponent;
    return {
      key: encode(metadata.key),
      size: metadata.size.toString(),
      createdat: metadata.createdAt.toISOString(),
      lastaccessed: metadata.lastAccessed.toISOString(),
      accesscount: metadata.accessCount.toString(),
      compressed: metadata.compressed.toString(),
      checksum: encode(metadata.checksum),
      valueencoding: HEADER_VALUE_ENCODING,
      ...(metadata.ttl && { ttl: metadata.ttl.toISOString() }),
      ...(metadata.mimeType && { mimetype: encode(metadata.mimeType) }),
      ...(metadata.encoding && { encoding: encode(metadata.encoding) }),
      ...(metadata.customMetadata && {
        custom: encode(JSON.stringify(metadata.customMetadata)),
      }),
    };
  }

  /**
   * Convert S3 metadata to StorageMetadata format.
   *
   * @remarks
   * Missing fields fall back to defaults (empty strings, 0, `Date.now()`).
   * Free-text values are URI-decoded only when the `valueencoding` marker is
   * present; values from objects written without it are returned verbatim.
   * Invalid `custom` JSON is ignored.
   */
  private s3ToMetadata(s3Metadata: Record<string, string>): StorageMetadata {
    const isEncoded = s3Metadata.valueencoding === HEADER_VALUE_ENCODING;
    const decode = (value: string): string => {
      if (!isEncoded) return value;
      try {
        return decodeURIComponent(value);
      } catch {
        // Malformed escape sequence: keep the raw value rather than failing.
        return value;
      }
    };

    const metadata: StorageMetadata = {
      key: decode(s3Metadata.key ?? ''),
      size: parseInt(s3Metadata.size ?? '0', 10),
      createdAt: new Date(s3Metadata.createdat ?? Date.now()),
      lastAccessed: new Date(s3Metadata.lastaccessed ?? Date.now()),
      accessCount: parseInt(s3Metadata.accesscount ?? '0', 10),
      compressed: s3Metadata.compressed === 'true',
      checksum: decode(s3Metadata.checksum ?? ''),
    };

    if (s3Metadata.ttl) {
      metadata.ttl = new Date(s3Metadata.ttl);
    }

    if (s3Metadata.mimetype) {
      metadata.mimeType = decode(s3Metadata.mimetype);
    }

    if (s3Metadata.encoding) {
      metadata.encoding = decode(s3Metadata.encoding);
    }

    if (s3Metadata.custom) {
      try {
        metadata.customMetadata = JSON.parse(decode(s3Metadata.custom));
      } catch {
        // Ignore invalid JSON
      }
    }

    return metadata;
  }
}