Archive for April 10th, 2008

Programming Erlang第14章Socket编程[完整]

Thursday, April 10th, 2008

Socket编程

译者: gashero

目录

大多数我写的更有趣的程序都包含了Socket。一个Socket是一个允许机器与Internet上另一端使用IP通信的端点。本章关注Internet上两种核心网络协议:TCP和UDP。

UDP允许应用发送简短报文(叫做数据报datagram)到另一端,但是对报文没有交付的担保。并且可能在到达时有错误的顺序。而TCP则提供了可靠的字节流,并且确保在连接后传输数据的顺序也是对的。

为什么Socket编程很有趣呢?因为它允许应用于其他Internet上的机器通信,而这些比本地操作更有潜力。

有两种主要的库用于Socket编程: gen_tcp 用于TCP编程、 gen_udp 用于UDP编程。

在本章,我们看看如果使用TCP和UDP socket编写客户端和服务器。我们将会尝试多种形式的服务器(并行、串行、阻塞、非阻塞)并且看看通信接口应用如何将数据流传递给其他应用。

1   使用TCP

我们学习Socket编程的历险从一个从服务器获取TCP数据的程序开始。然后我们会写一个简单的串行TCP服务器展示如何并行的处理多个并发会话。

1.1   从服务器获取数据

我们先写一个小函数(标准库的 http:request(Url) 实现相同的功能,但是这里是演示TCP的)来看看TCP socket编程获取 http://www.google.com 的HTML页面:

nano_get_url() ->
    nano_get_url("www.google.com").

nano_get_url(Host) ->
    {ok,Socket}=gen_tcp:connect(Host,80,[binary,{packet,0}]),
    ok=gen_tcp:send(Socket,"GET / HTTP/1.0\r\n\r\n"),
    receive_data(Socket,[]).

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(reverse(SoFar))
    end.

它如何工作呢?

  1. 我们通过 gen_tcp:connect 在 http://www.google.com 打开TCP协议80端口。connect的参数binary告知系统以binary模式打开socket,并且以二进制方式传递数据到应用。 {packet,0} 意味着无需遵守特定格式即可将数据传递到应用。
  2. 我们调用 gen_tcp:send 并发送消息 GET / HTTP/1.0\r\n\r\n 到socket。然后我们等待响应。响应并不会一次性得到,而是分片的、分时间的。这些分片是按照一系列报文的方式接收的,并且通过打开的socket发送到进程。
  3. 我们接收一个 {tcp,Socket,Bin} 报文。第三个参数是binary。这是因为我们已经使用二进制方式打开了socket。这是从WEB服务器发到我们的一个消息报文。我们把这个报文加到分片列表并等待下一个报文。
  4. 我们接收到 {tcp_closed,Socket} 报文,这在服务器发送完所有数据时发生(这仅在HTTP/1.0时正确,现在版本的HTTP使用另外一种结束策略)。
  5. 当我们收到了所有的分片,存储顺序是错误的,所以我们重新对分片排序和连接。

我们看看他如何工作:

1> B=socket_examples:nano_get_url().
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.se/\r\n
    Cache-Control: private\r\nSet-Cookie: PREF=ID=b57a2c:TM"...>>

Note

当运行 nano_get_url 时,结果是二进制的,而你看到的则是Erlang shell以便于阅读的方式展示的。当以便于阅读方式展示时,所有控制字符都是以转义格式显示。二进制数据也会被截短,后面以省略号显示。如果想要看所有的二进制数据,你可以通过 io:format 打印或者使用 string:tokens 分片显示:

2> io:format("~p~n",[B]).
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.se/\r\n
    Cache-Control: private\r\nSet-Cookie: PREF=ID=B57a2c:TM"
    TM=176575171639526:LM=1175441639526:S=gkfTrK6AFkybT3;
    ... 还有一大堆行 ...
>>
3> string:tokens(binary_to_list(B),"\r\n").
["HTTP/1.0 302 Found",
"Location: http://www.google.se/",
"Cache-Control: private",
... 还有一大堆行 ...

这就差不多WEB客户端的工作方式(不过在浏览器中正确的显示要做更多的工作)。上面的代码只是实验的开始。你可以尝试做一些修改来下载整个网站或者读取电子邮件。可能性是无限的。

注意分片的重新组装方式如下:

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(reverse(SoFar))
    end.

每当我们收到分片时,就把他们加入SoFar这个列表的头部。当收到了所有分片时,Socket就关闭了,我们颠倒顺序,并且把所有分片组合起来。

你可能以为收到分片后立即合并会好些:

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,list_to_binary([SoFar,Bin]));
        {tcp_closed,Socket} ->
            SoFar
    end.

这段代码是正确,但是效率比较低。因为后一种版本不断的把新的二进制数据加到缓冲区后面,也就是包含了多个数据的拷贝的。一个好办法是累积所有分片,尽管顺序是相反的,然后反序整个列表并一次连接所有分片。

1.2   一个简单的TCP服务器

在前一节,我们写了一个简单的客户端。现在我们写个服务器。

服务器开启2345端口然后等待一个消息。这个消息是包含Erlang术语的二进制数据,这个术语是包含Erlang表达式的字符串。服务器对该表达式求值并且将结果通过socket发到客户端。

Note

如何编写WEB服务器

编写WEB客户端或服务器是很有趣的。当然,有些人已经写好这些了,但是如果想要真正理解他们的工作原理,研究底层实现还是很有意义的。谁知道呢,说不定我们写的WEB服务器更好。所以我们看看如何做吧?

想要构建一个WEB服务器,任何一个需要实现标准的Internet协议,我们需要使用正确的工具和了解协议实现。

在我们的例子用来抓取一个WEB页,我们如何知道已经正确打开了80端口,并且如何知道已经发送了 GET / HTTP/1.0\r\n\r\n 到服务器?答案很简单。所有主要协议都已经在RFC文档中有描述。HTTP/1.0定义于RFC1945,所有RFC的官方网站是 http://www.letf.org

另外一个非常有用的方法是抓包。通过一个数据包嗅探器,我们可以抓取和分析所有IP数据包,无论是应用程序发出的还是接收的。大多数嗅探器包含解码器和分析器可以得出数据包的内容和格式。一个著名的嗅探器是Wireshark(以前叫Ethereal),可以到 http://www.wireshark.org/ 了解更多。

使用嗅探器和RFC武装起来的我们,就可以准备编写下一个杀手级应用了。

编写这个程序(或者其他使用TCP/IP的程序),需要响应一些简单的请求:

  • 数据如何组织,知道数据如何组成请求或者响应?
  • 数据在请求和响应中如何编码(encode & marshal)和解码(decode & demarshal)

TCP socket数据只是没有格式的字节流。在传输时,数据会切成任意长度的分片,所以我们需要多少数据如何组成请求或响应。

在Erlang的例子,我们使用简单的转换,把每个逻辑请求或响应前加上N(1/2/4)字节的长度数。这就是 {packet,N} (这里的packet表示一个应用程序请求或响应报文,而不是电线里面的物理包) 参数在 gen_tcp:connect 和 gen_tcp:listen 函数的意义。注意packet附带的那个参数在客户端和服务器端必须商定好。如果服务器使用 {packet,2} 而客户端使用 {packet,4} 则会出错。

在我们以 {packet,N} 选项打开socket后,我们就不需要担心数据分片了。Erlang驱动会自动确保数据报文的所有分片都收到并且长度正确时才发到应用程序。

另一个需要注意的是数据编码和解码。最简单时,我们可以用 term_to_binary 来对Erlang术语编码,并使用 binary_to_term 来解码数据。

注意,客户端和服务器通信的包转换和编码规则是两行代码完成,分别使用 {packet,4} 来打开socket和使用 term_to_binary 和其反函数完成编码和解码数据。

我们可以简单的打包和编码Erlang术语到基于文本的协议如HTTP或XML。使用Erlang的 term_to_binary 和其反函数可以比基于XML等文本的协议性能高出一个数量级。现在我们先看看一个简单的服务器:

start_nano_server() ->
    {ok,Listen}=gen_tcp:listen(2345,[binary,{packet,4},
                                            {reuseaddr,true},
                                            {active,true}]),
    {ok,Socket}=gen_tcp:accept(Listen),
    gen_tcp:close(Listen),
    loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Bin} ->
            io:format("Server received binary = ~p~n",[Bin]),
            Str=binary_to_term(Bin),
            io:format("Server (unpacked) ~p~n",[Str]),
            Reply=lib_misc:string2value(Str),
            io:format("Server replying = ~p~n",[Reply]),
            gen_tcp:send(Socket,term_to_binary(Reply)),
            loop(Socket);
        {tcp_closed,Socket} ->
            io:format("Server socket closed~n")
    end.

它如何工作?

  1. 首先,我们调用 gen_tcp:listen 来监听2345端口,并且设置报文转换格式为 {packet,4} ,意味着每个包有4个字节的包头,代表长度。然后 gen_tcp:listen(..) 会返回 {ok,Socket} 或者 {error,Why} ,但是我们先看看成功的情况。所以写下如下代码 {ok,Listen}=gen_tcp:listen(...), 这在程序返回 {error,…} 时发生匹配错误。如果成功则会绑定Listen到正在监听的socket。对于正在监听的socket,我们只需要做一件事,就是使用它做参数调用 gen_tcp:accept 。
  2. 现在我们调用 gen_tcp:accept(Listen) 。在这里,程序会挂起以等待连接。当我们获得连接时,这个函数返回已经绑定的Socket,这个socket就是可以与客户端连接并且可以通信的了。
  3. 当 gen_tcp:accept 返回,我们立即调用 gen_tcp:close(Listen) 。这就关闭了监听的socket,服务器也就不会继续接受新的连接了。而这不会影响已有的连接,只是针对新连接。
  4. 解码输入数据
  5. 对字符串求值
  6. 编码返回数据并且通过socket发送

注意,这个程序只接受一个请求,程序运行完成后就不会再接受其他请求了。

这是一个非常简单的服务器展示了如何打包和编码应用数据。接收请求,计算响应,发出响应,然后结束。

想要测试这个服务器,我们需要一个对应的客户端:

nano_client_eval(Str) ->
    {ok,Socket}=get_tcp:connect("localhost",2345,[binary,{packet,4}]),
    ok=gen_tcp:send(Socket,term_to_binary(Str)),
    receive
        {tcp,Socket,Bin} ->
            io:format("Client received binary = ~p~n",[Bin]),
            Val=binary_to_term(Bin),
            io:format("Client result = ~p~n",[Val]),
            gen_tcp:close(Socket)
    end.

想要测试你的代码,我们需要在一台机器上同时启动客户端和服务器。所以在 gen_tcp:connect 中的hostname参数就可以用硬编码的 localhost 。

注意客户端和服务器端使用的 term_to_binary 和 binary_to_term 怎样编码和解码数据。

想要运行,我们需要开两个终端然后启动Erlang shell。

首先,我们启动服务器:

1> socket_examples:start_nano_server().

我们看不到任何输出,当然什么也没发生呢。然后我们在另一个终端启动客户端,输入如下命令:

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4,10+20])").

在服务器端的窗口尅看到如下输出:

Server received binary = <<131,107,0,28,108,105,115,116,95,116,
                           111,95,116,117,112,108,101,40,91,50,
                           43,51,42,52,44,49,48,43,50,48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4,10+20])"
Server replying = {14,30}

在客户端窗口可以看到:

Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok

最后,在服务器窗口看到:

Server socket closed

1.3   改进服务器

前一节我们构造了一个服务器可以接受一个请求并且终止。简单的修改代码,我们可以就可以完成另一个不同类型的服务器:

  1. 序列服务器,同一时间只接受一个请求
  2. 并行服务器,同一时间可以接受多个请求

原始启动代码如下:

start_nano_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    {ok,Socket} = gen_tcp:accept(Listen),
    loop(Socket).
...

我们将会以此为基础完成另外两种服务器。

1.4   序列服务器

想要构造序列服务器,我们如下改变了代码:

start_seq_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    seq_loop(Listen).

seq_loop(Listen) ->
    {ok,Socket} = gen_tcp:accept(Listen),
    loop(Socket),
    seq_loop(Listen).

loop(...) -> %%同以前的一样处理

这会让以前的例子在需要多个请求时工作的很好。我们一直让监听的socket开着而不是关闭。另一个不同是在 loop(Socket) 结束后,我们再次调用 seq_loop(Listen) ,以便接受后面的连接请求。

如果客户端在服务器忙于处理一个已经存在的连接时尝试连接服务器,连接请求会被缓存,直到服务器完成那个已经存在的请求。如果缓存的连接数大于监听backlog,连接会被拒绝。

我们只是展示了如何开启服务器。关闭服务器很简单(停止并行服务器也一样),只要kill掉启动服务器的进程即可。 gen_tcp 连接他本身到控制进程,如果控制进程死掉了,他也就会关闭socket。

1.5   并行服务器

构造并行服务器的秘诀是每次接受新的连接以后马上用 spawn 立即生成一个新的进程:

start_parallel_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    spawn(fun() -> par_connect(Listen) end).

par_connect(Listen) ->
    {ok,Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen) end),
    loop(Socket).

loop(...) -> %%同上

这段代码与串行服务器类似。重要的不同在于 spawn ,可以确保为每个连接创建一个进程。现在可以对比两种服务器了,可以看到他是如何将一个串行服务器转变为并行服务器的。

所有这三种服务器调用 gen_tcp:listen 和 gen_tcp:accept 不同在于如何调用函数是并行方式还是串行方式。

Note

知识点:

  • 创建socket的进程(调用 gen_tcp:accept 或者 gen_tcp:connect )叫做这个socket的 控制进程 。所有来自于socket的消息都会被发送到控制进程;如果控制进程死掉了,对应的socket就会被关闭。可以修改一个socket的控制进程为NewPid,通过 gen_tcp:controlling_process(Socket,NewPid) 。

  • 我们的并行服务器可能会随着连接而创建数千个连接。我们可能希望限制最大并发连接数。这可以通过一个活动连接计数器来实现。每次获得一个新连接时就增加1,而在一个连接完成时减少1。可以用这种机制显示并发连接数。

  • 在我们接受连接后,最好明确的设置请求socket选项,如下:

    {ok,Socket} = gen_tcp:accept(Listen),
    inet:setopts(Socket,[{packet,4},binary,{nodelay,true},{active,true}]),
    loop(Socket)
  • 在Erlang R11B-3中,几个Erlang进程可以在同一个监听中的socket调用 gen_tcp:accept/1 。这也是一种建立并发服务器的简单方式,因为你尅拥有预分配的进程池,等待 gen_tcp:accept/1 。

2   控制问题

Erlang中socket可以以3种模式打开:active、active once、passive。其设置可以通过 gen_tcp:connect(Address,Port,Options) 或 gen_tcp:listen(Port,Options) 中的Options参数来设置为 {active,true|false|once} 。

如果指定了 {active,true} ,就会创建一个主动(active)的socket; {active,false} 会创建一个被动的(passive)的socket; {active,once} 创建主动的socket,但是只接受一条消息,接收到消息后,必须手动重新开启(reenable)才能继续接受消息。

我们看看在不同地方使用的区别。

active和passive的socket的区别在于消息到来时的处理方式:

  • 一旦一个active的socket被创建了,控制进程会发送收到的数据,以 {tcp,Socket,Data} 消息的形式。而控制进程无法控制消息流。一个无赖的客户端可以发送无数的消息到系统,而这些都会被发送到控制进程。而控制进程无法停止这个消息流。
  • 如果socket在passive模式,控制进程需要调用 gen_tcp:recv(Socket,N) 来获取数据。它会尝试获取N字节的数据,如果N=0,就会尽可能的拿到所有可以取得的数据。这种情况下,服务器尅通过选择是否调用 gen_tcp:recv 来控制消息流。

被动模式的socket用于控制发送到服务器的数据流。为了举例,我们可以以3种方式编写消息接收循环:

  • 主动消息获取(非阻塞)
  • 被动消息获取(阻塞)
  • 混合方式获取(部分阻塞)

2.1   主动消息获取(非阻塞)

第一个例子是以主动模式打开socket,然后接受来自socket的数据:

{ok,Listen} = gen_tcp:listen(Port,[...,{active,true}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            ... 输出处理 ...
        {tcp_closed,Socket} ->
            ...
    end.

这个过程无法控制发到服务器循环的消息流,如果客户端产生数据的速度大于服务器消费数据的速度,系统就会收到洪水般地消息-消息缓冲区溢出,系统将会crash并表现怪异。

这种类型的服务器叫做非阻塞服务器,因为它无法阻塞客户端。我们仅在信任客户端的情况下才会使用非阻塞服务器。

2.2   被动消息获取(阻塞)

在这一节,我们写阻塞服务器:服务器以被动模式打开socket,通过 {active,false} 选项。这个服务器不会被危险的客户端洪水袭击。

服务器循环中的代码调用 gen_tcp:recv 来接收数据。客户端在服务器调用 recv 之前会被阻塞。注意OS会对客户端发来的数据做一下缓冲,以允许客户端在服务器调用 recv 之前仍然可以继续发送一小段数据。

{ok,Listen} = gen_tcp:listen(Port,[...,{active,false}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    case gen_tcp:recv(Socket,N) of
        {ok,B} ->
            ... 数据处理 ...
            loop(Socket);
        {error,closed}
            ...
    end.

2.3   混合消息获取(部分阻塞)

你可能认为把被动模式用到所有服务器上都合适。不幸的是,当我们在被动模式时,我们只能等待来自于一个socket的数据。这对于需要等待多个socket来源数据的服务器则不适用。

幸运的是我们可以用混合方式,既不是阻塞的也不是非阻塞的。我们以一次主动(active once)模式 {active,once} 打开socket。在这个模式中,socket是主动的,但是只能接收一条消息。在控制进程发出一条消息之后,他必须明确的调用 inet:setopts 以便让socket恢复并接收下一条消息。系统在这发生之前会一直阻塞。这是两种世界的最好结合点。如下是代码:

{ok,Listen} = gen_tcp:listen(Port,[...,{active,once}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            ... 数据处理 ...
            %%准备好启用下一条消息时
            inet:setopts(Socket,[{active,once}]),
            loop(Socket);
        {tcp_closed,Socket} ->
            ...
    end.

使用 {active,once} 选项,用户可以实现高层次的数据流控制(有时叫交通管制),同时又防止了服务器被过多的消息洪水所淹没。

3   连接从哪里来?

假设我们建立了某种在线服务器,而却一直有人发垃圾邮件。那么我们需要做的第一件事就是确定他的来源。想要发现这件事,我们可以使用 inet:peername(Socket) 。

@spec inet:peername(Socket) -> {ok,{IP_Address,Port}} | {error,Why}

返回连接另一端的IP地址和端口号,以便服务器找到对方的地址。IP_Address是一个元组的整数形如 {N1,N2,N3,N4} ,而 {K1,K2,K3,K4,K5,K6,K7,K8} 则是IPv6的地址。这里的整数取值范围是0到255。

4   Socket的错误处理

socket的错误处理是非常简单的,基本上你不需要做任何事情。犹如前面讲过的,每个socket拥有控制进程(就是创建socket的进程)。如果控制进程死掉了,那么socket也会自动关闭。

这意味着,如果我们有,例如,一个客户端和服务器,而服务器因为编程错误死掉了。那么服务器拥有的socket会自动关闭,而客户端会收到 {tcp_closed,Socket} 消息。

我们可以通过如下的小程序测试这个机制:

error_test() ->
    spawn(fun() -> error_test_server() end),
    lib_misc:sleep(2000),
    {ok,Socket} = gen_tcp:connect("localhost",4321,[binary,{packet,2}]),
    io:format("connected to: ~p~n",[Socket]),
    gen_tcp:send(Socket,<<"123">>),
    receive
        Any ->
            io:format("Any=~p~n",[Any])
    end.

error_test_server() ->
    {ok,Listen} = gen_tcp:listen(4321,[binary,{packet,2}]),
    {ok,Socket} = gen_tcp:accept(Listen),
    error_test_server_loop(Socket).

error_test_server_loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            io:format("received:~p~n",[Data]),
            atom_to_list(Data),
            error_test_server_loop(Socket)
    end.

当我们运行它时,会看到如下输出:

1> socket_examples:error_test().
connected to:#Port<0.152>
received:<<"123">>
=ERROR REPORT==== 9-Feb-2007::15:18:15 ===
Error in process <0.77.0> with exit value:
 {badarg,[{erlang,atom_to_list,[<<3 bytes>>]},
 {socket_examples,error_test_server_loop,1}]}
Any={tcp_closed,#Port<0.152>}
ok

我们生成了一个服务器,并让它每两秒有一次启动的机会,并且发送包含二进制数据 <<"123">> 的消息。当这个消息到达服务器时,服务器会尝试计算 atom_to_list(Data) ,因为Data是一个二进制数据,所以会立即出错(系统监控器会发现并显示错误)。现在服务器端的控制进程已经死掉了,而且其socket也关闭了。客户端会收到 {tcp_closed,Socket} 消息。

5   UDP

现在我们看看UDP协议(User Datagram Protocol,用户数据报协议)。使用UDP,互联网上的机器之间可以互相发送小段的数据,叫做数据报。UDP数据报是不可靠的,这意味着如果客户端发送一系列的UDP数据报到服务器,收到的数据报顺序可能是错误的。不过收到的数据报肯定是正确的。大的数据报会被分为多个小的分片,IP协议负责重新组装这些分片,并最终交付给应用。

UDP是无连接的协议,这意味着客户端无需连接服务器即可发送消息。这也意味着程序更加适于大量客户端收发小的消息报文。

在Erlang中编写UDP客户端和服务器比TCP时更简单,因为我们无需管理连接。

5.1   简单的UDP服务器和客户端

首先,我们看看服务器,一个通用的服务器样式如下:

server(Port) ->
    {ok,Socket} = gen_udp:open(Port,[binary]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp,Socket,Host,Port,Bin} ->
            BinReply = ... ,
            gen_udp:send(Socket,Host,Port,BinReply),
            loop(Socket)
    end.

这里比TCP协议的例子更简单,因为我们至少不需要关心连接关闭的消息。注意我们以二进制方式打开socket,驱动也会以二进制数据的形式将报文发送到应用。

注意客户端。这里有个简单的客户端。它仅仅打开UDP socket,发送消息到服务器,等待响应(或超时),然后关闭socket并返回从服务器接收到的值。

client(Request) ->
    {ok,Socket} = gen_udp:open(0,[binary]),
    ok = gen_udp:send(Socket,"localhost",4000,Request),
    Value = receive
                {udp,Socket,_,_,Bin} ->
                    {ok,Bin}
                after 2000 ->
                    error
                end,
    gen_udp:close(Socket),
    Value

我们必须拥有一个超时,否则UDP的不可靠会让我们永远得不到响应。

5.2   一个UDP阶乘服务器

我们可以很容易的构造一个UDP的阶乘服务器。代码模仿前一节。

-module(upd_test).
-export([start_server/0,client/1]).

start_server() ->
    spawn(fun() -> server(4000) end).

%% 服务器
server(Port) ->
    {ok,Socket}=gen_udp:open(Port,,[binary]),
    io:format("server opened socket:~p~n",[Socket]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp,Socket,Host,Port,Bin} =Msg ->
            io:format("server received:~p~n",[Msg]),
            N=binary_to_term(Bin),
            Fac=fac(N),
            gen_udp:send(Socket,Host,Port,term_to_binary(Fac)),
            loop(Socket)
    end.

fac(0) -> 1;
fac(N) -> N*fac(N-1).

%% 客户端
client(N) ->
    {ok,Socket} = gen_upd:open(0,[binary]),
    io:format("client opened socket=~p~n",[Socket]),
    ok=gen_udp:send(Socket,"localhost",4000,term_to_binary(N)),
    Value=receive
        {udp,Socket,_,_,Bin}=Msg ->
            io:format("client received:~p~n",[Msg]),
            binary_to_term(Bin)
        after 2000 ->
            0
        end,
    gen_udp:close(Socket),
    Value

注意我增加了一些打印语句,所以我们可以看到程序执行的过程。我一般是开发阶段加很多打印语句,而在工作正常后就注释掉了。

现在让我们运行例子,首先启动服务器:

1> udp_test:start_server().
server opened socket:#Port<0.106>
<0.34.0>

这会在后台运行,所以我们发出一个客户端请求:

2> udp_test:client(40).
client opened socket=#Port<0.105>
server received:{udp,#Port<0.106>,{127,0,0,1},32785,<<131,97,40>>}
client received:{udp,#Port<0.105>,
                  {127,0,0,1},4000,
                  <<131,110,20,0,0,0,0,0,64,37,5,255,
                    100,222,15,8,126,242,199,132,27,
                    232,234,142>>}
815915283247897734345611269596115894272000000000

5.3   UDP的附加注释

我们必须注意的是UDP是无连接的协议,也就四海服务器无法拒绝客户端发送数据,甚至不知道客户端是谁。

大个的UDP报文会被切分成多个分片分别在网络上传输。分片发生在数据报长度大于最大传输单元(MTU)时,以确保通过路由器等网络设备以后仍然可以到达。一般的测量方法是开始于一个足够小的包(比如500字节),然后逐渐增加,直到发现MTU为止。如果在某一点发现数据报被丢弃了,那么,你就直到可以传输的最大报文长度了。

一个UDP数据报可以被传输两次,所以你必须小心的编码以防备这个事。因为他可能会对同一个请求的第二次出现而再做一次响应。想要防止,我们可以修改客户端代码来在每个请求中加一个唯一引用,并且检查响应中的这个唯一引用。想要生成一个唯一引用,我们可以用Erlang BIF的 make_ref ,就会生成一个全局唯一引用。远程过程调用现在可以这样写:

client(Request) ->
    {ok,Socket} = gen_udp:open(0,[binary]),
    Ref=make_ref(),
    B1=term_to_binary({Ref,Request}),
    ok=gen_udp:send(Socket,"localhost",4000,B1),
    wait_for_ref(Socket,Ref).

wait_for_ref(Socket,Ref) ->
    receive
        {udp,Socket,_,_,Bin} ->
            case binary_to_term(Bin) of
                {Ref,Val} ->
                    Val;
                {_SomeOtherRef,_} ->
                    wait_for_ref(Socket,Ref)
            end;
    after 1000 ->
        ...
    end.

6   向多台计算机广播

这里会展示如何构造一个广播通道。这些情况用到的很少,不过也许某天你可能会用到。

-module(broadcast).
-compile(export_all).

send(IoList) ->
    case inet:ifget("eth0",[broadaddr]) of
        {ok,[{broadaddr,Ip}]} ->
            {ok,S}=gen_udp:open(5010,[{broadcast,true}]),
            gen_udp:send(S,Ip,6000,IoList),
            gen_udp:close(S);
        _->
            io:format("Bad interface name, or\n"
                        "broadcasting not supported\n")
    end.

listen() ->
    {ok,S}=gen_udp:open(6000),
    loop(S).

loop(S) ->
    receive
        Any ->
            io:format("received:~p~n",[Any]),
            loop(S)
    end.

这里我们需要两个端口,一个用于发送广播,而另一个用来监听响应。我们选择5010用于发送请求而6000用于监听广播(这两个端口号没有任何联系,可以随便选择)。

一旦进程开启了端口5010用于发送广播,所有在网络里面的机器都可以调用 broadcast:listen() ,来打开端口6000监听广播消息。

broadcast:send(IoList) 广播 IoList 到所有本地网络的机器。

Note

为了实现这个,网络接口必须是正确的,而且必须支持广播。在我的iMac,例如,我使用了名字”en0″代替了”eth0″。同时注意,如果机器在不同的子网上面监听UDP报文,那么也无法送达,因为缺省的路由会丢弃UDP广播。

7   一个SHOUT广播服务器

作为本章的结束,我将会使用一个新的技巧来编写SHOUTcast服务器。SHOUTcast是由folks在Nullsoft开发的音频流数据协议( http://www.shoutcast.com/ )。SHOUTcast使用HTTP发送MP3-或者AAC-编码的音频数据。

想要看看他们如何工作,我们首先看看SHOUTcast协议。然后我们会看看整个服务器的结构。最后是代码。

7.1   SHOUTcast协议

SHOUTcast协议非常简单:

  1. 首先,客户端(如xmms、winamp或iTunes)发送HTTP请求到SHOUTcast服务器。这里有个xmms生成的请求:

    GET / HTTP/1.1
    Host: localhost
    User-Agent: xmms/1.2.10
    Icy-MetaData: 1
  2. 我的SHOUTcast服务器响应如下:

    ICY 200 OK
    icy-notice1: <BR>This stream requires <a href=http://www.winamp.com/>;Winamp</a><BR>
    icy-notice2: Erlang Shoutcast server<BR>
    icy-name: Erlang mix
    icy-genre: Pop Top 40 Dance Rock
    icy-url: http://localhost:3000
    content-type: audio/mpeg
    icy-pub: 1
    icy-metaint:24576
    icy-br: 96
    ... data ...
  3. 现在SHOUTcast服务器可以发送持续的数据流了,有如如下结构:

    FHFHFHF ...

F是正好是24576字节的MP3音乐数据(通过 icy-metaint 参数指定)。H是数据头块,由单一字节的K,随后加上16*k的数据组成。这样,最小的数据头块就是 <<0>> 。下一个数据头块则是:

<<1,B1,B2,...,B16>>

数据部分的内容则是字符串形式的 StreamTitle='...';StreamUrl='http://...'; ,剩余空间用0补满。

7.2   SHOUTcast服务器如何工作

构造服务器必须注意如下细节:

  1. 制作播放列表。我们的服务器使用一个文件来做,参见13.2节,读取ID3标签在232页。随即选取播放列表中的文件。
  2. 制作一个并行服务器,可以同时供应多个数据流。使用14.1节的并行服务器。
  3. 对每个音频文件,只发送音频数据而不发送内嵌的ID3标签。移除标签,使用 id3_tag_lengths ,已经在13.2节开发完成了。读取ID3标签在5.3节。寻找同步帧在92页。这些代码没有在这里。

7.3   SHOUTcast服务器伪码

看整个程序之前先看看不包含细节的工作流程:

start_parallel_server(Port) ->
    {ok,Listen}=gen_tcp:listen(Port,...),
    %%创建歌曲服务器
    PidSongServer=spawn(fun() -> songs() end),
    spawn(fun() -> par_connect(Listen,PidSongServer) end).

%%对每个连接生成一个进程
par_connect(Listen,PidSongServer) ->
    {ok,Socket}=gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen,PidSongServer) end),
    inet:setopts(Socket,[{packet,0},binary,{nodelay,true},
                         {active,true}]),
    get_request(Socket,PidSongServer,[]).

%%等待TCP请求
gen_request(Socket,PidSongServer,L) ->
    receive
        {tcp,Socket,Bin} ->
            ... 如果请求未完成则继续等待请求
            .... got_request(Data,Socket,PidSongServer)
        {tcp_closed,Socket} ->
            ... 在客户端中断时发生
    end.

%%获取了请求,发送响应
got_request(Data,Socket,PidSongServer) ->
    ... 分析请求数据
    gen_tcp:send(Socket,[response()]).
    play_songs(Socket,PidSongServer).

%%持续发送歌曲到客户端
play_songs(Socket,PidSongServer) ->
    ... PidSongServer维护着所有MP3文件的列表
    Song=rpc(PidSongServer,random_song),
    Header=make_header(Song),
    {ok,S}=file:open(File,[read,binary,raw]),
    send_file(1,S,Header,1,Socket),
    file:close(S),
    play_songs(Socket,PidSongServer).

send_file(K,S,Header,OffSet,Socket) ->
    ... 发送文件块到客户端
    ... 全部发送完成时返回

如果查看真实代码,你可以看到细节只有略微不同而已,机制是相同的,如下是完整代码。

-module(shout).

%% 在一个窗口中 > shout:start().
%% 另一个窗口 xmms http://localhost:3000/stream

-export([start/0]).
-import(lists,[map/2,reverse/1]).

-define(CHUNKSIZE,24576).

start() ->
    spawn(fun() ->
            start_parallel_server(3000),
            %% 现在开始睡眠,否则等待的socket会被关闭
            lib_misc:sleep(infinity)
        end).

start_parallel_server(Port) ->
    {ok,Listen}=gen_tcp:listen(Port,[binary,{packet,0},
            {reuseaddr,true},
            {active,true}]),
    PidSongServer=spawn(fun() -> songs() end),
    spawn(fun() -> par_connect(Listen,PidSongServer) end).

par_connect(Listen,PidSongServer) ->
    {ok,Socket}=gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen,PidSongServer) end),
    inet:setopts(Socket,[{packet,0},binary,{nodelay,true},{active,true}]),
    get_request(Socket,PidSongServer,[]).

get_request(Socket,PidSongServer,L) ->
    receive
        {tcp,Socket,Bin} ->
            L1=L ++ binary_to_list(Bin),
            %% 切割请求头检查
            case split(L1,[]) of
                more ->
                    %% 请求头未完成,需要更多数据
                    get_request(Socket,PidSongServer,L1);
                {Request,_Rest} ->
                    %% 头部完成
                    got_request_from_client(Request,Socket,PidSongServer)
            end;
        {tcp_closed,Socket} ->
            void;
        _Any ->
            %% 跳过
            get_request(Socket,PidSongServer,L)
    end.

split("\r\n\r\n" ++ T,L) -> {reverse(L),T};
split([H|T],L) -> split(T,[H|L]);
split([],_) -> more.

got_request_from_client(Request,Socket,PidSongServer) ->
    Cmds=string:tokens(Request,"\r\n"),
    Cmds1=map(fun(I) -> string:tokens(I," ") end,Cmds),
    is_request_for_stream(Cmds1),
    get_tcp:send(Socket,[response()]),
    play_songs(Socket,PidSongServer,<<>>).

play_songs(Socket,PidSongServer,SoFar) ->
    Song=rpc(PidSongServer,random_song),
    {File,PrintStr,Header}=unpack_song_descriptor(Song),
    case id3_tag_lengths:file(File) of
        error ->
            play_songs(Socket,PidSongServer,SoFar);
        {Start,Stop} ->
            io:format("Playing: ~p~n",[PrintStr]),
            {ok,S}=file:open(File,[read,binary,raw]),
            SoFar1=send_file(S,{0,Header},Start,Stop,Socket,SoFar),
            file:close(S),
            play_songs(Socket,PidSongServer,SoFar1)
    end.

send_file(S,Header,OffSet,Stop,Socket,SoFar) ->
    Need=?CHUNKSIZE - size(SoFar),
    Last=OffSet+Need,
    if
        Last>=Stop ->
            Max=Stop-OffSet,
            {ok,Bin}=file:pread(S,OffSet,Max),
            list_to_binary([SoFar,Bin]);
        true ->
            {ok,Bin}=file:pread(S,OffSet,Need),
            write_data(Socket,SoFar,Bin,Header),
            send_file(S,bump(Header),OffSet+Need,Stop,Socket,<<>>)
    end.

Warning

剩余代码主要是文件操作和socket发送数据部分,意义已经不大了。所以略掉。

7.4   运行SHOUTcast服务器

想要运行测试,我们需要做如下三步:

  1. 生成播放列表
  2. 开启服务器
  3. 用客户端连接服务器

7.5   构造播放列表

构造播放列表需要完成如下三步:

  1. 转到代码的目录

  2. 编辑函数 start1 的启动路径到文件 mp3_manager.erl 以便指向包含MP3文件的根目录。

  3. 编译 mp3_manager ,并执行 mp3_manager:start1() ,你可以看到如下输出:

    1> c(mp3_manager).
    {ok,mp3_manager}
    2> mp3_manager:start1().
    Dumping term to mp3data
    ok

如果你有兴趣,你可以看看文件 mp3data 来看分析结果。

7.6   启动SHOUTcast服务器

通过shell命令启动服务器:

1> shout:start().
...

7.7   测试服务器

  1. 到另外一个窗口启动音频播放器,定位到 http://localhost:3000
  2. 查看诊断输出。
  3. 享受音乐

8   深入挖掘

本章只是讲解了操作Socket的基本操作。你可以查阅更多关于Socket接口的文档,包括 gen_tcpgen_udpinet