Package elisa :: Package core :: Package utils :: Module cancellable_defer

Source Code for Module elisa.core.utils.cancellable_defer

  1  # -*- coding: utf-8 -*- 
  2  # This file is a copy of http://twistedmatrix.com/trac/ticket/990 . The licence of this 
  3  # file is that of Twisted, the MIT licence: 
  4  #       http://www.opensource.org/licenses/mit-license.php 
  5  #  
  6  # Copyright (c) 2001-2008 
  7  # Allen Short 
  8  # Andrew Bennetts 
  9  # Apple Computer, Inc. 
 10  # Benjamin Bruheim 
 11  # Bob Ippolito 
 12  # Canonical Limited 
 13  # Christopher Armstrong 
 14  # David Reid 
 15  # Donovan Preston 
 16  # Eric Mangold 
 17  # Itamar Shtull-Trauring 
 18  # James Knight 
 19  # Jason A. Mobarak 
 20  # Jonathan Lange 
 21  # Jonathan D. Simms 
 22  # Jp Calderone 
 23  # Jürgen Hermann 
 24  # Kevin Turner 
 25  # Mary Gardiner 
 26  # Matthew Lefkowitz 
 27  # Massachusetts Institute of Technology 
 28  # Moshe Zadka 
 29  # Paul Swartz 
 30  # Pavel Pergamenshchik 
 31  # Ralph Meijer 
 32  # Sean Riley 
 33  # Travis B. Hartwell 
 34  # Thomas Herve 
 35  # Eyal Lotem 
 36  # Antoine Pitrou 
 37  # Andy Gayton 
 38  #  
 39  # Permission is hereby granted, free of charge, to any person obtaining 
 40  # a copy of this software and associated documentation files (the 
 41  # "Software"), to deal in the Software without restriction, including 
 42  # without limitation the rights to use, copy, modify, merge, publish, 
 43  # distribute, sublicense, and/or sell copies of the Software, and to 
 44  # permit persons to whom the Software is furnished to do so, subject to 
 45  # the following conditions: 
 46  #  
 47  # The above copyright notice and this permission notice shall be 
 48  # included in all copies or substantial portions of the Software. 
 49  #  
 50  # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
 51  # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 
 52  # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 53  # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 
 54  # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 
 55  # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 
 56  # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
 57  #  
 58   
 59  from twisted.internet import defer, task, reactor 
 60  from twisted.python import failure 
 61  from twisted.python.util import mergeFunctionMetadata 
 62   
 63  """ 
 64  Implements the Cancellable Deferred as discussed at 
 65  http://twistedmatrix.com/trac/ticket/990 
 66  """ 
 67   
class CancelledError(Exception):
    """Error delivered to the errbacks of a deferred that was cancelled."""
70
class CancellableDeferred(defer.Deferred):
    """
    see L{twisted.internet.defer.Deferred}.

    When creating this Deferred, you may provide a canceller function,
    which will be called by d.cancel() to let you do any cleanup necessary
    if the user decides not to wait for the deferred to complete.

    @ivar canceller: one-argument callable invoked with this deferred by
        L{cancel}, or C{None}. It is reset to C{None} as soon as the
        deferred is fired.
    @ivar suppressAlreadyCalled: when true, the next attempt to fire an
        already-fired deferred is silently dropped instead of raising
        C{AlreadyCalledError}; the flag is then cleared.
    """

    suppressAlreadyCalled = False

    def __init__(self, canceller=None):
        """
        @param canceller: optional callable taking this deferred as its only
            argument; called by L{cancel} if the deferred has not fired yet.
        """
        self.canceller = canceller
        defer.Deferred.__init__(self)

    def cancel(self):
        """Cancel this deferred.

        If the deferred is waiting on another deferred, forward the
        cancellation to the other deferred.

        If the deferred has not yet been errback'd/callback'd, call
        the canceller function provided to the constructor. If that
        function does not do a callback/errback, or if no canceller
        function was provided, errback with CancelledError.

        Otherwise, raise AlreadyCalledError.
        """
        canceller = self.canceller
        if not self.called:
            if canceller:
                canceller(self)
            else:
                # Eat the callback that will eventually be fired
                # since there was no real canceller.
                self.suppressAlreadyCalled = True

            if not self.called:
                # The canceller didn't do an errback of its own. Raise and
                # catch the error so that the Failure is built from a live
                # exception context.
                try:
                    raise CancelledError
                except CancelledError:
                    self.errback(failure.Failure())
        elif isinstance(self.result, CancellableDeferred):
            # Waiting for another deferred -- cancel it instead
            # FIXME: possible problem with old API?
            self.result.cancel()
        else:
            # Called and not waiting for another deferred
            raise defer.AlreadyCalledError

    def _startRunCallbacks(self, result):
        """
        Fire the deferred with C{result}, honouring L{suppressAlreadyCalled}.

        Raises C{AlreadyCalledError} if the deferred has already fired and
        no suppression was requested.
        """
        # Canceller is no longer relevant
        self.canceller = None

        if self.called:
            if self.suppressAlreadyCalled:
                # Swallow exactly one late result, then behave normally again
                self.suppressAlreadyCalled = False
                return
            raise defer.AlreadyCalledError

        defer.Deferred._startRunCallbacks(self, result)
133
class CancellableDeferredIterator(object):
    """
    Iterable wrapper that makes a deferred-yielding iteration cancellable.

    The wrapped C{iterator} callable is invoked with the stored positional
    and keyword arguments when iteration starts. Each item it produces is
    yielded in turn and remembered in C{self.deferred} until it fires, so
    that L{cancel} can forward the cancellation to it.
    """

    def __init__(self, iterator, *args, **kw):
        """
        @param iterator: callable returning the iterable to walk through.
        @param args: positional arguments passed to C{iterator}.
        @param kw: keyword arguments passed to C{iterator}.
        """
        self.iterator = iterator
        self.args = args
        self.kw = kw
        # Deferred yielded last that has not fired yet, or None
        self.deferred = None
        self.cancelled = False

    def cancel(self):
        """
        Stop the iteration and cancel the deferred currently waited on,
        if there is one.
        """
        self.cancelled = True
        if self.deferred is not None:
            self.deferred.cancel()
            self.deferred = None

    def _called(self, result_or_failure):
        """
        Callback/errback attached to every yielded deferred: forget the
        deferred once it has fired and pass its result through unchanged.
        """
        self.deferred = None

        return result_or_failure

    def __iter__(self):
        """
        Yield the deferreds produced by the wrapped iterator until it is
        exhausted or L{cancel} has been called.

        If the iteration was cancelled before it started, nothing is yielded
        and the wrapped iterator is never invoked.
        """
        if self.cancelled:
            # Already cancelled: produce an empty iteration. (Yielding a
            # placeholder item here would hand consumers a bogus element.)
            return

        for deferred in self.iterator(*self.args, **self.kw):
            # A false item (e.g. None) stands for "nothing to wait for"
            self.deferred = deferred or defer.succeed(None)
            self.deferred.addBoth(self._called)
            yield self.deferred

            if self.cancelled:
                break
165
def cancellable_deferred_iterator(iterator):
    """
    Decorator for iterator functions that internally use cancellable
    deferreds.

    Calling the decorated function returns a L{CancellableDeferredIterator}
    built from the original function and the call arguments. Cancelling that
    object stops the iteration and propagates the cancel call to the deferred
    that is currently held.
    """
    def make_cancellable(*args, **kw):
        return CancellableDeferredIterator(iterator, *args, **kw)

    # Carry the decorated function's metadata over to the replacement
    return mergeFunctionMetadata(iterator, make_cancellable)
def _cancellable_coiterate_impl(coiterate, iterator_factory, *args, **kw):
    """
    Wrapper around @coiterate to iterate and return a usual
    L{CancellableDeferred} that can cancel the iteration.

    @param coiterate: callable taking an iterator and returning a deferred
        that fires when the iteration is over.
    @param iterator_factory: callable returning the iterable to walk; it is
        called with C{*args} and C{**kw}.
    @return: a L{CancellableDeferred} chained to the deferred returned by
        C{coiterate}; cancelling it cancels the iteration.
    """
    cancel_iter = CancellableDeferredIterator(iterator_factory, *args, **kw)

    def cancel(deferred):
        cancel_iter.cancel()
        if not deferred.called:
            # Cancelling the iteration did not fire C{deferred}, so its
            # cancel() is about to errback it with CancelledError. The
            # coiterate deferred chained below may still deliver a result
            # once the iteration winds down; have that late result dropped
            # instead of raising AlreadyCalledError in the callback chain.
            deferred.suppressAlreadyCalled = True

    cancel_dfr = CancellableDeferred(cancel)
    dfr = coiterate(iter(cancel_iter))
    dfr.chainDeferred(cancel_dfr)
    return cancel_dfr
def cancellable_coiterate(iterator_factory, *args, **kw):
    """
    Run C{iterator_factory(*args, **kw)} through task.coiterate and return
    a L{CancellableDeferred} that can cancel the iteration.
    """
    coiterate = task.coiterate
    return _cancellable_coiterate_impl(coiterate, iterator_factory,
                                       *args, **kw)
199
def delay_coiterate(delay, iterator):
    """
    Does the same thing as task.coiterate(), but only run an iteration every
    @delay seconds.

    @param delay: number of seconds between two iterations; with 0 the call
        is handed over to task.coiterate() directly.
    @param iterator: the iterator to run.
    @return: whatever the underlying coiterate() call returns.
    """
    if delay == 0:
        # Fall back on task.coiterate: with a zero delay our own scheduler
        # would make us starve the reactor
        return task.coiterate(iterator)

    def schedule_next_step(step):
        # Decides when the cooperator runs its next step
        return reactor.callLater(delay, step)

    def single_iteration_predicate():
        # Builds the predicate telling the cooperator whether to stop
        # iterating within the current step. It always answers True, so the
        # cooperator does only one iteration at each step.
        def stop_now():
            return True
        return stop_now

    delayed_cooperator = task.Cooperator(
        terminationPredicateFactory=single_iteration_predicate,
        scheduler=schedule_next_step)
    return delayed_cooperator.coiterate(iterator)
def cancellable_delay_coiterate(delay, iterator_factory, *args, **kw):
    """
    Run C{iterator_factory(*args, **kw)} through delay_coiterate with the
    given C{delay} and return a L{CancellableDeferred} that can cancel the
    iteration.
    """
    # Bind the delay so the result has the one-argument coiterate signature
    delayed = lambda iterator: delay_coiterate(delay, iterator)
    return _cancellable_coiterate_impl(delayed, iterator_factory, *args, **kw)