1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
41
from bzrlib.branch import Branch, BranchReferenceFormat
42
import bzrlib.smart.branch
43
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
44
import bzrlib.smart.packrepository
45
import bzrlib.smart.repository
46
import bzrlib.smart.vfs
47
from bzrlib.smart.request import (
48
FailedSmartServerResponse,
51
SuccessfulSmartServerResponse,
53
from bzrlib.tests import (
56
from bzrlib.transport import chroot, get_transport, local, memory
59
def load_tests(standard_tests, module, loader):
60
"""Multiply tests version and protocol consistency."""
61
# FindRepository tests.
62
bzrdir_mod = bzrlib.smart.bzrdir
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
66
("find_repositoryV2", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
68
("find_repositoryV3", {
69
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
71
to_adapt, result = split_suite_by_re(standard_tests,
72
"TestSmartServerRequestFindRepository")
73
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
75
tests.multiply_tests(v1_and_2, scenarios, result)
76
# The first scenario is only applicable to v1 protocols, it is deleted
78
tests.multiply_tests(v2_only, scenarios[1:], result)
82
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
85
self.vfs_transport_factory = memory.MemoryServer
86
tests.TestCaseWithTransport.setUp(self)
87
self._chroot_server = None
89
def get_transport(self, relpath=None):
90
if self._chroot_server is None:
91
backing_transport = tests.TestCaseWithTransport.get_transport(self)
92
self._chroot_server = chroot.ChrootServer(backing_transport)
93
self.start_server(self._chroot_server)
94
t = get_transport(self._chroot_server.get_url())
95
if relpath is not None:
100
class TestCaseWithSmartMedium(tests.TestCaseWithMemoryTransport):
103
super(TestCaseWithSmartMedium, self).setUp()
104
# We're allowed to set the transport class here, so that we don't use
105
# the default or a parameterized class, but rather use the
106
# TestCaseWithTransport infrastructure to set up a smart server and
108
self.transport_server = self.make_transport_server
110
def make_transport_server(self):
111
return smart.server.SmartTCPServer_for_testing('-' + self.id())
113
def get_smart_medium(self):
114
"""Get a smart medium to use in tests."""
115
return self.get_transport().get_smart_medium()
118
class TestByteStreamToStream(tests.TestCase):
120
def test_repeated_substreams_same_kind_are_one_stream(self):
121
# Make a stream - an iterable of bytestrings.
122
stream = [('text', [versionedfile.FulltextContentFactory(('k1',), None,
123
None, 'foo')]),('text', [
124
versionedfile.FulltextContentFactory(('k2',), None, None, 'bar')])]
125
fmt = bzrdir.format_registry.get('pack-0.92')().repository_format
126
bytes = smart.repository._stream_to_byte_stream(stream, fmt)
128
# Iterate the resulting iterable; checking that we get only one stream
130
fmt, stream = smart.repository._byte_stream_to_stream(bytes)
131
for kind, substream in stream:
132
streams.append((kind, list(substream)))
133
self.assertLength(1, streams)
134
self.assertLength(2, streams[0][1])
137
class TestSmartServerResponse(tests.TestCase):
139
def test__eq__(self):
140
self.assertEqual(SmartServerResponse(('ok', )),
141
SmartServerResponse(('ok', )))
142
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
143
SmartServerResponse(('ok', ), 'body'))
144
self.assertNotEqual(SmartServerResponse(('ok', )),
145
SmartServerResponse(('notok', )))
146
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
147
SmartServerResponse(('ok', )))
148
self.assertNotEqual(None,
149
SmartServerResponse(('ok', )))
151
def test__str__(self):
152
"""SmartServerResponses can be stringified."""
154
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
155
str(SuccessfulSmartServerResponse(('args',), 'body')))
157
"<FailedSmartServerResponse args=('args',) body='body'>",
158
str(FailedSmartServerResponse(('args',), 'body')))
161
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
163
def test_translate_client_path(self):
164
transport = self.get_transport()
165
request = SmartServerRequest(transport, 'foo/')
166
self.assertEqual('./', request.translate_client_path('foo/'))
168
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
170
errors.PathNotChild, request.translate_client_path, '/')
172
errors.PathNotChild, request.translate_client_path, 'bar/')
173
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
174
e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
175
self.assertEqual('./' + urlutils.escape(e_acute),
176
request.translate_client_path('foo/' + e_acute))
178
def test_translate_client_path_vfs(self):
179
"""VfsRequests receive escaped paths rather than raw UTF-8."""
180
transport = self.get_transport()
181
request = smart.vfs.VfsRequest(transport, 'foo/')
182
e_acute = u'\N{LATIN SMALL LETTER E WITH ACUTE}'.encode('utf-8')
183
escaped = urlutils.escape('foo/' + e_acute)
184
self.assertEqual('./' + urlutils.escape(e_acute),
185
request.translate_client_path(escaped))
187
def test_transport_from_client_path(self):
188
transport = self.get_transport()
189
request = SmartServerRequest(transport, 'foo/')
192
request.transport_from_client_path('foo/').base)
195
class TestSmartServerBzrDirRequestCloningMetaDir(
196
tests.TestCaseWithMemoryTransport):
197
"""Tests for BzrDir.cloning_metadir."""
199
def test_cloning_metadir(self):
200
"""When there is a bzrdir present, the call succeeds."""
201
backing = self.get_transport()
202
dir = self.make_bzrdir('.')
203
local_result = dir.cloning_metadir()
204
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
205
request = request_class(backing)
206
expected = SuccessfulSmartServerResponse(
207
(local_result.network_name(),
208
local_result.repository_format.network_name(),
209
('branch', local_result.get_branch_format().network_name())))
210
self.assertEqual(expected, request.execute('', 'False'))
212
def test_cloning_metadir_reference(self):
213
"""The request fails when bzrdir contains a branch reference."""
214
backing = self.get_transport()
215
referenced_branch = self.make_branch('referenced')
216
dir = self.make_bzrdir('.')
217
local_result = dir.cloning_metadir()
218
reference = BranchReferenceFormat().initialize(dir, referenced_branch)
219
reference_url = BranchReferenceFormat().get_reference(dir)
220
# The server shouldn't try to follow the branch reference, so it's fine
221
# if the referenced branch isn't reachable.
222
backing.rename('referenced', 'moved')
223
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
224
request = request_class(backing)
225
expected = FailedSmartServerResponse(('BranchReference',))
226
self.assertEqual(expected, request.execute('', 'False'))
229
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
230
"""Tests for BzrDir.create_repository."""
232
def test_makes_repository(self):
233
"""When there is a bzrdir present, the call succeeds."""
234
backing = self.get_transport()
235
self.make_bzrdir('.')
236
request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
237
request = request_class(backing)
238
reference_bzrdir_format = bzrdir.format_registry.get('pack-0.92')()
239
reference_format = reference_bzrdir_format.repository_format
240
network_name = reference_format.network_name()
241
expected = SuccessfulSmartServerResponse(
242
('ok', 'no', 'no', 'no', network_name))
243
self.assertEqual(expected, request.execute('', network_name, 'True'))
246
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
247
"""Tests for BzrDir.find_repository."""
249
def test_no_repository(self):
250
"""When there is no repository to be found, ('norepository', ) is returned."""
251
backing = self.get_transport()
252
request = self._request_class(backing)
253
self.make_bzrdir('.')
254
self.assertEqual(SmartServerResponse(('norepository', )),
257
def test_nonshared_repository(self):
258
# nonshared repositorys only allow 'find' to return a handle when the
259
# path the repository is being searched on is the same as that that
260
# the repository is at.
261
backing = self.get_transport()
262
request = self._request_class(backing)
263
result = self._make_repository_and_result()
264
self.assertEqual(result, request.execute(''))
265
self.make_bzrdir('subdir')
266
self.assertEqual(SmartServerResponse(('norepository', )),
267
request.execute('subdir'))
269
def _make_repository_and_result(self, shared=False, format=None):
270
"""Convenience function to setup a repository.
272
:result: The SmartServerResponse to expect when opening it.
274
repo = self.make_repository('.', shared=shared, format=format)
275
if repo.supports_rich_root():
279
if repo._format.supports_tree_reference:
283
if repo._format.supports_external_lookups:
287
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
288
self._request_class):
289
return SuccessfulSmartServerResponse(
290
('ok', '', rich_root, subtrees, external,
291
repo._format.network_name()))
292
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
293
self._request_class):
294
# All tests so far are on formats, and for non-external
296
return SuccessfulSmartServerResponse(
297
('ok', '', rich_root, subtrees, external))
299
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
301
def test_shared_repository(self):
302
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
303
backing = self.get_transport()
304
request = self._request_class(backing)
305
result = self._make_repository_and_result(shared=True)
306
self.assertEqual(result, request.execute(''))
307
self.make_bzrdir('subdir')
308
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
309
self.assertEqual(result2,
310
request.execute('subdir'))
311
self.make_bzrdir('subdir/deeper')
312
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
313
self.assertEqual(result3,
314
request.execute('subdir/deeper'))
316
def test_rich_root_and_subtree_encoding(self):
317
"""Test for the format attributes for rich root and subtree support."""
318
backing = self.get_transport()
319
request = self._request_class(backing)
320
result = self._make_repository_and_result(format='dirstate-with-subtree')
321
# check the test will be valid
322
self.assertEqual('yes', result.args[2])
323
self.assertEqual('yes', result.args[3])
324
self.assertEqual(result, request.execute(''))
326
def test_supports_external_lookups_no_v2(self):
327
"""Test for the supports_external_lookups attribute."""
328
backing = self.get_transport()
329
request = self._request_class(backing)
330
result = self._make_repository_and_result(format='dirstate-with-subtree')
331
# check the test will be valid
332
self.assertEqual('no', result.args[4])
333
self.assertEqual(result, request.execute(''))
336
class TestSmartServerBzrDirRequestGetConfigFile(
337
tests.TestCaseWithMemoryTransport):
338
"""Tests for BzrDir.get_config_file."""
340
def test_present(self):
341
backing = self.get_transport()
342
dir = self.make_bzrdir('.')
343
dir.get_config().set_default_stack_on("/")
344
local_result = dir._get_config()._get_config_file().read()
345
request_class = smart_dir.SmartServerBzrDirRequestConfigFile
346
request = request_class(backing)
347
expected = SuccessfulSmartServerResponse((), local_result)
348
self.assertEqual(expected, request.execute(''))
350
def test_missing(self):
351
backing = self.get_transport()
352
dir = self.make_bzrdir('.')
353
request_class = smart_dir.SmartServerBzrDirRequestConfigFile
354
request = request_class(backing)
355
expected = SuccessfulSmartServerResponse((), '')
356
self.assertEqual(expected, request.execute(''))
359
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
361
def test_empty_dir(self):
362
"""Initializing an empty dir should succeed and do it."""
363
backing = self.get_transport()
364
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
365
self.assertEqual(SmartServerResponse(('ok', )),
367
made_dir = bzrdir.BzrDir.open_from_transport(backing)
368
# no branch, tree or repository is expected with the current
370
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
371
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
372
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
374
def test_missing_dir(self):
375
"""Initializing a missing directory should fail like the bzrdir api."""
376
backing = self.get_transport()
377
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
378
self.assertRaises(errors.NoSuchFile,
379
request.execute, 'subdir')
381
def test_initialized_dir(self):
382
"""Initializing an extant bzrdir should fail like the bzrdir api."""
383
backing = self.get_transport()
384
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
385
self.make_bzrdir('subdir')
386
self.assertRaises(errors.FileExists,
387
request.execute, 'subdir')
390
class TestSmartServerRequestBzrDirInitializeEx(tests.TestCaseWithMemoryTransport):
391
"""Basic tests for BzrDir.initialize_ex_1.16 in the smart server.
393
The main unit tests in test_bzrdir exercise the API comprehensively.
396
def test_empty_dir(self):
397
"""Initializing an empty dir should succeed and do it."""
398
backing = self.get_transport()
399
name = self.make_bzrdir('reference')._format.network_name()
400
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
401
self.assertEqual(SmartServerResponse(('', '', '', '', '', '', name,
402
'False', '', '', '')),
403
request.execute(name, '', 'True', 'False', 'False', '', '', '', '',
405
made_dir = bzrdir.BzrDir.open_from_transport(backing)
406
# no branch, tree or repository is expected with the current
408
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
409
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
410
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
412
def test_missing_dir(self):
413
"""Initializing a missing directory should fail like the bzrdir api."""
414
backing = self.get_transport()
415
name = self.make_bzrdir('reference')._format.network_name()
416
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
417
self.assertRaises(errors.NoSuchFile, request.execute, name,
418
'subdir/dir', 'False', 'False', 'False', '', '', '', '', 'False')
420
def test_initialized_dir(self):
421
"""Initializing an extant directory should fail like the bzrdir api."""
422
backing = self.get_transport()
423
name = self.make_bzrdir('reference')._format.network_name()
424
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
425
self.make_bzrdir('subdir')
426
self.assertRaises(errors.FileExists, request.execute, name, 'subdir',
427
'False', 'False', 'False', '', '', '', '', 'False')
430
class TestSmartServerRequestOpenBzrDir(tests.TestCaseWithMemoryTransport):
432
def test_no_directory(self):
433
backing = self.get_transport()
434
request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
435
self.assertEqual(SmartServerResponse(('no', )),
436
request.execute('does-not-exist'))
438
def test_empty_directory(self):
439
backing = self.get_transport()
440
backing.mkdir('empty')
441
request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing)
442
self.assertEqual(SmartServerResponse(('no', )),
443
request.execute('empty'))
445
def test_outside_root_client_path(self):
446
backing = self.get_transport()
447
request = smart.bzrdir.SmartServerRequestOpenBzrDir(backing,
448
root_client_path='root')
449
self.assertEqual(SmartServerResponse(('no', )),
450
request.execute('not-root'))
453
class TestSmartServerRequestOpenBzrDir_2_1(tests.TestCaseWithMemoryTransport):
455
def test_no_directory(self):
456
backing = self.get_transport()
457
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
458
self.assertEqual(SmartServerResponse(('no', )),
459
request.execute('does-not-exist'))
461
def test_empty_directory(self):
462
backing = self.get_transport()
463
backing.mkdir('empty')
464
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
465
self.assertEqual(SmartServerResponse(('no', )),
466
request.execute('empty'))
468
def test_present_without_workingtree(self):
469
backing = self.get_transport()
470
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
471
self.make_bzrdir('.')
472
self.assertEqual(SmartServerResponse(('yes', 'no')),
475
def test_outside_root_client_path(self):
476
backing = self.get_transport()
477
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing,
478
root_client_path='root')
479
self.assertEqual(SmartServerResponse(('no',)),
480
request.execute('not-root'))
483
class TestSmartServerRequestOpenBzrDir_2_1_disk(TestCaseWithChrootedTransport):
485
def test_present_with_workingtree(self):
486
self.vfs_transport_factory = local.LocalURLServer
487
backing = self.get_transport()
488
request = smart.bzrdir.SmartServerRequestOpenBzrDir_2_1(backing)
489
bd = self.make_bzrdir('.')
490
bd.create_repository()
492
bd.create_workingtree()
493
self.assertEqual(SmartServerResponse(('yes', 'yes')),
497
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
499
def test_no_branch(self):
500
"""When there is no branch, ('nobranch', ) is returned."""
501
backing = self.get_transport()
502
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
503
self.make_bzrdir('.')
504
self.assertEqual(SmartServerResponse(('nobranch', )),
507
def test_branch(self):
508
"""When there is a branch, 'ok' is returned."""
509
backing = self.get_transport()
510
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
511
self.make_branch('.')
512
self.assertEqual(SmartServerResponse(('ok', '')),
515
def test_branch_reference(self):
516
"""When there is a branch reference, the reference URL is returned."""
517
self.vfs_transport_factory = local.LocalURLServer
518
backing = self.get_transport()
519
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
520
branch = self.make_branch('branch')
521
checkout = branch.create_checkout('reference',lightweight=True)
522
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
523
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
524
self.assertEqual(SmartServerResponse(('ok', reference_url)),
525
request.execute('reference'))
528
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
530
def test_no_branch(self):
531
"""When there is no branch, ('nobranch', ) is returned."""
532
backing = self.get_transport()
533
self.make_bzrdir('.')
534
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
535
self.assertEqual(SmartServerResponse(('nobranch', )),
538
def test_branch(self):
539
"""When there is a branch, 'ok' is returned."""
540
backing = self.get_transport()
541
expected = self.make_branch('.')._format.network_name()
542
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
543
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
546
def test_branch_reference(self):
547
"""When there is a branch reference, the reference URL is returned."""
548
self.vfs_transport_factory = local.LocalURLServer
549
backing = self.get_transport()
550
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
551
branch = self.make_branch('branch')
552
checkout = branch.create_checkout('reference',lightweight=True)
553
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
554
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
555
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
556
request.execute('reference'))
558
def test_stacked_branch(self):
559
"""Opening a stacked branch does not open the stacked-on branch."""
560
trunk = self.make_branch('trunk')
561
feature = self.make_branch('feature')
562
feature.set_stacked_on_url(trunk.base)
564
Branch.hooks.install_named_hook('open', opened_branches.append, None)
565
backing = self.get_transport()
566
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
569
response = request.execute('feature')
571
request.teardown_jail()
572
expected_format = feature._format.network_name()
574
SuccessfulSmartServerResponse(('branch', expected_format)),
576
self.assertLength(1, opened_branches)
579
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
581
def test_empty(self):
582
"""For an empty branch, the body is empty."""
583
backing = self.get_transport()
584
request = smart.branch.SmartServerRequestRevisionHistory(backing)
585
self.make_branch('.')
586
self.assertEqual(SmartServerResponse(('ok', ), ''),
589
def test_not_empty(self):
590
"""For a non-empty branch, the body is empty."""
591
backing = self.get_transport()
592
request = smart.branch.SmartServerRequestRevisionHistory(backing)
593
tree = self.make_branch_and_memory_tree('.')
596
r1 = tree.commit('1st commit')
597
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
600
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
604
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
606
def test_no_branch(self):
607
"""When there is a bzrdir and no branch, NotBranchError is raised."""
608
backing = self.get_transport()
609
request = smart.branch.SmartServerBranchRequest(backing)
610
self.make_bzrdir('.')
611
self.assertRaises(errors.NotBranchError,
614
def test_branch_reference(self):
615
"""When there is a branch reference, NotBranchError is raised."""
616
backing = self.get_transport()
617
request = smart.branch.SmartServerBranchRequest(backing)
618
branch = self.make_branch('branch')
619
checkout = branch.create_checkout('reference',lightweight=True)
620
self.assertRaises(errors.NotBranchError,
621
request.execute, 'checkout')
624
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
626
def test_empty(self):
627
"""For an empty branch, the result is ('ok', '0', 'null:')."""
628
backing = self.get_transport()
629
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
630
self.make_branch('.')
631
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
634
def test_not_empty(self):
635
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
636
backing = self.get_transport()
637
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
638
tree = self.make_branch_and_memory_tree('.')
641
rev_id_utf8 = u'\xc8'.encode('utf-8')
642
r1 = tree.commit('1st commit')
643
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
646
SmartServerResponse(('ok', '2', rev_id_utf8)),
650
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
652
def test_default(self):
653
"""With no file, we get empty content."""
654
backing = self.get_transport()
655
request = smart.branch.SmartServerBranchGetConfigFile(backing)
656
branch = self.make_branch('.')
657
# there should be no file by default
659
self.assertEqual(SmartServerResponse(('ok', ), content),
662
def test_with_content(self):
663
# SmartServerBranchGetConfigFile should return the content from
664
# branch.control_files.get('branch.conf') for now - in the future it may
665
# perform more complex processing.
666
backing = self.get_transport()
667
request = smart.branch.SmartServerBranchGetConfigFile(backing)
668
branch = self.make_branch('.')
669
branch._transport.put_bytes('branch.conf', 'foo bar baz')
670
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
674
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
676
def get_lock_tokens(self, branch):
677
branch_token = branch.lock_write()
678
repo_token = branch.repository.lock_write()
679
branch.repository.unlock()
680
return branch_token, repo_token
683
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
685
def test_value_name(self):
686
branch = self.make_branch('.')
687
request = smart.branch.SmartServerBranchRequestSetConfigOption(
688
branch.bzrdir.root_transport)
689
branch_token, repo_token = self.get_lock_tokens(branch)
690
config = branch._get_config()
691
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
693
self.assertEqual(SuccessfulSmartServerResponse(()), result)
694
self.assertEqual('bar', config.get_option('foo'))
698
def test_value_name_section(self):
699
branch = self.make_branch('.')
700
request = smart.branch.SmartServerBranchRequestSetConfigOption(
701
branch.bzrdir.root_transport)
702
branch_token, repo_token = self.get_lock_tokens(branch)
703
config = branch._get_config()
704
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
706
self.assertEqual(SuccessfulSmartServerResponse(()), result)
707
self.assertEqual('bar', config.get_option('foo', 'gam'))
712
class TestSmartServerBranchRequestSetTagsBytes(TestLockedBranch):
713
# Only called when the branch format and tags match [yay factory
714
# methods] so only need to test straight forward cases.
716
def test_set_bytes(self):
717
base_branch = self.make_branch('base')
718
tag_bytes = base_branch._get_tags_bytes()
719
# get_lock_tokens takes out a lock.
720
branch_token, repo_token = self.get_lock_tokens(base_branch)
721
request = smart.branch.SmartServerBranchSetTagsBytes(
722
self.get_transport())
723
response = request.execute('base', branch_token, repo_token)
724
self.assertEqual(None, response)
725
response = request.do_chunk(tag_bytes)
726
self.assertEqual(None, response)
727
response = request.do_end()
729
SuccessfulSmartServerResponse(()), response)
732
def test_lock_failed(self):
733
base_branch = self.make_branch('base')
734
base_branch.lock_write()
735
tag_bytes = base_branch._get_tags_bytes()
736
request = smart.branch.SmartServerBranchSetTagsBytes(
737
self.get_transport())
738
self.assertRaises(errors.TokenMismatch, request.execute,
739
'base', 'wrong token', 'wrong token')
740
# The request handler will keep processing the message parts, so even
741
# if the request fails immediately do_chunk and do_end are still
743
request.do_chunk(tag_bytes)
749
class SetLastRevisionTestBase(TestLockedBranch):
750
"""Base test case for verbs that implement set_last_revision."""
753
tests.TestCaseWithMemoryTransport.setUp(self)
754
backing_transport = self.get_transport()
755
self.request = self.request_class(backing_transport)
756
self.tree = self.make_branch_and_memory_tree('.')
758
def lock_branch(self):
759
return self.get_lock_tokens(self.tree.branch)
761
def unlock_branch(self):
762
self.tree.branch.unlock()
764
def set_last_revision(self, revision_id, revno):
765
branch_token, repo_token = self.lock_branch()
766
response = self._set_last_revision(
767
revision_id, revno, branch_token, repo_token)
771
def assertRequestSucceeds(self, revision_id, revno):
772
response = self.set_last_revision(revision_id, revno)
773
self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
776
class TestSetLastRevisionVerbMixin(object):
777
"""Mixin test case for verbs that implement set_last_revision."""
779
def test_set_null_to_null(self):
780
"""An empty branch can have its last revision set to 'null:'."""
781
self.assertRequestSucceeds('null:', 0)
783
def test_NoSuchRevision(self):
784
"""If the revision_id is not present, the verb returns NoSuchRevision.
786
revision_id = 'non-existent revision'
788
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
789
self.set_last_revision(revision_id, 1))
791
def make_tree_with_two_commits(self):
792
self.tree.lock_write()
794
rev_id_utf8 = u'\xc8'.encode('utf-8')
795
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
796
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
799
def test_branch_last_revision_info_is_updated(self):
800
"""A branch's tip can be set to a revision that is present in its
803
# Make a branch with an empty revision history, but two revisions in
805
self.make_tree_with_two_commits()
806
rev_id_utf8 = u'\xc8'.encode('utf-8')
807
self.tree.branch.set_revision_history([])
809
(0, 'null:'), self.tree.branch.last_revision_info())
810
# We can update the branch to a revision that is present in the
812
self.assertRequestSucceeds(rev_id_utf8, 1)
814
(1, rev_id_utf8), self.tree.branch.last_revision_info())
816
def test_branch_last_revision_info_rewind(self):
817
"""A branch's tip can be set to a revision that is an ancestor of the
820
self.make_tree_with_two_commits()
821
rev_id_utf8 = u'\xc8'.encode('utf-8')
823
(2, 'rev-2'), self.tree.branch.last_revision_info())
824
self.assertRequestSucceeds(rev_id_utf8, 1)
826
(1, rev_id_utf8), self.tree.branch.last_revision_info())
828
def test_TipChangeRejected(self):
829
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
830
returns TipChangeRejected.
832
rejection_message = u'rejection message\N{INTERROBANG}'
833
def hook_that_rejects(params):
834
raise errors.TipChangeRejected(rejection_message)
835
Branch.hooks.install_named_hook(
836
'pre_change_branch_tip', hook_that_rejects, None)
838
FailedSmartServerResponse(
839
('TipChangeRejected', rejection_message.encode('utf-8'))),
840
self.set_last_revision('null:', 0))
843
class TestSmartServerBranchRequestSetLastRevision(
844
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
845
"""Tests for Branch.set_last_revision verb."""
847
request_class = smart.branch.SmartServerBranchRequestSetLastRevision
849
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
850
return self.request.execute(
851
'', branch_token, repo_token, revision_id)
854
class TestSmartServerBranchRequestSetLastRevisionInfo(
855
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
856
"""Tests for Branch.set_last_revision_info verb."""
858
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
860
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
861
return self.request.execute(
862
'', branch_token, repo_token, revno, revision_id)
864
def test_NoSuchRevision(self):
865
"""Branch.set_last_revision_info does not have to return
866
NoSuchRevision if the revision_id is absent.
868
raise tests.TestNotApplicable()
871
class TestSmartServerBranchRequestSetLastRevisionEx(
872
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
873
"""Tests for Branch.set_last_revision_ex verb."""
875
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
877
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
878
return self.request.execute(
879
'', branch_token, repo_token, revision_id, 0, 0)
881
def assertRequestSucceeds(self, revision_id, revno):
882
response = self.set_last_revision(revision_id, revno)
884
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
887
def test_branch_last_revision_info_rewind(self):
888
"""A branch's tip can be set to a revision that is an ancestor of the
889
current tip, but only if allow_overwrite_descendant is passed.
891
self.make_tree_with_two_commits()
892
rev_id_utf8 = u'\xc8'.encode('utf-8')
894
(2, 'rev-2'), self.tree.branch.last_revision_info())
895
# If allow_overwrite_descendant flag is 0, then trying to set the tip
896
# to an older revision ID has no effect.
897
branch_token, repo_token = self.lock_branch()
898
response = self.request.execute(
899
'', branch_token, repo_token, rev_id_utf8, 0, 0)
901
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
904
(2, 'rev-2'), self.tree.branch.last_revision_info())
906
# If allow_overwrite_descendant flag is 1, then setting the tip to an
908
response = self.request.execute(
909
'', branch_token, repo_token, rev_id_utf8, 0, 1)
911
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
915
(1, rev_id_utf8), self.tree.branch.last_revision_info())
917
def make_branch_with_divergent_history(self):
918
"""Make a branch with divergent history in its repo.
920
The branch's tip will be 'child-2', and the repo will also contain
921
'child-1', which diverges from a common base revision.
923
self.tree.lock_write()
925
r1 = self.tree.commit('1st commit')
926
revno_1, revid_1 = self.tree.branch.last_revision_info()
927
r2 = self.tree.commit('2nd commit', rev_id='child-1')
928
# Undo the second commit
929
self.tree.branch.set_last_revision_info(revno_1, revid_1)
930
self.tree.set_parent_ids([revid_1])
931
# Make a new second commit, child-2. child-2 has diverged from
933
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
936
def test_not_allow_diverged(self):
937
"""If allow_diverged is not passed, then setting a divergent history
938
returns a Diverged error.
940
self.make_branch_with_divergent_history()
942
FailedSmartServerResponse(('Diverged',)),
943
self.set_last_revision('child-1', 2))
944
# The branch tip was not changed.
945
self.assertEqual('child-2', self.tree.branch.last_revision())
947
def test_allow_diverged(self):
948
"""If allow_diverged is passed, then setting a divergent history
951
self.make_branch_with_divergent_history()
952
branch_token, repo_token = self.lock_branch()
953
response = self.request.execute(
954
'', branch_token, repo_token, 'child-1', 1, 0)
956
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
959
# The branch tip was changed.
960
self.assertEqual('child-1', self.tree.branch.last_revision())
963
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
965
def test_get_parent_none(self):
966
base_branch = self.make_branch('base')
967
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
968
response = request.execute('base')
970
SuccessfulSmartServerResponse(('',)), response)
972
def test_get_parent_something(self):
973
base_branch = self.make_branch('base')
974
base_branch.set_parent(self.get_url('foo'))
975
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
976
response = request.execute('base')
978
SuccessfulSmartServerResponse(("../foo",)),
982
class TestSmartServerBranchRequestSetParent(tests.TestCaseWithMemoryTransport):
984
def test_set_parent_none(self):
985
branch = self.make_branch('base', format="1.9")
987
branch._set_parent_location('foo')
989
request = smart.branch.SmartServerBranchRequestSetParentLocation(
990
self.get_transport())
991
branch_token = branch.lock_write()
992
repo_token = branch.repository.lock_write()
994
response = request.execute('base', branch_token, repo_token, '')
996
branch.repository.unlock()
998
self.assertEqual(SuccessfulSmartServerResponse(()), response)
999
self.assertEqual(None, branch.get_parent())
1001
def test_set_parent_something(self):
1002
branch = self.make_branch('base', format="1.9")
1003
request = smart.branch.SmartServerBranchRequestSetParentLocation(
1004
self.get_transport())
1005
branch_token = branch.lock_write()
1006
repo_token = branch.repository.lock_write()
1008
response = request.execute('base', branch_token, repo_token,
1011
branch.repository.unlock()
1013
self.assertEqual(SuccessfulSmartServerResponse(()), response)
1014
self.assertEqual('http://bar/', branch.get_parent())
1017
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
1018
# Only called when the branch format and tags match [yay factory
1019
# methods] so only need to test straight forward cases.
1021
def test_get_bytes(self):
1022
base_branch = self.make_branch('base')
1023
request = smart.branch.SmartServerBranchGetTagsBytes(
1024
self.get_transport())
1025
response = request.execute('base')
1027
SuccessfulSmartServerResponse(('',)), response)
1030
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
1032
def test_get_stacked_on_url(self):
1033
base_branch = self.make_branch('base', format='1.6')
1034
stacked_branch = self.make_branch('stacked', format='1.6')
1035
# typically should be relative
1036
stacked_branch.set_stacked_on_url('../base')
1037
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
1038
self.get_transport())
1039
response = request.execute('stacked')
1041
SmartServerResponse(('ok', '../base')),
1045
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
1048
tests.TestCaseWithMemoryTransport.setUp(self)
1050
def test_lock_write_on_unlocked_branch(self):
1051
backing = self.get_transport()
1052
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1053
branch = self.make_branch('.', format='knit')
1054
repository = branch.repository
1055
response = request.execute('')
1056
branch_nonce = branch.control_files._lock.peek().get('nonce')
1057
repository_nonce = repository.control_files._lock.peek().get('nonce')
1059
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
1061
# The branch (and associated repository) is now locked. Verify that
1062
# with a new branch object.
1063
new_branch = repository.bzrdir.open_branch()
1064
self.assertRaises(errors.LockContention, new_branch.lock_write)
1066
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1067
response = request.execute('', branch_nonce, repository_nonce)
1069
def test_lock_write_on_locked_branch(self):
1070
backing = self.get_transport()
1071
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1072
branch = self.make_branch('.')
1073
branch_token = branch.lock_write()
1074
branch.leave_lock_in_place()
1076
response = request.execute('')
1078
SmartServerResponse(('LockContention',)), response)
1080
branch.lock_write(branch_token)
1081
branch.dont_leave_lock_in_place()
1084
def test_lock_write_with_tokens_on_locked_branch(self):
1085
backing = self.get_transport()
1086
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1087
branch = self.make_branch('.', format='knit')
1088
branch_token = branch.lock_write()
1089
repo_token = branch.repository.lock_write()
1090
branch.repository.unlock()
1091
branch.leave_lock_in_place()
1092
branch.repository.leave_lock_in_place()
1094
response = request.execute('',
1095
branch_token, repo_token)
1097
SmartServerResponse(('ok', branch_token, repo_token)), response)
1099
branch.repository.lock_write(repo_token)
1100
branch.repository.dont_leave_lock_in_place()
1101
branch.repository.unlock()
1102
branch.lock_write(branch_token)
1103
branch.dont_leave_lock_in_place()
1106
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
1107
backing = self.get_transport()
1108
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1109
branch = self.make_branch('.', format='knit')
1110
branch_token = branch.lock_write()
1111
repo_token = branch.repository.lock_write()
1112
branch.repository.unlock()
1113
branch.leave_lock_in_place()
1114
branch.repository.leave_lock_in_place()
1116
response = request.execute('',
1117
branch_token+'xxx', repo_token)
1119
SmartServerResponse(('TokenMismatch',)), response)
1121
branch.repository.lock_write(repo_token)
1122
branch.repository.dont_leave_lock_in_place()
1123
branch.repository.unlock()
1124
branch.lock_write(branch_token)
1125
branch.dont_leave_lock_in_place()
1128
def test_lock_write_on_locked_repo(self):
1129
backing = self.get_transport()
1130
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1131
branch = self.make_branch('.', format='knit')
1132
repo = branch.repository
1133
repo_token = repo.lock_write()
1134
repo.leave_lock_in_place()
1136
response = request.execute('')
1138
SmartServerResponse(('LockContention',)), response)
1140
repo.lock_write(repo_token)
1141
repo.dont_leave_lock_in_place()
1144
def test_lock_write_on_readonly_transport(self):
1145
backing = self.get_readonly_transport()
1146
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
1147
branch = self.make_branch('.')
1148
root = self.get_transport().clone('/')
1149
path = urlutils.relative_url(root.base, self.get_transport().base)
1150
response = request.execute(path)
1151
error_name, lock_str, why_str = response.args
1152
self.assertFalse(response.is_successful())
1153
self.assertEqual('LockFailed', error_name)
1156
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
1159
tests.TestCaseWithMemoryTransport.setUp(self)
1161
def test_unlock_on_locked_branch_and_repo(self):
1162
backing = self.get_transport()
1163
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1164
branch = self.make_branch('.', format='knit')
1166
branch_token = branch.lock_write()
1167
repo_token = branch.repository.lock_write()
1168
branch.repository.unlock()
1169
# Unlock the branch (and repo) object, leaving the physical locks
1171
branch.leave_lock_in_place()
1172
branch.repository.leave_lock_in_place()
1174
response = request.execute('',
1175
branch_token, repo_token)
1177
SmartServerResponse(('ok',)), response)
1178
# The branch is now unlocked. Verify that with a new branch
1180
new_branch = branch.bzrdir.open_branch()
1181
new_branch.lock_write()
1184
def test_unlock_on_unlocked_branch_unlocked_repo(self):
1185
backing = self.get_transport()
1186
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1187
branch = self.make_branch('.', format='knit')
1188
response = request.execute(
1189
'', 'branch token', 'repo token')
1191
SmartServerResponse(('TokenMismatch',)), response)
1193
def test_unlock_on_unlocked_branch_locked_repo(self):
1194
backing = self.get_transport()
1195
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1196
branch = self.make_branch('.', format='knit')
1197
# Lock the repository.
1198
repo_token = branch.repository.lock_write()
1199
branch.repository.leave_lock_in_place()
1200
branch.repository.unlock()
1201
# Issue branch lock_write request on the unlocked branch (with locked
1203
response = request.execute(
1204
'', 'branch token', repo_token)
1206
SmartServerResponse(('TokenMismatch',)), response)
1208
branch.repository.lock_write(repo_token)
1209
branch.repository.dont_leave_lock_in_place()
1210
branch.repository.unlock()
1213
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
1215
def test_no_repository(self):
1216
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
1217
# we test this using a shared repository above the named path,
1218
# thus checking the right search logic is used - that is, that
1219
# its the exact path being looked at and the server is not
1221
backing = self.get_transport()
1222
request = smart.repository.SmartServerRepositoryRequest(backing)
1223
self.make_repository('.', shared=True)
1224
self.make_bzrdir('subdir')
1225
self.assertRaises(errors.NoRepositoryPresent,
1226
request.execute, 'subdir')
1229
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
1231
def test_trivial_bzipped(self):
1232
# This tests that the wire encoding is actually bzipped
1233
backing = self.get_transport()
1234
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1235
tree = self.make_branch_and_memory_tree('.')
1237
self.assertEqual(None,
1238
request.execute('', 'missing-id'))
1239
# Note that it returns a body that is bzipped.
1241
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
1242
request.do_body('\n\n0\n'))
1244
def test_trivial_include_missing(self):
1245
backing = self.get_transport()
1246
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1247
tree = self.make_branch_and_memory_tree('.')
1249
self.assertEqual(None,
1250
request.execute('', 'missing-id', 'include-missing:'))
1252
SuccessfulSmartServerResponse(('ok', ),
1253
bz2.compress('missing:missing-id')),
1254
request.do_body('\n\n0\n'))
1257
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
1259
def test_none_argument(self):
1260
backing = self.get_transport()
1261
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1262
tree = self.make_branch_and_memory_tree('.')
1265
r1 = tree.commit('1st commit')
1266
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1269
# the lines of revision_id->revision_parent_list has no guaranteed
1270
# order coming out of a dict, so sort both our test and response
1271
lines = sorted([' '.join([r2, r1]), r1])
1272
response = request.execute('', '')
1273
response.body = '\n'.join(sorted(response.body.split('\n')))
1276
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
1278
def test_specific_revision_argument(self):
1279
backing = self.get_transport()
1280
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1281
tree = self.make_branch_and_memory_tree('.')
1284
rev_id_utf8 = u'\xc9'.encode('utf-8')
1285
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
1286
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1289
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
1290
request.execute('', rev_id_utf8))
1292
def test_no_such_revision(self):
1293
backing = self.get_transport()
1294
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1295
tree = self.make_branch_and_memory_tree('.')
1298
r1 = tree.commit('1st commit')
1301
# Note that it still returns body (of zero bytes).
1303
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
1304
request.execute('', 'missingrevision'))
1307
class TestSmartServerRepositoryGetRevIdForRevno(tests.TestCaseWithMemoryTransport):
1309
def test_revno_found(self):
1310
backing = self.get_transport()
1311
request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
1312
tree = self.make_branch_and_memory_tree('.')
1315
rev1_id_utf8 = u'\xc8'.encode('utf-8')
1316
rev2_id_utf8 = u'\xc9'.encode('utf-8')
1317
tree.commit('1st commit', rev_id=rev1_id_utf8)
1318
tree.commit('2nd commit', rev_id=rev2_id_utf8)
1321
self.assertEqual(SmartServerResponse(('ok', rev1_id_utf8)),
1322
request.execute('', 1, (2, rev2_id_utf8)))
1324
def test_known_revid_missing(self):
1325
backing = self.get_transport()
1326
request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
1327
repo = self.make_repository('.')
1329
FailedSmartServerResponse(('nosuchrevision', 'ghost')),
1330
request.execute('', 1, (2, 'ghost')))
1332
def test_history_incomplete(self):
1333
backing = self.get_transport()
1334
request = smart.repository.SmartServerRepositoryGetRevIdForRevno(backing)
1335
parent = self.make_branch_and_memory_tree('parent', format='1.9')
1337
parent.add([''], ['TREE_ROOT'])
1338
r1 = parent.commit(message='first commit')
1339
r2 = parent.commit(message='second commit')
1341
local = self.make_branch_and_memory_tree('local', format='1.9')
1342
local.branch.pull(parent.branch)
1343
local.set_parent_ids([r2])
1344
r3 = local.commit(message='local commit')
1345
local.branch.create_clone_on_transport(
1346
self.get_transport('stacked'), stacked_on=self.get_url('parent'))
1348
SmartServerResponse(('history-incomplete', 2, r2)),
1349
request.execute('stacked', 1, (3, r3)))
1352
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1354
def make_two_commit_repo(self):
1355
tree = self.make_branch_and_memory_tree('.')
1358
r1 = tree.commit('1st commit')
1359
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1361
repo = tree.branch.repository
1364
def test_ancestry_of(self):
1365
"""The search argument may be a 'ancestry-of' some heads'."""
1366
backing = self.get_transport()
1367
request = smart.repository.SmartServerRepositoryGetStream(backing)
1368
repo, r1, r2 = self.make_two_commit_repo()
1369
fetch_spec = ['ancestry-of', r2]
1370
lines = '\n'.join(fetch_spec)
1371
request.execute('', repo._format.network_name())
1372
response = request.do_body(lines)
1373
self.assertEqual(('ok',), response.args)
1374
stream_bytes = ''.join(response.body_stream)
1375
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1377
def test_search(self):
1378
"""The search argument may be a 'search' of some explicit keys."""
1379
backing = self.get_transport()
1380
request = smart.repository.SmartServerRepositoryGetStream(backing)
1381
repo, r1, r2 = self.make_two_commit_repo()
1382
fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
1383
lines = '\n'.join(fetch_spec)
1384
request.execute('', repo._format.network_name())
1385
response = request.do_body(lines)
1386
self.assertEqual(('ok',), response.args)
1387
stream_bytes = ''.join(response.body_stream)
1388
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1391
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
1393
def test_missing_revision(self):
1394
"""For a missing revision, ('no', ) is returned."""
1395
backing = self.get_transport()
1396
request = smart.repository.SmartServerRequestHasRevision(backing)
1397
self.make_repository('.')
1398
self.assertEqual(SmartServerResponse(('no', )),
1399
request.execute('', 'revid'))
1401
def test_present_revision(self):
1402
"""For a present revision, ('yes', ) is returned."""
1403
backing = self.get_transport()
1404
request = smart.repository.SmartServerRequestHasRevision(backing)
1405
tree = self.make_branch_and_memory_tree('.')
1408
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1409
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
1411
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
1412
self.assertEqual(SmartServerResponse(('yes', )),
1413
request.execute('', rev_id_utf8))
1416
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
1418
def test_empty_revid(self):
1419
"""With an empty revid, we get only size an number and revisions"""
1420
backing = self.get_transport()
1421
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1422
repository = self.make_repository('.')
1423
stats = repository.gather_stats()
1424
expected_body = 'revisions: 0\n'
1425
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1426
request.execute('', '', 'no'))
1428
def test_revid_with_committers(self):
1429
"""For a revid we get more infos."""
1430
backing = self.get_transport()
1431
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1432
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1433
tree = self.make_branch_and_memory_tree('.')
1436
# Let's build a predictable result
1437
tree.commit('a commit', timestamp=123456.2, timezone=3600)
1438
tree.commit('a commit', timestamp=654321.4, timezone=0,
1442
stats = tree.branch.repository.gather_stats()
1443
expected_body = ('firstrev: 123456.200 3600\n'
1444
'latestrev: 654321.400 0\n'
1446
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1450
def test_not_empty_repository_with_committers(self):
1451
"""For a revid and requesting committers we get the whole thing."""
1452
backing = self.get_transport()
1453
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1454
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1455
tree = self.make_branch_and_memory_tree('.')
1458
# Let's build a predictable result
1459
tree.commit('a commit', timestamp=123456.2, timezone=3600,
1461
tree.commit('a commit', timestamp=654321.4, timezone=0,
1462
committer='bar', rev_id=rev_id_utf8)
1464
stats = tree.branch.repository.gather_stats()
1466
expected_body = ('committers: 2\n'
1467
'firstrev: 123456.200 3600\n'
1468
'latestrev: 654321.400 0\n'
1470
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1472
rev_id_utf8, 'yes'))
1475
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
1477
def test_is_shared(self):
1478
"""For a shared repository, ('yes', ) is returned."""
1479
backing = self.get_transport()
1480
request = smart.repository.SmartServerRepositoryIsShared(backing)
1481
self.make_repository('.', shared=True)
1482
self.assertEqual(SmartServerResponse(('yes', )),
1483
request.execute('', ))
1485
def test_is_not_shared(self):
1486
"""For a shared repository, ('no', ) is returned."""
1487
backing = self.get_transport()
1488
request = smart.repository.SmartServerRepositoryIsShared(backing)
1489
self.make_repository('.', shared=False)
1490
self.assertEqual(SmartServerResponse(('no', )),
1491
request.execute('', ))
1494
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
1496
def test_lock_write_on_unlocked_repo(self):
1497
backing = self.get_transport()
1498
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1499
repository = self.make_repository('.', format='knit')
1500
response = request.execute('')
1501
nonce = repository.control_files._lock.peek().get('nonce')
1502
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
1503
# The repository is now locked. Verify that with a new repository
1505
new_repo = repository.bzrdir.open_repository()
1506
self.assertRaises(errors.LockContention, new_repo.lock_write)
1508
request = smart.repository.SmartServerRepositoryUnlock(backing)
1509
response = request.execute('', nonce)
1511
def test_lock_write_on_locked_repo(self):
1512
backing = self.get_transport()
1513
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1514
repository = self.make_repository('.', format='knit')
1515
repo_token = repository.lock_write()
1516
repository.leave_lock_in_place()
1518
response = request.execute('')
1520
SmartServerResponse(('LockContention',)), response)
1522
repository.lock_write(repo_token)
1523
repository.dont_leave_lock_in_place()
1526
def test_lock_write_on_readonly_transport(self):
1527
backing = self.get_readonly_transport()
1528
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1529
repository = self.make_repository('.', format='knit')
1530
response = request.execute('')
1531
self.assertFalse(response.is_successful())
1532
self.assertEqual('LockFailed', response.args[0])
1535
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
1537
def make_empty_byte_stream(self, repo):
1538
byte_stream = smart.repository._stream_to_byte_stream([], repo._format)
1539
return ''.join(byte_stream)
1542
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
1544
def test_insert_stream_empty(self):
1545
backing = self.get_transport()
1546
request = smart.repository.SmartServerRepositoryInsertStream(backing)
1547
repository = self.make_repository('.')
1548
response = request.execute('', '')
1549
self.assertEqual(None, response)
1550
response = request.do_chunk(self.make_empty_byte_stream(repository))
1551
self.assertEqual(None, response)
1552
response = request.do_end()
1553
self.assertEqual(SmartServerResponse(('ok', )), response)
1556
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
1558
def test_insert_stream_empty(self):
1559
backing = self.get_transport()
1560
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1562
repository = self.make_repository('.', format='knit')
1563
lock_token = repository.lock_write()
1564
response = request.execute('', '', lock_token)
1565
self.assertEqual(None, response)
1566
response = request.do_chunk(self.make_empty_byte_stream(repository))
1567
self.assertEqual(None, response)
1568
response = request.do_end()
1569
self.assertEqual(SmartServerResponse(('ok', )), response)
1572
def test_insert_stream_with_wrong_lock_token(self):
1573
backing = self.get_transport()
1574
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1576
repository = self.make_repository('.', format='knit')
1577
lock_token = repository.lock_write()
1579
errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1583
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
1586
tests.TestCaseWithMemoryTransport.setUp(self)
1588
def test_unlock_on_locked_repo(self):
1589
backing = self.get_transport()
1590
request = smart.repository.SmartServerRepositoryUnlock(backing)
1591
repository = self.make_repository('.', format='knit')
1592
token = repository.lock_write()
1593
repository.leave_lock_in_place()
1595
response = request.execute('', token)
1597
SmartServerResponse(('ok',)), response)
1598
# The repository is now unlocked. Verify that with a new repository
1600
new_repo = repository.bzrdir.open_repository()
1601
new_repo.lock_write()
1604
def test_unlock_on_unlocked_repo(self):
1605
backing = self.get_transport()
1606
request = smart.repository.SmartServerRepositoryUnlock(backing)
1607
repository = self.make_repository('.', format='knit')
1608
response = request.execute('', 'some token')
1610
SmartServerResponse(('TokenMismatch',)), response)
1613
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
1615
def test_is_readonly_no(self):
1616
backing = self.get_transport()
1617
request = smart.request.SmartServerIsReadonly(backing)
1618
response = request.execute()
1620
SmartServerResponse(('no',)), response)
1622
def test_is_readonly_yes(self):
1623
backing = self.get_readonly_transport()
1624
request = smart.request.SmartServerIsReadonly(backing)
1625
response = request.execute()
1627
SmartServerResponse(('yes',)), response)
1630
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
1632
def test_set_false(self):
1633
backing = self.get_transport()
1634
repo = self.make_repository('.', shared=True)
1635
repo.set_make_working_trees(True)
1636
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1637
request = request_class(backing)
1638
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1639
request.execute('', 'False'))
1640
repo = repo.bzrdir.open_repository()
1641
self.assertFalse(repo.make_working_trees())
1643
def test_set_true(self):
1644
backing = self.get_transport()
1645
repo = self.make_repository('.', shared=True)
1646
repo.set_make_working_trees(False)
1647
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1648
request = request_class(backing)
1649
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1650
request.execute('', 'True'))
1651
repo = repo.bzrdir.open_repository()
1652
self.assertTrue(repo.make_working_trees())
1655
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
1657
def make_repo_needing_autopacking(self, path='.'):
1658
# Make a repo in need of autopacking.
1659
tree = self.make_branch_and_tree('.', format='pack-0.92')
1660
repo = tree.branch.repository
1661
# monkey-patch the pack collection to disable autopacking
1662
repo._pack_collection._max_pack_count = lambda count: count
1664
tree.commit('commit %s' % x)
1665
self.assertEqual(10, len(repo._pack_collection.names()))
1666
del repo._pack_collection._max_pack_count
1669
def test_autopack_needed(self):
1670
repo = self.make_repo_needing_autopacking()
1672
self.addCleanup(repo.unlock)
1673
backing = self.get_transport()
1674
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1676
response = request.execute('')
1677
self.assertEqual(SmartServerResponse(('ok',)), response)
1678
repo._pack_collection.reload_pack_names()
1679
self.assertEqual(1, len(repo._pack_collection.names()))
1681
def test_autopack_not_needed(self):
1682
tree = self.make_branch_and_tree('.', format='pack-0.92')
1683
repo = tree.branch.repository
1685
self.addCleanup(repo.unlock)
1687
tree.commit('commit %s' % x)
1688
backing = self.get_transport()
1689
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1691
response = request.execute('')
1692
self.assertEqual(SmartServerResponse(('ok',)), response)
1693
repo._pack_collection.reload_pack_names()
1694
self.assertEqual(9, len(repo._pack_collection.names()))
1696
def test_autopack_on_nonpack_format(self):
1697
"""A request to autopack a non-pack repo is a no-op."""
1698
repo = self.make_repository('.', format='knit')
1699
backing = self.get_transport()
1700
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1702
response = request.execute('')
1703
self.assertEqual(SmartServerResponse(('ok',)), response)
1706
class TestSmartServerVfsGet(tests.TestCaseWithMemoryTransport):
1708
def test_unicode_path(self):
1709
"""VFS requests expect unicode paths to be escaped."""
1710
filename = u'foo\N{INTERROBANG}'
1711
filename_escaped = urlutils.escape(filename)
1712
backing = self.get_transport()
1713
request = smart.vfs.GetRequest(backing)
1714
backing.put_bytes_non_atomic(filename_escaped, 'contents')
1715
self.assertEqual(SmartServerResponse(('ok', ), 'contents'),
1716
request.execute(filename_escaped))
1719
class TestHandlers(tests.TestCase):
1720
"""Tests for the request.request_handlers object."""
1722
def test_all_registrations_exist(self):
1723
"""All registered request_handlers can be found."""
1724
# If there's a typo in a register_lazy call, this loop will fail with
1725
# an AttributeError.
1726
for key, item in smart.request.request_handlers.iteritems():
1729
def assertHandlerEqual(self, verb, handler):
1730
self.assertEqual(smart.request.request_handlers.get(verb), handler)
1732
def test_registered_methods(self):
1733
"""Test that known methods are registered to the correct object."""
1734
self.assertHandlerEqual('Branch.get_config_file',
1735
smart.branch.SmartServerBranchGetConfigFile)
1736
self.assertHandlerEqual('Branch.get_parent',
1737
smart.branch.SmartServerBranchGetParent)
1738
self.assertHandlerEqual('Branch.get_tags_bytes',
1739
smart.branch.SmartServerBranchGetTagsBytes)
1740
self.assertHandlerEqual('Branch.lock_write',
1741
smart.branch.SmartServerBranchRequestLockWrite)
1742
self.assertHandlerEqual('Branch.last_revision_info',
1743
smart.branch.SmartServerBranchRequestLastRevisionInfo)
1744
self.assertHandlerEqual('Branch.revision_history',
1745
smart.branch.SmartServerRequestRevisionHistory)
1746
self.assertHandlerEqual('Branch.set_config_option',
1747
smart.branch.SmartServerBranchRequestSetConfigOption)
1748
self.assertHandlerEqual('Branch.set_last_revision',
1749
smart.branch.SmartServerBranchRequestSetLastRevision)
1750
self.assertHandlerEqual('Branch.set_last_revision_info',
1751
smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
1752
self.assertHandlerEqual('Branch.set_last_revision_ex',
1753
smart.branch.SmartServerBranchRequestSetLastRevisionEx)
1754
self.assertHandlerEqual('Branch.set_parent_location',
1755
smart.branch.SmartServerBranchRequestSetParentLocation)
1756
self.assertHandlerEqual('Branch.unlock',
1757
smart.branch.SmartServerBranchRequestUnlock)
1758
self.assertHandlerEqual('BzrDir.find_repository',
1759
smart.bzrdir.SmartServerRequestFindRepositoryV1)
1760
self.assertHandlerEqual('BzrDir.find_repositoryV2',
1761
smart.bzrdir.SmartServerRequestFindRepositoryV2)
1762
self.assertHandlerEqual('BzrDirFormat.initialize',
1763
smart.bzrdir.SmartServerRequestInitializeBzrDir)
1764
self.assertHandlerEqual('BzrDirFormat.initialize_ex_1.16',
1765
smart.bzrdir.SmartServerRequestBzrDirInitializeEx)
1766
self.assertHandlerEqual('BzrDir.cloning_metadir',
1767
smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
1768
self.assertHandlerEqual('BzrDir.get_config_file',
1769
smart.bzrdir.SmartServerBzrDirRequestConfigFile)
1770
self.assertHandlerEqual('BzrDir.open_branch',
1771
smart.bzrdir.SmartServerRequestOpenBranch)
1772
self.assertHandlerEqual('BzrDir.open_branchV2',
1773
smart.bzrdir.SmartServerRequestOpenBranchV2)
1774
self.assertHandlerEqual('PackRepository.autopack',
1775
smart.packrepository.SmartServerPackRepositoryAutopack)
1776
self.assertHandlerEqual('Repository.gather_stats',
1777
smart.repository.SmartServerRepositoryGatherStats)
1778
self.assertHandlerEqual('Repository.get_parent_map',
1779
smart.repository.SmartServerRepositoryGetParentMap)
1780
self.assertHandlerEqual('Repository.get_rev_id_for_revno',
1781
smart.repository.SmartServerRepositoryGetRevIdForRevno)
1782
self.assertHandlerEqual('Repository.get_revision_graph',
1783
smart.repository.SmartServerRepositoryGetRevisionGraph)
1784
self.assertHandlerEqual('Repository.get_stream',
1785
smart.repository.SmartServerRepositoryGetStream)
1786
self.assertHandlerEqual('Repository.has_revision',
1787
smart.repository.SmartServerRequestHasRevision)
1788
self.assertHandlerEqual('Repository.insert_stream',
1789
smart.repository.SmartServerRepositoryInsertStream)
1790
self.assertHandlerEqual('Repository.insert_stream_locked',
1791
smart.repository.SmartServerRepositoryInsertStreamLocked)
1792
self.assertHandlerEqual('Repository.is_shared',
1793
smart.repository.SmartServerRepositoryIsShared)
1794
self.assertHandlerEqual('Repository.lock_write',
1795
smart.repository.SmartServerRepositoryLockWrite)
1796
self.assertHandlerEqual('Repository.tarball',
1797
smart.repository.SmartServerRepositoryTarball)
1798
self.assertHandlerEqual('Repository.unlock',
1799
smart.repository.SmartServerRepositoryUnlock)
1800
self.assertHandlerEqual('Transport.is_readonly',
1801
smart.request.SmartServerIsReadonly)