重构了,待测试

This commit is contained in:
david_bai
2025-06-09 21:32:43 +08:00
parent 75901898a1
commit 341a392c20
2 changed files with 196 additions and 178 deletions
+195 -177
View File
@@ -1,6 +1,6 @@
//发送文件(夹)的流程:先发送文件 meta 信息,等待接收端请求,再发送文件内容,文件发送完再发送endMeta,等待接收端ack,结束
//发送文件夹的流程(同上):接收批量文件请求
//循环发送所有文件的meta,然后把属于folder的部分关于文件大小的记录下来,用于计算进度。接收展示端来区分单文件和文件夹
// 发送文件(夹)的流程:先发送文件 meta 信息,等待接收端请求,再发送文件内容,文件发送完再发送endMeta,等待接收端ack,结束
// 发送文件夹的流程(同上):接收批量文件请求
// 循环发送所有文件的meta,然后把属于folder的部分关于文件大小的记录下来,用于计算进度。接收展示端来区分单文件和文件夹
import { generateFileId } from "@/lib/fileUtils";
import { SpeedCalculator } from "@/lib/speedCalculator";
import WebRTC_Initiator from "./webrtc_Initiator";
@@ -21,6 +21,7 @@ class FileSender {
private pendingFiles: Map<string, CustomFile>;
private pendingFolerMeta: Record<string, FolderMeta>;
private speedCalculator: SpeedCalculator;
constructor(WebRTC_initiator: WebRTC_Initiator) {
this.webrtcConnection = WebRTC_initiator;
@@ -28,16 +29,33 @@ class FileSender {
this.peerStates = new Map(); // Map<peerId, PeerState>
this.chunkSize = 65536; // 64 KB chunks
this.maxBufferSize = 5; // 预读取的块数
this.maxBufferSize = 10; // 预读取的块数
this.pendingFiles = new Map(); //所有待发送的文件(引用){fileId:CustomFile}
this.pendingFolerMeta = {}; //文件夹对应的meta属性(总大小、文件总个数),用于记录传输进度,fileId:{totalSize:0 , fileIds:[]}
// 创建 SpeedCalculator 实例
this.speedCalculator = new SpeedCalculator();
this.setupDataHandler();
}
// region Logging and Error Handling
private log(
level: "log" | "warn" | "error",
message: string,
context?: Record<string, any>
) {
const prefix = `[FileSender]`;
console[level](prefix, message, context || "");
}
private fireError(message: string, context?: Record<string, any>) {
this.webrtcConnection.fireError(message, {
...context,
component: "FileSender",
});
}
// endregion
// 初始化新接收方的状态
private getPeerState(peerId: string): PeerState {
if (!this.peerStates.has(peerId)) {
@@ -54,44 +72,45 @@ class FileSender {
}
return this.peerStates.get(peerId)!; //! 非空断言(Non-Null Assertion Operator
}
private setupDataHandler(): void {
this.webrtcConnection.onDataReceived = (
data: string | ArrayBuffer,
peerId: string
) => {
this.handleReceivedData(data, peerId);
this.webrtcConnection.onDataReceived = (data, peerId) => {
if (typeof data === "string") {
try {
const parsedData = JSON.parse(data) as WebRTCMessage;
this.handleSignalingMessage(parsedData, peerId);
} catch (error) {
this.fireError("Error parsing received JSON data", { error, peerId });
}
}
};
}
private handleSignalingMessage(message: WebRTCMessage, peerId: string): void {
const peerState = this.getPeerState(peerId);
switch (message.type) {
case "fileRequest":
this.handleFileRequest(message as FileRequest, peerId);
break;
case "fileAck":
peerState.isSending = false;
this.log("log", `Received file-finish ack from peer ${peerId}`, {
fileId: (message as any).fileId,
});
break;
default:
this.log("warn", `Unknown signaling message type received`, {
type: message.type,
peerId,
});
}
}
public setProgressCallback(
callback: (fileId: string, progress: number, speed: number) => void,
peerId: string
): void {
const peerState = this.getPeerState(peerId);
peerState.progressCallback = callback;
}
private handleReceivedData(data: string | ArrayBuffer, peerId: string): void {
if (typeof data === "string") {
try {
const parsedData = JSON.parse(data) as WebRTCMessage;
const peerState = this.getPeerState(peerId);
const handlers: Record<string, () => void> = {
fileRequest: () =>
this.handleFileRequest(parsedData as FileRequest, peerId),
fileAck: () => {
peerState.isSending = false;
console.log(`Receive file-finish ack from peer ${peerId}`);
},
};
const handler = handlers[parsedData.type];
if (handler) handler();
} catch (error) {
console.error("Error parsing JSON:", error);
}
}
this.getPeerState(peerId).progressCallback = callback;
}
//响应 文件请求,发送文件
private async handleFileRequest(
@@ -99,9 +118,17 @@ class FileSender {
peerId: string
): Promise<void> {
const file = this.pendingFiles.get(request.fileId);
console.log("handleFileRequest", file, peerId);
this.log(
"log",
`Handling file request for ${request.fileId} from ${peerId}`
);
if (file) {
await this.sendSingleFile(file, peerId);
} else {
this.fireError(`File not found for request`, {
fileId: request.fileId,
peerId,
});
}
}
// 修改发送字符串的方法为异步方法
@@ -134,35 +161,40 @@ class FileSender {
public sendFileMeta(files: CustomFile[], peerId?: string): void {
//把属于folder的部分关于文件大小的记录下来,用于计算进度
for (const file of files) {
files.forEach((file) => {
if (file.folderName) {
const fileId = file.folderName;
const folderId = file.folderName;
//folderName:{totalSize:0 , fileIds:[]}
if (!(file.folderName in this.pendingFolerMeta)) {
//初始化
this.pendingFolerMeta[fileId] = { totalSize: 0, fileIds: [] };
if (!this.pendingFolerMeta[folderId]) {
this.pendingFolerMeta[folderId] = { totalSize: 0, fileIds: [] };
}
const folderMeta = this.pendingFolerMeta[fileId];
const fileId2 = generateFileId(file);
if (!folderMeta.fileIds.includes(fileId2)) {
const folderMeta = this.pendingFolerMeta[folderId];
const fileId = generateFileId(file);
if (!folderMeta.fileIds.includes(fileId)) {
//如果文件没被添加过
folderMeta.fileIds.push(fileId2);
folderMeta.fileIds.push(fileId);
folderMeta.totalSize += file.size;
}
}
}
});
//循环发送所有文件的meta
const sendToPeers = peerId ? [peerId] : Array.from(this.peerStates.keys());
for (const currentPeerId of sendToPeers) {
for (const file of files) {
const peers = peerId
? [peerId]
: Array.from(this.webrtcConnection.peerConnections.keys());
peers.forEach((pId) => {
files.forEach((file) => {
const fileId = generateFileId(file);
this.pendingFiles.set(fileId, file);
const fileMeta = this.getFileMeta(file);
console.log("fileMeta", fileMeta);
this.webrtcConnection.sendData(JSON.stringify(fileMeta), currentPeerId);
}
}
this.log("log", "Sending file metadata", { fileMeta, peerId: pId });
if (!this.webrtcConnection.sendData(JSON.stringify(fileMeta), pId)) {
this.fireError("Failed to send file metadata", {
fileMeta,
peerId: pId,
});
}
});
});
}
//发送单个文件
@@ -171,16 +203,39 @@ class FileSender {
peerId: string
): Promise<void> {
const fileId = generateFileId(file);
const peerState = this.getPeerState(peerId);
if (peerState.isSending) {
this.log(
"warn",
`Already sending a file to peer ${peerId}, request for ${file.name} ignored.`
);
return;
}
this.log("log", `Starting to send single file: ${file.name} to ${peerId}`);
// Reset state for the new transfer
peerState.isSending = true;
peerState.currentFolderName = file.folderName;
console.log("sendSingleFile", peerId, peerState);
await this.startSendingFile(fileId, peerId);
peerState.readOffset = 0;
peerState.bufferQueue = [];
peerState.isReading = false;
peerState.totalBytesSent[fileId] = 0;
// 如果当前正在传输文件,则等待传输完成--接收方确认
await this.waitForTransferComplete(peerId);
console.log(`fileId:${fileId} send done or already sent to peer ${peerId}`);
try {
await this.processSendQueue(file, peerId);
this.finalizeSendFile(fileId, peerId);
await this.waitForTransferComplete(peerId); // 等待传输完成--接收方确认
} catch (error: any) {
this.fireError(`Error sending file ${file.name}`, {
error: error.message,
fileId,
peerId,
});
this.abortFileSend(fileId, peerId);
}
}
private async waitForTransferComplete(peerId: string): Promise<void> {
@@ -189,10 +244,9 @@ class FileSender {
await new Promise((resolve) => setTimeout(resolve, 50));
}
}
private getFileMeta(file: CustomFile): fileMetadata {
const fileId = generateFileId(file);
const metadata = {
return {
type: "fileMeta",
fileId,
name: file.name,
@@ -201,24 +255,8 @@ class FileSender {
fullName: file.fullName,
folderName: file.folderName,
};
return metadata;
}
//同步 文件夹 进度--包含回调
private syncFolderProgress(fileId: string, peerId: string): void {
const folderMeta = this.pendingFolerMeta[fileId]; //fileId:{totalSize:0 , fileIds:[]}
const peerState = this.getPeerState(peerId);
if (!peerState) return;
this.speedCalculator.updateSendSpeed(
peerId,
peerState.totalBytesSent[fileId]
); // 使用累计接收量
const speed = this.speedCalculator.getSendSpeed(peerId);
const progress = peerState.totalBytesSent[fileId] / folderMeta.totalSize;
peerState.progressCallback?.(fileId, progress, speed);
}
//更新传输进度,并进行回调
private async updateProgress(
byteLength: number,
fileId: string,
@@ -228,78 +266,73 @@ class FileSender {
const peerState = this.getPeerState(peerId);
if (!peerState) return;
let progressFileId = fileId;
let currentBytes = peerState.totalBytesSent[fileId] || 0;
let totalSize = fileSize;
if (peerState.currentFolderName) {
//文件夹状态
this.syncFolderProgress(fileId, peerId);
const folderId = peerState.currentFolderName;
progressFileId = folderId;
if (!peerState.totalBytesSent[folderId])
peerState.totalBytesSent[folderId] = 0;
peerState.totalBytesSent[folderId] += byteLength;
currentBytes = peerState.totalBytesSent[folderId];
totalSize = this.pendingFolerMeta[folderId]?.totalSize || 0;
} else {
// 单文件状态
const progress = peerState.totalBytesSent[fileId] / fileSize;
this.speedCalculator.updateSendSpeed(
peerId,
peerState.totalBytesSent[fileId]
); // 使用累计接收量
const speed = this.speedCalculator.getSendSpeed(peerId);
peerState.progressCallback?.(fileId, progress, speed);
peerState.totalBytesSent[fileId] += byteLength;
currentBytes = peerState.totalBytesSent[fileId];
}
this.speedCalculator.updateSendSpeed(peerId, currentBytes);
const speed = this.speedCalculator.getSendSpeed(peerId);
const progress = totalSize > 0 ? currentBytes / totalSize : 0;
peerState.progressCallback?.(progressFileId, progress, speed);
}
private async sendWithBackpressure(
data: string | ArrayBuffer,
peerId: string
): Promise<boolean> {
): Promise<void> {
const dataChannel = this.webrtcConnection.dataChannels.get(peerId);
if (!dataChannel) return false;
if (!dataChannel) {
throw new Error("Data channel not found");
}
const threshold = dataChannel.bufferedAmountLowThreshold;
if (dataChannel.bufferedAmount > threshold) {
if (dataChannel.bufferedAmount > dataChannel.bufferedAmountLowThreshold) {
await new Promise<void>((resolve) => {
const onBufferedAmountLow = () => {
dataChannel.removeEventListener(
"bufferedamountlow",
onBufferedAmountLow
);
const listener = () => {
dataChannel.removeEventListener("bufferedamountlow", listener);
resolve();
};
dataChannel.addEventListener("bufferedamountlow", onBufferedAmountLow);
dataChannel.addEventListener("bufferedamountlow", listener);
});
}
return this.webrtcConnection.sendData(data, peerId);
if (!this.webrtcConnection.sendData(data, peerId)) {
throw new Error("sendData failed");
}
}
//开始发送文件内容
private async startSendingFile(
fileId: string,
private async processSendQueue(
file: CustomFile,
peerId: string
): Promise<void> {
const file = this.pendingFiles.get(fileId);
if (!file) return;
const fileId = generateFileId(file);
const peerState = this.getPeerState(peerId);
const folderId = peerState.currentFolderName ?? ""; //fileId
if (peerState.currentFolderName) {
//当前属于文件夹
const index = this.pendingFolerMeta[folderId].fileIds.indexOf(fileId);
if (index === 0) {
//发送第一个时清零
peerState.totalBytesSent[folderId] = 0; //记录文件夹 总发送字节数
}
}
peerState.totalBytesSent[fileId] = 0; //记录 当前文件 总发送字节数
peerState.readOffset = 0;
peerState.isReading = false;
const fileReader = new FileReader();
const readNextChunk = async (): Promise<void> => {
if (peerState.isReading) return;
peerState.isReading = true;
while (
peerState.bufferQueue.length < this.maxBufferSize &&
peerState.readOffset < file.size
while (peerState.readOffset < file.size) {
if (!peerState.isSending) {
throw new Error("File sending was aborted.");
}
// Read chunks into buffer if not already reading and buffer is not full
if (
!peerState.isReading &&
peerState.bufferQueue.length < this.maxBufferSize
) {
peerState.isReading = true;
const slice = file.slice(
peerState.readOffset,
peerState.readOffset + this.chunkSize
@@ -308,58 +341,30 @@ class FileSender {
const chunk = await this.readChunkAsArrayBuffer(fileReader, slice);
peerState.bufferQueue.push(chunk);
peerState.readOffset += chunk.byteLength;
} catch (error) {
console.error("Error reading file chunk:", error);
break;
} catch (error: any) {
throw new Error(`File chunk reading failed: ${error.message}`);
} finally {
peerState.isReading = false;
}
}
peerState.isReading = false;
};
const sendNextChunk = async (): Promise<void> => {
// Send chunks from buffer
if (peerState.bufferQueue.length > 0) {
const chunk = peerState.bufferQueue.shift()!;
await this.sendWithBackpressure(chunk, peerId);
if (peerState.currentFolderName) {
//当前属于文件夹
peerState.totalBytesSent[folderId] += chunk.byteLength;
await this.updateProgress(
chunk.byteLength,
folderId,
file.size,
peerId
); //更新文件(夹)的进度
} else {
await this.updateProgress(
chunk.byteLength,
fileId,
file.size,
peerId
); //更新文件的进度
}
peerState.totalBytesSent[fileId] += chunk.byteLength;
if (peerState.totalBytesSent[fileId] < file.size) {
//没发送完,继续发送
await readNextChunk();
sendNextChunk();
} else {
const speed = this.speedCalculator.getSendSpeed(peerId);
if (!peerState.currentFolderName)
peerState.progressCallback?.(fileId, 1, speed); //传输单文件时回传
this.finalizeSendFile(fileId, peerId); //发送完,再发送 fileEnd 信号
}
} else if (peerState.totalBytesSent[fileId] < file.size) {
//缓冲队列为空,继续读取和发送
await readNextChunk();
sendNextChunk();
await this.updateProgress(chunk.byteLength, fileId, file.size, peerId);
} else if (peerState.isReading) {
// If buffer is empty but we are still reading, wait a bit
await new Promise((resolve) => setTimeout(resolve, 50));
} else if (peerState.readOffset < file.size) {
// Buffer is empty, not reading, but not done, so trigger a read
continue;
}
};
await readNextChunk(); //开始读取和发送
sendNextChunk();
}
// Final progress update to 100%
if (!peerState.currentFolderName) {
this.getPeerState(peerId).progressCallback?.(fileId, 1, 0);
}
}
private readChunkAsArrayBuffer(
@@ -375,17 +380,30 @@ class FileSender {
reject(new Error("Failed to read blob as ArrayBuffer"));
}
};
fileReader.onerror = (error) => reject(error);
fileReader.onerror = () =>
reject(fileReader.error || new Error("Unknown FileReader error"));
fileReader.onabort = () => reject(new Error("File reading was aborted"));
fileReader.readAsArrayBuffer(blob);
});
}
//发送 fileEnd 信号
private finalizeSendFile(fileId: string, peerId: string): void {
const endMessage = JSON.stringify({
type: "fileEnd",
fileId: fileId,
});
this.webrtcConnection.sendData(endMessage, peerId);
// this.log("log", `Finalizing file send for ${fileId} to ${peerId}`);
const endMessage = JSON.stringify({ type: "fileEnd", fileId });
if (!this.webrtcConnection.sendData(endMessage, peerId)) {
this.log("warn", `Failed to send fileEnd message for ${fileId}`);
}
// The isSending flag will be set to false upon receiving fileAck
}
private abortFileSend(fileId: string, peerId: string): void {
this.log("warn", `Aborting file send for ${fileId} to ${peerId}`);
const peerState = this.getPeerState(peerId);
peerState.isSending = false;
peerState.readOffset = 0;
peerState.bufferQueue = [];
peerState.isReading = false;
// Optionally, send an abort message to the receiver
}
}
+1 -1
View File
@@ -100,7 +100,7 @@ export default class BaseWebRTC {
console[level](prefix, message, ...args);
}
protected fireError(message: string, context?: Record<string, any>) {
public fireError(message: string, context?: Record<string, any>) {
const error = new WebRTCError(message, context);
this.log("error", message, context);
this.onError?.(error);