From 4dcdf0c3a06521d34fe252e7269dcb8fba99c0d2 Mon Sep 17 00:00:00 2001 From: david_bai Date: Sun, 14 Sep 2025 11:44:35 +0800 Subject: [PATCH] chore:The breakpoint resuming file is saved normally --- frontend/lib/receive/ChunkProcessor.ts | 6 +- .../lib/receive/FileReceiveOrchestrator.ts | 2 + frontend/lib/receive/StreamingFileWriter.ts | 68 ++++++++++++++----- .../lib/transfer/FileTransferOrchestrator.ts | 16 +++-- frontend/lib/transfer/StreamingFileReader.ts | 20 +++--- 5 files changed, 76 insertions(+), 36 deletions(-) diff --git a/frontend/lib/receive/ChunkProcessor.ts b/frontend/lib/receive/ChunkProcessor.ts index 604def3..853fabc 100644 --- a/frontend/lib/receive/ChunkProcessor.ts +++ b/frontend/lib/receive/ChunkProcessor.ts @@ -237,12 +237,14 @@ export class ChunkProcessor { const { chunkMeta, absoluteChunkIndex, relativeChunkIndex } = result; const isFirstFew = absoluteChunkIndex <= 3; const isLastFew = relativeChunkIndex >= expectedChunksCount - 3; - const hasIndexMismatch = writerExpectedIndex !== undefined && relativeChunkIndex !== writerExpectedIndex; + + // 🔧 修复:SequencedWriter期望的是绝对索引,不是相对索引 + const hasIndexMismatch = writerExpectedIndex !== undefined && absoluteChunkIndex !== writerExpectedIndex; if (isFirstFew || isLastFew || hasIndexMismatch) { postLogToBackend( `[CHUNK-DETAIL] #${absoluteChunkIndex} rel:${relativeChunkIndex}${ - hasIndexMismatch ? ` MISMATCH(expected:${writerExpectedIndex})` : '' + hasIndexMismatch ? ` MISMATCH(writer expects:${writerExpectedIndex})` : '' } size:${chunkMeta.chunkSize}` ); } diff --git a/frontend/lib/receive/FileReceiveOrchestrator.ts b/frontend/lib/receive/FileReceiveOrchestrator.ts index ade17bc..1d98337 100644 --- a/frontend/lib/receive/FileReceiveOrchestrator.ts +++ b/frontend/lib/receive/FileReceiveOrchestrator.ts @@ -367,12 +367,14 @@ export class FileReceiveOrchestrator implements MessageProcessorDelegate { // Handle disk writing if needed if (reception.sequencedWriter) { + // 🔧 修复:SequencedWriter使用绝对索引,确保传递正确的索引 this.chunkProcessor.logChunkDetails( result, reception.expectedChunksCount, reception.sequencedWriter.expectedIndex ); + // ✅ 正确使用绝对索引进行磁盘写入 await reception.sequencedWriter.writeChunk( result.absoluteChunkIndex, result.chunkData diff --git a/frontend/lib/receive/StreamingFileWriter.ts b/frontend/lib/receive/StreamingFileWriter.ts index 00e9940..9bfd62d 100644 --- a/frontend/lib/receive/StreamingFileWriter.ts +++ b/frontend/lib/receive/StreamingFileWriter.ts @@ -178,42 +178,74 @@ export class SequencedDiskWriter { */ async close(): Promise { try { - // Try to flush all remaining chunks + // 🔧 修复:确保以正确的WriteParams格式写入剩余chunks const remainingIndexes = Array.from(this.writeQueue.keys()).sort( (a, b) => a - b ); - for (const chunkIndex of remainingIndexes) { - const chunk = this.writeQueue.get(chunkIndex)!; - const fileOffset = ReceptionConfig.getOffsetFromChunkIndex(chunkIndex); - await this.stream.seek(fileOffset); - await this.stream.write(chunk); + + if (remainingIndexes.length > 0) { if (ReceptionConfig.DEBUG_CONFIG.ENABLE_CHUNK_LOGGING) { postLogToBackend( - `[DEBUG] 💾 FINAL_FLUSH chunk #${chunkIndex} at cleanup` + `[DEBUG-FINALIZE] 💾 Flushing ${remainingIndexes.length} remaining chunks: [${remainingIndexes.join(',')}]` ); } + + for (const chunkIndex of remainingIndexes) { + const chunk = this.writeQueue.get(chunkIndex)!; + const fileOffset = ReceptionConfig.getOffsetFromChunkIndex(chunkIndex); + + // 🔧 修复:使用正确的WriteParams格式 + await this.stream.seek(fileOffset); + + // 确保chunk是有效的ArrayBuffer + if (!(chunk instanceof ArrayBuffer) || chunk.byteLength === 0) { + if (ReceptionConfig.DEBUG_CONFIG.ENABLE_CHUNK_LOGGING) { + postLogToBackend( + `[DEBUG-FINALIZE] ⚠️ Skipping invalid chunk #${chunkIndex}: ${Object.prototype.toString.call(chunk)}, size: ${chunk.byteLength}` + ); + } + continue; + } + + // 使用标准WriteParams格式写入 + await this.stream.write({ + type: "write", + data: chunk + }); + + if (ReceptionConfig.DEBUG_CONFIG.ENABLE_CHUNK_LOGGING) { + postLogToBackend( + `[DEBUG-FINALIZE] ✅ FINAL_FLUSH chunk #${chunkIndex} (${chunk.byteLength} bytes)` + ); + } + } } } catch (error) { - // Defensive handling: If stream is not writable during close, handle silently - const errorMessage = - error instanceof Error ? error.message : String(error); + // Enhanced error handling with specific error types + const errorMessage = error instanceof Error ? error.message : String(error); + + if (ReceptionConfig.DEBUG_CONFIG.ENABLE_CHUNK_LOGGING) { + postLogToBackend( + `[DEBUG-FINALIZE] ❌ Error during final flush: ${errorMessage}` + ); + } + if ( errorMessage.includes("closing writable stream") || - errorMessage.includes("stream is closed") + errorMessage.includes("stream is closed") || + errorMessage.includes("The stream is not in a state that permits this operation") ) { console.log( - `[SequencedDiskWriter] Stream closed during final flush - data may be incomplete` + `[SequencedDiskWriter] Stream closed during final flush - completing gracefully` ); } else { - console.warn( - `[SequencedDiskWriter] Error during final flush:`, - errorMessage - ); + console.warn(`[SequencedDiskWriter] Unexpected error during final flush:`, errorMessage); throw error; } + } finally { + // 无论如何都要清理队列 + this.writeQueue.clear(); } - - this.writeQueue.clear(); } } diff --git a/frontend/lib/transfer/FileTransferOrchestrator.ts b/frontend/lib/transfer/FileTransferOrchestrator.ts index 842e4e5..832cfab 100644 --- a/frontend/lib/transfer/FileTransferOrchestrator.ts +++ b/frontend/lib/transfer/FileTransferOrchestrator.ts @@ -205,11 +205,11 @@ export class FileTransferOrchestrator implements MessageHandlerDelegate { const peerState = this.stateManager.getPeerState(peerId); const transferStartTime = performance.now(); + // 🔧 修复:记录传输开始时的初始offset,用于后续统计计算 + const initialReadOffset = peerState.readOffset || 0; + // 1. Initialize streaming file reader - const streamReader = new StreamingFileReader( - file, - peerState.readOffset || 0 - ); + const streamReader = new StreamingFileReader(file, initialReadOffset); if (developmentEnv === "development") { postLogToBackend( @@ -304,9 +304,11 @@ export class FileTransferOrchestrator implements MessageHandlerDelegate { if (developmentEnv === "development") { const totalTime = performance.now() - transferStartTime; const avgSpeedMBps = totalBytesSent / 1024 / 1024 / (totalTime / 1000); + + // 🔧 修复:使用正确的初始offset而不是当前readOffset来计算日志统计 + const initialOffset = initialReadOffset || 0; // 传输开始时的offset const expectedTotalChunks = Math.ceil(file.size / 65536); - const startOffset = peerState.readOffset || 0; - const startChunkIndex = Math.floor(startOffset / 65536); + const startChunkIndex = Math.floor(initialOffset / 65536); const expectedChunksSent = expectedTotalChunks - startChunkIndex; postLogToBackend( @@ -315,7 +317,7 @@ export class FileTransferOrchestrator implements MessageHandlerDelegate { ).toFixed(1)}s, speed: ${avgSpeedMBps.toFixed(1)}MB/s` ); postLogToBackend( - `[DEBUG-CHUNKS] Chunks sent: ${networkChunkIndex}, expected: ${expectedChunksSent}, startChunk: ${startChunkIndex}, totalFileChunks: ${expectedTotalChunks}` + `[DEBUG-CHUNKS] Chunks sent: ${networkChunkIndex}, expected: ${expectedChunksSent}, startChunk: ${startChunkIndex}, totalFileChunks: ${expectedTotalChunks}, initialOffset: ${initialOffset}` ); if (networkChunkIndex !== expectedChunksSent) { diff --git a/frontend/lib/transfer/StreamingFileReader.ts b/frontend/lib/transfer/StreamingFileReader.ts index d500dae..e015520 100644 --- a/frontend/lib/transfer/StreamingFileReader.ts +++ b/frontend/lib/transfer/StreamingFileReader.ts @@ -40,6 +40,7 @@ export class StreamingFileReader { // Global state private totalFileOffset = 0; // Current position in the entire file + private startChunkIndex = 0; // 🔧 记录传输起始的chunk索引 private isFinished = false; private isReading = false; // Prevent concurrent reading @@ -51,6 +52,9 @@ export class StreamingFileReader { this.currentBatchStartOffset = startOffset; this.fileReader = new FileReader(); + // 🔧 记录传输的起始chunk索引,用于边界检测 + this.startChunkIndex = Math.floor(startOffset / this.NETWORK_CHUNK_SIZE); + if (developmentEnv === "development") { // 🎯 关键日志1:发送端总结信息 - 使用统一的chunk范围计算逻辑 const chunkRange = ChunkRangeCalculator.getChunkRange( @@ -95,17 +99,15 @@ export class StreamingFileReader { // 🎯 关键日志:边界chunk验证(临时保留用于验证修复效果) if (developmentEnv === "development") { const totalChunks = this.calculateTotalNetworkChunks(); - const currentOffset = this.totalFileOffset - networkChunk.byteLength; - const firstChunkIndex = Math.floor( - currentOffset / this.NETWORK_CHUNK_SIZE - ); - const isFirst = - globalChunkIndex === firstChunkIndex || - (currentOffset === 0 && globalChunkIndex === 0); + + // 🔧 修复:使用简化的边界检测逻辑 + const isFirst = globalChunkIndex === this.startChunkIndex; + const expectedLastChunk = Math.floor((this.totalFileSize - 1) / this.NETWORK_CHUNK_SIZE); + const isRealLast = isLast && globalChunkIndex === expectedLastChunk; - if (isFirst || isLast) { + if (isFirst || isRealLast) { postLogToBackend( - `[BOUNDARY] Chunk #${globalChunkIndex}/${totalChunks}, isFirst: ${isFirst}, isLast: ${isLast}, size: ${networkChunk.byteLength}` + `[BOUNDARY] Chunk #${globalChunkIndex}/${totalChunks}, isFirst: ${isFirst}, isLast: ${isRealLast}, startIdx: ${this.startChunkIndex}, expectedLastIdx: ${expectedLastChunk}, size: ${networkChunk.byteLength}` ); } }