
Implementing resumable upload, chunked upload, and fast file transfer in Java

Date: 2019-10-13 22:29:07


This article implements resumable upload, chunked upload, and fast file transfer. Files are stored on a file server (MinIO), and Redis records the upload status of each chunk. When merging, the service first verifies that all chunks have been uploaded and then merges them asynchronously. Because merging is asynchronous, the client must call a query interface to check whether the merged file has been generated before downloading it. For fast transfer, the service checks whether a file with the same MD5 already exists; if it does, it simply copies the stored object instead of uploading again.
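The service below depends on three request DTOs that the article does not show. The following is a minimal sketch, with fields inferred from the getters used in TmpFileServiceImpl (getFileMd5, getChunkIndex, getFileSize, getChunks, getFileName); Lombok's @Data and the field comments are assumptions, not the original classes.

import lombok.Data;

// Each class lives in its own file under com.szcares.v4.passport.model.

/** Parameters for checking whether one chunk is already uploaded. */
@Data
public class FileCheckDTO {
    private String fileMd5;     // MD5 of the complete file
    private Integer chunkIndex; // index of the chunk
}

/** Parameters accompanying a single chunk upload. */
@Data
public class FileUploadDTO {
    private String fileMd5;
    private Integer chunkIndex;
    private Long fileSize;      // size of the complete file in bytes
    private Integer chunks;     // total number of chunks
}

/** Parameters for triggering the merge. */
@Data
public class FileMergeDTO {
    private String fileMd5;
    private String fileName;    // original file name, used for the merged file
}

With those DTOs in place, the service implementation follows.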

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.szcares.v4.passport.config.MinioConfigProperties;
import com.szcares.v4.passport.entity.TmpFileEntity;
import com.szcares.v4.passport.enums.TmpFileEnum;
import com.szcares.v4.passport.mapper.TmpFileMapper;
import com.szcares.v4.passport.model.FileCheckDTO;
import com.szcares.v4.passport.model.FileMergeDTO;
import com.szcares.v4.passport.model.FileUploadDTO;
import com.szcares.v4.passport.service.MinioService;
import com.szcares.v4.passport.service.TmpFileService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;
import java.io.*;
import java.time.LocalDateTime;
import java.util.*;

/**
 * Temporary-file service for resumable upload
 *
 * @author Maruko
 * @date /07/13 09:36
 */
@Service
@Slf4j
public class TmpFileServiceImpl extends ServiceImpl<TmpFileMapper, TmpFileEntity> implements TmpFileService {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private MinioService minioService;

    @Resource
    private MinioConfigProperties minioConfigProperties;

    @Override
    public Boolean checkChunkUpload(FileCheckDTO dto) {
        // A chunk counts as uploaded when its index was recorded in the Redis hash
        Object o = redisTemplate.opsForHash().get(dto.getFileMd5(), TmpFileEnum.CHUNK_INDEX + dto.getChunkIndex());
        if (dto.getChunkIndex().equals(o)) {
            return Boolean.TRUE;
        }
        return Boolean.FALSE;
    }

    @Override
    public String chunkUpload(FileUploadDTO dto, MultipartFile multipartFile) throws Exception {
        String fileMd5 = dto.getFileMd5();
        Integer chunkIndex = dto.getChunkIndex();
        // If a finished file with the same MD5 already exists, skip the upload entirely
        List<TmpFileEntity> tmpFileEntities = this.queryFiles(fileMd5);
        if (!CollectionUtils.isEmpty(tmpFileEntities)) {
            log.info("File with MD5 {} (chunk {}) has already been uploaded, skipping", fileMd5, chunkIndex);
            return String.format("File with MD5 %s, chunk %s, uploaded successfully", fileMd5, chunkIndex);
        }
        // Reject a chunk that has already been uploaded
        Boolean checked = this.checkChunkUpload(dto);
        if (checked) {
            log.warn("Chunk {} of file with MD5 {} has already been uploaded, do not re-upload", chunkIndex, fileMd5);
            throw new RuntimeException(String.format("Chunk %s of file with MD5 %s has already been uploaded, do not re-upload", chunkIndex, fileMd5));
        }
        long start = System.currentTimeMillis();
        log.info("Uploading chunk {} of file with MD5 {}", chunkIndex, fileMd5);
        // Persist the chunk to local disk under the working directory
        String userDir = System.getProperty(TmpFileEnum.USER_DIR);
        String chunkFilePath = userDir + File.separator + fileMd5 + "_" + chunkIndex;
        File file = new File(chunkFilePath);
        multipartFile.transferTo(file);
        // Record the chunk's location and the file metadata in a Redis hash keyed by the file MD5
        Map<String, Object> map = new HashMap<>(4);
        map.put(TmpFileEnum.CHUNK_LOCATION + chunkIndex, chunkFilePath);
        map.put(TmpFileEnum.FILE_SIZE, dto.getFileSize());
        map.put(TmpFileEnum.FILE_CHUNKS, dto.getChunks());
        map.put(TmpFileEnum.CHUNK_INDEX + chunkIndex, chunkIndex);
        redisTemplate.opsForHash().putAll(fileMd5, map);
        log.info("Chunk {} of file with MD5 {} uploaded in {} ms", chunkIndex, fileMd5, System.currentTimeMillis() - start);
        return String.format("File with MD5 %s, chunk %s, uploaded successfully", fileMd5, chunkIndex);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Long merge(FileMergeDTO dto) throws IOException {
        String fileMd5 = dto.getFileMd5();
        // Insert a placeholder row; the async task flips its status to FINISH when done
        TmpFileEntity tmpFile = new TmpFileEntity();
        tmpFile.setFileMd5(fileMd5);
        tmpFile.setDelFlag(TmpFileEnum.UN_DEL_FLAG);
        tmpFile.setStatus(TmpFileEnum.STATUS_UN_FINISH);
        tmpFile.setCreateTime(LocalDateTime.now());
        tmpFile.setUpdateTime(LocalDateTime.now());
        tmpFile.setFileName(dto.getFileName());
        this.save(tmpFile);
        // Fast transfer: if the file was uploaded before, copy the stored object instead of merging
        List<TmpFileEntity> tmpFileEntities = this.queryFiles(fileMd5);
        if (!CollectionUtils.isEmpty(tmpFileEntities)) {
            TmpFileEntity fileEntity = tmpFileEntities.get(0);
            String url = fileEntity.getUrl();
            String objectName = url.substring(url.lastIndexOf("/") + 1);
            tmpFile.setFileSize(fileEntity.getFileSize());
            tmpFile.setChunks(fileEntity.getChunks());
            // Call through the AOP proxy so @Async takes effect (requires exposeProxy = true)
            TmpFileServiceImpl tmpFileAopService = (TmpFileServiceImpl) AopContext.currentProxy();
            tmpFileAopService.copyFile(fileMd5, objectName, tmpFile);
        } else {
            // Verify that every chunk has been uploaded before merging
            boolean flag = this.checkBeforeMerge(fileMd5);
            if (!flag) {
                log.warn("Chunks of file with MD5 {} are incomplete, cannot merge", fileMd5);
                throw new RuntimeException(String.format("Chunks of file with MD5 %s are incomplete, cannot merge", fileMd5));
            }
            // Read the total size and chunk count recorded during chunk upload
            Long fileSize = Long.valueOf(redisTemplate.opsForHash().get(fileMd5, TmpFileEnum.FILE_SIZE).toString());
            tmpFile.setFileSize(fileSize);
            Integer fileChunks = Integer.valueOf(redisTemplate.opsForHash().get(fileMd5, TmpFileEnum.FILE_CHUNKS).toString());
            tmpFile.setChunks(fileChunks);
            TmpFileServiceImpl tmpFileAopService = (TmpFileServiceImpl) AopContext.currentProxy();
            tmpFileAopService.mergeChunks(fileMd5, dto.getFileName(), fileChunks, tmpFile);
        }
        return tmpFile.getId();
    }

    @Override
    public Boolean checkFileFinish(Long id) {
        LambdaQueryWrapper<TmpFileEntity> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TmpFileEntity::getDelFlag, TmpFileEnum.UN_DEL_FLAG);
        wrapper.eq(TmpFileEntity::getStatus, TmpFileEnum.STATUS_FINISH);
        wrapper.eq(TmpFileEntity::getId, id);
        TmpFileEntity one = this.getOne(wrapper);
        return null != one;
    }

    /**
     * Query finished files by MD5
     *
     * @param fileMd5 file MD5
     * @return finished files with this MD5; empty if the file has never been uploaded
     */
    private List<TmpFileEntity> queryFiles(String fileMd5) {
        LambdaQueryWrapper<TmpFileEntity> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(TmpFileEntity::getFileMd5, fileMd5);
        wrapper.eq(TmpFileEntity::getDelFlag, TmpFileEnum.UN_DEL_FLAG);
        wrapper.eq(TmpFileEntity::getStatus, TmpFileEnum.STATUS_FINISH);
        return this.list(wrapper);
    }

    /**
     * Check before merging that all chunks of the file have been uploaded
     *
     * @param key Redis hash key (the file MD5)
     * @return true if the number of recorded chunk indexes equals the expected chunk count
     */
    private boolean checkBeforeMerge(String key) {
        Map map = redisTemplate.opsForHash().entries(key);
        Object fileChunks = map.get(TmpFileEnum.FILE_CHUNKS);
        int i = 0;
        for (Object hashKey : map.keySet()) {
            if (hashKey.toString().startsWith(TmpFileEnum.CHUNK_INDEX)) {
                ++i;
            }
        }
        return fileChunks.equals(i);
    }

    /**
     * Delete the chunk files on disk and their entries in the Redis hash
     *
     * @param fileMd5 file MD5
     * @throws JsonProcessingException
     */
    private void delChunkTmpFile(String fileMd5) throws JsonProcessingException {
        Map map = redisTemplate.opsForHash().entries(fileMd5);
        List<String> list = new ArrayList<>();
        for (Object hashKey : map.keySet()) {
            if (hashKey.toString().startsWith(TmpFileEnum.CHUNK_LOCATION)) {
                String filePath = map.get(hashKey).toString();
                File file = new File(filePath);
                boolean flag = file.delete();
                list.add(hashKey.toString());
                log.info("delete:" + filePath + ",:" + flag);
            }
            if (hashKey.toString().startsWith(TmpFileEnum.CHUNK_INDEX)) {
                list.add(hashKey.toString());
            }
        }
        list.add(TmpFileEnum.FILE_CHUNKS);
        list.add(TmpFileEnum.FILE_SIZE);
        redisTemplate.opsForHash().delete(fileMd5, list.toArray());
    }

    /**
     * Fast transfer: asynchronously copy an already-stored object
     *
     * @param fileMd5    file MD5
     * @param objectName name of the existing source object
     * @param file       file entity to update
     */
    @Async
    public void copyFile(String fileMd5, String objectName, TmpFileEntity file) {
        long start = System.currentTimeMillis();
        log.info("File with MD5 {} already exists, fast transfer by copy started at {}", fileMd5, start);
        String url = minioService.copyFile(objectName);
        file.setUpdateTime(LocalDateTime.now());
        file.setStatus(TmpFileEnum.STATUS_FINISH);
        file.setUrl(url);
        this.updateById(file);
        log.info("Fast transfer by copy for file with MD5 {} finished in {} ms", fileMd5, System.currentTimeMillis() - start);
    }

    /**
     * Asynchronously merge the chunks and upload the result to MinIO
     *
     * @param fileMd5    file MD5
     * @param fileName   file name
     * @param fileChunks total chunk count
     * @param tmpFile    file entity to update
     * @throws IOException
     */
    @Async
    public void mergeChunks(String fileMd5, String fileName, Integer fileChunks, TmpFileEntity tmpFile) throws IOException {
        long start = System.currentTimeMillis();
        log.info("Merging chunks of file with MD5 {}, started at {}", fileMd5, start);
        // Important: the chunks must be merged in chunk-index order, otherwise the file is unusable
        String userDir = System.getProperty("user.dir");
        String destPath = userDir + File.separator + fileName;
        // Wrap each chunk file in a buffered input stream, in index order
        Vector<InputStream> vis = new Vector<>();
        for (int i = 0; i < fileChunks; i++) {
            String tmpPath = redisTemplate.opsForHash().get(fileMd5, TmpFileEnum.CHUNK_LOCATION + i).toString();
            vis.add(new BufferedInputStream(new FileInputStream(tmpPath)));
        }
        // Chain the chunk streams into a single SequenceInputStream
        SequenceInputStream sis = new SequenceInputStream(vis.elements());
        // Append mode assumes destPath does not already exist from a previous run
        BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destPath, true));
        // Copy in segments through a buffer
        byte[] flush = new byte[1024];
        int len;
        while ((len = sis.read(flush)) != -1) {
            bos.write(flush, 0, len);
        }
        bos.flush();
        bos.close();
        sis.close();
        // Upload the merged file to MinIO, record its URL, and mark the record finished
        File file = new File(destPath);
        String type = fileName.substring(fileName.lastIndexOf(".") + 1);
        String url = minioService.uploadFileMinio(new FileInputStream(file), type);
        tmpFile.setUrl(url);
        tmpFile.setStatus(TmpFileEnum.STATUS_FINISH);
        tmpFile.setUpdateTime(LocalDateTime.now());
        this.updateById(tmpFile);
        // Delete the chunk files, then the local merged file
        this.delChunkTmpFile(fileMd5);
        file.delete();
        log.info("Merging chunks of file with MD5 {} finished in {} ms", fileMd5, System.currentTimeMillis() - start);
    }
}
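The article omits the controller layer. Below is a minimal sketch of how the four service methods might be exposed over HTTP; the class name, paths, and parameter binding are assumptions, not the original code. Note that for the @Async calls made through AopContext.currentProxy() to work, the application also needs @EnableAsync and @EnableAspectJAutoProxy(exposeProxy = true) on a configuration class.

import com.szcares.v4.passport.model.FileCheckDTO;
import com.szcares.v4.passport.model.FileMergeDTO;
import com.szcares.v4.passport.model.FileUploadDTO;
import com.szcares.v4.passport.service.TmpFileService;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;
import java.io.IOException;

@RestController
@RequestMapping("/file")
public class TmpFileController {

    @Resource
    private TmpFileService tmpFileService;

    /** Check whether a single chunk has already been uploaded (resume-point lookup). */
    @GetMapping("/chunk/check")
    public Boolean checkChunk(FileCheckDTO dto) {
        return tmpFileService.checkChunkUpload(dto);
    }

    /** Upload one chunk together with its metadata. */
    @PostMapping("/chunk/upload")
    public String uploadChunk(FileUploadDTO dto, @RequestParam("file") MultipartFile file) throws Exception {
        return tmpFileService.chunkUpload(dto, file);
    }

    /** Trigger the asynchronous merge; returns the record id to poll. */
    @PostMapping("/merge")
    public Long merge(@RequestBody FileMergeDTO dto) throws IOException {
        return tmpFileService.merge(dto);
    }

    /** Poll whether the merged file has been generated and is ready to download. */
    @GetMapping("/finish/{id}")
    public Boolean checkFinish(@PathVariable Long id) {
        return tmpFileService.checkFileFinish(id);
    }
}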

import com.szcares.v4.passport.config.MinioConfigProperties;
import com.szcares.v4.passport.service.MinioService;
import io.minio.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@Service
public class MinioServiceImpl implements MinioService {

    @Autowired
    private MinioConfigProperties minioConfigProperties;

    private static final String PERIOD = ".";

    @Override
    public String uploadFileMinio(InputStream inputStream, String type) {
        String fileName;
        try {
            // Keep the file extension in the object name, otherwise the download loses its format
            if (type.contains(PERIOD)) {
                fileName = UUID.randomUUID() + type.substring(type.lastIndexOf(PERIOD));
            } else {
                fileName = UUID.randomUUID() + PERIOD + type;
            }
            // Custom headers: content type and storage class
            Map<String, String> headers = new HashMap<>(1 << 2);
            headers.put("Content-Type", "application/octet-stream");
            headers.put("X-Amz-Storage-Class", "REDUCED_REDUNDANCY");
            // Custom user metadata
            Map<String, String> userMetadata = new HashMap<>(1 << 2);
            userMetadata.put("My-Project", "Project One");
            // Create the bucket if it does not exist yet
            createBucket();
            // Upload the stream
            minioClient().putObject(PutObjectArgs.builder()
                    .bucket(minioConfigProperties.getBucket())
                    .object(fileName)
                    .stream(inputStream, inputStream.available(), -1)
                    .headers(headers)
                    .userMetadata(userMetadata)
                    .build());
            inputStream.close();
            return minioConfigProperties.getUrl() + "/" + minioConfigProperties.getBucket() + "/" + fileName;
        } catch (Exception e) {
            e.printStackTrace();
            return "Upload failed: " + e.getMessage();
        }
    }

    @Override
    public String copyFile(String objectName) {
        String type = objectName.substring(objectName.lastIndexOf(PERIOD));
        String destObjectName = UUID.randomUUID() + type;
        try {
            // Server-side copy of an existing object to a new object name (fast transfer)
            minioClient().copyObject(CopyObjectArgs.builder()
                    .source(CopySource.builder()
                            .bucket(minioConfigProperties.getBucket())
                            .object(objectName)
                            .build())
                    .bucket(minioConfigProperties.getBucket())
                    .object(destObjectName)
                    .build());
            return minioConfigProperties.getUrl() + "/" + minioConfigProperties.getBucket() + "/" + destObjectName;
        } catch (Exception e) {
            e.printStackTrace();
            return "Copy failed: " + e.getMessage();
        }
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public String removeMinio(String fileName) {
        try {
            minioClient().removeObject(RemoveObjectArgs.builder()
                    .bucket(minioConfigProperties.getBucket())
                    .object(fileName)
                    .build());
            return "Deleted successfully";
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.getMessage());
            return "Delete failed";
        }
    }

    @Override
    public InputStream downloadMinio(String fileName) throws Exception {
        return minioClient().getObject(GetObjectArgs.builder()
                .bucket(minioConfigProperties.getBucket())
                .object(fileName)
                .build());
    }

    /**
     * Create the configured bucket if it does not exist
     *
     * @throws Exception
     */
    private void createBucket() throws Exception {
        BucketExistsArgs exist = BucketExistsArgs.builder().bucket(minioConfigProperties.getBucket()).build();
        boolean result = minioClient().bucketExists(exist);
        if (!result) {
            MakeBucketArgs create = MakeBucketArgs.builder().bucket(minioConfigProperties.getBucket()).build();
            minioClient().makeBucket(create);
        }
    }

    // A new client is built per call; in production this could be cached as a singleton bean
    private MinioClient minioClient() {
        return MinioClient.builder()
                .endpoint(minioConfigProperties.getUrl())
                .credentials(minioConfigProperties.getAccessKey(), minioConfigProperties.getSecretKey())
                .build();
    }
}
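Both classes above read their connection settings from MinioConfigProperties, which the article also does not show. A plausible sketch, assuming a Spring Boot @ConfigurationProperties class; the prefix and field comments are assumptions inferred from the getters used:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/** Sketch inferred from the getters used above; the "minio" prefix is an assumption. */
@Data
@Component
@ConfigurationProperties(prefix = "minio")
public class MinioConfigProperties {
    private String url;       // MinIO endpoint, e.g. http://127.0.0.1:9000
    private String bucket;    // target bucket name
    private String accessKey;
    private String secretKey;
}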

/**
 * Constants for resumable-upload temporary files
 *
 * @author Maruko
 * @date /07/13 09:24
 */
public class TmpFileEnum {
    /** File not yet merged */
    public static final Integer STATUS_UN_FINISH = 0;
    /** File merged */
    public static final Integer STATUS_FINISH = 1;
    /** File deleted */
    public static final Integer DEL_FLAG = 0;
    /** File not deleted */
    public static final Integer UN_DEL_FLAG = 1;
    /** Redis hash key prefix for an uploaded chunk index */
    public static final String CHUNK_INDEX = "chunk_index_";
    /** Redis hash key prefix for a chunk's storage path */
    public static final String CHUNK_LOCATION = "chunk_location_";
    /** Total file size */
    public static final String FILE_SIZE = "file_size";
    /** Total chunk count */
    public static final String FILE_CHUNKS = "file_chunks";
    /** System property name for the working directory */
    public static final String USER_DIR = "user.dir";
}
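TmpFileEntity, the MyBatis-Plus entity behind the service, is not shown either. A sketch with fields inferred from the setters, getters, and LambdaQueryWrapper conditions used above; the table name and annotations are assumptions:

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;

/** Sketch inferred from the usage in TmpFileServiceImpl; the column mapping is an assumption. */
@Data
@TableName("tmp_file")
public class TmpFileEntity {
    @TableId(type = IdType.AUTO)
    private Long id;
    private String fileMd5;
    private String fileName;
    private Long fileSize;
    private Integer chunks;     // total chunk count
    private String url;         // URL of the merged file in MinIO
    private Integer status;     // 0 = not merged, 1 = merged
    private Integer delFlag;    // 1 = not deleted, 0 = deleted
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
}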

A utility class for resumable upload that splits a file into chunks and merges them back; it can be used to exercise the check interfaces above.

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

public class SplitFileUtil {

    private File src;               // source file
    private String destDir;         // output directory for the split chunks
    private List<String> destPaths; // path of each split chunk
    private int blockSize;          // chunk size in bytes
    private int size;               // total number of chunks

    public SplitFileUtil(String srcDir, String destDir) {
        this(srcDir, destDir, 1024 * 1024);
    }

    public SplitFileUtil(String srcDir, String destDir, int blockSize) {
        this.src = new File(srcDir);        // source file
        this.destDir = destDir;            // chunk output directory
        this.destPaths = new ArrayList<>(); // container for the chunk paths
        this.blockSize = blockSize;        // chunk size
        init();
        System.out.println("SplitFileUtil initialized");
    }

    /**
     * Compute the chunk count and chunk paths, and create the output directory
     */
    private void init() {
        long len = this.src.length();
        // Round up to get the total number of chunks
        this.size = (int) Math.ceil(len * 1.0 / this.blockSize);
        // Precompute the path of each chunk
        for (int i = 0; i < size; i++) {
            this.destPaths.add(this.destDir + File.separator + i + "-" + this.src.getName());
        }
        // Create the chunk output directory if it does not exist
        if (len > 0) {
            File destFile = new File(this.destDir);
            if (!destFile.exists()) {
                destFile.mkdirs();
            }
        }
    }

    public void split() {
        long len = src.length();
        int beginPos;
        int actualSize;
        for (int i = 0; i < size; i++) {
            beginPos = i * blockSize;
            if (i == size - 1) {
                // Last chunk gets whatever remains
                actualSize = (int) len;
            } else {
                actualSize = blockSize;
                len -= actualSize; // remaining bytes
            }
            splitDetail(i, beginPos, actualSize);
        }
        System.out.println("Chunks written to " + this.destDir);
    }

    /**
     * Read actualSize bytes starting at beginPos from the source with a
     * random-access file and write them to the i-th chunk file
     *
     * @param i          chunk index
     * @param beginPos   read offset in the source file
     * @param actualSize number of bytes to copy
     */
    private void splitDetail(int i, int beginPos, int actualSize) {
        try (RandomAccessFile readRaf = new RandomAccessFile(this.src, "r");
             RandomAccessFile writeRaf = new RandomAccessFile(this.destPaths.get(i), "rw")) {
            // Seek to this chunk's offset
            readRaf.seek(beginPos);
            byte[] flush = new byte[actualSize];
            int len;
            while ((len = readRaf.read(flush)) != -1) {
                if (actualSize > len) {
                    // Whole buffer belongs to this chunk
                    writeRaf.write(flush, 0, len);
                    actualSize -= len;
                } else {
                    // Only the first actualSize bytes belong to this chunk
                    writeRaf.write(flush, 0, actualSize);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Merge the chunks in destPaths: wrap each chunk in a buffered input stream,
     * chain the streams with a SequenceInputStream, and copy everything into a
     * buffered output stream at destPath
     *
     * @param destPath merged output file
     */
    public void merge(String destPath) {
        Vector<InputStream> vis = new Vector<>();
        SequenceInputStream sis = null;
        BufferedOutputStream bos = null;
        try {
            // Wrap each chunk file in a buffered input stream, in index order
            for (int i = 0; i < this.destPaths.size(); i++) {
                vis.add(new BufferedInputStream(new FileInputStream(this.destPaths.get(i))));
            }
            // Chain the chunk streams into a single SequenceInputStream
            sis = new SequenceInputStream(vis.elements());
            bos = new BufferedOutputStream(new FileOutputStream(destPath, true));
            // Copy in segments through a buffer
            byte[] flush = new byte[1024 * 10];
            int len;
            while ((len = sis.read(flush)) != -1) {
                bos.write(flush, 0, len);
            }
            bos.flush();
            System.out.println("Chunks in " + this.destDir + " merged into " + destPath);
            // delFileByPath(new File(this.destDir));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (bos != null) {
                    bos.close();
                }
                if (sis != null) {
                    sis.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Recursively delete a file or directory
     *
     * @param src file or directory to delete
     */
    public void delFileByPath(File src) {
        if (null == src || !src.exists()) {
            return;
        }
        if (src.isFile()) {
            src.delete();
        }
        if (src.isDirectory()) {
            for (File sub : src.listFiles()) {
                delFileByPath(sub);
            }
            src.delete();
        }
    }

    public static void main(String[] args) {
        // Source file
        String srcDir = "1.txt";
        // Directory for the split chunks
        String destDir = "tmp";
        // Merged output file
        String destPath = "2.txt";
        SplitFileUtil splitFileUtil = new SplitFileUtil(srcDir, destDir);
        // Split the source file into chunks under destDir
        splitFileUtil.split();
        // Merge the chunks under destDir into destPath
        splitFileUtil.merge(destPath);
    }
}
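Both the fast-transfer check and the chunk bookkeeping are keyed by the whole-file MD5, which the client has to compute before uploading. A minimal sketch of that computation using only java.security.MessageDigest; the class and method names are illustrative, not from the original code:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;

public class FileMd5Util {

    /** Stream the file through MessageDigest and return the hex-encoded MD5. */
    public static String md5(String path) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("MD5");
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            byte[] buffer = new byte[8192];
            int len;
            while ((len = in.read(buffer)) != -1) {
                digest.update(buffer, 0, len);
            }
        }
        // Hex-encode the 16-byte digest
        StringBuilder sb = new StringBuilder();
        for (byte b : digest.digest()) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(md5("1.txt"));
    }
}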
