Integrating Spark with Elasticsearch

Date: 2025-04-26 08:04:43

Reading and writing Elasticsearch from PySpark depends on the elasticsearch-hadoop package, which needs to be downloaded here first; a different version can be obtained by changing the version number in the download URL.
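The script below registers the jar through the PYSPARK_SUBMIT_ARGS environment variable; an alternative is to pass it via the spark.jars property when building the SparkConf. A minimal sketch, assuming a hypothetical local path to the jar (adjust the path and version to whatever you downloaded):

from pyspark import SparkConf, SparkContext

# assumption: local path to the elasticsearch-hadoop Spark jar you downloaded
ES_SPARK_JAR = "/path/to/elasticsearch-spark-20_2.11-6.4.2.jar"

conf = (SparkConf()
        .setAppName("es_example")
        .setMaster("local[2]")
        .set("spark.jars", ES_SPARK_JAR))  # make the jar available to driver and executors
sc = SparkContext(conf=conf)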

"""
write data to elastic search
https://starsift.com/2018/01/18/integrating-pyspark-and-elasticsearch/
"""
from __future__ import print_function
import os
import json

from pyspark import SparkContext
from pyspark import SparkConf

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
# note: extra jars can be added to PYSPARK_SUBMIT_ARGS here
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--jars /home/buxizhizhoum/2-Learning/pyspark_tutorial/jars/elasticsearch-hadoop-6.4.2/dist/elasticsearch-spark-20_2.11-6.4.2.jar ' \
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 ' \
    'pyspark-shell'

conf = SparkConf().setAppName("write_es").setMaster("local[2]")
sc = SparkContext(conf=conf)

# config reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
es_write_conf = {
    # specify the node that we are sending data to (this should be the master)
    "es.nodes": 'localhost',
    # specify the port in case it is not the default port
    "es.port": '',
    # specify a resource in the form 'index/doc-type'
    "es.resource": 'testindex/testdoc',
    # is the input JSON?
    "es.input.json": "yes",
    # the field in the mapping that should be used as the ES document ID
    "es.mapping.id": "doc_id"
}


if __name__ == "__main__":
    data = [
        {'': '', 'doc_id': 1},
        {'': '', 'doc_id': 2},
        {'': '', 'doc_id': 3},
        {'': '', 'doc_id': 4},
        {'': '', 'doc_id': 5},
        {'': '', 'doc_id': 6},
        {'': '', 'doc_id': 7},
        {'': '', 'doc_id': 8},
        {'': '', 'doc_id': 9},
        {'': '', 'doc_id': 10}
    ]
    rdd = sc.parallelize(data)
    # serialize each dict to JSON; es.mapping.id extracts the document id from the doc_id field
    rdd = rdd.map(lambda x: (x["doc_id"], json.dumps(x)))
    rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        # critically, we must specify our `es_write_conf`
        conf=es_write_conf)
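
Reading the data back works the same way in the other direction, through sc.newAPIHadoopRDD and EsInputFormat. A minimal sketch, assuming the SparkContext above, Elasticsearch on localhost:9200, and the testindex/testdoc resource written by the script (es_read_conf is just an illustrative name):

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "testindex/testdoc"
}
read_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)
# each element is a (document id, document fields) pair
print(read_rdd.take(5))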

More code: https://github.com/buxizhizhoum/pyspark_tutorial/tree/master/spark_elasticsearch

refer: https://starsift.com/2018/01/18/integrating-pyspark-and-elasticsearch/