1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protococl."""
19
from bzrlib import bzrdir, errors, smart, tests
20
from bzrlib.smart.request import SmartServerResponse
21
import bzrlib.smart.bzrdir
22
import bzrlib.smart.branch
23
import bzrlib.smart.repository
26
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
29
super(TestCaseWithSmartMedium, self).setUp()
30
# We're allowed to set the transport class here, so that we don't use
31
# the default or a parameterized class, but rather use the
32
# TestCaseWithTransport infrastructure to set up a smart server and
34
self.transport_server = smart.server.SmartTCPServer_for_testing
36
def get_smart_medium(self):
37
"""Get a smart medium to use in tests."""
38
return self.get_transport().get_smart_medium()
41
class TestSmartServerResponse(tests.TestCase):
44
self.assertEqual(SmartServerResponse(('ok', )),
45
SmartServerResponse(('ok', )))
46
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
47
SmartServerResponse(('ok', ), 'body'))
48
self.assertNotEqual(SmartServerResponse(('ok', )),
49
SmartServerResponse(('notok', )))
50
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
51
SmartServerResponse(('ok', )))
52
self.assertNotEqual(None,
53
SmartServerResponse(('ok', )))
56
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
57
"""Tests for BzrDir.find_repository."""
59
def test_no_repository(self):
60
"""When there is no repository to be found, ('norepository', ) is returned."""
61
backing = self.get_transport()
62
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
64
self.assertEqual(SmartServerResponse(('norepository', )),
65
request.execute(backing.local_abspath('')))
67
def test_nonshared_repository(self):
68
# nonshared repositorys only allow 'find' to return a handle when the
69
# path the repository is being searched on is the same as that that
70
# the repository is at.
71
backing = self.get_transport()
72
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
73
result = self._make_repository_and_result()
74
self.assertEqual(result, request.execute(backing.local_abspath('')))
75
self.make_bzrdir('subdir')
76
self.assertEqual(SmartServerResponse(('norepository', )),
77
request.execute(backing.local_abspath('subdir')))
79
def _make_repository_and_result(self, shared=False, format=None):
80
"""Convenience function to setup a repository.
82
:result: The SmartServerResponse to expect when opening it.
84
repo = self.make_repository('.', shared=shared, format=format)
85
if repo.supports_rich_root():
89
if repo._format.supports_tree_reference:
93
return SmartServerResponse(('ok', '', rich_root, subtrees))
95
def test_shared_repository(self):
96
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
97
backing = self.get_transport()
98
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
99
result = self._make_repository_and_result(shared=True)
100
self.assertEqual(result, request.execute(backing.local_abspath('')))
101
self.make_bzrdir('subdir')
102
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
103
self.assertEqual(result2,
104
request.execute(backing.local_abspath('subdir')))
105
self.make_bzrdir('subdir/deeper')
106
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
107
self.assertEqual(result3,
108
request.execute(backing.local_abspath('subdir/deeper')))
110
def test_rich_root_and_subtree_encoding(self):
111
"""Test for the format attributes for rich root and subtree support."""
112
backing = self.get_transport()
113
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
114
result = self._make_repository_and_result(format='dirstate-with-subtree')
115
# check the test will be valid
116
self.assertEqual('yes', result.args[2])
117
self.assertEqual('yes', result.args[3])
118
self.assertEqual(result, request.execute(backing.local_abspath('')))
121
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
123
def test_empty_dir(self):
124
"""Initializing an empty dir should succeed and do it."""
125
backing = self.get_transport()
126
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
127
self.assertEqual(SmartServerResponse(('ok', )),
128
request.execute(backing.local_abspath('.')))
129
made_dir = bzrdir.BzrDir.open_from_transport(backing)
130
# no branch, tree or repository is expected with the current
132
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
133
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
134
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
136
def test_missing_dir(self):
137
"""Initializing a missing directory should fail like the bzrdir api."""
138
backing = self.get_transport()
139
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
140
self.assertRaises(errors.NoSuchFile,
141
request.execute, backing.local_abspath('subdir'))
143
def test_initialized_dir(self):
144
"""Initializing an extant bzrdir should fail like the bzrdir api."""
145
backing = self.get_transport()
146
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
147
self.make_bzrdir('subdir')
148
self.assertRaises(errors.FileExists,
149
request.execute, backing.local_abspath('subdir'))
152
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
154
def test_no_branch(self):
155
"""When there is no branch, ('nobranch', ) is returned."""
156
backing = self.get_transport()
157
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
158
self.make_bzrdir('.')
159
self.assertEqual(SmartServerResponse(('nobranch', )),
160
request.execute(backing.local_abspath('')))
162
def test_branch(self):
163
"""When there is a branch, 'ok' is returned."""
164
backing = self.get_transport()
165
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
166
self.make_branch('.')
167
self.assertEqual(SmartServerResponse(('ok', '')),
168
request.execute(backing.local_abspath('')))
170
def test_branch_reference(self):
171
"""When there is a branch reference, the reference URL is returned."""
172
backing = self.get_transport()
173
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
174
branch = self.make_branch('branch')
175
checkout = branch.create_checkout('reference',lightweight=True)
176
# TODO: once we have an API to probe for references of any sort, we
178
reference_url = backing.abspath('branch') + '/'
179
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
180
self.assertEqual(SmartServerResponse(('ok', reference_url)),
181
request.execute(backing.local_abspath('reference')))
184
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
186
def test_empty(self):
187
"""For an empty branch, the body is empty."""
188
backing = self.get_transport()
189
request = smart.branch.SmartServerRequestRevisionHistory(backing)
190
self.make_branch('.')
191
self.assertEqual(SmartServerResponse(('ok', ), ''),
192
request.execute(backing.local_abspath('')))
194
def test_not_empty(self):
195
"""For a non-empty branch, the body is empty."""
196
backing = self.get_transport()
197
request = smart.branch.SmartServerRequestRevisionHistory(backing)
198
tree = self.make_branch_and_memory_tree('.')
201
r1 = tree.commit('1st commit')
202
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
205
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
206
request.execute(backing.local_abspath('')))
209
class TestSmartServerBranchRequest(tests.TestCaseWithTransport):
211
def test_no_branch(self):
212
"""When there is a bzrdir and no branch, NotBranchError is raised."""
213
backing = self.get_transport()
214
request = smart.branch.SmartServerBranchRequest(backing)
215
self.make_bzrdir('.')
216
self.assertRaises(errors.NotBranchError,
217
request.execute, backing.local_abspath(''))
219
def test_branch_reference(self):
220
"""When there is a branch reference, NotBranchError is raised."""
221
backing = self.get_transport()
222
request = smart.branch.SmartServerBranchRequest(backing)
223
branch = self.make_branch('branch')
224
checkout = branch.create_checkout('reference',lightweight=True)
225
self.assertRaises(errors.NotBranchError,
226
request.execute, backing.local_abspath('checkout'))
229
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithTransport):
231
def test_empty(self):
232
"""For an empty branch, the result is ('ok', '0', 'null:')."""
233
backing = self.get_transport()
234
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
235
self.make_branch('.')
236
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
237
request.execute(backing.local_abspath('')))
239
def test_not_empty(self):
240
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
241
backing = self.get_transport()
242
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
243
tree = self.make_branch_and_memory_tree('.')
246
rev_id_utf8 = u'\xc8'.encode('utf-8')
247
r1 = tree.commit('1st commit')
248
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
251
SmartServerResponse(('ok', '2', rev_id_utf8)),
252
request.execute(backing.local_abspath('')))
255
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithTransport):
257
def test_default(self):
258
"""With no file, we get empty content."""
259
backing = self.get_transport()
260
request = smart.branch.SmartServerBranchGetConfigFile(backing)
261
branch = self.make_branch('.')
262
# there should be no file by default
264
self.assertEqual(SmartServerResponse(('ok', ), content),
265
request.execute(backing.local_abspath('')))
267
def test_with_content(self):
268
# SmartServerBranchGetConfigFile should return the content from
269
# branch.control_files.get('branch.conf') for now - in the future it may
270
# perform more complex processing.
271
backing = self.get_transport()
272
request = smart.branch.SmartServerBranchGetConfigFile(backing)
273
branch = self.make_branch('.')
274
branch.control_files.put_utf8('branch.conf', 'foo bar baz')
275
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
276
request.execute(backing.local_abspath('')))
279
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithTransport):
281
def test_empty(self):
282
backing = self.get_transport()
283
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
284
b = self.make_branch('.')
285
branch_token = b.lock_write()
286
repo_token = b.repository.lock_write()
287
b.repository.unlock()
289
self.assertEqual(SmartServerResponse(('ok',)),
291
backing.local_abspath(''), branch_token, repo_token,
296
def test_not_present_revision_id(self):
297
backing = self.get_transport()
298
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
299
b = self.make_branch('.')
300
branch_token = b.lock_write()
301
repo_token = b.repository.lock_write()
302
b.repository.unlock()
304
revision_id = 'non-existent revision'
306
SmartServerResponse(('NoSuchRevision', revision_id)),
308
backing.local_abspath(''), branch_token, repo_token,
313
def test_revision_id_present(self):
314
backing = self.get_transport()
315
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
316
tree = self.make_branch_and_memory_tree('.')
319
rev_id_utf8 = u'\xc8'.encode('utf-8')
320
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
321
r2 = tree.commit('2nd commit')
323
branch_token = tree.branch.lock_write()
324
repo_token = tree.branch.repository.lock_write()
325
tree.branch.repository.unlock()
328
SmartServerResponse(('ok',)),
330
backing.local_abspath(''), branch_token, repo_token,
332
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
336
def test_revision_id_present2(self):
337
backing = self.get_transport()
338
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
339
tree = self.make_branch_and_memory_tree('.')
342
rev_id_utf8 = u'\xc8'.encode('utf-8')
343
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
344
r2 = tree.commit('2nd commit')
346
tree.branch.set_revision_history([])
347
branch_token = tree.branch.lock_write()
348
repo_token = tree.branch.repository.lock_write()
349
tree.branch.repository.unlock()
352
SmartServerResponse(('ok',)),
354
backing.local_abspath(''), branch_token, repo_token,
356
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
361
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
364
tests.TestCaseWithTransport.setUp(self)
365
self.reduceLockdirTimeout()
367
def test_lock_write_on_unlocked_branch(self):
368
backing = self.get_transport()
369
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
370
branch = self.make_branch('.')
371
repository = branch.repository
372
response = request.execute(backing.local_abspath(''))
373
branch_nonce = branch.control_files._lock.peek().get('nonce')
374
repository_nonce = repository.control_files._lock.peek().get('nonce')
376
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
378
# The branch (and associated repository) is now locked. Verify that
379
# with a new branch object.
380
new_branch = repository.bzrdir.open_branch()
381
self.assertRaises(errors.LockContention, new_branch.lock_write)
383
def test_lock_write_on_locked_branch(self):
384
backing = self.get_transport()
385
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
386
branch = self.make_branch('.')
388
branch.leave_lock_in_place()
390
response = request.execute(backing.local_abspath(''))
392
SmartServerResponse(('LockContention',)), response)
394
def test_lock_write_with_tokens_on_locked_branch(self):
395
backing = self.get_transport()
396
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
397
branch = self.make_branch('.')
398
branch_token = branch.lock_write()
399
repo_token = branch.repository.lock_write()
400
branch.repository.unlock()
401
branch.leave_lock_in_place()
402
branch.repository.leave_lock_in_place()
404
response = request.execute(backing.local_abspath(''),
405
branch_token, repo_token)
407
SmartServerResponse(('ok', branch_token, repo_token)), response)
409
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
410
backing = self.get_transport()
411
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
412
branch = self.make_branch('.')
413
branch_token = branch.lock_write()
414
repo_token = branch.repository.lock_write()
415
branch.repository.unlock()
416
branch.leave_lock_in_place()
417
branch.repository.leave_lock_in_place()
419
response = request.execute(backing.local_abspath(''),
420
branch_token+'xxx', repo_token)
422
SmartServerResponse(('TokenMismatch',)), response)
424
def test_lock_write_on_locked_repo(self):
425
backing = self.get_transport()
426
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
427
branch = self.make_branch('.')
428
branch.repository.lock_write()
429
branch.repository.leave_lock_in_place()
430
branch.repository.unlock()
431
response = request.execute(backing.local_abspath(''))
433
SmartServerResponse(('LockContention',)), response)
435
def test_lock_write_on_readonly_transport(self):
436
backing = self.get_readonly_transport()
437
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
438
branch = self.make_branch('.')
439
response = request.execute('')
441
SmartServerResponse(('UnlockableTransport',)), response)
444
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
447
tests.TestCaseWithTransport.setUp(self)
448
self.reduceLockdirTimeout()
450
def test_unlock_on_locked_branch_and_repo(self):
451
backing = self.get_transport()
452
request = smart.branch.SmartServerBranchRequestUnlock(backing)
453
branch = self.make_branch('.')
455
branch_token = branch.lock_write()
456
repo_token = branch.repository.lock_write()
457
branch.repository.unlock()
458
# Unlock the branch (and repo) object, leaving the physical locks
460
branch.leave_lock_in_place()
461
branch.repository.leave_lock_in_place()
463
response = request.execute(backing.local_abspath(''),
464
branch_token, repo_token)
466
SmartServerResponse(('ok',)), response)
467
# The branch is now unlocked. Verify that with a new branch
469
new_branch = branch.bzrdir.open_branch()
470
new_branch.lock_write()
473
def test_unlock_on_unlocked_branch_unlocked_repo(self):
474
backing = self.get_transport()
475
request = smart.branch.SmartServerBranchRequestUnlock(backing)
476
branch = self.make_branch('.')
477
response = request.execute(
478
backing.local_abspath(''), 'branch token', 'repo token')
480
SmartServerResponse(('TokenMismatch',)), response)
482
def test_unlock_on_unlocked_branch_locked_repo(self):
483
backing = self.get_transport()
484
request = smart.branch.SmartServerBranchRequestUnlock(backing)
485
branch = self.make_branch('.')
486
# Lock the repository.
487
repo_token = branch.repository.lock_write()
488
branch.repository.leave_lock_in_place()
489
branch.repository.unlock()
490
# Issue branch lock_write request on the unlocked branch (with locked
492
response = request.execute(
493
backing.local_abspath(''), 'branch token', repo_token)
495
SmartServerResponse(('TokenMismatch',)), response)
498
class TestSmartServerRepositoryRequest(tests.TestCaseWithTransport):
500
def test_no_repository(self):
501
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
502
# we test this using a shared repository above the named path,
503
# thus checking the right search logic is used - that is, that
504
# its the exact path being looked at and the server is not
506
backing = self.get_transport()
507
request = smart.repository.SmartServerRepositoryRequest(backing)
508
self.make_repository('.', shared=True)
509
self.make_bzrdir('subdir')
510
self.assertRaises(errors.NoRepositoryPresent,
511
request.execute, backing.local_abspath('subdir'))
514
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
516
def test_none_argument(self):
517
backing = self.get_transport()
518
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
519
tree = self.make_branch_and_memory_tree('.')
522
r1 = tree.commit('1st commit')
523
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
526
# the lines of revision_id->revision_parent_list has no guaranteed
527
# order coming out of a dict, so sort both our test and response
528
lines = sorted([' '.join([r2, r1]), r1])
529
response = request.execute(backing.local_abspath(''), '')
530
response.body = '\n'.join(sorted(response.body.split('\n')))
533
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
535
def test_specific_revision_argument(self):
536
backing = self.get_transport()
537
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
538
tree = self.make_branch_and_memory_tree('.')
541
rev_id_utf8 = u'\xc9'.encode('utf-8')
542
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
543
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
546
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
547
request.execute(backing.local_abspath(''), rev_id_utf8))
549
def test_no_such_revision(self):
550
backing = self.get_transport()
551
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
552
tree = self.make_branch_and_memory_tree('.')
555
r1 = tree.commit('1st commit')
558
# Note that it still returns body (of zero bytes).
560
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
561
request.execute(backing.local_abspath(''), 'missingrevision'))
564
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
566
def test_missing_revision(self):
567
"""For a missing revision, ('no', ) is returned."""
568
backing = self.get_transport()
569
request = smart.repository.SmartServerRequestHasRevision(backing)
570
self.make_repository('.')
571
self.assertEqual(SmartServerResponse(('no', )),
572
request.execute(backing.local_abspath(''), 'revid'))
574
def test_present_revision(self):
575
"""For a present revision, ('yes', ) is returned."""
576
backing = self.get_transport()
577
request = smart.repository.SmartServerRequestHasRevision(backing)
578
tree = self.make_branch_and_memory_tree('.')
581
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
582
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
584
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
585
self.assertEqual(SmartServerResponse(('yes', )),
586
request.execute(backing.local_abspath(''), rev_id_utf8))
589
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithTransport):
591
def test_empty_revid(self):
592
"""With an empty revid, we get only size an number and revisions"""
593
backing = self.get_transport()
594
request = smart.repository.SmartServerRepositoryGatherStats(backing)
595
repository = self.make_repository('.')
596
stats = repository.gather_stats()
598
expected_body = 'revisions: 0\nsize: %d\n' % size
599
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
600
request.execute(backing.local_abspath(''), '', 'no'))
602
def test_revid_with_committers(self):
603
"""For a revid we get more infos."""
604
backing = self.get_transport()
605
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
606
request = smart.repository.SmartServerRepositoryGatherStats(backing)
607
tree = self.make_branch_and_memory_tree('.')
610
# Let's build a predictable result
611
tree.commit('a commit', timestamp=123456.2, timezone=3600)
612
tree.commit('a commit', timestamp=654321.4, timezone=0,
616
stats = tree.branch.repository.gather_stats()
618
expected_body = ('firstrev: 123456.200 3600\n'
619
'latestrev: 654321.400 0\n'
622
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
623
request.execute(backing.local_abspath(''),
626
def test_not_empty_repository_with_committers(self):
627
"""For a revid and requesting committers we get the whole thing."""
628
backing = self.get_transport()
629
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
630
request = smart.repository.SmartServerRepositoryGatherStats(backing)
631
tree = self.make_branch_and_memory_tree('.')
634
# Let's build a predictable result
635
tree.commit('a commit', timestamp=123456.2, timezone=3600,
637
tree.commit('a commit', timestamp=654321.4, timezone=0,
638
committer='bar', rev_id=rev_id_utf8)
640
stats = tree.branch.repository.gather_stats()
643
expected_body = ('committers: 2\n'
644
'firstrev: 123456.200 3600\n'
645
'latestrev: 654321.400 0\n'
648
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
649
request.execute(backing.local_abspath(''),
653
class TestSmartServerRepositoryIsShared(tests.TestCaseWithTransport):
655
def test_is_shared(self):
656
"""For a shared repository, ('yes', ) is returned."""
657
backing = self.get_transport()
658
request = smart.repository.SmartServerRepositoryIsShared(backing)
659
self.make_repository('.', shared=True)
660
self.assertEqual(SmartServerResponse(('yes', )),
661
request.execute(backing.local_abspath(''), ))
663
def test_is_not_shared(self):
664
"""For a shared repository, ('no', ) is returned."""
665
backing = self.get_transport()
666
request = smart.repository.SmartServerRepositoryIsShared(backing)
667
self.make_repository('.', shared=False)
668
self.assertEqual(SmartServerResponse(('no', )),
669
request.execute(backing.local_abspath(''), ))
672
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
675
tests.TestCaseWithTransport.setUp(self)
676
self.reduceLockdirTimeout()
678
def test_lock_write_on_unlocked_repo(self):
679
backing = self.get_transport()
680
request = smart.repository.SmartServerRepositoryLockWrite(backing)
681
repository = self.make_repository('.')
682
response = request.execute(backing.local_abspath(''))
683
nonce = repository.control_files._lock.peek().get('nonce')
684
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
685
# The repository is now locked. Verify that with a new repository
687
new_repo = repository.bzrdir.open_repository()
688
self.assertRaises(errors.LockContention, new_repo.lock_write)
690
def test_lock_write_on_locked_repo(self):
691
backing = self.get_transport()
692
request = smart.repository.SmartServerRepositoryLockWrite(backing)
693
repository = self.make_repository('.')
694
repository.lock_write()
695
repository.leave_lock_in_place()
697
response = request.execute(backing.local_abspath(''))
699
SmartServerResponse(('LockContention',)), response)
701
def test_lock_write_on_readonly_transport(self):
702
backing = self.get_readonly_transport()
703
request = smart.repository.SmartServerRepositoryLockWrite(backing)
704
repository = self.make_repository('.')
705
response = request.execute('')
707
SmartServerResponse(('UnlockableTransport',)), response)
710
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
713
tests.TestCaseWithTransport.setUp(self)
714
self.reduceLockdirTimeout()
716
def test_unlock_on_locked_repo(self):
717
backing = self.get_transport()
718
request = smart.repository.SmartServerRepositoryUnlock(backing)
719
repository = self.make_repository('.')
720
token = repository.lock_write()
721
repository.leave_lock_in_place()
723
response = request.execute(backing.local_abspath(''), token)
725
SmartServerResponse(('ok',)), response)
726
# The repository is now unlocked. Verify that with a new repository
728
new_repo = repository.bzrdir.open_repository()
729
new_repo.lock_write()
732
def test_unlock_on_unlocked_repo(self):
733
backing = self.get_transport()
734
request = smart.repository.SmartServerRepositoryUnlock(backing)
735
repository = self.make_repository('.')
736
response = request.execute(backing.local_abspath(''), 'some token')
738
SmartServerResponse(('TokenMismatch',)), response)
741
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
743
def test_is_readonly_no(self):
744
backing = self.get_transport()
745
request = smart.request.SmartServerIsReadonly(backing)
746
response = request.execute()
748
SmartServerResponse(('no',)), response)
750
def test_is_readonly_yes(self):
751
backing = self.get_readonly_transport()
752
request = smart.request.SmartServerIsReadonly(backing)
753
response = request.execute()
755
SmartServerResponse(('yes',)), response)
758
class TestHandlers(tests.TestCase):
759
"""Tests for the request.request_handlers object."""
761
def test_registered_methods(self):
762
"""Test that known methods are registered to the correct object."""
764
smart.request.request_handlers.get('Branch.get_config_file'),
765
smart.branch.SmartServerBranchGetConfigFile)
767
smart.request.request_handlers.get('Branch.lock_write'),
768
smart.branch.SmartServerBranchRequestLockWrite)
770
smart.request.request_handlers.get('Branch.last_revision_info'),
771
smart.branch.SmartServerBranchRequestLastRevisionInfo)
773
smart.request.request_handlers.get('Branch.revision_history'),
774
smart.branch.SmartServerRequestRevisionHistory)
776
smart.request.request_handlers.get('Branch.set_last_revision'),
777
smart.branch.SmartServerBranchRequestSetLastRevision)
779
smart.request.request_handlers.get('Branch.unlock'),
780
smart.branch.SmartServerBranchRequestUnlock)
782
smart.request.request_handlers.get('BzrDir.find_repository'),
783
smart.bzrdir.SmartServerRequestFindRepository)
785
smart.request.request_handlers.get('BzrDirFormat.initialize'),
786
smart.bzrdir.SmartServerRequestInitializeBzrDir)
788
smart.request.request_handlers.get('BzrDir.open_branch'),
789
smart.bzrdir.SmartServerRequestOpenBranch)
791
smart.request.request_handlers.get('Repository.gather_stats'),
792
smart.repository.SmartServerRepositoryGatherStats)
794
smart.request.request_handlers.get('Repository.get_revision_graph'),
795
smart.repository.SmartServerRepositoryGetRevisionGraph)
797
smart.request.request_handlers.get('Repository.has_revision'),
798
smart.repository.SmartServerRequestHasRevision)
800
smart.request.request_handlers.get('Repository.is_shared'),
801
smart.repository.SmartServerRepositoryIsShared)
803
smart.request.request_handlers.get('Repository.lock_write'),
804
smart.repository.SmartServerRepositoryLockWrite)
806
smart.request.request_handlers.get('Repository.unlock'),
807
smart.repository.SmartServerRepositoryUnlock)
809
smart.request.request_handlers.get('Transport.is_readonly'),
810
smart.request.SmartServerIsReadonly)