| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| |
| # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| psutil test suite. Run it with: |
| $ make test |
| |
| If you're on Python < 2.7 unittest2 module must be installed first: |
| https://pypi.python.org/pypi/unittest2 |
| """ |
| |
| from __future__ import division |
| |
| import ast |
| import atexit |
| import collections |
| import contextlib |
| import datetime |
| import errno |
| import functools |
| import imp |
| import json |
| import os |
| import pickle |
| import pprint |
| import re |
| import select |
| import shutil |
| import signal |
| import socket |
| import stat |
| import subprocess |
| import sys |
| import tempfile |
| import textwrap |
| import threading |
| import time |
| import traceback |
| import types |
| import warnings |
| from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM |
| try: |
| import ipaddress # python >= 3.3 |
| except ImportError: |
| ipaddress = None |
| try: |
| from unittest import mock # py3 |
| except ImportError: |
| import mock # requires "pip install mock" |
| |
| import psutil |
| from psutil._compat import PY3, callable, long, unicode |
| |
| if sys.version_info < (2, 7): |
| import unittest2 as unittest # https://pypi.python.org/pypi/unittest2 |
| else: |
| import unittest |
| if sys.version_info >= (3, 4): |
| import enum |
| else: |
| enum = None |
| |
| |
| # =================================================================== |
| # --- Constants |
| # =================================================================== |
| |
# default number of attempts made by the retry_before_failing() decorator
NO_RETRIES = 10
# bytes tolerance for OS memory related tests
TOLERANCE = 500 * 1024  # 500KB
# the timeout (in seconds) used in functions which have to wait
GLOBAL_TIMEOUT = 3

# Socket families may be missing from the socket module on some
# platforms; fall back to None instead of failing at import time
# (supports_ipv6() checks for AF_INET6 availability itself).
AF_INET6 = getattr(socket, "AF_INET6", None)
AF_UNIX = getattr(socket, "AF_UNIX", None)
PYTHON = os.path.realpath(sys.executable)
# shared null device handle; closed by an atexit hook
DEVNULL = open(os.devnull, 'r+')
TESTFN = os.path.join(os.getcwd(), "$testfile")
TESTFN_UNICODE = TESTFN + "ƒőő"
TESTFILE_PREFIX = 'psutil-test-suite-'
if not PY3:
    # on Python 2 turn the byte string into a unicode path, falling
    # back to an ASCII-only name if it cannot be decoded
    try:
        TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding())
    except UnicodeDecodeError:
        TESTFN_UNICODE = TESTFN + "???"

EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                            '..', 'examples'))

POSIX = os.name == 'posix'
WINDOWS = os.name == 'nt'
if WINDOWS:
    WIN_VISTA = (6, 0, 0)
LINUX = sys.platform.startswith("linux")
OSX = sys.platform.startswith("darwin")
BSD = sys.platform.startswith("freebsd")
SUNOS = sys.platform.startswith("sunos")
# every psutil.STATUS_* constant exposed by the installed psutil
VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil)
                       if x.startswith('STATUS_')]
# whether we're running this test suite on Travis (https://travis-ci.org/)
TRAVIS = bool(os.environ.get('TRAVIS'))
# whether we're running this test suite on Appveyor for Windows
# (http://www.appveyor.com/)
APPVEYOR = bool(os.environ.get('APPVEYOR'))

if TRAVIS or 'tox' in sys.argv[0]:
    # under CI / tox the import is unconditional, so a missing
    # ipaddress module is an error rather than a silent fallback
    import ipaddress
if TRAVIS or APPVEYOR:
    # CI machines are slow: be more tolerant with timeouts
    GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4
| |
| |
| # =================================================================== |
| # --- Utility functions |
| # =================================================================== |
| |
def cleanup():
    """Exit hook: reap every child process (including ones not tracked
    by this module) and delete the files/directories created by tests.
    """
    reap_children(search_all=True)
    safe_remove(TESTFN)
    try:
        safe_rmdir(TESTFN_UNICODE)
    except UnicodeEncodeError:
        # best effort: the unicode test directory name could not be encoded
        pass
    for leftover in _testfiles:
        safe_remove(leftover)


def _close_devnull():
    # looked up at exit time, exactly like the lambda it replaces
    DEVNULL.close()


atexit.register(cleanup)
atexit.register(_close_devnull)
| |
| |
# psutil.Process objects for every subprocess spawned through
# get_test_subprocess(); consumed by reap_children()
_subprocesses_started = set()


def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL,
                        stdin=DEVNULL, wait=False):
    """Return a subprocess.Popen object to use in tests.
    By default stdout and stderr are redirected to /dev/null and the
    python interpreter is used as test process (it sleeps 60 seconds).
    If 'wait' is True attempts to make sure the process is in a
    reasonably initialized state: for the default command it waits up
    to GLOBAL_TIMEOUT seconds for the child to create TESTFN (a warning
    is emitted on timeout), for a custom 'cmd' it waits for the pid to
    show up in the process list.
    The process is registered so that reap_children() can terminate it.
    """
    if cmd is None:
        pyline = ""
        if wait:
            # remove a stale marker file, otherwise the wait below
            # would succeed before the child has actually started
            safe_remove(TESTFN)
            pyline += "open(r'%s', 'w'); " % TESTFN
        pyline += "import time; time.sleep(60);"
        cmd_ = [PYTHON, "-c", pyline]
    else:
        cmd_ = cmd
    sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin)
    if wait:
        if cmd is None:
            # honour the (CI-scaled) global timeout rather than a
            # hard-coded value
            stop_at = time.time() + GLOBAL_TIMEOUT
            while stop_at > time.time():
                if os.path.exists(TESTFN):
                    break
                time.sleep(0.001)
            else:
                warn("couldn't make sure test file was actually created")
        else:
            wait_for_pid(sproc.pid)
    _subprocesses_started.add(psutil.Process(sproc.pid))
    return sproc
| |
| |
# paths of the temporary scripts written by pyrun(); removed by cleanup()
_testfiles = []


def pyrun(src):
    """Write python code 'src' to a temporary script, run it in a
    separate interpreter and return the interpreter subprocess once
    its pid is visible in the process list.
    """
    payload = bytes(src, 'ascii') if PY3 else src
    script = tempfile.NamedTemporaryFile(prefix=TESTFILE_PREFIX, delete=False)
    with script:
        _testfiles.append(script.name)
        script.write(payload)
        script.flush()
        child = get_test_subprocess(
            [PYTHON, script.name], stdout=None, stderr=None)
        wait_for_pid(child.pid)
        return child
| |
| |
def warn(msg):
    """Emit 'msg' as a UserWarning via the warnings module."""
    warnings.warn(msg, category=UserWarning)
| |
| |
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Run 'cmdline' through the shell and return its stripped output.

    Raises RuntimeError (carrying the command's stderr) if the command
    exits with a non-zero status. Anything written to stderr by a
    successful command is reported as a warning.
    Returns None if stdout was not captured (stdout != PIPE).
    """
    p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
    out, err = p.communicate()
    if PY3:
        # sys.stdout may have been replaced by an object whose encoding
        # is None or missing; fall back to the interpreter default
        # instead of failing with TypeError.
        encoding = (getattr(sys.stdout, 'encoding', None) or
                    sys.getdefaultencoding())
        if out is not None:
            out = str(out, encoding)
        if err is not None:
            # never let error reporting itself fail on odd bytes
            err = str(err, encoding, 'replace')
    if p.returncode != 0:
        raise RuntimeError(err)
    if err:
        warn(err)
    if out is None:
        return None
    return out.strip()
| |
| |
def which(program):
    """Same as UNIX which command. Return None on command not found.

    If 'program' contains a directory part it is checked as is,
    otherwise every directory listed in the PATH environment variable
    is searched in order.
    """
    def runnable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory = os.path.split(program)[0]
    if directory:
        candidates = [program]
    else:
        candidates = (os.path.join(entry, program)
                      for entry in os.environ["PATH"].split(os.pathsep))
    for candidate in candidates:
        if runnable(candidate):
            return candidate
    return None
| |
| |
if POSIX:
    def get_kernel_version():
        """Return the kernel release reported by os.uname() as a
        (major, minor, micro) tuple such as (2, 6, 36).

        Only the leading run of digits and dots is considered; missing
        minor / micro components default to 0. Raises ValueError if the
        release string does not start with a digit or a dot.
        """
        release = os.uname()[2]
        end = 0
        while end < len(release) and (release[end].isdigit() or
                                      release[end] == '.'):
            end += 1
        prefix = release[:end]
        if not prefix:
            raise ValueError("can't parse %r" % release)
        fields = prefix.split('.')
        major = int(fields[0])
        minor = int(fields[1]) if len(fields) >= 2 else 0
        micro = int(fields[2]) if len(fields) >= 3 else 0
        return (major, minor, micro)
| |
| |
# True only on Linux with a kernel >= 2.6.36; the kernel version is not
# queried at all on other platforms.
RLIMIT_SUPPORT = LINUX and get_kernel_version() >= (2, 6, 36)
| |
| |
def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT):
    """Block until 'pid' is listed by psutil.pids(), then return.
    Used in the test suite to give the sub process time to initialize.
    Raises RuntimeError if the pid did not show up within 'timeout'
    seconds.
    """
    deadline = time.time() + timeout
    while pid not in psutil.pids():
        time.sleep(0.0001)
        if time.time() >= deadline:
            raise RuntimeError("Timed out")
    # give it one more iteration to allow full initialization
    time.sleep(0.01)
| |
| |
def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True):
    """Wait for a file to be written on disk and return its content.

    The file is polled until it can be opened and is non-empty. If
    'delete_file' is True it is removed before returning.
    Raises RuntimeError if no content could be read within 'timeout'
    seconds.
    """
    stop_at = time.time() + timeout
    while time.time() < stop_at:
        try:
            with open(fname, "r") as f:
                data = f.read()
        except IOError:
            # not there (or not readable) yet
            data = None
        if data:
            # remove the file only after it has been closed
            if delete_file:
                os.remove(fname)
            return data
        # sleep on both "missing" and "still empty" to avoid spinning
        time.sleep(0.001)
    raise RuntimeError("timed out (couldn't read file)")
| |
| |
def reap_children(search_all=False):
    """Kill any subprocess started by this test suite and ensure that
    no zombies stick around to hog resources and create problems when
    looking for refleaks.

    If 'search_all' is True every (recursive) child of the current
    process is targeted as well, not only the ones registered by
    get_test_subprocess().
    Processes are first asked to terminate; the ones still alive after
    GLOBAL_TIMEOUT seconds are killed. Survivors are kept in the
    registry so that a later call can retry.
    """
    global _subprocesses_started
    procs = _subprocesses_started.copy()
    if search_all:
        procs.update(psutil.Process().children(recursive=True))
    for p in procs:
        try:
            p.terminate()
        except psutil.NoSuchProcess:
            pass
    _, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT)
    for p in alive:
        warn("couldn't terminate process %s" % p)
        try:
            p.kill()
        except psutil.NoSuchProcess:
            pass
    _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT)
    if alive:
        warn("couldn't kill processes %s" % str(alive))
    _subprocesses_started = set(alive)
| |
| |
def check_ip_address(addr, family):
    """Attempts to check IP address's validity.

    'family' must be AF_INET, AF_INET6 or psutil.AF_LINK, otherwise
    ValueError is raised. Invalid addresses make an assertion fail (or
    the ipaddress module raise, when it is available).
    """
    if enum and PY3:
        assert isinstance(family, enum.IntEnum), family
    if family == AF_INET:
        octs = [int(x) for x in addr.split('.')]
        assert len(octs) == 4, addr
        for num in octs:
            assert 0 <= num <= 255, addr
        if ipaddress:
            if not PY3:
                addr = unicode(addr)
            ipaddress.IPv4Address(addr)
    elif family == AF_INET6:
        assert isinstance(addr, str), addr
        if ipaddress:
            if not PY3:
                addr = unicode(addr)
            ipaddress.IPv6Address(addr)
    elif family == psutil.AF_LINK:
        # raw string: "\-" is not a valid escape in a normal literal
        assert re.match(r'([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr
    else:
        # interpolate the value (it used to be passed as a second
        # exception argument and never formatted)
        raise ValueError("unknown family %r" % (family,))
| |
| |
def check_connection_ntuple(conn):
    """Check validity of a connection namedtuple.

    Verifies field order, that family / type / status hold known
    values, that addresses are sane for the family and, when possible,
    that the local address can be bound and the file descriptor
    duplicated. Failures surface as AssertionError (or ValueError for
    an unknown family).
    """
    valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if
                         x.startswith('CONN_')]
    assert conn[0] == conn.fd
    assert conn[1] == conn.family
    assert conn[2] == conn.type
    assert conn[3] == conn.laddr
    assert conn[4] == conn.raddr
    assert conn[5] == conn.status
    assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type)
    assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family)
    assert conn.status in valid_conn_states, conn.status

    # check IP address and port sanity
    for addr in (conn.laddr, conn.raddr):
        if not addr:
            continue
        if conn.family in (AF_INET, AF_INET6):
            assert isinstance(addr, tuple), addr
            ip, port = addr
            assert isinstance(port, int), port
            assert 0 <= port <= 65535, port
            check_ip_address(ip, conn.family)
        elif conn.family == AF_UNIX:
            # empty / None addresses were skipped above, so only a
            # path string is acceptable here (the former
            # "(str, None)" tuple made isinstance() raise TypeError)
            assert isinstance(addr, str), addr
        else:
            raise ValueError("unknown family %r" % (conn.family,))

    if conn.family in (AF_INET, AF_INET6):
        # actually try to bind the local socket; ignore IPv6
        # sockets as their address might be represented as
        # an IPv4-mapped-address (e.g. "::127.0.0.1")
        # and that's rejected by bind()
        if conn.family == AF_INET:
            s = socket.socket(conn.family, conn.type)
            with contextlib.closing(s):
                try:
                    s.bind((conn.laddr[0], 0))
                except socket.error as err:
                    if err.errno != errno.EADDRNOTAVAIL:
                        raise
    elif conn.family == AF_UNIX:
        assert not conn.raddr, repr(conn.raddr)
        assert conn.status == psutil.CONN_NONE, conn.status

    if getattr(conn, 'fd', -1) != -1:
        assert conn.fd > 0, conn
        if hasattr(socket, 'fromfd') and not WINDOWS:
            try:
                dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
            except (socket.error, OSError) as err:
                # EBADF is tolerated; anything else is a real failure
                if err.args[0] != errno.EBADF:
                    raise
            else:
                with contextlib.closing(dupsock):
                    assert dupsock.family == conn.family
                    assert dupsock.type == conn.type
| |
| |
def safe_remove(file):
    """Convenience function for removing temporary test files.

    A missing file is not an error. On Windows a file that cannot be
    removed because of error 13 (file is being used by another
    process) is silently left in place. Any other OSError propagates.
    """
    try:
        os.remove(file)
    except OSError as err:
        if err.errno == errno.ENOENT:
            return
        in_use = (WINDOWS and isinstance(err, WindowsError) and
                  err.errno == 13)
        if not in_use:
            raise
| |
| |
def safe_rmdir(dir):
    """Convenience function for removing temporary test directories.

    A missing directory is not an error; any other OSError (e.g. the
    directory is not empty) propagates.
    """
    try:
        os.rmdir(dir)
    except OSError as err:
        if err.errno == errno.ENOENT:
            return
        raise
| |
| |
def call_until(fun, expr, timeout=GLOBAL_TIMEOUT):
    """Keep calling function for timeout secs and exit if eval()
    expression is True.

    'expr' is a string evaluated in this function's scope after every
    call, so it can refer to the last return value as `ret`.
    Returns the first return value for which 'expr' is true; raises
    RuntimeError (reporting the last return value) on timeout.
    """
    # pre-bind `ret` so the timeout error can be built even if the
    # loop body never ran (e.g. timeout <= 0)
    ret = None
    stop_at = time.time() + timeout
    while time.time() < stop_at:
        ret = fun()
        # NOTE: eval() executes arbitrary code; 'expr' must only ever
        # be a trusted, hard-coded expression written in the tests.
        if eval(expr):
            return ret
        time.sleep(0.001)
    # wrap in a 1-tuple: `ret` itself may be a (named)tuple, which
    # would otherwise be unpacked by the % operator
    raise RuntimeError('timed out (ret=%r)' % (ret,))
| |
| |
def retry_before_failing(ntimes=None):
    """Decorator which runs a test function and retries N times before
    actually failing.

    Only AssertionError triggers a retry; any other exception
    propagates immediately. 'ntimes' defaults to NO_RETRIES. When every
    attempt fails, the AssertionError of the last attempt is raised
    with its original traceback.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            attempts = ntimes or NO_RETRIES
            for _ in range(attempts - 1):
                try:
                    return fun(*args, **kwargs)
                except AssertionError:
                    pass
            # Last attempt: let the failure propagate as is. A bare
            # `raise` placed after the loop has no active exception on
            # Python 3 and would raise RuntimeError instead.
            return fun(*args, **kwargs)
        return wrapper
    return decorator
| |
| |
def skip_on_access_denied(only_if=None):
    """Decorator turning psutil.AccessDenied into a skipped test.

    If 'only_if' is given and false the exception is re-raised
    instead of being converted into unittest.SkipTest.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                if only_if is not None and not only_if:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised AccessDenied"
                    % fun.__name__)
        return wrapper
    return decorator
| |
| |
def skip_on_not_implemented(only_if=None):
    """Decorator turning NotImplementedError into a skipped test.

    If 'only_if' is given and false the exception is re-raised
    instead of being converted into unittest.SkipTest.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except NotImplementedError:
                if only_if is not None and not only_if:
                    raise
                raise unittest.SkipTest(
                    "%r was skipped because it raised NotImplementedError"
                    % fun.__name__)
        return wrapper
    return decorator
| |
| |
def supports_ipv6():
    """Return True if IPv6 is supported on this platform, i.e. the
    socket module advertises it and a TCP socket can be bound to "::1".
    """
    if not (socket.has_ipv6 and hasattr(socket, "AF_INET6")):
        return False
    probe = None
    try:
        probe = socket.socket(AF_INET6, SOCK_STREAM)
        probe.bind(("::1", 0))
        return True
    except (socket.error, socket.gaierror):
        return False
    finally:
        # the socket may not exist if its creation failed
        if probe is not None:
            probe.close()
| |
| |
if WINDOWS:
    def get_winver():
        """Return the Windows version as a (major, minor, service_pack)
        tuple built from sys.getwindowsversion().
        """
        wv = sys.getwindowsversion()
        if hasattr(wv, 'service_pack_major'):  # python >= 2.7
            sp = wv.service_pack_major or 0
        else:
            # older pythons: take the trailing digit of the fifth
            # field; raw string avoids invalid "\s" / "\d" escapes
            r = re.search(r"\s\d$", wv[4])
            sp = int(r.group(0)) if r else 0
        return (wv[0], wv[1], sp)
| |
| |
class ThreadTask(threading.Thread):
    """A thread object used for running process thread tests.

    The thread does nothing but sleep in a loop until stop() is called.
    """

    def __init__(self):
        super(ThreadTask, self).__init__()
        self._running = False
        self._interval = None
        # set by run() once the thread body is actually executing
        self._flag = threading.Event()

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))

    def start(self, interval=0.001):
        """Start thread and keep it running until an explicit
        stop() request. Polls for shutdown every 'interval' seconds.
        Returns only after the thread body has begun executing.
        Raises ValueError if the thread is already running.
        """
        if self._running:
            raise ValueError("already started")
        self._interval = interval
        super(ThreadTask, self).start()
        self._flag.wait()

    def run(self):
        self._running = True
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Stop thread execution and wait until it is stopped.
        Raises ValueError if the thread is not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()
| |
| |
| # =================================================================== |
| # --- System-related API tests |
| # =================================================================== |
| |
| class TestSystemAPIs(unittest.TestCase): |
| """Tests for system-related APIs.""" |
| |
def setUp(self):
    # start every test without a leftover TESTFN file
    safe_remove(TESTFN)
| |
def tearDown(self):
    # terminate the subprocesses started through get_test_subprocess()
    reap_children()
| |
def test_process_iter(self):
    # process_iter() must list this process and a freshly spawned
    # child, and must no longer list the child once it has been
    # killed and waited for.
    def iter_pids():
        return [proc.pid for proc in psutil.process_iter()]

    self.assertIn(os.getpid(), iter_pids())
    sproc = get_test_subprocess()
    self.assertIn(sproc.pid, iter_pids())
    child = psutil.Process(sproc.pid)
    child.kill()
    child.wait()
    self.assertNotIn(sproc.pid, iter_pids())
| |
def test_wait_procs(self):
    # pids passed to the wait_procs() callback, in call order
    reported = []

    def callback(p):
        reported.append(p.pid)

    sproc1 = get_test_subprocess()
    sproc2 = get_test_subprocess()
    sproc3 = get_test_subprocess()
    procs = [psutil.Process(sp.pid) for sp in (sproc1, sproc2, sproc3)]
    self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
    self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)

    # nothing terminated yet: must return quickly with everybody alive
    started = time.time()
    gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)

    self.assertLess(time.time() - started, 0.5)
    self.assertEqual(gone, [])
    self.assertEqual(len(alive), 3)
    self.assertEqual(reported, [])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def wait_one_gone(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 1)
        self.assertEqual(len(alive), 2)
        return gone, alive

    # one process terminated
    sproc3.terminate()
    gone, alive = wait_one_gone(procs, callback)
    self.assertIn(sproc3.pid, [x.pid for x in gone])
    expected_code = signal.SIGTERM if POSIX else 1
    self.assertEqual(gone.pop().returncode, expected_code)
    self.assertEqual(reported, [sproc3.pid])
    for p in alive:
        self.assertFalse(hasattr(p, 'returncode'))

    @retry_before_failing(30)
    def wait_all_gone(procs, callback):
        gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                        callback=callback)
        self.assertEqual(len(gone), 3)
        self.assertEqual(len(alive), 0)
        return gone, alive

    # all processes terminated
    sproc1.terminate()
    sproc2.terminate()
    gone, alive = wait_all_gone(procs, callback)
    self.assertEqual(set(reported),
                     set([sproc1.pid, sproc2.pid, sproc3.pid]))
    for p in gone:
        self.assertTrue(hasattr(p, 'returncode'))
| |
def test_wait_procs_no_timeout(self):
    # wait_procs() without a timeout must return a (gone, alive)
    # pair once the terminated processes are gone
    children = [get_test_subprocess() for _ in range(3)]
    procs = []
    for child in children:
        procs.append(psutil.Process(child.pid))
    for proc in procs:
        proc.terminate()
    gone, alive = psutil.wait_procs(procs)
| |
def test_boot_time(self):
    # boot time is a positive float timestamp lying in the past
    booted_at = psutil.boot_time()
    self.assertIsInstance(booted_at, float)
    self.assertGreater(booted_at, 0)
    now = time.time()
    self.assertLess(booted_at, now)
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_PAGESIZE(self):
    # pagesize is used internally to perform different calculations
    # and it's determined by using SC_PAGE_SIZE; make sure
    # getpagesize() returns the same value.
    import resource
    sysconf_value = os.sysconf("SC_PAGE_SIZE")
    self.assertEqual(sysconf_value, resource.getpagesize())
| |
def test_virtual_memory(self):
    mem = psutil.virtual_memory()
    assert mem.total > 0, mem
    assert mem.available > 0, mem
    assert 0 <= mem.percent <= 100, mem
    assert mem.used > 0, mem
    assert mem.free >= 0, mem
    # every field but 'percent' is an integer; every field but
    # 'total' must lie within [0, total]
    for name in mem._fields:
        value = getattr(mem, name)
        if name != 'percent':
            self.assertIsInstance(value, (int, long))
        if name == 'total':
            continue
        if not value >= 0:
            self.fail("%r < 0 (%s)" % (name, value))
        if value > mem.total:
            self.fail("%r > total (total=%s, %s=%s)"
                      % (name, mem.total, name, value))
| |
def test_swap_memory(self):
    swap = psutil.swap_memory()
    assert swap.total >= 0, swap
    assert swap.used >= 0, swap
    if not swap.total > 0:
        # likely a system with no swap partition
        assert swap.free == 0, swap
    else:
        assert swap.free > 0, swap
    assert 0 <= swap.percent <= 100, swap
    assert swap.sin >= 0, swap
    assert swap.sout >= 0, swap
| |
def test_pid_exists(self):
    sproc = get_test_subprocess(wait=True)
    self.assertTrue(psutil.pid_exists(sproc.pid))
    p = psutil.Process(sproc.pid)
    p.kill()
    p.wait()
    self.assertFalse(psutil.pid_exists(sproc.pid))
    self.assertFalse(psutil.pid_exists(-1))
    # pid 0: whether it exists is platform dependent, but
    # pid_exists() must agree with pids(). (A trailing bare
    # comparison expression that asserted nothing was removed.)
    self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
| |
def test_pid_exists_2(self):
    reap_children()
    pids = psutil.pids()
    for pid in pids:
        # explicit check instead of assert / except AssertionError,
        # which is silently disabled when running with -O
        if not psutil.pid_exists(pid):
            # in case the process disappeared in meantime fail only
            # if it is no longer in psutil.pids()
            time.sleep(.1)
            if pid in psutil.pids():
                self.fail(pid)
    # pids well above the highest one in use are expected not to exist
    highest = max(pids)
    for pid in range(highest + 5000, highest + 6000):
        self.assertFalse(psutil.pid_exists(pid), msg=pid)
| |
@retry_before_failing()
def test_pids(self):
    # Processes may appear or disappear between the two snapshots
    # below, hence the retry decorator.
    plist = [x.pid for x in psutil.process_iter()]
    pidlist = psutil.pids()
    # list.sort() returns None, so comparing the results of two
    # .sort() calls always succeeded; compare sorted copies instead
    self.assertEqual(sorted(plist), sorted(pidlist))
    # make sure every pid is unique
    self.assertEqual(len(pidlist), len(set(pidlist)))
| |
def test_test(self):
    # test for psutil.test() function; its output is discarded by
    # temporarily pointing sys.stdout at the null device
    saved_stdout = sys.stdout
    sys.stdout = DEVNULL
    try:
        psutil.test()
    finally:
        sys.stdout = saved_stdout
| |
def test_cpu_count(self):
    logical = psutil.cpu_count()
    per_cpu_entries = len(psutil.cpu_times(percpu=True))
    self.assertEqual(logical, per_cpu_entries)
    self.assertGreaterEqual(logical, 1)
    # physical cores: on Linux skip when /proc/cpuinfo has no
    # "physical id" entries
    if LINUX:
        with open("/proc/cpuinfo") as cpuinfo:
            has_physical_id = "physical id" in cpuinfo.read()
        if not has_physical_id:
            raise unittest.SkipTest("cpuinfo doesn't include physical id")
    physical = psutil.cpu_count(logical=False)
    self.assertGreaterEqual(physical, 1)
    self.assertGreaterEqual(logical, physical)
| |
def test_sys_cpu_times(self):
    times = psutil.cpu_times()
    # the result must be summable and printable
    sum(times)
    running_total = 0
    for cp_time in times:
        self.assertIsInstance(cp_time, float)
        self.assertGreaterEqual(cp_time, 0.0)
        running_total += cp_time
    self.assertEqual(running_total, sum(times))
    str(times)
    if WINDOWS:
        return
    # CPU times are always supposed to increase over time or
    # remain the same but never go backwards, see:
    # https://github.com/giampaolo/psutil/issues/392
    previous = psutil.cpu_times()
    for _ in range(100):
        current = psutil.cpu_times()
        for field in current._fields:
            new_t = getattr(current, field)
            last_t = getattr(previous, field)
            self.assertGreaterEqual(new_t, last_t,
                                    msg="%s %s" % (new_t, last_t))
        previous = current
| |
def test_sys_cpu_times2(self):
    # the summed system CPU times must advance by at least 0.05
    # over a 0.1 second sleep
    before = sum(psutil.cpu_times())
    time.sleep(0.1)
    after = sum(psutil.cpu_times())
    difference = after - before
    if not difference >= 0.05:
        self.fail("difference %s" % difference)
| |
def test_sys_per_cpu_times(self):
    for cpu in psutil.cpu_times(percpu=True):
        # each per-CPU entry must be summable and printable
        sum(cpu)
        running_total = 0
        for cp_time in cpu:
            self.assertIsInstance(cp_time, float)
            self.assertGreaterEqual(cp_time, 0.0)
            running_total += cp_time
        self.assertEqual(running_total, sum(cpu))
        str(cpu)
    # per-CPU and system-wide tuples expose the same number of fields
    first_cpu = psutil.cpu_times(percpu=True)[0]
    self.assertEqual(len(first_cpu), len(psutil.cpu_times(percpu=False)))

    # Note: in theory CPU times are always supposed to increase over
    # time or remain the same but never go backwards. In practice
    # sometimes this is not the case.
    # This issue seemed to afflict Windows:
    # https://github.com/giampaolo/psutil/issues/392
    # ...but it turns out also Linux (rarely) behaves the same.
    # last = psutil.cpu_times(percpu=True)
    # for x in range(100):
    #     new = psutil.cpu_times(percpu=True)
    #     for index in range(len(new)):
    #         newcpu = new[index]
    #         lastcpu = last[index]
    #         for field in newcpu._fields:
    #             new_t = getattr(newcpu, field)
    #             last_t = getattr(lastcpu, field)
    #             self.assertGreaterEqual(
    #                 new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
    #     last = new
| |
def test_sys_per_cpu_times_2(self):
    before = psutil.cpu_times(percpu=True)
    # busy-wait (no sleep) for 0.1 seconds
    deadline = time.time() + 0.1
    while time.time() < deadline:
        pass
    after = psutil.cpu_times(percpu=True)
    # at least one CPU must show an increase of 0.05 or more
    for cpu_before, cpu_after in zip(before, after):
        difference = sum(cpu_after) - sum(cpu_before)
        if difference >= 0.05:
            return
    self.fail()
| |
def _test_cpu_percent(self, percent, last_ret, new_ret):
    # Validate a single CPU percentage: a float in
    # [0, 100 * cpu_count()] that is not negative zero. On failure
    # the error also reports the previous and current readings.
    import math  # local import: only needed by this check
    try:
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        # -0.0 compares equal to 0.0 and an identity check against a
        # float literal can never fail, so inspect the sign instead
        self.assertEqual(math.copysign(1.0, percent), 1.0,
                         msg="negative zero")
        self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
    except AssertionError as err:
        raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
            err, pprint.pformat(last_ret), pprint.pformat(new_ret)))
| |
def test_sys_cpu_percent(self):
    # prime the internal counters, then sample repeatedly in
    # non-blocking mode
    previous = psutil.cpu_percent(interval=0.001)
    for _ in range(100):
        current = psutil.cpu_percent(interval=None)
        self._test_cpu_percent(current, previous, current)
        previous = current
| |
def test_sys_per_cpu_percent(self):
    previous = psutil.cpu_percent(interval=0.001, percpu=True)
    # one entry per logical CPU
    self.assertEqual(len(previous), psutil.cpu_count())
    for _ in range(100):
        current = psutil.cpu_percent(interval=None, percpu=True)
        for percent in current:
            self._test_cpu_percent(percent, previous, current)
        previous = current
| |
def test_sys_cpu_times_percent(self):
    previous = psutil.cpu_times_percent(interval=0.001)
    for _ in range(100):
        current = psutil.cpu_times_percent(interval=None)
        # every single field, and their sum, must be a valid percentage
        for percent in current:
            self._test_cpu_percent(percent, previous, current)
        self._test_cpu_percent(sum(current), previous, current)
        previous = current
| |
def test_sys_per_cpu_times_percent(self):
    previous = psutil.cpu_times_percent(interval=0.001, percpu=True)
    # one entry per logical CPU
    self.assertEqual(len(previous), psutil.cpu_count())
    for _ in range(100):
        current = psutil.cpu_times_percent(interval=None, percpu=True)
        for cpu in current:
            # every single field, and their sum, must be a valid
            # percentage
            for percent in cpu:
                self._test_cpu_percent(percent, previous, current)
            self._test_cpu_percent(sum(cpu), previous, current)
        previous = current
| |
def test_sys_per_cpu_times_percent_negative(self):
    # see: https://github.com/giampaolo/psutil/issues/645
    psutil.cpu_times_percent(percpu=True)
    # same tuple types as the real per-CPU times, with every field
    # zeroed
    zero_times = []
    for cpu_times in psutil.cpu_times(percpu=True):
        zeros = [0] * len(cpu_times._fields)
        zero_times.append(cpu_times._make(zeros))
    with mock.patch('psutil.cpu_times', return_value=zero_times):
        for cpu in psutil.cpu_times_percent(percpu=True):
            for percent in cpu:
                self._test_cpu_percent(percent, None, None)
| |
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                 "os.statvfs() function not available on this platform")
def test_disk_usage(self):
    usage = psutil.disk_usage(os.getcwd())
    for field in ('total', 'used', 'free'):
        assert getattr(usage, field) > 0, usage
    assert usage.total > usage.used, usage
    assert usage.total > usage.free, usage
    assert 0 <= usage.percent <= 100, usage.percent
    if hasattr(shutil, 'disk_usage'):
        # py >= 3.3, see: http://bugs.python.org/issue12442
        reference = shutil.disk_usage(os.getcwd())
        tolerance = 5 * 1024 * 1024  # 5MB
        self.assertEqual(usage.total, reference.total)
        self.assertAlmostEqual(usage.free, reference.free,
                               delta=tolerance)
        self.assertAlmostEqual(usage.used, reference.used,
                               delta=tolerance)

    # if path does not exist OSError ENOENT is expected across
    # all platforms
    missing_path = tempfile.mktemp()
    enoent_raised = False
    try:
        psutil.disk_usage(missing_path)
    except OSError as err:
        if err.args[0] != errno.ENOENT:
            raise
        enoent_raised = True
    if not enoent_raised:
        self.fail("OSError not raised")
| |
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                 "os.statvfs() function not available on this platform")
def test_disk_usage_unicode(self):
    # see: https://github.com/giampaolo/psutil/issues/416
    # XXX this test is not really reliable as it always fails on
    # Python 3.X (2.X is fine)
    unicode_dir = TESTFN_UNICODE
    try:
        # recreate the directory from scratch, query it, remove it
        safe_rmdir(unicode_dir)
        os.mkdir(unicode_dir)
        psutil.disk_usage(unicode_dir)
        safe_rmdir(unicode_dir)
    except UnicodeEncodeError:
        pass
| |
@unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                 "os.statvfs() function not available on this platform")
@unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
def test_disk_partitions(self):
    def check_mountpoint(disk):
        if SUNOS:
            # on solaris apparently mount points can also be files
            assert os.path.exists(disk.mountpoint), disk
        else:
            assert os.path.isdir(disk.mountpoint), disk

    def find_mount_point(path):
        # walk up from 'path' until a mount point is reached
        path = os.path.abspath(path)
        while not os.path.ismount(path):
            path = os.path.dirname(path)
        return path

    # all = False
    partitions = psutil.disk_partitions(all=False)
    self.assertTrue(partitions, msg=partitions)
    for disk in partitions:
        if WINDOWS and 'cdrom' in disk.opts:
            continue
        if POSIX:
            # we cannot make any assumption about this, see:
            # http://goo.gl/p9c43
            disk.device
        else:
            assert os.path.exists(disk.device), disk
        check_mountpoint(disk)
        assert disk.fstype, disk
        self.assertIsInstance(disk.opts, str)

    # all = True
    partitions = psutil.disk_partitions(all=True)
    self.assertTrue(partitions, msg=partitions)
    for disk in psutil.disk_partitions(all=True):
        if not WINDOWS:
            try:
                os.stat(disk.mountpoint)
            except OSError as err:
                # http://mail.python.org/pipermail/python-dev/
                #     2012-June/120787.html
                if err.errno not in (errno.EPERM, errno.EACCES):
                    raise
            else:
                check_mountpoint(disk)
        self.assertIsInstance(disk.fstype, str)
        self.assertIsInstance(disk.opts, str)

    # the mount point hosting this file must be listed and usable
    mount = find_mount_point(__file__)
    mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
    self.assertIn(mount, mounts)
    psutil.disk_usage(mount)
| |
@skip_on_access_denied()
def test_net_connections(self):
    def check(cons, families, types_):
        # AF_UNIX may be missing: fall back to a sentinel which
        # never compares equal to a real family
        unix_family = getattr(socket, 'AF_UNIX', object())
        for conn in cons:
            self.assertIn(conn.family, families, msg=conn)
            if conn.family != unix_family:
                self.assertIn(conn.type, types_, msg=conn)

    from psutil._common import conn_tmap
    for kind, groups in conn_tmap.items():
        if SUNOS and kind == 'unix':
            continue
        families, types_ = groups
        cons = psutil.net_connections(kind)
        # no duplicated entries
        self.assertEqual(len(cons), len(set(cons)))
        check(cons, families, types_)
| |
def test_net_io_counters(self):
    def check_ntuple(nt):
        # fields in their expected positional order; each must be
        # reachable both by index and by name and be non-negative
        fields = ('bytes_sent', 'bytes_recv', 'packets_sent',
                  'packets_recv', 'errin', 'errout', 'dropin',
                  'dropout')
        for index, name in enumerate(fields):
            value = getattr(nt, name)
            self.assertEqual(nt[index], value)
            assert value >= 0, nt

    ret = psutil.net_io_counters(pernic=False)
    check_ntuple(ret)
    ret = psutil.net_io_counters(pernic=True)
    # the per-NIC result is a mapping: comparing it against an empty
    # list could never fail, so compare against an empty dict
    self.assertNotEqual(ret, {})
    for key in ret:
        self.assertTrue(key)
        check_ntuple(ret[key])
| |
def test_net_if_addrs(self):
    def try_bind(addr):
        # make sure the reported address can actually be bound
        if addr.family == socket.AF_INET:
            s = socket.socket(addr.family)
            with contextlib.closing(s):
                s.bind((addr.address, 0))
        elif addr.family == socket.AF_INET6:
            info = socket.getaddrinfo(
                addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM,
                0, socket.AI_PASSIVE)[0]
            af, socktype, proto, canonname, sa = info
            s = socket.socket(af, socktype, proto)
            with contextlib.closing(s):
                s.bind(sa)

    nics = psutil.net_if_addrs()
    assert nics, nics

    # Not reliable on all platforms (net_if_addrs() reports more
    # interfaces).
    # self.assertEqual(sorted(nics.keys()),
    #                  sorted(psutil.net_io_counters(pernic=True).keys()))

    families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
    optional_str = (str, type(None))
    for nic, addrs in nics.items():
        # no duplicated addresses for the same NIC
        self.assertEqual(len(set(addrs)), len(addrs))
        for addr in addrs:
            self.assertIsInstance(addr.family, int)
            self.assertIsInstance(addr.address, str)
            self.assertIsInstance(addr.netmask, optional_str)
            self.assertIsInstance(addr.broadcast, optional_str)
            self.assertIn(addr.family, families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(addr.family, enum.IntEnum)
            try_bind(addr)
            # TODO: skip AF_INET6 for now because I get:
            # AddressValueError: Only hex digits permitted in
            # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
            if addr.family != AF_INET6:
                for ip in (addr.address, addr.netmask, addr.broadcast):
                    if ip is not None:
                        check_ip_address(ip, addr.family)

    # psutil.AF_LINK must map onto the platform's own constant
    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
| |
@unittest.skipIf(TRAVIS, "EPERM on travis")
def test_net_if_stats(self):
    """Validate every entry returned by psutil.net_if_stats().

    Each value unpacks to (isup, duplex, speed, mtu); isup must be a
    bool, duplex one of the NIC_DUPLEX_* constants, and speed / mtu
    non-negative.
    """
    nics = psutil.net_if_stats()
    assert nics, nics
    all_duplexes = (psutil.NIC_DUPLEX_FULL,
                    psutil.NIC_DUPLEX_HALF,
                    psutil.NIC_DUPLEX_UNKNOWN)
    for nic, stats in nics.items():
        isup, duplex, speed, mtu = stats
        self.assertIsInstance(isup, bool)
        # (the original asserted this twice in a row; once is enough)
        self.assertIn(duplex, all_duplexes)
        self.assertGreaterEqual(speed, 0)
        self.assertGreaterEqual(mtu, 0)
| |
@unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                 '/proc/diskstats not available on this linux version')
@unittest.skipIf(APPVEYOR,
                 "can't find any physical disk on Appveyor")
def test_disk_io_counters(self):
    """Check psutil.disk_io_counters() in aggregate and per-disk mode.

    Namedtuple fields must match by position and by name and be
    non-negative. On Linux a partition (e.g. 'sda1') and its parent
    device (e.g. 'sda') must not both be listed.
    """
    fields = ('read_count', 'write_count', 'read_bytes', 'write_bytes',
              'read_time', 'write_time')

    def check_ntuple(nt):
        # positional access must agree with attribute access
        for index, field in enumerate(fields):
            self.assertEqual(nt[index], getattr(nt, field))
        # counters can never be negative
        for field in fields:
            assert getattr(nt, field) >= 0, nt

    check_ntuple(psutil.disk_io_counters(perdisk=False))

    per_disk = psutil.disk_io_counters(perdisk=True)
    # make sure there are no duplicates
    self.assertEqual(len(per_disk), len(set(per_disk)))
    for disk_name in per_disk:
        assert disk_name, disk_name
        check_ntuple(per_disk[disk_name])
        if LINUX and disk_name[-1].isdigit():
            # if 'sda1' is listed 'sda' shouldn't, see:
            # https://github.com/giampaolo/psutil/issues/338
            parent = disk_name
            while parent[-1].isdigit():
                parent = parent[:-1]
            self.assertNotIn(parent, per_disk.keys())
| |
def test_users(self):
    """Check the entries returned by psutil.users().

    Except on Appveyor, at least one user is expected. Every entry
    must have a name, expose 'terminal' and 'host', and carry a
    positive 'started' timestamp convertible to a datetime.
    """
    logged_in = psutil.users()
    if not APPVEYOR:
        self.assertNotEqual(logged_in, [])
    for entry in logged_in:
        assert entry.name, entry
        # plain attribute access: only checks the fields exist
        entry.terminal
        entry.host
        assert entry.started > 0.0, entry
        datetime.datetime.fromtimestamp(entry.started)
| |
| |
| # =================================================================== |
| # --- psutil.Process class tests |
| # =================================================================== |
| |
| class TestProcess(unittest.TestCase): |
| """Tests for psutil.Process class.""" |
| |
def setUp(self):
    """Remove the TESTFN scratch file before each test."""
    safe_remove(TESTFN)
| |
def tearDown(self):
    """Call reap_children() after each test."""
    reap_children()
| |
def test_pid(self):
    """Process.pid must match os.getpid() and a spawned child's pid."""
    own = psutil.Process()
    self.assertEqual(own.pid, os.getpid())
    child = get_test_subprocess()
    self.assertEqual(psutil.Process(child.pid).pid, child.pid)
| |
def test_kill(self):
    """kill() must make the PID disappear.

    On POSIX wait() is expected to return signal.SIGKILL.
    """
    child_pid = get_test_subprocess(wait=True).pid
    proc = psutil.Process(child_pid)
    proc.kill()
    exit_sig = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if POSIX:
        self.assertEqual(exit_sig, signal.SIGKILL)
| |
def test_terminate(self):
    """terminate() must make the PID disappear.

    On POSIX wait() is expected to return signal.SIGTERM.
    """
    child_pid = get_test_subprocess(wait=True).pid
    proc = psutil.Process(child_pid)
    proc.terminate()
    exit_sig = proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    if POSIX:
        self.assertEqual(exit_sig, signal.SIGTERM)
| |
def test_send_signal(self):
    """Exercise Process.send_signal() and its error translation.

    First a real signal is delivered and the exit status checked.
    Then os.kill (as seen from the psutil module) is patched to raise
    ESRCH and EPERM, which must surface as NoSuchProcess and
    AccessDenied respectively.
    """
    signum = signal.SIGKILL if POSIX else signal.SIGTERM

    # --- real delivery
    proc = psutil.Process(get_test_subprocess().pid)
    proc.send_signal(signum)
    exit_sig = proc.wait()
    self.assertFalse(psutil.pid_exists(proc.pid))
    if POSIX:
        self.assertEqual(exit_sig, signum)

    # --- os.kill failing with ESRCH
    proc = psutil.Process(get_test_subprocess().pid)
    proc.send_signal(signum)
    no_such_process = OSError(errno.ESRCH, "")
    with mock.patch('psutil.os.kill', side_effect=no_such_process) as fun:
        with self.assertRaises(psutil.NoSuchProcess):
            proc.send_signal(signum)
        assert fun.called

    # --- os.kill failing with EPERM
    proc = psutil.Process(get_test_subprocess().pid)
    proc.send_signal(signum)
    not_permitted = OSError(errno.EPERM, "")
    with mock.patch('psutil.os.kill', side_effect=not_permitted) as fun:
        with self.assertRaises(psutil.AccessDenied):
            proc.send_signal(signum)
        assert fun.called
| |
def test_wait(self):
    """Check Process.wait() return values, repeated calls and timeouts."""
    # --- exit status after kill(): the signal number on POSIX,
    # 0 elsewhere
    proc = psutil.Process(get_test_subprocess().pid)
    proc.kill()
    status = proc.wait()
    expected = signal.SIGKILL if POSIX else 0
    self.assertEqual(status, expected)
    self.assertFalse(proc.is_running())

    # --- exit status after terminate()
    proc = psutil.Process(get_test_subprocess().pid)
    proc.terminate()
    status = proc.wait()
    expected = signal.SIGTERM if POSIX else 0
    self.assertEqual(status, expected)
    self.assertFalse(proc.is_running())

    # --- check sys.exit() code
    script = "import time, sys; time.sleep(0.01); sys.exit(5);"
    proc = psutil.Process(get_test_subprocess([PYTHON, "-c", script]).pid)
    self.assertEqual(proc.wait(), 5)
    self.assertFalse(proc.is_running())

    # --- Test wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    proc = psutil.Process(get_test_subprocess([PYTHON, "-c", script]).pid)
    self.assertEqual(proc.wait(), 5)
    self.assertIn(proc.wait(), (5, None))

    # --- test timeout
    proc = psutil.Process(get_test_subprocess().pid)
    proc.name()
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0.01)

    # --- timeout < 0 not allowed
    self.assertRaises(ValueError, proc.wait, -1)
| |
| # XXX why is this skipped on Windows? |
@unittest.skipUnless(POSIX, 'skipped on Windows')
def test_wait_non_children(self):
    """wait() against a process which is not our direct child.

    A child is spawned which in turn spawns a sleeping grandchild and
    prints its PID. Waiting on the grandchild must time out while it
    is alive and return None once it has been killed.
    """
    script = "".join([
        "import sys;",
        "from subprocess import Popen, PIPE;",
        "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON,
        "sp = Popen(cmd, stdout=PIPE);",
        "sys.stdout.write(str(sp.pid));",
    ])
    child = get_test_subprocess([PYTHON, "-c", script],
                                stdout=subprocess.PIPE)
    grandchild = psutil.Process(int(child.stdout.read()))
    try:
        self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
        grandchild.kill()
        self.assertEqual(grandchild.wait(), None)
    finally:
        # don't leave the grandchild around if an assertion failed
        if grandchild.is_running():
            grandchild.kill()
            grandchild.wait()
| |
def test_wait_timeout_0(self):
    """wait(0) must raise TimeoutExpired while the process is alive.

    After kill(), wait(0) is polled for up to 2 seconds until it
    returns the exit status instead of timing out.
    """
    proc = psutil.Process(get_test_subprocess().pid)
    self.assertRaises(psutil.TimeoutExpired, proc.wait, 0)
    proc.kill()

    deadline = time.time() + 2
    while True:
        try:
            status = proc.wait(0)
        except psutil.TimeoutExpired:
            # keep polling until the deadline, then give up
            if time.time() >= deadline:
                raise
        else:
            break

    expected = signal.SIGKILL if POSIX else 0
    self.assertEqual(status, expected)
    self.assertFalse(proc.is_running())
| |
def test_cpu_percent(self):
    """Process.cpu_percent() must return a non-negative float.

    On non-POSIX platforms the value is additionally checked to be
    <= 100.0; on POSIX no upper bound is asserted.
    """
    p = psutil.Process()
    # two blocking calls with a tiny interval before the non-blocking
    # ones issued below
    p.cpu_percent(interval=0.001)
    p.cpu_percent(interval=0.001)
    for x in range(100):
        percent = p.cpu_percent(interval=None)
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        # (the original repeated the ">= 0.0" check in an else branch)
        if not POSIX:
            self.assertLessEqual(percent, 100.0)
| |
def test_cpu_times(self):
    """Process.cpu_times() must report some user or system time."""
    cpu = psutil.Process().cpu_times()
    assert (cpu.user > 0.0) or (cpu.system > 0.0), cpu
    # make sure returned values can be pretty printed with strftime
    for seconds in (cpu.user, cpu.system):
        time.strftime("%H:%M:%S", time.localtime(seconds))
| |
| # Test Process.cpu_times() against os.times() |
| # os.times() is broken on Python 2.6 |
| # http://bugs.python.org/issue1040026 |
| # XXX fails on OSX: not sure if it's for os.times(). We should |
| # try this with Python 2.7 and re-enable the test. |
| |
@unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
                     'os.times() is not reliable on this Python version')
def test_cpu_times2(self):
    """Compare Process.cpu_times() against os.times()[:2].

    User and system times must each be within 0.1 seconds of the
    values reported by os.times().
    """
    user_time, kernel_time = psutil.Process().cpu_times()
    utime, ktime = os.times()[:2]

    # os.times() values are the reference; allow +/- 0.1 seconds
    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" % (utime, user_time))

    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" % (ktime, kernel_time))
| |
def test_create_time(self):
    """create_time() of a fresh child must be close to time.time()."""
    child = get_test_subprocess(wait=True)
    now = time.time()
    proc = psutil.Process(child.pid)
    created = proc.create_time()

    # time.time() is the reference value; the test fails if the two
    # values are more than 2 seconds apart.
    difference = abs(created - now)
    if difference > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (now, created, difference))

    # make sure returned value can be pretty printed with strftime
    time.strftime("%Y %m %d %H:%M:%S", time.localtime(proc.create_time()))
| |
@unittest.skipIf(WINDOWS, 'not available on Windows')
def test_terminal(self):
    """Check Process.terminal() for the current process.

    When stdin is a tty the value must equal the output of the `tty`
    command; otherwise it is only required to be truthy.

    The skip reason used to read 'Windows only' although the test is
    skipped *on* Windows; the message now matches the condition.
    """
    terminal = psutil.Process().terminal()
    if sys.stdin.isatty():
        self.assertEqual(terminal, sh('tty'))
    else:
        assert terminal, repr(terminal)
| |
@unittest.skipUnless(LINUX or BSD or WINDOWS,
                     'not available on this platform')
@skip_on_not_implemented(only_if=LINUX)
def test_io_counters(self):
    """Process.io_counters() must reflect reads and writes we perform.

    Counters are sampled before and after reading the interpreter
    binary, then before and after writing 1000000 bytes to a
    temporary file, and compared.
    """
    proc = psutil.Process()

    # --- test reads
    before = proc.io_counters()
    with open(PYTHON, 'rb') as f:
        f.read()
    after = proc.io_counters()
    if not BSD:
        assert after.read_count > before.read_count, (before, after)
        self.assertEqual(after.write_count, before.write_count)
    assert after.read_bytes >= before.read_bytes, (before, after)
    assert after.write_bytes >= before.write_bytes, (before, after)

    # --- test writes
    before = proc.io_counters()
    with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
        if PY3:
            payload = bytes("x" * 1000000, 'ascii')
        else:
            payload = "x" * 1000000
        f.write(payload)
    after = proc.io_counters()
    assert after.write_count >= before.write_count, (before, after)
    assert after.write_bytes >= before.write_bytes, (before, after)
    assert after.read_count >= before.read_count, (before, after)
    assert after.read_bytes >= before.read_bytes, (before, after)
| |
@unittest.skipUnless(LINUX or (WINDOWS and get_winver() >= WIN_VISTA),
                     'Linux and Windows Vista only')
@unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
def test_ionice(self):
    """Exercise Process.ionice() getter / setter and argument checks.

    The Linux branch works with (ioclass, value) pairs and the
    IOPRIO_CLASS_* constants; the other branch (Windows) works with
    a single integer.
    """
    if not LINUX:
        proc = psutil.Process()
        original = proc.ionice()
        self.assertIsInstance(original, int)
        try:
            # pick a value different from the current one
            value = 0  # very low
            if original == value:
                value = 1  # low
            proc.ionice(value)
            self.assertEqual(proc.ionice(), value)
        finally:
            proc.ionice(original)
        #
        self.assertRaises(ValueError, proc.ionice, 3)
        self.assertRaises(TypeError, proc.ionice, 2, 1)
        return

    from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
                        IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
    self.assertEqual(IOPRIO_CLASS_NONE, 0)
    self.assertEqual(IOPRIO_CLASS_RT, 1)
    self.assertEqual(IOPRIO_CLASS_BE, 2)
    self.assertEqual(IOPRIO_CLASS_IDLE, 3)
    proc = psutil.Process()
    try:
        proc.ionice(2)
        ioclass, value = proc.ionice()
        if enum is not None:
            self.assertIsInstance(ioclass, enum.IntEnum)
        self.assertEqual(ioclass, 2)
        self.assertEqual(value, 4)
        #
        proc.ionice(3)
        ioclass, value = proc.ionice()
        self.assertEqual(ioclass, 3)
        self.assertEqual(value, 0)
        #
        proc.ionice(2, 0)
        ioclass, value = proc.ionice()
        self.assertEqual(ioclass, 2)
        self.assertEqual(value, 0)
        proc.ionice(2, 7)
        ioclass, value = proc.ionice()
        self.assertEqual(ioclass, 2)
        self.assertEqual(value, 7)
        # invalid arguments
        self.assertRaises(ValueError, proc.ionice, 2, 10)
        self.assertRaises(ValueError, proc.ionice, 2, -1)
        self.assertRaises(ValueError, proc.ionice, 4)
        self.assertRaises(TypeError, proc.ionice, 2, "foo")
        self.assertRaisesRegexp(
            ValueError, "can't specify value with IOPRIO_CLASS_NONE",
            proc.ionice, psutil.IOPRIO_CLASS_NONE, 1)
        self.assertRaisesRegexp(
            ValueError, "can't specify value with IOPRIO_CLASS_IDLE",
            proc.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
        self.assertRaisesRegexp(
            ValueError, "'ioclass' argument must be specified",
            proc.ionice, value=1)
    finally:
        # always go back to IOPRIO_CLASS_NONE when done
        proc.ionice(IOPRIO_CLASS_NONE)
| |
@unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                     "only available on Linux >= 2.6.36")
def test_rlimit_get(self):
    """Check every psutil.RLIMIT* constant and Process.rlimit() getter.

    Constants also present in the 'resource' module must have the
    same value and yield the same limits as resource.getrlimit();
    the others must return a 2-item result with values >= -1.
    """
    import resource
    proc = psutil.Process(os.getpid())
    rlimit_names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
    assert rlimit_names, rlimit_names
    known_to_resource = dir(resource)
    for name in rlimit_names:
        value = getattr(psutil, name)
        self.assertGreaterEqual(value, 0)
        if name not in known_to_resource:
            limits = proc.rlimit(value)
            self.assertEqual(len(limits), 2)
            self.assertGreaterEqual(limits[0], -1)
            self.assertGreaterEqual(limits[1], -1)
            continue
        self.assertEqual(value, getattr(resource, name))
        self.assertEqual(proc.rlimit(value), resource.getrlimit(value))
| |
@unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                     "only available on Linux >= 2.6.36")
def test_rlimit_set(self):
    """Set RLIMIT_NOFILE on a child and check invalid usages."""
    proc = psutil.Process(get_test_subprocess().pid)
    new_limits = (5, 5)
    proc.rlimit(psutil.RLIMIT_NOFILE, new_limits)
    self.assertEqual(proc.rlimit(psutil.RLIMIT_NOFILE), new_limits)
    # If pid is 0 prlimit() applies to the calling process and
    # we don't want that.
    with self.assertRaises(ValueError):
        psutil._psplatform.Process(0).rlimit(0)
    # a 3-item limits tuple is rejected
    with self.assertRaises(ValueError):
        proc.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
| |
def test_num_threads(self):
    """num_threads() must grow by one after starting a ThreadTask."""
    # on certain platforms such as Linux we might test for exact
    # thread number, since we always have with 1 thread per process,
    # but this does not apply across all platforms (OSX, Windows)
    proc = psutil.Process()
    before = proc.num_threads()

    worker = ThreadTask()
    worker.start()
    try:
        after = proc.num_threads()
        self.assertEqual(after, before + 1)
        worker.stop()
    finally:
        # make sure the worker is stopped even if the assertion failed
        if worker._running:
            worker.stop()
| |
@unittest.skipUnless(WINDOWS, 'Windows only')
def test_num_handles(self):
    """num_handles() of the current process must be positive."""
    # a better test is done later into test/_windows.py
    handles = psutil.Process().num_handles()
    self.assertGreater(handles, 0)
| |
def test_threads(self):
    """threads() must list one more entry after starting a ThreadTask.

    Also checks that the returned namedtuples expose id, user_time
    and system_time at positions 0, 1 and 2.
    """
    proc = psutil.Process()
    before = proc.threads()

    worker = ThreadTask()
    worker.start()

    try:
        after = proc.threads()
        self.assertEqual(len(after), len(before) + 1)
        first = after[0]
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(first.id, os.getpid())
        # test named tuple
        self.assertEqual(first.id, first[0])
        self.assertEqual(first.user_time, first[1])
        self.assertEqual(first.system_time, first[2])
        worker.stop()
    finally:
        # make sure the worker is stopped even if an assertion failed
        if worker._running:
            worker.stop()
| |
def test_memory_info(self):
    """memory_info() / memory_percent() must grow after an allocation."""
    proc = psutil.Process()

    # step 1 - get a base value to compare our results
    rss_before, vms_before = proc.memory_info()
    percent_before = proc.memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # step 2 - allocate some memory
    ballast = [None] * 1500000

    rss_after, vms_after = proc.memory_info()
    percent_after = proc.memory_percent()
    # make sure that the memory usage bumped up
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)  # vms might be equal
    self.assertGreater(percent_after, percent_before)
    del ballast
| |
| # def test_memory_info_ex(self): |
| # # tested later in fetch all test suite |
| |
def test_memory_maps(self):
    """Validate Process.memory_maps() in grouped and ungrouped mode.

    Grouped mode: paths must be unique and, unless they look like
    '[...]', absolute and existing. Ungrouped mode: 'addr' and 'perms'
    must be truthy and every other field except 'path' a non-negative
    integer.
    """
    p = psutil.Process()
    maps = p.memory_maps()
    # The original collected the whole namedtuples under the name
    # "paths", so the uniqueness check compared full records and not
    # paths. Compare the 'path' field itself.
    paths = [nt.path for nt in maps]
    self.assertEqual(len(paths), len(set(paths)))
    ext_maps = p.memory_maps(grouped=False)

    for nt in maps:
        if nt.path.startswith('['):
            continue
        assert os.path.isabs(nt.path), nt.path
        if POSIX:
            assert os.path.exists(nt.path), nt.path
        else:
            # XXX - On Windows we have this strange behavior with
            # 64 bit dlls: they are visible via explorer but cannot
            # be accessed via os.stat() (wtf?).
            if '64' not in os.path.basename(nt.path):
                assert os.path.exists(nt.path), nt.path

    for nt in ext_maps:
        for fname in nt._fields:
            value = getattr(nt, fname)
            if fname == 'path':
                continue
            elif fname in ('addr', 'perms'):
                assert value, value
            else:
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
| |
def test_memory_percent(self):
    """memory_percent() of the current process must be > 0."""
    percent = psutil.Process().memory_percent()
    self.assertGreater(percent, 0.0)
| |
def test_is_running(self):
    """is_running() must be True before kill() and False after wait()."""
    proc = psutil.Process(get_test_subprocess(wait=True).pid)
    # each state is queried twice in a row, as in the original test
    # (presumably to make sure repeated calls agree)
    assert proc.is_running()
    assert proc.is_running()
    proc.kill()
    proc.wait()
    assert not proc.is_running()
    assert not proc.is_running()
| |
def test_exe(self):
    """Process.exe() of a child interpreter must match PYTHON.

    A case-insensitive match is accepted on Windows, and a difference
    limited to the "X.Y" version string is tolerated elsewhere.
    """
    child = get_test_subprocess(wait=True)
    exe = psutil.Process(child.pid).exe()
    try:
        self.assertEqual(exe, PYTHON)
        return
    except AssertionError:
        pass

    if WINDOWS and len(exe) == len(PYTHON):
        # on Windows we don't care about case sensitivity
        self.assertEqual(exe.lower(), PYTHON.lower())
        return

    # certain platforms such as BSD are more accurate returning:
    # "/usr/local/bin/python2.7"
    # ...instead of:
    # "/usr/local/bin/python"
    # We do not want to consider this difference in accuracy
    # an error.
    ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
    self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
| |
def test_cmdline(self):
    """cmdline() of a child must match the arguments it was started with."""
    args = [PYTHON, "-c", "import time; time.sleep(60)"]
    child = get_test_subprocess(args, wait=True)
    reported = psutil.Process(child.pid).cmdline()
    self.assertEqual(' '.join(reported), ' '.join(args))
| |
def test_name(self):
    """name() of a child interpreter must prefix the interpreter basename."""
    child = get_test_subprocess(PYTHON, wait=True)
    name = psutil.Process(child.pid).name().lower()
    real_exe = os.path.realpath(sys.executable)
    pyexe = os.path.basename(real_exe).lower()
    assert pyexe.startswith(name), (pyexe, name)
| |
@unittest.skipUnless(POSIX, "posix only")
# TODO: add support for other compilers
@unittest.skipUnless(which("gcc"), "gcc not available")
def test_prog_w_funky_name(self):
    """Check name(), exe() and cmdline() for an oddly named program.

    A tiny C program is compiled with gcc into an executable whose
    name contains spaces and ")", then started with some empty
    arguments. See: https://github.com/giampaolo/psutil/issues/628
    """
    funky_name = "/tmp/foo bar )"
    fd, c_file = tempfile.mkstemp(prefix='psutil-', suffix='.c', dir="/tmp")
    self.addCleanup(lambda: safe_remove(c_file))
    self.addCleanup(lambda: safe_remove(funky_name))
    # mkstemp() returns an already open file descriptor; write through
    # it so that it gets closed (the original discarded the fd and
    # re-opened the file by name, leaking the descriptor).
    with os.fdopen(fd, "w") as f:
        f.write("void main() { pause(); }")
    subprocess.check_call(["gcc", c_file, "-o", funky_name])
    sproc = get_test_subprocess(
        [funky_name, "arg1", "arg2", "", "arg3", ""])
    p = psutil.Process(sproc.pid)
    # ...in order to try to prevent occasional failures on travis
    wait_for_pid(p.pid)
    self.assertEqual(p.name(), "foo bar )")
    self.assertEqual(p.exe(), "/tmp/foo bar )")
    self.assertEqual(
        p.cmdline(), ["/tmp/foo bar )", "arg1", "arg2", "", "arg3", ""])
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_uids(self):
    """Process.uids() must match os.getuid(), os.geteuid(), os.getresuid()."""
    uids = psutil.Process().uids()
    real, effective, saved = uids
    # os.getuid() refers to "real" uid
    self.assertEqual(real, os.getuid())
    # os.geteuid() refers to "effective" uid
    self.assertEqual(effective, os.geteuid())
    # no such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid()[2]
    if hasattr(os, "getresuid"):
        self.assertEqual(saved, os.getresuid()[2])
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_gids(self):
    """Process.gids() must match os.getgid(), os.getegid(), os.getresgid().

    The saved gid is only compared when os.getresgid() exists. The
    original guarded that call with hasattr(os, "getresuid") and
    carried comments copied from the uid test.
    """
    p = psutil.Process()
    real, effective, saved = p.gids()
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # no such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid()[2]
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
| |
def test_nice(self):
    """Exercise the Process.nice() getter and setter.

    Windows: switch between NORMAL and HIGH priority classes and
    always end on NORMAL. Elsewhere: set niceness 1, then (except on
    OSX) back to 0, tolerating AccessDenied, and finally try to
    restore the initial value.
    """
    p = psutil.Process()
    self.assertRaises(TypeError, p.nice, "str")
    if WINDOWS:
        try:
            init = p.nice()
            if sys.version_info > (3, 4):
                self.assertIsInstance(init, enum.IntEnum)
            else:
                self.assertIsInstance(init, int)
            self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS)
            p.nice(psutil.HIGH_PRIORITY_CLASS)
            self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS)
            p.nice(psutil.NORMAL_PRIORITY_CLASS)
            self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS)
        finally:
            p.nice(psutil.NORMAL_PRIORITY_CLASS)
    else:
        # Read the initial value *before* entering the try block: the
        # finally clause needs it, and if the read itself failed inside
        # the try the cleanup would hit an unbound local and hide the
        # real error.
        first_nice = p.nice()
        try:
            p.nice(1)
            self.assertEqual(p.nice(), 1)
            # going back to previous nice value raises
            # AccessDenied on OSX
            if not OSX:
                p.nice(0)
                self.assertEqual(p.nice(), 0)
        except psutil.AccessDenied:
            pass
        finally:
            # best effort: restoring the value may not be permitted
            try:
                p.nice(first_nice)
            except psutil.AccessDenied:
                pass
| |
def test_status(self):
    """status() of the current process must be STATUS_RUNNING."""
    current_status = psutil.Process().status()
    self.assertEqual(current_status, psutil.STATUS_RUNNING)
| |
def test_username(self):
    """Check Process.username() of a child process.

    POSIX: must match the pwd entry of os.getuid(); when
    pwd.getpwuid() (as seen from the psutil module) raises KeyError,
    the real uid as a string is expected instead.
    Windows (when USERNAME is set): must be "DOMAIN\\user" matching
    the USERDOMAIN / USERNAME environment variables.
    """
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    if POSIX:
        import pwd
        self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
        with mock.patch("psutil.pwd.getpwuid",
                        side_effect=KeyError) as fun:
            # The original evaluated "p.username() == str(...)" and
            # threw the result away, so nothing was verified here.
            self.assertEqual(p.username(), str(p.uids().real))
            assert fun.called

    elif WINDOWS and 'USERNAME' in os.environ:
        expected_username = os.environ['USERNAME']
        expected_domain = os.environ['USERDOMAIN']
        domain, username = p.username().split('\\')
        self.assertEqual(domain, expected_domain)
        self.assertEqual(username, expected_username)
    else:
        # no reference value available: just make sure it doesn't raise
        p.username()
| |
def test_cwd(self):
    """cwd() of a child must equal our own os.getcwd()."""
    child = get_test_subprocess(wait=True)
    child_cwd = psutil.Process(child.pid).cwd()
    self.assertEqual(child_cwd, os.getcwd())
| |
def test_cwd_2(self):
    """cwd() must follow a child which chdir()s to the parent directory."""
    script = "import os, time; os.chdir('..'); time.sleep(60)"
    child = get_test_subprocess([PYTHON, "-c", script], wait=True)
    proc = psutil.Process(child.pid)
    # the expression is passed to call_until() as a string, with the
    # result of proc.cwd() referred to as "ret"
    call_until(proc.cwd, "ret == os.path.dirname(os.getcwd())")
| |
@unittest.skipUnless(WINDOWS or LINUX or BSD,
                     'not available on this platform')
@unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
def test_cpu_affinity(self):
    """Exercise the Process.cpu_affinity() getter and setter.

    Pins the current process to each CPU in turn, then to all CPUs,
    cross-checking against os.sched_getaffinity() when available, and
    finally checks that invalid arguments are rejected.

    The initial affinity is restored via addCleanup(): the original
    left the test process pinned to whatever was set last, both when
    an assertion failed midway and at the end of a successful run.
    """
    p = psutil.Process()
    initial = p.cpu_affinity()
    self.addCleanup(p.cpu_affinity, initial)
    if hasattr(os, "sched_getaffinity"):
        self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
    self.assertEqual(len(initial), len(set(initial)))
    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
    # setting on travis doesn't seem to work (always return all
    # CPUs on get):
    # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0]
    for n in all_cpus:
        p.cpu_affinity([n])
        self.assertEqual(p.cpu_affinity(), [n])
        if hasattr(os, "sched_getaffinity"):
            self.assertEqual(p.cpu_affinity(),
                             list(os.sched_getaffinity(p.pid)))
    #
    p.cpu_affinity(all_cpus)
    self.assertEqual(p.cpu_affinity(), all_cpus)
    if hasattr(os, "sched_getaffinity"):
        self.assertEqual(p.cpu_affinity(),
                         list(os.sched_getaffinity(p.pid)))
    #
    self.assertRaises(TypeError, p.cpu_affinity, 1)
    p.cpu_affinity(initial)
    # it should work with all iterables, not only lists
    p.cpu_affinity(set(all_cpus))
    p.cpu_affinity(tuple(all_cpus))
    invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
    self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
    self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
    self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
| |
| # TODO |
@unittest.skipIf(BSD, "broken on BSD, see #595")
@unittest.skipIf(APPVEYOR,
                 "can't find any process file on Appveyor")
def test_open_files(self):
    """Process.open_files() must list TESTFN once it has been opened.

    Checked both for the current process and for a child process
    which opens the same file.
    """
    # current process
    p = psutil.Process()
    files = p.open_files()
    # open_files() returns namedtuples, so the path has to be compared
    # against the 'path' field; the original "TESTFN in files" test
    # compared a string with tuples and could never fail.
    self.assertNotIn(TESTFN, [f.path for f in files])
    with open(TESTFN, 'w'):
        # give the kernel some time to see the new file
        call_until(p.open_files, "len(ret) != %i" % len(files))
        filenames = [f.path for f in p.open_files()]
        self.assertIn(TESTFN, filenames)
    for fname in filenames:
        assert os.path.isfile(fname), fname

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN
    sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
    p = psutil.Process(sproc.pid)

    # poll for up to ~1 second until the child has opened the file
    for _ in range(100):
        filenames = [f.path for f in p.open_files()]
        if TESTFN in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(TESTFN, filenames)
    for fname in filenames:
        assert os.path.isfile(fname), fname
| |
| # TODO |
@unittest.skipIf(BSD, "broken on BSD, see #595")
@unittest.skipIf(APPVEYOR,
                 "can't find any process file on Appveyor")
def test_open_files2(self):
    """Check the 'path' and 'fd' fields returned by open_files().

    While TESTFN is open it must be listed with the right path and
    fd (-1 on Windows); once closed it must not be listed anymore.
    """
    # test fd and path fields
    with open(TESTFN, 'w') as fileobj:
        p = psutil.Process()
        for path, fd in p.open_files():
            if path == fileobj.name or fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.open_files()))
        self.assertEqual(path, fileobj.name)
        if WINDOWS:
            self.assertEqual(fd, -1)
        else:
            self.assertEqual(fd, fileobj.fileno())
        # test positions
        ntuple = p.open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    # test file is gone: this has to run after the file is closed and
    # compare against the 'path' field. The original compared the
    # name with the namedtuples themselves, which is always unequal.
    self.assertNotIn(fileobj.name, [f.path for f in p.open_files()])
| |
def compare_proc_sys_cons(self, pid, proc_cons):
    """Assert proc_cons equals the system-wide connections of `pid`.

    System-wide entries from psutil.net_connections(kind='all') are
    filtered by PID and converted to 'pconn' tuples by dropping their
    last field. On BSD the first field of every item in proc_cons is
    replaced with -1 before comparing.
    """
    from psutil._common import pconn
    sys_cons = [pconn(*c[:-1])
                for c in psutil.net_connections(kind='all')
                if c.pid == pid]
    if BSD:
        # on BSD all fds are set to -1
        proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
    self.assertEqual(sorted(proc_cons), sorted(sys_cons))
| |
@skip_on_access_denied(only_if=OSX)
def test_connections(self):
    """Check Process.connections() against sockets opened by children.

    One child per (family, type) combination is spawned; each binds a
    socket and writes its local address to a file. Every child of the
    current process must then report exactly one connection with the
    expected family, type, addresses and status, and be returned only
    for the matching 'kind' filters.
    """
    from string import Template

    every_kind = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
                  "tcp6", "udp", "udp4", "udp6")

    def check_conn(proc, conn, family, socktype, laddr, raddr, status,
                   kinds):
        check_connection_ntuple(conn)
        self.assertEqual(conn.family, family)
        self.assertEqual(conn.type, socktype)
        self.assertEqual(conn.laddr, laddr)
        self.assertEqual(conn.raddr, raddr)
        self.assertEqual(conn.status, status)
        # the connection must show up only for the expected kinds
        for kind in every_kind:
            found = proc.connections(kind=kind)
            if kind in kinds:
                self.assertNotEqual(found, [])
            else:
                self.assertEqual(found, [])
        # compare against system-wide connections
        # XXX Solaris can't retrieve system-wide UNIX
        # sockets.
        if not SUNOS:
            self.compare_proc_sys_cons(proc.pid, [conn])

    tcp_template = textwrap.dedent("""
        import socket, time
        s = socket.socket($family, socket.SOCK_STREAM)
        s.bind(('$addr', 0))
        s.listen(1)
        with open('$testfn', 'w') as f:
            f.write(str(s.getsockname()[:2]))
        time.sleep(60)
    """)

    udp_template = textwrap.dedent("""
        import socket, time
        s = socket.socket($family, socket.SOCK_DGRAM)
        s.bind(('$addr', 0))
        with open('$testfn', 'w') as f:
            f.write(str(s.getsockname()[:2]))
        time.sleep(60)
    """)

    testfile = os.path.basename(TESTFN)
    tcp4_template = Template(tcp_template).substitute(
        family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
    udp4_template = Template(udp_template).substitute(
        family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
    tcp6_template = Template(tcp_template).substitute(
        family=int(AF_INET6), addr="::1", testfn=testfile)
    udp6_template = Template(udp_template).substitute(
        family=int(AF_INET6), addr="::1", testfn=testfile)

    # launch various subprocess instantiating a socket of various
    # families and types to enrich psutil results; each address is
    # read back (via eval) from the file the child wrote
    tcp4_proc = pyrun(tcp4_template)
    tcp4_addr = eval(wait_for_file(testfile))
    udp4_proc = pyrun(udp4_template)
    udp4_addr = eval(wait_for_file(testfile))
    if supports_ipv6():
        tcp6_proc = pyrun(tcp6_template)
        tcp6_addr = eval(wait_for_file(testfile))
        udp6_proc = pyrun(udp6_template)
        udp6_addr = eval(wait_for_file(testfile))
    else:
        tcp6_proc = udp6_proc = None
        tcp6_addr = udp6_addr = None

    # what each child is expected to report, keyed by child PID:
    # (family, type, local address, status, matching kinds)
    expected = {
        # TCP v4
        tcp4_proc.pid: (AF_INET, SOCK_STREAM, tcp4_addr,
                        psutil.CONN_LISTEN,
                        ("all", "inet", "inet4", "tcp", "tcp4")),
        # UDP v4
        udp4_proc.pid: (AF_INET, SOCK_DGRAM, udp4_addr,
                        psutil.CONN_NONE,
                        ("all", "inet", "inet4", "udp", "udp4")),
    }
    if tcp6_proc is not None:
        # TCP v6
        expected[tcp6_proc.pid] = (AF_INET6, SOCK_STREAM, tcp6_addr,
                                   psutil.CONN_LISTEN,
                                   ("all", "inet", "inet6", "tcp", "tcp6"))
    if udp6_proc is not None:
        # UDP v6
        expected[udp6_proc.pid] = (AF_INET6, SOCK_DGRAM, udp6_addr,
                                   psutil.CONN_NONE,
                                   ("all", "inet", "inet6", "udp", "udp6"))

    for child in psutil.Process().children():
        cons = child.connections()
        self.assertEqual(len(cons), 1)
        for conn in cons:
            if child.pid not in expected:
                continue
            family, socktype, laddr, status, kinds = expected[child.pid]
            check_conn(child, conn, family, socktype, laddr, (),
                       status, kinds)
| |
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
                     'AF_UNIX is not supported')
@skip_on_access_denied(only_if=OSX)
def test_connections_unix(self):
    """Check connections(kind='unix') for stream and datagram sockets.

    A UNIX socket of each type is bound to TESTFN and the first
    reported connection is compared with it.
    """
    def check(sock_type):
        safe_remove(TESTFN)
        sock = socket.socket(AF_UNIX, sock_type)
        with contextlib.closing(sock):
            sock.bind(TESTFN)
            cons = psutil.Process().connections(kind='unix')
            first = cons[0]
            check_connection_ntuple(first)
            if first.fd != -1:  # != sunos and windows
                self.assertEqual(first.fd, sock.fileno())
            self.assertEqual(first.family, AF_UNIX)
            self.assertEqual(first.type, sock_type)
            self.assertEqual(first.laddr, TESTFN)
            if not SUNOS:
                # XXX Solaris can't retrieve system-wide UNIX
                # sockets.
                self.compare_proc_sys_cons(os.getpid(), cons)

    for sock_type in (SOCK_STREAM, SOCK_DGRAM):
        check(sock_type)
| |
@unittest.skipUnless(hasattr(socket, "fromfd"),
                     'socket.fromfd() is not availble')
@unittest.skipIf(WINDOWS or SUNOS,
                 'connection fd not available on this platform')
def test_connection_fromfd(self):
    """A connection's fd / family / type must be usable by socket.fromfd().

    The duplicated socket must report the same local address as the
    connection and use a different file descriptor than the original.
    """
    with contextlib.closing(socket.socket()) as listener:
        listener.bind(('localhost', 0))
        listener.listen(1)
        proc = psutil.Process()
        # look for the connection entry backed by our listening socket
        for conn in proc.connections():
            if conn.fd == listener.fileno():
                break
        else:
            self.fail("couldn't find socket fd")
        duplicate = socket.fromfd(conn.fd, conn.family, conn.type)
        with contextlib.closing(duplicate):
            self.assertEqual(duplicate.getsockname(), conn.laddr)
            self.assertNotEqual(listener.fileno(), duplicate.fileno())
| |
def test_connection_constants(self):
    """All psutil.CONN_* constants must be unique and upper-case.

    Also checks that the platform specific constants exist on SunOS
    and Windows.
    """
    seen_values = []
    seen_strings = []
    conn_names = [x for x in dir(psutil) if x.startswith('CONN_')]
    for name in conn_names:
        value = getattr(psutil, name)
        as_string = str(value)
        assert as_string.isupper(), as_string
        assert as_string not in seen_strings, as_string
        assert value not in seen_values, value
        seen_values.append(value)
        seen_strings.append(as_string)
    # plain attribute access: raises AttributeError if missing
    if SUNOS:
        psutil.CONN_IDLE
        psutil.CONN_BOUND
    if WINDOWS:
        psutil.CONN_DELETE_TCB
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_num_fds(self):
    """num_fds() must grow by one for each file / socket we open and
    go back to the initial value once they are closed.
    """
    proc = psutil.Process()
    baseline = proc.num_fds()

    fobj = open(TESTFN, 'w')
    # registered as cleanups so the descriptors are released even if
    # one of the assertions below fails
    self.addCleanup(fobj.close)
    self.assertEqual(proc.num_fds(), baseline + 1)

    sock = socket.socket()
    self.addCleanup(sock.close)
    self.assertEqual(proc.num_fds(), baseline + 2)

    fobj.close()
    sock.close()
    self.assertEqual(proc.num_fds(), baseline)
| |
@skip_on_not_implemented(only_if=LINUX)
def test_num_ctx_switches(self):
    """Poll num_ctx_switches() until the summed counters grow.

    The test passes as soon as an increase is observed and fails if
    the total is still unchanged after all the iterations.
    """
    iterations = 500000
    proc = psutil.Process()
    before = sum(proc.num_ctx_switches())
    for _ in range(iterations):
        if sum(proc.num_ctx_switches()) > before:
            return
    # The message is built from the actual loop bound so the two can
    # no longer disagree.
    self.fail("num ctx switches still the same after %s iterations"
              % iterations)
| |
def test_parent_ppid(self):
    """The test subprocess must report us as its parent, and no other
    process is supposed to.
    """
    this_parent = os.getpid()
    sproc = get_test_subprocess()
    child = psutil.Process(sproc.pid)
    self.assertEqual(child.ppid(), this_parent)
    self.assertEqual(child.parent().pid, this_parent)
    # no other process is supposed to have us as parent
    for other in psutil.process_iter():
        if other.pid == sproc.pid:
            continue
        try:
            other_ppid = other.ppid()
        except psutil.NoSuchProcess:
            # The process went away while we were iterating; it says
            # nothing about our children, so it is skipped instead of
            # erroring the test.
            continue
        self.assertNotEqual(other_ppid, this_parent)
| |
def test_children(self):
    """children() is empty at first and lists exactly the test
    subprocess once it has been spawned, with or without recursion.
    """
    me = psutil.Process()
    self.assertEqual(me.children(), [])
    self.assertEqual(me.children(recursive=True), [])

    sproc = get_test_subprocess()
    direct = me.children()
    recursive = me.children(recursive=True)
    for found in (direct, recursive):
        self.assertEqual(len(found), 1)
        only_child = found[0]
        self.assertEqual(only_child.pid, sproc.pid)
        self.assertEqual(only_child.ppid(), os.getpid())
| |
def test_children_recursive(self):
    """children(recursive=True) must also return the grandchild."""
    # here we create a subprocess which creates another one as in:
    # A (parent) -> B (child) -> C (grandchild)
    child_source = "".join([
        "import subprocess, os, sys, time;",
        "PYTHON = os.path.realpath(sys.executable);",
        "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];",
        "subprocess.Popen(cmd);",
        "time.sleep(60);",
    ])
    get_test_subprocess(cmd=[PYTHON, "-c", child_source])
    me = psutil.Process()
    self.assertEqual(len(me.children(recursive=False)), 1)
    # give the grandchild some time to start
    deadline = time.time() + GLOBAL_TIMEOUT
    while time.time() < deadline:
        children = me.children(recursive=True)
        if len(children) > 1:
            break
    self.assertEqual(len(children), 2)
    child, grandchild = children[0], children[1]
    self.assertEqual(child.ppid(), os.getpid())
    self.assertEqual(grandchild.ppid(), child.pid)
| |
def test_children_duplicates(self):
    """children(recursive=True) must not list a process twice."""
    # count how many children every parent pid has
    children_per_pid = collections.defaultdict(int)
    for proc in psutil.process_iter():
        try:
            children_per_pid[proc.ppid()] += 1
        except psutil.Error:
            pass
    # pick the pid with the highest count (last entry after sorting
    # by count) and make sure there are no duplicates
    ranked = sorted(children_per_pid.items(), key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    parent = psutil.Process(busiest_pid)
    try:
        descendants = parent.children(recursive=True)
    except psutil.AccessDenied:  # windows
        return
    self.assertEqual(len(descendants), len(set(descendants)))
| |
def test_suspend_resume(self):
    """A suspended process must not be reported as stopped anymore
    after resume().
    """
    sproc = get_test_subprocess(wait=True)
    proc = psutil.Process(sproc.pid)
    proc.suspend()
    # poll (at most 100 times, 0.01 secs apart) until the process is
    # reported as stopped
    attempts = 0
    while attempts < 100 and proc.status() != psutil.STATUS_STOPPED:
        time.sleep(0.01)
        attempts += 1
    proc.resume()
    self.assertNotEqual(proc.status(), psutil.STATUS_STOPPED)
| |
def test_invalid_pid(self):
    """Process() rejects a string pid with TypeError and a negative
    pid with ValueError.
    """
    with self.assertRaises(TypeError):
        psutil.Process("1")
    with self.assertRaises(ValueError):
        psutil.Process(-1)
| |
def test_as_dict(self):
    """as_dict() honours the 'attrs' filter and the 'ad_value'
    placeholder.
    """
    info = psutil.Process().as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(info.keys()), ['exe', 'name'])

    # Use the process with the lowest pid; when its connections are
    # not returned as a list the ad_value placeholder is expected.
    lowest = psutil.Process(min(psutil.pids()))
    info = lowest.as_dict(attrs=['connections'], ad_value='foo')
    connections = info['connections']
    if not isinstance(connections, list):
        self.assertEqual(connections, 'foo')
| |
def test_halfway_terminated_process(self):
    """Every Process method must raise NoSuchProcess if the process
    died after the Process object was created.
    """
    # Example:
    # >>> proc = Process(1234)
    # >>> time.sleep(2)  # time-consuming task, process dies in meantime
    # >>> proc.name()
    # Refers to Issue #15

    def invoke(meth, name):
        # get/set methods are called with arguments as well; constants
        # are looked up lazily as not all of them exist everywhere.
        if name == 'nice':
            if POSIX:
                return meth(1)
            return meth(psutil.NORMAL_PRIORITY_CLASS)
        if name == 'ionice':
            meth()
            return meth(2)
        if name == 'rlimit':
            meth(psutil.RLIMIT_NOFILE)
            return meth(psutil.RLIMIT_NOFILE, (5, 5))
        if name == 'cpu_affinity':
            meth()
            return meth([0])
        if name == 'send_signal':
            return meth(signal.SIGTERM)
        return meth()

    sproc = get_test_subprocess()
    proc = psutil.Process(sproc.pid)
    proc.terminate()
    proc.wait()
    if WINDOWS:
        wait_for_pid(proc.pid)
    self.assertFalse(proc.is_running())
    self.assertFalse(proc.pid in psutil.pids())

    skipped = ['pid', 'is_running', 'wait', 'create_time']
    if LINUX and not RLIMIT_SUPPORT:
        skipped.append('rlimit')
    for name in dir(proc):
        if name.startswith('_') or name in skipped:
            continue
        try:
            ret = invoke(getattr(proc, name), name)
        except psutil.ZombieProcess:
            # caught before NoSuchProcess on purpose: a zombie is not
            # an acceptable outcome for a terminated + waited process
            self.fail("ZombieProcess for %r was not supposed to happen" %
                      name)
        except (psutil.NoSuchProcess, NotImplementedError):
            pass
        else:
            self.fail(
                "NoSuchProcess exception not raised for %r, retval=%s" % (
                    name, ret))
| |
@unittest.skipUnless(POSIX, 'posix only')
def test_zombie_process(self):
    """Create a zombie process and check psutil APIs can cope with it.

    A helper script forks; its child sends its own pid over a UNIX
    socket bound to TESTFN, while the parent of the fork sleeps. The
    test waits for that pid to be reported as a zombie.
    """
    def succeed_or_zombie_p_exc(fun, *args, **kwargs):
        # Call fun; ZombieProcess and AccessDenied are both tolerated.
        try:
            fun(*args, **kwargs)
        except (psutil.ZombieProcess, psutil.AccessDenied):
            pass

    # Note: in this test we'll be creating two sub processes.
    # Both of them are supposed to be freed / killed by
    # reap_children() as they are attributable to 'us'
    # (os.getpid()) via children(recursive=True).
    src = textwrap.dedent("""\
    import os, sys, time, socket, contextlib
    child_pid = os.fork()
    if child_pid > 0:
        time.sleep(3000)
    else:
        # this is the zombie process
        s = socket.socket(socket.AF_UNIX)
        with contextlib.closing(s):
            s.connect('%s')
            if sys.version_info < (3, ):
                pid = str(os.getpid())
            else:
                pid = bytes(str(os.getpid()), 'ascii')
            s.sendall(pid)
    """ % TESTFN)
    with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
        try:
            sock.settimeout(GLOBAL_TIMEOUT)
            sock.bind(TESTFN)
            sock.listen(1)
            pyrun(src)
            conn, _ = sock.accept()
            # The accepted socket is only needed to read the pid:
            # close it right after instead of leaking it.
            with contextlib.closing(conn):
                select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
                zpid = int(conn.recv(1024))
            zproc = psutil.Process(zpid)
            call_until(lambda: zproc.status(),
                       "ret == psutil.STATUS_ZOMBIE")
            # A zombie process should always be instantiable
            zproc = psutil.Process(zpid)
            # ...and at least its status always be querable
            self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
            # ...and it should be considered 'running'
            self.assertTrue(zproc.is_running())
            # ...and as_dict() shouldn't crash
            zproc.as_dict()
            if hasattr(zproc, "rlimit"):
                succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
                succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE,
                                        (5, 5))
            # set methods
            succeed_or_zombie_p_exc(zproc.parent)
            if hasattr(zproc, 'cpu_affinity'):
                succeed_or_zombie_p_exc(zproc.cpu_affinity, [0])
            succeed_or_zombie_p_exc(zproc.nice, 0)
            if hasattr(zproc, 'ionice'):
                if LINUX:
                    succeed_or_zombie_p_exc(zproc.ionice, 2, 0)
                else:
                    succeed_or_zombie_p_exc(zproc.ionice, 0)  # Windows
            if hasattr(zproc, 'rlimit'):
                succeed_or_zombie_p_exc(zproc.rlimit,
                                        psutil.RLIMIT_NOFILE, (5, 5))
            succeed_or_zombie_p_exc(zproc.suspend)
            succeed_or_zombie_p_exc(zproc.resume)
            succeed_or_zombie_p_exc(zproc.terminate)
            succeed_or_zombie_p_exc(zproc.kill)

            # Not asserted: that the zombie shows up in our
            # children(recursive=True) (not true on BSD and OSX).
            # XXX should we also assume ppid be usable? Note: this
            # would be an important use case as the only way to get
            # rid of a zombie is to kill its parent.

            # ...and all other APIs should be able to deal with it
            self.assertTrue(psutil.pid_exists(zpid))
            self.assertIn(zpid, psutil.pids())
            self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
            # same check again after resetting psutil's module level
            # _pmap dict
            psutil._pmap = {}
            self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
        finally:
            reap_children(search_all=True)
| |
def test_pid_0(self):
    """Sanity checks for the process with PID 0, where it exists."""
    # Process(0) is supposed to work on all platforms except Linux
    if 0 not in psutil.pids():
        self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
        return

    proc = psutil.Process(0)
    self.assertTrue(proc.name())

    if POSIX:
        try:
            self.assertEqual(proc.uids().real, 0)
            self.assertEqual(proc.gids().real, 0)
        except psutil.AccessDenied:
            pass

        self.assertRaisesRegexp(
            ValueError, "preventing sending signal to process with PID 0",
            proc.send_signal, signal.SIGTERM)

    self.assertIn(proc.ppid(), (0, 1))
    # exe() is deliberately not compared against ""
    proc.cmdline()

    # these may legitimately be denied for PID 0
    for getter in (proc.num_threads, proc.memory_info):
        try:
            getter()
        except psutil.AccessDenied:
            pass

    try:
        username = proc.username()
    except psutil.AccessDenied:
        pass
    else:
        if POSIX:
            self.assertEqual(username, 'root')
        elif WINDOWS:
            self.assertEqual(username, 'NT AUTHORITY\\SYSTEM')

    self.assertIn(0, psutil.pids())
    self.assertTrue(psutil.pid_exists(0))
| |
def test_Popen(self):
    """psutil.Popen must expose both the psutil.Process API (name())
    and the subprocess.Popen one (stdin, returncode), and raise
    AttributeError for unknown attributes.
    """
    # XXX the original note here reported a ResourceWarning on
    # Python 3, attributed to the psutil.__subproc instance not being
    # properly freed.
    cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
    proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
    try:
        proc.name()
        proc.stdin
        self.assertTrue(hasattr(proc, 'name'))
        self.assertTrue(hasattr(proc, 'stdin'))
        self.assertTrue(dir(proc))
        self.assertRaises(AttributeError, getattr, proc, 'foo')
    finally:
        proc.kill()
        proc.wait()
        # Close the pipes requested via subprocess.PIPE explicitly so
        # that the file objects are not left open until garbage
        # collection.
        proc.stdout.close()
        proc.stderr.close()
        self.assertIsNotNone(proc.returncode)
| |
| |
| # =================================================================== |
| # --- Fetch all processes test |
| # =================================================================== |
| |
| class TestFetchAllProcesses(unittest.TestCase): |
| """Test which iterates over all running processes and performs |
| some sanity checks against Process API's returned values. |
| """ |
| |
| def setUp(self): |
| if POSIX: |
| import pwd |
| pall = pwd.getpwall() |
| self._uids = set([x.pw_uid for x in pall]) |
| self._usernames = set([x.pw_name for x in pall]) |
| |
| def test_fetch_all(self): |
| valid_procs = 0 |
| excluded_names = set([ |
| 'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait', |
| 'as_dict', 'cpu_percent', 'parent', 'children', 'pid']) |
| if LINUX and not RLIMIT_SUPPORT: |
| excluded_names.add('rlimit') |
| attrs = [] |
| for name in dir(psutil.Process): |
| if name.startswith("_"): |
| continue |
| if name in excluded_names: |
| continue |
| attrs.append(name) |
| |
| default = object() |
| failures = [] |
| for name in attrs: |
| for p in psutil.process_iter(): |
| ret = default |
| try: |
| try: |
| args = () |
| attr = getattr(p, name, None) |
| if attr is not None and callable(attr): |
| if name == 'rlimit': |
| args = (psutil.RLIMIT_NOFILE,) |
| ret = attr(*args) |
| else: |
| ret = attr |
| valid_procs += 1 |
| except NotImplementedError: |
| msg = "%r was skipped because not implemented" % ( |
| self.__class__.__name__ + '.test_' + name) |
| warn(msg) |
| except (psutil.NoSuchProcess, psutil.AccessDenied) as err: |
| self.assertEqual(err.pid, p.pid) |
| if err.name: |
| # make sure exception's name attr is set |
| # with the actual process name |
| self.assertEqual(err.name, p.name()) |
| self.assertTrue(str(err)) |
| self.assertTrue(err.msg) |
| else: |
| if ret not in (0, 0.0, [], None, ''): |
| assert ret, ret |
| meth = getattr(self, name) |
| meth(ret) |
| except Exception as err: |
| s = '\n' + '=' * 70 + '\n' |
| s += "FAIL: test_%s (proc=%s" % (name, p) |
| if ret != default: |
| s += ", ret=%s)" % repr(ret) |
| s += ')\n' |
| s += '-' * 70 |
| s += "\n%s" % traceback.format_exc() |
| s = "\n".join((" " * 4) + i for i in s.splitlines()) |
| failures.append(s) |
| break |
| |
| if failures: |
| self.fail(''.join(failures)) |
| |
| # we should always have a non-empty list, not including PID 0 etc. |
| # special cases. |
| self.assertTrue(valid_procs > 0) |
| |
| def cmdline(self, ret): |
| pass |
| |
| def exe(self, ret): |
| if not ret: |
| self.assertEqual(ret, '') |
| else: |
| assert os.path.isabs(ret), ret |
| # Note: os.stat() may return False even if the file is there |
| # hence we skip the test, see: |
| # http://stackoverflow.com/questions/3112546/os-path-exists-lies |
| if POSIX and os.path.isfile(ret): |
| if hasattr(os, 'access') and hasattr(os, "X_OK"): |
| # XXX may fail on OSX |
| self.assertTrue(os.access(ret, os.X_OK)) |
| |
| def ppid(self, ret): |
| self.assertTrue(ret >= 0) |
| |
| def name(self, ret): |
| self.assertIsInstance(ret, (str, unicode)) |
| self.assertTrue(ret) |
| |
| def create_time(self, ret): |
| self.assertTrue(ret > 0) |
| # this can't be taken for granted on all platforms |
| # self.assertGreaterEqual(ret, psutil.boot_time()) |
| # make sure returned value can be pretty printed |
| # with strftime |
| time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) |
| |
| def uids(self, ret): |
| for uid in ret: |
| self.assertTrue(uid >= 0) |
| self.assertIn(uid, self._uids) |
| |
| def gids(self, ret): |
| # note: testing all gids as above seems not to be reliable for |
| # gid == 30 (nodoby); not sure why. |
| for gid in ret: |
| self.assertTrue(gid >= 0) |
| # self.assertIn(uid, self.gids |
| |
| def username(self, ret): |
| self.assertTrue(ret) |
| if POSIX: |
| self.assertIn(ret, self._usernames) |
| |
| def status(self, ret): |
| self.assertTrue(ret != "") |
| self.assertTrue(ret != '?') |
| self.assertIn(ret, VALID_PROC_STATUSES) |
| |
| def io_counters(self, ret): |
| for field in ret: |
| if field != -1: |
| self.assertTrue(field >= 0) |
| |
| def ionice(self, ret): |
| if LINUX: |
| self.assertTrue(ret.ioclass >= 0) |
| self.assertTrue(ret.value >= 0) |
| else: |
| self.assertTrue(ret >= 0) |
| self.assertIn(ret, (0, 1, 2)) |
| |
| def num_threads(self, ret): |
| self.assertTrue(ret >= 1) |
| |
| def threads(self, ret): |
| for t in ret: |
| self.assertTrue(t.id >= 0) |
| self.assertTrue(t.user_time >= 0) |
| self.assertTrue(t.system_time >= 0) |
| |
| def cpu_times(self, ret): |
| self.assertTrue(ret.user >= 0) |
| self.assertTrue(ret.system >= 0) |
| |
| def memory_info(self, ret): |
| self.assertTrue(ret.rss >= 0) |
| self.assertTrue(ret.vms >= 0) |
| |
| def memory_info_ex(self, ret): |
| for name in ret._fields: |
| self.assertTrue(getattr(ret, name) >= 0) |
| if POSIX and ret.vms != 0: |
| # VMS is always supposed to be the highest |
| for name in ret._fields: |
| if name != 'vms': |
| value = getattr(ret, name) |
| assert ret.vms > value, ret |
| elif WINDOWS: |
| assert ret.peak_wset >= ret.wset, ret |
| assert ret.peak_paged_pool >= ret.paged_pool, ret |
| assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret |
| assert ret.peak_pagefile >= ret.pagefile, ret |
| |
| def open_files(self, ret): |
| for f in ret: |
| if WINDOWS: |
| assert f.fd == -1, f |
| else: |
| self.assertIsInstance(f.fd, int) |
| assert os.path.isabs(f.path), f |
| assert os.path.isfile(f.path), f |
| |
| def num_fds(self, ret): |
| self.assertTrue(ret >= 0) |
| |
| def connections(self, ret): |
| self.assertEqual(len(ret), len(set(ret))) |
| for conn in ret: |
| check_connection_ntuple(conn) |
| |
| def cwd(self, ret): |
| if ret is not None: # BSD may return None |
| assert os.path.isabs(ret), ret |
| try: |
| st = os.stat(ret) |
| except OSError as err: |
| # directory has been removed in mean time |
| if err.errno != errno.ENOENT: |
| raise |
| else: |
| self.assertTrue(stat.S_ISDIR(st.st_mode)) |
| |
| def memory_percent(self, ret): |
| assert 0 <= ret <= 100, ret |
| |
| def is_running(self, ret): |
| self.assertTrue(ret) |
| |
| def cpu_affinity(self, ret): |
| assert ret != [], ret |
| |
| def terminal(self, ret): |
| if ret is not None: |
| assert os.path.isabs(ret), ret |
| assert os.path.exists(ret), ret |
| |
| def memory_maps(self, ret): |
| for nt in ret: |
| for fname in nt._fields: |
| value = getattr(nt, fname) |
| if fname == 'path': |
| if not value.startswith('['): |
| assert os.path.isabs(nt.path), nt.path |
| # commented as on Linux we might get |
| # '/foo/bar (deleted)' |
| # assert os.path.exists(nt.path), nt.path |
| elif fname in ('addr', 'perms'): |
| self.assertTrue(value) |
| else: |
| self.assertIsInstance(value, (int, long)) |
| assert value >= 0, value |
| |
| def num_handles(self, ret): |
| if WINDOWS: |
| self.assertGreaterEqual(ret, 0) |
| else: |
| self.assertGreaterEqual(ret, 0) |
| |
| def nice(self, ret): |
| if POSIX: |
| assert -20 <= ret <= 20, ret |
| else: |
| priorities = [getattr(psutil, x) for x in dir(psutil) |
| if x.endswith('_PRIORITY_CLASS')] |
| self.assertIn(ret, priorities) |
| |
| def num_ctx_switches(self, ret): |
| self.assertTrue(ret.voluntary >= 0) |
| self.assertTrue(ret.involuntary >= 0) |
| |
| def rlimit(self, ret): |
| self.assertEqual(len(ret), 2) |
| self.assertGreaterEqual(ret[0], -1) |
| self.assertGreaterEqual(ret[1], -1) |
| |
| |
| # =================================================================== |
| # --- Limited user tests |
| # =================================================================== |
| |
@unittest.skipUnless(POSIX, "UNIX only")
@unittest.skipUnless(hasattr(os, 'getuid') and os.getuid() == 0,
                     "super user privileges are required")
class LimitedUserTestCase(TestProcess):
    """Repeat the previous tests by using a limited user.
    Executed only on UNIX and only if the user who run the test script
    is root.

    Every inherited test method is wrapped so that psutil.AccessDenied
    exceptions are ignored.
    """
    # the uid/gid the test suite runs under
    if hasattr(os, 'getuid'):
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

    def __init__(self, *args, **kwargs):
        TestProcess.__init__(self, *args, **kwargs)

        def ignore_access_denied(meth):
            # A factory is needed so that each wrapper keeps a
            # reference to its own test method: a closure defined
            # directly in the loop below would look 'meth' up when
            # called, hence every wrapper would run the last method
            # assigned by the loop.
            @functools.wraps(meth)
            def wrapper():
                try:
                    return meth()
                except psutil.AccessDenied:
                    pass
            return wrapper

        # re-define all existent test methods in order to
        # ignore AccessDenied exceptions
        for attr in [x for x in dir(self) if x.startswith('test')]:
            setattr(self, attr, ignore_access_denied(getattr(self, attr)))

    def setUp(self):
        safe_remove(TESTFN)
        TestProcess.setUp(self)
        # Switch to effective gid / uid 1000; presumably an
        # unprivileged account - TODO confirm on the test machine.
        # The group is changed first, while we're still privileged.
        os.setegid(1000)
        os.seteuid(1000)

    def tearDown(self):
        # Restore the original effective ids. The uid goes first so
        # that the gid is set back with the original privileges, and
        # each call now receives the matching kind of id.
        os.seteuid(self.PROCESS_UID)
        os.setegid(self.PROCESS_GID)
        TestProcess.tearDown(self)

    def test_nice(self):
        # a limited user is not supposed to be able to set a negative
        # niceness
        try:
            psutil.Process().nice(-1)
        except psutil.AccessDenied:
            pass
        else:
            self.fail("exception not raised")

    def test_zombie_process(self):
        # causes problems if test test suite is run as root
        pass
| |
| |
| # =================================================================== |
| # --- Misc tests |
| # =================================================================== |
| |
class TestMisc(unittest.TestCase):
    """Misc / generic tests."""

    def test_process__repr__(self, func=repr):
        """The representation of a Process mentions its pid and name,
        or its zombie / terminated state if name() can't be retrieved.
        """
        proc = psutil.Process()
        text = func(proc)
        for fragment in ("psutil.Process", "pid=%s" % proc.pid, "name=",
                         proc.name()):
            self.assertIn(fragment, text)

        # (exception raised by name(), word expected in the repr)
        failing_states = [
            (psutil.ZombieProcess(os.getpid()), "zombie"),
            (psutil.NoSuchProcess(os.getpid()), "terminated"),
        ]
        for exc, marker in failing_states:
            with mock.patch.object(psutil.Process, "name",
                                   side_effect=exc):
                proc = psutil.Process()
                text = func(proc)
                self.assertIn("pid=%s" % proc.pid, text)
                self.assertIn(marker, text)
                self.assertNotIn("name=", text)

    def test_process__str__(self):
        self.test_process__repr__(func=str)

    def test_no_such_process__repr__(self, func=repr):
        cases = [
            (psutil.NoSuchProcess(321),
             "psutil.NoSuchProcess process no longer exists (pid=321)"),
            (psutil.NoSuchProcess(321, name='foo'),
             "psutil.NoSuchProcess process no longer exists (pid=321, "
             "name='foo')"),
            (psutil.NoSuchProcess(321, msg='foo'),
             "psutil.NoSuchProcess foo"),
        ]
        for exc, expected in cases:
            self.assertEqual(repr(exc), expected)

    def test_zombie_process__repr__(self, func=repr):
        prefix = "psutil.ZombieProcess process still exists but it's a zombie "
        cases = [
            (psutil.ZombieProcess(321),
             prefix + "(pid=321)"),
            (psutil.ZombieProcess(321, name='foo'),
             prefix + "(pid=321, name='foo')"),
            (psutil.ZombieProcess(321, name='foo', ppid=1),
             prefix + "(pid=321, name='foo', ppid=1)"),
            (psutil.ZombieProcess(321, msg='foo'),
             "psutil.ZombieProcess foo"),
        ]
        for exc, expected in cases:
            self.assertEqual(repr(exc), expected)

    def test_access_denied__repr__(self, func=repr):
        cases = [
            (psutil.AccessDenied(321),
             "psutil.AccessDenied (pid=321)"),
            (psutil.AccessDenied(321, name='foo'),
             "psutil.AccessDenied (pid=321, name='foo')"),
            (psutil.AccessDenied(321, msg='foo'),
             "psutil.AccessDenied foo"),
        ]
        for exc, expected in cases:
            self.assertEqual(repr(exc), expected)

    def test_timeout_expired__repr__(self, func=repr):
        cases = [
            (psutil.TimeoutExpired(321),
             "psutil.TimeoutExpired timeout after 321 seconds"),
            (psutil.TimeoutExpired(321, pid=111),
             "psutil.TimeoutExpired timeout after 321 seconds (pid=111)"),
            (psutil.TimeoutExpired(321, pid=111, name='foo'),
             "psutil.TimeoutExpired timeout after 321 seconds "
             "(pid=111, name='foo')"),
        ]
        for exc, expected in cases:
            self.assertEqual(repr(exc), expected)

    def test_process__eq__(self):
        first = psutil.Process()
        second = psutil.Process()
        self.assertEqual(first, second)
        # the two are no longer equal once _ident is altered
        second._ident = (0, 0)
        self.assertNotEqual(first, second)
        self.assertNotEqual(first, 'foo')

    def test_process__hash__(self):
        unique = set([psutil.Process(), psutil.Process()])
        self.assertEqual(len(unique), 1)

    def test__all__(self):
        """Names exposed by psutil and psutil.__all__ must agree."""
        ignored = ('callable', 'error', 'namedtuple',
                   'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
                   'TOTAL_PHYMEM')
        dir_psutil = dir(psutil)
        for name in dir_psutil:
            if name in ignored or name.startswith('_'):
                continue
            try:
                __import__(name)
            except ImportError:
                pass
            else:
                # importable on its own: not checked against __all__
                continue
            if name in psutil.__all__:
                continue
            obj = getattr(psutil, name)
            if obj is None:
                continue
            doc = obj.__doc__
            if doc is not None and 'deprecated' not in doc.lower():
                self.fail('%r not in psutil.__all__' % name)

        # Import 'star' will break if __all__ is inconsistent, see:
        # https://github.com/giampaolo/psutil/issues/656
        # Can't do `from psutil import *` as it won't work on python 3
        # so we simply iterate over __all__.
        for name in psutil.__all__:
            self.assertIn(name, dir_psutil)

    def test_version(self):
        dotted = '.'.join(str(x) for x in psutil.version_info)
        self.assertEqual(dotted, psutil.__version__)

    def test_memoize(self):
        from psutil._common import memoize

        # one entry is appended every time foo() really gets executed
        calls = []

        @memoize
        def foo(*args, **kwargs):
            "foo docstring"
            calls.append(None)
            return (args, kwargs)

        # (args, kwargs, number of real calls expected so far):
        # no args, with args, with args + kwargs
        scenarios = [
            ((), {}, 1),
            ((1, ), {}, 2),
            ((1, ), {'bar': 2}, 3),
        ]
        for args, kwargs, expected_calls in scenarios:
            for _ in range(2):
                self.assertEqual(foo(*args, **kwargs), (args, kwargs))
                self.assertEqual(len(calls), expected_calls)
        # clear cache
        foo.cache_clear()
        self.assertEqual(foo(), ((), {}))
        self.assertEqual(len(calls), 4)
        # docstring
        self.assertEqual(foo.__doc__, "foo docstring")

    def test_isfile_strict(self):
        from psutil._common import isfile_strict
        this_file = os.path.abspath(__file__)
        assert isfile_strict(this_file)
        assert not isfile_strict(os.path.dirname(this_file))
        # EPERM and EACCES are expected to be propagated...
        for code in (errno.EPERM, errno.EACCES):
            with mock.patch('psutil._common.os.stat',
                            side_effect=OSError(code, "foo")):
                self.assertRaises(OSError, isfile_strict, this_file)
        # ...while EINVAL is expected to result in False
        with mock.patch('psutil._common.os.stat',
                        side_effect=OSError(errno.EINVAL, "foo")):
            assert not isfile_strict(this_file)
        with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
            assert not isfile_strict(this_file)

    def test_serialization(self):
        """Values returned by psutil must survive a json dump/load and
        a pickle round trip.
        """
        def roundtrip(value):
            if json is not None:
                json.loads(json.dumps(value))
            restored = pickle.loads(pickle.dumps(value))
            self.assertEqual(value, restored)

        roundtrip(psutil.Process().as_dict())
        roundtrip(psutil.virtual_memory())
        roundtrip(psutil.swap_memory())
        roundtrip(psutil.cpu_times())
        roundtrip(psutil.cpu_times_percent(interval=0))
        roundtrip(psutil.net_io_counters())
        # disk_io_counters() is skipped on Linux if /proc/diskstats
        # does not exist, and on APPVEYOR
        no_diskstats = LINUX and not os.path.exists('/proc/diskstats')
        if not no_diskstats and not APPVEYOR:
            roundtrip(psutil.disk_io_counters())
        roundtrip(psutil.disk_partitions())
        roundtrip(psutil.disk_usage(os.getcwd()))
        roundtrip(psutil.users())

    def test_setup_script(self):
        test_dir = os.path.abspath(os.path.dirname(__file__))
        setup_py = os.path.realpath(os.path.join(test_dir, '..', 'setup.py'))
        setup_module = imp.load_source('setup', setup_py)
        self.assertRaises(SystemExit, setup_module.setup)
        self.assertEqual(setup_module.get_version(), psutil.__version__)

    def test_ad_on_process_creation(self):
        # We are supposed to be able to instantiate Process also in case
        # of zombie processes or access denied.
        for tolerated in (psutil.AccessDenied, psutil.ZombieProcess(1)):
            with mock.patch.object(psutil.Process, 'create_time',
                                   side_effect=tolerated) as meth:
                psutil.Process()
                assert meth.called
        # any other exception is expected to propagate
        with mock.patch.object(psutil.Process, 'create_time',
                               side_effect=ValueError) as meth:
            with self.assertRaises(ValueError):
                psutil.Process()
            assert meth.called
| |
| |
| # =================================================================== |
| # --- Example script tests |
| # =================================================================== |
| |
class TestExampleScripts(unittest.TestCase):
    """Tests for scripts in the examples directory."""

    def assert_stdout(self, exe, args=None):
        """Run an example script with the current interpreter and
        return its stripped output, which must not be empty.

        If the command fails with a RuntimeError mentioning
        'AccessDenied' the error text is returned instead.
        """
        script = os.path.join(EXAMPLES_DIR, exe)
        cmdline = script + ' ' + args if args else script
        try:
            out = sh(sys.executable + ' ' + cmdline).strip()
        except RuntimeError as err:
            message = str(err)
            if 'AccessDenied' not in message:
                raise
            return message
        assert out, out
        return out

    def assert_syntax(self, exe, args=None):
        """Make sure an example script can be parsed (it is not run)."""
        script = os.path.join(EXAMPLES_DIR, exe)
        with open(script, 'r') as f:
            source = f.read()
        ast.parse(source)

    def test_check_presence(self):
        # make sure all example scripts have a test method defined
        available = dir(self)
        scripts = [x for x in os.listdir(EXAMPLES_DIR) if x.endswith('.py')]
        for script in scripts:
            expected = 'test_' + os.path.splitext(script)[0]
            if expected not in available:
                self.fail('no test defined for %r script'
                          % os.path.join(EXAMPLES_DIR, script))

    # --- scripts which get executed

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_process_detail(self):
        self.assert_stdout('process_detail.py')

    @unittest.skipIf(APPVEYOR, "can't find users on Appveyor")
    def test_who(self):
        self.assert_stdout('who.py')

    def test_ps(self):
        self.assert_stdout('ps.py')

    def test_pstree(self):
        self.assert_stdout('pstree.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    @unittest.skipIf(TRAVIS, "permission denied on travis")
    def test_ifconfig(self):
        self.assert_stdout('ifconfig.py')

    def test_pmap(self):
        own_pid = str(os.getpid())
        self.assert_stdout('pmap.py', args=own_pid)

    def test_pidof(self):
        own_name = psutil.Process().name()
        output = self.assert_stdout('pidof.py %s' % own_name)
        self.assertIn(str(os.getpid()), output)

    # --- scripts which only get parsed

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_killall(self):
        self.assert_syntax('killall.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_nettop(self):
        self.assert_syntax('nettop.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_top(self):
        self.assert_syntax('top.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_iotop(self):
        self.assert_syntax('iotop.py')
| |
| |
def main():
    """Run the generic test cases plus the ones specific to the
    current platform.

    Return True if all tests were successful, False otherwise.
    """
    tests = [
        TestSystemAPIs,
        TestProcess,
        TestFetchAllProcesses,
        TestMisc,
        TestExampleScripts,
        LimitedUserTestCase,
    ]

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite
    stc = None
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    elif SUNOS:
        from _sunos import SunOSSpecificTestCase as stc
    if stc is not None:
        tests.append(stc)

    # The default loader is used in place of the deprecated
    # unittest.makeSuite(), which is no longer available on recent
    # Python versions.
    load_tests = unittest.defaultTestLoader.loadTestsFromTestCase
    test_suite = unittest.TestSuite()
    for test_class in tests:
        test_suite.addTest(load_tests(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
| |
if __name__ == '__main__':
    # exit with a non-zero status if any test failed
    succeeded = main()
    if not succeeded:
        sys.exit(1)