细节介绍我这里是使用一个timer类(java.util.timer)来实现断点功能的,就是使用这个类,每隔一段时间进行一次记录,记录的内容是每个线程复制的进度。
timer 类的介绍:
a facility for threads to schedule tasks for future execution in a background thread. tasks may be scheduled for one-time execution, or for repeated execution at regular intervals. 线程在后台线程中调度任务以供将来执行的工具。任务可以安排为一次性执行,也可以安排为定期重复执行。
根据 api 中的介绍可以看出,这个 timer 类可以只执行一次任务,也可以周期性地执行任务。(注意这个类是 java.util.timer 类,不是 javax 包下面的类。)
这个类的有很多和时间相关的方法,这里就不介绍了,感兴趣的可以去了解,这里只介绍我们需要使用的一个方法。
public void schedule(timertask task, long delay, long period)
schedules the specified task for repeated fixed-delay execution beginning after the specified delay. subsequent executions take place at approximately regular intervals separated by the specified period. 为指定的任务安排在指定延迟之后开始的重复固定延迟执行。随后的执行发生在按规定时间间隔的大致间隔。
使用这个方法,按照一个固定的时间间隔记录各个线程的复制进度信息即可。
代码部分定时任务类package dragon.local;import java.io.file;import java.io.filenotfoundexception;import java.io.fileoutputstream;import java.io.ioexception;import java.io.objectoutputstream;import java.util.list;import java.util.timer;import java.util.timertask;public class recordtask extends timertask {public static final string filename = "breakpointrecord.txt";private timer timer;private list<filecopythread> copythreads;private string outputpath;public recordtask(timer timer, list<filecopythread> copythreads, string outputpath) {this.timer = timer;this.copythreads = copythreads;this.outputpath = outputpath;}@overridepublic void run() {try {this.breakpointrecord();} catch (ioexception e) {e.printstacktrace();}}public void breakpointrecord() throws filenotfoundexception, ioexception {int alivethreadnum = 0; //存活线程数目//不使用追加方式,这里只需要最新的记录即可。file recordfile = new file(outputpath, filename);try (objectoutputstream oos = new objectoutputstream(new fileoutputstream(recordfile))){//每次记录一个线程的下载位置,但是取出来又需要进行转换,太麻烦了。//我们直接使用序列化来进行操作,哈哈!long[] curlen = new long[4];int index = 0;for (filecopythread copythread : copythreads) {if (copythread.isalive()) {alivethreadnum++;}curlen[index++] = copythread.getcurlen();system.out.println(index+" curlen: "+copythread.getcurlen());}//创建 record 对象,并序列化。oos.writeobject(new record(curlen));}//当所有的线程都死亡时,关闭计时器,删除记录文件。(所有线程死亡的话,就是文件已经复制完成了!)if (alivethreadnum == 0) {timer.cancel();recordfile.delete();}system.out.println("线程数量: "+alivethreadnum);}}
说明:
if (alivethreadnum == 0) {timer.cancel();recordfile.delete();}
如果线程都已经结束了,就表示程序已经正常执行结束了。这个时候就删除记录文件。这里这个记录文件是一个标志(flag),如果存在记录文件就表示程序没有正常结束,再次启动时,会进行断点复制。
注意:这里没有考虑复制过程中的 io 异常,如果线程抛出 io 异常,那么线程的状态也是结束了。但是考虑,本地文件复制出现 io 异常的情况还是比较少的,就没有考虑,如果是网络下载的话,这个程序的功能可能就需要进行改进了。
记录信息类每次需要依次写入各个线程的信息,但是读取出来还需要进行转换,还是感觉过于麻烦了,这里直接利用java的序列化机制了。 有时候,直接操作对象是很方便的。 注意: 数组的下标表示的就是每个线程的位置。
package dragon.local;import java.io.serializable;public class record implements serializable{/** * 序列化 id */private static final long serialversionuid = 1l;private long[] curlen;public record(long[] curlen) {this.curlen = curlen;} public long[] getcurlen() {return this.curlen;}}
复制线程类package dragon.local;import java.io.bufferedinputstream;import java.io.file;import java.io.fileinputstream;import java.io.ioexception;import java.io.randomaccessfile;public class filecopythread extends thread {private int index;private long position;private long size;private file targetfile;private file outputfile;private long curlen; //当前下载的长度public filecopythread(int index, long position, long size, file targetfile, file outputfile) {this.index = index;this.position = position;this.size = size;this.targetfile = targetfile;this.outputfile = outputfile;this.curlen = 0l;}@overridepublic void run() {try (bufferedinputstream bis = new bufferedinputstream(new fileinputstream(targetfile));randomaccessfile raf = new randomaccessfile(outputfile, "rw")){bis.skip(position); //跳过不需要读取的字节数,注意只能先后跳raf.seek(position); //跳到需要写入的位置,没有这句话,会出错,但是很难改。int hasread = 0;byte[] b = new byte[1024];/** * 注意,每个线程只是读取一部分数据,不能只以 -1 作为循环结束的条件 * 循环退出条件应该是两个,即写入的字节数大于需要读取的字节数 或者 文件读取结束(最后一个线程读取到文件末尾) */while(curlen < size && (hasread = bis.read(b)) != -1) {raf.write(b, 0, hasread);curlen += (long)hasread;//强制停止程序。//if (curlen > 17_000_000) {//system.exit(0);//}}system.out.println(index+" "+position+" "+curlen+" "+size);} catch (ioexception e) {e.printstacktrace();}}public long getcurlen() { //获取当前的进度,用于记录,以便必要时恢复读取进度。return position+this.curlen;}}
这段代码是为了测试断点复制的。如果你想要进行测试,可以将 if 判断中的条件按照你要复制的文件大小进行相应的调整。如果要进行测试,可以先将这段代码的注释取消再执行程序(然后程序退出,这时候文件没有复制完成。),然后再将这段代码注释再次执行程序,文件将会复制成功。
//强制停止程序。//if (curlen > 17_000_000) {//system.exit(0);//}
复制工具类package dragon.local;import java.io.file;import java.io.fileinputstream;import java.io.filenotfoundexception;import java.io.ioexception;import java.io.objectinputstream;import java.io.randomaccessfile;import java.util.arraylist;import java.util.list;import java.util.timer;/** * 设计思路: * 获取目标文件的大小,然后设置复制文件的大小(这样做是有好处的), * 然后使用将文件分为 n 分,使用 n 个线程同时进行复制(这里我将 n 取为 4)。 * * 进一步拓展: * 加强为断点复制功能,即程序中断以后, * 仍然可以继续从上次位置恢复复制,减少不必要的重复开销 * */public class filecopyutil {//设置一个常量,复制线程的数量private static final int thread_num = 4;private filecopyutil() {}/** * @param targetpath 目标文件的路径 * @param outputpath 复制输出文件的路径 * @throws ioexception * @throws classnotfoundexception * */public static void transferfile(string targetpath, string outputpath) throws ioexception, classnotfoundexception {file targetfile = new file(targetpath);file outputfilepath = new file(outputpath);if (!targetfile.exists() || targetfile.isdirectory()) { //目标文件不存在,或者是一个文件夹,则抛出异常throw new filenotfoundexception("目标文件不存在:"+targetpath);}if (!outputfilepath.exists()) { //如果输出文件夹不存在,将会尝试创建,创建失败,则抛出异常。if(!outputfilepath.mkdir()) {throw new filenotfoundexception("无法创建输出文件:"+outputpath);}}long len = targetfile.length();file outputfile = new file(outputfilepath, "copy"+targetfile.getname());createoutputfile(outputfile, len); //创建输出文件,设置好大小。//创建计时器 timer 对象timer timer = new timer();long[] position = new long[4];//每一个线程需要复制文件的起点long size = len / filecopyutil.thread_num + 1; //保存复制线程的集合list<filecopythread> copythreads = new arraylist<>();record record = getrecord(outputpath);for (int i = 0; i < filecopyutil.thread_num; i++) {//如果已经有了 记录文件,就从使用记录数据,否则就是新的下载。position[i] = record == null ? i*size : record.getcurlen()[i];filecopythread copythread = new filecopythread(i, position[i], size, targetfile, outputfile);copythread.start(); //启动复制线程copythreads.add(copythread); //将复制线程添加到集合中。}timer.schedule(new recordtask(timer, copythreads, outputpath), 0l, 100l); //立即启动计时器,每隔10秒记录一次位置。system.out.println("开始了!");}//创建输出文件,设置好大小。private static void createoutputfile(file file, long length) throws ioexception {try ( randomaccessfile raf = new randomaccessfile(file, "rw")){raf.setlength(length);}}//获取以及下载的位置private static record getrecord(string outputpath) throws filenotfoundexception, ioexception, classnotfoundexception {file recordfile = new file(outputpath, recordtask.filename);if (recordfile.exists()) {try (objectinputstream ois = new objectinputstream(new fileinputstream(recordfile))){return (record) ois.readobject();}}return null;}}
说明: 根据复制的目录中,是否存在记录文件来判断是否启动断点复制。
private static record getrecord(string outputpath) throws filenotfoundexception, ioexception, classnotfoundexception {file recordfile = new file(outputpath, recordtask.filename);if (recordfile.exists()) {try (objectinputstream ois = new objectinputstream(new fileinputstream(recordfile))){return (record) ois.readobject();}}return null;}
启动断点复制原来其实很简单,就是和复制一样,只不过起始复制位置变成了记录的位置了。
//如果已经有了 记录文件,就从使用记录数据,否则就是新的下载。position[i] = record == null ? i*size : record.getcurlen()[i];
以上就是java多线程断点复制的方法是什么的详细内容。