Ruby Sockets编程

什么是Socket

Socket是一种双向通讯频道的端点,通过使用socket可以实现不同机器或同一台机器的不同进程之间的通讯。

Everything in Unix is a file

Unix系统的哲学是,一切皆为文件。Socket读写操作和文件读写操作类似,实际上在Ruby中Socket类就继承自IO类

> Socket.ancestors

=> [Socket, BasicSocket, IO, File::Constants, Enumerable, Object, Kernel, BasicObject]

因此socket I/O操作和文件系统类似支持以下方法

  • #read()
  • #write()
  • #close()
  • ...

创建Socket


Socket域

常用的socket域有三种

  • AF_LOCAL(AF_UNIX)
  • AF_INET
  • AF_INET6
其中AF_LOCAL用于本地进程之间通讯,AF_INET用于IPv4协议通讯,AF_INET6用于IPv6协议通讯。其中AF是address family的缩写,INET是internet的缩写。


Socket类型

socket一般有三种类型

  • SOCK_DRAM
  • SOCK_STREAM
  • SOCK_RAW
常用的是datagram和stream类型,datagram用于UDP协议,而stream用于TCP协议通讯

第一段代码

# server.rb
require 'socket'

# 新建一个域为Socket::AF_INET, 类型是Socket::STREAM的socket
server = Socket.new(:INET, :STREAM)

# 将127.0.0.1的5000端口和socket绑定
addr = Socket.pack_sockaddr_in(5000, '127.0.0.1')
server.bind(addr)

# 指定监听队列大小,`Socket::SOMAXCONN`是可以设置的最大值,不同系统值可能不一样
server.listen(128)

# `#accept`方法返回一个Socket对象和Addrinfo对象
connection, _ = server.accept
# 这里可以进行一些IO操作,比如connection.read/write
# 操作完成,关闭server
server.close



# ---------------------
# or, with syntax sugar
server = TCPServer.new('127.0.0.1', 5000)

connection = server.accept

# 这里可以进行一些IO操作,比如connection.read/write
connection.close

# client.rb

require 'socket'

client = Socket.new(:INET, :STREAM)

addr = Socket.pack_sockaddr_in(5000, '127.0.0.1')

client.connect(addr)
client.close

# -------------
# with syntax sugar
client = TCPSocket.new('127.0.0.1', 5000)

client.close

Socket client和server的生命周期

下图展示了socket客户端和服务端的生命周期

生命周期

Demo

remote_proc: 用Ruby实现了一个简易的RPC 服务端和客户端

多路复用与Non-blocking I/O

在server和client端,很多系统调用是阻塞的,例如:accept , read, write, connect

一般情况下,多路复用(multiplexing)会和non-blocking I/O一起使用。

什么是多路复用?

用通俗的语言来解释: 由于很多系统调用会阻塞,为了提升socket操作的性能,我们一般会创建多个socket连接,在某个socket链接阻塞时,我们需要有一个机制用来检测未阻塞的socket连接,多路复用就是用来检测多个(也可以是一个)socket连接的状态(read availability/write availability)是否可用。

最常用的多路复用器是IO.select,Ruby实现了这一方法。下面的代码就是使用select实现多路复用的例子:

connections = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
loop do
  ready = IO.select(connections)
  readable_connections = ready[0]
  readable_connections.each do |conn|
    data = conn.read
    process(data)
  end
end

下面介绍一下,何时会发生阻塞以及如何使用非阻塞I/O:

  • read_nonblock
在使用read时,直到接收eof前,read都会阻塞。解决方法:

begin
  connection.read_nonblock
rescue Errno::EAGAIN
  IO.select([connection])
  retry
end

  • write_nonblock
由于TCP的防拥挤算法的存在,当应用调用write时,数据会被写入buffer,如果此时网络繁忙,或者buffer满了,此时write就会阻塞。解决方法:

require 'socket'
client = TCPSocket.new('localhost', 8000)
payload = "message" * 1000000
begin
  loop do
    bytes = client.write_nonblock(payload)
    break if bytes >= payload.size
    payload.slice!(0, bytes)
    IO.select(nil, [client])
  end
rescue Errno::EAGAIN
  IO.select(nil, [client])
  retry
end

  • accept_nonblock
当监听队列满时,accept会阻塞。解决方法:

loop do
  begin
    connection = server.accept_nonblock
  rescue Errno::EAGAIN
    retry
  end
end

  • connect_nonblock
连接建立通常需要一定时间,在client和server之间的连接成功建立前,connect会阻塞。解决方法:

require 'socket'
client = Socket.new(:INET, :STREAM)
addr = Socket.pack_sockaddr_in(5000, '127.0.0.1')
begin
  client.connect_nonblock(addr)
rescue Errno::EINPROGRESS
  IO.select(nil, [client])
  begin
    client.connect_nonblock(addr)
  rescue Errno::EISCONN
    # success
  rescue Errno::ECONNREFUSED
    # 连接拒绝
  end
end


需要注意的是,多路复用机制有很多种,select是其中最低效的一种。其他的多路复用机制还有poll(2), epoll(2), kqueue(2)。这里暂不介绍这几种的实现机制。

其他资源

  • ZeroMQ 一个基于Socket实现的MQ
  • Nio4r Java生态NIO的Ruby版本