542
593
self.fail("%r is an instance of %s rather than %s" % (
543
594
obj, obj.__class__, kls))
596
def _capture_warnings(self, a_callable, *args, **kwargs):
597
"""A helper for callDeprecated and applyDeprecated.
599
:param a_callable: A callable to call.
600
:param args: The positional arguments for the callable
601
:param kwargs: The keyword arguments for the callable
602
:return: A tuple (warnings, result). result is the result of calling
603
a_callable(*args, **kwargs).
606
def capture_warnings(msg, cls, stacklevel=None):
607
# we've hooked into a deprecation specific callpath,
608
# only deprecations should be getting sent via it.
609
self.assertEqual(cls, DeprecationWarning)
610
local_warnings.append(msg)
611
original_warning_method = symbol_versioning.warn
612
symbol_versioning.set_warning_method(capture_warnings)
614
result = a_callable(*args, **kwargs)
616
symbol_versioning.set_warning_method(original_warning_method)
617
return (local_warnings, result)
619
def applyDeprecated(self, deprecation_format, a_callable, *args, **kwargs):
620
"""Call a deprecated callable without warning the user.
622
:param deprecation_format: The deprecation format that the callable
623
should have been deprecated with. This is the same type as the
624
parameter to deprecated_method/deprecated_function. If the
625
callable is not deprecated with this format, an assertion error
627
:param a_callable: A callable to call. This may be a bound method or
628
a regular function. It will be called with *args and **kwargs.
629
:param args: The positional arguments for the callable
630
:param kwargs: The keyword arguments for the callable
631
:return: The result of a_callable(*args, **kwargs)
633
call_warnings, result = self._capture_warnings(a_callable,
635
expected_first_warning = symbol_versioning.deprecation_string(
636
a_callable, deprecation_format)
637
if len(call_warnings) == 0:
638
self.fail("No assertion generated by call to %s" %
640
self.assertEqual(expected_first_warning, call_warnings[0])
643
def callDeprecated(self, expected, callable, *args, **kwargs):
644
"""Assert that a callable is deprecated in a particular way.
646
This is a very precise test for unusual requirements. The
647
applyDeprecated helper function is probably more suited for most tests
648
as it allows you to simply specify the deprecation format being used
649
and will ensure that that is issued for the function being called.
651
:param expected: a list of the deprecation warnings expected, in order
652
:param callable: The callable to call
653
:param args: The positional arguments for the callable
654
:param kwargs: The keyword arguments for the callable
656
call_warnings, result = self._capture_warnings(callable,
658
self.assertEqual(expected, call_warnings)
545
661
def _startLogFile(self):
546
662
"""Send bzr and test log messages to a temporary file.
548
664
The file is removed as the test is torn down.
550
666
fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
551
encoder, decoder, stream_reader, stream_writer = codecs.lookup('UTF-8')
552
self._log_file = stream_writer(os.fdopen(fileno, 'w+'))
667
self._log_file = os.fdopen(fileno, 'w+')
553
668
self._log_nonce = bzrlib.trace.enable_test_log(self._log_file)
554
669
self._log_file_name = name
555
670
self.addCleanup(self._finishLogFile)
652
760
def log(self, *args):
656
"""Return as a string the log for this test"""
657
if self._log_file_name:
658
return open(self._log_file_name).read()
763
def _get_log(self, keep_log_file=False):
764
"""Return as a string the log for this test. If the file is still
765
on disk and keep_log_file=False, delete the log file and store the
766
content in self._log_contents."""
767
# flush the log file, to get all content
769
bzrlib.trace._trace_file.flush()
770
if self._log_contents:
660
771
return self._log_contents
661
# TODO: Delete the log after it's been read in
772
if self._log_file_name is not None:
773
logfile = open(self._log_file_name)
775
log_contents = logfile.read()
778
if not keep_log_file:
779
self._log_contents = log_contents
780
os.remove(self._log_file_name)
783
return "DELETED log file to reduce memory footprint"
663
785
def capture(self, cmd, retcode=0):
    """Run a bzr command given as one string and return its stdout.

    :param cmd: The command line as a single string; it is split on
        whitespace to build the argument list.
    :param retcode: Forwarded unchanged to run_bzr_captured.
    :return: The first element of run_bzr_captured's result.
    """
    argv = cmd.split()
    captured = self.run_bzr_captured(argv, retcode=retcode)
    return captured[0]
667
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None):
789
def run_bzr_captured(self, argv, retcode=0, encoding=None, stdin=None,
668
791
"""Invoke bzr and return (stdout, stderr).
670
793
Useful for code that wants to check the contents of the
787
921
profiled or debugged so easily.
789
923
:param retcode: The status code that is expected. Defaults to 0. If
790
None is supplied, the status code is not checked.
924
None is supplied, the status code is not checked.
925
:param env_changes: A dictionary which lists changes to environment
926
variables. A value of None will unset the env variable.
927
The values must be strings. The change will only occur in the
928
child, so you don't need to fix the environment after running.
929
:param universal_newlines: Convert CRLF => LF
931
env_changes = kwargs.get('env_changes', {})
932
working_dir = kwargs.get('working_dir', None)
933
process = self.start_bzr_subprocess(args, env_changes=env_changes,
934
working_dir=working_dir)
935
# We distinguish between retcode=None and retcode not passed.
936
supplied_retcode = kwargs.get('retcode', 0)
937
return self.finish_bzr_subprocess(process, retcode=supplied_retcode,
938
universal_newlines=kwargs.get('universal_newlines', False),
941
def start_bzr_subprocess(self, process_args, env_changes=None,
942
skip_if_plan_to_signal=False,
944
"""Start bzr in a subprocess for testing.
946
This starts a new Python interpreter and runs bzr in there.
947
This should only be used for tests that have a justifiable need for
948
this isolation: e.g. they are testing startup time, or signal
949
handling, or early startup code, etc. Subprocess code can't be
950
profiled or debugged so easily.
952
:param process_args: a list of arguments to pass to the bzr executable,
953
for example `['--version']`.
954
:param env_changes: A dictionary which lists changes to environment
955
variables. A value of None will unset the env variable.
956
The values must be strings. The change will only occur in the
957
child, so you don't need to fix the environment after running.
958
:param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
961
:returns: Popen object for the started process.
963
if skip_if_plan_to_signal:
964
if not getattr(os, 'kill', None):
965
raise TestSkipped("os.kill not available.")
967
if env_changes is None:
971
def cleanup_environment():
972
for env_var, value in env_changes.iteritems():
973
old_env[env_var] = osutils.set_or_unset_env(env_var, value)
975
def restore_environment():
976
for env_var, value in old_env.iteritems():
977
osutils.set_or_unset_env(env_var, value)
979
bzr_path = self.get_bzr_path()
982
if working_dir is not None:
983
cwd = osutils.getcwd()
984
os.chdir(working_dir)
987
# win32 subprocess doesn't support preexec_fn
988
# so we will avoid using it on all platforms, just to
989
# make sure the code path is used, and we don't break on win32
990
cleanup_environment()
991
process = Popen([sys.executable, bzr_path] + list(process_args),
992
stdin=PIPE, stdout=PIPE, stderr=PIPE)
994
restore_environment()
1000
def get_bzr_path(self):
1001
"""Return the path of the 'bzr' executable for this test suite."""
792
1002
bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
794
process = Popen([sys.executable, bzr_path]+args, stdout=PIPE,
796
out = process.stdout.read()
797
err = process.stderr.read()
798
retcode = process.wait()
799
supplied_retcode = kwargs.get('retcode', 0)
800
if supplied_retcode is not None:
801
assert supplied_retcode == retcode
1003
if not os.path.isfile(bzr_path):
1004
# We are probably installed. Assume sys.argv is the right file
1005
bzr_path = sys.argv[0]
1008
def finish_bzr_subprocess(self, process, retcode=0, send_signal=None,
                          universal_newlines=False, process_args=None):
    """Wait for a bzr subprocess to end and collect its output.

    :param process: the Popen object returned from start_bzr_subprocess.
    :param retcode: The exit status that is expected. Defaults to 0. When
        None is given the exit status is not checked at all.
    :param send_signal: if not None, a signal delivered to the process with
        os.kill before its output is collected.
    :param universal_newlines: when true, every CRLF in both streams is
        rewritten to LF.
    :param process_args: the arguments the process was started with; only
        used to label the log and failure messages.
    :returns: a two element list, [stdout, stderr].
    """
    if send_signal is not None:
        os.kill(process.pid, send_signal)
    stdout_data, stderr_data = process.communicate()

    if universal_newlines:
        stdout_data, stderr_data = [
            stream.replace('\r\n', '\n')
            for stream in (stdout_data, stderr_data)]

    status_matches = retcode is None or retcode == process.returncode
    if not status_matches:
        if process_args is None:
            process_args = "(unknown args)"
        # Record both streams in the test log before failing, so the
        # failure can be diagnosed from the log.
        mutter('Output of bzr %s:\n%s', process_args, stdout_data)
        mutter('Error for bzr %s:\n%s', process_args, stderr_data)
        self.fail('Command bzr %s failed with retcode %s != %s'
                  % (process_args, retcode, process.returncode))
    return [stdout_data, stderr_data]
804
1036
def check_inventory_shape(self, inv, shape):
864
1097
base_rev = common_ancestor(branch_from.last_revision(),
865
1098
wt_to.branch.last_revision(),
866
1099
wt_to.branch.repository)
867
merge_inner(wt_to.branch, branch_from.basis_tree(),
1100
merge_inner(wt_to.branch, branch_from.basis_tree(),
868
1101
wt_to.branch.repository.revision_tree(base_rev),
869
1102
this_tree=wt_to)
870
wt_to.add_pending_merge(branch_from.last_revision())
1103
wt_to.add_parent_tree_id(branch_from.last_revision())
873
1106
BzrTestBase = TestCase
1109
class TestCaseWithMemoryTransport(TestCase):
1110
"""Common test class for tests that do not need disk resources.
1112
Tests that need disk resources should derive from TestCaseWithTransport.
1114
TestCaseWithMemoryTransport sets the TEST_ROOT variable for all bzr tests.
1116
For TestCaseWithMemoryTransport the test_home_dir is set to the name of
1117
a directory which does not exist. This serves to help ensure test isolation
1118
is preserved. test_dir is set to the TEST_ROOT, as is cwd, because they
1119
must exist. However, TestCaseWithMemoryTransport does not offer local
1120
file defaults for the transport in tests, nor does it obey the command line
1121
override, so tests that accidentally write to the common directory should
1129
def __init__(self, methodName='runTest'):
    """Construct the test and give it its default transport settings.

    The transport attributes are assigned here rather than in setUp so
    that a parameteriser can change them after the test is constructed
    and before it is run; anything assigned by setUp would be overwritten
    when the test starts.
    """
    super(TestCaseWithMemoryTransport, self).__init__(methodName)
    self.transport_readonly_server = None
    self.transport_server = default_transport
1137
def failUnlessExists(self, path):
    """Fail the test when path is missing.

    :param path: An absolute or relative path, checked with
        osutils.lexists.
    """
    present = osutils.lexists(path)
    self.failUnless(present)
1141
def failIfExists(self, path):
    """Fail the test when path is present.

    :param path: An absolute or relative path, checked with
        osutils.lexists.
    """
    present = osutils.lexists(path)
    self.failIf(present)
1145
def get_transport(self):
1146
"""Return a writeable transport for the test scratch space"""
1147
t = get_transport(self.get_url())
1148
self.assertFalse(t.is_readonly())
1151
def get_readonly_transport(self):
1152
"""Return a readonly transport for the test scratch space
1154
This can be used to test that operations which should only need
1155
readonly access in fact do not try to write.
1157
t = get_transport(self.get_readonly_url())
1158
self.assertTrue(t.is_readonly())
1161
def get_readonly_server(self):
1162
"""Get the server instance for the readonly transport
1164
This is useful for some tests with specific servers to do diagnostics.
1166
if self.__readonly_server is None:
1167
if self.transport_readonly_server is None:
1168
# readonly decorator requested
1169
# bring up the server
1171
self.__readonly_server = ReadonlyServer()
1172
self.__readonly_server.setUp(self.__server)
1174
self.__readonly_server = self.transport_readonly_server()
1175
self.__readonly_server.setUp()
1176
self.addCleanup(self.__readonly_server.tearDown)
1177
return self.__readonly_server
1179
def get_readonly_url(self, relpath=None):
1180
"""Get a URL for the readonly transport.
1182
This will either be backed by '.' or a decorator to the transport
1183
used by self.get_url()
1184
relpath provides for clients to get a path relative to the base url.
1185
These should only be downwards relative, not upwards.
1187
base = self.get_readonly_server().get_url()
1188
if relpath is not None:
1189
if not base.endswith('/'):
1191
base = base + relpath
1194
def get_server(self):
    """Return the read/write server, starting it on first use.

    Handy for tests that need to inspect the server itself.

    For TestCaseWithMemoryTransport this is always a MemoryServer, and
    there is no means to override it. The server's tearDown is registered
    as a cleanup when the server is created.
    """
    if self.__server is not None:
        return self.__server
    # Store the server before starting it, so the attribute is set even if
    # setUp raises.
    self.__server = MemoryServer()
    self.__server.setUp()
    self.addCleanup(self.__server.tearDown)
    return self.__server
1209
def get_url(self, relpath=None):
1210
"""Get a URL (or maybe a path) for the readwrite transport.
1212
This will either be backed by '.' or to an equivalent non-file based
1214
relpath provides for clients to get a path relative to the base url.
1215
These should only be downwards relative, not upwards.
1217
base = self.get_server().get_url()
1218
if relpath is not None and relpath != '.':
1219
if not base.endswith('/'):
1221
# XXX: Really base should be a url; we did after all call
1222
# get_url()! But sometimes it's just a path (from
1223
# LocalAbspathServer), and it'd be wrong to append urlescaped data
1224
# to a non-escaped local path.
1225
if base.startswith('./') or base.startswith('/'):
1228
base += urlutils.escape(relpath)
1231
def _make_test_root(self):
1232
if TestCaseWithMemoryTransport.TEST_ROOT is not None:
1236
root = u'test%04d.tmp' % i
1240
if e.errno == errno.EEXIST:
1245
# successfully created
1246
TestCaseWithMemoryTransport.TEST_ROOT = osutils.abspath(root)
1248
# make a fake bzr directory there to prevent any tests propagating
1249
# up onto the source directory's real branch
1250
bzrdir.BzrDir.create_standalone_workingtree(
1251
TestCaseWithMemoryTransport.TEST_ROOT)
1253
def makeAndChdirToTestDir(self):
    """Select the directories for this one test and chdir into place.

    Sets self.test_dir and self.test_home_dir and changes the current
    directory.

    For TestCaseWithMemoryTransport the TEST_ROOT is used as both the
    working directory and test_dir; test_home_dir is a path beneath it
    which this method does not create.
    """
    root = TestCaseWithMemoryTransport.TEST_ROOT
    os.chdir(root)
    self.test_dir = root
    self.test_home_dir = root + "/MemoryTransportMissingHomeDir"
1265
def make_branch(self, relpath, format=None):
    """Make a repository at relpath and create a branch in its bzrdir.

    :param relpath: Location handed to make_repository.
    :param format: Format handed to make_repository.
    :return: The result of create_branch() on the repository's bzrdir.
    """
    control_dir = self.make_repository(relpath, format=format).bzrdir
    return control_dir.create_branch()
1270
def make_bzrdir(self, relpath, format=None):
1272
# might be a relative or absolute path
1273
maybe_a_url = self.get_url(relpath)
1274
segments = maybe_a_url.rsplit('/', 1)
1275
t = get_transport(maybe_a_url)
1276
if len(segments) > 1 and segments[-1] not in ('', '.'):
1279
except errors.FileExists:
1282
format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
1283
return format.initialize_on_transport(t)
1284
except errors.UninitializableFormat:
1285
raise TestSkipped("Format %s is not initializable." % format)
1287
def make_repository(self, relpath, shared=False, format=None):
    """Make a bzrdir at relpath and create a repository inside it.

    :param relpath: Location handed to make_bzrdir.
    :param shared: Passed to create_repository as ``shared``.
    :param format: Format handed to make_bzrdir.
    :return: The result of create_repository() on the new bzrdir.
    """
    return self.make_bzrdir(relpath, format=format).create_repository(
        shared=shared)
1292
def make_branch_and_memory_tree(self, relpath):
    """Make a branch at relpath and return a MemoryTree created on it.

    :param relpath: Location handed to make_branch.
    :return: The result of memorytree.MemoryTree.create_on_branch.
    """
    branch = self.make_branch(relpath)
    tree = memorytree.MemoryTree.create_on_branch(branch)
    return tree
1297
def overrideEnvironmentForTesting(self):
    """Point the HOME and APPDATA environment variables at test_home_dir."""
    for variable in ('HOME', 'APPDATA'):
        os.environ[variable] = self.test_home_dir
1302
super(TestCaseWithMemoryTransport, self).setUp()
1303
self._make_test_root()
1304
_currentdir = os.getcwdu()
1305
def _leaveDirectory():
1306
os.chdir(_currentdir)
1307
self.addCleanup(_leaveDirectory)
1308
self.makeAndChdirToTestDir()
1309
self.overrideEnvironmentForTesting()
1310
self.__readonly_server = None
1311
self.__server = None
876
class TestCaseInTempDir(TestCase):
1314
class TestCaseInTempDir(TestCaseWithMemoryTransport):
877
1315
"""Derived class that runs a test within a temporary directory.
879
1317
This is useful for tests that need to create a branch, etc.
980
1397
raise errors.BzrError('Invalid line ending request %r' % (line_endings,))
981
1398
content = "contents of %s%s" % (name.encode('utf-8'), end)
982
transport.put(urlutils.escape(name), StringIO(content))
1399
# Technically 'put()' is the right command. However, put
1400
# uses an AtomicFile, which requires an extra rename into place
1401
# As long as the files didn't exist in the past, append() will
1402
# do the same thing as put()
1403
# On jam's machine, make_kernel_like_tree is:
1404
# put: 4.5-7.5s (averaging 6s)
1406
# put_non_atomic: 2.9-4.5s
1407
transport.put_bytes_non_atomic(urlutils.escape(name), content)
984
1409
def build_tree_contents(self, shape):
    """Forward shape to the module-level build_tree_contents function.

    :param shape: Passed through unchanged.
    """
    # Inside the method body this name resolves to the module-level
    # function, not to this method.
    build_tree_contents(shape)
987
def failUnlessExists(self, path):
    """Fail the test when path is missing.

    :param path: An absolute or relative path, checked with
        osutils.lexists.
    """
    found = osutils.lexists(path)
    self.failUnless(found)
991
def failIfExists(self, path):
    """Fail the test when path is present.

    :param path: An absolute or relative path, checked with
        osutils.lexists.
    """
    found = osutils.lexists(path)
    self.failIf(found)
995
1412
def assertFileEqual(self, content, path):
996
1413
"""Fail if path does not contain 'content'."""
997
1414
self.failUnless(osutils.lexists(path))
1013
1430
readwrite one must both define get_url() as resolving to os.getcwd().
1016
def __init__(self, methodName='testMethod'):
    """Construct the test with no servers started.

    Both server slots start out as None, transport_server is set to
    default_transport, and no readonly server class is selected.
    """
    super(TestCaseWithTransport, self).__init__(methodName)
    # Transport selection.
    self.transport_server = default_transport
    self.transport_readonly_server = None
    # Server instances; nothing has been started yet.
    self.__server = None
    self.__readonly_server = None
1023
def get_readonly_url(self, relpath=None):
1024
"""Get a URL for the readonly transport.
1026
This will either be backed by '.' or a decorator to the transport
1027
used by self.get_url()
1028
relpath provides for clients to get a path relative to the base url.
1029
These should only be downwards relative, not upwards.
1031
base = self.get_readonly_server().get_url()
1032
if relpath is not None:
1033
if not base.endswith('/'):
1035
base = base + relpath
1038
def get_readonly_server(self):
1039
"""Get the server instance for the readonly transport
1041
This is useful for some tests with specific servers to do diagnostics.
1043
if self.__readonly_server is None:
1044
if self.transport_readonly_server is None:
1045
# readonly decorator requested
1046
# bring up the server
1048
self.__readonly_server = ReadonlyServer()
1049
self.__readonly_server.setUp(self.__server)
1051
self.__readonly_server = self.transport_readonly_server()
1052
self.__readonly_server.setUp()
1053
self.addCleanup(self.__readonly_server.tearDown)
1054
return self.__readonly_server
1056
1433
def get_server(self):
1057
"""Get the read/write server instance.
1434
"""See TestCaseWithMemoryTransport.
1059
1436
This is useful for some tests with specific servers that need
1065
1442
self.addCleanup(self.__server.tearDown)
1066
1443
return self.__server
1068
def get_url(self, relpath=None):
1069
"""Get a URL for the readwrite transport.
1071
This will either be backed by '.' or to an equivalent non-file based
1073
relpath provides for clients to get a path relative to the base url.
1074
These should only be downwards relative, not upwards.
1076
base = self.get_server().get_url()
1077
if relpath is not None and relpath != '.':
1078
if not base.endswith('/'):
1080
base = base + urlutils.escape(relpath)
1083
def get_transport(self):
1084
"""Return a writeable transport for the test scratch space"""
1085
t = get_transport(self.get_url())
1086
self.assertFalse(t.is_readonly())
1089
def get_readonly_transport(self):
1090
"""Return a readonly transport for the test scratch space
1092
This can be used to test that operations which should only need
1093
readonly access in fact do not try to write.
1095
t = get_transport(self.get_readonly_url())
1096
self.assertTrue(t.is_readonly())
1099
def make_branch(self, relpath, format=None):
    """Make a repository at relpath and create a branch in its bzrdir.

    :param relpath: Location handed to make_repository.
    :param format: Format handed to make_repository.
    :return: The result of create_branch() on the repository's bzrdir.
    """
    repository = self.make_repository(relpath, format=format)
    control_dir = repository.bzrdir
    return control_dir.create_branch()
1104
def make_bzrdir(self, relpath, format=None):
1106
url = self.get_url(relpath)
1107
mutter('relpath %r => url %r', relpath, url)
1108
segments = url.split('/')
1109
if segments and segments[-1] not in ('', '.'):
1110
parent = '/'.join(segments[:-1])
1111
t = get_transport(parent)
1113
t.mkdir(segments[-1])
1114
except errors.FileExists:
1117
format=bzrlib.bzrdir.BzrDirFormat.get_default_format()
1118
# FIXME: make this use a single transport someday. RBC 20060418
1119
return format.initialize_on_transport(get_transport(relpath))
1120
except errors.UninitializableFormat:
1121
raise TestSkipped("Format %s is not initializable." % format)
1123
def make_repository(self, relpath, shared=False, format=None):
    """Make a bzrdir at relpath and create a repository inside it.

    :param relpath: Location handed to make_bzrdir.
    :param shared: Passed to create_repository as ``shared``.
    :param format: Format handed to make_bzrdir.
    :return: The result of create_repository() on the new bzrdir.
    """
    control = self.make_bzrdir(relpath, format=format)
    repository = control.create_repository(shared=shared)
    return repository
1128
1445
def make_branch_and_tree(self, relpath, format=None):
1129
1446
"""Create a branch on the transport and a tree locally.
1448
If the transport is not a LocalTransport, the Tree can't be created on
1449
the transport. In that case the working tree is created in the local
1450
directory, and the returned tree's branch and repository will also be
1453
This will fail if the original default transport for this test
1454
case wasn't backed by the working directory, as the branch won't
1455
be on disk for us to open it.
1457
:param format: The BzrDirFormat.
1458
:returns: the WorkingTree.
1133
1460
# TODO: always use the local disk path for the working tree,
1134
1461
# this obviously requires a format that supports branch references
1255
1600
'bzrlib.tests.test_decorators',
1256
1601
'bzrlib.tests.test_diff',
1257
1602
'bzrlib.tests.test_doc_generate',
1258
'bzrlib.tests.test_emptytree',
1259
1603
'bzrlib.tests.test_errors',
1260
1604
'bzrlib.tests.test_escaped_store',
1261
1605
'bzrlib.tests.test_fetch',
1606
'bzrlib.tests.test_ftp_transport',
1262
1607
'bzrlib.tests.test_gpg',
1263
1608
'bzrlib.tests.test_graph',
1264
1609
'bzrlib.tests.test_hashcache',
1265
1610
'bzrlib.tests.test_http',
1266
1611
'bzrlib.tests.test_http_response',
1267
1612
'bzrlib.tests.test_identitymap',
1613
'bzrlib.tests.test_ignores',
1268
1614
'bzrlib.tests.test_inv',
1269
1615
'bzrlib.tests.test_knit',
1616
'bzrlib.tests.test_lazy_import',
1617
'bzrlib.tests.test_lazy_regex',
1270
1618
'bzrlib.tests.test_lockdir',
1271
1619
'bzrlib.tests.test_lockable_files',
1272
1620
'bzrlib.tests.test_log',
1621
'bzrlib.tests.test_memorytree',
1273
1622
'bzrlib.tests.test_merge',
1274
1623
'bzrlib.tests.test_merge3',
1275
1624
'bzrlib.tests.test_merge_core',