Erlang简单并行服务器

时间:2023-02-04 20:11:56

Erlang简单并行服务器

(金庆的专栏)

Erlang并行服务器为每个Tcp连接创建对应的连接进程,处理客户端数据。

参考 Erlang程序设计(第2版)
17.1.3 顺序和并行服务器

并行服务器的诀窍是:每当gen_tcp:accept收到一个新连接时就立即分裂一个新进程。
为每个新套接字连接创建一个并行进程。

-module(gs_svr).
-author("jinqing").

-behaviour(gen_server).

%% API
-export([start_link/0]).

init([]) ->
    gs_listener:start_parallel(),
    {ok, #{}}.

gs_svr(GameServer gen_server)启动Tcp监听,并维护连接,如连接计数,发送广播。

start_parallel()创建监听端口,然后创建连接进程。

start_parallel() ->
    Port = server_csv:get_my_port(),
    lager:info("Starting game server on port ~p...", [Port]),
    {ok, ListenSocket} = gen_tcp:listen(Port,
        [binary, {packet, 4},
            {packet_size, 256 * 1024},  % limit packet size
            {reuseaddr, true},
            {nodelay, true},
            {backlog, 999999},
            {active, once}]),
    connection:spawn_connection(ListenSocket).

spawn_connection()创建连接进程。每接受一个连接就再创建一个新的连接进程。

-module(connection).
-author("jinqing").

%% API
-export([spawn_connection/1]).
-export([parallel_connect/1, loop/2]).

-spec spawn_connection(ListenSocket :: gen_tcp:socket()) -> pid().
spawn_connection(ListenSocket) ->
    spawn(fun() -> ?MODULE:parallel_connect(ListenSocket) end).

-spec parallel_connect(ListenSocket :: gen_tcp:socket()) -> ok.
parallel_connect(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    spawn_connection(ListenSocket),
    
    gs_svr:cast_connection_new(self()),
    ConnStat = conn_stat:new(),
    erlang:send_after(1000, self(), timer_sec),
    try ?MODULE:loop(Socket, ConnStat)
    catch
        Type:E -> lager:error("loop() ~p:~p. ~p",
            [Type, E, erlang:get_stacktrace()])
    end,
    gs_svr:cast_connection_ended(self()),
    ok.

-spec loop(Socket :: gen_tcp:socket(), ConnStat :: conn_stat:conn_stat()) -> any().
loop(Socket, ConnStat) ->
    receive
        {tcp, Socket, Bin} ->
            NewConnStat = rpc_handler:handle_bin(Socket, Bin, ConnStat),
            inet:setopts(Socket, [{active, once}]),
            NewConnStat2 = cutil_dos_checker:on_data(size(Bin), NewConnStat),
            ?MODULE:loop(Socket, NewConnStat2#{idle_sec=>0});
        {tcp_closed, Socket} ->
            save_on_end(ConnStat);
        {tcp_error, Socket, Reason} ->
            save_on_end(ConnStat);

        {gs_to_connection, Msg} ->
            NewConnStat = handle_gs_msg(Msg, Socket, ConnStat),
            ?MODULE:loop(Socket, NewConnStat);

        timer_sec ->
            case conn_timer:timer_sec(ConnStat) of
                {ok, NewConnStat} ->
                    erlang:send_after(1000, self(), timer_sec),
                    ?MODULE:loop(Socket, NewConnStat);
            end;
        Other ->
            lager:error("Unknown msg: ~p", [Other]),
            ?MODULE:loop(Socket, ConnStat)
    end.  % This is tail-recursive.

缺点是连接进程没有加入监控树。gs_svr出错重启时,连接进程connection应该断开并退出。