
时间:2022-08-24 21:02:23

I am trying to implement a protocol to send and receive files over socket. The protocol is specified and I can not change that.


I am new to NodeJs, and this is how I am trying to implement that.


I will write a duplex stream, and pipe file into it. Then pipe it into socket to send data.


The confusion comes where I should read this, and where to write that. How to know reading file has finished, and how to tell socket that it is finished. Docs are not very clear to me, and googling added more confusion :)


Any help would be appreciated.


P.S. I will add my own samples when I get home, I just don't have it now.




After @MattHarrison's answer, I changed code into this:

在@ MattHarrison回答之后,我将代码更改为:

var stream = require('stream');
var util = require('util');
var bufferpack = require('bufferpack');
var fs = require('fs');
var net = require('net');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?

util.inherits(MyProtocolStream, stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first

    // Can this function be interrupted at this very line?
    // Then another _transform comes in and pushes its own data to socket
    // Corrupted data maybe then?
    // How to prevent this behavior? Buffering whole file before sending?

    var self = this;
    // I put a random timeout to simulate overlapped calls
    // Can this happen in real world?
    setTimeout(function () {
        self.push(chunk);  // send the incoming file chunks along as-is
    }, Math.random()*10);

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer

var file = '/tmp/a';

var server = net.createServer(function (sck) {
    sck.addr = sck.remoteAddress;
    console.log('Client connected - ' + sck.addr);
    fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck);
    sck.on('close', function () {
        console.log('Client disconnected - ' + this.addr);

server.listen(22333, function () {
    console.log('Server started on ' + 22333)

See my comments in _transform.


2 个解决方案



I'm not sure on the exact details of the protocol that you're trying to implement but the following should give you a good pattern that you can adapt to your needs.


My fictional protocol


When a client socket connects to my TCP server, I want to send them a file. But first I want to send a header. At the end of the file, before the stream ends, I also want to send a header. So the data written to the socket looks like:


==== HEADER ====
==== FOOTER ====

Implementing a Transform stream


All I want to do is transform the data coming out of a readable stream. Notice transform is the keyword here. I can use a Transform stream for this.


When creating a Transform stream, you can override two methods: _transform and _flush. _transform is called with each chunk coming of the readable stream. You can change the data, buffer it up or whatever. _flush is called right after all the data from the readable has finished. You can do anymore cleanup here, or write a last bit of data out.


var Stream = require('stream');
var Util = require('util');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?

Util.inherits(MyProtocolStream, Stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first
        this.writtenHeader = true;
    this.push(chunk);                     // send the incoming file chunks along as-is

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer 

Using the MyProtocolStream


So now I have a stream that does what I want, I can simply pipe a file (or any readable stream for that matter) through my custom Transform stream and into any other Writable stream (such as a socket).


var Fs = require('fs');
var Net = require('net');

var server = Net.createServer(function (socket) {

        .pipe(new MyProtocolStream())


Testing it out


I can test this out by adding some contents to example.txt:


This is a line
This is another line
This is the last line

I can spin up my server and then connect with telnet/nc:

我可以启动我的服务器,然后用telnet / nc连接:

$ telnet 8000
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.

So what about Duplex streams?


A duplex stream is two streams embedded within one. Data comes out of one and you write totally different data into the other one. It's used where there's 2-way communication with another entity (such as a TCP socket). In this example, we don't need a Duplex stream because data is only flowing in one direction:


file -> MyProtocolStream -> socket

file - > MyProtocolStream - > socket

Learning more


As Meir pointed out in the other answer, Substack's stream handbook is the canonical (and best) resource for streams along with the official docs. If you read them thoroughly and implement the examples yourself, you'll learn all you need to know about streams.


Sending multiple files over a single socket in series


If you're wanting to write the output of multiple of these Transforms streams into a single writable end, pipe() isn't going to work for you. Once the EOF comes from a single stream, the upstream writable (socket) is also going to be closed. Also there's no guarantee about ordering of data events in this case. So you need to manually aggregate the streams by listening to data/end events, starting to read one stream after another has finished:


var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream, callback) {

        stream.on('data', socket.write.bind(socket));
        stream.on('end', callback);

    readStream(incoming1, function () {

        readStream(incoming2, function () {


If you're nested callback-averse, you could also use promises:


var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming3 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream) {
        return new Promise(function (resolve, reject) {
            stream.on('data', socket.write.bind(socket));
            stream.on('end', resolve);

    .then(function () {
        return readStream(incoming2);
    .then(function () {
        return readStream(incoming3);
    .then(function () {



This is a great resource on streams.


As for socket usage, when you send/get socket message, you send the type and a payload. So, your general socket stream could be to send message of type 'file_data' with the content while you have data to send and at the end send a message of type 'eof' (for end of file) with an empty payload.




I'm not sure on the exact details of the protocol that you're trying to implement but the following should give you a good pattern that you can adapt to your needs.


My fictional protocol


When a client socket connects to my TCP server, I want to send them a file. But first I want to send a header. At the end of the file, before the stream ends, I also want to send a header. So the data written to the socket looks like:


==== HEADER ====
==== FOOTER ====

Implementing a Transform stream


All I want to do is transform the data coming out of a readable stream. Notice transform is the keyword here. I can use a Transform stream for this.


When creating a Transform stream, you can override two methods: _transform and _flush. _transform is called with each chunk coming of the readable stream. You can change the data, buffer it up or whatever. _flush is called right after all the data from the readable has finished. You can do anymore cleanup here, or write a last bit of data out.


var Stream = require('stream');
var Util = require('util');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?

Util.inherits(MyProtocolStream, Stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first
        this.writtenHeader = true;
    this.push(chunk);                     // send the incoming file chunks along as-is

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer 

Using the MyProtocolStream


So now I have a stream that does what I want, I can simply pipe a file (or any readable stream for that matter) through my custom Transform stream and into any other Writable stream (such as a socket).


var Fs = require('fs');
var Net = require('net');

var server = Net.createServer(function (socket) {

        .pipe(new MyProtocolStream())


Testing it out


I can test this out by adding some contents to example.txt:


This is a line
This is another line
This is the last line

I can spin up my server and then connect with telnet/nc:

我可以启动我的服务器,然后用telnet / nc连接:

$ telnet 8000
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.

So what about Duplex streams?


A duplex stream is two streams embedded within one. Data comes out of one and you write totally different data into the other one. It's used where there's 2-way communication with another entity (such as a TCP socket). In this example, we don't need a Duplex stream because data is only flowing in one direction:


file -> MyProtocolStream -> socket

file - > MyProtocolStream - > socket

Learning more


As Meir pointed out in the other answer, Substack's stream handbook is the canonical (and best) resource for streams along with the official docs. If you read them thoroughly and implement the examples yourself, you'll learn all you need to know about streams.


Sending multiple files over a single socket in series


If you're wanting to write the output of multiple of these Transforms streams into a single writable end, pipe() isn't going to work for you. Once the EOF comes from a single stream, the upstream writable (socket) is also going to be closed. Also there's no guarantee about ordering of data events in this case. So you need to manually aggregate the streams by listening to data/end events, starting to read one stream after another has finished:


var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream, callback) {

        stream.on('data', socket.write.bind(socket));
        stream.on('end', callback);

    readStream(incoming1, function () {

        readStream(incoming2, function () {


If you're nested callback-averse, you could also use promises:


var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming3 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream) {
        return new Promise(function (resolve, reject) {
            stream.on('data', socket.write.bind(socket));
            stream.on('end', resolve);

    .then(function () {
        return readStream(incoming2);
    .then(function () {
        return readStream(incoming3);
    .then(function () {



This is a great resource on streams.


As for socket usage, when you send/get socket message, you send the type and a payload. So, your general socket stream could be to send message of type 'file_data' with the content while you have data to send and at the end send a message of type 'eof' (for end of file) with an empty payload.
