要想通过ES API对es的操作,必须获取到TransportClient对象,让后根据TransportClient获取到IndicesAdminClient对象后,方可以根据IndicesAdminClient对象提供的方法对ES的index进行操作:create index,update index(update index settings,update index mapping),delete index,open index,close index。
准备工作(创建TransportClient,IndicesAdminClient)
第一步:导入ES6.4.2的依赖包:
<dependencies> <!--Spark --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.11</artifactId> <version>0.9.5</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.11</artifactId> <version>3.2.0</version> <type>jar</type> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.4.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.4.2</version> </dependency> </dependencies>
备注:这里依赖可能有点多,elastricsearch api操作的话就是依赖org.elasticsearch.client。
第二步:获取TransportClient,IndicesAdminClient对象:
/** * 获取ES Client API对象。 * */ public static TransportClient getClient() { Map<String, String> esOptionsMap = getSparkESCommonOptions(); return getClient(esOptionsMap); } /** * 获取ES Client API对象。 * */ public static TransportClient getClient(Map<String, String> esOptionsMap) { Settings settings = Settings.builder()// .put("cluster.name", esOptionsMap.get("cluster.name")) // .put("client.transport.sniff", true)// .build(); PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings); TransportClient client = preBuiltTransportClient; // 192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123 String esNodeStr = esOptionsMap.get("es.nodes"); String[] esNodeArr = esNodeStr.split(","); try { for (String esNode : esNodeArr) { client.addTransportAddress(new TransportAddress(InetAddress.getByName(esNode), 9300)); } } catch (UnknownHostException e) { e.printStackTrace(); throw new RuntimeException(e); } return client; } public static IndicesAdminClient getAdminClient() { Map<String, String> esOptionsMap = getSparkESCommonOptions(); return getAdminClient(esOptionsMap); } public static IndicesAdminClient getAdminClient(Map<String, String> esOptionsMap) { TransportClient client = getClient(esOptionsMap); IndicesAdminClient adminClient = client.admin().indices(); return adminClient; }
备注:其中getSparkESCommonOptions()中配置对象包含:
cluster.name=es-application es.nodes=192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123 es.port=9200 es.index.auto.create=true pushdown=true es.nodes.wan.only=true es.mapping.date.rich=false #//设置读取es中date数据类型字段时,把它当做string来读取。 es.scroll.size=10000
ES API之Exists/Create Index:
创建index之前,需要判断index及其对应的类型是否存在,使用这个方法:
/** * 是否ES包含某个索引类型 * * @param indexName * index * @param indexType * index对应的type * */ public static boolean typeExists(String indexName, String indexType) { TypesExistsResponse typeResponse = getAdminClient().prepareTypesExists(indexName).setTypes(indexType).execute().actionGet(); if (typeResponse.isExists()) { return true; } return false; } /** * 判断ES中是否存在某个index<br> * 是否包含类型,待验证,看别人调用时是不需要带类型的。 * */ public static boolean indexExists(String... indices) { IndicesExistsRequest request = new IndicesExistsRequest(indices); IndicesExistsResponse response = getAdminClient().exists(request).actionGet(); if (response.isExists()) { return true; } return false; }
创建index,包含两种:不指定mapping和isettings只创建一个空的index;指定mapping和settings创建复杂的index。
创建一个空的index:
/** * 创建简单索引——没有指定mapping<br> * 此时数据插入时,会读取数据的数据的字段名称,自动创建mapping字段(但是,存在问题数据类型不能完好的控制,比如double类型可能会被匹配为float,date类型的格式消失) * */ public static boolean indexCreate(String indexName) { CreateIndexResponse response = getAdminClient().prepareCreate(indexName).get(); return response.isAcknowledged(); }
备注:此时数据插入时,会读取数据的数据的字段名称,自动创建mapping字段(但是,存在问题数据类型不能完好的控制,比如double类型可能会被匹配为float,date类型的格式消失)
创建复杂的index:
/** * 创建复杂索引(类型/mapping),指定索引的setting和mapping,其中mappingSource是一个json数据字符串。 * * @param indexName * 索引名 * @param indexType * 索引类型名 * @param builder * 索引mapping */ public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder) { Settings settings = Settings.builder() // .put("index.mapping.ignore_malformed", true)// .put("index.refresh_interval", "60s") // .put("index.number_of_shards", 4)// .put("index.number_of_replicas", 0)// .put("index.max_result_window", 500000)// .put("index.translog.durability", "async")// .put("index.translog.sync_interval", "120s")// .put("index.translog.flush_threshold_size", "2gb")// .put("index.merge.scheduler.max_thread_count", 1)// .build(); return indexCreate(indexName, indexType, builder, settings); } /** * 创建复杂索引(类型/mapping),指定索引的setting和mapping,其中mappingSource是一个json数据字符串。 * * @param indexName * 索引名 * @param indexType * 索引类型名 * @param builder * 索引mapping * @param settings * 索引settings<br> * setting http://192.168.1.120:9200/twitter/_settings?pretty<br> * "settings":<br> * {<br> * ----"index":<br> * ----{<br> * --------"mapping":<br> * --------{<br> * ------------"ignore_malformed":"true"<br> * --------},<br> * --------"refresh_interval":"60s",<br> * --------"number_of_shards":"4",<br> * --------"translog":<br> * --------{<br> * ------------"flush_threshold_size":"2048m",<br> * ------------"sync_interval":"120s",<br> * ------------"durability":"async"<br> * --------},<br> * --------"provided_name":"indexName",<br> * --------"merge":{<br> * ------------"scheduler":<br> * ------------{<br> * ----------------"max_thread_count":"1"<br> * ------------}<br> * --------},<br> * --------"max_result_window":"500000",<br> * --------"creation_date":"1540781909323",<br> * --------"number_of_replicas":"0",<br> * --------"uuid":"5c079b5tQrGdX0fF23xtQA",<br> * --------"version":{"created":"6020499"}<br> * ----}<br> * }<br> */ public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder, Settings settings) { if (indexExists(indexName)) { return false; } // CreateIndexResponse准备创建索引,增加setSetting()方法可以设置setting参数,否则将会按默认设置 CreateIndexResponse cIndexResponse = getAdminClient().prepareCreate(indexName)// .setSettings(settings)// setting .addMapping(indexType, builder)// type,mapping 这种方式也可以,经过测试。 .get(); return cIndexResponse.isAcknowledged(); }
如何根据Avro创建动态生成Mapping呢?
/** * 重建index * * @throws IOException * */ protected void createIndex(String indexName, String indexType) throws IOException { Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>> src = getTargetSchema(srcSchemaKey, true); Map<String, Map<String, String>> extFields = new HashMap<String, Map<String, String>>(); Map<String, String> insertDateProperty = new HashMap<String, String>(); insertDateProperty.put("type", "date"); insertDateProperty.put("format", "yyyy-MM-dd"); extFields.put("index_date", insertDateProperty); Map<String, String> typeProperty = new HashMap<String, String>(); typeProperty.put("type", "keyword"); extFields.put("type", typeProperty); XContentBuilder mappingSource = getMapping(indexType, src._2(), extFields); if (!indexCreate(indexName, indexType, mappingSource)) { throw new RuntimeException("重新创建index" + indexName + "时,设置mapping失败!"); } } /** * * @param indexType * index类型 * @param schemaColVsTypeMap * 从*.avsc schema文件中读取出的字段,格式:colName vs colType * @param extFields * 新增扩展字段(在*.avsc schema文件中没有包含的字段)<br> * @return mapping:<br> * {<br> * ----"mrs_rsrp_d_2018.10.26":<br> * ----{<br> * --------"aliases":{},<br> * --------"mappings":<br> * --------{<br> * -----------"_doc":{<br> * -----------"properties":<br> * -----------{<br> * --------------"cgi":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},<br> * --------------"timestamp":{"type":"long"}<br> * -----------}<br> * --------},<br> * --------"settings":{}<br> * ----}<br> * }<br> * @throws 生成XContentBuilder时 * ,抛出异常。 */ public static XContentBuilder getMapping(String indexType, Map<String, String> schemaColVsTypeMap, Map<String, Map<String, String>> extFields) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder()// .startObject()// .startObject(indexType)// .startObject("_all").field("enabled", false).endObject()// 是否包一个row中的所有字段作为一个大的索引字段,支持从所有列中查询 // .startObject("_source").field("enabled", false).endObject()// 不可以设为false,否则从es中查不到字段(其属性决定了那些字段存储到es,默认所有字段都存储,也可以通过include,exclude指定特定字段存储与不存储) // .startObject("_field_names").field("enabled", false).endObject()// .startObject("properties"); for (Map.Entry<String, String> kv : schemaColVsTypeMap.entrySet()) { String colName = kv.getKey(); String colType = kv.getValue(); // "insert_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"}, // "scan_start_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"}, // "scan_stop_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"}, if (colName.equalsIgnoreCase("scan_start_time")// || colName.equalsIgnoreCase("scan_stop_time")// || colName.equalsIgnoreCase("insert_time")) { builder.startObject(colName) // .field("type", "date")// .field("format", "yyyy-MM-dd HH:mm:ss")// 也可以 yyyy/MM/dd||yyyy/MM/dd HH:mm:ss .field("index", "true") // not_analyzed|analyzed .endObject(); } // "city_name":{"type":"text","fields":{"keyword":{"type":"keyword"}}}, // "province_name":{"type":"text","fields":{"keyword":{"type":"keyword"}}}, // "region_name":{"type":"text","fields":{"keyword":{"type":"keyword"}}}, else if (colName.equalsIgnoreCase("city_name")// || colName.equalsIgnoreCase("region_name")// || colName.equalsIgnoreCase("province_name")) { builder.startObject(colName).field("type", "keyword").endObject(); } else { if (colType.equalsIgnoreCase("long")) { builder.startObject(colName).field("type", "long").endObject(); } else if (colType.equalsIgnoreCase("string")) { builder.startObject(colName).field("type", "keyword").endObject(); } else if (colType.equalsIgnoreCase("double")) { builder.startObject(colName).field("type", "double").endObject(); } else { builder.startObject(colName).field("type", colType).endObject(); } } } // 追加扩展字段到mapping字段中 for (Map.Entry<String, Map<String, String>> kv : extFields.entrySet()) { String colName = kv.getKey(); builder.startObject(colName); for (Map.Entry<String, String> kvProperty : kv.getValue().entrySet()) { builder.field(kvProperty.getKey(), kvProperty.getValue()); } builder.endObject(); } builder.endObject();// end of properties builder.endObject();// end of indexType builder.endObject();// end of start return builder; } /** * 返回 target columns list,column vs column type map,expression encoder * */ protected Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>> getTargetSchema(String schemaFilePath, boolean withTimestamp) { Broadcast<String> targetSchemaContent = null; try { String avroContent = getHdfsFileContent(schemaFilePath); targetSchemaContent = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()).broadcast(avroContent); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } Schema.Parser parser = new Schema.Parser(); Schema targetSchema = parser.parse(targetSchemaContent.getValue()); List<String> targetColumns = new ArrayList<String>(); Map<String, String> targetKeyTypeItems = new LinkedHashMap<String, String>(); for (Field field : targetSchema.getFields()) { targetColumns.add(field.name()); List<Schema> types = targetSchema.getField(field.name()).schema().getTypes(); String datatype = types.get(types.size() - 1).getName(); targetKeyTypeItems.put(field.name(), datatype); } ExpressionEncoder<Row> encoder = SchemaHelper.createSchemaEncoder(targetSchema, withTimestamp); return new Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>>(targetColumns, targetKeyTypeItems, encoder); } /** * 将schema转化为Encoder */ protected static ExpressionEncoder<Row> createSchemaEncoder(Schema schema, boolean withTimestamp) { StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType(); if (withTimestamp) { List<String> fields = java.util.Arrays.asList(type.fieldNames()); if (!fields.contains("timestamp")) { type = type.add("timestamp", DataTypes.TimestampType); } else { int index = type.fieldIndex("timestamp"); StructField field = type.fields()[index]; type.fields()[index] = new StructField(field.name(), DataTypes.TimestampType, field.nullable(), field.metadata()); } } ExpressionEncoder<Row> encoder = RowEncoder.apply(type); return encoder; } /** * 读取hdfs上文件内容 */ protected static String getHdfsFileContent(String filePath){ String content = ""; try { reader = getHDFSFileReader(filePath); String line=null; while ((line = reader.readLine()) != null) { if (!line.startsWith("#") && line.trim().length() > 0) { content+=line.trim(); } } reader.close(); } catch (FileNotFoundException e) { e.printStackTrace(); throw new RuntimeException("file not found exception:" + this.avroSchemaPath); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException("reading file while an error was thrown:" + this.avroSchemaPath); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { e1.printStackTrace(); } } } return content; } protected static BufferedReader getHDFSFileReader(String hdfsFile) { try { System.out.println("hdfsfile: " + hdfsFile); Path configPath = new Path(hdfsFile); FileSystem fs = FileSystem.get(new Configuration()); if (fs.exists(configPath)) { return new BufferedReader(new InputStreamReader(fs.open(configPath))); } else { throw new FileNotFoundException("file(" + configPath + ") not found."); } } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e); } finally { } }
所有代码都在这里,具体的不加介绍了。
ES API之Update Index:
所谓的修改index,也就是修改index的settings和mapping:
/** * 修改ES索引的mapping属性 * */ public static boolean indexUpdateMapping(String indexName, String indexType, XContentBuilder builder) { org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest mapping = Requests.putMappingRequest(indexName).type(indexType) .source(builder); PutMappingResponse pMappingResource = getAdminClient().putMapping(mapping).actionGet(); return pMappingResource.isAcknowledged(); } /** * 修改ES索引的settings属性<br> * 更新索引属性(更新索引的settings属性,这是更改已经创建的属性、但有些一旦创建不能更改,需要按照自己的需求来进行选择使用) * */ public static boolean indexUpdatSettings(String indexName, Map<String, String> settingsMap) { Builder settings = Settings.builder();// for (Map.Entry<String, String> kv : settingsMap.entrySet()) { settings.put(kv.getKey(), kv.getValue()); } return indexUpdatSettings(indexName, settings); } /** * 修改ES索引的settings属性<br> * 更新索引属性(更新索引的settings属性,这是更改已经创建的属性、但有些一旦创建不能更改,需要按照自己的需求来进行选择使用) * */ public static boolean indexUpdatSettings(String indexName, Builder settings) { UpdateSettingsResponse uIndexResponse = getAdminClient().prepareUpdateSettings(indexName)// .setSettings(settings)// .execute().actionGet(); return uIndexResponse.isAcknowledged(); }
/** * 修改索引,修改索引的setting。 * * @param indexName * 索引名称<br> * 如果不需要实时精确的查询结果,可以把每个索引的index.refresh_interval设置为30s,如果在导入大量的数据,可以把这个值先设置为-1,完成数据导入之后在设置回来<br> * 如果在用bulk导入大量的数据,可以考虑不要副本,设置index.number_of_replicas: * 0。有副本存在的时候,导入数据需要同步到副本,并且副本也要完成分析,索引和段合并的操作,影响导入性能。可以不设置副本导入数据然后在恢复副本。<br> * <b>注意</b>:<br> * 有些属性一旦创建就不可以修改,比如:index.number_of_shards,修改会抛出异常。 */ public static boolean indexUpdateSettings(String indexName) { Settings settings = Settings.builder() // // .put("index.mapping.ignore_malformed", false)// .put("index.refresh_interval", "30s") // // .put("index.number_of_shards", 4)// .put("index.number_of_replicas", 1)// // .put("index.max_result_window", 500000)// // // .put("index.translog.durability", "async")// // .put("index.translog.sync_interval", "120s")// // .put("index.translog.flush_threshold_size", "2gb")// // .put("index.merge.scheduler.max_thread_count", 1)// .build(); return indexUpdatSettings(indexName, settings); }
ES API之Delete/Open/Close Index:
/** * 删除ES中某个或者多个索引 * */ public static boolean indexDelete(String... indices) { DeleteIndexResponse dIndexResponse = getAdminClient().prepareDelete(indices).execute().actionGet(); if (dIndexResponse.isAcknowledged()) { System.out.println("删除索引成功"); return true; } else { System.out.println("删除索引失败"); return false; } } /** * 关闭ES中某个或者多个索引<br> * curl -XPOST "http://127.0.0.1:9200/indexname/_close" * */ public static boolean indexClose(String... indices) { CloseIndexResponse cIndexResponse = getAdminClient().prepareClose(indices).execute().actionGet(); if (cIndexResponse.isAcknowledged()) { System.out.println("关闭索引成功"); return true; } return false; } /** * 开启ES中某个或者多个索引<br> * curl -XPOST "http://127.0.0.1:9200/indexname/_open" * */ public static boolean indexOpen(String... indices) { OpenIndexResponse oIndexResponse = getAdminClient().prepareOpen(indices).execute().actionGet(); if (oIndexResponse.isAcknowledged()) { System.out.println("开启索引成功"); return true; } return false; }