1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.

This module contains tests for the domain-level smart requests and responses,
such as the 'Branch.lock_write' request. Many of these use specific disk
formats to exercise calls that only make sense for formats with specific
properties.

Tests for low-level protocol encoding are found in test_smart_transport.
"""
28
from cStringIO import StringIO
41
from bzrlib.branch import Branch, BranchReferenceFormat
42
import bzrlib.smart.branch
43
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
44
import bzrlib.smart.packrepository
45
import bzrlib.smart.repository
46
import bzrlib.smart.vfs
47
from bzrlib.smart.request import (
48
FailedSmartServerResponse,
51
SuccessfulSmartServerResponse,
53
from bzrlib.tests import (
56
from bzrlib.transport import chroot, get_transport, local, memory
59
def load_tests(standard_tests, module, loader):
    """Multiply the FindRepository tests across the request versions.

    :param standard_tests: The tests loaded from this module.
    :param module: Not used by this function.
    :param loader: Not used by this function.
    :return: The suite in which the TestSmartServerRequestFindRepository
        tests have been multiplied by the applicable scenarios.
    """
    # FindRepository tests.
    bzrdir_mod = bzrlib.smart.bzrdir
    # NOTE(review): the name of the first scenario and the "_v2" pattern below
    # are inferred from the sibling scenarios and from the test named
    # test_supports_external_lookups_no_v2 -- confirm against upstream.
    scenarios = [
        ("find_repository", {
            "_request_class": bzrdir_mod.SmartServerRequestFindRepositoryV1}),
        ("find_repositoryV2", {
            "_request_class": bzrdir_mod.SmartServerRequestFindRepositoryV2}),
        ("find_repositoryV3", {
            "_request_class": bzrdir_mod.SmartServerRequestFindRepositoryV3}),
        ]
    to_adapt, result = split_suite_by_re(standard_tests,
        "TestSmartServerRequestFindRepository")
    v2_only, v1_and_2 = split_suite_by_re(to_adapt,
        "_v2")
    tests.multiply_tests(v1_and_2, scenarios, result)
    # The first scenario is only applicable to v1 protocols, so it is left
    # out for the tests that need v2 or later.
    tests.multiply_tests(v2_only, scenarios[1:], result)
    return result
82
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
    """TestCaseWithTransport whose get_transport() goes through a chroot.

    The chroot server is created lazily on the first get_transport() call.
    """

    def setUp(self):
        """Select the memory server as backing store, then set up the base."""
        self.vfs_transport_factory = memory.MemoryServer
        tests.TestCaseWithTransport.setUp(self)
        self._chroot_server = None

    def get_transport(self, relpath=None):
        """Return a transport on the chroot server.

        :param relpath: Optional path, relative to the chroot, to clone the
            transport at.
        """
        if self._chroot_server is None:
            backing_transport = tests.TestCaseWithTransport.get_transport(self)
            self._chroot_server = chroot.ChrootServer(backing_transport)
            self.start_server(self._chroot_server)
        t = get_transport(self._chroot_server.get_url())
        if relpath is not None:
            t = t.clone(relpath)
        return t
100
class TestCaseWithSmartMedium(tests.TestCaseWithMemoryTransport):
    """Memory-transport test case that is served through a smart server."""

    def setUp(self):
        super(TestCaseWithSmartMedium, self).setUp()
        # We're allowed to set the transport class here, so that we don't use
        # the default or a parameterized class, but rather use the
        # TestCaseWithTransport infrastructure to set up a smart server and
        # transport.
        self.transport_server = self.make_transport_server

    def make_transport_server(self):
        """Create the smart TCP test server, tagged with this test's id."""
        return smart.server.SmartTCPServer_for_testing('-' + self.id())

    def get_smart_medium(self):
        """Get a smart medium to use in tests."""
        return self.get_transport().get_smart_medium()
118
class TestByteStreamToStream(tests.TestCase):
    """Round-trip tests for the stream <-> byte stream helpers."""

    def test_repeated_substreams_same_kind_are_one_stream(self):
        """Two consecutive 'text' substreams come back as a single one."""
        # Make a stream - an iterable of (kind, records) pairs.
        stream = [
            ('text', [versionedfile.FulltextContentFactory(
                ('k1',), None, None, 'foo')]),
            ('text', [versionedfile.FulltextContentFactory(
                ('k2',), None, None, 'bar')]),
            ]
        fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
        # Named byte_stream rather than 'bytes' to avoid shadowing the builtin.
        byte_stream = smart.repository._stream_to_byte_stream(stream, fmt)
        streams = []
        # Iterate the resulting iterable; checking that we get only one stream
        # out, holding both records.
        fmt, stream = smart.repository._byte_stream_to_stream(byte_stream)
        for kind, substream in stream:
            streams.append((kind, list(substream)))
        self.assertLength(1, streams)
        self.assertLength(2, streams[0][1])
137
class TestSmartServerResponse(tests.TestCase):
    """Tests for equality and stringification of smart server responses."""

    def test__eq__(self):
        """Responses compare equal only when both args and body match."""
        self.assertEqual(SmartServerResponse(('ok', )),
            SmartServerResponse(('ok', )))
        self.assertEqual(SmartServerResponse(('ok', ), 'body'),
            SmartServerResponse(('ok', ), 'body'))
        self.assertNotEqual(SmartServerResponse(('ok', )),
            SmartServerResponse(('notok', )))
        self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
            SmartServerResponse(('ok', )))
        self.assertNotEqual(None,
            SmartServerResponse(('ok', )))

    def test__str__(self):
        """SmartServerResponses can be stringified."""
        self.assertEqual(
            "<SuccessfulSmartServerResponse args=('args',) body='body'>",
            str(SuccessfulSmartServerResponse(('args',), 'body')))
        self.assertEqual(
            "<FailedSmartServerResponse args=('args',) body='body'>",
            str(FailedSmartServerResponse(('args',), 'body')))
161
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
    """Tests for client-path handling on SmartServerRequest."""

    def test_translate_client_path(self):
        """Client paths are translated relative to the root client path."""
        transport = self.get_transport()
        request = SmartServerRequest(transport, 'foo/')
        self.assertEqual('./', request.translate_client_path('foo/'))
        self.assertRaises(
            errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
        self.assertRaises(
            errors.PathNotChild, request.translate_client_path, '/')
        self.assertRaises(
            errors.PathNotChild, request.translate_client_path, 'bar/')
        self.assertEqual('./baz', request.translate_client_path('foo/baz'))
        e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
        self.assertEqual('./' + urlutils.escape(e_acute),
            request.translate_client_path('foo/' + e_acute))

    def test_translate_client_path_vfs(self):
        """VfsRequests receive escaped paths rather than raw UTF-8."""
        transport = self.get_transport()
        request = smart.vfs.VfsRequest(transport, 'foo/')
        e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
        escaped = urlutils.escape('foo/' + e_acute)
        self.assertEqual('./' + urlutils.escape(e_acute),
            request.translate_client_path(escaped))

    def test_transport_from_client_path(self):
        """The root client path maps onto the backing transport itself."""
        transport = self.get_transport()
        request = SmartServerRequest(transport, 'foo/')
        # NOTE(review): the expected value (transport.base) is inferred from
        # 'foo/' being the root client path -- confirm against upstream.
        self.assertEqual(
            transport.base,
            request.transport_from_client_path('foo/').base)
195
class TestSmartServerBzrDirRequestCloningMetaDir(
    tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.cloning_metadir request."""

    def test_cloning_metadir(self):
        """With a bzrdir present the request reports the format names."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        metadir = control_dir.cloning_metadir()
        verb = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        expected_args = (
            metadir.network_name(),
            metadir.repository_format.network_name(),
            ('branch', metadir.get_branch_format().network_name()),
            )
        wanted = SuccessfulSmartServerResponse(expected_args)
        actual = verb.execute('', 'False')
        self.assertEqual(wanted, actual)

    def test_cloning_metadir_reference(self):
        """A bzrdir holding a branch reference makes the request fail."""
        backing = self.get_transport()
        target_branch = self.make_branch('referenced')
        control_dir = self.make_bzrdir('.')
        metadir = control_dir.cloning_metadir()
        ref_format = BranchReferenceFormat()
        reference = ref_format.initialize(control_dir, target_branch)
        reference_url = BranchReferenceFormat().get_reference(control_dir)
        # Moving the target away proves the server never tries to follow the
        # reference: an unreachable referenced branch must not matter.
        backing.rename('referenced', 'moved')
        verb = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        wanted = FailedSmartServerResponse(('BranchReference',))
        actual = verb.execute('', 'False')
        self.assertEqual(wanted, actual)
229
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.create_repository request."""

    def test_makes_repository(self):
        """Creating a repository inside an existing bzrdir succeeds."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        verb = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(backing)
        # The pack-0.92 repository format supplies the network name we send
        # and expect to get back.
        pack_dir_format = bzrdir.format_registry.get('pack-0.92')()
        repo_format_name = pack_dir_format.repository_format.network_name()
        wanted = SuccessfulSmartServerResponse(
            ('ok', 'no', 'no', 'no', repo_format_name))
        actual = verb.execute('', repo_format_name, 'True')
        self.assertEqual(wanted, actual)
246
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.find_repository.

    self._request_class is not set in this class; the tests read it and the
    module's load_tests() hands it to multiply_tests as a scenario parameter.
    """

    def test_no_repository(self):
        """When there is no repository to be found, ('norepository', ) is returned."""
        backing = self.get_transport()
        request = self._request_class(backing)
        self.make_bzrdir('.')
        self.assertEqual(SmartServerResponse(('norepository', )),
            request.execute(''))

    def test_nonshared_repository(self):
        """A non-shared repository is only found at its own location."""
        # Non-shared repositories only allow 'find' to return a handle when
        # the path being searched is the path the repository is at.
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result()
        self.assertEqual(result, request.execute(''))
        self.make_bzrdir('subdir')
        self.assertEqual(SmartServerResponse(('norepository', )),
            request.execute('subdir'))

    def _make_repository_and_result(self, shared=False, format=None):
        """Convenience function to setup a repository.

        :param shared: Passed through to make_repository.
        :param format: Passed through to make_repository.
        :result: The SmartServerResponse to expect when opening it.
        """
        def yes_no(flag):
            # The response tuples built below carry 'yes'/'no' strings.
            if flag:
                return 'yes'
            return 'no'

        repo = self.make_repository('.', shared=shared, format=format)
        rich_root = yes_no(repo.supports_rich_root())
        subtrees = yes_no(repo._format.supports_tree_reference)
        external = yes_no(repo._format.supports_external_lookups)
        if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
            self._request_class):
            return SuccessfulSmartServerResponse(
                ('ok', '', rich_root, subtrees, external,
                 repo._format.network_name()))
        elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
            self._request_class):
            # All tests so far are on formats, and for non-external
            # repositories.
            return SuccessfulSmartServerResponse(
                ('ok', '', rich_root, subtrees, external))
        else:
            return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))

    def test_shared_repository(self):
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result(shared=True)
        self.assertEqual(result, request.execute(''))
        self.make_bzrdir('subdir')
        # Only the relpath (args[1]) differs when looking from a subdirectory.
        result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
        self.assertEqual(result2,
            request.execute('subdir'))
        self.make_bzrdir('subdir/deeper')
        result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
        self.assertEqual(result3,
            request.execute('subdir/deeper'))

    def test_rich_root_and_subtree_encoding(self):
        """Test for the format attributes for rich root and subtree support."""
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result(format='dirstate-with-subtree')
        # check the test will be valid
        self.assertEqual('yes', result.args[2])
        self.assertEqual('yes', result.args[3])
        self.assertEqual(result, request.execute(''))

    def test_supports_external_lookups_no_v2(self):
        """Test for the supports_external_lookups attribute."""
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result(format='dirstate-with-subtree')
        # check the test will be valid
        self.assertEqual('no', result.args[4])
        self.assertEqual(result, request.execute(''))
336
class TestSmartServerBzrDirRequestGetConfigFile(
    tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.get_config_file request."""

    def test_present(self):
        """An existing control config file is returned as the body."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        control_dir.get_config().set_default_stack_on("/")
        config_bytes = control_dir._get_config()._get_config_file().read()
        verb = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        wanted = SuccessfulSmartServerResponse((), config_bytes)
        actual = verb.execute('')
        self.assertEqual(wanted, actual)

    def test_missing(self):
        """Without a config file the body is the empty string."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        verb = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        wanted = SuccessfulSmartServerResponse((), '')
        actual = verb.execute('')
        self.assertEqual(wanted, actual)
359
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestInitializeBzrDir request."""

    def test_empty_dir(self):
        """Initializing an empty dir should succeed and do it."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        self.assertEqual(SmartServerResponse(('ok', )),
            request.execute(''))
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
        # no branch, tree or repository is expected with the current
        # default format.
        self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
        self.assertRaises(errors.NotBranchError, made_dir.open_branch)
        self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)

    def test_missing_dir(self):
        """Initializing a missing directory should fail like the bzrdir api."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        self.assertRaises(errors.NoSuchFile,
            request.execute, 'subdir')

    def test_initialized_dir(self):
        """Initializing an extant bzrdir should fail like the bzrdir api."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        self.make_bzrdir('subdir')
        self.assertRaises(errors.FileExists,
            request.execute, 'subdir')
390
class TestSmartServerRequestBzrDirInitializeEx(tests.TestCaseWithMemoryTransport):
    """Basic tests for BzrDir.initialize_ex_1.16 in the smart server.

    The main unit tests in test_bzrdir exercise the API comprehensively.
    """

    def test_empty_dir(self):
        """Initializing an empty dir should succeed and do it."""
        backing = self.get_transport()
        name = self.make_bzrdir('reference')._format.network_name()
        request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
        # The request takes ten arguments, as in the sibling tests below.
        self.assertEqual(SmartServerResponse(('', '', '', '', '', '', name,
            'False', '', '', '')),
            request.execute(name, '', 'True', 'False', 'False', '', '', '', '',
            'False'))
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
        # no branch, tree or repository is expected with the current
        # default format.
        self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
        self.assertRaises(errors.NotBranchError, made_dir.open_branch)
        self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)

    def test_missing_dir(self):
        """Initializing a missing directory should fail like the bzrdir api."""
        backing = self.get_transport()
        name = self.make_bzrdir('reference')._format.network_name()
        request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
        self.assertRaises(errors.NoSuchFile, request.execute, name,
            'subdir/dir', 'False', 'False', 'False', '', '', '', '', 'False')

    def test_initialized_dir(self):
        """Initializing an extant directory should fail like the bzrdir api."""
        backing = self.get_transport()
        name = self.make_bzrdir('reference')._format.network_name()
        request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
        self.make_bzrdir('subdir')
        self.assertRaises(errors.FileExists, request.execute, name, 'subdir',
            'False', 'False', 'False', '', '', '', '', 'False')
430
class TestSmartServerRequestOpenBzrDir(tests.TestCaseWithMemoryTransport):
    """Exercise the SmartServerRequestOpenBzrDir request."""

    def test_no_directory(self):
        """A path that does not exist is answered with ('no',)."""
        verb = smart.bzrdir.SmartServerRequestOpenBzrDir(self.get_transport())
        wanted = SmartServerResponse(('no', ))
        self.assertEqual(wanted, verb.execute('does-not-exist'))

    def test_empty_directory(self):
        """A directory without a bzrdir is answered with ('no',)."""
        backing = self.get_transport()
        backing.mkdir('empty')
        verb = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
        wanted = SmartServerResponse(('no', ))
        self.assertEqual(wanted, verb.execute('empty'))

    def test_outside_root_client_path(self):
        """A path outside root_client_path is answered with ('no',)."""
        verb = smart.bzrdir.SmartServerRequestOpenBzrDir(
            self.get_transport(), root_client_path='root')
        wanted = SmartServerResponse(('no', ))
        self.assertEqual(wanted, verb.execute('not-root'))
453
class TestSmartServerRequestOpenBzrDir_2_1(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestOpenBzrDir_2_1 request."""

    def test_no_directory(self):
        """A missing path is answered with ('no',)."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('does-not-exist'))

    def test_empty_directory(self):
        """A directory without a bzrdir is answered with ('no',)."""
        backing = self.get_transport()
        backing.mkdir('empty')
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('empty'))

    def test_present_without_workingtree(self):
        """A bare bzrdir is answered with ('yes', 'no')."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
        self.make_bzrdir('.')
        self.assertEqual(SmartServerResponse(('yes', 'no')),
            request.execute(''))

    def test_outside_root_client_path(self):
        """A path outside root_client_path is answered with ('no',)."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing,
            root_client_path='root')
        self.assertEqual(SmartServerResponse(('no',)),
            request.execute('not-root'))
483
class TestSmartServerRequestOpenBzrDir_2_1_disk(TestCaseWithChrootedTransport):
    """SmartServerRequestOpenBzrDir_2_1 tests backed by a local-disk server."""

    def test_present_with_workingtree(self):
        """A bzrdir with a working tree is answered with ('yes', 'yes')."""
        self.vfs_transport_factory = local.LocalURLServer
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
        bd = self.make_bzrdir('.')
        bd.create_repository()
        # NOTE(review): create_branch() is assumed here, as a working tree
        # normally needs a branch -- confirm against upstream.
        bd.create_branch()
        bd.create_workingtree()
        self.assertEqual(SmartServerResponse(('yes', 'yes')),
            request.execute(''))
497
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
    """Tests for the SmartServerRequestOpenBranch request."""

    def test_no_branch(self):
        """When there is no branch, ('nobranch', ) is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        self.make_bzrdir('.')
        self.assertEqual(SmartServerResponse(('nobranch', )),
            request.execute(''))

    def test_branch(self):
        """When there is a branch, ('ok', '') is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', '')),
            request.execute(''))

    def test_branch_reference(self):
        """When there is a branch reference, the reference URL is returned."""
        self.vfs_transport_factory = local.LocalURLServer
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        branch = self.make_branch('branch')
        checkout = branch.create_checkout('reference', lightweight=True)
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
        self.assertEqual(SmartServerResponse(('ok', reference_url)),
            request.execute('reference'))

    def test_notification_on_branch_from_repository(self):
        """With only a repository present, a bare ('nobranch',) is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        repo = self.make_repository('.')
        self.assertEqual(SmartServerResponse(('nobranch',)),
            request.execute(''))
536
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
    """Tests for the SmartServerRequestOpenBranchV2 request."""

    def test_no_branch(self):
        """When there is no branch, ('nobranch', ) is returned."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        self.assertEqual(SmartServerResponse(('nobranch', )),
            request.execute(''))

    def test_branch(self):
        """When there is a branch, ('branch', format name) is returned."""
        backing = self.get_transport()
        expected = self.make_branch('.')._format.network_name()
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
            request.execute(''))

    def test_branch_reference(self):
        """When there is a branch reference, the reference URL is returned."""
        self.vfs_transport_factory = local.LocalURLServer
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        branch = self.make_branch('branch')
        checkout = branch.create_checkout('reference', lightweight=True)
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
        self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
            request.execute('reference'))

    def test_stacked_branch(self):
        """Opening a stacked branch does not open the stacked-on branch."""
        trunk = self.make_branch('trunk')
        feature = self.make_branch('feature')
        feature.set_stacked_on_url(trunk.base)
        # Record every branch passed to the 'open' hook from here on.
        opened_branches = []
        Branch.hooks.install_named_hook('open', opened_branches.append, None)
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        # NOTE(review): setup_jail() is assumed as the counterpart of the
        # visible teardown_jail() call -- confirm against upstream.
        request.setup_jail()
        try:
            response = request.execute('feature')
        finally:
            request.teardown_jail()
        expected_format = feature._format.network_name()
        self.assertEqual(
            SuccessfulSmartServerResponse(('branch', expected_format)),
            response)
        self.assertLength(1, opened_branches)

    def test_notification_on_branch_from_repository(self):
        """With only a repository present, a bare ('nobranch',) is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        repo = self.make_repository('.')
        self.assertEqual(SmartServerResponse(('nobranch',)),
            request.execute(''))
595
class TestSmartServerRequestOpenBranchV3(TestCaseWithChrootedTransport):
    """Tests for the SmartServerRequestOpenBranchV3 request."""

    def test_no_branch(self):
        """When there is no branch, ('nobranch', ) is returned."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
        self.assertEqual(SmartServerResponse(('nobranch',)),
            request.execute(''))

    def test_branch(self):
        """When there is a branch, ('branch', format name) is returned."""
        backing = self.get_transport()
        expected = self.make_branch('.')._format.network_name()
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
        self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
            request.execute(''))

    def test_branch_reference(self):
        """When there is a branch reference, the reference URL is returned."""
        self.vfs_transport_factory = local.LocalURLServer
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
        branch = self.make_branch('branch')
        checkout = branch.create_checkout('reference', lightweight=True)
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
        self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
            request.execute('reference'))

    def test_stacked_branch(self):
        """Opening a stacked branch does not open the stacked-on branch."""
        trunk = self.make_branch('trunk')
        feature = self.make_branch('feature')
        feature.set_stacked_on_url(trunk.base)
        # Record every branch passed to the 'open' hook from here on.
        opened_branches = []
        Branch.hooks.install_named_hook('open', opened_branches.append, None)
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
        # NOTE(review): setup_jail() is assumed as the counterpart of the
        # visible teardown_jail() call -- confirm against upstream.
        request.setup_jail()
        try:
            response = request.execute('feature')
        finally:
            request.teardown_jail()
        expected_format = feature._format.network_name()
        self.assertEqual(
            SuccessfulSmartServerResponse(('branch', expected_format)),
            response)
        self.assertLength(1, opened_branches)

    def test_notification_on_branch_from_repository(self):
        """When there is a repository, the error should return details."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV3(backing)
        repo = self.make_repository('.')
        self.assertEqual(
            SmartServerResponse(('nobranch', 'location is a repository')),
            request.execute(''))
655
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestRevisionHistory request."""

    def test_empty(self):
        """For an empty branch, the body is empty."""
        backing = self.get_transport()
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', ), ''),
            request.execute(''))

    def test_not_empty(self):
        """For a non-empty branch, the body is the NUL-joined revision ids."""
        backing = self.get_transport()
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
        tree = self.make_branch_and_memory_tree('.')
        # NOTE(review): locking the tree and adding its root before the
        # commits follows make_tree_with_two_commits elsewhere in this file;
        # the add('') call is an assumption -- confirm against upstream.
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        self.assertEqual(
            SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
            request.execute(''))
680
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchRequest base request."""

    def test_no_branch(self):
        """When there is a bzrdir and no branch, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        self.make_bzrdir('.')
        self.assertRaises(errors.NotBranchError,
            request.execute, '')

    def test_branch_reference(self):
        """When there is a branch reference, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        branch = self.make_branch('branch')
        checkout = branch.create_checkout('reference', lightweight=True)
        # NOTE(review): the checkout is created at 'reference' but the request
        # is executed against 'checkout'; this may be exercising a missing
        # path rather than a branch reference -- verify which was intended.
        self.assertRaises(errors.NotBranchError,
            request.execute, 'checkout')
700
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchRequestLastRevisionInfo request."""

    def test_empty(self):
        """For an empty branch, the result is ('ok', '0', 'null:')."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
            request.execute(''))

    def test_not_empty(self):
        """For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
        tree = self.make_branch_and_memory_tree('.')
        # NOTE(review): locking the tree and adding its root before the
        # commits follows make_tree_with_two_commits elsewhere in this file;
        # the add('') call is an assumption -- confirm against upstream.
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
        tree.unlock()
        self.assertEqual(
            SmartServerResponse(('ok', '2', rev_id_utf8)),
            request.execute(''))
726
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchGetConfigFile request."""

    def test_default(self):
        """With no file, we get empty content."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
        branch = self.make_branch('.')
        # there should be no file by default
        content = ''
        self.assertEqual(SmartServerResponse(('ok', ), content),
            request.execute(''))

    def test_with_content(self):
        """The bytes stored in branch.conf are returned as the body."""
        # SmartServerBranchGetConfigFile should return the content from
        # branch.control_files.get('branch.conf') for now - in the future it may
        # perform more complex processing.
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
        branch = self.make_branch('.')
        branch._transport.put_bytes('branch.conf', 'foo bar baz')
        self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
            request.execute(''))
750
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
    """Base class for tests that need branch and repository lock tokens."""

    def get_lock_tokens(self, branch):
        """Write-lock branch and return (branch token, repository token).

        The branch is left write-locked; the extra repository lock taken to
        obtain the repository token is released again before returning.
        """
        token_for_branch = branch.lock_write()
        token_for_repo = branch.repository.lock_write()
        branch.repository.unlock()
        return (token_for_branch, token_for_repo)
759
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
    """Tests for the SmartServerBranchRequestSetConfigOption request."""

    def test_value_name(self):
        """An option set with an empty section is readable by name alone."""
        branch = self.make_branch('.')
        request = smart.branch.SmartServerBranchRequestSetConfigOption(
            branch.bzrdir.root_transport)
        branch_token, repo_token = self.get_lock_tokens(branch)
        config = branch._get_config()
        # NOTE(review): the empty string as the final (section) argument is
        # inferred from the section-less get_option() call below -- confirm.
        result = request.execute('', branch_token, repo_token, 'bar', 'foo',
            '')
        self.assertEqual(SuccessfulSmartServerResponse(()), result)
        self.assertEqual('bar', config.get_option('foo'))
        # Cleanup: get_lock_tokens left the branch write-locked.
        branch.unlock()

    def test_value_name_section(self):
        """An option set in a named section is readable from that section."""
        branch = self.make_branch('.')
        request = smart.branch.SmartServerBranchRequestSetConfigOption(
            branch.bzrdir.root_transport)
        branch_token, repo_token = self.get_lock_tokens(branch)
        config = branch._get_config()
        result = request.execute('', branch_token, repo_token, 'bar', 'foo',
            'gam')
        self.assertEqual(SuccessfulSmartServerResponse(()), result)
        self.assertEqual('bar', config.get_option('foo', 'gam'))
        # Cleanup: get_lock_tokens left the branch write-locked.
        branch.unlock()
788
class TestSmartServerBranchRequestSetTagsBytes(TestLockedBranch):
    """Tests for the SmartServerBranchSetTagsBytes request."""
    # Only called when the branch format and tags match [yay factory
    # methods] so only need to test straight forward cases.

    def test_set_bytes(self):
        """The tag bytes are sent as a chunk; do_end() reports success."""
        base_branch = self.make_branch('base')
        tag_bytes = base_branch._get_tags_bytes()
        # get_lock_tokens takes out a lock.
        branch_token, repo_token = self.get_lock_tokens(base_branch)
        request = smart.branch.SmartServerBranchSetTagsBytes(
            self.get_transport())
        response = request.execute('base', branch_token, repo_token)
        self.assertEqual(None, response)
        response = request.do_chunk(tag_bytes)
        self.assertEqual(None, response)
        response = request.do_end()
        self.assertEqual(
            SuccessfulSmartServerResponse(()), response)
        # Release the lock taken by get_lock_tokens.
        base_branch.unlock()

    def test_lock_failed(self):
        """Wrong lock tokens make execute() raise TokenMismatch."""
        base_branch = self.make_branch('base')
        base_branch.lock_write()
        tag_bytes = base_branch._get_tags_bytes()
        request = smart.branch.SmartServerBranchSetTagsBytes(
            self.get_transport())
        self.assertRaises(errors.TokenMismatch, request.execute,
            'base', 'wrong token', 'wrong token')
        # The request handler will keep processing the message parts, so even
        # if the request fails immediately do_chunk and do_end are still
        # called.
        request.do_chunk(tag_bytes)
        # NOTE(review): do_end() and the unlock are inferred from the comment
        # above and from the lock_write() at the top -- confirm upstream.
        request.do_end()
        base_branch.unlock()
825
class SetLastRevisionTestBase(TestLockedBranch):
    """Base test case for verbs that implement set_last_revision.

    Subclasses provide a request_class attribute and a
    _set_last_revision(revision_id, revno, branch_token, repo_token) method.
    """

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)
        backing_transport = self.get_transport()
        self.request = self.request_class(backing_transport)
        self.tree = self.make_branch_and_memory_tree('.')

    def lock_branch(self):
        """Write-lock the tree's branch and return its lock tokens."""
        return self.get_lock_tokens(self.tree.branch)

    def unlock_branch(self):
        """Release the lock taken by lock_branch()."""
        self.tree.branch.unlock()

    def set_last_revision(self, revision_id, revno):
        """Run the verb under a branch lock and return its response."""
        branch_token, repo_token = self.lock_branch()
        try:
            response = self._set_last_revision(
                revision_id, revno, branch_token, repo_token)
        finally:
            # Unlock even if the verb raises, so a failing test does not
            # leave the branch locked.
            self.unlock_branch()
        return response

    def assertRequestSucceeds(self, revision_id, revno):
        """Assert that setting the last revision is answered with ('ok',)."""
        response = self.set_last_revision(revision_id, revno)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
852
class TestSetLastRevisionVerbMixin(object):
    """Mixin test case for verbs that implement set_last_revision.

    Uses self.tree, set_last_revision() and assertRequestSucceeds(), which
    SetLastRevisionTestBase defines.
    """

    def test_set_null_to_null(self):
        """An empty branch can have its last revision set to 'null:'."""
        self.assertRequestSucceeds('null:', 0)

    def test_NoSuchRevision(self):
        """If the revision_id is not present, the verb returns NoSuchRevision.
        """
        revision_id = 'non-existent revision'
        self.assertEqual(
            FailedSmartServerResponse(('NoSuchRevision', revision_id)),
            self.set_last_revision(revision_id, 1))

    def make_tree_with_two_commits(self):
        """Commit two revisions to self.tree; the first has a non-ASCII id."""
        self.tree.lock_write()
        # NOTE(review): add('') (creating the tree root before committing) is
        # an assumption -- confirm against upstream.
        self.tree.add('')
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = self.tree.commit('2nd commit', rev_id='rev-2')
        self.tree.unlock()

    def test_branch_last_revision_info_is_updated(self):
        """A branch's tip can be set to a revision that is present in its
        repository.
        """
        # Make a branch with an empty revision history, but two revisions in
        # its repository.
        self.make_tree_with_two_commits()
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        self.tree.branch.set_revision_history([])
        self.assertEqual(
            (0, 'null:'), self.tree.branch.last_revision_info())
        # We can update the branch to a revision that is present in the
        # repository.
        self.assertRequestSucceeds(rev_id_utf8, 1)
        self.assertEqual(
            (1, rev_id_utf8), self.tree.branch.last_revision_info())

    def test_branch_last_revision_info_rewind(self):
        """A branch's tip can be set to a revision that is an ancestor of the
        current tip.
        """
        self.make_tree_with_two_commits()
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        self.assertEqual(
            (2, 'rev-2'), self.tree.branch.last_revision_info())
        self.assertRequestSucceeds(rev_id_utf8, 1)
        self.assertEqual(
            (1, rev_id_utf8), self.tree.branch.last_revision_info())

    def test_TipChangeRejected(self):
        """If a pre_change_branch_tip hook raises TipChangeRejected, the verb
        returns TipChangeRejected.
        """
        rejection_message = u'rejection message\N{INTERROBANG}'
        def hook_that_rejects(params):
            raise errors.TipChangeRejected(rejection_message)
        Branch.hooks.install_named_hook(
            'pre_change_branch_tip', hook_that_rejects, None)
        # The message is expected UTF-8 encoded in the response.
        self.assertEqual(
            FailedSmartServerResponse(
                ('TipChangeRejected', rejection_message.encode('utf-8'))),
            self.set_last_revision('null:', 0))
919
class TestSmartServerBranchRequestSetLastRevision(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Run the shared set_last_revision tests against Branch.set_last_revision."""

    # Request class that the base class's setUp instantiates as self.request.
    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb for path '' and return its response.

        ``revno`` is accepted so that every verb-specific subclass shares one
        signature, but it is not passed on: the arguments sent here are the
        path, the two lock tokens and the revision id.
        """
        verb_args = ('', branch_token, repo_token, revision_id)
        return self.request.execute(*verb_args)
930
class TestSmartServerBranchRequestSetLastRevisionInfo(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Run the shared set_last_revision tests against
    Branch.set_last_revision_info.
    """

    # Request class that the base class's setUp instantiates as self.request.
    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb for path '' and return its response.

        Unlike the sibling verbs, this one is sent the revno as well, placed
        before the revision id.
        """
        verb_args = ('', branch_token, repo_token, revno, revision_id)
        return self.request.execute(*verb_args)

    def test_NoSuchRevision(self):
        """Not applicable to this verb.

        Branch.set_last_revision_info does not have to return NoSuchRevision
        when the revision_id is absent, so the inherited test is overridden.
        """
        raise tests.TestNotApplicable()
947
class TestSmartServerBranchRequestSetLastRevisionEx(
948
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
949
"""Tests for Branch.set_last_revision_ex verb."""
951
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
953
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute Branch.set_last_revision_ex for path '' with both flags off.

    ``revno`` is accepted for signature parity with the sibling test classes
    but is not sent to this verb.
    """
    # NOTE(review): flag order inferred from the other tests in this class
    # (allow_diverged first, allow_overwrite_descendant second) -- confirm
    # against the request implementation.
    allow_diverged = 0
    allow_overwrite_descendant = 0
    return self.request.execute(
        '', branch_token, repo_token, revision_id,
        allow_diverged, allow_overwrite_descendant)
957
def assertRequestSucceeds(self, revision_id, revno):
958
response = self.set_last_revision(revision_id, revno)
960
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
963
def test_branch_last_revision_info_rewind(self):
964
"""A branch's tip can be set to a revision that is an ancestor of the
965
current tip, but only if allow_overwrite_descendant is passed.
967
self.make_tree_with_two_commits()
968
rev_id_utf8 = u'\xc8'.encode('utf-8')
970
(2, 'rev-2'), self.tree.branch.last_revision_info())
971
# If allow_overwrite_descendant flag is 0, then trying to set the tip
972
# to an older revision ID has no effect.
973
branch_token, repo_token = self.lock_branch()
974
response = self.request.execute(
975
'', branch_token, repo_token, rev_id_utf8, 0, 0)
977
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
980
(2, 'rev-2'), self.tree.branch.last_revision_info())
982
# If allow_overwrite_descendant flag is 1, then setting the tip to an
984
response = self.request.execute(
985
'', branch_token, repo_token, rev_id_utf8, 0, 1)
987
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
991
(1, rev_id_utf8), self.tree.branch.last_revision_info())
993
def make_branch_with_divergent_history(self):
994
"""Make a branch with divergent history in its repo.
996
The branch's tip will be 'child-2', and the repo will also contain
997
'child-1', which diverges from a common base revision.
999
self.tree.lock_write()
1001
r1 = self.tree.commit('1st commit')
1002
revno_1, revid_1 = self.tree.branch.last_revision_info()
1003
r2 = self.tree.commit('2nd commit', rev_id='child-1')
1004
# Undo the second commit
1005
self.tree.branch.set_last_revision_info(revno_1, revid_1)
1006
self.tree.set_parent_ids([revid_1])
1007
# Make a new second commit, child-2. child-2 has diverged from
1009
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
1012
def test_not_allow_diverged(self):
1013
"""If allow_diverged is not passed, then setting a divergent history
1014
returns a Diverged error.
1016
self.make_branch_with_divergent_history()
1018
FailedSmartServerResponse(('Diverged',)),
1019
self.set_last_revision('child-1', 2))
1020
# The branch tip was not changed.
1021
self.assertEqual('child-2', self.tree.branch.last_revision())
1023
def test_allow_diverged(self):
1024
"""If allow_diverged is passed, then setting a divergent history
1027
self.make_branch_with_divergent_history()
1028
branch_token, repo_token = self.lock_branch()
1029
response = self.request.execute(
1030
'', branch_token, repo_token, 'child-1', 1, 0)
1032
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
1034
self.unlock_branch()
1035
# The branch tip was changed.
1036
self.assertEqual('child-1', self.tree.branch.last_revision())
1039
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
1041
def test_get_parent_none(self):
1042
base_branch = self.make_branch('base')
1043
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
1044
response = request.execute('base')
1046
SuccessfulSmartServerResponse(('',)), response)
1048
def test_get_parent_something(self):
1049
base_branch = self.make_branch('base')
1050
base_branch.set_parent(self.get_url('foo'))
1051
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
1052
response = request.execute('base')
1054
SuccessfulSmartServerResponse(("../foo",)),
1058
class TestSmartServerBranchRequestSetParent(tests.TestCaseWithMemoryTransport):
1060
def test_set_parent_none(self):
1061
branch = self.make_branch('base', format="1.9")
1063
branch._set_parent_location('foo')
1065
request = smart.branch.SmartServerBranchRequestSetParentLocation(
1066
self.get_transport())
1067
branch_token = branch.lock_write()
1068
repo_token = branch.repository.lock_write()
1070
response = request.execute('base', branch_token, repo_token, '')
1072
branch.repository.unlock()
1074
self.assertEqual(SuccessfulSmartServerResponse(()), response)
1075
self.assertEqual(None, branch.get_parent())
1077
def test_set_parent_something(self):
1078
branch = self.make_branch('base', format="1.9")
1079
request = smart.branch.SmartServerBranchRequestSetParentLocation(
1080
self.get_transport())
1081
branch_token = branch.lock_write()
1082
repo_token = branch.repository.lock_write()
1084
response = request.execute('base', branch_token, repo_token,
1087
branch.repository.unlock()
1089
self.assertEqual(SuccessfulSmartServerResponse(()), response)
1090
self.assertEqual('http://bar/', branch.get_parent())
1093
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
1094
# Only called when the branch format and tags match [yay factory
1095
# methods] so only need to test straight forward cases.
1097
def test_get_bytes(self):
1098
base_branch = self.make_branch('base')
1099
request = smart.branch.SmartServerBranchGetTagsBytes(
1100
self.get_transport())
1101
response = request.execute('base')
1103
SuccessfulSmartServerResponse(('',)), response)
1106
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
1108
def test_get_stacked_on_url(self):
1109
base_branch = self.make_branch('base', format='1.6')
1110
stacked_branch = self.make_branch('stacked', format='1.6')
1111
# typically should be relative
1112
stacked_branch.set_stacked_on_url('../base')
1113
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
1114
self.get_transport())
1115
response = request.execute('stacked')
1117
SmartServerResponse(('ok', '../base')),
1121
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
1124
tests.TestCaseWithMemoryTransport.setUp(self)
1126
def test_lock_write_on_unlocked_branch(self):
1127
backing = self.get_transport()
1128
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1129
branch = self.make_branch('.', format='knit')
1130
repository = branch.repository
1131
response = request.execute('')
1132
branch_nonce = branch.control_files._lock.peek().get('nonce')
1133
repository_nonce = repository.control_files._lock.peek().get('nonce')
1135
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
1137
# The branch (and associated repository) is now locked. Verify that
1138
# with a new branch object.
1139
new_branch = repository.bzrdir.open_branch()
1140
self.assertRaises(errors.LockContention, new_branch.lock_write)
1142
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1143
response = request.execute('', branch_nonce, repository_nonce)
1145
def test_lock_write_on_locked_branch(self):
1146
backing = self.get_transport()
1147
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1148
branch = self.make_branch('.')
1149
branch_token = branch.lock_write()
1150
branch.leave_lock_in_place()
1152
response = request.execute('')
1154
SmartServerResponse(('LockContention',)), response)
1156
branch.lock_write(branch_token)
1157
branch.dont_leave_lock_in_place()
1160
def test_lock_write_with_tokens_on_locked_branch(self):
1161
backing = self.get_transport()
1162
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1163
branch = self.make_branch('.', format='knit')
1164
branch_token = branch.lock_write()
1165
repo_token = branch.repository.lock_write()
1166
branch.repository.unlock()
1167
branch.leave_lock_in_place()
1168
branch.repository.leave_lock_in_place()
1170
response = request.execute('',
1171
branch_token, repo_token)
1173
SmartServerResponse(('ok', branch_token, repo_token)), response)
1175
branch.repository.lock_write(repo_token)
1176
branch.repository.dont_leave_lock_in_place()
1177
branch.repository.unlock()
1178
branch.lock_write(branch_token)
1179
branch.dont_leave_lock_in_place()
1182
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
1183
backing = self.get_transport()
1184
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1185
branch = self.make_branch('.', format='knit')
1186
branch_token = branch.lock_write()
1187
repo_token = branch.repository.lock_write()
1188
branch.repository.unlock()
1189
branch.leave_lock_in_place()
1190
branch.repository.leave_lock_in_place()
1192
response = request.execute('',
1193
branch_token+'xxx', repo_token)
1195
SmartServerResponse(('TokenMismatch',)), response)
1197
branch.repository.lock_write(repo_token)
1198
branch.repository.dont_leave_lock_in_place()
1199
branch.repository.unlock()
1200
branch.lock_write(branch_token)
1201
branch.dont_leave_lock_in_place()
1204
def test_lock_write_on_locked_repo(self):
1205
backing = self.get_transport()
1206
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1207
branch = self.make_branch('.', format='knit')
1208
repo = branch.repository
1209
repo_token = repo.lock_write()
1210
repo.leave_lock_in_place()
1212
response = request.execute('')
1214
SmartServerResponse(('LockContention',)), response)
1216
repo.lock_write(repo_token)
1217
repo.dont_leave_lock_in_place()
1220
def test_lock_write_on_readonly_transport(self):
    """Branch.lock_write over a read-only transport fails with LockFailed."""
    readonly_transport = self.get_readonly_transport()
    lock_request = smart.branch.SmartServerBranchRequestLockWrite(
        readonly_transport)
    self.make_branch('.')
    # Address the branch by the test transport's location relative to the
    # transport's root.
    transport_root = self.get_transport().clone('/')
    branch_path = urlutils.relative_url(
        transport_root.base, self.get_transport().base)
    response = lock_request.execute(branch_path)
    # Unpacking doubles as a check that the response carries exactly three
    # arguments; only the error name is inspected further.
    error_name, _lock_str, _why_str = response.args
    self.assertFalse(response.is_successful())
    self.assertEqual('LockFailed', error_name)
1232
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
1235
tests.TestCaseWithMemoryTransport.setUp(self)
1237
def test_unlock_on_locked_branch_and_repo(self):
1238
backing = self.get_transport()
1239
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1240
branch = self.make_branch('.', format='knit')
1242
branch_token = branch.lock_write()
1243
repo_token = branch.repository.lock_write()
1244
branch.repository.unlock()
1245
# Unlock the branch (and repo) object, leaving the physical locks
1247
branch.leave_lock_in_place()
1248
branch.repository.leave_lock_in_place()
1250
response = request.execute('',
1251
branch_token, repo_token)
1253
SmartServerResponse(('ok',)), response)
1254
# The branch is now unlocked. Verify that with a new branch
1256
new_branch = branch.bzrdir.open_branch()
1257
new_branch.lock_write()
1260
def test_unlock_on_unlocked_branch_unlocked_repo(self):
1261
backing = self.get_transport()
1262
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1263
branch = self.make_branch('.', format='knit')
1264
response = request.execute(
1265
'', 'branch token', 'repo token')
1267
SmartServerResponse(('TokenMismatch',)), response)
1269
def test_unlock_on_unlocked_branch_locked_repo(self):
1270
backing = self.get_transport()
1271
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1272
branch = self.make_branch('.', format='knit')
1273
# Lock the repository.
1274
repo_token = branch.repository.lock_write()
1275
branch.repository.leave_lock_in_place()
1276
branch.repository.unlock()
1277
# Issue branch lock_write request on the unlocked branch (with locked
1279
response = request.execute(
1280
'', 'branch token', repo_token)
1282
SmartServerResponse(('TokenMismatch',)), response)
1284
branch.repository.lock_write(repo_token)
1285
branch.repository.dont_leave_lock_in_place()
1286
branch.repository.unlock()
1289
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryRequest base request."""

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # A shared repository is created above 'subdir', which itself only
        # holds a bzrdir.  Expecting NoRepositoryPresent therefore checks
        # that the request looks at exactly the named path and does not pick
        # up the repository in the parent directory.
        repo_request = smart.repository.SmartServerRepositoryRequest(
            self.get_transport())
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        self.assertRaises(
            errors.NoRepositoryPresent, repo_request.execute, 'subdir')
1305
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
1307
def test_trivial_bzipped(self):
1308
# This tests that the wire encoding is actually bzipped
1309
backing = self.get_transport()
1310
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1311
tree = self.make_branch_and_memory_tree('.')
1313
self.assertEqual(None,
1314
request.execute('', 'missing-id'))
1315
# Note that it returns a body that is bzipped.
1317
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
1318
request.do_body('\n\n0\n'))
1320
def test_trivial_include_missing(self):
1321
backing = self.get_transport()
1322
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1323
tree = self.make_branch_and_memory_tree('.')
1325
self.assertEqual(None,
1326
request.execute('', 'missing-id', 'include-missing:'))
1328
SuccessfulSmartServerResponse(('ok', ),
1329
bz2.compress('missing:missing-id')),
1330
request.do_body('\n\n0\n'))
1333
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
1335
def test_none_argument(self):
1336
backing = self.get_transport()
1337
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1338
tree = self.make_branch_and_memory_tree('.')
1341
r1 = tree.commit('1st commit')
1342
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1345
# the lines of revision_id->revision_parent_list has no guaranteed
1346
# order coming out of a dict, so sort both our test and response
1347
lines = sorted([' '.join([r2, r1]), r1])
1348
response = request.execute('', '')
1349
response.body = '\n'.join(sorted(response.body.split('\n')))
1352
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
1354
def test_specific_revision_argument(self):
1355
backing = self.get_transport()
1356
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1357
tree = self.make_branch_and_memory_tree('.')
1360
rev_id_utf8 = u'\xc9'.encode('utf-8')
1361
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
1362
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1365
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
1366
request.execute('', rev_id_utf8))
1368
def test_no_such_revision(self):
1369
backing = self.get_transport()
1370
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1371
tree = self.make_branch_and_memory_tree('.')
1374
r1 = tree.commit('1st commit')
1377
# Note that it still returns body (of zero bytes).
1379
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
1380
request.execute('', 'missingrevision'))
1383
class TestSmartServerRepositoryGetRevIdForRevno(tests.TestCaseWithMemoryTransport):
1385
def test_revno_found(self):
1386
backing = self.get_transport()
1387
request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
1388
tree = self.make_branch_and_memory_tree('.')
1391
rev1_id_utf8 = u'\xc8'.encode('utf-8')
1392
rev2_id_utf8 = u'\xc9'.encode('utf-8')
1393
tree.commit('1st commit', rev_id=rev1_id_utf8)
1394
tree.commit('2nd commit', rev_id=rev2_id_utf8)
1397
self.assertEqual(SmartServerResponse(('ok', rev1_id_utf8)),
1398
request.execute('', 1, (2, rev2_id_utf8)))
1400
def test_known_revid_missing(self):
1401
backing = self.get_transport()
1402
request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
1403
repo = self.make_repository('.')
1405
FailedSmartServerResponse(('nosuchrevision', 'ghost')),
1406
request.execute('', 1, (2, 'ghost')))
1408
def test_history_incomplete(self):
1409
backing = self.get_transport()
1410
request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
1411
parent = self.make_branch_and_memory_tree('parent', format='1.9')
1413
parent.add([''], ['TREE_ROOT'])
1414
r1 = parent.commit(message='first commit')
1415
r2 = parent.commit(message='second commit')
1417
local = self.make_branch_and_memory_tree('local', format='1.9')
1418
local.branch.pull(parent.branch)
1419
local.set_parent_ids([r2])
1420
r3 = local.commit(message='local commit')
1421
local.branch.create_clone_on_transport(
1422
self.get_transport('stacked'), stacked_on=self.get_url('parent'))
1424
SmartServerResponse(('history-incomplete', 2, r2)),
1425
request.execute('stacked', 1, (3, r3)))
1428
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1430
def make_two_commit_repo(self):
1431
tree = self.make_branch_and_memory_tree('.')
1434
r1 = tree.commit('1st commit')
1435
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1437
repo = tree.branch.repository
1440
def test_ancestry_of(self):
    """The search body may be 'ancestry-of' followed by some heads."""
    get_stream = smart.repository.SmartServerRepositoryGetStream(
        self.get_transport())
    repo, _first_id, tip_id = self.make_two_commit_repo()
    # Body is newline-separated: the keyword, then the head revision id.
    search_body = '\n'.join(('ancestry-of', tip_id))
    get_stream.execute('', repo._format.network_name())
    response = get_stream.do_body(search_body)
    self.assertEqual(('ok',), response.args)
    # Drain the streamed body and check only its leading format marker.
    streamed = ''.join(response.body_stream)
    self.assertStartsWith(streamed, 'Bazaar pack format 1')
1453
def test_search(self):
    """The search body may be a 'search' of some explicit keys."""
    get_stream = smart.repository.SmartServerRepositoryGetStream(
        self.get_transport())
    repo, first_id, tip_id = self.make_two_commit_repo()
    # NOTE(review): the three lines after the keyword are presumably start
    # keys, exclude keys and a key count -- confirm against the verb's body
    # parser.
    keys_line = '%s %s' % (first_id, tip_id)
    search_body = '\n'.join(('search', keys_line, 'null:', '2'))
    get_stream.execute('', repo._format.network_name())
    response = get_stream.do_body(search_body)
    self.assertEqual(('ok',), response.args)
    # Drain the streamed body and check only its leading format marker.
    streamed = ''.join(response.body_stream)
    self.assertStartsWith(streamed, 'Bazaar pack format 1')
1467
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
1469
def test_missing_revision(self):
    """For a revision id absent from the repository, ('no', ) is returned."""
    has_revision = smart.repository.SmartServerRequestHasRevision(
        self.get_transport())
    # A freshly made repository: 'revid' was never added to it.
    self.make_repository('.')
    response = has_revision.execute('', 'revid')
    self.assertEqual(SmartServerResponse(('no',)), response)
1477
def test_present_revision(self):
1478
"""For a present revision, ('yes', ) is returned."""
1479
backing = self.get_transport()
1480
request = smart.repository.SmartServerRequestHasRevision(backing)
1481
tree = self.make_branch_and_memory_tree('.')
1484
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1485
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
1487
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
1488
self.assertEqual(SmartServerResponse(('yes', )),
1489
request.execute('', rev_id_utf8))
1492
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
1494
def test_empty_revid(self):
    """With an empty revid, the body holds only the number of revisions."""
    gather_stats = smart.repository.SmartServerRepositoryGatherStats(
        self.get_transport())
    repository = self.make_repository('.')
    # The direct call is kept from the original test; its result is not
    # compared against anything.
    repository.gather_stats()
    expected = SmartServerResponse(('ok',), 'revisions: 0\n')
    # Arguments: path '', empty revision id, and 'no' for the third flag.
    self.assertEqual(expected, gather_stats.execute('', '', 'no'))
1504
def test_revid_with_committers(self):
1505
"""For a revid we get more infos."""
1506
backing = self.get_transport()
1507
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1508
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1509
tree = self.make_branch_and_memory_tree('.')
1512
# Let's build a predictable result
1513
tree.commit('a commit', timestamp=123456.2, timezone=3600)
1514
tree.commit('a commit', timestamp=654321.4, timezone=0,
1518
stats = tree.branch.repository.gather_stats()
1519
expected_body = ('firstrev: 123456.200 3600\n'
1520
'latestrev: 654321.400 0\n'
1522
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1526
def test_not_empty_repository_with_committers(self):
1527
"""For a revid and requesting committers we get the whole thing."""
1528
backing = self.get_transport()
1529
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1530
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1531
tree = self.make_branch_and_memory_tree('.')
1534
# Let's build a predictable result
1535
tree.commit('a commit', timestamp=123456.2, timezone=3600,
1537
tree.commit('a commit', timestamp=654321.4, timezone=0,
1538
committer='bar', rev_id=rev_id_utf8)
1540
stats = tree.branch.repository.gather_stats()
1542
expected_body = ('committers: 2\n'
1543
'firstrev: 123456.200 3600\n'
1544
'latestrev: 654321.400 0\n'
1546
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1548
rev_id_utf8, 'yes'))
1551
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryIsShared request."""

    def test_is_shared(self):
        """For a shared repository, ('yes', ) is returned."""
        is_shared = smart.repository.SmartServerRepositoryIsShared(
            self.get_transport())
        self.make_repository('.', shared=True)
        response = is_shared.execute('')
        self.assertEqual(SmartServerResponse(('yes',)), response)

    def test_is_not_shared(self):
        """For a repository that is not shared, ('no', ) is returned."""
        is_shared = smart.repository.SmartServerRepositoryIsShared(
            self.get_transport())
        self.make_repository('.', shared=False)
        response = is_shared.execute('')
        self.assertEqual(SmartServerResponse(('no',)), response)
1570
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
1572
def test_lock_write_on_unlocked_repo(self):
1573
backing = self.get_transport()
1574
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1575
repository = self.make_repository('.', format='knit')
1576
response = request.execute('')
1577
nonce = repository.control_files._lock.peek().get('nonce')
1578
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
1579
# The repository is now locked. Verify that with a new repository
1581
new_repo = repository.bzrdir.open_repository()
1582
self.assertRaises(errors.LockContention, new_repo.lock_write)
1584
request = smart.repository.SmartServerRepositoryUnlock(backing)
1585
response = request.execute('', nonce)
1587
def test_lock_write_on_locked_repo(self):
1588
backing = self.get_transport()
1589
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1590
repository = self.make_repository('.', format='knit')
1591
repo_token = repository.lock_write()
1592
repository.leave_lock_in_place()
1594
response = request.execute('')
1596
SmartServerResponse(('LockContention',)), response)
1598
repository.lock_write(repo_token)
1599
repository.dont_leave_lock_in_place()
1602
def test_lock_write_on_readonly_transport(self):
    """Repository.lock_write over a read-only transport fails with LockFailed."""
    readonly_transport = self.get_readonly_transport()
    lock_request = smart.repository.SmartServerRepositoryLockWrite(
        readonly_transport)
    self.make_repository('.', format='knit')
    response = lock_request.execute('')
    self.assertFalse(response.is_successful())
    # Only the error name (first argument) is checked.
    error_name = response.args[0]
    self.assertEqual('LockFailed', error_name)
1611
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
    """Shared helper for the insert-stream request tests."""

    def make_empty_byte_stream(self, repo):
        """Return the serialised form of an empty record stream.

        :param repo: repository whose ``_format`` is used for serialisation.
        :return: the chunks from ``_stream_to_byte_stream`` joined into a
            single string.
        """
        no_substreams = []
        chunks = smart.repository._stream_to_byte_stream(
            no_substreams, repo._format)
        return ''.join(chunks)
1618
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
    """Tests for the SmartServerRepositoryInsertStream request."""

    def test_insert_stream_empty(self):
        """An empty stream yields no response until do_end, then ('ok', )."""
        insert_stream = smart.repository.SmartServerRepositoryInsertStream(
            self.get_transport())
        repository = self.make_repository('.')
        # Neither the argument phase nor a body chunk produces a response.
        execute_result = insert_stream.execute('', '')
        self.assertEqual(None, execute_result)
        empty_stream = self.make_empty_byte_stream(repository)
        chunk_result = insert_stream.do_chunk(empty_stream)
        self.assertEqual(None, chunk_result)
        # The response is only produced once the body has ended.
        final_response = insert_stream.do_end()
        self.assertEqual(SmartServerResponse(('ok',)), final_response)
1632
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
1634
def test_insert_stream_empty(self):
1635
backing = self.get_transport()
1636
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1638
repository = self.make_repository('.', format='knit')
1639
lock_token = repository.lock_write()
1640
response = request.execute('', '', lock_token)
1641
self.assertEqual(None, response)
1642
response = request.do_chunk(self.make_empty_byte_stream(repository))
1643
self.assertEqual(None, response)
1644
response = request.do_end()
1645
self.assertEqual(SmartServerResponse(('ok', )), response)
1648
def test_insert_stream_with_wrong_lock_token(self):
1649
backing = self.get_transport()
1650
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1652
repository = self.make_repository('.', format='knit')
1653
lock_token = repository.lock_write()
1655
errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1659
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
1662
tests.TestCaseWithMemoryTransport.setUp(self)
1664
def test_unlock_on_locked_repo(self):
1665
backing = self.get_transport()
1666
request = smart.repository.SmartServerRepositoryUnlock(backing)
1667
repository = self.make_repository('.', format='knit')
1668
token = repository.lock_write()
1669
repository.leave_lock_in_place()
1671
response = request.execute('', token)
1673
SmartServerResponse(('ok',)), response)
1674
# The repository is now unlocked. Verify that with a new repository
1676
new_repo = repository.bzrdir.open_repository()
1677
new_repo.lock_write()
1680
def test_unlock_on_unlocked_repo(self):
1681
backing = self.get_transport()
1682
request = smart.repository.SmartServerRepositoryUnlock(backing)
1683
repository = self.make_repository('.', format='knit')
1684
response = request.execute('', 'some token')
1686
SmartServerResponse(('TokenMismatch',)), response)
1689
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
1691
def test_is_readonly_no(self):
1692
backing = self.get_transport()
1693
request = smart.request.SmartServerIsReadonly(backing)
1694
response = request.execute()
1696
SmartServerResponse(('no',)), response)
1698
def test_is_readonly_yes(self):
1699
backing = self.get_readonly_transport()
1700
request = smart.request.SmartServerIsReadonly(backing)
1701
response = request.execute()
1703
SmartServerResponse(('yes',)), response)
1706
class TestSmartServerRepositorySetMakeWorkingTrees(
        tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositorySetMakeWorkingTrees request."""

    def test_set_false(self):
        """Sending 'False' turns make_working_trees off."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite setting so the change is observable.
        shared_repo.set_make_working_trees(True)
        set_request = (
            smart.repository.SmartServerRepositorySetMakeWorkingTrees(
                transport))
        response = set_request.execute('', 'False')
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
        # Re-open the repository before reading the setting back.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertFalse(reopened.make_working_trees())

    def test_set_true(self):
        """Sending 'True' turns make_working_trees on."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite setting so the change is observable.
        shared_repo.set_make_working_trees(False)
        set_request = (
            smart.repository.SmartServerRepositorySetMakeWorkingTrees(
                transport))
        response = set_request.execute('', 'True')
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
        # Re-open the repository before reading the setting back.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertTrue(reopened.make_working_trees())
1731
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
1733
def make_repo_needing_autopacking(self, path='.'):
1734
# Make a repo in need of autopacking.
1735
tree = self.make_branch_and_tree('.', format='pack-0.92')
1736
repo = tree.branch.repository
1737
# monkey-patch the pack collection to disable autopacking
1738
repo._pack_collection._max_pack_count = lambda count: count
1740
tree.commit('commit %s' % x)
1741
self.assertEqual(10, len(repo._pack_collection.names()))
1742
del repo._pack_collection._max_pack_count
1745
def test_autopack_needed(self):
1746
repo = self.make_repo_needing_autopacking()
1748
self.addCleanup(repo.unlock)
1749
backing = self.get_transport()
1750
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1752
response = request.execute('')
1753
self.assertEqual(SmartServerResponse(('ok',)), response)
1754
repo._pack_collection.reload_pack_names()
1755
self.assertEqual(1, len(repo._pack_collection.names()))
1757
def test_autopack_not_needed(self):
1758
tree = self.make_branch_and_tree('.', format='pack-0.92')
1759
repo = tree.branch.repository
1761
self.addCleanup(repo.unlock)
1763
tree.commit('commit %s' % x)
1764
backing = self.get_transport()
1765
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1767
response = request.execute('')
1768
self.assertEqual(SmartServerResponse(('ok',)), response)
1769
repo._pack_collection.reload_pack_names()
1770
self.assertEqual(9, len(repo._pack_collection.names()))
1772
def test_autopack_on_nonpack_format(self):
1773
"""A request to autopack a non-pack repo is a no-op."""
1774
repo = self.make_repository('.', format='knit')
1775
backing = self.get_transport()
1776
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1778
response = request.execute('')
1779
self.assertEqual(SmartServerResponse(('ok',)), response)
1782
class TestSmartServerVfsGet(tests.TestCaseWithMemoryTransport):
    """Tests for the VFS get request."""

    def test_unicode_path(self):
        """VFS requests expect unicode paths to be escaped."""
        escaped_name = urlutils.escape(u'foo\N{INTERROBANG}')
        transport = self.get_transport()
        get_request = smart.vfs.GetRequest(transport)
        # Store the file under the escaped name, then fetch it through the
        # request using that same escaped name.
        transport.put_bytes_non_atomic(escaped_name, 'contents')
        response = get_request.execute(escaped_name)
        self.assertEqual(SmartServerResponse(('ok',), 'contents'), response)
1795
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_all_registrations_exist(self):
        """All registered request_handlers can be found."""
        # If there's a typo in a register_lazy call, this loop will fail with
        # an AttributeError.
        for key, item in smart.request.request_handlers.iteritems():
            # Iterating is the whole test; nothing to do per entry.
            pass

    def assertHandlerEqual(self, verb, handler):
        """Assert that request_handlers.get(verb) equals handler.

        :param verb: The request verb to look up, e.g. 'Branch.unlock'.
        :param handler: The object expected to be registered for verb.
        """
        self.assertEqual(smart.request.request_handlers.get(verb), handler)

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        self.assertHandlerEqual('Branch.get_config_file',
            smart.branch.SmartServerBranchGetConfigFile)
        self.assertHandlerEqual('Branch.get_parent',
            smart.branch.SmartServerBranchGetParent)
        self.assertHandlerEqual('Branch.get_tags_bytes',
            smart.branch.SmartServerBranchGetTagsBytes)
        self.assertHandlerEqual('Branch.lock_write',
            smart.branch.SmartServerBranchRequestLockWrite)
        self.assertHandlerEqual('Branch.last_revision_info',
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
        self.assertHandlerEqual('Branch.revision_history',
            smart.branch.SmartServerRequestRevisionHistory)
        self.assertHandlerEqual('Branch.set_config_option',
            smart.branch.SmartServerBranchRequestSetConfigOption)
        self.assertHandlerEqual('Branch.set_last_revision',
            smart.branch.SmartServerBranchRequestSetLastRevision)
        self.assertHandlerEqual('Branch.set_last_revision_info',
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
        self.assertHandlerEqual('Branch.set_last_revision_ex',
            smart.branch.SmartServerBranchRequestSetLastRevisionEx)
        self.assertHandlerEqual('Branch.set_parent_location',
            smart.branch.SmartServerBranchRequestSetParentLocation)
        self.assertHandlerEqual('Branch.unlock',
            smart.branch.SmartServerBranchRequestUnlock)
        self.assertHandlerEqual('BzrDir.find_repository',
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
        self.assertHandlerEqual('BzrDir.find_repositoryV2',
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
        self.assertHandlerEqual('BzrDirFormat.initialize',
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
        self.assertHandlerEqual('BzrDirFormat.initialize_ex_1.16',
            smart.bzrdir.SmartServerRequestBzrDirInitializeEx)
        self.assertHandlerEqual('BzrDir.cloning_metadir',
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
        self.assertHandlerEqual('BzrDir.get_config_file',
            smart.bzrdir.SmartServerBzrDirRequestConfigFile)
        self.assertHandlerEqual('BzrDir.open_branch',
            smart.bzrdir.SmartServerRequestOpenBranch)
        self.assertHandlerEqual('BzrDir.open_branchV2',
            smart.bzrdir.SmartServerRequestOpenBranchV2)
        self.assertHandlerEqual('BzrDir.open_branchV3',
            smart.bzrdir.SmartServerRequestOpenBranchV3)
        self.assertHandlerEqual('PackRepository.autopack',
            smart.packrepository.SmartServerPackRepositoryAutopack)
        self.assertHandlerEqual('Repository.gather_stats',
            smart.repository.SmartServerRepositoryGatherStats)
        self.assertHandlerEqual('Repository.get_parent_map',
            smart.repository.SmartServerRepositoryGetParentMap)
        self.assertHandlerEqual('Repository.get_rev_id_for_revno',
            smart.repository.SmartServerRepositoryGetRevIdForRevno)
        self.assertHandlerEqual('Repository.get_revision_graph',
            smart.repository.SmartServerRepositoryGetRevisionGraph)
        self.assertHandlerEqual('Repository.get_stream',
            smart.repository.SmartServerRepositoryGetStream)
        self.assertHandlerEqual('Repository.has_revision',
            smart.repository.SmartServerRequestHasRevision)
        self.assertHandlerEqual('Repository.insert_stream',
            smart.repository.SmartServerRepositoryInsertStream)
        self.assertHandlerEqual('Repository.insert_stream_locked',
            smart.repository.SmartServerRepositoryInsertStreamLocked)
        self.assertHandlerEqual('Repository.is_shared',
            smart.repository.SmartServerRepositoryIsShared)
        self.assertHandlerEqual('Repository.lock_write',
            smart.repository.SmartServerRepositoryLockWrite)
        self.assertHandlerEqual('Repository.tarball',
            smart.repository.SmartServerRepositoryTarball)
        self.assertHandlerEqual('Repository.unlock',
            smart.repository.SmartServerRepositoryUnlock)
        self.assertHandlerEqual('Transport.is_readonly',
            smart.request.SmartServerIsReadonly)