【原】Storm分布式RPC

时间:2021-05-09 20:20:38

5. Storm高级篇

序列化

分布式RPC

High level overview

LinearDRPCTopologyBuilder

Local mode DRPC

Remote mode DRPC

更复杂的例子

Non-linear DRPC topologies

LinearDRPCTopologyBuilder如何起作用

Advanced



分布式RPC

分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。

High level overview

分布式RPC是通过“DRPC server”协调处理的(Storm用一个包来实现该功能)。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。例如,以下是一个使用参数 “http://twitter.com” 调用 “reach” 函数计算结果的例子:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

分布式RPC工作流示意图如下所示:

【原】Storm分布式RPC

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC server中接收一个函数调用流,DRPC Server会为每个函数调用都标记了一个唯一的 id,随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC Server,根据函数调用的 id 来将函数调用的结果返回。

LinearDRPCTopologyBuilder

Storm中提供了名为LinearDRPCTopologyBuilder 的topology builder,它几乎自动完成了DRPC的所有步骤,如下所示:

1.设置spout

2.向DRPC server返回运行结果。

3.给bolts提供了聚集元组的功能。

让我们一起看一下简单的例子,该例子是DRPC topology的一个实现并返回结果为输入附加字符串“!”。

public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
} public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
// ...}

正如你所见,当创建LinearDRPCTopologyBuilder 时,你需要让topology知道 DRPC函数的名字。单个DRPC server负责很多函数的协调处理,且这些函数的功能不同。你声明的第一个bolt将接受一个2元组,第一个域是请求id,第二个域是请求参数。LinearDRPCTopologyBuilder 中最后一个bolt会输出形式为[id,result]的2元组输出流。最后,所有中间结果的元组的第一个域必须包括请求id。

在本例子中,ExclaimBolt 只是简单地给第二个域附加字符串“!”。LinearDRPCTopologyBuilder 继续和DRPC server通信并将结果返回。

Local mode DRPC

DRPC可以以本地模式运行,下面以本地模式运行的例子:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
cluster.shutdown();
drpc.shutdown();

首先你会创建一个 LocalDPRC 对象,该对象会在进程中模拟一个 DRPC 服务器,就像LocalCluster 在进程中模拟 Storm 集群的功能一样。然后,创建LocalCluster以本地模式运行topology 。LinearDRPCTopologyBuilder 有独立的方法用于创建本地topologies 和远程topologies 。在本地模式下,LocalDRPC 对象没有绑定任何端口所以topology需要知道正在和它进行通信的对象,这是方法createLocalTopology 接受LocalDRPC 对象作为输入参数的原因。

在启动拓扑后,你可以使用 execute 方法来完成 DRPC 调用。

Remote mode DRPC

在一个真实的集群中使用 DRPC 有以下三个步骤:

1.启动 DRPC Server;

2.配置 DRPC Server的地址;

3.将 DRPC topologies 提交到集群运行。

可以像 Nimbus、Supervisor 那样使用 storm 命令来启动 DRPC Serve,如下:

bin/storm drpc

接下来,你需要在集群上配置 DRPC Server的地址。这是为了让 DRPCSpout 获取从哪里触发函数调用的方法。可以通过编辑 storm.yaml 或者添加拓扑配置的方式实现配置。配置 storm.yaml 的方式类似于下面这样:

drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"

最后,你可以像其他拓扑一样使用 StormSubmitter 来启动拓扑。以下是使用远程模式构造拓扑的一个例子:

StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology 方法是用来创建集群模式下运行的topologies 。

更复杂的例子

上面描述的exclamation DRPC是为了说明DRPC的简单例子。下面让我们共同学习一下更复杂的例子-Storm集群并行计算的DRPC函数调用,该例子是计算Twitter上URL的访问。

URL访问是指不同的人在Twitter上发的推文,你需要完成如下计算:

1.获取所有tweeted了该URL的所有人。

2.获取所有关注了1中的所有人。

3.2中所有人的set集合。

4.统计3中set集合的个数。

一次计算可能涉及上百次的数据库调用和数以千万计的关注记录,这计算规模确实很大。正如你所见,实现Storm函数是非常简单的。在单台机器上,计算需要1分钟;但在集群中,即使最难计算的URL访问也只需数秒。

一个简单的访问topology 可以在storm-starter中找到。下面是定义访问topology 的具体步骤:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12)
.shuffleGrouping();builder.addBolt(new PartialUniquer(), 6)
.fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2)
.fieldsGrouping(new Fields("id"));

topology 的执行按照以下四步骤:

1.GetTweeters 得到tweeted了URL的用户。它将[id,url]格式的输入流转换为[id,tweeter]格式的输出流。每个url将映射为多个tweeter元组。

2.GetFollowers得到tweeters的关注者。它将[id,tweeter]格式的输入流转换为[id,follower]格式的输出流。这些任务中,可能有重复的元组,因为一些人可能关注了多个人都tweeted了同样的URL。

3.PartialUniquer根据关注者id分组。这会导致同样的关注者在同样的任务中处理,所以每个PartialUniquer 的任务将会接受多个互补的关注者集合。一旦PartialUniquer 接受了所有的关注者元组,它将会输出关注者子集合元素的个数。

4.最后,CountAggregator 从PartialUniquer 接受部分count值然后累加完成整个计算,并返回结果。

下面看PartialUniquer bolt的代码实现:

public class PartialUniquer extends BaseBatchBolt {
BatchOutputCollector _collector;
Object _id;
Set<String> _followers = new HashSet<String>(); @Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
} @Override
public void execute(Tuple tuple) {
_followers.add(tuple.getString(1));
} @Override
public void finishBatch() {
_collector.emit(new Values(_id, _followers.size()));
} @Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "partial-count"));
}}

PartialUniquer 通过继承BaseBatchBolt 间接实现了IBatchBolt 接口。batch bolt提供了第一个类API,用于将一批元组作为一个具体单元进行处理。对于每个请求id,会创建一个新的batch bolt实例,在适当时候Storm也会清理这些实例。

当PartialUniquer 在execute方法中接受了一个关注者元组时,会将该元组添加到请求id的set集合中。

当任务中的一批元组处理完成后会调用batch bolts的finishBatch 方法,该方法输出关注者id集合元素个数形式的一元组。在后台,CoordinatedBolt 用来监测何时给定bolt已经接受到了请求id所有的元组,它用direct stream来管理。

topology 中剩下的部分都能很容易明白,正如你看到的,每个访问计算步骤都是并行完成的,并且定义DRPC topology是非常简单的。

Non-linear DRPC topologies

LinearDRPCTopologyBuilder 只处理线性DRPC的topology,它的计算是一个序列步骤。不难想象功能需求将需要更复杂的topology ,它可能涉及到bolts的分支与整合。

LinearDRPCTopologyBuilder如何起作用

DRPCSpout 发送[args,return-info]元组。return-info是DRPC server的主机名、端口号和生成的id。

topology的构造参数包括:

DRPCSpout

PrepareRequest (生成请求id和创建返回信息stream和参数stream)

CoordinatedBolt wrappers and direct groupings

JoinResult (返回信息进行join操作)

ReturnResult (连接到DRPC server并返回结果)

LinearDRPCTopologyBuilder 是一个很好的例子

Advanced

KeyedFairBolt 同时处理多个请求。

如何直接使用CoordinatedBolt。