Cloudera Search 1.0.0 environment setup (2): near-real-time (NRT) search with Flume NG's MorphlineSolrSink


To achieve near-real-time search, there must be a mechanism that processes incoming data on the fly and writes it into the Solr index. Flume NG provides exactly such a mechanism: it collects data in real time, runs it through ETL via the MorphlineSolrSink, and writes the result into the Solr index, so newly arrived data becomes searchable in Solr almost immediately.

Setup steps:

1. This is only a demonstration, so we create a file named file01 holding two records; later we will submit them to the Flume agent with flume-ng avro-client -H localhost -p 44444 -F file01.

The two records look like this:

{"id": "1234567890", "user_friends_count": 111, "user_location": "Palo Alto", "user_description": "desc1", "user_statuses_count": 11111, "user_followers_count": 111, "user_name": "name1", "user_screen_name": "fake_user1", "created_at": "1985-09-04T18:01:01Z", "text": "sample tweet one", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 0, "source": "href=\"http:\/\/sample.com\"", "in_reply_to_status_id": 0, "media_url_https": null, "expanded_url": null}
{"id": "2345678901", "user_friends_count": 222, "user_location": "San Francisco", "user_description": "desc2", "user_statuses_count": 222222, "user_followers_count": 222, "user_name": "name2", "user_screen_name": "fake_user2", "created_at": "1985-09-04T19:14:34Z", "text": "sample tweet two", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 0, "source": "href=\"http:\/\/sample.com\"", "in_reply_to_status_id": 0, "media_url_https": null, "expanded_url": null}

These are two JSON records; later we will use morphlines to ETL the JSON and extract a handful of specific fields.
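A minimal way to create this file from the shell (the content is exactly the two records above, one JSON document per line, since flume-ng avro-client sends each line of the file as a separate event):

cat > file01 <<'EOF'
{"id": "1234567890", "user_friends_count": 111, "user_location": "Palo Alto", "user_description": "desc1", "user_statuses_count": 11111, "user_followers_count": 111, "user_name": "name1", "user_screen_name": "fake_user1", "created_at": "1985-09-04T18:01:01Z", "text": "sample tweet one", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 0, "source": "href=\"http:\/\/sample.com\"", "in_reply_to_status_id": 0, "media_url_https": null, "expanded_url": null}
{"id": "2345678901", "user_friends_count": 222, "user_location": "San Francisco", "user_description": "desc2", "user_statuses_count": 222222, "user_followers_count": 222, "user_name": "name2", "user_screen_name": "fake_user2", "created_at": "1985-09-04T19:14:34Z", "text": "sample tweet two", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 0, "source": "href=\"http:\/\/sample.com\"", "in_reply_to_status_id": 0, "media_url_https": null, "expanded_url": null}
EOF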

2. In CM, configure the Flume-NG Solr Sink in Flume's configuration, as shown in the screenshot below:

[Screenshot: Flume-NG Solr Sink configuration in Cloudera Manager]

The morphlines configuration file is as follows:

# Specify server locations in a SOLR_LOCATOR variable; used later in 
# variable substitutions:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collection1
  
  # ZooKeeper ensemble
  zkHost : "master68:2181,slave69:2181,slave76:2181/solr"  
}

# Specify an array of one or more morphlines, each of which defines an ETL 
# transformation chain. A morphline consists of one or more potentially 
# nested commands. A morphline is a way to consume records such as Flume events, 
# HDFS files or blocks, turn them into a stream of records, and pipe the stream 
# of records through a set of easily configurable transformations on its way to 
# Solr.
morphlines : [
  {
    # Name used to identify a morphline. For example, used if there are multiple 
    # morphlines in a morphline config file.
    id : morphline1 
    
    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this
    # morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    
    commands : [
      {
        readJson {}
      }
      { 
        extractJsonPaths {
          flatten : false
          paths : { 
            id : /id            
            user_name : /user_screen_name
            created_at : /created_at
            text : /text      
          }
        }
      }
      
      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # convert timestamp field to native Solr timestamp format
      # such as 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"                                 
          outputTimezone : UTC
        }
      }
      
      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # This command deletes record fields that are unknown to Solr 
      # schema.xml.
      #
      # Recall that Solr throws an exception on any attempt to load a document 
      # that contains a field that is not specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }  
            
      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }    
      
      # load the record into a Solr server or MapReduce Reducer
      { 
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

A quick walk through this morphlines file: the readJson command first parses each incoming event body into a JSON object. Next, extractJsonPaths pulls specific values out of that JSON object and assigns them to record fields (for example, user_name : /user_screen_name reads the value of user_screen_name and assigns it to user_name). convertTimestamp then normalizes the created_at field into Solr's native timestamp format. sanitizeUnknownSolrFields drops any fields that are not declared in Solr's schema.xml, so after ETL the record retains only the fields Solr knows about. Finally, loadSolr submits the resulting record to Solr.
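Since sanitizeUnknownSolrFields validates against the collection's schema, the four fields this morphline keeps must be declared in schema.xml. A rough sketch of the relevant declarations (the field types and attributes below are assumptions; match them to your actual schema):

<!-- schema.xml excerpt: the fields this morphline keeps; types are illustrative -->
<field name="id"         type="string"       indexed="true" stored="true" required="true"/>
<field name="user_name"  type="string"       indexed="true" stored="true"/>
<field name="created_at" type="date"         indexed="true" stored="true"/>
<field name="text"       type="text_general" indexed="true" stored="true"/>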

3. Next comes the Flume agent configuration:

# One Avro source, one memory channel, one Solr sink
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1

# Avro source: listens on port 44444 for events sent by flume-ng avro-client
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 0.0.0.0
tier1.sources.source1.port = 44444
tier1.sources.source1.channels=channel1

# Memory channel buffering events between source and sink
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000

# MorphlineSolrSink: runs the morphline ETL chain and loads the result into Solr
tier1.sinks.sink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.morphlineFile = morphlines.conf
tier1.sinks.sink1.morphlineId = morphline1

One thing to note: because we configured the morphline through the Flume-NG Solr Sink settings in CM, morphlineFile can be written simply as morphlines.conf. Otherwise you must supply an absolute path, or the agent will not be able to find the morphlines configuration file.
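For an agent configured outside CM, the setting would need the full path, along these lines (the path below is hypothetical; point it at wherever your morphlines.conf actually lives):

tier1.sinks.sink1.morphlineFile = /etc/flume-ng/conf/morphlines.conf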

4. With the three steps above in place, start the agent, then run flume-ng avro-client -H localhost -p 44444 -F file01 in a shell to submit the data file we created in step 1 to the agent.
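If the agent were not managed by CM, it could also be started by hand with the flume-ng launcher, roughly like this (the config directory and file paths are hypothetical; the agent name must match the tier1 prefix used in the configuration above):

flume-ng agent --name tier1 --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/flume.conf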

5. Once the command completes without errors, you can query Solr at http://slave77:8983/solr/collection1/select?q=*:* to verify that the two records have been indexed into the search engine.
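For a quick check from the shell, something like the following should return both documents (assuming slave77 is reachable and the collection is named collection1, as configured above):

curl 'http://slave77:8983/solr/collection1/select?q=*:*&wt=json&indent=true'

numFound should be 2, and each document should contain only the id, user_name, created_at, and text fields that the morphline kept.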


If you see results like those in the screenshot below, congratulations: you have successfully built the NRT setup described in this article.

[Screenshot: Solr query results showing the two indexed records]