This commit is contained in:
FrederikBaerentsen 2022-08-13 20:07:28 +02:00
parent aadf6d6914
commit 3fe0c9eaa8
10 changed files with 0 additions and 6364 deletions

View File

@ -1,656 +0,0 @@
"""A collection of string operations (most are no longer used).
Warning: most of the code you see here isn't normally used nowadays.
Beginning with Python 1.6, many of these functions are implemented as
methods on the standard string object. They used to be implemented by
a built-in module called strop, but strop is now obsolete itself.
Public module variables:
whitespace -- a string containing all characters considered whitespace
lowercase -- a string containing all characters considered lowercase letters
uppercase -- a string containing all characters considered uppercase letters
letters -- a string containing all characters considered letters
digits -- a string containing all characters considered decimal digits
hexdigits -- a string containing all characters considered hexadecimal digits
octdigits -- a string containing all characters considered octal digits
punctuation -- a string containing all characters considered punctuation
printable -- a string containing all characters considered printable
"""
# Some strings for ctype-style character classification
whitespace = ' \t\n\r\v\f'
lowercase = 'abcdefghijklmnopqrstuvwxyz'
uppercase = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
letters = lowercase + uppercase
ascii_lowercase = lowercase
ascii_uppercase = uppercase
ascii_letters = ascii_lowercase + ascii_uppercase
digits = '0123456789'
hexdigits = digits + 'abcdef' + 'ABCDEF'
octdigits = '01234567'
punctuation = """!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~"""
printable = digits + letters + punctuation + whitespace
# Case conversion helpers
# Use str to convert Unicode literal in case of -U
l = map(chr, xrange(256))
_idmap = str('').join(l)
del l
# Functions which aren't available as string methods.
# Capitalize the words in a string, e.g. " aBc dEf " -> "Abc Def".
def capwords(s, sep=None):
"""capwords(s [,sep]) -> string
Split the argument into words using split, capitalize each
word using capitalize, and join the capitalized words using
join. If the optional second argument sep is absent or None,
runs of whitespace characters are replaced by a single space
and leading and trailing whitespace are removed, otherwise
sep is used to split and join the words.
"""
return (sep or ' ').join(x.capitalize() for x in s.split(sep))
# Construct a translation string
_idmapL = None
def maketrans(fromstr, tostr):
"""maketrans(frm, to) -> string
Return a translation table (a string of 256 bytes long)
suitable for use in string.translate. The strings frm and to
must be of the same length.
"""
if len(fromstr) != len(tostr):
raise ValueError, "maketrans arguments must have same length"
global _idmapL
if not _idmapL:
_idmapL = list(_idmap)
L = _idmapL[:]
fromstr = map(ord, fromstr)
for i in range(len(fromstr)):
L[fromstr[i]] = tostr[i]
return ''.join(L)
####################################################################
import re as _re
class _multimap:
"""Helper class for combining multiple mappings.
Used by .{safe_,}substitute() to combine the mapping and keyword
arguments.
"""
def __init__(self, primary, secondary):
self._primary = primary
self._secondary = secondary
def __getitem__(self, key):
try:
return self._primary[key]
except KeyError:
return self._secondary[key]
class _TemplateMetaclass(type):
pattern = r"""
%(delim)s(?:
(?P<escaped>%(delim)s) | # Escape sequence of two delimiters
(?P<named>%(id)s) | # delimiter and a Python identifier
{(?P<braced>%(id)s)} | # delimiter and a braced identifier
(?P<invalid>) # Other ill-formed delimiter exprs
)
"""
def __init__(cls, name, bases, dct):
super(_TemplateMetaclass, cls).__init__(name, bases, dct)
if 'pattern' in dct:
pattern = cls.pattern
else:
pattern = _TemplateMetaclass.pattern % {
'delim' : _re.escape(cls.delimiter),
'id' : cls.idpattern,
}
cls.pattern = _re.compile(pattern, _re.IGNORECASE | _re.VERBOSE)
class Template:
"""A string class for supporting $-substitutions."""
__metaclass__ = _TemplateMetaclass
delimiter = '$'
idpattern = r'[_a-z][_a-z0-9]*'
def __init__(self, template):
self.template = template
# Search for $$, $identifier, ${identifier}, and any bare $'s
def _invalid(self, mo):
i = mo.start('invalid')
lines = self.template[:i].splitlines(True)
if not lines:
colno = 1
lineno = 1
else:
colno = i - len(''.join(lines[:-1]))
lineno = len(lines)
raise ValueError('Invalid placeholder in string: line %d, col %d' %
(lineno, colno))
def substitute(*args, **kws):
if not args:
raise TypeError("descriptor 'substitute' of 'Template' object "
"needs an argument")
self, args = args[0], args[1:] # allow the "self" keyword be passed
if len(args) > 1:
raise TypeError('Too many positional arguments')
if not args:
mapping = kws
elif kws:
mapping = _multimap(kws, args[0])
else:
mapping = args[0]
# Helper function for .sub()
def convert(mo):
# Check the most common path first.
named = mo.group('named') or mo.group('braced')
if named is not None:
val = mapping[named]
# We use this idiom instead of str() because the latter will
# fail if val is a Unicode containing non-ASCII characters.
return '%s' % (val,)
if mo.group('escaped') is not None:
return self.delimiter
if mo.group('invalid') is not None:
self._invalid(mo)
raise ValueError('Unrecognized named group in pattern',
self.pattern)
return self.pattern.sub(convert, self.template)
def safe_substitute(*args, **kws):
if not args:
raise TypeError("descriptor 'safe_substitute' of 'Template' object "
"needs an argument")
self, args = args[0], args[1:] # allow the "self" keyword be passed
if len(args) > 1:
raise TypeError('Too many positional arguments')
if not args:
mapping = kws
elif kws:
mapping = _multimap(kws, args[0])
else:
mapping = args[0]
# Helper function for .sub()
def convert(mo):
named = mo.group('named') or mo.group('braced')
if named is not None:
try:
# We use this idiom instead of str() because the latter
# will fail if val is a Unicode containing non-ASCII
return '%s' % (mapping[named],)
except KeyError:
return mo.group()
if mo.group('escaped') is not None:
return self.delimiter
if mo.group('invalid') is not None:
return mo.group()
raise ValueError('Unrecognized named group in pattern',
self.pattern)
return self.pattern.sub(convert, self.template)
####################################################################
# NOTE: Everything below here is deprecated. Use string methods instead.
# This stuff will go away in Python 3.0.
# Backward compatible names for exceptions
index_error = ValueError
atoi_error = ValueError
atof_error = ValueError
atol_error = ValueError
# convert UPPER CASE letters to lower case
def lower(s):
"""lower(s) -> string
Return a copy of the string s converted to lowercase.
"""
return s.lower()
# Convert lower case letters to UPPER CASE
def upper(s):
"""upper(s) -> string
Return a copy of the string s converted to uppercase.
"""
return s.upper()
# Swap lower case letters and UPPER CASE
def swapcase(s):
"""swapcase(s) -> string
Return a copy of the string s with upper case characters
converted to lowercase and vice versa.
"""
return s.swapcase()
# Strip leading and trailing tabs and spaces
def strip(s, chars=None):
"""strip(s [,chars]) -> string
Return a copy of the string s with leading and trailing
whitespace removed.
If chars is given and not None, remove characters in chars instead.
If chars is unicode, S will be converted to unicode before stripping.
"""
return s.strip(chars)
# Strip leading tabs and spaces
def lstrip(s, chars=None):
"""lstrip(s [,chars]) -> string
Return a copy of the string s with leading whitespace removed.
If chars is given and not None, remove characters in chars instead.
"""
return s.lstrip(chars)
# Strip trailing tabs and spaces
def rstrip(s, chars=None):
"""rstrip(s [,chars]) -> string
Return a copy of the string s with trailing whitespace removed.
If chars is given and not None, remove characters in chars instead.
"""
return s.rstrip(chars)
# Split a string into a list of space/tab-separated words
def split(s, sep=None, maxsplit=-1):
"""split(s [,sep [,maxsplit]]) -> list of strings
Return a list of the words in the string s, using sep as the
delimiter string. If maxsplit is given, splits at no more than
maxsplit places (resulting in at most maxsplit+1 words). If sep
is not specified or is None, any whitespace string is a separator.
(split and splitfields are synonymous)
"""
return s.split(sep, maxsplit)
splitfields = split
# Split a string into a list of space/tab-separated words
def rsplit(s, sep=None, maxsplit=-1):
"""rsplit(s [,sep [,maxsplit]]) -> list of strings
Return a list of the words in the string s, using sep as the
delimiter string, starting at the end of the string and working
to the front. If maxsplit is given, at most maxsplit splits are
done. If sep is not specified or is None, any whitespace string
is a separator.
"""
return s.rsplit(sep, maxsplit)
# Join fields with optional separator
def join(words, sep = ' '):
"""join(list [,sep]) -> string
Return a string composed of the words in list, with
intervening occurrences of sep. The default separator is a
single space.
(joinfields and join are synonymous)
"""
return sep.join(words)
joinfields = join
# Find substring, raise exception if not found
def index(s, *args):
"""index(s, sub [,start [,end]]) -> int
Like find but raises ValueError when the substring is not found.
"""
return s.index(*args)
# Find last substring, raise exception if not found
def rindex(s, *args):
"""rindex(s, sub [,start [,end]]) -> int
Like rfind but raises ValueError when the substring is not found.
"""
return s.rindex(*args)
# Count non-overlapping occurrences of substring
def count(s, *args):
"""count(s, sub[, start[,end]]) -> int
Return the number of occurrences of substring sub in string
s[start:end]. Optional arguments start and end are
interpreted as in slice notation.
"""
return s.count(*args)
# Find substring, return -1 if not found
def find(s, *args):
"""find(s, sub [,start [,end]]) -> in
Return the lowest index in s where substring sub is found,
such that sub is contained within s[start,end]. Optional
arguments start and end are interpreted as in slice notation.
Return -1 on failure.
"""
return s.find(*args)
# Find last substring, return -1 if not found
def rfind(s, *args):
"""rfind(s, sub [,start [,end]]) -> int
Return the highest index in s where substring sub is found,
such that sub is contained within s[start,end]. Optional
arguments start and end are interpreted as in slice notation.
Return -1 on failure.
"""
return s.rfind(*args)
# for a bit of speed
_float = float
_int = int
_long = long
# Convert string to float
def atof(s):
"""atof(s) -> float
Return the floating point number represented by the string s.
"""
return _float(s)
# Convert string to integer
def atoi(s , base=10):
"""atoi(s [,base]) -> int
Return the integer represented by the string s in the given
base, which defaults to 10. The string s must consist of one
or more digits, possibly preceded by a sign. If base is 0, it
is chosen from the leading characters of s, 0 for octal, 0x or
0X for hexadecimal. If base is 16, a preceding 0x or 0X is
accepted.
"""
return _int(s, base)
# Convert string to long integer
def atol(s, base=10):
"""atol(s [,base]) -> long
Return the long integer represented by the string s in the
given base, which defaults to 10. The string s must consist
of one or more digits, possibly preceded by a sign. If base
is 0, it is chosen from the leading characters of s, 0 for
octal, 0x or 0X for hexadecimal. If base is 16, a preceding
0x or 0X is accepted. A trailing L or l is not accepted,
unless base is 0.
"""
return _long(s, base)
# Left-justify a string
def ljust(s, width, *args):
"""ljust(s, width[, fillchar]) -> string
Return a left-justified version of s, in a field of the
specified width, padded with spaces as needed. The string is
never truncated. If specified the fillchar is used instead of spaces.
"""
return s.ljust(width, *args)
# Right-justify a string
def rjust(s, width, *args):
"""rjust(s, width[, fillchar]) -> string
Return a right-justified version of s, in a field of the
specified width, padded with spaces as needed. The string is
never truncated. If specified the fillchar is used instead of spaces.
"""
return s.rjust(width, *args)
# Center a string
def center(s, width, *args):
"""center(s, width[, fillchar]) -> string
Return a center version of s, in a field of the specified
width. padded with spaces as needed. The string is never
truncated. If specified the fillchar is used instead of spaces.
"""
return s.center(width, *args)
# Zero-fill a number, e.g., (12, 3) --> '012' and (-3, 3) --> '-03'
# Decadent feature: the argument may be a string or a number
# (Use of this is deprecated; it should be a string as with ljust c.s.)
def zfill(x, width):
"""zfill(x, width) -> string
Pad a numeric string x with zeros on the left, to fill a field
of the specified width. The string x is never truncated.
"""
if not isinstance(x, basestring):
x = repr(x)
return x.zfill(width)
# Expand tabs in a string.
# Doesn't take non-printing chars into account, but does understand \n.
def expandtabs(s, tabsize=8):
"""expandtabs(s [,tabsize]) -> string
Return a copy of the string s with all tab characters replaced
by the appropriate number of spaces, depending on the current
column, and the tabsize (default 8).
"""
return s.expandtabs(tabsize)
# Character translation through look-up table.
def translate(s, table, deletions=""):
"""translate(s,table [,deletions]) -> string
Return a copy of the string s, where all characters occurring
in the optional argument deletions are removed, and the
remaining characters have been mapped through the given
translation table, which must be a string of length 256. The
deletions argument is not allowed for Unicode strings.
"""
if deletions or table is None:
return s.translate(table, deletions)
else:
# Add s[:0] so that if s is Unicode and table is an 8-bit string,
# table is converted to Unicode. This means that table *cannot*
# be a dictionary -- for that feature, use u.translate() directly.
return s.translate(table + s[:0])
# Capitalize a string, e.g. "aBc dEf" -> "Abc def".
def capitalize(s):
"""capitalize(s) -> string
Return a copy of the string s with only its first character
capitalized.
"""
return s.capitalize()
# Substring replacement (global)
def replace(s, old, new, maxreplace=-1):
"""replace (str, old, new[, maxreplace]) -> string
Return a copy of string str with all occurrences of substring
old replaced by new. If the optional argument maxreplace is
given, only the first maxreplace occurrences are replaced.
"""
return s.replace(old, new, maxreplace)
# Try importing optional built-in module "strop" -- if it exists,
# it redefines some string operations that are 100-1000 times faster.
# It also defines values for whitespace, lowercase and uppercase
# that match <ctype.h>'s definitions.
try:
from strop import maketrans, lowercase, uppercase, whitespace
letters = lowercase + uppercase
except ImportError:
pass # Use the original versions
########################################################################
# the Formatter class
# see PEP 3101 for details and purpose of this class
# The hard parts are reused from the C implementation. They're exposed as "_"
# prefixed methods of str and unicode.
# The overall parser is implemented in str._formatter_parser.
# The field name parser is implemented in str._formatter_field_name_split
class Formatter(object):
def format(*args, **kwargs):
if not args:
raise TypeError("descriptor 'format' of 'Formatter' object "
"needs an argument")
self, args = args[0], args[1:] # allow the "self" keyword be passed
try:
format_string, args = args[0], args[1:] # allow the "format_string" keyword be passed
except IndexError:
if 'format_string' in kwargs:
format_string = kwargs.pop('format_string')
else:
raise TypeError("format() missing 1 required positional "
"argument: 'format_string'")
return self.vformat(format_string, args, kwargs)
def vformat(self, format_string, args, kwargs):
used_args = set()
result = self._vformat(format_string, args, kwargs, used_args, 2)
self.check_unused_args(used_args, args, kwargs)
return result
def _vformat(self, format_string, args, kwargs, used_args, recursion_depth):
if recursion_depth < 0:
raise ValueError('Max string recursion exceeded')
result = []
for literal_text, field_name, format_spec, conversion in \
self.parse(format_string):
# output the literal text
if literal_text:
result.append(literal_text)
# if there's a field, output it
if field_name is not None:
# this is some markup, find the object and do
# the formatting
# given the field_name, find the object it references
# and the argument it came from
obj, arg_used = self.get_field(field_name, args, kwargs)
used_args.add(arg_used)
# do any conversion on the resulting object
obj = self.convert_field(obj, conversion)
# expand the format spec, if needed
format_spec = self._vformat(format_spec, args, kwargs,
used_args, recursion_depth-1)
# format the object and append to the result
result.append(self.format_field(obj, format_spec))
return ''.join(result)
def get_value(self, key, args, kwargs):
if isinstance(key, (int, long)):
return args[key]
else:
return kwargs[key]
def check_unused_args(self, used_args, args, kwargs):
pass
def format_field(self, value, format_spec):
return format(value, format_spec)
def convert_field(self, value, conversion):
# do any conversion on the resulting object
if conversion is None:
return value
elif conversion == 's':
return str(value)
elif conversion == 'r':
return repr(value)
raise ValueError("Unknown conversion specifier {0!s}".format(conversion))
# returns an iterable that contains tuples of the form:
# (literal_text, field_name, format_spec, conversion)
# literal_text can be zero length
# field_name can be None, in which case there's no
# object to format and output
# if field_name is not None, it is looked up, formatted
# with format_spec and conversion and then used
def parse(self, format_string):
return format_string._formatter_parser()
# given a field_name, find the object it references.
# field_name: the field being looked up, e.g. "0.name"
# or "lookup[3]"
# used_args: a set of which args have been used
# args, kwargs: as passed in to vformat
def get_field(self, field_name, args, kwargs):
first, rest = field_name._formatter_field_name_split()
obj = self.get_value(first, args, kwargs)
# loop through the rest of the field_name, doing
# getattr or getitem as needed
for is_attr, i in rest:
if is_attr:
obj = getattr(obj, i)
else:
obj = obj[i]
return obj, first

View File

@ -1,639 +0,0 @@
"""Temporary files.
This module provides generic, low- and high-level interfaces for
creating temporary files and directories. All of the interfaces
provided by this module can be used without fear of race conditions
except for 'mktemp'. 'mktemp' is subject to race conditions and
should not be used; it is provided for backward compatibility only.
This module also provides some data items to the user:
TMP_MAX - maximum number of names that will be tried before
giving up.
template - the default prefix for all temporary names.
You may change this to control the default prefix.
tempdir - If this is set to a string before the first use of
any routine from this module, it will be considered as
another candidate location to store temporary files.
"""
__all__ = [
"NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
"SpooledTemporaryFile",
"mkstemp", "mkdtemp", # low level safe interfaces
"mktemp", # deprecated unsafe interface
"TMP_MAX", "gettempprefix", # constants
"tempdir", "gettempdir"
]
# Imports.
import io as _io
import os as _os
import errno as _errno
from random import Random as _Random
try:
from cStringIO import StringIO as _StringIO
except ImportError:
from StringIO import StringIO as _StringIO
try:
import fcntl as _fcntl
except ImportError:
def _set_cloexec(fd):
pass
else:
def _set_cloexec(fd):
try:
flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
except IOError:
pass
else:
# flags read successfully, modify
flags |= _fcntl.FD_CLOEXEC
_fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
try:
import thread as _thread
except ImportError:
import dummy_thread as _thread
_allocate_lock = _thread.allocate_lock
_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
if hasattr(_os, 'O_NOINHERIT'):
_text_openflags |= _os.O_NOINHERIT
if hasattr(_os, 'O_NOFOLLOW'):
_text_openflags |= _os.O_NOFOLLOW
_bin_openflags = _text_openflags
if hasattr(_os, 'O_BINARY'):
_bin_openflags |= _os.O_BINARY
if hasattr(_os, 'TMP_MAX'):
TMP_MAX = _os.TMP_MAX
else:
TMP_MAX = 10000
template = "tmp"
# Internal routines.
_once_lock = _allocate_lock()
if hasattr(_os, "lstat"):
_stat = _os.lstat
elif hasattr(_os, "stat"):
_stat = _os.stat
else:
# Fallback. All we need is something that raises os.error if the
# file doesn't exist.
def _stat(fn):
try:
f = open(fn)
except IOError:
raise _os.error
f.close()
def _exists(fn):
try:
_stat(fn)
except _os.error:
return False
else:
return True
class _RandomNameSequence:
"""An instance of _RandomNameSequence generates an endless
sequence of unpredictable strings which can safely be incorporated
into file names. Each string is six characters long. Multiple
threads can safely use the same instance at the same time.
_RandomNameSequence is an iterator."""
characters = ("abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"0123456789_")
def __init__(self):
self.mutex = _allocate_lock()
self.normcase = _os.path.normcase
@property
def rng(self):
cur_pid = _os.getpid()
if cur_pid != getattr(self, '_rng_pid', None):
self._rng = _Random()
self._rng_pid = cur_pid
return self._rng
def __iter__(self):
return self
def next(self):
m = self.mutex
c = self.characters
choose = self.rng.choice
m.acquire()
try:
letters = [choose(c) for dummy in "123456"]
finally:
m.release()
return self.normcase(''.join(letters))
def _candidate_tempdir_list():
"""Generate a list of candidate temporary directories which
_get_default_tempdir will try."""
dirlist = []
# First, try the environment.
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = _os.getenv(envname)
if dirname: dirlist.append(dirname)
# Failing that, try OS-specific locations.
if _os.name == 'riscos':
dirname = _os.getenv('Wimp$ScrapDir')
if dirname: dirlist.append(dirname)
elif _os.name == 'nt':
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
# As a last resort, the current directory.
try:
dirlist.append(_os.getcwd())
except (AttributeError, _os.error):
dirlist.append(_os.curdir)
return dirlist
def _get_default_tempdir():
"""Calculate the default directory to use for temporary files.
This routine should be called exactly once.
We determine whether or not a candidate temp dir is usable by
trying to create and write to a file in that directory. If this
is successful, the test file is deleted. To prevent denial of
service, the name of the test file must be randomized."""
namer = _RandomNameSequence()
dirlist = _candidate_tempdir_list()
flags = _text_openflags
for dir in dirlist:
if dir != _os.curdir:
dir = _os.path.normcase(_os.path.abspath(dir))
# Try only a few names per directory.
for seq in xrange(100):
name = namer.next()
filename = _os.path.join(dir, name)
try:
fd = _os.open(filename, flags, 0o600)
try:
try:
with _io.open(fd, 'wb', closefd=False) as fp:
fp.write(b'blat')
finally:
_os.close(fd)
finally:
_os.unlink(filename)
return dir
except (OSError, IOError) as e:
if e.args[0] == _errno.EEXIST:
continue
if (_os.name == 'nt' and e.args[0] == _errno.EACCES and
_os.path.isdir(dir) and _os.access(dir, _os.W_OK)):
# On windows, when a directory with the chosen name already
# exists, EACCES error code is returned instead of EEXIST.
continue
break # no point trying more names in this directory
raise IOError, (_errno.ENOENT,
("No usable temporary directory found in %s" % dirlist))
_name_sequence = None
def _get_candidate_names():
"""Common setup sequence for all user-callable interfaces."""
global _name_sequence
if _name_sequence is None:
_once_lock.acquire()
try:
if _name_sequence is None:
_name_sequence = _RandomNameSequence()
finally:
_once_lock.release()
return _name_sequence
def _mkstemp_inner(dir, pre, suf, flags):
"""Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""
names = _get_candidate_names()
for seq in xrange(TMP_MAX):
name = names.next()
file = _os.path.join(dir, pre + name + suf)
try:
fd = _os.open(file, flags, 0600)
_set_cloexec(fd)
return (fd, _os.path.abspath(file))
except OSError, e:
if e.errno == _errno.EEXIST:
continue # try again
if (_os.name == 'nt' and e.errno == _errno.EACCES and
_os.path.isdir(dir) and _os.access(dir, _os.W_OK)):
# On windows, when a directory with the chosen name already
# exists, EACCES error code is returned instead of EEXIST.
continue
raise
raise IOError, (_errno.EEXIST, "No usable temporary file name found")
# User visible interfaces.
def gettempprefix():
"""Accessor for tempdir.template."""
return template
tempdir = None
def gettempdir():
"""Accessor for tempfile.tempdir."""
global tempdir
if tempdir is None:
_once_lock.acquire()
try:
if tempdir is None:
tempdir = _get_default_tempdir()
finally:
_once_lock.release()
return tempdir
def mkstemp(suffix="", prefix=template, dir=None, text=False):
"""User-callable function to create and return a unique temporary
file. The return value is a pair (fd, name) where fd is the
file descriptor returned by os.open, and name is the filename.
If 'suffix' is specified, the file name will end with that suffix,
otherwise there will be no suffix.
If 'prefix' is specified, the file name will begin with that prefix,
otherwise a default prefix is used.
If 'dir' is specified, the file will be created in that directory,
otherwise a default directory is used.
If 'text' is specified and true, the file is opened in text
mode. Else (the default) the file is opened in binary mode. On
some operating systems, this makes no difference.
The file is readable and writable only by the creating user ID.
If the operating system uses permission bits to indicate whether a
file is executable, the file is executable by no one. The file
descriptor is not inherited by children of this process.
Caller is responsible for deleting the file when done with it.
"""
if dir is None:
dir = gettempdir()
if text:
flags = _text_openflags
else:
flags = _bin_openflags
return _mkstemp_inner(dir, prefix, suffix, flags)
def mkdtemp(suffix="", prefix=template, dir=None):
"""User-callable function to create and return a unique temporary
directory. The return value is the pathname of the directory.
Arguments are as for mkstemp, except that the 'text' argument is
not accepted.
The directory is readable, writable, and searchable only by the
creating user.
Caller is responsible for deleting the directory when done with it.
"""
if dir is None:
dir = gettempdir()
names = _get_candidate_names()
for seq in xrange(TMP_MAX):
name = names.next()
file = _os.path.join(dir, prefix + name + suffix)
try:
_os.mkdir(file, 0700)
return file
except OSError, e:
if e.errno == _errno.EEXIST:
continue # try again
if (_os.name == 'nt' and e.errno == _errno.EACCES and
_os.path.isdir(dir) and _os.access(dir, _os.W_OK)):
# On windows, when a directory with the chosen name already
# exists, EACCES error code is returned instead of EEXIST.
continue
raise
raise IOError, (_errno.EEXIST, "No usable temporary directory name found")
def mktemp(suffix="", prefix=template, dir=None):
"""User-callable function to return a unique temporary file name. The
file is not created.
Arguments are as for mkstemp, except that the 'text' argument is
not accepted.
This function is unsafe and should not be used. The file name
refers to a file that did not exist at some point, but by the time
you get around to creating it, someone else may have beaten you to
the punch.
"""
## from warnings import warn as _warn
## _warn("mktemp is a potential security risk to your program",
## RuntimeWarning, stacklevel=2)
if dir is None:
dir = gettempdir()
names = _get_candidate_names()
for seq in xrange(TMP_MAX):
name = names.next()
file = _os.path.join(dir, prefix + name + suffix)
if not _exists(file):
return file
raise IOError, (_errno.EEXIST, "No usable temporary filename found")
class _TemporaryFileWrapper:
"""Temporary file wrapper
This class provides a wrapper around files opened for
temporary use. In particular, it seeks to automatically
remove the file when it is no longer needed.
"""
def __init__(self, file, name, delete=True):
self.file = file
self.name = name
self.close_called = False
self.delete = delete
def __getattr__(self, name):
# Attribute lookups are delegated to the underlying file
# and cached for non-numeric results
# (i.e. methods are cached, closed and friends are not)
file = self.__dict__['file']
a = getattr(file, name)
if not issubclass(type(a), type(0)):
setattr(self, name, a)
return a
# The underlying __enter__ method returns the wrong object
# (self.file) so override it to return the wrapper
def __enter__(self):
self.file.__enter__()
return self
# NT provides delete-on-close as a primitive, so we don't need
# the wrapper to do anything special. We still use it so that
# file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
if _os.name != 'nt':
# Cache the unlinker so we don't get spurious errors at
# shutdown when the module-level "os" is None'd out. Note
# that this must be referenced as self.unlink, because the
# name TemporaryFileWrapper may also get None'd out before
# __del__ is called.
unlink = _os.unlink
def close(self):
if not self.close_called:
self.close_called = True
try:
self.file.close()
finally:
if self.delete:
self.unlink(self.name)
def __del__(self):
self.close()
# Need to trap __exit__ as well to ensure the file gets
# deleted when used in a with statement
def __exit__(self, exc, value, tb):
result = self.file.__exit__(exc, value, tb)
self.close()
return result
else:
def __exit__(self, exc, value, tb):
self.file.__exit__(exc, value, tb)
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
prefix=template, dir=None, delete=True):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'dir' -- as for mkstemp.
'mode' -- the mode argument to os.fdopen (default "w+b").
'bufsize' -- the buffer size argument to os.fdopen (default -1).
'delete' -- whether the file is deleted on close (default True).
The file is created as mkstemp() would do it.
Returns an object with a file-like interface; the name of the file
is accessible as file.name. The file will be automatically deleted
when it is closed unless the 'delete' argument is set to False.
"""
if dir is None:
dir = gettempdir()
if 'b' in mode:
flags = _bin_openflags
else:
flags = _text_openflags
# Setting O_TEMPORARY in the flags causes the OS to delete
# the file when it is closed. This is only supported by Windows.
if _os.name == 'nt' and delete:
flags |= _os.O_TEMPORARY
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
try:
file = _os.fdopen(fd, mode, bufsize)
return _TemporaryFileWrapper(file, name, delete)
except:
_os.close(fd)
raise
if _os.name != 'posix' or _os.sys.platform == 'cygwin':
# On non-POSIX and Cygwin systems, assume that we cannot unlink a file
# while it is open.
TemporaryFile = NamedTemporaryFile
else:
def TemporaryFile(mode='w+b', bufsize=-1, suffix="",
prefix=template, dir=None):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'dir' -- as for mkstemp.
'mode' -- the mode argument to os.fdopen (default "w+b").
'bufsize' -- the buffer size argument to os.fdopen (default -1).
The file is created as mkstemp() would do it.
Returns an object with a file-like interface. The file has no
name, and will cease to exist when it is closed.
"""
if dir is None:
dir = gettempdir()
if 'b' in mode:
flags = _bin_openflags
else:
flags = _text_openflags
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
try:
_os.unlink(name)
return _os.fdopen(fd, mode, bufsize)
except:
_os.close(fd)
raise
class SpooledTemporaryFile:
"""Temporary file wrapper, specialized to switch from
StringIO to a real file when it exceeds a certain size or
when a fileno is needed.
"""
_rolled = False
def __init__(self, max_size=0, mode='w+b', bufsize=-1,
suffix="", prefix=template, dir=None):
self._file = _StringIO()
self._max_size = max_size
self._rolled = False
self._TemporaryFileArgs = (mode, bufsize, suffix, prefix, dir)
def _check(self, file):
if self._rolled: return
max_size = self._max_size
if max_size and file.tell() > max_size:
self.rollover()
def rollover(self):
if self._rolled: return
file = self._file
newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)
del self._TemporaryFileArgs
newfile.write(file.getvalue())
newfile.seek(file.tell(), 0)
self._rolled = True
# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# _StringIO instance to a real file. So we list
# all the methods directly.
# Context management protocol
def __enter__(self):
if self._file.closed:
raise ValueError("Cannot enter context with closed file")
return self
def __exit__(self, exc, value, tb):
self._file.close()
# file protocol
def __iter__(self):
return self._file.__iter__()
def close(self):
self._file.close()
@property
def closed(self):
return self._file.closed
def fileno(self):
self.rollover()
return self._file.fileno()
def flush(self):
self._file.flush()
def isatty(self):
return self._file.isatty()
@property
def mode(self):
try:
return self._file.mode
except AttributeError:
return self._TemporaryFileArgs[0]
@property
def name(self):
try:
return self._file.name
except AttributeError:
return None
def next(self):
return self._file.next
def read(self, *args):
return self._file.read(*args)
def readline(self, *args):
return self._file.readline(*args)
def readlines(self, *args):
return self._file.readlines(*args)
def seek(self, *args):
self._file.seek(*args)
@property
def softspace(self):
return self._file.softspace
def tell(self):
return self._file.tell()
def truncate(self):
self._file.truncate()
def write(self, s):
file = self._file
rv = file.write(s)
self._check(file)
return rv
def writelines(self, iterable):
file = self._file
rv = file.writelines(iterable)
self._check(file)
return rv
def xreadlines(self, *args):
if hasattr(self._file, 'xreadlines'): # real file
return iter(self._file)
else: # StringIO()
return iter(self._file.readlines(*args))

View File

@ -1,429 +0,0 @@
"""Text wrapping and filling.
"""
# Copyright (C) 1999-2001 Gregory P. Ward.
# Copyright (C) 2002, 2003 Python Software Foundation.
# Written by Greg Ward <gward@python.net>
__revision__ = "$Id$"
import string, re
try:
_unicode = unicode
except NameError:
# If Python is built without Unicode support, the unicode type
# will not exist. Fake one.
class _unicode(object):
pass
# Do the right thing with boolean values for all known Python versions
# (so this module can be copied to projects that don't depend on Python
# 2.3, e.g. Optik and Docutils) by uncommenting the block of code below.
#try:
# True, False
#except NameError:
# (True, False) = (1, 0)
__all__ = ['TextWrapper', 'wrap', 'fill', 'dedent']
# Hardcode the recognized whitespace characters to the US-ASCII
# whitespace characters. The main reason for doing this is that in
# ISO-8859-1, 0xa0 is non-breaking whitespace, so in certain locales
# that character winds up in string.whitespace. Respecting
# string.whitespace in those cases would 1) make textwrap treat 0xa0 the
# same as any other whitespace char, which is clearly wrong (it's a
# *non-breaking* space), 2) possibly cause problems with Unicode,
# since 0xa0 is not in range(128).
_whitespace = '\t\n\x0b\x0c\r '
class TextWrapper:
"""
Object for wrapping/filling text. The public interface consists of
the wrap() and fill() methods; the other methods are just there for
subclasses to override in order to tweak the default behaviour.
If you want to completely replace the main wrapping algorithm,
you'll probably have to override _wrap_chunks().
Several instance attributes control various aspects of wrapping:
width (default: 70)
the maximum width of wrapped lines (unless break_long_words
is false)
initial_indent (default: "")
string that will be prepended to the first line of wrapped
output. Counts towards the line's width.
subsequent_indent (default: "")
string that will be prepended to all lines save the first
of wrapped output; also counts towards each line's width.
expand_tabs (default: true)
Expand tabs in input text to spaces before further processing.
Each tab will become 1 .. 8 spaces, depending on its position in
its line. If false, each tab is treated as a single character.
replace_whitespace (default: true)
Replace all whitespace characters in the input text by spaces
after tab expansion. Note that if expand_tabs is false and
replace_whitespace is true, every tab will be converted to a
single space!
fix_sentence_endings (default: false)
Ensure that sentence-ending punctuation is always followed
by two spaces. Off by default because the algorithm is
(unavoidably) imperfect.
break_long_words (default: true)
Break words longer than 'width'. If false, those words will not
be broken, and some lines might be longer than 'width'.
break_on_hyphens (default: true)
Allow breaking hyphenated words. If true, wrapping will occur
preferably on whitespaces and right after hyphens part of
compound words.
drop_whitespace (default: true)
Drop leading and trailing whitespace from lines.
"""
whitespace_trans = string.maketrans(_whitespace, ' ' * len(_whitespace))
unicode_whitespace_trans = {}
uspace = ord(u' ')
for x in map(ord, _whitespace):
unicode_whitespace_trans[x] = uspace
# This funky little regex is just the trick for splitting
# text up into word-wrappable chunks. E.g.
# "Hello there -- you goof-ball, use the -b option!"
# splits into
# Hello/ /there/ /--/ /you/ /goof-/ball,/ /use/ /the/ /-b/ /option!
# (after stripping out empty strings).
wordsep_re = re.compile(
r'(\s+|' # any whitespace
r'[^\s\w]*\w+[^0-9\W]-(?=\w+[^0-9\W])|' # hyphenated words
r'(?<=[\w\!\"\'\&\.\,\?])-{2,}(?=\w))') # em-dash
# This less funky little regex just split on recognized spaces. E.g.
# "Hello there -- you goof-ball, use the -b option!"
# splits into
# Hello/ /there/ /--/ /you/ /goof-ball,/ /use/ /the/ /-b/ /option!/
wordsep_simple_re = re.compile(r'(\s+)')
# XXX this is not locale- or charset-aware -- string.lowercase
# is US-ASCII only (and therefore English-only)
sentence_end_re = re.compile(r'[%s]' # lowercase letter
r'[\.\!\?]' # sentence-ending punct.
r'[\"\']?' # optional end-of-quote
r'\Z' # end of chunk
% string.lowercase)
def __init__(self,
width=70,
initial_indent="",
subsequent_indent="",
expand_tabs=True,
replace_whitespace=True,
fix_sentence_endings=False,
break_long_words=True,
drop_whitespace=True,
break_on_hyphens=True):
self.width = width
self.initial_indent = initial_indent
self.subsequent_indent = subsequent_indent
self.expand_tabs = expand_tabs
self.replace_whitespace = replace_whitespace
self.fix_sentence_endings = fix_sentence_endings
self.break_long_words = break_long_words
self.drop_whitespace = drop_whitespace
self.break_on_hyphens = break_on_hyphens
# recompile the regexes for Unicode mode -- done in this clumsy way for
# backwards compatibility because it's rather common to monkey-patch
# the TextWrapper class' wordsep_re attribute.
self.wordsep_re_uni = re.compile(self.wordsep_re.pattern, re.U)
self.wordsep_simple_re_uni = re.compile(
self.wordsep_simple_re.pattern, re.U)
# -- Private methods -----------------------------------------------
# (possibly useful for subclasses to override)
def _munge_whitespace(self, text):
"""_munge_whitespace(text : string) -> string
Munge whitespace in text: expand tabs and convert all other
whitespace characters to spaces. Eg. " foo\\tbar\\n\\nbaz"
becomes " foo bar baz".
"""
if self.expand_tabs:
text = text.expandtabs()
if self.replace_whitespace:
if isinstance(text, str):
text = text.translate(self.whitespace_trans)
elif isinstance(text, _unicode):
text = text.translate(self.unicode_whitespace_trans)
return text
def _split(self, text):
"""_split(text : string) -> [string]
Split the text to wrap into indivisible chunks. Chunks are
not quite the same as words; see _wrap_chunks() for full
details. As an example, the text
Look, goof-ball -- use the -b option!
breaks into the following chunks:
'Look,', ' ', 'goof-', 'ball', ' ', '--', ' ',
'use', ' ', 'the', ' ', '-b', ' ', 'option!'
if break_on_hyphens is True, or in:
'Look,', ' ', 'goof-ball', ' ', '--', ' ',
'use', ' ', 'the', ' ', '-b', ' ', option!'
otherwise.
"""
if isinstance(text, _unicode):
if self.break_on_hyphens:
pat = self.wordsep_re_uni
else:
pat = self.wordsep_simple_re_uni
else:
if self.break_on_hyphens:
pat = self.wordsep_re
else:
pat = self.wordsep_simple_re
chunks = pat.split(text)
chunks = filter(None, chunks) # remove empty chunks
return chunks
def _fix_sentence_endings(self, chunks):
"""_fix_sentence_endings(chunks : [string])
Correct for sentence endings buried in 'chunks'. Eg. when the
original text contains "... foo.\\nBar ...", munge_whitespace()
and split() will convert that to [..., "foo.", " ", "Bar", ...]
which has one too few spaces; this method simply changes the one
space to two.
"""
i = 0
patsearch = self.sentence_end_re.search
while i < len(chunks)-1:
if chunks[i+1] == " " and patsearch(chunks[i]):
chunks[i+1] = " "
i += 2
else:
i += 1
def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
"""_handle_long_word(chunks : [string],
cur_line : [string],
cur_len : int, width : int)
Handle a chunk of text (most likely a word, not whitespace) that
is too long to fit in any line.
"""
# Figure out when indent is larger than the specified width, and make
# sure at least one character is stripped off on every pass
if width < 1:
space_left = 1
else:
space_left = width - cur_len
# If we're allowed to break long words, then do so: put as much
# of the next chunk onto the current line as will fit.
if self.break_long_words:
cur_line.append(reversed_chunks[-1][:space_left])
reversed_chunks[-1] = reversed_chunks[-1][space_left:]
# Otherwise, we have to preserve the long word intact. Only add
# it to the current line if there's nothing already there --
# that minimizes how much we violate the width constraint.
elif not cur_line:
cur_line.append(reversed_chunks.pop())
# If we're not allowed to break long words, and there's already
# text on the current line, do nothing. Next time through the
# main loop of _wrap_chunks(), we'll wind up here again, but
# cur_len will be zero, so the next line will be entirely
# devoted to the long word that we can't handle right now.
def _wrap_chunks(self, chunks):
"""_wrap_chunks(chunks : [string]) -> [string]
Wrap a sequence of text chunks and return a list of lines of
length 'self.width' or less. (If 'break_long_words' is false,
some lines may be longer than this.) Chunks correspond roughly
to words and the whitespace between them: each chunk is
indivisible (modulo 'break_long_words'), but a line break can
come between any two chunks. Chunks should not have internal
whitespace; ie. a chunk is either all whitespace or a "word".
Whitespace chunks will be removed from the beginning and end of
lines, but apart from that whitespace is preserved.
"""
lines = []
if self.width <= 0:
raise ValueError("invalid width %r (must be > 0)" % self.width)
# Arrange in reverse order so items can be efficiently popped
# from a stack of chucks.
chunks.reverse()
while chunks:
# Start the list of chunks that will make up the current line.
# cur_len is just the length of all the chunks in cur_line.
cur_line = []
cur_len = 0
# Figure out which static string will prefix this line.
if lines:
indent = self.subsequent_indent
else:
indent = self.initial_indent
# Maximum width for this line.
width = self.width - len(indent)
# First chunk on line is whitespace -- drop it, unless this
# is the very beginning of the text (ie. no lines started yet).
if self.drop_whitespace and chunks[-1].strip() == '' and lines:
del chunks[-1]
while chunks:
l = len(chunks[-1])
# Can at least squeeze this chunk onto the current line.
if cur_len + l <= width:
cur_line.append(chunks.pop())
cur_len += l
# Nope, this line is full.
else:
break
# The current line is full, and the next chunk is too big to
# fit on *any* line (not just this one).
if chunks and len(chunks[-1]) > width:
self._handle_long_word(chunks, cur_line, cur_len, width)
# If the last chunk on this line is all whitespace, drop it.
if self.drop_whitespace and cur_line and cur_line[-1].strip() == '':
del cur_line[-1]
# Convert current line back to a string and store it in list
# of all lines (return value).
if cur_line:
lines.append(indent + ''.join(cur_line))
return lines
# -- Public interface ----------------------------------------------
def wrap(self, text):
"""wrap(text : string) -> [string]
Reformat the single paragraph in 'text' so it fits in lines of
no more than 'self.width' columns, and return a list of wrapped
lines. Tabs in 'text' are expanded with string.expandtabs(),
and all other whitespace characters (including newline) are
converted to space.
"""
text = self._munge_whitespace(text)
chunks = self._split(text)
if self.fix_sentence_endings:
self._fix_sentence_endings(chunks)
return self._wrap_chunks(chunks)
def fill(self, text):
"""fill(text : string) -> string
Reformat the single paragraph in 'text' to fit in lines of no
more than 'self.width' columns, and return a new string
containing the entire wrapped paragraph.
"""
return "\n".join(self.wrap(text))
# -- Convenience interface ---------------------------------------------
def wrap(text, width=70, **kwargs):
"""Wrap a single paragraph of text, returning a list of wrapped lines.
Reformat the single paragraph in 'text' so it fits in lines of no
more than 'width' columns, and return a list of wrapped lines. By
default, tabs in 'text' are expanded with string.expandtabs(), and
all other whitespace characters (including newline) are converted to
space. See TextWrapper class for available keyword args to customize
wrapping behaviour.
"""
w = TextWrapper(width=width, **kwargs)
return w.wrap(text)
def fill(text, width=70, **kwargs):
"""Fill a single paragraph of text, returning a new string.
Reformat the single paragraph in 'text' to fit in lines of no more
than 'width' columns, and return a new string containing the entire
wrapped paragraph. As with wrap(), tabs are expanded and other
whitespace characters converted to space. See TextWrapper class for
available keyword args to customize wrapping behaviour.
"""
w = TextWrapper(width=width, **kwargs)
return w.fill(text)
# -- Loosely related functionality -------------------------------------
_whitespace_only_re = re.compile('^[ \t]+$', re.MULTILINE)
_leading_whitespace_re = re.compile('(^[ \t]*)(?:[^ \t\n])', re.MULTILINE)
def dedent(text):
"""Remove any common leading whitespace from every line in `text`.
This can be used to make triple-quoted strings line up with the left
edge of the display, while still presenting them in the source code
in indented form.
Note that tabs and spaces are both treated as whitespace, but they
are not equal: the lines " hello" and "\\thello" are
considered to have no common leading whitespace. (This behaviour is
new in Python 2.5; older versions of this module incorrectly
expanded tabs before searching for common leading whitespace.)
"""
# Look for the longest leading string of spaces and tabs common to
# all lines.
margin = None
text = _whitespace_only_re.sub('', text)
indents = _leading_whitespace_re.findall(text)
for indent in indents:
if margin is None:
margin = indent
# Current line more deeply indented than previous winner:
# no change (previous winner is still on top).
elif indent.startswith(margin):
pass
# Current line consistent with and no deeper than previous winner:
# it's the new winner.
elif margin.startswith(indent):
margin = indent
# Find the largest common whitespace between current line and previous
# winner.
else:
for i, (x, y) in enumerate(zip(margin, indent)):
if x != y:
margin = margin[:i]
break
else:
margin = margin[:len(indent)]
# sanity check (testing/debugging only)
if 0 and margin:
for line in text.split("\n"):
assert not line or line.startswith(margin), \
"line = %r, margin = %r" % (line, margin)
if margin:
text = re.sub(r'(?m)^' + margin, '', text)
return text
if __name__ == "__main__":
#print dedent("\tfoo\n\tbar")
#print dedent(" \thello there\n \t how are you?")
print dedent("Hello there.\n This is indented.")

View File

@ -1,86 +0,0 @@
"""Define names for all type symbols known in the standard interpreter.
Types that are part of optional modules (e.g. array) are not listed.
"""
import sys
# Iterators in Python aren't a matter of type but of protocol. A large
# and changing number of builtin types implement *some* flavor of
# iterator. Don't check the type! Use hasattr to check for both
# "__iter__" and "next" attributes instead.
NoneType = type(None)
TypeType = type
ObjectType = object
IntType = int
LongType = long
FloatType = float
BooleanType = bool
try:
ComplexType = complex
except NameError:
pass
StringType = str
# StringTypes is already outdated. Instead of writing "type(x) in
# types.StringTypes", you should use "isinstance(x, basestring)". But
# we keep around for compatibility with Python 2.2.
try:
UnicodeType = unicode
StringTypes = (StringType, UnicodeType)
except NameError:
StringTypes = (StringType,)
BufferType = buffer
TupleType = tuple
ListType = list
DictType = DictionaryType = dict
def _f(): pass
FunctionType = type(_f)
LambdaType = type(lambda: None) # Same as FunctionType
CodeType = type(_f.func_code)
def _g():
yield 1
GeneratorType = type(_g())
class _C:
def _m(self): pass
ClassType = type(_C)
UnboundMethodType = type(_C._m) # Same as MethodType
_x = _C()
InstanceType = type(_x)
MethodType = type(_x._m)
BuiltinFunctionType = type(len)
BuiltinMethodType = type([].append) # Same as BuiltinFunctionType
ModuleType = type(sys)
FileType = file
XRangeType = xrange
try:
raise TypeError
except TypeError:
tb = sys.exc_info()[2]
TracebackType = type(tb)
FrameType = type(tb.tb_frame)
del tb
SliceType = slice
EllipsisType = type(Ellipsis)
DictProxyType = type(TypeType.__dict__)
NotImplementedType = type(NotImplemented)
# For Jython, the following two types are identical
GetSetDescriptorType = type(FunctionType.func_code)
MemberDescriptorType = type(FunctionType.func_globals)
del sys, _f, _g, _C, _x # Not for export
__all__ = list(n for n in globals() if n[:1] != '_')

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,428 +0,0 @@
"""Parse (absolute and relative) URLs.
urlparse module is based upon the following RFC specifications.
RFC 3986 (STD66): "Uniform Resource Identifiers" by T. Berners-Lee, R. Fielding
and L. Masinter, January 2005.
RFC 2732 : "Format for Literal IPv6 Addresses in URL's by R.Hinden, B.Carpenter
and L.Masinter, December 1999.
RFC 2396: "Uniform Resource Identifiers (URI)": Generic Syntax by T.
Berners-Lee, R. Fielding, and L. Masinter, August 1998.
RFC 2368: "The mailto URL scheme", by P.Hoffman , L Masinter, J. Zwinski, July 1998.
RFC 1808: "Relative Uniform Resource Locators", by R. Fielding, UC Irvine, June
1995.
RFC 1738: "Uniform Resource Locators (URL)" by T. Berners-Lee, L. Masinter, M.
McCahill, December 1994
RFC 3986 is considered the current standard and any future changes to
urlparse module should conform with it. The urlparse module is
currently not entirely compliant with this RFC due to defacto
scenarios for parsing, and for backward compatibility purposes, some
parsing quirks from older RFCs are retained. The testcases in
test_urlparse.py provides a good indicator of parsing behavior.
"""
import re
__all__ = ["urlparse", "urlunparse", "urljoin", "urldefrag",
"urlsplit", "urlunsplit", "parse_qs", "parse_qsl"]
# A classification of schemes ('' means apply by default)
uses_relative = ['ftp', 'http', 'gopher', 'nntp', 'imap',
'wais', 'file', 'https', 'shttp', 'mms',
'prospero', 'rtsp', 'rtspu', '', 'sftp',
'svn', 'svn+ssh']
uses_netloc = ['ftp', 'http', 'gopher', 'nntp', 'telnet',
'imap', 'wais', 'file', 'mms', 'https', 'shttp',
'snews', 'prospero', 'rtsp', 'rtspu', 'rsync', '',
'svn', 'svn+ssh', 'sftp','nfs','git', 'git+ssh']
uses_params = ['ftp', 'hdl', 'prospero', 'http', 'imap',
'https', 'shttp', 'rtsp', 'rtspu', 'sip', 'sips',
'mms', '', 'sftp', 'tel']
# These are not actually used anymore, but should stay for backwards
# compatibility. (They are undocumented, but have a public-looking name.)
non_hierarchical = ['gopher', 'hdl', 'mailto', 'news',
'telnet', 'wais', 'imap', 'snews', 'sip', 'sips']
uses_query = ['http', 'wais', 'imap', 'https', 'shttp', 'mms',
'gopher', 'rtsp', 'rtspu', 'sip', 'sips', '']
uses_fragment = ['ftp', 'hdl', 'http', 'gopher', 'news',
'nntp', 'wais', 'https', 'shttp', 'snews',
'file', 'prospero', '']
# Characters valid in scheme names
scheme_chars = ('abcdefghijklmnopqrstuvwxyz'
'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'0123456789'
'+-.')
MAX_CACHE_SIZE = 20
_parse_cache = {}
def clear_cache():
"""Clear the parse cache."""
_parse_cache.clear()
class ResultMixin(object):
"""Shared methods for the parsed result objects."""
@property
def username(self):
netloc = self.netloc
if "@" in netloc:
userinfo = netloc.rsplit("@", 1)[0]
if ":" in userinfo:
userinfo = userinfo.split(":", 1)[0]
return userinfo
return None
@property
def password(self):
netloc = self.netloc
if "@" in netloc:
userinfo = netloc.rsplit("@", 1)[0]
if ":" in userinfo:
return userinfo.split(":", 1)[1]
return None
@property
def hostname(self):
netloc = self.netloc.split('@')[-1]
if '[' in netloc and ']' in netloc:
return netloc.split(']')[0][1:].lower()
elif ':' in netloc:
return netloc.split(':')[0].lower()
elif netloc == '':
return None
else:
return netloc.lower()
@property
def port(self):
netloc = self.netloc.split('@')[-1].split(']')[-1]
if ':' in netloc:
port = netloc.split(':')[1]
if port:
port = int(port, 10)
# verify legal port
if (0 <= port <= 65535):
return port
return None
from collections import namedtuple
class SplitResult(namedtuple('SplitResult', 'scheme netloc path query fragment'), ResultMixin):
__slots__ = ()
def geturl(self):
return urlunsplit(self)
class ParseResult(namedtuple('ParseResult', 'scheme netloc path params query fragment'), ResultMixin):
__slots__ = ()
def geturl(self):
return urlunparse(self)
def urlparse(url, scheme='', allow_fragments=True):
"""Parse a URL into 6 components:
<scheme>://<netloc>/<path>;<params>?<query>#<fragment>
Return a 6-tuple: (scheme, netloc, path, params, query, fragment).
Note that we don't break the components up in smaller bits
(e.g. netloc is a single string) and we don't expand % escapes."""
tuple = urlsplit(url, scheme, allow_fragments)
scheme, netloc, url, query, fragment = tuple
if scheme in uses_params and ';' in url:
url, params = _splitparams(url)
else:
params = ''
return ParseResult(scheme, netloc, url, params, query, fragment)
def _splitparams(url):
if '/' in url:
i = url.find(';', url.rfind('/'))
if i < 0:
return url, ''
else:
i = url.find(';')
return url[:i], url[i+1:]
def _splitnetloc(url, start=0):
delim = len(url) # position of end of domain part of url, default is end
for c in '/?#': # look for delimiters; the order is NOT important
wdelim = url.find(c, start) # find first of this delim
if wdelim >= 0: # if found
delim = min(delim, wdelim) # use earliest delim position
return url[start:delim], url[delim:] # return (domain, rest)
def urlsplit(url, scheme='', allow_fragments=True):
"""Parse a URL into 5 components:
<scheme>://<netloc>/<path>?<query>#<fragment>
Return a 5-tuple: (scheme, netloc, path, query, fragment).
Note that we don't break the components up in smaller bits
(e.g. netloc is a single string) and we don't expand % escapes."""
allow_fragments = bool(allow_fragments)
key = url, scheme, allow_fragments, type(url), type(scheme)
cached = _parse_cache.get(key, None)
if cached:
return cached
if len(_parse_cache) >= MAX_CACHE_SIZE: # avoid runaway growth
clear_cache()
netloc = query = fragment = ''
i = url.find(':')
if i > 0:
if url[:i] == 'http': # optimize the common case
scheme = url[:i].lower()
url = url[i+1:]
if url[:2] == '//':
netloc, url = _splitnetloc(url, 2)
if (('[' in netloc and ']' not in netloc) or
(']' in netloc and '[' not in netloc)):
raise ValueError("Invalid IPv6 URL")
if allow_fragments and '#' in url:
url, fragment = url.split('#', 1)
if '?' in url:
url, query = url.split('?', 1)
v = SplitResult(scheme, netloc, url, query, fragment)
_parse_cache[key] = v
return v
for c in url[:i]:
if c not in scheme_chars:
break
else:
# make sure "url" is not actually a port number (in which case
# "scheme" is really part of the path)
rest = url[i+1:]
if not rest or any(c not in '0123456789' for c in rest):
# not a port number
scheme, url = url[:i].lower(), rest
if url[:2] == '//':
netloc, url = _splitnetloc(url, 2)
if (('[' in netloc and ']' not in netloc) or
(']' in netloc and '[' not in netloc)):
raise ValueError("Invalid IPv6 URL")
if allow_fragments and '#' in url:
url, fragment = url.split('#', 1)
if '?' in url:
url, query = url.split('?', 1)
v = SplitResult(scheme, netloc, url, query, fragment)
_parse_cache[key] = v
return v
def urlunparse(data):
"""Put a parsed URL back together again. This may result in a
slightly different, but equivalent URL, if the URL that was parsed
originally had redundant delimiters, e.g. a ? with an empty query
(the draft states that these are equivalent)."""
scheme, netloc, url, params, query, fragment = data
if params:
url = "%s;%s" % (url, params)
return urlunsplit((scheme, netloc, url, query, fragment))
def urlunsplit(data):
"""Combine the elements of a tuple as returned by urlsplit() into a
complete URL as a string. The data argument can be any five-item iterable.
This may result in a slightly different, but equivalent URL, if the URL that
was parsed originally had unnecessary delimiters (for example, a ? with an
empty query; the RFC states that these are equivalent)."""
scheme, netloc, url, query, fragment = data
if netloc or (scheme and scheme in uses_netloc and url[:2] != '//'):
if url and url[:1] != '/': url = '/' + url
url = '//' + (netloc or '') + url
if scheme:
url = scheme + ':' + url
if query:
url = url + '?' + query
if fragment:
url = url + '#' + fragment
return url
def urljoin(base, url, allow_fragments=True):
"""Join a base URL and a possibly relative URL to form an absolute
interpretation of the latter."""
if not base:
return url
if not url:
return base
bscheme, bnetloc, bpath, bparams, bquery, bfragment = \
urlparse(base, '', allow_fragments)
scheme, netloc, path, params, query, fragment = \
urlparse(url, bscheme, allow_fragments)
if scheme != bscheme or scheme not in uses_relative:
return url
if scheme in uses_netloc:
if netloc:
return urlunparse((scheme, netloc, path,
params, query, fragment))
netloc = bnetloc
if path[:1] == '/':
return urlunparse((scheme, netloc, path,
params, query, fragment))
if not path and not params:
path = bpath
params = bparams
if not query:
query = bquery
return urlunparse((scheme, netloc, path,
params, query, fragment))
segments = bpath.split('/')[:-1] + path.split('/')
# XXX The stuff below is bogus in various ways...
if segments[-1] == '.':
segments[-1] = ''
while '.' in segments:
segments.remove('.')
while 1:
i = 1
n = len(segments) - 1
while i < n:
if (segments[i] == '..'
and segments[i-1] not in ('', '..')):
del segments[i-1:i+1]
break
i = i+1
else:
break
if segments == ['', '..']:
segments[-1] = ''
elif len(segments) >= 2 and segments[-1] == '..':
segments[-2:] = ['']
return urlunparse((scheme, netloc, '/'.join(segments),
params, query, fragment))
def urldefrag(url):
"""Removes any existing fragment from URL.
Returns a tuple of the defragmented URL and the fragment. If
the URL contained no fragments, the second element is the
empty string.
"""
if '#' in url:
s, n, p, a, q, frag = urlparse(url)
defrag = urlunparse((s, n, p, a, q, ''))
return defrag, frag
else:
return url, ''
try:
unicode
except NameError:
def _is_unicode(x):
return 0
else:
def _is_unicode(x):
return isinstance(x, unicode)
# unquote method for parse_qs and parse_qsl
# Cannot use directly from urllib as it would create a circular reference
# because urllib uses urlparse methods (urljoin). If you update this function,
# update it also in urllib. This code duplication does not existin in Python3.
_hexdig = '0123456789ABCDEFabcdef'
_hextochr = dict((a+b, chr(int(a+b,16)))
for a in _hexdig for b in _hexdig)
_asciire = re.compile('([\x00-\x7f]+)')
def unquote(s):
"""unquote('abc%20def') -> 'abc def'."""
if _is_unicode(s):
if '%' not in s:
return s
bits = _asciire.split(s)
res = [bits[0]]
append = res.append
for i in range(1, len(bits), 2):
append(unquote(str(bits[i])).decode('latin1'))
append(bits[i + 1])
return ''.join(res)
bits = s.split('%')
# fastpath
if len(bits) == 1:
return s
res = [bits[0]]
append = res.append
for item in bits[1:]:
try:
append(_hextochr[item[:2]])
append(item[2:])
except KeyError:
append('%')
append(item)
return ''.join(res)
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
"""Parse a query given as a string argument.
Arguments:
qs: percent-encoded query string to be parsed
keep_blank_values: flag indicating whether blank values in
percent-encoded queries should be treated as blank strings.
A true value indicates that blanks should be retained as
blank strings. The default false value indicates that
blank values are to be ignored and treated as if they were
not included.
strict_parsing: flag indicating what to do with parsing errors.
If false (the default), errors are silently ignored.
If true, errors raise a ValueError exception.
"""
dict = {}
for name, value in parse_qsl(qs, keep_blank_values, strict_parsing):
if name in dict:
dict[name].append(value)
else:
dict[name] = [value]
return dict
def parse_qsl(qs, keep_blank_values=0, strict_parsing=0):
"""Parse a query given as a string argument.
Arguments:
qs: percent-encoded query string to be parsed
keep_blank_values: flag indicating whether blank values in
percent-encoded queries should be treated as blank strings. A
true value indicates that blanks should be retained as blank
strings. The default false value indicates that blank values
are to be ignored and treated as if they were not included.
strict_parsing: flag indicating what to do with parsing errors. If
false (the default), errors are silently ignored. If true,
errors raise a ValueError exception.
Returns a list, as G-d intended.
"""
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
r = []
for name_value in pairs:
if not name_value and not strict_parsing:
continue
nv = name_value.split('=', 1)
if len(nv) != 2:
if strict_parsing:
raise ValueError, "bad query field: %r" % (name_value,)
# Handle case of a control-name with no equal sign
if keep_blank_values:
nv.append('')
else:
continue
if len(nv[1]) or keep_blank_values:
name = unquote(nv[0].replace('+', ' '))
value = unquote(nv[1].replace('+', ' '))
r.append((name, value))
return r

View File

@ -1,422 +0,0 @@
"""Python part of the warnings subsystem."""
# Note: function level imports should *not* be used
# in this module as it may cause import lock deadlock.
# See bug 683658.
import linecache
import sys
import types
__all__ = ["warn", "warn_explicit", "showwarning",
"formatwarning", "filterwarnings", "simplefilter",
"resetwarnings", "catch_warnings"]
def warnpy3k(message, category=None, stacklevel=1):
"""Issue a deprecation warning for Python 3.x related changes.
Warnings are omitted unless Python is started with the -3 option.
"""
if sys.py3kwarning:
if category is None:
category = DeprecationWarning
warn(message, category, stacklevel+1)
def _show_warning(message, category, filename, lineno, file=None, line=None):
"""Hook to write a warning to a file; replace if you like."""
if file is None:
file = sys.stderr
if file is None:
# sys.stderr is None - warnings get lost
return
try:
file.write(formatwarning(message, category, filename, lineno, line))
except (IOError, UnicodeError):
pass # the file (probably stderr) is invalid - this warning gets lost.
# Keep a working version around in case the deprecation of the old API is
# triggered.
showwarning = _show_warning
def formatwarning(message, category, filename, lineno, line=None):
"""Function to format a warning the standard way."""
try:
unicodetype = unicode
except NameError:
unicodetype = ()
try:
message = str(message)
except UnicodeEncodeError:
pass
s = "%s: %s: %s\n" % (lineno, category.__name__, message)
line = linecache.getline(filename, lineno) if line is None else line
if line:
line = line.strip()
if isinstance(s, unicodetype) and isinstance(line, str):
line = unicode(line, 'latin1')
s += " %s\n" % line
if isinstance(s, unicodetype) and isinstance(filename, str):
enc = sys.getfilesystemencoding()
if enc:
try:
filename = unicode(filename, enc)
except UnicodeDecodeError:
pass
s = "%s:%s" % (filename, s)
return s
def filterwarnings(action, message="", category=Warning, module="", lineno=0,
append=0):
"""Insert an entry into the list of warnings filters (at the front).
'action' -- one of "error", "ignore", "always", "default", "module",
or "once"
'message' -- a regex that the warning message must match
'category' -- a class that the warning must be a subclass of
'module' -- a regex that the module name must match
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
import re
assert action in ("error", "ignore", "always", "default", "module",
"once"), "invalid action: %r" % (action,)
assert isinstance(message, basestring), "message must be a string"
assert isinstance(category, (type, types.ClassType)), \
"category must be a class"
assert issubclass(category, Warning), "category must be a Warning subclass"
assert isinstance(module, basestring), "module must be a string"
assert isinstance(lineno, int) and lineno >= 0, \
"lineno must be an int >= 0"
item = (action, re.compile(message, re.I), category,
re.compile(module), lineno)
if append:
filters.append(item)
else:
filters.insert(0, item)
def simplefilter(action, category=Warning, lineno=0, append=0):
"""Insert a simple entry into the list of warnings filters (at the front).
A simple filter matches all modules and messages.
'action' -- one of "error", "ignore", "always", "default", "module",
or "once"
'category' -- a class that the warning must be a subclass of
'lineno' -- an integer line number, 0 matches all warnings
'append' -- if true, append to the list of filters
"""
assert action in ("error", "ignore", "always", "default", "module",
"once"), "invalid action: %r" % (action,)
assert isinstance(lineno, int) and lineno >= 0, \
"lineno must be an int >= 0"
item = (action, None, category, None, lineno)
if append:
filters.append(item)
else:
filters.insert(0, item)
def resetwarnings():
"""Clear the list of warning filters, so that no filters are active."""
filters[:] = []
class _OptionError(Exception):
"""Exception used by option processing helpers."""
pass
# Helper to process -W options passed via sys.warnoptions
def _processoptions(args):
for arg in args:
try:
_setoption(arg)
except _OptionError, msg:
print >>sys.stderr, "Invalid -W option ignored:", msg
# Helper for _processoptions()
def _setoption(arg):
import re
parts = arg.split(':')
if len(parts) > 5:
raise _OptionError("too many fields (max 5): %r" % (arg,))
while len(parts) < 5:
parts.append('')
action, message, category, module, lineno = [s.strip()
for s in parts]
action = _getaction(action)
message = re.escape(message)
category = _getcategory(category)
module = re.escape(module)
if module:
module = module + '$'
if lineno:
try:
lineno = int(lineno)
if lineno < 0:
raise ValueError
except (ValueError, OverflowError):
raise _OptionError("invalid lineno %r" % (lineno,))
else:
lineno = 0
filterwarnings(action, message, category, module, lineno)
# Helper for _setoption()
def _getaction(action):
if not action:
return "default"
if action == "all": return "always" # Alias
for a in ('default', 'always', 'ignore', 'module', 'once', 'error'):
if a.startswith(action):
return a
raise _OptionError("invalid action: %r" % (action,))
# Helper for _setoption()
def _getcategory(category):
import re
if not category:
return Warning
if re.match("^[a-zA-Z0-9_]+$", category):
try:
cat = eval(category)
except NameError:
raise _OptionError("unknown warning category: %r" % (category,))
else:
i = category.rfind(".")
module = category[:i]
klass = category[i+1:]
try:
m = __import__(module, None, None, [klass])
except ImportError:
raise _OptionError("invalid module name: %r" % (module,))
try:
cat = getattr(m, klass)
except AttributeError:
raise _OptionError("unknown warning category: %r" % (category,))
if not issubclass(cat, Warning):
raise _OptionError("invalid warning category: %r" % (category,))
return cat
# Code typically replaced by _warnings
def warn(message, category=None, stacklevel=1):
"""Issue a warning, or maybe ignore it or raise an exception."""
# Check if message is already a Warning object
if isinstance(message, Warning):
category = message.__class__
# Check category argument
if category is None:
category = UserWarning
assert issubclass(category, Warning)
# Get context information
try:
caller = sys._getframe(stacklevel)
except ValueError:
globals = sys.__dict__
lineno = 1
else:
globals = caller.f_globals
lineno = caller.f_lineno
if '__name__' in globals:
module = globals['__name__']
else:
module = "<string>"
filename = globals.get('__file__')
if filename:
fnl = filename.lower()
if fnl.endswith((".pyc", ".pyo")):
filename = filename[:-1]
else:
if module == "__main__":
try:
filename = sys.argv[0]
except AttributeError:
# embedded interpreters don't have sys.argv, see bug #839151
filename = '__main__'
if not filename:
filename = module
registry = globals.setdefault("__warningregistry__", {})
warn_explicit(message, category, filename, lineno, module, registry,
globals)
def warn_explicit(message, category, filename, lineno,
module=None, registry=None, module_globals=None):
lineno = int(lineno)
if module is None:
module = filename or "<unknown>"
if module[-3:].lower() == ".py":
module = module[:-3] # XXX What about leading pathname?
if registry is None:
registry = {}
if isinstance(message, Warning):
text = str(message)
category = message.__class__
else:
text = message
message = category(message)
key = (text, category, lineno)
# Quick test for common case
if registry.get(key):
return
# Search the filters
for item in filters:
action, msg, cat, mod, ln = item
if ((msg is None or msg.match(text)) and
issubclass(category, cat) and
(mod is None or mod.match(module)) and
(ln == 0 or lineno == ln)):
break
else:
action = defaultaction
# Early exit actions
if action == "ignore":
registry[key] = 1
return
# Prime the linecache for formatting, in case the
# "file" is actually in a zipfile or something.
linecache.getlines(filename, module_globals)
if action == "error":
raise message
# Other actions
if action == "once":
registry[key] = 1
oncekey = (text, category)
if onceregistry.get(oncekey):
return
onceregistry[oncekey] = 1
elif action == "always":
pass
elif action == "module":
registry[key] = 1
altkey = (text, category, 0)
if registry.get(altkey):
return
registry[altkey] = 1
elif action == "default":
registry[key] = 1
else:
# Unrecognized actions are errors
raise RuntimeError(
"Unrecognized action (%r) in warnings.filters:\n %s" %
(action, item))
# Print message and context
showwarning(message, category, filename, lineno)
class WarningMessage(object):
"""Holds the result of a single showwarning() call."""
_WARNING_DETAILS = ("message", "category", "filename", "lineno", "file",
"line")
def __init__(self, message, category, filename, lineno, file=None,
line=None):
local_values = locals()
for attr in self._WARNING_DETAILS:
setattr(self, attr, local_values[attr])
self._category_name = category.__name__ if category else None
def __str__(self):
return ("{message : %r, category : %r, filename : %r, lineno : %s, "
"line : %r}" % (self.message, self._category_name,
self.filename, self.lineno, self.line))
class catch_warnings(object):
"""A context manager that copies and restores the warnings filter upon
exiting the context.
The 'record' argument specifies whether warnings should be captured by a
custom implementation of warnings.showwarning() and be appended to a list
returned by the context manager. Otherwise None is returned by the context
manager. The objects appended to the list are arguments whose attributes
mirror the arguments to showwarning().
The 'module' argument is to specify an alternative module to the module
named 'warnings' and imported under that name. This argument is only useful
when testing the warnings module itself.
"""
def __init__(self, record=False, module=None):
"""Specify whether to record warnings and if an alternative module
should be used other than sys.modules['warnings'].
For compatibility with Python 3.0, please consider all arguments to be
keyword-only.
"""
self._record = record
self._module = sys.modules['warnings'] if module is None else module
self._entered = False
def __repr__(self):
args = []
if self._record:
args.append("record=True")
if self._module is not sys.modules['warnings']:
args.append("module=%r" % self._module)
name = type(self).__name__
return "%s(%s)" % (name, ", ".join(args))
def __enter__(self):
if self._entered:
raise RuntimeError("Cannot enter %r twice" % self)
self._entered = True
self._filters = self._module.filters
self._module.filters = self._filters[:]
self._showwarning = self._module.showwarning
if self._record:
log = []
def showwarning(*args, **kwargs):
log.append(WarningMessage(*args, **kwargs))
self._module.showwarning = showwarning
return log
else:
return None
def __exit__(self, *exc_info):
if not self._entered:
raise RuntimeError("Cannot exit %r without entering first" % self)
self._module.filters = self._filters
self._module.showwarning = self._showwarning
# filters contains a sequence of filter 5-tuples
# The components of the 5-tuple are:
# - an action: error, ignore, always, default, module, or once
# - a compiled regex that must match the warning message
# - a class representing the warning category
# - a compiled regex that must match the module that is being warned
# - a line number for the line being warning, or 0 to mean any line
# If either if the compiled regexs are None, match anything.
_warnings_defaults = False
try:
from _warnings import (filters, default_action, once_registry,
warn, warn_explicit)
defaultaction = default_action
onceregistry = once_registry
_warnings_defaults = True
except ImportError:
filters = []
defaultaction = "default"
onceregistry = {}
# Module initialization
_processoptions(sys.warnoptions)
if not _warnings_defaults:
silence = [ImportWarning, PendingDeprecationWarning]
# Don't silence DeprecationWarning if -3 or -Q was used.
if not sys.py3kwarning and not sys.flags.division_warning:
silence.append(DeprecationWarning)
for cls in silence:
simplefilter("ignore", category=cls)
bytes_warning = sys.flags.bytes_warning
if bytes_warning > 1:
bytes_action = "error"
elif bytes_warning:
bytes_action = "default"
else:
bytes_action = "ignore"
simplefilter(bytes_action, category=BytesWarning, append=1)
del _warnings_defaults

View File

@ -1,458 +0,0 @@
"""Weak reference support for Python.
This module is an implementation of PEP 205:
http://www.python.org/dev/peps/pep-0205/
"""
# Naming convention: Variables named "wr" are weak reference objects;
# they are called this instead of "ref" to avoid name collisions with
# the module-global ref() function imported from _weakref.
import UserDict
from _weakref import (
getweakrefcount,
getweakrefs,
ref,
proxy,
CallableProxyType,
ProxyType,
ReferenceType)
from _weakrefset import WeakSet, _IterationGuard
from exceptions import ReferenceError
ProxyTypes = (ProxyType, CallableProxyType)
__all__ = ["ref", "proxy", "getweakrefcount", "getweakrefs",
"WeakKeyDictionary", "ReferenceError", "ReferenceType", "ProxyType",
"CallableProxyType", "ProxyTypes", "WeakValueDictionary", 'WeakSet']
class WeakValueDictionary(UserDict.UserDict):
"""Mapping class that references values weakly.
Entries in the dictionary will be discarded when no strong
reference to the value exists anymore
"""
# We inherit the constructor without worrying about the input
# dictionary; since it uses our .update() method, we get the right
# checks (if the other dictionary is a WeakValueDictionary,
# objects are unwrapped on the way out, and we always wrap on the
# way in).
def __init__(*args, **kw):
if not args:
raise TypeError("descriptor '__init__' of 'WeakValueDictionary' "
"object needs an argument")
self = args[0]
args = args[1:]
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
def remove(wr, selfref=ref(self)):
self = selfref()
if self is not None:
if self._iterating:
self._pending_removals.append(wr.key)
else:
del self.data[wr.key]
self._remove = remove
# A list of keys to be removed
self._pending_removals = []
self._iterating = set()
UserDict.UserDict.__init__(self, *args, **kw)
def _commit_removals(self):
l = self._pending_removals
d = self.data
# We shouldn't encounter any KeyError, because this method should
# always be called *before* mutating the dict.
while l:
del d[l.pop()]
def __getitem__(self, key):
o = self.data[key]()
if o is None:
raise KeyError, key
else:
return o
def __delitem__(self, key):
if self._pending_removals:
self._commit_removals()
del self.data[key]
def __contains__(self, key):
try:
o = self.data[key]()
except KeyError:
return False
return o is not None
def has_key(self, key):
try:
o = self.data[key]()
except KeyError:
return False
return o is not None
def __repr__(self):
return "<WeakValueDictionary at %s>" % id(self)
def __setitem__(self, key, value):
if self._pending_removals:
self._commit_removals()
self.data[key] = KeyedRef(value, self._remove, key)
def clear(self):
if self._pending_removals:
self._commit_removals()
self.data.clear()
def copy(self):
new = WeakValueDictionary()
for key, wr in self.data.items():
o = wr()
if o is not None:
new[key] = o
return new
__copy__ = copy
def __deepcopy__(self, memo):
from copy import deepcopy
new = self.__class__()
for key, wr in self.data.items():
o = wr()
if o is not None:
new[deepcopy(key, memo)] = o
return new
def get(self, key, default=None):
try:
wr = self.data[key]
except KeyError:
return default
else:
o = wr()
if o is None:
# This should only happen
return default
else:
return o
def items(self):
L = []
for key, wr in self.data.items():
o = wr()
if o is not None:
L.append((key, o))
return L
def iteritems(self):
with _IterationGuard(self):
for wr in self.data.itervalues():
value = wr()
if value is not None:
yield wr.key, value
def iterkeys(self):
with _IterationGuard(self):
for k in self.data.iterkeys():
yield k
__iter__ = iterkeys
def itervaluerefs(self):
"""Return an iterator that yields the weak references to the values.
The references are not guaranteed to be 'live' at the time
they are used, so the result of calling the references needs
to be checked before being used. This can be used to avoid
creating references that will cause the garbage collector to
keep the values around longer than needed.
"""
with _IterationGuard(self):
for wr in self.data.itervalues():
yield wr
def itervalues(self):
with _IterationGuard(self):
for wr in self.data.itervalues():
obj = wr()
if obj is not None:
yield obj
def popitem(self):
if self._pending_removals:
self._commit_removals()
while 1:
key, wr = self.data.popitem()
o = wr()
if o is not None:
return key, o
def pop(self, key, *args):
if self._pending_removals:
self._commit_removals()
try:
o = self.data.pop(key)()
except KeyError:
if args:
return args[0]
raise
if o is None:
raise KeyError, key
else:
return o
def setdefault(self, key, default=None):
try:
wr = self.data[key]
except KeyError:
if self._pending_removals:
self._commit_removals()
self.data[key] = KeyedRef(default, self._remove, key)
return default
else:
return wr()
def update(*args, **kwargs):
if not args:
raise TypeError("descriptor 'update' of 'WeakValueDictionary' "
"object needs an argument")
self = args[0]
args = args[1:]
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
dict = args[0] if args else None
if self._pending_removals:
self._commit_removals()
d = self.data
if dict is not None:
if not hasattr(dict, "items"):
dict = type({})(dict)
for key, o in dict.items():
d[key] = KeyedRef(o, self._remove, key)
if len(kwargs):
self.update(kwargs)
def valuerefs(self):
"""Return a list of weak references to the values.
The references are not guaranteed to be 'live' at the time
they are used, so the result of calling the references needs
to be checked before being used. This can be used to avoid
creating references that will cause the garbage collector to
keep the values around longer than needed.
"""
return self.data.values()
def values(self):
L = []
for wr in self.data.values():
o = wr()
if o is not None:
L.append(o)
return L
class KeyedRef(ref):
"""Specialized reference that includes a key corresponding to the value.
This is used in the WeakValueDictionary to avoid having to create
a function object for each key stored in the mapping. A shared
callback object can use the 'key' attribute of a KeyedRef instead
of getting a reference to the key from an enclosing scope.
"""
__slots__ = "key",
def __new__(type, ob, callback, key):
self = ref.__new__(type, ob, callback)
self.key = key
return self
def __init__(self, ob, callback, key):
super(KeyedRef, self).__init__(ob, callback)
class WeakKeyDictionary(UserDict.UserDict):
""" Mapping class that references keys weakly.
Entries in the dictionary will be discarded when there is no
longer a strong reference to the key. This can be used to
associate additional data with an object owned by other parts of
an application without adding attributes to those objects. This
can be especially useful with objects that override attribute
accesses.
"""
def __init__(self, dict=None):
self.data = {}
def remove(k, selfref=ref(self)):
self = selfref()
if self is not None:
if self._iterating:
self._pending_removals.append(k)
else:
del self.data[k]
self._remove = remove
# A list of dead weakrefs (keys to be removed)
self._pending_removals = []
self._iterating = set()
if dict is not None:
self.update(dict)
def _commit_removals(self):
# NOTE: We don't need to call this method before mutating the dict,
# because a dead weakref never compares equal to a live weakref,
# even if they happened to refer to equal objects.
# However, it means keys may already have been removed.
l = self._pending_removals
d = self.data
while l:
try:
del d[l.pop()]
except KeyError:
pass
def __delitem__(self, key):
del self.data[ref(key)]
def __getitem__(self, key):
return self.data[ref(key)]
def __repr__(self):
return "<WeakKeyDictionary at %s>" % id(self)
def __setitem__(self, key, value):
self.data[ref(key, self._remove)] = value
def copy(self):
new = WeakKeyDictionary()
for key, value in self.data.items():
o = key()
if o is not None:
new[o] = value
return new
__copy__ = copy
def __deepcopy__(self, memo):
from copy import deepcopy
new = self.__class__()
for key, value in self.data.items():
o = key()
if o is not None:
new[o] = deepcopy(value, memo)
return new
def get(self, key, default=None):
return self.data.get(ref(key),default)
def has_key(self, key):
try:
wr = ref(key)
except TypeError:
return 0
return wr in self.data
def __contains__(self, key):
try:
wr = ref(key)
except TypeError:
return 0
return wr in self.data
def items(self):
L = []
for key, value in self.data.items():
o = key()
if o is not None:
L.append((o, value))
return L
def iteritems(self):
with _IterationGuard(self):
for wr, value in self.data.iteritems():
key = wr()
if key is not None:
yield key, value
def iterkeyrefs(self):
"""Return an iterator that yields the weak references to the keys.
The references are not guaranteed to be 'live' at the time
they are used, so the result of calling the references needs
to be checked before being used. This can be used to avoid
creating references that will cause the garbage collector to
keep the keys around longer than needed.
"""
with _IterationGuard(self):
for wr in self.data.iterkeys():
yield wr
def iterkeys(self):
with _IterationGuard(self):
for wr in self.data.iterkeys():
obj = wr()
if obj is not None:
yield obj
__iter__ = iterkeys
def itervalues(self):
with _IterationGuard(self):
for value in self.data.itervalues():
yield value
def keyrefs(self):
"""Return a list of weak references to the keys.
The references are not guaranteed to be 'live' at the time
they are used, so the result of calling the references needs
to be checked before being used. This can be used to avoid
creating references that will cause the garbage collector to
keep the keys around longer than needed.
"""
return self.data.keys()
def keys(self):
L = []
for wr in self.data.keys():
o = wr()
if o is not None:
L.append(o)
return L
def popitem(self):
while 1:
key, value = self.data.popitem()
o = key()
if o is not None:
return o, value
def pop(self, key, *args):
return self.data.pop(ref(key), *args)
def setdefault(self, key, default=None):
return self.data.setdefault(ref(key, self._remove),default)
def update(self, dict=None, **kwargs):
d = self.data
if dict is not None:
if not hasattr(dict, "items"):
dict = type({})(dict)
for key, value in dict.items():
d[ref(key, self._remove)] = value
if len(kwargs):
self.update(kwargs)

View File

@ -1,121 +0,0 @@
#####################################################################################
#
# Copyright (c) Harry Pierson. All rights reserved.
#
# This source code is subject to terms and conditions of the Microsoft Public License.
# A copy of the license can be found at http://opensource.org/licenses/ms-pl.html
# By using this source code in any fashion, you are agreeing to be bound
# by the terms of the Microsoft Public License.
#
# You must not remove this notice, or any other, from this software.
#
#####################################################################################
import ipypulldom
from System.Xml import XmlNodeType
class _type_factory(object):
class _type_node(object):
def __init__(self, node):
ty = type(node)
self.name = ty.__name__
self.namespace = ty.xmlns
def __init__(self):
self.types = {}
def find_type(self, node, parent):
def create_type(node, parent):
return type(node.name, (parent,), {'xmlns':node.namespace})
if parent not in self.types:
self.types[parent] = {}
tp = self.types[parent]
if node.name not in tp:
tp[node.name] = [create_type(node, parent)]
tpn = tp[node.name]
for t in tpn:
if t.xmlns == node.namespace:
return t
#if there's no matching namespace type, create one and add it to the list
new_type = create_type(node, parent)
tpn.append(new_type)
return new_type
def __call__(self, node, parent=object):
if isinstance(node, ipypulldom.XmlNode):
return self.find_type(node, parent)
return self.find_type(self._type_node(node), parent)
xtype = _type_factory()
def xml2py(nodelist):
def children(nodelist):
while True:
child = xml2py(nodelist)
if child is None:
break
yield child
def set_attribute(parent, child):
name = type(child).__name__
if not hasattr(parent, name):
setattr(parent, name, child)
else:
val = getattr(parent, name)
if isinstance(val, list):
val.append(child)
else:
setattr(parent, name, [val, child])
node = nodelist.next()
if node.nodeType == XmlNodeType.EndElement:
return None
elif node.nodeType == XmlNodeType.Text or node.nodeType == XmlNodeType.CDATA:
return node.value
elif node.nodeType == XmlNodeType.Element:
#create a new object type named for the element name
cur = xtype(node)()
cur._nodetype = XmlNodeType.Element
#collect all the attributes and children in lists
attributes = [xtype(attr, str)(attr.value) for attr in node.attributes]
children = [child for child in children(nodelist)]
if len(children) == 1 and isinstance(children[0], str):
#fold up elements with a single text node
cur = xtype(cur, str)(children[0])
cur._nodetype = XmlNodeType.Element
else:
#otherwise, add child elements as properties on the current node
for child in children:
set_attribute(cur, child)
for attr in attributes:
attr._nodetype = XmlNodeType.Attribute
set_attribute(cur, attr)
return cur
def parse(xml):
return xml2py(ipypulldom.parse(xml))
def parseString(xml):
return xml2py(ipypulldom.parseString(xml))
if __name__ == '__main__':
rss = parse('http://feeds.feedburner.com/Devhawk')
for item in rss.channel.item:
print item.title