Package elisa :: Package core :: Package utils :: Module misc

Source Code for Module elisa.core.utils.misc

  1  # -*- coding: utf-8 -*- 
  2  # Moovida - Home multimedia server 
  3  # Copyright (C) 2006-2009 Fluendo Embedded S.L. (www.fluendo.com). 
  4  # All rights reserved. 
  5  # 
  6  # This file is available under one of two license agreements. 
  7  # 
  8  # This file is licensed under the GPL version 3. 
  9  # See "LICENSE.GPL" in the root of this distribution including a special 
 10  # exception to use Moovida with Fluendo's plugins. 
 11  # 
 12  # The GPL part of Moovida is also available under a commercial licensing 
 13  # agreement from Fluendo. 
 14  # See "LICENSE.Moovida" in the root directory of this distribution package 
 15  # for details on that license. 
 16  # 
 17  # Authors: Philippe Normand <philippe@fluendo.com> 
 18  #          Benjamin Kampmann <benjamin@fluendo.com> 
 19   
 20  """ 
 21  Miscellaneaous utilities that don't need their own module because they are 
 22  reasonnably small. 
 23  """ 
 24   
 25  import platform 
 26  import os, re 
 27  import subprocess 
 28   
 29  from twisted.trial.unittest import SkipTest 
 30   
 31  import pkg_resources 
 32   
 33  """ 
 34  The name of the environment variable that has to be set to 'True' to run also 
 35  functional tests 
 36  """ 
 37  FUNCTIONAL_TESTS_STR = 'RUN_FUNCTIONAL_TESTS' 
 38   
 39  # Use the same line separator on all the platforms (yes, including Windows) 
 40  LINESEP = '\n' 
 41   
 42   
43 -def env_var_expand(var_name):
44 """ 45 Expand the given environment variable content. If it contains 46 other references to environment variables, they are expanded too. 47 48 Supported platforms are win32 and linux. 49 50 Example of use:: 51 52 >>> env_var_expand('$HOME') 53 >>> '/home/phil' 54 55 @raises ValueError: if current system's platform is not windows or linux 56 @param var_name: environment variable 57 @type var_name: string 58 @rtype: string 59 """ 60 platform_type = platform.system().lower() 61 if platform_type == 'windows': 62 re_env = re.compile(r'%(\w+)%') 63 def expander(mo): 64 return os.environ.get(mo.group()[1:-1], 'UNKNOWN')
65 66 elif platform_type == 'linux': 67 re_env = re.compile(r'\$(\w+)') 68 69 def expander(mo): 70 xpanded = os.environ.get(mo.group()[1:], 'UNKNOWN') 71 if xpanded.find('$') > -1: 72 xpanded = re_env.sub(expander, xpanded) 73 return xpanded 74 75 else: 76 raise ValueError("Unsupported platform") 77 78 expanded = re_env.sub(expander, var_name) 79 return expanded 80
81 -def env_var_explode_list(var_name, default=''):
82 """ 83 Explode a list of values stored in an environment variable as a 84 single string. On win32 the item separator is ';' and on other 85 platforms it is ':'. 86 87 Example of use:: 88 89 >>> env_var_explode_list('$PATH') 90 >>> ['/usr/bin','/bin'] 91 92 @param var_name: environment variable 93 @type var_name: string 94 @keyword default: value to use if environment variable not found 95 @type default: string 96 @rtype: list of strings 97 """ 98 value = os.environ.get(var_name, default) 99 platform_type = platform.system().lower() 100 if platform_type == 'windows': 101 separator = ';' 102 else: 103 separator = ':' 104 exploded = value.split(separator) 105 if exploded == ['']: 106 exploded = [] 107 return exploded
108 109
110 -def un_camelify(camel_string):
111 """ 112 Convert CamelCase styled strings to lower_cased style. 113 114 @param camel_string: CamelStyled string to convert 115 @type camel_string: string 116 @rtype: string 117 """ 118 if len(camel_string) < 2: 119 un_cameled = camel_string.lower() 120 else: 121 camel_string = camel_string.replace(' ', '') 122 un_cameled = camel_string[0].lower() 123 for letter in camel_string[1:]: 124 if letter.isupper(): 125 un_cameled += '_%s' % letter.lower() 126 else: 127 un_cameled += letter 128 return un_cameled
129 130 # cache the result of the first is_hildon_desktop_running call 131 _hildon_desktop_running = None 132
133 -def is_hildon_desktop_running():
134 global _hildon_desktop_running 135 136 try: 137 import dbus 138 except ImportError: 139 _hildon_desktop_running = False 140 return False 141 142 if _hildon_desktop_running is not None: 143 # return the cached value 144 return _hildon_desktop_running 145 try: 146 bus = dbus.SystemBus() 147 driver = bus.get_object(dbus.BUS_DAEMON_NAME, 148 dbus.BUS_DAEMON_PATH, dbus.BUS_DAEMON_IFACE) 149 iface = dbus.Interface(driver, dbus.BUS_DAEMON_IFACE) 150 _hildon_desktop_running = \ 151 bool(iface.NameHasOwner('org.moblin.SystemDaemon')) 152 except dbus.DBusException: 153 # it fails so we assume it is not hildon 154 _hildon_desktop_running = False 155 return False 156 157 return _hildon_desktop_running
158
159 -def get_distro():
160 if platform.system() == 'Windows': 161 platform_name = platform.platform() 162 # example os_name = Windows, os_version = Vista 163 os_name, os_version = platform_name.split('-')[:2] 164 distro = ' '.join([os_name, os_version]) 165 else: 166 try: 167 output = subprocess.Popen(["lsb_release", "-a"], 168 stderr=subprocess.STDOUT, 169 stdout=subprocess.PIPE).communicate()[0] 170 except OSError, error: 171 # lsb_release program not found, let's use a crappy method. 172 known_distros = {"Debian" : "/etc/debian_version", 173 #"RedHat" : "/etc/redhat-release", 174 "SUSE" : "/etc/SUSE-release", 175 "Fedora" : "/etc/fedora-release", 176 "Gentoo" : "/etc/gentoo-release", 177 "Slackware" : "/etc/slackware-version", 178 "Mandrake" : "/etc/mandrake-release", 179 "YellowDog" : "/etc/yellowdog-release", 180 "SUN JDS" : "/etc/sun-release", 181 "UnitedLinux" : "/etc/UnitedLinux-release" 182 } 183 distro = 'Unknown' 184 for name, location in known_distros.iteritems(): 185 if os.path.exists(location): 186 distro = name 187 break 188 else: 189 distro_infos = {} 190 lines = output.splitlines() 191 for line in lines: 192 try: 193 key, value = line.split(':') 194 except ValueError: 195 continue 196 # strip \t 197 value = value[1:] 198 distro_infos[key] = value 199 200 distro = distro_infos['Distributor ID'] 201 # TODO: maybe make use of Release information (Intrepid, Hardy, 2009, etc) 202 203 return distro
204
205 -def get_os_name():
206 distro = get_distro() 207 if distro != "Unknown": 208 return distro 209 else: 210 return platform.system().lower()
211
212 -def linux_pidof(program_name):
213 """ 214 Get the Linux process id of the given program name. Because 215 multiple processes can exist, we return a list of the pids. 216 217 @rtype: C{list} of C{int} 218 @returns: the list of running pids of given program name 219 """ 220 output = subprocess.Popen(["pidof", program_name], 221 stderr=subprocess.STDOUT, 222 stdout=subprocess.PIPE).communicate()[0] 223 return [int(pid) for pid in output.split()]
224
225 -def get_user_desktop_name():
226 """ Detect the Desktop environment used by the user. On windows 227 this returns empty string. On other OSes we can currently detect 228 gnome, kde and xfce. 229 230 @rtype: C{str} 231 @returns: the desktop name used (either gnome, kde or xfce) 232 """ 233 if platform.system() == 'Windows': 234 desktop_name = "" 235 else: 236 # first try to get the DESKTOP_SESSION env var, which is set 237 # by at least kdm and gdm 238 desktop_name = os.getenv("DESKTOP_SESSION") 239 if desktop_name in (None, 'default'): 240 # if that fails, try some KDE env var 241 if os.getenv('KDE_FULL_SESSION') is not None: 242 desktop_name = 'kde' 243 else: 244 # or scan running processes 245 # TODO: add more here 246 progs = {'gnome-session': 'gnome', 247 # http://en.wikipedia.org/wiki/Gnome-panel 248 'gnome-panel': 'gnome', 249 'ksmserver': 'kde', 250 # http://docs.kde.org/stable/en/kdebase-runtime/userguide/the-kdeinit-mystery.html 251 'kdeinit': 'kde', 252 # http://www.xfce.org/documentation/4.2/manuals/xfce-mcs-manager 253 'xfce-mcs-manager': 'xfce', 254 # http://www.xfce.org/documentation/4.2/manuals/xfwm4 255 'xfwm4': 'xfce', 256 } 257 for prog, name in progs.iteritems(): 258 if linux_pidof(prog) != []: 259 desktop_name = name 260 break 261 if desktop_name is None: 262 desktop_name = '' 263 return desktop_name
264
265 -def pkg_resources_copy_dir(resource_spec, resource_dir, dest_dir):
266 """ 267 Copy a directory recursively using pkg_resources. 268 """ 269 assert pkg_resources.resource_isdir(resource_spec, resource_dir) 270 271 # add '' at the end so that working_dir ends with a slash 272 working_dir = os.path.join(dest_dir, '') 273 274 pkg_resources.ensure_directory(working_dir) 275 276 for rel_entry in pkg_resources.resource_listdir(resource_spec, resource_dir): 277 # rel_entry is relative to resource_dir 278 entry = os.path.join(resource_dir, rel_entry) 279 280 if pkg_resources.resource_isdir(resource_spec, entry): 281 dest_entry = os.path.join(dest_dir, rel_entry) 282 pkg_resources_copy_dir(resource_spec, entry, dest_entry) 283 continue 284 285 working_file = os.path.join(working_dir, rel_entry) 286 resource_stream = pkg_resources.resource_stream(resource_spec, entry) 287 288 out = file(working_file, 'w') 289 out.write(resource_stream.read()) 290 out.close() 291 resource_stream.close()
292
293 -def run_functional_tests_check():
294 """ 295 Check whether functional tests should be run or not. This is done by 296 checking whether the L{FUNCTIONAL_TESTS_STR} environment variable is set to 297 'True'. If it is not explictly set to True, this method raises a 298 L{twisted.trial.unittest.SkipTest} (saying that the functional test is not 299 run). 300 301 This method allows functional tests to have an easy way to check if they 302 should be run. This simple example checks in the setup (but it could also 303 be done inside the method itself):: 304 305 from elisa.core.utils.misc import run_functional_tests_check 306 [...] 307 class MyFunctionalTestCase(TestCase): 308 309 def setUp(self): 310 run_functional_tests_check() 311 ... 312 """ 313 msg = "Functional Test skipped. If you want to run it set the environment"\ 314 " variable '%s' to True" % FUNCTIONAL_TESTS_STR 315 316 if os.environ.get(FUNCTIONAL_TESTS_STR, 'False').lower() != 'true': 317 raise SkipTest(msg)
318
319 -def chainDeferredIgnoringResult(first, chained):
320 def cb(result): 321 chained.callback(result) 322 return result
323 324 def eb(failure): 325 chained.errback(failure) 326 return failure 327 328 first.addCallbacks(cb, eb) 329 330
331 -def mappings_from_text(text):
332 """ 333 Unserialise mappings from a string L{text}. Mappings are lists of 2-uplets. 334 335 Example:: 336 337 key1 = value1 338 key2 = value2a 339 key2 = value2b 340 341 is deserialised into:: 342 343 [("key1", "value1"), ("key2", "value2a"), ("key2", "value2b")] 344 """ 345 mappings = [] 346 if text != None: 347 for line in text.splitlines(): 348 m = tuple([elt.strip() for elt in line.split('=')]) 349 if len(m) != 2: 350 raise ValueError 351 mappings.append(m) 352 353 return mappings
354
355 -def text_from_mappings(mappings):
356 """ 357 Takes a L{mappings} and return a serialised form. Mappings are list of 358 2-uplets. 359 360 Example:: 361 362 [("key1", "value1"), ("key2", "value2a"), ("key2", "value2b")] 363 364 is serialised into:: 365 366 key1 = value1 367 key2 = value2a 368 key2 = value2b 369 """ 370 text = "" 371 if mappings != None: 372 for key, value in mappings: 373 text += "%s = %s%s" % (key, value, LINESEP) 374 375 return text
376
377 -def read_mappings(iter_lines):
378 """ 379 Setuptools compliant deserialiser for mappings. 380 """ 381 text = LINESEP.join(iter_lines) 382 return mappings_from_text(text)
383