Python操作ElasticSearch

时间:2022-08-27 11:20:32

Python批量向ElasticSearch插入数据

Python 2的多进程不能序列化类方法, 所以改为函数的形式.

直接上代码:

#!/usr/bin/python
# -*- coding:utf-8 -*- import os
import re
import json
import time
import elasticsearch from elasticsearch.helpers import bulk
from multiprocessing import Pool def write_file(doc_type, action_list):
""""""
with open("/home/{}_error.json".format(doc_type), "a") as f:
for i in action_list:
f.write(str(i)) def add_one(file_path, doc_type, index):
"""准备插入一条"""
print doc_type, index
es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
with open(file_path, "r") as f:
for line in f:
try:
line = re.sub("\n", "", line)
dict_obj = json.loads(line)
es_client.index(index=index, doc_type=doc_type, body=dict_obj)
except Exception as e:
print "出错了, 错误信息: {}".format(e) def add_bulk(doc_type, file_path, bulk_num, index):
""""""
es_client = elasticsearch.Elasticsearch(hosts=[{"host": "localhost", "port": "9200"}])
action_list = []
# 文件过大, 先插入5000万试水
total = 50000000
num = 0 with open(file_path, "r") as f: for line in f: num += 0
if num >= total:
break # 去除每一行数据中的"\n"字符, 也可以替换为"\\n"
line = line.replace("\n", "")
dict_obj = json.loads(line) # 根据bulk_num的值发送一个批量插入请求
# action = {
# "_index": index,
# "_type": doc_type,
# "_source": {
# "ip": dict_obj.get("ip", "None"),
# "data": str(dict_obj.get("data", "None"))
# }
# } # 如果动态插入,字段过长,会报错,导致插不进去, 转为字符串就可以
action = {
'_op_type': 'index',
"_index": index,
"_type": doc_type,
"_source": dict_obj
}
action_list.append(action) if len(action_list) >= bulk_num: try:
print "Start Bulk {}...".format(doc_type)
success, failed = bulk(es_client, action_list, index=index, raise_on_error=True)
print "End Bulk {}...".format(doc_type)
except Exception as e:
print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])
write_file(doc_type, action_list)
finally:
del action_list[0:len(action_list)] # 如果不是bulk_num的等值, 那么就判断列表是否为空, 再次发送一次请求
if len(action_list) > 0:
try:
success, failed = bulk(es_client, action_list, index=index, raise_on_error=True)
except Exception as e:
print "出错了, Type:{}, 错误信息:{}".format(doc_type, e[0])
write_file(doc_type, action_list)
finally:
del action_list[0:len(action_list)] def mulit_process(path, index, bulk_num, data):
""""""
# 多进程执行
pool = Pool(10) results = []
for i in data:
doc_type = i["doc_type"]
file_path = i["file_path"]
result = pool.apply_async(add_bulk, args=(doc_type, file_path, bulk_num, index))
results.append(result) pool.close()
pool.join() def all_info(path):
data = []
for i in os.listdir(path):
file_dict = {}
if i.endswith(".json"):
doc_type = i.split("_")[0]
file_path = path + i
if doc_type == "443":
continue
file_dict["doc_type"] = doc_type
file_dict["file_path"] = file_path
data.append(file_dict) return data def es_insert(process_func=None):
""""""
# 库
index = "test"
# 文件路径
path="/home/data/" # 批量插入的数量, 如果是json整条数据插入的话, 可能会出现字段过长的问题, 导致插不进去, 适当调整bulk_num的值
bulk_num = 5000 if not path.endswith("/"):
path += "/" data = all_info(path) if process_func == "bulk":
# 插入多条, doc_type, file_path, bulk_num, index
add_bulk("80", path + "80_result.json", bulk_num, index)
elif process_func == "one":
# 插入单条file_path, doc_type, index
add_one(path + "80_result.json", "80", index)
else:
# 多进程
mulit_process(path, index, bulk_num, data) if __name__ == "__main__":
# 计算脚本执行时间
start_time = time.time()
if not os.path.exists("/home/test"):
os.makedirs("/home/test") # 插入数据
es_insert() # 计算脚本执行时间
end_time = time.time()
print end_time - start_time

Python搜索ElasticSearch

示例:

#!/usr/bin/python
# -*- coding:utf -*- import json
import elasticsearch def es_login(host="localhost", port="9200"):
"""连接es"""
return elasticsearch.Elasticsearch(hosts=[{"host": host, "port": port}]) def get(es_client, _id):
"""获取一条内容"""
# result = es_client.get(index="test", doc_type="80", id=_id)
result = es_client.get(index="test", id=_id)
return json.dumps(result) def search(es_client, query, field="_all"):
"""聚合搜索内容""" result = es_client.search(index="test", body={
"query": {
"bool": {
"must": [
{
"query_string": {
# 指定字段
"default_field": field,
# 查询字段
"query": query
}
},
{
"match_all": {}
}
],
"must_not": [],
"should": []
}
},
"from": 0,
"size": 10,
"sort": [],
# 聚合
"aggs": {
# "all_interests":{
# "terms":{
# "field":"interests"
# }
# }
}
}) return json.dumps(result) def main():
"""入口""" # 连接es
es_client = es_login() # result = search(es_client, query="123.125.115.110", field="_all") result = get(es_client, "AWTv-ROzCxZ1gYRliWhu") print result if __name__ == "__main__":
main()

删除ElasticSearch全部数据

curl -X DELETE localhost:9200/test, test为自己的index名称

Python操作ElasticSearch的更多相关文章

  1. Python 操作 ElasticSearch

    Python 操作 ElasticSearch 学习了:https://www.cnblogs.com/shaosks/p/7592229.html 官网:https://elasticsearch- ...

  2. python操作elasticsearch增、删、改、查

    最近接触了个新东西--es数据库 这东西虽然被用的很多,但我是前些天刚刚接触的,发现其资料不多,学起来极其痛苦,写个文章记录下 导入库from elasticsearch import Elastic ...

  3. python操作Elasticsearch (一、例子)

    E lasticsearch是一款分布式搜索引擎,支持在大数据环境中进行实时数据分析.它基于Apache Lucene文本搜索引擎,内部功能通过ReST API暴露给外部.除了通过HTTP直接访问El ...

  4. python实现elasticsearch操作-CRUD API

    python操作elasticsearch常用API 目录 目录 python操作elasticsearch常用API1.基础2.常见增删改操作创建更新删除3.查询操作查询拓展类实现es的CRUD操作 ...

  5. python使用elasticsearch模块操作elasticsearch

    1.创建索引 命令如下 from elasticsearch import Elasticsearch es = Elasticsearch([{"host":"10.8 ...

  6. java操作elasticsearch实现批量添加数据(bulk)

    java操作elasticsearch实现批量添加主要使用了bulk 代码如下: //bulk批量操作(批量添加) @Test public void test7() throws IOExcepti ...

  7. 利用NEST2.0 在C#中操作Elasticsearch

    前言:本文主要演示了如何通过c#来操作elasticsearch,分两个方面来演示: 索引数据 搜索数据 Note: 注意我索引数据和搜索数据是两个不同的例子,没有前后依赖关系 准备工作:需要在vis ...

  8. Python 和 Elasticsearch 构建简易搜索

    Python 和 Elasticsearch 构建简易搜索 作者:白宁超 2019年5月24日17:22:41 导读:件开发最大的麻烦事之一就是环境配置,操作系统设置,各种库和组件的安装.只有它们都正 ...

  9. 笔记13:Python 和 Elasticsearch 构建简易搜索

    Python 和 Elasticsearch 构建简易搜索 1 ES基本介绍 概念介绍 Elasticsearch是一个基于Lucene库的搜索引擎.它提供了一个分布式.支持多租户的全文搜索引擎,它可 ...

随机推荐

  1. 使用Alcatraz来管理Xcode插件(转)

    转自唐巧的博客:http://blog.devtang.com/blog/2014/03/05/use-alcatraz-to-manage-xcode-plugins/ 简介 Alcatraz是一个 ...

  2. VMware Workstation(虚拟机)v10.0.1 简体中文破解版

    http://www.xp510.com/xiazai/ossoft/desktools/22610.html

  3. 好玩的Prim算法

    这段时间学算法,用JS实现了一个Prim,用于在连通图中找出最小树,具体内容.代码解析周末会不上,现在先把源码献上: <!DOCTYPE html> <html charset='G ...

  4. 使用命令行设置svn忽略列表

    Windows 上的 TortoiseSVN 设置 svn 的忽略列表是非常方便的,但是在Mac OS X上,好用的图形化 svn 客户端都有点儿贵,比如 Versions 和 CornerStone ...

  5. java&period;imageIo给图片添加水印

    最近项目在做一个商城项目, 项目上的图片要添加水印①,添加图片水印;②:添加文字水印; 一下提供下个方法,希望大家可以用得着: package com.blogs.image; import java ...

  6. sitemesh网页布局

    看项目时发现对应页面下找不到侧栏部分代码,仔细观察后发现页面引入了sitemesh标签,查了下资料原来是页面用了sitemesh框架解!耦!了! 以前多个模块包含相同模块时总是include jsp文 ...

  7. &lbrack;ExtJS5学习笔记&rsqb;第十三节 Extjs5的Ext&period;each方法学习

    本文地址:http://blog.csdn.net/sushengmiyan/article/details/39009555 sencha官方API:http://docs.sencha.com/e ...

  8. 微信二维码支付-模式一&lpar;PC端&comma;解决中文乱码问题&rpar;

    近期公司调完银联,调支付宝,调完支付宝调微信.说实话微信的帮助文档确实是烂,而且有没有技术支持,害的我头发都掉了一桌.不说废话了,看代码. 首先登陆微信的公众平台(微信的服务号不是订阅号),然后选择微 ...

  9. C&sol;C&plus;&plus;基础----关联容器

    基本属性 与顺序容器的差别,按照关键字来保存和访问,而顺序容器是按照容器中的位置来顺序保存和访问. map:每个元素是一对键值(key-valye)组合:set每个元素只包含关键字.. 每个根据关键字 ...

  10. 基于线程池的多并发Socket程序的实现

    Socket“服务器-客户端”模型的多线程并发实现效果的大体思路是:首先,在Server端建立“链接循环”,每一个链接都开启一个“线程”,使得每一个Client端都能通过已经建立好的线程来同时与Ser ...