background.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. #
  4. # Copyright (c) 2014 [email protected]
  5. #
  6. # MIT Licence. See http://opensource.org/licenses/MIT
  7. #
  8. # Created on 2014-04-06
  9. #
  10. """This module provides an API to run commands in background processes.
  11. Combine with the :ref:`caching API <caching-data>` to work from cached data
  12. while you fetch fresh data in the background.
  13. See :ref:`the User Manual <background-processes>` for more information
  14. and examples.
  15. """
from __future__ import print_function, unicode_literals

import errno
import os
import pickle
import signal
import subprocess
import sys

from workflow import Workflow
# Names exported by a star-import of this module. ``kill`` is also defined
# below but is not listed here, so it has to be imported explicitly.
__all__ = ['is_running', 'run_in_background']
  24. _wf = None
  25. def wf():
  26. global _wf
  27. if _wf is None:
  28. _wf = Workflow()
  29. return _wf
  30. def _log():
  31. return wf().logger
  32. def _arg_cache(name):
  33. """Return path to pickle cache file for arguments.
  34. :param name: name of task
  35. :type name: ``unicode``
  36. :returns: Path to cache file
  37. :rtype: ``unicode`` filepath
  38. """
  39. return wf().cachefile(name + '.argcache')
  40. def _pid_file(name):
  41. """Return path to PID file for ``name``.
  42. :param name: name of task
  43. :type name: ``unicode``
  44. :returns: Path to PID file for task
  45. :rtype: ``unicode`` filepath
  46. """
  47. return wf().cachefile(name + '.pid')
  48. def _process_exists(pid):
  49. """Check if a process with PID ``pid`` exists.
  50. :param pid: PID to check
  51. :type pid: ``int``
  52. :returns: ``True`` if process exists, else ``False``
  53. :rtype: ``Boolean``
  54. """
  55. try:
  56. os.kill(pid, 0)
  57. except OSError: # not running
  58. return False
  59. return True
  60. def _job_pid(name):
  61. """Get PID of job or `None` if job does not exist.
  62. Args:
  63. name (str): Name of job.
  64. Returns:
  65. int: PID of job process (or `None` if job doesn't exist).
  66. """
  67. pidfile = _pid_file(name)
  68. if not os.path.exists(pidfile):
  69. return
  70. with open(pidfile, 'rb') as fp:
  71. pid = int(fp.read())
  72. if _process_exists(pid):
  73. return pid
  74. os.unlink(pidfile)
  75. def is_running(name):
  76. """Test whether task ``name`` is currently running.
  77. :param name: name of task
  78. :type name: unicode
  79. :returns: ``True`` if task with name ``name`` is running, else ``False``
  80. :rtype: bool
  81. """
  82. if _job_pid(name) is not None:
  83. return True
  84. return False
def _background(pidfile, stdin='/dev/null', stdout='/dev/null',
                stderr='/dev/null'):  # pragma: no cover
    """Fork the current process into a background daemon.

    Forks twice. Only the process created by the second fork returns from
    this function; both intermediate parents terminate via ``os._exit(0)``.
    The PID of the final (daemon) process is written to ``pidfile`` by its
    parent before that parent exits.

    :param pidfile: file to write PID of daemon process to.
    :type pidfile: filepath
    :param stdin: where to read input
    :type stdin: filepath
    :param stdout: where to write stdout output
    :type stdout: filepath
    :param stderr: where to write stderr output
    :type stderr: filepath
    :raises OSError: re-raised (after being logged) if forking, writing the
        PID file or waiting for the child fails

    """
    def _fork_and_exit_parent(errmsg, wait=False, write=False):
        # Fork once. The parent optionally records the child's PID and/or
        # waits for the child, then exits immediately; the child simply
        # returns from this helper and carries on.
        try:
            pid = os.fork()
            if pid > 0:
                if write:  # write PID of child process to `pidfile`
                    # Write to a temporary file and rename it into place so
                    # the PID file only appears once it is complete.
                    tmp = pidfile + '.tmp'
                    with open(tmp, 'wb') as fp:
                        # NOTE(review): writing a ``str`` to a binary-mode
                        # file looks Python 2 specific -- confirm before
                        # running under Python 3.
                        fp.write(str(pid))
                    os.rename(tmp, pidfile)
                if wait:  # wait for child process to exit
                    os.waitpid(pid, 0)
                # Leave without running the interpreter's normal exit
                # handling.
                os._exit(0)
        except OSError as err:
            _log().critical('%s: (%d) %s', errmsg, err.errno, err.strerror)
            raise err

    # Do first fork and wait for second fork to finish.
    _fork_and_exit_parent('fork #1 failed', wait=True)

    # Decouple from parent environment.
    os.chdir(wf().workflowdir)
    os.setsid()

    # Do second fork and write PID to pidfile.
    _fork_and_exit_parent('fork #2 failed', write=True)

    # Now I am a daemon!
    # Redirect standard file descriptors.
    # The third positional argument to ``open`` is the buffer size; 0 asks
    # for unbuffered I/O.
    # NOTE(review): unbuffered text-mode files look Python 2 specific --
    # confirm before running under Python 3.
    si = open(stdin, 'r', 0)
    so = open(stdout, 'a+', 0)
    se = open(stderr, 'a+', 0)
    # The standard streams may have been replaced by objects without a
    # ``fileno`` method, hence the guards.
    if hasattr(sys.stdin, 'fileno'):
        os.dup2(si.fileno(), sys.stdin.fileno())
    if hasattr(sys.stdout, 'fileno'):
        os.dup2(so.fileno(), sys.stdout.fileno())
    if hasattr(sys.stderr, 'fileno'):
        os.dup2(se.fileno(), sys.stderr.fileno())
  130. def kill(name, sig=signal.SIGTERM):
  131. """Send a signal to job ``name`` via :func:`os.kill`.
  132. .. versionadded:: 1.29
  133. Args:
  134. name (str): Name of the job
  135. sig (int, optional): Signal to send (default: SIGTERM)
  136. Returns:
  137. bool: `False` if job isn't running, `True` if signal was sent.
  138. """
  139. pid = _job_pid(name)
  140. if pid is None:
  141. return False
  142. os.kill(pid, sig)
  143. return True
  144. def run_in_background(name, args, **kwargs):
  145. r"""Cache arguments then call this script again via :func:`subprocess.call`.
  146. :param name: name of job
  147. :type name: unicode
  148. :param args: arguments passed as first argument to :func:`subprocess.call`
  149. :param \**kwargs: keyword arguments to :func:`subprocess.call`
  150. :returns: exit code of sub-process
  151. :rtype: int
  152. When you call this function, it caches its arguments and then calls
  153. ``background.py`` in a subprocess. The Python subprocess will load the
  154. cached arguments, fork into the background, and then run the command you
  155. specified.
  156. This function will return as soon as the ``background.py`` subprocess has
  157. forked, returning the exit code of *that* process (i.e. not of the command
  158. you're trying to run).
  159. If that process fails, an error will be written to the log file.
  160. If a process is already running under the same name, this function will
  161. return immediately and will not run the specified command.
  162. """
  163. if is_running(name):
  164. _log().info('[%s] job already running', name)
  165. return
  166. argcache = _arg_cache(name)
  167. # Cache arguments
  168. with open(argcache, 'wb') as fp:
  169. pickle.dump({'args': args, 'kwargs': kwargs}, fp)
  170. _log().debug('[%s] command cached: %s', name, argcache)
  171. # Call this script
  172. cmd = ['/usr/bin/python', __file__, name]
  173. _log().debug('[%s] passing job to background runner: %r', name, cmd)
  174. retcode = subprocess.call(cmd)
  175. if retcode: # pragma: no cover
  176. _log().error('[%s] background runner failed with %d', name, retcode)
  177. else:
  178. _log().debug('[%s] background job started', name)
  179. return retcode
  180. def main(wf): # pragma: no cover
  181. """Run command in a background process.
  182. Load cached arguments, fork into background, then call
  183. :meth:`subprocess.call` with cached arguments.
  184. """
  185. log = wf.logger
  186. name = wf.args[0]
  187. argcache = _arg_cache(name)
  188. if not os.path.exists(argcache):
  189. msg = '[{0}] command cache not found: {1}'.format(name, argcache)
  190. log.critical(msg)
  191. raise IOError(msg)
  192. # Fork to background and run command
  193. pidfile = _pid_file(name)
  194. _background(pidfile)
  195. # Load cached arguments
  196. with open(argcache, 'rb') as fp:
  197. data = pickle.load(fp)
  198. # Cached arguments
  199. args = data['args']
  200. kwargs = data['kwargs']
  201. # Delete argument cache file
  202. os.unlink(argcache)
  203. try:
  204. # Run the command
  205. log.debug('[%s] running command: %r', name, args)
  206. retcode = subprocess.call(args, **kwargs)
  207. if retcode:
  208. log.error('[%s] command failed with status %d', name, retcode)
  209. finally:
  210. os.unlink(pidfile)
  211. log.debug('[%s] job complete', name)
  212. if __name__ == '__main__': # pragma: no cover
  213. wf().run(main)