# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
# -*- coding: utf-8 -*-
"""
The driver functions for updating an object from an external form.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# stdlib imports
try:
from collections.abc import MutableSequence
except ImportError: # Python 2
# pylint:disable=deprecated-class
from collections import MutableSequence
from collections import MutableMapping
else: # pragma: no cover
from collections.abc import MutableMapping
import inspect
import warnings
from persistent.interfaces import IPersistent
from six import iteritems
from zope import interface
from zope.event import notify as notify_event
from nti.externalization._base_interfaces import PRIMITIVES
from nti.externalization._base_interfaces import NotGiven
from nti.externalization.interfaces import IInternalObjectUpdater
from nti.externalization.interfaces import IInternalObjectIO
from nti.externalization.interfaces import INamedExternalizedObjectFactoryFinder
from nti.externalization.interfaces import ObjectWillUpdateFromExternalEvent
from .factories import find_factory_for
from .events import _notifyModified
from .externals import resolve_externals
# Shared empty mapping handed to ``_notifyModified`` as its final
# argument; it is never written to in this module.
_EMPTY_DICT = {}

# Module-level alias so the interface check in
# ``_update_from_external_object`` avoids an attribute lookup per call.
IPersistent_providedBy = IPersistent.providedBy
class _RecallArgs(object):
    """
    Mutable bundle of the options threaded through a recursive update.

    A fresh instance starts with ``notify`` enabled, ``require_updater``
    disabled and every other slot set to ``None``; the caller is
    expected to overwrite the fields it cares about after construction.
    """

    __slots__ = (
        'context',
        'require_updater',
        'notify',
        'pre_hook',
        'root',
    )

    # The constructor deliberately takes no arguments: the caller
    # fills the slots in afterwards. In cython, this avoids some
    # unneeded bint->object->bint conversions.
    def __init__(self):
        # Boolean switches.
        self.notify = True
        self.require_updater = False
        # Object-valued options, all unset by default.
        self.root = self.pre_hook = self.context = None
##
# Note on caching: We do not expect the updater objects to be proxied,
# so we directly use type() instead of .__class__, which is faster.
# We also do not expect their types to be unloaded, updated, or
# unbounded in number, so we use a regular dict to cache info about
# them, which is faster than a WeakKeyDictionary. For the same reason,
# we use dynamic warning strings (the type is formatted into each message).
# Support for varying signatures of the updater. This is slow and
# cumbersome and needs to go; we are in the deprecation period now.
# See https://github.com/NextThought/nti.externalization/issues/30
# Maps type(updater) -> one of the _UPDATE_ARGS_* markers below.
_argspec_cache = {}

# Calling-convention markers. Callers compare them by identity, so
# always refer to them through these names.

# update(ext, context) or update(ext, context=None) or update(ext, dataserver):
# exactly two arguments. It doesn't matter what the second one is named,
# we pass it positionally.
_UPDATE_ARGS_TWO = "update args two"
# update(ext, **kwargs): the context is passed as the ``context`` keyword.
_UPDATE_ARGS_CONTEXT_KW = "update args **kwargs"
# update(ext): only the external object is passed.
_UPDATE_ARGS_ONE = "update args external only"


def _get_update_signature(updater):
    """
    Classify how ``updater.updateFromExternalObject`` must be called.

    The answer is computed once per ``type(updater)`` and cached in
    ``_argspec_cache``.

    :param updater: An object with an ``updateFromExternalObject`` method.
    :return: One of ``_UPDATE_ARGS_TWO``, ``_UPDATE_ARGS_ONE`` or
        ``_UPDATE_ARGS_CONTEXT_KW``.
    """
    kind = type(updater)
    spec = _argspec_cache.get(kind)
    if spec is not None:
        return spec

    try:
        func = updater.updateFromExternalObject
        getfullargspec = getattr(inspect, 'getfullargspec', None)
        if getfullargspec is not None:
            # Python 3. getargspec() is deprecated.
            argspec = getfullargspec(func)
            keywords = argspec.varkw
        else: # Python 2
            argspec = inspect.getargspec(func) # pylint:disable=deprecated-method
            keywords = argspec.keywords
        args, defaults = argspec.args, argspec.defaults
    except TypeError: # pragma: no cover (This is hard to catch in pure-python coverage mode)
        # Cython functions and other extension types are "not a Python function"
        # and can't be introspected. We assume they use the standard form
        # accepting 'context' as kwarg.
        spec = _UPDATE_ARGS_CONTEXT_KW
    else:
        # ``args`` names every positional parameter; these are all
        # methods (or at least classmethods), so the count includes
        # the extra 'self' argument. ``keywords`` is the name of the
        # **kwarg parameter, if there is one.
        if len(args) == 3:
            # update(ext, context), with or without a trailing **kwargs.
            spec = _UPDATE_ARGS_TWO
        elif not keywords or keywords.startswith(("unused", "_")):
            # update(ext), or update(ext, **unused) where the name
            # signals that the keywords are ignored.
            spec = _UPDATE_ARGS_ONE
        else:
            spec = _UPDATE_ARGS_CONTEXT_KW

        if 'dataserver' in args and defaults:
            warnings.warn("The type %r still uses updateFromExternalObject(dataserver=None). "
                          "Please change to context=None." % (kind,),
                          FutureWarning)

    _argspec_cache[kind] = spec
    return spec
# Maps type(obj) -> bool: may the object's own updateFromExternalObject be used?
_usable_updateFromExternalObject_cache = {}


def _obj_has_usable_updateFromExternalObject(obj):
    """
    Tell whether *obj* itself should act as its updater.

    That is the case when it has an ``updateFromExternalObject``
    attribute and does not set a true
    ``__ext_ignore_updateFromExternalObject__``. The answer is cached
    per ``type(obj)``; a ``FutureWarning`` is issued when the opt-out
    flag is found.

    :return: ``True`` or ``False``.
    """
    kind = type(obj)
    usable_from = _usable_updateFromExternalObject_cache.get(kind)
    if usable_from is not None:
        return usable_from

    if not hasattr(obj, 'updateFromExternalObject'):
        usable_from = False
    else:
        wants_ignore = getattr(obj, '__ext_ignore_updateFromExternalObject__', False)
        if wants_ignore:
            warnings.warn("The type %r has __ext_ignore_updateFromExternalObject__=True. "
                          "Please remove updateFromExternalObject from the type." % (kind,),
                          FutureWarning)
        usable_from = not wants_ignore

    _usable_updateFromExternalObject_cache[kind] = usable_from
    return usable_from
# When zope.testing is importable, register both per-type caches with
# its cleanup hooks so they can be cleared; without it, do nothing.
try:
    from zope.testing import cleanup # pylint:disable=ungrouped-imports
except ImportError: # pragma: no cover
    pass
else:
    cleanup.addCleanUp(_argspec_cache.clear)
    cleanup.addCleanUp(_usable_updateFromExternalObject_cache.clear)
class DefaultInternalObjectFactoryFinder(object):
    """
    Fallback factory finder used when no more specific finder is
    available for an object.

    It ignores the name the value is stored under and delegates to
    ``find_factory_for`` with the value alone.
    """

    def find_factory_for_named_value(self, name, value): # pylint:disable=unused-argument
        """
        Return the result of ``find_factory_for(value)``; *name* is
        accepted for interface compatibility but not used.
        """
        return find_factory_for(value)

# The interface is declared after the class body instead of with a
# class decorator.
interface.classImplements(DefaultInternalObjectFactoryFinder, INamedExternalizedObjectFactoryFinder)

# Single shared instance: used as the fallback finder and as the source
# of the default ``find_factory_for_named_value`` in ``_update_sequence``.
_default_factory_finder = DefaultInternalObjectFactoryFinder()
def update_from_external_object(containedObject, externalObject,
                                registry=NotGiven, context=None,
                                require_updater=False,
                                notify=True,
                                pre_hook=None):
    # pylint:disable=line-too-long
    """
    update_from_external_object(containedObject, externalObject, context=None, require_updater=False, notify=True)

    Central method for updating objects from external values.

    :param containedObject: The object to update.
    :param externalObject: The object (typically a mapping or sequence) to update
       the object from. Usually this is obtained by parsing an external
       format like JSON.
    :param registry: Deprecated and ignored. Passing any value only
       triggers a ``FutureWarning``.
    :param context: An object passed to the update methods.
    :param require_updater: If True (not the default) an exception
        will be raised if no implementation of
        :class:`~nti.externalization.interfaces.IInternalObjectUpdater`
        can be found for the *containedObject.*
    :keyword bool notify: If ``True`` (the default), then if the updater
        for the *containedObject* either has no preference (returns
        None) or indicates that the object has changed, then an
        :class:`~nti.externalization.interfaces.IObjectModifiedFromExternalEvent`
        will be fired. This may be a recursive process so a top-level
        call to this object may spawn multiple events. The events that
        are fired will have a ``descriptions`` list containing one or
        more :class:`~zope.lifecycleevent.interfaces.IAttributes` each
        with ``attributes`` for each attribute we modify (assuming
        that the keys in the ``externalObject`` map one-to-one to an
        attribute; if this is the case and we can also find an
        interface declaring the attribute, then the ``IAttributes``
        will have the right value for ``interface`` as well).
    :keyword callable pre_hook: If given, called before
        update_from_external_object is called for every nested object.
        Signature ``f(k,x)`` where ``k`` is either the key name, or
        None in the case of a sequence and ``x`` is the external
        object. Deprecated.
    :return: *containedObject* after updates from *externalObject*.
        When *externalObject* is a mutable sequence, the sequence
        itself (updated in place) is returned instead.

    Notifies `~.IObjectModifiedFromExternalEvent` for each object that is modified,
    and `~.IObjectWillUpdateFromExternalEvent` before doing so.

    .. seealso:: `~.INamedExternalizedObjectFactoryFinder`

    .. versionchanged:: 1.0.0a2
       Remove the ``object_hook`` parameter.
    .. versionchanged:: 1.1.3
       Correctly fire `~.IObjectWillUpdateFromExternalEvent` before updating
       each object.
    """
    if pre_hook is not None: # pragma: no cover
        # The warning is emitted at stacklevels 0, 1 and 2, presumably so
        # that it is attributed to more than one frame of the call chain.
        for i in range(3):
            warnings.warn('pre_hook is deprecated', FutureWarning, stacklevel=i)

    if registry is not NotGiven: # pragma: no cover
        warnings.warn(
            "registry is deprecated and ignored. Call in a correct site.",
            FutureWarning
        )

    # Bundle the options once; the same object is handed down through
    # every recursive call.
    kwargs = _RecallArgs()
    kwargs.context = context
    kwargs.require_updater = require_updater
    kwargs.notify = notify
    kwargs.pre_hook = pre_hook
    # The top-level object; passed along with each "will update" event.
    kwargs.root = containedObject

    return _update_from_external_object(containedObject, externalObject, kwargs)
def _invoke_factory(factory, value):
    """
    Create a new object from *factory*.

    Factories with a true ``__external_factory_wants_arg__`` attribute
    are called with *value*; all others are called with no arguments.
    """
    # TODO: Add wrappers when we create the factories in ZCML
    # so we can always pass the argument?
    wants_arg = getattr(factory, '__external_factory_wants_arg__', False)
    return factory(value) if wants_arg else factory()
def _update_sequence(
        externalObject, args,
        destination_name=None,
        find_factory_for_named_value=_default_factory_finder.find_factory_for_named_value):
    """
    Replace, in place, each element of the sequence *externalObject*
    for which a factory can be found.

    :param externalObject: The mutable sequence of external values.
    :param args: The shared options object; its ``pre_hook``, when set,
        is called as ``pre_hook(None, value)`` for each element.
    :param destination_name: The name passed to the factory lookup
        (``None`` when the sequence is not stored under a key).
    :param find_factory_for_named_value: Callable ``(name, value)``
        returning a factory or ``None``.
    :return: *externalObject* itself.
    """
    for index, value in enumerate(externalObject):
        if args.pre_hook is not None: # pragma: no cover
            args.pre_hook(None, value)

        factory = find_factory_for_named_value(destination_name, value)
        if factory is None:
            # No factory: the external value is left where it is.
            continue

        # Build a fresh object, update it from the external value and
        # store it back into the same position.
        externalObject[index] = _update_from_external_object(
            _invoke_factory(factory, value), value, args)

    return externalObject
def _invoke_updater(containedObject, externalObject,
                    updater, external_keys, args):
    """
    Run *updater* against *externalObject* and, if appropriate,
    broadcast a modified event for *containedObject*.

    *externalObject* should have all of its values already updated
    by the time this is called.

    :param external_keys: The keys of *externalObject* captured before
        the update; forwarded to ``_notifyModified``.
    :param args: The shared options object (``context`` and ``notify``
        are read here).
    """
    # Let the updater resolve externals.
    resolve_externals(updater, containedObject, externalObject,
                      context=args.context)

    # The signature may vary; pick the matching call form.
    arg_kind = _get_update_signature(updater)
    update = updater.updateFromExternalObject
    if arg_kind is _UPDATE_ARGS_TWO:
        updated = update(externalObject, args.context)
    elif arg_kind is _UPDATE_ARGS_ONE:
        updated = update(externalObject)
    else:
        updated = update(externalObject, context=args.context)

    if not args.notify:
        return

    # Broadcast a modified event if the object seems to have changed:
    # either the updater expressed no opinion (None) or returned
    # something true.
    if updated is None or updated:
        _notifyModified(containedObject, externalObject,
                        updater, external_keys, _EMPTY_DICT)
def _find_INamedExternalizedObjectFactoryFinder(containedObject):
    """
    Locate the factory finder to use for *containedObject*.

    Lookup order:

    1. An ``INamedExternalizedObjectFactoryFinder`` adapter.
    2. An ``IInternalObjectIO`` adapter that also provides the finder
       interface (this issues a ``UserWarning`` about the outdated
       registration).
    3. The module's default finder.
    """
    finder = INamedExternalizedObjectFactoryFinder(containedObject, None)
    if finder is not None:
        return finder

    # Ok, check to see if an instance of the old root interface
    # InternalObjectIO is there and also provides INamedExternalizedObjectFactoryFinder;
    # if so, there's a bad ZCML registration.
    legacy_io = IInternalObjectIO(containedObject, None)
    if not INamedExternalizedObjectFactoryFinder.providedBy(legacy_io): # pylint:disable=no-value-for-parameter
        return _default_factory_finder

    warnings.warn(
        "The adapter %r was registered as IInternalObjectIO when it should be "
        "IInternalObjectIOFinder; a provides= ZCML directive is probably outdated. "
        "If the object extends InterfacObjectIO, no provides= is usually necessary."
        % (legacy_io,),
        UserWarning
    )
    return legacy_io
def _update_from_external_object(containedObject, externalObject, args):
    """
    Recursive worker behind ``update_from_external_object``.

    :param containedObject: The object being updated.
    :param externalObject: A ``MutableSequence`` or a ``MutableMapping``
        of external values; anything else fails the assertion below.
    :param args: The shared options object (``pre_hook``,
        ``require_updater`` and ``root`` are read here; it is also
        passed on to the helpers).
    :return: *externalObject* itself when it is a sequence, otherwise
        *containedObject*.
    """
    # Parse any contained objects
    # TODO: We're (deliberately?) not actually updating any contained
    # objects, we're replacing them. Is that right? We could check OIDs...
    # If we decide that's right, then the internals could be simplified by
    # splitting the two parts
    # TODO: Should the current user impact on this process?

    if IPersistent_providedBy(containedObject): # pylint:disable=no-value-for-parameter
        # Record the external source on the persistent object itself.
        # pylint:disable=protected-access
        containedObject._v_updated_from_external_source = externalObject

    # Sequences do not represent python types, they represent collections of
    # python types. Note that we don't touch the containedObject in this branch!
    if isinstance(externalObject, MutableSequence):
        return _update_sequence(externalObject, args)

    assert isinstance(externalObject, MutableMapping), externalObject

    factory_finder = _find_INamedExternalizedObjectFactoryFinder(containedObject)
    find_factory_for_named_value = factory_finder.find_factory_for_named_value

    # We have to save the list of keys, it's common that they get popped during the update
    # process, and then we have no descriptions to send
    external_keys = []
    for k, v in iteritems(externalObject):
        external_keys.append(k)
        # Primitive values need no conversion.
        if isinstance(v, PRIMITIVES):
            continue

        if args.pre_hook is not None: # pragma: no cover
            args.pre_hook(k, v)

        if isinstance(v, MutableSequence):
            # Update the sequence in-place
            _update_sequence(v, args, k, find_factory_for_named_value)
        else:
            factory = find_factory_for_named_value(k, v) # pylint:disable=assignment-from-no-return, too-many-function-args
            if factory is not None:
                # Replace the external value with a newly created,
                # recursively updated object. Only the value of an
                # existing key changes, so the mapping keeps its size
                # while it is being iterated.
                new_obj = _invoke_factory(factory, v)
                externalObject[k] = _update_from_external_object(new_obj, v, args)

    if _obj_has_usable_updateFromExternalObject(containedObject):
        # legacy support. The __ext_ignore_updateFromExternalObject__
        # allows a transition to an adapter without changing
        # existing callers and without triggering infinite recursion
        updater = containedObject
    else:
        # It's possible for INamedExternalizedObjectFactoryFinder and
        # IInternalObjectUpdater to be registered at two different levels
        # of specificity, so we need to look up IInternalObjectUpdater,
        # not test if it's provided by what we already have.
        if args.require_updater and not isinstance(containedObject, dict):
            # No default is given, so a failed adaptation raises here.
            updater = IInternalObjectUpdater(containedObject)
        else:
            updater = IInternalObjectUpdater(containedObject, None)

    if updater is not None:
        # Announce the pending update before the updater runs.
        notify_event(ObjectWillUpdateFromExternalEvent(containedObject, externalObject, args.root))
        _invoke_updater(containedObject, externalObject, updater, external_keys, args)

    return containedObject
# Must stay at the very end of the module, after every definition above.
# NOTE(review): presumably this replaces the pure-Python names in
# globals() with the compiled ``_updater`` implementations when they
# are available -- confirm against ``_compat.import_c_accel``.
from nti.externalization._compat import import_c_accel # pylint:disable=wrong-import-position,wrong-import-order
import_c_accel(globals(), 'nti.externalization.internalization._updater')