数据源类型:数组列表
[{field:value}, {field:value}, {field:value}, {field:value}]
1. 定义http数据源链接
package com.etl.datalink; import java.util.Map; public class LinkHttp { private String url;
private Map<String,Object> params; public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Map<String, Object> getParams() {
return params;
}
public void setParams(Map<String, Object> params) {
this.params = params;
} }
2. 定义hdfs链接配置
package com.etl.datalink; import org.apache.hadoop.conf.Configuration; public class LinkHdfs { private Configuration conf = new Configuration();
private String fsName="fs.defaultFS";
private String fsURI; public LinkHdfs(String fsName, String fsURI) {
this.fsName = fsName;
this.fsURI = fsURI;
conf.set(this.fsName, this.fsURI);
} public LinkHdfs(String fsURI) {
this.fsURI = fsURI;
conf.set(this.fsName, this.fsURI);
} public String getFsName() {
return fsName;
} public void setFsName(String fsName) {
this.fsName = fsName;
} public String getFsURI() {
return fsURI;
} public void setFsURI(String fsURI) {
this.fsURI = fsURI;
} public Configuration getConf() {
return conf;
} public void setConf(Configuration conf) {
this.conf = conf;
} }
3. 定义泛型类用于传送http的内容到hdfs
这里存在一点小问题:由于json是数组列表,所以需要获取每条记录,然后加入换行符号\n写入hdfs。这样在hive中查询才能获取到多个记录。否则会全部当作一条记录。
/**
* 通用的http抽取数据到hdfs文件中
* @author KingWang
* @date 2018-10-15
* @description
*/
public class Api2Hdfs{ private static Logger log = Logger.getLogger(Api2Hdfs.class); public static <T> void run(String[] args, Class<T> clazz) { //http
String url = args[0];
String method = args[1];
String startTime = args[2];
String endTime = args[3]; //hdfs
String fsName = args[4];
String fsURI = args[5];
String targetFilePath = args[6];
//http config
Map<String,Object> params = new HashMap<String,Object>(); //....省略部分参数
params.put("timestamp", System.currentTimeMillis()/1000L);
params.put("start_time", startTime);
params.put("end_time", endTime); LinkHttp http = new LinkHttp();
http.setUrl(url);
http.setParams(params); //hdfs config
LinkHdfs hdfs = new LinkHdfs(fsName, fsURI);
try {
Api2Hdfs.process(http, hdfs, targetFilePath, clazz);
} catch(Exception e) {
e.printStackTrace();
}
} private static <T> void process(LinkHttp http,LinkHdfs hdfs, String hdfsFile, Class<T> clazz) throws Exception{ if(null==http) {
log.error("请求参数http未设置");
throw new Exception("请求参数http未设置");
}
if(null==hdfs) {
log.error("请求参数hdfs未设置");
throw new Exception("请求参数hdfs未设置");
} //创建http请求
String url = http.getUrl();
Map<String,Object> params = http.getParams();
OkHttpClient client = new OkHttpClient(); //添加参数
FormBody.Builder bodyParams=new FormBody.Builder();
if(params!=null && params.size() > 0) {
Iterator<Map.Entry<String,Object>> it = params.entrySet().iterator();
while(it.hasNext()) {
Map.Entry<String, Object> entry = it.next();
bodyParams.add(entry.getKey(), entry.getValue().toString());
}
} final Request request = new Request.Builder().url(url).post(bodyParams.build()).build();
Call call = client.newCall(request);
call.enqueue(new Callback() { //网络错误延迟处理
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();
log.error(e.getMessage());
} @Override
public void onResponse(Call call, Response response) throws IOException {
FileSystem fs = null;
try { Path dstPath = new Path(hdfsFile);
fs = FileSystem.get(hdfs.getConf());
DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
if(response.isSuccessful()) { //对后台返回的数据进行处理
System.out.println(df.format(LocalDateTime.now()) +" response.code:" +response.code());
if (200 == response.code()) { //注意:response.body().string()只能有效调用一次
ResponseInfo info = JSONObject.parseObject(response.body().string(), ResponseInfo.class); //error不为空,则错误
if(StringUtils.isNotBlank(info.getError())) {
log.error(info.getError());
} else { String rspcode = info.getResult().getRsp();
//写入hdfs
if(rspcode.equalsIgnoreCase(ResultCode.SUCCESS.getCode())) {
System.out.println(info.getResult().getData());
if(info.getResult().getData().equals("[]")) {
System.out.println(df.format(LocalDateTime.now()) + " " + info.getResult().getMsg());
} else {
List<T> objList = JSON.parseArray(info.getResult().getData(),clazz);
// byte[] bt = info.getResult().getData().getBytes();
FSDataOutputStream outputStream = fs.create(dstPath);
int size = objList.size();
for(int i=0;i<size; i++) {
String orderstr = JSON.toJSONString(objList.get(i)) + '\n';
System.out.println(orderstr);
outputStream.write(orderstr.getBytes());
if(i % 1000==0) {
outputStream.flush();
}
}
outputStream.flush();
outputStream.close();
log.info("create file " + hdfsFile + " success!");
}
} else {
log.error(info.getResult().getMsg());
}
}
}
//对后台返回200~300之间的错误进行处理
else {
log.error(response.message());
} //fs.close();
}
}catch (Exception e){
e.printStackTrace();
log.error(e.getMessage());
}finally {
fs.close();
//关闭
if(response.body()!=null) {
response.body().close();
}
}
log.info("write hdfs file end: " + hdfsFile);
}
}); } }
4. 定义bean用于解析, 由于定义了泛型,可以针对不同到接口定义不同的bean。
类似如下
5. 定义执行的每个接口主类:
public class MemberApi extends Api2Hdfs{ public static void main(String[] args) {
Api2Hdfs.run(args, Member.class);
}
}
public class OrderApi extends Api2Hdfs{ public static void main(String[] args) {
Api2Hdfs.run(args, Order.class);
}
}
6. 定义每个接口的shell脚本,执行即可。
java -Djava.ext.dirs=lib com.etl.MemberApi \
${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log >& &
java -Djava.ext.dirs=lib com.etl.OrderApi \
${url} ${method} ${startDate} ${endDate} ${fsName} ${fsURI} ${targetFilePath} ${salt} >> ./logs/${table}.log >& &