1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
40
from bzrlib.branch import Branch, BranchReferenceFormat
41
import bzrlib.smart.branch
42
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
43
import bzrlib.smart.packrepository
44
import bzrlib.smart.repository
45
from bzrlib.smart.request import (
46
FailedSmartServerResponse,
49
SuccessfulSmartServerResponse,
51
from bzrlib.tests import (
54
from bzrlib.transport import chroot, get_transport
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
    """Create the smart TCP server used as this test's transport server.

    :return: The object built by ``SmartTCPServer_for_testing``, which is
        passed ``'-'`` followed by this test's id.
    """
    server_argument = '-' + self.id()
    return smart.server.SmartTCPServer_for_testing(server_argument)
111
def get_smart_medium(self):
    """Get a smart medium to use in tests.

    :return: Whatever ``get_smart_medium()`` returns on the transport
        obtained from ``self.get_transport()``.
    """
    transport = self.get_transport()
    return transport.get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
    """Responses compare equal only when both args and body are equal."""
    ok_args = ('ok', )
    # Matching args (and matching body, when one is given) are equal.
    self.assertEqual(
        SmartServerResponse(ok_args), SmartServerResponse(ok_args))
    self.assertEqual(
        SmartServerResponse(ok_args, 'body'),
        SmartServerResponse(ok_args, 'body'))
    # Different args, or a body present on only one side, are unequal.
    self.assertNotEqual(
        SmartServerResponse(ok_args), SmartServerResponse(('notok', )))
    self.assertNotEqual(
        SmartServerResponse(ok_args, 'body'), SmartServerResponse(ok_args))
    # A response never compares equal to None.
    self.assertNotEqual(None, SmartServerResponse(ok_args))
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
    tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.cloning_metadir."""

    def test_cloning_metadir(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        # The locally computed cloning metadir supplies the three network
        # names the request is expected to report.
        local_result = control_dir.cloning_metadir()
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        expected = SuccessfulSmartServerResponse(
            (local_result.network_name(),
             local_result.repository_format.network_name(),
             ('branch', local_result.get_branch_format().network_name())))
        self.assertEqual(expected, request.execute('', 'False'))

    def test_cloning_metadir_reference(self):
        """The request fails when bzrdir contains a branch reference."""
        backing = self.get_transport()
        referenced_branch = self.make_branch('referenced')
        control_dir = self.make_bzrdir('.')
        # NOTE(review): the results of cloning_metadir() and get_reference()
        # are not used by this test; the calls are kept as they were --
        # confirm whether they are needed.
        control_dir.cloning_metadir()
        BranchReferenceFormat().initialize(control_dir, referenced_branch)
        BranchReferenceFormat().get_reference(control_dir)
        # The server shouldn't try to follow the branch reference, so it's
        # fine if the referenced branch isn't reachable.
        backing.rename('referenced', 'moved')
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        expected = FailedSmartServerResponse(('BranchReference',))
        self.assertEqual(expected, request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.create_repository."""

    def test_makes_repository(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(
            backing)
        # The repository format of the 'default' bzrdir format provides the
        # network name that is both sent and expected back.
        default_bzrdir_format = bzrdir.format_registry.get('default')()
        network_name = default_bzrdir_format.repository_format.network_name()
        expected = SuccessfulSmartServerResponse(
            ('ok', 'no', 'no', 'no', network_name))
        self.assertEqual(expected, request.execute('', network_name, 'True'))
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
    """A non-shared repository is only found at its own path."""
    # Non-shared repositories only allow 'find' to return a handle when
    # the path being searched is the path the repository is at.
    backing = self.get_transport()
    request = self._request_class(backing)
    expected_at_root = self._make_repository_and_result()
    self.assertEqual(expected_at_root, request.execute(''))
    # A bzrdir below the repository must not find it.
    self.make_bzrdir('subdir')
    self.assertEqual(
        SmartServerResponse(('norepository', )), request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
    """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
    backing = self.get_transport()
    request = self._request_class(backing)
    result = self._make_repository_and_result(shared=True)
    self.assertEqual(result, request.execute(''))
    # From nested bzrdirs the expected response is the same as at the root
    # except for args[1], the relative path back to the repository.
    for subdir, relpath in (('subdir', '..'), ('subdir/deeper', '../..')):
        self.make_bzrdir(subdir)
        expected = SmartServerResponse(
            result.args[0:1] + (relpath, ) + result.args[2:])
        self.assertEqual(expected, request.execute(subdir))
279
def test_rich_root_and_subtree_encoding(self):
    """Test for the format attributes for rich root and subtree support."""
    backing = self.get_transport()
    request = self._request_class(backing)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Check the test will be valid: args[2] and args[3] of the expected
    # response must both be 'yes' for this format.
    self.assertEqual('yes', expected.args[2])
    self.assertEqual('yes', expected.args[3])
    self.assertEqual(expected, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
    """Test for the supports_external_lookups attribute."""
    backing = self.get_transport()
    request = self._request_class(backing)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Check the test will be valid: args[4] of the expected response must
    # be 'no' for this format.
    self.assertEqual('no', expected.args[4])
    self.assertEqual(expected, request.execute(''))
299
class TestSmartServerBzrDirRequestGetConfigFile(
    tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.get_config_file."""

    def test_present(self):
        """The response body is the content of the bzrdir's config file."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        # Set an option first, then read the config file back locally to
        # get the bytes the request is expected to return.
        control_dir.get_config().set_default_stack_on("/")
        local_result = control_dir._get_config()._get_config_file().read()
        request = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        expected = SuccessfulSmartServerResponse((), local_result)
        self.assertEqual(expected, request.execute(''))

    def test_missing(self):
        """A bzrdir with no config set yields an empty response body."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        expected = SuccessfulSmartServerResponse((), '')
        self.assertEqual(expected, request.execute(''))
322
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
324
def test_empty_dir(self):
325
"""Initializing an empty dir should succeed and do it."""
326
backing = self.get_transport()
327
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
328
self.assertEqual(SmartServerResponse(('ok', )),
330
made_dir = bzrdir.BzrDir.open_from_transport(backing)
331
# no branch, tree or repository is expected with the current
333
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
334
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
335
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
337
def test_missing_dir(self):
    """Initializing a missing directory should fail like the bzrdir api."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
    # 'subdir' is never created, so the request is expected to raise.
    self.assertRaises(errors.NoSuchFile, request.execute, 'subdir')
344
def test_initialized_dir(self):
    """Initializing an extant bzrdir should fail like the bzrdir api."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
    # Create the bzrdir up front so the request finds one already there.
    self.make_bzrdir('subdir')
    self.assertRaises(errors.FileExists, request.execute, 'subdir')
353
class TestSmartServerRequestBzrDirInitializeEx(tests.TestCaseWithMemoryTransport):
354
"""Basic tests for BzrDir.initialize_ex_1.16 in the smart server.
356
The main unit tests in test_bzrdir exercise the API comprehensively.
359
def test_empty_dir(self):
360
"""Initializing an empty dir should succeed and do it."""
361
backing = self.get_transport()
362
name = self.make_bzrdir('reference')._format.network_name()
363
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
364
self.assertEqual(SmartServerResponse(('', '', '', '', '', '', name,
365
'False', '', '', '')),
366
request.execute(name, '', 'True', 'False', 'False', '', '', '', '',
368
made_dir = bzrdir.BzrDir.open_from_transport(backing)
369
# no branch, tree or repository is expected with the current
371
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
372
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
373
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
375
def test_missing_dir(self):
    """Initializing a missing directory should fail like the bzrdir api."""
    backing = self.get_transport()
    # A throwaway bzrdir is made only to obtain a format network name.
    name = self.make_bzrdir('reference')._format.network_name()
    request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
    # 'subdir/dir' has no existing parent directory.
    execute_args = (
        name, 'subdir/dir', 'False', 'False', 'False', '', '', '', '',
        'False')
    self.assertRaises(errors.NoSuchFile, request.execute, *execute_args)
383
def test_initialized_dir(self):
    """Initializing an extant directory should fail like the bzrdir api."""
    backing = self.get_transport()
    # A throwaway bzrdir is made only to obtain a format network name.
    name = self.make_bzrdir('reference')._format.network_name()
    request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
    # Create the target bzrdir first so the request finds it present.
    self.make_bzrdir('subdir')
    execute_args = (
        name, 'subdir', 'False', 'False', 'False', '', '', '', '', 'False')
    self.assertRaises(errors.FileExists, request.execute, *execute_args)
393
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
395
def test_no_branch(self):
396
"""When there is no branch, ('nobranch', ) is returned."""
397
backing = self.get_transport()
398
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
399
self.make_bzrdir('.')
400
self.assertEqual(SmartServerResponse(('nobranch', )),
403
def test_branch(self):
404
"""When there is a branch, 'ok' is returned."""
405
backing = self.get_transport()
406
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
407
self.make_branch('.')
408
self.assertEqual(SmartServerResponse(('ok', '')),
411
def test_branch_reference(self):
    """When there is a branch reference, the reference URL is returned."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
    target_branch = self.make_branch('branch')
    checkout = target_branch.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
    # Sanity check: the checkout's branch location file holds that URL.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    expected = SmartServerResponse(('ok', reference_url))
    self.assertEqual(expected, request.execute('reference'))
423
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
425
def test_no_branch(self):
426
"""When there is no branch, ('nobranch', ) is returned."""
427
backing = self.get_transport()
428
self.make_bzrdir('.')
429
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
430
self.assertEqual(SmartServerResponse(('nobranch', )),
433
def test_branch(self):
434
"""When there is a branch, 'ok' is returned."""
435
backing = self.get_transport()
436
expected = self.make_branch('.')._format.network_name()
437
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
438
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
441
def test_branch_reference(self):
    """When there is a branch reference, the reference URL is returned."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
    target_branch = self.make_branch('branch')
    checkout = target_branch.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
    # Sanity check: the checkout's branch location file holds that URL.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    # The V2 verb tags a reference with 'ref' in its success response.
    expected = SuccessfulSmartServerResponse(('ref', reference_url))
    self.assertEqual(expected, request.execute('reference'))
452
def test_stacked_branch(self):
453
"""Opening a stacked branch does not open the stacked-on branch."""
454
trunk = self.make_branch('trunk')
455
feature = self.make_branch('feature', format='1.9')
456
feature.set_stacked_on_url(trunk.base)
458
Branch.hooks.install_named_hook('open', opened_branches.append, None)
459
backing = self.get_transport()
460
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
463
response = request.execute('feature')
465
request.teardown_jail()
466
expected_format = feature._format.network_name()
468
SuccessfulSmartServerResponse(('branch', expected_format)),
470
self.assertLength(1, opened_branches)
473
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
475
def test_empty(self):
476
"""For an empty branch, the body is empty."""
477
backing = self.get_transport()
478
request = smart.branch.SmartServerRequestRevisionHistory(backing)
479
self.make_branch('.')
480
self.assertEqual(SmartServerResponse(('ok', ), ''),
483
def test_not_empty(self):
484
"""For a non-empty branch, the body is the NUL-separated revision ids."""
485
backing = self.get_transport()
486
request = smart.branch.SmartServerRequestRevisionHistory(backing)
487
tree = self.make_branch_and_memory_tree('.')
490
r1 = tree.commit('1st commit')
491
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
494
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
498
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
500
def test_no_branch(self):
501
"""When there is a bzrdir and no branch, NotBranchError is raised."""
502
backing = self.get_transport()
503
request = smart.branch.SmartServerBranchRequest(backing)
504
self.make_bzrdir('.')
505
self.assertRaises(errors.NotBranchError,
508
def test_branch_reference(self):
    """When there is a branch reference, NotBranchError is raised."""
    backing = self.get_transport()
    request = smart.branch.SmartServerBranchRequest(backing)
    target_branch = self.make_branch('branch')
    target_branch.create_checkout('reference', lightweight=True)
    # NOTE(review): the lightweight checkout is created at 'reference' but
    # the request is executed against 'checkout' -- confirm which path is
    # intended; as written the requested path is not the one created above.
    self.assertRaises(errors.NotBranchError, request.execute, 'checkout')
518
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
520
def test_empty(self):
521
"""For an empty branch, the result is ('ok', '0', 'null:')."""
522
backing = self.get_transport()
523
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
524
self.make_branch('.')
525
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
528
def test_not_empty(self):
529
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
530
backing = self.get_transport()
531
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
532
tree = self.make_branch_and_memory_tree('.')
535
rev_id_utf8 = u'\xc8'.encode('utf-8')
536
r1 = tree.commit('1st commit')
537
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
540
SmartServerResponse(('ok', '2', rev_id_utf8)),
544
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
546
def test_default(self):
547
"""With no file, we get empty content."""
548
backing = self.get_transport()
549
request = smart.branch.SmartServerBranchGetConfigFile(backing)
550
branch = self.make_branch('.')
551
# there should be no file by default
553
self.assertEqual(SmartServerResponse(('ok', ), content),
556
def test_with_content(self):
557
# SmartServerBranchGetConfigFile should return the content from
558
# branch.control_files.get('branch.conf') for now - in the future it may
559
# perform more complex processing.
560
backing = self.get_transport()
561
request = smart.branch.SmartServerBranchGetConfigFile(backing)
562
branch = self.make_branch('.')
563
branch._transport.put_bytes('branch.conf', 'foo bar baz')
564
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
568
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
    """Base test case offering a helper to obtain write-lock tokens."""

    def get_lock_tokens(self, branch):
        """Write-lock ``branch`` and return the lock tokens.

        ``branch.lock_write()`` is called and not undone here. The
        repository's ``lock_write()`` is called to get its token and is
        balanced by a single ``unlock()`` before returning.

        :param branch: The branch to lock.
        :return: A ``(branch_token, repo_token)`` tuple holding the values
            returned by the two ``lock_write()`` calls.
        """
        tokens = (branch.lock_write(), branch.repository.lock_write())
        branch.repository.unlock()
        return tokens
577
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
579
def test_value_name(self):
580
branch = self.make_branch('.')
581
request = smart.branch.SmartServerBranchRequestSetConfigOption(
582
branch.bzrdir.root_transport)
583
branch_token, repo_token = self.get_lock_tokens(branch)
584
config = branch._get_config()
585
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
587
self.assertEqual(SuccessfulSmartServerResponse(()), result)
588
self.assertEqual('bar', config.get_option('foo'))
592
def test_value_name_section(self):
593
branch = self.make_branch('.')
594
request = smart.branch.SmartServerBranchRequestSetConfigOption(
595
branch.bzrdir.root_transport)
596
branch_token, repo_token = self.get_lock_tokens(branch)
597
config = branch._get_config()
598
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
600
self.assertEqual(SuccessfulSmartServerResponse(()), result)
601
self.assertEqual('bar', config.get_option('foo', 'gam'))
606
class TestSmartServerBranchRequestSetTagsBytes(TestLockedBranch):
607
# Only called when the branch format and tags match [yay factory
608
# methods] so only need to test straightforward cases.
610
def test_set_bytes(self):
611
base_branch = self.make_branch('base')
612
tag_bytes = base_branch._get_tags_bytes()
613
# get_lock_tokens takes out a lock.
614
branch_token, repo_token = self.get_lock_tokens(base_branch)
615
request = smart.branch.SmartServerBranchSetTagsBytes(
616
self.get_transport())
617
response = request.execute('base', branch_token, repo_token)
618
self.assertEqual(None, response)
619
response = request.do_chunk(tag_bytes)
620
self.assertEqual(None, response)
621
response = request.do_end()
623
SuccessfulSmartServerResponse(()), response)
626
def test_lock_failed(self):
627
base_branch = self.make_branch('base')
628
base_branch.lock_write()
629
tag_bytes = base_branch._get_tags_bytes()
630
request = smart.branch.SmartServerBranchSetTagsBytes(
631
self.get_transport())
632
self.assertRaises(errors.TokenMismatch, request.execute,
633
'base', 'wrong token', 'wrong token')
634
# The request handler will keep processing the message parts, so even
635
# if the request fails immediately do_chunk and do_end are still
637
request.do_chunk(tag_bytes)
643
class SetLastRevisionTestBase(TestLockedBranch):
644
"""Base test case for verbs that implement set_last_revision."""
647
tests.TestCaseWithMemoryTransport.setUp(self)
648
backing_transport = self.get_transport()
649
self.request = self.request_class(backing_transport)
650
self.tree = self.make_branch_and_memory_tree('.')
652
def lock_branch(self):
    """Lock this test's branch and return its lock tokens.

    :return: The result of ``self.get_lock_tokens`` for ``self.tree.branch``.
    """
    branch = self.tree.branch
    return self.get_lock_tokens(branch)
655
def unlock_branch(self):
    """Call ``unlock()`` once on this test's branch."""
    branch = self.tree.branch
    branch.unlock()
658
def set_last_revision(self, revision_id, revno):
659
branch_token, repo_token = self.lock_branch()
660
response = self._set_last_revision(
661
revision_id, revno, branch_token, repo_token)
665
def assertRequestSucceeds(self, revision_id, revno):
    """Assert that setting the last revision yields a plain 'ok' response.

    :param revision_id: Revision id passed to ``self.set_last_revision``.
    :param revno: Revision number passed to ``self.set_last_revision``.
    """
    expected = SuccessfulSmartServerResponse(('ok',))
    actual = self.set_last_revision(revision_id, revno)
    self.assertEqual(expected, actual)
670
class TestSetLastRevisionVerbMixin(object):
671
"""Mixin test case for verbs that implement set_last_revision."""
673
def test_set_null_to_null(self):
    """An empty branch can have its last revision set to 'null:'."""
    null_revision_id = 'null:'
    self.assertRequestSucceeds(null_revision_id, 0)
677
def test_NoSuchRevision(self):
678
"""If the revision_id is not present, the verb returns NoSuchRevision.
680
revision_id = 'non-existent revision'
682
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
683
self.set_last_revision(revision_id, 1))
685
def make_tree_with_two_commits(self):
686
self.tree.lock_write()
688
rev_id_utf8 = u'\xc8'.encode('utf-8')
689
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
690
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
693
def test_branch_last_revision_info_is_updated(self):
694
"""A branch's tip can be set to a revision that is present in its
697
# Make a branch with an empty revision history, but two revisions in
699
self.make_tree_with_two_commits()
700
rev_id_utf8 = u'\xc8'.encode('utf-8')
701
self.tree.branch.set_revision_history([])
703
(0, 'null:'), self.tree.branch.last_revision_info())
704
# We can update the branch to a revision that is present in the
706
self.assertRequestSucceeds(rev_id_utf8, 1)
708
(1, rev_id_utf8), self.tree.branch.last_revision_info())
710
def test_branch_last_revision_info_rewind(self):
711
"""A branch's tip can be set to a revision that is an ancestor of the
714
self.make_tree_with_two_commits()
715
rev_id_utf8 = u'\xc8'.encode('utf-8')
717
(2, 'rev-2'), self.tree.branch.last_revision_info())
718
self.assertRequestSucceeds(rev_id_utf8, 1)
720
(1, rev_id_utf8), self.tree.branch.last_revision_info())
722
def test_TipChangeRejected(self):
723
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
724
returns TipChangeRejected.
726
rejection_message = u'rejection message\N{INTERROBANG}'
727
def hook_that_rejects(params):
728
raise errors.TipChangeRejected(rejection_message)
729
Branch.hooks.install_named_hook(
730
'pre_change_branch_tip', hook_that_rejects, None)
732
FailedSmartServerResponse(
733
('TipChangeRejected', rejection_message.encode('utf-8'))),
734
self.set_last_revision('null:', 0))
737
class TestSmartServerBranchRequestSetLastRevision(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision verb."""

    # Request class that the base test case instantiates as self.request.
    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb; ``revno`` is not sent for this verb.

        :return: The response from ``self.request.execute``.
        """
        execute_args = ('', branch_token, repo_token, revision_id)
        return self.request.execute(*execute_args)
748
class TestSmartServerBranchRequestSetLastRevisionInfo(
749
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
750
"""Tests for Branch.set_last_revision_info verb."""
752
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
754
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute the verb, sending ``revno`` before ``revision_id``.

    :return: The response from ``self.request.execute``.
    """
    execute_args = ('', branch_token, repo_token, revno, revision_id)
    return self.request.execute(*execute_args)
758
def test_NoSuchRevision(self):
759
"""Branch.set_last_revision_info does not have to return
760
NoSuchRevision if the revision_id is absent.
762
raise tests.TestNotApplicable()
765
class TestSmartServerBranchRequestSetLastRevisionEx(
766
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
767
"""Tests for Branch.set_last_revision_ex verb."""
769
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
771
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute the verb with both trailing flag arguments set to 0.

    ``revno`` is not sent for this verb.
    NOTE(review): other tests in this class suggest the two trailing
    arguments are allow_diverged and allow_overwrite_descendant, in that
    order -- confirm against the request implementation.

    :return: The response from ``self.request.execute``.
    """
    execute_args = ('', branch_token, repo_token, revision_id, 0, 0)
    return self.request.execute(*execute_args)
775
def assertRequestSucceeds(self, revision_id, revno):
776
response = self.set_last_revision(revision_id, revno)
778
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
781
def test_branch_last_revision_info_rewind(self):
782
"""A branch's tip can be set to a revision that is an ancestor of the
783
current tip, but only if allow_overwrite_descendant is passed.
785
self.make_tree_with_two_commits()
786
rev_id_utf8 = u'\xc8'.encode('utf-8')
788
(2, 'rev-2'), self.tree.branch.last_revision_info())
789
# If allow_overwrite_descendant flag is 0, then trying to set the tip
790
# to an older revision ID has no effect.
791
branch_token, repo_token = self.lock_branch()
792
response = self.request.execute(
793
'', branch_token, repo_token, rev_id_utf8, 0, 0)
795
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
798
(2, 'rev-2'), self.tree.branch.last_revision_info())
800
# If allow_overwrite_descendant flag is 1, then setting the tip to an
802
response = self.request.execute(
803
'', branch_token, repo_token, rev_id_utf8, 0, 1)
805
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
809
(1, rev_id_utf8), self.tree.branch.last_revision_info())
811
def make_branch_with_divergent_history(self):
812
"""Make a branch with divergent history in its repo.
814
The branch's tip will be 'child-2', and the repo will also contain
815
'child-1', which diverges from a common base revision.
817
self.tree.lock_write()
819
r1 = self.tree.commit('1st commit')
820
revno_1, revid_1 = self.tree.branch.last_revision_info()
821
r2 = self.tree.commit('2nd commit', rev_id='child-1')
822
# Undo the second commit
823
self.tree.branch.set_last_revision_info(revno_1, revid_1)
824
self.tree.set_parent_ids([revid_1])
825
# Make a new second commit, child-2. child-2 has diverged from
827
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
830
def test_not_allow_diverged(self):
831
"""If allow_diverged is not passed, then setting a divergent history
832
returns a Diverged error.
834
self.make_branch_with_divergent_history()
836
FailedSmartServerResponse(('Diverged',)),
837
self.set_last_revision('child-1', 2))
838
# The branch tip was not changed.
839
self.assertEqual('child-2', self.tree.branch.last_revision())
841
def test_allow_diverged(self):
842
"""If allow_diverged is passed, then setting a divergent history
845
self.make_branch_with_divergent_history()
846
branch_token, repo_token = self.lock_branch()
847
response = self.request.execute(
848
'', branch_token, repo_token, 'child-1', 1, 0)
850
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
853
# The branch tip was changed.
854
self.assertEqual('child-1', self.tree.branch.last_revision())
857
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
859
def test_get_parent_none(self):
860
base_branch = self.make_branch('base')
861
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
862
response = request.execute('base')
864
SuccessfulSmartServerResponse(('',)), response)
866
def test_get_parent_something(self):
867
base_branch = self.make_branch('base')
868
base_branch.set_parent(self.get_url('foo'))
869
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
870
response = request.execute('base')
872
SuccessfulSmartServerResponse(("../foo",)),
876
class TestSmartServerBranchRequestSetParent(tests.TestCaseWithMemoryTransport):
878
def test_set_parent_none(self):
879
branch = self.make_branch('base', format="1.9")
881
branch._set_parent_location('foo')
883
request = smart.branch.SmartServerBranchRequestSetParentLocation(
884
self.get_transport())
885
branch_token = branch.lock_write()
886
repo_token = branch.repository.lock_write()
888
response = request.execute('base', branch_token, repo_token, '')
890
branch.repository.unlock()
892
self.assertEqual(SuccessfulSmartServerResponse(()), response)
893
self.assertEqual(None, branch.get_parent())
895
def test_set_parent_something(self):
896
branch = self.make_branch('base', format="1.9")
897
request = smart.branch.SmartServerBranchRequestSetParentLocation(
898
self.get_transport())
899
branch_token = branch.lock_write()
900
repo_token = branch.repository.lock_write()
902
response = request.execute('base', branch_token, repo_token,
905
branch.repository.unlock()
907
self.assertEqual(SuccessfulSmartServerResponse(()), response)
908
self.assertEqual('http://bar/', branch.get_parent())
911
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Branch.get_tags_bytes' request handler."""

    # Only called when the branch format and tags match [yay factory
    # methods] so only need to test straight forward cases.

    def test_get_bytes(self):
        """A freshly made branch answers with an empty tags string."""
        self.make_branch('base')
        request = smart.branch.SmartServerBranchGetTagsBytes(
            self.get_transport())
        response = request.execute('base')
        self.assertEqual(
            SuccessfulSmartServerResponse(('',)), response)
924
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
    """Tests for SmartServerBranchRequestGetStackedOnURL."""

    def test_get_stacked_on_url(self):
        """The stacked-on URL is returned exactly as it was set."""
        self.make_branch('base', format='1.6')
        stacked_branch = self.make_branch('stacked', format='1.6')
        # typically should be relative
        stacked_branch.set_stacked_on_url('../base')
        request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
            self.get_transport())
        response = request.execute('stacked')
        self.assertEqual(
            SmartServerResponse(('ok', '../base')),
            response)
939
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Branch.lock_write' request handler."""

    # NOTE(review): the unlock() calls that drop the in-memory lock object
    # after leave_lock_in_place(), and the final unlock() in each cleanup
    # section, were restored from a damaged copy of this file -- confirm
    # against version control.

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_lock_write_on_unlocked_branch(self):
        """Locking an unlocked branch returns the branch and repo tokens."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        repository = branch.repository
        response = request.execute('')
        branch_nonce = branch.control_files._lock.peek().get('nonce')
        repository_nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(
            SmartServerResponse(('ok', branch_nonce, repository_nonce)),
            response)
        # The branch (and associated repository) is now locked.  Verify that
        # with a new branch object.
        new_branch = repository.bzrdir.open_branch()
        self.assertRaises(errors.LockContention, new_branch.lock_write)
        # Cleanup
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        response = request.execute('', branch_nonce, repository_nonce)

    def test_lock_write_on_locked_branch(self):
        """Locking without a token while the branch is locked is contention."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        branch_token = branch.lock_write()
        branch.leave_lock_in_place()
        branch.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)
        # Cleanup
        branch.lock_write(branch_token)
        branch.dont_leave_lock_in_place()
        branch.unlock()

    def test_lock_write_with_tokens_on_locked_branch(self):
        """Supplying the matching tokens re-acquires the existing locks."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('',
                                   branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok', branch_token, repo_token)), response)
        # Cleanup
        branch.repository.lock_write(repo_token)
        branch.repository.dont_leave_lock_in_place()
        branch.repository.unlock()
        branch.lock_write(branch_token)
        branch.dont_leave_lock_in_place()
        branch.unlock()

    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
        """A wrong branch token is answered with TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('',
                                   branch_token+'xxx', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
        # Cleanup
        branch.repository.lock_write(repo_token)
        branch.repository.dont_leave_lock_in_place()
        branch.repository.unlock()
        branch.lock_write(branch_token)
        branch.dont_leave_lock_in_place()
        branch.unlock()

    def test_lock_write_on_locked_repo(self):
        """A locked repository makes the branch lock request contend."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        repo = branch.repository
        repo_token = repo.lock_write()
        repo.leave_lock_in_place()
        repo.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)
        # Cleanup
        repo.lock_write(repo_token)
        repo.dont_leave_lock_in_place()
        repo.unlock()

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        root = self.get_transport().clone('/')
        path = urlutils.relative_url(root.base, self.get_transport().base)
        response = request.execute(path)
        error_name, lock_str, why_str = response.args
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', error_name)
1050
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Branch.unlock' request handler."""

    # NOTE(review): branch.unlock() before the request, new_branch.unlock()
    # after the verification, and the wording of the truncated comments were
    # restored from a damaged copy of this file -- confirm against version
    # control.

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_branch_and_repo(self):
        """Matching tokens release both the branch and repository locks."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        # Lock the branch
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        # Unlock the branch (and repo) object, leaving the physical locks
        # in place.
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('',
                                   branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The branch is now unlocked.  Verify that with a new branch
        # object.
        new_branch = branch.bzrdir.open_branch()
        new_branch.lock_write()
        new_branch.unlock()

    def test_unlock_on_unlocked_branch_unlocked_repo(self):
        """Unlocking when nothing is locked is answered with TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        response = request.execute(
            '', 'branch token', 'repo token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)

    def test_unlock_on_unlocked_branch_locked_repo(self):
        """A bogus branch token is a TokenMismatch even with a valid repo token."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        # Lock the repository.
        repo_token = branch.repository.lock_write()
        branch.repository.leave_lock_in_place()
        branch.repository.unlock()
        # Issue branch lock_write request on the unlocked branch (with locked
        # repo).
        response = request.execute(
            '', 'branch token', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
        # Cleanup
        branch.repository.lock_write(repo_token)
        branch.repository.dont_leave_lock_in_place()
        branch.repository.unlock()
1107
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryRequest base request."""

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # we test this using a shared repository above the named path,
        # thus checking the right search logic is used - that is, that
        # its the exact path being looked at and the server is not
        # searching.
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryRequest(backing)
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        self.assertRaises(errors.NoRepositoryPresent,
            request.execute, 'subdir')
1123
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.get_parent_map' request handler."""

    def test_trivial_bzipped(self):
        """An unknown revision id yields an empty, bz2-compressed body."""
        # This tests that the wire encoding is actually bzipped
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        tree = self.make_branch_and_memory_tree('.')

        # execute() only records the arguments; the answer comes from
        # do_body().
        self.assertEqual(None,
            request.execute('', 'missing-id'))
        # Note that it returns a body that is bzipped.
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
            request.do_body('\n\n0\n'))

    def test_trivial_include_missing(self):
        """With 'include-missing:' absent ids are listed in the body."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        tree = self.make_branch_and_memory_tree('.')

        self.assertEqual(None,
            request.execute('', 'missing-id', 'include-missing:'))
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ),
                bz2.compress('missing:missing-id')),
            request.do_body('\n\n0\n'))
1151
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.get_revision_graph' request handler."""

    # NOTE(review): the tree.lock_write() / tree.add('') / tree.unlock()
    # calls around the commits were restored from a damaged copy of this
    # file -- confirm against version control.

    def test_none_argument(self):
        """An empty revision id returns the graph of every revision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        # the lines of revision_id->revision_parent_list has no guaranteed
        # order coming out of a dict, so sort both our test and response
        lines = sorted([' '.join([r2, r1]), r1])
        response = request.execute('', '')
        response.body = '\n'.join(sorted(response.body.split('\n')))

        self.assertEqual(
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)

    def test_specific_revision_argument(self):
        """Asking for the first revision returns only that revision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc9'.encode('utf-8')
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
            request.execute('', rev_id_utf8))

    def test_no_such_revision(self):
        """An unknown revision id is answered with 'nosuchrevision'."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        tree.unlock()

        # Note that it still returns body (of zero bytes).
        self.assertEqual(
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
            request.execute('', 'missingrevision'))
1201
class TestSmartServerRepositoryGetRevIdForRevno(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.get_rev_id_for_revno' request handler."""

    # NOTE(review): the lock_write() / add('') / unlock() calls around the
    # commits were restored from a damaged copy of this file -- confirm
    # against version control.

    def test_revno_found(self):
        """A revno reachable from the known pair resolves to its revision id."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev1_id_utf8 = u'\xc8'.encode('utf-8')
        rev2_id_utf8 = u'\xc9'.encode('utf-8')
        tree.commit('1st commit', rev_id=rev1_id_utf8)
        tree.commit('2nd commit', rev_id=rev2_id_utf8)
        tree.unlock()

        self.assertEqual(SmartServerResponse(('ok', rev1_id_utf8)),
            request.execute('', 1, (2, rev2_id_utf8)))

    def test_known_revid_missing(self):
        """A known pair naming an absent revision fails with nosuchrevision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
        repo = self.make_repository('.')
        self.assertEqual(
            FailedSmartServerResponse(('nosuchrevision', 'ghost')),
            request.execute('', 1, (2, 'ghost')))

    def test_history_incomplete(self):
        """A stacked repository reports where its own history stops."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
        parent = self.make_branch_and_memory_tree('parent', format='1.9')
        parent.lock_write()
        parent.add([''], ['TREE_ROOT'])
        r1 = parent.commit(message='first commit')
        r2 = parent.commit(message='second commit')
        parent.unlock()
        local = self.make_branch_and_memory_tree('local', format='1.9')
        local.branch.pull(parent.branch)
        local.set_parent_ids([r2])
        r3 = local.commit(message='local commit')
        local.branch.create_clone_on_transport(
            self.get_transport('stacked'), stacked_on=self.get_url('parent'))
        self.assertEqual(
            SmartServerResponse(('history-incomplete', 2, r2)),
            request.execute('stacked', 1, (3, r3)))
1245
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.get_stream' request handler."""

    def make_two_commit_repo(self):
        """Build a repository holding two commits.

        :return: A (repository, first revision id, second revision id) tuple.
        """
        # NOTE(review): the lock_write() / add('') / unlock() calls were
        # restored from a damaged copy of this file -- confirm against
        # version control.
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        repo = tree.branch.repository
        return repo, r1, r2

    def test_ancestry_of(self):
        """The search argument may be a 'ancestry-of' some heads'."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['ancestry-of', r2]
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')

    def test_search(self):
        """The search argument may be a 'search' of some explicit keys."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1284
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.has_revision' request handler."""

    def test_missing_revision(self):
        """For a missing revision, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        self.make_repository('.')
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('', 'revid'))

    def test_present_revision(self):
        """For a present revision, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        # NOTE(review): the lock_write() / add('') / unlock() calls were
        # restored from a damaged copy of this file -- confirm against
        # version control.
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        r1 = tree.commit('a commit', rev_id=rev_id_utf8)
        tree.unlock()
        # Sanity-check the fixture before asking the request handler.
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute('', rev_id_utf8))
1309
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.gather_stats' request handler."""

    # NOTE(review): several continuation lines in this class were restored
    # from a damaged copy of this file: the tree lock/add/unlock calls, the
    # trailing commit() keyword arguments (including committer='foo'), the
    # 'revisions: 2\n' tail of each expected body, and the
    # request.execute('', rev_id_utf8, 'no') call in
    # test_revid_with_committers.  Confirm all of them against version
    # control.

    def test_empty_revid(self):
        """With an empty revid, we get only the number of revisions."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        repository = self.make_repository('.')
        stats = repository.gather_stats()
        expected_body = 'revisions: 0\n'
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('', '', 'no'))

    def test_revid_with_committers(self):
        """For a revid we get more infos."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=rev_id_utf8)
        tree.unlock()

        stats = tree.branch.repository.gather_stats()
        expected_body = ('firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'no'))

    def test_not_empty_repository_with_committers(self):
        """For a revid and requesting committers we get the whole thing."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=rev_id_utf8)
        tree.unlock()
        stats = tree.branch.repository.gather_stats()

        expected_body = ('committers: 2\n'
                         'firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'yes'))
1368
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.is_shared' request handler."""

    def test_is_shared(self):
        """For a shared repository, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=True)
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute('', ))

    def test_is_not_shared(self):
        """For an unshared repository, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=False)
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('', ))
1387
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.lock_write' request handler."""

    # NOTE(review): the repository.unlock() calls after
    # leave_lock_in_place() and at the end of the cleanup section were
    # restored from a damaged copy of this file -- confirm against version
    # control.

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repository returns its lock token."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
        # The repository is now locked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, new_repo.lock_write)
        # Cleanup
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        response = request.execute('', nonce)

    def test_lock_write_on_locked_repo(self):
        """Locking an already locked repository is answered with contention."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        repo_token = repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)
        # Cleanup
        repository.lock_write(repo_token)
        repository.dont_leave_lock_in_place()
        repository.unlock()

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', response.args[0])
1428
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
    """Shared helper for the insert_stream request tests."""

    def make_empty_byte_stream(self, repo):
        """Return the serialised form of an empty record stream.

        :param repo: Repository whose _format is used for the encoding.
        :return: The byte stream joined into a single string.
        """
        byte_stream = smart.repository._stream_to_byte_stream([], repo._format)
        return ''.join(byte_stream)
1435
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
    """Tests for the 'Repository.insert_stream' request handler."""

    def test_insert_stream_empty(self):
        """An empty stream is accepted and acknowledged with 'ok'."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStream(backing)
        repository = self.make_repository('.')
        # execute() and do_chunk() give no response; only do_end() answers.
        response = request.execute('', '')
        self.assertEqual(None, response)
        response = request.do_chunk(self.make_empty_byte_stream(repository))
        self.assertEqual(None, response)
        response = request.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), response)
1449
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
    """Tests for the 'Repository.insert_stream_locked' request handler."""

    # NOTE(review): the trailing repository.unlock() in each test was
    # restored from a damaged copy of this file -- confirm against version
    # control.

    def test_insert_stream_empty(self):
        """An empty stream sent with the right lock token is acknowledged."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        lock_token = repository.lock_write()
        response = request.execute('', '', lock_token)
        self.assertEqual(None, response)
        response = request.do_chunk(self.make_empty_byte_stream(repository))
        self.assertEqual(None, response)
        response = request.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), response)
        repository.unlock()

    def test_insert_stream_with_wrong_lock_token(self):
        """A token that does not match the held lock raises TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        lock_token = repository.lock_write()
        self.assertRaises(
            errors.TokenMismatch, request.execute, '', '', 'wrong-token')
        repository.unlock()
1476
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Repository.unlock' request handler."""

    # NOTE(review): repository.unlock() before the request and
    # new_repo.unlock() after the verification were restored from a damaged
    # copy of this file -- confirm against version control.

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_repo(self):
        """The matching token releases the repository lock."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        token = repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute('', token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The repository is now unlocked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        new_repo.lock_write()
        new_repo.unlock()

    def test_unlock_on_unlocked_repo(self):
        """Unlocking an unlocked repository is answered with TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('', 'some token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
1506
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
    """Tests for the 'Transport.is_readonly' request handler."""

    def test_is_readonly_no(self):
        """A writable backing transport answers ('no',)."""
        backing = self.get_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('no',)), response)

    def test_is_readonly_yes(self):
        """A read-only backing transport answers ('yes',)."""
        backing = self.get_readonly_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('yes',)), response)
1523
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
    """Tests for SmartServerRepositorySetMakeWorkingTrees."""

    def test_set_false(self):
        """Sending 'False' turns make_working_trees off."""
        backing = self.get_transport()
        repo = self.make_repository('.', shared=True)
        repo.set_make_working_trees(True)
        request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
        request = request_class(backing)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
            request.execute('', 'False'))
        # Re-open so the assertion reads the setting through a new object.
        repo = repo.bzrdir.open_repository()
        self.assertFalse(repo.make_working_trees())

    def test_set_true(self):
        """Sending 'True' turns make_working_trees on."""
        backing = self.get_transport()
        repo = self.make_repository('.', shared=True)
        repo.set_make_working_trees(False)
        request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
        request = request_class(backing)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
            request.execute('', 'True'))
        repo = repo.bzrdir.open_repository()
        self.assertTrue(repo.make_working_trees())
1548
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
    """Tests for the 'PackRepository.autopack' request handler."""

    # NOTE(review): the `for x in range(...)` loop headers, `return repo`,
    # and the repo.lock_write() calls paired with addCleanup(repo.unlock)
    # were restored from a damaged copy of this file -- confirm against
    # version control.

    def make_repo_needing_autopacking(self, path='.'):
        """Create a pack-0.92 repository holding ten separate packs.

        :param path: Where to create the branch and tree.
        :return: The repository, with the autopack override removed again.
        """
        # Make a repo in need of autopacking.
        tree = self.make_branch_and_tree(path, format='pack-0.92')
        repo = tree.branch.repository
        # monkey-patch the pack collection to disable autopacking
        repo._pack_collection._max_pack_count = lambda count: count
        for x in range(10):
            tree.commit('commit %s' % x)
        self.assertEqual(10, len(repo._pack_collection.names()))
        del repo._pack_collection._max_pack_count
        return repo

    def test_autopack_needed(self):
        """Ten packs are combined into one by the request."""
        repo = self.make_repo_needing_autopacking()
        repo.lock_write()
        self.addCleanup(repo.unlock)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(1, len(repo._pack_collection.names()))

    def test_autopack_not_needed(self):
        """Nine packs are left untouched by the request."""
        tree = self.make_branch_and_tree('.', format='pack-0.92')
        repo = tree.branch.repository
        repo.lock_write()
        self.addCleanup(repo.unlock)
        for x in range(9):
            tree.commit('commit %s' % x)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(9, len(repo._pack_collection.names()))

    def test_autopack_on_nonpack_format(self):
        """A request to autopack a non-pack repo is a no-op."""
        repo = self.make_repository('.', format='knit')
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
1599
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_all_registrations_exist(self):
        """All registered request_handlers can be found."""
        # If there's a typo in a register_lazy call, this loop will fail with
        # an AttributeError.
        for key, item in smart.request.request_handlers.iteritems():
            pass

    def assertHandlerEqual(self, verb, handler):
        """Assert that `verb` is registered to the `handler` class.

        :param verb: Request name as sent on the wire, e.g. 'Branch.unlock'.
        :param handler: The request class expected for that verb.
        """
        self.assertEqual(smart.request.request_handlers.get(verb), handler)

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        self.assertHandlerEqual('Branch.get_config_file',
            smart.branch.SmartServerBranchGetConfigFile)
        self.assertHandlerEqual('Branch.get_parent',
            smart.branch.SmartServerBranchGetParent)
        self.assertHandlerEqual('Branch.get_tags_bytes',
            smart.branch.SmartServerBranchGetTagsBytes)
        self.assertHandlerEqual('Branch.lock_write',
            smart.branch.SmartServerBranchRequestLockWrite)
        self.assertHandlerEqual('Branch.last_revision_info',
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
        self.assertHandlerEqual('Branch.revision_history',
            smart.branch.SmartServerRequestRevisionHistory)
        self.assertHandlerEqual('Branch.set_config_option',
            smart.branch.SmartServerBranchRequestSetConfigOption)
        self.assertHandlerEqual('Branch.set_last_revision',
            smart.branch.SmartServerBranchRequestSetLastRevision)
        self.assertHandlerEqual('Branch.set_last_revision_info',
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
        self.assertHandlerEqual('Branch.set_last_revision_ex',
            smart.branch.SmartServerBranchRequestSetLastRevisionEx)
        self.assertHandlerEqual('Branch.set_parent_location',
            smart.branch.SmartServerBranchRequestSetParentLocation)
        self.assertHandlerEqual('Branch.unlock',
            smart.branch.SmartServerBranchRequestUnlock)
        self.assertHandlerEqual('BzrDir.find_repository',
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
        self.assertHandlerEqual('BzrDir.find_repositoryV2',
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
        self.assertHandlerEqual('BzrDirFormat.initialize',
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
        self.assertHandlerEqual('BzrDirFormat.initialize_ex_1.16',
            smart.bzrdir.SmartServerRequestBzrDirInitializeEx)
        self.assertHandlerEqual('BzrDir.cloning_metadir',
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
        self.assertHandlerEqual('BzrDir.get_config_file',
            smart.bzrdir.SmartServerBzrDirRequestConfigFile)
        self.assertHandlerEqual('BzrDir.open_branch',
            smart.bzrdir.SmartServerRequestOpenBranch)
        self.assertHandlerEqual('BzrDir.open_branchV2',
            smart.bzrdir.SmartServerRequestOpenBranchV2)
        self.assertHandlerEqual('PackRepository.autopack',
            smart.packrepository.SmartServerPackRepositoryAutopack)
        self.assertHandlerEqual('Repository.gather_stats',
            smart.repository.SmartServerRepositoryGatherStats)
        self.assertHandlerEqual('Repository.get_parent_map',
            smart.repository.SmartServerRepositoryGetParentMap)
        self.assertHandlerEqual('Repository.get_rev_id_for_revno',
            smart.repository.SmartServerRepositoryGetRevIdForRevno)
        self.assertHandlerEqual('Repository.get_revision_graph',
            smart.repository.SmartServerRepositoryGetRevisionGraph)
        self.assertHandlerEqual('Repository.get_stream',
            smart.repository.SmartServerRepositoryGetStream)
        self.assertHandlerEqual('Repository.has_revision',
            smart.repository.SmartServerRequestHasRevision)
        self.assertHandlerEqual('Repository.insert_stream',
            smart.repository.SmartServerRepositoryInsertStream)
        self.assertHandlerEqual('Repository.insert_stream_locked',
            smart.repository.SmartServerRepositoryInsertStreamLocked)
        self.assertHandlerEqual('Repository.is_shared',
            smart.repository.SmartServerRepositoryIsShared)
        self.assertHandlerEqual('Repository.lock_write',
            smart.repository.SmartServerRepositoryLockWrite)
        self.assertHandlerEqual('Repository.tarball',
            smart.repository.SmartServerRepositoryTarball)
        self.assertHandlerEqual('Repository.unlock',
            smart.repository.SmartServerRepositoryUnlock)
        self.assertHandlerEqual('Transport.is_readonly',
            smart.request.SmartServerIsReadonly)