From 341a392c20a2a1ca0da0cb210a176760f5bd398d Mon Sep 17 00:00:00 2001 From: david_bai Date: Mon, 9 Jun 2025 21:32:43 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BA=86,=E5=BE=85=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/lib/fileSender.ts | 372 +++++++++++++++++++----------------- frontend/lib/webrtc_base.ts | 2 +- 2 files changed, 196 insertions(+), 178 deletions(-) diff --git a/frontend/lib/fileSender.ts b/frontend/lib/fileSender.ts index 2f63f08..6d30782 100644 --- a/frontend/lib/fileSender.ts +++ b/frontend/lib/fileSender.ts @@ -1,6 +1,6 @@ -//发送文件(夹)的流程:先发送文件 meta 信息,等待接收端请求,再发送文件内容,文件发送完再发送endMeta,等待接收端ack,结束 -//发送文件夹的流程(同上):接收批量文件请求 -//循环发送所有文件的meta,然后把属于folder的部分关于文件大小的记录下来,用于计算进度。接收展示端来区分单文件和文件夹 +// 发送文件(夹)的流程:先发送文件 meta 信息,等待接收端请求,再发送文件内容,文件发送完再发送endMeta,等待接收端ack,结束 +// 发送文件夹的流程(同上):接收批量文件请求 +// 循环发送所有文件的meta,然后把属于folder的部分关于文件大小的记录下来,用于计算进度。接收展示端来区分单文件和文件夹 import { generateFileId } from "@/lib/fileUtils"; import { SpeedCalculator } from "@/lib/speedCalculator"; import WebRTC_Initiator from "./webrtc_Initiator"; @@ -21,6 +21,7 @@ class FileSender { private pendingFiles: Map; private pendingFolerMeta: Record; private speedCalculator: SpeedCalculator; + constructor(WebRTC_initiator: WebRTC_Initiator) { this.webrtcConnection = WebRTC_initiator; @@ -28,16 +29,33 @@ class FileSender { this.peerStates = new Map(); // Map this.chunkSize = 65536; // 64 KB chunks - this.maxBufferSize = 5; // 预读取的块数 + this.maxBufferSize = 10; // 预读取的块数 this.pendingFiles = new Map(); //所有待发送的文件(引用){fileId:CustomFile} this.pendingFolerMeta = {}; //文件夹对应的meta属性(总大小、文件总个数),用于记录传输进度,fileId:{totalSize:0 , fileIds:[]} // 创建 SpeedCalculator 实例 this.speedCalculator = new SpeedCalculator(); - this.setupDataHandler(); } + + // region Logging and Error Handling + private log( + level: "log" | "warn" | "error", + message: string, + context?: Record + ) { + const prefix = `[FileSender]`; + console[level](prefix, message, context || ""); + } + + private fireError(message: string, context?: Record) { + this.webrtcConnection.fireError(message, { + ...context, + component: "FileSender", + }); + } + // endregion // 初始化新接收方的状态 private getPeerState(peerId: string): PeerState { if (!this.peerStates.has(peerId)) { @@ -54,44 +72,45 @@ class FileSender { } return this.peerStates.get(peerId)!; //! 非空断言(Non-Null Assertion Operator) } + private setupDataHandler(): void { - this.webrtcConnection.onDataReceived = ( - data: string | ArrayBuffer, - peerId: string - ) => { - this.handleReceivedData(data, peerId); + this.webrtcConnection.onDataReceived = (data, peerId) => { + if (typeof data === "string") { + try { + const parsedData = JSON.parse(data) as WebRTCMessage; + this.handleSignalingMessage(parsedData, peerId); + } catch (error) { + this.fireError("Error parsing received JSON data", { error, peerId }); + } + } }; } + private handleSignalingMessage(message: WebRTCMessage, peerId: string): void { + const peerState = this.getPeerState(peerId); + switch (message.type) { + case "fileRequest": + this.handleFileRequest(message as FileRequest, peerId); + break; + case "fileAck": + peerState.isSending = false; + this.log("log", `Received file-finish ack from peer ${peerId}`, { + fileId: (message as any).fileId, + }); + break; + default: + this.log("warn", `Unknown signaling message type received`, { + type: message.type, + peerId, + }); + } + } + public setProgressCallback( callback: (fileId: string, progress: number, speed: number) => void, peerId: string ): void { - const peerState = this.getPeerState(peerId); - peerState.progressCallback = callback; - } - - private handleReceivedData(data: string | ArrayBuffer, peerId: string): void { - if (typeof data === "string") { - try { - const parsedData = JSON.parse(data) as WebRTCMessage; - const peerState = this.getPeerState(peerId); - - const handlers: Record void> = { - fileRequest: () => - this.handleFileRequest(parsedData as FileRequest, peerId), - fileAck: () => { - peerState.isSending = false; - console.log(`Receive file-finish ack from peer ${peerId}`); - }, - }; - - const handler = handlers[parsedData.type]; - if (handler) handler(); - } catch (error) { - console.error("Error parsing JSON:", error); - } - } + this.getPeerState(peerId).progressCallback = callback; } //响应 文件请求,发送文件 private async handleFileRequest( @@ -99,9 +118,17 @@ class FileSender { peerId: string ): Promise { const file = this.pendingFiles.get(request.fileId); - console.log("handleFileRequest", file, peerId); + this.log( + "log", + `Handling file request for ${request.fileId} from ${peerId}` + ); if (file) { await this.sendSingleFile(file, peerId); + } else { + this.fireError(`File not found for request`, { + fileId: request.fileId, + peerId, + }); } } // 修改发送字符串的方法为异步方法 @@ -134,35 +161,40 @@ class FileSender { public sendFileMeta(files: CustomFile[], peerId?: string): void { //把属于folder的部分关于文件大小的记录下来,用于计算进度 - for (const file of files) { + files.forEach((file) => { if (file.folderName) { - const fileId = file.folderName; + const folderId = file.folderName; //folderName:{totalSize:0 , fileIds:[]} - if (!(file.folderName in this.pendingFolerMeta)) { - //初始化 - this.pendingFolerMeta[fileId] = { totalSize: 0, fileIds: [] }; + if (!this.pendingFolerMeta[folderId]) { + this.pendingFolerMeta[folderId] = { totalSize: 0, fileIds: [] }; } - const folderMeta = this.pendingFolerMeta[fileId]; - const fileId2 = generateFileId(file); - if (!folderMeta.fileIds.includes(fileId2)) { + const folderMeta = this.pendingFolerMeta[folderId]; + const fileId = generateFileId(file); + if (!folderMeta.fileIds.includes(fileId)) { //如果文件没被添加过 - folderMeta.fileIds.push(fileId2); + folderMeta.fileIds.push(fileId); folderMeta.totalSize += file.size; } } - } + }); //循环发送所有文件的meta - const sendToPeers = peerId ? [peerId] : Array.from(this.peerStates.keys()); - for (const currentPeerId of sendToPeers) { - for (const file of files) { + const peers = peerId + ? [peerId] + : Array.from(this.webrtcConnection.peerConnections.keys()); + peers.forEach((pId) => { + files.forEach((file) => { const fileId = generateFileId(file); this.pendingFiles.set(fileId, file); - const fileMeta = this.getFileMeta(file); - console.log("fileMeta", fileMeta); - this.webrtcConnection.sendData(JSON.stringify(fileMeta), currentPeerId); - } - } + this.log("log", "Sending file metadata", { fileMeta, peerId: pId }); + if (!this.webrtcConnection.sendData(JSON.stringify(fileMeta), pId)) { + this.fireError("Failed to send file metadata", { + fileMeta, + peerId: pId, + }); + } + }); + }); } //发送单个文件 @@ -171,16 +203,39 @@ class FileSender { peerId: string ): Promise { const fileId = generateFileId(file); - const peerState = this.getPeerState(peerId); + + if (peerState.isSending) { + this.log( + "warn", + `Already sending a file to peer ${peerId}, request for ${file.name} ignored.` + ); + return; + } + + this.log("log", `Starting to send single file: ${file.name} to ${peerId}`); + + // Reset state for the new transfer peerState.isSending = true; peerState.currentFolderName = file.folderName; - console.log("sendSingleFile", peerId, peerState); - await this.startSendingFile(fileId, peerId); + peerState.readOffset = 0; + peerState.bufferQueue = []; + peerState.isReading = false; + peerState.totalBytesSent[fileId] = 0; - // 如果当前正在传输文件,则等待传输完成--接收方确认 - await this.waitForTransferComplete(peerId); - console.log(`fileId:${fileId} send done or already sent to peer ${peerId}`); + try { + await this.processSendQueue(file, peerId); + this.finalizeSendFile(fileId, peerId); + + await this.waitForTransferComplete(peerId); // 等待传输完成--接收方确认 + } catch (error: any) { + this.fireError(`Error sending file ${file.name}`, { + error: error.message, + fileId, + peerId, + }); + this.abortFileSend(fileId, peerId); + } } private async waitForTransferComplete(peerId: string): Promise { @@ -189,10 +244,9 @@ class FileSender { await new Promise((resolve) => setTimeout(resolve, 50)); } } - private getFileMeta(file: CustomFile): fileMetadata { const fileId = generateFileId(file); - const metadata = { + return { type: "fileMeta", fileId, name: file.name, @@ -201,24 +255,8 @@ class FileSender { fullName: file.fullName, folderName: file.folderName, }; - return metadata; } - //同步 文件夹 进度--包含回调 - private syncFolderProgress(fileId: string, peerId: string): void { - const folderMeta = this.pendingFolerMeta[fileId]; //fileId:{totalSize:0 , fileIds:[]} - const peerState = this.getPeerState(peerId); - if (!peerState) return; - this.speedCalculator.updateSendSpeed( - peerId, - peerState.totalBytesSent[fileId] - ); // 使用累计接收量 - const speed = this.speedCalculator.getSendSpeed(peerId); - - const progress = peerState.totalBytesSent[fileId] / folderMeta.totalSize; - peerState.progressCallback?.(fileId, progress, speed); - } - //更新传输进度,并进行回调 private async updateProgress( byteLength: number, fileId: string, @@ -228,78 +266,73 @@ class FileSender { const peerState = this.getPeerState(peerId); if (!peerState) return; + let progressFileId = fileId; + let currentBytes = peerState.totalBytesSent[fileId] || 0; + let totalSize = fileSize; + if (peerState.currentFolderName) { - //文件夹状态 - this.syncFolderProgress(fileId, peerId); + const folderId = peerState.currentFolderName; + progressFileId = folderId; + if (!peerState.totalBytesSent[folderId]) + peerState.totalBytesSent[folderId] = 0; + peerState.totalBytesSent[folderId] += byteLength; + currentBytes = peerState.totalBytesSent[folderId]; + totalSize = this.pendingFolerMeta[folderId]?.totalSize || 0; } else { - // 单文件状态 - const progress = peerState.totalBytesSent[fileId] / fileSize; - - this.speedCalculator.updateSendSpeed( - peerId, - peerState.totalBytesSent[fileId] - ); // 使用累计接收量 - const speed = this.speedCalculator.getSendSpeed(peerId); - - peerState.progressCallback?.(fileId, progress, speed); + peerState.totalBytesSent[fileId] += byteLength; + currentBytes = peerState.totalBytesSent[fileId]; } + + this.speedCalculator.updateSendSpeed(peerId, currentBytes); + const speed = this.speedCalculator.getSendSpeed(peerId); + const progress = totalSize > 0 ? currentBytes / totalSize : 0; + + peerState.progressCallback?.(progressFileId, progress, speed); } private async sendWithBackpressure( data: string | ArrayBuffer, peerId: string - ): Promise { + ): Promise { const dataChannel = this.webrtcConnection.dataChannels.get(peerId); - if (!dataChannel) return false; + if (!dataChannel) { + throw new Error("Data channel not found"); + } - const threshold = dataChannel.bufferedAmountLowThreshold; - if (dataChannel.bufferedAmount > threshold) { + if (dataChannel.bufferedAmount > dataChannel.bufferedAmountLowThreshold) { await new Promise((resolve) => { - const onBufferedAmountLow = () => { - dataChannel.removeEventListener( - "bufferedamountlow", - onBufferedAmountLow - ); + const listener = () => { + dataChannel.removeEventListener("bufferedamountlow", listener); resolve(); }; - dataChannel.addEventListener("bufferedamountlow", onBufferedAmountLow); + dataChannel.addEventListener("bufferedamountlow", listener); }); } - return this.webrtcConnection.sendData(data, peerId); + if (!this.webrtcConnection.sendData(data, peerId)) { + throw new Error("sendData failed"); + } } //开始发送文件内容 - private async startSendingFile( - fileId: string, + private async processSendQueue( + file: CustomFile, peerId: string ): Promise { - const file = this.pendingFiles.get(fileId); - if (!file) return; - + const fileId = generateFileId(file); const peerState = this.getPeerState(peerId); - const folderId = peerState.currentFolderName ?? ""; //fileId - - if (peerState.currentFolderName) { - //当前属于文件夹 - const index = this.pendingFolerMeta[folderId].fileIds.indexOf(fileId); - if (index === 0) { - //发送第一个时清零 - peerState.totalBytesSent[folderId] = 0; //记录文件夹 总发送字节数 - } - } - peerState.totalBytesSent[fileId] = 0; //记录 当前文件 总发送字节数 - peerState.readOffset = 0; - peerState.isReading = false; - const fileReader = new FileReader(); - const readNextChunk = async (): Promise => { - if (peerState.isReading) return; - peerState.isReading = true; - while ( - peerState.bufferQueue.length < this.maxBufferSize && - peerState.readOffset < file.size + while (peerState.readOffset < file.size) { + if (!peerState.isSending) { + throw new Error("File sending was aborted."); + } + + // Read chunks into buffer if not already reading and buffer is not full + if ( + !peerState.isReading && + peerState.bufferQueue.length < this.maxBufferSize ) { + peerState.isReading = true; const slice = file.slice( peerState.readOffset, peerState.readOffset + this.chunkSize @@ -308,58 +341,30 @@ class FileSender { const chunk = await this.readChunkAsArrayBuffer(fileReader, slice); peerState.bufferQueue.push(chunk); peerState.readOffset += chunk.byteLength; - } catch (error) { - console.error("Error reading file chunk:", error); - break; + } catch (error: any) { + throw new Error(`File chunk reading failed: ${error.message}`); + } finally { + peerState.isReading = false; } } - peerState.isReading = false; - }; - const sendNextChunk = async (): Promise => { + // Send chunks from buffer if (peerState.bufferQueue.length > 0) { const chunk = peerState.bufferQueue.shift()!; await this.sendWithBackpressure(chunk, peerId); - - if (peerState.currentFolderName) { - //当前属于文件夹 - peerState.totalBytesSent[folderId] += chunk.byteLength; - await this.updateProgress( - chunk.byteLength, - folderId, - file.size, - peerId - ); //更新文件(夹)的进度 - } else { - await this.updateProgress( - chunk.byteLength, - fileId, - file.size, - peerId - ); //更新文件的进度 - } - peerState.totalBytesSent[fileId] += chunk.byteLength; - - if (peerState.totalBytesSent[fileId] < file.size) { - //没发送完,继续发送 - await readNextChunk(); - sendNextChunk(); - } else { - const speed = this.speedCalculator.getSendSpeed(peerId); - if (!peerState.currentFolderName) - peerState.progressCallback?.(fileId, 1, speed); //传输单文件时回传 - - this.finalizeSendFile(fileId, peerId); //发送完,再发送 fileEnd 信号 - } - } else if (peerState.totalBytesSent[fileId] < file.size) { - //缓冲队列为空,继续读取和发送 - await readNextChunk(); - sendNextChunk(); + await this.updateProgress(chunk.byteLength, fileId, file.size, peerId); + } else if (peerState.isReading) { + // If buffer is empty but we are still reading, wait a bit + await new Promise((resolve) => setTimeout(resolve, 50)); + } else if (peerState.readOffset < file.size) { + // Buffer is empty, not reading, but not done, so trigger a read + continue; } - }; - - await readNextChunk(); //开始读取和发送 - sendNextChunk(); + } + // Final progress update to 100% + if (!peerState.currentFolderName) { + this.getPeerState(peerId).progressCallback?.(fileId, 1, 0); + } } private readChunkAsArrayBuffer( @@ -375,17 +380,30 @@ class FileSender { reject(new Error("Failed to read blob as ArrayBuffer")); } }; - fileReader.onerror = (error) => reject(error); + fileReader.onerror = () => + reject(fileReader.error || new Error("Unknown FileReader error")); + fileReader.onabort = () => reject(new Error("File reading was aborted")); fileReader.readAsArrayBuffer(blob); }); } //发送 fileEnd 信号 private finalizeSendFile(fileId: string, peerId: string): void { - const endMessage = JSON.stringify({ - type: "fileEnd", - fileId: fileId, - }); - this.webrtcConnection.sendData(endMessage, peerId); + // this.log("log", `Finalizing file send for ${fileId} to ${peerId}`); + const endMessage = JSON.stringify({ type: "fileEnd", fileId }); + if (!this.webrtcConnection.sendData(endMessage, peerId)) { + this.log("warn", `Failed to send fileEnd message for ${fileId}`); + } + // The isSending flag will be set to false upon receiving fileAck + } + + private abortFileSend(fileId: string, peerId: string): void { + this.log("warn", `Aborting file send for ${fileId} to ${peerId}`); + const peerState = this.getPeerState(peerId); + peerState.isSending = false; + peerState.readOffset = 0; + peerState.bufferQueue = []; + peerState.isReading = false; + // Optionally, send an abort message to the receiver } } diff --git a/frontend/lib/webrtc_base.ts b/frontend/lib/webrtc_base.ts index 45986b5..9fc166c 100644 --- a/frontend/lib/webrtc_base.ts +++ b/frontend/lib/webrtc_base.ts @@ -100,7 +100,7 @@ export default class BaseWebRTC { console[level](prefix, message, ...args); } - protected fireError(message: string, context?: Record) { + public fireError(message: string, context?: Record) { const error = new WebRTCError(message, context); this.log("error", message, context); this.onError?.(error);