SRS之监听端口的管理:RTMP

时间:2021-11-25 17:06:58

1. 监听端口管理的入口函数

监听端口的管理入口在 run_master 函数中,如下:

int run_master()
{
... if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
return ret;
} ...
}

在 run_master 函数中,调用了 SrsServer 类的成员函数 listen:

int SrsServer::listen()
{
int ret = ERROR_SUCCESS; if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
return ret;
} if ((ret = listen_http_api()) != ERROR_SUCCESS) {
return ret;
} if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
return ret;
} if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
return ret;
} return ret;
}

2. rtmp 端口的管理

SrsServer 与 SrsListener 之间的关系图

SRS之监听端口的管理:RTMP

2.1 SrsServer::listen_rtmp

int SrsServer::listen_rtmp()
{
int ret = ERROR_SUCCESS; // stream service port.
/* 获取配置文件中所有要监听的端口 */
std::vector<std::string> ip_ports = _srs_config->get_listens();
srs_assert((int)ip_ports.size() > 0); /* 清空 listeners 容器中所有类型为 SrsListenerRtmpStream 的项 */
close_listeners(SrsListenerRtmpStream); for (int i = 0; i < (int)ip_ports.size(); i++) {
/* 父类 SrsListener 的指针listener 指向新构造的子类 SrsStreamListener 的对象 */
SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
/* 然后将该指向新构造的子类 SrsStreamListener 对象的 listener 指针放入到
* listeners vector 容器中 */
listeners.push_back(listener); std::string ip;
int port;
/* 分割 ip 地址(若有的话)和 port 端口 */
srs_parse_endpoint(ip_ports[i], ip, port); /* 多态:调用子类 SrsStreamListener 的成员函数 listen */
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
return ret;
}
} return ret;
}

2.1.1 监听的类型:SrsListenerType

// listener type for server to identify the connection,
// that is, use different type to process the connection.
enum SrsListenerType
{
// RTMP client,
SrsListenerRtmpStream = 0,
// HTTP api,
SrsListenerHttpApi = 1,
// HTTP stream, HDS/HLS/DASH
SrsListenerHttpStream = 2,
// UDP stream, MPEG-TS over udp
SrsListenerMpegTsOverUdp = 3,
// TCP stream, RTSP stream.
SrsListenerRtsp = 4,
// TCP stream, FLV stream over HTTP.
SrsListenerFlv = 5,
};

2.2 SrsServer::close_listeners

void SrsServer::close_listeners(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it; if (listener->listen_type() != type) {
++it;
continue;
} srs_freep(listener);
it = listeners.erase(it);
}
}

该函数是将 listeners 中所有类型值为 type 的元素移除。

2.3 SrsStreamListener 的构造

2.3.1 SrsStreamListener 类定义

/**
* tcp listener.
*/
class SrsStreamListener : virtual public SrsListener, virtual public ISrsTcpHandler
{
private:
SrsTcpListener* listener;
public:
SrsStreamListener(SrsServer* server, SrsListenerType type);
virtual ~SrsStreamListener();
public:
virtual int listen(std::string ip, int port);
// ISrsTcpHandler
public:
virtual int on_tcp_client(st_netfd_t stfd);
};

该类继承自 SrsListener 和 ISrsTcpHandler。

2.3.2 SrsListener 类定义

/**
* the common tcp listener, for RTMP/HTTP server.
*/
class SrsListener
{
protected:
/* 监听类型:rtmp 或 http 或其他 */
SrsListenerType type;
protected:
/* 服务器地址 */
std::string ip;
/* 监听的端口 */
int port;
SrsServer* server;
public:
SrsListener(SrsServer* svr, SrsListenerType t);
virtual ~SrsListener();
public:
virtual SrsListenerType listen_type();
virtual int listen(std::string i, int p) = 0;
};

2.3.3 ISrsTcpHandler 类定义

/**
* the tcp connection handler.
*/
class ISrsTcpHandler
{
public:
ISrsTcpHandler();
virtual ~ISrsTcpHandler();
public:
/**
* when got tcp client.
*/
virtual int on_tcp_client(st_netfd_t stfd) = 0;
};

该类中定义了当接收到 tcp 客户端连接时调用的纯虚函数,由子类实现.

SrsStreamListener 和 SrsListener、ISrsTcpHandler 之间的关系图

SRS之监听端口的管理:RTMP

2.3.4 构造 SrsStreamListener 类

下面按构造 SrsStreamListener 类调用的构造函数的先后顺序进行分析。

2.3.4.1 SrsListener 构造函数

SrsListener::SrsListener(SrsServer* svr, SrsListenerType t)
{
port = 0;
server = svr;
type = t;
}

该 SrsListener 的构造函数仅对该类的成员进行初始化。

SrsStreamListener 的父类 ISrsTcpHandler 类的构造函数是个空壳,什么也没做。

2.3.4.2 SrsStreamListener 构造函数

SrsStreamListener::SrsStreamListener(SrsServer* svr, SrsListenerType t)
: SrsListener(svr, t)
{
listener = NULL;
}

这里仅对 listener 置初值 NULL。

2.4 srs_parse_endpoint:解析端口

void srs_parse_endpoint(string ip_port, string& ip, int& port)
{
std::string the_port;
srs_parse_endpoint(ip_port, ip, the_port);
port = ::atoi(the_port.c_str());
}

该函数接着调用 srs_parse_endpoint 的另一重载函数:

void srs_parse_endpoint(string ip_port, string& ip, string& port)
{
ip = "0.0.0.0";
port = ip_port; /* string::npos 是一个长度参数,表示直到字符串的结束 */
size_t pos = string::npos;
if ((pos = pos.find(":")) != string::npos) {
/* 分割 ip 地址和端口 */
ip = port.substr(0, pos);
port = port.substr(pos + 1);
}
}

2.5 SrsStreamListener::listen

在该函数中,开始绑定并监听端口:

int SrsStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS; ip = i;
port = p; srs_freep(listener);
/* 构造 SrsTcpListener 类,该类中创建了一个可重复利用的线程: tcp */
listener = new SrsTcpListener(this, ip, port); /* 开始真正的绑定和监听端口 */
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("tcp listen failed. ret=%d", ret);
return ret;
} srs_info("listen thread current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
_srs_context->get_id(), p, type, listener->fd(), i.c_str(), p); srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(),
ip.c_str(), port, listener->fd()); return ret;
}
SrsStreamListener 与 SrsTcpListener 之间的关系图

SRS之监听端口的管理:RTMP

2.5.1 SrsTcpListener 类定义

/**
* bind and listen tcp port, use handler to process the client.
*/
class SrsTcpListener : public ISrsReuseableThreadHandler
{
private:
int _fd;
st_netfd_t _stfd;
/* 定义一个 SrsReuseableThread 类的指针变量,创建一个可被其他线程启动或终止的线程 */
SrsReuseableThread* pthread;
private:
ISrsTcpHandler* handler;
std::string ip;
int port;
public:
SrsTcpListener(ISrsTcpHandler* h, std::string i, int p);
virtual ~SrsTcpListener();
public:
virtual int fd();
public:
virtual int listen();
// interface ISrsReusableThreadHandler.
public:
virtual int cycle();
};

该类主要用于绑定和监听 tcp 端口,且继承自 ISrsReuseableThreadHandler 类。

2.5.2 ISrsReuseableThreadHandler 类定义

/**
* the reuse thread is a thread stop and start by other thread.
* user can create thread and stop then start again and again,
* generally must provides a start and stop method, @see SrsIngester.
* the step to create a thread stop by other thread:
* 1. create SrsReuseableThread field.
* 2. must manually stop the thread when started it.
* for example:
* class SrsIngester : public ISrsReusableThreadHandler {
* public: SrsIngester() {
* pthread = new SrsReuseableThread("ingest",
* this, SRS_AUTO_INGESTER_SLEEP_US);
*
* }
* public: virtual int start() { return pthread->start(); }
* public: virtual void stop() { pthread->stop(); }
* public: virtual int cycle() {
* // check status, start ffmpeg when stopped.
* }
* };
*/
class ISrsReusableThreadHandler
{
public:
ISrsReusableThreadHandler();
virtual ~ISrsReusableThreadHandler();
public:
/**
* the cycle method for the one cycle thread.
* @remark when the cycle has its inner loop, it must check whether
* the thread is intrrupted.
*/
virtual int cycle() = 0;
public:
/**
* other callback for handler.
* @remark all callback is optional, handler can ignore it.
*/
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
SrsTcpListener 与 ISrsReusableThreadHandler 之间的关系图

SRS之监听端口的管理:RTMP

2.5.3 SrsTcpListener 构造函数

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
{
handler = h;
ip = i;
port = p; _fd = -1;
_stfd = NULL; /* 创建一个可重复利用的线程(即由其他线程终止后可再次启动) */
pthread = new SrsReusableThread("tcp", this);
}

在该构造函数中,除了初始化 SrsTcpListener 类的一些成员变量外,还创建了一个可被重复使用的线程:"tcp",即可被其他线程启动或终止的线程。

SrsTcpListener 与 SrsReusableThread 之间的关系图

SRS之监听端口的管理:RTMP

2.5.4 SrsReusableThread 类

2.5.4.1 SrsReusableThread 类定义

class SrsReusableThread : public internal::ISrsThreadHandler
{
private:
internal::SrsThread* pthread;
ISrsReusableThreadHandler* handler;
public:
SrsReusableThread(const char *n, ISrsReusableThreadHandler* h,
int64_t interval_us = 0);
virtual ~SrsReusableThread();
public:
/**
* for the reusable thread, start and stop by user.
*/
virtual int start();
/**
* stop the thread, wait for the thread to terminater.
* @remark user can stop multiple times, ignore if already stopped.
*/
virtual void stop();
public:
/**
* get the context id. @see: ISrsThreadContext.get_id().
* used for parent thread to get the id.
* @remark when start thread, parent thread will block and wait for this id ready.
*/
virtual int cid();
// interface internal::ISrsThreadHandler
public:
virtual int cycle();
virtual void on_thread_start();
virtual int on_before_cycle();
virtual int on_end_cycle();
virtual void on_thread_stop();
};
SrsReusableThread 和 internal::ISrsThreadHandler 之间的关系图

SRS之监听端口的管理:RTMP

2.5.4.2 SrsReusableThread 构造函数

SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h,
int64_t interval_us)
{
handler = h;
/**
* @n: 指定了该线程的名字
* @this: 指定了该线程的处理者为 SrsReusableThread
* @interval_us: 指定了该线程每次循环后休眠的时间
* @true: 指定该线程是 joinable 的,必须由其他线程终止该线程
*/
pthread = new internal::SrsThread(n, this, interval_us, true);
}

2.5.4.3 SrsThread 构造函数

SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler,
int64_t interval_us, bool joinable)
{
_name = name;
/* 父类 ISrsThreadHandler 指针 handler 指向子类 SrsReusableThread 对象的首地址 */
handler = thread_handler;
/* 每次循环后休眠的时间 */
cycle_interval_us = interval_us; tid = NULL;
loop = false;
really_terminated = true;
_cid = -1;
_joinable = joinable;
disposed = false; // in start(), the thread cycle method maybe stop and remove the thread itself,
// and the thread start() is waiting for the _cid, and segment fault then.
// @see https://github.com/ossrs/srs/issues/110
// thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
can_run = false;
}

该 SrsThread 的构造函数中仅初始化了该类的一些成员变量,该类没有父类。

2.5.5 SrsTcpListener::listen

int SrsTcpListener::listen()
{
int ret = ERROR_SUCCESS; /* 创建一个 tcp socket 套接字 */
if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd); /* 设置该 tcp 套接字的属性为地址可复用 */
int reuse_socket = 1;
if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
ret = ERROR_SOCKET_SETREUSE;
srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd); /* 将该端口绑定在具体的 ip 地址上 */
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); /* 监听该端口 */
if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
ret = ERROR_SOCKET_LISTEN;
srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); /* 构造一个 _st_netfd_t 的结构体,同时设置 _fd 为非阻塞,以便 ST 库使用 */
if ((_stfd = st_netfd_open_socket(_fd)) == NULL) {
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d",
ip.c_str(), port, ret);
return ret;
}
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd); if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(),
port, ret);
return ret;
}
srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port); return ret;
}

2.5.6 st_netfd_open_socket

_st_netfd_t *st_netfd_open_socket(int osfd)
{
return _st_netfd_new(osfd, 1, 1);
}

该函数中,又调用了 _st_netfd_new 函数,主要是创建并初始化一个 _st_netfd_t 结构体,同时设置 osfd 套接字为非阻塞,代码如下:

static _st_netfd_t *_st_netfd_new(int osfd, int nonblock, int is_socket)
{
_st_netfd_t *fd;
int flags = 1; if ((*_st_eventsys->fd_new)(osfd) < 0)
return NULL; if (_st_netfd_freelist) {
fd = _st_netfd_freelist;
_st_netfd_freelist = _st_netfd_freelist->next;
} else {
fd = calloc(1, sizeof(_st_netfd_t));
if (!fd)
return NULL;
} fd->osfd = osfd;
fd->inuse = 1;
fd->next = NULL; if (nonblock) {
/* Use just one system call */
if (is_socket && ioctl(osfd, FIONBIO, &flags) != -1)
return fd;
/* Do it the Posix way */
if ((flags = fcntl(osfd, F_GETFL, 0)) < 0 ||
fcntl(osfd, F_SETFL, flags | O_NONBLOCK) < 0) {
st_netfd_free(fd);
return NULL;
}
} return fd;
}

2.5.7 SrsReusableThread::start

int SrsReusableThread::start()
{
/* 调用 internal::SrsThread->start() 函数 */
return pthread->start();
}

2.5.8 SrsThread::start

int SrsThread::start()
{
int ret = ERROR_SUCCESS; /* 检测该线程 id 是否为正值,即已经创建并运行了 */
if (tid) {
srs_info("thread %s already running.", _name);
return ret;
} /* 调用 st_thread_create 函数创建一个线程,线程函数为 thread_fun,
* 对于 "tcp" 线程,可知 _joinable 为 1, 同时将该线程添加到 run 队列中 */
if ((tid = st_thread_create(thread_fun, this, (_joinable ? 1 : 0), 0)) == NULL) {
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
} disposed = false;
// we set to loop to true for thread to run.
loop = true; // wait for cid to ready, for parent thread to get the cid.
while (_cid < 0) {
/* 这里会更改当前线程的状态为 SLEEPING,并将其添加到 sleep 队列中,
* 然后切换上下文环境,当 run 队列中可调度运行的线程时,则调度它们 */
st_usleep(10 * 1000);
} // now, cycle thread can run.
can_run = true; return ret;
}

2.5.9 st_usleep

int st_usleep(st_utime_t usecs)
{
_st_thread_t *me = _ST_CURRENT_THREAD(); if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
} if (usecs != ST_UTIME_NO_TIMEOUT) {
/* 设置当前线程的状态并将其添加到 sleep 队列中 */
me->state = _ST_ST_SLEEPING;
_ST_ADD_SLEEPQ(me, usecs);
} else
me->state = _ST_ST_SUSPENDED; _ST_SWITCH_CONTEXT(me); if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
} return 0;
}

该函数主要执行流程如下:

  1. 设置当前线程的 state 为 _ST_ST_SLEEPING,并将其添加到 sleep 队列中;
  2. 调用 _ST_SWITCH_CONTEXT,该宏又执行以下操作:
    • 先调用 setjmp(这里调用的是 md.S 中的汇编代码)保存当前线程的上下文环境,以便当前线程的休眠时间超时时可以 longjmp 到这里,唤醒当前线程,继续往下执行。
    • 第一次调用 setjmp 返回 0,因此接着调用 _st_vp_schedule 函数,在该函数中会检测到若 run 队列中有可调度运行的线程的,则会将其取出来,然后 longjmp 到该线程中执行;否则切换到 idle 线程中。

2.5.10 _ST_SWITCH_CONTEXT

/*
* Switch away from the current thread context by saving its state and
* calling the thread scheduler
*/
#define _ST_SWITCH_CONTEXT(_thread) \
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \
if (!MD_SETJMP((_thread)->context)) { \
_st_vp_schedule(); \
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \
ST_END_MACRO

这里保存上下文环境,然后调度其他线程运行。

注意:这里 setjmp 和 longjmp 一般会使用 md.S 中的汇编代码。

2.5.11 _st_vp_schedule

void _st_vp_schedule(void)
{
_st_thread_t *thread; /* 若 run 队列中有可调度运行的线程,则将其取出来,同时从 run 队列移除它 */
if (_ST_RUNQ.next != &_ST_RUNQ) {
/* Pull thread off of the run queue */
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
} else {
/* 否则调度到 idle 线程中 */
/* If there are no threads to run, switch to the idle thread */
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE); /* 这里调用 longjmp 切换到待调度运行的线程上下文环境 */
/* Resume the thread */
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}

注,idle 线程做的操作主要就是:

  1. epoll_wait 激活监听到 I/O 事件的线程,将其添加到 run 队列中,等待调度;
  2. 检测 sleep 队列中的超时线程,若有线程的超时时间到达了,则调度该线程;

2.5.12 _ST_RESTORE_CONTEXT

/*
* Restore a thread context that was saved by _ST_SWITCH_CONTEXT or
* initialized by _ST_INIT_CONTEXT
*/
#define _ST_RESTORE_CONTEXT(_thread) \
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \
MD_LONGJMP((_thread)->context, 1); \
ST_END_MACRO

这里首先设置标识当前线程的全局变量指向该将要调度运行的线程,然后才 longjmp 到该线程中。

2.6 rtmp tcp 线程: SrsThread::thread_fun

在 SrsThread::start 函数中,当 SrsThread 的成员 _cid 小于 0 时,会循环调度 st_usleep 函数,将当前休眠休眠 10 * 1000 us,直到 _cid 准备好。在 st_usleep 函数中,会将当前线程的控制权让出去,进而调度其他线程执行。因而有可能会调度到刚创建的 "tcp" 线程,该线程的回调函数为 SrsThread::thread_fun。

void *SrsThread::thread_fun(void *arg)
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj); /* 进入线程循环 */
obj->thread_cycle(); /* 下面是用于 valgrind 检测内存泄漏和非法内存操作 */
// for valgrind to detect.
SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
if (ctx) {
ctx->clear_cid();
} st_thread_exit(NULL); return NULL;
}

2.6.1 SrsThread::thread_cycle

void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS; /* 生成一个该线程的 context id */
_srs_context->generate_id();
srs_info("thread %s cycle start", _name); /* 获取该线程的 context id */
_cid = _srs_context->get_id(); srs_assert(handler);
/* 父类 ISrsThreadHandler 指针 handler 调用子类对象
* SrsReusableThread 的成员函数 on_thread_start */
handler->on_thread_start(); // thread is running now
really_terminated = false; // wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) {
/* 这里又将当前线程的控制权切换给其他线程,直到 can_run 和 loop 都为 1 时,
* 该线程再次调度到时才会继续往下执行真正的循环 */
st_usleep(10 * 1000);
} /* 当该线程的 can_run 和 loop 都为 1 时,且该线程的休眠时间到达了,才会再次
* 调度该线程从这里继续往下执行 */ while (loop) { if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d",
_name, ret);
goto failed;
}
srs_info("thread %s on before cycle success", _name); if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret))
{
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
}
goto failed;
}
srs_info("thread %s cycle success", _name); if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d",
_name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", _name); failed:
if (!loop) {
break;
} // to improve performance, donot sleep when interval is zero.
// @see: https://github.com/ossrs/srs/issues/237
/* 每次线程的循环执行完毕时,都将当前线程休眠 cycle_interval_us 微妙,同时调度其他线程执行 */
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
}
} // readly terminated now.
really_terminated = true; handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
}

2.6.2 SrsReusableThread::on_thread_start

该函数位于 srs_app_thread.cpp 中:

void SrsReusableThread::on_thread_start()
{
handler->on_thread_start();
}

这里接着调用 ISrsReusableThreadHandler::on_thread_start() 函数,该函数为空。

3. 总结

配置文件中 rtmp 端口建立监听过程:

  1. 根据监听端口的个数,为每个待监听的端口都构建一个 SrsStreamListener 类,然后调用 SrsStreamListener::listen 函数;
  2. 在 SrsStreamListener::listen 函数中又构造了 SrsTcpListener 类,在该类的构造函数中,创建了一个可重复使用的线程 "tcp";
  3. 接着调用 SrsTcpListener::listen 开始绑定 ip 地址和监听端口;
  4. 最后调用 st_create_thread 创建一个线程.