Archive for February, 2008

Programming Erlang 第16章 OTP简介(完整)

Sunday, February 24th, 2008

OTP简介

译者: gashero

目录

OTP代表Open Telecom Platform。这个名字容易让人误解,但是其实OTP比你想象的更加通用。它是一个应用操作系统和一堆函数库,用以构建大型、容错和分布式的应用。它最初是在瑞典的爱立信开发的,并且在爱立信用于构建容错系统。(爱立信已经以EPL的形式放出了Erlang,EPL是从Mozilla许可协议继承来的)

OTP包含一些有用的工具,比如一个完整的WEB服务器,一个FTP服务器,一个CORBA ORB,等等,他们都是用Erlang编写的。OTP同时包含在电信技术中H248的技术实现标准、SNMP、和ASN.1到Erlang的交叉编译器。这里先不谈这些,你可以从后面的附录C了解更多。

如果你想使用OTP来编写你的系统,你会发现OTP行为(behavior)的概念对你很有用。一个行为包装了很多动作模式-可以理解为使用参数回调的应用程序框架。

OTP的强大之处在于容错、可量度的和代码热部署等等,他们都可以通过行为来提供。在其他部分,回调作者无需担心容错的问题,因为OTP已经内置了。在Java的思想中,你可以把行为看作是J2EE容器。

简单来说,行为解决了问题的非功能性部分,而回调函数解决功能性部分。对大多数应用来说,非功能性部分基本是相似的(例如代码热部署)。而功能性部分则各个应用不同。

在本章,我们来了解这些行为, gen_server 模块更详细一些。在我们讨论 gen_server 如何工作之前,我们首先启动一个简单的服务器(我们能想象的最简单的服务器),然后一步步的改变它,直到我们得到 gen_server 模块。通过这种方法,你可以深入的了解 gen_server 的工作原理,并且了解足够多的底层细节。

这里是本章的计划:

  1. 编写一个很小的CS程序
  2. 一步步扩展程序并添加功能
  3. 放入真实的代码

1   通向通用服务器之路

Note

这将是本书中最重要的章节,所以首先阅读一遍,然后再阅读一遍,直到一百遍,确保知道了每一个细节。

我们即将开始编写我们的服务器,分别叫做server1、server2、…,每个只是与前一个略有不同。目标是从程序中提取非功能性部分。最后一句可能对你来说有点晕,不过别担心。深呼吸……

1.1   例子1:基本服务器

如下是我们的第一个尝试。这是一个小服务器,我们可以参数化我们的回调模块:

-module(server1).
-export([start/2,rpc/2]).

start(Name,Mod) ->
    register(Name,spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

rpc(Name,Request) ->
    Name ! {self(), Request},
    receive
        {Name,Response} -> Response
    end.

loop(Name,Mod,State) ->
    receive
        {From,Request} ->
            {Response,State} = Mod:handle(Request,State),
            From ! {Name,Response},
            loop(Name,Mod,State)
    end.

这是一个服务器的代码精髓,我们来写server1的回调模块,如下是命名服务器回调:

-module(name_server).
-export([init/0,add/2,whereis/1,handle/2]).
-import(server1,[rpc/2]).

%% client routines
add(Name,Place) -> rpc(name_server,{add,Name,Place}).
whereis(Name) -> rpc(name_server,{whereis,Name}).

%% callback routines
init() -> dict:new().

handle({add,Name,Place},Dict) -> {ok,dict:store(Name,Place,Dict)};
handle({whereis,Name},Dict) -> {dict:find(Name,Dict),Dict}.

这段代码实际完成了两项任务。他作为一个回调模块被服务器框架代码所调用,同时它包含可以被客户端调用的接口程序。OTP的惯例是把相同功能的代码放在一个模块中。

我们想让它工作,只需要:

1> server1:start(name_server,name_server).
true
2> name_server:add(joe,"at home").
ok
3> name_server:whereis(joe).
{ok,"at home"}

现在停止思考。回调没有并发,没有spawn,没有send,没有receive,没有register。他是纯的序列代码-这意味着什么呢?这意味着我们可以可以在并不理解底层并发模块的情况下泄CS模型。

这是一个简单服务器的通用模式。一旦你理解的简单的结构,就可以让你更快的理解了。

1.2   例子2:包含事务处理的服务器

这里的服务器在遇到客户端错误查询时会挂掉:

-module(server2).
-export([start/2,rpc/2]).

start(Name,Mod) ->
    register(Name,spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

rpc(Name,Request) ->
    Name ! {self(), Request},
    receive
        {Name,crash} -> exit(rpc);
        {Name,ok,Response} -> Response
    end.

loop(Name,Mod,OldState) ->
    receive
        {From,Request} ->
            try Mod:handle(Request,OldState) of
                {Response,NewState} ->
                    From ! {Name,ok,Response},
                    loop(Name,Mod,NewState)
            catch
                _:Why ->
                    log_the_error(Name,Request,Why),
                    %% 发送消息导致客户端挂掉
                    From ! {Name,crash},
                    %% 使用原始状态继续循环
                    loop(Name,Mod,OldState)
            end
    end.

log_the_error(Name,Request,Why) ->
    io:format("Server ~p request ~p ~n"
              "caused exception ~p~n",
              [Name,Request,Why]).

这个例子中使用了事务语义,在发生异常时,它使用原来的值继续循环。但是如果处理函数执行成功,则会以NewState值来继续下一次循环。

为什么要保留原来的值呢?当处理函数失败时,错误的消息来自于客户端的错误。而客户端无法处理,因为错误的请求已经让处理函数失败了。但是其他客户端希望服务器仍然可用。此外,服务器的状态在发生错误时不会改变。

注意回调模块与server1的非常像。通过改变服务器和保持回调模块不变,我们可以改变回调模块中的非功能行为。

Note

最后一个语句并不总是true。我们在从server1到server2时还是要修改一点代码,就是把 -import 语句从server1改为server2。除此之外,就没有改变了。

1.3   例子3:含有代码热交换的服务器

现在我们加上代码热交换:

-module(server3).
-export([start/2,rpc/2,swap_code/2]).

start(Name,Mod) ->
    register(Name,spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

swap_code(Name,Mod) -> rpc(Name,{swap_code,Mod}).

rpc(Name,Request) ->
    Name ! {self(), Request},
    receive
        {Name,Response} -> Response
    end.

loop(Name,Mod,OldState) ->
    receive
        {From, {swap_code,NewCallBackMod}} ->
            From ! {Name,ack},
            loop(Name,NewCallBackMod,OldState);
        {From,Request} ->
            {Response,NewState} = Mod:handle(Request,OldState),
            From ! {Name,Response},
            loop(Name,Mod,NewState)
    end.

他如何工作?

我们首先交换代码消息给服务器,然后他会改变回调模块到消息中包含的新的模块。

我们可以通过启动server3来演示,然后动态交换毁掉模块。我们不能使用 name_server 作为回调模块,因为我们把服务器名硬编码到服务器中了。所以,我们需要复制一份,叫做 name_server1 ,我们还需要这么改变代码:

-module(name_server1).
-export([init/0,add/2,whereis/1,handle/2]).
-import(server3,[rpc/2]).

%%客户端程序
add(Name,Place) -> rpc(name_server,{add,Name,Place}).
whereis(Name) -> rpc(name_server,{whereis,Name}).

%%回调函数
init() -> dict:new().

handle({add,Name,Place},Dict) -> {ok,dict:store(Name,Place,Dict)};
handle({whereis,Name},Dict) -> {dict:find(Name,Dict),Dict}.

首先我们以name_server1启动server3:

1> server3:start(name_server,name_server1).
true
2> name_server:add(joe,"at home").
ok
3> name_server:add(helen,"at work").
ok

现在假设我们想要找到名字服务器中的所有名字。而没有函数提供此功能,name_server1模块只提供了增加和查询名字的功能。

所以,我们以迅雷不及掩耳盗铃之势启动了编辑器并写了新的回调模块:

-module(new_name_server).
-export([init/0,add/2,all_names/0,delete/1,whereis/1,handle/2]).
-import(server3,[rpc/2]).

%%接口
all_names() -> rpc(name_server,allNames).
add(Name,Place) -> rpc(name_server,{add,Name,Place}).
delete(Name) -> rpc(name_server,{delete,Name}).
whereis(Name) -> rpc(name_server,{whereis,Name}).

%%回调函数
init() -> dict:new().

handle({add,Name,Place},Dict) -> {ok,dict:store(Name,Place,Dict)};
handle(allNames,Dict) -> {dict:fetch_keys(Dict),Dict};
handle({delete,Name},Dict) -> {ok,dict:erase(Name,Dict)};
handle({whereis,Name},Dict) -> {dict:find(Name,Dict),Dict}.

我们编译如上模块,并且叫服务器交换回调模块:

4> c(new_name_server).
{ok,new_name_server}
5> server3:swap_code(name_server,new_name_server).
ack

现在我们可以运行服务器上的函数了:

6> new_name_server:all_names().
[joe,helen]

这次我们修改模块就是热部署的-动态代码升级,在你眼前运行的,有如魔术一般。

现在停下再想想。我们做的最后两项任务看起来很简单,但是事实上很困难。包含“事务语义”的服务器非常难写,含有代码热部署的服务器也很困难。

这个技术非常强大。传统意义上,我们认为服务器作为程序包含状态,并且在我们发送消息时改变状态。服务器代码在第一次被调用时就固定了,如果我们想要改变服务器的代码就需要重启服务器。在我们给出的例子中,服务器的代码改变起来有如我们改变状态一样容易(在软件的不停机维护升级中,我经常使用该技术)。

1.4   例子4:事务和代码热交换

在上两个服务器中,代码热升级和事务语义是分开介绍的。这里我们把它们合并到一起:

-module(server4).
-export([start/2,rpc/2,swap_code/2]).

start(Name,Mod) ->
    register(Name,spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

swap_code(Name,Mod) -> rpc(Name,{swap_code,Mod}).

rpc(Name,Request) ->
    Name ! {self(), Request},
    receive
        {Name,crash} -> exit(rpc);
        {Name,ok,Response} -> Response
    end.

loop(Name,Mod,OldState) ->
    receive
        {From,{swap_code,NewCallBackMod}} ->
            From ! {Name,ok,ack},
            loop(Name,NewCallBackMod,OldState);
        {From,Request} ->
            try Mod:handle(Request,OldState) of
                {Response,NewState} ->
                    From ! {Name,ok,Response},
                    loop(Name,Mod,NewState)
            catch
                _:Why ->
                    log_the_error(Name,Request,Why),
                    From ! {Name,crash},
                    loop(Name,Mod,OldState)
            end
    end.

log_the_error(Name,Request,Why) ->
    io:format("Server ~p request ~p~n"
              "caused exception ~p~n",
              [Name,Request,Why]).

这个服务器同时提供了代码热交换和事务语义,并且很整洁。

1.5   例子5:更有趣的功能

现在我们已经对代码热交换有主意了,我们还可以找些有趣的做。下面的服务器在你告知他作为特定类型的服务器之前什么都不做:

-module(server5).
-export([start/0,rpc/2]).

start() -> spawn(fun() -> wait() end).

wait() ->
    receive
        {become,F} -> F()
    end.

rpc(Pid,Q) ->
    Pid ! {self(),Q},
    receive
        {Pid,Reply} -> Reply
    end.

如果我们启动它然后发送 {become,F} 消息,它就会变成对F()求值的服务器,我们可以这样启动:

1> Pid=server5:start().
<0.57.0>

我们的服务器在等待消息时什么都不做。

现在我们定义服务器函数。也并不复杂,只要计算斐波拉契:

-module(my_fac_server).
-export([loop/0]).

loop() ->
    receive
        {From,{fac,N}} ->
            From ! {self(),fac(N)},
            loop();
        {become,Something} ->
            Something()
    end.

fac(0) -> 1;
fac(N) -> N*fac(N-1).

Note

PlantLab中的Erlang

几年前,在我PlanetLab做研究。我有权访问PlanetLab的网络,所以我在所有的服务器上都安装了空的Erlang服务器(大约450台)。我并不知道我要这些机器干什么用,所以后来我开始研究服务器结构。

当我可以让这一层很好的运行时,发送消息到一台空服务器,以便让他成为真正的服务器就很简单了。

常见的途径是启动WEB服务器,然后安装WEB服务器插件。我的方法是再后退一步,只是安装一个空服务器,然后在空服务器中安装WEB服务器。而当我们安装好WEB服务器以后,再告诉它应该变成什么。

只要确保他们编译好了,然后我们就可以告诉进程 <0.57.0> 变成一个斐波拉契服务器:

2> c(my_fac_server).
{ok,my_fac_server}
3> Pid ! {become,fun my_fac_server:loop/0}.
{become,#Fun<my_fac_server.loop.0>}

现在,我们的进程已经变成斐波拉契服务器了,我们可以这样调用:

4> server5:rpc(Pid,{fac,30}).
2652528598121910586363084800000000

我们的还会继续保持作为斐波拉契服务器的状态,直到我们发送消息 {become,Something} 来让他改变。

有如你所看到的,我们可以在一定范围内改变服务器的类型。这种技术很强大,使用该技术可以构建小巧而美观的服务器,却有很高的灵活性。在我们编写工业规模的,拥有数百名程序员的项目时,我们并不希望有些事情变得太过于动态。我们必须在通用和功能强大方面做出取舍,因为我们要的是产品。灵活多变的代码往往造成一堆难于调试的bug。如果我们已经在程序中做了多处动态修改,然后crash了,那会很难找到问题。

本节的服务器例子并不是非常正确。他们只是用来展示棘手问题的发展,他们实在太小了,而且还有些狡猾的错误。我不会立即告诉你们这些,但是在本章末尾,我会给出一些提示。

Erlang模块 gen_server 是构建久经考验服务器的优秀方案。

他在1998年就开始应用于工业级别的产品中了。一个产品中往往包含数百个服务器。这些服务器是使用正规的顺序化代码写成。所有的错误和非功能性行为都被放在了服务器中的通用部分了。

所以现在我们可以直接跳到 gen_server 的怀抱了。

2   gen_server 入门

这里是另一个极端。注意使用 gen_server 编写回调模块的三点计划:

  1. 决定回调函数名
  2. 编写接口函数
  3. 编写六个必需的回调函数

这很简单,无需思考,只要跟着感觉走就行了!

2.1   步骤1:决定回调函数名

我们来做一个简单的支付系统,我们把模块名叫做 my_bank ,这个系统已经在应用了,只是客户封闭了代码,如果他们再次发布源码你会感觉跟这个例子很像。

2.2   步骤2:编写接口函数

我们定义五个接口例程,都在模块 my_bank 中:

start() :开启银行

stop() :关闭银行

new_account(Who) :创建账户

deposit(Who,Amount) :存款

withdraw(Who,Amount) :取款

他们最终都是需要调用 gen_server 的,如:

start() -> gen_server:start_link({local,?MODULE},?MODULE,[],[]).
stop() -> gen_server:call(?MODULE,stop).

new_account(Who) -> gen_server:call(?MODULE,{new,Who}).
deposit(Who,Amount) -> gen_server:call(?MODULE,{add,Who,Amount}).
withdraw(Who,Amount) -> gen_server:call(?MODULE,{remove,Who,Amount}).

gen_server:start_link({local,Name},Mod,...) 开启一个本地服务器(通过参数global,可以构造一个可以被Erlang集群节点所访问的服务器)。宏 ?MODULE 会被解析为模块名 my_bank 。 Mod 是回调模块名。我们暂时会忽略其他传递给 gen_server:start 的参数。

gen_server:call(?MODULE,Term) 用于对服务器的远程调用。

2.3   步骤3:编写六个必需的回调函数

我们的回调模块导出了六个回调例程: init/1 、 handle_call/3 、 handle_cast/2 、 handle_info/2 、 terminate/2 、 code_change/3 。为了快速实现,我们使用模板来构造 gen_server 。如下是最简单的例子:

-module().
%% gen_server_mini_template

-behaviour(gen_server).
-export([start_link/0]).
%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,
        terminate/2,code_change/3]).

start_link() -> gen_server:start_link({local,?SERVER},?MODULE,[],[]).

init([]) -> {ok,State}.

handle_call(_Request,_From,State) -> {reply,Reply,State}.
handle_cast(_Msg,State) -> {noreply,State}.
handle_info(_Info,State) -> {noreply,State}.
terminate(_Reason,_State) -> ok.
code_change(_OldVsn,State,Extra) -> {ok,State}.

模板包含了最简单的需要填入服务器的框架(skeleton)。关键字 -behaviour 用于告知编译器在我们没有定义适当的回调函数时给出错误信息。

Tip

如果你正在使用emacs,你可以把 gen_server 模板放到快捷键中。可以修改erlang-mode,然后 Erlang>Skeleton菜单提供创建 gen_server 的模块。如果你不使用emacs,也别担心,我在本章末尾提供了该模板。

我们从模板开始编辑修改。我们所需要做的只是让参数与接口相匹配。

最重要的部分是 handle_call/3 函数。我们需要写出匹配3个查询术语的接口程序。也就是我们需要填充省略号处的代码:

handle_call({new,Who},From,State) ->
    Reply = ...
    State1 = ...
    {reply,Reply,State1};
handle_call({add,Who,Amount},From,State) ->
    Reply = ...
    State1 = ...
    {reply,Reply,State1};
handle_call({remove,Who,Amount},From,State) ->
    Reply = ...
    State1 = ...
    {reply,Reply,State1};

Reply的值会被发送到客户端作为远程调用的返回值。

State是服务器状态的全局变量,需要被服务器中持续传递。在我们的银行模块中,状态不会改变;他只是一个ETS表索引,而且是个常量(尽管表格的内容会改变)。

当我们填写模板并且修改后,我们得到如下代码:

init([]) -> {ok,ets:new(?MODULE,[])}.

handle_call({new,Who},_From,Tab) ->
    Reply=case ets:lookup(Tab,Who) of
                [] -> ets:insert(Tab,{Who,0}),
                    {welcome,Who};
                [_] -> {Who,you_already_are_a_customer}
            end,
    {reply,Reply,Tab};
handle_call({add,Who,X},_From,Tab) ->
    Reply=case ets:lookup(Tab,Who) of
                [] -> not_a_customer;
                [{Who,Balance}] ->
                    NewBalance=Balance+X,
                    ets:insert(Tab,{Who,NewBalance}),
                    {thanks,Who,your_balance_is,NewBalance}
            end,
    {reply,Reply,Tab};
handle_call({remove,Who,X},_From,Tab) ->
    Reply=case ets:lookup(Tab,Who) of
                [] -> not_a_customer;
                [{Who,Balance}] when X =< Balance ->
                    NewBalance=Balance -X,
                    ets:insert(Tab,{Who,NewBalance}),
                    {thanks,Who,your_balance_is,NewBalance};
                [{Who,Balance}] ->
                    {sorry,Who,you_only_have,Balance,in_the_bank}
            end,
    {reply,Reply,Tab};
handle_call(stop,_From,Tab) ->
    {stop,normal,stopped,Tab}.

handle_cast(_Msg,State) -> {noreply,State}.
handle_info(_Info,State) -> {noreply,State}.
terminate(_Reason,_State) -> ok.
code_change(_OldVsn,State,Extra) -> {ok,State}.

我们可以通过调用 gen_server:start_link(Name,CallBackMod,StartArgs,Opts) 来启动服务器,然后第一个被调用的历程是回调模块的 Mod:init(StartArgs) ,而且必须返回 {ok,State} 。State的值会在以后作为 handle_call 的第三个参数一直传递。

注意我们如何停止服务器。 handle_call(Stop,From,Tab) 返回 {stop,Normal,stopped,Tab} 会停止服务器。第二个参数normal用作调用 my_bank:terminate/2 的第一个参数。第三个参数stopped会作为 my_bank:stop() 的返回值。

就这些了,让我们看看如何使用银行:

1> my_bank:start().
{ok,<0.33.0>}
2> my_bank:deposit("joe",10).
not_a_customer
3> my_bank:new_account("joe").
{welcome,"joe"}
4> my_bank:deposit("joe",10).
{thanks,"joe",your_balance_is,10}
5> my_bank:deposit("joe",30).
{thanks,"joe",your_balance_is,40}
6> my_bank:withdraw("joe",15)
{thanks,"joe",your_balance_is,25}
7> my_bank:withdraw("joe",45).
{sorry,"joe",you_only_have,25,in_the_bank}

3   gen_server 回调结构

现在我们已经有主意了,我们多了解一下 gen_server 的回调结构。

3.1   启动服务器时发生了什么?

通过 gen_server:start_link(Name,Mod,InitArgs,Opts) 启动时,他会创建一个叫做Name的通用服务器。回调模块是Mod。Opts控制通用服务器的行为,可以指定消息日志、调试函数等等。通用服务器会启动回调 Mod:init(InitArgs) 。

模板的初始化入口如下:

%% Function: init(Args) -> {ok,State}
%%                         {ok,State,Timeout}
%%                         ignore
%%                         {stop,Reason}
init([]) ->
    {ok,#state{}}.

对于正常操作,我们只需要返回 {ok,State} 。其他参数的含义可以参考 gen_server 的手册页。

如果返回了 {ok,State} ,我们就算是成功的启动了服务器,并初始化了状态State。

3.2   我们调用服务器时发生了什么?

想要调用服务器,客户端程序调用 gen_server:call(Name,Request) 。这最终会调用回调模块的 handle_call/3 。

handle_call/3 拥有如下入口模板:

%% Function: handle_call(Request,From,State) -> {reply,Reply,State}
%%                                              {reply,Reply,State,Timeout}
%%                                              {noreply,State}
%%                                              {noreply,State,Timeout}
%%                                              {stop,Reason,Reply,State}
%%                                              {stop,Reason,State}
handle_call(_Request,_From,State) ->
    Reply=ok,
    {reply,Reply,State}.

Request参数重新出现在 handle_call/3 的第一个参数中。From是请求客户端进程的PID,而State则是当前客户端的状态。

一般来说,我们返回 {reply,Reply,NewState} 。当这发生时,Reply会发送到客户端,而它是作为 gen_server:call 的返回值的。NewState是服务器的下一个状态。

另一个返回值 {noreply,…} 和 {stop,…} 用于罕见的特殊情况。没有返回值会让服务器继续工作,而客户端会继续等待服务器的响应,所以服务器需要委托其他进程来作出响应。调用stop会适当的结束服务器。

3.3   调用与投送(Cast)

我们已经看到了 gen_server:call 与 handle_call 的相互影响了。这是用于实现RPC(Remote Procedure Call)的。 gen_server:cast(Name,Name) 实现了投送(cast),就是没有返回值的调用(实际上是消息,但是一般叫做投送,以区别RPC)。

相应的回调例程是 handle_cast ,入口模板如下:

%% Function: handle_cast(Msg,State) -> {noreply,NewState}
%%                                     {noreply,NewState,Timeout}
%%                                     {stop,Reason,NewState}
handle_cast(_Msg,State) ->
    {noreply,NewState}.

处理函数通常返回 {noreply,NewState} ,就是改变服务器的状态。而 {stop,…} 则会停止服务器。

3.4   发到服务器的自发信息

回调函数 handle_info(Info,State) 用于处理发送到服务器的自发信息。那什么是自发信息呢?如果服务器连接到的一个进程突然退出了,那么他会立即收到不期望的 {“EXIT”,Pid,What} 消息。另外,系统中任何知道通用服务器PID的进程都可以给他发消息。在服务器有如Info的值时会发出死掉的消息(Any message like this ends up at the server as the value of Info)。

handle_info 的入口模板如下:

%% Function: handle_info(Info,State) -> {noreply,State}
%%                                      {noreply,State,Timeout}
%%                                      {stop,Reason,State}
handle_info(_Info,State) ->
    {noreply,State}.

返回值与 handle_cast 相同。

3.5   来吧,宝贝

服务器可能会因为多种原因而停止。其中一个原因是 handle_Something 例程可能会返回 {stop,Reason,NewState} ,或者服务器可能会挂了并抛出 {‘EXIT’,reason} 。在这些情况中,无论发生了什么, terminate(Reason,NewState) 会被调用。

如下是模板:

%% Function: terminate(Reason,State) -> void()
terminate(_Reason,State) ->
    ok.

这些代码不会返回新的状态,因为他们已经停止了。所以,我们应该如何处理状态呢?可以做很多事情,比如打扫战场。我们可以把它存储到磁盘,发送消息通知其他进程,或者抛弃其依赖的应用程序。如果你想要服务器在未来被重启,你还需要编写被 terminate/2 触发的“我会回来”函数。

3.6   代码改变

你可以在服务器运行时动态改变你的代码。这个回调函数包含处理释放子系统并提供软件更新的功能。

相关章节请参考OTP设计原则文档(http://www.erlang.org/doc/pdf/design_principles.pdf)。

%% Function: code_change(OldVsn,State,Extra) -> {ok,NewState}
code_change(_OldVsn,State,_Extra) -> {ok,State}.

4   代码与模板

这是通过emacs-mode生成的,通用服务器模板:

%%% -----------------------------------------------
%%% File: gen_server_template.full
%%% Author: ...
%%% Description: ...
%%% Created: ...
%%% -----------------------------------------------
-module().
-behaviour(gen_server).

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,
        terminate/2,code_change/3]).

-record(state,{}).

%%% Function start_link() -> {ok,Pid} | ignore | {error,Error}
%%% 启动服务器
start_link() ->
    gen_server:start_link({local,?SERVER},?MODULE,[],[]).

%%% Function init(Args) -> {ok,State} |
%%%                        {ok,State,Timeout} |
%%%                        ignore |
%%%                        {stop,Reason}
%%% 初始化服务器
init([]) ->
    {ok,#state{}}.

%%% Function handle_call(Request,From,State) -> {reply,Reply,State} |
%%%                                             {reply,Reply,State,Timeout} |
%%%                                             {noreply,State} |
%%%                                             {noreply,State,Timeout} |
%%%                                             {stop,Reason,Reply,State} |
%%%                                             {stop,Reason,State}
%%% 处理所有消息
handle_call(_Request,_From,State) ->
    Reply=ok,
    {reply,Reply,State}.

%%% Function handle_cast(Msg,State) -> {noreply,State} |
%%%                                    {noreply,State,Timeout} |
%%%                                    {stop,Reason,State}
%%% 处理所有投送消息
handle_cast(_Msg,State) ->
    {noreply,State}.

%%% Function handle_info(Info,State) -> {noreply,State} |
%%%                                     {noreply,State,Timeout} |
%%%                                     {stop,Reason,State}
%%% 处理所有调用或投送消息
handle_info(_Info,State) ->
    {noreply,State}.

%%% Function terminate(Reason,State) -> void()
%%% 在服务器停止时被调用,与init/1做的事情相反,释放资源。当它返回时,
%%% gen_server就会以Reason终止,返回值会被忽略
terminate(_Reason,_State) ->
    ok.

%%% Function code_change(OldVsn,State,Extra) -> {ok,NewState}
%%% 在代码改变时覆盖进程状态
code_change(_OldVsn,State,_Extra) ->
    {ok,State}.

%%% 内部函数

my_bank的代码:

-module(my_bank).
-behaviour(gen_server).
-export([start/0]).
%% gen_server回调
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,
        terminate/2,code_change/3]).
-compile(export_all).

start() -> gen_server:start_link({local,?MODULE},?MODULE,[],[]).
stop() -> gen_server:call(?MODULE,stop).

new_account(Who) -> gen_server:call(?MODULE,{new,Who}).
deposit(Who,Amount) -> gen_server:call(?MODULE,{add,Who,Amount}).
withdraw(Who,Amount) -> gen_server:call(?MODULE,{remove,Who,Amount}).

init([]) -> {ok,ets:new(?MODULE,[])}.

handle_call({new,Who},_From,Tab) ->
    Reply=case ets:lookup(Tab,Who) of
            [] -> ets:insert(Tab,{Who,0}),
                    {welcome,Who};
            [_] -> {Who,you_already_are_a_customer}
          end,
        {reply,Reply,Tab};
handle_call({add,Who,X},_From,Tab) ->
    Reply=case ets:lookup(Tab,Who) of
            [] -> not_a_customer;
            [{Who,Balance}] ->
                NewBalance=Balance+X,
                ets:insert(Tab,{Who,NewBalance}),
                {thanks,Who,your_balance_is,NewBalance}
          end,
        {reply,Reply,Tab};
handle_call({remove,Who,X},_From,Tab) ->
    Reply=case ets:lookup(Tab,Who) of
            [] -> not_a_customer;
            [{Who,Balance}] when X =< Balance ->
                NewBalance=Balance-X,
                ets:insert(Tab,{Who,NewBalance}),
                {thanks,Who,you_only_have,Balance,in_the_bank}
          end,
        {reply,Reply,Tab};
handle_call(stop,_From,Tab) ->
    {stop,normal,stopped,Tab}.
handle_cast(_Msg,State) -> {noreply,State}.
handle_info(_Info,State) -> {noreply,State}.
terminate(_Reason,_State) -> ok.
code_change(_OldVsn,State,Extra) -> {ok,State}.

5   深度挖掘

gen_server 实际上很简单。不过我们并没有了解 gen_server 的所有接口,也没有必要知道所有参数的含义。当你明白基本原理之后,你只需要查阅 gen_server 的手册就够了。

在本章,我们只是讨论了使用 gen_server 的最简单情况,但是适用于大多数情况。复杂的应用经常让 gen_server 返回 noreply 而是委派真正的响应给另一个进程想要了解这些,请参与设计原则文档(http://www.erlang.org/doc/pdf/design_principles.pdf)。和模块 sysproc_lib 的手册页。

Programming Erlang 第14章笔记 Socket编程

Thursday, February 21st, 2008

Socket编程

译者: gashero

目录

大多数我写的更有趣的程序都包含了Socket。一个Socket是一个允许机器与Internet上另一端使用IP通信的端点。本章关注Internet上两种核心网络协议:TCP和UDP。

UDP允许应用发送简短报文(叫做数据报datagram)到另一端,但是对报文没有交付的担保。并且可能在到达时有错误的顺序。而TCP则提供了可靠的字节流,并且确保在连接后传输数据的顺序也是对的。

为什么Socket编程很有趣呢?因为它允许应用于其他Internet上的机器通信,而这些比本地操作更有潜力。

有两种主要的库用于Socket编程: gen_tcp 用于TCP编程、 gen_udp 用于UDP编程。

在本章,我们看看如果使用TCP和UDP socket编写客户端和服务器。我们将会尝试多种形式的服务器(并行、串行、阻塞、非阻塞)并且看看通信接口应用如何将数据流传递给其他应用。

1   使用TCP

我们学习Socket编程的历险从一个从服务器获取TCP数据的程序开始。然后我们会写一个简单的串行TCP服务器展示如何并行的处理多个并发会话。

1.1   从服务器获取数据

我们先写一个小函数(标准库的 http:request(Url) 实现相同的功能,但是这里是演示TCP的)来看看TCP socket编程获取 http://www.google.com 的HTML页面:

nano_get_url() ->
    nano_get_url("www.google.com").

nano_get_url(Host) ->
    {ok,Socket}=gen_tcp:connect(Host,80,[binary,{packet,0}]),
    ok=gen_tcp:send(Socket,"GET / HTTP/1.0\r\n\r\n"),
    receive_data(Socket,[]).

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(reverse(SoFar))
    end.

它如何工作呢?

  1. 我们通过 gen_tcp:connect 在 http://www.google.com 打开TCP协议80端口。connect的参数binary告知系统以binary模式打开socket,并且以二进制方式传递数据到应用。 {packet,0} 意味着无需遵守特定格式即可将数据传递到应用。
  2. 我们调用 gen_tcp:send 并发送消息 GET / HTTP/1.0\r\n\r\n 到socket。然后我们等待响应。响应并不会一次性得到,而是分片的、分时间的。这些分片是按照一系列报文的方式接收的,并且通过打开的socket发送到进程。
  3. 我们接收一个 {tcp,Socket,Bin} 报文。第三个参数是binary。这是因为我们已经使用二进制方式打开了socket。这是从WEB服务器发到我们的一个消息报文。我们把这个报文加到分片列表并等待下一个报文。
  4. 我们接收到 {tcp_closed,Socket} 报文,这在服务器发送完所有数据时发生(这仅在HTTP/1.0时正确,现在版本的HTTP使用另外一种结束策略)。
  5. 当我们收到了所有的分片,存储顺序是错误的,所以我们重新对分片排序和连接。

我们看看他如何工作:

1> B=socket_examples:nano_get_url().
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.se/\r\n
    Cache-Control: private\r\nSet-Cookie: PREF=ID=b57a2c:TM"...>>

Note

当运行 nano_get_url 时,结果是二进制的,而你看到的则是Erlang shell以便于阅读的方式展示的。当以便于阅读方式展示时,所有控制字符都是以转义格式显示。二进制数据也会被截短,后面以省略号显示。如果想要看所有的二进制数据,你可以通过 io:format 打印或者使用 string:tokens 分片显示:

2> io:format("~p~n",[B]).
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.se/\r\n
    Cache-Control: private\r\nSet-Cookie: PREF=ID=B57a2c:TM"
    TM=176575171639526:LM=1175441639526:S=gkfTrK6AFkybT3;
    ... 还有一大堆行 ...
>>
3> string:tokens(binary_to_list(B),"\r\n").
["HTTP/1.0 302 Found",
"Location: http://www.google.se/",
"Cache-Control: private",
... 还有一大堆行 ...

这就差不多WEB客户端的工作方式(不过在浏览器中正确的显示要做更多的工作)。上面的代码只是实验的开始。你可以尝试做一些修改来下载整个网站或者读取电子邮件。可能性是无限的。

注意分片的重新组装方式如下:

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(reverse(SoFar))
    end.

每当我们收到分片时,就把他们加入SoFar这个列表的头部。当收到了所有分片时,Socket就关闭了,我们颠倒顺序,并且把所有分片组合起来。

你可能以为收到分片后立即合并会好些:

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,list_to_binary([SoFar,Bin]));
        {tcp_closed,Socket} ->
            SoFar
    end.

这段代码是正确,但是效率比较低。因为后一种版本不断的把新的二进制数据加到缓冲区后面,也就是包含了多个数据的拷贝的。一个好办法是累积所有分片,尽管顺序是相反的,然后反序整个列表并一次连接所有分片。

1.2   一个简单的TCP服务器

在前一节,我们写了一个简单的客户端。现在我们写个服务器。

服务器开启2345端口然后等待一个消息。这个消息是包含Erlang术语的二进制数据,这个术语是包含Erlang表达式的字符串。服务器对该表达式求值并且将结果通过socket发到客户端。

Note

如何编写WEB服务器

编写WEB客户端或服务器是很有趣的。当然,有些人已经写好这些了,但是如果想要真正理解他们的工作原理,研究底层实现还是很有意义的。谁知道呢,说不定我们写的WEB服务器更好。所以我们看看如何做吧?

想要构建一个WEB服务器,任何一个需要实现标准的Internet协议,我们需要使用正确的工具和了解协议实现。

在我们的例子用来抓取一个WEB页,我们如何知道已经正确打开了80端口,并且如何知道已经发送了 GET / HTTP/1.0\r\n\r\n 到服务器?答案很简单。所有主要协议都已经在RFC文档中有描述。HTTP/1.0定义于RFC1945,所有RFC的官方网站是 http://www.letf.org

另外一个非常有用的方法是抓包。通过一个数据包嗅探器,我们可以抓取和分析所有IP数据包,无论是应用程序发出的还是接收的。大多数嗅探器包含解码器和分析器可以得出数据包的内容和格式。一个著名的嗅探器是Wireshark(以前叫Ethereal),可以到 http://www.wireshark.org/ 了解更多。

使用嗅探器和RFC武装起来的我们,就可以准备编写下一个杀手级应用了。

编写这个程序(或者其他使用TCP/IP的程序),需要响应一些简单的请求:

  • 数据如何组织,知道数据如何组成请求或者响应?
  • 数据在请求和响应中如何编码(encode & marshal)和解码(decode & demarshal)

TCP socket数据只是没有格式的字节流。在传输时,数据会切成任意长度的分片,所以我们需要多少数据如何组成请求或响应。

在Erlang的例子,我们使用简单的转换,把每个逻辑请求或响应前加上N(1/2/4)字节的长度数。这就是 {packet,N} (这里的packet表示一个应用程序请求或响应报文,而不是电线里面的物理包) 参数在 gen_tcp:connect 和 gen_tcp:listen 函数的意义。注意packet附带的那个参数在客户端和服务器端必须商定好。如果服务器使用 {packet,2} 而客户端使用 {packet,4} 则会出错。

在我们以 {packet,N} 选项打开socket后,我们就不需要担心数据分片了。Erlang驱动会自动确保数据报文的所有分片都收到并且长度正确时才发到应用程序。

另一个需要注意的是数据编码和解码。最简单时,我们可以用 term_to_binary 来对Erlang术语编码,并使用 binary_to_term 来解码数据。

注意,客户端和服务器通信的包转换和编码规则是两行代码完成,分别使用 {packet,4} 来打开socket和使用 term_to_binary 和其反函数完成编码和解码数据。

我们可以简单的打包和编码Erlang术语到基于文本的协议如HTTP或XML。使用Erlang的 term_to_binary 和其反函数可以比基于XML等文本的协议性能高出一个数量级。现在我们先看看一个简单的服务器:

start_nano_server() ->
    {ok,Listen}=gen_tcp:listen(2345,[binary,{packet,4},
                                            {reuseaddr,true},
                                            {active,true}]),
    {ok,Socket}=gen_tcp:accept(Listen),
    gen_tcp:close(Listen),
    loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Bin} ->
            io:format("Server received binary = ~p~n",[Bin]),
            Str=binary_to_term(Bin),
            io:format("Server (unpacked) ~p~n",[Str]),
            Reply=lib_misc:string2value(Str),
            io:format("Server replying = ~p~n",[Reply]),
            gen_tcp:send(Socket,term_to_binary(Reply)),
            loop(Socket);
        {tcp_closed,Socket} ->
            io:format("Server socket closed~n")
    end.

它如何工作?

  1. 首先,我们调用 gen_tcp:listen 来监听2345端口,并且设置报文转换格式为 {packet,4} ,意味着每个包有4个字节的包头,代表长度。然后 gen_tcp:listen(..) 会返回 {ok,Socket} 或者 {error,Why} ,但是我们先看看成功的情况。所以写下如下代码 {ok,Listen}=gen_tcp:listen(...), 这在程序返回 {error,…} 时发生匹配错误。如果成功则会绑定Listen到正在监听的socket。对于正在监听的socket,我们只需要做一件事,就是使用它做参数调用 gen_tcp:accept 。
  2. 现在我们调用 gen_tcp:accept(Listen) 。在这里,程序会挂起以等待连接。当我们获得连接时,这个函数返回已经绑定的Socket,这个socket就是可以与客户端连接并且可以通信的了。
  3. 当 gen_tcp:accept 返回,我们立即调用 gen_tcp:close(Listen) 。这就关闭了监听的socket,服务器也就不会继续接受新的连接了。而这不会影响已有的连接,只是针对新连接。
  4. 解码输入数据
  5. 对字符串求值
  6. 编码返回数据并且通过socket发送

注意,这个程序只接受一个请求,程序运行完成后就不会再接受其他请求了。

这是一个非常简单的服务器展示了如何打包和编码应用数据。接收请求,计算响应,发出响应,然后结束。

想要测试这个服务器,我们需要一个对应的客户端:

nano_client_eval(Str) ->
    {ok,Socket}=get_tcp:connect("localhost",2345,[binary,{packet,4}]),
    ok=gen_tcp:send(Socket,term_to_binary(Str)),
    receive
        {tcp,Socket,Bin} ->
            io:format("Client received binary = ~p~n",[Bin]),
            Val=binary_to_term(Bin),
            io:format("Client result = ~p~n",[Val]),
            gen_tcp:close(Socket)
    end.

想要测试你的代码,我们需要在一台机器上同时启动客户端和服务器。所以在 gen_tcp:connect 中的hostname参数就可以用硬编码的 localhost 。

注意客户端和服务器端使用的 term_to_binary 和 binary_to_term 怎样编码和解码数据。

想要运行,我们需要开两个终端然后启动Erlang shell。

首先,我们启动服务器:

1> socket_examples:start_nano_server().

我们看不到任何输出,当然什么也没发生呢。然后我们在另一个终端启动客户端,输入如下命令:

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4,10+20])").

在服务器端的窗口尅看到如下输出:

Server received binary = <<131,107,0,28,108,105,115,116,95,116,
                           111,95,116,117,112,108,101,40,91,50,
                           43,51,42,52,44,49,48,43,50,48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4,10+20])"
Server replying = {14,30}

在客户端窗口可以看到:

Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok

最后,在服务器窗口看到:

Server socket closed

1.3   改进服务器

前一节我们构造了一个服务器可以接受一个请求并且终止。简单的修改代码,我们可以就可以完成另一个不同类型的服务器:

  1. 序列服务器,同一时间只接受一个请求
  2. 并行服务器,同一时间可以接受多个请求

原始启动代码如下:

start_nano_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    {ok,Socket} = gen_tcp:accept(Listen),
    loop(Socket).
...

我们将会以此为基础完成另外两种服务器。

1.4   序列服务器

想要构造序列服务器,我们如下改变了代码:

start_seq_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    seq_loop(Listen).

seq_loop(Listen) ->
    {ok,Socket} = gen_tcp:accept(Listen),
    loop(Socket),
    seq_loop(Listen).

loop(...) -> %%同以前的一样处理

这会让以前的例子在需要多个请求时工作的很好。我们一直让监听的socket开着而不是关闭。另一个不同是在 loop(Socket) 结束后,我们再次调用 seq_loop(Listen) ,以便接受后面的连接请求。

如果客户端在服务器忙于处理一个已经存在的连接时尝试连接服务器,连接请求会被缓存,直到服务器完成那个已经存在的请求。如果缓存的连接数大于监听backlog,连接会被拒绝。

我们只是展示了如何开启服务器。关闭服务器很简单(停止并行服务器也一样),只要kill掉启动服务器的进程即可。 gen_tcp 连接他本身到控制进程,如果控制进程死掉了,他也就会关闭socket。

1.5   并行服务器

构造并行服务器的秘诀是每次接受新的连接以后马上用 spawn 立即生成一个新的进程:

start_parallel_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    spawn(fun() -> par_connect(Listen) end).

par_connect(Listen) ->
    {ok,Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen) end),
    loop(Socket).

loop(...) -> %%同上

这段代码与串行服务器类似。重要的不同在于 spawn ,可以确保为每个连接创建一个进程。现在可以对比两种服务器了,可以看到他是如何将一个串行服务器转变为并行服务器的。

所有这三种服务器调用 gen_tcp:listen 和 gen_tcp:accept 不同在于如何调用函数是并行方式还是串行方式。

Note

知识点:

  • 创建socket的进程(调用 gen_tcp:accept 或者 gen_tcp:connect )叫做这个socket的 控制进程 。所有来自于socket的消息都会被发送到控制进程;如果控制进程死掉了,对应的socket就会被关闭。可以修改一个socket的控制进程为NewPid,通过 gen_tcp:controlling_process(Socket,NewPid) 。

  • 我们的并行服务器可能会随着连接而创建数千个连接。我们可能希望限制最大并发连接数。这可以通过一个活动连接计数器来实现。每次获得一个新连接时就增加1,而在一个连接完成时减少1。可以用这种机制显示并发连接数。

  • 在我们接受连接后,最好明确的设置请求socket选项,如下:

    {ok,Socket} = gen_tcp:accept(Listen),
    inet:setopts(Socket,[{packet,4},binary,{nodelay,true},{active,true}]),
    loop(Socket)
  • 在Erlang R11B-3中,几个Erlang进程可以在同一个监听中的socket调用 gen_tcp:accept/1 。这也是一种建立并发服务器的简单方式,因为你尅拥有预分配的进程池,等待 gen_tcp:accept/1 。

2   控制问题

Erlang中socket可以以3种模式打开:active、active once、passive。其设置可以通过 gen_tcp:connect(Address,Port,Options) 或 gen_tcp:listen(Port,Options) 中的Options参数来设置为 {active,true|false|once} 。

如果指定了 {active,true} ,就会创建一个主动(active)的socket; {active,false} 会创建一个被动的(passive)的socket; {active,once} 创建主动的socket,但是只接受一条消息,接收到消息后,必须手动重新开启(reenable)才能继续接受消息。

我们看看在不同地方使用的区别。

active和passive的socket的区别在于消息到来时的处理方式:

  • 一旦一个active的socket被创建了,控制进程会发送收到的数据,以 {tcp,Socket,Data} 消息的形式。而控制进程无法控制消息流。一个无赖的客户端可以发送无数的消息到系统,而这些都会被发送到控制进程。而控制进程无法停止这个消息流。
  • 如果socket在passive模式,控制进程需要调用 gen_tcp:recv(Socket,N) 来获取数据。它会尝试获取N字节的数据,如果N=0,就会尽可能的拿到所有可以取得的数据。这种情况下,服务器尅通过选择是否调用 gen_tcp:recv 来控制消息流。

被动模式的socket用于控制发送到服务器的数据流。为了举例,我们可以以3种方式编写消息接收循环:

  • 主动消息获取(非阻塞)
  • 被动消息获取(阻塞)
  • 混合方式获取(部分阻塞)

2.1   主动消息获取(非阻塞)

第一个例子是以主动模式打开socket,然后接受来自socket的数据:

{ok,Listen} = gen_tcp:listen(Port,[...,{active,true}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            ... 输出处理 ...
        {tcp_closed,Socket} ->
            ...
    end.

这个过程无法控制发到服务器循环的消息流,如果客户端产生数据的速度大于服务器消费数据的速度,系统就会收到洪水般地消息-消息缓冲区溢出,系统将会crash并表现怪异。

这种类型的服务器叫做非阻塞服务器,因为它无法阻塞客户端。我们仅在信任客户端的情况下才会使用非阻塞服务器。

2.2   被动消息获取(阻塞)

在这一节,我们写阻塞服务器:服务器以被动模式打开socket,通过 {active,false} 选项。这个服务器不会被危险的客户端洪水袭击。

服务器循环中的代码调用 gen_tcp:recv 来接收数据。客户端在服务器调用 recv 之前会被阻塞。注意OS会对客户端发来的数据做一下缓冲,以允许客户端在服务器调用 recv 之前仍然可以继续发送一小段数据。

{ok,Listen} = gen_tcp:listen(Port,[...,{active,false}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    case gen_tcp:recv(Socket,N) of
        {ok,B} ->
            ... 数据处理 ...
            loop(Socket);
        {error,closed}
            ...
    end.

2.3   混合消息获取(部分阻塞)

你可能认为把被动模式用到所有服务器上都合适。不幸的是,当我们在被动模式时,我们只能等待来自于一个socket的数据。这对于需要等待多个socket来源数据的服务器则不适用。

幸运的是我们可以用混合方式,既不是阻塞的也不是非阻塞的。我们以一次主动(active once)模式 {active,once} 打开socket。在这个模式中,socket是主动的,但是只能接收一条消息。在控制进程发出一条消息之后,他必须明确的调用 inet:setopts 以便让socket恢复并接收下一条消息。系统在这发生之前会一直阻塞。这是两种世界的最好结合点。如下是代码:

{ok,Listen} = gen_tcp:listen(Port,[...,{active,once}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            ... 数据处理 ...
            %%准备好启用下一条消息时
            inet:setopts(Socket,[{active,once}]),
            loop(Socket);
        {tcp_closed,Socket} ->
            ...
    end.

使用 {active,once} 选项,用户可以实现高层次的数据流控制(有时叫交通管制),同时又防止了服务器被过多的消息洪水所淹没。

3   连接从哪里来?

假设我们建立了某种在线服务器,而却一直有人发垃圾邮件。那么我们需要做的第一件事就是确定他的来源。想要发现这件事,我们可以使用 inet:peername(Socket) 。

@spec inet:peername(Socket) -> {ok,{IP_Address,Port}} | {error,Why}

返回连接另一端的IP地址和端口号,以便服务器找到对方的地址。IP_Address是一个元组的整数形如 {N1,N2,N3,N4} ,而 {K1,K2,K3,K4,K5,K6,K7,K8} 则是IPv6的地址。这里的整数取值范围是0到255。

4   Socket的错误处理

socket的错误处理是非常简单的,基本上你不需要做任何事情。犹如前面讲过的,每个socket拥有控制进程(就是创建socket的进程)。如果控制进程死掉了,那么socket也会自动关闭。

这意味着,如果我们有,例如,一个客户端和服务器,而服务器因为编程错误死掉了。那么服务器拥有的socket会自动关闭,而客户端会收到 {tcp_closed,Socket} 消息。

我们可以通过如下的小程序测试这个机制:

error_test() ->
    spawn(fun() -> error_test_server() end),
    lib_misc:sleep(2000),
    {ok,Socket} = gen_tcp:connect("localhost",4321,[binary,{packet,2}]),
    io:format("connected to: ~p~n",[Socket]),
    gen_tcp:send(Socket,<<"123">>),
    receive
        Any ->
            io:format("Any=~p~n",[Any])
    end.

error_test_server() ->
    {ok,Listen} = gen_tcp:listen(4321,[binary,{packet,2}]),
    {ok,Socket} = gen_tcp:accept(Listen),
    error_test_server_loop(Socket).

error_test_server_loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            io:format("received:~p~n",[Data]),
            atom_to_list(Data),
            error_test_server_loop(Socket)
    end.

当我们运行它时,会看到如下输出:

1> socket_examples:error_test().
connected to:#Port<0.152>
received:<<"123">>
=ERROR REPORT==== 9-Feb-2007::15:18:15 ===
Error in process <0.77.0> with exit value:
 {badarg,[{erlang,atom_to_list,[<<3 bytes>>]},
 {socket_examples,error_test_server_loop,1}]}
Any={tcp_closed,#Port<0.152>}
ok

我们生成了一个服务器,并让它每两秒有一次启动的机会,并且发送包含二进制数据 <<"123">> 的消息。当这个消息到达服务器时,服务器会尝试计算 atom_to_list(Data) ,因为Data是一个二进制数据,所以会立即出错(系统监控器会发现并显示错误)。现在服务器端的控制进程已经死掉了,而且其socket也关闭了。客户端会收到 {tcp_closed,Socket} 消息。

5   UDP

现在我们看看UDP协议(User Datagram Protocol,用户数据报协议)。使用UDP,互联网上的机器之间可以互相发送小段的数据,叫做数据报。UDP数据报是不可靠的,这意味着如果客户端发送一系列的UDP数据报到服务器,收到的数据报顺序可能是错误的。不过收到的数据报肯定是正确的。大的数据报会被分为多个小的分片,IP协议负责重新组装这些分片,并最终交付给应用。

UDP是无连接的协议,这意味着客户端无需连接服务器即可发送消息。这也意味着程序更加适于大量客户端收发小的消息报文。

在Erlang中编写UDP客户端和服务器比TCP时更简单,因为我们无需管理连接。

5.1   简单的UDP服务器和客户端

首先,我们看看服务器,一个通用的服务器样式如下:

server(Port) ->
    {ok,Socket} = gen_udp:open(Port,[binary]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp,Socket,Host,Port,Bin} ->
            BinReply = ... ,
            gen_udp:send(Socket,Host,Port,BinReply),
            loop(Socket)
    end.

这里比TCP协议的例子更简单,因为我们至少不需要关心连接关闭的消息。注意我们以二进制方式打开socket,驱动也会以二进制数据的形式将报文发送到应用。

注意客户端。这里有个简单的客户端。它仅仅打开UDP socket,发送消息到服务器,等待响应(或超时),然后关闭socket并返回从服务器接收到的值。

client(Request) ->
    {ok,Socket} = gen_udp:open(0,[binary]),
    ok = gen_udp:send(Socket,"localhost",4000,Request),
    Value = receive
                {udp,Socket,_,_,Bin} ->
                    {ok,Bin}
                after 2000 ->
                    error
                end,
    gen_udp:close(Socket),
    Value

我们必须拥有一个超时,否则UDP的不可靠会让我们永远得不到响应。

5.2   一个UDP阶乘服务器

@page 261

Programming Erlang 第13章笔记 文件编程

Thursday, February 21st, 2008

文件编程

译者: gashero

目录

本章我们会看看文件管理的常见函数。Erlang标准发布版包含大量文件管理函数。我们看看程序中最常用的一部分。也会看一些文件操作的例子。另外也会提及一些罕见的文件操作,以便让你知道,他们是存在的。如果你想了解更多,去看手册吧。

1   库的组织

文件管理函数被阻止到了4个模块中:

file :用于打开、关闭、读取、写入文件、列目录等等的例程。 file 模块中的最常用函数在13.2节中有所讲解。更多细节请参考手册。

filename :管理不同平台上的文件名形式的细节,所以你可以在不同操作系统上使用相同的代码。

filelib :是 file 模块的扩展,包含一些列出文件、检查文件类型等等的函数。大部分都是使用 file 模块的函数来编写的。

io :在打开的文件上的操作函数。包含解析数据、按照格式写入数据到文件等。

2   读取文件的不同方式

让我们先看看读取文件的一些选项。首先写一个小程序来打开文件和读取数字,通过几种不同方式。

文件的内容只是字节流。他们的意义依赖于如何解释。

为了演示,我们对所有例子使用相同的文件。它事实上包含Erlang术语的序列。依赖于如何打开和读取文件,我们可以把内容解释为一列Erlang术语,作为一系列文本行,或者作为原始的二进制数据而不作任何解释。

这是原始文件:

{person, "joe", "armstrong",
    [{occupation, programmer},
     {favoriteLanguage, erlang}]}.

{cat, {name, "zorro"},
      {owner, "joe"}}.

现在我们用几种方式来读取它。

file 模块中的函数及其解释:

函数 解释
change_group 改变文件所属的组
change_owner 改变文件的拥有者
change_time 改变文件的修改时间和上次访问时间
close 关闭文件
consolt 从文件读取Erlang术语
copy 复制文件内容
del_dir 删除目录
delete 删除文件
eval 求值文件中的Erlang表达式
format_error 返回描述字符串或错误原因
get_cwd 获取当前工作目录
list_dir 列出一个目录中的所有文件
make_dir 新建目录
make_link 新建硬链接
make_symlink 新建符号链接
open 打开文件
position 设置文件指针位置
pread 在指定位置读取文件
pwrite 在指定位置写入文件
read 从文件读取
read_file 返回整个文件
read_file_info 获取文件信息
read_link 获取链接指向的位置
read_link_info 获取链接或文件的信息
rename 重命名文件
script 求值并返回文件中的Erlang表达式
set_cwd 设置当前工作目录
sync 同步内存缓冲到硬盘
truncate 截短文件
write 写入到文件
write_file 写入整个文件
write_file_info 改变文件信息

2.1   读取文件中的所有术语

Programming Erlang 第12章笔记 接口技术

Thursday, February 21st, 2008

接口技术

译者: gashero

目录

假设我们需要以Erlang接口运行以C/Python/Shell编写的程序。想要实现这些,我们需要在一个单独的操作系统进程中运行这些程序,而不是在Erlang运行时系统中,他们之间以面向字节的通道完成通信。Erlang端通过 Port 控制。创建端口的进程成为端口的连接进程。连接进程拥有特殊意义:让所有来自扩展程序的消息都会以连接进程的PID来标志。所有的扩展程序消息都会发送到连接进程。

我们可以看看连接进程(C)与端口(P)和扩展操作系统进程的关系。

chpt12.00.jpg从程序员的角度,端口就像是Erlang进程。你可以发送消息给它,你可以注册(register)它,等等。如果扩展程序crash了,那么连接程序就会收到退出信号,如果连接进程死掉了,扩展进程也会被kill掉。

你可能会好奇与为什么要这么做。很多编程语言允许其他语言编写的程序连接到可执行文件。而在Erlang,我们为了安全性不允许这么做。如果我们连接一个扩展程序到Erlang可执行程序,那么扩展程序的错误将会轻易的干掉Erlang。所以,其他语言编写的程序必须以单独的操作系统进程来运行。Erlang运行时系统和扩展进程通过字节流通信。

1   端口

创建端口使用如下命令:

Port=open_port(PortName,PortSettings)

这会返回端口,而如下消息是会被发送到端口的(这些消息中PidC是连接进程的PID):

Port ! {PidC,{command,Data}} :发送数据到端口

Port ! {PidC,{connect,Pid1}} :改变控制进程的PID,从PidC到Pid1

Port ! {PidC,close} :关闭端口

连接进程会从扩展程序收到如下消息:

receive
    {Port,{data,Data}} ->
        ... 数据处理 ...

下面的节,我们会编写Erlang与C结合的简单例子。C程序尽可能的简单以避开细节,直接谈接口技术。

注意,下面的例子对接口机制和协议做了加亮。编码和解码复杂的数据结构是个困难的问题,这里没有谈及。在本章末尾,我们会指出一些用于其他编程语言的接口库。

2   扩展C程序的接口

我们先从C程序开始:

int twice(int x) {
    return 2*x;
}

int sum(int x, int y) {
    return x+y;
}

我们最终目标是从Erlang调用这些例程,我们希望看到的是这个样子(Erlang中):

X1=example1:twice(23),
Y1=example1:sum(45,32),

与用户的设置有关,example1是一个Erlang模块,所有与C接口的细节都被隐藏在了模块example1中。

我们的接口需要一个主程序,用来解码Erlang程序发来的数据。在我们的例子,我们首先定义端口和扩展C程序的协议。我们使用一个超级简单的协议,并展示如何在Erlang和C中实现。协议定义如下:

  • 所有包都以2字节的长度代码开头,后面跟着这些字节的数据。
  • 想要调用 twice(N) ,Erlang程序必须以特定形式编码函数调用。我们假设编码是2字节序列 [1,N] ;参数1表示调用函数 twice ,后面的N代表一个1字节的参数。
  • 调用 sum(N,M) ,我们编码请求到字节序列 [2,N,M] 。
  • 假设返回值都是单一的字节长度的。

扩展C程序和Erlang程序都必须遵守这个协议。作为例子,我们通过 sum(45,32) 来看看工作流程:

  1. 端口发送字节序列 0,3,2,45,32 到扩展程序。头两个字节0,3,表示包的长度是3,代码2表示调用扩展的 sum 函数,而 45 和 32 表示调用函数的参数。
  2. 扩展程序从标准输入(stdin)读取这5个字节,调用 sum 函数,然后把字节序列 0,2,77 写到标准输出(stdout)。前两个字节表示包长度,后面的77是结果(仍然是1字节长度)。

我们现在写在两端遵守协议的接口,先以C程序开始。

2.1   C程序

@page 215

Programming Erlang 第6章笔记 编译和运行

Thursday, February 21st, 2008

编译和运行

译者: gashero

目录

上一章并没有详细的说明如何编译和运行程序,而只是使用了Erlang shell。这对小例子是很好的,但是会让你的程序更复杂,你将会需要一个自动处理过程来简化它。我们一般使用Makefile。

事实上有三种不同的方式可以运行程序。本章将会结合特定场合来讲解三种运行方式。

有时候也会遇到问题:Makefile失败、环境变量错误或者搜索路径不对。我们在遇到这些问题时也会帮你处理这些问题(issue)。

1   启动和停止Erlang shell

在Unix系统(包括Mac OS X),可以从命令行启动Erlang shell:

$ erl
Erlang (BEAM) emulator version 5.5.1 [source] [async-threads:0] [hipe]

Eshell V5.5.1  (abort with ^G)
1>

而在Windows系统,则可以点击erl的图标。

最简单的退出方法是按下 Ctrl+C (对Windows系统则是 Ctrl+Break ),随后按下 A 。如下:

BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution
a
$

另外,你也可以对 erlang:halt() 求值以退出。

erlang:halt() 是一个BIF,可以立即停止系统,也是我最常用的方法。不过这种方法也有个问题。如果正在运行一个大型数据库应用,那么系统在下次启动时就会进入错误回复过程,所以你应该以可控的方式关闭系统。

可控的关闭系统就是如果shell可用时就输入如下:

1> q().
ok
$

这会将所有打开的文件写入磁盘,停止数据库(如果在运行),并且按照顺序关闭所有OTP应用。 q() 其实只是shell上 init:stop() 的别名。

如果这些方法都不工作,可以查阅6.6节。

2   修改开发环境

当你启动程序时,一般都是把所有模块和文件都放在同一个目录当中,并且在这个目录启动Erlang。如果这样做当然没问题。不过如果你的应用变得更加复杂时,你可能需要把它们分成便于管理的块,把代码放入不同的目录。而且你从其他项目导入代码时,扩展代码也有其自己的目录结构。

2.1   设置装载代码的搜索路径

Erlang运行时系统包含了代码自动重新载入的功能。想要让这个功能正常工作,你必须设置一些搜索路径以便找到正确版本的代码。

代码装载功能是内置在Erlang中的,我们将在E.4节详细讨论。代码的装载仅在需要时才执行。

当系统尝试调用一个模块中的函数,而这个模块还没有装载时,就会发生异常,系统就会尝试这个模块的代码文件。如果需要的模块叫做 myMissingModule ,那么代码装载器将会在当前目录的所有目录中搜索一个叫做 myMissingModule.beam 的文件。寻找到的第一个匹配会停止搜索,然后就会把这个文件中的对象代码载入系统。

你可以在Erlang shell中看看当前装载路径,使用如下命令 code:get_path() 。如下是例子:

code:get_path()
[".",
"/usr/local/lib/erlang/lib/kernel-2.11.3/ebin",
"/usr/local/lib/erlang/lib/stdlib-1.14.3/ebin",
"/usr/local/lib/erlang/lib/xmerl-1.1/ebin",
"/usr/local/lib/erlang/lib/webtool-0.8.3/ebin",
"/usr/local/lib/erlang/lib/typer-0.1.0/ebin",
"/usr/local/lib/erlang/lib/tv-2.1.3/ebin",
"/usr/local/lib/erlang/lib/tools-2.5.3/ebin",
"/usr/local/lib/erlang/lib/toolbar-1.3/ebin",
"/usr/local/lib/erlang/lib/syntax_tools-1.5.2/ebin",
...]

管理装载路径两个最常用的函数如下:

@spec code:add_patha(Dir) => true | {error,bad_directory}

添加新目录Dir到装载路径的开头

@spec code:add_pathz(Dir) => true | {error,bad_directory}

添加新目录Dir到装载路径的末尾

通常并不需要自己来关注。只要注意两个函数产生的结果是不同的。如果你怀疑装载了错误的模块,你可以调用 code:all_loaded() (返回所有已经装载的模块列表),或者 code:clash() 来帮助你研究错误所在。

code 模块中还有其他一些程序可以用于管理路径,但是你可能根本没机会用到它们,除非你正在做一些系统编程。

按照惯例,经常把这些命令放入一个叫做 .erlang 的文件到你的HOME目录。另外,也可以在启动Erlang时的命令行中指定:

> erl -pa Dir1 -pa Dir2 ... -pz DirK1 -pz DirK2

其中 -pa 标志将目录加到搜索路径的开头,而 -pz 则把目录加到搜索路径的末尾。

2.2   在系统启动时执行一系列命令

我们刚才把载入路径放到了HOME目录的 .erlang 文件中。事实上,你可以把任意的Erlang代码放入这个文件,在你启动Erlang时,它就会读取和求值这个文件中的所有命令。

假设我的 .erlang 文件如下:

io:format("Running Erlang~n").
code.add_patha(".")
code.add_pathz("/home/joe/2005/erl/lib/supported").
code.add_pathz("/home/joe/bin").

当我启动系统时,我就可以看到如下输出:

$ erl
Erlang (BEAM) emulator version 5.5.1 [source] [async-threads:0] [hipe]

Running Erlang
Eshell V5.5.1  (abort with ^G)
1>

如果当前目录也有个 .erlang 文件,则会在优先于HOME目录的。这样就可以在不同启动位置定制Erlang的行为,这对特定应用非常有用。在这种情况下,推荐加入一些打印语句到启动文件;否则你可能忘记了本地的启动文件,这可能会很混乱。

某些系统很难确定HOME目录的位置,或者根本就不是你以为的位置。可以看看Erlang认为的HOME目录的位置,通过如下的:

1> init:get_argument(home).
{ok,[["/home/joe"]]}

通过这里,我们可以推断出Erlang认为的HOME目录就是 /home/joe

3   运行程序的其他方式

Erlang程序存储在模块中。一旦写好了程序,运行前需要先编译。不过,也可以以脚本的方式直接运行程序,叫做 escript 。

下一节会展示如何用多种方式编译和运行一对程序。这两个程序很不同,启动和停止的方式也不同。

第一个程序 hello.erl 只是打印 “Hello world” ,这不是启动和停止系统的可靠方式,而且他也不需要存取任何命令行参数。与之对比的第二个程序则需要存取命令行参数。

这里是一个简单的程序。它输出 “Hello world” 然后输出换行。 “~n” 在Erlang的io和io_lib模块中解释为换行。

-module(hello).
-export([start/0]).

start() ->
    io:format("Hello world~n").

让我们以3种方式编译和运行它。

3.1   在Erlang shell中编译和运行

$ erl
...
1> c(hello).
{ok,hello}
2> hello:start().
Hello world
ok

3.2   在命令行编译和运行

$ erlc hello.erl
$ erl -noshell -s hello start -s init stop
Hello World
$

Note

快速脚本:

有时我们需要在命令行执行一个函数。可以使用 -eval 参数来快速方便的实现。这里是例子:

erl -eval 'io:format("Memory: ~p~n", [erlang:memory(total)]).'\
    -noshell -s init stop

Windows用户:想要让如上工作,你需要把Erlang可执行文件目录加入到环境变量中。否则就要以引号中的全路径来启动,如下:

"C:\Program Files\erl5.5.3\bin\erlc.exe" hello.erl

第一行 erlc hello.erl 会编译文件 hello.erl ,生成叫做 hello.beam 的代码文件。第一个命令拥有三个选项:

-noshell :启动Erlang而没有交互式shell,此时不会得到Erlang的启动信息来提示欢迎

-s hello start :运行函数 hello:start() ,注意使用 -s Mod ... 选项时,相关的模块Mod必须已经编译完成了。

-s init stop :当我们调用 apply(hello,start,[]) 结束时,系统就会对函数 init:stop() 求值。

命令 erl -noshell ... 可以放入shell脚本,所以我们可以写一个shell脚本负责设置路径(使用-pa参数)和启动程序。

在我们的例子中,我们使用了两个 -s 选项,我们可以在一行拥有多个函数。每个 -s 都会使用 apply 语句来求职,而且,在一个执行完成后才会执行下一个。

如下是启动hello.erl的例子:

#! /bin/sh
erl -noshell -pa /home/joe/2006/book/JAERANG/Book/code\
    -s hello start -s init stop

Note

这个脚本需要使用绝对路径指向包含 hello.beam 。不过这个脚本是运行在我的电脑上,你使用时应该修改。

运行这个shell脚本,我们需要改变文件属性(chmod),然后运行脚本:

$ chmod u+x hello.sh
$ ./hello.sh
Hello world
$

Note

在Windows上, #! 不会起效。在Windows环境下,可以创建.bat批处理文件,并且使用全路径的Erlang来启动(假如尚未设置PATH环境变量)。

一个典型的Windows批处理文件如下:

"C:\Program Files\erl5.5.3\bin\erl.exe" -noshell -s hello start -s init stop

3.3   以Escript运行

使用escript,你可以直接运行你的程序,而不需要先编译。

Warning

escript包含在Erlang R11B-4或以后的版本,如果你的Erlang实在太老了,你需要升级到最新版本。

想要以escript方式运行hello,我们需要创建如下文件:

#! /usr/bin/env escript

main(_) ->
    io:format("Hello world\n").

Note

开发阶段的导出函数

如果你正在开发代码,可能会非常痛苦于需要不断在导出函数列表增删函数。

一个声明 -compile(export_all) ,告知编译器导出所有函数。使用这个可以让你的开发工作简化一点。

当你完成开发工作时,你需要抓实掉这一行,并添加适当的导出列表。首先,重要的函数需要导出,而其他的都要隐藏起来。隐藏方式可以按照个人喜好,提供接口也是一样的效果。第二,编译器会对需要导出的函数生成更好的代码。

在Unix系统中,我们可以立即按照如下方式运行:

$ chmod u+x hello
$ ./hello
Hello world
$

Note

这里的文件模式在Unix系统中表示可执行,这个通过chmod修改文件属性的步骤只需要执行一次,而不是每次运行程序时。

在Windows系统中可以如下方式运行:

C:\> escript hello
Hello world
C:\>

Note

在以escript方式运行时,执行速度会明显的比编译方式慢上一个数量级。

3.4   程序的命令行参数

@page 125

Programming Erlang 第5章笔记 高级顺序编程

Thursday, February 21st, 2008

高级顺序编程

译者: gashero

目录

现在我们已经可以很好的理解顺序编程了。本章包含如下内容:

  • BIF:是 built-in function 的缩写,是包含在Erlang语言中的一部分。他们看起来像是在Erlang中写的一样,但是实际上是Erlang虚拟机实现的原始操作。
  • binary:这是一种常用的原始数据类型,高效率的内存段。
  • bit语法:模式匹配语法,用于打包和解包binary中的字段。
  • 工具箱:包含一些小专题来完成顺序编程。

一旦你掌握了本节,你就会很了解Erlang的顺序编程了,你也可以准备深入学习并行编程了。

1 BIF

BIF就是内置在Erlang中的函数。通常用于Erlang程序无法实现的功能。例如转换list到tuple或者获取当前的时间和日期。要完成这些任务时,我们就需要调用BIF。

例如BIF的 tuple/to_list/1 转换tuple到list,而 time/0 返回当前时间:

1> tuple_to_list({12,cat,"hello"}).
[12,cat,"hello"]
2> time().
{20,0,3}

所有的BIF其实是属于 erlang 模块的虽然大多数的BIF(比如tuple_to_list)是自动导入的,所以我们可以直接使用 tuple_to_list(…) 而不是 erlang:tuple_to_list(…) 。

你可以在手册页找到BIF的完整列表,或者在 http://www.erlang.org/doc/man/erlang.html

2 Binary

Binary 数据结构用以存储大量的原始数据。二进制对象存储数据具有比list和tuple更高的空间效率,而且,运行时系统也对二进制对象的输入和输出做了优化。

二进制对象书写和打印作一系列的整数或字符串,包含在双小于号和双大于号中。例如:

1> <<5,10,20>>.
<<5,10,20>>
2> <<"hello">>.
<<"hello">>

当你在二进制对象中使用整数形式时,每个数字必须在0-255的范围内。二进制对象 <<"cat">> 其实是 <<99,97,116>> 的速记形式;也就是说二进制对象使用ASCII字符构成字符串。

同字符串一样,如果二进制对象是可打印字符串,shell就会将二进制对象当作字符串打印;否则他会按照一个序列的整数来打印。

我们可以构造一个二进制对象或者解析二进制对象的元素,通过BIF,或者我们也可以使用BIF语法(查看5.3节)。在本节,只是看看使用BIF。

Note

@spec func(Arg1,…ArgN) -> Val

@spec代表什么?

这是一种Erlang类型符号,可以被转换成描述函数的文档,包括参数和返回值类型。这是一种很好的自省方式,不过对于想要包含更多细节,请参考附录A。

2.1 管理二进制对象的BIF

如下BIF可以管理二进制对象:

@spec list_to_binary(IoList) -> binary()

list_to_binary 返回一个通过参数IoList构造的二进制对象。这里的IoList是列表,其元素是0-255的整数、二进制对象或IoList:

1> Bin1=<<1,2,3>>.
<<1,2,3>>
2> Bin2=<<4,5>>.
<<4,5>>
3> Bin3=<<6>>.
<<6>>
4> list_to_binary([Bin1,1,[2,3,Bin2],4|Bin3]).
<<1,2,3,1,2,3,4,5,4,6>>

@spec split_binary(Bin,Pos) -> {Bin1,Bin2}

这个函数按照指定位置将二进制对象切割为两部分:

1> split_binary(<<1,2,3,4,5,6,7,8,9,10>>,3).
{<<1,2,3>>,<<4,5,6,7,8,9,10>>}

@spec term_to_binary(Term) -> Bin

转换Erlang术语到二进制对象。

通过 term_to_binary 产生的二进制对象存储在叫做扩展术语格式中。转换来的术语可以存储在文件中、通过网络报文发送等等,而原始的术语还可以重建。这对于在文件中或远程机器上存储复杂数据结构非常有用。

@spec binary_to_term(Bin) -> Term

这个 term_to_binary 的反函数:

1> B=term_to_binary({binaries,"are",useful}).
<<131,104,3,100,0,8,98,105,110,97,114,105,101,115,107,
0,3,97,114,101,100,0,6,117,115,101,102,117,108>>
2> binary_to_term(B).
{binaries,"are",useful}

@spec size(Bin) -> int

获取二进制对象的字节数:

1> size(<<1,2,3,4,5>>).
5

3 比特语法

比特语法是一种扩展语法用以对二进制对象中的比特序列进行模式匹配。当你编写底层的用以解包二进制对象时,你会发现比特语法非常有用。比特语法最初设计用于协议编程(Erlang很擅长的方向)和产生高效率的打包数据。

假设我们有三个变量X/Y/Z,是我们需要从16bit的内存M中提取的字段。X占用3bit,Y占用7bit,Z占用6bit。在大多数语言中都是使用底层的位操作,包括移位和掩码。而在Erlang中,你可以这么写:

M=<<X:3,Y:7,Z:6>>

完整的比特语法会稍微复杂一点,所以我们继续下一小步。首先我们看一个简单的打包和解包RGB颜色到16bit字中的例子。然后我们会深入了解比特语法表达式。最后我们看3个实际的比特语法的例子。

3.1 打包和解包16bit颜色

我们来写一个简单的例子。假设我们想要描述一个16bit的RGB颜色。我们让5bit代表红色频道、6bit代表绿色频道、5bit代表蓝色频道。(使用更多的空间给绿色是因为,人眼对绿色更敏感)。

我们可以创建16bit的内存段Mem包含单一的RGB颜色组:

1> Red=2.
2
2> Green=61.
61
3> Blue=20.
20
4> Mem=<<Red:5,Green:6,Blue:5>>.
<<23,180>>

注意在第4行我们创建了2字节的二进制对象,包含16bit,而shell打印的则是 <<23,180>>

想要解包一个字,我们编写如下模式:

5>  <<R1:5,G1:6,B1:5>>=Mem.
<<23,180>>
6> R1.
2
7> G1.
61
8> B1.
20

3.2 比特语法表达式

比特语法表达式是如下形式的:

<<>>
<<E1,E2,...,En>>

每个元素Ei指定了二进制对象的一个字段。每个元素Ei有四种格式的可能:

Ei=Value |
    Value:Size |
    Value/TypeSpecifierList |
    Value:Size/TypeSpecifierList

无论使用哪种格式,在二进制对象中的总bit数必须可以被8整除。因为二进制对象实际上只是包含了多个字节的数据,所以没法保存不是以字节为单位的数据。

当你构造一个二进制对象时,Value必须已经是确定的了,可以是字符串、或者可以生成整数、浮点数、二进制对象的表达式。当用于模式匹配操作时,Value可以是已经绑定的或者尚未绑定的变量、整数、字符串、浮点数或二进制对象。

Size必须是一个得到整数的表达式。在模式匹配中,Size必须是整数或者值为整数的变量。Size不可以是尚未绑定的变量。

Size的值指定了数据段的单元数。缺省值依赖于类型。对整数缺省为8,浮点数缺省为64,而二进制对象则对应其长度。在模式匹配时,缺省值仅对最后一个元素有效。其他所有匹配时的二进制对象元素长度必须指定。

TypeSpecifierList是以连字符分割的一列元素,形式为End-Sign-Type-Unit。任何前述元素都可以省略,元素也可以在任何顺序。如果一个元素被省略,就使用其缺省值。

TypeSpecifierList中的项目的值可以是如下:

@type End=big | little | native

(@type是Erlang的类型符号,参阅附录A)

这是指定机器的字节序,native是运行时检测,依赖于具体的CPU。缺省是big。这个仅对从二进制对象中打包和解包整数时才有用。在从不同的字节序的机器上打包和解包二进制对象中的整数时,你必须注意正确的字节序。

有些时候,当你必须确定自己理解这些时,这里有些实验可以用。测试你所在的机器,可以尝试在shell中如下输入:

1> {<<16#12345678:32/big>>,<<16#12345678:32/little>>,
    <<16#12345678:32/native>>,<<16#12345678:32>>}.
{<<18,52,86,120>>,<<120,86,52,18>>,
 <<120,86,52,18>>,<<18,52,86,120>>}

这些输出展示了编码到二进制对象的比特语法。

如果你还是无法放心,那么可以用 term_to_binary/1 来完成转换工作,随后用 binary_to_term/1 完成解包。这样就不用担心字节序的问题了。因为在tuple中总是有正确的字节序。

@type Sign=signed | unsigned

这个参数仅用于模式匹配,缺省是unsigned。

@type Type=integer | float | binary

缺省是integer

@type Unit=1 | 2 | … 255

这个段的总单位数,这个单位数必须大于等于0,而且必须是8的整倍数。

Unit的缺省值依赖于Type,如果Type是integer则为1,如果Type是binary则为8。

如果你感觉比特语法有点复杂,不要怕。让比特语法匹配还算简单。最好的实践方法是在shell中不断的尝试,直到符合要求,然后把代码复制粘贴到程序中。我就这么干的。

3.3 高级比特语法例子

学习比特语法还是略有难度的,但是好处也是巨大的。本届包含3个实际的例子。所有代码都是从现实的程序中挖出来的。

3.3.1 寻找MPEG中的同步帧

假设我们需要一个程序管理MPEG音频数据。我们可能想要使用Erlang编写流媒体服务器而需要获得MPEG音频的tag和内容描述。想要实现这些,我们需要识别出数据流中的同步帧。

MPEG音频是从一大堆帧组成的。每个帧都有他自己的头和跟随的音频信息,不过没有文件头。原理上讲,你可以把一个MPEG文件分成很多段并且分别播放。任何相关软件都需要先读取MPEG流的头信息和同步帧。

一个MPEG头部以11bit的同步帧,就是11个连续的bit组成,后面跟真描述信息,例如:

AAAAAAAA AAABBCCD EEEEFFGH IIJJKLMM
字段 意义
AAAAAAAAAAA 同步字(11bit,全是1)
BB 2bit是MPEG音频的版本号
CC 2bit是层(layer)描述
D 1bit,保护位(bit)

其他相关细节这里不关心。基本上通过A-M的值,我们就可以计算一个MPEG帧的长度了。

想要找到同步点,我们首先假设我们已经正确的定位了MPEG帧的开始。我们使用位置找到并计算帧长度。不过也有可能定位到无效的数值。假设我们已经得到了帧长度,我们就可以跳过开始的下一帧,看看下一段是否是另外一个帧头部。

想要找到同步点,我们首先假设我们已经定位了MPEG头部。我们随后计算帧长度。然后发生如下步骤:

  • 我们的假设是正确的,所以当我们向前跳过一个帧以后,我们会找到下一个MPEG头部。
  • 我们的假设是错误的,我们定位的不是以11个1开头的帧头部标志,所以无法计算帧长度。
  • 我们的假设不正确,但是我们定位了音乐数据的两个字节,看起来像是帧头部。在这种情况下,我们计算帧长度,但是当我们向前跳这个长度时却无法找到新的头部。

为了验证,我们会尝试3个连续的头部。同步帧计算程序如下:

find_sync(Bin,N) ->
    case is_header(N,Bin) of
        {ok,Len1,_} ->
            case is_header(N+Len1,Bin) of
                {ok,Len2,_} ->
                    case is_header(N+Len1+Len2,Bin) of
                        {ok,_,_} ->
                            {ok,N};
                        error ->
                            find_sync(Bin,N+1)
                    end.
                error ->
                    find_sync(Bin,N+1)
            end.
        error ->
            find_sync(Bin,N+1)
    end.

find_sync 尝试找到3个连续的MPEG帧头部。如果字节N在Bin世帧头部的开头,随后 is_header(N,Bin) 会返回 {ok,Length,Info} 。如果 is_header 返回 error ,那么N就无法指向正确的帧开始位置。我们可以在shell中做一个快速的测试来确保它工作正常:

1> {ok,Bin} = file:read("/home/joe/music/mymusic.mp3").
{ok,<<73,68,51,3,0,0,0,0,33,22,84,73,84,50,0,0,0,28, ...>>
2> mp3_sync:find_sync(Bin,1).
{ok,4256}

这里使用 file:read_file 来读取整个文件到二进制对象。现在是函数 is_header

is_header(N,Bin) ->
    @page 94

3.3.2 解包COFF数据

3.3.3 解包IPv4数据包头部