Learning Storm book notes 8: Log Processing With Storm

Date: 2021-02-07 13:08:31

Books that come with code are a pleasure to read: once you finish a chapter you can run the demo right away.

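Scenario analysis

The goal is to analyze Apache access logs and compute statistics from them,

such as the user's IP, the country or region it comes from, the OS and browser in use, and hot search keywords.

A log entry looks like this: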

24.25.135.19 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"

To make sure the keyword extracted later is not empty, name=qq is added to the referrer:

180.183.50.208 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"

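The following fields are involved:

ip (source IP) 180.183.50.208

dateTime (access time, as captured from the brackets) 1-01-2011:06:20:31 -0500

request (request line) GET / HTTP/1.1

response (response status) 200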

bytesSent 864

referrer http://www.adeveloper.com/resource.html

useragent Mozilla/5.0xxxxx.....

country

browser Firefox

os Windows

keyword qq
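The table below stores dateTime as plain text, but the bracketed timestamp can also be turned into a real date value. The following is only a sketch: the class name LogDateTime is made up for illustration, and it assumes the timestamp is day-month-year, which the sample lines do not prove.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

final class LogDateTime {
    // Assumed layout: day-month-year, a colon, the time, then a numeric UTC offset.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("d-MM-yyyy:HH:mm:ss Z", Locale.ROOT);

    /** Parses the text found between the square brackets of a log line. */
    static OffsetDateTime parse(String raw) {
        return OffsetDateTime.parse(raw, FORMAT);
    }

    public static void main(String[] args) {
        // prints 2011-01-01T06:20:31-05:00
        System.out.println(parse("1-01-2011:06:20:31 -0500"));
    }
}
```

If the real logs turn out to be month-day-year, only the pattern string needs to change.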

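Main flow

The book uses a file -> kafka -> storm -> mysql pipeline.

Here the file is not fed to Kafka directly; the data comes from Flume instead, which gives the typical pipeline:

flume -> kafka -> storm -> mysql

The environment needs zookeeper, kafka, mysql, flume and storm, although running the demo in local mode does not need a Storm cluster at all.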

apache-flume-1.4.0-bin.tar.gz
kafka_2.8.0-0.8.0.tar.gz
zookeeper-3.4.5-cdh4.4.0.tar.gz
Storm

First, create the database table

create table apachelog(
id INT NOT NULL AUTO_INCREMENT,
ip VARCHAR(100) NOT NULL,
dateTime VARCHAR(200) NOT NULL,
request VARCHAR(100) NOT NULL,
response VARCHAR(200) NOT NULL,
bytesSent VARCHAR(200) NOT NULL,
referrer VARCHAR(500) NOT NULL,
useragent VARCHAR(500) NOT NULL,
country VARCHAR(200) NOT NULL,
browser VARCHAR(200) NOT NULL,
os VARCHAR(200) NOT NULL,
keyword VARCHAR(200) NOT NULL,
PRIMARY KEY (id)
);

Next, the Flume Kafka producer configuration

cat conf/producer2.properties

#agent section
producer.sources = s
producer.channels = c
producer.sinks = r
#source section
producer.sources.s.type = exec
producer.sources.s.command = tail -F /data/apache.log
producer.sources.s.channels = c
# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=kafkaToptic
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000
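A Flume agent file follows the Java properties format, so a quick way to sanity-check it is to load it with java.util.Properties. The sketch below loads a shortened copy of the configuration from a string; the class name FlumeConfigCheck and the last line with the trailing "# not a comment" are made up for illustration. It also shows that # only starts a comment at the beginning of a line; anywhere else it becomes part of the value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class FlumeConfigCheck {
    /** Parses properties-format text the same way a .properties file is read. */
    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        String config =
                "# Each sink's type must be defined\n"
              + "producer.sinks.r.type = org.apache.flume.plugins.KafkaSink\n"
              + "producer.sinks.r.custom.topic.name=kafkaToptic\n"
              + "producer.channels.c.capacity = 1000 # not a comment\n";
        Properties props = load(config);
        // prints kafkaToptic
        System.out.println(props.getProperty("producer.sinks.r.custom.topic.name"));
        // prints 1000 # not a comment
        System.out.println(props.getProperty("producer.channels.c.capacity"));
    }
}
```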

For test data, see apache_test.log in the kafka-producer project. To give keyword some data, ?name=qq was deliberately appended to the referrer:

echo '202.27.9.1 - - [2-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"'>>/data/apache.log

The data is ready.

Start ZooKeeper, then start Kafka:

cd kafka_2.8.0-0.8.0
bin/kafka-server-start.sh config/server.properties

Start Flume:

bin/flume-ng agent --conf conf  --conf-file conf/producer2.properties  --name producer -Dflume.root.logger=INFO,console

Once Flume is running, you can generate some data yourself:

echo '202.27.9.1 - - [2-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"'>>/data/apache.log

Data preparation is now complete. Next comes the topology.

Main class: LogProcessingTopology

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class LogProcessingTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper used by Kafka; the spout reads the topic "kafkaToptic".
        ZkHosts zkHosts = new ZkHosts("192.168.137.10:2181");
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "kafkaToptic", "", "id");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Always start reading from the beginning of the topic.
        kafkaConfig.forceFromStart = true;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("LogSplitter", new ApacheLogSplitterBolt(), 1)
                .globalGrouping("KafkaSpout");
        // args[0]: path to the GeoLiteCity.dat file
        builder.setBolt("IpToCountry", new UserInformationGetterBolt(args[0]), 1)
                .globalGrouping("LogSplitter");
        builder.setBolt("Keyword", new KeyWordIdentifierBolt(), 1)
                .globalGrouping("IpToCountry");
        // args[1..4]: MySQL ip, database, user, password
        builder.setBolt("PersistenceBolt",
                new PersistenceBolt(args[1], args[2], args[3], args[4]), 1)
                .globalGrouping("Keyword");

        if (args.length == 6) {
            // Run the topology on a remote cluster.
            Config conf = new Config();
            conf.setNumWorkers(4);
            try {
                // The original passed args[4] (the MySQL password) as the topology name;
                // the sixth argument is assumed to be the intended name.
                StormSubmitter.submitTopology(args[5], conf, builder.createTopology());
            } catch (AlreadyAliveException alreadyAliveException) {
                System.out.println(alreadyAliveException);
            } catch (InvalidTopologyException invalidTopologyException) {
                System.out.println(invalidTopologyException);
            }
        } else {
            // Run in local mode.
            LocalCluster cluster = new LocalCluster();
            Config conf = new Config();
            cluster.submitTopology("KafkaToplogy", conf, builder.createTopology());
            try {
                System.out.println("**********************Waiting to consume from kafka");
                Thread.sleep(10000);
            } catch (Exception exception) {
                System.out.println("******************Thread interrupted exception : " + exception);
            }
            cluster.killTopology("KafkaToplogy");
            cluster.shutdown();
        }
    }
}
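The main method reads everything from positional arguments, so a wrong argument count only shows up as an ArrayIndexOutOfBoundsException. One possible way to validate them up front is sketched below. TopologyArgs is a hypothetical helper, not part of the book's code, and it assumes the argument order is GeoLiteCity file, MySQL host, database, user, password, plus an optional sixth argument for the topology name in cluster mode.

```java
final class TopologyArgs {
    final String geoLiteCityFile;
    final String mysqlHost;
    final String database;
    final String user;
    final String password;
    final String topologyName; // null means local mode

    private TopologyArgs(String[] a) {
        geoLiteCityFile = a[0];
        mysqlHost = a[1];
        database = a[2];
        user = a[3];
        password = a[4];
        topologyName = a.length == 6 ? a[5] : null;
    }

    /** Rejects anything other than five (local) or six (cluster) arguments. */
    static TopologyArgs parse(String[] args) {
        if (args.length != 5 && args.length != 6) {
            throw new IllegalArgumentException(
                    "usage: <GeoLiteCity.dat> <mysql-host> <database> <user> <password> [topology-name]");
        }
        return new TopologyArgs(args);
    }

    boolean isClusterMode() {
        return topologyName != null;
    }

    public static void main(String[] args) {
        TopologyArgs parsed = parse(new String[] {
                "/data/GeoLiteCity.dat", "192.168.137.10", "test", "root", "111111"});
        System.out.println(parsed.isClusterMode() ? "cluster mode" : "local mode");
    }
}
```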

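The topology is made of KafkaSpout plus ApacheLogSplitterBolt, UserInformationGetterBolt, KeyWordIdentifierBolt and PersistenceBolt. The class names already say what each one does: KafkaSpout reads the data from Kafka, ApacheLogSplitterBolt splits each log line, UserInformationGetterBolt derives the user information, KeyWordIdentifierBolt extracts the keyword (the hot search term), and PersistenceBolt, the last one, writes the data to MySQL.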
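Each bolt in this chain re-emits all the fields it received and appends its own. That pattern can be tried out without Storm at all. The sketch below is a plain-Java analogy, not Storm code: StageChain and its placeholder computations are hypothetical, and the real bolts call GeoIP, UserAgentTools and KeywordGenerator where the placeholders are.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class StageChain {
    /** Wraps a computation so that it passes on all incoming fields plus its own. */
    static Function<Map<String, String>, Map<String, String>> stage(
            Function<Map<String, String>, Map<String, String>> extra) {
        return in -> {
            Map<String, String> out = new LinkedHashMap<>(in);
            out.putAll(extra.apply(in));
            return out;
        };
    }

    static Function<Map<String, String>, Map<String, String>> pipeline() {
        // Placeholder for UserInformationGetterBolt (hypothetical logic).
        Function<Map<String, String>, Map<String, String>> userInfo = stage(in -> {
            Map<String, String> extra = new LinkedHashMap<>();
            extra.put("country", "NA");
            extra.put("browser", in.get("useragent").contains("Firefox/") ? "Gecko(Firefox)" : "?");
            return extra;
        });
        // Placeholder for KeyWordIdentifierBolt (hypothetical logic).
        Function<Map<String, String>, Map<String, String>> keyword = stage(in -> {
            Map<String, String> extra = new LinkedHashMap<>();
            extra.put("keyword", in.get("referrer").contains("name=qq") ? "qq" : "NA");
            return extra;
        });
        return userInfo.andThen(keyword);
    }

    public static void main(String[] args) {
        Map<String, String> input = new LinkedHashMap<>();
        input.put("ip", "180.183.50.208");
        input.put("referrer", "http://www.adeveloper.com/resource.html?name=qq");
        input.put("useragent", "Gecko/20050919 Firefox/1.0.7");
        System.out.println(pipeline().apply(input));
    }
}
```

The map grows by a few keys at every stage, just as the declared output fields grow from bolt to bolt.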

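Now let's look at how each class is implemented.

ApacheLogSplitterBolt uses a regular expression to pull the parts we want out of each log line; the real work happens in ApacheLogSplitter.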

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class ApacheLogSplitterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private static final ApacheLogSplitter apacheLogSplitter = new ApacheLogSplitter();
    // Field names, in the order they are emitted.
    private static final List<String> LOG_ELEMENTS = new ArrayList<String>();
    static {
        LOG_ELEMENTS.add("ip");
        LOG_ELEMENTS.add("dateTime");
        LOG_ELEMENTS.add("request");
        LOG_ELEMENTS.add("response");
        LOG_ELEMENTS.add("bytesSent");
        LOG_ELEMENTS.add("referrer");
        LOG_ELEMENTS.add("useragent");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String log = input.getString(0);
        if (StringUtils.isBlank(log) || log.equals("xxxx")) {
            return;
        }
        Map<String, Object> logMap = apacheLogSplitter.logSplitter(log);
        if (logMap.isEmpty()) {
            // The line did not match the pattern; skip it instead of emitting nulls,
            // which would cause a NullPointerException in the next bolt.
            return;
        }
        List<Object> logdata = new ArrayList<Object>();
        for (String element : LOG_ELEMENTS) {
            logdata.add(logMap.get(element));
        }
        collector.emit(logdata);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "dateTime", "request", "response",
                "bytesSent", "referrer", "useragent"));
    }
}

The ApacheLogSplitter class does the actual splitting of a log line.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ApacheLogSplitter {

    // Compiled once: ip, two unused fields, [dateTime], "request", status, bytes, "referrer", "useragent".
    private static final Pattern LOG_ENTRY_PATTERN = Pattern.compile(
            "^([\\d.]+) (\\S+) (\\S+) \\[([\\w-:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"");

    public Map<String, Object> logSplitter(String apacheLog) {
        String logEntryLine = apacheLog;
        Matcher matcher = LOG_ENTRY_PATTERN.matcher(logEntryLine);
        Map<String, Object> logMap = new HashMap<String, Object>();
        if (!matcher.matches() || 9 != matcher.groupCount()) {
            // An empty map tells the caller that the line could not be parsed.
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(logEntryLine);
            return logMap;
        }
        // set the ip, dateTime, request, etc into map.
        logMap.put("ip", matcher.group(1));
        logMap.put("dateTime", matcher.group(4));
        logMap.put("request", matcher.group(5));
        logMap.put("response", matcher.group(6));
        logMap.put("bytesSent", matcher.group(7));
        logMap.put("referrer", matcher.group(8));
        // Debug output of the referrer.
        System.out.println("#######" + matcher.group(8));
        logMap.put("useragent", matcher.group(9));
        return logMap;
    }
}
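To see what the pattern actually captures, it helps to run it against the sample line from the start of this post. The following is a standalone sketch with only JDK classes; LogLineDemo is a made-up name, and the character class is written as [\w:/-] with the hyphen last, which should match the same characters while being easier to read.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LogLineDemo {
    static final String SAMPLE =
            "24.25.135.19 - - [1-01-2011:06:20:31 -0500] \"GET / HTTP/1.1\" 200 864 "
          + "\"http://www.adeveloper.com/resource.html\" "
          + "\"Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7\"";

    private static final Pattern LOG = Pattern.compile(
            "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/-]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"");

    /** Returns the captured fields in order, or an empty map if the line does not match. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = LOG.matcher(line);
        if (!m.matches()) {
            return fields;
        }
        fields.put("ip", m.group(1));
        fields.put("dateTime", m.group(4));
        fields.put("request", m.group(5));
        fields.put("response", m.group(6));
        fields.put("bytesSent", m.group(7));
        fields.put("referrer", m.group(8));
        fields.put("useragent", m.group(9));
        return fields;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, String> e : parse(SAMPLE).entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```

Groups 2 and 3 (the two dashes) are matched but never stored, which is why the numbering jumps from 1 to 4.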

UserInformationGetterBolt does a bit more: it maps the IP to a country and identifies the OS and the browser.

package com.learningstorm.stormlogprocessing;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This class uses the IpToCountryConverter and UserAgentTools classes to calculate
 * the country, os and browser from a log line.
 */
public class UserInformationGetterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private IpToCountryConverter ipToCountryConverter = null;
    private UserAgentTools userAgentTools = null;
    public OutputCollector collector;
    private String pathTOGeoLiteCityFile;

    public UserInformationGetterBolt(String pathTOGeoLiteCityFile) {
        this.pathTOGeoLiteCityFile = pathTOGeoLiteCityFile;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "dateTime", "request", "response",
                "bytesSent", "referrer", "useragent", "country", "browser", "os"));
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.ipToCountryConverter = new IpToCountryConverter(this.pathTOGeoLiteCityFile);
        this.userAgentTools = new UserAgentTools();
    }

    public void execute(Tuple input) {
        String ip = input.getStringByField("ip");
        String useragent = input.getStringByField("useragent");
        // calculate the country from ip
        Object country = ipToCountryConverter.ipToCountry(ip);
        // calculate the browser from useragent.
        Object browser = userAgentTools.getBrowser(useragent)[1];
        // calculate the os from useragent.
        Object os = userAgentTools.getOS(useragent)[1];
        // Anchor the new tuple to the input and ack it (not in the original):
        // a BaseRichBolt does not ack automatically, so without this the spout
        // tuple would time out and be replayed on a cluster.
        collector.emit(input, new Values(input.getString(0), input.getString(1),
                input.getString(2), input.getString(3), input.getString(4),
                input.getString(5), input.getString(6), country, browser, os));
        collector.ack(input);
    }
}

The IP-to-country lookup uses LookupService from the GeoIP package.

package com.learningstorm.stormlogprocessing;

import com.maxmind.geoip.Location;
import com.maxmind.geoip.LookupService;

/**
 * This class contains logic to calculate the country name from an IP address.
 */
public class IpToCountryConverter {
    private static LookupService cl = null;

    /**
     * A parameterised constructor which takes the location of the
     * GeoLiteCity.dat file as input.
     *
     * @param pathTOGeoLiteCityFile
     */
    public IpToCountryConverter(String pathTOGeoLiteCityFile) {
        try {
            cl = new LookupService(pathTOGeoLiteCityFile,
                    LookupService.GEOIP_MEMORY_CACHE);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Error occurred while initializing IpToCountryConverter class : " + e.getMessage());
        }
    }

    /**
     * This method takes an ip address as input and converts it into a country name.
     *
     * @param ip
     * @return the country name, or "NA" if the address is unknown
     */
    public String ipToCountry(String ip) {
        Location location = cl.getLocation(ip);
        if (location == null) {
            return "NA";
        }
        if (location.countryName == null) {
            return "NA";
        }
        return location.countryName;
    }
}

The user-agent parsing for browser and OS is long and tedious, so read through it at your own pace.

package com.learningstorm.stormlogprocessing;

public class UserAgentTools {

	  public  String getFirstVersionNumber(String a_userAgent, int a_position, int numDigits) {
String ver = getVersionNumber(a_userAgent, a_position);
if (ver==null) return "";
int i = 0;
String res="";
while (i<ver.length() && i<numDigits) {
res+=String.valueOf(ver.charAt(i));
i++;
}
return res;
}
public String getVersionNumber(String a_userAgent, int a_position) {
if (a_position<0) return "";
StringBuffer res = new StringBuffer();
int status = 0;
while (a_position < a_userAgent.length()) {
char c = a_userAgent.charAt(a_position);
switch (status) {
case 0: // No valid digits encountered yet
if (c == ' ' || c=='/') break;
if (c == ';' || c==')') return "";
status = 1;
// falls through so that the first valid character is appended as well
case 1: // Version number in progress
if (c == ';' || c=='/' || c==')' || c=='(' || c=='[') return res.toString().trim();
if (c == ' ') status = 2;
res.append(c);
break;
case 2: // Space encountered - Might need to end the parsing
if ((Character.isLetter(c) &&
Character.isLowerCase(c)) ||
Character.isDigit(c)) {
res.append(c);
status=1;
} else
return res.toString().trim();
break;
}
a_position++;
}
return res.toString().trim();
}
public String[] getArray(String a, String b, String c) {
String[]res = new String[3];
res[0]=a;
res[1]=b;
res[2]=c;
return res;
}
public String[] getBotName(String userAgent) {
userAgent = userAgent.toLowerCase();
int pos=0;
String res=null;
if ((pos=userAgent.indexOf("help.yahoo.com/"))>-1) {
res= "Yahoo";
pos+=7;
} else
if ((pos=userAgent.indexOf("google/"))>-1) {
res= "Google";
pos+=7;
} else
if ((pos=userAgent.indexOf("msnbot/"))>-1) {
res= "MSNBot";
pos+=7;
} else
if ((pos=userAgent.indexOf("googlebot/"))>-1) {
res= "Google";
pos+=10;
} else
if ((pos=userAgent.indexOf("webcrawler/"))>-1) {
res= "WebCrawler";
pos+=11;
} else
// The following two bots don't have any version number in their User-Agent strings.
if ((pos=userAgent.indexOf("inktomi"))>-1) {
res= "Inktomi";
pos=-1;
} else
if ((pos=userAgent.indexOf("teoma"))>-1) {
res= "Teoma";
pos=-1;
}
if (res==null) return null;
return getArray(res,res,res + getVersionNumber(userAgent,pos));
}
public String[] getOS(String userAgent) {
if (getBotName(userAgent)!=null) return getArray("Bot","Bot","Bot");
String[]res = null;
int pos;
if ((pos=userAgent.indexOf("Windows-NT"))>-1) {
res = getArray("Win","WinNT","Win"+getVersionNumber(userAgent,pos+8));
} else
if (userAgent.indexOf("Windows NT")>-1) {
// The different versions of Windows NT are decoded in the verbosity level 2
// ie: Windows NT 5.1 = Windows XP
if ((pos=userAgent.indexOf("Windows NT 5.1"))>-1) {
res = getArray("Win","WinXP","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows NT 6.0"))>-1) {
res = getArray("Win","Vista","Vista"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows NT 6.1"))>-1) {
res = getArray("Win","Seven","Seven "+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows NT 5.0"))>-1) {
res = getArray("Win","Win2000","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows NT 5.2"))>-1) {
res = getArray("Win","Win2003","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows NT 4.0"))>-1) {
res = getArray("Win","WinNT4","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows NT)"))>-1) {
res = getArray("Win","WinNT","WinNT");
} else
if ((pos=userAgent.indexOf("Windows NT;"))>-1) {
res = getArray("Win","WinNT","WinNT");
} else
res = getArray("Win","<B>WinNT?</B>","<B>WinNT?</B>");
} else
if (userAgent.indexOf("Win")>-1) {
if (userAgent.indexOf("Windows")>-1) {
if ((pos=userAgent.indexOf("Windows 98"))>-1) {
res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows_98"))>-1) {
res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+8));
} else
if ((pos=userAgent.indexOf("Windows 2000"))>-1) {
res = getArray("Win","Win2000","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows 95"))>-1) {
res = getArray("Win","Win95","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows 9x"))>-1) {
res = getArray("Win","Win9x","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows ME"))>-1) {
res = getArray("Win","WinME","Win"+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Windows CE;"))>-1) {
res = getArray("Win","WinCE","WinCE");
} else
if ((pos=userAgent.indexOf("Windows 3.1"))>-1) {
res = getArray("Win","Win31","Win"+getVersionNumber(userAgent,pos+7));
} }
if (res == null) {
if ((pos=userAgent.indexOf("Win98"))>-1) {
res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+3));
} else
if ((pos=userAgent.indexOf("Win31"))>-1) {
res = getArray("Win","Win31","Win"+getVersionNumber(userAgent,pos+3));
} else
if ((pos=userAgent.indexOf("Win95"))>-1) {
res = getArray("Win","Win95","Win"+getVersionNumber(userAgent,pos+3));
} else
if ((pos=userAgent.indexOf("Win 9x"))>-1) {
res = getArray("Win","Win9x","Win"+getVersionNumber(userAgent,pos+3));
} else
if ((pos=userAgent.indexOf("WinNT4.0"))>-1) {
res = getArray("Win","WinNT4","Win"+getVersionNumber(userAgent,pos+3));
} else
if ((pos=userAgent.indexOf("WinNT"))>-1) {
res = getArray("Win","WinNT","Win"+getVersionNumber(userAgent,pos+3));
}
}
if (res == null) {
if ((pos=userAgent.indexOf("Windows"))>-1) {
res = getArray("Win","<B>Win?</B>","<B>Win?"+getVersionNumber(userAgent,pos+7)+"</B>");
} else
if ((pos=userAgent.indexOf("Win"))>-1) {
res = getArray("Win","<B>Win?</B>","<B>Win?"+getVersionNumber(userAgent,pos+3)+"</B>");
} else
res = getArray("Win","<B>Win?</B>","<B>Win?</B>");
}
} else
if ((pos=userAgent.indexOf("Mac OS X"))>-1) {
if ((userAgent.indexOf("iPhone"))>-1) {
pos = userAgent.indexOf("iPhone OS");
if ((userAgent.indexOf("iPod"))>-1) {
res = getArray("iOS","iOS-iPod","iOS-iPod "+((pos<0)?"":getVersionNumber(userAgent,pos+9)));
} else {
res = getArray("iOS","iOS-iPhone","iOS-iPhone "+((pos<0)?"":getVersionNumber(userAgent,pos+9)));
}
} else
if ((userAgent.indexOf("iPad"))>-1) {
pos = userAgent.indexOf("CPU OS");
res = getArray("iOS","iOS-iPad","iOS-iPad "+((pos<0)?"":getVersionNumber(userAgent,pos+6)));
} else
res = getArray("Mac","MacOSX","MacOS "+getVersionNumber(userAgent,pos+8));
} else
if ((pos=userAgent.indexOf("Android"))>-1) {
res = getArray("Linux","Android","Android "+getVersionNumber(userAgent,pos+8));
} else
if ((pos=userAgent.indexOf("Mac_PowerPC"))>-1) {
res = getArray("Mac","MacPPC","MacOS "+getVersionNumber(userAgent,pos+3));
} else
if ((pos=userAgent.indexOf("Macintosh"))>-1) {
if (userAgent.indexOf("PPC")>-1)
res = getArray("Mac","MacPPC","Mac PPC");
else
res = getArray("Mac?","Mac?","MacOS?");
} else
if ((pos=userAgent.indexOf("FreeBSD"))>-1) {
res = getArray("*BSD","*BSD FreeBSD","FreeBSD "+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("OpenBSD"))>-1) {
res = getArray("*BSD","*BSD OpenBSD","OpenBSD "+getVersionNumber(userAgent,pos+7));
} else
if ((pos=userAgent.indexOf("Linux"))>-1) {
String detail = "Linux "+getVersionNumber(userAgent,pos+5);
String med = "Linux";
if ((pos=userAgent.indexOf("Ubuntu/"))>-1) {
detail = "Ubuntu "+getVersionNumber(userAgent,pos+7);
med+=" Ubuntu";
}
res = getArray("Linux",med,detail);
} else
if ((pos=userAgent.indexOf("CentOS"))>-1) {
res = getArray("Linux","Linux CentOS","CentOS");
} else
if ((pos=userAgent.indexOf("NetBSD"))>-1) {
res = getArray("*BSD","*BSD NetBSD","NetBSD "+getVersionNumber(userAgent,pos+6));
} else
if ((pos=userAgent.indexOf("Unix"))>-1) {
res = getArray("Linux","Linux","Linux "+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("SunOS"))>-1) {
res = getArray("Unix","SunOS","SunOS"+getVersionNumber(userAgent,pos+5));
} else
if ((pos=userAgent.indexOf("IRIX"))>-1) {
res = getArray("Unix","IRIX","IRIX"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("SonyEricsson"))>-1) {
res = getArray("SonyEricsson","SonyEricsson","SonyEricsson"+getVersionNumber(userAgent,pos+12));
} else
if ((pos=userAgent.indexOf("Nokia"))>-1) {
res = getArray("Nokia","Nokia","Nokia"+getVersionNumber(userAgent,pos+5));
} else
if ((pos=userAgent.indexOf("BlackBerry"))>-1) {
res = getArray("BlackBerry","BlackBerry","BlackBerry"+getVersionNumber(userAgent,pos+10));
} else
if ((pos=userAgent.indexOf("SymbianOS"))>-1) {
res = getArray("SymbianOS","SymbianOS","SymbianOS"+getVersionNumber(userAgent,pos+10));
} else
if ((pos=userAgent.indexOf("BeOS"))>-1) {
res = getArray("BeOS","BeOS","BeOS");
} else
if ((pos=userAgent.indexOf("Nintendo Wii"))>-1) {
res = getArray("Nintendo Wii","Nintendo Wii","Nintendo Wii"+getVersionNumber(userAgent,pos+10));
} else
if ((pos=userAgent.indexOf("J2ME/MIDP"))>-1) {
res = getArray("Java","J2ME","J2ME/MIDP");
} else
res = getArray("<b>?</b>","<b>?</b>","<b>?</b>");
return res;
}
public String[] getBrowser(String userAgent) {
String []botName;
if ((botName=getBotName(userAgent))!=null) return botName;
String[]res = null;
int pos;
if ((pos=userAgent.indexOf("Lotus-Notes/"))>-1) {
res = getArray("LotusNotes","LotusNotes","LotusNotes"+getVersionNumber(userAgent,pos+12));
} else
if ((pos=userAgent.indexOf("Opera"))>-1) {
String ver = getVersionNumber(userAgent,pos+5);
res = getArray("Opera","Opera"+getFirstVersionNumber(userAgent,pos+5,1),"Opera"+ver);
if ((pos=userAgent.indexOf("Opera Mini/"))>-1) {
String ver2 = getVersionNumber(userAgent,pos+11);
res = getArray("Opera","Opera Mini","Opera Mini "+ver2);
} else
if ((pos=userAgent.indexOf("Opera Mobi/"))>-1) {
String ver2 = getVersionNumber(userAgent,pos+11);
res = getArray("Opera","Opera Mobi","Opera Mobi "+ver2);
}
} else
if (userAgent.indexOf("MSIE")>-1) {
if ((pos=userAgent.indexOf("MSIE 6.0"))>-1) {
res = getArray("MSIE","MSIE6","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 5.0"))>-1) {
res = getArray("MSIE","MSIE5","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 5.5"))>-1) {
res = getArray("MSIE","MSIE5.5","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 5."))>-1) {
res = getArray("MSIE","MSIE5.x","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 4"))>-1) {
res = getArray("MSIE","MSIE4","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 7"))>-1 && userAgent.indexOf("Trident/4.0")<0) {
res = getArray("MSIE","MSIE7","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 8"))>-1 || userAgent.indexOf("Trident/4.0")>-1) {
res = getArray("MSIE","MSIE8","MSIE"+getVersionNumber(userAgent,pos+4));
} else
if ((pos=userAgent.indexOf("MSIE 9"))>-1 || userAgent.indexOf("Trident/5.0")>-1) { // IE9 reports Trident/5.0; Trident/4.0 is IE8
res = getArray("MSIE","MSIE9","MSIE"+getVersionNumber(userAgent,pos+4));
} else
res = getArray("MSIE","<B>MSIE?</B>","<B>MSIE?"+getVersionNumber(userAgent,userAgent.indexOf("MSIE")+4)+"</B>");
} else
if ((pos=userAgent.indexOf("Gecko/"))>-1) {
res = getArray("Gecko","Gecko","Gecko"+getFirstVersionNumber(userAgent,pos+5,4));
if ((pos=userAgent.indexOf("Camino/"))>-1) {
res[1]+="(Camino)";
res[2]+="(Camino"+getVersionNumber(userAgent,pos+7)+")";
} else
if ((pos=userAgent.indexOf("Chimera/"))>-1) {
res[1]+="(Chimera)";
res[2]+="(Chimera"+getVersionNumber(userAgent,pos+8)+")";
} else
if ((pos=userAgent.indexOf("Firebird/"))>-1) {
res[1]+="(Firebird)";
res[2]+="(Firebird"+getVersionNumber(userAgent,pos+9)+")";
} else
if ((pos=userAgent.indexOf("Phoenix/"))>-1) {
res[1]+="(Phoenix)";
res[2]+="(Phoenix"+getVersionNumber(userAgent,pos+8)+")";
} else
if ((pos=userAgent.indexOf("Galeon/"))>-1) {
res[1]+="(Galeon)";
res[2]+="(Galeon"+getVersionNumber(userAgent,pos+7)+")";
} else
if ((pos=userAgent.indexOf("Firefox/"))>-1) {
res[1]+="(Firefox)";
res[2]+="(Firefox"+getVersionNumber(userAgent,pos+8)+")";
} else
if ((pos=userAgent.indexOf("Netscape/"))>-1) {
if ((pos=userAgent.indexOf("Netscape/6"))>-1) {
res[1]+="(NS6)";
res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
} else
if ((pos=userAgent.indexOf("Netscape/7"))>-1) {
res[1]+="(NS7)";
res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
} else
if ((pos=userAgent.indexOf("Netscape/8"))>-1) {
res[1]+="(NS8)";
res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
} else
if ((pos=userAgent.indexOf("Netscape/9"))>-1) {
res[1]+="(NS9)";
res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
} else {
res[1]+="(NS?)";
res[2]+="(NS?"+getVersionNumber(userAgent,userAgent.indexOf("Netscape/")+9)+")";
}
}
} else
if ((pos=userAgent.indexOf("Netscape/"))>-1) {
if ((pos=userAgent.indexOf("Netscape/4"))>-1) {
res = getArray("NS","NS4","NS"+getVersionNumber(userAgent,pos+9));
} else
res = getArray("NS","NS?","NS?"+getVersionNumber(userAgent,pos+9));
} else
if ((pos=userAgent.indexOf("Chrome/"))>-1) {
res = getArray("KHTML","KHTML(Chrome)","KHTML(Chrome"+getVersionNumber(userAgent,pos+6)+")");
} else
if ((pos=userAgent.indexOf("Safari/"))>-1) {
res = getArray("KHTML","KHTML(Safari)","KHTML(Safari"+getVersionNumber(userAgent,pos+6)+")");
} else
if ((pos=userAgent.indexOf("Konqueror/"))>-1) {
res = getArray("KHTML","KHTML(Konqueror)","KHTML(Konqueror"+getVersionNumber(userAgent,pos+9)+")");
} else
if ((pos=userAgent.indexOf("KHTML"))>-1) {
res = getArray("KHTML","KHTML?","KHTML?("+getVersionNumber(userAgent,pos+5)+")");
} else
if ((pos=userAgent.indexOf("NetFront"))>-1) {
res = getArray("NetFront","NetFront","NetFront "+getVersionNumber(userAgent,pos+8));
} else
if ((pos=userAgent.indexOf("BlackBerry"))>-1) {
pos=userAgent.indexOf("/",pos+2);
res = getArray("BlackBerry","BlackBerry","BlackBerry"+getVersionNumber(userAgent,pos+1));
} else
// We will interpret Mozilla/4.x as Netscape Communicator if and only if x
// is not 0 or 5
if (userAgent.indexOf("Mozilla/4.")==0 &&
userAgent.indexOf("Mozilla/4.0")<0 &&
userAgent.indexOf("Mozilla/4.5 ")<0) {
res = getArray("Communicator","Communicator","Communicator"+getVersionNumber(userAgent,pos+8));
} else
return getArray("<B>?</B>","<B>?</B>","<B>?</B>");
return res;
}
}
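Stripped of its many special cases, the class boils down to one idea: look for a known marker substring and return a fixed label. The sketch below shows just that idea on a handful of markers. UserAgentDemo is a hypothetical reduction written for this note, not a replacement for UserAgentTools; it returns only the middle label (index 1 of the arrays above), which is the value the bolt stores.

```java
final class UserAgentDemo {
    /** Returns a browser label; checks follow the same MSIE, Gecko, Chrome, Safari order. */
    static String browser(String ua) {
        if (ua.contains("MSIE")) {
            return "MSIE";
        }
        if (ua.contains("Gecko/")) {
            return ua.contains("Firefox/") ? "Gecko(Firefox)" : "Gecko";
        }
        if (ua.contains("Chrome/")) {
            return "KHTML(Chrome)";
        }
        if (ua.contains("Safari/")) {
            return "KHTML(Safari)";
        }
        return "?";
    }

    /** Returns an OS label for a few well-known markers. */
    static String os(String ua) {
        if (ua.contains("Windows NT 5.1")) {
            return "WinXP";
        }
        if (ua.contains("Windows NT 6.1")) {
            return "Seven";
        }
        if (ua.contains("Mac OS X")) {
            return "MacOSX";
        }
        if (ua.contains("Linux")) {
            return "Linux";
        }
        return "?";
    }

    public static void main(String[] args) {
        String ua = "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7";
        // prints Gecko(Firefox) / WinXP
        System.out.println(browser(ua) + " / " + os(ua));
    }
}
```

The order of the checks matters: a Chrome user agent also contains "Safari/", so Chrome has to be tested first.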

Then there is the keyword extraction.

package com.learningstorm.stormlogprocessing;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This class uses the KeywordGenerator class to generate the search keyword from
 * the referrer URL.
 */
public class KeyWordIdentifierBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private KeywordGenerator keywordGenerator = null;
    public OutputCollector collector;

    public KeyWordIdentifierBolt() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "dateTime", "request", "response",
                "bytesSent", "referrer", "useragent", "country", "browser",
                "os", "keyword"));
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.keywordGenerator = new KeywordGenerator();
    }

    public void execute(Tuple input) {
        String referrer = input.getStringByField("referrer");
        // call the getKeyword(String referrer) method of the KeywordGenerator class to
        // generate the search keyword.
        Object keyword = keywordGenerator.getKeyword(referrer);
        // emits all the fields emitted by the previous bolt + keyword;
        // anchoring and acking are added here (not in the original) because
        // a BaseRichBolt does not ack its input automatically.
        collector.emit(input, new Values(input.getString(0), input.getString(1),
                input.getString(2), input.getString(3), input.getString(4),
                input.getString(5), input.getString(6), input.getString(7),
                input.getString(8), input.getString(9), keyword));
        collector.ack(input);
    }
}

Here too the real work is done by KeywordGenerator, which matches parameters in the referrer URL.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeywordGenerator {
    // Query parameters that may carry the search term, tried in this order.
    private static final Pattern[] PATTERNS = {
            Pattern.compile("[?&#]name=([^&]+)"),
            Pattern.compile("[?&#]p=([^&]+)"),
            Pattern.compile("[?&#]query=([^&]+)")
    };

    public String getKeyword(String referer) {
        for (Pattern pat : PATTERNS) {
            Matcher m = pat.matcher(referer);
            if (m.find()) {
                return plusToSpace(m.group(1));
            }
        }
        return "NA";
    }

    // Replaces the '+' separators of a query value with spaces, as the original
    // split-and-join loops did.
    private static String plusToSpace(String term) {
        String[] temp = term.split("\\+");
        if (temp.length == 0) {
            // The value consisted only of '+'; the original would have thrown here.
            return "";
        }
        String searchTerm = temp[0];
        for (int i = 1; i < temp.length; i++) {
            searchTerm = searchTerm + " " + temp[i];
        }
        return searchTerm;
    }
}
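A few inputs make the behaviour easier to picture: name is tried first, then p, then query, and a referrer without any of them gives "NA". The sketch below reproduces that lookup order in a standalone class and, as an extension that is not in the book's code, also percent-decodes the value with java.net.URLDecoder, assuming the query string is UTF-8 encoded. KeywordDemo and the example.com URLs are made up for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class KeywordDemo {
    // Parameter names, tried in the same order as in KeywordGenerator.
    private static final String[] PARAMS = {"name", "p", "query"};

    static String keyword(String referrer) {
        for (String param : PARAMS) {
            Matcher m = Pattern.compile("[?&#]" + param + "=([^&]+)").matcher(referrer);
            if (m.find()) {
                return decode(m.group(1));
            }
        }
        return "NA";
    }

    /** Turns '+' into a space and %XX escapes into characters; falls back to the raw text. */
    static String decode(String raw) {
        try {
            return URLDecoder.decode(raw, "UTF-8");
        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
            return raw;
        }
    }

    public static void main(String[] args) {
        System.out.println(keyword("http://www.adeveloper.com/resource.html?name=qq")); // prints qq
        System.out.println(keyword("http://example.com/search?p=storm+kafka"));         // prints storm kafka
        System.out.println(keyword("http://www.adeveloper.com/resource.html"));         // prints NA
    }
}
```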

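Last comes PersistenceBolt, which writes the final result to MySQL.

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class PersistenceBolt implements IBasicBolt {

    private MySQLDump mySQLDump = null;
    private static final long serialVersionUID = 1L;
    private String database;
    private String user;
    private String ip;
    private String password;

    public PersistenceBolt(String ip, String database, String user, String password) {
        this.ip = ip;
        this.database = database;
        this.user = user;
        this.password = password;
    }

    // This is the last bolt in the chain, so it declares no output fields.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    // The connection is opened here, on the worker, rather than in the constructor.
    public void prepare(Map stormConf, TopologyContext context) {
        mySQLDump = new MySQLDump(ip, database, user, password);
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("Input tuple : " + input);
        mySQLDump.persistRecord(input);
    }

    public void cleanup() {
        mySQLDump.close();
    }
}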

The actual database operations:

package com.learningstorm.stormlogprocessing;

import java.sql.Connection;
import java.sql.PreparedStatement;

import backtype.storm.tuple.Tuple;

/**
 * This class contains logic to persist a record into the MySQL database.
 */
public class MySQLDump {
    private final Connection connect;

    public MySQLDump(String ip, String database, String user, String password) {
        // Use the constructor arguments; the original ignored them and connected
        // with the hard-coded values "192.168.137.10", "test", "root", "111111".
        this.connect = MySQLConnection.getMySQLConnection(ip, database, user, password);
    }

    public void persistRecord(Tuple tuple) {
        PreparedStatement preparedStatement = null;
        try {
            // "default" lets MySQL fill in the AUTO_INCREMENT id column.
            preparedStatement = connect.prepareStatement(
                    "insert into apachelog values (default, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
            preparedStatement.setString(1, tuple.getStringByField("ip"));
            preparedStatement.setString(2, tuple.getStringByField("dateTime"));
            preparedStatement.setString(3, tuple.getStringByField("request"));
            preparedStatement.setString(4, tuple.getStringByField("response"));
            preparedStatement.setString(5, tuple.getStringByField("bytesSent"));
            preparedStatement.setString(6, tuple.getStringByField("referrer"));
            preparedStatement.setString(7, tuple.getStringByField("useragent"));
            preparedStatement.setString(8, tuple.getStringByField("country"));
            preparedStatement.setString(9, tuple.getStringByField("browser"));
            preparedStatement.setString(10, tuple.getStringByField("os"));
            preparedStatement.setString(11, tuple.getStringByField("keyword"));
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            // Keep the cause so that the real SQL error is visible.
            throw new RuntimeException(
                    "Error occurred while persisting records in mysql : " + e.getMessage(), e);
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (Exception exception) {
                    System.out.println("Error occurred while closing PreparedStatement : ");
                }
            }
        }
    }

    public void close() {
        try {
            connect.close();
        } catch (Exception exception) {
            System.out.println("Error occurred while closing the connection");
        }
    }
}
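The insert statement relies on the column order of the table, so adding or reordering a column later would silently shift the values. One possible alternative is to name the columns explicitly and build the statement from the field list. This is a sketch only: InsertSql is a hypothetical helper, and it assumes the column names are exactly the tuple field names, as in the create table statement above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class InsertSql {
    // The tuple fields, which are also the table columns (id is left to AUTO_INCREMENT).
    static final List<String> COLUMNS = Arrays.asList(
            "ip", "dateTime", "request", "response", "bytesSent", "referrer",
            "useragent", "country", "browser", "os", "keyword");

    /** Builds "insert into <table> (c1, c2, ...) values (?, ?, ...)" with one ? per column. */
    static String build(String table, List<String> columns) {
        String names = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + names + ") values (" + marks + ")";
    }

    public static void main(String[] args) {
        System.out.println(build("apachelog", COLUMNS));
    }
}
```

The same list can then drive the setString calls in a loop, so the statement and its parameters cannot drift apart.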

The database connection class, MySQLConnection:

import java.sql.Connection;
import java.sql.DriverManager;

public class MySQLConnection {

    private static Connection connect = null;

    public static Connection getMySQLConnection(String ip, String database, String user, String password) {
        try {
            // Load the MySQL JDBC driver.
            Class.forName("com.mysql.jdbc.Driver");
            String url = "jdbc:mysql://" + ip + "/" + database + "?"
                    + "user=" + user + "&password=" + password;
            connect = DriverManager.getConnection(url);
            return connect;
        } catch (Exception e) {
            throw new RuntimeException("Error occurred while get mysql connection : " + e.getMessage());
        }
    }
}

After running the producer and the topology, the database contains the following results:

mysql> select browser,count(*) from apachelog group by browser;
+----------------+----------+
| browser        | count(*) |
+----------------+----------+
| Gecko(Firefox) |       66 |
+----------------+----------+
1 row in set (0.00 sec)

mysql> select os,count(*) from apachelog group by os;
+-------+----------+
| os    | count(*) |
+-------+----------+
| WinXP |       66 |
+-------+----------+
1 row in set (0.00 sec)

mysql> select country,count(*) from apachelog group by country;
+---------+----------+
| country | count(*) |
+---------+----------+
| India   |       66 |
+---------+----------+
1 row in set (0.01 sec)