Package elisa :: Package core :: Package utils :: Module cancellable_queue

Source Code for Module elisa.core.utils.cancellable_queue

  1  # -*- coding: utf-8 -*- 
  2  # Moovida - Home multimedia server 
  3  # Copyright (C) 2006-2009 Fluendo Embedded S.L. (www.fluendo.com). 
  4  # All rights reserved. 
  5  # 
  6  # This file is available under one of two license agreements. 
  7  # 
  8  # This file is licensed under the GPL version 3. 
  9  # See "LICENSE.GPL" in the root of this distribution including a special 
 10  # exception to use Moovida with Fluendo's plugins. 
 11  # 
 12  # The GPL part of Moovida is also available under a commercial licensing 
 13  # agreement from Fluendo. 
 14  # See "LICENSE.Moovida" in the root directory of this distribution package 
 15  # for details on that license. 
 16  # 
 17  # Author: Florian Boucault <florian@fluendo.com> 
 18   
 19  """ 
 20  Queue of cancellable deferreds. 
 21  """ 
 22   
 23  from elisa.core.log import Loggable 
 24  from elisa.core.utils import defer 
 25   
class Request(object):
    """
    Encapsulate a request to call a function. It is used in
    L{CancellableQueue} to queue the function calls.

    @ivar call:     function call encapsulated
    @type call:     callable
    @ivar args:     positional arguments to be passed to the function
    @type args:     tuple of objects
    @ivar kwargs:   keyword arguments to be passed to the function
    @type kwargs:   dict of the form {str: object}
    @ivar dfr:      deferred used as an interface with the request: fired
                    when the request has been processed
    @type dfr:      L{elisa.core.utils.cancellable_defer.CancellableDeferred}
    @ivar call_dfr: deferred returned by the function call; C{None} as long
                    as the function has not been called, and again after
                    the call has been cancelled through L{dfr}
    @type call_dfr: L{elisa.core.utils.cancellable_defer.CancellableDeferred}
    """

    def __init__(self, call, *args, **kwargs):
        """
        @param call:   function call encapsulated
        @type call:    callable
        @param args:   positional arguments to be passed to the function
        @type args:    tuple of objects
        @param kwargs: keyword arguments to be passed to the function
        @type kwargs:  dict of the form {str: object}
        """
        self.call = call
        self.args = args
        self.kwargs = kwargs

        # build a deferred that will be the interface with the request;
        # cancelling it is forwarded to the actual call by self._cancel
        self.dfr = defer.Deferred(canceller=self._cancel)
        # deferred returned by the actual call, set once the call is made
        self.call_dfr = None

    def _cancel(self, dfr):
        """
        Canceller of L{dfr}: cancel the deferred returned by the function
        call, if the call has already been made.

        @param dfr: deferred being cancelled (unused)
        """
        if self.call_dfr is not None:
            self.call_dfr.cancel()
            self.call_dfr = None

    def __str__(self):
        """
        Return a human readable description of the request, listing the
        callable followed by its positional then keyword arguments.

        @rtype: str
        """
        # Each value is wrapped in a 1-tuple so that an argument which is
        # itself a tuple is formatted as a whole instead of being expanded
        # as the operands of the % operator (which would raise TypeError).
        pieces = ["%s, " % (value,) for value in self.args]
        # items() rather than iteritems(): available on Python 2 and 3
        pieces.extend("%s=%s, " % (name, value)
                      for name, value in self.kwargs.items())
        return "call to %s with arguments: (%s)" % (self.call,
                                                    "".join(pieces))
73
class CancellableQueue(Loggable):
    """
    Queue of cancellable function calls.

    One may need to enqueue function calls so that they are called one at a
    time sequentially. L{CancellableQueue} allows that while giving the
    possibility to cancel these function calls before they are processed
    or while they are made if they return
    L{elisa.core.utils.cancellable_defer.CancellableDeferred}.
    """

    def __init__(self):
        super(CancellableQueue, self).__init__()

        # queue containing all the requests that were enqueued and not
        # processed so far, that is the requests for which the function has
        # not been called yet; new requests are inserted at index 0 and the
        # next request to process is popped from the end
        self._requests_queue = []

        # request that is currently being processed, that is the request for
        # which the function has been called and the deferred returned has not
        # been fired yet
        self._current_request = None

    def enqueue(self, call, *args, **kwargs):
        """
        Enqueue a function call that will be called whenever the previously
        enqueued calls have been processed.
        Return a cancellable deferred that will allow interaction with the
        enqueued function call.

        @param call:   function call to enqueue
        @type call:    callable
        @param args:   positional arguments to be passed to the function
        @type args:    tuple of objects
        @param kwargs: keyword arguments to be passed to the function
        @type kwargs:  dict of the form {str: object}

        @rtype: L{elisa.core.utils.cancellable_defer.CancellableDeferred}
        """
        request = Request(call, *args, **kwargs)
        self._enqueue_request(request)
        return request.dfr

    def empty(self):
        """
        Remove all function calls that were enqueued previously. The function
        call currently being processed is cancelled if it returned a
        cancellable deferred.

        The deferreds of the requests removed from the queue are left
        untouched: they are neither fired nor cancelled.
        """
        self.info("Empty loading queue that was containing %s requests" \
                  % len(self._requests_queue))

        # clear the queue in place
        self._requests_queue[:] = []

        # cancel the current request
        current = self._current_request
        if current is not None:
            current.dfr.cancel()
            # Cancelling may run the callback chain right away, and a
            # callback may enqueue a new call that has become the current
            # request in the meantime: only reset the current request if it
            # is still the one that was just cancelled.
            if self._current_request is current:
                self._current_request = None

    def _enqueue_request(self, request):
        """
        Add C{request} to the queue and start processing it right away if no
        other request is being processed.

        @param request: request to enqueue
        @type request:  L{Request}
        """
        self.info("Enqueue %s" % request)
        self._requests_queue.insert(0, request)

        # if not busy, start processing that very request
        if self._current_request is None:
            self._process_next_request()

    def _process_next_request(self, dummy=None):
        """
        Pop the oldest request whose deferred has not been fired yet and call
        its function. If the queue holds no such request, mark the queue as
        idle.

        @param dummy: ignored; allows the method to be used as a deferred
                      callback
        """
        try:
            # find the first request that was not yet cancelled: a request
            # whose deferred has already been called is skipped
            cancelled = True
            while cancelled:
                request = self._requests_queue.pop()
                cancelled = request.dfr.called
        except IndexError:
            # there is no request left in the queue
            self.info("No request left to be processed in the queue")
            self._current_request = None
            return

        self.info("Process next request in the queue: %s" % request)
        self._current_request = request
        dfr = self._process_request(request)
        dfr.addCallbacks(self._current_request_done,
                         self._current_request_failed)
        # whatever the outcome, carry on with the following request
        dfr.addBoth(self._process_next_request)

    def _process_request(self, request):
        """
        Call the function encapsulated in C{request} and remember the
        deferred wrapping its result in C{request.call_dfr}.

        @param request: request to process
        @type request:  L{Request}

        @return: deferred obtained from C{defer.maybeDeferred}
        """
        dfr = defer.maybeDeferred(request.call, *request.args,
                                  **request.kwargs)
        request.call_dfr = dfr
        return dfr

    def _current_request_done(self, result):
        """
        Callback fired when the current function call succeeded: forward the
        result to the deferred of the current request and forget the request.

        @param result: result of the function call
        @return:       C{result}, unchanged
        """
        self.info("Current request processing done (%s)" \
                  % self._current_request)
        self._current_request.dfr.callback(result)
        self._current_request = None
        return result

    def _current_request_failed(self, failure):
        """
        Errback fired when the current function call failed: forward the
        failure to the deferred of the current request and forget the
        request. A warning is logged unless the failure is a cancellation.

        @param failure: failure raised by the function call
        @return:        C{failure}, unchanged
        """
        if not isinstance(failure.value, defer.CancelledError):
            self.warning("Current request processing failed: %s" % failure)
        self._current_request.dfr.errback(failure)
        self._current_request = None
        return failure
179