Source code for nti.webhooks.subscriptions

# -*- coding: utf-8 -*-
"""
Subscription implementations.

"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import time

from zope import component
from zope.event import notify
from zope.interface import Interface
from zope.interface import implementer
from zope.interface import providedBy
from zope.interface.interfaces import IRegistered
from zope.interface.interfaces import IUnregistered

from zope.component.globalregistry import BaseGlobalComponents
from zope.component.persistentregistry import PersistentComponents

from zope.authentication.interfaces import IAuthentication
from zope.authentication.interfaces import IUnauthenticatedPrincipal
from zope.authentication.interfaces import PrincipalLookupError
from zope.annotation import IAttributeAnnotatable

from zope.lifecycleevent import IObjectRemovedEvent

from zope.security.interfaces import IPermission
from zope.security.management import newInteraction
from zope.security.management import queryInteraction
from zope.security.management import endInteraction
from zope.security.management import checkPermission
from zope.security.testing import Participation

from zope.container.interfaces import INameChooser
from zope.container.btree import BTreeContainer
from zope.container.sample import SampleContainer
from zope.container.constraints import checkObject
from zope.cachedescriptors.property import CachedProperty

from nti.zodb.containers import time_to_64bit_int
from nti.zodb.minmax import NumericPropertyDefaultingToZero
from nti.zodb.minmax import NumericMinimum

from nti.schema.fieldproperty import createDirectFieldProperties
from nti.schema.schema import SchemaConfigured

from nti.webhooks import MessageFactory as _

from nti.webhooks.interfaces import IWebhookDialect
from nti.webhooks.interfaces import IWebhookSubscription
from nti.webhooks.interfaces import ILimitedAttemptWebhookSubscription
from nti.webhooks.interfaces import ILimitedApplicabilityPreconditionFailureWebhookSubscription
from nti.webhooks.interfaces import IWebhookSubscriptionManager
from nti.webhooks.interfaces import IWebhookDestinationValidator
from nti.webhooks.interfaces import IWebhookDeliveryAttemptResolvedEvent
from nti.webhooks.interfaces import IWebhookDeliveryAttemptFailedEvent
from nti.webhooks.interfaces import IWebhookSubscriptionApplicabilityPreconditionFailureLimitReached
from nti.webhooks.interfaces import WebhookSubscriptionApplicabilityPreconditionFailureLimitReached

from nti.webhooks.attempts import WebhookDeliveryAttempt
from nti.webhooks.attempts import PersistentWebhookDeliveryAttempt

from nti.webhooks._util import DCTimesMixin
from nti.webhooks._util import PersistentDCTimesMixin
from nti.webhooks._util import describe_class_or_specification

from persistent import Persistent

logger = __import__('logging').getLogger(__name__)

class _CheckObjectOnSetBTreeContainer(BTreeContainer):
    """
    Extending this makes you persistent.
    """
    # XXX: Taken from nti.containers. Should publish that package.

    def _setitemf(self, key, value):
        checkObject(self, key, value)
        super(_CheckObjectOnSetBTreeContainer, self)._setitemf(key, value)

class _CheckObjectOnSetSampleContainer(SampleContainer):
    """
    Non-persistent.
    """
    def __setitem__(self, key, value):
        checkObject(self, key, value)
        super(_CheckObjectOnSetSampleContainer, self).__setitem__(key, value)

    def _newContainerData(self):
        # We return a BTree so that iteration order is guaranteed.
        from BTrees import family64
        return family64.OO.BTree()


[docs]class IApplicableSubscriptionFactory(Interface): # pylint:disable=inherit-non-class """ A private contract between the Subscription and its SubscriptionManager. This is only called on subscriptions that are already determined to be *active*; if the subscription is also *applicable*, then it should be returned. Otherwise, it should return None. This is called when we intend to attempt delivery, so it's a good time to take cleanup action if the subscription isn't applicable for reasons that aren't directly related to the *data* and the *event*, for example, if the principal cannot be found. """ def __call__(data, event): # pylint:disable=no-self-argument,signature-differs """ See class documentation. """
[docs]@implementer(ILimitedAttemptWebhookSubscription, ILimitedApplicabilityPreconditionFailureWebhookSubscription, IAttributeAnnotatable, IApplicableSubscriptionFactory) class AbstractSubscription(SchemaConfigured): """ Subclasses need to extend a ``Container`` implementation. """ for_ = permission_id = owner_id = dialect_id = when = None to = u'' active = None createDirectFieldProperties(IWebhookSubscription) createDirectFieldProperties(ILimitedAttemptWebhookSubscription) __parent__ = None attempt_limit = 50 applicable_precondition_failure_limit = 50 fallback_to_unauthenticated_principal = True def __init__(self, **kwargs): self.createdTime = self.lastModified = time.time() SchemaConfigured.__init__(self, **kwargs) def keys(self): raise NotImplementedError def _setActive(self, active): # The public field is readonly; that only kicks in # when there is a value in the __dict__ already. self.__dict__.pop('active', None) self.active = active if active: # Back to the default message self.__dict__.pop('status_message', None) # Reset to 0 del self._delivery_applicable_precondition_failed # TODO: If we need to, this would be a good place to notify specific # events about becoming in/active. The ``I[Un]Registered`` event we use to # call *this* function can be used, but isn't obvious (and the order may be # difficult to define).
[docs] def pop(self): """Testing only. Removes and returns a random value.""" k = list(self.keys())[0] v = self[k] del self[k] return v
[docs] def clear(self): """Testing only. Removes all delivery attempts.""" for k in list(self.keys()): del self[k]
def _find_principal(self, data): principal = None for context in (data, None): auth = component.queryUtility(IAuthentication, context=context) if auth is None: continue try: principal = auth.getPrincipal(self.owner_id) except PrincipalLookupError: # If no principal by that name exists, use the # unauthenticatedPrincipal. This could return None. It # will still be replaced by the named principal in the # other IAuthentication, if need be. # # XXX: Using the unauthenticated principal when no # principal can be found is elegant. But it makes it # harder to deactivate broken subscriptions (we'd have # to spread this knowledge around a few functions). # That's why we allow disabling it, and why persistent # subscriptions disable it by default. Maybe there's # something better? Maybe spreading that knowledge is # the best we can do. if self.fallback_to_unauthenticated_principal: principal = auth.unauthenticatedPrincipal() else: assert principal is not None break if principal is None and self.fallback_to_unauthenticated_principal: # Hmm. Either no IAuthentication found, or none of them found a # principal while also not having an unauthenticated principal. # In that case, we will fall back to the global IUnauthenticatedPrincipal as # defined by zope.principalregistry. This should typically not happen. principal = component.getUtility(IUnauthenticatedPrincipal) return principal def _find_permission(self, data): if self.permission_id is None: return None for context in (data, None): perm = component.queryUtility(IPermission, self.permission_id, context=context) if perm is not None: break return perm def isApplicable(self, data): if hasattr(self.for_, 'providedBy'): if not self.for_.providedBy(data): return False else: if not isinstance(data, self.for_): # pylint:disable=isinstance-second-argument-not-valid-type return False # No need for the distinction it makes here. return bool(self.__checkSecurity(data)) def __checkSecurity(self, data): """ Returns a boolean indicating whether *data* passes the security checks defined for this subscription. If we are not able to make the security check because the principal or permission we are supposed to use is not defined, returns the special (false) value `None`. This can be used to distinguish the case where access is denied by the security policy from the case where requested principals are missing. """ if not self.permission_id and not self.owner_id: # If no security is requested, we're good. return True # OK, now we need to find the permission and the principal. # Both should be found in the context of the data; if not # there, then check the currently installed site. principal = self._find_principal(data) permission = self._find_permission(data) if principal is None or permission is None: # A missing permission causes zope.security to grant full access. # It's treated the same as zope.Public. So don't let that happen. return None # Now, we need to set up the interaction and do the security check. participation = Participation(principal) current_interaction = queryInteraction() if current_interaction is not None: # Cool, we can add our participation to the interaction. current_interaction.add(participation) else: newInteraction(participation) try: # Yes, this needs the ID of the permission, not the permission object. return checkPermission(self.permission_id, data) finally: if current_interaction is not None: current_interaction.remove(participation) else: endInteraction() # We only ever use the ``increment()`` method of this, *or* we # delete it (which works even if there's nothing in our ``__dict__``) # when we are making other changes, so subclasses do not have to be # ``PersistentPropertyHolder`` objects. (But they are.) _delivery_applicable_precondition_failed = NumericPropertyDefaultingToZero( '_delivery_applicable_precondition_failed', # We have to use NumericMinimum instead of NumericMaximum or # MergingCounter because we periodically reset to 0. And MergingCounter # has a bug when that happens. (https://github.com/NextThought/nti.zodb/issues/6) NumericMinimum, ) def __call__(self, data, event): # We're assumed applicable for the data and event, no need to double # check that. assert self.active security_check = self.__checkSecurity(data) if security_check: # Yay, access granted! # TODO: Should we decrement the failure count here # (keeping a floor of 0)? return self # Boo, no access :( if security_check is None: # Failed to find the principal/permission. Something is wrong. failures = self._delivery_applicable_precondition_failed try: incr = failures.increment except AttributeError: # See https://github.com/NextThought/nti.zodb/issues/7 failures.value += 1 else: failures = incr() # XXX: JAM: Why did I think checking it here was the best thing, instead # of just sending the event every time a failure occurs? Was I trying to # cut down on the chance of misusing the failure property? Trying to cut down # on the number of events generated? Trying to reduce conflicts? if failures.value >= self.applicable_precondition_failure_limit: notify(WebhookSubscriptionApplicabilityPreconditionFailureLimitReached( self, failures)) return None @CachedProperty('dialect_id') def dialect(self): # Find the dialect with the given name, using our location # as the context to find the enclosing site manager. return component.getUtility(IWebhookDialect, self.dialect_id or u'', self) def _new_deliveryAttempt(self): return WebhookDeliveryAttempt() def createDeliveryAttempt(self, payload_data): attempt = self._new_deliveryAttempt() attempt.payload_data = payload_data # Store the attempt (make it contained by this object) before we # conceivably change its status. Changing the status # can fire events, and we need to know the parent for those events to # work properly. # Choose names that are easily sortable, since that's # our iteration order. now = str(time_to_64bit_int(time.time())) name = INameChooser(self).chooseName(now, attempt) # pylint:disable=too-many-function-args,assignment-from-no-return self[name] = attempt # Verify the destination. Fail early # if it doesn't pass. validator = component.getUtility(IWebhookDestinationValidator, u'', self) try: validator.validateTarget(self.to) except Exception: # pylint:disable=broad-except # The exception value can vary; it's not intended to be presented to end # users as-is attempt.message = ( u'Verification of the destination URL failed. Please check the domain.' ) attempt.internal_info.storeExceptionInfo(sys.exc_info()) attempt.status = 'failed' # This could cause pruning return attempt def __repr__(self): return "<%s.%s at 0x%x to=%r for=%s when=%s>" % ( self.__class__.__module__, self.__class__.__name__, id(self), self.to, describe_class_or_specification(self.for_), describe_class_or_specification(self.when), )
[docs]class Subscription(_CheckObjectOnSetSampleContainer, AbstractSubscription, DCTimesMixin): def __init__(self, **kwargs): AbstractSubscription.__init__(self, **kwargs) _CheckObjectOnSetSampleContainer.__init__(self)
[docs]class PersistentSubscription(_CheckObjectOnSetBTreeContainer, AbstractSubscription, PersistentDCTimesMixin): """ Persistent implementation of `IWebhookSubscription` """ fallback_to_unauthenticated_principal = False def __init__(self, **kwargs): AbstractSubscription.__init__(self, **kwargs) _CheckObjectOnSetBTreeContainer.__init__(self) def _new_deliveryAttempt(self): return PersistentWebhookDeliveryAttempt() __repr__ = Persistent.__repr__ _p_repr = AbstractSubscription.__repr__
def _subscription_full(subscription, strict): return ILimitedAttemptWebhookSubscription.providedBy(subscription) \ and len(subscription) > (subscription.attempt_limit - (1 if strict else 0)) @component.adapter(IWebhookDeliveryAttemptResolvedEvent) def prune_subscription_when_resolved(event): # type: (IWebhookDeliveryAttemptResolvedEvent) -> None attempt = event.object # type: WebhookDeliveryAttempt subscription = attempt.__parent__ if not _subscription_full(subscription, False): return # Copy to avoid concurrent modification. On PyPy, we've seen this # produce ``IndexError: list index out of range`` in some tests. # This can be reproduced in CPython using PURE_PYTHON mode. count = 0 for key, stored_attempt in list(subscription.items()): if stored_attempt.resolved(): del subscription[key] count += 1 if len(subscription) <= subscription.attempt_limit: break logger.debug( "Pruned %d old delivery attempts from subscription %s", count, subscription ) @component.adapter(IWebhookDeliveryAttemptFailedEvent) def deactivate_subscription_when_all_failed(event): # type: (IWebhookDeliveryAttemptFailedEvent) -> None attempt = event.object # type: WebhookDeliveryAttempt subscription = attempt.__parent__ if not _subscription_full(subscription, True): return # This is a very simple-minded approach. Something more featured # might involve a ratio of failed attempts? Over some sort of sliding window? # Or examining the time period? # This has to activate all the sub-objects, which could be expensive. # We could make the subscription use BTree Length objects to track # the various states. if all(attempt.failed() for attempt in subscription.values()): logger.info( "Deactivating webhook subscription %s due to too many delivery failures.", subscription, ) manager = subscription.__parent__ # type:PersistentWebhookSubscriptionManager manager.deactivateSubscription(subscription) subscription.status_message = _(u'Delivery suspended due to too many delivery failures.') @component.adapter(ILimitedApplicabilityPreconditionFailureWebhookSubscription, IWebhookSubscriptionApplicabilityPreconditionFailureLimitReached) def deactivate_subscription_when_applicable_limit_exceeded(subscription, event): # See comments in __call__. This is only sent when the limit is actually # exceeded. logger.info( "Deactivating webhook subscription %s due to too many precondition failures.", subscription, ) manager = subscription.__parent__ manager.deactivateSubscription(subscription) subscription.status_message = _(u'Delivery suspended due to too many precondition failures.') class AbstractWebhookSubscriptionManager(object): def __init__(self): super(AbstractWebhookSubscriptionManager, self).__init__() self.registry = self._make_registry() self.createdTime = self.lastModified = time.time() def _make_registry(self): raise NotImplementedError def _new_Subscription(self, **kwargs): raise NotImplementedError def createSubscription(self, to=None, for_=None, when=None, owner_id=None, permission_id=None, dialect_id=None): subscription = self._new_Subscription(to=to, for_=for_, when=when, owner_id=owner_id, permission_id=permission_id, dialect_id=dialect_id) name_chooser = INameChooser(self) name = name_chooser.chooseName('', subscription) # pylint:disable=too-many-function-args,assignment-from-no-return self[name] = subscription self.activateSubscription(subscription) return subscription def activateSubscription(self, subscription): if subscription.__parent__ is not self: raise AssertionError # active subscriptions are managed as 'subscription adapters'. # This lets us use the ``subscribers()`` API to treat them as # callable factories that return None if they are not # applicable. (Compare with 'handlers', which, while callable, # aren't treated as factories and have no return value). The # major difference is that we need to provide a *provided* argument so that # we can replicate it when we call ``subscribers``. self.registry.registerSubscriptionAdapter(subscription, (subscription.for_, subscription.when), IWebhookSubscription) return True def deactivateSubscription(self, subscription): if subscription.__parent__ is not self: raise AssertionError return self.registry.unregisterSubscriptionAdapter(subscription, (subscription.for_, subscription.when), IWebhookSubscription) def activeSubscriptions(self, data, event): # pylint:disable=no-member return self.registry.adapters.subscriptions((providedBy(data), providedBy(event)), IWebhookSubscription) def subscriptionsToDeliver(self, data, event): return self.registry.subscribers((data, event), IWebhookSubscription) def deleteSubscriptionsForPrincipal(self, principal_id): # We don't think this will be a performance bottleneck, subscription # counts should be small. # pylint:disable=no-member owned = [(k, v) for k, v in self.items() if v.owner_id == principal_id] for k, _ in owned: del self[k] return owned @component.adapter(IWebhookSubscription, IRegistered) def sync_active_status_registered(subscription, _event): # type: (Subscription, Any) -> None subscription._setActive(True) # pylint:disable=protected-access @component.adapter(IWebhookSubscription, IUnregistered) def sync_active_status_unregistered(subscription, _event): subscription._setActive(False) # pylint:disable=protected-access @component.adapter(IWebhookSubscription, IObjectRemovedEvent) def deactivate_subscription_when_removed(subscription, event): event.oldParent.deactivateSubscription(subscription)
[docs]class GlobalSubscriptionComponents(BaseGlobalComponents): """ Exists to be pickled by name. """
global_subscription_registry = GlobalSubscriptionComponents('global_subscription_registry')
[docs]@implementer(IWebhookSubscriptionManager) class GlobalWebhookSubscriptionManager(AbstractWebhookSubscriptionManager, _CheckObjectOnSetSampleContainer, DCTimesMixin): def __init__(self, name): super(GlobalWebhookSubscriptionManager, self).__init__() self.__name__ = name def _make_registry(self): return global_subscription_registry def _new_Subscription(self, **kwargs): return Subscription(**kwargs) def __reduce__(self): # The global manager is pickled as a global object. return self.__name__
[docs]@implementer(IWebhookSubscriptionManager) class PersistentWebhookSubscriptionManager(AbstractWebhookSubscriptionManager, PersistentDCTimesMixin, _CheckObjectOnSetBTreeContainer): def __init__(self): super(PersistentWebhookSubscriptionManager, self).__init__() self.registry = self._make_registry() def _make_registry(self): return PersistentComponents() def _new_Subscription(self, **kwargs): return PersistentSubscription(**kwargs)
# The name string must match the variable name to pickle correctly global_subscription_manager = GlobalWebhookSubscriptionManager('global_subscription_manager') def getGlobalSubscriptionManager(): return global_subscription_manager def resetGlobals(): global_subscription_manager.__dict__.clear() global_subscription_manager.__init__('global_subscription_manager') global_subscription_registry.__init__('global_subscription_registry') try: from zope.testing.cleanup import addCleanUp # pylint:disable=ungrouped-imports except ImportError: # pragma: no cover pass else: addCleanUp(resetGlobals) del addCleanUp