| from __future__ import division |
| |
| import contextlib |
| import json |
| import numbers |
| |
| try: |
| import requests |
| except ImportError: |
| requests = None |
| |
| from jsonschema import _utils, _validators |
| from jsonschema.compat import ( |
| Sequence, urljoin, urlsplit, urldefrag, unquote, urlopen, |
| str_types, int_types, iteritems, lru_cache, |
| ) |
| from jsonschema.exceptions import ErrorTree # Backwards compatibility # noqa |
| from jsonschema.exceptions import RefResolutionError, SchemaError, UnknownType |
| |
| |
| _unset = _utils.Unset() |
| |
| validators = {} |
| meta_schemas = _utils.URIDict() |
| |
| |
| def validates(version): |
| """ |
| Register the decorated validator for a ``version`` of the specification. |
| |
| Registered validators and their meta schemas will be considered when |
| parsing ``$schema`` properties' URIs. |
| |
| :argument str version: an identifier to use as the version's name |
| :returns: a class decorator to decorate the validator with the version |
| |
| """ |
| |
| def _validates(cls): |
| validators[version] = cls |
| if u"id" in cls.META_SCHEMA: |
| meta_schemas[cls.META_SCHEMA[u"id"]] = cls |
| return cls |
| return _validates |
| |
| |
| def create(meta_schema, validators=(), version=None, default_types=None): # noqa |
| if default_types is None: |
| default_types = { |
| u"array" : list, u"boolean" : bool, u"integer" : int_types, |
| u"null" : type(None), u"number" : numbers.Number, u"object" : dict, |
| u"string" : str_types, |
| } |
| |
| class Validator(object): |
| VALIDATORS = dict(validators) |
| META_SCHEMA = dict(meta_schema) |
| DEFAULT_TYPES = dict(default_types) |
| |
| def __init__( |
| self, schema, types=(), resolver=None, format_checker=None, |
| ): |
| self._types = dict(self.DEFAULT_TYPES) |
| self._types.update(types) |
| |
| if resolver is None: |
| resolver = RefResolver.from_schema(schema) |
| |
| self.resolver = resolver |
| self.format_checker = format_checker |
| self.schema = schema |
| |
| @classmethod |
| def check_schema(cls, schema): |
| for error in cls(cls.META_SCHEMA).iter_errors(schema): |
| raise SchemaError.create_from(error) |
| |
| def iter_errors(self, instance, _schema=None): |
| if _schema is None: |
| _schema = self.schema |
| |
| scope = _schema.get(u"id") |
| if scope: |
| self.resolver.push_scope(scope) |
| try: |
| ref = _schema.get(u"$ref") |
| if ref is not None: |
| validators = [(u"$ref", ref)] |
| else: |
| validators = iteritems(_schema) |
| |
| for k, v in validators: |
| validator = self.VALIDATORS.get(k) |
| if validator is None: |
| continue |
| |
| errors = validator(self, v, instance, _schema) or () |
| for error in errors: |
| # set details if not already set by the called fn |
| error._set( |
| validator=k, |
| validator_value=v, |
| instance=instance, |
| schema=_schema, |
| ) |
| if k != u"$ref": |
| error.schema_path.appendleft(k) |
| yield error |
| finally: |
| if scope: |
| self.resolver.pop_scope() |
| |
| def descend(self, instance, schema, path=None, schema_path=None): |
| for error in self.iter_errors(instance, schema): |
| if path is not None: |
| error.path.appendleft(path) |
| if schema_path is not None: |
| error.schema_path.appendleft(schema_path) |
| yield error |
| |
| def validate(self, *args, **kwargs): |
| for error in self.iter_errors(*args, **kwargs): |
| raise error |
| |
| def is_type(self, instance, type): |
| if type not in self._types: |
| raise UnknownType(type, instance, self.schema) |
| pytypes = self._types[type] |
| |
| # bool inherits from int, so ensure bools aren't reported as ints |
| if isinstance(instance, bool): |
| pytypes = _utils.flatten(pytypes) |
| is_number = any( |
| issubclass(pytype, numbers.Number) for pytype in pytypes |
| ) |
| if is_number and bool not in pytypes: |
| return False |
| return isinstance(instance, pytypes) |
| |
| def is_valid(self, instance, _schema=None): |
| error = next(self.iter_errors(instance, _schema), None) |
| return error is None |
| |
| if version is not None: |
| Validator = validates(version)(Validator) |
| Validator.__name__ = version.title().replace(" ", "") + "Validator" |
| |
| return Validator |
| |
| |
| def extend(validator, validators, version=None): |
| all_validators = dict(validator.VALIDATORS) |
| all_validators.update(validators) |
| return create( |
| meta_schema=validator.META_SCHEMA, |
| validators=all_validators, |
| version=version, |
| default_types=validator.DEFAULT_TYPES, |
| ) |
| |
| |
| Draft3Validator = create( |
| meta_schema=_utils.load_schema("draft3"), |
| validators={ |
| u"$ref" : _validators.ref, |
| u"additionalItems" : _validators.additionalItems, |
| u"additionalProperties" : _validators.additionalProperties, |
| u"dependencies" : _validators.dependencies, |
| u"disallow" : _validators.disallow_draft3, |
| u"divisibleBy" : _validators.multipleOf, |
| u"enum" : _validators.enum, |
| u"extends" : _validators.extends_draft3, |
| u"format" : _validators.format, |
| u"items" : _validators.items, |
| u"maxItems" : _validators.maxItems, |
| u"maxLength" : _validators.maxLength, |
| u"maximum" : _validators.maximum, |
| u"minItems" : _validators.minItems, |
| u"minLength" : _validators.minLength, |
| u"minimum" : _validators.minimum, |
| u"multipleOf" : _validators.multipleOf, |
| u"pattern" : _validators.pattern, |
| u"patternProperties" : _validators.patternProperties, |
| u"properties" : _validators.properties_draft3, |
| u"type" : _validators.type_draft3, |
| u"uniqueItems" : _validators.uniqueItems, |
| }, |
| version="draft3", |
| ) |
| |
| Draft4Validator = create( |
| meta_schema=_utils.load_schema("draft4"), |
| validators={ |
| u"$ref" : _validators.ref, |
| u"additionalItems" : _validators.additionalItems, |
| u"additionalProperties" : _validators.additionalProperties, |
| u"allOf" : _validators.allOf_draft4, |
| u"anyOf" : _validators.anyOf_draft4, |
| u"dependencies" : _validators.dependencies, |
| u"enum" : _validators.enum, |
| u"format" : _validators.format, |
| u"items" : _validators.items, |
| u"maxItems" : _validators.maxItems, |
| u"maxLength" : _validators.maxLength, |
| u"maxProperties" : _validators.maxProperties_draft4, |
| u"maximum" : _validators.maximum, |
| u"minItems" : _validators.minItems, |
| u"minLength" : _validators.minLength, |
| u"minProperties" : _validators.minProperties_draft4, |
| u"minimum" : _validators.minimum, |
| u"multipleOf" : _validators.multipleOf, |
| u"not" : _validators.not_draft4, |
| u"oneOf" : _validators.oneOf_draft4, |
| u"pattern" : _validators.pattern, |
| u"patternProperties" : _validators.patternProperties, |
| u"properties" : _validators.properties_draft4, |
| u"required" : _validators.required_draft4, |
| u"type" : _validators.type_draft4, |
| u"uniqueItems" : _validators.uniqueItems, |
| }, |
| version="draft4", |
| ) |
| |
| |
| class RefResolver(object): |
| """ |
| Resolve JSON References. |
| |
| :argument str base_uri: URI of the referring document |
| :argument referrer: the actual referring document |
| :argument dict store: a mapping from URIs to documents to cache |
| :argument bool cache_remote: whether remote refs should be cached after |
| first resolution |
| :argument dict handlers: a mapping from URI schemes to functions that |
| should be used to retrieve them |
| :arguments functools.lru_cache urljoin_cache: a cache that will be used for |
| caching the results of joining the resolution scope to subscopes. |
| :arguments functools.lru_cache remote_cache: a cache that will be used for |
| caching the results of resolved remote URLs. |
| |
| """ |
| |
| def __init__( |
| self, |
| base_uri, |
| referrer, |
| store=(), |
| cache_remote=True, |
| handlers=(), |
| urljoin_cache=None, |
| remote_cache=None, |
| ): |
| if urljoin_cache is None: |
| urljoin_cache = lru_cache(1024)(urljoin) |
| if remote_cache is None: |
| remote_cache = lru_cache(1024)(self.resolve_from_url) |
| |
| self.referrer = referrer |
| self.cache_remote = cache_remote |
| self.handlers = dict(handlers) |
| |
| self._scopes_stack = [base_uri] |
| self.store = _utils.URIDict( |
| (id, validator.META_SCHEMA) |
| for id, validator in iteritems(meta_schemas) |
| ) |
| self.store.update(store) |
| self.store[base_uri] = referrer |
| |
| self._urljoin_cache = urljoin_cache |
| self._remote_cache = remote_cache |
| |
| @classmethod |
| def from_schema(cls, schema, *args, **kwargs): |
| """ |
| Construct a resolver from a JSON schema object. |
| |
| :argument schema: the referring schema |
| :rtype: :class:`RefResolver` |
| |
| """ |
| |
| return cls(schema.get(u"id", u""), schema, *args, **kwargs) |
| |
| def push_scope(self, scope): |
| self._scopes_stack.append( |
| self._urljoin_cache(self.resolution_scope, scope), |
| ) |
| |
| def pop_scope(self): |
| try: |
| self._scopes_stack.pop() |
| except IndexError: |
| raise RefResolutionError( |
| "Failed to pop the scope from an empty stack. " |
| "`pop_scope()` should only be called once for every " |
| "`push_scope()`", |
| ) |
| |
| @property |
| def resolution_scope(self): |
| return self._scopes_stack[-1] |
| |
| @property |
| def base_uri(self): |
| uri, _ = urldefrag(self.resolution_scope) |
| return uri |
| |
| @contextlib.contextmanager |
| def in_scope(self, scope): |
| self.push_scope(scope) |
| try: |
| yield |
| finally: |
| self.pop_scope() |
| |
| @contextlib.contextmanager |
| def resolving(self, ref): |
| """ |
| Context manager which resolves a JSON ``ref`` and enters the |
| resolution scope of this ref. |
| |
| :argument str ref: reference to resolve |
| |
| """ |
| |
| url, resolved = self.resolve(ref) |
| self.push_scope(url) |
| try: |
| yield resolved |
| finally: |
| self.pop_scope() |
| |
| def resolve(self, ref): |
| url = self._urljoin_cache(self.resolution_scope, ref) |
| return url, self._remote_cache(url) |
| |
| def resolve_from_url(self, url): |
| url, fragment = urldefrag(url) |
| try: |
| document = self.store[url] |
| except KeyError: |
| try: |
| document = self.resolve_remote(url) |
| except Exception as exc: |
| raise RefResolutionError(exc) |
| |
| return self.resolve_fragment(document, fragment) |
| |
| def resolve_fragment(self, document, fragment): |
| """ |
| Resolve a ``fragment`` within the referenced ``document``. |
| |
| :argument document: the referrant document |
| :argument str fragment: a URI fragment to resolve within it |
| |
| """ |
| |
| fragment = fragment.lstrip(u"/") |
| parts = unquote(fragment).split(u"/") if fragment else [] |
| |
| for part in parts: |
| part = part.replace(u"~1", u"/").replace(u"~0", u"~") |
| |
| if isinstance(document, Sequence): |
| # Array indexes should be turned into integers |
| try: |
| part = int(part) |
| except ValueError: |
| pass |
| try: |
| document = document[part] |
| except (TypeError, LookupError): |
| raise RefResolutionError( |
| "Unresolvable JSON pointer: %r" % fragment |
| ) |
| |
| return document |
| |
| def resolve_remote(self, uri): |
| """ |
| Resolve a remote ``uri``. |
| |
| If called directly, does not check the store first, but after |
| retrieving the document at the specified URI it will be saved in |
| the store if :attr:`cache_remote` is True. |
| |
| .. note:: |
| |
| If the requests_ library is present, ``jsonschema`` will use it to |
| request the remote ``uri``, so that the correct encoding is |
| detected and used. |
| |
| If it isn't, or if the scheme of the ``uri`` is not ``http`` or |
| ``https``, UTF-8 is assumed. |
| |
| :argument str uri: the URI to resolve |
| :returns: the retrieved document |
| |
| .. _requests: http://pypi.python.org/pypi/requests/ |
| |
| """ |
| |
| scheme = urlsplit(uri).scheme |
| |
| if scheme in self.handlers: |
| result = self.handlers[scheme](uri) |
| elif ( |
| scheme in [u"http", u"https"] and |
| requests and |
| getattr(requests.Response, "json", None) is not None |
| ): |
| # Requests has support for detecting the correct encoding of |
| # json over http |
| if callable(requests.Response.json): |
| result = requests.get(uri).json() |
| else: |
| result = requests.get(uri).json |
| else: |
| # Otherwise, pass off to urllib and assume utf-8 |
| result = json.loads(urlopen(uri).read().decode("utf-8")) |
| |
| if self.cache_remote: |
| self.store[uri] = result |
| return result |
| |
| |
| def validator_for(schema, default=_unset): |
| if default is _unset: |
| default = Draft4Validator |
| return meta_schemas.get(schema.get(u"$schema", u""), default) |
| |
| |
| def validate(instance, schema, cls=None, *args, **kwargs): |
| """ |
| Validate an instance under the given schema. |
| |
| >>> validate([2, 3, 4], {"maxItems" : 2}) |
| Traceback (most recent call last): |
| ... |
| ValidationError: [2, 3, 4] is too long |
| |
| :func:`validate` will first verify that the provided schema is itself |
| valid, since not doing so can lead to less obvious error messages and fail |
| in less obvious or consistent ways. If you know you have a valid schema |
| already or don't care, you might prefer using the |
| :meth:`~IValidator.validate` method directly on a specific validator |
| (e.g. :meth:`Draft4Validator.validate`). |
| |
| |
| :argument instance: the instance to validate |
| :argument schema: the schema to validate with |
| :argument cls: an :class:`IValidator` class that will be used to validate |
| the instance. |
| |
| If the ``cls`` argument is not provided, two things will happen in |
| accordance with the specification. First, if the schema has a |
| :validator:`$schema` property containing a known meta-schema [#]_ then the |
| proper validator will be used. The specification recommends that all |
| schemas contain :validator:`$schema` properties for this reason. If no |
| :validator:`$schema` property is found, the default validator class is |
| :class:`Draft4Validator`. |
| |
| Any other provided positional and keyword arguments will be passed on when |
| instantiating the ``cls``. |
| |
| :raises: |
| :exc:`ValidationError` if the instance is invalid |
| |
| :exc:`SchemaError` if the schema itself is invalid |
| |
| .. rubric:: Footnotes |
| .. [#] known by a validator registered with :func:`validates` |
| """ |
| if cls is None: |
| cls = validator_for(schema) |
| cls.check_schema(schema) |
| cls(schema, *args, **kwargs).validate(instance) |