9. How to Implement a Driver
如何实现驱动程序
注
这部分是很久以前写的。它大部分仍然有效,因为它解释了重要的概念,但是这是为较旧的驱动程序接口编写的,因此这些示例不再适用。鼓励读者阅读erl_driver
和driver_entry
文档。
9.1简介
本节介绍如何为Erlang构建自己的驱动程序。
Erlang中的驱动程序是用C语言编写的库,它链接到Erlang仿真器并从Erlang调用。当C比Erlang更适合时,可以使用驱动程序来加快速度,或者提供对Erlang无法直接访问的操作系统资源的访问。
驱动程序可以作为共享库(在Windows上称为DLL)动态加载,或静态加载,并在编译和链接时与仿真程序链接。这里只介绍动态加载的驱动程序,静态链接的驱动程序不在本节的讨论范围之内。
警告
当驱动程序加载时,它将在模拟器的上下文中执行,共享相同的内存和相同的线程。这意味着驱动程序中的所有操作必须是非阻塞的,并且驱动程序中的任何崩溃都会导致整个仿真程序停机。总之,要小心。
9.2样本驱动器
本节介绍使用libpq C客户端库访问postgres数据库的简单驱动程序。使用Postgres是因为它是免费且开源的。有关postgres的信息,请参阅www.postgres.org
。
驱动程序是同步的,它使用客户端库的同步调用。这只是为了简单,但不好,因为它在等待数据库时暂停模拟器。这在异步样本驱动程序中得到了改进。
代码很简单:Erlang和驱动程序之间的所有通信都是通过port_control/3
,驱动程序使用rbuf
。
一个Erlang驱动程序只导出一个函数:驱动程序入口函数。这是用一个宏定义的DRIVER_INIT
,它返回一个指向struct
包含从仿真器调用的入口点的C的指针。的struct
定义了仿真器调用调用驱动程序,具有条目NULL
为没有定义,并且由驾驶员使用的条目的指针。
start
当驱动程序作为端口打开时,会调用该条目open_port/2
。这里我们为用户数据结构分配内存。每次模拟器调用我们时都会传递此用户数据。首先我们存储驱动程序句柄,因为它在以后的调用中需要。我们为LibPQ使用的连接句柄分配内存。我们还设置端口返回分配的驱动程序二进制文件,通过设置标志PORT_CONTROL_FLAG_BINARY
,调用set_port_control_flags
。(这是因为我们不知道我们的数据是否适合control
模拟器设置的默认大小64字节的结果缓冲区。)
init
当驱动程序加载时会调用一个条目。但是,我们不使用它,因为它只执行一次,并且我们希望可能有多个驱动程序实例。
stop
端口在端口关闭时被调用。
control
当Erlang代码调用时port_control/3
,该条目将从模拟器调用,以执行实际的工作。我们已经定义了一组简单的命令:connect
登录到数据库,disconnect
注销并select
发送SQL查询并获得结果。所有结果都通过返回rbuf
。库ei
中的库erl_interface
用于以二进制术语格式编码数据。结果以二进制形式返回到模拟器,因此binary_to_term
在Erlang中调用将结果转换为术语形式。
该代码可在pg_sync.c
在sample
的目录erts
。
驱动程序条目包含将由仿真器调用的函数。在这个例子中,只有start
,stop
以及control
提供:
/* Driver interface declarations */
static ErlDrvData start(ErlDrvPort port, char *command
static void stop(ErlDrvData drv_data
static int control(ErlDrvData drv_data, unsigned int command, char *buf,
int len, char **rbuf, int rlen
static ErlDrvEntry pq_driver_entry = {
NULL, /* init */
start,
stop,
NULL, /* output */
NULL, /* ready_input */
NULL, /* ready_output */
"pg_sync", /* the name of the driver */
NULL, /* finish */
NULL, /* handle */
control,
NULL, /* timeout */
NULL, /* outputv */
NULL, /* ready_async */
NULL, /* flush */
NULL, /* call */
NULL /* event */
};
我们有一个结构来存储驱动程序所需的状态,在这种情况下,我们只需要保持数据库连接:
typedef struct our_data_s {
PGconn* conn;
} our_data_t;
我们定义的控制代码如下:
/* Keep the following definitions in alignment with the
* defines in erl_pq_sync.erl
*/
#define DRV_CONNECT 'C'
#define DRV_DISCONNECT 'D'
#define DRV_SELECT 'S'
这将返回驱动程序结构。宏DRIVER_INIT
定义了唯一导出的函数。所有其他功能都是静态的,不会从库中导出。
/* INITIALIZATION AFTER LOADING */
/*
* This is the init function called after this driver has been loaded.
* It must *not* be declared static. Must return the address to
* the driver entry.
*/
DRIVER_INIT(pq_drv)
{
return &pq_driver_entry;
}
这里进行一些初始化,start
从中调用open_port
。数据将传递给control
和stop
。
/* DRIVER INTERFACE */
static ErlDrvData start(ErlDrvPort port, char *command)
{
our_data_t* data;
data = (our_data_t*)driver_alloc(sizeof(our_data_t)
data->conn = NULL;
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY
return (ErlDrvData)data;
}
我们称断开连接从数据库注销。(这应该是从Erlang完成的,但以防万一)。
static int do_disconnect(our_data_t* data, ei_x_buff* x
static void stop(ErlDrvData drv_data)
{
our_data_t* data = (our_data_t*)drv_data;
do_disconnect(data, NULL
driver_free(data
}
我们只使用二进制格式将数据返回给仿真器; 输入数据为字符串参数connect
和select
。返回的数据由Erlang条款组成。
函数get_s
和ei_x_to_new_binary
是用于使代码更短的工具。get_s
重复字符串并将其零终止,因为postgres客户端库需要它。ei_x_to_new_binary
需要一个ei_x_buff
缓冲区,分配一个二进制文件,并在那里复制数据。此二进制文件返回*rbuf
。(请注意,这个二进制文件被模拟器释放,而不是由我们释放。)
static char* get_s(const char* buf, int len
static int do_connect(const char *s, our_data_t* data, ei_x_buff* x
static int do_select(const char* s, our_data_t* data, ei_x_buff* x
/* As we are operating in binary mode, the return value from control
* is irrelevant, as long as it is not negative.
*/
static int control(ErlDrvData drv_data, unsigned int command, char *buf,
int len, char **rbuf, int rlen)
{
int r;
ei_x_buff x;
our_data_t* data = (our_data_t*)drv_data;
char* s = get_s(buf, len
ei_x_new_with_version(&x
switch (command) {
case DRV_CONNECT: r = do_connect(s, data, &x break;
case DRV_DISCONNECT: r = do_disconnect(data, &x break;
case DRV_SELECT: r = do_select(s, data, &x break;
default: r = -1; break;
}
*rbuf = (char*)ei_x_to_new_binary(&x
ei_x_free(&x
driver_free(s
return r;
}
do_connect
是我们登录到数据库的地方。如果连接成功,我们将连接句柄存储在驱动程序数据中,然后返回'ok'
。否则,我们从postgres返回错误消息并存储NULL
在驱动程序数据中。
static int do_connect(const char *s, our_data_t* data, ei_x_buff* x)
{
PGconn* conn = PQconnectdb(s
if (PQstatus(conn) != CONNECTION_OK) {
encode_error(x, conn
PQfinish(conn
conn = NULL;
} else {
encode_ok(x
}
data->conn = conn;
return 0;
}
如果我们连接(如果连接句柄不是NULL
),我们从数据库注销。我们需要检查是否应该编码一个'ok'
,因为我们可以从函数中得到这个函数stop
,它不会将数据返回给模拟器:
static int do_disconnect(our_data_t* data, ei_x_buff* x)
{
if (data->conn == NULL)
return 0;
PQfinish(data->conn
data->conn = NULL;
if (x != NULL)
encode_ok(x
return 0;
}
我们执行查询并对结果进行编码。编码在另一个C模块中完成,该模块pg_encode.c
也作为示例代码提供。
static int do_select(const char* s, our_data_t* data, ei_x_buff* x)
{
PGresult* res = PQexec(data->conn, s
encode_result(x, res, data->conn
PQclear(res
return 0;
}
在这里我们检查postgres的结果。如果它是数据,我们将它编码为带列数据的列表列表。postgres中的所有东西都是C字符串,所以我们用ei_x_encode_string
结果作为字符串发送给Erlang。(列表的头部包含列名称。)
void encode_result(ei_x_buff* x, PGresult* res, PGconn* conn)
{
int row, n_rows, col, n_cols;
switch (PQresultStatus(res)) {
case PGRES_TUPLES_OK:
n_rows = PQntuples(res
n_cols = PQnfields(res
ei_x_encode_tuple_header(x, 2
encode_ok(x
ei_x_encode_list_header(x, n_rows+1
ei_x_encode_list_header(x, n_cols
for (col = 0; col < n_cols; ++col) {
ei_x_encode_string(x, PQfname(res, col)
}
ei_x_encode_empty_list(x
for (row = 0; row < n_rows; ++row) {
ei_x_encode_list_header(x, n_cols
for (col = 0; col < n_cols; ++col) {
ei_x_encode_string(x, PQgetvalue(res, row, col)
}
ei_x_encode_empty_list(x
}
ei_x_encode_empty_list(x
break;
case PGRES_COMMAND_OK:
ei_x_encode_tuple_header(x, 2
encode_ok(x
ei_x_encode_string(x, PQcmdTuples(res)
break;
default:
encode_error(x, conn
break;
}
}
9.3编译和链接示例驱动程序
该驱动程序将被编译并链接到共享库(Windows上的DLL)。使用gcc,这是通过链接标志-shared
和-fpic
。在我们使用ei
图书馆时,我们也应该包括它。有几个版本ei
,针对调试或非调试以及多线程或单线程进行编译。在示例的makefile中,该obj
目录用于ei
库,这意味着我们使用非调试单线程版本。
9.4在Erlang中调用驱动程序作为端口
在从Erlang调用驱动程序之前,必须先加载并打开驱动程序。加载是使用erl_ddll
模块完成的(erl_ddll
加载动态驱动程序的驱动程序实际上是驱动程序本身)。如果加载成功,端口可以打开open_port/2
。端口名称必须与共享库的名称和驱动程序条目结构中的名称匹配。
当端口打开后,可以调用驱动程序。在这个pg_sync
例子中,我们没有来自端口的任何数据,只有来自该端口的返回值port_control
。
以下代码是同步postgres驱动程序的Erlang部分pg_sync.erl
:
-module(pg_sync).
-define(DRV_CONNECT, 1).
-define(DRV_DISCONNECT, 2).
-define(DRV_SELECT, 3).
-export([connect/1, disconnect/1, select/2]).
connect(ConnectStr) ->
case erl_ddll:load_driver(".", "pg_sync") of
ok -> ok;
{error, already_loaded} -> ok;
E -> exit{error, E})
end,
Port = open_port{spawn, ?MODULE}, []),
case binary_to_term(port_control(Port, ?DRV_CONNECT, ConnectStr)) of
ok -> {ok, Port};
Error -> Error
end.
disconnect(Port) ->
R = binary_to_term(port_control(Port, ?DRV_DISCONNECT, "")),
port_close(Port),
R.
select(Port, Query) ->
binary_to_term(port_control(Port, ?DRV_SELECT, Query)).
API很简单:
connect/1
加载驱动程序,打开它并登录到数据库,如果成功则返回Erlang端口。
select/2
向驱动程序发送一个查询并返回结果。
disconnect/1
关闭数据库连接和驱动程序。(但是,它不会卸载它。)
连接字符串将成为postgres的连接字符串。
驱动程序已加载erl_ddll:load_driver/2
。如果这是成功的,或者如果它已经被加载,则它被打开。这将调用start
驱动程序中的函数。
我们使用该port_control/3
功能来调用所有的驱动程序。驱动程序的结果立即返回并通过调用转换为条款binary_to_term/1
。(我们相信司机返回的条款是完好的,否则这些binary_to_term
电话可能被包含在catch
中。)
9.5采样异步驱动器
有时,数据库查询可能需要很长时间才能完成,在我们的pg_sync
驱动程序中,仿真程序在驱动程序完成其工作时暂停。这通常是不可接受的,因为没有其他Erlang进程有机会做任何事情。为了改进我们的postgres驱动程序,我们使用LibPQ中的异步调用重新实现它。
驱动程序的异步版本是示例文件pg_async.c
和pg_asyng.erl
。
/* Driver interface declarations */
static ErlDrvData start(ErlDrvPort port, char *command
static void stop(ErlDrvData drv_data
static int control(ErlDrvData drv_data, unsigned int command, char *buf,
int len, char **rbuf, int rlen
static void ready_io(ErlDrvData drv_data, ErlDrvEvent event
static ErlDrvEntry pq_driver_entry = {
NULL, /* init */
start,
stop,
NULL, /* output */
ready_io, /* ready_input */
ready_io, /* ready_output */
"pg_async", /* the name of the driver */
NULL, /* finish */
NULL, /* handle */
control,
NULL, /* timeout */
NULL, /* outputv */
NULL, /* ready_async */
NULL, /* flush */
NULL, /* call */
NULL /* event */
};
typedef struct our_data_t {
PGconn* conn;
ErlDrvPort port;
int socket;
int connecting;
} our_data_t;
有些东西已经从改变pg_sync.c
:我们使用的条目ready_io
为ready_input
和ready_output
,这是从模拟器称为只有当输入要从插槽读取。(实际上,套接字被用在select
模拟器内部的一个函数中,并且当套接字发信号时,指示有数据要读取,这个ready_input
条目被调用。
我们的驱动程序数据也得到了扩展,我们跟踪用于与postgres进行通信的套接字以及我们向端口发送数据时所需的端口driver_output
。我们有一个标志connecting
,告诉驱动程序是否正在等待连接或等待查询结果。(这是必需的,因为ready_io
连接时和查询结果时都会调用该条目。)
static int do_connect(const char *s, our_data_t* data)
{
PGconn* conn = PQconnectStart(s
if (PQstatus(conn) == CONNECTION_BAD) {
ei_x_buff x;
ei_x_new_with_version(&x
encode_error(&x, conn
PQfinish(conn
conn = NULL;
driver_output(data->port, x.buff, x.index
ei_x_free(&x
}
PQconnectPoll(conn
int socket = PQsocket(conn
data->socket = socket;
driver_select(data->port, (ErlDrvEvent)socket, DO_READ, 1
driver_select(data->port, (ErlDrvEvent)socket, DO_WRITE, 1
data->conn = conn;
data->connecting = 1;
return 0;
}
connect
功能看起来有点不同。我们使用异步PQconnectStart
功能进行连接。连接开始后,我们检索与之连接的套接字PQsocket
。该套接字与该driver_select
功能一起用于等待连接。当套接字准备好输入或输出时,ready_io
调用该函数。
注意,我们只返回数据%28与driver_output
%29如果这里有错误,则等待连接完成,在这种情况下,我们将ready_io
函数被调用。
static int do_select(const char* s, our_data_t* data)
{
data->connecting = 0;
PGconn* conn = data->conn;
/* if there's an error return it now */
if (PQsendQuery(conn, s) == 0) {
ei_x_buff x;
ei_x_new_with_version(&x
encode_error(&x, conn
driver_output(data->port, x.buff, x.index
ei_x_free(&x
}
/* else wait for ready_output to get results */
return 0;
}
do_select
函数启动一个select,如果没有立即的错误,则返回。ready_io
调用时返回结果。
static void ready_io(ErlDrvData drv_data, ErlDrvEvent event)
{
PGresult* res = NULL;
our_data_t* data = (our_data_t*)drv_data;
PGconn* conn = data->conn;
ei_x_buff x;
ei_x_new_with_version(&x
if (data->connecting) {
ConnStatusType status;
PQconnectPoll(conn
status = PQstatus(conn
if (status == CONNECTION_OK)
encode_ok(&x
else if (status == CONNECTION_BAD)
encode_error(&x, conn
} else {
PQconsumeInput(conn
if (PQisBusy(conn))
return;
res = PQgetResult(conn
encode_result(&x, res, conn
PQclear(res
for (;;) {
res = PQgetResult(conn
if (res == NULL)
break;
PQclear(res
}
}
if (x.index > 1) {
driver_output(data->port, x.buff, x.index
if (data->connecting)
driver_select(data->port, (ErlDrvEvent)data->socket, DO_WRITE, 0
}
ei_x_free(&x
}
ready_io
当我们从postgres获得的套接字准备好输入或输出时,该函数被调用。在这里,我们首先检查我们是否连接到数据库。在这种情况下,我们检查连接状态,如果连接成功则返回OK,否则返回错误。如果连接尚未建立,我们只需返回; ready_io
再次被调用。
如果我们有一个连接的结果,通过x
缓冲区中的数据表示,我们不再需要选择output(ready_output
),所以我们通过调用来删除它driver_select
。
如果我们没有连接,我们等待a的结果PQsendQuery
,所以我们得到结果并返回。编码使用与前面示例中相同的功能完成。
在这里添加错误处理,例如,检查套接字是否仍然打开,但这只是一个简单的例子。
异步驱动程序的Erlang部分由示例文件组成pg_async.erl
。
-module(pg_async).
-define(DRV_CONNECT, $C).
-define(DRV_DISCONNECT, $D).
-define(DRV_SELECT, $S).
-export([connect/1, disconnect/1, select/2]).
connect(ConnectStr) ->
case erl_ddll:load_driver(".", "pg_async") of
ok -> ok;
{error, already_loaded} -> ok;
_ -> exit{error, could_not_load_driver})
end,
Port = open_port{spawn, ?MODULE}, [binary]),
port_control(Port, ?DRV_CONNECT, ConnectStr),
case return_port_data(Port) of
ok ->
{ok, Port};
Error ->
Error
end.
disconnect(Port) ->
port_control(Port, ?DRV_DISCONNECT, ""),
R = return_port_data(Port),
port_close(Port),
R.
select(Port, Query) ->
port_control(Port, ?DRV_SELECT, Query),
return_port_data(Port).
return_port_data(Port) ->
receive
{Port, {data, Data}} ->
binary_to_term(Data)
end.
Erlang代码略有不同,因为我们不会同步返回结果port_control
,而是从driver_output
消息队列中的数据中获取结果。上述功能return_port_data
从端口接收数据。由于数据是二进制格式,因此我们使用binary_to_term/1
它将其转换为Erlang术语。请注意,驱动程序以二进制模式打开(open_port/2
通过选项调用[binary]
)。这意味着从驱动程序发送到模拟器的数据将作为二进制文件发送。没有选择binary
,他们应该是整数列表。
9.6使用driver_async的异步驱动程序
作为最后一个例子,我们演示了如何使用driver_async。我们也使用驱动程序术语接口。该驱动程序是用C ++编写的。这使我们能够使用STL的算法。我们使用该next_permutation算法来获得整数列表的下一个排列。对于大型列表(> 100,000个元素),这需要一些时间,所以我们将其作为异步任务来执行。
驱动程序的异步API非常复杂。首先,工作必须做好准备。在这个例子中,这是在做output
。我们可以使用control
,但我们希望在示例中有一些变化。在我们的驱动程序中,我们分配了一个包含异步任务执行工作所需的任何内容的结构。这是在主模拟器线程中完成的。然后,异步函数从驱动程序线程调用,与主仿真器线程分开。请注意,驱动程序功能不可重入,因此不能使用它们。最后,函数完成后,驱动程序回调ready_async
将从主模拟器线程调用,这是我们将结果返回给Erlang的地方。(我们不能从异步函数中返回结果,因为我们不能调用驱动函数。)
以下代码来自示例文件next_perm.cc
。该驱动程序条目看起来像之前,但也包含回调ready_async
。
static ErlDrvEntry next_perm_driver_entry = {
NULL, /* init */
start,
NULL, /* stop */
output,
NULL, /* ready_input */
NULL, /* ready_output */
"next_perm", /* the name of the driver */
NULL, /* finish */
NULL, /* handle */
NULL, /* control */
NULL, /* timeout */
NULL, /* outputv */
ready_async,
NULL, /* flush */
NULL, /* call */
NULL /* event */
};
output
函数分配异步函数的工作区。在我们使用C ++的时候,我们使用一个结构体,并将数据填入其中。我们必须复制原始数据,从output
函数返回后该函数无效,do_perm
函数稍后调用,并从另一个线程调用。我们在这里没有返回任何数据,而是稍后从ready_async
回调中发送。
将async_data
被传递给do_perm
函数。我们不使用async_free
函数(最后一个参数driver_async
),它仅在以编程方式取消任务时使用。
struct our_async_data {
bool prev;
vector<int> data;
our_async_data(ErlDrvPort p, int command, const char* buf, int len
};
our_async_data::our_async_data(ErlDrvPort p, int command,
const char* buf, int len)
: prev(command == 2),
data((int*)buf, (int*)buf + len / sizeof(int))
{
}
static void do_perm(void* async_data
static void output(ErlDrvData drv_data, char *buf, int len)
{
if (*buf < 1 || *buf > 2) return;
ErlDrvPort port = reinterpret_cast<ErlDrvPort>(drv_data
void* async_data = new our_async_data(port, *buf, buf+1, len
driver_async(port, NULL, do_perm, async_data, do_free
}
在do_perm
我们做的工作中,按照分配的结构进行操作output
。
static void do_perm(void* async_data)
{
our_async_data* d = reinterpret_cast<our_async_data*>(async_data
if (d->prev)
prev_permutation(d->data.begin(), d->data.end()
else
next_permutation(d->data.begin(), d->data.end()
}
在ready_async
功能中,输出被发送回仿真器。我们使用驱动程序术语格式代替ei
。这是将Erlang术语直接发送给驱动程序的唯一方法,无需调用Erlang代码binary_to_term/1
。在简单的例子中,这很好,我们不需要ei
用来处理二进制术语格式。
当数据返回时,我们释放数据。
static void ready_async(ErlDrvData drv_data, ErlDrvThreadData async_data)
{
ErlDrvPort port = reinterpret_cast<ErlDrvPort>(drv_data
our_async_data* d = reinterpret_cast<our_async_data*>(async_data
int n = d->data.size(), result_n = n*2 + 3;
ErlDrvTermData *result = new ErlDrvTermData[result_n], *rp = result;
for (vector<int>::iterator i = d->data.begin(
i != d->data.end( ++i) {
*rp++ = ERL_DRV_INT;
*rp++ = *i;
}
*rp++ = ERL_DRV_NIL;
*rp++ = ERL_DRV_LIST;
*rp++ = n+1;
driver_output_term(port, result, result_n
delete[] result;
delete d;
}
这个驱动程序被称为Erlang的其他驱动程序。但是,当我们使用时driver_output_term
,不需要调用binary_to_term
。Erlang代码位于示例文件中next_perm.erl
。
将输入更改为整数列表并发送给驱动程序。
-module(next_perm).
-export([next_perm/1, prev_perm/1, load/0, all_perm/1]).
load() ->
case whereis(next_perm) of
undefined ->
case erl_ddll:load_driver(".", "next_perm") of
ok -> ok;
{error, already_loaded} -> ok;
E -> exit(E)
end,
Port = open_port{spawn, "next_perm"}, []),
register(next_perm, Port
_ ->
ok
end.
list_to_integer_binaries(L) ->
[<<I:32/integer-native>> || I <- L].
next_perm(L) ->
next_perm(L, 1).
prev_perm(L) ->
next_perm(L, 2).
next_perm(L, Nxt) ->
load(),
B = list_to_integer_binaries(L),
port_control(next_perm, Nxt, B),
receive
Result ->
Result
end.
all_perm(L) ->
New = prev_perm(L),
all_perm(New, L, [New]).
all_perm(L, L, Acc) ->
Acc;
all_perm(L, Orig, Acc) ->
New = prev_perm(L),
all_perm(New, Orig, [New | Acc]).