# Source code for nti.externalization.internalization.fields

# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
# -*- coding: utf-8 -*-
"""
Functions for validating and setting individual fields
(attributes) of an object.

"""

from collections.abc import Callable

# pylint:disable=protected-access
# stdlib imports
from sys import exc_info as get_exc_info

from zope.event import notify
from zope.interface import implementedBy
from zope.schema.fieldproperty import NO_VALUE
from zope.schema.fieldproperty import FieldProperty
from zope.schema.fieldproperty import FieldUpdatedEvent
from zope.schema.interfaces import IField
from zope.schema.interfaces import SchemaNotCorrectlyImplemented
from zope.schema.interfaces import SchemaNotProvided
from zope.schema.interfaces import ValidationError
from zope.schema.interfaces import WrongContainedType
from zope.schema.interfaces import WrongType

IField_providedBy = IField.providedBy

__all__ = [
    'validate_field_value',
    'validate_named_field_value',
]

def noop():
    """
    Do nothing and return ``None``.

    Serves as the zero-argument "setter" handed back by
    :func:`validate_field_value` when there is no value to assign.
    """
    return None

class SetattrSet(object):
    """
    Deferred attribute assignment.

    Instances remember a target object, an attribute name and a value;
    calling the instance (with no arguments) performs
    ``setattr(target, name, value)``.
    """

    # This is a real class rather than a lambda because Cython had
    # trouble compiling the anonymous closures that were used before.

    __slots__ = (
        'ext_self',
        'field_name',
        'value',
    )

    def __init__(self, ext_self, field_name, value):
        # ext_self: the object that will receive the attribute.
        # field_name: the name of the attribute to assign.
        # value: the value to store under that name.
        self.ext_self = ext_self
        self.field_name = field_name
        self.value = value

    def __call__(self):
        """Assign the remembered value to the remembered attribute."""
        target, name, new_value = self.ext_self, self.field_name, self.value
        setattr(target, name, new_value)

class FieldSet(object):
    """
    Deferred assignment through a schema field.

    Instances remember a target object, a field and a value; calling
    the instance (with no arguments) performs
    ``field.set(target, value)``.
    """

    # A class instead of a closure for the same reason as SetattrSet.

    __slots__ = (
        'ext_self',
        'field',
        'value'
    )

    def __init__(self, ext_self, field, value):
        # Callers must hand us a field that is already bound to
        # ``ext_self`` and a value that has already been validated.
        # We store the field itself rather than ``field.set`` because
        # there's a small chance this object is never called.
        self.ext_self = ext_self
        self.field = field
        self.value = value

    def __call__(self):
        """
        Set the value via ``field.set``, flagging the target object
        for the duration of the call so that the patched
        ``FieldProperty.__set__`` can skip re-validating it.
        """
        # Validating twice can be quite expensive in benchmarks, hence
        # the marker attribute read by the FieldProperty monkey-patch.
        #
        # The target is either freshly created or otherwise local to
        # this thread, so no race conditions are expected from the
        # temporary attribute. Objects with restrictive __slots__ and
        # no __dict__ are not generally expected here.
        target = self.ext_self
        bound_field = self.field
        target._v_bound_field_already_valid = bound_field
        try:
            bound_field.set(target, self.value)
        finally:
            # Always drop the marker, even if the set failed.
            del target._v_bound_field_already_valid


# The ``FieldProperty.__set__`` in effect when this module executes.
# ``_FieldProperty__set__valid`` delegates to it whenever the instance
# has not been flagged as holding an already-validated value.
_FieldProperty_orig_set = FieldProperty.__set__

def _FieldProperty__set__valid(self, inst, value):
    """
    Replacement for ``FieldProperty.__set__`` that can skip validation.

    If *inst* carries a non-None ``_v_bound_field_already_valid``
    attribute (set temporarily by :class:`FieldSet`), the value is
    stored directly in the instance dictionary and a
    ``FieldUpdatedEvent`` is sent using that attribute's value as the
    field. Otherwise the call is passed to the original ``__set__``.
    """
    already_valid_field = getattr(inst, '_v_bound_field_already_valid', None)
    if already_valid_field is None:
        # Not flagged: take the normal, validating path.
        _FieldProperty_orig_set(self, inst, value)
        return

    # Flagged: skip the validation, but store the value and notify.
    previous = self.queryValue(inst, NO_VALUE)
    inst.__dict__[self._FieldProperty__name] = value
    notify(FieldUpdatedEvent(inst, already_valid_field, previous, value))

# Keep the captured original reachable from the replacement function so
# that it can be recovered later (see the check just below).
_FieldProperty__set__valid.orig_func = _FieldProperty_orig_set # type:ignore[attr-defined]

# Detect the case that we're in Cython compiled code, where
# we've already replaced the __set__ function with our own.
# When the installed ``__set__`` already carries our replacement's name,
# the value captured above is itself a replacement, so the real original
# is taken from that function's ``orig_func`` attribute; otherwise our
# replacement would delegate to another replacement instead of to the
# original implementation.
if FieldProperty.__set__.__name__ == _FieldProperty__set__valid.__name__: # pragma: no cover
    _FieldProperty_orig_set = FieldProperty.__set__.orig_func # pylint:disable=no-member
    _FieldProperty__set__valid.orig_func = _FieldProperty_orig_set # type:ignore[attr-defined]

# Install the validation-skipping setter on FieldProperty itself; this is
# a module-level side effect that affects every FieldProperty.
FieldProperty.__set__ = _FieldProperty__set__valid



class CannotConvertSequenceError(TypeError):
    """
    Raised when a sequence has to be converted but no way of
    converting it is known.

    This is a :class:`TypeError` subclass, so handlers that catch
    ``TypeError`` also catch it.
    """

def _adapt_sequence(field, value) -> list:
    # IObject provides `schema`, which is an interface, so we can adapt
    # using it. Some other things do not, for example nti.schema.field.Variant
    # They might provide a `fromObject` function to do the conversion
    # The field may be able to handle the whole thing by itself or we may need
    # to do the individual objects. If there was a `fromObject` for the field,
    # we called it already.

    # The conversion process may raise TypeError
    value_type = field.value_type
    if hasattr(value_type, 'fromObject'):
        converter = value_type.fromObject
    elif hasattr(value_type, 'schema'):
        converter = value_type.schema
    else:
        raise CannotConvertSequenceError(
            "Don't know how to convert sequence %r for field %s"
            % (value, field))

    value = [converter(v) for v in value]

    return value


def _all_SchemaNotProvided(sequence):
    return all(isinstance(ex, SchemaNotProvided) for ex in sequence)

###
# Fixup functions for various validation errors.
# Because these are called as separate functions *after* the
# exception is caught, the fact that they each take a reference to the
# exception's traceback does not introduce cycles. (Also it helps
# that these are compiled with Cython, which doesn't use frame objects
# in the traceback.) So we don't bother with the usual try/finally: del
###

def _handle_SchemaNotProvided(field_name, field, value): # pylint:disable=unused-argument
    # The object doesn't implement the required interface.
    # Can we adapt the provided object to the desired interface?
    # First, capture the details so we can reraise if needed
    exc_type, exc_val, _exc_tb = get_exc_info()

    try:
        value = field.schema(value)
        field.validate(value)
    except (LookupError, TypeError, ValidationError, AttributeError):
        # Nope. TypeError (or AttrError - Variant) means we couldn't adapt,
        # and a validation error means we could adapt, but it still wasn't
        # right. Raise the original SchemaValidationError.
        raise exc_val if exc_val is not None else exc_type() from None # type:ignore[misc]
    return value

def _handle_WrongType(field_name, field, value): # pylint:disable=unused-argument
    """
    Recover from a ``WrongType`` error by adapting *value*.

    This is the counterpart of the ``SchemaNotProvided`` handling for
    primitive types (most commonly a date). It must be called while
    the ``WrongType`` exception is being handled.

    The adaptation target is the single interface implemented by the
    error's ``expected_type``; when there is no expected type, or it
    does not implement exactly one interface, the ``WrongType`` is
    re-raised.

    :return: The adapted value.
    :raises WrongType: The error being handled, if no adaptation is
        possible.
    :raises ValidationError: If the adapter performs its own validation
        and rejects the value; the error's ``field`` is set to *field*
        first.
    """
    wrong_type_error = get_exc_info()[1]
    assert isinstance(wrong_type_error, WrongType)

    expected = wrong_type_error.expected_type
    if not expected: # pragma: no cover
        raise wrong_type_error

    # Only an unambiguous answer (exactly one implemented interface)
    # gives us something to adapt to; the original author cites IDate
    # as an example of this.
    candidates = list(implementedBy(expected))
    if len(candidates) != 1: # pragma: no cover
        raise wrong_type_error

    (target_schema,) = candidates

    try:
        adapted = target_schema(value)
    except (LookupError, TypeError):
        # No adapter is registered; report the original problem.
        raise wrong_type_error from None
    except ValidationError as adapter_error:
        # An adapter exists but it validates on its own and that
        # validation failed. That is more useful to report than
        # WrongType, so let it through after attaching the field.
        adapter_error.field = field
        raise
    else:
        return adapted


def _handle_WrongContainedType(field_name, field, value): # pylint:disable=unused-argument
    """
    Recover from a ``WrongContainedType`` error by adapting each item.

    Must be called while the ``WrongContainedType`` is being handled.
    Recovery is only attempted when the error lists sub-errors and all
    of them are ``SchemaNotProvided``, i.e. when adapting the items may
    plausibly fix the problem.

    :return: The list of converted items, once it passes
        ``field.validate``.
    :raises WrongContainedType: The error being handled, if recovery is
        not applicable, the items cannot be converted (``TypeError``),
        or the converted list still fails validation.

    A validation error raised by the item converter itself (e.g. a
    ``fromObject`` that validates) is deliberately left to propagate.
    """
    contained_error = get_exc_info()[1]
    assert isinstance(contained_error, WrongContainedType)

    sub_errors = contained_error.errors
    if not (sub_errors and _all_SchemaNotProvided(sub_errors)):
        raise contained_error

    # Step 1: convert the items. The field's value type may expose an
    # interface (`schema`) or a `fromObject` function for this; see
    # _adapt_sequence.
    try:
        converted = _adapt_sequence(field, value)
    except TypeError:
        # Couldn't adapt at all: the original error is the one to show.
        raise contained_error from None

    # Step 2: check that the converted items are acceptable now.
    try:
        field.validate(converted)
    except ValidationError:
        # Adapted, but still not right: again report the original error.
        raise contained_error from None
    else:
        return converted

_CONVERTERS = (
    ('fromUnicode', str),
    ('fromBytes', bytes),
    ('fromObject', object)
)

def validate_field_value(self, field_name:str, field, value) -> Callable[[], None]:
    """
    Given a :class:`zope.schema.interfaces.IField` object from a schema
    implemented by `self`, validates that the proposed value can be
    set. If the value needs to be adapted to the schema type for
    validation to work, this method will attempt that.

    :param self: The object the value is destined for; the field is
        bound to it before validating.
    :param str field_name: The name of the field we are setting. This
        implementation currently only uses this for informative purposes.
    :param field: The schema field to use to validate (and set) the value.
    :type field: :class:`zope.schema.interfaces.IField`
    :param value: The proposed value.

    :raises zope.interface.Invalid: If the field cannot be validated,
        along with a good reason (typically better than simply provided
        by the field itself)
    :return: A callable of no arguments to call to actually set the value
        (necessary in case the value had to be adapted).
    """
    field = field.bind(self)
    try:
        # Prefer a converter method on the field (fromUnicode, fromBytes,
        # fromObject) matching the type of the value; when one is found
        # its result replaces the value and no separate validate() call
        # is made.
        for meth_name_kind in _CONVERTERS:
            if isinstance(value, meth_name_kind[1]):
                meth = getattr(field, meth_name_kind[0], None)
                if meth is not None:
                    value = meth(value)
                    break
        else:
            # Here if we do not break out of the loop.
            field.validate(value)
    except SchemaNotProvided:
        # Raised by Object fields
        value = _handle_SchemaNotProvided(field_name, field, value)
    except WrongType:
        value = _handle_WrongType(field_name, field, value)
        # Lets try again with the adapted value
        return validate_field_value(self, field_name, field, value)
    except SchemaNotCorrectlyImplemented:
        # Raised by Object fields. Order matters, for BWC this is a type of
        # WrongContainedType.
        raise
    except WrongContainedType:
        value = _handle_WrongContainedType(field_name, field, value)

    if (field.readonly
            and field.query(self) is None
            and field.queryTaggedValue('_ext_allow_initial_set')):
        if value is not None:
            # First time through we get to set it, but we must bypass
            # the field
            _do_set = SetattrSet(self, field_name, value)
        else:
            _do_set = noop
    else:
        _do_set = FieldSet(self, field, value) # pylint:disable=redefined-variable-type

    return _do_set
def validate_named_field_value(self, iface, field_name:str, value):
    """
    Given a :class:`zope.interface.Interface` and the name of one of its
    attributes, validate that the given ``value`` is appropriate to set.
    See :func:`validate_field_value` for details.

    :param self: The object the value is destined for.
    :param iface: The interface in which ``field_name`` is looked up
        (``iface[field_name]``).
    :param str field_name: The name of a field contained in
        `iface`. May name a regular :class:`zope.interface.Attribute`,
        or a :class:`zope.schema.interfaces.IField`; if the latter,
        extra validation will be possible.
    :param value: The proposed value.

    :return: A callable of no arguments to call to actually set the value.
    """
    field = iface[field_name]
    if IField_providedBy(field): # pylint:disable=no-value-for-parameter
        return validate_field_value(self, field_name, field, value)

    # A plain Attribute has nothing to validate against, so the value
    # will simply be assigned with setattr.
    return SetattrSet(self, field_name, value)
# Done last, after everything above is defined: hand this module's
# globals to import_c_accel together with the dotted name of the
# accelerated module (presumably so compiled implementations can
# replace the pure-Python ones when available - confirm in _compat).
from nti.externalization._compat import import_c_accel
import_c_accel(globals(), 'nti.externalization.internalization._fields')