Contents
简介
StringIO 有一个 C 语言实现的版本 cStringIO 。
C 语言版本更快,但是不能创建子类,因为 C 语言版本的 StringIO 实现为一个函数了,而不是一个类。
StringIO 主要是用来模拟一个 file 对象的行为的。
代码注释里说
Using a real file is often faster (but less convenient).
之前我还困惑 StringIO 不是相当于一个内存对象么,真实的文件对象需要操作磁盘啊,怎么反而更快?
后来我在 Stack Overflow 上问了一下,原来是指代码执行的速度: Python 的内置文件对象是用 C 语言实现的,而 StringIO 是用 Python 代码实现的,当然慢了。
代码注释里还说
Seeking far beyond EOF and then writing will insert real null bytes that occupy space in the buffer.
这个好理解,我直接上代码就清楚了。
1 2 3 4 5 6 7 8 9 10 | In [130]: from StringIO import StringIO
In [131]: s = StringIO("abc")
In [132]: s.seek(10)
In [133]: s.write("end")
In [134]: s.getvalue()
Out[134]: 'abc\x00\x00\x00\x00\x00\x00\x00end'
|
_complain_ifclosed
1 2 3 | def _complain_ifclosed(closed):
if closed:
raise ValueError, "I/O operation on closed file"
|
这个函数是 StringIO 模块的辅助函数,用来检查当前操作的 StringIO 是否已经关闭,如果已关闭,则直接抛出异常。
在接下来的代码中,你会看到在多个方法里面手工调用该函数。
其实,这里可以考虑先定义一个元类,针对那些需要调用该函数的方法,在元类里面统一包装一层检查,这样 StringIO 的各个方法就不用再手工检查当前状态是打开还是关闭了。
class ClosedStatusChecker(type):
    """Metaclass that guards selected methods with a closed-file check.

    A class using this metaclass lists method names in a ``need_check_funcs``
    class attribute. Each listed method is replaced by a wrapper that raises
    ValueError when ``self.closed`` is true and otherwise calls the original
    method with the same arguments.
    """

    def __init__(cls, name, bases, dct):
        """Wrap every method named in ``dct["need_check_funcs"]``.

        Args:
            name: Name of the class being created.
            bases: Tuple of base classes.
            dct: Namespace of the class body; the listed methods are looked
                up here.

        Raises:
            KeyError: If a listed name is not defined in the class body.
        """
        # Function-scope import keeps the snippet self-contained.
        import functools

        super(ClosedStatusChecker, cls).__init__(name, bases, dct)
        # Read-only lookup: the class object already exists at this point,
        # so removing the key from ``dct`` would not change the class.
        need_check_funcs = dct.get("need_check_funcs")
        if not need_check_funcs:
            return

        def check_func(method):
            # ``method`` is bound once per check_func call, so every wrapper
            # keeps its own target instead of sharing the loop variable.
            @functools.wraps(method)
            def _check_func(self, *args, **kwargs):
                if self.closed:
                    raise ValueError("I/O operation on closed file")
                return method(self, *args, **kwargs)
            return _check_func

        for fname in need_check_funcs:
            # Resolved here, so a name missing from the class body fails at
            # class creation instead of on the first call of the wrapper.
            setattr(cls, fname, check_func(dct[fname]))
class StringIO:
    """Abridged StringIO showing the metaclass-based closed check.

    Only ``__init__``, ``next`` and ``isatty`` are shown; the remaining
    methods are elided. The methods no longer test ``self.closed``
    themselves: the check is meant to be added by ClosedStatusChecker for
    every name listed in ``need_check_funcs``.
    """

    # Python 2 spelling for selecting the metaclass.
    __metaclass__ = ClosedStatusChecker
    # Methods to guard; the list is abridged in this example.
    need_check_funcs = ("next", "isatty")  # ... plus the other guarded methods

    def __init__(self, buf = ''):
        # Non-string input is coerced with str().
        if not isinstance(buf, basestring):
            buf = str(buf)
        self.buf = buf
        self.len = len(buf)
        self.buflist = []
        self.pos = 0
        self.closed = False
        self.softspace = 0

    def next(self):
        """Return the next line; raise StopIteration at EOF."""
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def isatty(self):
        """Return False."""
        return False

    # ... remaining methods elided ...
|
StringIO
class StringIO:
    """class StringIO([buffer])

    In-memory object that imitates a file: it reads from and writes to a
    string buffer.

    Note: do not mix Unicode strings and 8-bit strings. StringIO supports
    both, but only one kind at a time.
    """

    def __init__(self, buf = ''):
        # Anything that is not a string is simply coerced with str(); the
        # argument is not treated as, say, a generator of strings.
        if not isinstance(buf, basestring):
            buf = str(buf)
        self.buf = buf
        self.len = len(buf)
        # Written chunks are collected in this list first and only joined
        # into self.buf when the value is actually needed. This avoids
        # repeated string concatenation and the allocations it causes.
        self.buflist = []
        self.pos = 0
        self.closed = False
        self.softspace = 0

    def _merge_buflist(self):
        """Fold the pending chunks in self.buflist into self.buf."""
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []

    def __iter__(self):
        # Required by the iterator protocol: the object is its own iterator.
        return self

    def next(self):
        """Return the next line; raise StopIteration at EOF.

        This is the Python 2 iterator-protocol method.
        """
        _complain_ifclosed(self.closed)
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def close(self):
        """Free the memory buffer.
        """
        if not self.closed:
            self.closed = True
            del self.buf, self.pos

    def isatty(self):
        """Returns False because StringIO objects are not connected to a
        tty-like device.
        """
        _complain_ifclosed(self.closed)
        return False

    def seek(self, pos, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end).

        There is no return value.
        """
        _complain_ifclosed(self.closed)
        self._merge_buflist()
        if mode == 1:
            pos += self.pos
        elif mode == 2:
            pos += self.len
        # A negative target is clamped to the start of the buffer.
        self.pos = max(0, pos)

    def tell(self):
        """Return the file's current position."""
        _complain_ifclosed(self.closed)
        return self.pos

    def read(self, n = -1):
        """Read at most n bytes from the file
        (less if the read hits EOF before obtaining n bytes).

        If n is negative, None or omitted, read all data until EOF is
        reached. The bytes are returned as a string object. An empty
        string is returned when EOF is encountered immediately.
        """
        _complain_ifclosed(self.closed)
        self._merge_buflist()
        if n is None or n < 0:
            newpos = self.len
        else:
            newpos = min(self.pos+n, self.len)
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readline(self, length=None):
        r"""Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the length argument is
        present and non-negative, it is a maximum byte count (including the
        trailing newline) and an incomplete line may be returned.

        An empty string is returned only when EOF is encountered immediately.

        Note: Unlike stdio's fgets(), the returned string contains null
        characters ('\0') if they occurred in the input.
        """
        _complain_ifclosed(self.closed)
        self._merge_buflist()
        i = self.buf.find('\n', self.pos)
        if i < 0:
            newpos = self.len
        else:
            newpos = i+1
        if length is not None and length >= 0:
            if self.pos + length < newpos:
                newpos = self.pos + length
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is present and positive, reading
        stops once the lines read total at least sizehint bytes, so only
        whole lines are returned.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def truncate(self, size=None):
        """Truncate the file's size.

        If the optional size argument is present, the file is truncated to
        (at most) that size. The size defaults to the current position.
        The current file position is not changed unless the position
        is beyond the new file size.

        If the specified size exceeds the file's current size, the
        file remains unchanged.

        Raises IOError if size is negative.
        """
        _complain_ifclosed(self.closed)
        if size is None:
            size = self.pos
        elif size < 0:
            # NOTE(review): EINVAL is not defined in this class; presumably
            # it is imported from errno at module level -- confirm.
            raise IOError(EINVAL, "Negative size not allowed")
        elif size < self.pos:
            self.pos = size
        self.buf = self.getvalue()[:size]
        # Record the real buffer length. When size exceeds the current
        # contents nothing is cut, and storing size itself would leave
        # self.len larger than the data actually held.
        self.len = len(self.buf)

    def write(self, s):
        """Write a string to the file.

        There is no return value.
        """
        _complain_ifclosed(self.closed)
        if not s: return
        # Force s to be a string or unicode
        if not isinstance(s, basestring):
            s = str(s)
        spos = self.pos
        slen = self.len
        if spos == slen:
            # Appending at EOF: just queue the chunk.
            self.buflist.append(s)
            self.len = self.pos = spos + len(s)
            return
        if spos > slen:
            # Writing past EOF pads the gap with real null bytes. This is
            # why StringIO("abc"), seek(10), write("end") yields
            # 'abc' + seven '\x00' + 'end'.
            self.buflist.append('\0'*(spos - slen))
            slen = spos
        newpos = spos + len(s)
        if spos < slen:
            # Overwriting inside existing data: splice s between the part
            # before the write position and the part after the new position.
            self._merge_buflist()
            self.buflist = [self.buf[:spos], s, self.buf[newpos:]]
            self.buf = ''
            if newpos > slen:
                slen = newpos
        else:
            self.buflist.append(s)
            slen = newpos
        self.len = slen
        self.pos = newpos

    def writelines(self, iterable):
        """Write a sequence of strings to the file. The sequence can be any
        iterable object producing strings, typically a list of strings. There
        is no return value.

        (The name is intended to match readlines(); writelines() does not add
        line separators.)
        """
        write = self.write
        for line in iterable:
            write(line)

    def flush(self):
        """Flush the internal buffer
        """
        # StringIO never touches the disk, so there is nothing to flush;
        # only the closed check is performed.
        _complain_ifclosed(self.closed)

    def getvalue(self):
        """
        Retrieve the entire contents of the "file" at any time before
        the StringIO object's close() method is called.

        The StringIO object can accept either Unicode or 8-bit strings,
        but mixing the two may take some care. If both are used, 8-bit
        strings that cannot be interpreted as 7-bit ASCII (that use the
        8th bit) will cause a UnicodeError to be raised when getvalue()
        is called.
        """
        _complain_ifclosed(self.closed)
        self._merge_buflist()
        return self.buf
|
comments powered by Disqus