# Licensed to Tomaz Muraus under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# Tomaz muraus licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import subprocess
import time
import socket
import atexit
__all__ = [
'TCPProcessRunner'
]
[docs]class TCPProcessRunner(object):
"""
Represents a long running process which exposes a TCP interface
and should be running during the test execution.
"""
def __init__(self, args, wait_for_address, wait_for_timeout=10, cwd=None,
log_path='process.log'):
"""
:param args: Arguments passed to the subprocess.Popen.
:type args: ``list``
:param wait_for_address: IP address and port to which we will connect
to, to determine if the process is running.
:type wait_for_address: ``tuple`` (e.g. ``('127.0.0.1', 8080)``)
:param wait_for_timeout: How long to wait (in seconds) for the process
to start before giving up.
:type wait_for_timeout: ``float``
:param cwd: Working directory for the subprocess.Popen. (optional)
:type cwd: ``str``
:param log_path: Path to the log file where the process output will be
saved. (optional)
:type log_path: ``str``
"""
if not isinstance(args, (tuple, list)):
raise ValueError('args argument must be a list or a tuple')
if not isinstance(wait_for_address, (list, tuple)) or \
len(wait_for_address) != 2:
raise ValueError('wait_for_address must be a tuple with 2 '
'elements')
self._args = args or []
self._cwd = cwd or os.getcwd()
self._wait_for_address = wait_for_address
self._wait_for_timeout = wait_for_timeout
self._log_path = log_path
self._process = None
[docs] def setUp(self, *args, **kwargs):
"""
Start a managed process and wait for it to come online.
"""
env = os.environ.copy()
with open(self._log_path, 'a+') as log_fp:
self.process = subprocess.Popen(self._args, shell=False,
cwd=self._cwd, stdout=log_fp,
stderr=log_fp,
env=env)
self._wait_for_running(self._wait_for_address,
self._wait_for_timeout)
atexit.register(self.tearDown)
return self._process
def _wait_for_running(self, address, timeout=10):
"""
Wait for the process to come online.
:param address: IP address and port to which we will connect
to, to determine if the process is running.
:type address: ``tuple`` (e.g. ``('127.0.0.1', 8080)``)
:param timeout: How long to wait (in seconds) for the process to start
before giving up.
:type timeout: ``float``
"""
process = self.process
start = time.time()
while time.time() < start + timeout:
process.poll()
if process.returncode:
# Process exited early
msg = ('Process failed to start and exited with code: %s.\n'
'More info might be available in the following log '
'file: %s' % (process.returncode, self._log_path))
raise RuntimeError(msg)
try:
s = socket.create_connection(address)
s.close()
break
except:
time.sleep(0.5)
else:
process.poll()
if process and process.returncode is None:
process.terminate()
raise RuntimeError('Couldn\'t connect to server')
[docs] def tearDown(self, *args, **kwargs):
"""
Terminate the running process.
Note: This function does not need to be called manually. Once you call
:func:`setUp` function it automatically registers this function to run
on the process exit.
"""
if self.process:
self.process.terminate()