blob: 6ad72b1f29dddccf6d91664a6485c69ecdd7a33b [file] [log] [blame]
from __future__ import with_statement
# We don't support python2.5 for the server
from nose import SkipTest
import sys
if sys.version_info < (2, 6, 0):
raise SkipTest
import time
import hashlib
import shutil
import tempfile
from unittest import TestCase
from StringIO import StringIO
from ConfigParser import RawConfigParser
import mock
import webob
import signing.server as ss
class TestTokens(TestCase):
def testTokenData(self):
now = int(time.time())
token = ss.make_token_data("1.2.3.4", now, now + 300)
parts = token.split(":")
self.assertEquals(parts[:-1], ["1.2.3.4", str(now), str(now + 300)])
unpacked = ss.unpack_token_data(token)
self.assertEquals(unpacked, dict(
slave_ip="1.2.3.4", valid_from=now, valid_to=now + 300))
config_data = """
[server]
port = 8080
max_file_age = 600
cleanup_interval = 300
[security]
token_secret = asdfasdf
token_secret_old = 1234567890
allowed_ips = 127.0.0.0/24, 127.1.0.0/24
new_token_allowed_ips = 127.1.0.0/24
allowed_filenames = .*
min_filesize = 100
max_token_age = 600
new_token_auth = foo:bar
new_token_auth2 = fuz:baz
[paths]
signed_dir = %(tmpdir)s/signed-files
unsigned_dir = %(tmpdir)s/unsigned-files
[signing]
formats = gpg,signcode,mar,dmg
signscript = signscript.py
concurrency = 4
"""
class TestSigningServer(TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.config_data = config_data % dict(tmpdir=self.tmpdir)
config = RawConfigParser()
config.readfp(StringIO(self.config_data))
passphrases = {"gpg": "foobar"}
self.server = ss.SigningServer(config, passphrases)
def tearDown(self):
self.server.stop()
shutil.rmtree(self.tmpdir)
def testGetToken(self):
token = self.server.get_token("1.2.3.4", 300)
self.assertEquals(True, self.server.verify_token(token, "1.2.3.4"))
def testMaxTokenAge(self):
self.assertRaises(ValueError, self.server.get_token, "1.2.3.4", 3000)
def testExpiredToken(self):
with mock.patch("time.time") as t:
t.return_value = 0
token = self.server.get_token("1.2.3.4", 300)
t.return_value = 299
self.assertEquals(True, self.server.verify_token(token, "1.2.3.4"))
t.return_value = 301
self.assertEquals(
False, self.server.verify_token(token, "1.2.3.4"))
def testBadSlaveIp(self):
token = self.server.get_token("1.2.3.4", 300)
self.assertEquals(False, self.server.verify_token(token, "1.2.3.5"))
def testOldTokenSecret(self):
self.server.token_secret = "1234567890"
token = self.server.get_token("1.2.3.4", 300)
self.server.token_secret = "asdfasdf"
self.assertEquals(True, self.server.verify_token(token, "1.2.3.4"))
def testBadOldTokenSecret(self):
# Make sure that using a bad secret to generate the token results in
# failure to validate
self.server.token_secret = "bad"
token = self.server.get_token("1.2.3.4", 300)
self.server.token_secret = "asdfasdf"
self.assertEquals(False, self.server.verify_token(token, "1.2.3.4"))
def testBadIp(self):
req = webob.Request.blank("/sign/token")
req.environ['REMOTE_ADDR'] = '128.1.0.1'
resp = req.get_response(self.server)
self.assertEquals(resp.status_code, 403)
def testNewToken(self):
req = webob.Request.blank("/token")
req.environ['REMOTE_ADDR'] = '127.1.0.1'
req.headers['Authorization'] = "Basic %s" % "foo:bar".encode("base64")
req.method = 'POST'
req.POST['slave_ip'] = "1.2.3.4"
req.POST['duration'] = "300"
resp = req.get_response(self.server)
self.assertEquals(resp.status_code, 200)
token = resp.body
self.assertTrue(self.server.verify_token(token, "1.2.3.4"))
def testNewTokenAuth2(self):
req = webob.Request.blank("/token")
req.environ['REMOTE_ADDR'] = '127.1.0.1'
req.headers['Authorization'] = "Basic %s" % "fuz:baz".encode("base64")
req.method = 'POST'
req.POST['slave_ip'] = "1.2.3.4"
req.POST['duration'] = "300"
resp = req.get_response(self.server)
self.assertEquals(resp.status_code, 200)
token = resp.body
self.assertTrue(self.server.verify_token(token, "1.2.3.4"))
def testNewTokenBadIp(self):
req = webob.Request.blank("/token")
req.environ['REMOTE_ADDR'] = '127.0.0.1'
req.headers['Authorization'] = "Basic %s" % "foo:bar".encode("base64")
req.method = 'POST'
req.POST['slave_ip'] = "1.2.3.4"
req.POST['duration'] = "300"
resp = req.get_response(self.server)
self.assertEquals(resp.status_code, 403)
def testNewTokenBadAuth(self):
req = webob.Request.blank("/token")
req.environ['REMOTE_ADDR'] = '127.1.0.1'
req.headers['Authorization'] = "Basic %s" % "faz:faz".encode("base64")
req.method = 'POST'
req.POST['slave_ip'] = "1.2.3.4"
req.POST['duration'] = "300"
resp = req.get_response(self.server)
self.assertEquals(resp.status_code, 401)
def test_transactions(self):
master = '127.1.0.1'
slave = '127.0.0.0'
def new_token():
req = webob.Request.blank("/token")
req.environ['REMOTE_ADDR'] = master
req.headers['Authorization'] = "Basic %s" % "foo:bar".encode("base64")
req.method = 'POST'
req.POST['slave_ip'] = slave
req.POST['duration'] = "300"
resp = req.get_response(self.server)
self.assertEquals(resp.status_code, 200)
return resp.body
def sign(token, nonce, filename, data, slave=slave, expect_fail=False):
h = hashlib.new('sha1')
h.update(data)
sha1 = h.hexdigest()
req = webob.Request.blank("/sign/gpg", POST={
'filedata': (filename, data),
'token': token,
'nonce': nonce,
'filename': filename,
'sha1': sha1})
req.environ['REMOTE_ADDR'] = slave
req.method = 'POST'
resp = req.get_response(self.server)
if not expect_fail:
self.assertEquals(resp.status_code, 202)
return resp.headers['X-Nonce']
else:
self.assertEquals(resp.status_code, 400)
token = new_token()
nonce1 = sign(token, '', 'stuff.txt', 'stuff\n' * 100)
nonce2 = sign(token, nonce1, 'morestuff.txt', 'stuff!\n' * 100)
nonce3 = sign(token, nonce2, 'evenmorestuff.txt', 'stuff!!\n' * 100)
# try futzing with the token data
token = token.replace(slave, '127.0.0.99')
sign(token, nonce3, 'evenmorestuff.txt', 'stuff!!\n' * 100, slave='127.0.0.99', expect_fail=True)