Using a simple backpressure mechanism

This commit is contained in:
david_bai
2025-09-07 23:38:15 +08:00
parent 230a06b3fb
commit 5ca911d1e1
8 changed files with 222 additions and 426 deletions
+68 -36
View File
@@ -20,8 +20,11 @@ export interface NetworkChunk {
*/
export class StreamingFileReader {
// 配置参数
private readonly BATCH_SIZE = TransferConfig.FILE_CONFIG.CHUNK_SIZE * TransferConfig.FILE_CONFIG.BATCH_SIZE; // 32MB批次
private readonly NETWORK_CHUNK_SIZE = TransferConfig.FILE_CONFIG.NETWORK_CHUNK_SIZE; // 64KB网络块
private readonly BATCH_SIZE =
TransferConfig.FILE_CONFIG.CHUNK_SIZE *
TransferConfig.FILE_CONFIG.BATCH_SIZE; // 32MB批次
private readonly NETWORK_CHUNK_SIZE =
TransferConfig.FILE_CONFIG.NETWORK_CHUNK_SIZE; // 64KB网络块
private readonly CHUNKS_PER_BATCH = this.BATCH_SIZE / this.NETWORK_CHUNK_SIZE; // 512块
// 文件状态
@@ -44,7 +47,7 @@ export class StreamingFileReader {
this.totalFileSize = file.size;
this.totalFileOffset = startOffset;
this.fileReader = new FileReader();
postLogToBackend(
`[DEBUG] 📖 StreamingFileReader created - file: ${file.name}, size: ${this.totalFileSize}, startOffset: ${startOffset}`
);
@@ -66,7 +69,7 @@ export class StreamingFileReader {
chunkIndex: this.calculateGlobalChunkIndex(),
totalChunks: this.calculateTotalNetworkChunks(),
fileOffset: this.totalFileOffset,
isLastChunk: true
isLastChunk: true,
};
}
@@ -78,16 +81,21 @@ export class StreamingFileReader {
// 4. 更新状态
this.updateChunkState(networkChunk);
postLogToBackend(
`[DEBUG] ✂️ NETWORK_CHUNK extracted #${globalChunkIndex}/${this.calculateTotalNetworkChunks()} - size: ${networkChunk.byteLength}, isLast: ${isLast}`
);
// 只在关键节点输出日志
if (globalChunkIndex % 100 === 0 || isLast) {
postLogToBackend(
`[PERF] ✂️ CHUNK progress #${globalChunkIndex}/${this.calculateTotalNetworkChunks()} - size: ${
networkChunk.byteLength
}, isLast: ${isLast}`
);
}
return {
chunk: networkChunk,
chunkIndex: globalChunkIndex,
totalChunks: this.calculateTotalNetworkChunks(),
fileOffset: this.totalFileOffset - networkChunk.byteLength,
isLastChunk: isLast
isLastChunk: isLast,
};
}
@@ -96,9 +104,9 @@ export class StreamingFileReader {
*/
private needsNewBatch(): boolean {
return (
this.currentBatch === null || // 还未加载任何批次
this.currentChunkIndexInBatch >= this.CHUNKS_PER_BATCH || // 当前批次用完
this.isCurrentBatchEmpty() // 当前批次已无数据
this.currentBatch === null || // 还未加载任何批次
this.currentChunkIndexInBatch >= this.CHUNKS_PER_BATCH || // 当前批次用完
this.isCurrentBatchEmpty() // 当前批次已无数据
);
}
@@ -107,7 +115,7 @@ export class StreamingFileReader {
*/
private isCurrentBatchEmpty(): boolean {
if (!this.currentBatch) return true;
const usedBytes = this.currentChunkIndexInBatch * this.NETWORK_CHUNK_SIZE;
return usedBytes >= this.currentBatch.byteLength;
}
@@ -119,12 +127,13 @@ export class StreamingFileReader {
if (this.isReading) {
// 防止并发读取
while (this.isReading) {
await new Promise(resolve => setTimeout(resolve, 10));
await new Promise((resolve) => setTimeout(resolve, 10));
}
return;
}
this.isReading = true;
const startTime = performance.now();
try {
// 1. 清理旧批次内存
@@ -140,27 +149,39 @@ export class StreamingFileReader {
}
// 3. 执行大块文件读取
const sliceStartTime = performance.now();
const fileSlice = this.file.slice(
this.totalFileOffset,
this.totalFileOffset,
this.totalFileOffset + batchSize
);
postLogToBackend(
`[DEBUG] 📖 BATCH_READ start - offset: ${this.totalFileOffset}, size: ${batchSize}, remaining: ${remainingFileSize}`
);
const sliceTime = performance.now() - sliceStartTime;
// 4. 异步读取文件数据
const readStartTime = performance.now();
this.currentBatch = await this.readFileSlice(fileSlice);
const readTime = performance.now() - readStartTime;
this.currentBatchStartOffset = this.totalFileOffset;
this.currentChunkIndexInBatch = 0;
const expectedNetworkChunks = Math.ceil(this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE);
postLogToBackend(
`[DEBUG] ✅ BATCH_LOADED - ${this.currentBatch.byteLength} bytes, networkChunks: ${expectedNetworkChunks}`
);
const totalTime = performance.now() - startTime;
const speedMBps = batchSize / 1024 / 1024 / (totalTime / 1000);
postLogToBackend(
`[PERF] 📖 BATCH_READ - size: ${(batchSize / 1024 / 1024).toFixed(
1
)}MB, total: ${totalTime.toFixed(1)}ms, slice: ${sliceTime.toFixed(
1
)}ms, read: ${readTime.toFixed(1)}ms, speed: ${speedMBps.toFixed(
1
)}MB/s`
);
} catch (error) {
postLogToBackend(`[DEBUG] ❌ BATCH_READ failed: ${error}`);
postLogToBackend(
`[PERF] ❌ BATCH_READ failed after ${(
performance.now() - startTime
).toFixed(1)}ms: ${error}`
);
throw new Error(`Failed to load file batch: ${error}`);
} finally {
this.isReading = false;
@@ -180,11 +201,17 @@ export class StreamingFileReader {
reject(new Error("FileReader result is null"));
}
};
this.fileReader.onerror = () => {
reject(new Error(`File reading failed: ${this.fileReader.error?.message || 'Unknown error'}`));
reject(
new Error(
`File reading failed: ${
this.fileReader.error?.message || "Unknown error"
}`
)
);
};
this.fileReader.readAsArrayBuffer(fileSlice);
});
}
@@ -197,7 +224,8 @@ export class StreamingFileReader {
throw new Error("No current batch available for slicing");
}
const chunkStartInBatch = this.currentChunkIndexInBatch * this.NETWORK_CHUNK_SIZE;
const chunkStartInBatch =
this.currentChunkIndexInBatch * this.NETWORK_CHUNK_SIZE;
const remainingInBatch = this.currentBatch.byteLength - chunkStartInBatch;
const chunkSize = Math.min(this.NETWORK_CHUNK_SIZE, remainingInBatch);
@@ -210,10 +238,7 @@ export class StreamingFileReader {
chunkStartInBatch + chunkSize
);
postLogToBackend(
`[DEBUG] ✂️ SLICE_CHUNK batch[${this.currentChunkIndexInBatch}/${Math.ceil(this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE)}] - size: ${chunkSize}, remaining: ${remainingInBatch - chunkSize}`
);
// 删除频繁的slice日志,只在需要时输出
return networkChunk;
}
@@ -221,7 +246,9 @@ export class StreamingFileReader {
* 📊 计算全局网络块索引
*/
private calculateGlobalChunkIndex(): number {
const batchesBefore = Math.floor(this.currentBatchStartOffset / this.BATCH_SIZE);
const batchesBefore = Math.floor(
this.currentBatchStartOffset / this.BATCH_SIZE
);
const chunksInPreviousBatches = batchesBefore * this.CHUNKS_PER_BATCH;
return chunksInPreviousBatches + this.currentChunkIndexInBatch;
}
@@ -270,8 +297,11 @@ export class StreamingFileReader {
totalChunks: number;
};
} {
const progressPercent = this.totalFileSize > 0 ? (this.totalFileOffset / this.totalFileSize) * 100 : 0;
const progressPercent =
this.totalFileSize > 0
? (this.totalFileOffset / this.totalFileSize) * 100
: 0;
const result = {
readBytes: this.totalFileOffset,
totalBytes: this.totalFileSize,
@@ -283,7 +313,9 @@ export class StreamingFileReader {
batchStartOffset: this.currentBatchStartOffset,
batchSize: this.currentBatch.byteLength,
chunkIndex: this.currentChunkIndexInBatch,
totalChunks: Math.ceil(this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE),
totalChunks: Math.ceil(
this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE
),
};
}
@@ -300,7 +332,7 @@ export class StreamingFileReader {
this.currentBatch = null;
this.currentBatchStartOffset = 0;
this.currentChunkIndexInBatch = 0;
postLogToBackend(
`[DEBUG] 🔄 StreamingFileReader reset - startOffset: ${startOffset}`
);