January Star
  • Home
  • Categories
  • Tags
  • Archives

SocketServer标准库源码学习

Contents

  • 简介
    • Server: 实现一个服务
    • MixIn: 让你的服务支持多线程 / 多进程
    • RequestHandler: 如何实现请求处理
    • 未解决问题
  • _eintr_retry
    • 慢系统调用 (slow system call)
    • EINTR 错误的产生
  • Server
    • BaseServer
    • TCPServer
      • allow_reuse_address
      • socket 的 close&&shutdown
        • close
        • shutdown
        • 总结
    • UDPServer
      • get_request
      • class UDPServer(TCPServer)
  • MixIn
    • ForkingMixIn
      • os.waitpid(0, 0)
      • os._exit
    • ThreadingMixIn
    • 其它
  • RequestHandler
    • BaseRequestHandler
    • StreamRequestHandler
      • rbufsize&&wbufsize
      • Nagle 算法
      • ECONNABORTED
    • DatagramRequestHandler
      • self.packet, self.socket = self.request

简介

本模块的代码基本分为以下三大块功能。

Server: 实现一个服务

Server 本身只负责搭建整个服务的框架。

至于服务本身如何处理客户端上来的请求,是通过 Server 绑定的 RequestHandler 来进行处理的。

本模块 Server 类的继承关系

+------------+
| BaseServer |
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+

MixIn: 让你的服务支持多线程 / 多进程

该模块的代码还用到了 Python 的高级编程技巧 Mixin。

如果你想让你的服务支持多线程 / 多进程,你可以使用如下代码:

1
2
3
4
5
class ThreadingTCPServer(ThreadingMixIn, TCPServer):
    pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer):
    pass

当然你真正使用时不需要写这些代码,它们已经在该模块内已经实现了。

Note

ThreadingMixIn 会覆盖 TCPServer 、 UDPServer 某些特定的方法,以提供额外的多线程功能。

所以 ThreadingMixIn 必须要放在前面,这是由 Python 对于属性 / 方法的查找算法决定。

RequestHandler: 如何实现请求处理

本模块提供的 RequestHandler 基本有两种用法 。

  1. 继承 BaseRequestHandler 类,重载它的 handle() 方法,
  2. 直接使用本模块封装好的 StreamRequestHandler 和 DatagramRequestHandler, 并重载它的 handle() 方法。

未解决问题

目前无法处理带外 (out-of-band) 数据

Tip

我对此不是很了解,谁感兴趣可以自行 百度百科:带外数据

_eintr_retry

1
2
3
4
5
6
7
8
def _eintr_retry(func, *args):
    """ 忽略除 EINTR 错误外所有其它错误 """
    while True:
        try:
            return func(*args)
        except (OSError, select.error) as e:
            if e.args[0] != errno.EINTR:
                raise

下面是搜索到的有关 EINTR 错误的一些说明。

慢系统调用 (slow system call)

此术语适用于那些可能永远阻塞的系统调用。

永远阻塞的系统调用是指调用有可能永远无法返回,多数网络支持函数都属于这一类。如:若没有客户连接到服务器上,那么服务器的 accept 调用就没有返回的保证。

EINTR 错误的产生

当阻塞于某个慢系统调用的一个进程捕获某个信号且相应信号处理函数返回时,该系统调用可能返回一个 EINTR 错误。

例如:在 socket 服务器端,设置了信号捕获机制,有子进程,当在父进程阻塞于慢系统调用时由父进程捕获到了一个有效信号时,内核会致使 accept 返回一个 EINTR 错误 ( 被中断的系统调用 )。

当碰到 EINTR 错误的时候,可以采取有一些可以重启的系统调用要进行重启,而对于有一些系统调用是不能够重启的。

例如:accept、read、write、select、和 open 之类的函数来说,是可以进行重启的。

不过对于套接字编程中的 connect 函数我们是不能重启的,若 connect 函数返回一个 EINTR 错误的时候,我们不能再次调用它,否则将立即返回一个错误。

针对 connect 不能重启的处理方法是,必须调用 select 来等待连接完成。

Server

BaseServer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class BaseServer:
    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        self.server_address = server_address # 服务绑定的地址 
        self.RequestHandlerClass = RequestHandlerClass # 处理请求的类 
        # 下面这两个属性主要用来给 self.shutdown 方法使用 
        self.__is_shut_down = threading.Event() # 服务结束通知 
        self.__shutdown_request = False # 服务已关闭的标志位 

    def server_activate(self):
        # 通过 `__init__` 调用,可被重载。
        pass

    def serve_forever(self, poll_interval=0.5):
        """ 一直进行监听及处理请求操作,直到服务被关闭 
        self.timeout 对本方法不起作用,如果想使用 self.timeout,可以使用 handle_request
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        # 停止服务,该方法必须与 `serve_forever` 在不同的线程中调用,不然会造成死锁 
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def handle_request(self):
        # 处理请求,如果你不是使用 serve_forever 来启动服务的话,
        # 可以通过 handle_request 来定制自己处理服务的方式 
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        # 在调用本方法之前,已经确认有客户端连接上来,
        # 所以调用 self.get_request 方法时不会阻塞 
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        # 超时之后,还没有新的请求则会调用该方法。
        pass

    def verify_request(self, request, client_address):
        # 对请求进行校验,如果校验成功,必须返回 True
        return True

    def process_request(self, request, client_address):
        # 处理请求 
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        # 用于服务停止后的后处理 
        pass

    def finish_request(self, request, client_address):
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        self.close_request(request)

    def close_request(self, request):
        pass

    def handle_error(self, request, client_address):
        # 优雅地处理 error,可被重载。
        # 默认的行为是打印出出错的异常信息,然后继续处理请求。
        # 注意:print 输出的信息会输出到 stdout, 异常信息会输出到 stderr,蛋疼 
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # 此处的信息会输出到 stderr
        print '-'*40

TCPServer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class TCPServer(BaseServer):

    address_family = socket.AF_INET # TCP 对应的 socket 簇 

    socket_type = socket.SOCK_STREAM # TCP 对应的 socket 类型 

    request_queue_size = 5 # 等待连接队列的最大长度,术语 `backlog`

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family, self.socket_type)
        if bind_and_activate: # 是否在服务实例化是时就激活地址绑定及监听操作 
            self.server_bind()
            self.server_activate()

    def server_bind(self):
        """ 服务端 socket 地址绑定 """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """ 服务端启用监听操作 """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        self.socket.close()

    def fileno(self):
        """ 返回 socket 文件句柄数 

        给 `serve_forever` 中的 `select.select` 操作用 
        """
        return self.socket.fileno()

    def get_request(self):
        """ 获取一个有请求需要处理的 socket"""
        return self.socket.accept()

    def shutdown_request(self, request):
        try:
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            # 某些平台会抛出 `ENOTCONN` 异常 
            pass
        self.close_request(request)

    def close_request(self, request):
        request.close()

allow_reuse_address

指示套接字是否应该重用地址。

在程序终止之后 , 如果需要在同一个端口立即重启服务器 , 那么该设置会很有用(其他情况下 , 你必须等待几分钟)。

socket 的 close&&shutdown

close
1
2
3
#include<unistd.h>

int close(int fd);

关闭本进程的 socketfd,但链接还是开着的,用这个 socketfd 的其它进程还能用这个链接,能读或写这个 socketfd。

shutdown
1
2
3
#include <sys/socket.h>

int shutdown(int sockfd,int how);

how 的方式有三种分别是

SHUT_RD(0):关闭 sockfd 上的读功能,此选项将不允许 sockfd 进行读操作。

SHUT_WR(1):关闭 sockfd 的写功能,此选项将不允许 sockfd 进行写操作。

SHUT_RDWR(2):关闭 sockfd 的读写功能。

则破坏了 socket 连接,读的时候可能侦探到 EOF 结束符,写的时候可能会收到一个 SIGPIPE 信号,这个信号可能直到 socket buffer 被填充了才收到。

总结

shutdown 算是比较优雅(更细粒度)地关闭 socket 连接,但是不释放本地资源(socketfd)。

close 则比较简单粗暴一点,直接释放 socketfd。但它也不是立即释放 socketfd,socketfd 有一个引用计数的问题,当一个 socketfd 的引用计数为 0 时,系统才会进行释放。

UDPServer

UDP 是面向无连接的 socket。

既然是无连接的,那就没有监控的操作,以及关闭连接的操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class UDPServer(TCPServer):

    allow_reuse_address = False

    socket_type = socket.SOCK_DGRAM # UDP 对应的 socket 类型 

    max_packet_size = 8192 # 每次接收数据的最大长度 

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        pass

    def shutdown_request(self, request):
        self.close_request(request)

    def close_request(self, request):
        pass

get_request

该方法的返回值和 TCPServer 不一样。

它是在 Server 框架里面就获取到了数据,而不是在 RequestHandler 中来进行数据的获取。

具体原因下面的 DatagramRequestHandler 章节会有提到。

class UDPServer(TCPServer)

这种继承方式有点不合理,感觉是纯粹为了代码复用而进行的继承。

最好让 TCPServer 和 UDPServer 共同继承自同一个模板基类。

它们的相同代码全在模板类中,不同代码在各自的类中。

MixIn

ForkingMixIn

使用多进程来处理请求,一个进程对应一个请求。

由于一个进程占用的系统资源比较多,且操作系统都会限制进程数,所以不能无限制的开启进程,代码中对应进程数进行了限制,不过这样也限制了在多进程模式下,并发处理请求的能力。

Note

该功能只支持 Unix 平台,在 Windows 上不可用,具体原因看下面的注释。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class ForkingMixIn:
    timeout = 300
    active_children = None # 当前正在处理请求的子进程 PID 集合 
    max_children = 40 # 支持的最大并发请求数 

    def collect_children(self):
        """ 回收子进程 """
        if self.active_children is None: return
        # 如果子进程数超过上限,则一直等待,直到子进程数低于上限 
        while len(self.active_children) >= self.max_children:
            # 阻塞模式:等待当前进程组的任何子进程的返回值,
            # 注意:不仅仅是通过 ForkingMixIn 生成的子进程。
            try:
                pid, status = os.waitpid(0, 0)
            except os.error:
                pid = None
            # 如果不是 ForkingMixIn 生成的子进程,则忽略掉 
            if pid not in self.active_children: continue
            self.active_children.remove(pid)

        # 下面代码主要是检查是否有已经退出的子进程,如果有则将之对应的保存的 pid 删除掉 
        # 作者在这里说,下面的代码中进行了太多次的系统调用,
        # 原本可以通过将所有子进程设置成一个进程组,
        # 然后只需要使用 os.waitpid(pgid) 获取该进程组内任意结束的子进程 ID,
        # 但是他没有办法获取到一个不冲突的进程组 ID。

        # PS: 虽说对一个列表迭代时同时删除列表中的元素不是不可以,
        # 但是这样做太容易出错,一定要小心处理。
        # 对一个字典边迭代边删除就会抛异常了,具体原因涉及到 Python 字典的实现原理,细节暂不表了。
        for child in self.active_children:
            try:
                # 如果没有任何子进程退出则立即返回 (0, 0)
                # 有,则返回该子进程的 pid,及返回值 
                # 相当于非阻塞模式 
                pid, status = os.waitpid(child, os.WNOHANG)
            except os.error:
                pid = None
            if not pid: continue
            try:
                self.active_children.remove(pid)
            except ValueError, e:
                raise ValueError('%s. x=%d and list=%r' % (e.message, pid,
                                                           self.active_children))

    def handle_timeout(self):
        self.collect_children()

    def process_request(self, request, client_address):
        self.collect_children()
        pid = os.fork() # fork 只在 Unix 平台上有效 
        if pid: # 父进程 
            if self.active_children is None:
                self.active_children = []
            self.active_children.append(pid)
            self.close_request(request) # 父进程只负责生成子进程来处理请求,本身不会处理请求 
            return
        else: # 子进程 
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                os._exit(0)
            except:
                try:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                finally:
                    os._exit(1)
            # Python2.5 版本之前不支持 except/finally 同时使用 
            # 所以才有这种比较蛋疼的写法 

os.waitpid(0, 0)

表示等待并获取当前进程组中的任何子进程的返回值。

关于该函数的具体说明参见 os.waitpid

os._exit

它会将进程直接终止,之后的所有代码都不会继续执行。

该函数和 sys.exit 有点不一样, sys.exit 会抛出 SystemExit 异常,如果捕获该异常,Python 还是会继续执行捕获该异常的代码的。

具体可以参见: The difference between exit() and sys.exit() in python?

ThreadingMixIn

使用线程来处理请求,一个请求对应一个线程。

线程相对进程来说比较轻量,所以代码里没有对应线程数进行控制,不过这里还是有问题的,如果短时间内有大量的并发请求,生成大量的线程,Python 会 Hold 不住,Python 的性能会急剧下降(大量的 CPU 时间用于线程切换了),到那时候基本就是拒绝服务了。

所以 ThreadingMinIn 多用于性能要求不高的环境,或者使用 gevent 来 monkey patch 一下,性能会有较大的提升。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class ThreadingMixIn:
    # 可以设置该属性来决定当主线程退出时,处理请求的子线程是否也随之退出 
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)
        # 此处不用 finally 来调用:self.shutdown_request(request)
        # 是因为 Python2.5 之前版本不支持 except && finally 同时使用。

    def process_request(self, request, client_address):
        # 重载了 process_request 方法,生成一个线程来处理请求 
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

其它

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

if hasattr(socket, 'AF_UNIX'): # Windows 平台不支持 unix domain socket

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX

    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass

    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass

该模块提供了现成的支持多线程 / 多进程的服务类,但是当你使用基于 TCP/UDP 的上层协议的服务类时,

为了要支持多线程 / 多进程,你还是要自己手写的。

1
2
3
4
from SimpleXMLRPCServer import SimpleXMLRPCServer

class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
   pass

RequestHandler

BaseRequestHandler 定义了基本的请求处理框架。

StreamRequestHandler 和 DatagramRequestHandler 都将 socket 的数据读写操作抽象成了文件的读写。

BaseRequestHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

StreamRequestHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class StreamRequestHandler(BaseRequestHandler):

    rbufsize = -1 # 默认有缓存 
    wbufsize = 0 # 默认无缓存 

    timeout = None

    # Nagle 算法开关,最好仅在 wbufsize != 0 时打开,用来避免传递大量小的数据包。
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            try:
                self.wfile.flush()
            except socket.error:
                # socket 结束时可能会有错误发生,比如 `ECONNABORTED`
                pass
        self.wfile.close()
        self.rfile.close()

rbufsize&&wbufsize

rbufsize: 如果读取大量数据时,没有缓存会导致处理速度很慢。

wbufsize: 在写入大量数据时,底层的 stdio 会进行优化。

Nagle 算法

TCP/IP 协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送 ACK 表示确认。为了尽可能的利用网络带宽,TCP 总是希望尽可能的发送足够大的数据。(一个连接会设置 MSS 参数,因此,TCP/IP 希望每次都能够以 MSS 尺寸的数据块来发送数据)。

Nagle 算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。

想具体了解该算法请参见 Nagle 算法

ECONNABORTED

该错误被描述为 ”software caused connection abort”,即 “ 软件引起的连接中止 ”。

原因在于当服务和客户进程在完成用于 TCP 连接的 “ 三次握手 ” 后,客户 TCP 却发送了一个 RST (复位)分节,在服务进程看来,就在该连接已由 TCP 排队,等着服务进程调用 accept 的时候 RST 却到达了。POSIX 规定此时的 errno 值必须 ECONNABORTED。

源自 Berkeley 的实现完全在内核中处理中止的连接,服务进程将永远不知道该中止的发生。服务器进程一般可以忽略该错误,直接再次调用 accept。

DatagramRequestHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class DatagramRequestHandler(BaseRequestHandler):

    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)

self.packet, self.socket = self.request

这里的 self.request 对象是一个元组,第一个是从 self.socket 中获取的数据,第二个为 self.socket。

至于为什么不在 DatagramRequestHandler 中直接获取数据,作者在注释里作了如下说明 :

# XXX Regrettably, I cannot get this working on Linux; # s.recvfrom() doesn't return a meaningful client address.
Comments
comments powered by Disqus

Published

Nov 19, 2014

Category

Python

Tags

  • python 23
  • stdlibs 15

Contact

  • Powered by Pelican. Theme: Elegant by Talha Mansoor