ds-logpipe.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. #!/usr/bin/env python
  2. import sys
  3. import os, os.path
  4. import errno
  5. import signal
  6. import pprint
  7. import types
  8. import time
  9. import fcntl
  10. import pwd
  11. maxlines = 1000 # set on command line
  12. S_IFIFO = 0010000
  13. buffer = [] # default circular buffer used by default plugin
  14. totallines = 0
  15. logfname = "" # name of log pipe
  16. debug = False
  17. # default plugin just keeps a circular buffer
  18. def defaultplugin(line):
  19. global totallines
  20. buffer.append(line)
  21. totallines = totallines + 1
  22. if len(buffer) > maxlines:
  23. del buffer[0]
  24. return True
  25. def printbuffer():
  26. sys.stdout.writelines(buffer)
  27. print "Read %d total lines" % totallines
  28. print logfname, "=" * 60
  29. sys.stdout.flush()
  30. def defaultpost(): printbuffer()
  31. plgfuncs = [] # list of plugin functions
  32. plgpostfuncs = [] # list of post plugin funcs
  33. def finish():
  34. for postfunc in plgpostfuncs: postfunc()
  35. if options.scriptpidfile: os.unlink(options.scriptpidfile)
  36. sys.exit(0)
  37. def sighandler(signum, frame):
  38. if signum != signal.SIGHUP:
  39. signal.signal(signal.SIGHUP, signal.SIG_DFL)
  40. signal.signal(signal.SIGINT, signal.SIG_DFL)
  41. #signal.signal(signal.SIGPIPE, signal.SIG_DFL)
  42. signal.signal(signal.SIGTERM, signal.SIG_DFL)
  43. signal.signal(signal.SIGALRM, signal.SIG_DFL)
  44. if signum == signal.SIGALRM and debug:
  45. print "script timed out waiting to open pipe"
  46. finish()
  47. else: printbuffer()
  48. def isvalidpluginfile(plg):
  49. return os.path.isfile(plg)
  50. def my_import(plgfile):
  51. '''import plgfile as a python module and return
  52. an error string if error - also return the prefunc if any'''
  53. if not isvalidpluginfile(plgfile):
  54. return ("%s is not a valid plugin filename" % plgfile, None, None)
  55. # __import__ searches for the file in sys.path - we cannot
  56. # __import__ a file by the full path
  57. # __import__('basename') looks for basename.py in sys.path
  58. (dir, fname) = os.path.split(plgfile)
  59. base = os.path.splitext(fname)[0]
  60. if not dir: dir = "."
  61. sys.path.insert(0, dir) # put our path first so it will find our file
  62. mod = __import__(base) # will throw exception if problem with python file
  63. sys.path.pop(0) # remove our path
  64. # check for the plugin functions
  65. plgfunc = getattr(mod, 'plugin', None)
  66. if not plgfunc:
  67. return ('%s does not specify a plugin function' % plgfile, None, base)
  68. if not isinstance(plgfunc, types.FunctionType):
  69. return ('the symbol "plugin" in %s is not a function' % plgfile, None, base)
  70. plgfuncs.append(plgfunc) # add to list in cmd line order
  71. # check for 'post' func
  72. plgpostfunc = getattr(mod, 'post', None)
  73. if plgpostfunc:
  74. if not isinstance(plgpostfunc, types.FunctionType):
  75. return ('the symbol "post" in %s is not a function' % plgfile, None, base)
  76. else:
  77. plgpostfuncs.append(plgpostfunc) # add to list in cmd line order
  78. prefunc = getattr(mod, 'pre', None)
  79. # check for 'pre' func
  80. if prefunc and not isinstance(prefunc, types.FunctionType):
  81. return ('the symbol "pre" in %s is not a function' % plgfile, None, base)
  82. return ('', prefunc, base)
  83. def parse_plugins(parser, options, args):
  84. '''Each plugin in the plugins list may have additional
  85. arguments, specified on the command line like this:
  86. --plugin=foo.py foo.bar=1 foo.baz=2 ...
  87. that is, each argument to plugin X will be specified as X.arg=value'''
  88. if not options.plugins: return args
  89. for plgfile in options.plugins:
  90. (errstr, prefunc, base) = my_import(plgfile)
  91. if errstr:
  92. parser.error(errstr)
  93. return args
  94. # parse the arguments to the plugin given on the command line
  95. bvals = {} # holds plugin args and values, if any
  96. newargs = []
  97. for arg in args:
  98. if arg.startswith(base + '.'):
  99. argval = arg.replace(base + '.', '')
  100. (plgarg, plgval) = argval.split('=', 1) # split at first =
  101. if not plgarg in bvals:
  102. bvals[plgarg] = plgval
  103. elif isinstance(bvals[plgarg],list):
  104. bvals[plgarg].append(plgval)
  105. else: # convert to list
  106. bvals[plgarg] = [bvals[plgarg], plgval]
  107. else:
  108. newargs.append(arg)
  109. if prefunc:
  110. if debug:
  111. print 'Calling "pre" function in', plgfile
  112. if not prefunc(bvals):
  113. parser.error('the "pre" function in %s returned an error' % plgfile)
  114. args = newargs
  115. return args
  116. def open_pipe(logfname):
  117. opencompleted = False
  118. logf = None
  119. while not opencompleted:
  120. try:
  121. logf = open(logfname, 'r') # blocks until there is some input
  122. opencompleted = True
  123. except IOError, e:
  124. if e.errno == errno.EINTR:
  125. continue # open was interrupted, try again
  126. else: # hard error
  127. raise Exception, "%s [%d]" % (e.strerror, e.errno)
  128. return logf
  129. def is_proc_alive(procpid):
  130. retval = False
  131. try:
  132. retval = os.path.exists("/proc/%d" % procpid)
  133. except IOError, e:
  134. if e.errno != errno.ENOENT: # may not exist yet - that's ok
  135. # otherwise, probably permissions or other badness
  136. raise Exception, "could not open file %s - %s [%d]" % (procfile, e.strerror, e.errno)
  137. # using /proc/pid failed, try kill
  138. if not retval:
  139. try:
  140. os.kill(procpid, 0) # sig 0 is a "ping"
  141. retval = True # if we got here, proc exists
  142. except OSError, e:
  143. pass # no such process, or EPERM/EACCES
  144. return retval
  145. def get_pid_from_file(pidfile):
  146. procpid = 0
  147. if pidfile:
  148. line = None
  149. try:
  150. pfd = open(pidfile, 'r')
  151. line = pfd.readline()
  152. pfd.close()
  153. except IOError, e:
  154. if e.errno != errno.ENOENT: # may not exist yet - that's ok
  155. # otherwise, probably permissions or other badness
  156. raise Exception, "Could not read pid from file %s - %s [%d]" % (pidfile, e.strerror, e.errno)
  157. if line:
  158. procpid = int(line)
  159. return procpid
  160. def write_pid_file(pidfile):
  161. try:
  162. pfd = open(pidfile, 'w')
  163. pfd.write("%d\n" % os.getpid())
  164. pfd.close()
  165. except IOError, e:
  166. raise Exception, "Could not write pid to file %s - %s [%d]" % (pidfile, e.strerror, e.errno)
  167. def handle_script_pidfile(scriptpidfile):
  168. scriptpid = get_pid_from_file(scriptpidfile)
  169. # 0 if no file or no pid or error
  170. if scriptpid and is_proc_alive(scriptpid):
  171. # already running
  172. if debug:
  173. print "Script is already running: process id %d" % scriptpid
  174. return False
  175. else:
  176. # either process is not running or no file
  177. # write our pid to the file
  178. write_pid_file(scriptpidfile)
  179. return True
  180. def read_and_process_line(logf, plgfuncs):
  181. line = None
  182. done = False
  183. readcompleted = False
  184. while not readcompleted:
  185. try:
  186. line = logf.readline()
  187. readcompleted = True # read completed
  188. except IOError, e:
  189. if e.errno == errno.EINTR:
  190. continue # read was interrupted, try again
  191. else: # hard error
  192. raise Exception, "%s [%d]" % (e.strerror, e.errno)
  193. if line: # read something
  194. for plgfunc in plgfuncs:
  195. if not plgfunc(line):
  196. print "Aborting processing due to function %s.%s" % (plgfunc.__module__, plgfunc.__name__)
  197. finish() # this will exit the process
  198. done = True
  199. break
  200. else: # EOF
  201. done = True
  202. return done
  203. def parse_options():
  204. from optparse import OptionParser
  205. usage = "%prog <name of pipe> [options]"
  206. parser = OptionParser(usage)
  207. parser.add_option("-m", "--maxlines", dest="maxlines", type='int',
  208. help="maximum number of lines to keep in the buffer", default=1000)
  209. parser.add_option("-d", "--debug", dest="debug", action="store_true",
  210. default=False, help="gather extra debugging information")
  211. parser.add_option("-p", "--plugin", type='string', dest='plugins', action='append',
  212. help='filename of a plugin to use with this log')
  213. parser.add_option("-s", "--serverpidfile", type='string', dest='serverpidfile',
  214. help='name of file containing the pid of the server to monitor')
  215. parser.add_option("-t", "--servertimeout", dest="servertimeout", type='int',
  216. help="timeout in seconds to wait for the serverpid to be alive", default=60)
  217. parser.add_option("--serverpid", dest="serverpid", type='int',
  218. help="process id of server to monitor", default=0)
  219. parser.add_option("-u", "--user", type='string', dest='user',
  220. help='name of user to set effective uid to')
  221. parser.add_option("-i", "--scriptpidfile", type='string', dest='scriptpidfile',
  222. help='name of file containing the pid of this script')
  223. options, args = parser.parse_args()
  224. args = parse_plugins(parser, options, args)
  225. if len(args) < 1:
  226. parser.error("You must specify the name of the pipe to use")
  227. if len(args) > 1:
  228. parser.error("error - unhandled command line arguments: %s" % args.join(' '))
  229. return options, args[0]
  230. options, logfname = parse_options()
  231. if options.debug: debug = True
  232. if len(plgfuncs) == 0:
  233. plgfuncs.append(defaultplugin)
  234. if len(plgpostfuncs) == 0:
  235. plgpostfuncs.append(defaultpost)
  236. if options.user:
  237. try: userid = int(options.user)
  238. except ValueError: # not a numeric userid - look it up
  239. userid = pwd.getpwnam(options.user)[2]
  240. os.seteuid(userid)
  241. if options.scriptpidfile:
  242. if not handle_script_pidfile(options.scriptpidfile):
  243. options.scriptpidfile = None
  244. sys.exit(1)
  245. serverpid = options.serverpid
  246. if serverpid:
  247. if not is_proc_alive(serverpid):
  248. print "Server pid [%d] is not alive - exiting" % serverpid
  249. sys.exit(1)
  250. try:
  251. if os.stat(logfname).st_mode & S_IFIFO:
  252. if debug:
  253. print "Using existing log pipe", logfname
  254. else:
  255. print "Error:", logfname, "exists and is not a log pipe"
  256. print "use a filename other than", logfname
  257. sys.exit(1)
  258. except OSError, e:
  259. if e.errno == errno.ENOENT:
  260. if debug:
  261. print "Creating log pipe", logfname
  262. os.mkfifo(logfname)
  263. os.chmod(logfname, 0600)
  264. else:
  265. raise Exception, "%s [%d]" % (e.strerror, e.errno)
  266. if debug:
  267. print "Listening to log pipe", logfname, "number of lines", maxlines
  268. # set up our signal handlers
  269. signal.signal(signal.SIGHUP, sighandler)
  270. signal.signal(signal.SIGINT, sighandler)
  271. #signal.signal(signal.SIGPIPE, sighandler)
  272. signal.signal(signal.SIGTERM, sighandler)
  273. signal.signal(signal.SIGALRM, sighandler)
  274. timerisset = False
  275. neverdone = False
  276. if options.serverpidfile:
  277. # start the timer to wait for the pid file to be available
  278. signal.setitimer(signal.ITIMER_REAL, options.servertimeout)
  279. timerisset = True
  280. # if we are tracking a server, we will be done
  281. # when the server exits
  282. # if not tracking a server, we will only be done
  283. # when we are killed
  284. if not serverpid and not options.serverpidfile:
  285. neverdone = True
  286. done = False
  287. while not done:
  288. # open the pipe - will hang until
  289. # 1. something opens the other end
  290. # 2. alarm goes off - will just exit
  291. logf = open_pipe(logfname)
  292. # if we get here, logf is not None
  293. if debug:
  294. print "opened pipe", logf
  295. if timerisset:
  296. # cancel the timer - the open succeeded
  297. timerisset = False
  298. signal.setitimer(signal.ITIMER_REAL, 0)
  299. if debug:
  300. print "cancelled startup timer"
  301. lines = 0
  302. # read and process the next line in the pipe
  303. # if server exits while we are reading, we will get
  304. # EOF and the func will return True - will also
  305. # return True if a plugin returns failure
  306. while not read_and_process_line(logf, plgfuncs):
  307. lines += 1
  308. # the other end of the pipe closed - we close our end too
  309. if debug:
  310. print "read", lines, "lines"
  311. logf.close()
  312. logf = None
  313. if debug:
  314. print "closed log pipe", logfname
  315. if not serverpid and options.serverpidfile:
  316. # see if the server has written its server pid file yet
  317. # it may take a "long time" for the server to actually
  318. # write its pid file
  319. serverpid = get_pid_from_file(options.serverpidfile)
  320. # if the server is no longer running, just finish
  321. if serverpid and not is_proc_alive(serverpid):
  322. done = True
  323. if debug:
  324. print "server pid", serverpid, "exited - script exiting"
  325. if neverdone:
  326. done = False
  327. elif not done:
  328. if not lines:
  329. # at startup the server will close the log and reopen it
  330. # when it does this lines will be 0 - this means we need
  331. # immediately attempt to reopen the log pipe and read it
  332. # however, at shutdown, the server will close the log before
  333. # the process has exited - so is_proc_alive will return
  334. # true for a short time - if we then attempt to open the
  335. # pipe, the open will hang forever - to avoid this situation
  336. # we set the alarm again to wake up the open - use a short
  337. # timeout so we don't wait a long time if the server
  338. # really is exiting
  339. signal.setitimer(signal.ITIMER_REAL, 0.25)
  340. timerisset = True
  341. if debug:
  342. print "set startup timer - see if server is really shut down"
  343. else: # we read something
  344. # pipe closed - usually when server shuts down
  345. done = True
  346. if not done and debug:
  347. print "log pipe", logfname, "closed - reopening - read", totallines, "total lines"
  348. finish()