3.并发编程 | 3. Concurrent Programming

3 并发编程

3.1进程

使用Erlang而不是其他功能语言的主要原因之一是Erlang处理并发和分布式编程的能力。并发是指可以同时处理多个执行线程的程序。例如,现代操作系统允许您使用同时运行的文字处理器,电子表格,邮件客户端和打印作业。系统中的每个处理器(CPU)可能一次只能处理一个线程(或作业),但它以这样的速率在作业之间交换,使得它们同时运行它们的幻觉。在Erlang程序中创建并行执行线程并允许这些线程相互通信很容易。在Erlang中,每个执行线程都称为进程

(另外:当执行的线程彼此没有共享数据时,通常使用术语“进程”,当他们以某种方式共享数据时使用术语“线程”。Erlang中的执行线程不共享数据,这就是为什么它们是称为进程)。

Erlang BIF spawn被用来创建一个新的进程:spawn(Module, Exported_Function, List of Arguments)。考虑以下模块:

-module(tut14). -export([start/0, say_something/2]). say_something(What, 0) -> done; say_something(What, Times) -> io:format("~p~n", [What]), say_something(What, Times - 1). start() -> spawn(tut14, say_something, [hello, 3]), spawn(tut14, say_something, [goodbye, 3]).

5> c(tut14). {ok,tut14} 6> tut14:say_something(hello, 3). hello hello hello done

如图所示,该函数say_something将其第一个参数写入由第二个参数指定的次数。该函数start启动两个Erlang进程,一个写三个“hello”,一个写“再见”三个。两个进程都使用该功能say_something。请注意,以这种方式spawn用于启动进程的函数必须从模块中导出(即在模块-export的开头)。

9> tut14:start(). hello goodbye <0.63.0> hello goodbye hello goodbye

注意它没有三次写“你好”,然后三次写“再见”。相反,第一个过程写了一个“你好”,第二个过程是“再见”,第一个过程是“你好”等等。但是<0.63.0>来自哪里?函数的返回值是函数中最后一个“事物”的返回值。函数中的最后一件事start是

spawn(tut14, say_something, [goodbye, 3]).

spawn返回一个进程标识符或pid,它唯一标识进程。所以<0.63.0>是spawn上面函数调用的pid 。下一个示例显示如何使用pids。

还要注意〜p被用来代替〜w in io:format。引用手册:“〜p以〜w的方式用标准语法写入数据,但将打印表示长于一行的术语拆分为许多行并合理地缩进每行,并尝试检测可打印列表字符并将它们输出为字符串“。

3.2消息传递

在下面的例子中,创建了两个进程,并且它们多次向对方发送消息。

-module(tut15). -export([start/0, ping/2, pong/0]). ping(0, Pong_PID) -> Pong_PID ! finished, io:format("ping finished~n", [] ping(N, Pong_PID) -> Pong_PID ! {ping, self()}, receive pong -> io:format("Ping received pong~n", []) end, ping(N - 1, Pong_PID). pong() -> receive finished -> io:format("Pong finished~n", [] {ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong, pong() end. start() -> Pong_PID = spawn(tut15, pong, []), spawn(tut15, ping, [3, Pong_PID]).

1> c(tut15). {ok,tut15} 2> tut15: start(). <0.36.0> Pong received ping Ping received pong Pong received ping Ping received pong Pong received ping Ping received pong ping finished Pong finished

该函数start首先创建一个进程,让我们称之为“pong”:

Pong_PID = spawn(tut15, pong, [])

此过程执行tut15:pong()Pong_PID是“乒乓”过程的过程身份。该函数start现在创建另一个进程“ping”:

spawn(tut15, ping, [3, Pong_PID]),

此过程执行:

tut15:ping(3, Pong_PID)

<0.36.0>是start功能。

这个过程“pong”现在做到:

receive finished -> io:format("Pong finished~n", [] {ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong, pong() end.

receive构造用于允许进程等待来自其他进程的消息。它的格式如下:

receive pattern1 -> actions1; pattern2 -> actions2; .... patternN actionsN end.

注意没有“;” 之前end

Erlang进程之间的消息只是有效的Erlang术语。也就是说,它们可以是列表,元组,整数,原子,pid等等。

每个进程都有自己的输入队列,用于接收消息。收到的新消息放在队列的末尾。当一个进程执行a时receive,队列中的第一条消息将与第一条消息匹配receive。如果匹配,消息将从队列中移除,并执行与该模式相对应的操作。

但是,如果第一个模式不匹配,则会测试第二个模式。如果匹配,消息将从队列中移除,并执行与第二个模式相对应的操作。如果第二个模式不匹配,则第三个模式将被尝试,直到没有更多模式要测试为止。如果没有更多模式要测试,则第一条消息将保留在队列中,而第二条消息则会尝试使用。如果这匹配任何模式,则会执行相应的操作,并将第二条消息从队列中删除(将第一条消息和其他消息保留在队列中)。如果第二条消息不匹配,则尝试第三条消息,依此类推,直到达到队列末尾。如果到达队列的末尾,

Erlang的实现是“clever”,并且最大限度地减少了每个消息针对每个消息的模式进行测试的次数receive

现在回到乒乓球的例子。

“Pong”正在等待消息。如果finished接收到原子,“pong”将“Pong finished”写入输出,并且因为它没有其他要做的事情,所以终止。如果它收到格式为:

{ping, Ping_PID}

它将“Pong received ping”写入输出并将原子发送pong到进程“ping”:

Ping_PID ! pong

注意运算符“!” 用于发送消息。“!”的语法 是:

Pid ! Message

也就是说,Message(任何Erlang术语)都以身份被发送到流程Pid

在将消息发送pong到进程“ping”之后,“pongpong再次调用该函数,这导致它重新返回receive并等待另一个消息。

现在让我们看看“ping”过程。回想一下,它是通过执行:

tut15:ping(3, Pong_PID)

查看函数ping/2ping/2由于第一个参数的值是3(不是0)(第一个子句的头是ping(0,Pong_PID),第二个子句的头是ping(N,Pong_PID),所以N变成了3),所以执行第二个子句。

第二个条款发送消息给“pong”:

Pong_PID ! {ping, self()},

self()返回执行进程的pid self(),在这种情况下是“ping”的pid。(回想一下“pong”的代码,它Ping_PIDreceive前面解释的变量中出现)。

“Ping”现在等待“pong”的回复:

receive pong -> io:format("Ping received pong~n", []) end,

当这个回复到达时,它写入“Ping pong”,然后“pingping再次调用该函数。

ping(N - 1, Pong_PID)

N-1会导致第一个参数递减,直到它变为0.发生这种情况时,ping/2会执行第一个子句:

ping(0, Pong_PID) -> Pong_PID ! finished, io:format("ping finished~n", []

原子finished被发送到“pong”(导致它如上所述终止)并且“ping完成”被写入输出。“Ping”然后终止,因为它没有什么可做的。

3.3注册进程名

在上面的例子中,“乒乓球”首先被创建,以便能够在“乒乓”开始时给出“乒乓球”的身份。也就是说,在某种程度上,“ping”必须能够知道“pong”的身份才能够向其发送消息。有时需要了解对方身份的流程是彼此独立开始的。因此,Erlang为进程提供了一个名称机制,以便这些名称可以用作身份而不是pids。这是通过使用registerBIF 完成的:

register(some_atom, Pid)

现在让我们用这个重写乒乓示例,并将名称命名pong为“pong”过程:

-module(tut16). -export([start/0, ping/1, pong/0]). ping(0) -> pong ! finished, io:format("ping finished~n", [] ping(N) -> pong ! {ping, self()}, receive pong -> io:format("Ping received pong~n", []) end, ping(N - 1). pong() -> receive finished -> io:format("Pong finished~n", [] {ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong, pong() end. start() -> register(pong, spawn(tut16, pong, [])), spawn(tut16, ping, [3]).

2> c(tut16). {ok, tut16} 3> tut16:start(). <0.38.0> Pong received ping Ping received pong Pong received ping Ping received pong Pong received ping Ping received pong ping finished Pong finished

这里的start/0功能,

register(pong, spawn(tut16, pong, [])),

都产生了“pong”过程并给出了它的名字pong。在“ping”过程中,消息可以通过以下方式发送pong

pong ! {ping, self()},

ping/2现在变成ping/1作为论据Pong_PID不需要。

3.4分布式编程

让我们在不同的计算机上用“ping”和“pong”重写乒乓程序。首先需要设置几件事才能使其发挥作用。分布式Erlang实现提供了一种非常基本的身份验证机制,以防止无意访问另一台计算机上的Erlang系统。与彼此交谈的Erlang系统必须具有相同的魔法cookie。实现这一目标的最简单方法是.erlang.cookie在您要运行Erlang系统的所有计算机上的主目录中调用一个文件,以便相互之间进行通信:

  • 在Windows系统上,主目录是由环境变量$HOME指出的目录,您可能需要设置这个目录。

  • 在Linux或UNIX上,您可以放心地忽略这一点,只需.erlang.cookiecd没有任何参数的情况下执行命令后在目录中创建一个名为的文件即可。

.erlang.cookie文件将包含具有相同原子的行。例如,在Linux或UNIX上,在OS shell中:

$ cd $ cat > .erlang.cookie this_is_very_secret $ chmod 400 .erlang.cookie

chmod上述使得.erlang.cookie只能由文件的所有者访问的文件。这是一项要求。

当你启动一个要与其他Erlang系统交流的Erlang系统时,你必须给它一个名字,例如:

$ erl -sname my_name

稍后我们会看到更多细节。如果你想试验分布式的Erlang,但你只有一台计算机可以工作,你可以在同一台计算机上启动两个独立的Erlang系统,但给它们不同的名称。在计算机上运行的每个Erlang系统称为Erlang节点

(注意:erl -sname假设所有节点都在同一个IP域中,并且我们只能使用IP地址的第一个组成部分,如果我们想要使用我们使用的不同域中的节点-name,但是必须全部给出所有IP地址。 )

下面是修改为在两个独立节点上运行的乒乓示例:

-module(tut17). -export([start_ping/1, start_pong/0, ping/2, pong/0]). ping(0, Pong_Node) -> {pong, Pong_Node} ! finished, io:format("ping finished~n", [] ping(N, Pong_Node) -> {pong, Pong_Node} ! {ping, self()}, receive pong -> io:format("Ping received pong~n", []) end, ping(N - 1, Pong_Node). pong() -> receive finished -> io:format("Pong finished~n", [] {ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong, pong() end. start_pong() -> register(pong, spawn(tut17, pong, [])). start_ping(Pong_Node) -> spawn(tut17, ping, [3, Pong_Node]).

让我们假设有两台计算机叫做gollum和kosken。首先在kosken上启动节点,称为ping,然后是gollum上的节点,称为pong。

On kosken(是Linux/UNIX系统):

kosken> erl -sname ping Erlang (BEAM) emulator version 5.2.3.7 [hipe] [threads:0] Eshell V5.2.3.7 (abort with ^G) (ping@kosken)1>

关于Gollum:

gollum> erl -sname pong Erlang (BEAM) emulator version 5.2.3.7 [hipe] [threads:0] Eshell V5.2.3.7 (abort with ^G) (pong@gollum)1>

现在,gollum上的“pong”过程开始了:

(pong@gollum)1> tut17:start_pong(). true

并开始kosken上的“ping”过程(从上面的代码中可以看到,该start_ping函数的参数是运行“pong”的Erlang系统的节点名称):

(ping@kosken)1> tut17:start_ping(pong@gollum). <0.37.0> Ping received pong Ping received pong Ping received pong ping finished

如所示,乒乓程序已经运行。在“pong”方面:

(pong@gollum)2> Pong received ping Pong received ping Pong received ping Pong finished (pong@gollum)2>

tut17代码,你会发现pong函数本身没有改变,下面的代码行以相同的方式工作,不管在哪个节点上执行“ping”进程:

{ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong,

因此,Erlang pids包含有关进程在哪里执行的信息。所以如果你知道一个进程的PID,那么“!” 运算符可用于向其发送消息,无论进程位于同一节点还是不同节点上。

不同之处在于消息如何发送到另一个节点上的注册进程:

{pong, Pong_Node} ! {ping, self()},

一个元组{registered_name,node_name}被用来代替仅仅registered_name

在前面的例子中,“ping”和“pong”是从两个独立的Erlang节点的shell中启动的。spawn也可以用来启动其他节点中的进程。

下一个例子是乒乓程序,但是这次“ping”是在另一个节点中启动的:

-module(tut18). -export([start/1, ping/2, pong/0]). ping(0, Pong_Node) -> {pong, Pong_Node} ! finished, io:format("ping finished~n", [] ping(N, Pong_Node) -> {pong, Pong_Node} ! {ping, self()}, receive pong -> io:format("Ping received pong~n", []) end, ping(N - 1, Pong_Node). pong() -> receive finished -> io:format("Pong finished~n", [] {ping, Ping_PID} -> io:format("Pong received ping~n", []), Ping_PID ! pong, pong() end. start(Ping_Node) -> register(pong, spawn(tut18, pong, [])), spawn(Ping_Node, tut18, ping, [3, node()]).

假设一个称为ping的Erlang系统(但不是“ping”进程)已经在kosken上启动,那么在gollum上,这是完成的:

(pong@gollum)1> tut18:start(ping@kosken). <3934.39.0> Pong received ping Ping received pong Pong received ping Ping received pong Pong received ping Ping received pong Pong finished ping finished

请注意,所有的输出都是在gollum上收到的。这是因为I / O系统发现进程从哪里产生并在那里发送所有输出。

3.5更大的例子

下面是一个简单的“messenger”的更大的例子。信使是一个程序,允许用户登录不同的节点并相互发送简单的消息。

在开始之前,注意以下几点:

  • 这个例子只显示消息传递逻辑-没有尝试提供一个良好的图形用户界面,虽然这也可以在Erlang。

  • 这种问题可以通过使用OTP中的工具来解决,这些工具还提供了即时更新代码的方法等(请参阅参考资料OTP Design Principles)。

  • 第一个程序包含处理消失的节点的一些不足之处。这些在更高版本的程序中得到纠正。

  • 配置server_node()功能。

  • 将编译后的代码(messenger.beam)复制到您启动Erlang的每台计算机上的目录中。

在以下使用此程序的示例中,节点在四台不同的计算机上启动。如果网络上没有可用的机器,则可以在同一台机器上启动多个节点。

四个Erlang节点启动:messenger @ super,c1 @ bilbo,c2 @ kosken,c3 @ gollum。

首先启动messenger @ super的服务器:

(messenger@super)1> messenger:start_server(). true

现在Peter登录c1@Bilbo:

(c1@bilbo)1> messenger:logon(peter). true logged_on

James 登录c2 @kosken:

(c2@kosken)1> messenger:logon(james). true logged_on

Fred在c3@Gollum登录:

(c3@gollum)1> messenger:logon(fred). true logged_on

现在Peter给Fred发了一条消息

(c1@bilbo)2> messenger:message(fred, "hello"). ok sent

Fred收到消息并向Peter发送消息并注销:

Message from peter: "hello" (c3@gollum)2> messenger:message(peter, "go away, I'm busy"). ok sent (c3@gollum)3> messenger:logoff(). logoff

James现在试图向Fred发送一条消息:

(c2@kosken)2> messenger:message(fred, "peter doesn't like you"). ok receiver_not_found

但是,由于Fred已经注销,这失败了。

首先,让我们看一下引入的一些新概念。

有两个版本的server_transfer函数:一个带有四个参数(server_transfer/4),另一个带有五个(server_transfer/5)。这些被Erlang认为是两个独立的功能。

注意如何编写该server函数,以便通过调用它自己,server(User_List)从而创建一个循环。Erlang编译器是“聪明的”并且优化了代码,所以这真的是一种循环,而不是一个正确的函数调用。但是这只有在通话结束后没有代码才有效。否则,编译器希望该调用返回并进行正确的函数调用。这会导致每个循环的过程变得越来越大。

lists使用模块中的功能。这是一个非常有用的模块,推荐使用手册页的研究(erl -man lists)。lists:keymember(Key,Position,Lists)通过元组列表查看并查看Position每个元组,以查看它是否与Key。第一个元素是位置1.如果它找到元素at所在的元组Position相同Key,则返回true,否则返回false

3> lists:keymember(a, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]). true 4> lists:keymember(p, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]). false

lists:keydelete 以相同的方式工作,但删除找到的第一个元组(如果有的话)并返回剩余的列表:

5> lists:keydelete(a, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]). [{x,y,z},{b,b,b},{q,r,s}]

lists:keysearch就像lists:keymember,但它返回{value,Tuple_Found}或原子false

lists模块中有许多非常有用的功能。

一个Erlang进程(在概念上)一直运行,直到它执行一个receive并且没有消息要在消息队列中接收。这里使用“概念上的”,因为Erlang系统共享系统中活动进程之间的CPU时间。

一个进程在没有其他任何事情的情况下终止,也就是说,它调用的最后一个函数只是返回并且不调用另一个函数。进程终止的另一种方式是调用它exit/1。这个论点exit/1具有特殊的含义,这在后面讨论。在这个例子中,exit(normal)已经完成了,它和用于调用函数的进程具有相同的效果。

BIF whereis(RegisteredName)检查是否存在已注册的名称进程RegisteredName。如果存在,则返回该进程的PID。如果它不存在,则原子undefined返回。

你现在应该能够理解messenger-module中的大部分代码。让我们详细研究一个案例:从一个用户向另一个用户发送消息。

第一个用户通过以下方式“发送”上述示例中的消息:

messenger:message(fred, "hello")

在测试客户端进程是否存在之后:

whereis(mess_client)

然后向mess_client*

mess_client ! {message_to, fred, "hello"}

客户端通过以下方式向服务器发送消息:

{messenger, messenger@super} ! {self(), message_to, fred, "hello"},

并等待服务器的回复。

服务器接收到此消息并调用:

server_transfer(From, fred, "hello", User_List),

这会检查该pid From是否在User_List

lists:keysearch(From, 1, User_List)

如果keysearch返回原子false,发生了一些错误,服务器将消息发回:

From ! {messenger, stop, you_are_not_logged_on}

这是客户收到的,而客户又收到exit(normal)并终止。如果keysearch返回,{value,{From,Name}}那么肯定是用户登录并且他的名字(peter)是可变的Name

现在让我们调用:

server_transfer(From, peter, fred, "hello", User_List)

请注意,这是server_transfer/5因为它与前面的函数不一样server_transfer/4。另一个工作keysearchUser_List查找与fred对应的客户端的pid:

lists:keysearch(fred, 2, User_List)

这次使用了参数2,它是元组中的第二个元素。如果这返回原子false,则fred不会登录并发送以下消息:

From ! {messenger, receiver_not_found};

这是客户端收到的。

如果keysearch返回:

{value, {ToPid, fred}}

以下消息发送给fred的客户端:

ToPid ! {message_from, peter, "hello"},

以下消息发送给peter的客户端:

From ! {messenger, sent}

Fred的客户收到该消息并打印出来:

{message_from, peter, "hello"} -> io:format("Message from ~p: ~p~n", [peter, "hello"])

Peter的客户端收到该await_result函数中的消息。