Fix the out-of-order file transfer issue for files saved in memory

This commit is contained in:
david_bai
2025-09-06 22:53:54 +08:00
parent 81c2b204f3
commit e385389e1d
3 changed files with 315 additions and 157 deletions
+162 -98
View File
@@ -19,15 +19,16 @@ import {
FileRequest,
FileReceiveComplete,
FolderReceiveComplete,
EmbeddedChunkMeta,
} from "@/types/webrtc";
import { postLogToBackend } from "@/app/config/api";
/**
* Manages the state of an active file reception.
* 🚀 新版本:管理按序列化融合数据包的文件接收状态
*/
interface ActiveFileReception {
meta: fileMetadata; // If meta is present, it means this file is currently being received; null means no file is being received.
chunks: (ArrayBuffer | null)[]; // Received file chunks (stored in memory).
chunks: (ArrayBuffer | null)[]; // 按序号排列的数据块数组
receivedSize: number;
initialOffset: number; // For resuming downloads
fileHandle: FileSystemFileHandle | null; // Object related to writing to disk -- current file.
@@ -36,10 +37,10 @@ interface ActiveFileReception {
resolve: () => void;
reject: (reason?: any) => void;
};
// 新增:用于跟踪数据接收统计
// 🚀 新版本:简化的按序接收管理
receivedChunksCount: number; // 实际接收到的chunk数量
expectedChunksCount: number; // 预期的chunk数量
lastChunkIndex: number; // 最后接收的chunk索引
chunkSequenceMap: Map<number, boolean>; // 跟踪哪些chunk已经接收(用于chunk序号)
isFinalized?: boolean; // 防止重复finalize的标记
}
@@ -190,16 +191,16 @@ class FileReceiver {
this.activeFileReception = {
meta: fileInfo,
chunks: [],
chunks: new Array(expectedChunksCount).fill(null), // 🚀 初始化为按索引排列的空数组
receivedSize: 0,
initialOffset: offset,
fileHandle: null,
writeStream: null,
completionNotifier: { resolve, reject },
// 新增统计字段
// 🚀 新版本:简化的按序接收管理
receivedChunksCount: 0,
expectedChunksCount: expectedChunksCount,
lastChunkIndex: -1,
chunkSequenceMap: new Map<number, boolean>(),
};
postLogToBackend(
@@ -343,6 +344,60 @@ class FileReceiver {
}
}
/**
* 🚀 新增:解析融合数据包
* 格式: [4字节长度] + [JSON元数据] + [实际chunk数据]
*/
private parseEmbeddedChunkPacket(arrayBuffer: ArrayBuffer): {
chunkMeta: EmbeddedChunkMeta;
chunkData: ArrayBuffer;
} | null {
try {
// 1. 检查数据包最小长度
if (arrayBuffer.byteLength < 4) {
postLogToBackend(`[DEBUG] ❌ Invalid embedded packet - too small: ${arrayBuffer.byteLength}`);
return null;
}
// 2. 读取元数据长度(4字节)
const lengthView = new Uint32Array(arrayBuffer, 0, 1);
const metaLength = lengthView[0];
// 3. 验证数据包的完整性
const expectedTotalLength = 4 + metaLength;
if (arrayBuffer.byteLength < expectedTotalLength) {
postLogToBackend(`[DEBUG] ❌ Incomplete embedded packet - expected: ${expectedTotalLength}, got: ${arrayBuffer.byteLength}`);
return null;
}
// 4. 提取元数据部分
const metaBytes = new Uint8Array(arrayBuffer, 4, metaLength);
const metaJson = new TextDecoder().decode(metaBytes);
const chunkMeta: EmbeddedChunkMeta = JSON.parse(metaJson);
// 5. 提取实际chunk数据部分
const chunkDataStart = 4 + metaLength;
const chunkData = arrayBuffer.slice(chunkDataStart);
// 6. 验证chunk数据大小
if (chunkData.byteLength !== chunkMeta.chunkSize) {
postLogToBackend(
`[DEBUG] ⚠️ Chunk size mismatch - meta: ${chunkMeta.chunkSize}, actual: ${chunkData.byteLength}`
);
}
postLogToBackend(
`[DEBUG] 📦 PARSED embedded packet - chunkIndex: ${chunkMeta.chunkIndex}/${chunkMeta.totalChunks}, chunkSize: ${chunkData.byteLength}, isLast: ${chunkMeta.isLastChunk}`
);
return { chunkMeta, chunkData };
} catch (error) {
postLogToBackend(`[DEBUG] ❌ Failed to parse embedded packet: ${error}`);
return null;
}
}
private async handleReceivedData(
data: string | ArrayBuffer | any,
peerId: string
@@ -366,7 +421,7 @@ class FileReceiver {
this.fireError("Error parsing received JSON data", { error });
}
} else {
// 处理各种格式的二进制数据 - Firefox兼容性修复
// 🚀 新版本:处理融合数据 - 彻底解决Firefox乱序问题
const arrayBuffer = await this.convertToArrayBuffer(data);
if (arrayBuffer) {
@@ -381,8 +436,8 @@ class FileReceiver {
return;
}
this.updateProgress(arrayBuffer.byteLength);
await this.handleFileChunk(arrayBuffer);
// 🚀 统一处理:所有数据都作为融合数据包处理
await this.handleEmbeddedChunkPacket(arrayBuffer);
} else {
postLogToBackend(
`[Firefox Debug] ERROR: Failed to convert binary data to ArrayBuffer`
@@ -452,64 +507,95 @@ class FileReceiver {
// endregion
// region File and Folder Processing
private async handleFileChunk(chunk: ArrayBuffer): Promise<void> {
if (!this.activeFileReception) return;
// 🐛 DEBUG: 记录接收到的原始chunk信息
const currentChunkIndex = this.activeFileReception.receivedChunksCount;
// 更新统计信息
this.activeFileReception.receivedChunksCount++;
this.activeFileReception.lastChunkIndex = Math.max(
this.activeFileReception.lastChunkIndex,
currentChunkIndex
);
// 更新进度统计
this.updateProgress(chunk.byteLength);
if (this.activeFileReception.writeStream) {
await this.writeLargeFileChunk(chunk);
} else {
// 存储chunk到内存
this.activeFileReception.chunks.push(chunk);
// 🐛 DEBUG: 验证存储结果
const storedChunk =
this.activeFileReception.chunks[
this.activeFileReception.chunks.length - 1
];
const currentTotalSize = this.activeFileReception.chunks.reduce(
(sum, c) => sum + (c?.byteLength || 0),
0
);
/**
* 🚀 新版本:处理融合数据包
*/
private async handleEmbeddedChunkPacket(arrayBuffer: ArrayBuffer): Promise<void> {
const parsed = this.parseEmbeddedChunkPacket(arrayBuffer);
if (!parsed) {
this.fireError("Failed to parse embedded chunk packet");
return;
}
const { chunkMeta, chunkData } = parsed;
const reception = this.activeFileReception!;
// 验证fileId匹配
if (chunkMeta.fileId !== reception.meta.fileId) {
postLogToBackend(
`[DEBUG] 📦 STORED chunk#${currentChunkIndex} - original: ${
chunk.byteLength
}, stored: ${
storedChunk?.byteLength || "null"
}, total: ${currentTotalSize}`
`[DEBUG] ⚠️ FileId mismatch - expected: ${reception.meta.fileId}, got: ${chunkMeta.fileId}`
);
return;
}
// 🐛 DEBUG: 特别关注最后几个chunks
if (currentChunkIndex >= 65) {
postLogToBackend(
`[DEBUG] 🔍 CRITICAL_CHUNK#${currentChunkIndex} - input: ${
chunk.byteLength
}, stored: ${storedChunk?.byteLength}, isLast: ${
currentChunkIndex >= 67
}`
);
// 更新预期 chunks 数量(可能与初始预估不同)
if (chunkMeta.totalChunks !== reception.expectedChunksCount) {
postLogToBackend(
`[DEBUG] ⚠️ Chunk count adjustment - expected: ${reception.expectedChunksCount}, actual: ${chunkMeta.totalChunks}`
);
reception.expectedChunksCount = chunkMeta.totalChunks;
// 调整chunks数组大小
if (reception.chunks.length < chunkMeta.totalChunks) {
const newChunks = new Array(chunkMeta.totalChunks).fill(null);
reception.chunks.forEach((chunk, index) => {
if (index < newChunks.length) newChunks[index] = chunk;
});
reception.chunks = newChunks;
}
}
// 按序号存储chunk
const chunkIndex = chunkMeta.chunkIndex;
if (chunkIndex >= 0 && chunkIndex < reception.chunks.length) {
reception.chunks[chunkIndex] = chunkData;
reception.chunkSequenceMap.set(chunkIndex, true);
reception.receivedChunksCount++;
postLogToBackend(
`[DEBUG] ✓ SEQUENCED chunk #${chunkIndex}/${chunkMeta.totalChunks} stored - size: ${chunkData.byteLength}, isLast: ${chunkMeta.isLastChunk}`
);
// 更新进度
this.updateProgress(chunkData.byteLength);
if (reception.writeStream) {
// 对于大文件直写模式,按序写入
await this.writeSequencedLargeFileChunk(chunkData, chunkIndex, chunkMeta.fileOffset);
}
} else {
postLogToBackend(
`[DEBUG] ❌ Invalid chunk index - ${chunkIndex}, expected 0-${reception.chunks.length - 1}`
);
}
await this.checkAndAutoFinalize();
}
/**
* 🚀 新增:按序写入大文件数据块
*/
private async writeSequencedLargeFileChunk(
chunk: ArrayBuffer,
chunkIndex: number,
fileOffset: number
): Promise<void> {
const stream = this.activeFileReception?.writeStream;
if (!stream) return;
try {
// 对于按序写入,可能需要seek到指定位置
// 这里简化处理,假设按序接收就直接写入
await stream.write(chunk);
this.activeFileReception!.chunks[chunkIndex] = null; // Mark as written
} catch (error) {
this.fireError("Error writing sequenced chunk to disk", { error, chunkIndex, fileOffset });
}
}
/**
* 🚀 新流程:自动检查数据完整性并触发finalize
* 不再依赖发送端的fileEnd信号,接收端自主判断完成
* 🚀 新版本:统一的自动完成检查 - 支持融合数据包和旧格式
*/
private async checkAndAutoFinalize(): Promise<void> {
if (!this.activeFileReception) return;
@@ -524,13 +610,24 @@ class FileReceiver {
}, 0);
const expectedSize = reception.meta.size;
const chunksComplete = receivedChunks >= expectedChunks;
// 🚀 统一完整性检查:按序接收模式
let sequencedCount = 0;
for (let i = 0; i < expectedChunks; i++) {
if (reception.chunks[i] instanceof ArrayBuffer) {
sequencedCount++;
}
}
const isSequencedComplete = sequencedCount === expectedChunks;
const sizeComplete = currentTotalSize >= expectedSize;
const isDataComplete = chunksComplete && sizeComplete;
const isDataComplete = isSequencedComplete && sizeComplete;
postLogToBackend(
`[DEBUG] 🔄 Progress check - chunks: ${receivedChunks}/${expectedChunks}, size: ${currentTotalSize}/${expectedSize}, complete: ${isDataComplete}, isFinalized:${reception.isFinalized}`
);
// 更频繁的调试信息只在接近完成时显示
if (receivedChunks % 10 === 0 || receivedChunks >= expectedChunks - 5 || isDataComplete) {
postLogToBackend(
`[DEBUG] 🔄 SEQUENCED progress - received: ${sequencedCount}/${expectedChunks}, total: ${currentTotalSize}/${expectedSize}, complete: ${isDataComplete}`
);
}
// 防止重复finalize
if (reception.isFinalized) {
@@ -539,7 +636,7 @@ class FileReceiver {
if (isDataComplete) {
postLogToBackend(
`[DEBUG] 🎯 TRIGGERING finalize - chunks: ${receivedChunks}/${expectedChunks}, size: ${currentTotalSize}/${expectedSize}`
`[DEBUG] 🎯 TRIGGERING finalize - chunks: ${sequencedCount}/${expectedChunks}, size: ${currentTotalSize}/${expectedSize}`
);
reception.isFinalized = true;
@@ -697,54 +794,21 @@ class FileReceiver {
`[DEBUG] 🔍 FINALIZE START - fileName: ${reception.meta.name}, expectedSize: ${reception.meta.size}, chunksArray: ${reception.chunks.length}`
);
// 🐛 DEBUG: 详细分析每个chunk
// 🚀 简化版:验证按序接收的数据
let totalChunkSize = 0;
let validChunks = 0;
const chunkDetails: string[] = [];
reception.chunks.forEach((chunk, index) => {
if (chunk instanceof ArrayBuffer) {
validChunks++;
totalChunkSize += chunk.byteLength;
// 🐛 DEBUG: 特别关注最后几个chunks
if (index >= reception.chunks.length - 5) {
chunkDetails.push(`chunk#${index}: ${chunk.byteLength}bytes`);
postLogToBackend(
`[DEBUG] 🔍 FINAL_CHUNK_ANALYSIS - index: ${index}, size: ${
chunk.byteLength
}, isLast: ${index === reception.chunks.length - 1}`
);
}
// 检测异常大小
if (chunk.byteLength !== 65536 && index < reception.chunks.length - 1) {
postLogToBackend(
`[DEBUG] ⚠️ UNEXPECTED_SIZE - chunk#${index}: ${chunk.byteLength} (should be 65536)`
);
}
} else {
postLogToBackend(
`[DEBUG] ❌ INVALID_CHUNK - index: ${index}, type: ${Object.prototype.toString.call(
chunk
)}`
);
}
});
// 🐛 DEBUG: 总体分析
postLogToBackend(
`[DEBUG] 📊 CHUNK_SUMMARY - valid: ${validChunks}/${
reception.chunks.length
}, totalSize: ${totalChunkSize}, expected: ${
reception.meta.size
}, diff: ${reception.meta.size - totalChunkSize}`
`[DEBUG] 📊 SEQUENCED_SUMMARY - valid: ${validChunks}/${reception.chunks.length}, totalSize: ${totalChunkSize}, expected: ${reception.meta.size}`
);
if (chunkDetails.length > 0) {
postLogToBackend(`[DEBUG] 🔍 FINAL_CHUNKS: ${chunkDetails.join(", ")}`);
}
// 最终验证
const sizeDifference = reception.meta.size - totalChunkSize;
if (sizeDifference !== 0) {
+138 -59
View File
@@ -16,6 +16,7 @@ import {
FileRequest,
FileReceiveComplete,
FolderReceiveComplete,
EmbeddedChunkMeta,
} from "@/types/webrtc";
import { postLogToBackend } from "@/app/config/api";
@@ -441,6 +442,68 @@ class FileSender {
}
}
// 🚀 新版本:构建融合元数据的数据包
private createEmbeddedChunkPacket(
chunkData: ArrayBuffer,
chunkMeta: EmbeddedChunkMeta
): ArrayBuffer {
// 1. 将元数据序列化为JSON
const metaJson = JSON.stringify(chunkMeta);
const metaBytes = new TextEncoder().encode(metaJson);
// 2. 元数据长度(4字节)
const metaLengthBuffer = new ArrayBuffer(4);
const metaLengthView = new Uint32Array(metaLengthBuffer);
metaLengthView[0] = metaBytes.length;
// 3. 构建最终的融合数据包
const totalLength = 4 + metaBytes.length + chunkData.byteLength;
const finalPacket = new Uint8Array(totalLength);
// 拼接: [4字节长度] + [元数据] + [原始chunk数据]
finalPacket.set(new Uint8Array(metaLengthBuffer), 0);
finalPacket.set(metaBytes, 4);
finalPacket.set(new Uint8Array(chunkData), 4 + metaBytes.length);
postLogToBackend(
`[DEBUG] 📦 EMBEDDED packet created - chunkIndex: ${chunkMeta.chunkIndex}, metaSize: ${metaBytes.length}, chunkSize: ${chunkData.byteLength}, totalSize: ${totalLength}`
);
return finalPacket.buffer;
}
// 🚀 新版本:发送带序号的融合数据包
private async sendEmbeddedChunk(
chunkData: ArrayBuffer,
fileId: string,
chunkIndex: number,
totalChunks: number,
fileOffset: number,
peerId: string
): Promise<void> {
const isLastChunk = chunkIndex === totalChunks - 1;
// 1. 创建元数据
const chunkMeta: EmbeddedChunkMeta = {
chunkIndex,
totalChunks,
chunkSize: chunkData.byteLength,
isLastChunk,
fileOffset,
fileId,
};
// 2. 构建融合数据包
const embeddedPacket = this.createEmbeddedChunkPacket(chunkData, chunkMeta);
// 3. 🔧 关键修复:融合数据包不能被分片,直接发送
await this.sendSingleData(embeddedPacket, peerId);
postLogToBackend(
`[DEBUG] ✓ EMBEDDED chunk #${chunkIndex}/${totalChunks} sent - ${chunkData.byteLength} bytes, packet: ${embeddedPacket.byteLength} bytes, isLast: ${isLastChunk}`
);
}
// New: Send large ArrayBuffer in fragments
private async sendLargeArrayBuffer(
data: ArrayBuffer,
@@ -473,7 +536,7 @@ class FileSender {
}
}
// New: Send single data packet with active polling backpressure control
// 🚀 修复版本:发送单个数据包(禁止分片)
private async sendSingleData(
data: string | ArrayBuffer,
peerId: string
@@ -497,10 +560,18 @@ class FileSender {
? data.byteLength
: 0;
// 🚀 关键修复:检查数据包大小,如果超过64KB则需要警告
const maxSafeSize = 64 * 1024; // 64KB
if (data instanceof ArrayBuffer && data.byteLength > maxSafeSize) {
postLogToBackend(
`[DEBUG] ⚠️ Large embedded packet detected: ${data.byteLength} bytes, this may cause issues`
);
}
// Intelligent send control - decide sending strategy based on buffer status
await this.smartBufferControl(dataChannel, peerId);
// Send data
// 🚀 直接发送,不分片
const sendResult = this.webrtcConnection.sendData(data, peerId);
if (!sendResult) {
@@ -748,7 +819,7 @@ class FileSender {
return chunks;
}
// Unified optimized version - uses batch reading + loop, suitable for all devices
// 🚀 修复版本:在网络传输层面正确添加序号 - 彻底解决Firefox乱序问题
private async processSendQueue(
file: CustomFile,
peerId: string
@@ -757,87 +828,95 @@ class FileSender {
const peerState = this.getPeerState(peerId);
const fileReader = new FileReader();
let offset = peerState.readOffset || 0;
let fileOffset = peerState.readOffset || 0;
const batchSize = FileSender.OPTIMIZED_CONFIG.BATCH_SIZE;
let totalChunksSent = 0;
let totalBytesSentInLoop = 0;
// 🔧 关键修复:使用网络传输块大小计算totalChunks
const networkChunkSize = FileSender.OPTIMIZED_CONFIG.NETWORK_CHUNK_SIZE; // 64KB
const remainingSize = file.size - fileOffset;
const totalNetworkChunks = Math.ceil(remainingSize / networkChunkSize);
postLogToBackend(
`[DEBUG] 🚀 Starting NETWORK-LEVEL EMBEDDED transfer - file: ${file.name}, totalNetworkChunks: ${totalNetworkChunks}, chunkSize: ${networkChunkSize}, startOffset: ${fileOffset}`
);
// Initialize network performance monitoring
this.initializeNetworkPerformance(peerId);
try {
let loopCount = 0;
// Use batch reading + loop instead of traditional recursion to greatly improve performance
while (offset < file.size && peerState.isSending) {
loopCount++;
let networkChunkIndex = 0; // 网络块序号
let currentFileOffset = fileOffset;
// Batch read multiple large chunks - fully utilize memory advantages
const chunks = await this.readMultipleChunks(
fileReader,
file,
offset,
this.chunkSize,
batchSize
// 按网络块大小逐个发送
while (currentFileOffset < file.size && peerState.isSending) {
// 计算当前网络块的实际大小
const currentNetworkChunkSize = Math.min(
networkChunkSize,
file.size - currentFileOffset
);
if (chunks.length === 0) break;
// 读取当前网络块
const networkChunk = await this.readSingleChunk(
fileReader,
file,
currentFileOffset,
currentNetworkChunkSize
);
for (const chunk of chunks) {
if (!peerState.isSending || offset >= file.size) break;
// 发送带序号的融合数据包
let sendSuccessful = false;
try {
await this.sendEmbeddedChunk(
networkChunk,
fileId,
networkChunkIndex,
totalNetworkChunks,
currentFileOffset,
peerId
);
sendSuccessful = true;
totalBytesSentInLoop += networkChunk.byteLength;
// 🔧 修复:检查发送是否成功
let sendSuccessful = false;
try {
await this.sendWithBackpressure(chunk, peerId);
sendSuccessful = true;
postLogToBackend(
`[DEBUG] ✓ Network chunk #${networkChunkIndex}/${totalNetworkChunks} sent - ${networkChunk.byteLength} bytes`
);
} catch (error) {
postLogToBackend(
`[Firefox Debug] ❌ Failed to send network chunk #${networkChunkIndex}: ${error}`
);
throw error;
}
totalChunksSent++;
totalBytesSentInLoop += chunk.byteLength;
} catch (error) {
postLogToBackend(
`[Firefox Debug] ❌ Failed to send chunk ${
totalChunksSent + 1
}: ${error}`
);
sendSuccessful = false;
// 不更新统计,但继续尝试发送下一个chunk
}
// 更新进度和位置
if (sendSuccessful) {
currentFileOffset += networkChunk.byteLength;
peerState.readOffset = currentFileOffset;
networkChunkIndex++;
// Update progress only if send was successful
if (sendSuccessful) {
offset += chunk.byteLength;
peerState.readOffset = offset;
// Update file and folder progress with success flag
await this.updateProgress(
chunk.byteLength,
fileId,
file.size,
peerId,
true // 明确标记为发送成功
);
} else {
// 发送失败但不中止传输,记录失败信息
postLogToBackend(
`[Firefox Debug] 🔄 Chunk send failed but continuing... failed chunks will be missing from total`
);
}
// Update file and folder progress
await this.updateProgress(
networkChunk.byteLength,
fileId,
file.size,
peerId,
true
);
}
}
postLogToBackend(
`[Firefox Debug] 🏁 All data sent, waiting for receiver to confirm completion...`
`[Firefox Debug] 🏁 All network chunks sent (${networkChunkIndex}/${totalNetworkChunks}), waiting for receiver confirmation...`
);
} catch (error: any) {
const errorMessage = `Error in hybrid optimized transfer: ${error.message}`;
const errorMessage = `Error in network-level embedded transfer: ${error.message}`;
postLogToBackend(
`[Firefox Debug] ❌ Send error after ${totalChunksSent} chunks, ${totalBytesSentInLoop} bytes: ${errorMessage}`
`[Firefox Debug] ❌ Network embedded send error: ${errorMessage}`
);
this.fireError(errorMessage, {
fileId,
peerId,
offset,
totalChunksSent,
currentFileOffset: peerState.readOffset,
totalBytesSentInLoop,
});
throw error;
+15
View File
@@ -57,6 +57,21 @@ export interface FolderReceiveComplete {
allStoreUpdated: boolean; // 确认所有文件都已加入Store
}
// 🚀 新增:融合到数据包中的chunk元数据结构
export interface EmbeddedChunkMeta {
chunkIndex: number; // 数据块序号,从0开始
totalChunks: number; // 总数据块数量
chunkSize: number; // 数据块大小(不包含元数据部分)
isLastChunk: boolean; // 是否为最后一个数据块
fileOffset: number; // 在文件中的偏移量
fileId: string; // 文件ID,用于匹配
}
// 注意:EmbeddedChunkMeta不在WebRTCMessage中,因为它嵌入在二进制数据内
// 🚀 融合数据包的二进制结构:
// [4字节:元数据长度] + [JSON元数据] + [实际chunk数据]
// 所有文件传输统一使用这种格式,彻底解决Firefox乱序问题
export type WebRTCMessage =
| fileMetadata
| FileRequest