| """This class extends pexpect.spawn to specialize setting up SSH connections. |
| This adds methods for login, logout, and expecting the shell prompt. |
| |
| $Id: pxssh.py 513 2008-02-09 18:26:13Z noah $ |
| """ |
| |
| from pexpect import * |
| import pexpect |
| import time |
| |
| __all__ = ['ExceptionPxssh', 'pxssh'] |
| |
| # Exception classes used by this module. |
| |
| |
| class ExceptionPxssh(ExceptionPexpect): |
| """Raised for pxssh exceptions. |
| """ |
| |
| |
| class pxssh (spawn): |
| |
| """This class extends pexpect.spawn to specialize setting up SSH |
| connections. This adds methods for login, logout, and expecting the shell |
| prompt. It does various tricky things to handle many situations in the SSH |
| login process. For example, if the session is your first login, then pxssh |
| automatically accepts the remote certificate; or if you have public key |
| authentication setup then pxssh won't wait for the password prompt. |
| |
| pxssh uses the shell prompt to synchronize output from the remote host. In |
| order to make this more robust it sets the shell prompt to something more |
| unique than just $ or #. This should work on most Borne/Bash or Csh style |
| shells. |
| |
| Example that runs a few commands on a remote server and prints the result:: |
| |
| import pxssh |
| import getpass |
| try: |
| s = pxssh.pxssh() |
| hostname = raw_input('hostname: ') |
| username = raw_input('username: ') |
| password = getpass.getpass('password: ') |
| s.login (hostname, username, password) |
| s.sendline ('uptime') # run a command |
| s.prompt() # match the prompt |
| print s.before # print everything before the prompt. |
| s.sendline ('ls -l') |
| s.prompt() |
| print s.before |
| s.sendline ('df') |
| s.prompt() |
| print s.before |
| s.logout() |
| except pxssh.ExceptionPxssh, e: |
| print "pxssh failed on login." |
| print str(e) |
| |
| Note that if you have ssh-agent running while doing development with pxssh |
| then this can lead to a lot of confusion. Many X display managers (xdm, |
| gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI |
| dialog box popup asking for a password during development. You should turn |
| off any key agents during testing. The 'force_password' attribute will turn |
| off public key authentication. This will only work if the remote SSH server |
| is configured to allow password logins. Example of using 'force_password' |
| attribute:: |
| |
| s = pxssh.pxssh() |
| s.force_password = True |
| hostname = raw_input('hostname: ') |
| username = raw_input('username: ') |
| password = getpass.getpass('password: ') |
| s.login (hostname, username, password) |
| """ |
| |
| def __init__( |
| self, |
| timeout=30, |
| maxread=2000, |
| searchwindowsize=None, |
| logfile=None, |
| cwd=None, |
| env=None): |
| spawn.__init__( |
| self, |
| None, |
| timeout=timeout, |
| maxread=maxread, |
| searchwindowsize=searchwindowsize, |
| logfile=logfile, |
| cwd=cwd, |
| env=env) |
| |
| self.name = '<pxssh>' |
| |
| # SUBTLE HACK ALERT! Note that the command to set the prompt uses a |
| # slightly different string than the regular expression to match it. This |
| # is because when you set the prompt the command will echo back, but we |
| # don't want to match the echoed command. So if we make the set command |
| # slightly different than the regex we eliminate the problem. To make the |
| # set command different we add a backslash in front of $. The $ doesn't |
| # need to be escaped, but it doesn't hurt and serves to make the set |
| # prompt command different than the regex. |
| |
| # used to match the command-line prompt |
| self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " |
| self.PROMPT = self.UNIQUE_PROMPT |
| |
| # used to set shell command-line prompt to UNIQUE_PROMPT. |
| self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" |
| self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" |
| self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" |
| # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from |
| # displaying a GUI password dialog. I have not figured out how to |
| # disable only SSH_ASKPASS without also disabling X11 forwarding. |
| # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! |
| #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" |
| self.force_password = False |
| self.auto_prompt_reset = True |
| |
| def levenshtein_distance(self, a, b): |
| """This calculates the Levenshtein distance between a and b. |
| """ |
| |
| n, m = len(a), len(b) |
| if n > m: |
| a, b = b, a |
| n, m = m, n |
| current = range(n + 1) |
| for i in range(1, m + 1): |
| previous, current = current, [i] + [0] * n |
| for j in range(1, n + 1): |
| add, delete = previous[j] + 1, current[j - 1] + 1 |
| change = previous[j - 1] |
| if a[j - 1] != b[i - 1]: |
| change = change + 1 |
| current[j] = min(add, delete, change) |
| return current[n] |
| |
| def sync_original_prompt(self): |
| """This attempts to find the prompt. Basically, press enter and record |
| the response; press enter again and record the response; if the two |
| responses are similar then assume we are at the original prompt. This |
| is a slow function. It can take over 10 seconds. """ |
| |
| # All of these timing pace values are magic. |
| # I came up with these based on what seemed reliable for |
| # connecting to a heavily loaded machine I have. |
| # If latency is worse than these values then this will fail. |
| |
| try: |
| # GAS: Clear out the cache before getting the prompt |
| self.read_nonblocking(size=10000, timeout=1) |
| except TIMEOUT: |
| pass |
| time.sleep(0.1) |
| self.sendline() |
| time.sleep(0.5) |
| x = self.read_nonblocking(size=1000, timeout=1) |
| time.sleep(0.1) |
| self.sendline() |
| time.sleep(0.5) |
| a = self.read_nonblocking(size=1000, timeout=1) |
| time.sleep(0.1) |
| self.sendline() |
| time.sleep(0.5) |
| b = self.read_nonblocking(size=1000, timeout=1) |
| ld = self.levenshtein_distance(a, b) |
| len_a = len(a) |
| if len_a == 0: |
| return False |
| if float(ld) / len_a < 0.4: |
| return True |
| return False |
| |
| # TODO: This is getting messy and I'm pretty sure this isn't perfect. |
| # TODO: I need to draw a flow chart for this. |
| def login( |
| self, |
| server, |
| username, |
| password='', |
| terminal_type='ansi', |
| original_prompt=r"[#$]", |
| login_timeout=10, |
| port=None, |
| auto_prompt_reset=True): |
| """This logs the user into the given server. It uses the |
| 'original_prompt' to try to find the prompt right after login. When it |
| finds the prompt it immediately tries to reset the prompt to something |
| more easily matched. The default 'original_prompt' is very optimistic |
| and is easily fooled. It's more reliable to try to match the original |
| prompt as exactly as possible to prevent false matches by server |
| strings such as the "Message Of The Day". On many systems you can |
| disable the MOTD on the remote server by creating a zero-length file |
| called "~/.hushlogin" on the remote server. If a prompt cannot be found |
| then this will not necessarily cause the login to fail. In the case of |
| a timeout when looking for the prompt we assume that the original |
| prompt was so weird that we could not match it, so we use a few tricks |
| to guess when we have reached the prompt. Then we hope for the best and |
| blindly try to reset the prompt to something more unique. If that fails |
| then login() raises an ExceptionPxssh exception. |
| |
| In some situations it is not possible or desirable to reset the |
| original prompt. In this case, set 'auto_prompt_reset' to False to |
| inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh |
| uses a unique prompt in the prompt() method. If the original prompt is |
| not reset then this will disable the prompt() method unless you |
| manually set the PROMPT attribute. """ |
| |
| ssh_options = '-q' |
| if self.force_password: |
| ssh_options = ssh_options + ' ' + self.SSH_OPTS |
| if port is not None: |
| ssh_options = ssh_options + ' -p %s' % (str(port)) |
| cmd = "ssh %s -l %s %s" % (ssh_options, username, server) |
| |
| # This does not distinguish between a remote server 'password' prompt |
| # and a local ssh 'passphrase' prompt (for unlocking a private key). |
| spawn._spawn(self, cmd) |
| i = self.expect( |
| [ |
| "(?i)are you sure you want to continue connecting", |
| original_prompt, |
| "(?i)(?:password)|(?:passphrase for key)", |
| "(?i)permission denied", |
| "(?i)terminal type", |
| TIMEOUT, |
| "(?i)connection closed by remote host"], |
| timeout=login_timeout) |
| |
| # First phase |
| if i == 0: |
| # New certificate -- always accept it. |
| # This is what you get if SSH does not have the remote host's |
| # public key stored in the 'known_hosts' cache. |
| self.sendline("yes") |
| i = self.expect( |
| [ |
| "(?i)are you sure you want to continue connecting", |
| original_prompt, |
| "(?i)(?:password)|(?:passphrase for key)", |
| "(?i)permission denied", |
| "(?i)terminal type", |
| TIMEOUT]) |
| if i == 2: # password or passphrase |
| self.sendline(password) |
| i = self.expect( |
| [ |
| "(?i)are you sure you want to continue connecting", |
| original_prompt, |
| "(?i)(?:password)|(?:passphrase for key)", |
| "(?i)permission denied", |
| "(?i)terminal type", |
| TIMEOUT]) |
| if i == 4: |
| self.sendline(terminal_type) |
| i = self.expect( |
| [ |
| "(?i)are you sure you want to continue connecting", |
| original_prompt, |
| "(?i)(?:password)|(?:passphrase for key)", |
| "(?i)permission denied", |
| "(?i)terminal type", |
| TIMEOUT]) |
| |
| # Second phase |
| if i == 0: |
| # This is weird. This should not happen twice in a row. |
| self.close() |
| raise ExceptionPxssh( |
| 'Weird error. Got "are you sure" prompt twice.') |
| elif i == 1: # can occur if you have a public key pair set to authenticate. |
| # TODO: May NOT be OK if expect() got tricked and matched a false |
| # prompt. |
| pass |
| elif i == 2: # password prompt again |
| # For incorrect passwords, some ssh servers will |
| # ask for the password again, others return 'denied' right away. |
| # If we get the password prompt again then this means |
| # we didn't get the password right the first time. |
| self.close() |
| raise ExceptionPxssh('password refused') |
| elif i == 3: # permission denied -- password was bad. |
| self.close() |
| raise ExceptionPxssh('permission denied') |
| elif i == 4: # terminal type again? WTF? |
| self.close() |
| raise ExceptionPxssh( |
| 'Weird error. Got "terminal type" prompt twice.') |
| elif i == 5: # Timeout |
| # This is tricky... I presume that we are at the command-line prompt. |
| # It may be that the shell prompt was so weird that we couldn't match |
| # it. Or it may be that we couldn't log in for some other reason. I |
| # can't be sure, but it's safe to guess that we did login because if |
| # I presume wrong and we are not logged in then this should be caught |
| # later when I try to set the shell prompt. |
| pass |
| elif i == 6: # Connection closed by remote host |
| self.close() |
| raise ExceptionPxssh('connection closed') |
| else: # Unexpected |
| self.close() |
| raise ExceptionPxssh('unexpected login response') |
| if not self.sync_original_prompt(): |
| self.close() |
| raise ExceptionPxssh('could not synchronize with original prompt') |
| # We appear to be in. |
| # set shell prompt to something unique. |
| if auto_prompt_reset: |
| if not self.set_unique_prompt(): |
| self.close() |
| raise ExceptionPxssh( |
| 'could not set shell prompt\n' + self.before) |
| return True |
| |
| def logout(self): |
| """This sends exit to the remote shell. If there are stopped jobs then |
| this automatically sends exit twice. """ |
| |
| self.sendline("exit") |
| index = self.expect([EOF, "(?i)there are stopped jobs"]) |
| if index == 1: |
| self.sendline("exit") |
| self.expect(EOF) |
| self.close() |
| |
| def prompt(self, timeout=20): |
| """This matches the shell prompt. This is little more than a short-cut |
| to the expect() method. This returns True if the shell prompt was |
| matched. This returns False if there was a timeout. Note that if you |
| called login() with auto_prompt_reset set to False then you should have |
| manually set the PROMPT attribute to a regex pattern for matching the |
| prompt. """ |
| |
| i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) |
| if i == 1: |
| return False |
| return True |
| |
| def set_unique_prompt(self): |
| """This sets the remote prompt to something more unique than # or $. |
| This makes it easier for the prompt() method to match the shell prompt |
| unambiguously. This method is called automatically by the login() |
| method, but you may want to call it manually if you somehow reset the |
| shell prompt. For example, if you 'su' to a different user then you |
| will need to manually reset the prompt. This sends shell commands to |
| the remote host to set the prompt, so this assumes the remote host is |
| ready to receive commands. |
| |
| Alternatively, you may use your own prompt pattern. Just set the PROMPT |
| attribute to a regular expression that matches it. In this case you |
| should call login() with auto_prompt_reset=False; then set the PROMPT |
| attribute. After that the prompt() method will try to match your prompt |
| pattern.""" |
| |
| self.sendline("unset PROMPT_COMMAND") |
| self.sendline(self.PROMPT_SET_SH) # sh-style |
| i = self.expect([TIMEOUT, self.PROMPT], timeout=10) |
| if i == 0: # csh-style |
| self.sendline(self.PROMPT_SET_CSH) |
| i = self.expect([TIMEOUT, self.PROMPT], timeout=10) |
| if i == 0: |
| return False |
| return True |
| |
| # vi:ts=4:sw=4:expandtab:ft=python: |