blob: cadc716d12cf51fd035d840bb2dbbe361b43462b [file] [log] [blame]
# vim:ts=4 sw=4 expandtab softtabstop=4
from jsonmerge.exceptions import HeadInstanceError, \
BaseInstanceError, \
SchemaError
import jsonschema
import re
class Strategy(object):
"""Base class for merge strategies.
"""
def merge(self, walk, base, head, schema, meta, **kwargs):
"""Merge head instance into base.
walk -- WalkInstance object for the current context.
base -- Value being merged into.
head -- Value being merged.
schema -- Schema used for merging.
meta -- Meta data, as passed to the Merger.merge() method.
kwargs -- Dict with any extra options given in the 'mergeOptions'
keyword
Specific merge strategies should override this method to implement
their behavior.
The function should return the object resulting from the merge.
Recursion into the next level, if necessary, is achieved by calling
walk.descend() method.
"""
raise NotImplemented
def get_schema(self, walk, schema, meta, **kwargs):
"""Return the schema for the merged document.
walk -- WalkSchema object for the current context.
schema -- Original document schema.
meta -- Schema for the meta data, as passed to the Merger.get_schema()
method.
kwargs -- Dict with any extra options given in the 'mergeOptions'
keyword.
Specific merge strategies should override this method to modify the
document schema depending on the behavior of the merge() method.
The function should return the schema for the object resulting from the
merge.
Recursion into the next level, if necessary, is achieved by calling
walk.descend() method.
Implementations should take care that all external schema references
are resolved in the returned schema. This can be achieved by calling
walk.resolve_refs() method.
"""
raise NotImplemented
class Overwrite(Strategy):
def merge(self, walk, base, head, schema, meta, **kwargs):
return head
def get_schema(self, walk, schema, meta, **kwargs):
return walk.resolve_refs(schema)
class Version(Strategy):
def merge(self, walk, base, head, schema, meta, limit=None, unique=None, ignoreDups=True, **kwargs):
# backwards compatibility
if unique is False:
ignoreDups = False
if base is None:
base = []
else:
base = list(base)
if not ignoreDups or not base or base[-1]['value'] != head:
base.append(walk.add_meta(head, meta))
if limit is not None:
base = base[-limit:]
return base
def get_schema(self, walk, schema, meta, limit=None, **kwargs):
if meta is not None:
item = dict(meta)
else:
item = {}
if 'properties' not in item:
item['properties'] = {}
item['properties']['value'] = walk.resolve_refs(schema)
rv = { "type": "array",
"items": item }
if limit is not None:
rv['maxItems'] = limit
return rv
class Append(Strategy):
def merge(self, walk, base, head, schema, meta, **kwargs):
if not walk.is_type(head, "array"):
raise HeadInstanceError("Head for an 'append' merge strategy is not an array")
if base is None:
base = []
else:
if not walk.is_type(base, "array"):
raise BaseInstanceError("Base for an 'append' merge strategy is not an array")
base = list(base)
base += head
return base
def get_schema(self, walk, schema, meta, **kwargs):
schema.pop('maxItems', None)
schema.pop('uniqueItems', None)
return walk.resolve_refs(schema)
class ArrayMergeById(Strategy):
def merge(self, walk, base, head, schema, meta, idRef="id", ignoreId=None, **kwargs):
if not walk.is_type(head, "array"):
raise HeadInstanceError("Head for an 'arrayMergeById' merge strategy is not an array") # nopep8
if base is None:
base = []
else:
if not walk.is_type(base, "array"):
raise BaseInstanceError("Base for an 'arrayMergeById' merge strategy is not an array") # nopep8
base = list(base)
subschema = None
if schema:
subschema = schema.get('items')
if walk.is_type(subschema, "array"):
raise SchemaError("'arrayMergeById' not supported when 'items' is an array")
for head_item in head:
try:
head_key = walk.resolver.resolve_fragment(head_item, idRef)
except jsonschema.RefResolutionError:
# Do nothing if idRef field cannot be found.
continue
if head_key == ignoreId:
continue
key_count = 0
for i, base_item in enumerate(base):
base_key = walk.resolver.resolve_fragment(base_item, idRef)
if base_key == head_key:
key_count += 1
# If there was a match, we replace with a merged item
base[i] = walk.descend(subschema, base_item, head_item, meta)
if key_count == 0:
# If there wasn't a match, we append a new object
base.append(walk.descend(subschema, None, head_item, meta))
if key_count > 1:
raise BaseInstanceError("Id was not unique")
return base
def get_schema(self, walk, schema, meta, **kwargs):
subschema = None
if schema:
subschema = schema.get('items')
# Note we're discarding the walk.descend() result here. This is because
# it would de-reference the $ref if the subschema is a reference - i.e.
# in the result it would replace the reference with the copy of the
# target.
#
# But we want to keep the $ref and do the walk.descend() only on the target of the reference.
#
# This seems to work, but is an ugly workaround. walk.descend() should
# be fixed instead to not dereference $refs when not necessary.
walk.descend(subschema, meta)
return schema
class ObjectMerge(Strategy):
def merge(self, walk, base, head, schema, meta, **kwargs):
if not walk.is_type(head, "object"):
raise HeadInstanceError("Head for an 'object' merge strategy is not an object")
if base is None:
base = {}
else:
if not walk.is_type(base, "object"):
raise BaseInstanceError("Base for an 'object' merge strategy is not an object")
base = dict(base)
for k, v in head.items():
subschema = None
# get subschema for this element
if schema is not None:
p = schema.get('properties')
if p is not None:
subschema = p.get(k)
if subschema is None:
p = schema.get('patternProperties')
if p is not None:
for pattern, s in p.items():
if re.search(pattern, k):
subschema = s
if subschema is None:
p = schema.get('additionalProperties')
if p is not None:
subschema = p.get(k)
base[k] = walk.descend(subschema, base.get(k), v, meta)
return base
def get_schema(self, walk, schema, meta, **kwargs):
for forbidden in ("oneOf", "allOf", "anyOf"):
if forbidden in schema:
raise SchemaError("Type ambiguous schema")
schema2 = dict(schema)
def descend_keyword(keyword):
p = schema.get(keyword)
if p is not None:
for k, v in p.items():
schema2[keyword][k] = walk.descend(v, meta)
descend_keyword("properties")
descend_keyword("patternProperties")
descend_keyword("additionalProperties")
return schema2