Archive for April, 2008

sendpkt已经更新到winpcap4.0.1了

Monday, April 28th, 2008

sendpkt已经更新到winpcap4.0.1了

因为一直崇尚“最新的软件不一定是最好的”,所以向来不适用最新版本的各类软件,不过终于在所有朋友的催促之下,将winpcap由3.1升级到4.0.1了。并且提供了sendpkt的新windows二进制发行版。

sendpkt是提供win32/linux兼容接口的发包函数库。倪补了长久以前python只能抓包不能发包的问题,另外同时提供的兼容接口,也提高了程序的可移植性和适用范围。

最近因为给朋友帮忙封装了供lua使用的winpcap,现在已经积累了好多东西了,可以考虑以后把一些常用的组包和ARP查询等函数一起加入提供方便。再者,就是仔细考虑一下自己封装一个供win32使用的使用mingw编译的libnet包。毕竟libnet总比winpcap自带的那个要专业许多,也省事许多。

至于解包,我现在还没找到比较合适的库,一直自己写呢,现在已经完善而稳定的包含ethernet、arp、ip包三种,tcp包几天内就要开始写了。

http://sendpkt.googlecode.com

我要重写pypcap

Monday, April 28th, 2008

我要重写pypcap

接触信息安全由来已久,因为一直用python比较多,于是刚开始玩抓包时也是用pypcap的。开始时很方便,发现原来生活可以这么美好,抓包原来可以放在循环里的。后来,问题一点点出来了。列举一下,当然这里也有我对pcap知识不够全面的问题:

1、在不同机器上setnonblock后效果不同,有些机器上设置以后可以支持Ctrl+C的中止,而有些机器上则直接进入死循环,死得很惨。
2、pcap_next_ex()函数丢失了,这个可是用于支持可超时抓包的重要工具。
3、建立pcap对象时无法指定超时,同样很好理解,不过这里也顺便把pcap_dispatch()函数搞得很惨。

pypcap最初是用pyrex写的,很好的避免了很多复杂性,不过同样的问题是丢失了很多细节控制。最终的发行版本已经是使用pyrex转换后的C代码了。

直到最近我给lua加上pcap的支持模块时,才开始真正接触pcap的更多细节,这时候恍然大悟。我在pypcap中郁闷好久的功能,其实pcap本身已经提供了,只是被pypcap给忽略掉了。唉,以前我还经常嘲笑有IDE综合症的人呢,现在自己也不小心落入了这个陷阱。

Python的产品环境部署

Wednesday, April 23rd, 2008

Python的产品环境部署

作者: gashero

目录

1   Python的版本及安装

产品环境最常用的Python运行环境是Python 2.4.x系列,至今最新版本是2.4.4。而Centos4.x默认附带的Python为2.3.x系列,不支持某些Python2.4.x的功能和第三方插件。需要重新安装。

1.1   Python 2.4.4的安装

首先得到Python2.4.4的安装包 Python-2.4.4.tar.bz2 ,解压/编译/安装的步骤与一般的tarball方式相同:

$ su
# tar xjvf Python-2.4.4.tar.bz2
# cd Python-2.4.4
# ./configure
# make
# make install

这样的过程最终会把Python安装到 /usr/local/bin 下。

1.2   修改选择的Python版本

经过这样的编译安装之后,系统中会同时有两个版本的Python存在。默认情况下,会使用新安装的Python-2.4.4。不过对于某些运行方 式,如crontab,则会调用旧版本的。所以应该删除所有python->python2.3的符号链接,并且建立python2.4-> python的符号链接。如果有直接命名为python的Python2.3版本,那么直接删除即可。

查找所有可以被$PATH索引到的Python可执行文件版本:

$ whereis python

2   安装普通的Python第三方模块

一般的纯Python第三方模块的安装过程如下,假设模块名称为 XXX-x.y.z.tar.gz

$ tar xzvf XXX-x.y.z.tar.gz
$ cd XXX-x.y.z
$ su
# python setup.py install
... ...
# exit
$ python
>>> import XXX
>>>

如上过程已包含测试,即模块的导入成功。

3   安装特别的扩展模块

有些特别的扩展模块需要特别的安装方式。

3.1   MySQLdb

Python连接MySQL数据库的扩展模块。需要在机器上已经安装了MySQL的客户端开发包,包含已经安装的源码。Python还需要可以找到MySQL客户端的配置时(configure)的配置文件。

安装过程如下(已忽略解压缩过程):

# python setup.py config
# python setup.py build
# python setup.py install

4   通过 easy_install 安装扩展模块

这是PEAK开发的新的Python扩展包方式,使用方式还不是很广泛,但是有些模块必须使用他来安装。比如 MySQLdb-1.2.2 ,在使用相同的安装命令时,内部就会调用 easy_install 来安装。

安装命令与默认的方式相同。

另一种安装方式:先下载 easy_install 然后通过他在线安装Python扩展模块,有如ubuntu的apt-get安装方式,不需要先下载好模块的文件。这种方式需要先下载 ez_setup.py ,地址如下 http://peak.telecommunity.com/dist/ez_setup.py

下载后执行该模块即可在线安装最新版本的 easy_install

# python ez_setup.py

通过这种方式就安装成功了 easy_install ,之后可用如下方式安装扩展模块 XXX

# easy_install XXX

比如安装MySQLdb:

# easy_install MySQLdb

一般不推荐使用这种方式安装,因为所有的模块都是必须在线安装,产品环境的很多服务器是不允许上网的。另外,这种方式并不方便控制需要安装的模块的版本。这种版本差异可能是致命的,所以必须非常严格的控制。

StacklessPython简要笔记

Wednesday, April 23rd, 2008

StacklessPython简要笔记

作者: gashero

目录

1   微进程tasklet

stackless的基本构成单元,一种可调度对象。

1.1   建立微进程对象

示例:

import stackless
def print_x(x):
    print x

stackless.tasklet(print_x)('one')
stackless.run()

微进程建立后并不运行,直到调用 stackless.run() 才开始运行。

1.2   调度

微进程执行的函数内部可以调用 stackless.schedule() 来暂停当前微进程而把执行权交给执行队列的下一个微进程。

这也是在stackless中实现协程的基本方式。

1.3   轻量级进程

class HackySacker:
    def __init__(self):
        #todo...
        self.channel=stackless.channel()
        stackless.tasklet(self.messageLoop)()
    def messageLoop(self):
        while True:
            message=self.channel.receive()
            if message=='exit':
                return
            #todo...
            stackless.schedule()    #这里注意要调度一下

执行速度比线程的更快,而且可以允许10000个微进程而不死掉。

2   通道channel

用于微进程之间传递信息和控制微进程的运行流程。可以替代传统线程程序中的Queue模块的互斥队列。

2.1   交换信息

示例:

import stackless
channel=stackless.channel()
def recv_tasklet():
    msg=channel.receive()
    print msg
def send_tasklet():
    channel.send('hello')

调用 channel.receive() 时会阻塞当前微进程,等待消息到来。调用 channel.send() 时也会阻塞,直到有微进程读取了消息。

2.2   任务分发

如果一个入口微进程将消息源源不断的发送到通道,而多个工作者微进程等待接受消息,那么每个消息只能被一个微进程所接受,而不是被所有接受。

2.3   事件管理器

使用channel实现的一个基类,用于事件的等待与处理。

class EventHandler:
    def __init__(self,*outputs):
        if outputs==None:
            self.outputs=[]
        else:
            self.outputs=list(outputs)
        self.channel=stackless.channel()
        stackless.tasklet(self.listen)()
    def listen(self):
        while True:
            val=self.channel.receive()
            self.processMessage(val)
            for output in self.outputs:
                self.notify(output)
    def processMessage(self,val):
        pass
    def notify(self,output):
        pass
    def registerOutput(self,output):
        self.outputs.append(output)
    def __call__(self,val):
        self.channel.send(val)

功能:

  1. 通过 listen 方法,持续的监听通道上传来的消息。
  2. 通过 processMessage 处理收到的消息。
  3. 通过 notify 方法将收到的结果发送到输出端。
  4. registerOutput 可以添加新加的输出端。
  5. __call__ 可以方便的用对象后加参数来给它发消息。

3   协程coroutine

使用通道实现,两个函数都在循环中等待对方发来的消息。

3.1   乒乓球的例子

import stackless
ping_channel=stackless.channel()
pong_channel=stackless.channel()
def ping():
    while ping_channel.receive():   #在此阻塞
        #todo...
        pong_channel.send('from ping')
def pong():
    while pong_channle.receive():
        #todo...
        ping_channel.send('from pong')
stackless.tasklet(ping)()
stackless.tasklet(pong)()
stackless.tasklet(ping_channel.send)('startup') #启动循环的微进程
stackless.run()

在Twisted中使用线程

Wednesday, April 16th, 2008

在Twisted中使用线程

译者: gashero

目录

1   以线程安全的模式运行代码

Twisted中的大部分代码都不是线程安全的。例如protocol向transport写入数据就不是线程安全的。因此我们需要一种方法来在主事件循环中进行调度。者可以使用函数 twisted.internet.interfaces.IReactorThreads.callFromThread 来实现:

from twisted.internet import reactor

def notThreadSafe(x):
    """做一些非线程安全的事情"""
    # ...

def threadSafeScheduler():
    """以线程安全方式运行"""
    reactor.callFromThread(notThreadSafe,3) #将会运行notThreadSafe(3)在主时间循环中

Note

译者注

callFromThread 意指从线程调用,这个方法是供线程调用的,并且使其指定的函数加入到主事件循环中执行。比如worker线程可以调用此方法将提交结果的函数加入到主事件循环中。这样就可以确保多线程的运行worker,而有可以使用线程安全的方式提交结果。

2   在线程中运行代码

有时我们希望在线程中运行代码,比如阻塞的存取API。Twisted提供了这样做的方法在 IReactorThread API 中。附加的工具在包 twisted.internet.threads 中提供。这些方法允许我们把任务排队以后在线程池中运行。

例如,在线程中运行一个函数,我们可以:

from twisted.internet import reactor

def aSillyBlockingMethod(x):
    import time
    time.sleep(2)
    print x

# 在线程中运行
reactor.callInThread(aSillyBlockingMethod,"2 secodns have passed")

Note

译者注

callInThread 意指在线程中运行,调用该方法需要在主事件循环中,而执行其传入的函数则是在线程中。可以与上一节提供的 callFromThread`结合使用,即在worker线程函数中调用 `callFromThread 提交结果。

3   工具函数

工具函数作为 twisted.internet.reactor 的一部分API提供,但是并不是在 twisted.internet.threads 中实现的。

如果我们有多个方法需要在线程中以队列方式运行,我们可以做:

from twisted.internet import threads

def aSillyBlockingMethodOne(x):
    import time
    time.sleep(2)
    print x

def aSillyBlockingMethodTwo(x):
    print x

# 排队后在线程中运行两个方法
commands=[(aSillyBlockingMethodOne,["calling first"])]
commands.append((aSillyBlockingMethodTwo,["and the second"],{}))
threads.callMultipleInThread(commands)

如果我们希望得到函数的运行结果,那么我们可以使用Deferred:

from twisted.internet import threads

def doLongCalculation():
    # ... do long calculation here ...
    return 3

def printResult(x):
    print x

# 在线程中运行,并且通过 defer.Deferred 获取结果
d=threads.deferToThread(doLongCalculation)
d.addCallback(printResult)

如果你希望在reactor线程中调用一个方法,并且获取结果,你可以使用 blockingCallFromThread

from twisted.internet import threads,reactor,defer
from twisted.web.client import getPage
from twisted.web.error import Error

def inThread():
    try:
        result=threads.blockingCallFromThread(reactor,getPage,"http://twistedmatrix.com/")
    except Error,exc:
        print exc
    else:
        print result
    reactor.callFromThread(reactor.stop)

reactor.callInThread(inThread)
reactor.run()

blockingCallFromThread 将会返回对象或者抛出异常,或者通过抛出到传递给他的函数。如果传递给它的函数返回了一个Deferred,他会返回Deferred回调的值或者抛出异常到errback。

4   管理线程池

线程池是在 twisted.python.threadpool.ThreadPool 中实现的。

我们可以修改线程池的大小,增加或者减少可用线程的数量,可以这么做:

from twisted.internet import reactor
reactor.suggestThreadPoolSize(30)

缺省的线程池大小依赖于使用的reactor,缺省的reactor使用最小为5个,最大为10个。在你改变线程池尺寸之前,确保你理解了线程和他们的资源使用方式。

在Twisted2.5.0中使用线程

译者: gashero

刚才翻译了对应版本8.0.0的Twisted的线程指南,但是我还是在用2.5.0,所以这里只记录与8.0.0的差异,不做重新翻译。

当你开始使用线程之前,确保你在启动程序的时候使用了如下:

from twisted.python import threadable
threadable.init()

这回让Twisted以线程安全的方式初始化,不过仍然注意,Twisted的大部分仍然不是线程安全的。

以线程安全的方式运行代码中初始化多了两行:

from twisted.python import threadable
threadable.init(1)

2.5.0的文档中没有 blockingCallFromThread 的例子。也许根本就没有这个方法。

实际我下载文档的版本是2.4.0,不过应该与2.5.0一样的。

Programming Erlang 第12章 接口技术 (完整)

Tuesday, April 15th, 2008

接口技术

译者: gashero

目录

假设我们需要以Erlang接口运行以C/Python/Shell编写的程序。想要实现这些,我们需要在一个单独的操作系统进程中运行这些程序,而不是在Erlang运行时系统中,他们之间以面向字节的通道完成通信。Erlang端通过 Port 控制。创建端口的进程成为端口的连接进程。连接进程拥有特殊意义:让所有来自扩展程序的消息都会以连接进程的PID来标志。所有的扩展程序消息都会发送到连接进程。

我们可以看看连接进程(C)与端口(P)和扩展操作系统进程的关系。

chpt12.00.jpg从程序员的角度,端口就像是Erlang进程。你可以发送消息给它,你可以注册(register)它,等等。如果扩展程序crash了,那么连接程序就会收到退出信号,如果连接进程死掉了,扩展进程也会被kill掉。

你可能会好奇与为什么要这么做。很多编程语言允许其他语言编写的程序连接到可执行文件。而在Erlang,我们为了安全性不允许这么做。如果我们连 接一个扩展程序到Erlang可执行程序,那么扩展程序的错误将会轻易的干掉Erlang。所以,其他语言编写的程序必须以单独的操作系统进程来运行。 Erlang运行时系统和扩展进程通过字节流通信。

1   端口

创建端口使用如下命令:

Port=open_port(PortName,PortSettings)

这会返回端口,而如下消息是会被发送到端口的(这些消息中PidC是连接进程的PID):

Port ! {PidC,{command,Data}} :发送数据到端口

Port ! {PidC,{connect,Pid1}} :改变控制进程的PID,从PidC到Pid1

Port ! {PidC,close} :关闭端口

连接进程会从扩展程序收到如下消息:

receive
    {Port,{data,Data}} ->
        ... 数据处理 ...

下面的节,我们会编写Erlang与C结合的简单例子。C程序尽可能的简单以避开细节,直接谈接口技术。

注意,下面的例子对接口机制和协议做了加亮。编码和解码复杂的数据结构是个困难的问题,这里没有谈及。在本章末尾,我们会指出一些用于其他编程语言的接口库。

2   扩展C程序的接口

我们先从C程序开始:

int twice(int x) {
    return 2*x;
}

int sum(int x, int y) {
    return x+y;
}

我们最终目标是从Erlang调用这些例程,我们希望看到的是这个样子(Erlang中):

X1=example1:twice(23),
Y1=example1:sum(45,32),

与用户的设置有关,example1是一个Erlang模块,所有与C接口的细节都被隐藏在了模块example1中。

我们的接口需要一个主程序,用来解码Erlang程序发来的数据。在我们的例子,我们首先定义端口和扩展C程序的协议。我们使用一个超级简单的协议,并展示如何在Erlang和C中实现。协议定义如下:

  • 所有包都以2字节的长度代码开头,后面跟着这些字节的数据。
  • 想要调用 twice(N) ,Erlang程序必须以特定形式编码函数调用。我们假设编码是2字节序列 [1,N] ;参数1表示调用函数 twice ,后面的N代表一个1字节的参数。
  • 调用 sum(N,M) ,我们编码请求到字节序列 [2,N,M] 。
  • 假设返回值都是单一的字节长度的。

扩展C程序和Erlang程序都必须遵守这个协议。作为例子,我们通过 sum(45,32) 来看看工作流程:

  1. 端口发送字节序列 0,3,2,45,32 到扩展程序。头两个字节0,3,表示包的长度是3,代码2表示调用扩展的 sum 函数,而 45 和 32 表示调用函数的参数。
  2. 扩展程序从标准输入(stdin)读取这5个字节,调用 sum 函数,然后把字节序列 0,2,77 写到标准输出(stdout)。前两个字节表示包长度,后面的77是结果(仍然是1字节长度)。

我们现在写在两端遵守协议的接口,先以C程序开始。

2.1   C程序

扩展的C程序需要编写3个文件:

  • example1.c :这个包含了我们需要调用的原始函数。
  • example1_driver.c :实现了字节流协议和对 example1.c 的调用例程。
  • erl_comm.c :包含读写内存缓冲区的例程。

example1_driver.c

#include <stdio.h>
typedef unsigned char byte;

int read_cmd(byte* buff);
int write_cmd(byte* buff, int len);

int main() {
    int fn, arg1, arg2, result;
    byte buff[100];

    while(read_cmd(buff)>0) {
        fn=buff[0];
        if (fn==1) {
            arg1=buff[1];
            result=twice(arg1);
        } else if (fn==2) {
            arg1=buff[1],
            arg2=buff[2],
            //如果希望调试,可以写到stderr
            //fprintf(stderr,"call sum %i %i\n",arg1,arg2);
            result=sum(arg1,arg2);
        }
        buff[0]=result;
        write_cmd(buff,1);
    }
}

这段代码运行一个无限循环,从标准输入stdin读取调用命令,并且将结果写入到标准输出。

如果希望调试,可以写到stderr,上面的例子里面就有相关的输出语句。

erl_comm.c 是对stdin和stdout读写带有2字节包头的数据包的代码。这么写是允许有分片的。

#include <unistd.h>
typedef unsigned char byte;

int read_cmd(byte* buf);
int write_cmd(byte* buf, int len);
int read_exact(byte* buf, int len);
int write_exact(byte* buf, int len);

int read_cmd(byte* buf) {
    int len;
    if (read_exact(buf,2)!=2)
        return(-1);
    len=(buf[0]<<8) | buf[1];
    return read_exact(buf,len);
}

int write_cmd(byte* buf, int len) {
    byte li;
    li=(len>>8) & 0xff;
    write_exact(&li,1);
    li=len & 0xff
    write_exact(&li,1);
    return write_exact(buf,len);
}

int read_exact(byte* buf, int len) {
    int i, got=0;
    do {
        if ((i=read(0,buf+got,len-got)<=0)
            return i;
        got+=i;
    } while (got<len);
    return len;
}

int write_exact(byte* buf, int len) {
    itn i,wrote=0;
    do {
        if ((i=write(1,buf+wrote,len-wrote)<=0)
            return i;
        wrote+=i;
    } while(wrote<len);
    return len;
}

这段代码处理带有2字节长度包头的数据包,所以用于匹配 {packet,2} 选项的端口驱动程序。

2.2   Erlang程序

Erlang端的驱动程序参考如下:

-module(example1).
-export([start/0,stop/0]).
-export([twice/1,sum/2]).

start() ->
    spawn(fun() ->
            register(example1,self()),
            process_flag(trap_exit,true),
            Port=open_port({spawn,"./example1"},[{packet,2}]),
            loop(Port)
        end).

stop() ->
    example1 ! stop.

twice(X) -> call_port({twice,X}).
sum(X,Y) -> call_port({sum,X,Y}).

call_port(Msg) ->
    example1 ! {call,self(),Msg},
    receive
        {example1,Result} ->
            Result
    end.

loop(Port) ->
    receive
        {call,Caller,Msg} ->
            Port ! {self(),{command,encode(Msg)}},
            receive
                {Port,{data,Data}} ->
                    Caller ! {example1,decode(Data)}
            end,
            loop(Port);
        stop ->
            Port ! {self(),close},
            receive
                {Port,closed} ->
                    exit(normal)
            end;
        {'EXIT',Port,Reason} ->
            exit({port_terminated,Reason})
    end.

encode({twice,X}) -> [1,X];
encode({sum,X,Y}) -> [2,X,Y];

decode([Int]) -> Int.

端口通过如下方式打开:

Port=open_port({spawn,"./example1"},[{packet,2}])

选项 {packet,2} 告诉系统自动在发送数据包时添加一个2字节的包头。所以,会在发送消息 {PidC,{command,[2,45,32]}} 自动加上两字节的长度包头,也就是发送 0,3,2,45,32 到扩展程序。

而输入端口会假设输入数据包也有2字节的长度包头,获取指定长度的数据包之后就会去掉长度包头。

完善程序还需要一个makefile来构建。命令 make example1 用于构建 open_port 调用的扩展程序。注意makefile同时也链接了本章稍后会用到的内联驱动。

makefile

.SUFFIXES: .erl .beam .yrl

.erl.beam:
    erlc -W $<

MODS=example1 example1_lid.c

all: ${MODS:%=%.beam} example1 example1_drv.so

example1: example1.c erl_comm.c example1_driver.c
    gcc -o example1 example1.c erl_comm.c example1_driver.c

example1_drv.so: example1_lid.c example.c
    gcc -o example1_drv.so -fpic -shared example1.c example1_lid.c

clean:
    rm example example1_drv.so *.beam

2.3   运行程序

现在运行程序:

1> example1:start().
<0.32.0>
2> example1:sum(45,32).
77
4> example1:twice(10).
20
...

这就完成了我们的第一个例子。

进入下个主题之前,必须注意的是:

  1. 例子程序没有尝试统一Erlang和C程序的整数。这里我们假设整数都是单字节的,并且忽略了精度和符号的问题。在现实的应用中必须注意类型和精度的问题。这是个困难的问题,因为erlang管理着不限制大小的整数,而像C一类的语言却必须管理整数的精度等等。
  2. 我们无法在运行扩展程序之前调用对应的erlang的函数(也就是运行 example1:start() 之前不能调用对应函数)。我们更希望可以自动的启动。这在18.7节会详细讲解。

3   open_port

前一节使用了 open_port 却没有介绍具体的参数。只是使用了增加了 {packet,2} 选项的 open_port 而已。 open_port 包含很多参数。

一些常用参数如下:

@spec open_port(PortName,[Opt]) -> Port

PortName是如下之一:

{spawn,Command}

启动一个扩展程序。Command是扩展程序的名字。Command会在Erlang的工作空间以外工作,除非找到了叫做Command的内联驱动。

{fd,In,Out}

允许Erlang进程存取一个已经打开的文件描述符。文件描述符 “In” 用作stdin,而文件描述符 “Out” 用作stdout。查看例子 http://www.erlang.org/examples/examples-2.0.html

Opt是如下之一:

{packet,N}

包前面加上N(1,2,4)字节长度的长度包头。

stream

消息不是按照包长度发送的。应用自己知道如何处理这些包。

{line,Max}

以行为单位传递消息。如果一行大于Max字节,则会切割为只有Max字节。

{cd,Dir}

仅用于 {spawn,Command} 选项,扩展程序的初始目录为Dir。

{env,Env}

仅用于 {spawn,Command} 选项。指定扩展程序可用的环境变量为Env。Env是一个列表的 {VarName,Value} 对。两个变量都是字符串。

这并不是完整的 open_port 参数列表。我们可以在参考手册的erlang模块里找到详细的描述。

4   内联驱动

有时候我们希望在Erlang运行时系统内运行一个外语程序。在这种情况下,程序以共享库的方式编写并动态链接到Erlang运行时系统。内联驱动对程序员来说就是一个端口程序,而且与端口程序使用相同的协议。

创建内联驱动是最有效率的工作方式,不过却很危险。任何致命错误都会干掉整个Erlang系统,并且影响到正在工作的所有进程。因为这个原因,非常不推荐内联驱动。

通过刚才的例子讲讲内联驱动。你需要三个文件:

  1. example1_lid.erl :这个是erlang的服务器。
  2. example1.c :包含我们需要调用的C函数,跟上一节的一样。
  3. example1_lid.c :调用 example1.c 中好函数的函数。

Erlang管理接口的代码如下:

-module(example1_lid).
-export([start/0,stop/0]).
-export([twice/1,sum/2]).

start() ->
    start("example1_drv").

start(SharedLib) ->
    case erl_ddl1:load_driver(".",SharedLib) of
        ok -> ok;
        {error,already_loaded} -> ok;
        _ -> exit({error,could_not_load_driver})
    end,
    spawn(fun() -> init(SharedLib) end).

init(SharedLib) ->
    register(example1_lid,self()),
    Port=open_port({spawn,SharedLib},[]),
    loop(Port).

stop() ->
    example1_lid ! stop.

twice(X) -> call_port({twice,X}).
sum(X,Y) -> call_port({sum,X,Y}).

call_port(Msg) ->
    example1_lid ! {call,self(),Msg},
    receive
        {example1_lid,Result} ->
            Result
    end.

loop(Port) ->
    receive
        {call,Caller,Msg} ->
            Port ! {self(),{command,encode(Msg)}},
            receive
                {Port,{data,Data}} ->
                    Caller ! {example1_lid,decode(Data)}
            end,
            loop(Port);
        stop ->
            Port ! {self(),close},
            receive
                {Port,closed} ->
                    exit(normal)
            end;
        {'EXIT',Port,Reason} ->
            io:format("~p~n",[Reason]),
            exit(port_terminated)
    end.

encode({twice,X}) -> [1,X];
encode({sum,X,Y}) -> [2,X,Y].

decode([Int]) -> Int.

对比该程序与较早的版本,看起来基本相同。

驱动程序由 driver 结构体组成。命令 make example1_drv.so 用于构建动态链接库。

// example1_lid.c
#include <stdio.h>
#include "erl_driver.h"

typedef struct {
    ErlDrvPort port;
} example_data;

static ErlDrvData example_drv_start(ErlDrvPort port, char* buff) {
    example_data* d=(example_data*)driver_alloc(sizeof(example_data));
    d->port=port;
    return (ErlDrvData)d;
}

static void example_drv_stop(ErlDrvData handle) {
    driver_free((char*) handle);
}

static void example_drv_output(ErlDrvData handle, char* buff, int bufflen) {
    example_data* d=(example_data*)handle;
    char fn=buff[0], arg=buff[1], res;
    if (fn==1) {
        res=twice(arg);
    } else if (fn==2) {
        res=sum(buff[1],buff[2]);
    }
    driver_output(d->port,&res,1);
}

ErlDrvEntry example_driver_entry={
    NULL,                   // F_PTR init, N/A
    example_drv_start,      // L_PTR start, 在端口打开时调用
    example_drv_stop,       // F_PTR stop, 在端口关闭时调用
    example_drv_output,     // F_PTR output, 当erlang发送数据到端口时
    NULL,                   // F_PTR ready_input
    NULL,                   // F_PTR ready_output
    "example1_drv",         // 驱动的名字
    NULL,                   // F_PTR finish, 卸载(unload)时
    NULL,                   // F_PTR control, port_command回调
    NULL,                   // F_PTR timeout, 保留
    NULL                    // F_PTR outputv, 保留
};

DRIVER_INIT(example_drv) {  //必须与driver_entry中的名字匹配
    return &example_driver_entry;
}

这里时运行程序:

1> c(example_lid).
{ok,example1_lid}
2> example1_lid:start().
<0.41.0>
3> example1_lid:twice(50).
100
4> example1_lid:sum(10,20).
30

5   注意

本章研究了如何以端口来调用扩展程序。除了使用端口协议之外,还可以使用一些其他的BIF来操作端口。这些都在erlang模块的手册中。

在这一点上,你可能会关心如何方便的发送复杂的数据结构,如字符串、元组等等到扩展程序?很不幸,答案是还没有比较简单的方法来实现。所有的端口只 是提供了很底层的机制来传输一系列字节而已。同样的问题也发生在Socket编程中。一个Socket提供了数据流与应用交互,对于数据流的解释还需要应 用自己做。

不过一些库提供了简单的交互方法,可以查看:

http://www.erlang.org/doc/pdf/erl_interface.pdf

Erl interface (ei) 是一系列的C例程和宏,用于编码和解码Erlang的扩展格式。在Erlang端,一个程序使用 term_to_binary 来将Erlang术语串行化,而在C端ei的例程可以把binary解包。ei也可以用于构造binary,而在Erlang使用 binary_to_term 来解包。

http://www.erlang.org/doc/pdf/ic.pdf

Erlang IDL Compiler (ic) 。这是一个Erlang的OMG IDL编译器实现。

http://www.erlang.org/doc/pdf/jinterface.pdf

Jinterface 是一系列Java到Erlang的接口工具。它提供了Erlang类型到Java类型的完全映射,编码和解码Erlang术语,连接Erlang进程等等。还有一大堆的扩展功能。

Erlang的语法,一个陷阱

Monday, April 14th, 2008

最近正在用Erlang写MySQL的通信协议,以前用Python实现的一个性能是非常高的,所以这次做系统也想使用这个协议做通信。

写着的时候发现个问题,这里有个bit语法的赋值语句:

BinFC=<<Number:2>>,

本想给变量BinFC赋值为一个binary类型,但是编译时一直提示出错,后来才想起来。符号”=<“在erlang中是小于等于的意思,而这样写就会先识别成一个布尔表达式,然后就出错了。

我的习惯是并不在操作符之间留空格,这次终于被教训了。不过也确实与Erlang这种古怪的风格有关。其他语言中的小于等于都是用”<=”的,偏偏erlang必须要使用”=<“,而且,又有这个binary语法。

唉……,继续写。

Programming Erlang 第6章 编译和运行(完整)

Monday, April 14th, 2008

编译和运行

译者: gashero

目录

上一章并没有详细的说明如何编译和运行程序,而只是使用了Erlang shell。这对小例子是很好的,但是会让你的程序更复杂,你将会需要一个自动处理过程来简化它。我们一般使用Makefile。

事实上有三种不同的方式可以运行程序。本章将会结合特定场合来讲解三种运行方式。

有时候也会遇到问题:Makefile失败、环境变量错误或者搜索路径不对。我们在遇到这些问题时也会帮你处理这些问题(issue)。

1   启动和停止Erlang shell

在Unix系统(包括Mac OS X),可以从命令行启动Erlang shell:

$ erl
Erlang (BEAM) emulator version 5.5.1 [source] [async-threads:0] [hipe]

Eshell V5.5.1  (abort with ^G)
1>

而在Windows系统,则可以点击erl的图标。

最简单的退出方法是按下 Ctrl+C (对Windows系统则是 Ctrl+Break ),随后按下 A 。如下:

BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution
a
$

另外,你也可以对 erlang:halt() 求值以退出。

erlang:halt() 是一个BIF,可以立即停止系统,也是我最常用的方法。不过这种方法也有个问题。如果正在运行一个大型数据库应用,那么系统在下次启动时就会进入错误回复过程,所以你应该以可控的方式关闭系统。

可控的关闭系统就是如果shell可用时就输入如下:

1> q().
ok
$

这会将所有打开的文件写入磁盘,停止数据库(如果在运行),并且按照顺序关闭所有OTP应用。 q() 其实只是shell上 init:stop() 的别名。

如果这些方法都不工作,可以查阅6.6节。

2   修改开发环境

当你启动程序时,一般都是把所有模块和文件都放在同一个目录当中,并且在这个目录启动Erlang。如果这样做当然没问题。不过如果你的应用变得更加复杂时,你可能需要把它们分成便于管理的块,把代码放入不同的目录。而且你从其他项目导入代码时,扩展代码也有其自己的目录结构。

2.1   设置装载代码的搜索路径

Erlang运行时系统包含了代码自动重新载入的功能。想要让这个功能正常工作,你必须设置一些搜索路径以便找到正确版本的代码。

代码装载功能是内置在Erlang中的,我们将在E.4节详细讨论。代码的装载仅在需要时才执行。

当系统尝试调用一个模块中的函数,而这个模块还没有装载时,就会发生异常,系统就会尝试这个模块的代码文件。如果需要的模块叫做 myMissingModule ,那么代码装载器将会在当前目录的所有目录中搜索一个叫做 myMissingModule.beam 的文件。寻找到的第一个匹配会停止搜索,然后就会把这个文件中的对象代码载入系统。

你可以在Erlang shell中看看当前装载路径,使用如下命令 code:get_path() 。如下是例子:

code:get_path()
[".",
"/usr/local/lib/erlang/lib/kernel-2.11.3/ebin",
"/usr/local/lib/erlang/lib/stdlib-1.14.3/ebin",
"/usr/local/lib/erlang/lib/xmerl-1.1/ebin",
"/usr/local/lib/erlang/lib/webtool-0.8.3/ebin",
"/usr/local/lib/erlang/lib/typer-0.1.0/ebin",
"/usr/local/lib/erlang/lib/tv-2.1.3/ebin",
"/usr/local/lib/erlang/lib/tools-2.5.3/ebin",
"/usr/local/lib/erlang/lib/toolbar-1.3/ebin",
"/usr/local/lib/erlang/lib/syntax_tools-1.5.2/ebin",
...]

管理装载路径两个最常用的函数如下:

@spec code:add_patha(Dir) => true | {error,bad_directory}

添加新目录Dir到装载路径的开头

@spec code:add_pathz(Dir) => true | {error,bad_directory}

添加新目录Dir到装载路径的末尾

通常并不需要自己来关注。只要注意两个函数产生的结果是不同的。如果你怀疑装载了错误的模块,你可以调用 code:all_loaded() (返回所有已经装载的模块列表),或者 code:clash() 来帮助你研究错误所在。

code 模块中还有其他一些程序可以用于管理路径,但是你可能根本没机会用到它们,除非你正在做一些系统编程。

按照惯例,经常把这些命令放入一个叫做 .erlang 的文件到你的HOME目录。另外,也可以在启动Erlang时的命令行中指定:

> erl -pa Dir1 -pa Dir2 ... -pz DirK1 -pz DirK2

其中 -pa 标志将目录加到搜索路径的开头,而 -pz 则把目录加到搜索路径的末尾。

2.2   在系统启动时执行一系列命令

我们刚才把载入路径放到了HOME目录的 .erlang 文件中。事实上,你可以把任意的Erlang代码放入这个文件,在你启动Erlang时,它就会读取和求值这个文件中的所有命令。

假设我的 .erlang 文件如下:

io:format("Running Erlang~n").
code.add_patha(".")
code.add_pathz("/home/joe/2005/erl/lib/supported").
code.add_pathz("/home/joe/bin").

当我启动系统时,我就可以看到如下输出:

$ erl
Erlang (BEAM) emulator version 5.5.1 [source] [async-threads:0] [hipe]

Running Erlang
Eshell V5.5.1  (abort with ^G)
1>

如果当前目录也有个 .erlang 文件,则会在优先于HOME目录的。这样就可以在不同启动位置定制Erlang的行为,这对特定应用非常有用。在这种情况下,推荐加入一些打印语句到启动文件;否则你可能忘记了本地的启动文件,这可能会很混乱。

某些系统很难确定HOME目录的位置,或者根本就不是你以为的位置。可以看看Erlang认为的HOME目录的位置,通过如下的:

1> init:get_argument(home).
{ok,[["/home/joe"]]}

通过这里,我们可以推断出Erlang认为的HOME目录就是 /home/joe

3   运行程序的其他方式

Erlang程序存储在模块中。一旦写好了程序,运行前需要先编译。不过,也可以以脚本的方式直接运行程序,叫做 escript 。

下一节会展示如何用多种方式编译和运行一对程序。这两个程序很不同,启动和停止的方式也不同。

第一个程序 hello.erl 只是打印 “Hello world” ,这不是启动和停止系统的可靠方式,而且他也不需要存取任何命令行参数。与之对比的第二个程序则需要存取命令行参数。

这里是一个简单的程序。它输出 “Hello world” 然后输出换行。 “~n” 在Erlang的io和io_lib模块中解释为换行。

-module(hello).
-export([start/0]).

start() ->
    io:format("Hello world~n").

让我们以3种方式编译和运行它。

3.1   在Erlang shell中编译和运行

$ erl
...
1> c(hello).
{ok,hello}
2> hello:start().
Hello world
ok

3.2   在命令行编译和运行

$ erlc hello.erl
$ erl -noshell -s hello start -s init stop
Hello World
$

Note

快速脚本:

有时我们需要在命令行执行一个函数。可以使用 -eval 参数来快速方便的实现。这里是例子:

erl -eval 'io:format("Memory: ~p~n", [erlang:memory(total)]).'\
    -noshell -s init stop

Windows用户:想要让如上工作,你需要把Erlang可执行文件目录加入到环境变量中。否则就要以引号中的全路径来启动,如下:

"C:\Program Files\erl5.5.3\bin\erlc.exe" hello.erl

第一行 erlc hello.erl 会编译文件 hello.erl ,生成叫做 hello.beam 的代码文件。第一个命令拥有三个选项:

-noshell :启动Erlang而没有交互式shell,此时不会得到Erlang的启动信息来提示欢迎

-s hello start :运行函数 hello:start() ,注意使用 -s Mod ... 选项时,相关的模块Mod必须已经编译完成了。

-s init stop :当我们调用 apply(hello,start,[]) 结束时,系统就会对函数 init:stop() 求值。

命令 erl -noshell ... 可以放入shell脚本,所以我们可以写一个shell脚本负责设置路径(使用-pa参数)和启动程序。

在我们的例子中,我们使用了两个 -s 选项,我们可以在一行拥有多个函数。每个 -s 都会使用 apply 语句来求职,而且,在一个执行完成后才会执行下一个。

如下是启动hello.erl的例子:

#! /bin/sh
erl -noshell -pa /home/joe/2006/book/JAERANG/Book/code\
    -s hello start -s init stop

Note

这个脚本需要使用绝对路径指向包含 hello.beam 。不过这个脚本是运行在我的电脑上,你使用时应该修改。

运行这个shell脚本,我们需要改变文件属性(chmod),然后运行脚本:

$ chmod u+x hello.sh
$ ./hello.sh
Hello world
$

Note

在Windows上, #! 不会起效。在Windows环境下,可以创建.bat批处理文件,并且使用全路径的Erlang来启动(假如尚未设置PATH环境变量)。

一个典型的Windows批处理文件如下:

"C:\Program Files\erl5.5.3\bin\erl.exe" -noshell -s hello start -s init stop

3.3   以Escript运行

使用escript,你可以直接运行你的程序,而不需要先编译。

Warning

escript包含在Erlang R11B-4或以后的版本,如果你的Erlang实在太老了,你需要升级到最新版本。

想要以escript方式运行hello,我们需要创建如下文件:

#! /usr/bin/env escript

main(_) ->
    io:format("Hello world\n").

Note

开发阶段的导出函数

如果你正在开发代码,可能会非常痛苦于需要不断在导出函数列表增删函数。

一个声明 -compile(export_all) ,告知编译器导出所有函数。使用这个可以让你的开发工作简化一点。

当你完成开发工作时,你需要抓实掉这一行,并添加适当的导出列表。首先,重要的函数需要导出,而其他的都要隐藏起来。隐藏方式可以按照个人喜好,提供接口也是一样的效果。第二,编译器会对需要导出的函数生成更好的代码。

在Unix系统中,我们可以立即按照如下方式运行:

$ chmod u+x hello
$ ./hello
Hello world
$

Note

这里的文件模式在Unix系统中表示可执行,这个通过chmod修改文件属性的步骤只需要执行一次,而不是每次运行程序时。

在Windows系统中可以如下方式运行:

C:\> escript hello
Hello world
C:\>

Note

在以escript方式运行时,执行速度会明显的比编译方式慢上一个数量级。

3.4   程序的命令行参数

“Hello world”没有参数。让我们重新来做一个计算阶乘的程序,可以接受一个参数。

首先是代码:

-module(fac).
-export([fac/1]).

fac(0) -> 1;
fac(N) -> N*fac(N-1).

我们可以编译 fac.erl 并且在Erlang中运行:

$ erl
1> c(fac).
{ok,fac}
2> fac:fac(25).
15511210043330985984000000

如果我们想要在命令行运行这个程序,我们需要修改一下他的命令行参数:

-module(fac1).
-export([main/1]).

main([A]) ->
    I=list_to_integer(atom_to_list(A)).
    F=fac(I),
    io:format("factorial ~w= ~w~n",[I,F]),
    init:stop().

fac(0) -> 1;
fac(N) -> N*fac(N-1).

让我们编译和运行:

$ erlc fac1.erl
$ erl -noshell -s fac1 main 25
factorial 25 = 15511210043330985984000000

Note

事实上这里的main()函数并没有特殊意义,你可以把它改成任何名字。重要的一点是命令行参数中要使用函数名。

最终,我们可以把它直接作为escript来运行:

#! /usr/bin/env escript

main([A]) ->
    I=list_to_integer(A),
    F=fac(I),
    io:format("factorial ~w = ~w~n",[I,F]).

fac(0) -> 1;
fac(N) -> N*fac(N-1).

运行时无需编译,可以直接运行:

$ ./factorial 25
factorial 25 = 15511210043330985984000000
$

4   通过makefile自动编译

当编写一个大型程序时,我希望只在需要的时候自动编译。这里有两个原因。首先,节省那些一遍遍相同的打字。第二,因为经常需要同时进行多个项目,当再次回到这个项目时,我已经忘了该如何编译这些代码。而 make 可以帮助我们解决这个问题。

make 是一个自动工具,用以编译和发布Erlang代码。大多数我的makefile都非常简单,并且我有个模板可以解决大多数问题。

我不想在这里解释makefile的意义。而我会展示如何将makefile用于Erlang程序。推荐看看本书附带的makefile,然后你就会明白他们并且构建你自己的makefile了。

4.1   一个makefile模板

如下的模板是很常用的,很多时候可以基于他们来做实际的事情:

#让这一行就这么放着
.SUFFIXES: .erl .beam .yrl

.erl.beam:
    erlc -W $<

.yrl.erl:
    erlc -W $<

ERL=erl -boot start_clean

#如下是需要编译的模块列表
#如果一行写不下可以用反斜线 \ 来折行

#修改如下行
MODS=module1 module2 \
     module3 ....

#makefile中的第一个目标是缺省目标
#也就是键入make时默认的目标
all: compile

compile: ${MODS:%=%.beam} subdirs

#指定编译需求,并且添加到这里
special1.beam: special1.erl
    ${ERL} -Dflag1 -W0 special1.erl

#从makefile运行一个应用
application1: compile
    ${ERL} -pa Dir1 -s application1 start Arg1 Arg2

#在子目录编译子目录的目标
subdirs:
    cd dir1; make
    cd dir2; make
    ...

#清除所有编译后的文件
clean:
    rm -rf *.beam erl_crash.dump
    cd dir1; make clean
    cd dir2; make clean

makefile开始于一些编译Erlang模块的规则,包括扩展名为 .yrl 的文件(是Erlang词法解析器的定义文件)(Erlang的词法解析器生成器为yecc,是yacc的Erlang版本,参考 http://www.erlang.org/contrib/parser_tutorial-1.0.tgz )。

重要的部分开始于这一行:

MODS=module1 module2

这是我们需要编译的Erlang模块的列表。

任何在MODS列表中的模块都会被Erlang命令 erlc Mod.erl 来编译。一些模块可能会需要特殊的待遇,比如模板中的special1。所以会有单独的规则用来处理这些。

makefile中有一些目标。一个目标是一个字符串,随后是冒号”:”。在makefile模板中,all,compile和special1.beam都是目标。想要运行makefile,你可以在shell中执行:

$ make [Target]

参数Target是可选的。如果Target忽略掉了,则假设被第一个目标。在前面的例子中,目标all就是缺省的。

如果我希望构建软件并运行,我们就会使用 make application1 。如果我希望这个作为缺省行为,也就是在我每次键入 make 时都会执行,我就可以把application1目标作为第一个目标。

目标clean会删除所有编译过的Erlang目标代码和 erl_crash.dump 。crashdump包含了帮助调试应用程序的信息。查看6.10了解更多。

4.2   实际修改makefile模板

我并不热衷于让自己的程序变得混乱,所以我一般以一个不包含无用行的模板开始。所以得到的makefile会更短,而且也更易于阅读。另外,你也可以使用到处都是变量的makefile,以方便定制。

一旦我遍历整个流程,就会得到一个很简单的makefile,有如如下:

.SUFFIXES: .erl .beam

.erl.beam:
    erlc -W $<

ERL = erl -boot start_clean

MODS=module1 module2 module3

all: compile
    ${ERL} -pa '/home/joe/.../this/dir' -s module1 start

compile: ${MODS:%=%.beam}

clean:
    rm -rf *.beam erl_crash.dump

5   Erlang shell中的命令编辑

Erlang shell包含了内置的行编辑器。它可以执行emacs的一部分行编辑命令,前一行可以以多种方式来调用。可用命令如下,注意 “^Key” 是指按下 “Ctrl+Key” 。

命令 描述
^A 开始一行
^E 最后一行
^F或右箭头 向前一个字符
^B或左箭头 向后一个字符
^P或上箭头 前一行
^N或下箭头 下一行
^T 调换最后两个字符的顺序
Tab 尝试补全模块名或函数名

6   解决错误

Erlang有时会发生一些问题而退出。下面是可能的错误原因:

  1. shell没有响应
  2. Ctrl+C处理器被禁用了
  3. Erlang以 -detached 标志启动,这时你甚至感觉不到他在运行
  4. Erlang以 -heart Cmd 标志启动。这会让OS监控器进程看管Erlang的OS进程。如果Erlang的OS进程死掉,那么就会求值 Cmd 。一般来说 Cmd 只是用于简单的重启Erlang系统。这个是用于生产可容错系统的一个重要技巧,用于结点-如果erlang自己死掉了(基本不可能发生),就会自己重启。这个技巧对Unix类操作系统使用 ps 命令来监控,对Windows使用任务管理器。进行心跳信息检测并且尝试kill掉erlang进程。
  5. 有且确实很无奈的错误,留下一个erlang僵尸进程。

7   当确实出错时

本节列出了一些常见错误和解决方案。

7.1   未定义(丢失)的代码

如果你尝试调用一个模块,而代码载入器却无法找到时(比如搜索路径出错),你将会遇到 undef 错误信息,如下是例子:

1> glurk:oops(1,23).
** exited: {undef,[{glurk,oops,[1,23]},
                   {erl_eval,do_apply,5},
                   {shell,exprs,6},
                   {shell,eval_loop,3}]}**

事实上,这里没有一个叫做glurk,但是这里没有关联问题,你只需要关心错误信息就可以了。错误信息告诉我们系统尝试调用glurk模块的函数oops,参数1是23,这四个事物中的一个出了问题。

  1. 有可能不存在模块glurk,找不到或者根本不存在。可能是拼写错误。
  2. 存在模块glurk,但是还没有编译。系统尝试寻找文件 glurk.beam ,但是没有找到。
  3. 有模块glurk,但是包含 glurk.beam 的目录并没有在模块搜索路径中。想要修复这个问题,你必须改变搜索路径,一会再讲。
  4. 在载入路径中有多个不同版本的 “glurk” ,而我们选择了错误的。这是个罕见的错误,但是有可能会发生。如果你担心发生,你可以运行 code:clash() 函数,这将会报告代码搜索路径中的重复模块。

Note

有人看见我的分号了么?

如果你忘记了两个子句之间的分号,或者是放了个句点,那你可就麻烦了。

如果你定义了一个函数 foo/2 在模块bar的1234行,并且在该写分号的地方用了句点,那么编译器会提示:

bar.erl:1234 function foo/2 already defined.

不要这么做,确保你的子句总是以分号分开。

7.2   我的makefile无法make

你怎么能让makefile出错?当然,尽管这本书不是讲makefile的,但是我还是会给出一些常见错误提示。如下是两个常见的错误:

  1. makefile中的空白:makefile是很严格而挑剔的。虽然你看不到他们,但是缩进行必须以一个tab字符开始。如果这里有其他空白,那么make就会出错,而你会看到一些错误。(当然折行,也就是上一行末尾有 “\” 字符的不算)

  2. 丢失的erlang文件。如果在MODS变量中定义的模块丢失了,你会得到错误信息。举例,假设MODS中包含一个模块叫做glurk,但是却没有glurk.erl文件。这种情况下,make会出错,并给出如下信息:

    $ make
    make: *** No rule to make target 'glurk.beam',
                needed by 'compile'. Stop.

另外模块名拼写错误也会导致这个问题。

7.3   shell没有响应了

如果shell对命令不做出响应了,有可能发生一系列的问题。shell进程可能会crash了,或者你可能放任一个命令永远无法终止。你可能忘了输入关闭的括号,或者忘记了 点号回车

无论哪种问题,你都可以通过按下 “Ctrl+G” 啦关闭当前shell,并且学着如下的例子:

1> receive foo -> true end.
^G
User switch command
--> h
c [nn]    - connect to job
i [nn]    - interrupt job
k [nn]    - kill job
j         - list all jobs
s         - start local shell
r [node]  - start remote shell
q         - quit erlang
? | h     - this message
--> j
1* {shell,start,[init]}
--> s
--> j
1 {shell,start,[init]}
2* {shell,start,[]}
--> c 2
Eshell V5.5.1  (abort with ^G)
1> init:stop().
ok
2> $
  1. 第一行告诉shell接收foo消息,当时不会有人发送消息到shell,shell会进入一个无限的等待,所以需要按下 “Ctrl+G” 。
  2. “–> h” ,系统进入了 “shell JCL” 模式(Job Control Language),这里我可以不知道任何命令所以键入 h 来获取帮助。
  3. “–> j” ,列出所有任务。任务的号码1标记为星号,表示为默认的shell。所有命令选项参数 [nn] 使用缺省的shell除非指定了参数。
  4. “–> s” ,键入s以启动一个新的shell,而在其后可以看到已经把2号标记为默认的shell了
  5. “–> c 2″ ,让我连接到新启动的2号shell,然后停止了系统。

有如你所见,你可以拥有多个shell,并且通过按下Ctrl+G以后来切换。你可以通过命令r在远程启动一个shell方便调试。

8   获取帮助

在Unix类系统中,如下:

$ erl -man erl
NAME
erl - The Erlang Emulator

DESCRIPTION
...

你也可以通过这种方式获取模块的帮助文档:

$ erl -man lists
MODULE
lists - List Processing Functions
...

Note

在Unix系统,man手册页并不是默认安装的,如果命令 erl -man ... 没有工作,你需要自己安装man手册页。所有的man手册页都是单独压缩存档的,可以到 http://www.erlang.org/download.html 下载。man手册页可以通过root解压并安装到Erlang的安装目录,通常为 /usr/local/lib/erlang

也可以下载HTML文档。在Windows下默认会安装HTML文档,可以通过开始菜单访问到。

9   定制环境

Erlang shell拥有一系列的内置命令。你可以通过 help() 来看到这些命令:

1> help().
** shell internal commands **
b()        -- display all variables bindings
e(N)       -- repeat the expression in query <N>
f()        -- forget all variable bindings
f(X)       -- forget the binding of variable X
h()        -- history
...

所有这些命令都是定义在模块 shell_default 中。

如果你想定义自己的命令,只需要创建一个叫做 user_default 的模块,例如:

-module(user_default).
-compile(export_all).

hello() ->
    "Hello joe how are you?".

away(Time) ->
    io:format("Joe is away and will be back in ~w minutes~n",[Time]).

一旦你编译了它并且放在了你的载入路径,那么你可以调用 user_default 中的任何函数,而不用给出模块名:

1> hello().
"Hello joe how are you?"
2> away(10).
Joe is away and will be back in 10 minutes
ok

10   crash dump

当erlang crash时,他会生成一个文件叫做 erl_crash.dump 。文件内容包含出错的地方的相关信息。想要分析crash dump这里有个基于web的分析器。启动这个分析器,你可以给出如下命令:

1> webtool:start().
WebTool is available at http://localhost:8888/
Or http://127.0.0.1:8888/
{ok,<0.34.0>}

然后让浏览器指向 http://localhost:8888/ 。你就可以看到刚才发生的问题了。

现在我们看到的只是问题回溯的原料,具体还需要你仔细的分析,并发的程序不太好调试。从这时开始,你将离开熟悉的环境,不过这时也正是历险的开始。

Programming Erlang第14章Socket编程[完整]

Thursday, April 10th, 2008

Socket编程

译者: gashero

目录

大多数我写的更有趣的程序都包含了Socket。一个Socket是一个允许机器与Internet上另一端使用IP通信的端点。本章关注Internet上两种核心网络协议:TCP和UDP。

UDP允许应用发送简短报文(叫做数据报datagram)到另一端,但是对报文没有交付的担保。并且可能在到达时有错误的顺序。而TCP则提供了可靠的字节流,并且确保在连接后传输数据的顺序也是对的。

为什么Socket编程很有趣呢?因为它允许应用于其他Internet上的机器通信,而这些比本地操作更有潜力。

有两种主要的库用于Socket编程: gen_tcp 用于TCP编程、 gen_udp 用于UDP编程。

在本章,我们看看如果使用TCP和UDP socket编写客户端和服务器。我们将会尝试多种形式的服务器(并行、串行、阻塞、非阻塞)并且看看通信接口应用如何将数据流传递给其他应用。

1   使用TCP

我们学习Socket编程的历险从一个从服务器获取TCP数据的程序开始。然后我们会写一个简单的串行TCP服务器展示如何并行的处理多个并发会话。

1.1   从服务器获取数据

我们先写一个小函数(标准库的 http:request(Url) 实现相同的功能,但是这里是演示TCP的)来看看TCP socket编程获取 http://www.google.com 的HTML页面:

nano_get_url() ->
    nano_get_url("www.google.com").

nano_get_url(Host) ->
    {ok,Socket}=gen_tcp:connect(Host,80,[binary,{packet,0}]),
    ok=gen_tcp:send(Socket,"GET / HTTP/1.0\r\n\r\n"),
    receive_data(Socket,[]).

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(reverse(SoFar))
    end.

它如何工作呢?

  1. 我们通过 gen_tcp:connect 在 http://www.google.com 打开TCP协议80端口。connect的参数binary告知系统以binary模式打开socket,并且以二进制方式传递数据到应用。 {packet,0} 意味着无需遵守特定格式即可将数据传递到应用。
  2. 我们调用 gen_tcp:send 并发送消息 GET / HTTP/1.0\r\n\r\n 到socket。然后我们等待响应。响应并不会一次性得到,而是分片的、分时间的。这些分片是按照一系列报文的方式接收的,并且通过打开的socket发送到进程。
  3. 我们接收一个 {tcp,Socket,Bin} 报文。第三个参数是binary。这是因为我们已经使用二进制方式打开了socket。这是从WEB服务器发到我们的一个消息报文。我们把这个报文加到分片列表并等待下一个报文。
  4. 我们接收到 {tcp_closed,Socket} 报文,这在服务器发送完所有数据时发生(这仅在HTTP/1.0时正确,现在版本的HTTP使用另外一种结束策略)。
  5. 当我们收到了所有的分片,存储顺序是错误的,所以我们重新对分片排序和连接。

我们看看他如何工作:

1> B=socket_examples:nano_get_url().
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.se/\r\n
    Cache-Control: private\r\nSet-Cookie: PREF=ID=b57a2c:TM"...>>

Note

当运行 nano_get_url 时,结果是二进制的,而你看到的则是Erlang shell以便于阅读的方式展示的。当以便于阅读方式展示时,所有控制字符都是以转义格式显示。二进制数据也会被截短,后面以省略号显示。如果想要看所有的二进制数据,你可以通过 io:format 打印或者使用 string:tokens 分片显示:

2> io:format("~p~n",[B]).
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.se/\r\n
    Cache-Control: private\r\nSet-Cookie: PREF=ID=B57a2c:TM"
    TM=176575171639526:LM=1175441639526:S=gkfTrK6AFkybT3;
    ... 还有一大堆行 ...
>>
3> string:tokens(binary_to_list(B),"\r\n").
["HTTP/1.0 302 Found",
"Location: http://www.google.se/",
"Cache-Control: private",
... 还有一大堆行 ...

这就差不多WEB客户端的工作方式(不过在浏览器中正确的显示要做更多的工作)。上面的代码只是实验的开始。你可以尝试做一些修改来下载整个网站或者读取电子邮件。可能性是无限的。

注意分片的重新组装方式如下:

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(reverse(SoFar))
    end.

每当我们收到分片时,就把他们加入SoFar这个列表的头部。当收到了所有分片时,Socket就关闭了,我们颠倒顺序,并且把所有分片组合起来。

你可能以为收到分片后立即合并会好些:

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,list_to_binary([SoFar,Bin]));
        {tcp_closed,Socket} ->
            SoFar
    end.

这段代码是正确,但是效率比较低。因为后一种版本不断的把新的二进制数据加到缓冲区后面,也就是包含了多个数据的拷贝的。一个好办法是累积所有分片,尽管顺序是相反的,然后反序整个列表并一次连接所有分片。

1.2   一个简单的TCP服务器

在前一节,我们写了一个简单的客户端。现在我们写个服务器。

服务器开启2345端口然后等待一个消息。这个消息是包含Erlang术语的二进制数据,这个术语是包含Erlang表达式的字符串。服务器对该表达式求值并且将结果通过socket发到客户端。

Note

如何编写WEB服务器

编写WEB客户端或服务器是很有趣的。当然,有些人已经写好这些了,但是如果想要真正理解他们的工作原理,研究底层实现还是很有意义的。谁知道呢,说不定我们写的WEB服务器更好。所以我们看看如何做吧?

想要构建一个WEB服务器,任何一个需要实现标准的Internet协议,我们需要使用正确的工具和了解协议实现。

在我们的例子用来抓取一个WEB页,我们如何知道已经正确打开了80端口,并且如何知道已经发送了 GET / HTTP/1.0\r\n\r\n 到服务器?答案很简单。所有主要协议都已经在RFC文档中有描述。HTTP/1.0定义于RFC1945,所有RFC的官方网站是 http://www.letf.org

另外一个非常有用的方法是抓包。通过一个数据包嗅探器,我们可以抓取和分析所有IP数据包,无论是应用程序发出的还是接收的。大多数嗅探器包含解码器和分析器可以得出数据包的内容和格式。一个著名的嗅探器是Wireshark(以前叫Ethereal),可以到 http://www.wireshark.org/ 了解更多。

使用嗅探器和RFC武装起来的我们,就可以准备编写下一个杀手级应用了。

编写这个程序(或者其他使用TCP/IP的程序),需要响应一些简单的请求:

  • 数据如何组织,知道数据如何组成请求或者响应?
  • 数据在请求和响应中如何编码(encode & marshal)和解码(decode & demarshal)

TCP socket数据只是没有格式的字节流。在传输时,数据会切成任意长度的分片,所以我们需要多少数据如何组成请求或响应。

在Erlang的例子,我们使用简单的转换,把每个逻辑请求或响应前加上N(1/2/4)字节的长度数。这就是 {packet,N} (这里的packet表示一个应用程序请求或响应报文,而不是电线里面的物理包) 参数在 gen_tcp:connect 和 gen_tcp:listen 函数的意义。注意packet附带的那个参数在客户端和服务器端必须商定好。如果服务器使用 {packet,2} 而客户端使用 {packet,4} 则会出错。

在我们以 {packet,N} 选项打开socket后,我们就不需要担心数据分片了。Erlang驱动会自动确保数据报文的所有分片都收到并且长度正确时才发到应用程序。

另一个需要注意的是数据编码和解码。最简单时,我们可以用 term_to_binary 来对Erlang术语编码,并使用 binary_to_term 来解码数据。

注意,客户端和服务器通信的包转换和编码规则是两行代码完成,分别使用 {packet,4} 来打开socket和使用 term_to_binary 和其反函数完成编码和解码数据。

我们可以简单的打包和编码Erlang术语到基于文本的协议如HTTP或XML。使用Erlang的 term_to_binary 和其反函数可以比基于XML等文本的协议性能高出一个数量级。现在我们先看看一个简单的服务器:

start_nano_server() ->
    {ok,Listen}=gen_tcp:listen(2345,[binary,{packet,4},
                                            {reuseaddr,true},
                                            {active,true}]),
    {ok,Socket}=gen_tcp:accept(Listen),
    gen_tcp:close(Listen),
    loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Bin} ->
            io:format("Server received binary = ~p~n",[Bin]),
            Str=binary_to_term(Bin),
            io:format("Server (unpacked) ~p~n",[Str]),
            Reply=lib_misc:string2value(Str),
            io:format("Server replying = ~p~n",[Reply]),
            gen_tcp:send(Socket,term_to_binary(Reply)),
            loop(Socket);
        {tcp_closed,Socket} ->
            io:format("Server socket closed~n")
    end.

它如何工作?

  1. 首先,我们调用 gen_tcp:listen 来监听2345端口,并且设置报文转换格式为 {packet,4} ,意味着每个包有4个字节的包头,代表长度。然后 gen_tcp:listen(..) 会返回 {ok,Socket} 或者 {error,Why} ,但是我们先看看成功的情况。所以写下如下代码 {ok,Listen}=gen_tcp:listen(...), 这在程序返回 {error,…} 时发生匹配错误。如果成功则会绑定Listen到正在监听的socket。对于正在监听的socket,我们只需要做一件事,就是使用它做参数调用 gen_tcp:accept 。
  2. 现在我们调用 gen_tcp:accept(Listen) 。在这里,程序会挂起以等待连接。当我们获得连接时,这个函数返回已经绑定的Socket,这个socket就是可以与客户端连接并且可以通信的了。
  3. 当 gen_tcp:accept 返回,我们立即调用 gen_tcp:close(Listen) 。这就关闭了监听的socket,服务器也就不会继续接受新的连接了。而这不会影响已有的连接,只是针对新连接。
  4. 解码输入数据
  5. 对字符串求值
  6. 编码返回数据并且通过socket发送

注意,这个程序只接受一个请求,程序运行完成后就不会再接受其他请求了。

这是一个非常简单的服务器展示了如何打包和编码应用数据。接收请求,计算响应,发出响应,然后结束。

想要测试这个服务器,我们需要一个对应的客户端:

nano_client_eval(Str) ->
    {ok,Socket}=get_tcp:connect("localhost",2345,[binary,{packet,4}]),
    ok=gen_tcp:send(Socket,term_to_binary(Str)),
    receive
        {tcp,Socket,Bin} ->
            io:format("Client received binary = ~p~n",[Bin]),
            Val=binary_to_term(Bin),
            io:format("Client result = ~p~n",[Val]),
            gen_tcp:close(Socket)
    end.

想要测试你的代码,我们需要在一台机器上同时启动客户端和服务器。所以在 gen_tcp:connect 中的hostname参数就可以用硬编码的 localhost 。

注意客户端和服务器端使用的 term_to_binary 和 binary_to_term 怎样编码和解码数据。

想要运行,我们需要开两个终端然后启动Erlang shell。

首先,我们启动服务器:

1> socket_examples:start_nano_server().

我们看不到任何输出,当然什么也没发生呢。然后我们在另一个终端启动客户端,输入如下命令:

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4,10+20])").

在服务器端的窗口尅看到如下输出:

Server received binary = <<131,107,0,28,108,105,115,116,95,116,
                           111,95,116,117,112,108,101,40,91,50,
                           43,51,42,52,44,49,48,43,50,48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4,10+20])"
Server replying = {14,30}

在客户端窗口可以看到:

Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok

最后,在服务器窗口看到:

Server socket closed

1.3   改进服务器

前一节我们构造了一个服务器可以接受一个请求并且终止。简单的修改代码,我们可以就可以完成另一个不同类型的服务器:

  1. 序列服务器,同一时间只接受一个请求
  2. 并行服务器,同一时间可以接受多个请求

原始启动代码如下:

start_nano_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    {ok,Socket} = gen_tcp:accept(Listen),
    loop(Socket).
...

我们将会以此为基础完成另外两种服务器。

1.4   序列服务器

想要构造序列服务器,我们如下改变了代码:

start_seq_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    seq_loop(Listen).

seq_loop(Listen) ->
    {ok,Socket} = gen_tcp:accept(Listen),
    loop(Socket),
    seq_loop(Listen).

loop(...) -> %%同以前的一样处理

这会让以前的例子在需要多个请求时工作的很好。我们一直让监听的socket开着而不是关闭。另一个不同是在 loop(Socket) 结束后,我们再次调用 seq_loop(Listen) ,以便接受后面的连接请求。

如果客户端在服务器忙于处理一个已经存在的连接时尝试连接服务器,连接请求会被缓存,直到服务器完成那个已经存在的请求。如果缓存的连接数大于监听backlog,连接会被拒绝。

我们只是展示了如何开启服务器。关闭服务器很简单(停止并行服务器也一样),只要kill掉启动服务器的进程即可。 gen_tcp 连接他本身到控制进程,如果控制进程死掉了,他也就会关闭socket。

1.5   并行服务器

构造并行服务器的秘诀是每次接受新的连接以后马上用 spawn 立即生成一个新的进程:

start_parallel_server() ->
    {ok,Listen} = gen_tcp:listen(...),
    spawn(fun() -> par_connect(Listen) end).

par_connect(Listen) ->
    {ok,Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen) end),
    loop(Socket).

loop(...) -> %%同上

这段代码与串行服务器类似。重要的不同在于 spawn ,可以确保为每个连接创建一个进程。现在可以对比两种服务器了,可以看到他是如何将一个串行服务器转变为并行服务器的。

所有这三种服务器调用 gen_tcp:listen 和 gen_tcp:accept 不同在于如何调用函数是并行方式还是串行方式。

Note

知识点:

  • 创建socket的进程(调用 gen_tcp:accept 或者 gen_tcp:connect )叫做这个socket的 控制进程 。所有来自于socket的消息都会被发送到控制进程;如果控制进程死掉了,对应的socket就会被关闭。可以修改一个socket的控制进程为NewPid,通过 gen_tcp:controlling_process(Socket,NewPid) 。

  • 我们的并行服务器可能会随着连接而创建数千个连接。我们可能希望限制最大并发连接数。这可以通过一个活动连接计数器来实现。每次获得一个新连接时就增加1,而在一个连接完成时减少1。可以用这种机制显示并发连接数。

  • 在我们接受连接后,最好明确的设置请求socket选项,如下:

    {ok,Socket} = gen_tcp:accept(Listen),
    inet:setopts(Socket,[{packet,4},binary,{nodelay,true},{active,true}]),
    loop(Socket)
  • 在Erlang R11B-3中,几个Erlang进程可以在同一个监听中的socket调用 gen_tcp:accept/1 。这也是一种建立并发服务器的简单方式,因为你尅拥有预分配的进程池,等待 gen_tcp:accept/1 。

2   控制问题

Erlang中socket可以以3种模式打开:active、active once、passive。其设置可以通过 gen_tcp:connect(Address,Port,Options) 或 gen_tcp:listen(Port,Options) 中的Options参数来设置为 {active,true|false|once} 。

如果指定了 {active,true} ,就会创建一个主动(active)的socket; {active,false} 会创建一个被动的(passive)的socket; {active,once} 创建主动的socket,但是只接受一条消息,接收到消息后,必须手动重新开启(reenable)才能继续接受消息。

我们看看在不同地方使用的区别。

active和passive的socket的区别在于消息到来时的处理方式:

  • 一旦一个active的socket被创建了,控制进程会发送收到的数据,以 {tcp,Socket,Data} 消息的形式。而控制进程无法控制消息流。一个无赖的客户端可以发送无数的消息到系统,而这些都会被发送到控制进程。而控制进程无法停止这个消息流。
  • 如果socket在passive模式,控制进程需要调用 gen_tcp:recv(Socket,N) 来获取数据。它会尝试获取N字节的数据,如果N=0,就会尽可能的拿到所有可以取得的数据。这种情况下,服务器尅通过选择是否调用 gen_tcp:recv 来控制消息流。

被动模式的socket用于控制发送到服务器的数据流。为了举例,我们可以以3种方式编写消息接收循环:

  • 主动消息获取(非阻塞)
  • 被动消息获取(阻塞)
  • 混合方式获取(部分阻塞)

2.1   主动消息获取(非阻塞)

第一个例子是以主动模式打开socket,然后接受来自socket的数据:

{ok,Listen} = gen_tcp:listen(Port,[...,{active,true}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            ... 输出处理 ...
        {tcp_closed,Socket} ->
            ...
    end.

这个过程无法控制发到服务器循环的消息流,如果客户端产生数据的速度大于服务器消费数据的速度,系统就会收到洪水般地消息-消息缓冲区溢出,系统将会crash并表现怪异。

这种类型的服务器叫做非阻塞服务器,因为它无法阻塞客户端。我们仅在信任客户端的情况下才会使用非阻塞服务器。

2.2   被动消息获取(阻塞)

在这一节,我们写阻塞服务器:服务器以被动模式打开socket,通过 {active,false} 选项。这个服务器不会被危险的客户端洪水袭击。

服务器循环中的代码调用 gen_tcp:recv 来接收数据。客户端在服务器调用 recv 之前会被阻塞。注意OS会对客户端发来的数据做一下缓冲,以允许客户端在服务器调用 recv 之前仍然可以继续发送一小段数据。

{ok,Listen} = gen_tcp:listen(Port,[...,{active,false}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    case gen_tcp:recv(Socket,N) of
        {ok,B} ->
            ... 数据处理 ...
            loop(Socket);
        {error,closed}
            ...
    end.

2.3   混合消息获取(部分阻塞)

你可能认为把被动模式用到所有服务器上都合适。不幸的是,当我们在被动模式时,我们只能等待来自于一个socket的数据。这对于需要等待多个socket来源数据的服务器则不适用。

幸运的是我们可以用混合方式,既不是阻塞的也不是非阻塞的。我们以一次主动(active once)模式 {active,once} 打开socket。在这个模式中,socket是主动的,但是只能接收一条消息。在控制进程发出一条消息之后,他必须明确的调用 inet:setopts 以便让socket恢复并接收下一条消息。系统在这发生之前会一直阻塞。这是两种世界的最好结合点。如下是代码:

{ok,Listen} = gen_tcp:listen(Port,[...,{active,once}...]),
{ok,Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            ... 数据处理 ...
            %%准备好启用下一条消息时
            inet:setopts(Socket,[{active,once}]),
            loop(Socket);
        {tcp_closed,Socket} ->
            ...
    end.

使用 {active,once} 选项,用户可以实现高层次的数据流控制(有时叫交通管制),同时又防止了服务器被过多的消息洪水所淹没。

3   连接从哪里来?

假设我们建立了某种在线服务器,而却一直有人发垃圾邮件。那么我们需要做的第一件事就是确定他的来源。想要发现这件事,我们可以使用 inet:peername(Socket) 。

@spec inet:peername(Socket) -> {ok,{IP_Address,Port}} | {error,Why}

返回连接另一端的IP地址和端口号,以便服务器找到对方的地址。IP_Address是一个元组的整数形如 {N1,N2,N3,N4} ,而 {K1,K2,K3,K4,K5,K6,K7,K8} 则是IPv6的地址。这里的整数取值范围是0到255。

4   Socket的错误处理

socket的错误处理是非常简单的,基本上你不需要做任何事情。犹如前面讲过的,每个socket拥有控制进程(就是创建socket的进程)。如果控制进程死掉了,那么socket也会自动关闭。

这意味着,如果我们有,例如,一个客户端和服务器,而服务器因为编程错误死掉了。那么服务器拥有的socket会自动关闭,而客户端会收到 {tcp_closed,Socket} 消息。

我们可以通过如下的小程序测试这个机制:

error_test() ->
    spawn(fun() -> error_test_server() end),
    lib_misc:sleep(2000),
    {ok,Socket} = gen_tcp:connect("localhost",4321,[binary,{packet,2}]),
    io:format("connected to: ~p~n",[Socket]),
    gen_tcp:send(Socket,<<"123">>),
    receive
        Any ->
            io:format("Any=~p~n",[Any])
    end.

error_test_server() ->
    {ok,Listen} = gen_tcp:listen(4321,[binary,{packet,2}]),
    {ok,Socket} = gen_tcp:accept(Listen),
    error_test_server_loop(Socket).

error_test_server_loop(Socket) ->
    receive
        {tcp,Socket,Data} ->
            io:format("received:~p~n",[Data]),
            atom_to_list(Data),
            error_test_server_loop(Socket)
    end.

当我们运行它时,会看到如下输出:

1> socket_examples:error_test().
connected to:#Port<0.152>
received:<<"123">>
=ERROR REPORT==== 9-Feb-2007::15:18:15 ===
Error in process <0.77.0> with exit value:
 {badarg,[{erlang,atom_to_list,[<<3 bytes>>]},
 {socket_examples,error_test_server_loop,1}]}
Any={tcp_closed,#Port<0.152>}
ok

我们生成了一个服务器,并让它每两秒有一次启动的机会,并且发送包含二进制数据 <<"123">> 的消息。当这个消息到达服务器时,服务器会尝试计算 atom_to_list(Data) ,因为Data是一个二进制数据,所以会立即出错(系统监控器会发现并显示错误)。现在服务器端的控制进程已经死掉了,而且其socket也关闭了。客户端会收到 {tcp_closed,Socket} 消息。

5   UDP

现在我们看看UDP协议(User Datagram Protocol,用户数据报协议)。使用UDP,互联网上的机器之间可以互相发送小段的数据,叫做数据报。UDP数据报是不可靠的,这意味着如果客户端发送一系列的UDP数据报到服务器,收到的数据报顺序可能是错误的。不过收到的数据报肯定是正确的。大的数据报会被分为多个小的分片,IP协议负责重新组装这些分片,并最终交付给应用。

UDP是无连接的协议,这意味着客户端无需连接服务器即可发送消息。这也意味着程序更加适于大量客户端收发小的消息报文。

在Erlang中编写UDP客户端和服务器比TCP时更简单,因为我们无需管理连接。

5.1   简单的UDP服务器和客户端

首先,我们看看服务器,一个通用的服务器样式如下:

server(Port) ->
    {ok,Socket} = gen_udp:open(Port,[binary]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp,Socket,Host,Port,Bin} ->
            BinReply = ... ,
            gen_udp:send(Socket,Host,Port,BinReply),
            loop(Socket)
    end.

这里比TCP协议的例子更简单,因为我们至少不需要关心连接关闭的消息。注意我们以二进制方式打开socket,驱动也会以二进制数据的形式将报文发送到应用。

注意客户端。这里有个简单的客户端。它仅仅打开UDP socket,发送消息到服务器,等待响应(或超时),然后关闭socket并返回从服务器接收到的值。

client(Request) ->
    {ok,Socket} = gen_udp:open(0,[binary]),
    ok = gen_udp:send(Socket,"localhost",4000,Request),
    Value = receive
                {udp,Socket,_,_,Bin} ->
                    {ok,Bin}
                after 2000 ->
                    error
                end,
    gen_udp:close(Socket),
    Value

我们必须拥有一个超时,否则UDP的不可靠会让我们永远得不到响应。

5.2   一个UDP阶乘服务器

我们可以很容易的构造一个UDP的阶乘服务器。代码模仿前一节。

-module(upd_test).
-export([start_server/0,client/1]).

start_server() ->
    spawn(fun() -> server(4000) end).

%% 服务器
server(Port) ->
    {ok,Socket}=gen_udp:open(Port,,[binary]),
    io:format("server opened socket:~p~n",[Socket]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp,Socket,Host,Port,Bin} =Msg ->
            io:format("server received:~p~n",[Msg]),
            N=binary_to_term(Bin),
            Fac=fac(N),
            gen_udp:send(Socket,Host,Port,term_to_binary(Fac)),
            loop(Socket)
    end.

fac(0) -> 1;
fac(N) -> N*fac(N-1).

%% 客户端
client(N) ->
    {ok,Socket} = gen_upd:open(0,[binary]),
    io:format("client opened socket=~p~n",[Socket]),
    ok=gen_udp:send(Socket,"localhost",4000,term_to_binary(N)),
    Value=receive
        {udp,Socket,_,_,Bin}=Msg ->
            io:format("client received:~p~n",[Msg]),
            binary_to_term(Bin)
        after 2000 ->
            0
        end,
    gen_udp:close(Socket),
    Value

注意我增加了一些打印语句,所以我们可以看到程序执行的过程。我一般是开发阶段加很多打印语句,而在工作正常后就注释掉了。

现在让我们运行例子,首先启动服务器:

1> udp_test:start_server().
server opened socket:#Port<0.106>
<0.34.0>

这会在后台运行,所以我们发出一个客户端请求:

2> udp_test:client(40).
client opened socket=#Port<0.105>
server received:{udp,#Port<0.106>,{127,0,0,1},32785,<<131,97,40>>}
client received:{udp,#Port<0.105>,
                  {127,0,0,1},4000,
                  <<131,110,20,0,0,0,0,0,64,37,5,255,
                    100,222,15,8,126,242,199,132,27,
                    232,234,142>>}
815915283247897734345611269596115894272000000000

5.3   UDP的附加注释

我们必须注意的是UDP是无连接的协议,也就四海服务器无法拒绝客户端发送数据,甚至不知道客户端是谁。

大个的UDP报文会被切分成多个分片分别在网络上传输。分片发生在数据报长度大于最大传输单元(MTU)时,以确保通过路由器等网络设备以后仍然可以到达。一般的测量方法是开始于一个足够小的包(比如500字节),然后逐渐增加,直到发现MTU为止。如果在某一点发现数据报被丢弃了,那么,你就直到可以传输的最大报文长度了。

一个UDP数据报可以被传输两次,所以你必须小心的编码以防备这个事。因为他可能会对同一个请求的第二次出现而再做一次响应。想要防止,我们可以修改客户端代码来在每个请求中加一个唯一引用,并且检查响应中的这个唯一引用。想要生成一个唯一引用,我们可以用Erlang BIF的 make_ref ,就会生成一个全局唯一引用。远程过程调用现在可以这样写:

client(Request) ->
    {ok,Socket} = gen_udp:open(0,[binary]),
    Ref=make_ref(),
    B1=term_to_binary({Ref,Request}),
    ok=gen_udp:send(Socket,"localhost",4000,B1),
    wait_for_ref(Socket,Ref).

wait_for_ref(Socket,Ref) ->
    receive
        {udp,Socket,_,_,Bin} ->
            case binary_to_term(Bin) of
                {Ref,Val} ->
                    Val;
                {_SomeOtherRef,_} ->
                    wait_for_ref(Socket,Ref)
            end;
    after 1000 ->
        ...
    end.

6   向多台计算机广播

这里会展示如何构造一个广播通道。这些情况用到的很少,不过也许某天你可能会用到。

-module(broadcast).
-compile(export_all).

send(IoList) ->
    case inet:ifget("eth0",[broadaddr]) of
        {ok,[{broadaddr,Ip}]} ->
            {ok,S}=gen_udp:open(5010,[{broadcast,true}]),
            gen_udp:send(S,Ip,6000,IoList),
            gen_udp:close(S);
        _->
            io:format("Bad interface name, or\n"
                        "broadcasting not supported\n")
    end.

listen() ->
    {ok,S}=gen_udp:open(6000),
    loop(S).

loop(S) ->
    receive
        Any ->
            io:format("received:~p~n",[Any]),
            loop(S)
    end.

这里我们需要两个端口,一个用于发送广播,而另一个用来监听响应。我们选择5010用于发送请求而6000用于监听广播(这两个端口号没有任何联系,可以随便选择)。

一旦进程开启了端口5010用于发送广播,所有在网络里面的机器都可以调用 broadcast:listen() ,来打开端口6000监听广播消息。

broadcast:send(IoList) 广播 IoList 到所有本地网络的机器。

Note

为了实现这个,网络接口必须是正确的,而且必须支持广播。在我的iMac,例如,我使用了名字”en0″代替了”eth0″。同时注意,如果机器在不同的子网上面监听UDP报文,那么也无法送达,因为缺省的路由会丢弃UDP广播。

7   一个SHOUT广播服务器

作为本章的结束,我将会使用一个新的技巧来编写SHOUTcast服务器。SHOUTcast是由folks在Nullsoft开发的音频流数据协议( http://www.shoutcast.com/ )。SHOUTcast使用HTTP发送MP3-或者AAC-编码的音频数据。

想要看看他们如何工作,我们首先看看SHOUTcast协议。然后我们会看看整个服务器的结构。最后是代码。

7.1   SHOUTcast协议

SHOUTcast协议非常简单:

  1. 首先,客户端(如xmms、winamp或iTunes)发送HTTP请求到SHOUTcast服务器。这里有个xmms生成的请求:

    GET / HTTP/1.1
    Host: localhost
    User-Agent: xmms/1.2.10
    Icy-MetaData: 1
  2. 我的SHOUTcast服务器响应如下:

    ICY 200 OK
    icy-notice1: <BR>This stream requires <a href=http://www.winamp.com/>;Winamp</a><BR>
    icy-notice2: Erlang Shoutcast server<BR>
    icy-name: Erlang mix
    icy-genre: Pop Top 40 Dance Rock
    icy-url: http://localhost:3000
    content-type: audio/mpeg
    icy-pub: 1
    icy-metaint:24576
    icy-br: 96
    ... data ...
  3. 现在SHOUTcast服务器可以发送持续的数据流了,有如如下结构:

    FHFHFHF ...

F是正好是24576字节的MP3音乐数据(通过 icy-metaint 参数指定)。H是数据头块,由单一字节的K,随后加上16*k的数据组成。这样,最小的数据头块就是 <<0>> 。下一个数据头块则是:

<<1,B1,B2,...,B16>>

数据部分的内容则是字符串形式的 StreamTitle='...';StreamUrl='http://...'; ,剩余空间用0补满。

7.2   SHOUTcast服务器如何工作

构造服务器必须注意如下细节:

  1. 制作播放列表。我们的服务器使用一个文件来做,参见13.2节,读取ID3标签在232页。随即选取播放列表中的文件。
  2. 制作一个并行服务器,可以同时供应多个数据流。使用14.1节的并行服务器。
  3. 对每个音频文件,只发送音频数据而不发送内嵌的ID3标签。移除标签,使用 id3_tag_lengths ,已经在13.2节开发完成了。读取ID3标签在5.3节。寻找同步帧在92页。这些代码没有在这里。

7.3   SHOUTcast服务器伪码

看整个程序之前先看看不包含细节的工作流程:

start_parallel_server(Port) ->
    {ok,Listen}=gen_tcp:listen(Port,...),
    %%创建歌曲服务器
    PidSongServer=spawn(fun() -> songs() end),
    spawn(fun() -> par_connect(Listen,PidSongServer) end).

%%对每个连接生成一个进程
par_connect(Listen,PidSongServer) ->
    {ok,Socket}=gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen,PidSongServer) end),
    inet:setopts(Socket,[{packet,0},binary,{nodelay,true},
                         {active,true}]),
    get_request(Socket,PidSongServer,[]).

%%等待TCP请求
gen_request(Socket,PidSongServer,L) ->
    receive
        {tcp,Socket,Bin} ->
            ... 如果请求未完成则继续等待请求
            .... got_request(Data,Socket,PidSongServer)
        {tcp_closed,Socket} ->
            ... 在客户端中断时发生
    end.

%%获取了请求,发送响应
got_request(Data,Socket,PidSongServer) ->
    ... 分析请求数据
    gen_tcp:send(Socket,[response()]).
    play_songs(Socket,PidSongServer).

%%持续发送歌曲到客户端
play_songs(Socket,PidSongServer) ->
    ... PidSongServer维护着所有MP3文件的列表
    Song=rpc(PidSongServer,random_song),
    Header=make_header(Song),
    {ok,S}=file:open(File,[read,binary,raw]),
    send_file(1,S,Header,1,Socket),
    file:close(S),
    play_songs(Socket,PidSongServer).

send_file(K,S,Header,OffSet,Socket) ->
    ... 发送文件块到客户端
    ... 全部发送完成时返回

如果查看真实代码,你可以看到细节只有略微不同而已,机制是相同的,如下是完整代码。

-module(shout).

%% 在一个窗口中 > shout:start().
%% 另一个窗口 xmms http://localhost:3000/stream

-export([start/0]).
-import(lists,[map/2,reverse/1]).

-define(CHUNKSIZE,24576).

start() ->
    spawn(fun() ->
            start_parallel_server(3000),
            %% 现在开始睡眠,否则等待的socket会被关闭
            lib_misc:sleep(infinity)
        end).

start_parallel_server(Port) ->
    {ok,Listen}=gen_tcp:listen(Port,[binary,{packet,0},
            {reuseaddr,true},
            {active,true}]),
    PidSongServer=spawn(fun() -> songs() end),
    spawn(fun() -> par_connect(Listen,PidSongServer) end).

par_connect(Listen,PidSongServer) ->
    {ok,Socket}=gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen,PidSongServer) end),
    inet:setopts(Socket,[{packet,0},binary,{nodelay,true},{active,true}]),
    get_request(Socket,PidSongServer,[]).

get_request(Socket,PidSongServer,L) ->
    receive
        {tcp,Socket,Bin} ->
            L1=L ++ binary_to_list(Bin),
            %% 切割请求头检查
            case split(L1,[]) of
                more ->
                    %% 请求头未完成,需要更多数据
                    get_request(Socket,PidSongServer,L1);
                {Request,_Rest} ->
                    %% 头部完成
                    got_request_from_client(Request,Socket,PidSongServer)
            end;
        {tcp_closed,Socket} ->
            void;
        _Any ->
            %% 跳过
            get_request(Socket,PidSongServer,L)
    end.

split("\r\n\r\n" ++ T,L) -> {reverse(L),T};
split([H|T],L) -> split(T,[H|L]);
split([],_) -> more.

got_request_from_client(Request,Socket,PidSongServer) ->
    Cmds=string:tokens(Request,"\r\n"),
    Cmds1=map(fun(I) -> string:tokens(I," ") end,Cmds),
    is_request_for_stream(Cmds1),
    get_tcp:send(Socket,[response()]),
    play_songs(Socket,PidSongServer,<<>>).

play_songs(Socket,PidSongServer,SoFar) ->
    Song=rpc(PidSongServer,random_song),
    {File,PrintStr,Header}=unpack_song_descriptor(Song),
    case id3_tag_lengths:file(File) of
        error ->
            play_songs(Socket,PidSongServer,SoFar);
        {Start,Stop} ->
            io:format("Playing: ~p~n",[PrintStr]),
            {ok,S}=file:open(File,[read,binary,raw]),
            SoFar1=send_file(S,{0,Header},Start,Stop,Socket,SoFar),
            file:close(S),
            play_songs(Socket,PidSongServer,SoFar1)
    end.

send_file(S,Header,OffSet,Stop,Socket,SoFar) ->
    Need=?CHUNKSIZE - size(SoFar),
    Last=OffSet+Need,
    if
        Last>=Stop ->
            Max=Stop-OffSet,
            {ok,Bin}=file:pread(S,OffSet,Max),
            list_to_binary([SoFar,Bin]);
        true ->
            {ok,Bin}=file:pread(S,OffSet,Need),
            write_data(Socket,SoFar,Bin,Header),
            send_file(S,bump(Header),OffSet+Need,Stop,Socket,<<>>)
    end.

Warning

剩余代码主要是文件操作和socket发送数据部分,意义已经不大了。所以略掉。

7.4   运行SHOUTcast服务器

想要运行测试,我们需要做如下三步:

  1. 生成播放列表
  2. 开启服务器
  3. 用客户端连接服务器

7.5   构造播放列表

构造播放列表需要完成如下三步:

  1. 转到代码的目录

  2. 编辑函数 start1 的启动路径到文件 mp3_manager.erl 以便指向包含MP3文件的根目录。

  3. 编译 mp3_manager ,并执行 mp3_manager:start1() ,你可以看到如下输出:

    1> c(mp3_manager).
    {ok,mp3_manager}
    2> mp3_manager:start1().
    Dumping term to mp3data
    ok

如果你有兴趣,你可以看看文件 mp3data 来看分析结果。

7.6   启动SHOUTcast服务器

通过shell命令启动服务器:

1> shout:start().
...

7.7   测试服务器

  1. 到另外一个窗口启动音频播放器,定位到 http://localhost:3000
  2. 查看诊断输出。
  3. 享受音乐

8   深入挖掘

本章只是讲解了操作Socket的基本操作。你可以查阅更多关于Socket接口的文档,包括 gen_tcpgen_udpinet