InActon - Log Analysis (KPI)

Time: 2023-03-09 22:29:52

I followed this earlier article: http://blog.fens.me/hadoop-mapreduce-log-kpi/

I ported it from Hadoop 1.x to 2.x, though nothing major had to change. (Honestly, the accompanying video isn't worth watching; reading the article is the best way.)


First, build the Hadoop project with Maven.

Download Maven, set the environment variables, replace Eclipse's default Maven configuration, change Maven's default local repository location... ...


Unlike the original author, I didn't create the Maven project from the command line; creating it directly in Eclipse, which is a convenient IDE, is enough.

The important part is adding the dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.admln</groupId>
    <artifactId>getKPI</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>getKPI</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
</project>

Then just let Maven download the jars (the first download is large and slow; after that nothing needs to be downloaded again, so it's fast).


Next comes the MapReduce code.

The job here is to extract a few KPI metrics from the access log.

Log format:

 222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939
"http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

Useful fields:

  • remote_addr: the client IP address, 222.68.172.190
  • remote_user: the client user name, "-" when absent
  • time_local: the access time and time zone, [18/Sep/2013:06:49:57 +0000]
  • request: the requested URL and HTTP protocol, "GET /images/my.jpg HTTP/1.1"
  • status: the request status (200 on success), 200
  • body_bytes_sent: the size of the response body sent to the client, 19939
  • http_referer: the page the request was linked from, "http://www.angularjs.cn/A00n"
  • http_user_agent: the client browser information, "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"

KPI targets:

  • PV (PageView): page view counts
  • IP: unique IP counts per page
  • Time: PV per hour
  • Source: referrer (source domain) statistics
  • Browser: statistics on the users' browsers/devices

The MapReduce code:


KPI.java

package org.admln.kpi;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/**
 * @author admln
 */
public class KPI {
    private String remote_addr;      // client IP address
    private String remote_user;      // client user name; "-" when absent
    private String time_local;       // access time and time zone
    private String request;          // requested URL and HTTP protocol
    private String status;           // request status; 200 on success
    private String body_bytes_sent;  // size of the response body sent to the client
    private String http_referer;     // page the request was linked from
    private String http_user_agent;  // client browser information

    private boolean valid = true;    // whether the record is valid

    public static KPI parser(String line) {
        KPI kpi = new KPI();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            kpi.setRemote_addr(arr[0]);
            kpi.setRemote_user(arr[1]);
            kpi.setTime_local(arr[3].substring(1));
            kpi.setRequest(arr[6]);
            kpi.setStatus(arr[8]);
            kpi.setBody_bytes_sent(arr[9]);
            kpi.setHttp_referer(arr[10]);
            if (arr.length > 12) {
                kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                kpi.setHttp_user_agent(arr[11]);
            }
            if (Integer.parseInt(kpi.getStatus()) > 400) { // HTTP error statuses are treated as invalid
                kpi.setValid(false);
            }
        } else {
            kpi.setValid(false);
        }
        return kpi;
    }

    public static KPI filterPVs(String line) {
        KPI kpi = parser(line);
        Set<String> pages = new HashSet<String>();
        pages.add("/about");
        pages.add("/black-ip-list/");
        pages.add("/cassandra-clustor/");
        pages.add("/finance-rhive-repurchase/");
        pages.add("/hadoop-family-roadmap/");
        pages.add("/hadoop-hive-intro/");
        pages.add("/hadoop-zookeeper-intro/");
        pages.add("/hadoop-mahout-roadmap/");
        if (!pages.contains(kpi.getRequest())) {
            kpi.setValid(false);
        }
        return kpi;
    }

    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }

    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }

    public String getTime_local() { return time_local; }

    public Date getTime_local_Date() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        return df.parse(this.time_local);
    }

    // used to aggregate the statistics by hour
    public String getTime_local_Date_Hour() throws ParseException {
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
        return df.format(this.getTime_local_Date());
    }

    public void setTime_local(String time_local) { this.time_local = time_local; }

    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }

    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }

    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }
}
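
As a quick sanity check of KPI.parser, the sample log line quoted above can be fed through it locally. This is a snippet of my own, not part of the original project, and the class name KPIParserDemo is just for illustration:

package org.admln.kpi;

public class KPIParserDemo {
    public static void main(String[] args) throws Exception {
        // the sample access-log line from above, as a single string
        String line = "222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "
                + "\"GET /images/my.jpg HTTP/1.1\" 200 19939 "
                + "\"http://www.angularjs.cn/A00n\" \"Mozilla/5.0 (Windows NT 6.1) "
                + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36\"";

        KPI kpi = KPI.parser(line);
        System.out.println("valid   = " + kpi.isValid());                  // true: status is 200
        System.out.println("ip      = " + kpi.getRemote_addr());           // 222.68.172.190
        System.out.println("request = " + kpi.getRequest());               // /images/my.jpg
        System.out.println("status  = " + kpi.getStatus());                // 200
        System.out.println("hour    = " + kpi.getTime_local_Date_Hour());  // 2013091806
    }
}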

KPIBrowser.java

package org.admln.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPIBrowser {

    public static class browserMapper extends Mapper<Object, Text, Text, IntWritable> {
        Text word = new Text();
        IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getHttp_user_agent());
                context.write(word, ONE);
            }
        }
    }

    public static class browserReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int sum;

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/browser/output");

        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "get KPI Browser");
        job.setJarByClass(KPIBrowser.class);

        job.setMapperClass(browserMapper.class);
        job.setCombinerClass(browserReducer.class);
        job.setReducerClass(browserReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

KPIIP.java

package org.admln.kpi;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPIIP {
    // map class
    public static class ipMapper extends Mapper<Object, Text, Text, Text> {
        private Text word = new Text();
        private Text ips = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                ips.set(kpi.getRemote_addr());
                context.write(word, ips);
            }
        }
    }

    // reduce class
    public static class ipReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        private Set<String> count = new HashSet<String>();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            count.clear(); // reset per key, otherwise IPs would leak between pages
            for (Text val : values) {
                count.add(val.toString());
            }
            result.set(String.valueOf(count.size()));
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/ip/output");

        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "get KPI IP");
        job.setJarByClass(KPIIP.class);

        job.setMapperClass(ipMapper.class);
        // no combiner here: the reducer replaces IP values with a distinct count,
        // so running it as a combiner would feed counts back in as if they were IPs
        job.setReducerClass(ipReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

KPIPV.java

package org.admln.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPIPV {

    public static class pvMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text word = new Text();
        private final static IntWritable ONE = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.filterPVs(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getRequest());
                context.write(word, ONE);
            }
        }
    }

    public static class pvReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/pv/output");

        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "get KPI PV");
        job.setJarByClass(KPIPV.class);

        job.setMapperClass(pvMapper.class);
        job.setCombinerClass(pvReducer.class);
        job.setReducerClass(pvReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

KPISource.java

package org.admln.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPISource {

    public static class sourceMapper extends Mapper<Object, Text, Text, IntWritable> {
        Text word = new Text();
        IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                word.set(kpi.getHttp_referer());
                context.write(word, ONE);
            }
        }
    }

    public static class sourceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int sum;

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/source/output");

        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "get KPI Source");
        job.setJarByClass(KPISource.class);

        job.setMapperClass(sourceMapper.class);
        job.setCombinerClass(sourceReducer.class);
        job.setReducerClass(sourceReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

KPITime.java

package org.admln.kpi;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author admln
 */
public class KPITime {

    public static class timeMapper extends Mapper<Object, Text, Text, IntWritable> {
        Text word = new Text();
        IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            KPI kpi = KPI.parser(value.toString());
            if (kpi.isValid()) {
                try {
                    word.set(kpi.getTime_local_Date_Hour());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                context.write(word, ONE);
            }
        }
    }

    public static class timeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        int sum;

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path input = new Path("hdfs://hadoop:9001/fens/kpi/input/");
        Path output = new Path("hdfs://hadoop:9001/fens/kpi/time/output");

        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "get KPI Time");
        job.setJarByClass(KPITime.class);

        job.setMapperClass(timeMapper.class);
        job.setCombinerClass(timeReducer.class);
        job.setReducerClass(timeReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

In fact the five MapReduce jobs are all much the same: small variations on WordCount. (The original article seems to contain a small mistake, which I found and fixed.) Because they differ so little, the driver boilerplate could even be shared; see the sketch after this paragraph.
The Hadoop environment: Hadoop 2.2.0; JDK 1.7; pseudo-distributed mode in a VM; IP 192.168.111.132.
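
The shared-driver idea, as a sketch of my own (not code from the original project): the jobs differ only in the job name, the mapper/reducer classes, and the output directory. Note that KPIIP does not quite fit this helper, since it uses Text output values and should not reuse its reducer as a combiner.

package org.admln.kpi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KPIJobRunner {

    @SuppressWarnings({ "deprecation", "rawtypes" })
    public static boolean run(String name, String outputDir,
            Class<? extends Mapper> mapper, Class<? extends Reducer> reducer) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, name);
        job.setJarByClass(KPIJobRunner.class);

        job.setMapperClass(mapper);
        job.setCombinerClass(reducer);   // safe for the jobs whose reducers just sum counts
        job.setReducerClass(reducer);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop:9001/fens/kpi/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        return job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        // the browser job as an example; PV, Source and Time follow the same pattern
        run("get KPI Browser", "hdfs://hadoop:9001/fens/kpi/browser/output",
                KPIBrowser.browserMapper.class, KPIBrowser.browserReducer.class);
    }
}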

The results:


Here the original author extracted a fixed set of pages (the hard-coded list in filterPVs). In practice you can filter whichever pages suit your own needs, for example by URL prefix, as in the sketch below.
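
A hypothetical variant of filterPVs, a sketch of my own rather than code from the original KPI.java, could keep every request under a chosen URL prefix instead of a hard-coded page set. It is meant to live inside KPI.java so that parser() resolves:

// my own sketch, not in the original class: filter PVs by URL prefix
public static KPI filterPVsByPrefix(String line, String prefix) {
    KPI kpi = parser(line);
    // e.g. prefix = "/hadoop-" keeps all of the hadoop-* articles
    if (kpi.getRequest() == null || !kpi.getRequest().startsWith(prefix)) {
        kpi.setValid(false);
    }
    return kpi;
}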


Full code and the log file: http://pan.baidu.com/s/1qW5D63M

Sample log data for practice can also be obtained elsewhere, for example from Sogou: http://www.sogou.com/labs/dl/q.html



About CRON: I think a workable approach is this. Say the logs are produced by Tomcat, and Tomcat is configured to write each day's logs into a directory named after the date. A shell script then runs the Hadoop commands to copy that day's Tomcat log directory to HDFS (the naming on HDFS also needs some thought) and launches the MR jobs; when they finish, the script copies the results on to HBase, Hive, MySQL, Redis, or wherever else they are needed, for the applications to use. A rough sketch of the copy-to-HDFS step follows.
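
This is my own illustration of that copy step, not part of the original post; the local directory layout, the HDFS paths, and the date format are all assumptions to be adapted. It uses the Hadoop FileSystem API to push the day's Tomcat log directory into HDFS, after which the MR jobs above can be submitted against it:

package org.admln.kpi;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DailyIngest {
    public static void main(String[] args) throws Exception {
        // today's date, matching the (assumed) naming of the Tomcat log directories
        String day = new SimpleDateFormat("yyyyMMdd").format(new Date());

        // local Tomcat log directory for the day, e.g. /data/tomcat/logs/20230309 (assumed layout)
        Path local = new Path("/data/tomcat/logs/" + day);
        // mirror the same date-based naming under the MR input directory on HDFS
        Path remote = new Path("hdfs://hadoop:9001/fens/kpi/input/" + day);

        Configuration conf = new Configuration();
        FileSystem fs = remote.getFileSystem(conf);

        // copy without deleting the source, overwriting any previous partial copy
        fs.copyFromLocalFile(false, true, local, remote);

        // after the copy, the MR jobs (KPIPV, KPIIP, ...) can be run against the new input,
        // and their output exported to HBase/Hive/MySQL/Redis as described above
    }
}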


Corrections for anything I got wrong are very welcome.