| #!/usr/bin/env python |
| |
| # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Linux specific tests. These are implicitly run by test_psutil.py.""" |
| |
| from __future__ import division |
| import contextlib |
| import errno |
| import fcntl |
| import io |
| import os |
| import pprint |
| import re |
| import socket |
| import struct |
| import sys |
| import tempfile |
| import time |
| import warnings |
| |
| try: |
| from unittest import mock # py3 |
| except ImportError: |
| import mock # requires "pip install mock" |
| |
| from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX |
| from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess, |
| retry_before_failing, get_kernel_version, unittest, |
| which, call_until) |
| |
| import psutil |
| import psutil._pslinux |
| from psutil._compat import PY3, u |
| |
| |
| SIOCGIFADDR = 0x8915 |
| SIOCGIFCONF = 0x8912 |
| SIOCGIFHWADDR = 0x8927 |
| |
| |
| def get_ipv4_address(ifname): |
| ifname = ifname[:15] |
| if PY3: |
| ifname = bytes(ifname, 'ascii') |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| with contextlib.closing(s): |
| return socket.inet_ntoa( |
| fcntl.ioctl(s.fileno(), |
| SIOCGIFADDR, |
| struct.pack('256s', ifname))[20:24]) |
| |
| |
| def get_mac_address(ifname): |
| ifname = ifname[:15] |
| if PY3: |
| ifname = bytes(ifname, 'ascii') |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| with contextlib.closing(s): |
| info = fcntl.ioctl( |
| s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname)) |
| if PY3: |
| def ord(x): |
| return x |
| else: |
| import __builtin__ |
| ord = __builtin__.ord |
| return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] |
| |
| |
| @unittest.skipUnless(LINUX, "not a Linux system") |
| class LinuxSpecificTestCase(unittest.TestCase): |
| |
| @unittest.skipIf( |
| POSIX and not hasattr(os, 'statvfs'), |
| reason="os.statvfs() function not available on this platform") |
| @skip_on_not_implemented() |
| def test_disks(self): |
| # test psutil.disk_usage() and psutil.disk_partitions() |
| # against "df -a" |
| def df(path): |
| out = sh('df -P -B 1 "%s"' % path).strip() |
| lines = out.split('\n') |
| lines.pop(0) |
| line = lines.pop(0) |
| dev, total, used, free = line.split()[:4] |
| if dev == 'none': |
| dev = '' |
| total, used, free = int(total), int(used), int(free) |
| return dev, total, used, free |
| |
| for part in psutil.disk_partitions(all=False): |
| usage = psutil.disk_usage(part.mountpoint) |
| dev, total, used, free = df(part.mountpoint) |
| self.assertEqual(part.device, dev) |
| self.assertEqual(usage.total, total) |
| # 10 MB tollerance |
| if abs(usage.free - free) > 10 * 1024 * 1024: |
| self.fail("psutil=%s, df=%s" % (usage.free, free)) |
| if abs(usage.used - used) > 10 * 1024 * 1024: |
| self.fail("psutil=%s, df=%s" % (usage.used, used)) |
| |
| def test_memory_maps(self): |
| sproc = get_test_subprocess() |
| time.sleep(1) |
| p = psutil.Process(sproc.pid) |
| maps = p.memory_maps(grouped=False) |
| pmap = sh('pmap -x %s' % p.pid).split('\n') |
| # get rid of header |
| del pmap[0] |
| del pmap[0] |
| while maps and pmap: |
| this = maps.pop(0) |
| other = pmap.pop(0) |
| addr, _, rss, dirty, mode, path = other.split(None, 5) |
| if not path.startswith('[') and not path.endswith(']'): |
| self.assertEqual(path, os.path.basename(this.path)) |
| self.assertEqual(int(rss) * 1024, this.rss) |
| # test only rwx chars, ignore 's' and 'p' |
| self.assertEqual(mode[:3], this.perms[:3]) |
| |
| def test_vmem_total(self): |
| lines = sh('free').split('\n')[1:] |
| total = int(lines[0].split()[1]) * 1024 |
| self.assertEqual(total, psutil.virtual_memory().total) |
| |
| @retry_before_failing() |
| def test_vmem_used(self): |
| lines = sh('free').split('\n')[1:] |
| used = int(lines[0].split()[2]) * 1024 |
| self.assertAlmostEqual(used, psutil.virtual_memory().used, |
| delta=TOLERANCE) |
| |
| @retry_before_failing() |
| def test_vmem_free(self): |
| lines = sh('free').split('\n')[1:] |
| free = int(lines[0].split()[3]) * 1024 |
| self.assertAlmostEqual(free, psutil.virtual_memory().free, |
| delta=TOLERANCE) |
| |
| @retry_before_failing() |
| def test_vmem_buffers(self): |
| lines = sh('free').split('\n')[1:] |
| buffers = int(lines[0].split()[5]) * 1024 |
| self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers, |
| delta=TOLERANCE) |
| |
| @retry_before_failing() |
| def test_vmem_cached(self): |
| lines = sh('free').split('\n')[1:] |
| cached = int(lines[0].split()[6]) * 1024 |
| self.assertAlmostEqual(cached, psutil.virtual_memory().cached, |
| delta=TOLERANCE) |
| |
| def test_swapmem_total(self): |
| lines = sh('free').split('\n')[1:] |
| total = int(lines[2].split()[1]) * 1024 |
| self.assertEqual(total, psutil.swap_memory().total) |
| |
| @retry_before_failing() |
| def test_swapmem_used(self): |
| lines = sh('free').split('\n')[1:] |
| used = int(lines[2].split()[2]) * 1024 |
| self.assertAlmostEqual(used, psutil.swap_memory().used, |
| delta=TOLERANCE) |
| |
| @retry_before_failing() |
| def test_swapmem_free(self): |
| lines = sh('free').split('\n')[1:] |
| free = int(lines[2].split()[3]) * 1024 |
| self.assertAlmostEqual(free, psutil.swap_memory().free, |
| delta=TOLERANCE) |
| |
| @unittest.skipIf(TRAVIS, "unknown failure on travis") |
| def test_cpu_times(self): |
| fields = psutil.cpu_times()._fields |
| kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0] |
| kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) |
| if kernel_ver_info >= (2, 6, 11): |
| self.assertIn('steal', fields) |
| else: |
| self.assertNotIn('steal', fields) |
| if kernel_ver_info >= (2, 6, 24): |
| self.assertIn('guest', fields) |
| else: |
| self.assertNotIn('guest', fields) |
| if kernel_ver_info >= (3, 2, 0): |
| self.assertIn('guest_nice', fields) |
| else: |
| self.assertNotIn('guest_nice', fields) |
| |
| def test_net_if_addrs_ips(self): |
| for name, addrs in psutil.net_if_addrs().items(): |
| for addr in addrs: |
| if addr.family == psutil.AF_LINK: |
| self.assertEqual(addr.address, get_mac_address(name)) |
| elif addr.family == socket.AF_INET: |
| self.assertEqual(addr.address, get_ipv4_address(name)) |
| # TODO: test for AF_INET6 family |
| |
| @unittest.skipUnless(which('ip'), "'ip' utility not available") |
| @unittest.skipIf(TRAVIS, "skipped on Travis") |
| def test_net_if_names(self): |
| out = sh("ip addr").strip() |
| nics = psutil.net_if_addrs() |
| found = 0 |
| for line in out.split('\n'): |
| line = line.strip() |
| if re.search("^\d+:", line): |
| found += 1 |
| name = line.split(':')[1].strip() |
| self.assertIn(name, nics.keys()) |
| self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( |
| pprint.pformat(nics), out)) |
| |
| @unittest.skipUnless(which("nproc"), "nproc utility not available") |
| def test_cpu_count_logical_w_nproc(self): |
| num = int(sh("nproc --all")) |
| self.assertEqual(psutil.cpu_count(logical=True), num) |
| |
| @unittest.skipUnless(which("lscpu"), "lscpu utility not available") |
| def test_cpu_count_logical_w_lscpu(self): |
| out = sh("lscpu -p") |
| num = len([x for x in out.split('\n') if not x.startswith('#')]) |
| self.assertEqual(psutil.cpu_count(logical=True), num) |
| |
| # --- mocked tests |
| |
| def test_virtual_memory_mocked_warnings(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| with warnings.catch_warnings(record=True) as ws: |
| warnings.simplefilter("always") |
| ret = psutil._pslinux.virtual_memory() |
| assert m.called |
| self.assertEqual(len(ws), 1) |
| w = ws[0] |
| self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) |
| self.assertIn( |
| "'cached', 'active' and 'inactive' memory stats couldn't " |
| "be determined", str(w.message)) |
| self.assertEqual(ret.cached, 0) |
| self.assertEqual(ret.active, 0) |
| self.assertEqual(ret.inactive, 0) |
| |
| def test_swap_memory_mocked_warnings(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| with warnings.catch_warnings(record=True) as ws: |
| warnings.simplefilter("always") |
| ret = psutil._pslinux.swap_memory() |
| assert m.called |
| self.assertEqual(len(ws), 1) |
| w = ws[0] |
| self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) |
| self.assertIn( |
| "'sin' and 'sout' swap memory stats couldn't " |
| "be determined", str(w.message)) |
| self.assertEqual(ret.sin, 0) |
| self.assertEqual(ret.sout, 0) |
| |
| def test_cpu_count_logical_mocked(self): |
| import psutil._pslinux |
| original = psutil._pslinux.cpu_count_logical() |
| # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in |
| # order to cause the parsing of /proc/cpuinfo and /proc/stat. |
| with mock.patch( |
| 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: |
| self.assertEqual(psutil._pslinux.cpu_count_logical(), original) |
| assert m.called |
| |
| # Let's have open() return emtpy data and make sure None is |
| # returned ('cause we mimick os.cpu_count()). |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertIsNone(psutil._pslinux.cpu_count_logical()) |
| self.assertEqual(m.call_count, 2) |
| # /proc/stat should be the last one |
| self.assertEqual(m.call_args[0][0], '/proc/stat') |
| |
| # Let's push this a bit further and make sure /proc/cpuinfo |
| # parsing works as expected. |
| with open('/proc/cpuinfo', 'rb') as f: |
| cpuinfo_data = f.read() |
| fake_file = io.BytesIO(cpuinfo_data) |
| with mock.patch('psutil._pslinux.open', |
| return_value=fake_file, create=True) as m: |
| self.assertEqual(psutil._pslinux.cpu_count_logical(), original) |
| |
| def test_cpu_count_physical_mocked(self): |
| # Have open() return emtpy data and make sure None is returned |
| # ('cause we want to mimick os.cpu_count()) |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertIsNone(psutil._pslinux.cpu_count_physical()) |
| assert m.called |
| |
| def test_proc_open_files_file_gone(self): |
| # simulates a file which gets deleted during open_files() |
| # execution |
| p = psutil.Process() |
| files = p.open_files() |
| with tempfile.NamedTemporaryFile(): |
| # give the kernel some time to see the new file |
| call_until(p.open_files, "len(ret) != %i" % len(files)) |
| with mock.patch('psutil._pslinux.os.readlink', |
| side_effect=OSError(errno.ENOENT, "")) as m: |
| files = p.open_files() |
| assert not files |
| assert m.called |
| # also simulate the case where os.readlink() returns EINVAL |
| # in which case psutil is supposed to 'continue' |
| with mock.patch('psutil._pslinux.os.readlink', |
| side_effect=OSError(errno.EINVAL, "")) as m: |
| self.assertEqual(p.open_files(), []) |
| assert m.called |
| |
| def test_proc_terminal_mocked(self): |
| with mock.patch('psutil._pslinux._psposix._get_terminal_map', |
| return_value={}) as m: |
| self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) |
| assert m.called |
| |
| def test_proc_num_ctx_switches_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| NotImplementedError, |
| psutil._pslinux.Process(os.getpid()).num_ctx_switches) |
| assert m.called |
| |
| def test_proc_num_threads_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| NotImplementedError, |
| psutil._pslinux.Process(os.getpid()).num_threads) |
| assert m.called |
| |
| def test_proc_ppid_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| NotImplementedError, |
| psutil._pslinux.Process(os.getpid()).ppid) |
| assert m.called |
| |
| def test_proc_uids_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| NotImplementedError, |
| psutil._pslinux.Process(os.getpid()).uids) |
| assert m.called |
| |
| def test_proc_gids_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| NotImplementedError, |
| psutil._pslinux.Process(os.getpid()).gids) |
| assert m.called |
| |
| def test_proc_cmdline_mocked(self): |
| # see: https://github.com/giampaolo/psutil/issues/639 |
| p = psutil.Process() |
| fake_file = io.StringIO(u('foo\x00bar\x00')) |
| with mock.patch('psutil._pslinux.open', |
| return_value=fake_file, create=True) as m: |
| p.cmdline() == ['foo', 'bar'] |
| assert m.called |
| fake_file = io.StringIO(u('foo\x00bar\x00\x00')) |
| with mock.patch('psutil._pslinux.open', |
| return_value=fake_file, create=True) as m: |
| p.cmdline() == ['foo', 'bar', ''] |
| assert m.called |
| |
| def test_proc_io_counters_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| NotImplementedError, |
| psutil._pslinux.Process(os.getpid()).io_counters) |
| assert m.called |
| |
| def test_boot_time_mocked(self): |
| with mock.patch('psutil._pslinux.open', create=True) as m: |
| self.assertRaises( |
| RuntimeError, |
| psutil._pslinux.boot_time) |
| assert m.called |
| |
| def test_users_mocked(self): |
| # Make sure ':0' and ':0.0' (returned by C ext) are converted |
| # to 'localhost'. |
| with mock.patch('psutil._pslinux.cext.users', |
| return_value=[('giampaolo', 'pts/2', ':0', |
| 1436573184.0, True)]) as m: |
| self.assertEqual(psutil.users()[0].host, 'localhost') |
| assert m.called |
| with mock.patch('psutil._pslinux.cext.users', |
| return_value=[('giampaolo', 'pts/2', ':0.0', |
| 1436573184.0, True)]) as m: |
| self.assertEqual(psutil.users()[0].host, 'localhost') |
| assert m.called |
| # ...otherwise it should be returned as-is |
| with mock.patch('psutil._pslinux.cext.users', |
| return_value=[('giampaolo', 'pts/2', 'foo', |
| 1436573184.0, True)]) as m: |
| self.assertEqual(psutil.users()[0].host, 'foo') |
| assert m.called |
| |
| def test_disk_partitions_mocked(self): |
| # Test that ZFS partitions are returned. |
| with open("/proc/filesystems", "r") as f: |
| data = f.read() |
| if 'zfs' in data: |
| for part in psutil.disk_partitions(): |
| if part.fstype == 'zfs': |
| break |
| else: |
| self.fail("couldn't find any ZFS partition") |
| else: |
| # No ZFS partitions on this system. Let's fake one. |
| fake_file = io.StringIO(u("nodev\tzfs\n")) |
| with mock.patch('psutil._pslinux.open', |
| return_value=fake_file, create=True) as m1: |
| with mock.patch( |
| 'psutil._pslinux.cext.disk_partitions', |
| return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2: |
| ret = psutil.disk_partitions() |
| assert m1.called |
| assert m2.called |
| assert ret |
| self.assertEqual(ret[0].fstype, 'zfs') |
| |
| # --- tests for specific kernel versions |
| |
| @unittest.skipUnless( |
| get_kernel_version() >= (2, 6, 36), |
| "prlimit() not available on this Linux kernel version") |
| def test_prlimit_availability(self): |
| # prlimit() should be available starting from kernel 2.6.36 |
| p = psutil.Process(os.getpid()) |
| p.rlimit(psutil.RLIMIT_NOFILE) |
| # if prlimit() is supported *at least* these constants should |
| # be available |
| self.assertTrue(hasattr(psutil, "RLIM_INFINITY")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_AS")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_CORE")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_CPU")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_DATA")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_NPROC")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_RSS")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_STACK")) |
| |
| @unittest.skipUnless( |
| get_kernel_version() >= (3, 0), |
| "prlimit constants not available on this Linux kernel version") |
| def test_resource_consts_kernel_v(self): |
| # more recent constants |
| self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_NICE")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME")) |
| self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING")) |
| |
| |
| def main(): |
| test_suite = unittest.TestSuite() |
| test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) |
| result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
| return result.wasSuccessful() |
| |
| if __name__ == '__main__': |
| if not main(): |
| sys.exit(1) |