"use strict"; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; Object.defineProperty(exports, "__esModule", { value: true }); exports.BlockedSource = void 0; const quick_lru_1 = __importDefault(require("quick-lru")); const basesource_js_1 = require("./basesource.js"); const utils_js_1 = require("../utils.js"); class Block { /** * * @param {number} offset * @param {number} length * @param {ArrayBuffer} [data] */ constructor(offset, length, data = null) { this.offset = offset; this.length = length; this.data = data; } /** * @returns {number} the top byte border */ get top() { return this.offset + this.length; } } class BlockGroup { /** * * @param {number} offset * @param {number} length * @param {number[]} blockIds */ constructor(offset, length, blockIds) { this.offset = offset; this.length = length; this.blockIds = blockIds; } } class BlockedSource extends basesource_js_1.BaseSource { /** * * @param {BaseSource} source The underlying source that shall be blocked and cached * @param {object} options * @param {number} [options.blockSize] * @param {number} [options.cacheSize] */ constructor(source, { blockSize = 65536, cacheSize = 100 } = {}) { super(); this.source = source; this.blockSize = blockSize; this.blockCache = new quick_lru_1.default({ maxSize: cacheSize, onEviction: (blockId, block) => { this.evictedBlocks.set(blockId, block); }, }); /** @type {Map} */ this.evictedBlocks = new Map(); // mapping blockId -> Block instance this.blockRequests = new Map(); // set of blockIds missing for the current requests this.blockIdsToFetch = new Set(); this.abortedBlockIds = new Set(); } get fileSize() { return this.source.fileSize; } /** * * @param {import("./basesource").Slice[]} slices */ async fetch(slices, signal) { const blockRequests = []; const missingBlockIds = []; const allBlockIds = []; this.evictedBlocks.clear(); for (const { offset, length } of slices) { let top = offset + length; const { fileSize } = this; if (fileSize !== null) { top = Math.min(top, fileSize); } const firstBlockOffset = Math.floor(offset / this.blockSize) * this.blockSize; for (let current = firstBlockOffset; current < top; current += this.blockSize) { const blockId = Math.floor(current / this.blockSize); if (!this.blockCache.has(blockId) && !this.blockRequests.has(blockId)) { this.blockIdsToFetch.add(blockId); missingBlockIds.push(blockId); } if (this.blockRequests.has(blockId)) { blockRequests.push(this.blockRequests.get(blockId)); } allBlockIds.push(blockId); } } // allow additional block requests to accumulate await (0, utils_js_1.wait)(); this.fetchBlocks(signal); // Gather all of the new requests that this fetch call is contributing to `fetch`. const missingRequests = []; for (const blockId of missingBlockIds) { // The requested missing block could already be in the cache // instead of having its request still be outstanding. if (this.blockRequests.has(blockId)) { missingRequests.push(this.blockRequests.get(blockId)); } } // Actually await all pending requests that are needed for this `fetch`. await Promise.allSettled(blockRequests); await Promise.allSettled(missingRequests); // Perform retries if a block was interrupted by a previous signal const abortedBlockRequests = []; const abortedBlockIds = allBlockIds .filter((id) => this.abortedBlockIds.has(id) || !this.blockCache.has(id)); abortedBlockIds.forEach((id) => this.blockIdsToFetch.add(id)); // start the retry of some blocks if required if (abortedBlockIds.length > 0 && signal && !signal.aborted) { this.fetchBlocks(null); for (const blockId of abortedBlockIds) { const block = this.blockRequests.get(blockId); if (!block) { throw new Error(`Block ${blockId} is not in the block requests`); } abortedBlockRequests.push(block); } await Promise.allSettled(abortedBlockRequests); } // throw an abort error if (signal && signal.aborted) { throw new utils_js_1.AbortError('Request was aborted'); } const blocks = allBlockIds.map((id) => this.blockCache.get(id) || this.evictedBlocks.get(id)); const failedBlocks = blocks.filter((i) => !i); if (failedBlocks.length) { throw new utils_js_1.AggregateError(failedBlocks, 'Request failed'); } // create a final Map, with all required blocks for this request to satisfy const requiredBlocks = new Map((0, utils_js_1.zip)(allBlockIds, blocks)); // TODO: satisfy each slice return this.readSliceData(slices, requiredBlocks); } /** * * @param {AbortSignal} signal */ fetchBlocks(signal) { // check if we still need to if (this.blockIdsToFetch.size > 0) { const groups = this.groupBlocks(this.blockIdsToFetch); // start requesting slices of data const groupRequests = this.source.fetch(groups, signal); for (let groupIndex = 0; groupIndex < groups.length; ++groupIndex) { const group = groups[groupIndex]; for (const blockId of group.blockIds) { // make an async IIFE for each block this.blockRequests.set(blockId, (async () => { try { const response = (await groupRequests)[groupIndex]; const blockOffset = blockId * this.blockSize; const o = blockOffset - response.offset; const t = Math.min(o + this.blockSize, response.data.byteLength); const data = response.data.slice(o, t); const block = new Block(blockOffset, data.byteLength, data, blockId); this.blockCache.set(blockId, block); this.abortedBlockIds.delete(blockId); } catch (err) { if (err.name === 'AbortError') { // store the signal here, we need it to determine later if an // error was caused by this signal err.signal = signal; this.blockCache.delete(blockId); this.abortedBlockIds.add(blockId); } else { throw err; } } finally { this.blockRequests.delete(blockId); } })()); } } this.blockIdsToFetch.clear(); } } /** * * @param {Set} blockIds * @returns {BlockGroup[]} */ groupBlocks(blockIds) { const sortedBlockIds = Array.from(blockIds).sort((a, b) => a - b); if (sortedBlockIds.length === 0) { return []; } let current = []; let lastBlockId = null; const groups = []; for (const blockId of sortedBlockIds) { if (lastBlockId === null || lastBlockId + 1 === blockId) { current.push(blockId); lastBlockId = blockId; } else { groups.push(new BlockGroup(current[0] * this.blockSize, current.length * this.blockSize, current)); current = [blockId]; lastBlockId = blockId; } } groups.push(new BlockGroup(current[0] * this.blockSize, current.length * this.blockSize, current)); return groups; } /** * * @param {import("./basesource").Slice[]} slices * @param {Map} blocks */ readSliceData(slices, blocks) { return slices.map((slice) => { let top = slice.offset + slice.length; if (this.fileSize !== null) { top = Math.min(this.fileSize, top); } const blockIdLow = Math.floor(slice.offset / this.blockSize); const blockIdHigh = Math.floor(top / this.blockSize); const sliceData = new ArrayBuffer(slice.length); const sliceView = new Uint8Array(sliceData); for (let blockId = blockIdLow; blockId <= blockIdHigh; ++blockId) { const block = blocks.get(blockId); const delta = block.offset - slice.offset; const topDelta = block.top - top; let blockInnerOffset = 0; let rangeInnerOffset = 0; let usedBlockLength; if (delta < 0) { blockInnerOffset = -delta; } else if (delta > 0) { rangeInnerOffset = delta; } if (topDelta < 0) { usedBlockLength = block.length - blockInnerOffset; } else { usedBlockLength = top - block.offset - blockInnerOffset; } const blockView = new Uint8Array(block.data, blockInnerOffset, usedBlockLength); sliceView.set(blockView, rangeInnerOffset); } return sliceData; }); } } exports.BlockedSource = BlockedSource; //# sourceMappingURL=blockedsource.js.map