处理用千牛导出淘宝数据,供Logstash到Elasticsearch使用。(NodeJS)

时间:2023-03-09 18:09:41
处理用千牛导出淘宝数据,供Logstash到Elasticsearch使用。(NodeJS)
var rf=require("fs");  

// 加载编码转换模块
//npm install iconv-lite
var iconv = require('iconv-lite'); var fileName = "2017-03-01~2017-05-31"; //读取二进制
var data=rf.readFileSync(fileName+".txt","binary"); //转化GBK格式
var buf = new Buffer(data, 'binary');
var str = iconv.decode(buf, 'GBK'); var newData = handleMS(str);
var oDate = new Date();
writeFile(fileName+newGuid()+".json", newData); console.log("The END"); //解析数据
function handleMS(data){ var newData = ""; var arr = str.split('\r\n'); //获取客服名称
var callcenter = arr[0]; var customer = "";
for (var i = 7 ; i <arr.length; i++) { var item = arr[i]; if (item == "") {
continue;
}; var delimiter = '----------------------------';
if (item.indexOf(delimiter) != -1) {
customer = item.split(delimiter)[1];
continue;
}; var cc = item.split('(')[0];
var date = "";item.split('(')[1]; var message = "";
var preMessage =item.split('): ');
if (preMessage.length == 2) {
message = preMessage[1];
var date = item.split(')')[0].split('(')[1];
};
newData += JSON.stringify({who:cc,date:new Date(date),m:message, isCC:cc == callcenter ? 1 : 0})+"\r\n";
}
return newData;
} //写文件
function writeFile(file, data){
// 把中文转换成字节数组
var arr = iconv.encode(data, 'utf-8'); // appendFile,如果文件不存在,会自动创建新文件
// 如果用writeFile,那么会删除旧文件,直接写新文件
rf.writeFile(file, arr, function(err){
if(err)
console.log("fail " + err);
else
console.log("写入文件ok");
});
} function newGuid()
{
var guid = "";
for (var i = 1; i <= 32; i++){
var n = Math.floor(Math.random()*16.0).toString(16);
guid += n;
if((i==8)||(i==12)||(i==16)||(i==20))
guid += "-";
}
return guid;
}

Logstash.conf

input {
file {
path => "D:/logstash-5.2.2/testdata/*.json"
start_position => "beginning"
sincedb_path => "D:/logstash-5.2.2/bin/sincedb"
codec => json {
charset => "UTF-8"
}
}
}
filter {
json{
source => "message"
} mutate
{
remove_field => [ "message","path","@version","@timestamp","host","_id","value"]
}
}
output {
elasticsearch {
action => "index"
hosts => ["http://172.31.2.9:9200/"]
user => "admin"
password => "" index => "testtbmsdb3"
document_type => "ms"
workers => 1
}
#stdout {
#codec => rubydebug
#codec => json_lines
#}
}