Confused about Stream implementation in NodeJs

Time: 2022-08-24 21:02:23

I am trying to implement a protocol to send and receive files over a socket. The protocol is already specified and I cannot change it.

I am new to NodeJs, and this is how I am trying to implement it.

I will write a duplex stream and pipe the file into it, then pipe that stream into the socket to send the data.

My confusion is about where I should read and where I should write. How do I know that reading the file has finished, and how do I tell the socket that it is finished? The docs are not very clear to me, and googling added more confusion :)

Any help would be appreciated.

P.S. I will add my own samples when I get home; I just don't have them with me now.

EDIT

After @MattHarrison's answer, I changed the code to this:

var stream = require('stream');
var util = require('util');
var bufferpack = require('bufferpack');
var fs = require('fs');
var net = require('net');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?
    stream.Transform.call(this);
};

util.inherits(MyProtocolStream, stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first
        this.writtenHeader = true;        // remember it so later chunks skip the header
    }

    // Can this function be interrupted at this very line?
    // Then another _transform comes in and pushes its own data to socket
    // Corrupted data maybe then?
    // How to prevent this behavior? Buffering whole file before sending?

    var self = this;
    // I put a random timeout to simulate overlapped calls
    // Can this happen in real world?
    setTimeout(function () {
        self.push(chunk);  // send the incoming file chunks along as-is
        callback();
    }, Math.random()*10);
};

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer
    callback();
};

var file = '/tmp/a';

var server = net.createServer(function (sck) {
    sck.addr = sck.remoteAddress;
    console.log('Client connected - ' + sck.addr);
    fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck);
    fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck);
    sck.on('close', function () {
        console.log('Client disconnected - ' + this.addr);
    })
});

server.listen(22333, function () {
    console.log('Server started on ' + 22333)
});

See my comments in _transform.

2 Answers

#1


3  

I'm not sure on the exact details of the protocol that you're trying to implement but the following should give you a good pattern that you can adapt to your needs.

My fictional protocol

When a client socket connects to my TCP server, I want to send them a file. But first I want to send a header. At the end of the file, before the stream ends, I also want to send a footer. So the data written to the socket looks like:

==== HEADER ====
[FILE CONTENTS]
==== FOOTER ====

Implementing a Transform stream

All I want to do is transform the data coming out of a readable stream. Notice transform is the keyword here. I can use a Transform stream for this.

When creating a Transform stream, you can override two methods: _transform and _flush. _transform is called with each chunk coming out of the readable stream. You can change the data, buffer it up, or do whatever else you need. _flush is called once all the data from the readable has been consumed. You can do any final cleanup there, or write out a last bit of data.

var Stream = require('stream');
var Util = require('util');

var MyProtocolStream = function () {

    this.writtenHeader = false;            // have we written the header yet?
    Stream.Transform.call(this);
};

Util.inherits(MyProtocolStream, Stream.Transform);

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) {

    if (!this.writtenHeader) {
        this.push('==== HEADER ====\n');  // if we've not, send the header first
        this.writtenHeader = true;
    }
    this.push(chunk);                     // send the incoming file chunks along as-is
    callback();
};

MyProtocolStream.prototype._flush = function (callback) {

    this.push('==== FOOTER ====\n');      // Just before the stream closes, send footer 
    callback();
};
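
If you want to check the framing without a socket, something like the following should work. It is only a sketch: createProtocolStream and collect are names I made up for the demo, the transform is rewritten with the options form of the Transform constructor, and Readable.from (available in recent Node versions) stands in for the file.

```javascript
const { Readable, Transform } = require('stream');

// Same header/body/footer idea as above, using the options form of the
// Transform constructor. createProtocolStream is a hypothetical helper name.
function createProtocolStream() {
    let writtenHeader = false;
    return new Transform({
        transform(chunk, encoding, callback) {
            if (!writtenHeader) {
                this.push('==== HEADER ====\n');   // header goes out before the first chunk
                writtenHeader = true;
            }
            this.push(chunk);                      // pass the data along unchanged
            callback();
        },
        flush(callback) {
            this.push('==== FOOTER ====\n');       // footer goes out after the last chunk
            callback();
        }
    });
}

// Collect everything a readable stream emits into a single string.
function collect(readable) {
    return new Promise(function (resolve, reject) {
        const parts = [];
        readable.on('data', function (chunk) { parts.push(Buffer.from(chunk)); });
        readable.on('end', function () { resolve(Buffer.concat(parts).toString()); });
        readable.on('error', reject);
    });
}

// An in-memory source stands in for the file here.
const framed = collect(
    Readable.from(['This is a line\n', 'This is the last line\n'])
        .pipe(createProtocolStream())
);

framed.then(function (text) { process.stdout.write(text); });
```

One thing to be aware of: with an empty input _transform never runs, so only the footer is emitted. If your protocol requires a header even for empty files, you would have to handle that case in _flush as well.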

Using the MyProtocolStream

Now that I have a stream that does what I want, I can simply pipe a file (or any readable stream for that matter) through my custom Transform stream and into any other Writable stream (such as a socket).

var Fs = require('fs');
var Net = require('net');

var server = Net.createServer(function (socket) {

    Fs.createReadStream('./example.txt')
        .pipe(new MyProtocolStream())
        .pipe(socket);
});

server.listen(8000);

Testing it out

I can test this out by adding some contents to example.txt:

This is a line
This is another line
This is the last line

I can spin up my server and then connect with telnet/nc:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
==== HEADER ====
This is a line
This is another line
This is the last line
==== FOOTER ====
Connection closed by foreign host.

So what about Duplex streams?

A duplex stream is two streams embedded within one. Data comes out of one and you write totally different data into the other one. It's used where there's 2-way communication with another entity (such as a TCP socket). In this example, we don't need a Duplex stream because data is only flowing in one direction:

file -> MyProtocolStream -> socket
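
For contrast, here is roughly what a Duplex looks like. This is a toy sketch, not part of the protocol: createToyDuplex and the greeting it emits are invented for illustration. The point is that the readable side and the writable side are independent of each other.

```javascript
const { Duplex } = require('stream');

// A toy Duplex (hypothetical helper): the readable side emits a fixed
// greeting, while the writable side only records what is written to it.
// Nothing written ever shows up on the readable side.
function createToyDuplex(received) {
    const outgoing = ['hello\n'];
    return new Duplex({
        read() {
            // Push the next queued chunk, or null to end the readable side.
            this.push(outgoing.length ? outgoing.shift() : null);
        },
        write(chunk, encoding, callback) {
            received.push(chunk.toString());
            callback();
        }
    });
}

const received = [];
const duplex = createToyDuplex(received);

// Resolve with everything the readable side produced once it has ended.
const done = new Promise(function (resolve) {
    let readText = '';
    duplex.on('data', function (chunk) { readText += chunk; });
    duplex.on('end', function () { resolve(readText); });
});

duplex.write('ping');   // goes to the writable side only
duplex.end();
```

A Transform is a special kind of Duplex where the output is computed from the input, which is why it fits the file-to-socket case better.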

Learning more

As Meir pointed out in the other answer, Substack's stream handbook is the canonical (and best) resource for streams along with the official docs. If you read them thoroughly and implement the examples yourself, you'll learn all you need to know about streams.

Sending multiple files over a single socket in series

If you want to write the output of several of these Transform streams into a single writable end, plain pipe() isn't going to work for you. By default, once EOF comes from one source stream, pipe() also ends the downstream writable (the socket). There's also no guarantee about the ordering of data events when several streams are piped into the same destination at once. So you need to manually aggregate the streams by listening to their data/end events, starting to read the next stream only after the previous one has finished:

var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream, callback) {

        stream.on('data', socket.write.bind(socket));
        stream.on('end', callback);
    };

    readStream(incoming1, function () {

        readStream(incoming2, function () {

            socket.end();
        });
    });
});

If you're averse to nested callbacks, you could also use promises:

var server = Net.createServer(function (socket) {

    var incoming1 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming2 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var incoming3 = Fs.createReadStream('./example.txt')
            .pipe(new MyProtocolStream());

    var readStream = function (stream) {
        return new Promise(function (resolve, reject) {
            stream.on('data', socket.write.bind(socket));
            stream.on('end', resolve);
        });
    };

    readStream(incoming1)
    .then(function () {
        return readStream(incoming2);
    })
    .then(function () {
        return readStream(incoming3);
    })
    .then(function () {
        socket.end();
    });
});
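
Another option worth trying is pipe() with the { end: false } option, which stops pipe() from ending the destination when a source finishes and keeps backpressure handling. The sketch below is an assumption-laden demo rather than a drop-in solution: pipeInSeries is a name I made up, it takes factory functions so that each source is only created when its turn comes, and a PassThrough stands in for the socket.

```javascript
const { PassThrough, Readable } = require('stream');

// Pipe each source into the destination one at a time (hypothetical helper).
// createSources is an array of functions that each return a readable stream.
function pipeInSeries(createSources, destination) {
    return createSources.reduce(function (previous, createSource) {
        return previous.then(function () {
            return new Promise(function (resolve, reject) {
                const source = createSource();
                source.on('end', resolve);
                source.on('error', reject);
                // { end: false } keeps the destination open after this source ends.
                source.pipe(destination, { end: false });
            });
        });
    }, Promise.resolve()).then(function () {
        destination.end();   // end the destination once, after the last source
    });
}

// Demo: a PassThrough stands in for the socket.
const destination = new PassThrough();
const collected = new Promise(function (resolve) {
    let text = '';
    destination.on('data', function (chunk) { text += chunk; });
    destination.on('end', function () { resolve(text); });
});

pipeInSeries([
    function () { return Readable.from(['a1 ', 'a2 ']); },
    function () { return Readable.from(['b1 ', 'b2 ']); }
], destination).catch(console.error);

collected.then(function (text) { console.log(text); });
```

In the server above, each factory could return Fs.createReadStream(path).pipe(new MyProtocolStream()) and the destination would be the socket.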

#2


0  

This is a great resource on streams.

As for socket usage: when you send or receive a socket message, you send a type and a payload. So your general socket flow could be to send messages of type 'file_data' carrying the content while you have data to send, and at the end send a message of type 'eof' (for end of file) with an empty payload.
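
A rough sketch of what such type-plus-payload framing could look like is below. The wire layout (1-byte type, 4-byte big-endian length, then the payload) and the numeric type codes are assumptions made up for this example; the real protocol would dictate its own format.

```javascript
// Hypothetical message type codes; a real protocol would define its own.
const TYPE_FILE_DATA = 1;
const TYPE_EOF = 2;

// Frame layout assumed for this sketch: 1-byte type, 4-byte big-endian
// payload length, then the payload bytes.
function encodeMessage(type, payload) {
    const header = Buffer.alloc(5);
    header.writeUInt8(type, 0);
    header.writeUInt32BE(payload.length, 1);
    return Buffer.concat([header, payload]);
}

// Parse as many complete messages as the buffer holds and return them with
// the unconsumed remainder, since TCP may split a frame at any point.
function decodeMessages(buffer) {
    const messages = [];
    let offset = 0;
    while (buffer.length - offset >= 5) {
        const length = buffer.readUInt32BE(offset + 1);
        if (buffer.length - offset - 5 < length) {
            break;   // payload not fully received yet
        }
        messages.push({
            type: buffer.readUInt8(offset),
            payload: buffer.subarray(offset + 5, offset + 5 + length)
        });
        offset += 5 + length;
    }
    return { messages: messages, rest: buffer.subarray(offset) };
}

// One data message followed by the end-of-file marker.
const wire = Buffer.concat([
    encodeMessage(TYPE_FILE_DATA, Buffer.from('This is a line\n')),
    encodeMessage(TYPE_EOF, Buffer.alloc(0))
]);
const decoded = decodeMessages(wire);
```

On the receiving side you would keep the returned rest and prepend it to the next chunk that arrives on the socket before decoding again.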
