Source code for nti.externalization.internalization.updater

# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
# -*- coding: utf-8 -*-
"""
The driver functions for updating an object from an external form.

"""

import inspect
import warnings
from collections.abc import MutableMapping
from collections.abc import MutableSequence
from typing import TypeVar
from typing import overload

from zope import interface

try:
    from persistent.interfaces import IPersistent
except ModuleNotFoundError:
     # pylint: disable-next=inherit-non-class
    class IPersistent(interface.Interface): # type:ignore[no-redef]
        """Mock"""

from zope.event import notify as notify_event

from nti.externalization._base_interfaces import PRIMITIVES
from nti.externalization.interfaces import IInternalObjectIO
from nti.externalization.interfaces import IInternalObjectUpdater
from nti.externalization.interfaces import INamedExternalizedObjectFactoryFinder
from nti.externalization.interfaces import ObjectWillUpdateFromExternalEvent

from .events import _notifyModified
from .externals import resolve_externals
from .factories import find_factory_for

T = TypeVar('T')

_EMPTY_DICT: dict = {}
IPersistent_providedBy = IPersistent.providedBy


class _RecallArgs(object):
    __slots__ = (
        'context',
        'require_updater',
        'notify',
        'root',
    )

    # We don't have an __init__, we ask the caller
    # to fill us in. In cython, this avoids some
    # unneeded bint->object->bint conversions.

    def __init__(self):
        self.context = None
        self.require_updater = False
        self.notify = True
        self.root = None

##
# Note on caching: We do not expect the updater objects to be proxied.
# So we directly use type() instead of .__class__, which is faster.
# We also do not expect them to be unloaded/updated/unbounded,
# so we use a regular dict to cache info about them, which is faster
# than a WeakKeyDictionary. For the same reason, we use dynamic warning
# strings.

# Support for varying signatures of the updater. This is slow and
# cumbersome and needs to go; we are in the deprecation period now.
# See https://github.com/NextThought/nti.externalization/issues/30

_argspec_cache: dict[type, str] = {}

# update(ext, context) or update(ext, context=None) or update(ext, dataserver)
# exactly two arguments. It doesn't matter what the name is, we'll call it
# positional.
_UPDATE_ARGS_TWO = "update args two"
_UPDATE_ARGS_CONTEXT_KW = "update args **kwargs"
_UPDATE_ARGS_ONE = "update args external only"


def _get_update_signature(updater) -> str:
    kind = type(updater)

    spec = _argspec_cache.get(kind)
    if spec is None:
        try:
            func = updater.updateFromExternalObject
            argspec = inspect.getfullargspec(func) # pylint:disable=no-member
            keywords = argspec.varkw
            args = argspec.args
            defaults = argspec.defaults
        except TypeError: # pragma: no cover (This is hard to catch in pure-python coverage mode)
            # Cython functions and other extension types are "not a Python function"
            # and don't work with this. We assume they use the standard form accepting
            # 'context' as kwarg
            spec = _UPDATE_ARGS_CONTEXT_KW
        else:
            # argspec.args contains the names of all the parameters.
            # argspec.keywords, if not none, is the name of the **kwarg
            # These all must be methods (or at least classmethods), having
            # an extra 'self' argument.
            if not keywords:
                # No **kwarg, good!
                if len(args) == 3:
                    # update(ext, context) or update(ext, context=None) or update(ext, dataserver)
                    spec = _UPDATE_ARGS_TWO
                else:
                    # update(ext)
                    spec = _UPDATE_ARGS_ONE
            else:
                if len(args) == 3: # pylint:disable=else-if-used
                    # update(ext, context, **kwargs) or update(ext, dataserver, **kwargs)
                    spec = _UPDATE_ARGS_TWO
                elif keywords.startswith("unused") or keywords.startswith('_'):
                    spec = _UPDATE_ARGS_ONE
                else:
                    spec = _UPDATE_ARGS_CONTEXT_KW

            if 'dataserver' in args and defaults and len(defaults) >= 1:
                warnings.warn("The type %r still uses updateFromExternalObject(dataserver=None). "
                              "Please change to context=None." % (kind,),
                              FutureWarning)

        _argspec_cache[kind] = spec

    return spec


_usable_updateFromExternalObject_cache: dict[type, bool] = {}

def _obj_has_usable_updateFromExternalObject(obj) -> bool:
    kind = type(obj)

    usable_from = _usable_updateFromExternalObject_cache.get(kind)
    if usable_from is None:
        has_update = hasattr(obj, 'updateFromExternalObject')
        if not has_update:
            usable_from = False
        else:
            wants_ignore = getattr(obj, '__ext_ignore_updateFromExternalObject__', False)
            usable_from = not wants_ignore
            if wants_ignore:
                warnings.warn("The type %r has __ext_ignore_updateFromExternalObject__=True. "
                              "Please remove updateFromExternalObject from the type." % (kind,),
                              FutureWarning)


        _usable_updateFromExternalObject_cache[kind] = usable_from

    return usable_from


try:
    from zope.testing import cleanup  # pylint:disable=ungrouped-imports
except ImportError: # pragma: no cover
    pass
else:
    cleanup.addCleanUp(_argspec_cache.clear)
    cleanup.addCleanUp(_usable_updateFromExternalObject_cache.clear)


class DefaultInternalObjectFactoryFinder(object):

    def find_factory_for_named_value(self, name, value): # pylint:disable=unused-argument
        return find_factory_for(value)


interface.classImplements(DefaultInternalObjectFactoryFinder, INamedExternalizedObjectFactoryFinder)

_default_factory_finder = DefaultInternalObjectFactoryFinder()

@overload
def update_from_external_object(containedObject: MutableSequence,
                                externalObject: MutableSequence,
                                context=None,
                                require_updater=False,
                                notify=True) -> MutableSequence:
    # pylint:disable=too-many-positional-arguments
    ...

@overload
def update_from_external_object(containedObject: T,
                                externalObject: MutableMapping,
                                context=None,
                                require_updater=False,
                                notify=True) -> T:
    # pylint:disable=too-many-positional-arguments
    ...

[docs] def update_from_external_object(containedObject: T|MutableSequence, externalObject: MutableSequence|MutableMapping, context=None, require_updater=False, notify=True) -> T|MutableSequence: # pylint:disable=line-too-long # pylint:disable=too-many-positional-arguments """ update_from_external_object(containedObject, externalObject, context=None, require_updater=False, notify=True) Central method for updating objects from external values. :param containedObject: The object to update. :param externalObject: The object (typically a mapping or sequence) to update the object from. Usually this is obtained by parsing an external format like JSON. :param context: An object passed to the update methods. :param require_updater: If True (not the default) an exception will be raised if no implementation of :class:`~nti.externalization.interfaces.IInternalObjectUpdater` can be found for the *containedObject.* :keyword bool notify: If ``True`` (the default), then if the updater for the *containedObject* either has no preference (returns None) or indicates that the object has changed, then an :class:`~nti.externalization.interfaces.IObjectModifiedFromExternalEvent` will be fired. This may be a recursive process so a top-level call to this object may spawn multiple events. The events that are fired will have a ``descriptions`` list containing one or more :class:`~zope.lifecycleevent.interfaces.IAttributes` each with ``attributes`` for each attribute we modify (assuming that the keys in the ``externalObject`` map one-to-one to an attribute; if this is the case and we can also find an interface declaring the attribute, then the ``IAttributes`` will have the right value for ``interface`` as well). :return: *containedObject* after updates from *externalObject* Notifies `~.IObjectModifiedFromExternalEvent` for each object that is modified, and `~.IObjectWillUpdateFromExternalEvent` before doing so. .. seealso:: `~.INamedExternalizedObjectFactoryFinder` .. versionchanged:: 1.0.0a2 Remove the ``object_hook`` parameter. .. versionchanged:: 1.1.3 Correctly fire `~.IObjectWillUpdateFromExternalEvent` before updating each object. .. versionchanged:: 3.1.0 Remove the long-deprecated 'pre_hook' parameter. Remove the long-deprecated 'registry' parameter. """ kwargs = _RecallArgs() kwargs.context = context kwargs.require_updater = require_updater kwargs.notify = notify kwargs.root = containedObject return _update_from_external_object(containedObject, externalObject, kwargs)
def _invoke_factory(factory, value): # TODO: Add wrappers when we create the factories in ZCML # so we can always pass the argument? if getattr(factory, '__external_factory_wants_arg__', False): return factory(value) return factory() def _update_sequence( externalObject: MutableSequence, args: _RecallArgs, destination_name=None, find_factory_for_named_value=_default_factory_finder.find_factory_for_named_value ) -> MutableSequence: for index, value in enumerate(externalObject): factory = find_factory_for_named_value(destination_name, value) if factory is not None: new_obj = _invoke_factory(factory, value) value = _update_from_external_object(new_obj, value, args) externalObject[index] = value return externalObject def _invoke_updater(containedObject, externalObject, updater, external_keys, args): # *externalObject* should have all of its values already updated # at this point. # Let the updater resolve externals resolve_externals(updater, containedObject, externalObject, context=args.context) updated = None # The signature may vary. arg_kind = _get_update_signature(updater) if arg_kind is _UPDATE_ARGS_TWO: updated = updater.updateFromExternalObject(externalObject, args.context) elif arg_kind is _UPDATE_ARGS_ONE: updated = updater.updateFromExternalObject(externalObject) else: updated = updater.updateFromExternalObject(externalObject, context=args.context) # Broadcast a modified event if the object seems to have changed. if args.notify and (updated is None or updated): _notifyModified(containedObject, externalObject, updater, external_keys, _EMPTY_DICT) def _find_INamedExternalizedObjectFactoryFinder(containedObject): updater = INamedExternalizedObjectFactoryFinder(containedObject, None) if updater is None: # Ok, check to see if an instance of the old root interface # InternalObjectIO is there and also provides INamedExternalizedObjectFactoryFinder; # if so, there's a bad ZCML registration. updater = IInternalObjectIO(containedObject, None) # pylint:disable=redefined-variable-type if INamedExternalizedObjectFactoryFinder.providedBy(updater): # pylint:disable=no-value-for-parameter warnings.warn( "The adapter %r was registered as IInternalObjectIO when it should be " "IInternalObjectIOFinder; a provides= ZCML directive is probably outdated. " "If the object extends InterfacObjectIO, no provides= is usually necessary." % (updater,), UserWarning ) else: updater = None if updater is None: updater = _default_factory_finder return updater def _update_from_external_object(containedObject: T, externalObject: MutableSequence|MutableMapping, args: _RecallArgs) -> T|MutableSequence: # Parse any contained objects # TODO: We're (deliberately?) not actually updating any contained # objects, we're replacing them. Is that right? We could check OIDs... # If we decide that's right, then the internals could be simplified by # splitting the two parts # TODO: Should the current user impact on this process? if IPersistent_providedBy(containedObject): # pylint:disable=no-value-for-parameter # pylint:disable=protected-access containedObject._v_updated_from_external_source = externalObject # type:ignore[attr-defined] # Sequences do not represent python types, they represent collections of # python types. Note that we don't touch the containedObject in this branch! if isinstance(externalObject, MutableSequence): return _update_sequence(externalObject, args) assert isinstance(externalObject, MutableMapping), externalObject factory_finder = _find_INamedExternalizedObjectFactoryFinder(containedObject) find_factory_for_named_value = factory_finder.find_factory_for_named_value # We have to save the list of keys, it's common that they get popped during the update # process, and then we have no descriptions to send external_keys = [] for k, v in externalObject.items(): external_keys.append(k) if isinstance(v, PRIMITIVES): continue if isinstance(v, MutableSequence): # Update the sequence in-place _update_sequence(v, args, k, find_factory_for_named_value) else: factory = find_factory_for_named_value(k, v) # pylint:disable=assignment-from-no-return, too-many-function-args if factory is not None: new_obj = _invoke_factory(factory, v) externalObject[k] = _update_from_external_object(new_obj, v, args) if _obj_has_usable_updateFromExternalObject(containedObject): # legacy support. The __ext_ignore_updateFromExternalObject__ # allows a transition to an adapter without changing # existing callers and without triggering infinite recursion updater = containedObject else: # It's possible for INamedExternalizedObjectFactoryFinder and # IInternalObjectUpdater to be registered at two different levels # of specificity, so we need to look up IInternalObjectUpdater, # not test if it's provided by what we already have. if args.require_updater and not isinstance(containedObject, dict): # pylint:disable=else-if-used updater = IInternalObjectUpdater(containedObject) else: updater = IInternalObjectUpdater(containedObject, None) if updater is not None: notify_event(ObjectWillUpdateFromExternalEvent(containedObject, externalObject, args.root)) _invoke_updater(containedObject, externalObject, updater, external_keys, args) return containedObject from nti.externalization._compat import import_c_accel import_c_accel(globals(), 'nti.externalization.internalization._updater')