Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate validation of RECORD #147

Merged
merged 12 commits into from
Dec 9, 2022
94 changes: 87 additions & 7 deletions src/installer/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import stat
import zipfile
from contextlib import contextmanager
from typing import BinaryIO, Iterator, List, Tuple, cast
from typing import BinaryIO, ClassVar, Iterator, List, Tuple, Type, cast

from installer.records import parse_record_file
from installer.records import RecordEntry, parse_record_file
from installer.utils import parse_wheel_filename

WheelContentElement = Tuple[Tuple[str, str, str], BinaryIO, bool]
Expand All @@ -22,6 +22,8 @@ class WheelSource:
This is an abstract class, whose methods have to be implemented by subclasses.
"""

validation_error: ClassVar[Type[Exception]] = ValueError

def __init__(self, distribution: str, version: str) -> None:
"""Initialize a WheelSource object.

Expand Down Expand Up @@ -65,6 +67,14 @@ def read_dist_info(self, filename: str) -> str:
"""
raise NotImplementedError

def validate_record(self) -> None:
"""Validate ``RECORD`` of the wheel.

This method should be called before :py:func:`install <installer.install>`
if validation is required.
"""
raise NotImplementedError

def get_contents(self) -> Iterator[WheelContentElement]:
"""Sequential access to all contents of the wheel (including dist-info files).

Expand All @@ -91,6 +101,17 @@ def get_contents(self) -> Iterator[WheelContentElement]:
raise NotImplementedError


class _WheelFileValidationError(ValueError):
"""Raised when a wheel file fails validation."""

def __init__(self, issues: List[str]) -> None: # noqa: D107
super().__init__(repr(issues))
self.issues = issues

def __repr__(self) -> str:
return f"WheelFileValidationError(issues={self.issues!r})"


class WheelFile(WheelSource):
"""Implements `WheelSource`, for an existing file from the filesystem.

Expand All @@ -100,6 +121,8 @@ class WheelFile(WheelSource):
... installer.install(source, destination)
"""

validation_error = _WheelFileValidationError

def __init__(self, f: zipfile.ZipFile) -> None:
"""Initialize a WheelFile object.

Expand Down Expand Up @@ -138,6 +161,66 @@ def read_dist_info(self, filename: str) -> str:
path = posixpath.join(self.dist_info_dir, filename)
return self._zipfile.read(path).decode("utf-8")

def validate_record(self, validate_file: bool = True) -> None:
"""Validate ``RECORD`` of the wheel.

This method should be called before :py:func:`install <installer.install>`
if validation is required.

File names will always be validated against ``RECORD``.
If ``validate_file`` is true, every file in the archive will be validated
against corresponding entry in ``RECORD``.

:param validate_file: Whether to validate content integrity.
"""
try:
record_lines = self.read_dist_info("RECORD").splitlines()
records = parse_record_file(record_lines)
except Exception as exc:
raise _WheelFileValidationError(
[f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"]
) from exc

record_mapping = {record[0]: record for record in records}
issues: List[str] = []

for item in self._zipfile.infolist():
if item.filename[-1:] == "/": # looks like a directory
continue

record_args = record_mapping.pop(item.filename, None)

if self.dist_info_dir == posixpath.commonprefix(
[self.dist_info_dir, item.filename]
) and item.filename.split("/")[-1] in ("RECORD.p7s", "RECORD.jws"):
# both are for digital signatures, and not mentioned in RECORD
continue

if record_args is None:
issues.append(
f"In {self._zipfile.filename}, {item.filename} is not mentioned in RECORD"
)
continue

if item.filename == f"{self.dist_info_dir}/RECORD":
# Skip record file
continue
if validate_file:
record = RecordEntry.from_elements(*record_args)
data = self._zipfile.read(item)
if record.hash_ is None or record.size is None:
# Report empty hash / size
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD"
)
elif not record.validate(data):
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} didn't match RECORD"
)

if issues:
raise _WheelFileValidationError(issues)

def get_contents(self) -> Iterator[WheelContentElement]:
"""Sequential access to all contents of the wheel (including dist-info files).

Expand All @@ -154,11 +237,8 @@ def get_contents(self) -> Iterator[WheelContentElement]:
if item.filename[-1:] == "/": # looks like a directory
continue

record = record_mapping.pop(item.filename, None)
assert record is not None, "In {}, {} is not mentioned in RECORD".format(
self._zipfile.filename,
item.filename,
) # should not happen for valid wheels
# Pop record with empty default, because validation is handled by `validate_record`
record = record_mapping.pop(item.filename, (item.filename, "", ""))

# Borrowed from:
# https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L96-L100
Expand Down
16 changes: 7 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,14 @@ def main():
Platform: UNKNOWN
Classifier: Intended Audience :: Developers
""",
# The RECORD file is indirectly validated by the WheelFile, since it only
# provides the items that are a part of the wheel.
"fancy-1.0.0.dist-info/RECORD": b"""\
fancy/__init__.py,,
fancy/__main__.py,,
fancy-1.0.0.data/data/fancy/data.py,,
fancy-1.0.0.dist-info/top_level.txt,,
fancy-1.0.0.dist-info/entry_points.txt,,
fancy-1.0.0.dist-info/WHEEL,,
fancy-1.0.0.dist-info/METADATA,,
fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36
fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61
fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17
fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6
fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75
fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84
fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236
fancy-1.0.0.dist-info/RECORD,,
""",
}
Expand Down
4 changes: 4 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def dist_info_filenames(self):
def read_dist_info(self, filename):
return self.dist_info_files[filename]

def validate_record(self) -> None:
# Skip validation since the logic is different.
return

def get_contents(self):
# Sort for deterministic behaviour for Python versions that do not preserve
# insertion order for dictionaries.
Expand Down
139 changes: 139 additions & 0 deletions tests/test_sources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import posixpath
import zipfile
from base64 import urlsafe_b64encode
from hashlib import sha256

import pytest

Expand Down Expand Up @@ -30,6 +33,9 @@ def test_raises_not_implemented_error(self):
with pytest.raises(NotImplementedError):
source.get_contents()

with pytest.raises(NotImplementedError):
source.validate_record()


class TestWheelFile:
def test_rejects_not_okay_name(self, tmp_path):
Expand Down Expand Up @@ -92,3 +98,136 @@ def test_provides_correct_contents(self, fancy_wheel):

assert sorted(got_records) == sorted(expected_records)
assert got_files == files


def replace_file_in_zip(path: str, filename: str, content: "bytes | None") -> None:
"""Helper function for replacing a file in the zip.

Exists because ZipFile doesn't support remove.
"""
files = {}
# Copy everything except `filename`, and replace it with `content`.
with zipfile.ZipFile(path) as archive:
for file in archive.namelist():
if file == filename:
if content is None:
continue # Remove the file
files[file] = content
else:
files[file] = archive.read(file)
# Replace original archive
with zipfile.ZipFile(path, mode="w") as archive:
for name, content in files.items():
archive.writestr(name, content)


def test_rejects_no_record_on_validate(fancy_wheel):
# Remove RECORD
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=None,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="Unable to retrieve `RECORD`"
):
source.validate_record()


def test_rejects_record_missing_file_on_validate(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

# Remove the first two entries from the RECORD file
new_record_file_contents = b"\n".join(record_file_contents.split(b"\n")[2:])
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(WheelFile.validation_error, match="not mentioned in RECORD"):
source.validate_record()


def test_rejects_record_missing_hash(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) is not included in RECORD",
):
source.validate_record()


def test_accept_record_missing_hash_on_skip_validation(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
source.validate_record(validate_file=False)


def test_accept_wheel_with_signed_file(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()
hash_b64_nopad = (
urlsafe_b64encode(sha256(record_file_contents).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)
with WheelFile.open(fancy_wheel) as source:
source.validate_record()


def test_rejects_record_validation_failed(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] # Original filename
+ b",sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A,"
+ line.split(b",")[2] # Original size
for line in record_file_contents.split(b"\n")
if line # ignore trail endline
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) didn't match RECORD",
):
source.validate_record()