FileChannel零拷贝传输无法将字节复制到SocketChannel

时间:2022-12-31 11:03:45

I'm seeing some strange behavior when transferring large files from file to socket using zero-copy in Java. My environments:

在Java中使用零拷贝将大文件从文件传输到套接字时,我发现了一些奇怪的行为。我的环境:

  • Windows 7 64-bit JDK 1.6.0_45 and 1.7.0_79.
  • Windows 7 64位JDK 1.6.0_45和1.7.0_79。

  • Centos 6.6 64-bit JDK 1.6.0_35
  • Centos 6.6 64位JDK 1.6.0_35

What the program does: client copies an input file into a socket, and server copies socket to output file using zero-copy methods: transferFrom and transferTo. Not all bytes are reaching the server if file size is relatively large, 100Mb+ in case of Windows and 2GB+ in case of Centos. Client and server reside on the same machine and localhost address is used to transfer data.

程序的作用:客户端将输入文件复制到套接字中,服务器使用零复制方法将套接字复制到输出文件:transferFrom和transferTo。如果文件大小相对较大,并非所有字节都到达服务器,在Windows情况下为100Mb +,在Centos情况下为2GB +。客户端和服务器驻留在同一台计算机上,localhost地址用于传输数据。

The behavior is different depending on OS. On Windows, the client completes transferTo method successfully. The number of transferred bytes is equal to input file size.

行为因操作系统而异。在Windows上,客户端成功完成transferTo方法。传输的字节数等于输入文件大小。

long bytesTransferred = fileChannel.transferTo(0, inputFile.length(), socketChannel);

long bytesTransferred = fileChannel.transferTo(0,inputFile.length(),socketChannel);

The server on the other hand, reports a lower number of received bytes.

另一方面,服务器报告较少的接收字节数。

long transferFromByteCount = fileChannel.transferFrom(socketChannel, 0, inputFile.length());

long transferFromByteCount = fileChannel.transferFrom(socketChannel,0,inputFile.length());

On Linux bytesTransferred on client is 2Gb even if input file size is 4Gb. There is sufficient space on both configurations.

在Linux上,即使输入文件大小为4Gb,客户端上的传输也是2Gb。两种配置都有足够的空间。

On Windows I was able to transfer a 130Mb file with the one of the following workarounds: 1) increasing receive buffer size on server and 2) adding thread sleep method in client. This leads me to think that transferTo method on the client completes when all bytes are sent to socket send buffer, not to server. Whether or not those bytes make it to server is not guaranteed, which creates problems for my use case.

在Windows上,我能够使用以下解决方法之一传输130Mb文件:1)增加服务器上的接收缓冲区大小和2)在客户端中添加线程睡眠方法。这使我认为当所有字节都发送到套接字发送缓冲区而不是服务器时,客户端上的transferTo方法完成。这些字节是否使它成为服务器是不能保证的,这会给我的用例带来问题。

On Linux maximum file size that I'm able to transfer with a single transferTo invocation is 2Gb, however at least the client reports a correct number of bytes sent to server.

在Linux上,我能够通过单个transferTo调用传输的最大文件大小是2Gb,但至少客户端报告发送到服务器的正确字节数。

My questions: what's the best way for the client to ensure guaranteed delivery of the file to the server, cross-platform? What mechanisms are used to emulate sendfile() on Windows?

我的问题:客户确保将文件保证交付到服务器,跨平台的最佳方式是什么?在Windows上使用什么机制来模拟sendfile()?

Here's the code:

这是代码:

Client - ZeroCopyClient.java:

客户端 - ZeroCopyClient.java:

import org.apache.commons.io.FileUtils;

import java.io.*;
import java.net.*;
import java.nio.channels.*;

public class ZeroCopyClient {

    public static void main(String[] args) throws IOException, InterruptedException {

        final File inputFile = new File(args[0]);

        FileInputStream fileInputStream = new FileInputStream(inputFile);
        FileChannel fileChannel = fileInputStream.getChannel();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 8083);
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(socketAddress);

        System.out.println("sending " + inputFile.length() + " bytes to " + socketChannel);

        long startTime = System.currentTimeMillis();
        long totalBytesTransferred = 0;
        while (totalBytesTransferred < inputFile.length()) {
            long st = System.currentTimeMillis();
            long bytesTransferred = fileChannel.transferTo(totalBytesTransferred, inputFile.length()-totalBytesTransferred, socketChannel);
            totalBytesTransferred += bytesTransferred;
            long et = System.currentTimeMillis();
            System.out.println("sent " + bytesTransferred + " out of " + inputFile.length() + " in " + (et-st) + " millis");
        }

        socketChannel.finishConnect();
        long endTime = System.currentTimeMillis();

        System.out.println("sent: totalBytesTransferred= " + totalBytesTransferred + " / " + inputFile.length() + " in " + (endTime-startTime) + " millis");

        final File outputFile = new File(inputFile.getAbsolutePath() + ".out");
        boolean copyEqual = FileUtils.contentEquals(inputFile, outputFile);
        System.out.println("copyEqual= " + copyEqual);

        if (args.length > 1) {
            System.out.println("sleep: " + args[1] + " millis");
            Thread.sleep(Long.parseLong(args[1]));
        }
    }
}

Server - ZeroCopyServer.java:

服务器 - ZeroCopyServer.java:

import java.io.*;
import java.net.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

public class ZeroCopyServer {

    public static void main(String[] args) throws IOException {

        final File inputFile = new File(args[0]);
        inputFile.delete();
        final File outputFile = new File(inputFile.getAbsolutePath() + ".out");
        outputFile.delete();

        createTempFile(inputFile, Long.parseLong(args[1])*1024L*1024L);

        System.out.println("input file length: " + inputFile.length() + " : output file.exists= " + outputFile.exists());

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().setReceiveBufferSize(8*1024*1024);
        System.out.println("server receive buffer size: " + serverSocketChannel.socket().getReceiveBufferSize());
        serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 8083));
        System.out.println("waiting for connection");
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println("connected. client channel: " + socketChannel);

        FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
        FileChannel fileChannel = fileOutputStream.getChannel();
        long startTime = System.currentTimeMillis();
        long transferFromByteCount = fileChannel.transferFrom(socketChannel, 0, inputFile.length());
        long endTime = System.currentTimeMillis();
        System.out.println("received: transferFromByteCount= " + transferFromByteCount + " : outputFile= " + outputFile.length() + " : inputFile= " + inputFile.length() + " bytes in " + (endTime-startTime) + " millis");

        boolean copyEqual = FileUtils.contentEquals(inputFile, outputFile);
        System.out.println("copyEqual= " + copyEqual);

        serverSocketChannel.close();

    }

    private static void createTempFile(File file, long size) throws IOException{
        RandomAccessFile f = new RandomAccessFile(file.getAbsolutePath(), "rw");
        f.setLength(size);
        f.writeDouble(Math.random());
        f.close();
    }

}

UPDATE 1: Linux code fixed with loop.

更新1:用循环修复的Linux代码。

UPDATE 2: One possible workaround I'm considering requires client-server cooperation. At the end of transmission the server writes the length of received data back to client which the client reads it in blocking mode.

更新2:我正在考虑的一种可能的解决方法需要客户端 - 服务器合作。在传输结束时,服务器将接收到的数据的长度写回客户端,客户端以阻塞模式读取它。

Server responds:

ByteBuffer response = ByteBuffer.allocate(8);
response.putLong(transferFromByteCount);
response.flip();
socketChannel.write(response);   
serverSocketChannel.close(); 

The client blocks with read:

客户端块读取:

ByteBuffer response = ByteBuffer.allocate(8);
socketChannel.read(response);
response.flip();
long totalBytesReceived = response.getLong();

As a result, the client waits for the bytes to pass through send and receive socket buffers, and in fact waits for bytes to get stored in the output file. There is no need to implement out-of-band acknowledgements and there's also no need for the client to wait as suggested in section II.A https://linuxnetworkstack.files.wordpress.com/2013/03/paper.pdf in case file content is mutable.

结果,客户端等待字节通过发送和接收套接字缓冲区,实际上等待字节存储在输出文件中。没有必要实施带外确认,客户也无需按照第II.A部分的建议等待。以防止https://linuxnetworkstack.files.wordpress.com/2013/03/paper.pdf文件内容是可变的。

"wait an “appropriate” amount of time before rewriting the same portion of file"

“等待”适当的“时间,然后重写相同的文件部分”

UPDATE 3:

A modified example incorporating fixes by @EJP and @the8472, with both length and file checksum verification, without output tracing. Note that computing CRC32 checksum for a large file may take a few seconds to complete.

一个修改的示例,包含@EJP和@ the8472的修复程序,包括长度和文件校验和验证,没有输出跟踪。请注意,计算大文件的CRC32校验和可能需要几秒钟才能完成。

Client:

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

public class ZeroCopyClient {

    public static void main(String[] args) throws IOException {

        final File inputFile = new File(args[0]);

        FileInputStream fileInputStream = new FileInputStream(inputFile);
        FileChannel fileChannel = fileInputStream.getChannel();
        SocketAddress socketAddress = new InetSocketAddress("localhost", 8083);
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(socketAddress);

        //send input file length and CRC32 checksum to server
        long checksumCRC32 = FileUtils.checksumCRC32(inputFile);
        ByteBuffer request = ByteBuffer.allocate(16);
        request.putLong(inputFile.length());
        request.putLong(checksumCRC32);
        request.flip();
        socketChannel.write(request);

        long totalBytesTransferred = 0;
        while (totalBytesTransferred < inputFile.length()) {
            long bytesTransferred = fileChannel.transferTo(totalBytesTransferred, inputFile.length()-totalBytesTransferred, socketChannel);
            totalBytesTransferred += bytesTransferred;
        }

        //receive output file length and CRC32 checksum from server
        ByteBuffer response = ByteBuffer.allocate(16);
        socketChannel.read(response);
        response.flip();
        long totalBytesReceived = response.getLong();
        long outChecksumCRC32 = response.getLong();

        socketChannel.finishConnect();

        System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));

    }
}

Server:

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

public class ZeroCopyServer {

    public static void main(String[] args) throws IOException {

        final File outputFile = new File(args[0]);

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8083));     
        SocketChannel socketChannel = serverSocketChannel.accept();

        //read input file length and CRC32 checksum sent by client
        ByteBuffer request = ByteBuffer.allocate(16);
        socketChannel.read(request);
        request.flip();
        long length = request.getLong();
        long checksumCRC32 = request.getLong();

        FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
        FileChannel fileChannel = fileOutputStream.getChannel();
        long totalBytesTransferFrom = 0;
        while (totalBytesTransferFrom < length) {
            long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
            if (transferFromByteCount <= 0){
                break;
            }
            totalBytesTransferFrom += transferFromByteCount;
        }

        long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);

        //write output file length and CRC32 checksum back to client
        ByteBuffer response = ByteBuffer.allocate(16);
        response.putLong(totalBytesTransferFrom);
        response.putLong(outChecksumCRC32);
        response.flip();
        socketChannel.write(response);

        serverSocketChannel.close();

        System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));

    }
}

1 个解决方案

#1


The solution is to check write counter from fileChannel.transferFrom:

解决方案是从fileChannel.transferFrom检查写计数器:

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

public class ZeroCopyServer {

public static void main(String[] args) throws IOException {

    final File outputFile = new File(args[0]);

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(8083));     
    SocketChannel socketChannel = serverSocketChannel.accept();

    //read input file length and CRC32 checksum sent by client
    ByteBuffer request = ByteBuffer.allocate(16);
    socketChannel.read(request);
    request.flip();
    long length = request.getLong();
    long checksumCRC32 = request.getLong();

    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    FileChannel fileChannel = fileOutputStream.getChannel();
    long totalBytesTransferFrom = 0;
    while (totalBytesTransferFrom < length) {
        long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
        if (transferFromByteCount <= 0){
            break;
        }
        totalBytesTransferFrom += transferFromByteCount;
    }

    long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);

    //write output file length and CRC32 checksum back to client
    ByteBuffer response = ByteBuffer.allocate(16);
    response.putLong(totalBytesTransferFrom);
    response.putLong(outChecksumCRC32);
    response.flip();
    socketChannel.write(response);

    serverSocketChannel.close();

    System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));

  }
}

#1


The solution is to check write counter from fileChannel.transferFrom:

解决方案是从fileChannel.transferFrom检查写计数器:

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

public class ZeroCopyServer {

public static void main(String[] args) throws IOException {

    final File outputFile = new File(args[0]);

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().bind(new InetSocketAddress(8083));     
    SocketChannel socketChannel = serverSocketChannel.accept();

    //read input file length and CRC32 checksum sent by client
    ByteBuffer request = ByteBuffer.allocate(16);
    socketChannel.read(request);
    request.flip();
    long length = request.getLong();
    long checksumCRC32 = request.getLong();

    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
    FileChannel fileChannel = fileOutputStream.getChannel();
    long totalBytesTransferFrom = 0;
    while (totalBytesTransferFrom < length) {
        long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
        if (transferFromByteCount <= 0){
            break;
        }
        totalBytesTransferFrom += transferFromByteCount;
    }

    long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);

    //write output file length and CRC32 checksum back to client
    ByteBuffer response = ByteBuffer.allocate(16);
    response.putLong(totalBytesTransferFrom);
    response.putLong(outChecksumCRC32);
    response.flip();
    socketChannel.write(response);

    serverSocketChannel.close();

    System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));

  }
}