1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 from twisted.internet import defer, task, reactor
60 from twisted.python import failure
61 from twisted.python.util import mergeFunctionMetadata
62
63 """
64 Implements the Cancellable Deferred as discussed at
65 http://twistedmatrix.com/trac/ticket/990
66 """
67
70
72 """
73 see L{twisted.internet.defer.Deferred}.
74
75 When creating this Deferred, you may provide a canceller function,
76 which will be called by d.cancel() to let you do any cleanup necessary
77 if the user decides not to wait for the deferred to complete.
78 """
79
80 suppressAlreadyCalled = 0
81
85
87 """Cancel this deferred.
88
89 If the deferred is waiting on another deferred, forward the
90 cancellation to the other deferred.
91
92 If the deferred has not yet been errback'd/callback'd, call
93 the canceller function provided to the constructor. If that
94 function does not do a callback/errback, or if no canceller
95 function was provided, errback with CancelledError.
96
97 Otherwise, raise AlreadyCalledError.
98 """
99 canceller=self.canceller
100 if not self.called:
101 if canceller:
102 canceller(self)
103 else:
104
105
106 self.suppressAlreadyCalled = 1
107
108 if not self.called:
109
110 try:
111 raise CancelledError
112 except:
113 self.errback(failure.Failure())
114 elif isinstance(self.result, CancellableDeferred):
115
116
117 self.result.cancel()
118 else:
119
120 raise defer.AlreadyCalledError
121
133
135 - def __init__(self, iterator, *args, **kw):
136 self.iterator = iterator
137 self.args = args
138 self.kw = kw
139 self.deferred = None
140 self.cancelled = False
141
143 self.cancelled = True
144 if self.deferred is not None:
145 self.deferred.cancel()
146 self.deferred = None
147
148 - def _called(self, result_or_failure):
149 self.deferred = None
150
151 return result_or_failure
152
154 if self.cancelled:
155 yield iter([])
156 return
157
158 for deferred in self.iterator(*self.args, **self.kw):
159 self.deferred = deferred or defer.succeed(None)
160 self.deferred.addBoth(self._called)
161 yield self.deferred
162
163 if self.cancelled:
164 break
165
167 """
168 Decorator for iterators that internally use cancellable deferreds. This
169 one allows you to cancel the iterator operation and it propagates the
170 cancel call to the deferred that is currently hold.
171 """
172 def wrapper(*args, **kw):
173 return CancellableDeferredIterator(iterator, *args, **kw)
174
175 wrapper = mergeFunctionMetadata(iterator, wrapper)
176 return wrapper
177
179 """
180 Wrapper around @coiterate to iterate and return a usual
181 L{CancellableDeferred} that can cancel the iteration.
182 """
183 cancel_iter = CancellableDeferredIterator(iterator_factory, *args, **kw)
184 def cancel(deferred):
185 cancel_iter.cancel()
186
187 cancel_dfr = CancellableDeferred(cancel)
188 dfr = coiterate(iter(cancel_iter))
189 dfr.chainDeferred(cancel_dfr)
190 return cancel_dfr
191
193 """
194 Wrapper around task.coiterate to iterate and return a usual
195 L{CancellableDeferred} that can cancel the iteration.
196 """
197 return _cancellable_coiterate_impl(task.coiterate, iterator_factory,
198 *args, **kw)
199
201 """
202 Does the same thing as task.coiterate(), but only run an iteration every
203 @delay seconds
204 """
205
206 if delay == 0:
207
208
209 return task.coiterate(iterator)
210
211 def delay_scheduler(x):
212
213 return reactor.callLater(delay, x)
214
215 def always_terminate_factory():
216
217
218
219 return lambda: True
220
221 cooperator = task.Cooperator(scheduler=delay_scheduler,
222 terminationPredicateFactory=always_terminate_factory)
223 return cooperator.coiterate(iterator)
224
226 """
227 Wrapper around delay_coiterate to iterate and return a usual
228 L{CancellableDeferred} that can cancel the iteration.
229 """
230 def coiterate(iterator):
231 return delay_coiterate(delay, iterator)
232
233 return _cancellable_coiterate_impl(coiterate, iterator_factory, *args, **kw)
234