Spark 2.2 + ES 6.4.2 (Part 31): Generating Test Data with Spark and Loading It into ES with BulkProcessor

Date: 2022-04-16 07:19:52

Generating 20 million test records with Spark (151 columns per record)

While generating large volumes of data with Spark I hit a problem: when the list passed to sc.parallelize(fukeData, 64) is very large (say 5 or 10 million entries), the job becomes extremely slow and eventually fails with a "GC overhead limit exceeded" OutOfMemoryError. The workaround is to generate at most 1,000,000 records per call and invoke the generation routine several times; producing all 20 million records this way took a bit over ten minutes.
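Concretely, the workaround amounts to a loop like the sketch below (the actual code further down spells out the 20 calls explicitly in generateTestData()); generateData(path) is the chunk generator defined in the Spark code below, and each call parallelizes at most 1,000,000 ids:

// Sketch of the chunked generation described above: one generateData() call per
// 1,000,000-record chunk, each chunk written to its own HDFS directory.
for (int chunk = 1; chunk <= 20; chunk++) {
    generateData("/user/my/streaming/test_es/" + chunk + "/");
}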

If your environment allows it, you can also generate the test data locally and upload the file to HDFS for the Spark tests.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

public class FileGenerate {
    public static void main(String[] args) throws IOException {
        BufferedWriter writer = new BufferedWriter(new FileWriter("d://test.csv"));

        List<String> fukeData = new ArrayList<String>();
        for (int i = 1; i <= 20000000; i++) {
            fukeData.add(String.valueOf(i));
        }

        // Header: 6 fixed fields plus 145 mr_tadv_* columns (151 columns in total).
        List<String> fields = new ArrayList<String>();
        fields.add("id");
        fields.add("object_id");
        fields.add("scan_start_time");
        fields.add("scan_stop_time");
        fields.add("insert_time");
        fields.add("enodeb_id");
        for (int i = 0; i < 145; i++) {
            fields.add("mr_tadv_" + (i < 10 ? "0" + i : i));
        }
        writer.write(String.join(",", fields) + "\r\n");

        // Assume 10,000 cells: with 20,000,000 records in total, each cell gets 2,000 records.
        Random random = new Random();
        for (String id : fukeData) {
            List<String> rowItems = new ArrayList<String>();
            // id
            int intId = Integer.valueOf(id);
            rowItems.add(id);
            if (intId % 100000 == 0) {
                System.out.println(intId);
                writer.flush();
            }
            // object_id: cycles through 10,000 distinct cell ids
            String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1);
            rowItems.add(objectId);
            // scan_start_time / scan_stop_time (hour falls in 12-16; minute/second are not zero-padded)
            int hour = random.nextInt(5) + 2;
            int minute = random.nextInt(59) + 1;
            int second_start = random.nextInt(30) + 1;
            int second_stop = second_start + 15;
            String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start;
            String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop;
            rowItems.add(scan_start_time);
            rowItems.add(scan_stop_time);
            // insert_time
            rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            // enodeb_id
            rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256));
            // 145 mr_tadv_* measurement columns
            for (int i = 0; i < 145; i++) {
                rowItems.add(String.valueOf(random.nextInt(100)));
            }
            writer.write(String.join(",", rowItems) + "\r\n");
        }
        writer.flush();
        writer.close();
    }
}
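If you take the local-file route, a minimal sketch for reading the uploaded CSV back with Spark could look like the following (not part of the original post; the HDFS path is hypothetical and should be replaced with wherever test.csv was uploaded):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadGeneratedCsv {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("ReadGeneratedCsv").getOrCreate();
        // The csv source parses the comma-separated lines; header=true consumes the
        // header line that FileGenerate writes first.
        Dataset<Row> csv = spark.read()
                .format("csv")
                .option("header", "true")
                .load("/user/my/streaming/test_es/local/test.csv"); // hypothetical path
        csv.printSchema();
        csv.show(10);
        spark.stop();
    }
}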

The following code generates the 20 million test records under Spark 2.2.0:

public class ESWriterTest extends Driver implements Serializable {
    private static final long serialVersionUID = 1L;
    private ExpressionEncoder<Row> encoder = null;
    private StructType type = null;
    private String hdfdFilePath = "/user/my/streaming/test_es/*";

    public ESWriterTest() {
    }

    @Override
    public void run() {
        initSchema();
        generateTestData();
        sparkSession.stop();
    }

    private void initSchema() {
        type = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.StringType, true),
                DataTypes.createStructField("object_id", DataTypes.StringType, true),
                DataTypes.createStructField("scan_start_time", DataTypes.StringType, true),
                DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true),
                DataTypes.createStructField("insert_time", DataTypes.StringType, true),
                DataTypes.createStructField("enodeb_id", DataTypes.StringType, true)));
        for (int i = 0; i < 145; i++) {
            type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType);
        }
        encoder = RowEncoder.apply(type);
    }

    private void generateTestData() {
        // Each call generates a 1,000,000-record chunk; 20 chunks give 20 million records.
        generateData("/user/my/streaming/test_es/1/");
        generateData("/user/my/streaming/test_es/2/");
        generateData("/user/my/streaming/test_es/3/");
        generateData("/user/my/streaming/test_es/4/");
        generateData("/user/my/streaming/test_es/5/");
        generateData("/user/my/streaming/test_es/6/");
        generateData("/user/my/streaming/test_es/7/");
        generateData("/user/my/streaming/test_es/8/");
        generateData("/user/my/streaming/test_es/9/");
        generateData("/user/my/streaming/test_es/10/");
        generateData("/user/my/streaming/test_es/11/");
        generateData("/user/my/streaming/test_es/12/");
        generateData("/user/my/streaming/test_es/13/");
        generateData("/user/my/streaming/test_es/14/");
        generateData("/user/my/streaming/test_es/15/");
        generateData("/user/my/streaming/test_es/16/");
        generateData("/user/my/streaming/test_es/17/");
        generateData("/user/my/streaming/test_es/18/");
        generateData("/user/my/streaming/test_es/19/");
        generateData("/user/my/streaming/test_es/20/");

        // Read everything back to verify. Supported file formats: text, csv, json, parquet.
        StructType structType = DataTypes.createStructType(
                Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true)));
        Dataset<Row> rows = sparkSession.read().format("text").schema(structType).load(hdfdFilePath);
        rows.printSchema();
        rows.show(10);
    }

    private void generateData(String hdfsDataFilePath) {
        List<Row> fukeData = new ArrayList<Row>();
        for (int i = 1; i <= 1000000; i++) {
            fukeData.add(RowFactory.create(String.valueOf(i)));
        }

        StructType structType = DataTypes.createStructType(
                Arrays.asList(DataTypes.createStructField("id", DataTypes.StringType, false)));
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
        JavaRDD<Row> javaRDD = sc.parallelize(fukeData, 64);
        Dataset<Row> fukeDataset = sparkSession.createDataFrame(javaRDD, structType);

        Random random = new Random();
        // Assume 10,000 cells: each 1,000,000-record chunk gives every cell 100 records.
        Dataset<Row> rows = fukeDataset.mapPartitions(new MapPartitionsFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Row> call(Iterator<Row> idItems) throws Exception {
                List<Row> newRows = new ArrayList<Row>();
                while (idItems.hasNext()) {
                    String id = idItems.next().getString(0);
                    List<Object> rowItems = new ArrayList<Object>();
                    // id
                    int intId = Integer.valueOf(id);
                    rowItems.add(id);
                    // object_id
                    String objectId = String.valueOf((intId % 10000) + 10000 * 256 + 1);
                    rowItems.add(objectId);
                    // scan_start_time / scan_stop_time
                    int hour = random.nextInt(5) + 2;
                    int minute = random.nextInt(59) + 1;
                    int second_start = random.nextInt(30) + 1;
                    int second_stop = second_start + 15;
                    String scan_start_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_start;
                    String scan_stop_time = "2018-10-29 1" + hour + ":" + minute + ":" + second_stop;
                    rowItems.add(scan_start_time);
                    rowItems.add(scan_stop_time);
                    // insert_time
                    rowItems.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                    // enodeb_id
                    rowItems.add(String.valueOf((int) Integer.valueOf(objectId) / 256));
                    // 145 mr_tadv_* measurement columns
                    for (int i = 0; i < 145; i++) {
                        rowItems.add(String.valueOf(random.nextInt(100)));
                    }
                    newRows.add(RowFactory.create(rowItems.toArray()));
                }
                return newRows.iterator();
            }
        }, encoder);

        rows.toJavaRDD().repartition(20).saveAsTextFile(hdfsDataFilePath);
    }
}

Loading the test data into ES 6.4.2 with BulkProcessor under Spark

Below is the test code that runs under Spark 2.2.0 and inserts the 20 million records into ES 6.4.2 through a BulkProcessor. One problem turned up while debugging: do not call client.close() or bulkProcessor.close() inside the call() method of the ForeachPartitionFunction, otherwise an exception is thrown — the client may be shared by several executors.

    private ExpressionEncoder<Row> encoder = null;
    private StructType type = null;
    private String hdfdFilePath = "/user/my/streaming/test_es/*";

    @Override
    public void run() {
        initSchema();

        StructType structType = DataTypes.createStructType(
                Arrays.asList(DataTypes.createStructField("value", DataTypes.StringType, true)));
        Dataset<Row> lines = sparkSession.read().format("text").schema(structType).load(hdfdFilePath);

        // Split every text line into the 151 string columns of the schema.
        Dataset<Row> rows = lines.map(new MapFunction<Row, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Row value) throws Exception {
                List<Object> itemsList = new ArrayList<Object>();
                String line = value.getAs("value");
                String[] fields = line.split(",");
                for (String field : fields) {
                    itemsList.add(field);
                }
                return RowFactory.create(itemsList.toArray());
            }
        }, encoder);

        rows.show(10);
        // toJSON() turns each row into a JSON document string, which is what
        // EsForeachPartitionFunction expects on every partition.
        rows.toJSON().foreachPartition(new EsForeachPartitionFunction());

        sparkSession.stop();
    }

    private void initSchema() {
        type = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("id", DataTypes.StringType, true),
                DataTypes.createStructField("object_id", DataTypes.StringType, true),
                DataTypes.createStructField("scan_start_time", DataTypes.StringType, true),
                DataTypes.createStructField("scan_stop_time", DataTypes.StringType, true),
                DataTypes.createStructField("insert_time", DataTypes.StringType, true),
                DataTypes.createStructField("enodeb_id", DataTypes.StringType, true)));
        for (int i = 0; i < 145; i++) {
            type = type.add("mr_tadv_" + (i < 10 ? "0" + i : i), DataTypes.StringType);
        }
        encoder = RowEncoder.apply(type);
    }

EsForeachPartitionFunction.java

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class EsForeachPartitionFunction implements ForeachPartitionFunction<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Iterator<String> rows) throws Exception {
        // One TransportClient and one BulkProcessor per partition; neither is closed here
        // (see the note above about client.close()/bulkProcessor.close()).
        TransportClient client = null;
        BulkProcessor bulkProcessor = null;
        try {
            client = getClient();
            bulkProcessor = getBulkProcessor(client);
        } catch (Exception ex) {
            System.out.println(ex.getMessage() + "\r\n" + ex.getStackTrace());
        }

        Map<String, Object> mapType = new HashMap<String, Object>();
        while (rows.hasNext()) {
            // Each element is a JSON document produced by rows.toJSON(); parse it into a Map
            // and hand it to the BulkProcessor, which batches the index requests.
            @SuppressWarnings("unchecked")
            Map<String, Object> map = new com.google.gson.Gson().fromJson(rows.next(), mapType.getClass());
            bulkProcessor.add(new IndexRequest("twitter", "tweet").source(map));
        }

        try {
            // Flush any remaining requests
            bulkProcessor.flush();
            System.out.println("-------- bulkProcessor.flush() done --------");
        } catch (Exception ex) {
            System.out.println("" + ex.getMessage() + "\r\n" + ex.getStackTrace());
        }

        try {
            // Or close the bulkProcessor if you don't need it anymore
            bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
            System.out.println("-------- bulkProcessor.awaitClose(10, TimeUnit.MINUTES) done --------");
        } catch (Exception ex) {
            System.out.println("" + ex.getMessage() + "\r\n" + ex.getStackTrace());
        }
    }

    private BulkProcessor getBulkProcessor(TransportClient client) {
        BulkProcessor bulkProcessor = BulkProcessor
                .builder(client, new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        System.out.println("beforeBulk...");
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        System.out.println("afterBulk(long, BulkRequest, BulkResponse) finished...");
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        System.out.println("afterBulk(long, BulkRequest, Throwable) finished...");
                        System.out.println(request.numberOfActions() + " actions bulk failed, reason: " + failure);
                    }
                })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(64, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        return bulkProcessor;
    }

    private TransportClient getClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", "es")
                .put("client.transport.sniff", true)
                .build();
        TransportClient client = new PreBuiltTransportClient(settings);

        // ES nodes: 10.205.201.97, 10.205.201.98, 10.205.201.96, 10.205.201.95
        try {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.97"), 9300));
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.98"), 9300));
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.96"), 9300));
            client.addTransportAddress(new TransportAddress(InetAddress.getByName("10.205.201.95"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return client;
    }
}
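After the job completes, a quick way to check how many documents actually landed in the index is a match-all count against the same cluster. The sketch below is not part of the original post; it reuses the "twitter" index written above and a TransportClient built the same way as in getClient(), and additionally needs import org.elasticsearch.action.search.SearchResponse;:

    // Hedged verification sketch: count the documents in the "twitter" index written above.
    private long countIndexedDocs(TransportClient client) {
        SearchResponse response = client.prepareSearch("twitter")
                .setSize(0) // only the hit count is needed, not the documents themselves
                .get();     // an empty search body defaults to a match-all query
        return response.getHits().getTotalHits();
    }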

Dependencies (pom.xml):

        <!--Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.4.2</version>
        </dependency>

The measured write speed is somewhat low: about 3,500 records/s.
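One hedged direction for improving this (illustrative only, not a verified fix — the right values depend on the cluster, the mapping, and the document size) is to enlarge the bulk batches and allow more concurrent in-flight bulk requests than the single one configured in getBulkProcessor() above:

    // Tuning sketch using the same imports as EsForeachPartitionFunction; the numbers are
    // illustrative starting points, not recommendations.
    private BulkProcessor getTunedBulkProcessor(TransportClient client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client, listener)
                .setBulkActions(20000)                               // flush every 20k actions instead of 10k
                .setBulkSize(new ByteSizeValue(32, ByteSizeUnit.MB)) // or when 32 MB of requests accumulate
                .setFlushInterval(TimeValue.timeValueSeconds(10))
                .setConcurrentRequests(4)                            // allow 4 bulks in flight instead of 1
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }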

For articles on tuning ES + Spark write performance, see:

Elasticsearch进阶(一): write performance benchmarking and optimization (from 56 hours down to 5 hours), with a discussion of chunk_size

ElasticSearch write performance optimization

Elasticsearch write performance optimization

Elasticsearch write optimization notes: from 3,000 to 8,000 records/s

Spark 2.x write performance tests against Elasticsearch
