January Star
  • Home
  • Categories
  • Tags
  • Archives

fnmatch标准库源码学习

Contents

  • 简介
  • translate
  • fnmatch&&fnmatchcase
  • filter
  • 总结

简介

fnmatch 库的还是很简单的,代码加注释才 100 行多一点。

它主要做的工作就是比较一个给定的文件名和给定的模式字符串,判断两者是否相等。

然后围绕这个功能,提供一系列的相关函数。

  • filter

    输入 一个文件名列表和一个指定模式(仅支持 Shell 通配符)
    输出 一个符合指定模式的文件名列表
  • fnmatch

    输入 一个文件名和一个指定模式(仅支持 Shell 通配符)
    输出 该文件名是否匹配指定模式
    说明 具体行为和平台相关,比如某些平台忽略大小写等等
  • fnmatchcase

    输入 一个文件名和一个指定模式(仅支持 Shell 通配符)
    输出 该文件名是否匹配指定模式
    说明 大小写敏感
  • translate

    输入 Shell 通配符格式的模式字符串
    输出 正则格式的模式字符串

translate

至于如何判断两者相等,它的做法是:

使用 正则表达式

既然要使用 正则表达式 ,那么就要考虑如何将通配符表达式替换成正则表达式。

这个功能就是由 translate 函数来实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def ntranslate(pat):
    """Translate a shell PATTERN to a regular expression.

    There is no way to quote meta-characters.
    """

    i, n = 0, len(pat)
    res = ''
    while i < n:
        c = pat[i]
        i = i+1
        if c == '*':
            res = res + '.*'
        elif c == '?':
            res = res + '.'
        elif c == '[':
            j = i
            if j < n and pat[j] == '!':
                j = j+1
            if j < n and pat[j] == ']':
                j = j+1
            while j < n and pat[j] != ']':
                j = j+1
            if j >= n:
                res = res + '\\['
            else:
                stuff = pat[i:j].replace('\\','\\\\')
                i = j+1
                if stuff[0] == '!':
                    stuff = '^' + stuff[1:]
                elif stuff[0] == '^':
                    stuff = '\\' + stuff
                res = '%s[%s]' % (res, stuff)
        else:
            res = res + re.escape(c)
    return res + '\Z(?ms)'

上面的代码明显是命令式编程的思路。那我想换一种思路,用函数式编程来重写这个函数。

我们首先会考虑这个 translate 函数的基本功能是什么?

将通配符表达式转换成正则表达式。

那么它们俩的区别是什么?

通配符表达式的 * 在正则表达式中为 .*

通配符表达式的 ? 在正则表达式中为 .

通配符表达式的 [XXX] 在正则表达式中为 [XXX]

通配符表达式的 [!XXX] 、 [^XXX] 在正则表达式中为 [^XXX]

通配符表达式的其它字符, 在正则表达式中都需要 re.escape 进行转义

好的,上面分析的几种情况,都可以通过首个字符来判断其接下来的处理属于哪一类,但是 [XXX] 这种类型,还要包括后面的好几个字符,所以处理完这种类型需要知道剩下的未处理字符串。

大家都统一一下接口就提取出如下几个辅助函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
trans_default = lambda c, left_pat: (re.escape(c), left_pat)
trans_asterisk = lambda c, left_pat: ('.*', left_pat)
trans_questionmark = lambda c, left_pat: ('.', left_pat)
def trans_choice(c, left_pat):
    # 这个函数最初也是用递归实现,然后再转换成 while 循环 
    if not left_pat:
        return ('\\' + c, '')

    res = c
    while left_pat:
        c, left_pat = left_pat[0], left_pat[1:]
        if c in ('!', '^'):
            res += '\\^'
        elif c == '\\':
            res += '\\\\'
        elif c == ']':
            res += ']'
            break
        else:
            res += c

    return (res, left_pat) if ']' in res else ('\\' + res, left_pat)

我们再将上面的思路转换为递归代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
TRANS_MAP = {
    "*": trans_asterisk,
    "?": trans_questionmark,
    "[": trans_choice,
}

def _translate(pat):
    if not pat:
        return ''

    c, left_pat = pat[0], pat[1:]
    trans_func = TRANS_MAP.get(c, trans_default)
    res_part, left_pat = trans_func(c, left_pat)
    return res_part + _translate(left_pat)

def translate(pat):
    res = _translate(pat)
    return res + '\Z(?ms)'

递归的效率是比较低的,但是我们可以转换成尾递归形式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
TRANS_MAP = {
    "*": trans_asterisk,
    "?": trans_questionmark,
    "[": trans_choice,
}

def _translate(pat, res):
    if not pat:
        return res

    c, left_pat = pat[0], pat[1:]
    handle_func = TRANS_MAP.get(c, trans_default)
    res_part, left_pat = handle_func(c, left_pat)
    return _translate(left_pat, res + res_part)

def translate(pat):
    res = _translate(pat, '')
    return res + '\Z(?ms)'

好了,如果 Python 的解释器支持尾递归,到上面一步就可以了。

因为支持尾递归就代表解释器会自动将尾递归转换成迭代的形式。

但是我们必须手工做这一步,不过这也只是手到摛来的事了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
TRANS_MAP = {
    "*": trans_asterisk,
    "?": trans_questionmark,
    "[": trans_choice,
}

def translate(pat):
    if not pat:
        return ''

    def _yield_res():
        left_pat = pat
        while 1:
            c, left_pat = left_pat[0], left_pat[1:]
            handle_func = TRANS_MAP.get(c, trans_default)
            res_part, left_pat = handle_func(c, left_pat)
            yield res_part
            if not left_pat:
                break

    return ''.join(_yield_res()) + '\Z(?ms)'

fnmatch&&fnmatchcase

fnmatch 本质上其实就是调用 fnmatchcase 来实现的。只不过在调用之前使用了 os.path.normcase 方法来消除平台差异而已。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def fnmatchcase(name, pat):
    """Test whether FILENAME matches PATTERN, including case.

    This is a version of fnmatch() which doesn't case-normalize
    its arguments.
    """

    if not pat in _cache:
        res = translate(pat)
        if len(_cache) >= _MAXCACHE:
            _cache.clear()
        _cache[pat] = re.compile(res)
     return _cache[pat].match(name) is not None

看了代码就知道它的实现太简单了,将参数中的模式字符串调用 translate 函数转换成正则字符串,然后直接使用正则表达式对象的 match 方法就完事了。反而是使用了 Cache 这个非主要功能,占用了比较多的代码。

接下来咱们重新整理一下代码。

  1. Cache 操作直接和函数的主要逻辑混在一起

    既然使用到 Cache,那么操作 Cache 的具体代码就要封装起来了啊。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    class LimitedCache(object):
    
        def __init__(self, size=100):
            super(LimitedCache, self).__init__()
            self.__max_size = size
            self.__cache = {}
    
        def clear(self):
            self.__cache.clear()
    
        def is_full(self):
            cache_size = len(self.__cache)
            return cache_size >= self.__max_size
    
        def get(self, key):
            value = self.__cache.get(key, None)
            self.is_full() and self.clear()
            return value
    
        def put(self, key, value):
            self.__cache[key] = value
    
    CACHE = LimitedCache(size=100)
    
    def fnmatchcase(name, pat):
        def _cache_pat(pat):
            re_pat = translate(pat)
            re_obj = re.compile(re_pat)
            CACHE.put(ret_obj)
            return re_obj
    
        re_obj = CACHE.get(pat) or _cache_pat(pat)
        return re_obj.match(name) is not None
    

    这样看 fnmatchcase 函数是不是比较原来的版本简单明了许多。

  2. Cache 功能好像也不是 fnmatchcase 的主要功能吧。

    既然不是主要功能,那么我们就不能和实现主要功能的代码混在一块。

    那怎么实现?

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    class LimitedCache(object):
    
        def __init__(self, size=100):
            super(LimitedCache, self).__init__()
            self.__max_size = size
            self.__cache = {}
    
        def clear(self):
            self.__cache.clear()
    
        def is_full(self):
            cache_size = len(self.__cache)
            return cache_size >= self.__max_size
    
        def get(self, key):
            value = self.__cache.get(key, None)
            self.is_full() and self.clear()
            return value
    
        def put(self, key, value):
            self.__cache[key] = value
    
    CACHE = LimitedCache(size=100)
    
    def cache_pat(func):
        def _cache_pat(pat):
            re_pat = translate(pat)
            re_obj = re.compile(re_pat)
            CACHE.put(re_obj)
            return re_obj
    
        return lambda name, pat: func(name, CACHE.get(pat) or _cache_pat(pat))
    
    fnmatchcase = cache_pat(lambda name, re_obj: re_obj.match(name) is not None)
    # 版本 2
    # def fnmatchcase(name, pat):
    #     def _fnmatchcase(name, re_obj):
    #         return re_obj.match(name) is not None
    #     return cache_pat(_fnmatchcase)(name, pat)
    #
    # 版本 3(问题:修改了 fnmatchcase 的参数)
    # @cache_pat
    # def fnmatchcase(name, re_obj):
    #      return re_obj.match(name) is not None
    

    嗯,这样就差不多了。

filter

filter 也比较简单, 其实就是对一系列的文件名进行 fnmatchcase 判断,只保留匹配正确的文件名而已。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def filter(names, pat):
    """Return the subset of the list NAMES that match PAT"""
    import os,posixpath
    result=[]
    pat=os.path.normcase(pat)
    if not pat in _cache:
        res = translate(pat)
        if len(_cache) >= _MAXCACHE:
            _cache.clear()
        _cache[pat] = re.compile(res)
    match=_cache[pat].match
    if os.path is posixpath:
        # normcase on posix is NOP. Optimize it away from the loop.
        for name in names:
            if match(name):
                result.append(name)
    else:
        for name in names:
            if match(os.path.normcase(name)):
                result.append(name)
    return result

filter 内部的主要逻辑和 fnmatchcase 差不多么,为什么不复用?

1
2
3
4
5
6
7
8
9
def fnfilter(names, pat):
    return [n for n in names if fnmatch(n)]
    # 版本 2
    # return filter(fnmatch, names)

def fnfiltercase(names, pat):
    return [n for n in names if fnmatchcase(n)]
    # 版本 2
    # return filter(fnmatchcase, names)

怎么样,比原来的代码清晰太多了吧。

总结

  1. 你是依据什么原则来吐槽的?

    一个函数只做一件事。

  2. Cache 值为什么设为 100?

    之间我看过相关文章说,Python 源码里的某些具体数值是经过大量实践统计出来,我估计这个数值可能也是统计出来的,当然也可能是写个模块的作者个人喜好吧。

    PS:要是我写,我就写 128,哈哈。

  3. 能写 Python 标准库的作者水平应该不错,为什么你会挑出这么多毛病?

    一般写标准库需要考虑:

    1. 库之间的尽量不要有依赖
    2. 效率要尽量高
    3. 版本兼容性
    4. ... ...

    LimitedCache 一方面是为了封装高内聚的操作,一方面也是为了重用。但这两方面在标准库代码中都不是首要的,你封装了一层,那肯定要多调一层代码,效率就降低了,库之间尽量不要有依赖,那重用这一块就更不用提了。

    filter 如果是我那种实现的话,就要每次从 Cache 中取一次 pat。而原来的代码只要在循环开始之前取一次即可。不过话又说回来了,我再提供一个不用 Cache 的 fnmatchcase 版本不就行了。

    PS:这些都是我自己想的,当然有可能作者觉得这几个功能太简单,没必要将代码设计的太复杂。 Simple is better than complex. , 嘿嘿。

Comments
comments powered by Disqus

Published

Sep 1, 2014

Category

Python

Tags

  • python 23
  • stdlibs 15

Contact

  • Powered by Pelican. Theme: Elegant by Talha Mansoor