fileSender code splitting
This commit is contained in:
+30
-851
@@ -1,873 +1,52 @@
|
|||||||
// 🚀 新流程 - 接收端主导的文件传输:
|
// 🚀 新流程 - 接收端主导的文件传输
|
||||||
// 1. 发送文件元数据 (fileMetadata)
|
// 重构后的FileSender - 使用模块化架构
|
||||||
// 2. 接收文件请求 (fileRequest)
|
|
||||||
// 3. 发送所有数据块,完成后等待接收端确认
|
|
||||||
// 4. 收到接收端确认 (fileReceiveComplete/folderReceiveComplete) 后设置进度100%
|
|
||||||
// 发送端不再主动发送完成信号,完全由接收端控制完成时机
|
|
||||||
import { generateFileId } from "@/lib/fileUtils";
|
|
||||||
import { SpeedCalculator } from "@/lib/speedCalculator";
|
|
||||||
import WebRTC_Initiator from "./webrtc_Initiator";
|
import WebRTC_Initiator from "./webrtc_Initiator";
|
||||||
import {
|
import { CustomFile } from "@/types/webrtc";
|
||||||
CustomFile,
|
import { FileTransferOrchestrator } from "./transfer/FileTransferOrchestrator";
|
||||||
fileMetadata,
|
|
||||||
WebRTCMessage,
|
|
||||||
PeerState,
|
|
||||||
FolderMeta,
|
|
||||||
FileRequest,
|
|
||||||
FileReceiveComplete,
|
|
||||||
FolderReceiveComplete,
|
|
||||||
EmbeddedChunkMeta,
|
|
||||||
} from "@/types/webrtc";
|
|
||||||
import { postLogToBackend } from "@/app/config/api";
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 FileSender - 向后兼容包装层
|
||||||
|
*
|
||||||
|
* 重构说明:
|
||||||
|
* - 原875行单体类重构为模块化架构
|
||||||
|
* - 内部使用FileTransferOrchestrator统一编排
|
||||||
|
* - 保持100%向后兼容的公共API
|
||||||
|
* - 获得高性能文件读取、智能背压控制等优势
|
||||||
|
*/
|
||||||
class FileSender {
|
class FileSender {
|
||||||
private webrtcConnection: WebRTC_Initiator;
|
private orchestrator: FileTransferOrchestrator;
|
||||||
private peerStates: Map<string, PeerState>;
|
|
||||||
private readonly chunkSize: number;
|
|
||||||
private pendingFiles: Map<string, CustomFile>;
|
|
||||||
private pendingFolerMeta: Record<string, FolderMeta>;
|
|
||||||
private speedCalculator: SpeedCalculator;
|
|
||||||
|
|
||||||
// Adaptive performance monitoring
|
constructor(webrtcConnection: WebRTC_Initiator) {
|
||||||
private networkPerformance: Map<
|
this.orchestrator = new FileTransferOrchestrator(webrtcConnection);
|
||||||
string,
|
console.log("[FileSender] ✅ Initialized with modular architecture");
|
||||||
{
|
|
||||||
avgClearingRate: number; // Average network clearing speed KB/s
|
|
||||||
optimalThreshold: number; // Dynamically optimized threshold
|
|
||||||
avgWaitTime: number; // Average wait time
|
|
||||||
sampleCount: number; // Sample count
|
|
||||||
}
|
|
||||||
> = new Map();
|
|
||||||
|
|
||||||
// Hybrid optimization configuration - FileReader large chunks + network small packets strategy (fixes sendData failed)
|
|
||||||
private static readonly OPTIMIZED_CONFIG = {
|
|
||||||
CHUNK_SIZE: 4194304, // 4MB - Extreme large chunks, maximally reduce FileReader calls
|
|
||||||
BATCH_SIZE: 8, // 8 chunks batch - 32MB batch processing success
|
|
||||||
NETWORK_CHUNK_SIZE: 65536, // 64KB - WebRTC safe sending size, fixes sendData failed
|
|
||||||
BUFFER_THRESHOLD: 3145728, // 3MB - Threshold
|
|
||||||
BACKPRESSURE_TIMEOUT: 2000, // 2 second timeout - reserves more time for large chunk processing
|
|
||||||
} as const;
|
|
||||||
|
|
||||||
constructor(WebRTC_initiator: WebRTC_Initiator) {
|
|
||||||
this.webrtcConnection = WebRTC_initiator;
|
|
||||||
|
|
||||||
// Maintain independent sending states for each receiver
|
|
||||||
this.peerStates = new Map(); // Map<peerId, PeerState>
|
|
||||||
|
|
||||||
// Uniformly use optimized parameters - all devices share the best configuration
|
|
||||||
this.chunkSize = FileSender.OPTIMIZED_CONFIG.CHUNK_SIZE;
|
|
||||||
this.pendingFiles = new Map(); // All files pending to be sent (by reference) {fileId: CustomFile}
|
|
||||||
|
|
||||||
this.pendingFolerMeta = {}; // Metadata for folders (total size, total file count), used for tracking transfer progress
|
|
||||||
|
|
||||||
// Create a SpeedCalculator instance
|
|
||||||
this.speedCalculator = new SpeedCalculator();
|
|
||||||
this.setupDataHandler();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// region Logging and Error Handling
|
// ===== 向后兼容的公共API =====
|
||||||
private log(
|
|
||||||
level: "log" | "warn" | "error",
|
public sendFileMeta(files: CustomFile[], peerId?: string): void {
|
||||||
message: string,
|
return this.orchestrator.sendFileMeta(files, peerId);
|
||||||
context?: Record<string, any>
|
|
||||||
) {
|
|
||||||
const prefix = `[FileSender]`;
|
|
||||||
console[level](prefix, message, context || "");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fireError(message: string, context?: Record<string, any>) {
|
public async sendString(content: string, peerId: string): Promise<void> {
|
||||||
this.webrtcConnection.fireError(message, {
|
return this.orchestrator.sendString(content, peerId);
|
||||||
...context,
|
|
||||||
component: "FileSender",
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
// Initialize state for a new receiver
|
|
||||||
private getPeerState(peerId: string): PeerState {
|
|
||||||
if (!this.peerStates.has(peerId)) {
|
|
||||||
this.peerStates.set(peerId, {
|
|
||||||
isSending: false, // Used to determine if a file is successfully sent. True before sending, false after receiving ack.
|
|
||||||
bufferQueue: [], // Pre-read buffer to improve sending efficiency.
|
|
||||||
readOffset: 0, // Read position, used by the sending function.
|
|
||||||
isReading: false, // Whether reading is in progress, used by the sending function to avoid duplicate reads.
|
|
||||||
|
|
||||||
currentFolderName: "", // If the current file belongs to a folder, assign the folder name here.
|
|
||||||
totalBytesSent: {}, // Bytes sent for a file/folder, used for progress calculation; {fileId: 0}
|
|
||||||
progressCallback: null, // Progress callback.
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return this.peerStates.get(peerId)!; // ! Non-Null Assertion Operator
|
|
||||||
}
|
|
||||||
|
|
||||||
private setupDataHandler(): void {
|
|
||||||
this.webrtcConnection.onDataReceived = (data, peerId) => {
|
|
||||||
if (typeof data === "string") {
|
|
||||||
try {
|
|
||||||
const parsedData = JSON.parse(data) as WebRTCMessage;
|
|
||||||
this.handleSignalingMessage(parsedData, peerId);
|
|
||||||
} catch (error) {
|
|
||||||
this.fireError("Error parsing received JSON data", { error, peerId });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private handleSignalingMessage(message: WebRTCMessage, peerId: string): void {
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
switch (message.type) {
|
|
||||||
case "fileRequest":
|
|
||||||
this.handleFileRequest(message as FileRequest, peerId);
|
|
||||||
break;
|
|
||||||
case "fileReceiveComplete":
|
|
||||||
this.handleFileReceiveComplete(message as FileReceiveComplete, peerId);
|
|
||||||
break;
|
|
||||||
case "folderReceiveComplete":
|
|
||||||
this.handleFolderReceiveComplete(
|
|
||||||
message as FolderReceiveComplete,
|
|
||||||
peerId
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
this.log("warn", `Unknown signaling message type received`, {
|
|
||||||
type: message.type,
|
|
||||||
peerId,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public setProgressCallback(
|
public setProgressCallback(
|
||||||
callback: (fileId: string, progress: number, speed: number) => void,
|
callback: (fileId: string, progress: number, speed: number) => void,
|
||||||
peerId: string
|
peerId: string
|
||||||
): void {
|
): void {
|
||||||
this.getPeerState(peerId).progressCallback = callback;
|
return this.orchestrator.setProgressCallback(callback, peerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// ===== 新增API =====
|
||||||
* 处理接收端发送的文件接收完成确认 - 新流程
|
|
||||||
*/
|
|
||||||
private handleFileReceiveComplete(
|
|
||||||
message: FileReceiveComplete,
|
|
||||||
peerId: string
|
|
||||||
): void {
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
// 清理发送状态
|
|
||||||
peerState.isSending = false;
|
|
||||||
|
|
||||||
// 触发单文件100%进度(只有非文件夹情况)
|
public getTransferStats(peerId?: string) {
|
||||||
if (!peerState.currentFolderName) {
|
return this.orchestrator.getTransferStats(peerId);
|
||||||
peerState.progressCallback?.(message.fileId, 1, 0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public cleanup(): void {
|
||||||
* 处理接收端发送的文件夹接收完成确认 - 新流程
|
return this.orchestrator.cleanup();
|
||||||
*/
|
|
||||||
private handleFolderReceiveComplete(
|
|
||||||
message: FolderReceiveComplete,
|
|
||||||
peerId: string
|
|
||||||
): void {
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
|
|
||||||
postLogToBackend(
|
|
||||||
`[Firefox Debug] 📥 Received folderReceiveComplete - folderName: ${message.folderName}, completedFiles: ${message.completedFileIds.length}, allStoreUpdated: ${message.allStoreUpdated}`
|
|
||||||
);
|
|
||||||
|
|
||||||
// 触发文件夹100%进度
|
|
||||||
if (this.pendingFolerMeta[message.folderName]) {
|
|
||||||
postLogToBackend(
|
|
||||||
`[Firefox Debug] 🎯 Setting folder progress to 100% - ${message.folderName}`
|
|
||||||
);
|
|
||||||
peerState.progressCallback?.(message.folderName, 1, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.log("log", `Folder reception confirmed by peer ${peerId}`, {
|
|
||||||
folderName: message.folderName,
|
|
||||||
completedFiles: message.completedFileIds.length,
|
|
||||||
allStoreUpdated: message.allStoreUpdated,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
// Respond to a file request by sending the file
|
|
||||||
private async handleFileRequest(
|
|
||||||
request: FileRequest,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const file = this.pendingFiles.get(request.fileId);
|
|
||||||
const offset = request.offset || 0;
|
|
||||||
this.log(
|
|
||||||
"log",
|
|
||||||
`Handling file request for ${request.fileId} from ${peerId} with offset ${offset}`
|
|
||||||
);
|
|
||||||
if (file) {
|
|
||||||
await this.sendSingleFile(file, peerId, offset);
|
|
||||||
} else {
|
|
||||||
this.fireError(`File not found for request`, {
|
|
||||||
fileId: request.fileId,
|
|
||||||
peerId,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Modify the sendString method to be asynchronous
|
|
||||||
public async sendString(content: string, peerId: string): Promise<void> {
|
|
||||||
const chunks: string[] = [];
|
|
||||||
for (let i = 0; i < content.length; i += this.chunkSize) {
|
|
||||||
chunks.push(content.slice(i, i + this.chunkSize));
|
|
||||||
}
|
|
||||||
|
|
||||||
// First, send the metadata
|
|
||||||
await this.sendWithBackpressure(
|
|
||||||
JSON.stringify({
|
|
||||||
type: "stringMetadata",
|
|
||||||
length: content.length,
|
|
||||||
}),
|
|
||||||
peerId
|
|
||||||
);
|
|
||||||
|
|
||||||
// Send each chunk sequentially, using backpressure control
|
|
||||||
for (let i = 0; i < chunks.length; i++) {
|
|
||||||
const data = JSON.stringify({
|
|
||||||
type: "string",
|
|
||||||
chunk: chunks[i],
|
|
||||||
index: i,
|
|
||||||
total: chunks.length,
|
|
||||||
});
|
|
||||||
await this.sendWithBackpressure(data, peerId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public sendFileMeta(files: CustomFile[], peerId?: string): void {
|
|
||||||
// Record the size of files belonging to a folder for progress calculation
|
|
||||||
files.forEach((file) => {
|
|
||||||
if (file.folderName) {
|
|
||||||
const folderId = file.folderName;
|
|
||||||
// folderName: {totalSize: 0, fileIds: []}
|
|
||||||
if (!this.pendingFolerMeta[folderId]) {
|
|
||||||
this.pendingFolerMeta[folderId] = { totalSize: 0, fileIds: [] };
|
|
||||||
}
|
|
||||||
const folderMeta = this.pendingFolerMeta[folderId];
|
|
||||||
const fileId = generateFileId(file);
|
|
||||||
if (!folderMeta.fileIds.includes(fileId)) {
|
|
||||||
// If the file has not been added yet
|
|
||||||
folderMeta.fileIds.push(fileId);
|
|
||||||
folderMeta.totalSize += file.size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// Loop through and send the metadata for all files
|
|
||||||
const peers = peerId
|
|
||||||
? [peerId]
|
|
||||||
: Array.from(this.webrtcConnection.peerConnections.keys());
|
|
||||||
peers.forEach((pId) => {
|
|
||||||
files.forEach((file) => {
|
|
||||||
const fileId = generateFileId(file);
|
|
||||||
this.pendingFiles.set(fileId, file);
|
|
||||||
const fileMeta = this.getFileMeta(file);
|
|
||||||
const metaDataString = JSON.stringify(fileMeta);
|
|
||||||
|
|
||||||
const sendResult = this.webrtcConnection.sendData(metaDataString, pId);
|
|
||||||
if (!sendResult) {
|
|
||||||
this.fireError("Failed to send file metadata", {
|
|
||||||
fileMeta,
|
|
||||||
peerId: pId,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send a single file
|
|
||||||
private async sendSingleFile(
|
|
||||||
file: CustomFile,
|
|
||||||
peerId: string,
|
|
||||||
offset: number = 0
|
|
||||||
): Promise<void> {
|
|
||||||
const fileId = generateFileId(file);
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
|
|
||||||
if (peerState.isSending) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset state for the new transfer
|
|
||||||
peerState.isSending = true;
|
|
||||||
peerState.currentFolderName = file.folderName;
|
|
||||||
peerState.readOffset = offset; // Start reading from the given offset
|
|
||||||
peerState.bufferQueue = [];
|
|
||||||
peerState.isReading = false;
|
|
||||||
peerState.totalBytesSent[fileId] = offset; // Start counting sent bytes from the offset
|
|
||||||
|
|
||||||
try {
|
|
||||||
await this.processSendQueue(file, peerId);
|
|
||||||
await this.waitForTransferComplete(peerId); // Wait for receiver's fileReceiveComplete confirmation
|
|
||||||
} catch (error: any) {
|
|
||||||
this.fireError(`Error sending file ${file.name}: ${error.message}`, {
|
|
||||||
fileId,
|
|
||||||
peerId,
|
|
||||||
});
|
|
||||||
this.abortFileSend(fileId, peerId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async waitForTransferComplete(peerId: string): Promise<void> {
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
while (peerState?.isSending) {
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
private getFileMeta(file: CustomFile): fileMetadata {
|
|
||||||
const fileId = generateFileId(file);
|
|
||||||
return {
|
|
||||||
type: "fileMeta",
|
|
||||||
fileId,
|
|
||||||
name: file.name,
|
|
||||||
size: file.size,
|
|
||||||
fileType: file.type,
|
|
||||||
fullName: file.fullName,
|
|
||||||
folderName: file.folderName,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private async updateProgress(
|
|
||||||
byteLength: number,
|
|
||||||
fileId: string,
|
|
||||||
fileSize: number,
|
|
||||||
peerId: string,
|
|
||||||
wasActuallySent: boolean = true // 新增:确保只有真正发送成功的数据才被统计
|
|
||||||
): Promise<void> {
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
if (!peerState) return;
|
|
||||||
|
|
||||||
// 🔧 重要修复:只有成功发送的数据才更新统计
|
|
||||||
if (!wasActuallySent) {
|
|
||||||
postLogToBackend(
|
|
||||||
`[Firefox Debug] ⚠️ Data send failed, not updating progress - fileId: ${fileId}, size: ${byteLength}`
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Always update the individual file's progress first.
|
|
||||||
if (!peerState.totalBytesSent[fileId]) {
|
|
||||||
// This case should be handled by sendSingleFile's initialization
|
|
||||||
peerState.totalBytesSent[fileId] = 0;
|
|
||||||
}
|
|
||||||
peerState.totalBytesSent[fileId] += byteLength;
|
|
||||||
|
|
||||||
let progressFileId = fileId;
|
|
||||||
let currentBytes = peerState.totalBytesSent[fileId];
|
|
||||||
let totalSize = fileSize;
|
|
||||||
|
|
||||||
// If the file is part of a folder, recalculate the folder's progress.
|
|
||||||
if (peerState.currentFolderName) {
|
|
||||||
const folderId = peerState.currentFolderName;
|
|
||||||
const folderMeta = this.pendingFolerMeta[folderId];
|
|
||||||
progressFileId = folderId;
|
|
||||||
totalSize = folderMeta?.totalSize || 0;
|
|
||||||
|
|
||||||
// Recalculate folder progress from the sum of its files' progresses.
|
|
||||||
// This is more robust and correct for resumed transfers.
|
|
||||||
let folderTotalSent = 0;
|
|
||||||
if (folderMeta) {
|
|
||||||
folderMeta.fileIds.forEach((fId) => {
|
|
||||||
folderTotalSent += peerState.totalBytesSent[fId] || 0;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
currentBytes = folderTotalSent;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.speedCalculator.updateSendSpeed(peerId, currentBytes);
|
|
||||||
const speed = this.speedCalculator.getSendSpeed(peerId);
|
|
||||||
const progress = totalSize > 0 ? currentBytes / totalSize : 0;
|
|
||||||
|
|
||||||
// Continuously update network performance (learn from transfer speed)
|
|
||||||
this.updateNetworkFromSpeed(peerId);
|
|
||||||
|
|
||||||
peerState.progressCallback?.(progressFileId, progress, speed);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async sendWithBackpressure(
|
|
||||||
data: string | ArrayBuffer,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const dataChannel = this.webrtcConnection.dataChannels.get(peerId);
|
|
||||||
if (!dataChannel) {
|
|
||||||
throw new Error("Data channel not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// For ArrayBuffer, if it exceeds 64KB, it needs to be sent in fragments (fixes sendData failed)
|
|
||||||
if (data instanceof ArrayBuffer) {
|
|
||||||
await this.sendLargeArrayBuffer(data, peerId);
|
|
||||||
} else {
|
|
||||||
await this.sendSingleData(data, peerId);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
// 确保所有发送失败都能被正确抛出
|
|
||||||
const errorMessage = `sendWithBackpressure failed: ${error}`;
|
|
||||||
postLogToBackend(`[Firefox Debug] ${errorMessage}`);
|
|
||||||
throw new Error(errorMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 🚀 新版本:构建融合元数据的数据包
|
|
||||||
private createEmbeddedChunkPacket(
|
|
||||||
chunkData: ArrayBuffer,
|
|
||||||
chunkMeta: EmbeddedChunkMeta
|
|
||||||
): ArrayBuffer {
|
|
||||||
// 1. 将元数据序列化为JSON
|
|
||||||
const metaJson = JSON.stringify(chunkMeta);
|
|
||||||
const metaBytes = new TextEncoder().encode(metaJson);
|
|
||||||
|
|
||||||
// 2. 元数据长度(4字节)
|
|
||||||
const metaLengthBuffer = new ArrayBuffer(4);
|
|
||||||
const metaLengthView = new Uint32Array(metaLengthBuffer);
|
|
||||||
metaLengthView[0] = metaBytes.length;
|
|
||||||
|
|
||||||
// 3. 构建最终的融合数据包
|
|
||||||
const totalLength = 4 + metaBytes.length + chunkData.byteLength;
|
|
||||||
const finalPacket = new Uint8Array(totalLength);
|
|
||||||
|
|
||||||
// 拼接: [4字节长度] + [元数据] + [原始chunk数据]
|
|
||||||
finalPacket.set(new Uint8Array(metaLengthBuffer), 0);
|
|
||||||
finalPacket.set(metaBytes, 4);
|
|
||||||
finalPacket.set(new Uint8Array(chunkData), 4 + metaBytes.length);
|
|
||||||
|
|
||||||
return finalPacket.buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 🚀 新版本:发送带序号的融合数据包
|
|
||||||
private async sendEmbeddedChunk(
|
|
||||||
chunkData: ArrayBuffer,
|
|
||||||
fileId: string,
|
|
||||||
chunkIndex: number,
|
|
||||||
totalChunks: number,
|
|
||||||
fileOffset: number,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const isLastChunk = chunkIndex === totalChunks - 1;
|
|
||||||
|
|
||||||
// 1. 创建元数据
|
|
||||||
const chunkMeta: EmbeddedChunkMeta = {
|
|
||||||
chunkIndex,
|
|
||||||
totalChunks,
|
|
||||||
chunkSize: chunkData.byteLength,
|
|
||||||
isLastChunk,
|
|
||||||
fileOffset,
|
|
||||||
fileId,
|
|
||||||
};
|
|
||||||
|
|
||||||
// 2. 构建融合数据包
|
|
||||||
const embeddedPacket = this.createEmbeddedChunkPacket(chunkData, chunkMeta);
|
|
||||||
|
|
||||||
// 3. 🔧 关键修复:融合数据包不能被分片,直接发送
|
|
||||||
await this.sendSingleData(embeddedPacket, peerId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// New: Send large ArrayBuffer in fragments
|
|
||||||
private async sendLargeArrayBuffer(
|
|
||||||
data: ArrayBuffer,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const networkChunkSize = FileSender.OPTIMIZED_CONFIG.NETWORK_CHUNK_SIZE;
|
|
||||||
const totalSize = data.byteLength;
|
|
||||||
|
|
||||||
// If data is less than 64KB, send directly
|
|
||||||
if (totalSize <= networkChunkSize) {
|
|
||||||
await this.sendSingleData(data, peerId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send large chunks in fragments
|
|
||||||
let offset = 0;
|
|
||||||
let fragmentIndex = 0;
|
|
||||||
|
|
||||||
while (offset < totalSize) {
|
|
||||||
const chunkSize = Math.min(networkChunkSize, totalSize - offset);
|
|
||||||
const chunk = data.slice(offset, offset + chunkSize);
|
|
||||||
|
|
||||||
// Send fragment
|
|
||||||
await this.sendSingleData(chunk, peerId);
|
|
||||||
postLogToBackend(
|
|
||||||
`[sender Debug] chunk idx:${fragmentIndex} ,size:${chunkSize}`
|
|
||||||
);
|
|
||||||
offset += chunkSize;
|
|
||||||
fragmentIndex++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 🚀 修复版本:发送单个数据包(禁止分片)
|
|
||||||
private async sendSingleData(
|
|
||||||
data: string | ArrayBuffer,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const dataChannel = this.webrtcConnection.dataChannels.get(peerId);
|
|
||||||
if (!dataChannel) {
|
|
||||||
throw new Error("Data channel not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Firefox兼容性调试:记录发送前的数据信息
|
|
||||||
const dataType =
|
|
||||||
typeof data === "string"
|
|
||||||
? "string"
|
|
||||||
: data instanceof ArrayBuffer
|
|
||||||
? "ArrayBuffer"
|
|
||||||
: "unknown";
|
|
||||||
const dataSize =
|
|
||||||
typeof data === "string"
|
|
||||||
? data.length
|
|
||||||
: data instanceof ArrayBuffer
|
|
||||||
? data.byteLength
|
|
||||||
: 0;
|
|
||||||
|
|
||||||
// Intelligent send control - decide sending strategy based on buffer status
|
|
||||||
await this.smartBufferControl(dataChannel, peerId);
|
|
||||||
|
|
||||||
// 🚀 直接发送,不分片
|
|
||||||
const sendResult = this.webrtcConnection.sendData(data, peerId);
|
|
||||||
|
|
||||||
if (!sendResult) {
|
|
||||||
const errorMessage = `sendData failed for ${dataType} data of size ${dataSize}`;
|
|
||||||
postLogToBackend(`[Firefox Debug] ❌ ${errorMessage}`);
|
|
||||||
throw new Error(errorMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize network performance monitoring (called at the start of transfer)
|
|
||||||
private initializeNetworkPerformance(peerId: string): void {
|
|
||||||
if (!this.networkPerformance.has(peerId)) {
|
|
||||||
// Use conservative initial values
|
|
||||||
this.networkPerformance.set(peerId, {
|
|
||||||
avgClearingRate: 5000, // 5MB/s initial estimate
|
|
||||||
optimalThreshold: FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD,
|
|
||||||
avgWaitTime: 50, // 50ms initial estimate
|
|
||||||
sampleCount: 0,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get current transfer speed from SpeedCalculator and update network performance
|
|
||||||
private updateNetworkFromSpeed(peerId: string): void {
|
|
||||||
const currentSpeed = this.speedCalculator.getSendSpeed(peerId); // KB/s
|
|
||||||
if (currentSpeed > 0) {
|
|
||||||
const perf = this.networkPerformance.get(peerId);
|
|
||||||
if (perf) {
|
|
||||||
perf.avgClearingRate = currentSpeed;
|
|
||||||
perf.sampleCount++;
|
|
||||||
|
|
||||||
// Adjust threshold every 10 speed updates
|
|
||||||
if (perf.sampleCount % 10 === 0) {
|
|
||||||
this.adjustOptimalThreshold(perf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shared logic for adjusting optimal threshold
|
|
||||||
private adjustOptimalThreshold(perf: {
|
|
||||||
avgClearingRate: number;
|
|
||||||
optimalThreshold: number;
|
|
||||||
avgWaitTime: number;
|
|
||||||
sampleCount: number;
|
|
||||||
}): void {
|
|
||||||
if (perf.avgClearingRate > 8000) {
|
|
||||||
// >8MB/s network is good
|
|
||||||
perf.optimalThreshold = Math.max(
|
|
||||||
FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD,
|
|
||||||
6291456
|
|
||||||
); // 6MB
|
|
||||||
} else if (perf.avgClearingRate > 4000) {
|
|
||||||
// >4MB/s network is average
|
|
||||||
perf.optimalThreshold = FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD; // 3MB
|
|
||||||
} else {
|
|
||||||
// Poor network
|
|
||||||
perf.optimalThreshold = Math.min(
|
|
||||||
FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD,
|
|
||||||
1572864
|
|
||||||
); // 1.5MB
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adaptive network performance learning (learn from backpressure waiting)
|
|
||||||
private updateNetworkPerformance(
|
|
||||||
peerId: string,
|
|
||||||
clearingRate: number,
|
|
||||||
waitTime: number
|
|
||||||
): void {
|
|
||||||
if (!this.networkPerformance.has(peerId)) {
|
|
||||||
this.initializeNetworkPerformance(peerId);
|
|
||||||
}
|
|
||||||
|
|
||||||
const perf = this.networkPerformance.get(peerId)!;
|
|
||||||
perf.sampleCount++;
|
|
||||||
// Exponential moving average, with higher weight for new data
|
|
||||||
const alpha = 0.3;
|
|
||||||
perf.avgClearingRate =
|
|
||||||
perf.avgClearingRate * (1 - alpha) + clearingRate * alpha;
|
|
||||||
perf.avgWaitTime = perf.avgWaitTime * (1 - alpha) + waitTime * alpha;
|
|
||||||
// Adjust optimal threshold
|
|
||||||
this.adjustOptimalThreshold(perf);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get adaptive threshold
|
|
||||||
private getAdaptiveThreshold(peerId: string): number {
|
|
||||||
const perf = this.networkPerformance.get(peerId);
|
|
||||||
return perf
|
|
||||||
? perf.optimalThreshold
|
|
||||||
: FileSender.OPTIMIZED_CONFIG.BUFFER_THRESHOLD;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adaptive intelligent send control strategy
|
|
||||||
private async intelligentSendControl(
|
|
||||||
dataChannel: RTCDataChannel,
|
|
||||||
peerId: string
|
|
||||||
): Promise<"AGGRESSIVE" | "NORMAL" | "CAUTIOUS" | "WAIT"> {
|
|
||||||
const bufferedAmount = dataChannel.bufferedAmount;
|
|
||||||
const adaptiveThreshold = this.getAdaptiveThreshold(peerId);
|
|
||||||
const utilizationRate = bufferedAmount / adaptiveThreshold;
|
|
||||||
|
|
||||||
// Dynamically adjust strategy thresholds: based on network performance
|
|
||||||
const perf = this.networkPerformance.get(peerId);
|
|
||||||
const networkQuality = perf
|
|
||||||
? perf.avgClearingRate > 6000
|
|
||||||
? "good"
|
|
||||||
: "poor"
|
|
||||||
: "unknown";
|
|
||||||
|
|
||||||
let aggressiveThreshold = 0.3;
|
|
||||||
let normalThreshold = 0.6;
|
|
||||||
let cautiousThreshold = 0.9;
|
|
||||||
|
|
||||||
if (networkQuality === "good") {
|
|
||||||
// Good network: more aggressive strategy
|
|
||||||
aggressiveThreshold = 0.4; // Actively send below 40%
|
|
||||||
normalThreshold = 0.7; // Normal send below 70%
|
|
||||||
} else if (networkQuality === "poor") {
|
|
||||||
// Poor network: more conservative strategy
|
|
||||||
aggressiveThreshold = 0.2; // Actively send only below 20%
|
|
||||||
normalThreshold = 0.5; // Normal send below 50%
|
|
||||||
cautiousThreshold = 0.8; // Wait above 80%
|
|
||||||
}
|
|
||||||
if (utilizationRate < aggressiveThreshold) {
|
|
||||||
return "AGGRESSIVE";
|
|
||||||
} else if (utilizationRate < normalThreshold) {
|
|
||||||
return "NORMAL";
|
|
||||||
} else if (utilizationRate < cautiousThreshold) {
|
|
||||||
return "CAUTIOUS";
|
|
||||||
} else {
|
|
||||||
return "WAIT";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Intelligent waiting strategy - adjust send control based on buffer status
|
|
||||||
private async smartBufferControl(
|
|
||||||
dataChannel: RTCDataChannel,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const strategy = await this.intelligentSendControl(dataChannel, peerId);
|
|
||||||
|
|
||||||
if (strategy === "AGGRESSIVE") {
|
|
||||||
// Aggressive mode: no need to wait, send immediately
|
|
||||||
return;
|
|
||||||
} else if (strategy === "NORMAL") {
|
|
||||||
await new Promise<void>((resolve) => setTimeout(resolve, 5));
|
|
||||||
// Normal mode: slight wait
|
|
||||||
return;
|
|
||||||
} else if (strategy === "CAUTIOUS") {
|
|
||||||
// Cautious mode: brief wait to let the network consume some data
|
|
||||||
await new Promise<void>((resolve) => setTimeout(resolve, 10));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// WAIT mode: requires active polling wait
|
|
||||||
const POLLING_INTERVAL = 5;
|
|
||||||
const MAX_WAIT_TIME = 3000;
|
|
||||||
const startTime = Date.now();
|
|
||||||
const adaptiveThreshold = this.getAdaptiveThreshold(peerId);
|
|
||||||
const threshold_low = adaptiveThreshold * 0.3;
|
|
||||||
const initialBuffered = dataChannel.bufferedAmount;
|
|
||||||
let pollCount = 0;
|
|
||||||
while (dataChannel.bufferedAmount > threshold_low) {
|
|
||||||
pollCount++;
|
|
||||||
|
|
||||||
if (Date.now() - startTime > MAX_WAIT_TIME) {
|
|
||||||
this.log("warn", "Buffer wait timeout", {
|
|
||||||
bufferedAmount: dataChannel.bufferedAmount,
|
|
||||||
threshold: adaptiveThreshold,
|
|
||||||
waitTime: Date.now() - startTime,
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
await new Promise<void>((resolve) =>
|
|
||||||
setTimeout(resolve, POLLING_INTERVAL)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Record wait end status
|
|
||||||
const waitTime = Date.now() - startTime;
|
|
||||||
const finalBuffered = dataChannel.bufferedAmount;
|
|
||||||
const clearedBytes = initialBuffered - finalBuffered;
|
|
||||||
const clearingRate =
|
|
||||||
waitTime > 0 ? clearedBytes / 1024 / (waitTime / 1000) : 0;
|
|
||||||
|
|
||||||
// Update network performance learning
|
|
||||||
if (clearingRate > 0) {
|
|
||||||
this.updateNetworkPerformance(peerId, clearingRate, waitTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Optimized method for reading a single file chunk
|
|
||||||
private readSingleChunk(
|
|
||||||
fileReader: FileReader,
|
|
||||||
file: CustomFile,
|
|
||||||
offset: number,
|
|
||||||
chunkSize: number
|
|
||||||
): Promise<ArrayBuffer> {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
const slice = file.slice(offset, offset + chunkSize);
|
|
||||||
fileReader.onload = (e) => {
|
|
||||||
if (e.target?.result instanceof ArrayBuffer) {
|
|
||||||
resolve(e.target.result);
|
|
||||||
} else {
|
|
||||||
reject(new Error("Failed to read blob as ArrayBuffer"));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
fileReader.onerror = () =>
|
|
||||||
reject(fileReader.error || new Error("Read error"));
|
|
||||||
fileReader.readAsArrayBuffer(slice);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Batch read multiple file chunks to improve I/O performance
|
|
||||||
private async readMultipleChunks(
|
|
||||||
fileReader: FileReader,
|
|
||||||
file: CustomFile,
|
|
||||||
startOffset: number,
|
|
||||||
chunkSize: number,
|
|
||||||
batchSize: number
|
|
||||||
): Promise<ArrayBuffer[]> {
|
|
||||||
const chunks: ArrayBuffer[] = [];
|
|
||||||
const remainingSize = file.size - startOffset;
|
|
||||||
const actualBatchSize = Math.min(
|
|
||||||
batchSize,
|
|
||||||
Math.ceil(remainingSize / chunkSize)
|
|
||||||
);
|
|
||||||
|
|
||||||
for (let i = 0; i < actualBatchSize; i++) {
|
|
||||||
const offset = startOffset + i * chunkSize;
|
|
||||||
if (offset >= file.size) break;
|
|
||||||
|
|
||||||
const currentChunkSize = Math.min(chunkSize, file.size - offset);
|
|
||||||
const chunk = await this.readSingleChunk(
|
|
||||||
fileReader,
|
|
||||||
file,
|
|
||||||
offset,
|
|
||||||
currentChunkSize
|
|
||||||
);
|
|
||||||
chunks.push(chunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
return chunks;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 🚀 修复版本:在网络传输层面正确添加序号 - 彻底解决Firefox乱序问题
|
|
||||||
private async processSendQueue(
|
|
||||||
file: CustomFile,
|
|
||||||
peerId: string
|
|
||||||
): Promise<void> {
|
|
||||||
const fileId = generateFileId(file);
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
const fileReader = new FileReader();
|
|
||||||
|
|
||||||
let fileOffset = peerState.readOffset || 0;
|
|
||||||
const batchSize = FileSender.OPTIMIZED_CONFIG.BATCH_SIZE;
|
|
||||||
let totalBytesSentInLoop = 0;
|
|
||||||
|
|
||||||
// 🔧 关键修复:使用网络传输块大小计算totalChunks
|
|
||||||
const networkChunkSize = FileSender.OPTIMIZED_CONFIG.NETWORK_CHUNK_SIZE; // 64KB
|
|
||||||
const remainingSize = file.size - fileOffset;
|
|
||||||
const totalNetworkChunks = Math.ceil(remainingSize / networkChunkSize);
|
|
||||||
|
|
||||||
// Initialize network performance monitoring
|
|
||||||
this.initializeNetworkPerformance(peerId);
|
|
||||||
|
|
||||||
try {
|
|
||||||
let networkChunkIndex = 0; // 网络块序号
|
|
||||||
let currentFileOffset = fileOffset;
|
|
||||||
|
|
||||||
// 按网络块大小逐个发送
|
|
||||||
while (currentFileOffset < file.size && peerState.isSending) {
|
|
||||||
// 计算当前网络块的实际大小
|
|
||||||
const currentNetworkChunkSize = Math.min(
|
|
||||||
networkChunkSize,
|
|
||||||
file.size - currentFileOffset
|
|
||||||
);
|
|
||||||
|
|
||||||
// 读取当前网络块
|
|
||||||
const networkChunk = await this.readSingleChunk(
|
|
||||||
fileReader,
|
|
||||||
file,
|
|
||||||
currentFileOffset,
|
|
||||||
currentNetworkChunkSize
|
|
||||||
);
|
|
||||||
|
|
||||||
// 发送带序号的融合数据包
|
|
||||||
let sendSuccessful = false;
|
|
||||||
try {
|
|
||||||
await this.sendEmbeddedChunk(
|
|
||||||
networkChunk,
|
|
||||||
fileId,
|
|
||||||
networkChunkIndex,
|
|
||||||
totalNetworkChunks,
|
|
||||||
currentFileOffset,
|
|
||||||
peerId
|
|
||||||
);
|
|
||||||
sendSuccessful = true;
|
|
||||||
totalBytesSentInLoop += networkChunk.byteLength;
|
|
||||||
} catch (error) {
|
|
||||||
postLogToBackend(
|
|
||||||
`[Firefox Debug] ❌ Failed to send network chunk #${networkChunkIndex}: ${error}`
|
|
||||||
);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新进度和位置
|
|
||||||
if (sendSuccessful) {
|
|
||||||
currentFileOffset += networkChunk.byteLength;
|
|
||||||
peerState.readOffset = currentFileOffset;
|
|
||||||
networkChunkIndex++;
|
|
||||||
|
|
||||||
// Update file and folder progress
|
|
||||||
await this.updateProgress(
|
|
||||||
networkChunk.byteLength,
|
|
||||||
fileId,
|
|
||||||
file.size,
|
|
||||||
peerId,
|
|
||||||
true
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (error: any) {
|
|
||||||
const errorMessage = `Error in network-level embedded transfer: ${error.message}`;
|
|
||||||
postLogToBackend(
|
|
||||||
`[Firefox Debug] ❌ Network embedded send error: ${errorMessage}`
|
|
||||||
);
|
|
||||||
this.fireError(errorMessage, {
|
|
||||||
fileId,
|
|
||||||
peerId,
|
|
||||||
currentFileOffset: peerState.readOffset,
|
|
||||||
totalBytesSentInLoop,
|
|
||||||
});
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private abortFileSend(fileId: string, peerId: string): void {
|
|
||||||
this.log("warn", `Aborting file send for ${fileId} to ${peerId}`);
|
|
||||||
const peerState = this.getPeerState(peerId);
|
|
||||||
peerState.isSending = false;
|
|
||||||
peerState.readOffset = 0;
|
|
||||||
peerState.bufferQueue = [];
|
|
||||||
peerState.isReading = false;
|
|
||||||
// Optionally, send an abort message to the receiver
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,404 @@
|
|||||||
|
import { generateFileId } from "@/lib/fileUtils";
|
||||||
|
import {
|
||||||
|
CustomFile,
|
||||||
|
fileMetadata,
|
||||||
|
WebRTCMessage,
|
||||||
|
FileRequest,
|
||||||
|
EmbeddedChunkMeta,
|
||||||
|
} from "@/types/webrtc";
|
||||||
|
import { StateManager } from "./StateManager";
|
||||||
|
import { MessageHandler, MessageHandlerDelegate } from "./MessageHandler";
|
||||||
|
import { NetworkTransmitter } from "./NetworkTransmitter";
|
||||||
|
import { ProgressTracker, ProgressCallback } from "./ProgressTracker";
|
||||||
|
import { StreamingFileReader } from "./StreamingFileReader";
|
||||||
|
import { TransferConfig } from "./TransferConfig";
|
||||||
|
import WebRTC_Initiator from "../webrtc_Initiator";
|
||||||
|
import { postLogToBackend } from "@/app/config/api";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 文件传输编排器
|
||||||
|
* 整合所有组件,提供统一的文件传输服务
|
||||||
|
*/
|
||||||
|
export class FileTransferOrchestrator implements MessageHandlerDelegate {
|
||||||
|
private stateManager: StateManager;
|
||||||
|
private messageHandler: MessageHandler;
|
||||||
|
private networkTransmitter: NetworkTransmitter;
|
||||||
|
private progressTracker: ProgressTracker;
|
||||||
|
|
||||||
|
constructor(private webrtcConnection: WebRTC_Initiator) {
|
||||||
|
// 初始化所有组件
|
||||||
|
this.stateManager = new StateManager();
|
||||||
|
this.networkTransmitter = new NetworkTransmitter(webrtcConnection, this.stateManager);
|
||||||
|
this.progressTracker = new ProgressTracker(this.stateManager);
|
||||||
|
this.messageHandler = new MessageHandler(this.stateManager, this);
|
||||||
|
|
||||||
|
// 设置数据处理器
|
||||||
|
this.setupDataHandler();
|
||||||
|
|
||||||
|
this.log("log", "FileTransferOrchestrator initialized");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 公共API - 简化的接口 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 发送文件元数据
|
||||||
|
*/
|
||||||
|
public sendFileMeta(files: CustomFile[], peerId?: string): void {
|
||||||
|
// 记录属于文件夹的文件大小,用于进度计算
|
||||||
|
files.forEach((file) => {
|
||||||
|
if (file.folderName) {
|
||||||
|
const fileId = generateFileId(file);
|
||||||
|
this.stateManager.addFileToFolder(file.folderName, fileId, file.size);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 循环发送所有文件的元数据
|
||||||
|
const peers = peerId
|
||||||
|
? [peerId]
|
||||||
|
: Array.from(this.webrtcConnection.peerConnections.keys());
|
||||||
|
|
||||||
|
peers.forEach((pId) => {
|
||||||
|
files.forEach((file) => {
|
||||||
|
const fileId = generateFileId(file);
|
||||||
|
this.stateManager.addPendingFile(fileId, file);
|
||||||
|
|
||||||
|
const fileMeta = this.getFileMeta(file);
|
||||||
|
const metaDataString = JSON.stringify(fileMeta);
|
||||||
|
|
||||||
|
const sendResult = this.webrtcConnection.sendData(metaDataString, pId);
|
||||||
|
if (!sendResult) {
|
||||||
|
this.fireError("Failed to send file metadata", {
|
||||||
|
fileMeta,
|
||||||
|
peerId: pId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 发送字符串内容
|
||||||
|
*/
|
||||||
|
public async sendString(content: string, peerId: string): Promise<void> {
|
||||||
|
const chunkSize = TransferConfig.FILE_CONFIG.CHUNK_SIZE;
|
||||||
|
const chunks: string[] = [];
|
||||||
|
|
||||||
|
for (let i = 0; i < content.length; i += chunkSize) {
|
||||||
|
chunks.push(content.slice(i, i + chunkSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 首先发送元数据
|
||||||
|
await this.networkTransmitter.sendWithBackpressure(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "stringMetadata",
|
||||||
|
length: content.length,
|
||||||
|
}),
|
||||||
|
peerId
|
||||||
|
);
|
||||||
|
|
||||||
|
// 逐块发送,使用背压控制
|
||||||
|
for (let i = 0; i < chunks.length; i++) {
|
||||||
|
const data = JSON.stringify({
|
||||||
|
type: "string",
|
||||||
|
chunk: chunks[i],
|
||||||
|
index: i,
|
||||||
|
total: chunks.length,
|
||||||
|
});
|
||||||
|
await this.networkTransmitter.sendWithBackpressure(data, peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.log("log", `String sent successfully - length: ${content.length}, chunks: ${chunks.length}`, { peerId });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 设置进度回调
|
||||||
|
*/
|
||||||
|
public setProgressCallback(callback: ProgressCallback, peerId: string): void {
|
||||||
|
this.progressTracker.setProgressCallback(callback, peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== MessageHandlerDelegate 实现 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📄 处理文件请求(来自MessageHandler的委托)
|
||||||
|
*/
|
||||||
|
async handleFileRequest(request: FileRequest, peerId: string): Promise<void> {
|
||||||
|
const file = this.stateManager.getPendingFile(request.fileId);
|
||||||
|
const offset = request.offset || 0;
|
||||||
|
|
||||||
|
if (!file) {
|
||||||
|
this.fireError(`File not found for request`, {
|
||||||
|
fileId: request.fileId,
|
||||||
|
peerId,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🚀 Starting file send - fileName: ${file.name}, fileSize: ${file.size}, offset: ${offset}`
|
||||||
|
);
|
||||||
|
|
||||||
|
await this.sendSingleFile(file, peerId, offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📝 日志记录(来自MessageHandler的委托)
|
||||||
|
*/
|
||||||
|
public log(
|
||||||
|
level: "log" | "warn" | "error",
|
||||||
|
message: string,
|
||||||
|
context?: Record<string, any>
|
||||||
|
): void {
|
||||||
|
const prefix = `[FileTransferOrchestrator]`;
|
||||||
|
console[level](prefix, message, context || "");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 内部编排方法 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 发送单个文件
|
||||||
|
*/
|
||||||
|
private async sendSingleFile(
|
||||||
|
file: CustomFile,
|
||||||
|
peerId: string,
|
||||||
|
offset: number = 0
|
||||||
|
): Promise<void> {
|
||||||
|
const fileId = generateFileId(file);
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
|
||||||
|
if (peerState.isSending) {
|
||||||
|
this.log("warn", `Already sending file to peer ${peerId}`, { fileId });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 初始化发送状态
|
||||||
|
this.stateManager.updatePeerState(peerId, {
|
||||||
|
isSending: true,
|
||||||
|
currentFolderName: file.folderName,
|
||||||
|
readOffset: offset,
|
||||||
|
bufferQueue: [],
|
||||||
|
isReading: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
// 初始化进度统计
|
||||||
|
const currentSent = this.stateManager.getFileBytesSent(peerId, fileId);
|
||||||
|
this.stateManager.updateFileBytesSent(peerId, fileId, offset - currentSent);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.processSendQueue(file, peerId);
|
||||||
|
await this.waitForTransferComplete(peerId);
|
||||||
|
} catch (error: any) {
|
||||||
|
this.fireError(`Error sending file ${file.name}: ${error.message}`, {
|
||||||
|
fileId,
|
||||||
|
peerId,
|
||||||
|
});
|
||||||
|
this.abortFileSend(fileId, peerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 处理发送队列 - 使用StreamingFileReader
|
||||||
|
*/
|
||||||
|
private async processSendQueue(
|
||||||
|
file: CustomFile,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const fileId = generateFileId(file);
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
|
||||||
|
// 1. 初始化流式文件读取器
|
||||||
|
const streamReader = new StreamingFileReader(file, peerState.readOffset || 0);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🚀 STREAMING_SEND start - file: ${file.name}, size: ${file.size}, startOffset: ${peerState.readOffset || 0}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// 初始化网络性能监控
|
||||||
|
this.stateManager.initializeNetworkPerformance(peerId);
|
||||||
|
|
||||||
|
try {
|
||||||
|
let totalBytesSent = 0;
|
||||||
|
let networkChunkIndex = 0;
|
||||||
|
|
||||||
|
// 2. 流式处理:逐个获取64KB网络块并发送
|
||||||
|
while (peerState.isSending) {
|
||||||
|
// 获取下一个网络块
|
||||||
|
const chunkInfo = await streamReader.getNextNetworkChunk();
|
||||||
|
|
||||||
|
// 检查是否已完成
|
||||||
|
if (chunkInfo.chunk === null) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🏁 STREAMING_SEND completed - totalChunks: ${networkChunkIndex}, totalBytes: ${totalBytesSent}`
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建嵌入式元数据
|
||||||
|
const embeddedMeta: EmbeddedChunkMeta = {
|
||||||
|
chunkIndex: chunkInfo.chunkIndex,
|
||||||
|
totalChunks: chunkInfo.totalChunks,
|
||||||
|
chunkSize: chunkInfo.chunk.byteLength,
|
||||||
|
isLastChunk: chunkInfo.isLastChunk,
|
||||||
|
fileOffset: chunkInfo.fileOffset,
|
||||||
|
fileId,
|
||||||
|
};
|
||||||
|
|
||||||
|
// 发送带嵌入元数据的网络块
|
||||||
|
let sendSuccessful = false;
|
||||||
|
try {
|
||||||
|
sendSuccessful = await this.networkTransmitter.sendEmbeddedChunk(
|
||||||
|
chunkInfo.chunk,
|
||||||
|
embeddedMeta,
|
||||||
|
peerId
|
||||||
|
);
|
||||||
|
|
||||||
|
if (sendSuccessful) {
|
||||||
|
totalBytesSent += chunkInfo.chunk.byteLength;
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ✓ STREAMING_CHUNK sent #${chunkInfo.chunkIndex}/${chunkInfo.totalChunks} - size: ${chunkInfo.chunk.byteLength}, isLast: ${chunkInfo.isLastChunk}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ❌ STREAMING_CHUNK failed #${chunkInfo.chunkIndex}: ${error}`
|
||||||
|
);
|
||||||
|
sendSuccessful = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新状态和进度
|
||||||
|
if (sendSuccessful) {
|
||||||
|
this.stateManager.updatePeerState(peerId, {
|
||||||
|
readOffset: chunkInfo.fileOffset + chunkInfo.chunk.byteLength
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.progressTracker.updateFileProgress(
|
||||||
|
chunkInfo.chunk.byteLength,
|
||||||
|
fileId,
|
||||||
|
file.size,
|
||||||
|
peerId,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.log("warn", `Send failed, continuing with next chunk...`, {
|
||||||
|
chunkIndex: chunkInfo.chunkIndex,
|
||||||
|
fileId,
|
||||||
|
peerId
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
networkChunkIndex++;
|
||||||
|
|
||||||
|
// 检查是否为最后一块
|
||||||
|
if (chunkInfo.isLastChunk) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🏁 Last chunk sent, waiting for receiver confirmation...`
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ✅ File send completed - ${file.name}, totalChunks: ${networkChunkIndex}, totalBytes: ${totalBytesSent}`
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (error: any) {
|
||||||
|
const errorMessage = `Streaming send error: ${error.message}`;
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ❌ STREAMING_ERROR: ${errorMessage}`
|
||||||
|
);
|
||||||
|
this.fireError(errorMessage, { fileId, peerId, offset: peerState.readOffset });
|
||||||
|
throw error;
|
||||||
|
} finally {
|
||||||
|
// 清理资源
|
||||||
|
streamReader.cleanup();
|
||||||
|
postLogToBackend(`[DEBUG] 🧹 StreamingFileReader cleaned up`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ⏳ 等待传输完成确认
|
||||||
|
*/
|
||||||
|
private async waitForTransferComplete(peerId: string): Promise<void> {
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
while (peerState?.isSending) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📋 获取文件元数据
|
||||||
|
*/
|
||||||
|
private getFileMeta(file: CustomFile): fileMetadata {
|
||||||
|
const fileId = generateFileId(file);
|
||||||
|
return {
|
||||||
|
type: "fileMeta",
|
||||||
|
fileId,
|
||||||
|
name: file.name,
|
||||||
|
size: file.size,
|
||||||
|
fileType: file.type,
|
||||||
|
fullName: file.fullName,
|
||||||
|
folderName: file.folderName,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ❌ 中止文件发送
|
||||||
|
*/
|
||||||
|
private abortFileSend(fileId: string, peerId: string): void {
|
||||||
|
this.log("warn", `Aborting file send for ${fileId} to ${peerId}`);
|
||||||
|
this.stateManager.resetPeerState(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔧 设置数据处理器
|
||||||
|
*/
|
||||||
|
private setupDataHandler(): void {
|
||||||
|
this.webrtcConnection.onDataReceived = (data, peerId) => {
|
||||||
|
if (typeof data === "string") {
|
||||||
|
try {
|
||||||
|
const parsedData = JSON.parse(data) as WebRTCMessage;
|
||||||
|
this.messageHandler.handleSignalingMessage(parsedData, peerId);
|
||||||
|
} catch (error) {
|
||||||
|
this.fireError("Error parsing received JSON data", { error, peerId });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔥 错误处理
|
||||||
|
*/
|
||||||
|
private fireError(message: string, context?: Record<string, any>) {
|
||||||
|
this.webrtcConnection.fireError(message, {
|
||||||
|
...context,
|
||||||
|
component: "FileTransferOrchestrator",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 状态查询和调试 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 获取传输统计信息
|
||||||
|
*/
|
||||||
|
public getTransferStats(peerId?: string) {
|
||||||
|
const stats = {
|
||||||
|
stateManager: this.stateManager.getStateStats(),
|
||||||
|
progressTracker: peerId ? this.progressTracker.getProgressStats(peerId) : null,
|
||||||
|
networkTransmitter: peerId ? this.networkTransmitter.getTransmissionStats(peerId) : null,
|
||||||
|
};
|
||||||
|
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🧹 清理所有资源
|
||||||
|
*/
|
||||||
|
public cleanup(): void {
|
||||||
|
this.stateManager.cleanup();
|
||||||
|
this.networkTransmitter.cleanup();
|
||||||
|
this.progressTracker.cleanup();
|
||||||
|
this.messageHandler.cleanup();
|
||||||
|
|
||||||
|
this.log("log", "FileTransferOrchestrator cleaned up");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,169 @@
|
|||||||
|
import {
|
||||||
|
WebRTCMessage,
|
||||||
|
FileRequest,
|
||||||
|
FileReceiveComplete,
|
||||||
|
FolderReceiveComplete,
|
||||||
|
} from "@/types/webrtc";
|
||||||
|
import { StateManager } from "./StateManager";
|
||||||
|
import { postLogToBackend } from "@/app/config/api";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 消息处理接口 - 与主编排器通信
|
||||||
|
*/
|
||||||
|
export interface MessageHandlerDelegate {
|
||||||
|
handleFileRequest(request: FileRequest, peerId: string): Promise<void>;
|
||||||
|
log(level: "log" | "warn" | "error", message: string, context?: Record<string, any>): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 消息处理器
|
||||||
|
* 负责WebRTC消息的路由和处理逻辑
|
||||||
|
*/
|
||||||
|
export class MessageHandler {
|
||||||
|
constructor(
|
||||||
|
private stateManager: StateManager,
|
||||||
|
private delegate: MessageHandlerDelegate
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 处理接收到的信令消息
|
||||||
|
*/
|
||||||
|
handleSignalingMessage(message: WebRTCMessage, peerId: string): void {
|
||||||
|
postLogToBackend(`[DEBUG] 📨 Message received - type: ${message.type}, peerId: ${peerId}`);
|
||||||
|
|
||||||
|
switch (message.type) {
|
||||||
|
case "fileRequest":
|
||||||
|
this.handleFileRequest(message as FileRequest, peerId);
|
||||||
|
break;
|
||||||
|
case "fileReceiveComplete":
|
||||||
|
this.handleFileReceiveComplete(message as FileReceiveComplete, peerId);
|
||||||
|
break;
|
||||||
|
case "folderReceiveComplete":
|
||||||
|
this.handleFolderReceiveComplete(message as FolderReceiveComplete, peerId);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
this.delegate.log("warn", `Unknown signaling message type received`, {
|
||||||
|
type: message.type,
|
||||||
|
peerId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📄 处理文件请求消息
|
||||||
|
*/
|
||||||
|
private async handleFileRequest(request: FileRequest, peerId: string): Promise<void> {
|
||||||
|
const offset = request.offset || 0;
|
||||||
|
|
||||||
|
this.delegate.log(
|
||||||
|
"log",
|
||||||
|
`Handling file request for ${request.fileId} from ${peerId} with offset ${offset}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// Firefox兼容性修复:添加稍长延迟确保接收端完全准备好
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||||
|
|
||||||
|
// 委托给主编排器处理具体的文件传输
|
||||||
|
try {
|
||||||
|
await this.delegate.handleFileRequest(request, peerId);
|
||||||
|
} catch (error) {
|
||||||
|
this.delegate.log("error", `Error handling file request`, {
|
||||||
|
fileId: request.fileId,
|
||||||
|
peerId,
|
||||||
|
error: error instanceof Error ? error.message : String(error),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ✅ 处理文件接收完成确认消息
|
||||||
|
*/
|
||||||
|
private handleFileReceiveComplete(
|
||||||
|
message: FileReceiveComplete,
|
||||||
|
peerId: string
|
||||||
|
): void {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📥 Received fileReceiveComplete - fileId: ${message.fileId}, receivedSize: ${message.receivedSize}, receivedChunks: ${message.receivedChunks}, storeUpdated: ${message.storeUpdated}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// 清理发送状态
|
||||||
|
this.stateManager.updatePeerState(peerId, { isSending: false });
|
||||||
|
|
||||||
|
// 获取peer状态以触发进度回调
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
|
||||||
|
// 触发单文件100%进度(只有非文件夹情况)
|
||||||
|
if (!peerState.currentFolderName) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🎯 Setting single file progress to 100% - ${message.fileId}`
|
||||||
|
);
|
||||||
|
peerState.progressCallback?.(message.fileId, 1, 0);
|
||||||
|
} else {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📁 File in folder completed, not setting progress yet - ${message.fileId} (folder: ${peerState.currentFolderName})`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.delegate.log("log", `File reception confirmed by peer ${peerId}`, {
|
||||||
|
fileId: message.fileId,
|
||||||
|
receivedSize: message.receivedSize,
|
||||||
|
storeUpdated: message.storeUpdated,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📁 处理文件夹接收完成确认消息
|
||||||
|
*/
|
||||||
|
private handleFolderReceiveComplete(
|
||||||
|
message: FolderReceiveComplete,
|
||||||
|
peerId: string
|
||||||
|
): void {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📥 Received folderReceiveComplete - folderName: ${message.folderName}, completedFiles: ${message.completedFileIds.length}, allStoreUpdated: ${message.allStoreUpdated}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// 获取peer状态以触发进度回调
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
|
||||||
|
// 触发文件夹100%进度
|
||||||
|
const folderMeta = this.stateManager.getFolderMeta(message.folderName);
|
||||||
|
if (folderMeta) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🎯 Setting folder progress to 100% - ${message.folderName}`
|
||||||
|
);
|
||||||
|
peerState.progressCallback?.(message.folderName, 1, 0);
|
||||||
|
} else {
|
||||||
|
this.delegate.log("warn", `Folder metadata not found for completed folder`, {
|
||||||
|
folderName: message.folderName,
|
||||||
|
peerId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
this.delegate.log("log", `Folder reception confirmed by peer ${peerId}`, {
|
||||||
|
folderName: message.folderName,
|
||||||
|
completedFiles: message.completedFileIds.length,
|
||||||
|
allStoreUpdated: message.allStoreUpdated,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 获取消息处理统计信息
|
||||||
|
*/
|
||||||
|
public getMessageStats(): {
|
||||||
|
handledMessages: number;
|
||||||
|
lastMessageTime: number | null;
|
||||||
|
} {
|
||||||
|
// 这里可以添加消息统计逻辑,如果需要的话
|
||||||
|
return {
|
||||||
|
handledMessages: 0, // TODO: 实现消息计数
|
||||||
|
lastMessageTime: null, // TODO: 记录最后消息时间
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🧹 清理资源
|
||||||
|
*/
|
||||||
|
public cleanup(): void {
|
||||||
|
postLogToBackend("[DEBUG] 🧹 MessageHandler cleaned up");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,318 @@
|
|||||||
|
import { EmbeddedChunkMeta } from "@/types/webrtc";
|
||||||
|
import { StateManager } from "./StateManager";
|
||||||
|
import { TransferConfig } from "./TransferConfig";
|
||||||
|
import WebRTC_Initiator from "../webrtc_Initiator";
|
||||||
|
import { postLogToBackend } from "@/app/config/api";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 发送策略枚举
|
||||||
|
*/
|
||||||
|
type SendStrategy = "AGGRESSIVE" | "NORMAL" | "CAUTIOUS" | "WAIT";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 网络传输器
|
||||||
|
* 负责所有WebRTC数据传输、背压控制、自适应性能调整
|
||||||
|
*/
|
||||||
|
export class NetworkTransmitter {
|
||||||
|
constructor(
|
||||||
|
private webrtcConnection: WebRTC_Initiator,
|
||||||
|
private stateManager: StateManager
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 发送带序号的融合数据包
|
||||||
|
*/
|
||||||
|
async sendEmbeddedChunk(
|
||||||
|
chunkData: ArrayBuffer,
|
||||||
|
metadata: EmbeddedChunkMeta,
|
||||||
|
peerId: string
|
||||||
|
): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
// 1. 构建融合数据包
|
||||||
|
const embeddedPacket = this.createEmbeddedChunkPacket(chunkData, metadata);
|
||||||
|
|
||||||
|
// 2. 发送完整的融合数据包(不可分片)
|
||||||
|
await this.sendSingleData(embeddedPacket, peerId);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ✓ EMBEDDED chunk #${metadata.chunkIndex}/${metadata.totalChunks} sent - size: ${chunkData.byteLength}, packet: ${embeddedPacket.byteLength} bytes, isLast: ${metadata.isLastChunk}`
|
||||||
|
);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (error) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ❌ EMBEDDED chunk #${metadata.chunkIndex} send failed: ${error}`
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 构建融合元数据的数据包
|
||||||
|
*/
|
||||||
|
private createEmbeddedChunkPacket(
|
||||||
|
chunkData: ArrayBuffer,
|
||||||
|
chunkMeta: EmbeddedChunkMeta
|
||||||
|
): ArrayBuffer {
|
||||||
|
// 1. 将元数据序列化为JSON
|
||||||
|
const metaJson = JSON.stringify(chunkMeta);
|
||||||
|
const metaBytes = new TextEncoder().encode(metaJson);
|
||||||
|
|
||||||
|
// 2. 元数据长度(4字节)
|
||||||
|
const metaLengthBuffer = new ArrayBuffer(4);
|
||||||
|
const metaLengthView = new Uint32Array(metaLengthBuffer);
|
||||||
|
metaLengthView[0] = metaBytes.length;
|
||||||
|
|
||||||
|
// 3. 构建最终的融合数据包
|
||||||
|
const totalLength = 4 + metaBytes.length + chunkData.byteLength;
|
||||||
|
const finalPacket = new Uint8Array(totalLength);
|
||||||
|
|
||||||
|
// 拼接: [4字节长度] + [元数据] + [原始chunk数据]
|
||||||
|
finalPacket.set(new Uint8Array(metaLengthBuffer), 0);
|
||||||
|
finalPacket.set(metaBytes, 4);
|
||||||
|
finalPacket.set(new Uint8Array(chunkData), 4 + metaBytes.length);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📦 EMBEDDED packet created - chunkIndex: ${chunkMeta.chunkIndex}, metaSize: ${metaBytes.length}, chunkSize: ${chunkData.byteLength}, totalSize: ${totalLength}`
|
||||||
|
);
|
||||||
|
|
||||||
|
return finalPacket.buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 发送单个数据包(禁止分片)
|
||||||
|
*/
|
||||||
|
private async sendSingleData(
|
||||||
|
data: string | ArrayBuffer,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const dataChannel = this.webrtcConnection.dataChannels.get(peerId);
|
||||||
|
if (!dataChannel) {
|
||||||
|
throw new Error("Data channel not found");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 调试信息
|
||||||
|
const dataType = typeof data === "string" ? "string" : data instanceof ArrayBuffer ? "ArrayBuffer" : "unknown";
|
||||||
|
const dataSize = typeof data === "string" ? data.length : data instanceof ArrayBuffer ? data.byteLength : 0;
|
||||||
|
|
||||||
|
// 智能背压控制
|
||||||
|
await this.smartBufferControl(dataChannel, peerId);
|
||||||
|
|
||||||
|
// 直接发送,不分片
|
||||||
|
const sendResult = this.webrtcConnection.sendData(data, peerId);
|
||||||
|
|
||||||
|
if (!sendResult) {
|
||||||
|
const errorMessage = `sendData failed for ${dataType} data of size ${dataSize}`;
|
||||||
|
postLogToBackend(`[DEBUG] ❌ ${errorMessage}`);
|
||||||
|
throw new Error(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📤 Data sent successfully - type: ${dataType}, size: ${dataSize}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 发送带背压控制的数据
|
||||||
|
*/
|
||||||
|
async sendWithBackpressure(
|
||||||
|
data: string | ArrayBuffer,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const dataChannel = this.webrtcConnection.dataChannels.get(peerId);
|
||||||
|
if (!dataChannel) {
|
||||||
|
throw new Error("Data channel not found");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 对于ArrayBuffer,如果超过64KB,需要分片发送(修复sendData failed)
|
||||||
|
if (data instanceof ArrayBuffer) {
|
||||||
|
await this.sendLargeArrayBuffer(data, peerId);
|
||||||
|
} else {
|
||||||
|
await this.sendSingleData(data, peerId);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
const errorMessage = `sendWithBackpressure failed: ${error}`;
|
||||||
|
postLogToBackend(`[DEBUG] ${errorMessage}`);
|
||||||
|
throw new Error(errorMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 发送大型ArrayBuffer(分片处理)
|
||||||
|
*/
|
||||||
|
private async sendLargeArrayBuffer(
|
||||||
|
data: ArrayBuffer,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const networkChunkSize = TransferConfig.FILE_CONFIG.NETWORK_CHUNK_SIZE;
|
||||||
|
const totalSize = data.byteLength;
|
||||||
|
|
||||||
|
// 如果数据小于64KB,直接发送
|
||||||
|
if (totalSize <= networkChunkSize) {
|
||||||
|
await this.sendSingleData(data, peerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 大块数据分片发送
|
||||||
|
let offset = 0;
|
||||||
|
let fragmentIndex = 0;
|
||||||
|
|
||||||
|
while (offset < totalSize) {
|
||||||
|
const chunkSize = Math.min(networkChunkSize, totalSize - offset);
|
||||||
|
const chunk = data.slice(offset, offset + chunkSize);
|
||||||
|
|
||||||
|
// 发送分片
|
||||||
|
await this.sendSingleData(chunk, peerId);
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📦 Fragment sent #${fragmentIndex} - size: ${chunkSize}`
|
||||||
|
);
|
||||||
|
|
||||||
|
offset += chunkSize;
|
||||||
|
fragmentIndex++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 智能缓冲控制策略
|
||||||
|
*/
|
||||||
|
private async smartBufferControl(
|
||||||
|
dataChannel: RTCDataChannel,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const strategy = await this.intelligentSendControl(dataChannel, peerId);
|
||||||
|
|
||||||
|
switch (strategy) {
|
||||||
|
case "AGGRESSIVE":
|
||||||
|
// 积极模式:立即发送
|
||||||
|
return;
|
||||||
|
|
||||||
|
case "NORMAL":
|
||||||
|
// 正常模式:轻微等待
|
||||||
|
await new Promise<void>((resolve) => setTimeout(resolve, 5));
|
||||||
|
return;
|
||||||
|
|
||||||
|
case "CAUTIOUS":
|
||||||
|
// 谨慎模式:短暂等待
|
||||||
|
await new Promise<void>((resolve) => setTimeout(resolve, 10));
|
||||||
|
return;
|
||||||
|
|
||||||
|
case "WAIT":
|
||||||
|
// 等待模式:主动轮询等待
|
||||||
|
await this.activePollingWait(dataChannel, peerId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 自适应智能发送控制策略
|
||||||
|
*/
|
||||||
|
private async intelligentSendControl(
|
||||||
|
dataChannel: RTCDataChannel,
|
||||||
|
peerId: string
|
||||||
|
): Promise<SendStrategy> {
|
||||||
|
const bufferedAmount = dataChannel.bufferedAmount;
|
||||||
|
const adaptiveThreshold = this.stateManager.getAdaptiveThreshold(peerId);
|
||||||
|
const utilizationRate = bufferedAmount / adaptiveThreshold;
|
||||||
|
|
||||||
|
// 根据网络性能动态调整策略阈值
|
||||||
|
const perf = this.stateManager.getNetworkPerformance(peerId);
|
||||||
|
const networkQuality = this.getNetworkQuality(perf?.avgClearingRate || 0);
|
||||||
|
|
||||||
|
let thresholds = TransferConfig.getAdaptiveThresholds(perf?.avgClearingRate || 0).strategy;
|
||||||
|
|
||||||
|
if (utilizationRate < thresholds.aggressive) {
|
||||||
|
return "AGGRESSIVE";
|
||||||
|
} else if (utilizationRate < thresholds.normal) {
|
||||||
|
return "NORMAL";
|
||||||
|
} else if (utilizationRate < (thresholds.cautious || TransferConfig.SEND_STRATEGY_CONFIG.CAUTIOUS_THRESHOLD)) {
|
||||||
|
return "CAUTIOUS";
|
||||||
|
} else {
|
||||||
|
return "WAIT";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔍 获取网络质量评级
|
||||||
|
*/
|
||||||
|
private getNetworkQuality(avgClearingRate: number): "good" | "average" | "poor" {
|
||||||
|
const config = TransferConfig.QUALITY_CONFIG;
|
||||||
|
if (avgClearingRate > config.GOOD_NETWORK_SPEED) {
|
||||||
|
return "good";
|
||||||
|
} else if (avgClearingRate > config.AVERAGE_NETWORK_SPEED) {
|
||||||
|
return "average";
|
||||||
|
} else {
|
||||||
|
return "poor";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔄 主动轮询等待(WAIT模式)
|
||||||
|
*/
|
||||||
|
private async activePollingWait(
|
||||||
|
dataChannel: RTCDataChannel,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const config = TransferConfig.SEND_STRATEGY_CONFIG;
|
||||||
|
const startTime = Date.now();
|
||||||
|
const adaptiveThreshold = this.stateManager.getAdaptiveThreshold(peerId);
|
||||||
|
const threshold_low = adaptiveThreshold * 0.3;
|
||||||
|
const initialBuffered = dataChannel.bufferedAmount;
|
||||||
|
let pollCount = 0;
|
||||||
|
|
||||||
|
while (dataChannel.bufferedAmount > threshold_low) {
|
||||||
|
pollCount++;
|
||||||
|
|
||||||
|
if (Date.now() - startTime > config.MAX_WAIT_TIME) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ⚠️ Buffer wait timeout - buffered: ${dataChannel.bufferedAmount}, threshold: ${adaptiveThreshold}, waitTime: ${Date.now() - startTime}ms`
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise<void>((resolve) =>
|
||||||
|
setTimeout(resolve, config.POLLING_INTERVAL)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 记录等待结束状态并更新网络性能
|
||||||
|
const waitTime = Date.now() - startTime;
|
||||||
|
const finalBuffered = dataChannel.bufferedAmount;
|
||||||
|
const clearedBytes = initialBuffered - finalBuffered;
|
||||||
|
const clearingRate = waitTime > 0 ? clearedBytes / 1024 / (waitTime / 1000) : 0;
|
||||||
|
|
||||||
|
// 更新网络性能学习
|
||||||
|
if (clearingRate > 0) {
|
||||||
|
this.stateManager.updateNetworkPerformance(peerId, clearingRate, waitTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📊 Wait completed - cleared: ${clearedBytes} bytes, rate: ${clearingRate.toFixed(2)} KB/s, time: ${waitTime}ms, polls: ${pollCount}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 获取传输统计信息
|
||||||
|
*/
|
||||||
|
public getTransmissionStats(peerId: string) {
|
||||||
|
const networkPerf = this.stateManager.getNetworkPerformance(peerId);
|
||||||
|
const dataChannel = this.webrtcConnection.dataChannels.get(peerId);
|
||||||
|
|
||||||
|
return {
|
||||||
|
peerId,
|
||||||
|
networkPerformance: networkPerf || null,
|
||||||
|
currentBufferedAmount: dataChannel?.bufferedAmount || 0,
|
||||||
|
adaptiveThreshold: this.stateManager.getAdaptiveThreshold(peerId),
|
||||||
|
channelState: dataChannel?.readyState || 'unknown',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🧹 清理资源
|
||||||
|
*/
|
||||||
|
public cleanup(): void {
|
||||||
|
// NetworkTransmitter本身没有需要清理的资源
|
||||||
|
// 实际的清理工作由StateManager和WebRTC_Initiator处理
|
||||||
|
postLogToBackend("[DEBUG] 🧹 NetworkTransmitter cleaned up");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,232 @@
|
|||||||
|
import { SpeedCalculator } from "@/lib/speedCalculator";
|
||||||
|
import { StateManager } from "./StateManager";
|
||||||
|
import { postLogToBackend } from "@/app/config/api";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 进度回调类型定义
|
||||||
|
*/
|
||||||
|
export type ProgressCallback = (fileId: string, progress: number, speed: number) => void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 进度跟踪器
|
||||||
|
* 负责文件和文件夹的进度计算、速度统计、回调触发
|
||||||
|
*/
|
||||||
|
export class ProgressTracker {
|
||||||
|
private speedCalculator = new SpeedCalculator();
|
||||||
|
|
||||||
|
constructor(private stateManager: StateManager) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 更新文件传输进度
|
||||||
|
*/
|
||||||
|
async updateFileProgress(
|
||||||
|
byteLength: number,
|
||||||
|
fileId: string,
|
||||||
|
fileSize: number,
|
||||||
|
peerId: string,
|
||||||
|
wasActuallySent: boolean = true
|
||||||
|
): Promise<void> {
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
if (!peerState) return;
|
||||||
|
|
||||||
|
// 重要修复:只有成功发送的数据才更新统计
|
||||||
|
if (!wasActuallySent) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ⚠️ Data send failed, not updating progress - fileId: ${fileId}, size: ${byteLength}`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新文件已发送字节数
|
||||||
|
this.stateManager.updateFileBytesSent(peerId, fileId, byteLength);
|
||||||
|
|
||||||
|
// 计算进度ID和统计数据
|
||||||
|
let progressFileId = fileId;
|
||||||
|
let currentBytes = this.stateManager.getFileBytesSent(peerId, fileId);
|
||||||
|
let totalSize = fileSize;
|
||||||
|
|
||||||
|
// 如果文件属于文件夹,重新计算文件夹进度
|
||||||
|
if (peerState.currentFolderName) {
|
||||||
|
const folderName = peerState.currentFolderName;
|
||||||
|
const folderMeta = this.stateManager.getFolderMeta(folderName);
|
||||||
|
|
||||||
|
progressFileId = folderName;
|
||||||
|
totalSize = folderMeta?.totalSize || 0;
|
||||||
|
|
||||||
|
// 重新计算文件夹进度(从其所有文件的进度总和)
|
||||||
|
// 这对于断点续传更加健壮和正确
|
||||||
|
currentBytes = this.stateManager.getFolderBytesSent(peerId, folderName);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📁 Folder progress update - folder: ${folderName}, file: ${fileId}, currentBytes: ${currentBytes}, totalSize: ${totalSize}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新速度计算器
|
||||||
|
this.speedCalculator.updateSendSpeed(peerId, currentBytes);
|
||||||
|
const speed = this.speedCalculator.getSendSpeed(peerId);
|
||||||
|
const progress = totalSize > 0 ? currentBytes / totalSize : 0;
|
||||||
|
|
||||||
|
// 持续更新网络性能(从传输速度学习)
|
||||||
|
this.stateManager.updateNetworkFromSpeed(peerId, speed);
|
||||||
|
|
||||||
|
// 触发进度回调
|
||||||
|
this.triggerProgressCallback(peerId, progressFileId, progress, speed);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📊 Progress updated - ${progressFileId}: ${(progress * 100).toFixed(2)}%, speed: ${speed.toFixed(2)} KB/s, bytes: ${currentBytes}/${totalSize}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 更新文件夹传输进度
|
||||||
|
*/
|
||||||
|
async updateFolderProgress(
|
||||||
|
folderName: string,
|
||||||
|
fileProgress: Record<string, number>,
|
||||||
|
peerId: string
|
||||||
|
): Promise<void> {
|
||||||
|
const folderMeta = this.stateManager.getFolderMeta(folderName);
|
||||||
|
if (!folderMeta) {
|
||||||
|
postLogToBackend(`[DEBUG] ⚠️ Folder metadata not found: ${folderName}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算文件夹总进度
|
||||||
|
let totalSentBytes = 0;
|
||||||
|
folderMeta.fileIds.forEach((fileId) => {
|
||||||
|
totalSentBytes += this.stateManager.getFileBytesSent(peerId, fileId);
|
||||||
|
});
|
||||||
|
|
||||||
|
const progress = folderMeta.totalSize > 0 ? totalSentBytes / folderMeta.totalSize : 0;
|
||||||
|
const speed = this.speedCalculator.getSendSpeed(peerId);
|
||||||
|
|
||||||
|
// 触发文件夹进度回调
|
||||||
|
this.triggerProgressCallback(peerId, folderName, progress, speed);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📁 Folder progress - ${folderName}: ${(progress * 100).toFixed(2)}%, speed: ${speed.toFixed(2)} KB/s, bytes: ${totalSentBytes}/${folderMeta.totalSize}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 设置进度回调函数
|
||||||
|
*/
|
||||||
|
setProgressCallback(
|
||||||
|
callback: ProgressCallback,
|
||||||
|
peerId: string
|
||||||
|
): void {
|
||||||
|
this.stateManager.updatePeerState(peerId, { progressCallback: callback });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 触发进度回调
|
||||||
|
*/
|
||||||
|
private triggerProgressCallback(
|
||||||
|
peerId: string,
|
||||||
|
fileId: string,
|
||||||
|
progress: number,
|
||||||
|
speed: number
|
||||||
|
): void {
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
if (peerState.progressCallback) {
|
||||||
|
try {
|
||||||
|
peerState.progressCallback(fileId, progress, speed);
|
||||||
|
} catch (error) {
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ❌ Progress callback error - fileId: ${fileId}, error: ${error}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 计算当前传输速度
|
||||||
|
*/
|
||||||
|
getCurrentSpeed(peerId: string): number {
|
||||||
|
return this.speedCalculator.getSendSpeed(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 完成文件传输进度(设置为100%)
|
||||||
|
*/
|
||||||
|
completeFileProgress(fileId: string, peerId: string): void {
|
||||||
|
this.triggerProgressCallback(peerId, fileId, 1.0, 0);
|
||||||
|
|
||||||
|
postLogToBackend(`[DEBUG] ✅ File progress completed: ${fileId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 完成文件夹传输进度(设置为100%)
|
||||||
|
*/
|
||||||
|
completeFolderProgress(folderName: string, peerId: string): void {
|
||||||
|
this.triggerProgressCallback(peerId, folderName, 1.0, 0);
|
||||||
|
|
||||||
|
postLogToBackend(`[DEBUG] ✅ Folder progress completed: ${folderName}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 获取详细的进度统计信息
|
||||||
|
*/
|
||||||
|
getProgressStats(peerId: string) {
|
||||||
|
const peerState = this.stateManager.getPeerState(peerId);
|
||||||
|
const currentSpeed = this.getCurrentSpeed(peerId);
|
||||||
|
|
||||||
|
// 计算总的已发送字节数
|
||||||
|
let totalBytesSent = 0;
|
||||||
|
Object.values(peerState.totalBytesSent).forEach(bytes => {
|
||||||
|
totalBytesSent += bytes;
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
peerId,
|
||||||
|
currentSpeed,
|
||||||
|
totalBytesSent,
|
||||||
|
activeTransfers: Object.keys(peerState.totalBytesSent).length,
|
||||||
|
currentFolderName: peerState.currentFolderName,
|
||||||
|
isSending: peerState.isSending,
|
||||||
|
hasProgressCallback: !!peerState.progressCallback,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 获取文件夹的详细进度信息
|
||||||
|
*/
|
||||||
|
getFolderProgressDetails(folderName: string, peerId: string) {
|
||||||
|
const folderMeta = this.stateManager.getFolderMeta(folderName);
|
||||||
|
if (!folderMeta) return null;
|
||||||
|
|
||||||
|
const fileProgresses: Record<string, { sent: number; total: number; progress: number }> = {};
|
||||||
|
let totalSent = 0;
|
||||||
|
|
||||||
|
folderMeta.fileIds.forEach(fileId => {
|
||||||
|
const sent = this.stateManager.getFileBytesSent(peerId, fileId);
|
||||||
|
// 注意:这里需要从pendingFiles获取文件大小,暂时使用0
|
||||||
|
const total = 0; // TODO: 需要从StateManager获取文件大小
|
||||||
|
totalSent += sent;
|
||||||
|
|
||||||
|
fileProgresses[fileId] = {
|
||||||
|
sent,
|
||||||
|
total,
|
||||||
|
progress: total > 0 ? sent / total : 0,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
folderName,
|
||||||
|
totalSize: folderMeta.totalSize,
|
||||||
|
totalSent,
|
||||||
|
overallProgress: folderMeta.totalSize > 0 ? totalSent / folderMeta.totalSize : 0,
|
||||||
|
fileCount: folderMeta.fileIds.length,
|
||||||
|
fileProgresses,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🧹 清理进度跟踪资源
|
||||||
|
*/
|
||||||
|
cleanup(): void {
|
||||||
|
// SpeedCalculator 内部会自动清理过期数据
|
||||||
|
postLogToBackend("[DEBUG] 🧹 ProgressTracker cleaned up");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,283 @@
|
|||||||
|
import { PeerState, CustomFile, FolderMeta } from "@/types/webrtc";
|
||||||
|
import { TransferConfig } from "./TransferConfig";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 网络性能监控指标接口
|
||||||
|
*/
|
||||||
|
export interface NetworkPerformanceMetrics {
|
||||||
|
avgClearingRate: number; // 平均网络清理速度 KB/s
|
||||||
|
optimalThreshold: number; // 动态优化的阈值
|
||||||
|
avgWaitTime: number; // 平均等待时间
|
||||||
|
sampleCount: number; // 样本计数
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 状态管理类
|
||||||
|
* 集中管理所有传输相关的状态数据
|
||||||
|
*/
|
||||||
|
export class StateManager {
|
||||||
|
private peerStates = new Map<string, PeerState>();
|
||||||
|
private pendingFiles = new Map<string, CustomFile>();
|
||||||
|
private pendingFolderMeta: Record<string, FolderMeta> = {};
|
||||||
|
private networkPerformance = new Map<string, NetworkPerformanceMetrics>();
|
||||||
|
|
||||||
|
// ===== Peer状态管理 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取或创建peer状态
|
||||||
|
*/
|
||||||
|
public getPeerState(peerId: string): PeerState {
|
||||||
|
if (!this.peerStates.has(peerId)) {
|
||||||
|
this.peerStates.set(peerId, {
|
||||||
|
isSending: false,
|
||||||
|
bufferQueue: [],
|
||||||
|
readOffset: 0,
|
||||||
|
isReading: false,
|
||||||
|
currentFolderName: "",
|
||||||
|
totalBytesSent: {},
|
||||||
|
progressCallback: null,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return this.peerStates.get(peerId)!;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新peer状态
|
||||||
|
*/
|
||||||
|
public updatePeerState(peerId: string, updates: Partial<PeerState>): void {
|
||||||
|
const currentState = this.getPeerState(peerId);
|
||||||
|
Object.assign(currentState, updates);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 重置peer状态(传输完成或出错时)
|
||||||
|
*/
|
||||||
|
public resetPeerState(peerId: string): void {
|
||||||
|
const peerState = this.getPeerState(peerId);
|
||||||
|
peerState.isSending = false;
|
||||||
|
peerState.readOffset = 0;
|
||||||
|
peerState.bufferQueue = [];
|
||||||
|
peerState.isReading = false;
|
||||||
|
// 保留 currentFolderName, totalBytesSent, progressCallback
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除peer状态(peer断开连接时)
|
||||||
|
*/
|
||||||
|
public removePeerState(peerId: string): void {
|
||||||
|
this.peerStates.delete(peerId);
|
||||||
|
this.networkPerformance.delete(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 文件管理 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加待发送文件
|
||||||
|
*/
|
||||||
|
public addPendingFile(fileId: string, file: CustomFile): void {
|
||||||
|
this.pendingFiles.set(fileId, file);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取待发送文件
|
||||||
|
*/
|
||||||
|
public getPendingFile(fileId: string): CustomFile | undefined {
|
||||||
|
return this.pendingFiles.get(fileId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除待发送文件
|
||||||
|
*/
|
||||||
|
public removePendingFile(fileId: string): void {
|
||||||
|
this.pendingFiles.delete(fileId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有待发送文件
|
||||||
|
*/
|
||||||
|
public getAllPendingFiles(): Map<string, CustomFile> {
|
||||||
|
return new Map(this.pendingFiles);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 文件夹元数据管理 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 添加或更新文件夹元数据
|
||||||
|
*/
|
||||||
|
public addFileToFolder(folderName: string, fileId: string, fileSize: number): void {
|
||||||
|
if (!this.pendingFolderMeta[folderName]) {
|
||||||
|
this.pendingFolderMeta[folderName] = { totalSize: 0, fileIds: [] };
|
||||||
|
}
|
||||||
|
|
||||||
|
const folderMeta = this.pendingFolderMeta[folderName];
|
||||||
|
if (!folderMeta.fileIds.includes(fileId)) {
|
||||||
|
folderMeta.fileIds.push(fileId);
|
||||||
|
folderMeta.totalSize += fileSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取文件夹元数据
|
||||||
|
*/
|
||||||
|
public getFolderMeta(folderName: string): FolderMeta | undefined {
|
||||||
|
return this.pendingFolderMeta[folderName];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取所有文件夹元数据
|
||||||
|
*/
|
||||||
|
public getAllFolderMeta(): Record<string, FolderMeta> {
|
||||||
|
return { ...this.pendingFolderMeta };
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 网络性能监控管理 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 初始化网络性能监控
|
||||||
|
*/
|
||||||
|
public initializeNetworkPerformance(peerId: string): void {
|
||||||
|
if (!this.networkPerformance.has(peerId)) {
|
||||||
|
this.networkPerformance.set(peerId, {
|
||||||
|
avgClearingRate: TransferConfig.PERFORMANCE_CONFIG.INITIAL_CLEARING_RATE,
|
||||||
|
optimalThreshold: TransferConfig.NETWORK_CONFIG.BUFFER_THRESHOLD,
|
||||||
|
avgWaitTime: TransferConfig.PERFORMANCE_CONFIG.INITIAL_WAIT_TIME,
|
||||||
|
sampleCount: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新网络性能指标
|
||||||
|
*/
|
||||||
|
public updateNetworkPerformance(
|
||||||
|
peerId: string,
|
||||||
|
clearingRate: number,
|
||||||
|
waitTime: number
|
||||||
|
): void {
|
||||||
|
const perf = this.getNetworkPerformance(peerId);
|
||||||
|
if (!perf) return;
|
||||||
|
|
||||||
|
perf.sampleCount++;
|
||||||
|
// 指数移动平均,对新数据给予更高权重
|
||||||
|
const alpha = 0.3;
|
||||||
|
perf.avgClearingRate = perf.avgClearingRate * (1 - alpha) + clearingRate * alpha;
|
||||||
|
perf.avgWaitTime = perf.avgWaitTime * (1 - alpha) + waitTime * alpha;
|
||||||
|
|
||||||
|
// 调整最优阈值
|
||||||
|
this.adjustOptimalThreshold(perf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从传输速度更新网络性能
|
||||||
|
*/
|
||||||
|
public updateNetworkFromSpeed(peerId: string, currentSpeed: number): void {
|
||||||
|
if (currentSpeed <= 0) return;
|
||||||
|
|
||||||
|
const perf = this.getNetworkPerformance(peerId);
|
||||||
|
if (!perf) return;
|
||||||
|
|
||||||
|
perf.avgClearingRate = currentSpeed;
|
||||||
|
perf.sampleCount++;
|
||||||
|
|
||||||
|
// 每10次速度更新调整一次阈值
|
||||||
|
if (perf.sampleCount % 10 === 0) {
|
||||||
|
this.adjustOptimalThreshold(perf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取网络性能指标
|
||||||
|
*/
|
||||||
|
public getNetworkPerformance(peerId: string): NetworkPerformanceMetrics | undefined {
|
||||||
|
return this.networkPerformance.get(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取自适应阈值
|
||||||
|
*/
|
||||||
|
public getAdaptiveThreshold(peerId: string): number {
|
||||||
|
const perf = this.networkPerformance.get(peerId);
|
||||||
|
return perf ? perf.optimalThreshold : TransferConfig.NETWORK_CONFIG.BUFFER_THRESHOLD;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 调整最优阈值(私有方法)
|
||||||
|
*/
|
||||||
|
private adjustOptimalThreshold(perf: NetworkPerformanceMetrics): void {
|
||||||
|
const config = TransferConfig.QUALITY_CONFIG;
|
||||||
|
const bufferThreshold = TransferConfig.NETWORK_CONFIG.BUFFER_THRESHOLD;
|
||||||
|
|
||||||
|
if (perf.avgClearingRate > config.GOOD_NETWORK_SPEED) {
|
||||||
|
// >8MB/s 好网络
|
||||||
|
perf.optimalThreshold = Math.max(bufferThreshold, 6291456); // 6MB
|
||||||
|
} else if (perf.avgClearingRate > config.AVERAGE_NETWORK_SPEED) {
|
||||||
|
// >4MB/s 平均网络
|
||||||
|
perf.optimalThreshold = bufferThreshold; // 3MB
|
||||||
|
} else {
|
||||||
|
// 差网络
|
||||||
|
perf.optimalThreshold = Math.min(bufferThreshold, 1572864); // 1.5MB
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 进度跟踪相关状态 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新文件发送字节数
|
||||||
|
*/
|
||||||
|
public updateFileBytesSent(peerId: string, fileId: string, bytes: number): void {
|
||||||
|
const peerState = this.getPeerState(peerId);
|
||||||
|
if (!peerState.totalBytesSent[fileId]) {
|
||||||
|
peerState.totalBytesSent[fileId] = 0;
|
||||||
|
}
|
||||||
|
peerState.totalBytesSent[fileId] += bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取文件已发送字节数
|
||||||
|
*/
|
||||||
|
public getFileBytesSent(peerId: string, fileId: string): number {
|
||||||
|
const peerState = this.peerStates.get(peerId);
|
||||||
|
return peerState?.totalBytesSent[fileId] || 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 计算文件夹总发送字节数
|
||||||
|
*/
|
||||||
|
public getFolderBytesSent(peerId: string, folderName: string): number {
|
||||||
|
const folderMeta = this.getFolderMeta(folderName);
|
||||||
|
const peerState = this.peerStates.get(peerId);
|
||||||
|
|
||||||
|
if (!folderMeta || !peerState) return 0;
|
||||||
|
|
||||||
|
let totalSent = 0;
|
||||||
|
folderMeta.fileIds.forEach((fileId) => {
|
||||||
|
totalSent += peerState.totalBytesSent[fileId] || 0;
|
||||||
|
});
|
||||||
|
|
||||||
|
return totalSent;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== 清理和重置 =====
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清理所有状态(系统重置时)
|
||||||
|
*/
|
||||||
|
public cleanup(): void {
|
||||||
|
this.peerStates.clear();
|
||||||
|
this.pendingFiles.clear();
|
||||||
|
this.pendingFolderMeta = {};
|
||||||
|
this.networkPerformance.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取状态统计信息(调试用)
|
||||||
|
*/
|
||||||
|
public getStateStats() {
|
||||||
|
return {
|
||||||
|
peerCount: this.peerStates.size,
|
||||||
|
pendingFileCount: this.pendingFiles.size,
|
||||||
|
folderCount: Object.keys(this.pendingFolderMeta).length,
|
||||||
|
networkPerfCount: this.networkPerformance.size,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,345 @@
|
|||||||
|
import { CustomFile } from "@/types/webrtc";
|
||||||
|
import { TransferConfig } from "./TransferConfig";
|
||||||
|
import { postLogToBackend } from "@/app/config/api";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 网络块信息接口
|
||||||
|
*/
|
||||||
|
export interface NetworkChunk {
|
||||||
|
chunk: ArrayBuffer | null;
|
||||||
|
chunkIndex: number;
|
||||||
|
totalChunks: number;
|
||||||
|
fileOffset: number;
|
||||||
|
isLastChunk: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🚀 高性能流式文件读取器
|
||||||
|
* 使用双层缓冲架构:大块批量读取 + 小块网络发送
|
||||||
|
* 解决文件读取性能瓶颈问题
|
||||||
|
*/
|
||||||
|
export class StreamingFileReader {
|
||||||
|
// 配置参数
|
||||||
|
private readonly BATCH_SIZE = TransferConfig.FILE_CONFIG.CHUNK_SIZE * TransferConfig.FILE_CONFIG.BATCH_SIZE; // 32MB批次
|
||||||
|
private readonly NETWORK_CHUNK_SIZE = TransferConfig.FILE_CONFIG.NETWORK_CHUNK_SIZE; // 64KB网络块
|
||||||
|
private readonly CHUNKS_PER_BATCH = this.BATCH_SIZE / this.NETWORK_CHUNK_SIZE; // 512块
|
||||||
|
|
||||||
|
// 文件状态
|
||||||
|
private file: File;
|
||||||
|
private fileReader: FileReader;
|
||||||
|
private totalFileSize: number;
|
||||||
|
|
||||||
|
// 批次缓冲状态
|
||||||
|
private currentBatch: ArrayBuffer | null = null; // 当前32MB批次数据
|
||||||
|
private currentBatchStartOffset = 0; // 当前批次在文件中的起始位置
|
||||||
|
private currentChunkIndexInBatch = 0; // 当前网络块在批次中的索引
|
||||||
|
|
||||||
|
// 全局状态
|
||||||
|
private totalFileOffset = 0; // 当前在整个文件中的位置
|
||||||
|
private isFinished = false;
|
||||||
|
private isReading = false; // 防止并发读取
|
||||||
|
|
||||||
|
constructor(file: CustomFile, startOffset: number = 0) {
|
||||||
|
this.file = file;
|
||||||
|
this.totalFileSize = file.size;
|
||||||
|
this.totalFileOffset = startOffset;
|
||||||
|
this.fileReader = new FileReader();
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📖 StreamingFileReader created - file: ${file.name}, size: ${this.totalFileSize}, startOffset: ${startOffset}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 核心方法:获取下一个64KB网络块
|
||||||
|
*/
|
||||||
|
async getNextNetworkChunk(): Promise<NetworkChunk> {
|
||||||
|
// 1. 检查是否需要加载新批次
|
||||||
|
if (this.needsNewBatch()) {
|
||||||
|
await this.loadNextBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 检查是否已到文件末尾
|
||||||
|
if (this.isFinished || !this.currentBatch) {
|
||||||
|
return {
|
||||||
|
chunk: null,
|
||||||
|
chunkIndex: this.calculateGlobalChunkIndex(),
|
||||||
|
totalChunks: this.calculateTotalNetworkChunks(),
|
||||||
|
fileOffset: this.totalFileOffset,
|
||||||
|
isLastChunk: true
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 从当前批次中切片出64KB网络块
|
||||||
|
const networkChunk = this.sliceNetworkChunkFromBatch();
|
||||||
|
const globalChunkIndex = this.calculateGlobalChunkIndex();
|
||||||
|
const isLast = this.isLastNetworkChunk(networkChunk);
|
||||||
|
|
||||||
|
// 4. 更新状态
|
||||||
|
this.updateChunkState(networkChunk);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ✂️ NETWORK_CHUNK extracted #${globalChunkIndex}/${this.calculateTotalNetworkChunks()} - size: ${networkChunk.byteLength}, isLast: ${isLast}`
|
||||||
|
);
|
||||||
|
|
||||||
|
return {
|
||||||
|
chunk: networkChunk,
|
||||||
|
chunkIndex: globalChunkIndex,
|
||||||
|
totalChunks: this.calculateTotalNetworkChunks(),
|
||||||
|
fileOffset: this.totalFileOffset - networkChunk.byteLength,
|
||||||
|
isLastChunk: isLast
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔍 判断是否需要加载新批次
|
||||||
|
*/
|
||||||
|
private needsNewBatch(): boolean {
|
||||||
|
return (
|
||||||
|
this.currentBatch === null || // 还未加载任何批次
|
||||||
|
this.currentChunkIndexInBatch >= this.CHUNKS_PER_BATCH || // 当前批次用完
|
||||||
|
this.isCurrentBatchEmpty() // 当前批次已无数据
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔍 判断当前批次是否为空
|
||||||
|
*/
|
||||||
|
private isCurrentBatchEmpty(): boolean {
|
||||||
|
if (!this.currentBatch) return true;
|
||||||
|
|
||||||
|
const usedBytes = this.currentChunkIndexInBatch * this.NETWORK_CHUNK_SIZE;
|
||||||
|
return usedBytes >= this.currentBatch.byteLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📥 加载下一个32MB批次到内存
|
||||||
|
*/
|
||||||
|
private async loadNextBatch(): Promise<void> {
|
||||||
|
if (this.isReading) {
|
||||||
|
// 防止并发读取
|
||||||
|
while (this.isReading) {
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 10));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.isReading = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 1. 清理旧批次内存
|
||||||
|
this.currentBatch = null;
|
||||||
|
|
||||||
|
// 2. 计算本次要读取的大小
|
||||||
|
const remainingFileSize = this.totalFileSize - this.totalFileOffset;
|
||||||
|
const batchSize = Math.min(this.BATCH_SIZE, remainingFileSize);
|
||||||
|
|
||||||
|
if (batchSize <= 0) {
|
||||||
|
this.isFinished = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 执行大块文件读取
|
||||||
|
const fileSlice = this.file.slice(
|
||||||
|
this.totalFileOffset,
|
||||||
|
this.totalFileOffset + batchSize
|
||||||
|
);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 📖 BATCH_READ start - offset: ${this.totalFileOffset}, size: ${batchSize}, remaining: ${remainingFileSize}`
|
||||||
|
);
|
||||||
|
|
||||||
|
// 4. 异步读取文件数据
|
||||||
|
this.currentBatch = await this.readFileSlice(fileSlice);
|
||||||
|
this.currentBatchStartOffset = this.totalFileOffset;
|
||||||
|
this.currentChunkIndexInBatch = 0;
|
||||||
|
|
||||||
|
const expectedNetworkChunks = Math.ceil(this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE);
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ✅ BATCH_LOADED - ${this.currentBatch.byteLength} bytes, networkChunks: ${expectedNetworkChunks}`
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
postLogToBackend(`[DEBUG] ❌ BATCH_READ failed: ${error}`);
|
||||||
|
throw new Error(`Failed to load file batch: ${error}`);
|
||||||
|
} finally {
|
||||||
|
this.isReading = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📄 执行文件读取操作
|
||||||
|
*/
|
||||||
|
private async readFileSlice(fileSlice: Blob): Promise<ArrayBuffer> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.fileReader.onload = () => {
|
||||||
|
const result = this.fileReader.result as ArrayBuffer;
|
||||||
|
if (result) {
|
||||||
|
resolve(result);
|
||||||
|
} else {
|
||||||
|
reject(new Error("FileReader result is null"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
this.fileReader.onerror = () => {
|
||||||
|
reject(new Error(`File reading failed: ${this.fileReader.error?.message || 'Unknown error'}`));
|
||||||
|
};
|
||||||
|
|
||||||
|
this.fileReader.readAsArrayBuffer(fileSlice);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ✂️ 从32MB批次中切片出64KB网络块
|
||||||
|
*/
|
||||||
|
private sliceNetworkChunkFromBatch(): ArrayBuffer {
|
||||||
|
if (!this.currentBatch) {
|
||||||
|
throw new Error("No current batch available for slicing");
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunkStartInBatch = this.currentChunkIndexInBatch * this.NETWORK_CHUNK_SIZE;
|
||||||
|
const remainingInBatch = this.currentBatch.byteLength - chunkStartInBatch;
|
||||||
|
const chunkSize = Math.min(this.NETWORK_CHUNK_SIZE, remainingInBatch);
|
||||||
|
|
||||||
|
if (chunkSize <= 0) {
|
||||||
|
throw new Error("Invalid chunk size calculated");
|
||||||
|
}
|
||||||
|
|
||||||
|
const networkChunk = this.currentBatch.slice(
|
||||||
|
chunkStartInBatch,
|
||||||
|
chunkStartInBatch + chunkSize
|
||||||
|
);
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] ✂️ SLICE_CHUNK batch[${this.currentChunkIndexInBatch}/${Math.ceil(this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE)}] - size: ${chunkSize}, remaining: ${remainingInBatch - chunkSize}`
|
||||||
|
);
|
||||||
|
|
||||||
|
return networkChunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 计算全局网络块索引
|
||||||
|
*/
|
||||||
|
private calculateGlobalChunkIndex(): number {
|
||||||
|
const batchesBefore = Math.floor(this.currentBatchStartOffset / this.BATCH_SIZE);
|
||||||
|
const chunksInPreviousBatches = batchesBefore * this.CHUNKS_PER_BATCH;
|
||||||
|
return chunksInPreviousBatches + this.currentChunkIndexInBatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📈 计算总网络块数量
|
||||||
|
*/
|
||||||
|
private calculateTotalNetworkChunks(): number {
|
||||||
|
return Math.ceil(this.totalFileSize / this.NETWORK_CHUNK_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ⏭️ 更新当前处理状态
|
||||||
|
*/
|
||||||
|
private updateChunkState(chunk: ArrayBuffer): void {
|
||||||
|
this.currentChunkIndexInBatch++;
|
||||||
|
this.totalFileOffset += chunk.byteLength;
|
||||||
|
|
||||||
|
// 检查是否到达文件末尾
|
||||||
|
if (this.totalFileOffset >= this.totalFileSize) {
|
||||||
|
this.isFinished = true;
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🏁 File reading completed - totalOffset: ${this.totalFileOffset}, fileSize: ${this.totalFileSize}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🏁 判断是否为最后一个网络块
|
||||||
|
*/
|
||||||
|
private isLastNetworkChunk(chunk: ArrayBuffer): boolean {
|
||||||
|
return this.totalFileOffset + chunk.byteLength >= this.totalFileSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📊 获取读取进度信息
|
||||||
|
*/
|
||||||
|
public getProgress(): {
|
||||||
|
readBytes: number;
|
||||||
|
totalBytes: number;
|
||||||
|
progressPercent: number;
|
||||||
|
currentBatchInfo?: {
|
||||||
|
batchStartOffset: number;
|
||||||
|
batchSize: number;
|
||||||
|
chunkIndex: number;
|
||||||
|
totalChunks: number;
|
||||||
|
};
|
||||||
|
} {
|
||||||
|
const progressPercent = this.totalFileSize > 0 ? (this.totalFileOffset / this.totalFileSize) * 100 : 0;
|
||||||
|
|
||||||
|
const result = {
|
||||||
|
readBytes: this.totalFileOffset,
|
||||||
|
totalBytes: this.totalFileSize,
|
||||||
|
progressPercent,
|
||||||
|
} as any;
|
||||||
|
|
||||||
|
if (this.currentBatch) {
|
||||||
|
result.currentBatchInfo = {
|
||||||
|
batchStartOffset: this.currentBatchStartOffset,
|
||||||
|
batchSize: this.currentBatch.byteLength,
|
||||||
|
chunkIndex: this.currentChunkIndexInBatch,
|
||||||
|
totalChunks: Math.ceil(this.currentBatch.byteLength / this.NETWORK_CHUNK_SIZE),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔄 重置读取器状态(用于重新开始读取)
|
||||||
|
*/
|
||||||
|
public reset(startOffset: number = 0): void {
|
||||||
|
this.totalFileOffset = startOffset;
|
||||||
|
this.isFinished = false;
|
||||||
|
this.isReading = false;
|
||||||
|
this.currentBatch = null;
|
||||||
|
this.currentBatchStartOffset = 0;
|
||||||
|
this.currentChunkIndexInBatch = 0;
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🔄 StreamingFileReader reset - startOffset: ${startOffset}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🧹 清理和释放资源
|
||||||
|
*/
|
||||||
|
public cleanup(): void {
|
||||||
|
// 中断正在进行的文件读取
|
||||||
|
if (this.isReading) {
|
||||||
|
this.fileReader.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 清理内存
|
||||||
|
this.currentBatch = null;
|
||||||
|
this.isFinished = true;
|
||||||
|
this.isReading = false;
|
||||||
|
|
||||||
|
postLogToBackend(
|
||||||
|
`[DEBUG] 🧹 StreamingFileReader cleaned up - file: ${this.file.name}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔍 获取调试信息
|
||||||
|
*/
|
||||||
|
public getDebugInfo() {
|
||||||
|
return {
|
||||||
|
fileName: this.file.name,
|
||||||
|
fileSize: this.totalFileSize,
|
||||||
|
currentOffset: this.totalFileOffset,
|
||||||
|
isFinished: this.isFinished,
|
||||||
|
isReading: this.isReading,
|
||||||
|
hasBatch: !!this.currentBatch,
|
||||||
|
batchOffset: this.currentBatchStartOffset,
|
||||||
|
chunkInBatch: this.currentChunkIndexInBatch,
|
||||||
|
globalChunkIndex: this.calculateGlobalChunkIndex(),
|
||||||
|
totalChunks: this.calculateTotalNetworkChunks(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,93 @@
|
|||||||
|
/**
|
||||||
|
* 🚀 传输配置管理类
|
||||||
|
* 集中管理所有文件传输相关的配置参数
|
||||||
|
*/
|
||||||
|
export class TransferConfig {
|
||||||
|
// 文件I/O相关配置
|
||||||
|
static readonly FILE_CONFIG = {
|
||||||
|
CHUNK_SIZE: 4194304, // 4MB - 文件读取块大小,减少FileReader调用次数
|
||||||
|
BATCH_SIZE: 8, // 8个chunk批处理 - 32MB批处理提升性能
|
||||||
|
NETWORK_CHUNK_SIZE: 65536, // 64KB - WebRTC安全发送大小,修复sendData failed
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
// 网络传输相关配置
|
||||||
|
static readonly NETWORK_CONFIG = {
|
||||||
|
BUFFER_THRESHOLD: 3145728, // 3MB - 背压阈值
|
||||||
|
BACKPRESSURE_TIMEOUT: 2000, // 2秒超时 - 为大chunk处理预留更多时间
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
// 性能调优相关配置
|
||||||
|
static readonly PERFORMANCE_CONFIG = {
|
||||||
|
MIN_THRESHOLD: 262144, // 256KB - 最小阈值
|
||||||
|
MAX_THRESHOLD: 16777216, // 16MB - 最大阈值
|
||||||
|
ADJUSTMENT_FACTOR: 0.1, // 调整系数
|
||||||
|
ADAPTIVE_SAMPLES: 5, // 自适应采样数
|
||||||
|
INITIAL_CLEARING_RATE: 5000, // 5MB/s 初始预估
|
||||||
|
INITIAL_WAIT_TIME: 50, // 50ms 初始预估
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
// 智能发送控制策略配置
|
||||||
|
static readonly SEND_STRATEGY_CONFIG = {
|
||||||
|
AGGRESSIVE_THRESHOLD: 0.3, // 积极模式:30%以下
|
||||||
|
NORMAL_THRESHOLD: 0.6, // 正常模式:60%以下
|
||||||
|
CAUTIOUS_THRESHOLD: 0.9, // 谨慎模式:90%以下
|
||||||
|
POLLING_INTERVAL: 5, // 轮询间隔(ms)
|
||||||
|
MAX_WAIT_TIME: 3000, // 最大等待时间(ms)
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
// 网络质量评估配置
|
||||||
|
static readonly QUALITY_CONFIG = {
|
||||||
|
GOOD_NETWORK_SPEED: 8000, // 8MB/s 以上为好网络
|
||||||
|
AVERAGE_NETWORK_SPEED: 4000, // 4MB/s 以上为平均网络
|
||||||
|
GOOD_NETWORK_THRESHOLDS: {
|
||||||
|
aggressive: 0.4, // 好网络:40%以下积极发送
|
||||||
|
normal: 0.7, // 好网络:70%以下正常发送
|
||||||
|
cautious: 0.9, // 好网络:90%以下谨慎发送
|
||||||
|
},
|
||||||
|
POOR_NETWORK_THRESHOLDS: {
|
||||||
|
aggressive: 0.2, // 差网络:20%以下积极发送
|
||||||
|
normal: 0.5, // 差网络:50%以下正常发送
|
||||||
|
cautious: 0.8, // 差网络:80%以上等待
|
||||||
|
},
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取适应性阈值计算参数
|
||||||
|
*/
|
||||||
|
static getAdaptiveThresholds(networkSpeed: number) {
|
||||||
|
if (networkSpeed > this.QUALITY_CONFIG.GOOD_NETWORK_SPEED) {
|
||||||
|
// 好网络:使用较高阈值
|
||||||
|
return {
|
||||||
|
threshold: Math.max(this.NETWORK_CONFIG.BUFFER_THRESHOLD, 6291456), // 6MB
|
||||||
|
strategy: this.QUALITY_CONFIG.GOOD_NETWORK_THRESHOLDS,
|
||||||
|
};
|
||||||
|
} else if (networkSpeed > this.QUALITY_CONFIG.AVERAGE_NETWORK_SPEED) {
|
||||||
|
// 平均网络:使用默认阈值
|
||||||
|
return {
|
||||||
|
threshold: this.NETWORK_CONFIG.BUFFER_THRESHOLD, // 3MB
|
||||||
|
strategy: {
|
||||||
|
aggressive: this.SEND_STRATEGY_CONFIG.AGGRESSIVE_THRESHOLD,
|
||||||
|
normal: this.SEND_STRATEGY_CONFIG.NORMAL_THRESHOLD,
|
||||||
|
cautious: this.SEND_STRATEGY_CONFIG.CAUTIOUS_THRESHOLD,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
// 差网络:使用较低阈值
|
||||||
|
return {
|
||||||
|
threshold: Math.min(this.NETWORK_CONFIG.BUFFER_THRESHOLD, 1572864), // 1.5MB
|
||||||
|
strategy: this.QUALITY_CONFIG.POOR_NETWORK_THRESHOLDS,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 验证配置的合理性
|
||||||
|
*/
|
||||||
|
static validateConfig(): boolean {
|
||||||
|
return (
|
||||||
|
this.FILE_CONFIG.NETWORK_CHUNK_SIZE < this.FILE_CONFIG.CHUNK_SIZE &&
|
||||||
|
this.NETWORK_CONFIG.BUFFER_THRESHOLD > this.FILE_CONFIG.NETWORK_CHUNK_SIZE &&
|
||||||
|
this.PERFORMANCE_CONFIG.MIN_THRESHOLD < this.PERFORMANCE_CONFIG.MAX_THRESHOLD
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,52 @@
|
|||||||
|
/**
|
||||||
|
* 🚀 文件传输模块统一导出
|
||||||
|
* 提供模块化的文件传输服务
|
||||||
|
*/
|
||||||
|
|
||||||
|
// 配置管理
|
||||||
|
export { TransferConfig } from "./TransferConfig";
|
||||||
|
|
||||||
|
// 状态管理
|
||||||
|
export { StateManager } from "./StateManager";
|
||||||
|
export type { NetworkPerformanceMetrics } from "./StateManager";
|
||||||
|
|
||||||
|
// 高性能文件读取
|
||||||
|
export { StreamingFileReader } from "./StreamingFileReader";
|
||||||
|
export type { NetworkChunk } from "./StreamingFileReader";
|
||||||
|
|
||||||
|
// 网络传输
|
||||||
|
export { NetworkTransmitter } from "./NetworkTransmitter";
|
||||||
|
|
||||||
|
// 消息处理
|
||||||
|
export { MessageHandler } from "./MessageHandler";
|
||||||
|
export type { MessageHandlerDelegate } from "./MessageHandler";
|
||||||
|
|
||||||
|
// 进度跟踪
|
||||||
|
export { ProgressTracker } from "./ProgressTracker";
|
||||||
|
export type { ProgressCallback } from "./ProgressTracker";
|
||||||
|
|
||||||
|
// 主编排器
|
||||||
|
export { FileTransferOrchestrator } from "./FileTransferOrchestrator";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🎯 便捷创建函数 - 快速初始化文件传输服务
|
||||||
|
*/
|
||||||
|
import WebRTC_Initiator from "../webrtc_Initiator";
|
||||||
|
import { FileTransferOrchestrator } from "./FileTransferOrchestrator";
|
||||||
|
import { TransferConfig } from "./TransferConfig";
|
||||||
|
|
||||||
|
export function createFileTransferService(webrtcConnection: WebRTC_Initiator): FileTransferOrchestrator {
|
||||||
|
return new FileTransferOrchestrator(webrtcConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 📋 版本信息
|
||||||
|
*/
|
||||||
|
export const TRANSFER_MODULE_VERSION = "1.0.0";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 🔍 模块验证 - 确保所有配置都是有效的
|
||||||
|
*/
|
||||||
|
export function validateTransferModule(): boolean {
|
||||||
|
return TransferConfig.validateConfig();
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user