Skip to content

大文件上传

前言

  • 商户在录入信息时如果上传视频过大,在弱网或者网络波动比较大的时候,会导致上传中断,又需要从头开始上传用户体验非常糟糕
  • 为此需要设计一套能够分片上传机制,确保系统健壮性。

实现思路

  • 1、首先拿到用户上传的文件后,通过FileReader将文件转换成ArrayBuffer,在通过spark-md5.js计算出文件hash值,这个计算比较耗时,我们放在worker线程中去做,完成后通过postMessage消息告诉主线程
  • 2、拿到文件的hash值之后,发送文件hash到服务器,如果文件存在直接跳过后续步骤,实现秒传
  • 2、如果服务器上文件不存在,服务端通过文件hash去临时目录中查找分片的信息,并且返回当前分片已经上传的字节和分片的名称
  • 3、前端此时会把一个大的文件通过slice进行分片,通过文件hash+切片的位置生成一个文件名称(存储到队列对象中)
  • 4、将服务器返回的分片信息,和本地的分片信息进行合并,计算出分片已经上传的字节
  • 5、将每一个分片构建成一个Request对象,并且记录请求取消的cancelToken,通过Promise.all 并发上传
  • 6、当所有的分片上传完成后,调用服务器,把临时目录的分片进行合并成一个完成的文件,并且删除临时目录的分片
js
import { Toast, Button, Space, ProgressBar, Card } from 'antd-mobile'
import axios, { CancelTokenSource } from 'axios'
import { ChangeEvent, MutableRefObject, useRef, useState } from 'react'
// 上传状态
enum UploadStatus {
  INIT, // 默认
  UPLOADING, // 正在上传
  PAUSE, // 暂停
  DONE // 完成
}

// 切片信息
type Chunk = {
  chunk: Blob // 切片后的Blob
  size: number, // 切片的大小
  filename?: string, // 切片分组 1427e69f138d5fac5bdaed5769c5a858.mp4
  chunkName?: string, // 切片的名称 1427e69f138d5fac5bdaed5769c5a858.mp4-0 
  loaded?: number, // 已经上传的字节
  percent?: number // 上传进度
}

// 已上传切片
interface Uploaded {
  chunkName: string;
  size: number
}

// 每块切片到大小10M
const DEFAULT_SIZE = 1024 * 1024 * 10;

function App () {
  const inputEl: MutableRefObject<HTMLInputElement | null> = useRef(null)
  const [uploadState, setUploadState] = useState(UploadStatus.INIT) // 上传状态
  const [currentFile, setCurrentFile] = useState<File>(); // 原始文件
  const [filename, setFilename] = useState<string>(''); // 预览
  const [hashPercent, setHashPercent] = useState<number>(0); // hash计算进度
  const [chunkList, setChunkList] = useState<Chunk[]>([]); // 切片
  const [cancelToken, setCancelToken] = useState<CancelTokenSource[]>([]) // 取消请求
  const totalPercent = chunkList.length > 0 ? chunkList.reduce(
    (l: number, r: Chunk) => l + r.percent!, 0) / (chunkList.length * 100) * 100
    : 0
  // 选择文件
  const handleChange = (event: ChangeEvent<HTMLInputElement>) => {
    if (event.target.files) {
      const check = checkSize(event.target.files[0])
      if (check) {
        setCurrentFile(event.target.files[0])
      } else if (inputEl.current) {
        inputEl.current.value = ''
      }
    }
  }

  // 校验文件小大
  const checkSize = (file: File) => {
    const isLessThan3G = file.size < 1024 * 1024 * 3000;
    if (!isLessThan3G) {
      Toast.show('上传的文件不能大于3G')
      return false
    }
    return true
  }

  // 上传
  const upload = async () => {
    if (uploadState !== UploadStatus.INIT) {
      return Toast.show('正在上传请耐心等待')
    }
    if (!currentFile) {
      return Toast.show('请选择文件')
    }
    setUploadState(UploadStatus.UPLOADING)
    // 切片
    console.time('切片耗时');
    const { chunks, chunksHash } = createChunks(currentFile);
    console.timeEnd('切片耗时');
    // 计算文件hash值
    console.time('hash耗时');
    const fileHash = await calculateHash(chunksHash);
    console.timeEnd('hash耗时');
    const lastIndex = currentFile.name.lastIndexOf('.')
    // 获取文件后缀
    const extName = currentFile.name.slice(lastIndex) // .mp4
    const filename = `${fileHash}${extName}`;// hash.mp4
    setFilename(filename)
    chunks.forEach((item: Chunk, index) => {
      item.filename = filename;
      item.chunkName = `${filename}-${index}`;
      item.loaded = 0;
      item.percent = 0;
    });
    setChunkList(chunks);
    await uploadParts(chunks, filename);
  }

  // 暂停
  const pause = () => {
    setUploadState(UploadStatus.PAUSE);
    cancelToken.forEach(_ => _.cancel())
  }
  // 继续
  async function handleResume () {
    setUploadState(UploadStatus.UPLOADING);
    await uploadParts(chunkList, filename);
  }

  // 切片批量上传
  const uploadParts = async (chunks: Chunk[], filename: string) => {
    // 检测是否已经上传
    const { data: { uploadDone, uploadList } } = await axios.get<any, { data: { uploadDone: boolean, uploadList: Uploaded[] } }>(`http://localhost:3000/verify/${filename}`)
    if (uploadDone) {
      setUploadState(UploadStatus.DONE)
      setChunkList([])
      return Toast.show('秒传成功')
    } else {
      // 筛选出没有上传完成的切片
      chunks = chunks.filter((chunk) => {
        const chunkInfo = uploadList.find(_ => chunk.chunkName === _.chunkName)
        if (!chunkInfo) {
          // 切片未上传
          return true
        }
        chunk.loaded = chunkInfo.size // 已经上传的字节
        chunk.percent = Number((chunk.loaded / chunk.size * 100).toFixed(2)); // 计算已经上传的百分比
        return true
      })
    }
    const tasks = chunks.map((chunk) => {
      const source = axios.CancelToken.source()
      cancelToken.push(source)
      setCancelToken(cancelToken)
      return ()=>{
        return new Promise<void>((resolve) => {
          const formData = new FormData()
          // 从上次断点的地方开始上传
          formData.append('file', chunk.chunk.slice(chunk.loaded))
          axios({
            method: 'post',
            url: `http://localhost:3000/upload/${chunk.filename}/${chunk.chunkName}/${chunk.loaded}`,
            data: formData,
            cancelToken: source.token,
            headers: {
              'Content-Type': 'multipart/form-data'
            },
            onUploadProgress (progress) {
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              chunk.percent = (chunk.loaded! + progress.loaded / (progress.total as number)) * 100
              setChunkList([...chunks])
            },
          }).then(() => resolve())
        })
      }
     
    })
    await httpPool(tasks)
    console.log('上传完成');
    await axios.get(`http://localhost:3000/merge/${filename}`)
    setUploadState(UploadStatus.DONE)
  }
 

  async function httpPool (chunks: (() => Promise<void>)[], poolMax = 6) {
    let i = 0;
    const tasksLength = chunks.length;
    const taskPool: Promise<void>[] = [];
    const failTaskPool:Promise<void>[] = [];
    while (i < tasksLength) {
      const task = chunks[i]();
      taskPool.push(task);
      task.catch(() => {
        failTaskPool.push(task);
        console.log('任务发生错误');
      }).finally(() => { 
        taskPool.splice(taskPool.findIndex((_) => _ === task), 1)
       });

      if (taskPool.length >= poolMax) {
        try {
          await Promise.race(taskPool);
        } catch (error) {
          console.log('任务池中有任务发生错误');
        }

      }
      i++;
    }
    await Promise.all(taskPool);
    console.log('成功任务', taskPool);
    // TODO:重试机制
    console.log('失败任务', failTaskPool);
  }

  const calculateHash = (blobs: Blob[]) => {
    return new Promise(function (resolve) {

      const worker = new Worker('./hash.js')
      // 发送发消息给子进程
      worker.postMessage({ blobs });
      // 接收子线程发回来的消息
      worker.onmessage = function (event) {
        const { percent, hash } = event.data;
        setHashPercent(percent);
        if (hash) {
          resolve(hash);
        }
      }
    });
  }

  // 切片
  const createChunks = (file: File): { chunks: Chunk[], chunksHash: Blob[] } => {
    let chunkSize = 0;
    const chunks: Chunk[] = [];
    const chunksHash: Blob[] = [];
    const totalChunksIndex: number = Math.ceil(file.size / DEFAULT_SIZE);
    let currentChunksIndex = 0;

    while (chunkSize < file.size) {
      // 切片
      const chunk: Blob = file.slice(chunkSize, chunkSize + DEFAULT_SIZE)
      if (currentChunksIndex === 0 || currentChunksIndex === totalChunksIndex) {
        // 头尾完成字节
        chunksHash.push(chunk)
      } else {
        // 前2个字节
        chunksHash.push(chunk.slice(0, 2))
        // 中间2个字节
        chunksHash.push(chunk.slice(DEFAULT_SIZE / 2, DEFAULT_SIZE / 2 + 2))
        // 后2个字节
        chunksHash.push(chunk.slice(DEFAULT_SIZE - 2, DEFAULT_SIZE))
      }
      currentChunksIndex++;
      chunks.push({ chunk, size: chunk.size })
      chunkSize += DEFAULT_SIZE
    }
    return { chunks, chunksHash }
  }

  return (
    <>
      <input type="file" ref={inputEl} onChange={(event: ChangeEvent<HTMLInputElement>) => handleChange(event)} accept='image/jpeg,image/png,image/gif,video/mp4' />
      <Card title='hash计算'>
        <ProgressBar percent={hashPercent} />
      </Card>
      <Card title='上传进度'>
        <ProgressBar percent={uploadState == UploadStatus.DONE ? 100 : totalPercent} />
      </Card>
      {
        chunkList.map((chunk: Chunk) => {
          return <Card title={chunk.chunkName} key={chunk.chunkName}>
            <ProgressBar percent={chunk.percent} />
          </Card>
        })
      }
      <Space direction='vertical' block>
        {uploadState === UploadStatus.INIT && <Button block type='submit' color='primary' size='large' onClick={upload}>
          上传
        </Button>}

        {
          uploadState === UploadStatus.UPLOADING && <Button block type='submit' color='primary' size='large' onClick={pause}>
            暂停
          </Button>
        }
        {
          uploadState === UploadStatus.PAUSE && <Button block type='submit' color='primary' size='large' onClick={handleResume}>
            恢复
          </Button>
        }
      </Space>

    </>
  )
}

export default App