| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- #!/usr/bin/env python
- # encoding: utf-8
- #
- # Copyright (c) 2017 Dean Jackson <[email protected]>
- #
- # MIT Licence. See http://opensource.org/licenses/MIT
- #
- # Created on 2017-12-17
- #
- """A selection of helper functions useful for building workflows."""
- from __future__ import print_function, absolute_import
- import atexit
- from collections import namedtuple
- from contextlib import contextmanager
- import errno
- import fcntl
- import functools
- import json
- import os
- import signal
- import subprocess
- import sys
- from threading import Event
- import time
# JavaScript for Automation (JXA) templates used to call Alfred's
# scripting API. Complete them with ``str.format()``:
#
#   {app}   JSON-encoded application name or bundle ID, i.e. "Alfred 3"
#           or "com.runningwithcrayons.Alfred" depending on version
#   {arg}   JSON-encoded primary argument of the call
#   {opts}  JSON-encoded options object (only where the call takes one)

# Show Alfred's window in normal (search) mode with a query
JXA_SEARCH = 'Application({app}).search({arg});'

# Show Alfred's File Actions for the given argument
JXA_ACTION = 'Application({app}).action({arg});'

# Show Alfred's file navigation mode at a path
JXA_BROWSE = 'Application({app}).browse({arg});'

# Switch Alfred to the named theme
JXA_SET_THEME = 'Application({app}).setTheme({arg});'

# Fire an External Trigger
JXA_TRIGGER = 'Application({app}).runTrigger({arg}, {opts});'

# Store a variable in the workflow configuration sheet/info.plist
JXA_SET_CONFIG = 'Application({app}).setConfiguration({arg}, {opts});'

# Remove a variable from the workflow configuration sheet/info.plist
JXA_UNSET_CONFIG = 'Application({app}).removeConfiguration({arg}, {opts});'

# Make Alfred re-read a workflow from disk
JXA_RELOAD_WORKFLOW = 'Application({app}).reloadWorkflow({arg});'
class AcquisitionError(Exception):
    """Signal that a lock could not be obtained.

    :meth:`LockFile.acquire` raises this when its timeout expires
    before the lock becomes available.
    """
# Record type returned by appinfo(); the string below is the attribute
# docstring picked up by the documentation build.
AppInfo = namedtuple('AppInfo', 'name path bundleid')
"""Information about an installed application.

Returned by :func:`appinfo`. All attributes are Unicode.

.. py:attribute:: name

    Name of the application, e.g. ``u'Safari'``.

.. py:attribute:: path

    Path to the application bundle, e.g. ``u'/Applications/Safari.app'``.

.. py:attribute:: bundleid

    Application's bundle ID, e.g. ``u'com.apple.Safari'``.

"""
def jxa_app_name():
    """Return name of application to call currently running Alfred.

    .. versionadded:: 1.37

    Reads the ``alfred_version`` environment variable. Returns
    ``'Alfred 3'`` when its major version component is ``3`` and the
    bundle ID ``'com.runningwithcrayons.Alfred'`` in every other case,
    including when the variable is not set.

    The result is suitable for use with ``Application(name)`` in JXA.

    Returns:
        unicode: Application name or ID.

    """
    # Compare the complete major component rather than the first
    # character, so that a version such as "30.1" is not mistaken
    # for Alfred 3.
    major = os.getenv('alfred_version', '').partition('.')[0]
    if major == '3':
        # Alfred 3
        return u'Alfred 3'

    # Alfred 4+
    return u'com.runningwithcrayons.Alfred'
def unicodify(s, encoding='utf-8', norm=None):
    """Return ``s`` as a Unicode string.

    .. versionadded:: 1.31

    Bytestrings are decoded with ``encoding``; Unicode input is passed
    through. If ``norm`` is given, the result is additionally
    normalised with :func:`unicodedata.normalize`.

    Args:
        s (str): String to decode. May also be Unicode.
        encoding (str, optional): Encoding to use on bytestrings.
        norm (str, optional): Unicode normalisation form to apply,
            e.g. ``'NFC'``. No normalisation if empty or ``None``.

    Returns:
        unicode: Decoded, optionally normalised, Unicode string.

    """
    text = s if isinstance(s, unicode) else unicode(s, encoding)
    if not norm:
        return text

    # Imported lazily: only needed when normalisation is requested
    from unicodedata import normalize
    return normalize(norm, text)
def utf8ify(s):
    """Return ``s`` as a bytestring.

    .. versionadded:: 1.31

    ``str`` objects are returned unchanged, ``unicode`` objects are
    encoded to UTF-8 and :func:`str` is called on anything else.

    Args:
        s (object): A Python object

    Returns:
        str: UTF-8 string or string representation of s.

    """
    if isinstance(s, str):
        return s

    return s.encode('utf-8') if isinstance(s, unicode) else str(s)
def applescriptify(s):
    """Escape string for insertion into an AppleScript string.

    .. versionadded:: 1.31

    Every ``"`` is replaced with ``" & quote & "``, which closes the
    AppleScript string literal, concatenates a quote character and
    opens a new literal. Use this function if you want to insert a
    string into an AppleScript script:

        >>> print(applescriptify(u'g "python" test'))
        g " & quote & "python" & quote & " test

    Args:
        s (unicode): Unicode string to escape.

    Returns:
        unicode: Escaped string.

    """
    escaped_quote = u'" & quote & "'
    return s.replace(u'"', escaped_quote)
def run_command(cmd, **kwargs):
    """Run a command and return the output.

    .. versionadded:: 1.31

    Passes every element of ``cmd`` through :func:`utf8ify` and hands
    the resulting list to :func:`subprocess.check_output`.

    Args:
        cmd (list): Command arguments to pass to
            :func:`~subprocess.check_output`.
        **kwargs: Keyword arguments to pass to
            :func:`~subprocess.check_output`.

    Returns:
        str: Output returned by :func:`~subprocess.check_output`.

    """
    encoded = list(map(utf8ify, cmd))
    return subprocess.check_output(encoded, **kwargs)
def run_applescript(script, *args, **kwargs):
    """Execute an AppleScript script and return its output.

    .. versionadded:: 1.31

    ``script`` may be a filepath or source code: if a file exists at
    that path, the file is run; otherwise ``script`` is passed to
    ``osascript`` as code via ``-e``.

    Args:
        script (str): Filepath of script or code to run.
        *args: Optional command-line arguments to pass to the script.
        **kwargs: Pass ``lang`` to run a language other than AppleScript.
            Any other keyword arguments are passed to :func:`run_command`.

    Returns:
        str: Output of run command.

    """
    # `lang` is consumed here; everything else goes to run_command()
    lang = kwargs.pop('lang', 'AppleScript')

    if os.path.exists(script):
        source = [script]
    else:
        source = ['-e', script]

    cmd = ['/usr/bin/osascript', '-l', lang] + source + list(args)
    return run_command(cmd, **kwargs)
def run_jxa(script, *args):
    """Execute a JXA script and return its output.

    .. versionadded:: 1.31

    Convenience wrapper that calls :func:`run_applescript` with
    ``lang='JavaScript'``.

    Args:
        script (str): Filepath of script or code to run.
        *args: Optional command-line arguments to pass to script.

    Returns:
        str: Output of script.

    """
    options = {'lang': 'JavaScript'}
    return run_applescript(script, *args, **options)
def run_trigger(name, bundleid=None, arg=None):
    """Call an Alfred External Trigger.

    .. versionadded:: 1.31

    When ``bundleid`` is omitted, the value of the
    ``alfred_workflow_bundleid`` environment variable (the calling
    workflow) is used instead.

    Args:
        name (str): Name of External Trigger to call.
        bundleid (str, optional): Bundle ID of workflow trigger belongs to.
        arg (str, optional): Argument to pass to trigger. Only sent
            if truthy.

    """
    if not bundleid:
        bundleid = os.getenv('alfred_workflow_bundleid')

    options = {'inWorkflow': bundleid}
    if arg:
        options['withArgument'] = arg

    script = JXA_TRIGGER.format(
        app=json.dumps(jxa_app_name()),
        arg=json.dumps(name),
        opts=json.dumps(options, sort_keys=True),
    )
    run_applescript(script, lang='JavaScript')
def set_theme(theme_name):
    """Change Alfred's theme.

    .. versionadded:: 1.39.0

    Args:
        theme_name (unicode): Name of theme Alfred should use.

    """
    app = json.dumps(jxa_app_name())
    theme = json.dumps(theme_name)
    run_applescript(JXA_SET_THEME.format(app=app, arg=theme),
                    lang='JavaScript')
def set_config(name, value, bundleid=None, exportable=False):
    """Set a workflow variable in ``info.plist``.

    .. versionadded:: 1.33

    When ``bundleid`` is omitted, the value of the
    ``alfred_workflow_bundleid`` environment variable (the calling
    workflow) is used instead.

    Args:
        name (str): Name of variable to set.
        value (str): Value to set variable to.
        bundleid (str, optional): Bundle ID of workflow variable belongs to.
        exportable (bool, optional): Whether variable should be marked
            as exportable (Don't Export checkbox).

    """
    if not bundleid:
        bundleid = os.getenv('alfred_workflow_bundleid')

    options = dict(toValue=value, inWorkflow=bundleid, exportable=exportable)

    script = JXA_SET_CONFIG.format(
        app=json.dumps(jxa_app_name()),
        arg=json.dumps(name),
        opts=json.dumps(options, sort_keys=True),
    )
    run_applescript(script, lang='JavaScript')
def unset_config(name, bundleid=None):
    """Delete a workflow variable from ``info.plist``.

    .. versionadded:: 1.33

    When ``bundleid`` is omitted, the value of the
    ``alfred_workflow_bundleid`` environment variable (the calling
    workflow) is used instead.

    Args:
        name (str): Name of variable to delete.
        bundleid (str, optional): Bundle ID of workflow variable belongs to.

    """
    if not bundleid:
        bundleid = os.getenv('alfred_workflow_bundleid')

    script = JXA_UNSET_CONFIG.format(
        app=json.dumps(jxa_app_name()),
        arg=json.dumps(name),
        opts=json.dumps({'inWorkflow': bundleid}, sort_keys=True),
    )
    run_applescript(script, lang='JavaScript')
def search_in_alfred(query=None):
    """Open Alfred with given search query.

    .. versionadded:: 1.39.0

    Leave out ``query`` (or pass a falsy value) to open Alfred's main
    window with an empty search.

    Args:
        query (unicode, optional): Search query.

    """
    if not query:
        query = u''

    app = json.dumps(jxa_app_name())
    run_applescript(JXA_SEARCH.format(app=app, arg=json.dumps(query)),
                    lang='JavaScript')
def browse_in_alfred(path):
    """Open Alfred's filesystem navigation mode at ``path``.

    .. versionadded:: 1.39.0

    Args:
        path (unicode): File or directory path.

    """
    app = json.dumps(jxa_app_name())
    target = json.dumps(path)
    run_applescript(JXA_BROWSE.format(app=app, arg=target),
                    lang='JavaScript')
def action_in_alfred(paths):
    """Action the given filepaths in Alfred.

    .. versionadded:: 1.39.0

    Args:
        paths (list): Unicode paths to files/directories to action.

    """
    app = json.dumps(jxa_app_name())
    targets = json.dumps(paths)
    run_applescript(JXA_ACTION.format(app=app, arg=targets),
                    lang='JavaScript')
def reload_workflow(bundleid=None):
    """Tell Alfred to reload a workflow from disk.

    .. versionadded:: 1.39.0

    When ``bundleid`` is omitted, the value of the
    ``alfred_workflow_bundleid`` environment variable (the calling
    workflow) is used instead.

    Args:
        bundleid (unicode, optional): Bundle ID of workflow to reload.

    """
    if not bundleid:
        bundleid = os.getenv('alfred_workflow_bundleid')

    app = json.dumps(jxa_app_name())
    run_applescript(
        JXA_RELOAD_WORKFLOW.format(app=app, arg=json.dumps(bundleid)),
        lang='JavaScript')
def appinfo(name):
    """Get information about an installed application.

    .. versionadded:: 1.31

    Searches ``/Applications``, ``/System/Applications`` and
    ``~/Applications`` with ``mdfind`` for an application whose display
    name is ``name`` or whose filename is ``name`` + ``.app``, then
    reads the bundle ID of the first hit with ``mdls``.

    Args:
        name (str): Name of application to look up.

    Returns:
        AppInfo: :class:`AppInfo` tuple or ``None`` if app isn't found.

    """
    query = (
        '(kMDItemContentTypeTree == com.apple.application &&'
        '(kMDItemDisplayName == "{0}" || kMDItemFSName == "{0}.app"))'
    ).format(name)

    find_cmd = ['mdfind']
    for directory in ('/Applications',
                      '/System/Applications',
                      os.path.expanduser('~/Applications')):
        find_cmd += ['-onlyin', directory]
    find_cmd.append(query)

    matches = run_command(find_cmd).strip()
    if not matches:
        return None

    # Only the first result is used
    path = matches.partition('\n')[0]

    bid = run_command(
        ['mdls', '-raw', '-name', 'kMDItemCFBundleIdentifier', path]
    ).strip()
    if not bid:  # pragma: no cover
        return None

    return AppInfo(unicodify(name), unicodify(path), unicodify(bid))
@contextmanager
def atomic_writer(fpath, mode):
    """Atomic file writer.

    .. versionadded:: 1.12

    Context manager that ensures the file is only written if the write
    succeeds. The data is first written to a temporary file alongside
    ``fpath`` (``fpath`` + ``.<pid>.tmp``), which is renamed to
    ``fpath`` once the ``with`` block has completed without error.
    If the block raises, ``fpath`` is left untouched and the temporary
    file is deleted.

    Args:
        fpath (unicode): Path of file to write to.
        mode (str): Same as for :func:`open`.

    Yields:
        file: Open handle on the temporary file.

    """
    suffix = '.{}.tmp'.format(os.getpid())
    temppath = fpath + suffix
    try:
        with open(temppath, mode) as fp:
            yield fp

        # The temporary file is flushed and closed at this point, so
        # `fpath` never refers to a partially-written file.
        os.rename(temppath, fpath)
    finally:
        # Remove the temporary file if the write failed. After a
        # successful rename it no longer exists, hence best-effort.
        try:
            os.remove(temppath)
        except (OSError, IOError):
            pass
class LockFile(object):
    """Context manager to protect filepaths with lockfiles.

    .. versionadded:: 1.13

    Creates a lockfile alongside ``protected_path``. Other ``LockFile``
    instances will refuse to lock the same path.

    >>> path = '/path/to/file'
    >>> with LockFile(path):
    ...     with open(path, 'wb') as fp:
    ...         fp.write(data)

    The lock is taken with :func:`fcntl.lockf`, i.e. it is a POSIX
    record lock owned by the process: it provides mutual exclusion
    between processes, not between threads or several ``LockFile``
    instances inside one process.

    Args:
        protected_path (unicode): File to protect with a lockfile
        timeout (float, optional): Raises an :class:`AcquisitionError`
            if lock cannot be acquired within this number of seconds.
            If ``timeout`` is 0 (the default), wait forever.
        delay (float, optional): How often to check (in seconds) if
            lock has been released.

    Attributes:
        delay (float): How often to check (in seconds) whether the lock
            can be acquired.
        lockfile (unicode): Path of the lockfile.
        timeout (float): How long to wait to acquire the lock.

    """

    def __init__(self, protected_path, timeout=0.0, delay=0.05):
        """Create new :class:`LockFile` object."""
        self.lockfile = protected_path + '.lock'
        # Open file object for `self.lockfile`, or None
        self._lockfile = None
        self.timeout = timeout
        self.delay = delay
        # Set while this instance holds the lock
        self._lock = Event()
        # Make sure the lock is released when the interpreter exits
        atexit.register(self.release)

    @property
    def locked(self):
        """``True`` if file is locked by this instance."""
        return self._lock.is_set()

    def _close_lockfile(self):
        """Close and forget the open lockfile handle, if there is one."""
        handle, self._lockfile = self._lockfile, None
        if handle is not None:
            try:
                handle.close()
            except (IOError, OSError):  # pragma: no cover
                pass

    def _handle_is_current(self):
        """Return ``True`` if the open handle is the file now at `self.lockfile`."""
        try:
            held = os.fstat(self._lockfile.fileno())
            on_disk = os.stat(self.lockfile)
        except (IOError, OSError):
            # Lockfile was deleted (or is otherwise inaccessible)
            return False
        return (held.st_dev, held.st_ino) == (on_disk.st_dev, on_disk.st_ino)

    def acquire(self, blocking=True):
        """Acquire the lock if possible.

        If the lock is in use and ``blocking`` is ``False``, return
        ``False``.

        Otherwise, check every :attr:`delay` seconds until it acquires
        lock or exceeds :attr:`timeout` and raises an
        :class:`AcquisitionError`.

        The lock is not re-entrant: a blocking call on an instance that
        already holds the lock waits until :attr:`timeout` expires (or
        forever if :attr:`timeout` is 0).

        Args:
            blocking (bool, optional): Whether to wait for the lock.

        Returns:
            bool: ``True`` if the lock was acquired, ``False`` if it
            is in use and ``blocking`` is ``False``.

        Raises:
            AcquisitionError: Lock was not acquired within
                :attr:`timeout` seconds.

        """
        if self.locked and not blocking:
            return False

        start = time.time()
        while True:
            # Raise error if we've been waiting too long to acquire the lock
            if self.timeout and (time.time() - start) >= self.timeout:
                if not self.locked:
                    # Don't leak the handle opened for the attempts.
                    # (Never close while locked: that drops the lock.)
                    self._close_lockfile()
                raise AcquisitionError('lock acquisition timed out')

            # If already locked, wait then try again
            if self.locked:
                time.sleep(self.delay)
                continue

            # Create in append mode so we don't lose any contents
            if self._lockfile is None:
                self._lockfile = open(self.lockfile, 'a')

            # Try to acquire the lock
            try:
                fcntl.lockf(self._lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError as err:  # pragma: no cover
                if err.errno not in (errno.EACCES, errno.EAGAIN):
                    raise

                # Don't try again
                if not blocking:
                    self._close_lockfile()
                    return False

                # Wait, then try again
                time.sleep(self.delay)
                continue

            # The previous holder deletes the lockfile on release, so
            # the file we just locked may no longer be the one at
            # `self.lockfile`. Holding a lock on such a stale file
            # excludes nobody: reopen the path and start over.
            if not self._handle_is_current():
                self._close_lockfile()
                continue

            self._lock.set()
            return True

    def release(self):
        """Release the lock by deleting `self.lockfile`.

        Returns:
            bool: ``True`` if the lock was released, ``False`` if this
            instance did not hold it.

        """
        if not self._lock.is_set():
            return False

        try:
            # Delete the lockfile while the lock is still held, so a
            # waiting process can never lock a file that is about to
            # be removed from under it.
            try:
                os.unlink(self.lockfile)
            except (IOError, OSError):  # pragma: no cover
                pass

            try:
                fcntl.lockf(self._lockfile, fcntl.LOCK_UN)
            except IOError:  # pragma: no cover
                pass
        finally:
            self._lock.clear()
            # Close explicitly instead of waiting for garbage collection
            self._close_lockfile()

        return True

    def __enter__(self):
        """Acquire lock."""
        self.acquire()
        return self

    def __exit__(self, typ, value, traceback):
        """Release lock."""
        self.release()

    def __del__(self):
        """Clear up `self.lockfile`."""
        # `_lock` is missing if `__init__` raised before creating it
        if hasattr(self, '_lock'):
            self.release()  # pragma: no cover
class uninterruptible(object):
    """Decorator that postpones SIGTERM until wrapped function returns.

    .. versionadded:: 1.12

    .. important:: This decorator is NOT thread-safe.

    As of version 2.7, Alfred allows Script Filters to be killed. If
    your workflow is killed in the middle of critical code (e.g.
    writing data to disk), this may corrupt your workflow's data.

    Use this decorator to wrap critical functions that *must* complete.
    If the script is killed while a wrapped function is executing,
    the SIGTERM will be caught and handled after your function has
    finished executing.

    Alfred-Workflow uses this internally to ensure its settings, data
    and cache writes complete.

    """

    def __init__(self, func, class_name=''):
        """Decorate `func`.

        Args:
            func (callable): Function to protect from SIGTERM.
            class_name (str, optional): Accepted for backward
                compatibility; not used.

        """
        self.func = func
        functools.update_wrapper(self, func)
        # (signum, frame) of a SIGTERM received during the call, or None
        self._caught_signal = None

    def signal_handler(self, signum, frame):
        """Called when process receives SIGTERM."""
        # Only record the signal; it is acted upon in `__call__`
        self._caught_signal = (signum, frame)

    def __call__(self, *args, **kwargs):
        """Trap ``SIGTERM`` and call wrapped function.

        Returns:
            object: Whatever the wrapped function returns.

        """
        self._caught_signal = None
        # Register handler for SIGTERM, then call `self.func`
        self.old_signal_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.signal_handler)

        try:
            result = self.func(*args, **kwargs)
        finally:
            # Restore old signal handler, also if `self.func` raised
            signal.signal(signal.SIGTERM, self.old_signal_handler)

        # Handle any signal caught during execution. Not reached if
        # `self.func` raised: the exception propagates instead.
        if self._caught_signal is not None:
            signum, frame = self._caught_signal
            if callable(self.old_signal_handler):
                self.old_signal_handler(signum, frame)
            elif self.old_signal_handler == signal.SIG_DFL:
                sys.exit(0)

        return result

    def __get__(self, obj=None, klass=None):
        """Decorator API."""
        # Descriptor protocol: makes the decorator usable on methods by
        # wrapping the bound method in a new instance.
        class_name = klass.__name__ if klass is not None else ''
        return self.__class__(self.func.__get__(obj, klass), class_name)
|