记一次大文件上传

记一次大文件上传

背景介绍

为了实现大模型分析视频的功能,需要将视频文件上传到服务器,然后调用大模型的API进行分析。

功能点概要

  1. 支持文件在后台上传

  2. 支持文件断点续传

  3. 页面多 tab 情况下。手动关闭/刷新浏览器后上传任务会转交到其他tab页面

  4. 支持用户在我的上传中查看上传任务的状态。重试上传失败的任务。

具体实现

v1 不考虑断点续传,只实现简单的分片上传。

v2 考虑断点续传,实现分片上传。

v3 考虑用户在我的上传中查看上传任务的状态。重试上传失败的任务。

功能点 V1 V2 V3
计算文件 md5 (计算方式) ✖(无)
分片上传
后台上传
断点续传

V1 实现简单的分片上传

在这里我们实现一个简单的分片上传,每个分片的大小为CHUNK_SIZE。uploadId作为文件的唯一标识,每个分片上传时需要携带该标识用于服务端识别文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 实现分片
function chunkFile(uploadFile) {
  const chunks = Math.ceil(uploadFile.size / CHUNK_SIZE) // 分片数量
  const chunkList = []

  let currentChunk = 0

  while (currentChunk < chunks) {
    const start = currentChunk * CHUNK_SIZE
    const end = Math.min(start + CHUNK_SIZE, uploadFile.size)
    const chunk = uploadFile.slice(start, end)
    chunkList.push(chunk)
    currentChunk++
  }

  return chunkList
}

// 实现上传
function uploadChunks(chunkList) {
  function uploadChunk(chunk, index) {
    const uploadConfig = {
      Headers: {
        'Content-Type': 'multipart/form-data',
      },
      onUploadProgress: (progressEvent) => {
        const progress = Math.round((progressEvent.loaded * 100) / progressEvent.total)
        console.log(`上传进度: ${progress}%`)
      },
    }
    const formData = new FormData()
    formData.append('uploadId', uploadId) // 上传任务的唯一标识
    formData.append('file', chunk)
    formData.append('chunkIndex', index)
    formData.append('totalChunks', chunkList.length)

    return ApiAxios.post('/upload', formData, uploadConfig)
  }

  chunkList.forEach((chunk, index) => {
    uploadChunk(chunk, index)
  })
}

可以注意到,在上传时,是对每个分片直接调用了uploadChunk函数。这里会直接发起大量请求,对服务器压力较大。对此,需要再加上一个上传队列的控制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 上传队列
class UploadQueue {
  constructor(maxConcurrent = 3) {
    this.queue = []
    this.uploading = 0
    this.maxConcurrent = maxConcurrent
  }

  add(task) {
    this.queue.push(task)
    this.next()
  }

  next() {
    if (this.uploading >= this.maxConcurrent) return
    if (this.queue.length === 0) return
    const task = this.queue.shift()
    this.uploading++

    task().finally(() => {
      this.uploading--
      this.next()
    })
  }
}

上面的代码实现了一个简单的上传队列。在上传时,会先判断当前正在上传的任务数量是否超过了最大并发数。如果没有超过,就会从队列中取出一个任务并执行。任务执行完成后,会调用next方法继续执行下一个任务。
在实际使用时,可以结合需要增加错误重试进度管理等功能。

V2 增加断点续传

在V1版本中,我们没有详细展开uploadId的来源,作为文件的唯一标识,在断点续传中,需要通过该ID来判断文件是否已经上传过以及上传的进度。

   flowchart LR
    A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
    B -->|2. 校验文件是否存在| C[判断文件是否存在]
    C -->|3. 文件不存在| D[生成uploadId]
    C -->|4. 文件存在| E[获取文件上传进度]
    E -->|5. 比较进度与分片数量| F[返回上传进度]
    F -->| currentChunk| A
    D -->| uploadId | A 
   flowchart LR
    A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
    B -->|2. 校验文件是否存在| C[判断文件是否存在]
    C -->|3. 文件不存在| D[生成uploadId]
    C -->|4. 文件存在| E[获取文件上传进度]
    E -->|5. 比较进度与分片数量| F[返回上传进度]
    F -->| currentChunk| A
    D -->| uploadId | A 
   flowchart LR
    A[客户端] -->|1. 计算md5值作为hash值| B[服务端]
    B -->|2. 校验文件是否存在| C[判断文件是否存在]
    C -->|3. 文件不存在| D[生成uploadId]
    C -->|4. 文件存在| E[获取文件上传进度]
    E -->|5. 比较进度与分片数量| F[返回上传进度]
    F -->| currentChunk| A
    D -->| uploadId | A 

服务端至少需要提供三个接口
/init: 用于初始化上传任务,返回uploadId
/upload: 用于上传分片
/merge: 用于合并分片

在上传之前,客户端首先需要计算文件的md5值,对此可以借助SparkMD5库。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const chunkSize = 5 * 1024 * 1024; // 5MB
const spark = new SparkMD5.ArrayBuffer();

let currentChunk = 0;
const chunks = Math.ceil(file.size / chunkSize);

function loadNext() {
  const start = currentChunk * chunkSize;
  const end = Math.min(file.size, start + chunkSize);

  const reader = new FileReader();
  reader.readAsArrayBuffer(file.slice(start, end));

  reader.onload = (e) => {
    spark.append(e.target.result);
    currentChunk++;

    if (currentChunk < chunks) {
      loadNext();
    } else {
      const md5 = spark.end();
      console.log(md5);
    }
  };
}

loadNext();

借助web worker 计算md5值

上面的示例在实际生产环境中,很容易阻塞主线程,导致页面卡顿。
主线程需要避免阻塞,那么我们可以借助Web Worker,将沉重的计算丢给后台线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import SparkMD5 from 'spark-md5'

let calcFile = []

self.onmessage = (e) => {
  const { uploadFile: file, uid, chunkSize: maxChunkSize, type } = e.data
  if (type === 'CALC') {
    const chunks = Math.ceil(file.size / maxChunkSize)
    const meanChunks = Math.ceil(file.size / chunks)
    const chunkList = [] // 存放所有的文件切片
    let currentChunk = 0
    const spark = new SparkMD5.ArrayBuffer()
    let end = 0
    calcFile.push(uid)

    const loadNextChunk = async () => {
      while (end < file.size) {
        console.log(end < file.size)
        if (!calcFile.includes(uid)) {
          return { calc: 'error' }
        }
        const start = currentChunk * meanChunks
        end = meanChunks * (currentChunk + 1)
        const blob = file.slice(start, end)
        chunkList.push(blob)
        const buffer = await blob.arrayBuffer()
        spark.append(buffer)
        currentChunk++
      }

      if (!calcFile.includes(uid)) {
        return { calc: 'error' }
      }

      return {
        md5: spark.end(), // 返回最终的 MD5 值
        calc: 'success',
      }
    }

    loadNextChunk()
      .then(({ md5, calc }) => {
        self.postMessage({ md5, chunkList, calc, fileId: uid }) // worker中计算md5成功的通知
      })
      .catch((error) => {
        console.log('md5_catch', error)
      })
  }
}

完整的断点续传流程

结合上述,我们可以实现一个完整的断点续传流程。 以下是具体实现代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 核心流程
async function uploadFileWithResume({
  file,
  fileHash,
  chunkSize = 5 * 1024 * 1024,
  concurrency = 3,
}) {
  // 1. 初始化上传,获取断点信息
  const { data: initData } = await ApiAxios.post('/init', {
    fileHash,
    fileName: file.name,
    fileSize: file.size,
  })

  const { uploadId, uploadedChunkIndexes, shouldUpload } = initData

  // 如果服务端返回 shouldUpload: false,说明文件秒传成功
  if (!shouldUpload) {
    return { uploadId, status: 'fast_upload' }
  }

  // 2. 准备分片
  const chunks = Math.ceil(file.size / chunkSize)
  const uploadedSet = new Set(uploadedChunkIndexes || [])
  const pendingTasks = []

  for (let i = 0; i < chunks; i++) {
    // 过滤已上传的分片
    if (uploadedSet.has(i)) continue

    const start = i * chunkSize
    const end = Math.min(start + chunkSize, file.size)
    const chunkBlob = file.slice(start, end)

    // 构建上传任务
    const task = async () => {
      const formData = new FormData()
      formData.append('uploadId', uploadId)
      formData.append('fileHash', fileHash)
      formData.append('chunkIndex', i)
      formData.append('file', chunkBlob)

      // 包装重试逻辑
      await withRetry(() => ApiAxios.post('/upload', formData), {
        retries: 3,
      })
    }

    pendingTasks.push(task)
  }

  // 3. 并发上传缺失的分片
  await runWithConcurrency(pendingTasks, concurrency)

  // 4. 发送合并请求
  await ApiAxios.post('/merge', {
    uploadId,
    fileHash,
    totalChunks: chunks,
    fileName: file.name,
    fileSize: file.size,
    chunkSize,
  })

  return { uploadId, status: 'success' }
}

后台上传

当用户开始上传文件时,很难有耐心等待上传完成,可能会切换页面,关闭tab,关闭浏览器等。
需要明确的是,我们无法实现真正的完全的后台上传,但是可以在交互体验上做出类似的优化。 遇到关闭tab、刷新/关闭浏览器,可以提示用户文件上传可能会中断,是否确认继续上传?,或者将上传任务交由其他tab继续执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 监听任务的转交行为,A页刷新后 将A页的任务传输给其他 tab
onMessage('transferTask', function (data, source) {
  if (data.withTo === tabIds[0] && data.task?.length) {
    // 将任务排队到自己这里
    data.task.forEach(task => {
      addTask({ ...task, uploadStatus: 'PAUSE' })
    })
  }
  setTabTask(tabTask => {
    tabTask[source.from] = []
    return { ...tabTask }
  })
  // 删除所有的行为事件
})

V3 优化

上述实现了一个简单的后台上传优化,但是还有一些问题需要解决。

  1. md5虽然交由web worker计算,但是遇到20G的文件时,计算时间还是过久,用户需要等待很久。对此提取首片,或者按特定的规则提取分片来计算md5,减少计算量
  2. 网络离线时,仍然会持续上传,发出大量不必要的请求。对此,需要增加网络离线时的判断,中断并提示离线,网络恢复后继续上传。