Flume-ng源码解析之Sink组件

时间:2022-09-06 22:36:38

如果你还没看过Flume-ng源码解析系列中的启动流程和Channel组件,可以点击下面链接:

Flume-ng源码解析之启动流程

Flume-ng源码解析之Channel组件

作为启动流程中第二个启动的组件,我们今天来看看Sink的细节

1 Sink

Sink在agent中扮演的角色是消费者,将event输送到特定的位置

首先依然是看代码,由代码我们可以看出Sink是一个接口,里面最主要的方法是process(),用来处理从Channel中获取的数据。Sink的实例是由SinkFactory.create()生成的。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
public Channel getChannel();
/* 用来处理channel中取来的event*/
public Status process() throws EventDeliveryException;
public static enum Status {
READY, BACKOFF
}
}

在启动流程中我们了解到Application中启动的不是Sink,而是SinkRunner,由名字我们可以看出这是一个驱动类。我们来看看代码,主要看它的start()

public class SinkRunner implements LifecycleAware {

  ...

  @Override
public void start() {
SinkProcessor policy = getPolicy(); policy.start(); runner = new PollingRunner(); runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean(); runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start(); lifecycleState = LifecycleState.START;
}
... }

我们知道启动SinkRunner实际上就是调用它的start(),而在start()中可以看到主要是启动了一个SinkProcessor,而这个SinkProcessor在创建SinkRunnner的时候已经指定了,如果你想要了解配置文件是如何处理的,可以要去看看conf包里面的类,可以看看org.apache.flume.node.AbstractConfigurationProvider中的getConfiguration()。

我们接着看看SinkProcessor

public interface SinkProcessor extends LifecycleAware, Configurable {
Status process() throws EventDeliveryException;
void setSinks(List<Sink> sinks);
}

SinkProcesor是一个接口,他的实现类由SinkProcessorFactory的getProcessor()生成,在AbstractConfigurationProvider中的loadSinkGroup()调用SinkGroup中的configure()生成。

public class SinkGroup implements Configurable, ConfigurableComponent {
List<Sink> sinks;
SinkProcessor processor;
SinkGroupConfiguration conf; public SinkGroup(List<Sink> groupSinks) {
sinks = groupSinks;
} public SinkProcessor getProcessor() {
return processor;
} @Override
public void configure(ComponentConfiguration conf) {
this.conf = (SinkGroupConfiguration) conf;
processor =
SinkProcessorFactory.getProcessor(this.conf.getProcessorContext(),
sinks);
}
}

那么我们以DefalutSinkProcessor为例子看看

public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent {
private Sink sink;
private LifecycleState lifecycleState; @Override
public void start() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.start();
lifecycleState = LifecycleState.START;
} @Override
public void stop() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.stop();
lifecycleState = LifecycleState.STOP;
} @Override
public LifecycleState getLifecycleState() {
return lifecycleState;
} @Override
public void configure(Context context) {
} @Override
public Status process() throws EventDeliveryException {
return sink.process();
} @Override
public void setSinks(List<Sink> sinks) {
Preconditions.checkNotNull(sinks);
Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+ "only handle one sink, "
+ "try using a policy that supports multiple sinks");
sink = sinks.get(0);
} @Override
public void configure(ComponentConfiguration conf) { } }

从上面的代码中我们可以看到SinkProcessor执行的还是sink的start、stop和process方法,那么SinkProcessor的作用是什么,Flume提供leFailoverSinkProcessor和LoadBalancingSinkProcessor,顾名思义,一个是失效备援,一个是负载均衡,那么SinkProcessor不同子类的存在就是为了实现不同的分配操作和策略。而sink的start()通常是启动线程去执行消费操作。

Flume-ng源码解析之Sink组件的更多相关文章

  1. Flume-ng源码解析之Source组件

    如果你还没看过Flume-ng源码解析系列中的启动流程.Channel组件和Sink组件,可以点击下面链接: Flume-ng源码解析之启动流程 Flume-ng源码解析之Channel组件 Flum ...

  2. rest-framework源码解析和自定义组件----版本

    版本 url中通过GET传参自定义的版本 12345678910111213141516171819202122 from django.http import HttpResponsefrom dj ...

  3. Flume-ng源码解析之Channel组件

    如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后 ...

  4. Spring源码解析系列汇总

    相信我,你会收藏这篇文章的 本篇文章是这段时间撸出来的Spring源码解析系列文章的汇总,总共包含以下专题.喜欢的同学可以收藏起来以备不时之需 SpringIOC源码解析(上) 本篇文章搭建了IOC源 ...

  5. &period;Net Core缓存组件&lpar;Redis&rpar;源码解析

    上一篇文章已经介绍了MemoryCache,MemoryCache存储的数据类型是Object,也说了Redis支持五中数据类型的存储,但是微软的Redis缓存组件只实现了Hash类型的存储.在分析源 ...

  6. &period;Net Core缓存组件&lpar;MemoryCache&rpar;源码解析

    一.介绍 由于CPU从内存中读取数据的速度比从磁盘读取快几个数量级,并且存在内存中,减小了数据库访问的压力,所以缓存几乎每个项目都会用到.一般常用的有MemoryCache.Redis.MemoryC ...

  7. admin源码解析以及仿照admin设计stark组件

    ---恢复内容开始--- admin源码解析 一 启动:每个APP下的apps.py文件中. 首先执行每个APP下的admin.py 文件. def autodiscover(): autodisco ...

  8. admin源码解析及自定义stark组件

    admin源码解析 单例模式 单例模式(Singleton Pattern)是一种常用的软件设计模式,该模式的主要目的是确保某一个类只有一个实例存在.当你希望在整个系统中,某个类只能出现一个实例时,单 ...

  9. Django 之 admin组件使用&amp&semi;源码解析

    admin组件使用 Django 提供了基于 web 的管理工具. Django 自动管理工具是 django.contrib 的一部分.可以在项目的 settings.py 中的 INSTALLED ...

随机推荐

  1. redhat 下 rpm 指令

    1.如何安装rpm软件包rmp软件包的安装可以使用程序rpm来完成.执行下面的命令 rpm -i your-package.rpm 其中your-package.rpm是你要安装的rpm包的文件名,一 ...

  2. C&num; 读写ini文件

    1.添加引用 using System.IO; using System.Runtime.InteropServices; 2.声明API函数 #region API函数声明 [DllImport(& ...

  3. 推荐使用Wiz笔记发表博客

    一直用Wiz笔记,平时随手记录一些东西,可以自动在多台电脑同步,还支持移动客户端,上下班路上用手机也能看.最近在整理之前工作的一些资料,并把自己觉得可以分享的内容发到博客园上.当然会先在Wiz笔记上编 ...

  4. Android常见崩溃或闪退的问题描述及原因总结、及与性能相关的模块——持续更新

    1.nullpointer——就是使用一个对象的时候还没有对其进行初始化导致该问题 一般在何种情况下容易出现呢? (1)父窗口+子窗口同时出现的,父窗口因为某种原因消掉了,子窗口还在,操作子窗口找不到 ...

  5. 设计模式&lowbar;Decorator&lowbar;装饰模式

    形象例子: Mary过完轮到Sarly过生日,还是不要叫她自己挑了,不然这个月伙食费肯定玩完,拿出我去年在华山顶上照的照片,在背面写上“最好的的礼物,就是爱你的Fita”,再到街上礼品店买了个像框(卖 ...

  6. 2015第43周一solr相关概念

    Solr是一种开放源码的.基于Lucene的搜索服务器.它易于安装和配置,而且附带了一个基于HTTP 的管理界面.   官网:http://lucene.apache.org/solr/ solr学习 ...

  7. db2中修改表字段的长度,查看表字段长度,以及查看表字段已存放值大小

    修改表字段语句: alter table 表名 alter column  字段名 set data type varchar(7700) 如: ALTER TABLE JV_BI_BACK_OPER ...

  8. C&plus;&plus;&comma;Python&comma;Go对照学习-02

    main函数         Go中有且只有一个main函数,而且main函数必须在package main当中.main函数无返回值也无参数,如果希望获取从命令行传递的参数有其他包解决这个问题.   ...

  9. Cleaning up old NVIDIA driver files

    原文地址:https://www.gameplayinside.com/optimize/cleaning-up-old-nvidia-driver-files-to-save-disk-space/ ...

  10. &OpenCurlyDoubleQuote;2017面向对象程序设计(Java)第十一周学习总结”存在问题的反馈及教学安排

    “2017面向对象程序设计(Java)第十一周学习总结”存在问题的反馈及教学安排1.“提出表扬的同学:姜依萍,王雪玲,徐楠,相文君,赵晓未提交作业的同学:任红强,王瑞强,宗鹏新,扎西才让,布旦刀杰,范 ...