我们可以像在Spark Streaming中一样使用NodeJS作为TCP流数据吗?

时间:2021-10-10 18:39:59

I want to create a Node JS TCP listener/ client which will continuously keep on reading/ listening data which will be written on a TCP server.

我想创建一个Node JS TCP侦听器/客户端,它将持续读取/侦听将写入TCP服务器的数据。

Use case: using IOC, I will get sensor related data on the TCP server, which I want to read continuously using Node JS.

使用案例:使用IOC,我将在TCP服务器上获取传感器相关数据,我想继续使用Node JS读取。

Is it possible to create a Node JS application which will suffice my above use case? If no please suggest how to accomplish it on a better way? If possible please suggest me how to do it.

是否可以创建一个足以满足上述用例的Node JS应用程序?如果没有,请建议如何以更好的方式完成它?如果可能的话请建议我怎么做。

Tagging spark, as currently we are thinking to use spark streaming to read TCP server in continuous fashion, but as data is not that huge I am thinking if any better way to achieve this task.

标记火花,因为目前我们正在考虑使用火花流以连续方式读取TCP服务器,但由于数据不是那么庞大,我在想是否有更好的方法来实现这一任务。

1 个解决方案

#1


1  

There are some problems with using of raw socket: you must synchronize sending data to port; check manually that socket is connected; divide incoming raw-data by package. Below simple client example with | as package separator.

使用原始套接字存在一些问题:必须同步发送数据到端口;手动检查插座是否已连接;按包划分原始数据。下面是简单的客户端示例|作为包分隔符。

// Bad code; wait of improvements
'use strict'
const net = require('net');
const EventEmitter = require('events');
const util = require('util');

function Listener (opts) {
    this.send = send; 
    let listener = this;

    let queue = [];
    let isBusy = true;
    let socket;

    connect(opts);

    function send (data) {
        queue.push(data);

        if (isBusy)
            return;

        isBusy = true;
        next();
    }

    function next() {
        if (queue.length == 0) 
            return (isBusy = false);

        if (socket) {
            socket.write(data + '|', function (err) {
                if (err)
                    return socket.emit('error', err);
                queue.shift(); 
                next();
            });
        }   
    }

    function connect (opts) {
        socket = net.connect({host: opts.host, port: opts.port});
        queue = [];
        isBusy = true;

        socket.on('connect', () => {
            isBusy = false; 
            listener.emit('connect');
        });

        let buffer = '';
        socket.on('data', function(chunk) {
            buffer += chunk;
            let msgs = buffer.split('|');
            buffer = msgs.pop();

            msgs.forEach((msg) => listener.emit('message', msg));
        }); 

        socket.on('close', () => listener.emit('disconect'));
        socket.on('error', (err) => listener.emit('error', err));       
    }   
}
util.inherits(Listener, EventEmitter);

let listener = new Listener({host: '127.0.0.1', port: 111});
listener.on('connect', () => ...);
listener.on('disconnect', () => ...);
listener.on('error', (err) => ...);
listener.on('message', (msg) => ...);
listener.send('Hello world'); // between connect and disconnect;

#1


1  

There are some problems with using of raw socket: you must synchronize sending data to port; check manually that socket is connected; divide incoming raw-data by package. Below simple client example with | as package separator.

使用原始套接字存在一些问题:必须同步发送数据到端口;手动检查插座是否已连接;按包划分原始数据。下面是简单的客户端示例|作为包分隔符。

// Bad code; wait of improvements
'use strict'
const net = require('net');
const EventEmitter = require('events');
const util = require('util');

function Listener (opts) {
    this.send = send; 
    let listener = this;

    let queue = [];
    let isBusy = true;
    let socket;

    connect(opts);

    function send (data) {
        queue.push(data);

        if (isBusy)
            return;

        isBusy = true;
        next();
    }

    function next() {
        if (queue.length == 0) 
            return (isBusy = false);

        if (socket) {
            socket.write(data + '|', function (err) {
                if (err)
                    return socket.emit('error', err);
                queue.shift(); 
                next();
            });
        }   
    }

    function connect (opts) {
        socket = net.connect({host: opts.host, port: opts.port});
        queue = [];
        isBusy = true;

        socket.on('connect', () => {
            isBusy = false; 
            listener.emit('connect');
        });

        let buffer = '';
        socket.on('data', function(chunk) {
            buffer += chunk;
            let msgs = buffer.split('|');
            buffer = msgs.pop();

            msgs.forEach((msg) => listener.emit('message', msg));
        }); 

        socket.on('close', () => listener.emit('disconect'));
        socket.on('error', (err) => listener.emit('error', err));       
    }   
}
util.inherits(Listener, EventEmitter);

let listener = new Listener({host: '127.0.0.1', port: 111});
listener.on('connect', () => ...);
listener.on('disconnect', () => ...);
listener.on('error', (err) => ...);
listener.on('message', (msg) => ...);
listener.send('Hello world'); // between connect and disconnect;