Erlang 20

9. How to Implement a Driver

如何实现驱动程序

这部分是很久以前写的。它大部分仍然有效,因为它解释了重要的概念,但是这是为较旧的驱动程序接口编写的,因此这些示例不再适用。鼓励读者阅读erl_driverdriver_entry文档。

9.1简介

本节介绍如何为Erlang构建自己的驱动程序。

Erlang中的驱动程序是用C语言编写的库,它链接到Erlang仿真器并从Erlang调用。当C比Erlang更适合时,可以使用驱动程序来加快速度,或者提供对Erlang无法直接访问的操作系统资源的访问。

驱动程序可以作为共享库(在Windows上称为DLL)动态加载,或静态加载,并在编译和链接时与仿真程序链接。这里只介绍动态加载的驱动程序,静态链接的驱动程序不在本节的讨论范围之内。

警告

当驱动程序加载时,它将在模拟器的上下文中执行,共享相同的内存和相同的线程。这意味着驱动程序中的所有操作必须是非阻塞的,并且驱动程序中的任何崩溃都会导致整个仿真程序停机。总之,要小心。

9.2样本驱动器

本节介绍使用libpq C客户端库访问postgres数据库的简单驱动程序。使用Postgres是因为它是免费且开源的。有关postgres的信息,请参阅www.postgres.org

驱动程序是同步的,它使用客户端库的同步调用。这只是为了简单,但不好,因为它在等待数据库时暂停模拟器。这在异步样本驱动程序中得到了改进。

代码很简单:Erlang和驱动程序之间的所有通信都是通过port_control/3,驱动程序使用rbuf

一个Erlang驱动程序只导出一个函数:驱动程序入口函数。这是用一个宏定义的DRIVER_INIT,它返回一个指向struct包含从仿真器调用的入口点的C的指针。的struct定义了仿真器调用调用驱动程序,具有条目NULL为没有定义,并且由驾驶员使用的条目的指针。

start当驱动程序作为端口打开时,会调用该条目open_port/2。这里我们为用户数据结构分配内存。每次模拟器调用我们时都会传递此用户数据。首先我们存储驱动程序句柄,因为它在以后的调用中需要。我们为LibPQ使用的连接句柄分配内存。我们还设置端口返回分配的驱动程序二进制文件,通过设置标志PORT_CONTROL_FLAG_BINARY,调用set_port_control_flags。(这是因为我们不知道我们的数据是否适合control模拟器设置的默认大小64字节的结果缓冲区。)

init当驱动程序加载时会调用一个条目。但是,我们不使用它,因为它只执行一次,并且我们希望可能有多个驱动程序实例。

stop端口在端口关闭时被调用。

control当Erlang代码调用时port_control/3,该条目将从模拟器调用,以执行实际的工作。我们已经定义了一组简单的命令:connect登录到数据库,disconnect注销并select发送SQL查询并获得结果。所有结果都通过返回rbuf。库ei中的库erl_interface用于以二进制术语格式编码数据。结果以二进制形式返回到模拟器,因此binary_to_term在Erlang中调用将结果转换为术语形式。

该代码可在pg_sync.csample的目录erts

驱动程序条目包含将由仿真器调用的函数。在这个例子中,只有startstop以及control提供:

/* Driver interface declarations */ static ErlDrvData start(ErlDrvPort port, char *command static void stop(ErlDrvData drv_data static int control(ErlDrvData drv_data, unsigned int command, char *buf, int len, char **rbuf, int rlen static ErlDrvEntry pq_driver_entry = { NULL, /* init */ start, stop, NULL, /* output */ NULL, /* ready_input */ NULL, /* ready_output */ "pg_sync", /* the name of the driver */ NULL, /* finish */ NULL, /* handle */ control, NULL, /* timeout */ NULL, /* outputv */ NULL, /* ready_async */ NULL, /* flush */ NULL, /* call */ NULL /* event */ };

我们有一个结构来存储驱动程序所需的状态,在这种情况下,我们只需要保持数据库连接:

typedef struct our_data_s { PGconn* conn; } our_data_t;

我们定义的控制代码如下:

/* Keep the following definitions in alignment with the * defines in erl_pq_sync.erl */ #define DRV_CONNECT 'C' #define DRV_DISCONNECT 'D' #define DRV_SELECT 'S'

这将返回驱动程序结构。宏DRIVER_INIT定义了唯一导出的函数。所有其他功能都是静态的,不会从库中导出。

/* INITIALIZATION AFTER LOADING */ /* * This is the init function called after this driver has been loaded. * It must *not* be declared static. Must return the address to * the driver entry. */ DRIVER_INIT(pq_drv) { return &pq_driver_entry; }

这里进行一些初始化,start从中调用open_port。数据将传递给controlstop

/* DRIVER INTERFACE */ static ErlDrvData start(ErlDrvPort port, char *command) { our_data_t* data; data = (our_data_t*)driver_alloc(sizeof(our_data_t) data->conn = NULL; set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY return (ErlDrvData)data; }

我们称断开连接从数据库注销。(这应该是从Erlang完成的,但以防万一)。

static int do_disconnect(our_data_t* data, ei_x_buff* x static void stop(ErlDrvData drv_data) { our_data_t* data = (our_data_t*)drv_data; do_disconnect(data, NULL driver_free(data }

我们只使用二进制格式将数据返回给仿真器; 输入数据为字符串参数connectselect。返回的数据由Erlang条款组成。

函数get_sei_x_to_new_binary是用于使代码更短的工具。get_s重复字符串并将其零终止,因为postgres客户端库需要它。ei_x_to_new_binary需要一个ei_x_buff缓冲区,分配一个二进制文件,并在那里复制数据。此二进制文件返回*rbuf。(请注意,这个二进制文件被模拟器释放,而不是由我们释放。)

static char* get_s(const char* buf, int len static int do_connect(const char *s, our_data_t* data, ei_x_buff* x static int do_select(const char* s, our_data_t* data, ei_x_buff* x /* As we are operating in binary mode, the return value from control * is irrelevant, as long as it is not negative. */ static int control(ErlDrvData drv_data, unsigned int command, char *buf, int len, char **rbuf, int rlen) { int r; ei_x_buff x; our_data_t* data = (our_data_t*)drv_data; char* s = get_s(buf, len ei_x_new_with_version(&x switch (command) { case DRV_CONNECT: r = do_connect(s, data, &x break; case DRV_DISCONNECT: r = do_disconnect(data, &x break; case DRV_SELECT: r = do_select(s, data, &x break; default: r = -1; break; } *rbuf = (char*)ei_x_to_new_binary(&x ei_x_free(&x driver_free(s return r; }

do_connect是我们登录到数据库的地方。如果连接成功,我们将连接句柄存储在驱动程序数据中,然后返回'ok'。否则,我们从postgres返回错误消息并存储NULL在驱动程序数据中。

static int do_connect(const char *s, our_data_t* data, ei_x_buff* x) { PGconn* conn = PQconnectdb(s if (PQstatus(conn) != CONNECTION_OK) { encode_error(x, conn PQfinish(conn conn = NULL; } else { encode_ok(x } data->conn = conn; return 0; }

如果我们连接(如果连接句柄不是NULL),我们从数据库注销。我们需要检查是否应该编码一个'ok',因为我们可以从函数中得到这个函数stop,它不会将数据返回给模拟器:

static int do_disconnect(our_data_t* data, ei_x_buff* x) { if (data->conn == NULL) return 0; PQfinish(data->conn data->conn = NULL; if (x != NULL) encode_ok(x return 0; }

我们执行查询并对结果进行编码。编码在另一个C模块中完成,该模块pg_encode.c也作为示例代码提供。

static int do_select(const char* s, our_data_t* data, ei_x_buff* x) { PGresult* res = PQexec(data->conn, s encode_result(x, res, data->conn PQclear(res return 0; }

在这里我们检查postgres的结果。如果它是数据,我们将它编码为带列数据的列表列表。postgres中的所有东西都是C字符串,所以我们用ei_x_encode_string结果作为字符串发送给Erlang。(列表的头部包含列名称。)

void encode_result(ei_x_buff* x, PGresult* res, PGconn* conn) { int row, n_rows, col, n_cols; switch (PQresultStatus(res)) { case PGRES_TUPLES_OK: n_rows = PQntuples(res n_cols = PQnfields(res ei_x_encode_tuple_header(x, 2 encode_ok(x ei_x_encode_list_header(x, n_rows+1 ei_x_encode_list_header(x, n_cols for (col = 0; col < n_cols; ++col) { ei_x_encode_string(x, PQfname(res, col) } ei_x_encode_empty_list(x for (row = 0; row < n_rows; ++row) { ei_x_encode_list_header(x, n_cols for (col = 0; col < n_cols; ++col) { ei_x_encode_string(x, PQgetvalue(res, row, col) } ei_x_encode_empty_list(x } ei_x_encode_empty_list(x break; case PGRES_COMMAND_OK: ei_x_encode_tuple_header(x, 2 encode_ok(x ei_x_encode_string(x, PQcmdTuples(res) break; default: encode_error(x, conn break; } }

9.3编译和链接示例驱动程序

该驱动程序将被编译并链接到共享库(Windows上的DLL)。使用gcc,这是通过链接标志-shared-fpic。在我们使用ei图书馆时,我们也应该包括它。有几个版本ei,针对调试或非调试以及多线程或单线程进行编译。在示例的makefile中,该obj目录用于ei库,这意味着我们使用非调试单线程版本。

9.4在Erlang中调用驱动程序作为端口

在从Erlang调用驱动程序之前,必须先加载并打开驱动程序。加载是使用erl_ddll模块完成的(erl_ddll加载动态驱动程序的驱动程序实际上是驱动程序本身)。如果加载成功,端口可以打开open_port/2。端口名称必须与共享库的名称和驱动程序条目结构中的名称匹配。

当端口打开后,可以调用驱动程序。在这个pg_sync例子中,我们没有来自端口的任何数据,只有来自该端口的返回值port_control

以下代码是同步postgres驱动程序的Erlang部分pg_sync.erl

-module(pg_sync). -define(DRV_CONNECT, 1). -define(DRV_DISCONNECT, 2). -define(DRV_SELECT, 3). -export([connect/1, disconnect/1, select/2]). connect(ConnectStr) -> case erl_ddll:load_driver(".", "pg_sync") of ok -> ok; {error, already_loaded} -> ok; E -> exit{error, E}) end, Port = open_port{spawn, ?MODULE}, []), case binary_to_term(port_control(Port, ?DRV_CONNECT, ConnectStr)) of ok -> {ok, Port}; Error -> Error end. disconnect(Port) -> R = binary_to_term(port_control(Port, ?DRV_DISCONNECT, "")), port_close(Port), R. select(Port, Query) -> binary_to_term(port_control(Port, ?DRV_SELECT, Query)).

API很简单:

  • connect/1 加载驱动程序,打开它并登录到数据库,如果成功则返回Erlang端口。

  • select/2 向驱动程序发送一个查询并返回结果。

  • disconnect/1关闭数据库连接和驱动程序。(但是,它不会卸载它。)

连接字符串将成为postgres的连接字符串。

驱动程序已加载erl_ddll:load_driver/2。如果这是成功的,或者如果它已经被加载,则它被打开。这将调用start驱动程序中的函数。

我们使用该port_control/3功能来调用所有的驱动程序。驱动程序的结果立即返回并通过调用转换为条款binary_to_term/1。(我们相信司机返回的条款是完好的,否则这些binary_to_term电话可能被包含在catch中。)

9.5采样异步驱动器

有时,数据库查询可能需要很长时间才能完成,在我们的pg_sync驱动程序中,仿真程序在驱动程序完成其工作时暂停。这通常是不可接受的,因为没有其他Erlang进程有机会做任何事情。为了改进我们的postgres驱动程序,我们使用LibPQ中的异步调用重新实现它。

驱动程序的异步版本是示例文件pg_async.cpg_asyng.erl

/* Driver interface declarations */ static ErlDrvData start(ErlDrvPort port, char *command static void stop(ErlDrvData drv_data static int control(ErlDrvData drv_data, unsigned int command, char *buf, int len, char **rbuf, int rlen static void ready_io(ErlDrvData drv_data, ErlDrvEvent event static ErlDrvEntry pq_driver_entry = { NULL, /* init */ start, stop, NULL, /* output */ ready_io, /* ready_input */ ready_io, /* ready_output */ "pg_async", /* the name of the driver */ NULL, /* finish */ NULL, /* handle */ control, NULL, /* timeout */ NULL, /* outputv */ NULL, /* ready_async */ NULL, /* flush */ NULL, /* call */ NULL /* event */ }; typedef struct our_data_t { PGconn* conn; ErlDrvPort port; int socket; int connecting; } our_data_t;

有些东西已经从改变pg_sync.c:我们使用的条目ready_ioready_inputready_output,这是从模拟器称为只有当输入要从插槽读取。(实际上,套接字被用在select模拟器内部的一个函数中,并且当套接字发信号时,指示有数据要读取,这个ready_input条目被调用。

我们的驱动程序数据也得到了扩展,我们跟踪用于与postgres进行通信的套接字以及我们向端口发送数据时所需的端口driver_output。我们有一个标志connecting,告诉驱动程序是否正在等待连接或等待查询结果。(这是必需的,因为ready_io连接时和查询结果时都会调用该条目。)

static int do_connect(const char *s, our_data_t* data) { PGconn* conn = PQconnectStart(s if (PQstatus(conn) == CONNECTION_BAD) { ei_x_buff x; ei_x_new_with_version(&x encode_error(&x, conn PQfinish(conn conn = NULL; driver_output(data->port, x.buff, x.index ei_x_free(&x } PQconnectPoll(conn int socket = PQsocket(conn data->socket = socket; driver_select(data->port, (ErlDrvEvent)socket, DO_READ, 1 driver_select(data->port, (ErlDrvEvent)socket, DO_WRITE, 1 data->conn = conn; data->connecting = 1; return 0; }

connect功能看起来有点不同。我们使用异步PQconnectStart功能进行连接。连接开始后,我们检索与之连接的套接字PQsocket。该套接字与该driver_select功能一起用于等待连接。当套接字准备好输入或输出时,ready_io调用该函数。

注意,我们只返回数据%28与driver_output%29如果这里有错误,则等待连接完成,在这种情况下,我们将ready_io函数被调用。

static int do_select(const char* s, our_data_t* data) { data->connecting = 0; PGconn* conn = data->conn; /* if there's an error return it now */ if (PQsendQuery(conn, s) == 0) { ei_x_buff x; ei_x_new_with_version(&x encode_error(&x, conn driver_output(data->port, x.buff, x.index ei_x_free(&x } /* else wait for ready_output to get results */ return 0; }

do_select函数启动一个select,如果没有立即的错误,则返回。ready_io调用时返回结果。

static void ready_io(ErlDrvData drv_data, ErlDrvEvent event) { PGresult* res = NULL; our_data_t* data = (our_data_t*)drv_data; PGconn* conn = data->conn; ei_x_buff x; ei_x_new_with_version(&x if (data->connecting) { ConnStatusType status; PQconnectPoll(conn status = PQstatus(conn if (status == CONNECTION_OK) encode_ok(&x else if (status == CONNECTION_BAD) encode_error(&x, conn } else { PQconsumeInput(conn if (PQisBusy(conn)) return; res = PQgetResult(conn encode_result(&x, res, conn PQclear(res for (;;) { res = PQgetResult(conn if (res == NULL) break; PQclear(res } } if (x.index > 1) { driver_output(data->port, x.buff, x.index if (data->connecting) driver_select(data->port, (ErlDrvEvent)data->socket, DO_WRITE, 0 } ei_x_free(&x }

ready_io当我们从postgres获得的套接字准备好输入或输出时,该函数被调用。在这里,我们首先检查我们是否连接到数据库。在这种情况下,我们检查连接状态,如果连接成功则返回OK,否则返回错误。如果连接尚未建立,我们只需返回; ready_io再次被调用。

如果我们有一个连接的结果,通过x缓冲区中的数据表示,我们不再需要选择output(ready_output),所以我们通过调用来删除它driver_select

如果我们没有连接,我们等待a的结果PQsendQuery,所以我们得到结果并返回。编码使用与前面示例中相同的功能完成。

在这里添加错误处理,例如,检查套接字是否仍然打开,但这只是一个简单的例子。

异步驱动程序的Erlang部分由示例文件组成pg_async.erl

-module(pg_async). -define(DRV_CONNECT, $C). -define(DRV_DISCONNECT, $D). -define(DRV_SELECT, $S). -export([connect/1, disconnect/1, select/2]). connect(ConnectStr) -> case erl_ddll:load_driver(".", "pg_async") of ok -> ok; {error, already_loaded} -> ok; _ -> exit{error, could_not_load_driver}) end, Port = open_port{spawn, ?MODULE}, [binary]), port_control(Port, ?DRV_CONNECT, ConnectStr), case return_port_data(Port) of ok -> {ok, Port}; Error -> Error end. disconnect(Port) -> port_control(Port, ?DRV_DISCONNECT, ""), R = return_port_data(Port), port_close(Port), R. select(Port, Query) -> port_control(Port, ?DRV_SELECT, Query), return_port_data(Port). return_port_data(Port) -> receive {Port, {data, Data}} -> binary_to_term(Data) end.

Erlang代码略有不同,因为我们不会同步返回结果port_control,而是从driver_output消息队列中的数据中获取结果。上述功能return_port_data从端口接收数据。由于数据是二进制格式,因此我们使用binary_to_term/1它将其转换为Erlang术语。请注意,驱动程序以二进制模式打开(open_port/2通过选项调用[binary])。这意味着从驱动程序发送到模拟器的数据将作为二进制文件发送。没有选择binary,他们应该是整数列表。

9.6使用driver_async的异步驱动程序

作为最后一个例子,我们演示了如何使用driver_async。我们也使用驱动程序术语接口。该驱动程序是用C ++编写的。这使我们能够使用STL的算法。我们使用该next_permutation算法来获得整数列表的下一个排列。对于大型列表(> 100,000个元素),这需要一些时间,所以我们将其作为异步任务来执行。

驱动程序的异步API非常复杂。首先,工作必须做好准备。在这个例子中,这是在做output。我们可以使用control,但我们希望在示例中有一些变化。在我们的驱动程序中,我们分配了一个包含异步任务执行工作所需的任何内容的结构。这是在主模拟器线程中完成的。然后,异步函数从驱动程序线程调用,与主仿真器线程分开。请注意,驱动程序功能不可重入,因此不能使用它们。最后,函数完成后,驱动程序回调ready_async将从主模拟器线程调用,这是我们将结果返回给Erlang的地方。(我们不能从异步函数中返回结果,因为我们不能调用驱动函数。)

以下代码来自示例文件next_perm.cc。该驱动程序条目看起来像之前,但也包含回调ready_async

static ErlDrvEntry next_perm_driver_entry = { NULL, /* init */ start, NULL, /* stop */ output, NULL, /* ready_input */ NULL, /* ready_output */ "next_perm", /* the name of the driver */ NULL, /* finish */ NULL, /* handle */ NULL, /* control */ NULL, /* timeout */ NULL, /* outputv */ ready_async, NULL, /* flush */ NULL, /* call */ NULL /* event */ };

output函数分配异步函数的工作区。在我们使用C ++的时候,我们使用一个结构体,并将数据填入其中。我们必须复制原始数据,从output函数返回后该函数无效,do_perm函数稍后调用,并从另一个线程调用。我们在这里没有返回任何数据,而是稍后从ready_async回调中发送。

async_data被传递给do_perm函数。我们不使用async_free函数(最后一个参数driver_async),它仅在以编程方式取消任务时使用。

struct our_async_data { bool prev; vector<int> data; our_async_data(ErlDrvPort p, int command, const char* buf, int len }; our_async_data::our_async_data(ErlDrvPort p, int command, const char* buf, int len) : prev(command == 2), data((int*)buf, (int*)buf + len / sizeof(int)) { } static void do_perm(void* async_data static void output(ErlDrvData drv_data, char *buf, int len) { if (*buf < 1 || *buf > 2) return; ErlDrvPort port = reinterpret_cast<ErlDrvPort>(drv_data void* async_data = new our_async_data(port, *buf, buf+1, len driver_async(port, NULL, do_perm, async_data, do_free }

do_perm我们做的工作中,按照分配的结构进行操作output

static void do_perm(void* async_data) { our_async_data* d = reinterpret_cast<our_async_data*>(async_data if (d->prev) prev_permutation(d->data.begin(), d->data.end() else next_permutation(d->data.begin(), d->data.end() }

ready_async功能中,输出被发送回仿真器。我们使用驱动程序术语格式代替ei。这是将Erlang术语直接发送给驱动程序的唯一方法,无需调用Erlang代码binary_to_term/1。在简单的例子中,这很好,我们不需要ei用来处理二进制术语格式。

当数据返回时,我们释放数据。

static void ready_async(ErlDrvData drv_data, ErlDrvThreadData async_data) { ErlDrvPort port = reinterpret_cast<ErlDrvPort>(drv_data our_async_data* d = reinterpret_cast<our_async_data*>(async_data int n = d->data.size(), result_n = n*2 + 3; ErlDrvTermData *result = new ErlDrvTermData[result_n], *rp = result; for (vector<int>::iterator i = d->data.begin( i != d->data.end( ++i) { *rp++ = ERL_DRV_INT; *rp++ = *i; } *rp++ = ERL_DRV_NIL; *rp++ = ERL_DRV_LIST; *rp++ = n+1; driver_output_term(port, result, result_n delete[] result; delete d; }

这个驱动程序被称为Erlang的其他驱动程序。但是,当我们使用时driver_output_term,不需要调用binary_to_term。Erlang代码位于示例文件中next_perm.erl

将输入更改为整数列表并发送给驱动程序。

-module(next_perm). -export([next_perm/1, prev_perm/1, load/0, all_perm/1]). load() -> case whereis(next_perm) of undefined -> case erl_ddll:load_driver(".", "next_perm") of ok -> ok; {error, already_loaded} -> ok; E -> exit(E) end, Port = open_port{spawn, "next_perm"}, []), register(next_perm, Port _ -> ok end. list_to_integer_binaries(L) -> [<<I:32/integer-native>> || I <- L]. next_perm(L) -> next_perm(L, 1). prev_perm(L) -> next_perm(L, 2). next_perm(L, Nxt) -> load(), B = list_to_integer_binaries(L), port_control(next_perm, Nxt, B), receive Result -> Result end. all_perm(L) -> New = prev_perm(L), all_perm(New, L, [New]). all_perm(L, L, Acc) -> Acc; all_perm(L, Orig, Acc) -> New = prev_perm(L), all_perm(New, Orig, [New | Acc]).