Source code for nti.externalization.oids

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Functions for finding and parsing OIDs.
"""

# stdlib imports
import binascii
import collections
from collections.abc import Sequence

try:
    from ZODB.interfaces import IConnection
except ModuleNotFoundError:
    def IConnection(_):
        raise TypeError
from zope import component

try:
    from zope.intid.interfaces import IIntIds
except ModuleNotFoundError:
    from zope.interface import Interface
    # pylint: disable-next=inherit-non-class
    class IIntIds(Interface): # type:ignore[no-redef]
        """Mock"""

from nti.externalization._compat import bytes_
from nti.externalization.integer_strings import from_external_string
from nti.externalization.integer_strings import to_external_string
from nti.externalization.proxy import removeAllProxies

__all__ = [
    'to_external_oid',
    'from_external_oid',
    'ParsedOID',
]


[docs] def to_external_oid(self, default=None, add_to_connection=False, add_to_intids=False, use_cache=True): # Override the signature to *not* document use_cache. """ to_external_oid(self, default=None, add_to_connection=False, add_to_intids=False) -> bytes For a `persistent object <persistent.Persistent>`, returns its `persistent OID <persistent.interfaces.IPersistent._p_oid>` in a parseable external format (see :func:`.from_external_oid`). This format includes the database name (so it works in a ZODB multi-database) and the integer ID from the closest :class:`zope.intid.interfaces.IIntIds` utility. If the object implements a method ``toExternalOID()``, that method will be called and its result (or the *default*) will be returned. This should generally be considered legacy behaviour. If the object has not been saved, and *add_to_connection* is `False` (the default) returns the *default*. :param bool add_to_connection: If the object is persistent but not yet added to a connection, setting this to true will attempt to add it to the nearest connection in its containment tree, thus letting it have an OID. :param bool add_to_intids: If we can obtain an OID for this object, but it does not have an intid, and an intid utility is available, then if this is `True` (not the default) we will register it with the utility. :return: A :class:`bytes` string. """ # pylint:disable=too-many-positional-arguments,too-complex # TODO: Simplify # pylint:disable=too-many-branches # _p_oid: pylint:disable=protected-access try: return self.toExternalOID() or default except AttributeError: pass if use_cache: # XXX: And yet we still set it always. try: # See comments in to_external_ntiid_oid return getattr(self, '_v_to_external_oid') except AttributeError: pass # because if it was proxied, we should still read the right thing above; # this saves time self = removeAllProxies(self) try: oid = self._p_oid except AttributeError: return default jar = None if not oid: if add_to_connection: try: jar = IConnection(self) except TypeError: return default jar.add(self) # pylint:disable=too-many-function-args oid = self._p_oid else: return default # The object ID is defined to be 8 charecters long. It gets # padded with null chars to get to that length; we strip # those out. Finally, it probably has chars that # aren't legal in UTF or ASCII, so we go to hex and prepend # a flag, '0x' # TODO: Why are we keeping this as a bytes string, not unicode? oid = oid.lstrip(b'\x00') oid = b'0x' + binascii.hexlify(oid) try: jar = jar or self._p_jar except AttributeError: pass if jar: db_name:str = jar.db().database_name oid = oid + b':' + binascii.hexlify(bytes_(db_name)) intutility = component.queryUtility(IIntIds) if intutility is not None: intid = intutility.queryId(self) if intid is None and add_to_intids: intid = intutility.register(self) if intid is not None: if not jar: oid += b':' # Ensure intid is always the third part oid = oid + b':' + bytes_(to_external_string(intid)) try: setattr(self, str('_v_to_external_oid'), oid) except (AttributeError, TypeError): # pragma: no cover pass return oid
toExternalOID = to_external_oid #: The fields of a parsed OID: ``oid``, ``db_name`` and ``intid`` ParsedOID = collections.namedtuple('ParsedOID', ['oid', 'db_name', 'intid'])
[docs] def from_external_oid(ext_oid): """ Given a byte string, as produced by :func:`.to_external_oid`, parses it into its component parts. :param bytes ext_oid: As produced by :func:`to_external_oid`. (Text/unicode is also accepted.) :return: A three-tuple, :class:`ParsedOID`. Only the OID is guaranteed to be present; the other fields may be empty (``db_name``) or `None` (``intid``). """ # But, for legacy reasons, we accept directly the bytes given # in _p_oid, so we have to be careful with our literals here # to avoid Unicode[en|de]codeError __traceback_info__ = ext_oid # Sometimes raw _p_oid values do contain a b':', so simply splitting # on that is not reliable, so try to detect raw _p_oid directly if (isinstance(ext_oid, bytes) and len(ext_oid) == 8 and not ext_oid.startswith(b'0x') and ext_oid.count(b':') != 2): # The last conditions might be overkill, but toExternalOID is actually # returning bytes, and it could conceivably be exactly 8 chars long; # however, a raw oid could also start with the two chars 0x and contain two colons # so the format is a bit ambiguous... return ParsedOID(ext_oid, '', None) ext_oid = ext_oid.encode("ascii") if not isinstance(ext_oid, bytes) else ext_oid parts:Sequence[bytes] = ext_oid.split(b':') if b':' in ext_oid else (ext_oid,) oid_string = parts[0] name_s = parts[1] if len(parts) > 1 else b"" intid_s = parts[2] if len(parts) > 2 else None # Translate the external format if needed if oid_string.startswith(b'0x'): oid_string = binascii.unhexlify(oid_string[2:]) name_s = binascii.unhexlify(name_s) # Recall that oids are padded to 8 with \x00 oid_string = oid_string.rjust(8, b'\x00') __traceback_info__ = ext_oid, oid_string, name_s, intid_s if intid_s is not None: intid = from_external_string(intid_s) else: intid = None return ParsedOID(oid_string, name_s, intid)
fromExternalOID = from_external_oid