摘自:http://www.daileinote.com/computer/openresty/12

Openresty 提供了强大的 cosocket 功能,它源自 nginx 的 upstream 机制,但又超越了 upstream 机制,结合了 nginx 的事件机制和 Lua 协程特性,以同步非阻塞模式实现 socket 编程,可以高效的与后端服务器通信,而且还支持连接池功能。

ngx.socket.tcp

1
2
3
syntax: tcpsock = ngx.socket.tcp()
context: rewrite_by_lua*, access_by_lua*, content_by_lua*, ngx.timer.*, \
ssl_certificate_by_lua*, ssl_session_fetch_by_lua*

创建一个 tcp 的 cosocket 对象,使用完后最好手动关闭或者通过 setkeepalive 存入连接池。如果没有关闭或者存入连接池,那么如下2中情况发生也会关闭。

1.当前请求的 Lua 句柄结束。
2.被 Lua GC 垃圾回收。

tcpsock:connect

1
syntax: ok, err = tcpsock:connect(host, port, options_table?)

向远程发起 tcp 或者 unix domain 连接,首先会检查连接池,如果有会优先使用而不会创建新连接,出错返回 nil。

如果使用域名,那么需要配置 resolver,如果返回多个ip,那么会随机选择一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
resolver 8.8.8.8;  # use Google's public DNS nameserver
location /test {
resolver 8.8.8.8;

content_by_lua_block {
local sock = ngx.socket.tcp()
local ok, err = sock:connect("www.google.com", 80)
if not ok then
ngx.say("failed to connect to google: ", err)
return
end
ngx.say("successfully connected to google!")
sock:close()
}
}

unix domain 例子

1
2
3
4
5
6
local sock = ngx.socket.tcp()
local ok, err = sock:connect("unix:/tmp/memcached.sock")
if not ok then
ngx.say("failed to connect to the memcached unix domain socket: ", err)
return
end

连接超时可以由 lua_socket_connect_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

1
2
3
local sock = ngx.socket.tcp()
sock:settimeout(1000) -- one second timeout
local ok, err = sock:connect(host, port)

如果重复调用连接,那么之前的连接将会关闭。

tcpsock:sslhandshake

1
2
syntax: session, err = tcpsock:sslhandshake(reused_session?, server_name?, 
ssl_verify?, send_status_req?)

发起 ssl/tls 握手

tcpsock:send

1
syntax: bytes, err = tcpsock:send(data)

同步非阻塞发送数据,返回发送的字节数,失败返回 nil。data可以是字符串或者是包含多个字符串的表格,如果表格里有多个字符串待发送,直接放在表格里发送可以省去在 lua 里连接字符串的开销。

发送超时可以由 lua_socket_send_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

1
2
sock:settimeout(1000)  -- one second timeout
local bytes, err = sock:send(request)

发生错误会自动关闭连接。

tcpsock:receive

1
syntax: data, err, partial = tcpsock:receive(size)

同步非阻塞接收数据,成功返回接收到的数据,失败返回 nil。

如果 size 是数字,那么读取到 size 个字节后才返回,或者报错返回。

如果 size 为以下值:

*a:读取数据直到连接关闭。
*l:读取一行,换行符不会返回

如果没指定,默认为读取一行。

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

1
2
3
4
5
6
7
sock:settimeout(1000)  -- one second timeout
local line, err, partial = sock:receive()
if not line then
ngx.say("failed to read a line: ", err)
return
end
ngx.say("successfully read a line: ", line)

超时错误不会关闭连接,其他错误会自动关闭连接。

tcpsock:receiveany

1
syntax: data, err = tcpsock:receiveany(max)

同步非阻塞接收数据,最大为 max 字节,成功返回接收到的数据,失败返回 nil。

如果接收的数据超过了 max,那么值返回 max 字节,其他的数据在下次读取时返回。

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

1
2
3
4
5
6
7
sock:settimeouts(1000, 1000, 1000)  -- 1秒超时,同时设置3个 connect/read/write
local data, err = sock:receiveany(10 * 1024 * 1024) -- read any data, at most 10K
if not data then
ngx.say("failed to read any data:",err)
return
end
ngx.say("successful:", data)

超时错误不会关闭连接,其他错误会自动关闭连接。

tcpsock:receiveuntil

1
syntax: iterator = tcpsock:receiveuntil(pattern, options?)

返回迭代器函数,可以持续调用来读取数据。

1
2
3
4
5
6
local reader = sock:receiveuntil("\r\n--abcedhb")
local data, err, partial = reader()
if not data then
ngx.say("failed to read the data stream: ", err)
end
ngx.say("read the data stream: ", data)

如果接收到的数据为 'hello, world! -agentzh\r\n--abcedhb blah blah',那么将返回 'hello, world! -agentzh'。

迭代器函数可以指定 size ,暗示读取多少个字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 local reader = sock:receiveuntil("\r\n--abcedhb")

while true do
local data, err, partial = reader(4)
if not data then
if err then
ngx.say("failed to read the data stream: ", err)
break
end

ngx.say("read done")
break
end
ngx.say("read chunk: [", data, "]")
end
read chunk: [hell]
read chunk: [o, w]
read chunk: [orld]
read chunk: [! -a]
read chunk: [gent]
read chunk: [zh]
read done

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

1
2
3
4
5
6
7
8
9
local readline = sock:receiveuntil("\r\n")

sock:settimeout(1000) -- one second timeout
line, err, partial = readline()
if not line then
ngx.say("failed to read a line: ", err)
return
end
ngx.say("successfully read a line: ", line)

如果在 option 参数里指定,inclusive = true,那么里面的分割字符串也会返回。

1
2
3
local reader = tcpsock:receiveuntil("_END_", { inclusive = true })
local data = reader()
ngx.say(data)

比如发送 'hello world END blah blah blah',那么将受到 'hello world END'。

超时错误不会关闭连接,其他错误会自动关闭连接。

tcpsock:close

1
syntax: ok, err = tcpsock:close()

主动关闭连接,成功返回 1,失败返回 nil。

如果调用了 setkeepalive,则无需调用 close。

tcpsock:settimeout

1
syntax: tcpsock:settimeout(time)

设置超时时间,单位毫秒,优先级高于全局设置 lua_socket_connect_timeout、lua_socket_send_timeout、and lua_socket_read_timeout。

这个函数不会影响 lua_socket_keepalive_timeout 设置,应该由 setkeepalive(timeout) 来设置。

tcpsock:settimeouts

1
syntax: tcpsock:settimeouts(connect_timeout, send_timeout, read_timeout)

同时设置连接、发送、接收的超时时间,单位毫秒。

tcpsock:setoption

1
syntax: tcpsock:setoption(option, value?)

设置连接参数,目前暂时没有实现。

tcpsock:setkeepalive

1
syntax: ok, err = tcpsock:setkeepalive(timeout?, size?)

将连接存入连接池,成功返回 1,失败返回 nil。调用该函数了,就没必要再调用 close()。

注意:这个设置是针对单个 worker 进程的。

timeout 指定在连接池里多久没被使用则关闭的超时时间,单位毫秒,默认为 lua_socket_keepalive_timeout 设置的时间。

size 连接池可以存放多少个相同key的连接。tcp下key为host-port。unix domain下key为文件路径。超过了该数量,最旧的连接将会被关闭以便腾出空间。默认为 lua_socket_pool_size 设置的大小。

如果当前连接缓冲区还有未读取完的数据,那么将会报错,err为 'connection in dubious state'。

tcpsock:getreusedtimes

1
syntax: count, err = tcpsock:getreusedtimes()

返回当前连接被重用的次数,如果是新连接或者是更本没有存入到连接池,总是返回0。如果该连接是从连接池里拿出来的,肯定返回大于0的数值。所以可以用来判断当前连接是否来自连接池。失败返回 nil。

ngx.socket.connect

1
2
syntax: tcpsock, err = ngx.socket.connect(host, port)
syntax: tcpsock, err = ngx.socket.connect("unix:/path/to/unix-domain.socket")

将 ngx.socket.tcp() 和 connect() 组合成一个函数完成,类似于下面代码的组合

1
2
3
4
5
6
local sock = ngx.socket.tcp()
local ok, err = sock:connect(...)
if not ok then
return nil, err
end
return sock

调用此函数就不能通过 settimeout 函数来设置连接超时时间了,必须使用 lua_socket_connect_timeout 指令。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
location /tcp {
content_by_lua_block {
local sock = ngx.socket.tcp()
local ok, err = sock:connect("127.0.0.1", 8888)
if not ok then
ngx.say("failed to connect to 8888:", err)
return
end
ngx.say("successfully connected")

local bytes, err = sock:send("hello\n")

while true do
local line, err, partial = sock:receive()
if line then
ngx.say("received:", line)
ngx.flush() #为了实时看到输出
else
ngx.say("err:", err)
ngx.say("line:", line)
break;
end

end

sock:close()
}
}

接下来利用linux命令 nc 来建立 tcp 服务器,监听 8888 端口。

1
2
3
4
5
6
7
8
[root@192 ~]# nc -l 8888
hello
aaa
bbb
ccc
ddd
eee
^C

然后开始请求,此时会阻塞,上面会受到 hello,然后在上面nc命令里可以发送数据。

1
2
3
4
5
6
7
8
9
[root@192 ~]# curl localhost/tcp
successfully connected
received:aaa
received:bbb
received:ccc
received:ddd
received:eee
err:closed
line:nil

ngx.socket.udp

1
syntax: udpsock = ngx.socket.udp()

创建 udp 或者 基于数据报的 unix domain cosocket 对象。

udpsock:setpeername

1
2
3
syntax: ok, err = udpsock:setpeername(host, port)

syntax: ok, err = udpsock:setpeername("unix:/path/to/unix-domain.socket")

成功返回1,失败返回 nil。 因为 udp 是无连接,这个只是用来标记,以便接下的读取发送。

如果使用域名,那么需要配置 resolver,如果返回多个ip,那么会随机选择一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
resolver 8.8.8.8;  # use Google's public DNS nameserver
location /test {
resolver 8.8.8.8;

content_by_lua_block {
local sock = ngx.socket.udp()
local ok, err = sock:setpeername("my.memcached.server.domain", 11211)
if not ok then
ngx.say("failed to connect to memcached: ", err)
return
end
ngx.say("successfully connected to memcached!")
sock:close()
}
}

unix domain

1
2
3
4
5
6
local sock = ngx.socket.udp()
local ok, err = sock:setpeername("unix:/tmp/some-datagram-service.sock")
if not ok then
ngx.say("failed to connect to the datagram unix domain socket: ", err)
return
end

注意,多次调用将会覆盖前面的。

udpsock:send

1
syntax: ok, err = udpsock:send(data)

发送数据,成功返回1,失败返回 nil。data可以是字符串或者是包含多个字符串的表格,如果表格里有多个字符串待发送,直接放在表格里发送可以省去在 lua 里连接字符串的开销。

udpsock:receive

1
syntax: data, err = udpsock:receive(size?)

接收数据,成功返回数据,失败返回 nil。

如果指定 size,那么 接收缓冲区大小就为 size,如果超过 8192 则会使用 8192。默认就为8192。

读取超时可以由 lua_socket_read_timeout 设置全局,也可以由 settimeout 来设置单个,后面那个优先级高。

1
2
3
4
5
6
7
sock:settimeout(1000)  -- one second timeout
local data, err = sock:receive()
if not data then
ngx.say("failed to read a packet: ", err)
return
end
ngx.say("successfully read a packet: ", data)

udpsock:close

1
syntax: ok, err = udpsock:close()

主动关闭,成功返回1,失败返回nil。

udpsock:settimeout

1
syntax: udpsock:settimeout(time)

设置读取超时时间,单位毫秒,优先级高于 lua_socket_read_timeout。