记一次大文件上传
背景介绍
为了实现大模型分析视频的功能,需要将视频文件上传到服务器,然后调用大模型的API进行分析。
功能点概要
支持文件在后台上传
支持文件断点续传
页面多 tab 情况下。手动关闭/刷新浏览器后上传任务会转交到其他tab页面
支持用户在我的上传中查看上传任务的状态。重试上传失败的任务。
具体实现
v1 不考虑断点续传,只实现简单的分片上传。
v2 考虑断点续传,实现分片上传。
v3 考虑用户在我的上传中查看上传任务的状态。重试上传失败的任务。
功能点
V1
V2
V3
计算文件 md5 (计算方式)
✖(无)
✅
✅
分片上传
✅
✅
✅
后台上传
✖
✅
✅
断点续传
✖
✅
✅
V1 实现简单的分片上传
在这里我们实现一个简单的分片上传,每个分片的大小为CHUNK_SIZE。uploadId作为文件的唯一标识,每个分片上传时需要携带该标识用于服务端识别文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 实现分片
function chunkFile ( uploadFile ) {
const chunks = Math . ceil ( uploadFile . size / CHUNK_SIZE ) // 分片数量
const chunkList = []
let currentChunk = 0
while ( currentChunk < chunks ) {
const start = currentChunk * CHUNK_SIZE
const end = Math . min ( start + CHUNK_SIZE , uploadFile . size )
const chunk = uploadFile . slice ( start , end )
chunkList . push ( chunk )
currentChunk ++
}
return chunkList
}
// 实现上传
function uploadChunks ( chunkList ) {
function uploadChunk ( chunk , index ) {
const uploadConfig = {
Headers : {
'Content-Type' : 'multipart/form-data' ,
},
onUploadProgress : ( progressEvent ) => {
const progress = Math . round (( progressEvent . loaded * 100 ) / progressEvent . total )
console . log ( `上传进度: ${ progress } %` )
},
}
const formData = new FormData ()
formData . append ( 'uploadId' , uploadId ) // 上传任务的唯一标识
formData . append ( 'file' , chunk )
formData . append ( 'chunkIndex' , index )
formData . append ( 'totalChunks' , chunkList . length )
return ApiAxios . post ( '/upload' , formData , uploadConfig )
}
chunkList . forEach (( chunk , index ) => {
uploadChunk ( chunk , index )
})
}
可以注意到,在上传时,是对每个分片直接调用了uploadChunk函数。这里会直接发起大量请求,对服务器压力较大。对此,需要再加上一个上传队列 的控制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 上传队列
class UploadQueue {
constructor ( maxConcurrent = 3 ) {
this . queue = []
this . uploading = 0
this . maxConcurrent = maxConcurrent
}
add ( task ) {
this . queue . push ( task )
this . next ()
}
next () {
if ( this . uploading >= this . maxConcurrent ) return
if ( this . queue . length === 0 ) return
const task = this . queue . shift ()
this . uploading ++
task (). finally (() => {
this . uploading --
this . next ()
})
}
}
上面的代码实现了一个简单的上传队列。在上传时,会先判断当前正在上传的任务数量是否超过了最大并发数。如果没有超过,就会从队列中取出一个任务并执行。任务执行完成后,会调用next方法继续执行下一个任务。
在实际使用时,可以结合需要增加错误重试 、进度管理 等功能。
V2 增加断点续传
在V1版本中,我们没有详细展开uploadId的来源,作为文件的唯一标识,在断点续传中,需要通过该ID来判断文件是否已经上传过以及上传的进度。
flowchart LR
A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
B -->|2. 校验文件是否存在| C[判断文件是否存在]
C -->|3. 文件不存在| D[生成uploadId]
C -->|4. 文件存在| E[获取文件上传进度]
E -->|5. 比较进度与分片数量| F[返回上传进度]
F -->| currentChunk| A
D -->| uploadId | A
flowchart LR
A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
B -->|2. 校验文件是否存在| C[判断文件是否存在]
C -->|3. 文件不存在| D[生成uploadId]
C -->|4. 文件存在| E[获取文件上传进度]
E -->|5. 比较进度与分片数量| F[返回上传进度]
F -->| currentChunk| A
D -->| uploadId | A
flowchart LR
A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
B -->|2. 校验文件是否存在| C[判断文件是否存在]
C -->|3. 文件不存在| D[生成uploadId]
C -->|4. 文件存在| E[获取文件上传进度]
E -->|5. 比较进度与分片数量| F[返回上传进度]
F -->| currentChunk| A
D -->| uploadId | A
flowchart LR
A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
B -->|2. 校验文件是否存在| C[判断文件是否存在]
C -->|3. 文件不存在| D[生成uploadId]
C -->|4. 文件存在| E[获取文件上传进度]
E -->|5. 比较进度与分片数量| F[返回上传进度]
F -->| currentChunk| A
D -->| uploadId | A
服务端至少需要提供三个接口
/init: 用于初始化上传任务,返回uploadId
/upload: 用于上传分片
/merge: 用于合并分片
在上传之前,客户端首先需要计算文件的md5值,对此可以借助SparkMD5库。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const chunkSize = 5 * 1024 * 1024 ; // 5MB
const spark = new SparkMD5 . ArrayBuffer ();
let currentChunk = 0 ;
const chunks = Math . ceil ( file . size / chunkSize );
function loadNext () {
const start = currentChunk * chunkSize ;
const end = Math . min ( file . size , start + chunkSize );
const reader = new FileReader ();
reader . readAsArrayBuffer ( file . slice ( start , end ));
reader . onload = ( e ) => {
spark . append ( e . target . result );
currentChunk ++ ;
if ( currentChunk < chunks ) {
loadNext ();
} else {
const md5 = spark . end ();
console . log ( md5 );
}
};
}
loadNext ();
借助web worker 计算md5值
上面的示例在实际生产环境中,很容易阻塞主线程,导致页面卡顿。
主线程需要避免阻塞,那么我们可以借助Web Worker,将沉重的计算丢给后台线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import SparkMD5 from 'spark-md5'
let calcFile = []
self . onmessage = ( e ) => {
const { uploadFile : file , uid , chunkSize : maxChunkSize , type } = e . data
if ( type === 'CALC' ) {
const chunks = Math . ceil ( file . size / maxChunkSize )
const meanChunks = Math . ceil ( file . size / chunks )
const chunkList = [] // 存放所有的文件切片
let currentChunk = 0
const spark = new SparkMD5 . ArrayBuffer ()
let end = 0
calcFile . push ( uid )
const loadNextChunk = async () => {
while ( end < file . size ) {
console . log ( end < file . size )
if ( ! calcFile . includes ( uid )) {
return { calc : 'error' }
}
const start = currentChunk * meanChunks
end = meanChunks * ( currentChunk + 1 )
const blob = file . slice ( start , end )
chunkList . push ( blob )
const buffer = await blob . arrayBuffer ()
spark . append ( buffer )
currentChunk ++
}
if ( ! calcFile . includes ( uid )) {
return { calc : 'error' }
}
return {
md5 : spark . end (), // 返回最终的 MD5 值
calc : 'success' ,
}
}
loadNextChunk ()
. then (({ md5 , calc }) => {
self . postMessage ({ md5 , chunkList , calc , fileId : uid }) // worker中计算md5成功的通知
})
. catch (( error ) => {
console . log ( 'md5_catch' , error )
})
}
}
完整的断点续传流程
结合上述,我们可以实现一个完整的断点续传流程。
以下是具体实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 核心流程
async function uploadFileWithResume ({
file ,
fileHash ,
chunkSize = 5 * 1024 * 1024 ,
concurrency = 3 ,
}) {
// 1. 初始化上传,获取断点信息
const { data : initData } = await ApiAxios . post ( '/init' , {
fileHash ,
fileName : file . name ,
fileSize : file . size ,
})
const { uploadId , uploadedChunkIndexes , shouldUpload } = initData
// 如果服务端返回 shouldUpload: false,说明文件秒传成功
if ( ! shouldUpload ) {
return { uploadId , status : 'fast_upload' }
}
// 2. 准备分片
const chunks = Math . ceil ( file . size / chunkSize )
const uploadedSet = new Set ( uploadedChunkIndexes || [])
const pendingTasks = []
for ( let i = 0 ; i < chunks ; i ++ ) {
// 过滤已上传的分片
if ( uploadedSet . has ( i )) continue
const start = i * chunkSize
const end = Math . min ( start + chunkSize , file . size )
const chunkBlob = file . slice ( start , end )
// 构建上传任务
const task = async () => {
const formData = new FormData ()
formData . append ( 'uploadId' , uploadId )
formData . append ( 'fileHash' , fileHash )
formData . append ( 'chunkIndex' , i )
formData . append ( 'file' , chunkBlob )
// 包装重试逻辑
await withRetry (() => ApiAxios . post ( '/upload' , formData ), {
retries : 3 ,
})
}
pendingTasks . push ( task )
}
// 3. 并发上传缺失的分片
await runWithConcurrency ( pendingTasks , concurrency )
// 4. 发送合并请求
await ApiAxios . post ( '/merge' , {
uploadId ,
fileHash ,
totalChunks : chunks ,
fileName : file . name ,
fileSize : file . size ,
chunkSize ,
})
return { uploadId , status : 'success' }
}
后台上传
当用户开始上传文件时,很难有耐心等待上传完成,可能会切换页面,关闭tab,关闭浏览器等。
需要明确的是,我们无法实现真正的完全的后台上传,但是可以在交互体验上做出类似的优化。
遇到关闭tab、刷新/关闭浏览器,可以提示用户文件上传可能会中断,是否确认继续上传?,或者将上传任务交由其他tab继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 监听任务的转交行为,A页刷新后 将A页的任务传输给其他 tab
onMessage ( 'transferTask' , function ( data , source ) {
if ( data . withTo === tabIds [ 0 ] && data . task ? . length ) {
// 将任务排队到自己这里
data . task . forEach ( task => {
addTask ({ ... task , uploadStatus : 'PAUSE' })
})
}
setTabTask ( tabTask => {
tabTask [ source . from ] = []
return { ... tabTask }
})
// 删除所有的行为事件
})
V3 优化
上述实现了一个简单的后台上传优化,但是还有一些问题需要解决。
md5虽然交由web worker计算,但是遇到20G的文件时,计算时间还是过久,用户需要等待很久。对此提取首片,或者按特定的规则提取分片来计算md5,减少计算量
网络离线时,仍然会持续上传,发出大量不必要的请求。对此,需要增加网络离线时的判断,中断并提示离线,网络恢复后继续上传。