background.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. #
  4. # Copyright (c) 2014 [email protected]
  5. #
  6. # MIT Licence. See http://opensource.org/licenses/MIT
  7. #
  8. # Created on 2014-04-06
  9. #
  10. """Run background tasks."""
  11. from __future__ import print_function, unicode_literals
  12. import sys
  13. import os
  14. import subprocess
  15. import pickle
  16. from workflow import Workflow
  17. __all__ = ['is_running', 'run_in_background']
  18. _wf = None
  19. def wf():
  20. global _wf
  21. if _wf is None:
  22. _wf = Workflow()
  23. return _wf
  24. def _arg_cache(name):
  25. """Return path to pickle cache file for arguments.
  26. :param name: name of task
  27. :type name: ``unicode``
  28. :returns: Path to cache file
  29. :rtype: ``unicode`` filepath
  30. """
  31. return wf().cachefile('{0}.argcache'.format(name))
  32. def _pid_file(name):
  33. """Return path to PID file for ``name``.
  34. :param name: name of task
  35. :type name: ``unicode``
  36. :returns: Path to PID file for task
  37. :rtype: ``unicode`` filepath
  38. """
  39. return wf().cachefile('{0}.pid'.format(name))
  40. def _process_exists(pid):
  41. """Check if a process with PID ``pid`` exists.
  42. :param pid: PID to check
  43. :type pid: ``int``
  44. :returns: ``True`` if process exists, else ``False``
  45. :rtype: ``Boolean``
  46. """
  47. try:
  48. os.kill(pid, 0)
  49. except OSError: # not running
  50. return False
  51. return True
  52. def is_running(name):
  53. """Test whether task is running under ``name``.
  54. :param name: name of task
  55. :type name: ``unicode``
  56. :returns: ``True`` if task with name ``name`` is running, else ``False``
  57. :rtype: ``Boolean``
  58. """
  59. pidfile = _pid_file(name)
  60. if not os.path.exists(pidfile):
  61. return False
  62. with open(pidfile, 'rb') as file_obj:
  63. pid = int(file_obj.read().strip())
  64. if _process_exists(pid):
  65. return True
  66. elif os.path.exists(pidfile):
  67. os.unlink(pidfile)
  68. return False
  69. def _background(stdin='/dev/null', stdout='/dev/null',
  70. stderr='/dev/null'): # pragma: no cover
  71. """Fork the current process into a background daemon.
  72. :param stdin: where to read input
  73. :type stdin: filepath
  74. :param stdout: where to write stdout output
  75. :type stdout: filepath
  76. :param stderr: where to write stderr output
  77. :type stderr: filepath
  78. """
  79. def _fork_and_exit_parent(errmsg):
  80. try:
  81. pid = os.fork()
  82. if pid > 0:
  83. os._exit(0)
  84. except OSError as err:
  85. wf().logger.critical('%s: (%d) %s', errmsg, err.errno,
  86. err.strerror)
  87. raise err
  88. # Do first fork.
  89. _fork_and_exit_parent('fork #1 failed')
  90. # Decouple from parent environment.
  91. os.chdir(wf().workflowdir)
  92. os.setsid()
  93. # Do second fork.
  94. _fork_and_exit_parent('fork #2 failed')
  95. # Now I am a daemon!
  96. # Redirect standard file descriptors.
  97. si = open(stdin, 'r', 0)
  98. so = open(stdout, 'a+', 0)
  99. se = open(stderr, 'a+', 0)
  100. if hasattr(sys.stdin, 'fileno'):
  101. os.dup2(si.fileno(), sys.stdin.fileno())
  102. if hasattr(sys.stdout, 'fileno'):
  103. os.dup2(so.fileno(), sys.stdout.fileno())
  104. if hasattr(sys.stderr, 'fileno'):
  105. os.dup2(se.fileno(), sys.stderr.fileno())
  106. def run_in_background(name, args, **kwargs):
  107. r"""Cache arguments then call this script again via :func:`subprocess.call`.
  108. :param name: name of task
  109. :type name: ``unicode``
  110. :param args: arguments passed as first argument to :func:`subprocess.call`
  111. :param \**kwargs: keyword arguments to :func:`subprocess.call`
  112. :returns: exit code of sub-process
  113. :rtype: ``int``
  114. When you call this function, it caches its arguments and then calls
  115. ``background.py`` in a subprocess. The Python subprocess will load the
  116. cached arguments, fork into the background, and then run the command you
  117. specified.
  118. This function will return as soon as the ``background.py`` subprocess has
  119. forked, returning the exit code of *that* process (i.e. not of the command
  120. you're trying to run).
  121. If that process fails, an error will be written to the log file.
  122. If a process is already running under the same name, this function will
  123. return immediately and will not run the specified command.
  124. """
  125. if is_running(name):
  126. wf().logger.info('Task `{0}` is already running'.format(name))
  127. return
  128. argcache = _arg_cache(name)
  129. # Cache arguments
  130. with open(argcache, 'wb') as file_obj:
  131. pickle.dump({'args': args, 'kwargs': kwargs}, file_obj)
  132. wf().logger.debug('Command arguments cached to `{0}`'.format(argcache))
  133. # Call this script
  134. cmd = ['/usr/bin/python', __file__, name]
  135. wf().logger.debug('Calling {0!r} ...'.format(cmd))
  136. retcode = subprocess.call(cmd)
  137. if retcode: # pragma: no cover
  138. wf().logger.error('Failed to call task in background')
  139. else:
  140. wf().logger.debug('Executing task `{0}` in background...'.format(name))
  141. return retcode
  142. def main(wf): # pragma: no cover
  143. """Run command in a background process.
  144. Load cached arguments, fork into background, then call
  145. :meth:`subprocess.call` with cached arguments.
  146. """
  147. name = wf.args[0]
  148. argcache = _arg_cache(name)
  149. if not os.path.exists(argcache):
  150. wf.logger.critical('No arg cache found : {0!r}'.format(argcache))
  151. return 1
  152. # Load cached arguments
  153. with open(argcache, 'rb') as file_obj:
  154. data = pickle.load(file_obj)
  155. # Cached arguments
  156. args = data['args']
  157. kwargs = data['kwargs']
  158. # Delete argument cache file
  159. os.unlink(argcache)
  160. pidfile = _pid_file(name)
  161. # Fork to background
  162. _background()
  163. # Write PID to file
  164. with open(pidfile, 'wb') as file_obj:
  165. file_obj.write('{0}'.format(os.getpid()))
  166. # Run the command
  167. try:
  168. wf.logger.debug('Task `{0}` running'.format(name))
  169. wf.logger.debug('cmd : {0!r}'.format(args))
  170. retcode = subprocess.call(args, **kwargs)
  171. if retcode:
  172. wf.logger.error('Command failed with [{0}] : {1!r}'.format(
  173. retcode, args))
  174. finally:
  175. if os.path.exists(pidfile):
  176. os.unlink(pidfile)
  177. wf.logger.debug('Task `{0}` finished'.format(name))
  178. if __name__ == '__main__': # pragma: no cover
  179. wf().run(main)