diff --git a/requirements.txt b/requirements.txt index 290cd98..62be83b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ # Optionals jinja2 pyyaml +puremagic # Packaging twine diff --git a/setup.py b/setup.py index ebc4b01..a852a74 100755 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ def get_packages(package): extras_require={ "jinja2": ["jinja2"], "pyyaml": ["pyyaml"], + "puremagic": ["puremagic"], }, python_requires='>=3.6', classifiers=[ diff --git a/tests/image/head.jpeg b/tests/image/head.jpeg new file mode 100644 index 0000000..aa8d72c Binary files /dev/null and b/tests/image/head.jpeg differ diff --git a/tests/test_fields.py b/tests/test_fields.py index 3b0fb49..e96961b 100644 --- a/tests/test_fields.py +++ b/tests/test_fields.py @@ -16,7 +16,9 @@ DateTime, Decimal, Email, + File, Float, + Image, Integer, IPAddress, Number, @@ -889,3 +891,40 @@ def test_url(): validator = URL() value, error = validator.validate_or_error("example") assert error == ValidationError(text="Must be a real URL.", code="invalid") + + +def test_file(): + validator = File() + with open("test.txt", "w") as f: + f.write("123") + with open("test.txt") as f: + value, error = validator.validate_or_error(f) + assert value == f + + validator = File() + value, error = validator.validate_or_error(None) + assert error == ValidationError(text="Must be a file descriptor.", code="type") + + +def test_image(): + validator = Image(image_types=["png"]) + value, error = validator.validate_or_error(None) + assert error == ValidationError(text="Must be a file descriptor.", code="type") + + with open("test.png", "wb") as f: + f.write(b"\211PNG\r\n\032\nxxxxxxxxxxxxxxxxxxxxxxxy") + + with open("test.png", "rb") as f: + validator = Image(image_types=["png"]) + value, error = validator.validate_or_error(f) + assert error == ValidationError( + text="Do not support this image type.", code="image_types" + ) + validator = Image() + value, error = validator.validate_or_error(f) + assert value is f + + with open("./tests/image/head.jpeg", "rb") as f: + validator = Image(image_types=["jfif"]) + value, error = validator.validate_or_error(f) + assert value == f diff --git a/typesystem/fields.py b/typesystem/fields.py index 2d47d5a..9562e82 100644 --- a/typesystem/fields.py +++ b/typesystem/fields.py @@ -1,4 +1,5 @@ import decimal +import io import re import typing from math import isfinite @@ -7,6 +8,13 @@ from typesystem.base import Message, ValidationError, ValidationResult from typesystem.unique import Uniqueness +try: + # check the image type. + import puremagic +except ImportError: + puremagic = None + + NO_DEFAULT = object() FORMATS = { @@ -385,7 +393,7 @@ def validate(self, value: typing.Any) -> typing.Any: return None elif value is None: raise self.validation_error("null") - elif value not in Uniqueness([key for key, value in self.choices]): + elif value not in Uniqueness([key for key, val in self.choices]): if value == "": if self.allow_null and self.coerce_types: return None @@ -749,7 +757,7 @@ def validate(self, value: typing.Any) -> typing.Any: class Const(Field): """ - Only ever matches the given given value. + Only ever matches the given value. """ errors = {"only_null": "Must be null.", "const": "Must be the value '{const}'."} @@ -790,3 +798,53 @@ def __init__(self, **kwargs: typing.Any) -> None: class URL(String): def __init__(self, **kwargs: typing.Any) -> None: super().__init__(format="url", **kwargs) + + +class File(Field): + errors = { + "type": "Must be a file descriptor.", + } + value_types = ( + io.BufferedReader, + io.TextIOWrapper, + io.BufferedRandom, + io.BufferedWriter, + ) + + def __init__(self, **kwargs: typing.Any) -> None: + super().__init__(**kwargs) + + def validate(self, value: typing.Any) -> typing.Any: + if not isinstance(value, self.value_types): + raise self.validation_error("type") + return value + + +class Image(File): + errors = { + "type": "Must be a file descriptor.", + "image_types": "Do not support this image type.", + } + + def __init__( + self, image_types: typing.List[str] = None, **kwargs: typing.Any + ) -> None: + + super().__init__(**kwargs) + self.image_types = image_types + + def validate(self, value: typing.Any) -> typing.Any: + value = super().validate(value) + if self.image_types is None: + return value + + assert puremagic is not None, "'puremagic' must be installed." + try: + image_type: typing.Optional[str] = puremagic.from_stream(value) + except Exception: + image_type = None + + image_type = (image_type or "").strip(".") + if not image_type or image_type not in self.image_types: + raise self.validation_error("image_types") + return value diff --git a/typesystem/json_schema.py b/typesystem/json_schema.py index b03135e..321b20f 100644 --- a/typesystem/json_schema.py +++ b/typesystem/json_schema.py @@ -408,7 +408,7 @@ def to_json_schema( elif isinstance(arg, NeverMatch): return False - field: typing.Optional[Field] + field: typing.Optional[Field] = None data: dict = {} is_root = _definitions is None definitions = {} if _definitions is None else _definitions @@ -416,7 +416,6 @@ def to_json_schema( if isinstance(arg, Field): field = arg elif isinstance(arg, Definitions): - field = None for key, value in arg.items(): definitions[key] = to_json_schema(value, _definitions=definitions)