Storm(3) - Calculating Term Importance with Trident

Posted: 2022-11-07 12:39:33

Creating a URL stream using a Twitter filter

Start by creating the project directory and the standard Maven folder structure (http://maven.apache.org/guides/introduction/introduction-to-the-standard-directory-layout.html).

1. Create the POM as per the Creating a "Hello World" topology recipe in Chapter 1, Setting Up Your Development Environment, updating the <artifactId> and <name> tag values to tfidf-topology, and include the dependencies required by this recipe (Storm/Trident, Twitter4J, Jedis, Apache Tika, and the Lucene analyzers and spellchecker used in the code below).

2. Import the project into Eclipse after generating the Eclipse project files:

mvn eclipse:eclipse

3. Create a new spout called TwitterSpout that extends from BaseRichSpout, and add the following member-level variables:

public class TwitterSpout extends BaseRichSpout {
    // Logger used by the open and nextTuple methods below (slf4j assumed)
    private static final Logger LOG = LoggerFactory.getLogger(TwitterSpout.class);

    LinkedBlockingQueue<Status> queue = null;
    TwitterStream twitterStream;
    String[] trackTerms;
    long maxQueueDepth;
    SpoutOutputCollector collector;
}
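
The recipe does not show how trackTerms and maxQueueDepth get their values; a minimal sketch of a constructor that supplies them (the field names come from the step above, the constructor itself is an assumption):

// Hypothetical constructor: passes the Twitter filter terms and the queue
// bound in at construction time, since the recipe does not show their origin.
public TwitterSpout(String[] trackTerms, long maxQueueDepth) {
    this.trackTerms = trackTerms;
    this.maxQueueDepth = maxQueueDepth;
}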

4. In the open method of the spout, initialize the blocking queue and create a Twitter stream listener:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    queue = new LinkedBlockingQueue<Status>(1000);

    StatusListener listener = new StatusListener() {
        @Override
        public void onStatus(Status status) {
            if (queue.size() < maxQueueDepth) {
                LOG.trace("TWEET Received: " + status);
                queue.offer(status);
            }
            else {
                LOG.error("Queue is now full, the following message is dropped: " + status);
            }
        }
        // The remaining StatusListener callbacks (onDeletionNotice,
        // onTrackLimitationNotice, onScrubGeo, onStallWarning and onException)
        // can be implemented as no-ops for this recipe.
    };

    twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(listener);

    FilterQuery filter = new FilterQuery();
    filter.count(0);
    filter.track(trackTerms);
    twitterStream.filter(filter);
}

5. Then create the Twitter stream, register the listener, and apply the filter, as shown in the last lines of the open method above.

6. You then need to emit the tweets into the topology from the nextTuple method:

public void nextTuple() {

    Status ret = queue.poll();

    if(ret == null) {
        try {
            Thread.sleep(50);
        }
        catch (InterruptedException e) {}
    }
    else {
        collector.emit(new Values(ret));
    }
}
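
The spout also has to declare the single field it emits so that downstream components can consume the tuple; a minimal sketch, assuming the field is named "tweet" (the recipe does not name it):

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // The field name "tweet" is an assumption; any name works, as the
    // downstream bolt reads the value by position.
    declarer.declare(new Fields("tweet"));
}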

7. Next, you must create a bolt that publishes the tweet's URLs to Redis so that they can be consumed by another topology within the same cluster. Create a BaseRichBolt class called PublishURLBolt that doesn't declare any output fields, and provide the following execute method:

public class PublishURLBolt extends BaseRichBolt {

    // Redis client; initialized in the prepare method (see the sketch below)
    private Jedis jedis;

    public void execute(Tuple input) {
        Status ret = (Status) input.getValue(0);
        URLEntity[] urls = ret.getURLEntities();

        // Push each URL contained in the tweet onto the "url" list in Redis
        for (int i = 0; i < urls.length; i++) {
            jedis.rpush("url", urls[i].getURL().trim());
        }
    }
}
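
The bolt needs a live Jedis connection before execute runs. A minimal sketch of the prepare method, assuming the same Conf keys used by TweetURLSpout below (the key names are taken from that spout; the rest is an assumption):

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    // Connect to Redis using the same configuration keys as TweetURLSpout
    String host = stormConf.get(Conf.REDIS_HOST_KEY).toString();
    int port = Integer.valueOf(stormConf.get(Conf.REDIS_PORT_KEY).toString());
    jedis = new Jedis(host, port);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // This bolt only writes to Redis and emits nothing downstream
}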

8. Finally, you will need to read the URLs back into a stream in the Trident topology. To do this, create another spout called TweetURLSpout:

public class TweetURLSpout extends BaseRichSpout {

    // Redis connection details and handles used by the methods below
    private String host;
    private int port;
    private Jedis jedis;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("url"));
    }

    @Override
    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        host = conf.get(Conf.REDIS_HOST_KEY).toString();
        port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
        this.collector = spoutOutputCollector;

        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
    }

    @Override
    public void nextTuple() {
        String url = jedis.rpop("url");
        if(url==null) {
            try {
                Thread.sleep(50);
            }
            catch (InterruptedException e) {}
        }
        else {
            collector.emit(new Values(url));
        }
    }
} 
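
The Conf class referenced above (and again in the topology's main method later on) is not shown in the recipe. A minimal sketch of what it could contain; the constant names are taken from the code in this recipe, while the key strings and the default port are assumptions (6379 is simply the Redis default):

// Hypothetical configuration holder; only the constant names come from the
// recipe's code, the values are assumptions.
public class Conf {
    public static final String REDIS_HOST_KEY = "redis.host";
    public static final String REDIS_PORT_KEY = "redis.port";
    public static final Integer DEFAULT_JEDIS_PORT = 6379;
}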

Deriving a clean stream of terms from the documents

This recipe consumes the URL stream, downloading the document content and deriving a clean stream of terms that are suitable for later analysis. 

A clean term is defined as a word that:
> Is not a stop word
> Is a valid dictionary word
> Is not a number or URL
> Is a lemma

A lemma is the canonical form of a word; for example, run, runs, ran, and running are forms of the same lexeme with "run" as the lemma. Lexeme, in this context, refers to the set of all the forms that have the same meaning, and lemma refers to the particular form that is chosen by convention to represent the lexeme.

The lemma is important for this recipe because it enables us to group terms that have the same meaning. This grouping matters because we will be counting how frequently each term occurs.
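
As a quick illustration, the MorphaStemmer used later in DocumentTokenizer reduces inflected forms to a common lemma; the import path below is an assumption based on the knowitall morpha-stemmer artifact:

// Import path is an assumption; use whichever morpha artifact your POM pulls in
import edu.washington.cs.knowitall.morpha.MorphaStemmer;

public class LemmaExample {
    public static void main(String[] args) {
        // Both inflected forms should reduce to the same lemma, "run"
        System.out.println(MorphaStemmer.stemToken("running"));
        System.out.println(MorphaStemmer.stemToken("runs"));
    }
}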

1. Create a class named DocumentFetchFunction that extends storm.trident.operation.BaseFunction, and provide the following implementation for the execute method:

public class DocumentFetchFunction extends BaseFunction {

    // MIME types we are willing to emit documents for; supplied by the topology (see step 7)
    private final List<String> mimeTypes;

    public DocumentFetchFunction(List<String> mimeTypes) {
        this.mimeTypes = mimeTypes;
    }

    public void execute(TridentTuple tuple, TridentCollector collector) {
        String url = tuple.getStringByField("url");
        try {
            Parser parser = new AutoDetectParser();
            Metadata metadata = new Metadata();
            ParseContext parseContext = new ParseContext();
            URL urlObject = new URL(url);
            ContentHandler handler = new BodyContentHandler(10 * 1024 * 1024);

            parser.parse((InputStream) urlObject.getContent(), handler, metadata, parseContext);
            String[] mimeDetails = metadata.get("Content-Type").split(";");
            if ((mimeDetails.length > 0) && (mimeTypes.contains(mimeDetails[0]))) {
                collector.emit(new Values(handler.toString(), url.trim(), "twitter"));
            }
        }
        catch (Exception e) {
            // Unreachable or unparsable documents are simply skipped
        }
    }
}

2. Next, we need to tokenize the document. Create another class that extends BaseFunction, call it DocumentTokenizer, and provide the following execute implementation:

public class DocumentTokenizer extends BaseFunction {

    public void execute(TridentTuple tuple, TridentCollector collector) {
        String documentContents = tuple.getStringByField(TfidfTopologyFields.DOCUMENT);
        TokenStream ts = null;

        try {
            // Tokenize the document and drop English stop words
            ts = new StopFilter(Version.LUCENE_30,
                    new StandardTokenizer(Version.LUCENE_30, new StringReader(documentContents)),
                    StopAnalyzer.ENGLISH_STOP_WORDS_SET);

            CharTermAttribute termAtt = ts.getAttribute(CharTermAttribute.class);
            while (ts.incrementToken()) {
                // Reduce each token to its lemma before emitting it
                String lemma = MorphaStemmer.stemToken(termAtt.toString());
                lemma = lemma.trim().replaceAll("\n", "").replaceAll("\r", "");
                collector.emit(new Values(lemma));
            }
        }
        catch (IOException e) {
            LOG.error(e.toString());
        }
        finally {
            if (ts != null) {
                try {
                    ts.close();
                }
                catch (IOException e) {}
            }
        }
    }
}

3. We then need to filter out any invalid terms that this function may emit. To do this, implement another class that extends BaseFunction, called TermFilter. Its execute method simply calls a checking function, isKeep(), and emits the received tuple only if the check passes. The isKeep() method should perform the following validations:

public class TermFilter extends BaseFunction {

    private SpellChecker spellchecker;

    // Any additional terms to exclude can be added to this set
    private final Set<String> filterTerms = new HashSet<String>();

    public void execute(TridentTuple tuple, TridentCollector collector) {
        String stem = tuple.getString(0);
        if (isKeep(stem)) {
            collector.emit(new Values(stem));
        }
    }

    private boolean isKeep(String stem) {
        if (stem == null) {
            return false;
        }

        if (stem.equals("")) {
            return false;
        }

        if (filterTerms.contains(stem)) {
            return false;
        }

        // We don't want integers
        try {
            Integer.parseInt(stem);
            return false;
        }
        catch (Exception e) {
            // not an integer; carry on
        }

        // ...or floating point numbers
        try {
            Double.parseDouble(stem);
            return false;
        }
        catch (Exception e) {
            // not a floating point number; carry on
        }

        // Finally, keep the term only if it is a valid dictionary word
        try {
            return spellchecker.exist(stem);
        }
        catch (Exception e) {
            LOG.error(e.toString());
            return false;
        }
    }
}

4. The dictionary needs to be initialized during the prepare method for this function:

public void prepare(Map conf, TridentOperationContext context){
    super.prepare(conf, context);

    File dir = new File(System.getProperty("user.home") + "/dictionaries");
    Directory directory;

    try {
        directory = FSDirectory.open(dir);
        spellchecker = new SpellChecker(directory);
        StandardAnalyzer analyzer = new StandardAnalyzer(Version.LUCENE_36);
        IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_36, analyzer);
        URL dictionaryFile = TermFilter.class.getResource("/dictionaries/fulldictionary00.txt");

        spellchecker.indexDictionary(new PlainTextDictionary(new File(dictionaryFile.toURI())), config, true);
    }
    catch (Exception e) {
        LOG.error(e.toString());
        throw new RuntimeException(e);
    }
}

5. Download the dictionary file from http://dl.dropbox.com/u/7215751/JavaCodeGeeks/LuceneSuggestionsTutorial/fulldictionary00.zip and place it in the src/main/resources/dictionaries folder of your project structure.

6. Finally, you need to create the actual topology (at least partially, for now). Create a class named TermTopology that provides a main(String[] args) method and creates a local-mode cluster:

public class TermTopology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.put(Conf.REDIS_HOST_KEY, "localhost");
        conf.put(Conf.REDIS_PORT_KEY, Conf.DEFAULT_JEDIS_PORT);

        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("tfidf", conf, buildTopology(drpc));

            // Let the local topology run for a minute before shutting down
            Thread.sleep(60000);
            cluster.shutdown();
            drpc.shutdown();
        }
    }
}
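
When a topology name is passed on the command line, the usual Storm pattern is to submit to a real cluster instead of the local one. A hypothetical sketch of a branch that could sit alongside the local-mode branch in main; the worker count and the use of the first argument as the topology name are assumptions, not part of the recipe as written:

// Hypothetical remote-submission branch for TermTopology.main
if (args.length >= 1) {
    conf.setNumWorkers(2);
    StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}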

7. Then build the appropriate portion of the topology:

public static StormTopology buildTopology(LocalDRPC drpc) {

    TridentTopology topology = new TridentTopology();

    // A fixed, cycling spout of test URLs stands in for the live stream
    // while developing in local mode
    FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("url"), 1,
            new Values("http://t.co/hP5PM6fm"), new Values("http://t.co/xSFteG23"));
    testSpout.setCycle(true);

    // MIME types the fetch function will accept; adjust to suit your sources
    List<String> mimeTypes = Arrays.asList("text/html", "text/plain");

    Stream documentStream = topology
        .newStream("tweetSpout", testSpout)
        .parallelismHint(20)
        .each(new Fields("url"), new DocumentFetchFunction(mimeTypes), new Fields("document", "documentId", "source"));

    Stream termStream = documentStream
        .parallelismHint(20)
        .each(new Fields("document"), new DocumentTokenizer(), new Fields("dirtyTerm"))
        .each(new Fields("dirtyTerm"), new TermFilter(), new Fields("term"))
        .project(new Fields("term", "documentId", "source"));

    return topology.build();
}
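
Once the Redis-backed flow is running, the test spout can be swapped for the TweetURLSpout from the earlier step; a sketch, assuming TweetURLSpout declares the single "url" field as shown above:

// Replace the FixedBatchSpout with the Redis-backed spout for live tweet URLs
Stream documentStream = topology
    .newStream("tweetSpout", new TweetURLSpout())
    .parallelismHint(20)
    .each(new Fields("url"), new DocumentFetchFunction(mimeTypes), new Fields("document", "documentId", "source"));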
