libkas: Provide asynchronous interface for command execution

Enhance _stream_subprocess to run_cmd_async, a co-routine variant of
run_cmd that the caller can use to parallelize command execution.
run_cmd becomes a simple wrapper that waits for the async variant to
complete.

Signed-off-by: Jan Kiszka <jan.kiszka@siemens.com>
This commit is contained in:
Jan Kiszka 2017-06-28 12:43:55 +02:00 committed by Daniel Wagner
parent dddbdb0112
commit 8e4fbd6d3f

View File

@ -81,13 +81,20 @@ def _read_stream(stream, callback):
@asyncio.coroutine @asyncio.coroutine
def _stream_subprocess(cmd, cwd, env, shell, stdout_cb, stderr_cb): def run_cmd_async(cmd, cwd, env=None, fail=True, shell=False, liveupdate=True):
""" """
This function starts the subprocess, sets up the output stream Run a command asynchronously.
handlers and waits until the process has existed
""" """
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
env = env or {}
cmdstr = cmd
if not shell:
cmdstr = ' '.join(cmd)
logging.info('%s$ %s', cwd, cmdstr)
logo = LogOutput(liveupdate)
if shell: if shell:
process = yield from asyncio.create_subprocess_shell( process = yield from asyncio.create_subprocess_shell(
cmd, cmd,
@ -105,40 +112,30 @@ def _stream_subprocess(cmd, cwd, env, shell, stdout_cb, stderr_cb):
stderr=asyncio.subprocess.PIPE) stderr=asyncio.subprocess.PIPE)
yield from asyncio.wait([ yield from asyncio.wait([
_read_stream(process.stdout, stdout_cb), _read_stream(process.stdout, logo.log_stdout),
_read_stream(process.stderr, stderr_cb) _read_stream(process.stderr, logo.log_stderr)
]) ])
ret = yield from process.wait() ret = yield from process.wait()
return ret
if ret and fail:
def run_cmd(cmd, cwd, env=None, fail=True, shell=False, liveupdate=True):
"""
Starts a command.
"""
# pylint: disable=too-many-arguments
env = env or {}
cmdstr = cmd
if not shell:
cmdstr = ' '.join(cmd)
logging.info('%s$ %s', cwd, cmdstr)
logo = LogOutput(liveupdate)
loop = asyncio.get_event_loop()
retc = loop.run_until_complete(
_stream_subprocess(cmd, cwd, env, shell,
logo.log_stdout, logo.log_stderr))
if retc and fail:
msg = 'Command "{cwd}$ {cmd}" failed\n'.format(cwd=cwd, cmd=cmdstr) msg = 'Command "{cwd}$ {cmd}" failed\n'.format(cwd=cwd, cmd=cmdstr)
for line in logo.stderr: for line in logo.stderr:
msg += line msg += line
logging.error(msg) logging.error(msg)
sys.exit(retc) sys.exit(ret)
return (retc, ''.join(logo.stdout)) return (ret, ''.join(logo.stdout))
def run_cmd(cmd, cwd, env=None, fail=True, shell=False, liveupdate=True):
"""
Runs a command synchronously.
"""
# pylint: disable=too-many-arguments
loop = asyncio.get_event_loop()
return loop.run_until_complete(
run_cmd_async(cmd, cwd, env, fail, shell, liveupdate))
def find_program(paths, name): def find_program(paths, name):