diff --git a/CHANGELOG.md b/CHANGELOG.md index 7126b0a9..d1b17a44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [2.4.12] - (Unreleased) +### Added + +- Added `passwd` argument and `setpassword` for ReadZipFS to extract password + protected date from zip file. [#360](https://github.com/PyFilesystem/pyfilesystem2/issues/360) + ### Changed - Start testing on PyPy. Due to [#342](https://github.com/PyFilesystem/pyfilesystem2/issues/342) diff --git a/fs/errors.py b/fs/errors.py index b70b62e3..6379bce2 100644 --- a/fs/errors.py +++ b/fs/errors.py @@ -41,6 +41,7 @@ "NoURL", "OperationFailed", "OperationTimeout", + "PasswordUnsupported", "PathError", "PermissionDenied", "RemoteConnectionError", @@ -255,6 +256,13 @@ class RemoveRootError(OperationFailed): default_message = "root directory may not be removed" +class PasswordUnsupported(Unsupported): + """Attempt to create a password protected zip file. + """ + + default_message = "can not create password protected zip" + + class ResourceError(FSError): """Base exception class for error associated with a specific resource. """ diff --git a/fs/opener/zipfs.py b/fs/opener/zipfs.py index 81e48455..64dd7b7e 100644 --- a/fs/opener/zipfs.py +++ b/fs/opener/zipfs.py @@ -38,5 +38,10 @@ def open_fs( if not create and writeable: raise NotWriteable("Unable to open existing ZIP file for writing") - zip_fs = ZipFS(parse_result.resource, write=create) + + password = parse_result.params.get("password") + if password is not None: + password = password.encode() + + zip_fs = ZipFS(parse_result.resource, write=create, password=password) return zip_fs diff --git a/fs/zipfs.py b/fs/zipfs.py index 8feb9e56..4e23a629 100644 --- a/fs/zipfs.py +++ b/fs/zipfs.py @@ -44,12 +44,14 @@ class _ZipExtFile(RawWrapper): - def __init__(self, fs, name): - # type: (ReadZipFS, Text) -> None + def __init__(self, fs, name, password=None): + # type: (ReadZipFS, Text, Optional[bytes]) -> None + if password is not None: + _password_type_check(password) self._zip = _zip = fs._zip self._end = _zip.getinfo(name).file_size self._pos = 0 - super(_ZipExtFile, self).__init__(_zip.open(name), "r", name) + super(_ZipExtFile, self).__init__(_zip.open(name, pwd=password), "r", name) def read(self, size=-1): # type: (int) -> bytes @@ -160,6 +162,8 @@ class ZipFS(WrapFS): defined in the `zipfile` module in the stdlib). temp_fs (str): An FS URL for the temporary filesystem used to store data prior to zipping. + password (bytes): Password for extracting file from zip file. Only used + for read mode. """ @@ -171,16 +175,19 @@ def __new__( # type: ignore compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text + password=None, # type: Optional[bytes] ): # type: (...) -> FS # This magic returns a different class instance based on the # value of the ``write`` parameter. if write: + if password is not None: + raise errors.PasswordUnsupported() return WriteZipFS( file, compression=compression, encoding=encoding, temp_fs=temp_fs ) else: - return ReadZipFS(file, encoding=encoding) + return ReadZipFS(file, encoding=encoding, password=password) if typing.TYPE_CHECKING: @@ -191,6 +198,7 @@ def __init__( compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text + password=None, # type: Optional[bytes] ): # type: (...) -> None pass @@ -290,13 +298,15 @@ class ReadZipFS(FS): } @errors.CreateFailed.catch_all - def __init__(self, file, encoding="utf-8"): - # type: (Union[BinaryIO, Text], Text) -> None + def __init__(self, file, encoding="utf-8", password=None): + # type: (Union[BinaryIO, Text], Text, Optional[bytes]) -> None super(ReadZipFS, self).__init__() self._file = file self.encoding = encoding self._zip = zipfile.ZipFile(file, "r") self._directory_fs = None # type: Optional[MemoryFS] + if password is not None: + self.setpassword(password) def __repr__(self): # type: () -> Text @@ -409,8 +419,8 @@ def makedir( self.check() raise errors.ResourceReadOnly(path) - def openbin(self, path, mode="r", buffering=-1, **kwargs): - # type: (Text, Text, int, **Any) -> BinaryIO + def openbin(self, path, mode="r", buffering=-1, password=None, **kwargs): + # type: (Text, Text, int, Optional[bytes], **Any) -> BinaryIO self.check() if "w" in mode or "+" in mode or "a" in mode: raise errors.ResourceReadOnly(path) @@ -421,7 +431,7 @@ def openbin(self, path, mode="r", buffering=-1, **kwargs): raise errors.FileExpected(path) zip_name = self._path_to_zip_name(path) - return _ZipExtFile(self, zip_name) # type: ignore + return _ZipExtFile(self, zip_name, password) # type: ignore def remove(self, path): # type: (Text) -> None @@ -439,13 +449,15 @@ def close(self): if hasattr(self, "_zip"): self._zip.close() - def readbytes(self, path): - # type: (Text) -> bytes + def readbytes(self, path, password=None): + # type: (Text, Optional[bytes]) -> bytes self.check() if not self._directory.isfile(path): raise errors.ResourceNotFound(path) + if password is not None: + _password_type_check(password) zip_name = self._path_to_zip_name(path) - zip_bytes = self._zip.read(zip_name) + zip_bytes = self._zip.read(zip_name, pwd=password) return zip_bytes def geturl(self, path, purpose="download"): @@ -456,3 +468,16 @@ def geturl(self, path, purpose="download"): return "zip://{}!/{}".format(quoted_file, quoted_path) else: raise errors.NoURL(path, purpose) + + def setpassword(self, password): + # type: (bytes) -> None + """Set *password* as default password to extract encrypted files. + """ + _password_type_check(password) + self._zip.setpassword(password) + + +def _password_type_check(password): + if isinstance(password, six.binary_type): + return + raise TypeError("except bytes for password, not " + type(password).__name__) diff --git a/tests/test_zipfs.py b/tests/test_zipfs.py index 9b2e82ea..0f438aec 100644 --- a/tests/test_zipfs.py +++ b/tests/test_zipfs.py @@ -1,6 +1,7 @@ # -*- encoding: UTF-8 from __future__ import unicode_literals +import codecs import os import sys import tempfile @@ -13,7 +14,7 @@ from fs.compress import write_zip from fs.opener import open_fs from fs.opener.errors import NotWriteable -from fs.errors import NoURL +from fs.errors import NoURL, PasswordUnsupported from fs.test import FSTestCases from fs.enums import Seek @@ -40,6 +41,10 @@ def test_unicode_paths(self): with zip_fs.openbin(path) as f: f.read() + def test_create_password(self): + with self.assertRaises(PasswordUnsupported): + zipfs.ZipFS(self._temp_path, write=True, password="hello") + class TestWriteZipFS(FSTestCases, unittest.TestCase): """ @@ -220,7 +225,64 @@ def test_implied(self): os.remove(path) +class TestPasswordReadZipFS(unittest.TestCase): + + ZIP_BIN = ( + b"UEsDBAoACQAAAH2whk8tOwivGAAAAAwAAAADABwAZm9vVVQJAAPNX+pdzl/qXXV4CwABBPUBAAAE" + b"FAAAAJ6pj1kohibjIq4YqnEKUZ8SCJMeUkl9oVBLBwgtOwivGAAAAAwAAABQSwECHgMKAAkAAAB9" + b"sIZPLTsIrxgAAAAMAAAAAwAYAAAAAAABAAAApIEAAAAAZm9vVVQFAAPNX+pddXgLAAEE9QEAAAQU" + b"AAAAUEsFBgAAAAABAAEASQAAAGUAAAAAAA==" + ) + + PASSWD = b"P@ssw0rd" + + def setUp(self): + fh, path = tempfile.mkstemp("testzip.zip") + os.write(fh, codecs.decode(self.ZIP_BIN, "base64")) + os.close(fh) + self.path = path + + def tearDown(self): + os.remove(self.path) + + def test_openbin(self): + with zipfs.ReadZipFS(self.path, password=self.PASSWD) as zip_fs: + with zip_fs.openbin("foo") as fp: + self.assertEqual(fp.read(), b"hello world\n") + + with zipfs.ReadZipFS(self.path) as zip_fs: + with zip_fs.openbin("foo", password=self.PASSWD) as fp: + self.assertEqual(fp.read(), b"hello world\n") + + def test_readbytes(self): + with zipfs.ReadZipFS(self.path, password=self.PASSWD) as zip_fs: + self.assertEqual(zip_fs.readbytes("foo"), b"hello world\n") + + with zipfs.ReadZipFS(self.path) as zip_fs: + self.assertEqual( + zip_fs.readbytes("foo", password=self.PASSWD), b"hello world\n" + ) + + def test_setpassword(self): + with zipfs.ReadZipFS(self.path) as zip_fs: + with self.assertRaises(RuntimeError): + zip_fs._zip.read("foo") + + zip_fs.setpassword(self.PASSWD) + self.assertEqual(zip_fs._zip.read("foo"), b"hello world\n") + + +class TestPasswordTypeCheck(unittest.TestCase): + def test_raise(self): + with self.assertRaises(TypeError): + zipfs._password_type_check("string") + + zipfs._password_type_check(b"bytes") + + class TestOpener(unittest.TestCase): def test_not_writeable(self): with self.assertRaises(NotWriteable): open_fs("zip://foo.zip", writeable=True) + + open_fs("zip://foo.zip?password=1234")