关于netty+randomaccessfile实现客户端往服务端上传文件(支持断点续传)

时间:2022-03-05 13:06:22

    这个功能其实很简单。由于我是第一次接触 netty 和 RandomAccessFile,所以纠结了很久;基本代码也是我找来的,根据百度搭建了客户端和服务端,实现了两者的交互。其次就是 RandomAccessFile 了,用来写断点续传还是很好使的,它支持在任意位置读写文件(续传时最好是在文件末尾追加)。说下我遇到的问题吧:代码逻辑基本上很简单,在客户端用 RandomAccessFile 读取文件,然后在服务端写入。但是遇到了一个问题:客户端在读完文件之后自动关闭了,而服务端写入操作完成之后却一直占着文件,使临时文件不能删除。(断点续传我的做法是:每次上传文件的同时写一个临时文件,文件上传完毕之后自动删除临时文件;下次上传文件的时候,通过判断临时文件是否存在来决定是继续上传还是重新上传。)我百思不得其解:明明已经关闭了所有的 RandomAccessFile,为什么文件还是被占用呢?后来我尝试改成每次写入结束之后就关闭(之前我是在最后、文件全部写完之后才关闭),结果就可以了。我想了一下,原因还是 RandomAccessFile 没有全部关闭:因为是分片上传,每一片都会新建 RandomAccessFile,而我最后却只关闭了一次,之前各片打开的实例都没有关闭,文件肯定是被占用的。

代码如下(代码写得有点水,见笑):

服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class UploadServer {
public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<Channel>() {


                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度
                    ch.pipeline().addLast(new FileUploadServerHandler());
                }
            });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        try {
            new UploadServer().bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端:

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.io.File;
import java.io.FileInputStream;
import java.nio.channels.FileChannel;
import java.util.Scanner;
import uploadServer.FileUploadFile;
public class UploadClient {
public void connect(int port, String host,
final FileUploadFile fileUploadFile) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));
}
});
    ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
   }
}


public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
try {
FileUploadFile uploadFile = new FileUploadFile();
File file = new File("E:/udb/V1.0.37-20160318(web)/CentOS-6.4-i386-bin-DVD1.iso");
// File file = new File("d:/work.txt");
// File file = new File("");
// File file = new File("E:/udb/V1.0.37-20160318(web)/CDP_Restore6.5_V1.7.11.iso");
// File file = new  File("E:/udb/V1.0.37-20160318(web)/cdpweb-V1.0.37.udb");
// String path="";
// Scanner sc=new Scanner(System.in);
// System.out.println("请输入文件路径及文件名:");
// path=sc.next();
// File file =new File(path);
if(!file.exists()){
System.out.println("文件不存在,请确定路径是否正确");
return;
}
String fileName = file.getName();// 文件名
FileInputStream fis = new FileInputStream(file);
FileChannel fc = fis.getChannel();
uploadFile.setFile(file);
uploadFile.setFileName(fileName);
uploadFile.setStarPos(0);// 文件开始位置
uploadFile.setFileSize(fc.size());// 文件大小
new UploadClient().connect(port, "127.0.0.1", uploadFile);
} catch (Exception e) {
e.printStackTrace();
}
}
}

服务端handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
/**
 * Server-side handler that receives {@code FileUploadFile} chunks and writes
 * them to disk, supporting resume of an interrupted upload.
 *
 * <p>While an upload is in progress a companion "_temp" file is written next
 * to the target file. On the first message of a connection the handler checks
 * that temp file: if it exists the client is told the current length of the
 * target file so it can continue from there; if only the target exists the
 * upload is rejected; otherwise a fresh upload starts. Once the write offset
 * equals the announced file size the temp file is deleted and the channel is
 * closed.
 */
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {

    /** Inserted before the extension to build the name of the temp marker file. */
    private static final String TEMP_SUFFIX = "_temp";

    /** Offset in the target file at which the next chunk will be written. */
    private volatile long start = 0;
    /** True once a resume offset has been offered, so the offer is made only once. */
    private boolean resumeOffered = false;
    public RandomAccessFile randomAccessFile;
    public RandomAccessFile randomAccessFile1;
    private String file_dir = "e:";
    private long endTime = 0;

    /**
     * Dispatches an incoming message. Messages that are not
     * {@code FileUploadFile} instances are ignored. Any failure is routed to
     * {@link #exceptionCaught} instead of being silently swallowed, so the
     * peer is not left waiting forever for a reply.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof FileUploadFile)) {
            return;
        }
        try {
            handleChunk(ctx, (FileUploadFile) msg);
        } catch (Exception e) {
            exceptionCaught(ctx, e);
        }
    }

    /** Decides between resume, reject, finish and plain write for one received message. */
    private void handleChunk(ChannelHandlerContext ctx, FileUploadFile ef) throws IOException {
        String fileName = safeFileName(ef.getFileName());
        File file = new File(file_dir + File.separator + fileName);
        File tempfile = tempFileFor(fileName);

        // start == 0 marks the beginning of a connection. The extra flag prevents the
        // resume branch from repeating forever when the existing target file is empty
        // (start would stay 0 after the offer).
        if (start == 0 && !resumeOffered) {
            if (tempfile.exists()) {
                // Temp file present: a previous upload was interrupted, continue it.
                System.out.println("继续上传!!!!!!");
                resumeOffered = true;
                // Tell the client how much of the file is already stored.
                start = file.length();
                Map<String, Object> reply = new HashMap<String, Object>();
                reply.put("start", start);
                reply.put("status", "temp");
                ctx.writeAndFlush(reply);
            } else if (file.exists()) {
                // Target exists without a temp file: complete upload, refuse duplicates.
                Map<String, Object> reply = new HashMap<String, Object>();
                reply.put("status", "exist");
                ctx.writeAndFlush(reply);
            } else {
                System.out.println("开始上传文件");
                writeFile(file, fileName, ef.getBytes(), ctx, ef.getEndPos(), ef.getStatus());
            }
        } else if (start == ef.getFileSize()) {
            // Everything has been written: remove the marker and end the connection.
            tempfile.delete();
            ctx.close();
        } else {
            writeFile(file, fileName, ef.getBytes(), ctx, ef.getEndPos(), ef.getStatus());
        }
    }

    /**
     * Writes one chunk at the current offset to both the target file and its
     * temp companion, advances the offset and, if anything was written, sends
     * the new offset back to the client.
     *
     * <p>Only the first {@code byteRead} bytes of {@code bytes} are written;
     * the remainder of the buffer is not valid file content. Both files are
     * closed even when a write fails, so neither stays locked.
     *
     * @param file     target file
     * @param fileName name used to derive the temp file name
     * @param bytes    chunk buffer
     * @param ctx      channel context used for the reply
     * @param byteRead number of valid bytes in {@code bytes}
     * @param status   status sent by the client (not used here)
     * @throws IOException if the name is unusable or opening, seeking or writing fails
     */
    public void writeFile(File file, String fileName, byte[] bytes,
            ChannelHandlerContext ctx, int byteRead, String status)
            throws IOException {
        File tempFile = tempFileFor(safeFileName(fileName));
        int count = Math.max(0, Math.min(byteRead, bytes.length));

        randomAccessFile = new RandomAccessFile(file, "rws");
        try {
            randomAccessFile1 = new RandomAccessFile(tempFile, "rws");
            try {
                randomAccessFile.seek(start);
                randomAccessFile.write(bytes, 0, count);
                randomAccessFile1.seek(start);
                randomAccessFile1.write(bytes, 0, count);
            } finally {
                randomAccessFile1.close();
            }
        } finally {
            randomAccessFile.close();
        }

        start = start + count;
        if (count > 0) {
            Map<String, Object> reply = new HashMap<String, Object>();
            reply.put("start", start);
            reply.put("endTime", endTime);
            ctx.writeAndFlush(reply);
        }
    }

    /**
     * Reduces a client-supplied name to its last path component so that it
     * cannot point outside {@code file_dir}.
     *
     * @throws IOException if the name is missing, empty, "." or ".."
     */
    private static String safeFileName(String fileName) throws IOException {
        if (fileName == null) {
            throw new IOException("Missing file name");
        }
        int cut = Math.max(fileName.lastIndexOf('/'), fileName.lastIndexOf('\\'));
        String name = fileName.substring(cut + 1);
        if (name.length() == 0 || ".".equals(name) || "..".equals(name)) {
            throw new IOException("Illegal file name: " + fileName);
        }
        return name;
    }

    /** Builds the temp file for a name: "_temp" goes before the extension, or at the end if there is none. */
    private File tempFileFor(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String tempName = dot < 0
                ? fileName + TEMP_SUFFIX
                : fileName.substring(0, dot) + TEMP_SUFFIX + fileName.substring(dot);
        return new File(file_dir + File.separator + tempName);
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端handler:

package UploadClient;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import uploadServer.FileUploadFile;
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
private int byteRead;
private volatile long start = 0;
float size;
int length=1024*1024*24;
private volatile int lastLength = 0;
public RandomAccessFile randomAccessFile;
private FileUploadFile fileUploadFile;
private byte[] bytes;
private long startTime=0;
private long endTime=0;
public FileUploadClientHandler(FileUploadFile ef) {
if (ef.getFile().exists()) {
if (!ef.getFile().isFile()) {
System.out.println("Not a file :" + ef.getFile());
return;
}
}
this.fileUploadFile = ef;
}
public void channelActive(ChannelHandlerContext ctx) {
  try {
// 只读形式打开文件
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),"r");
// seek(0)把指针移到初始位置,seek(m)把指针移到m字节处
randomAccessFile.seek(fileUploadFile.getStarPos());
  if(randomAccessFile.length()<(1024*1024*120)&&randomAccessFile.length()>(1024*10)){
lastLength = (int) randomAccessFile.length()/10;
bytes = new byte[lastLength];
  }else if(randomAccessFile.length()>=(1024*1024*120)){
bytes = new byte[length];
  }else{
bytes = new byte[(int) randomAccessFile.length()];
  }
// 文件大小
size = (float) (randomAccessFile.length()) / 1024 / 1024;
// 把文件读到内存中
if ((byteRead = randomAccessFile.read(bytes)) != -1) {
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
ctx.writeAndFlush(fileUploadFile);
randomAccessFile.close();
} else {
System.out.println("文件已经读完");
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException i) {
i.printStackTrace();
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
startTime=System.nanoTime();
Map map= (Map) msg;
if(map.get("start")!=null){
start=Long.parseLong(map.get("start").toString());
String status="";
if(map.get("status")!=null){
status=map.get("status").toString();
}
if("temp".equals(status)){
System.out.println("续传开始!!!!");
}
if (start != -1) {
randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
// 把指针移到start处
randomAccessFile.seek(start);
int a = (int) (randomAccessFile.length() - start);
int b=0;
if(randomAccessFile.length()<(120*1024*1024)&&randomAccessFile.length()>(10*1024)){
b = (int) (randomAccessFile.length() / 10) ;
}else if(randomAccessFile.length()>=(1024*1024*120)){
b=length;
}else{
b=(int) randomAccessFile.length();
}
if(a!=0){
//             int b = (int) (randomAccessFile.length() / 10);
if (a < b && a > 0) {
   lastLength = a;
}
if (a < 0) {
lastLength = b;
}
if (a > b && a > 0) {
lastLength = b;
}
bytes = new byte[lastLength];
if ((byteRead = randomAccessFile.read(bytes)) != -1&& (randomAccessFile.length() - start) > 0) {
fileUploadFile.setEndPos(byteRead);
fileUploadFile.setBytes(bytes);
try {
ctx.writeAndFlush(fileUploadFile);
endTime=System.nanoTime();
double time=(double)((double)(endTime-startTime)/1000/1000/1000);
double filesize=(double)((double)(bytes.length))/1024/1024;
// startTime=endTime;
// System.out.println("时间为:"+time);
// System.out.println("大小为:"+filesize);
double speed=filesize/time;
if(time!=0){
// time=(double)bytes.length/(double)randomAccessFile.length()*100;
System.out.println("速度为:"+speed+"M/s");
}
randomAccessFile.close();
} catch (Exception e) {
e.printStackTrace();
}
} else {
randomAccessFile.close();
// ctx.close();
}
}else{
randomAccessFile.close();
fileUploadFile.setStatus("q");
ctx.writeAndFlush(fileUploadFile);
     }
}
}else{
     if("exist".equals(map.get("status"))){
     System.out.println("文件已存在,禁止重复上传");
        }
}
}


public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

fileupload类:

package uploadServer;
import java.io.File;
import java.io.Serializable;
/**
 * Serializable message exchanged between upload client and server.
 *
 * <p>Carries the description of the file being uploaded together with the
 * payload of the current chunk. Field names are left untouched because they
 * are part of the default serialized form.
 */
public class FileUploadFile implements Serializable {

    private static final long serialVersionUID = 1L;

    // ---- file description ----

    /** The file on the sending side. */
    private File file;
    /** MD5 value of the file. */
    private String file_md5;
    /** File name. */
    private String fileName;
    /** Total size of the file. */
    private long fileSize;

    // ---- current chunk ----

    /** Start position. */
    private int starPos;
    /** End position. */
    private int endPos;
    /** Byte array holding file content. */
    private byte[] bytes;
    /** Free-form status flag. */
    private String status;

    /** @return the file */
    public File getFile() {
        return this.file;
    }

    /** @param file the file */
    public void setFile(File file) {
        this.file = file;
    }

    /** @return the MD5 value of the file */
    public String getFile_md5() {
        return this.file_md5;
    }

    /** @param file_md5 the MD5 value of the file */
    public void setFile_md5(String file_md5) {
        this.file_md5 = file_md5;
    }

    /** @return the file name */
    public String getFileName() {
        return this.fileName;
    }

    /** @param fileName the file name */
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    /** @return the total file size */
    public long getFileSize() {
        return this.fileSize;
    }

    /** @param fileSize the total file size */
    public void setFileSize(long fileSize) {
        this.fileSize = fileSize;
    }

    /** @return the start position */
    public int getStarPos() {
        return this.starPos;
    }

    /** @param starPos the start position */
    public void setStarPos(int starPos) {
        this.starPos = starPos;
    }

    /** @return the end position */
    public int getEndPos() {
        return this.endPos;
    }

    /** @param endPos the end position */
    public void setEndPos(int endPos) {
        this.endPos = endPos;
    }

    /** @return the content byte array (the stored reference, not a copy) */
    public byte[] getBytes() {
        return this.bytes;
    }

    /** @param bytes the content byte array (stored by reference, not copied) */
    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }

    /** @return the status flag */
    public String getStatus() {
        return this.status;
    }

    /** @param status the status flag */
    public void setStatus(String status) {
        this.status = status;
    }
}