1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
109
return smart.server.SmartTCPServer_for_testing('-' + self.id())
111
def get_smart_medium(self):
112
"""Get a smart medium to use in tests."""
113
return self.get_transport().get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
119
self.assertEqual(SmartServerResponse(('ok', )),
120
SmartServerResponse(('ok', )))
121
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
122
SmartServerResponse(('ok', ), 'body'))
123
self.assertNotEqual(SmartServerResponse(('ok', )),
124
SmartServerResponse(('notok', )))
125
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
126
SmartServerResponse(('ok', )))
127
self.assertNotEqual(None,
128
SmartServerResponse(('ok', )))
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
163
tests.TestCaseWithMemoryTransport):
164
"""Tests for BzrDir.cloning_metadir."""
166
def test_cloning_metadir(self):
167
"""When there is a bzrdir present, the call succeeds."""
168
backing = self.get_transport()
169
dir = self.make_bzrdir('.')
170
local_result = dir.cloning_metadir()
171
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
172
request = request_class(backing)
173
expected = SuccessfulSmartServerResponse(
174
(local_result.network_name(),
175
local_result.repository_format.network_name(),
176
('branch', local_result.get_branch_format().network_name())))
177
self.assertEqual(expected, request.execute('', 'False'))
179
def test_cloning_metadir_reference(self):
180
"""The request fails when bzrdir contains a branch reference."""
181
backing = self.get_transport()
182
referenced_branch = self.make_branch('referenced')
183
dir = self.make_bzrdir('.')
184
local_result = dir.cloning_metadir()
185
reference = BranchReferenceFormat().initialize(dir, referenced_branch)
186
reference_url = BranchReferenceFormat().get_reference(dir)
187
# The server shouldn't try to follow the branch reference, so it's fine
188
# if the referenced branch isn't reachable.
189
backing.rename('referenced', 'moved')
190
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
191
request = request_class(backing)
192
expected = FailedSmartServerResponse(('BranchReference',))
193
self.assertEqual(expected, request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
197
"""Tests for BzrDir.create_repository."""
199
def test_makes_repository(self):
200
"""When there is a bzrdir present, the call succeeds."""
201
backing = self.get_transport()
202
self.make_bzrdir('.')
203
request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
204
request = request_class(backing)
205
reference_bzrdir_format = bzrdir.format_registry.get('default')()
206
reference_format = reference_bzrdir_format.repository_format
207
network_name = reference_format.network_name()
208
expected = SuccessfulSmartServerResponse(
209
('ok', 'no', 'no', 'no', network_name))
210
self.assertEqual(expected, request.execute('', network_name, 'True'))
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
225
# nonshared repositorys only allow 'find' to return a handle when the
226
# path the repository is being searched on is the same as that that
227
# the repository is at.
228
backing = self.get_transport()
229
request = self._request_class(backing)
230
result = self._make_repository_and_result()
231
self.assertEqual(result, request.execute(''))
232
self.make_bzrdir('subdir')
233
self.assertEqual(SmartServerResponse(('norepository', )),
234
request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
265
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
266
backing = self.get_transport()
267
request = self._request_class(backing)
268
result = self._make_repository_and_result(shared=True)
269
self.assertEqual(result, request.execute(''))
270
self.make_bzrdir('subdir')
271
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
272
self.assertEqual(result2,
273
request.execute('subdir'))
274
self.make_bzrdir('subdir/deeper')
275
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
276
self.assertEqual(result3,
277
request.execute('subdir/deeper'))
279
def test_rich_root_and_subtree_encoding(self):
280
"""Test for the format attributes for rich root and subtree support."""
281
backing = self.get_transport()
282
request = self._request_class(backing)
283
result = self._make_repository_and_result(format='dirstate-with-subtree')
284
# check the test will be valid
285
self.assertEqual('yes', result.args[2])
286
self.assertEqual('yes', result.args[3])
287
self.assertEqual(result, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
290
"""Test for the supports_external_lookups attribute."""
291
backing = self.get_transport()
292
request = self._request_class(backing)
293
result = self._make_repository_and_result(format='dirstate-with-subtree')
294
# check the test will be valid
295
self.assertEqual('no', result.args[4])
296
self.assertEqual(result, request.execute(''))
299
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
301
def test_empty_dir(self):
302
"""Initializing an empty dir should succeed and do it."""
303
backing = self.get_transport()
304
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
305
self.assertEqual(SmartServerResponse(('ok', )),
307
made_dir = bzrdir.BzrDir.open_from_transport(backing)
308
# no branch, tree or repository is expected with the current
310
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
311
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
312
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
314
def test_missing_dir(self):
315
"""Initializing a missing directory should fail like the bzrdir api."""
316
backing = self.get_transport()
317
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
318
self.assertRaises(errors.NoSuchFile,
319
request.execute, 'subdir')
321
def test_initialized_dir(self):
322
"""Initializing an extant bzrdir should fail like the bzrdir api."""
323
backing = self.get_transport()
324
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
325
self.make_bzrdir('subdir')
326
self.assertRaises(errors.FileExists,
327
request.execute, 'subdir')
330
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
332
def test_no_branch(self):
333
"""When there is no branch, ('nobranch', ) is returned."""
334
backing = self.get_transport()
335
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
336
self.make_bzrdir('.')
337
self.assertEqual(SmartServerResponse(('nobranch', )),
340
def test_branch(self):
341
"""When there is a branch, 'ok' is returned."""
342
backing = self.get_transport()
343
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
344
self.make_branch('.')
345
self.assertEqual(SmartServerResponse(('ok', '')),
348
def test_branch_reference(self):
349
"""When there is a branch reference, the reference URL is returned."""
350
backing = self.get_transport()
351
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
352
branch = self.make_branch('branch')
353
checkout = branch.create_checkout('reference',lightweight=True)
354
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
355
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
356
self.assertEqual(SmartServerResponse(('ok', reference_url)),
357
request.execute('reference'))
360
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
362
def test_no_branch(self):
363
"""When there is no branch, ('nobranch', ) is returned."""
364
backing = self.get_transport()
365
self.make_bzrdir('.')
366
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
367
self.assertEqual(SmartServerResponse(('nobranch', )),
370
def test_branch(self):
371
"""When there is a branch, 'ok' is returned."""
372
backing = self.get_transport()
373
expected = self.make_branch('.')._format.network_name()
374
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
375
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
378
def test_branch_reference(self):
379
"""When there is a branch reference, the reference URL is returned."""
380
backing = self.get_transport()
381
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
382
branch = self.make_branch('branch')
383
checkout = branch.create_checkout('reference',lightweight=True)
384
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
385
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
386
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
387
request.execute('reference'))
389
def test_stacked_branch(self):
390
"""Opening a stacked branch does not open the stacked-on branch."""
391
trunk = self.make_branch('trunk')
392
feature = self.make_branch('feature', format='1.9')
393
feature.set_stacked_on_url(trunk.base)
395
Branch.hooks.install_named_hook('open', opened_branches.append, None)
396
backing = self.get_transport()
397
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
400
response = request.execute('feature')
402
request.teardown_jail()
403
expected_format = feature._format.network_name()
405
SuccessfulSmartServerResponse(('branch', expected_format)),
407
self.assertLength(1, opened_branches)
410
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
412
def test_empty(self):
413
"""For an empty branch, the body is empty."""
414
backing = self.get_transport()
415
request = smart.branch.SmartServerRequestRevisionHistory(backing)
416
self.make_branch('.')
417
self.assertEqual(SmartServerResponse(('ok', ), ''),
420
def test_not_empty(self):
421
"""For a non-empty branch, the body is empty."""
422
backing = self.get_transport()
423
request = smart.branch.SmartServerRequestRevisionHistory(backing)
424
tree = self.make_branch_and_memory_tree('.')
427
r1 = tree.commit('1st commit')
428
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
431
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
435
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
437
def test_no_branch(self):
438
"""When there is a bzrdir and no branch, NotBranchError is raised."""
439
backing = self.get_transport()
440
request = smart.branch.SmartServerBranchRequest(backing)
441
self.make_bzrdir('.')
442
self.assertRaises(errors.NotBranchError,
445
def test_branch_reference(self):
446
"""When there is a branch reference, NotBranchError is raised."""
447
backing = self.get_transport()
448
request = smart.branch.SmartServerBranchRequest(backing)
449
branch = self.make_branch('branch')
450
checkout = branch.create_checkout('reference',lightweight=True)
451
self.assertRaises(errors.NotBranchError,
452
request.execute, 'checkout')
455
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
457
def test_empty(self):
458
"""For an empty branch, the result is ('ok', '0', 'null:')."""
459
backing = self.get_transport()
460
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
461
self.make_branch('.')
462
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
465
def test_not_empty(self):
466
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
467
backing = self.get_transport()
468
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
469
tree = self.make_branch_and_memory_tree('.')
472
rev_id_utf8 = u'\xc8'.encode('utf-8')
473
r1 = tree.commit('1st commit')
474
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
477
SmartServerResponse(('ok', '2', rev_id_utf8)),
481
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
483
def test_default(self):
484
"""With no file, we get empty content."""
485
backing = self.get_transport()
486
request = smart.branch.SmartServerBranchGetConfigFile(backing)
487
branch = self.make_branch('.')
488
# there should be no file by default
490
self.assertEqual(SmartServerResponse(('ok', ), content),
493
def test_with_content(self):
494
# SmartServerBranchGetConfigFile should return the content from
495
# branch.control_files.get('branch.conf') for now - in the future it may
496
# perform more complex processing.
497
backing = self.get_transport()
498
request = smart.branch.SmartServerBranchGetConfigFile(backing)
499
branch = self.make_branch('.')
500
branch._transport.put_bytes('branch.conf', 'foo bar baz')
501
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
505
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
507
def get_lock_tokens(self, branch):
508
branch_token = branch.lock_write()
509
repo_token = branch.repository.lock_write()
510
branch.repository.unlock()
511
return branch_token, repo_token
514
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
516
def test_value_name(self):
517
branch = self.make_branch('.')
518
request = smart.branch.SmartServerBranchRequestSetConfigOption(
519
branch.bzrdir.root_transport)
520
branch_token, repo_token = self.get_lock_tokens(branch)
521
config = branch._get_config()
522
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
524
self.assertEqual(SuccessfulSmartServerResponse(()), result)
525
self.assertEqual('bar', config.get_option('foo'))
527
def test_value_name_section(self):
528
branch = self.make_branch('.')
529
request = smart.branch.SmartServerBranchRequestSetConfigOption(
530
branch.bzrdir.root_transport)
531
branch_token, repo_token = self.get_lock_tokens(branch)
532
config = branch._get_config()
533
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
535
self.assertEqual(SuccessfulSmartServerResponse(()), result)
536
self.assertEqual('bar', config.get_option('foo', 'gam'))
539
class SetLastRevisionTestBase(TestLockedBranch):
540
"""Base test case for verbs that implement set_last_revision."""
543
tests.TestCaseWithMemoryTransport.setUp(self)
544
backing_transport = self.get_transport()
545
self.request = self.request_class(backing_transport)
546
self.tree = self.make_branch_and_memory_tree('.')
548
def lock_branch(self):
549
return self.get_lock_tokens(self.tree.branch)
551
def unlock_branch(self):
552
self.tree.branch.unlock()
554
def set_last_revision(self, revision_id, revno):
555
branch_token, repo_token = self.lock_branch()
556
response = self._set_last_revision(
557
revision_id, revno, branch_token, repo_token)
561
def assertRequestSucceeds(self, revision_id, revno):
562
response = self.set_last_revision(revision_id, revno)
563
self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
566
class TestSetLastRevisionVerbMixin(object):
567
"""Mixin test case for verbs that implement set_last_revision."""
569
def test_set_null_to_null(self):
570
"""An empty branch can have its last revision set to 'null:'."""
571
self.assertRequestSucceeds('null:', 0)
573
def test_NoSuchRevision(self):
574
"""If the revision_id is not present, the verb returns NoSuchRevision.
576
revision_id = 'non-existent revision'
578
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
579
self.set_last_revision(revision_id, 1))
581
def make_tree_with_two_commits(self):
582
self.tree.lock_write()
584
rev_id_utf8 = u'\xc8'.encode('utf-8')
585
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
586
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
589
def test_branch_last_revision_info_is_updated(self):
590
"""A branch's tip can be set to a revision that is present in its
593
# Make a branch with an empty revision history, but two revisions in
595
self.make_tree_with_two_commits()
596
rev_id_utf8 = u'\xc8'.encode('utf-8')
597
self.tree.branch.set_revision_history([])
599
(0, 'null:'), self.tree.branch.last_revision_info())
600
# We can update the branch to a revision that is present in the
602
self.assertRequestSucceeds(rev_id_utf8, 1)
604
(1, rev_id_utf8), self.tree.branch.last_revision_info())
606
def test_branch_last_revision_info_rewind(self):
607
"""A branch's tip can be set to a revision that is an ancestor of the
610
self.make_tree_with_two_commits()
611
rev_id_utf8 = u'\xc8'.encode('utf-8')
613
(2, 'rev-2'), self.tree.branch.last_revision_info())
614
self.assertRequestSucceeds(rev_id_utf8, 1)
616
(1, rev_id_utf8), self.tree.branch.last_revision_info())
618
def test_TipChangeRejected(self):
619
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
620
returns TipChangeRejected.
622
rejection_message = u'rejection message\N{INTERROBANG}'
623
def hook_that_rejects(params):
624
raise errors.TipChangeRejected(rejection_message)
625
Branch.hooks.install_named_hook(
626
'pre_change_branch_tip', hook_that_rejects, None)
628
FailedSmartServerResponse(
629
('TipChangeRejected', rejection_message.encode('utf-8'))),
630
self.set_last_revision('null:', 0))
633
class TestSmartServerBranchRequestSetLastRevision(
634
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
635
"""Tests for Branch.set_last_revision verb."""
637
request_class = smart.branch.SmartServerBranchRequestSetLastRevision
639
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
640
return self.request.execute(
641
'', branch_token, repo_token, revision_id)
644
class TestSmartServerBranchRequestSetLastRevisionInfo(
645
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
646
"""Tests for Branch.set_last_revision_info verb."""
648
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
650
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
651
return self.request.execute(
652
'', branch_token, repo_token, revno, revision_id)
654
def test_NoSuchRevision(self):
655
"""Branch.set_last_revision_info does not have to return
656
NoSuchRevision if the revision_id is absent.
658
raise tests.TestNotApplicable()
661
class TestSmartServerBranchRequestSetLastRevisionEx(
662
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
663
"""Tests for Branch.set_last_revision_ex verb."""
665
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
667
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
668
return self.request.execute(
669
'', branch_token, repo_token, revision_id, 0, 0)
671
def assertRequestSucceeds(self, revision_id, revno):
672
response = self.set_last_revision(revision_id, revno)
674
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
677
def test_branch_last_revision_info_rewind(self):
678
"""A branch's tip can be set to a revision that is an ancestor of the
679
current tip, but only if allow_overwrite_descendant is passed.
681
self.make_tree_with_two_commits()
682
rev_id_utf8 = u'\xc8'.encode('utf-8')
684
(2, 'rev-2'), self.tree.branch.last_revision_info())
685
# If allow_overwrite_descendant flag is 0, then trying to set the tip
686
# to an older revision ID has no effect.
687
branch_token, repo_token = self.lock_branch()
688
response = self.request.execute(
689
'', branch_token, repo_token, rev_id_utf8, 0, 0)
691
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
694
(2, 'rev-2'), self.tree.branch.last_revision_info())
696
# If allow_overwrite_descendant flag is 1, then setting the tip to an
698
response = self.request.execute(
699
'', branch_token, repo_token, rev_id_utf8, 0, 1)
701
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
705
(1, rev_id_utf8), self.tree.branch.last_revision_info())
707
def make_branch_with_divergent_history(self):
708
"""Make a branch with divergent history in its repo.
710
The branch's tip will be 'child-2', and the repo will also contain
711
'child-1', which diverges from a common base revision.
713
self.tree.lock_write()
715
r1 = self.tree.commit('1st commit')
716
revno_1, revid_1 = self.tree.branch.last_revision_info()
717
r2 = self.tree.commit('2nd commit', rev_id='child-1')
718
# Undo the second commit
719
self.tree.branch.set_last_revision_info(revno_1, revid_1)
720
self.tree.set_parent_ids([revid_1])
721
# Make a new second commit, child-2. child-2 has diverged from
723
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
726
def test_not_allow_diverged(self):
727
"""If allow_diverged is not passed, then setting a divergent history
728
returns a Diverged error.
730
self.make_branch_with_divergent_history()
732
FailedSmartServerResponse(('Diverged',)),
733
self.set_last_revision('child-1', 2))
734
# The branch tip was not changed.
735
self.assertEqual('child-2', self.tree.branch.last_revision())
737
def test_allow_diverged(self):
738
"""If allow_diverged is passed, then setting a divergent history
741
self.make_branch_with_divergent_history()
742
branch_token, repo_token = self.lock_branch()
743
response = self.request.execute(
744
'', branch_token, repo_token, 'child-1', 1, 0)
746
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
749
# The branch tip was changed.
750
self.assertEqual('child-1', self.tree.branch.last_revision())
753
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
755
def test_get_parent_none(self):
756
base_branch = self.make_branch('base')
757
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
758
response = request.execute('base')
760
SuccessfulSmartServerResponse(('',)), response)
762
def test_get_parent_something(self):
763
base_branch = self.make_branch('base')
764
base_branch.set_parent(self.get_url('foo'))
765
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
766
response = request.execute('base')
768
SuccessfulSmartServerResponse(("../foo",)),
772
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
773
# Only called when the branch format and tags match [yay factory
774
# methods] so only need to test straight forward cases.
776
def test_get_bytes(self):
777
base_branch = self.make_branch('base')
778
request = smart.branch.SmartServerBranchGetTagsBytes(
779
self.get_transport())
780
response = request.execute('base')
782
SuccessfulSmartServerResponse(('',)), response)
785
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
787
def test_get_stacked_on_url(self):
788
base_branch = self.make_branch('base', format='1.6')
789
stacked_branch = self.make_branch('stacked', format='1.6')
790
# typically should be relative
791
stacked_branch.set_stacked_on_url('../base')
792
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
793
self.get_transport())
794
response = request.execute('stacked')
796
SmartServerResponse(('ok', '../base')),
800
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
803
tests.TestCaseWithMemoryTransport.setUp(self)
805
def test_lock_write_on_unlocked_branch(self):
806
backing = self.get_transport()
807
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
808
branch = self.make_branch('.', format='knit')
809
repository = branch.repository
810
response = request.execute('')
811
branch_nonce = branch.control_files._lock.peek().get('nonce')
812
repository_nonce = repository.control_files._lock.peek().get('nonce')
814
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
816
# The branch (and associated repository) is now locked. Verify that
817
# with a new branch object.
818
new_branch = repository.bzrdir.open_branch()
819
self.assertRaises(errors.LockContention, new_branch.lock_write)
821
def test_lock_write_on_locked_branch(self):
822
backing = self.get_transport()
823
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
824
branch = self.make_branch('.')
826
branch.leave_lock_in_place()
828
response = request.execute('')
830
SmartServerResponse(('LockContention',)), response)
832
def test_lock_write_with_tokens_on_locked_branch(self):
833
backing = self.get_transport()
834
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
835
branch = self.make_branch('.', format='knit')
836
branch_token = branch.lock_write()
837
repo_token = branch.repository.lock_write()
838
branch.repository.unlock()
839
branch.leave_lock_in_place()
840
branch.repository.leave_lock_in_place()
842
response = request.execute('',
843
branch_token, repo_token)
845
SmartServerResponse(('ok', branch_token, repo_token)), response)
847
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
848
backing = self.get_transport()
849
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
850
branch = self.make_branch('.', format='knit')
851
branch_token = branch.lock_write()
852
repo_token = branch.repository.lock_write()
853
branch.repository.unlock()
854
branch.leave_lock_in_place()
855
branch.repository.leave_lock_in_place()
857
response = request.execute('',
858
branch_token+'xxx', repo_token)
860
SmartServerResponse(('TokenMismatch',)), response)
862
def test_lock_write_on_locked_repo(self):
863
backing = self.get_transport()
864
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
865
branch = self.make_branch('.', format='knit')
866
branch.repository.lock_write()
867
branch.repository.leave_lock_in_place()
868
branch.repository.unlock()
869
response = request.execute('')
871
SmartServerResponse(('LockContention',)), response)
873
def test_lock_write_on_readonly_transport(self):
874
backing = self.get_readonly_transport()
875
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
876
branch = self.make_branch('.')
877
root = self.get_transport().clone('/')
878
path = urlutils.relative_url(root.base, self.get_transport().base)
879
response = request.execute(path)
880
error_name, lock_str, why_str = response.args
881
self.assertFalse(response.is_successful())
882
self.assertEqual('LockFailed', error_name)
885
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
888
tests.TestCaseWithMemoryTransport.setUp(self)
890
def test_unlock_on_locked_branch_and_repo(self):
891
backing = self.get_transport()
892
request = smart.branch.SmartServerBranchRequestUnlock(backing)
893
branch = self.make_branch('.', format='knit')
895
branch_token = branch.lock_write()
896
repo_token = branch.repository.lock_write()
897
branch.repository.unlock()
898
# Unlock the branch (and repo) object, leaving the physical locks
900
branch.leave_lock_in_place()
901
branch.repository.leave_lock_in_place()
903
response = request.execute('',
904
branch_token, repo_token)
906
SmartServerResponse(('ok',)), response)
907
# The branch is now unlocked. Verify that with a new branch
909
new_branch = branch.bzrdir.open_branch()
910
new_branch.lock_write()
913
def test_unlock_on_unlocked_branch_unlocked_repo(self):
914
backing = self.get_transport()
915
request = smart.branch.SmartServerBranchRequestUnlock(backing)
916
branch = self.make_branch('.', format='knit')
917
response = request.execute(
918
'', 'branch token', 'repo token')
920
SmartServerResponse(('TokenMismatch',)), response)
922
def test_unlock_on_unlocked_branch_locked_repo(self):
923
backing = self.get_transport()
924
request = smart.branch.SmartServerBranchRequestUnlock(backing)
925
branch = self.make_branch('.', format='knit')
926
# Lock the repository.
927
repo_token = branch.repository.lock_write()
928
branch.repository.leave_lock_in_place()
929
branch.repository.unlock()
930
# Issue branch lock_write request on the unlocked branch (with locked
932
response = request.execute(
933
'', 'branch token', repo_token)
935
SmartServerResponse(('TokenMismatch',)), response)
938
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
940
def test_no_repository(self):
941
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
942
# we test this using a shared repository above the named path,
943
# thus checking the right search logic is used - that is, that
944
# its the exact path being looked at and the server is not
946
backing = self.get_transport()
947
request = smart.repository.SmartServerRepositoryRequest(backing)
948
self.make_repository('.', shared=True)
949
self.make_bzrdir('subdir')
950
self.assertRaises(errors.NoRepositoryPresent,
951
request.execute, 'subdir')
954
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
956
def test_trivial_bzipped(self):
957
# This tests that the wire encoding is actually bzipped
958
backing = self.get_transport()
959
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
960
tree = self.make_branch_and_memory_tree('.')
962
self.assertEqual(None,
963
request.execute('', 'missing-id'))
964
# Note that it returns a body that is bzipped.
966
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
967
request.do_body('\n\n0\n'))
969
def test_trivial_include_missing(self):
970
backing = self.get_transport()
971
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
972
tree = self.make_branch_and_memory_tree('.')
974
self.assertEqual(None,
975
request.execute('', 'missing-id', 'include-missing:'))
977
SuccessfulSmartServerResponse(('ok', ),
978
bz2.compress('missing:missing-id')),
979
request.do_body('\n\n0\n'))
982
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
984
def test_none_argument(self):
985
backing = self.get_transport()
986
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
987
tree = self.make_branch_and_memory_tree('.')
990
r1 = tree.commit('1st commit')
991
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
994
# the lines of revision_id->revision_parent_list has no guaranteed
995
# order coming out of a dict, so sort both our test and response
996
lines = sorted([' '.join([r2, r1]), r1])
997
response = request.execute('', '')
998
response.body = '\n'.join(sorted(response.body.split('\n')))
1001
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
1003
def test_specific_revision_argument(self):
1004
backing = self.get_transport()
1005
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1006
tree = self.make_branch_and_memory_tree('.')
1009
rev_id_utf8 = u'\xc9'.encode('utf-8')
1010
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
1011
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1014
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
1015
request.execute('', rev_id_utf8))
1017
def test_no_such_revision(self):
1018
backing = self.get_transport()
1019
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1020
tree = self.make_branch_and_memory_tree('.')
1023
r1 = tree.commit('1st commit')
1026
# Note that it still returns body (of zero bytes).
1028
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
1029
request.execute('', 'missingrevision'))
1032
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1034
def make_two_commit_repo(self):
1035
tree = self.make_branch_and_memory_tree('.')
1038
r1 = tree.commit('1st commit')
1039
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1041
repo = tree.branch.repository
1044
def test_ancestry_of(self):
1045
"""The search argument may be a 'ancestry-of' some heads'."""
1046
backing = self.get_transport()
1047
request = smart.repository.SmartServerRepositoryGetStream(backing)
1048
repo, r1, r2 = self.make_two_commit_repo()
1049
fetch_spec = ['ancestry-of', r2]
1050
lines = '\n'.join(fetch_spec)
1051
request.execute('', repo._format.network_name())
1052
response = request.do_body(lines)
1053
self.assertEqual(('ok',), response.args)
1054
stream_bytes = ''.join(response.body_stream)
1055
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1057
def test_search(self):
1058
"""The search argument may be a 'search' of some explicit keys."""
1059
backing = self.get_transport()
1060
request = smart.repository.SmartServerRepositoryGetStream(backing)
1061
repo, r1, r2 = self.make_two_commit_repo()
1062
fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
1063
lines = '\n'.join(fetch_spec)
1064
request.execute('', repo._format.network_name())
1065
response = request.do_body(lines)
1066
self.assertEqual(('ok',), response.args)
1067
stream_bytes = ''.join(response.body_stream)
1068
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1071
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
1073
def test_missing_revision(self):
1074
"""For a missing revision, ('no', ) is returned."""
1075
backing = self.get_transport()
1076
request = smart.repository.SmartServerRequestHasRevision(backing)
1077
self.make_repository('.')
1078
self.assertEqual(SmartServerResponse(('no', )),
1079
request.execute('', 'revid'))
1081
def test_present_revision(self):
1082
"""For a present revision, ('yes', ) is returned."""
1083
backing = self.get_transport()
1084
request = smart.repository.SmartServerRequestHasRevision(backing)
1085
tree = self.make_branch_and_memory_tree('.')
1088
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1089
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
1091
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
1092
self.assertEqual(SmartServerResponse(('yes', )),
1093
request.execute('', rev_id_utf8))
1096
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
1098
def test_empty_revid(self):
1099
"""With an empty revid, we get only size an number and revisions"""
1100
backing = self.get_transport()
1101
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1102
repository = self.make_repository('.')
1103
stats = repository.gather_stats()
1104
expected_body = 'revisions: 0\n'
1105
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1106
request.execute('', '', 'no'))
1108
def test_revid_with_committers(self):
1109
"""For a revid we get more infos."""
1110
backing = self.get_transport()
1111
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1112
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1113
tree = self.make_branch_and_memory_tree('.')
1116
# Let's build a predictable result
1117
tree.commit('a commit', timestamp=123456.2, timezone=3600)
1118
tree.commit('a commit', timestamp=654321.4, timezone=0,
1122
stats = tree.branch.repository.gather_stats()
1123
expected_body = ('firstrev: 123456.200 3600\n'
1124
'latestrev: 654321.400 0\n'
1126
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1130
def test_not_empty_repository_with_committers(self):
1131
"""For a revid and requesting committers we get the whole thing."""
1132
backing = self.get_transport()
1133
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1134
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1135
tree = self.make_branch_and_memory_tree('.')
1138
# Let's build a predictable result
1139
tree.commit('a commit', timestamp=123456.2, timezone=3600,
1141
tree.commit('a commit', timestamp=654321.4, timezone=0,
1142
committer='bar', rev_id=rev_id_utf8)
1144
stats = tree.branch.repository.gather_stats()
1146
expected_body = ('committers: 2\n'
1147
'firstrev: 123456.200 3600\n'
1148
'latestrev: 654321.400 0\n'
1150
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1152
rev_id_utf8, 'yes'))
1155
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
1157
def test_is_shared(self):
1158
"""For a shared repository, ('yes', ) is returned."""
1159
backing = self.get_transport()
1160
request = smart.repository.SmartServerRepositoryIsShared(backing)
1161
self.make_repository('.', shared=True)
1162
self.assertEqual(SmartServerResponse(('yes', )),
1163
request.execute('', ))
1165
def test_is_not_shared(self):
1166
"""For a shared repository, ('no', ) is returned."""
1167
backing = self.get_transport()
1168
request = smart.repository.SmartServerRepositoryIsShared(backing)
1169
self.make_repository('.', shared=False)
1170
self.assertEqual(SmartServerResponse(('no', )),
1171
request.execute('', ))
1174
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
1176
def test_lock_write_on_unlocked_repo(self):
1177
backing = self.get_transport()
1178
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1179
repository = self.make_repository('.', format='knit')
1180
response = request.execute('')
1181
nonce = repository.control_files._lock.peek().get('nonce')
1182
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
1183
# The repository is now locked. Verify that with a new repository
1185
new_repo = repository.bzrdir.open_repository()
1186
self.assertRaises(errors.LockContention, new_repo.lock_write)
1188
def test_lock_write_on_locked_repo(self):
1189
backing = self.get_transport()
1190
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1191
repository = self.make_repository('.', format='knit')
1192
repository.lock_write()
1193
repository.leave_lock_in_place()
1195
response = request.execute('')
1197
SmartServerResponse(('LockContention',)), response)
1199
def test_lock_write_on_readonly_transport(self):
1200
backing = self.get_readonly_transport()
1201
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1202
repository = self.make_repository('.', format='knit')
1203
response = request.execute('')
1204
self.assertFalse(response.is_successful())
1205
self.assertEqual('LockFailed', response.args[0])
1208
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
1210
def make_empty_byte_stream(self, repo):
1211
byte_stream = smart.repository._stream_to_byte_stream([], repo._format)
1212
return ''.join(byte_stream)
1215
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
1217
def test_insert_stream_empty(self):
1218
backing = self.get_transport()
1219
request = smart.repository.SmartServerRepositoryInsertStream(backing)
1220
repository = self.make_repository('.')
1221
response = request.execute('', '')
1222
self.assertEqual(None, response)
1223
response = request.do_chunk(self.make_empty_byte_stream(repository))
1224
self.assertEqual(None, response)
1225
response = request.do_end()
1226
self.assertEqual(SmartServerResponse(('ok', )), response)
1229
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
1231
def test_insert_stream_empty(self):
1232
backing = self.get_transport()
1233
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1235
repository = self.make_repository('.', format='knit')
1236
lock_token = repository.lock_write()
1237
response = request.execute('', '', lock_token)
1238
self.assertEqual(None, response)
1239
response = request.do_chunk(self.make_empty_byte_stream(repository))
1240
self.assertEqual(None, response)
1241
response = request.do_end()
1242
self.assertEqual(SmartServerResponse(('ok', )), response)
1245
def test_insert_stream_with_wrong_lock_token(self):
1246
backing = self.get_transport()
1247
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1249
repository = self.make_repository('.', format='knit')
1250
lock_token = repository.lock_write()
1252
errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1256
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
1259
tests.TestCaseWithMemoryTransport.setUp(self)
1261
def test_unlock_on_locked_repo(self):
1262
backing = self.get_transport()
1263
request = smart.repository.SmartServerRepositoryUnlock(backing)
1264
repository = self.make_repository('.', format='knit')
1265
token = repository.lock_write()
1266
repository.leave_lock_in_place()
1268
response = request.execute('', token)
1270
SmartServerResponse(('ok',)), response)
1271
# The repository is now unlocked. Verify that with a new repository
1273
new_repo = repository.bzrdir.open_repository()
1274
new_repo.lock_write()
1277
def test_unlock_on_unlocked_repo(self):
1278
backing = self.get_transport()
1279
request = smart.repository.SmartServerRepositoryUnlock(backing)
1280
repository = self.make_repository('.', format='knit')
1281
response = request.execute('', 'some token')
1283
SmartServerResponse(('TokenMismatch',)), response)
1286
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
1288
def test_is_readonly_no(self):
1289
backing = self.get_transport()
1290
request = smart.request.SmartServerIsReadonly(backing)
1291
response = request.execute()
1293
SmartServerResponse(('no',)), response)
1295
def test_is_readonly_yes(self):
1296
backing = self.get_readonly_transport()
1297
request = smart.request.SmartServerIsReadonly(backing)
1298
response = request.execute()
1300
SmartServerResponse(('yes',)), response)
1303
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
1305
def test_set_false(self):
1306
backing = self.get_transport()
1307
repo = self.make_repository('.', shared=True)
1308
repo.set_make_working_trees(True)
1309
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1310
request = request_class(backing)
1311
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1312
request.execute('', 'False'))
1313
repo = repo.bzrdir.open_repository()
1314
self.assertFalse(repo.make_working_trees())
1316
def test_set_true(self):
1317
backing = self.get_transport()
1318
repo = self.make_repository('.', shared=True)
1319
repo.set_make_working_trees(False)
1320
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1321
request = request_class(backing)
1322
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1323
request.execute('', 'True'))
1324
repo = repo.bzrdir.open_repository()
1325
self.assertTrue(repo.make_working_trees())
1328
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
1330
def make_repo_needing_autopacking(self, path='.'):
1331
# Make a repo in need of autopacking.
1332
tree = self.make_branch_and_tree('.', format='pack-0.92')
1333
repo = tree.branch.repository
1334
# monkey-patch the pack collection to disable autopacking
1335
repo._pack_collection._max_pack_count = lambda count: count
1337
tree.commit('commit %s' % x)
1338
self.assertEqual(10, len(repo._pack_collection.names()))
1339
del repo._pack_collection._max_pack_count
1342
def test_autopack_needed(self):
1343
repo = self.make_repo_needing_autopacking()
1345
self.addCleanup(repo.unlock)
1346
backing = self.get_transport()
1347
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1349
response = request.execute('')
1350
self.assertEqual(SmartServerResponse(('ok',)), response)
1351
repo._pack_collection.reload_pack_names()
1352
self.assertEqual(1, len(repo._pack_collection.names()))
1354
def test_autopack_not_needed(self):
1355
tree = self.make_branch_and_tree('.', format='pack-0.92')
1356
repo = tree.branch.repository
1358
self.addCleanup(repo.unlock)
1360
tree.commit('commit %s' % x)
1361
backing = self.get_transport()
1362
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1364
response = request.execute('')
1365
self.assertEqual(SmartServerResponse(('ok',)), response)
1366
repo._pack_collection.reload_pack_names()
1367
self.assertEqual(9, len(repo._pack_collection.names()))
1369
def test_autopack_on_nonpack_format(self):
1370
"""A request to autopack a non-pack repo is a no-op."""
1371
repo = self.make_repository('.', format='knit')
1372
backing = self.get_transport()
1373
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1375
response = request.execute('')
1376
self.assertEqual(SmartServerResponse(('ok',)), response)
1379
class TestHandlers(tests.TestCase):
1380
"""Tests for the request.request_handlers object."""
1382
def test_all_registrations_exist(self):
1383
"""All registered request_handlers can be found."""
1384
# If there's a typo in a register_lazy call, this loop will fail with
1385
# an AttributeError.
1386
for key, item in smart.request.request_handlers.iteritems():
1389
def test_registered_methods(self):
1390
"""Test that known methods are registered to the correct object."""
1392
smart.request.request_handlers.get('Branch.get_config_file'),
1393
smart.branch.SmartServerBranchGetConfigFile)
1395
smart.request.request_handlers.get('Branch.get_parent'),
1396
smart.branch.SmartServerBranchGetParent)
1398
smart.request.request_handlers.get('Branch.get_tags_bytes'),
1399
smart.branch.SmartServerBranchGetTagsBytes)
1401
smart.request.request_handlers.get('Branch.lock_write'),
1402
smart.branch.SmartServerBranchRequestLockWrite)
1404
smart.request.request_handlers.get('Branch.last_revision_info'),
1405
smart.branch.SmartServerBranchRequestLastRevisionInfo)
1407
smart.request.request_handlers.get('Branch.revision_history'),
1408
smart.branch.SmartServerRequestRevisionHistory)
1410
smart.request.request_handlers.get('Branch.set_config_option'),
1411
smart.branch.SmartServerBranchRequestSetConfigOption)
1413
smart.request.request_handlers.get('Branch.set_last_revision'),
1414
smart.branch.SmartServerBranchRequestSetLastRevision)
1416
smart.request.request_handlers.get('Branch.set_last_revision_info'),
1417
smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
1419
smart.request.request_handlers.get('Branch.unlock'),
1420
smart.branch.SmartServerBranchRequestUnlock)
1422
smart.request.request_handlers.get('BzrDir.find_repository'),
1423
smart.bzrdir.SmartServerRequestFindRepositoryV1)
1425
smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
1426
smart.bzrdir.SmartServerRequestFindRepositoryV2)
1428
smart.request.request_handlers.get('BzrDirFormat.initialize'),
1429
smart.bzrdir.SmartServerRequestInitializeBzrDir)
1431
smart.request.request_handlers.get('BzrDir.cloning_metadir'),
1432
smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
1434
smart.request.request_handlers.get('BzrDir.open_branch'),
1435
smart.bzrdir.SmartServerRequestOpenBranch)
1437
smart.request.request_handlers.get('BzrDir.open_branchV2'),
1438
smart.bzrdir.SmartServerRequestOpenBranchV2)
1440
smart.request.request_handlers.get('PackRepository.autopack'),
1441
smart.packrepository.SmartServerPackRepositoryAutopack)
1443
smart.request.request_handlers.get('Repository.gather_stats'),
1444
smart.repository.SmartServerRepositoryGatherStats)
1446
smart.request.request_handlers.get('Repository.get_parent_map'),
1447
smart.repository.SmartServerRepositoryGetParentMap)
1449
smart.request.request_handlers.get(
1450
'Repository.get_revision_graph'),
1451
smart.repository.SmartServerRepositoryGetRevisionGraph)
1453
smart.request.request_handlers.get('Repository.get_stream'),
1454
smart.repository.SmartServerRepositoryGetStream)
1456
smart.request.request_handlers.get('Repository.has_revision'),
1457
smart.repository.SmartServerRequestHasRevision)
1459
smart.request.request_handlers.get('Repository.insert_stream'),
1460
smart.repository.SmartServerRepositoryInsertStream)
1462
smart.request.request_handlers.get('Repository.insert_stream_locked'),
1463
smart.repository.SmartServerRepositoryInsertStreamLocked)
1465
smart.request.request_handlers.get('Repository.is_shared'),
1466
smart.repository.SmartServerRepositoryIsShared)
1468
smart.request.request_handlers.get('Repository.lock_write'),
1469
smart.repository.SmartServerRepositoryLockWrite)
1471
smart.request.request_handlers.get('Repository.tarball'),
1472
smart.repository.SmartServerRepositoryTarball)
1474
smart.request.request_handlers.get('Repository.unlock'),
1475
smart.repository.SmartServerRepositoryUnlock)
1477
smart.request.request_handlers.get('Transport.is_readonly'),
1478
smart.request.SmartServerIsReadonly)