# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at
import hashlib
import socket
import os
import re
import StringIO
class FileError(Exception):
" Signifies an error which occurs while doing a file operation."
def __init__(self, msg=''):
self.msg = msg
def __str__(self):
return self.msg
class DMError(Exception):
"generic devicemanager exception."
def __init__(self, msg=''):
self.msg = msg
def __str__(self):
return self.msg
def abstractmethod(method):
line = method.func_code.co_firstlineno
filename = method.func_code.co_filename
def not_implemented(*args, **kwargs):
raise NotImplementedError('Abstract method %s at File "%s", line %s '
'should be implemented by a concrete class' %
(repr(method), filename, line))
return not_implemented
class DeviceManager:
def shell(self, cmd, outputfile, env=None, cwd=None, timeout=None, root=False):
Executes shell command on device.
cmd - Command string to execute
outputfile - File to store output
env - Environment to pass to exec command
cwd - Directory to execute command from
timeout - specified in seconds, defaults to 'default_timeout'
root - Specifies whether command requires root privileges
success: Return code from command
failure: None
def shellCheckOutput(self, cmd, env=None, cwd=None, timeout=None, root=False):
executes shell command on device (with root privileges if
specified) and returns the the output
timeout is specified in seconds, and if no timeout is given,
we will run until the script returns
success: Returns output of shell command
failure: DMError will be raised
buf = StringIO.StringIO()
retval =
cmd, buf, env=env, cwd=cwd, timeout=timeout, root=root)
output = str(buf.getvalue()[0:-1]).rstrip()
if retval is None:
raise DMError("Did not successfully run command %s (output: '%s', retval: 'None')" % (cmd, output))
if retval != 0:
raise DMError("Non-zero return code for command: %s (output: '%s', retval: '%i')" % (cmd, output, retval))
return output
def pushFile(self, localname, destname):
Copies localname from the host to destname on the device
success: True
failure: False
def mkDir(self, name):
Creates a single directory on the device file system
success: directory name
failure: None
def mkDirs(self, filename):
Make directory structure on the device.
WARNING: does not create last part of the path. For example, if asked to
create `/mnt/sdcard/foo/bar/baz`, it will only create `/mnt/sdcard/foo/bar`
dirParts = filename.rsplit('/', 1)
if not self.dirExists(dirParts[0]):
parts = filename.split('/')
name = ""
for i, part in enumerate(parts):
if i is len(parts)-1:
if part != "":
name += '/' + part
self.mkDir(name) # mkDir will check previous existence
def pushDir(self, localDir, remoteDir):
Push localDir from host to remoteDir on the device
success: remoteDir
failure: None
def dirExists(self, dirname):
Checks if dirname exists and is a directory
on the device file system
success: True
failure: False
def fileExists(self, filepath):
Checks if filepath exists and is a file on
the device file system
success: True
failure: False
def listFiles(self, rootdir):
Lists files on the device rootdir
success: array of filenames, ['file1', 'file2', ...]
failure: None
def removeFile(self, filename):
Removes filename from the device
success: output of telnet
failure: None
def removeDir(self, remoteDir):
Does a recursive delete of directory on the device: rm -Rf remoteDir
success: output of telnet
failure: None
def getProcessList(self):
Lists the running processes on the device
success: array of process tuples
failure: []
def processExist(self, appname):
Iterates process list and checks if pid exists
success: pid
failure: None
pid = None
# filter out extra spaces
parts = filter(lambda x: x != '', appname.split(' '))
appname = ' '.join(parts)
# filter out the quoted env string if it exists
# ex: '"name=value;name2=value2;etc=..." process args' -> 'process
# args'
parts = appname.split('"')
if (len(parts) > 2):
appname = ' '.join(parts[2:]).strip()
pieces = appname.split(' ')
parts = pieces[0].split('/')
app = parts[-1]
procList = self.getProcessList()
if (procList == []):
return None
for proc in procList:
procName = proc[1].split('/')[-1]
if (procName == app):
pid = proc[0]
return pid
def killProcess(self, appname, forceKill=False):
Kills the process named appname.
If forceKill is True, process is killed regardless of state
success: True
failure: False
def catFile(self, remoteFile):
Returns the contents of remoteFile
success: filecontents, string
failure: None
def pullFile(self, remoteFile):
Returns contents of remoteFile using the "pull" command.
success: output of pullfile, string
failure: None
def getFile(self, remoteFile, localFile=''):
Copy file from device (remoteFile) to host (localFile)
success: contents of file, string
failure: None
def getDirectory(self, remoteDir, localDir, checkDir=True):
Copy directory structure from device (remoteDir) to host (localDir)
success: list of files, string
failure: None
def isDir(self, remotePath):
Checks if remotePath is a directory on the device
success: True
failure: False
def validateFile(self, remoteFile, localFile):
Checks if the remoteFile has the same md5 hash as the localFile
success: True
failure: False
def _getRemoteHash(self, filename):
Return the md5 sum of a file on the device
success: MD5 hash for given filename
failure: None
def _getLocalHash(filename):
Return the MD5 sum of a file on the host
success: MD5 hash for given filename
failure: None
f = open(filename, 'rb')
if (f == None):
return None
mdsum = hashlib.md5()
return None
while 1:
data =
if not data:
hexval = mdsum.hexdigest()
return hexval
def getDeviceRoot(self):
Gets the device root for the testing area on the device
For all devices we will use / type slashes and depend on the device-agent
to sort those out. The agent will return us the device location where we
should store things, we will then create our /tests structure relative to
that returned path.
Structure on the device is as follows:
/<fennec>|<firefox> --> approot
success: path for device root
failure: None
def getAppRoot(self, packageName=None):
Returns the app root directory
E.g /tests/fennec or /tests/firefox
success: path for app root
failure: None
# TODO Support org.mozilla.firefox and B2G
def getTestRoot(self, harness):
Gets the directory location on the device for a specific test type
Harness is one of: xpcshell|reftest|mochitest
success: path for test root
failure: None
devroot = self.getDeviceRoot()
if (devroot == None):
return None
if ('xpcshell', harness, re.I)):
self.testRoot = devroot + '/xpcshell'
elif ('?(i)reftest', harness)):
self.testRoot = devroot + '/reftest'
elif ('?(i)mochitest', harness)):
self.testRoot = devroot + '/mochitest'
return self.testRoot
def getTempDir(self):
Gets the temporary directory we are using on this device
base on our device root, ensuring also that it exists.
success: path for temporary directory
failure: None
def signal(self, processID, signalType, signalAction):
Sends a specific process ID a signal code and action.
For Example: SIGINT and SIGDFL to process x
# currently not implemented in device agent - todo
def getReturnCode(self, processID):
"""Get a return code from process ending -- needs support on device-agent"""
# TODO: make this real
return 0
def unpackFile(self, file_path, dest_dir=None):
Unzips a remote bundle to a remote location
If dest_dir is not specified, the bundle is extracted
in the same directory
success: output of unzip command
failure: None
def reboot(self, ipAddr=None, port=30000):
Reboots the device
success: status from test agent
failure: None
def validateDir(self, localDir, remoteDir):
Validate localDir from host to remoteDir on the device
success: True
failure: False
if (self.debug >= 2):
print "validating directory: " + localDir + " to " + remoteDir
for root, dirs, files in os.walk(localDir):
parts = root.split(localDir)
for f in files:
remoteRoot = remoteDir + '/' + parts[1]
remoteRoot = remoteRoot.replace('/', '/')
if (parts[1] == ""):
remoteRoot = remoteDir
remoteName = remoteRoot + '/' + f
if (self.validateFile(remoteName, os.path.join(root, f)) != True):
return False
return True
def getInfo(self, directive=None):
Returns information about the device:
Directive indicates the information you want to get, your choices are:
os - name of the os
id - unique id of the device
uptime - uptime of the device
uptimemillis - uptime of the device in milliseconds (NOT supported on all implementations)
systime - system time of the device
screen - screen resolution
memory - memory stats
process - list of running processes (same as ps)
disk - total, free, available bytes on disk
power - power status (charge, battery temp)
all - all of them - or call it with no parameters to get all the information
success: dict of info strings by directive name
failure: None
def installApp(self, appBundlePath, destPath=None):
Installs an application onto the device
appBundlePath - path to the application bundle on the device
destPath - destination directory of where application should be installed to (optional)
success: None
failure: error string
def uninstallApp(self, appName, installPath=None):
Uninstalls the named application from device and DOES NOT cause a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - the path to where the application was installed (optional)
success: None
failure: DMError exception thrown
def uninstallAppAndReboot(self, appName, installPath=None):
Uninstalls the named application from device and causes a reboot
appName - the name of the application (e.g org.mozilla.fennec)
installPath - the path to where the application was installed (optional)
success: None
failure: DMError exception thrown
def updateApp(self, appBundlePath, processName=None, destPath=None, ipAddr=None, port=30000):
Updates the application on the device.
appBundlePath - path to the application bundle on the device
processName - used to end the process if the applicaiton is currently running (optional)
destPath - Destination directory to where the application should be installed (optional)
ipAddr - IP address to await a callback ping to let us know that the device has updated
properly - defaults to current IP.
port - port to await a callback ping to let us know that the device has updated properly
defaults to 30000, and counts up from there if it finds a conflict
success: text status from command or callback server
failure: None
def getCurrentTime(self):
Returns device time in milliseconds since the epoch
success: time in ms
failure: None
def recordLogcat(self):
Clears the logcat file making it easier to view specific events
# TODO: spawn this off in a separate thread/process so we can collect
# all the logcat information
# Right now this is just clearing the logcat so we can only see what
# happens after this call.
buf = StringIO.StringIO()['/system/bin/logcat', '-c'], buf, root=True)
def getLogcat(self):
Returns the contents of the logcat file as a string
success: contents of logcat, string
failure: None
buf = StringIO.StringIO()
if["/system/bin/logcat", "-d", "dalvikvm:S", "ConnectivityService:S", "WifiMonitor:S", "WifiStateTracker:S", "wpa_supplicant:S", "NetworkStateTracker:S"], buf, root=True) != 0:
return None
return str(buf.getvalue()[0:-1]).rstrip().split('\r')
def chmodDir(self, remoteDir, mask="777"):
Recursively changes file permissions in a directory
success: True
failure: False
def _escapedCommandLine(cmd):
""" Utility function to return escaped and quoted version of command line """
quotedCmd = []
for arg in cmd:
arg.replace('&', '\&')
needsQuoting = False
for char in [' ', '(', ')', '"', '&']:
if arg.find(char) >= 0:
needsQuoting = True
if needsQuoting:
arg = '\'%s\'' % arg
return " ".join(quotedCmd)
class NetworkTools:
def __init__(self):
# Utilities to get the local ip address
def getInterfaceIp(self, ifname):
if != "nt":
import fcntl
import struct
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
struct.pack('256s', ifname[:15])
return None
def getLanIp(self):
ip = socket.gethostbyname(socket.gethostname())
except socket.gaierror:
ip = socket.gethostbyname(
socket.gethostname() + ".local") # for Mac OS X
if ip.startswith("127.") and != "nt":
interfaces = ["eth0", "eth1", "eth2", "wlan0",
"wlan1", "wifi0", "ath0", "ath1", "ppp0"]
for ifname in interfaces:
ip = self.getInterfaceIp(ifname)
except IOError:
return ip
# Gets an open port starting with the seed by incrementing by 1 each time
def findOpenPort(self, ip, seed):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
connected = False
if isinstance(seed, basestring):
seed = int(seed)
maxportnum = seed + \
5000 # We will try at most 5000 ports to find an open one
while not connected:
s.bind((ip, seed))
connected = True
if seed > maxportnum:
print "Automation Error: Could not find open port after checking 5000 ports"
seed += 1
print "Automation Error: Socket error trying to find open port"
return seed
def _pop_last_line(file_obj):
Utility function to get the last line from a file (shared between ADB and
SUT device managers). Function also removes it from the file. Intended to
strip off the return code from a shell command.
bytes_from_end = 1, 2)
length = file_obj.tell() + 1
while bytes_from_end < length: * bytes_from_end, 2)
data =
if bytes_from_end == length - 1 and len(data) == 0: # no data, return None
return None
if data[0] == '\n' or bytes_from_end == length - 1:
# found the last line, which should have the return value
if data[0] == '\n':
data = data[1:]
# truncate off the return code line
file_obj.truncate(length - bytes_from_end), 2)
return data
bytes_from_end += 1
return None