如何限制服务器的最大并发连接数

在网络编程中,我们通常用Reactor模式来处理并发连接。listening scoket是一种特殊的IO对象,当有新连接达到时,此listening文件描述符变得可读(POLLIN),epoll_wait返回这一事件。然后我们用accept(2)系统返回获得新连接的socket文件描述符。

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('', 2006))
serversocket.listen(5)
serversocket.setblocking(0)

poll = select.poll()
poll.register(serversocket.fileno(), select.POLLIN)
connections = {}

while True:
    events = poll.poll(1000)
    for fileno, event in events:                          # (1)
        if fileno == serversocket.fileno():
            (clientsocket, address) = serversocket.accept()       # (2)
            clientsocket.setblocking(0)
            poll.register(clientsocket.fileno(), select.POLLIN)
            connections[clientsocket.fileno()] = client.socket
        elif event & select.POLLIN:
            # ...

假设(2)处返回EMFILE该如何应对?这意味着本进程的文件描述符已经达到上限,无法为新连接建立socket文件描述符。但是,既然没有scoekt文件描述符来表示连接,我们就无法close(2)它。程序继续运行,回到(1)处再一次调用epoll_wait,这时候epoll_wait会立刻返回,因为新连接还等待处理,listening fd还是可读的。这样程序就立刻陷入busy loop,CPU占用率接近100%. 这既影响同一event loop上的连接,也影响同一机器上的其他服务。

这种情况下,有以下几种解决方案:

  1. 提高进程的文件描述符数目。治标不治本。
  2. 死等。
  3. 退出程序,小题大作
  4. 关闭listening fd,那什么时候重新打开呢?
  5. 改用edge trigger,如果漏掉一次accept(2),程序再也不会收到新连接。
  6. 准备一个空闲的文件描述符,遇到这种情况,先关闭这个空闲描述符,获得一个文件描述符的名额;再accept(2)拿到新socket连接的描述符;随后立刻close(2)它,这样就优雅地断开了客户端连接;最后重新打开一个空闲文件,把”坑”站住,以备再次出现这种情况时使用。

私以为第6种方案最佳,muduo的Acceptor正是用这种方案,相关代码如下:

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    acceptChannel_(loop, acceptSocket_.fd()),
    listenning_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      boost::bind(&Acceptor::handleRead, this));
}

//....


void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }

其中idleFd_就是这个作用。

参考

《Linux多线程服务端编程》 陈硕