import sys
import os
import re
import zipfile
import tarfile
import io
__all__ = ["open_zipfile", "open_tarfile", "open_url", "Resources"]
# Python 3.x workarounds for the changed urllib modules.
if sys.version_info[0] >= 3:
import urllib.parse as urlparse
import urllib.request as urllib2
else:
import urlparse
import urllib2
def _validate_path(path, what, write=False):
fullpath = os.path.abspath(path)
fname = os.path.basename(path)
if write:
parent = os.path.abspath(os.path.join(fullpath, os.pardir))
if not os.path.isdir(parent):
e = "The given parent directory '{0}' does not exist"
raise IOError(e.format(parent))
else:
if not os.path.exists(fullpath):
e = "Could not find {0} at the given path: {1}"
raise IOError(e.format(what, fullpath))
return (fullpath, fname)
[docs]def open_zipfile(archive, filename, directory=None):
"""Retrieves a given file from a ZIP archive.
Args:
archive (:obj:`~zipfile.ZipFile`, str): The ZipFile object or path to
the ZIP archive containing the desired file.
filename (str): The name of the file to retrieve from the archive.
directory (str, optional): The path to the directory within the archive
containing the file to retrieve. Defaults to the root level of the
archive.
Returns:
:obj:`~io.BytesIO`: A Python bytestream object containing the requested
file.
Raises:
KeyError: If the given file could not be found within the archive.
TypeError: If the archive is not a valid ZIP archive.
"""
data = None
opened = False
if not isinstance(archive, zipfile.ZipFile):
if not zipfile.is_zipfile(archive):
raise TypeError("passed file does not seem to be a ZIP archive")
else:
archive = zipfile.ZipFile(archive, 'r')
opened = True
apath = filename
if directory:
apath = "%s/%s" % (directory, filename)
try:
dmpdata = archive.open(apath)
data = io.BytesIO(dmpdata.read())
finally:
if opened:
archive.close()
return data
[docs]def open_tarfile(archive, filename, directory=None, ftype=None):
"""Retrieves a given file from a TAR archive.
If the TAR archive uses ``.tar.gz`` or ``.tar.bz2`` compression and the
file name does not contain either of these identifiers, the compression
type must be manually specified.
Args:
archive (:obj:`~tarfile.TarFile`, str): The TarFile object or path to
the TAR archive containing the desired file.
filename (str): The name of the file to retrieve from the archive.
directory (str, optional): The path to the directory within the archive
containing the file to retrieve. Defaults to the root level of the
archive.
ftype (str, optional): The compression type (if any) used for the TAR
file, can be either 'gz', 'bz2', or None (no compression). If not
specified, will default to assuming no compression.
Returns:
:obj:`~io.BytesIO`: A Python bytestream object containing the requested
file.
Raises:
KeyError: If the given file could not be found within the archive.
TypeError: If the archive is not a supported TAR archive.
"""
data = None
opened = False
if not isinstance(archive, tarfile.TarFile):
if not tarfile.is_tarfile(archive):
raise TypeError("passed file does not seem to be a TAR archive")
else:
file_ext = archive.split('.')[-1]
if not ftype and file_ext in ('gz', 'bz2'):
ftype = file_ext
if ftype and ftype not in ('gz', 'bz2'):
e = "invalid TAR compression type '{0}' (must be 'gz' or 'bz2')"
raise TypeError(e.format(ftype))
mode = 'r:{0}'.format(ftype) if ftype else 'r'
archive = tarfile.open(archive, mode)
opened = True
apath = filename
if directory:
apath = "%s/%s" % (directory, filename)
try:
dmpdata = archive.extractfile(apath)
data = io.BytesIO(dmpdata.read())
finally:
if opened:
archive.close()
return data
def open_url(filename, basepath=None):
# Opens and reads a certain file from a web or remote location.
# Deprecated because its argument names are confusing and because
# users are probably better off using urllib directly.
url = filename
if basepath:
url = urlparse.urljoin(basepath, filename)
return urllib2.urlopen(url)
[docs]class Resources(object):
"""A container class for managing application resource files.
This class eases access to resources by allowing access using relative
paths, scanning archives to locate files, and more.
Args:
path (str, optional): The path of a resource directory with which to
initialze the container. Defaults to ``None``.
subdir (str, optional): Deprecated, do not use.
excludepattern (str, optional): A regular expression indicating
which directories (if any) to ignore if initializing the
container with a resource path. Defaults to ``None``.
"""
def __init__(self, path=None, subdir=None, excludepattern=None):
self.files = {}
if path:
self.scan(path, subdir, excludepattern)
def _scanzip(self, filename):
"""Scans the passed ZIP archive and indexes all the files
contained by it.
"""
if not zipfile.is_zipfile(filename):
raise TypeError("file '%s' is not a valid ZIP archive" % filename)
archname = os.path.abspath(filename)
zipf = zipfile.ZipFile(filename, 'r')
for path in zipf.namelist():
fname = os.path.split(path)[1]
if fname:
self.files[fname] = (archname, 'zip', path)
zipf.close()
def _scantar(self, filename, ftype=None):
"""Scans the passed TAR archive and indexes all the files
contained by it.
"""
if not tarfile.is_tarfile(filename):
raise TypeError("file '%s' is not a valid TAR archive" % filename)
file_ext = filename.split('.')[-1]
if not ftype and file_ext in ('gz', 'bz2'):
ftype = file_ext
if ftype and ftype not in ('gz', 'bz2'):
e = "invalid TAR compression type '{0}' (must be 'gz' or 'bz2')"
raise TypeError(e.format(ftype))
mode = 'r:{0}'.format(ftype) if ftype else 'r'
archname = os.path.abspath(filename)
archtype = 'tar'
if ftype:
archtype = 'tar%s' % ftype
tar = tarfile.open(filename, mode)
for path in tar.getnames():
fname = os.path.split(path)[1]
self.files[fname] = (archname, archtype, path)
tar.close()
[docs] def add(self, filename):
"""Adds a file to the Resources container.
If the given file is a supported archive, its contents will be scanned
and added to the container.
Args:
filename (str): The filepath of the resource to add to the
container.
Raises:
ValueError: If the file does not exist at the provided path.
"""
if not os.path.exists(filename):
raise ValueError("invalid file path")
if zipfile.is_zipfile(filename):
self.add_archive(filename)
elif tarfile.is_tarfile(filename):
self.add_archive(filename, 'tar')
else:
self.add_file(filename)
[docs] def add_file(self, filename):
"""Adds a file without scanning to the Resources container.
Unlike :meth:`add`, this method will not attempt to add the contents
of any provided archives to the container.
Args:
filename (str): The filepath of the resource to add to the
container.
Raises:
ValueError: If the file does not exist at the provided path.
"""
if not os.path.exists(filename):
raise ValueError("invalid file path")
abspath = os.path.abspath(filename)
fname = os.path.split(abspath)[1]
if not fname:
raise ValueError("invalid file path")
self.files[fname] = (None, None, abspath)
[docs] def add_archive(self, filename, typehint='zip'):
"""Adds a ``.zip`` or ``.tar`` archive to the container.
This will scan the passed archive and add its contents to the
list of available resources. Currently ``.zip``, ``.tar``,
``.tar.bz2``, and ``.tar.gz`` formats are supported.
Args:
filename (str): The filepath of the archive to scan and add to the
container.
typehint (str, optional): The format of the archive to add to the
container, required if using a custom file extension. Must be
one of ``zip``, ``tar``, ``tarbz2``, or ``targz``. Defaults to
``zip`` if not specified.
Raises:
ValueError: If the file does not exist at the provided path, or if
the file is not a supported archive type.
"""
if not os.path.exists(filename):
raise ValueError("invalid file path")
fname = os.path.basename(filename)
if 'zip' in fname.split('.'):
self._scanzip(filename)
elif 'tar' in fname.split('.'):
self._scantar(filename)
else:
if typehint == 'zip':
self._scanzip(filename)
elif typehint == 'tar':
self._scantar(filename)
elif typehint == 'tarbz2':
self._scantar(filename, 'bz2')
elif typehint == 'targz':
self._scantar(filename, 'gz')
else:
raise ValueError("unsupported archive type")
[docs] def get(self, filename):
"""Retrieves a resource file by name from the container.
Args:
filename (str): The file name of the resource to retrieve.
Returns:
:obj:`~io.BytesIO`: A Python bytestream object containing the
retrieved resource file.
Raises:
KeyError: If the given file could not be found.
"""
archive, ftype, pathname = self.files[filename]
if archive:
if ftype == 'zip':
return open_zipfile(archive, pathname)
elif ftype == 'tar':
return open_tarfile(archive, pathname)
elif ftype == 'tarbz2':
return open_tarfile(archive, pathname, ftype='bz2')
elif ftype == 'targz':
return open_tarfile(archive, pathname, ftype='gz')
else:
raise ValueError("unsupported archive type")
dmpdata = open(pathname, 'rb')
data = io.BytesIO(dmpdata.read())
dmpdata.close()
return data
def get_filelike(self, filename):
# Deprecated, doesn't make much difference in Python 3
archive, ftype, pathname = self.files[filename]
if archive:
if ftype == 'zip':
return open_zipfile(archive, pathname)
elif ftype == 'tar':
return open_tarfile(archive, pathname)
elif ftype == 'tarbz2':
return open_tarfile(archive, pathname, ftype='bz2')
elif ftype == 'targz':
return open_tarfile(archive, pathname, ftype='gz')
else:
raise ValueError("unsupported archive type")
return open(pathname, 'rb')
[docs] def get_path(self, filename):
"""Gets the path of a given resource file.
If the file is only available within an archive, a string in the form
``filename@archivename`` will be returned.
Args:
filename (str): The file name of the resource to locate.
Returns:
str: The absolute path of the resource file, or the archive
identifier string if the resource is inside an archive.
Raises:
KeyError: If the given file could not be found.
"""
archive, ftype, pathname = self.files[filename]
if archive:
return '%s@%s' % (pathname, archive)
return pathname
[docs] def scan(self, path, subdir=None, excludepattern=None):
"""Scans a path, adding all matching files to the container.
If a located file is a ``.zip`` or ``.tar`` archive, its
contents will be indexed and added to the container automatically.
Args:
path (str): The path of the directory to scan.
subdir (str, optional): Deprecated, do not use.
excludepattern (str, optional): A regular expression indicating
which directories (if any) within the file structure of the
given path to exclude from indexing. Defaults to ``None``.
Raises:
ValueError: If the specified path does not exist.
"""
match = None
if excludepattern:
match = re.compile(excludepattern).match
join = os.path.join
add = self.add
abspath = os.path.abspath(path)
if not os.path.exists(abspath):
raise ValueError("invalid path '%s'" % abspath)
if not os.path.isdir(abspath):
abspath = os.path.dirname(abspath)
if subdir is not None:
abspath = os.path.join(abspath, subdir)
if not os.path.exists(abspath):
raise ValueError("invalid path '%s'" % abspath)
for (pdir, dirnames, filenames) in os.walk(abspath):
if match and match(pdir) is not None:
continue
for fname in filenames:
add(join(pdir, fname))