1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protococl."""
19
from bzrlib import bzrdir, errors, smart, tests
20
from bzrlib.smart.request import SmartServerResponse
21
import bzrlib.smart.bzrdir
22
import bzrlib.smart.branch
23
import bzrlib.smart.repository
26
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
29
super(TestCaseWithSmartMedium, self).setUp()
30
# We're allowed to set the transport class here, so that we don't use
31
# the default or a parameterized class, but rather use the
32
# TestCaseWithTransport infrastructure to set up a smart server and
34
self.transport_server = smart.server.SmartTCPServer_for_testing
36
def get_smart_medium(self):
37
"""Get a smart medium to use in tests."""
38
return self.get_transport().get_smart_medium()
41
class TestSmartServerResponse(tests.TestCase):
44
self.assertEqual(SmartServerResponse(('ok', )),
45
SmartServerResponse(('ok', )))
46
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
47
SmartServerResponse(('ok', ), 'body'))
48
self.assertNotEqual(SmartServerResponse(('ok', )),
49
SmartServerResponse(('notok', )))
50
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
51
SmartServerResponse(('ok', )))
52
self.assertNotEqual(None,
53
SmartServerResponse(('ok', )))
56
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
57
"""Tests for BzrDir.find_repository."""
59
def test_no_repository(self):
60
"""When there is no repository to be found, ('norepository', ) is returned."""
61
backing = self.get_transport()
62
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
64
self.assertEqual(SmartServerResponse(('norepository', )),
65
request.execute(backing.local_abspath('')))
67
def test_nonshared_repository(self):
68
# nonshared repositorys only allow 'find' to return a handle when the
69
# path the repository is being searched on is the same as that that
70
# the repository is at.
71
backing = self.get_transport()
72
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
73
result = self._make_repository_and_result()
74
self.assertEqual(result, request.execute(backing.local_abspath('')))
75
self.make_bzrdir('subdir')
76
self.assertEqual(SmartServerResponse(('norepository', )),
77
request.execute(backing.local_abspath('subdir')))
79
def _make_repository_and_result(self, shared=False, format=None):
80
"""Convenience function to setup a repository.
82
:result: The SmartServerResponse to expect when opening it.
84
repo = self.make_repository('.', shared=shared, format=format)
85
if repo.supports_rich_root():
89
if repo._format.supports_tree_reference:
93
return SmartServerResponse(('ok', '', rich_root, subtrees))
95
def test_shared_repository(self):
96
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
97
backing = self.get_transport()
98
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
99
result = self._make_repository_and_result(shared=True)
100
self.assertEqual(result, request.execute(backing.local_abspath('')))
101
self.make_bzrdir('subdir')
102
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
103
self.assertEqual(result2,
104
request.execute(backing.local_abspath('subdir')))
105
self.make_bzrdir('subdir/deeper')
106
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
107
self.assertEqual(result3,
108
request.execute(backing.local_abspath('subdir/deeper')))
110
def test_rich_root_and_subtree_encoding(self):
111
"""Test for the format attributes for rich root and subtree support."""
112
backing = self.get_transport()
113
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
114
result = self._make_repository_and_result(format='dirstate-with-subtree')
115
# check the test will be valid
116
self.assertEqual('True', result.args[2])
117
self.assertEqual('True', result.args[3])
118
self.assertEqual(result, request.execute(backing.local_abspath('')))
121
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
123
def test_empty_dir(self):
124
"""Initializing an empty dir should succeed and do it."""
125
backing = self.get_transport()
126
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
127
self.assertEqual(SmartServerResponse(('ok', )),
128
request.execute(backing.local_abspath('.')))
129
made_dir = bzrdir.BzrDir.open_from_transport(backing)
130
# no branch, tree or repository is expected with the current
132
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
133
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
134
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
136
def test_missing_dir(self):
137
"""Initializing a missing directory should fail like the bzrdir api."""
138
backing = self.get_transport()
139
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
140
self.assertRaises(errors.NoSuchFile,
141
request.execute, backing.local_abspath('subdir'))
143
def test_initialized_dir(self):
144
"""Initializing an extant bzrdir should fail like the bzrdir api."""
145
backing = self.get_transport()
146
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
147
self.make_bzrdir('subdir')
148
self.assertRaises(errors.FileExists,
149
request.execute, backing.local_abspath('subdir'))
152
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
154
def test_no_branch(self):
155
"""When there is no branch, ('nobranch', ) is returned."""
156
backing = self.get_transport()
157
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
158
self.make_bzrdir('.')
159
self.assertEqual(SmartServerResponse(('nobranch', )),
160
request.execute(backing.local_abspath('')))
162
def test_branch(self):
163
"""When there is a branch, 'ok' is returned."""
164
backing = self.get_transport()
165
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
166
self.make_branch('.')
167
self.assertEqual(SmartServerResponse(('ok', '')),
168
request.execute(backing.local_abspath('')))
170
def test_branch_reference(self):
171
"""When there is a branch reference, the reference URL is returned."""
172
backing = self.get_transport()
173
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
174
branch = self.make_branch('branch')
175
checkout = branch.create_checkout('reference',lightweight=True)
176
# TODO: once we have an API to probe for references of any sort, we
178
reference_url = backing.abspath('branch') + '/'
179
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
180
self.assertEqual(SmartServerResponse(('ok', reference_url)),
181
request.execute(backing.local_abspath('reference')))
184
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
186
def test_empty(self):
187
"""For an empty branch, the body is empty."""
188
backing = self.get_transport()
189
request = smart.branch.SmartServerRequestRevisionHistory(backing)
190
self.make_branch('.')
191
self.assertEqual(SmartServerResponse(('ok', ), ''),
192
request.execute(backing.local_abspath('')))
194
def test_not_empty(self):
195
"""For a non-empty branch, the body is empty."""
196
backing = self.get_transport()
197
request = smart.branch.SmartServerRequestRevisionHistory(backing)
198
tree = self.make_branch_and_memory_tree('.')
201
r1 = tree.commit('1st commit')
202
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
205
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
206
request.execute(backing.local_abspath('')))
209
class TestSmartServerBranchRequest(tests.TestCaseWithTransport):
211
def test_no_branch(self):
212
"""When there is a bzrdir and no branch, NotBranchError is raised."""
213
backing = self.get_transport()
214
request = smart.branch.SmartServerBranchRequest(backing)
215
self.make_bzrdir('.')
216
self.assertRaises(errors.NotBranchError,
217
request.execute, backing.local_abspath(''))
219
def test_branch_reference(self):
220
"""When there is a branch reference, NotBranchError is raised."""
221
backing = self.get_transport()
222
request = smart.branch.SmartServerBranchRequest(backing)
223
branch = self.make_branch('branch')
224
checkout = branch.create_checkout('reference',lightweight=True)
225
self.assertRaises(errors.NotBranchError,
226
request.execute, backing.local_abspath('checkout'))
229
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithTransport):
231
def test_empty(self):
232
"""For an empty branch, the result is ('ok', '0', '')."""
233
backing = self.get_transport()
234
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
235
self.make_branch('.')
236
self.assertEqual(SmartServerResponse(('ok', '0', '')),
237
request.execute(backing.local_abspath('')))
239
def test_not_empty(self):
240
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
241
backing = self.get_transport()
242
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
243
tree = self.make_branch_and_memory_tree('.')
247
rev_id_utf8 = rev_id.encode('utf-8')
248
r1 = tree.commit('1st commit')
249
r2 = tree.commit('2nd commit', rev_id=rev_id)
252
SmartServerResponse(('ok', '2', rev_id_utf8)),
253
request.execute(backing.local_abspath('')))
256
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithTransport):
258
def test_default(self):
259
"""With no file, we get empty content."""
260
backing = self.get_transport()
261
request = smart.branch.SmartServerBranchGetConfigFile(backing)
262
branch = self.make_branch('.')
263
# there should be no file by default
265
self.assertEqual(SmartServerResponse(('ok', ), content),
266
request.execute(backing.local_abspath('')))
268
def test_with_content(self):
269
# SmartServerBranchGetConfigFile should return the content from
270
# branch.control_files.get('branch.conf') for now - in the future it may
271
# perform more complex processing.
272
backing = self.get_transport()
273
request = smart.branch.SmartServerBranchGetConfigFile(backing)
274
branch = self.make_branch('.')
275
branch.control_files.put_utf8('branch.conf', 'foo bar baz')
276
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
277
request.execute(backing.local_abspath('')))
280
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithTransport):
282
def test_empty(self):
283
backing = self.get_transport()
284
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
285
b = self.make_branch('.')
286
branch_token = b.lock_write()
287
repo_token = b.repository.lock_write()
288
b.repository.unlock()
290
self.assertEqual(SmartServerResponse(('ok',)),
292
backing.local_abspath(''), branch_token, repo_token, ''))
296
def test_not_present_revision_id(self):
297
backing = self.get_transport()
298
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
299
b = self.make_branch('.')
300
branch_token = b.lock_write()
301
repo_token = b.repository.lock_write()
302
b.repository.unlock()
304
revision_id = 'non-existent revision'
306
SmartServerResponse(('NoSuchRevision', revision_id)),
308
backing.local_abspath(''), branch_token, repo_token,
313
def test_revision_id_present(self):
314
backing = self.get_transport()
315
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
316
tree = self.make_branch_and_memory_tree('.')
320
rev_id_utf8 = rev_id.encode('utf-8')
321
r1 = tree.commit('1st commit', rev_id=rev_id)
322
r2 = tree.commit('2nd commit')
324
branch_token = tree.branch.lock_write()
325
repo_token = tree.branch.repository.lock_write()
326
tree.branch.repository.unlock()
329
SmartServerResponse(('ok',)),
331
backing.local_abspath(''), branch_token, repo_token,
333
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
337
def test_revision_id_present2(self):
338
backing = self.get_transport()
339
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
340
tree = self.make_branch_and_memory_tree('.')
344
rev_id_utf8 = rev_id.encode('utf-8')
345
r1 = tree.commit('1st commit', rev_id=rev_id)
346
r2 = tree.commit('2nd commit')
348
tree.branch.set_revision_history([])
349
branch_token = tree.branch.lock_write()
350
repo_token = tree.branch.repository.lock_write()
351
tree.branch.repository.unlock()
354
SmartServerResponse(('ok',)),
356
backing.local_abspath(''), branch_token, repo_token,
358
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
363
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
366
tests.TestCaseWithTransport.setUp(self)
367
self.reduceLockdirTimeout()
369
def test_lock_write_on_unlocked_branch(self):
370
backing = self.get_transport()
371
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
372
branch = self.make_branch('.')
373
repository = branch.repository
374
response = request.execute(backing.local_abspath(''))
375
branch_nonce = branch.control_files._lock.peek().get('nonce')
376
repository_nonce = repository.control_files._lock.peek().get('nonce')
378
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
380
# The branch (and associated repository) is now locked. Verify that
381
# with a new branch object.
382
new_branch = repository.bzrdir.open_branch()
383
self.assertRaises(errors.LockContention, new_branch.lock_write)
385
def test_lock_write_on_locked_branch(self):
386
backing = self.get_transport()
387
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
388
branch = self.make_branch('.')
390
branch.leave_lock_in_place()
392
response = request.execute(backing.local_abspath(''))
394
SmartServerResponse(('LockContention',)), response)
396
def test_lock_write_with_tokens_on_locked_branch(self):
397
backing = self.get_transport()
398
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
399
branch = self.make_branch('.')
400
branch_token = branch.lock_write()
401
repo_token = branch.repository.lock_write()
402
branch.repository.unlock()
403
branch.leave_lock_in_place()
404
branch.repository.leave_lock_in_place()
406
response = request.execute(backing.local_abspath(''),
407
branch_token, repo_token)
409
SmartServerResponse(('ok', branch_token, repo_token)), response)
411
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
412
backing = self.get_transport()
413
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
414
branch = self.make_branch('.')
415
branch_token = branch.lock_write()
416
repo_token = branch.repository.lock_write()
417
branch.repository.unlock()
418
branch.leave_lock_in_place()
419
branch.repository.leave_lock_in_place()
421
response = request.execute(backing.local_abspath(''),
422
branch_token+'xxx', repo_token)
424
SmartServerResponse(('TokenMismatch',)), response)
426
def test_lock_write_on_locked_repo(self):
427
backing = self.get_transport()
428
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
429
branch = self.make_branch('.')
430
branch.repository.lock_write()
431
branch.repository.leave_lock_in_place()
432
branch.repository.unlock()
433
response = request.execute(backing.local_abspath(''))
435
SmartServerResponse(('LockContention',)), response)
437
def test_lock_write_on_readonly_transport(self):
438
backing = self.get_readonly_transport()
439
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
440
branch = self.make_branch('.')
441
response = request.execute('')
443
SmartServerResponse(('UnlockableTransport',)), response)
446
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
449
tests.TestCaseWithTransport.setUp(self)
450
self.reduceLockdirTimeout()
452
def test_unlock_on_locked_branch_and_repo(self):
453
backing = self.get_transport()
454
request = smart.branch.SmartServerBranchRequestUnlock(backing)
455
branch = self.make_branch('.')
457
branch_token = branch.lock_write()
458
repo_token = branch.repository.lock_write()
459
branch.repository.unlock()
460
# Unlock the branch (and repo) object, leaving the physical locks
462
branch.leave_lock_in_place()
463
branch.repository.leave_lock_in_place()
465
response = request.execute(backing.local_abspath(''),
466
branch_token, repo_token)
468
SmartServerResponse(('ok',)), response)
469
# The branch is now unlocked. Verify that with a new branch
471
new_branch = branch.bzrdir.open_branch()
472
new_branch.lock_write()
475
def test_unlock_on_unlocked_branch_unlocked_repo(self):
476
backing = self.get_transport()
477
request = smart.branch.SmartServerBranchRequestUnlock(backing)
478
branch = self.make_branch('.')
479
response = request.execute(
480
backing.local_abspath(''), 'branch token', 'repo token')
482
SmartServerResponse(('TokenMismatch',)), response)
484
def test_unlock_on_unlocked_branch_locked_repo(self):
485
backing = self.get_transport()
486
request = smart.branch.SmartServerBranchRequestUnlock(backing)
487
branch = self.make_branch('.')
488
# Lock the repository.
489
repo_token = branch.repository.lock_write()
490
branch.repository.leave_lock_in_place()
491
branch.repository.unlock()
492
# Issue branch lock_write request on the unlocked branch (with locked
494
response = request.execute(
495
backing.local_abspath(''), 'branch token', repo_token)
497
SmartServerResponse(('TokenMismatch',)), response)
500
class TestSmartServerRepositoryRequest(tests.TestCaseWithTransport):
502
def test_no_repository(self):
503
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
504
# we test this using a shared repository above the named path,
505
# thus checking the right search logic is used - that is, that
506
# its the exact path being looked at and the server is not
508
backing = self.get_transport()
509
request = smart.repository.SmartServerRepositoryRequest(backing)
510
self.make_repository('.', shared=True)
511
self.make_bzrdir('subdir')
512
self.assertRaises(errors.NoRepositoryPresent,
513
request.execute, backing.local_abspath('subdir'))
516
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
518
def test_none_argument(self):
519
backing = self.get_transport()
520
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
521
tree = self.make_branch_and_memory_tree('.')
524
r1 = tree.commit('1st commit')
525
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
528
# the lines of revision_id->revision_parent_list has no guaranteed
529
# order coming out of a dict, so sort both our test and response
530
lines = sorted([' '.join([r2, r1]), r1])
531
response = request.execute(backing.local_abspath(''), '')
532
response.body = '\n'.join(sorted(response.body.split('\n')))
535
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
537
def test_specific_revision_argument(self):
538
backing = self.get_transport()
539
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
540
tree = self.make_branch_and_memory_tree('.')
543
r1 = tree.commit('1st commit', rev_id=u'\xc9')
544
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
547
self.assertEqual(SmartServerResponse(('ok', ),
548
u'\xc9'.encode('utf8')),
549
request.execute(backing.local_abspath(''), u'\xc9'.encode('utf8')))
551
def test_no_such_revision(self):
552
backing = self.get_transport()
553
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
554
tree = self.make_branch_and_memory_tree('.')
557
r1 = tree.commit('1st commit')
560
# Note that it still returns body (of zero bytes).
562
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
563
request.execute(backing.local_abspath(''), 'missingrevision'))
566
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
568
def test_missing_revision(self):
569
"""For a missing revision, ('no', ) is returned."""
570
backing = self.get_transport()
571
request = smart.repository.SmartServerRequestHasRevision(backing)
572
self.make_repository('.')
573
self.assertEqual(SmartServerResponse(('no', )),
574
request.execute(backing.local_abspath(''), 'revid'))
576
def test_present_revision(self):
577
"""For a present revision, ('ok', ) is returned."""
578
backing = self.get_transport()
579
request = smart.repository.SmartServerRequestHasRevision(backing)
580
tree = self.make_branch_and_memory_tree('.')
583
r1 = tree.commit('a commit', rev_id=u'\xc8abc')
585
self.assertTrue(tree.branch.repository.has_revision(u'\xc8abc'))
586
self.assertEqual(SmartServerResponse(('ok', )),
587
request.execute(backing.local_abspath(''),
588
u'\xc8abc'.encode('utf8')))
591
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithTransport):
593
def test_empty_revid(self):
594
"""With an empty revid, we get only size an number and revisions"""
595
backing = self.get_transport()
596
request = smart.repository.SmartServerRepositoryGatherStats(backing)
597
repository = self.make_repository('.')
598
stats = repository.gather_stats()
600
expected_body = 'revisions: 0\nsize: %d\n' % size
601
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
602
request.execute(backing.local_abspath(''), '', 'no'))
604
def test_revid_with_committers(self):
605
"""For a revid we get more infos."""
606
backing = self.get_transport()
608
request = smart.repository.SmartServerRepositoryGatherStats(backing)
609
tree = self.make_branch_and_memory_tree('.')
612
# Let's build a predictable result
613
tree.commit('a commit', timestamp=123456.2, timezone=3600)
614
tree.commit('a commit', timestamp=654321.4, timezone=0, rev_id=rev_id)
617
stats = tree.branch.repository.gather_stats()
619
expected_body = ('firstrev: 123456.200 3600\n'
620
'latestrev: 654321.400 0\n'
623
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
624
request.execute(backing.local_abspath(''),
625
rev_id.encode('utf8'), 'no'))
627
def test_not_empty_repository_with_committers(self):
628
"""For a revid and requesting committers we get the whole thing."""
629
backing = self.get_transport()
631
request = smart.repository.SmartServerRepositoryGatherStats(backing)
632
tree = self.make_branch_and_memory_tree('.')
635
# Let's build a predictable result
636
tree.commit('a commit', timestamp=123456.2, timezone=3600,
638
tree.commit('a commit', timestamp=654321.4, timezone=0,
639
committer='bar', rev_id=rev_id)
641
stats = tree.branch.repository.gather_stats()
644
expected_body = ('committers: 2\n'
645
'firstrev: 123456.200 3600\n'
646
'latestrev: 654321.400 0\n'
649
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
650
request.execute(backing.local_abspath(''),
651
rev_id.encode('utf8'), 'yes'))
654
class TestSmartServerRepositoryIsShared(tests.TestCaseWithTransport):
656
def test_is_shared(self):
657
"""For a shared repository, ('yes', ) is returned."""
658
backing = self.get_transport()
659
request = smart.repository.SmartServerRepositoryIsShared(backing)
660
self.make_repository('.', shared=True)
661
self.assertEqual(SmartServerResponse(('yes', )),
662
request.execute(backing.local_abspath(''), ))
664
def test_is_not_shared(self):
665
"""For a shared repository, ('no', ) is returned."""
666
backing = self.get_transport()
667
request = smart.repository.SmartServerRepositoryIsShared(backing)
668
self.make_repository('.', shared=False)
669
self.assertEqual(SmartServerResponse(('no', )),
670
request.execute(backing.local_abspath(''), ))
673
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
676
tests.TestCaseWithTransport.setUp(self)
677
self.reduceLockdirTimeout()
679
def test_lock_write_on_unlocked_repo(self):
680
backing = self.get_transport()
681
request = smart.repository.SmartServerRepositoryLockWrite(backing)
682
repository = self.make_repository('.')
683
response = request.execute(backing.local_abspath(''))
684
nonce = repository.control_files._lock.peek().get('nonce')
685
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
686
# The repository is now locked. Verify that with a new repository
688
new_repo = repository.bzrdir.open_repository()
689
self.assertRaises(errors.LockContention, new_repo.lock_write)
691
def test_lock_write_on_locked_repo(self):
692
backing = self.get_transport()
693
request = smart.repository.SmartServerRepositoryLockWrite(backing)
694
repository = self.make_repository('.')
695
repository.lock_write()
696
repository.leave_lock_in_place()
698
response = request.execute(backing.local_abspath(''))
700
SmartServerResponse(('LockContention',)), response)
702
def test_lock_write_on_readonly_transport(self):
703
backing = self.get_readonly_transport()
704
request = smart.repository.SmartServerRepositoryLockWrite(backing)
705
repository = self.make_repository('.')
706
response = request.execute('')
708
SmartServerResponse(('UnlockableTransport',)), response)
711
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
714
tests.TestCaseWithTransport.setUp(self)
715
self.reduceLockdirTimeout()
717
def test_unlock_on_locked_repo(self):
718
backing = self.get_transport()
719
request = smart.repository.SmartServerRepositoryUnlock(backing)
720
repository = self.make_repository('.')
721
token = repository.lock_write()
722
repository.leave_lock_in_place()
724
response = request.execute(backing.local_abspath(''), token)
726
SmartServerResponse(('ok',)), response)
727
# The repository is now unlocked. Verify that with a new repository
729
new_repo = repository.bzrdir.open_repository()
730
new_repo.lock_write()
733
def test_unlock_on_unlocked_repo(self):
734
backing = self.get_transport()
735
request = smart.repository.SmartServerRepositoryUnlock(backing)
736
repository = self.make_repository('.')
737
response = request.execute(backing.local_abspath(''), 'some token')
739
SmartServerResponse(('TokenMismatch',)), response)
742
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
744
def test_is_readonly_no(self):
745
backing = self.get_transport()
746
request = smart.request.SmartServerIsReadonly(backing)
747
response = request.execute()
749
SmartServerResponse(('no',)), response)
751
def test_is_readonly_yes(self):
752
backing = self.get_readonly_transport()
753
request = smart.request.SmartServerIsReadonly(backing)
754
response = request.execute()
756
SmartServerResponse(('yes',)), response)
759
class TestHandlers(tests.TestCase):
760
"""Tests for the request.request_handlers object."""
762
def test_registered_methods(self):
763
"""Test that known methods are registered to the correct object."""
765
smart.request.request_handlers.get('Branch.get_config_file'),
766
smart.branch.SmartServerBranchGetConfigFile)
768
smart.request.request_handlers.get('Branch.lock_write'),
769
smart.branch.SmartServerBranchRequestLockWrite)
771
smart.request.request_handlers.get('Branch.last_revision_info'),
772
smart.branch.SmartServerBranchRequestLastRevisionInfo)
774
smart.request.request_handlers.get('Branch.revision_history'),
775
smart.branch.SmartServerRequestRevisionHistory)
777
smart.request.request_handlers.get('Branch.set_last_revision'),
778
smart.branch.SmartServerBranchRequestSetLastRevision)
780
smart.request.request_handlers.get('Branch.unlock'),
781
smart.branch.SmartServerBranchRequestUnlock)
783
smart.request.request_handlers.get('BzrDir.find_repository'),
784
smart.bzrdir.SmartServerRequestFindRepository)
786
smart.request.request_handlers.get('BzrDirFormat.initialize'),
787
smart.bzrdir.SmartServerRequestInitializeBzrDir)
789
smart.request.request_handlers.get('BzrDir.open_branch'),
790
smart.bzrdir.SmartServerRequestOpenBranch)
792
smart.request.request_handlers.get('Repository.gather_stats'),
793
smart.repository.SmartServerRepositoryGatherStats)
795
smart.request.request_handlers.get('Repository.get_revision_graph'),
796
smart.repository.SmartServerRepositoryGetRevisionGraph)
798
smart.request.request_handlers.get('Repository.has_revision'),
799
smart.repository.SmartServerRequestHasRevision)
801
smart.request.request_handlers.get('Repository.is_shared'),
802
smart.repository.SmartServerRepositoryIsShared)
804
smart.request.request_handlers.get('Repository.lock_write'),
805
smart.repository.SmartServerRepositoryLockWrite)
807
smart.request.request_handlers.get('Repository.unlock'),
808
smart.repository.SmartServerRepositoryUnlock)
810
smart.request.request_handlers.get('Transport.is_readonly'),
811
smart.request.SmartServerIsReadonly)