1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protocol."""
19
from StringIO import StringIO
23
from bzrlib import bzrdir, errors, smart, tests
24
from bzrlib.smart.request import SmartServerResponse
25
import bzrlib.smart.bzrdir
26
import bzrlib.smart.branch
27
import bzrlib.smart.repository
30
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
33
super(TestCaseWithSmartMedium, self).setUp()
34
# We're allowed to set the transport class here, so that we don't use
35
# the default or a parameterized class, but rather use the
36
# TestCaseWithTransport infrastructure to set up a smart server and
38
self.transport_server = smart.server.SmartTCPServer_for_testing
40
def get_smart_medium(self):
41
"""Get a smart medium to use in tests."""
42
return self.get_transport().get_smart_medium()
45
class TestSmartServerResponse(tests.TestCase):
48
self.assertEqual(SmartServerResponse(('ok', )),
49
SmartServerResponse(('ok', )))
50
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
51
SmartServerResponse(('ok', ), 'body'))
52
self.assertNotEqual(SmartServerResponse(('ok', )),
53
SmartServerResponse(('notok', )))
54
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
55
SmartServerResponse(('ok', )))
56
self.assertNotEqual(None,
57
SmartServerResponse(('ok', )))
60
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
61
"""Tests for BzrDir.find_repository."""
63
def test_no_repository(self):
64
"""When there is no repository to be found, ('norepository', ) is returned."""
65
backing = self.get_transport()
66
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
68
self.assertEqual(SmartServerResponse(('norepository', )),
69
request.execute(backing.local_abspath('')))
71
def test_nonshared_repository(self):
72
# nonshared repositorys only allow 'find' to return a handle when the
73
# path the repository is being searched on is the same as that that
74
# the repository is at.
75
backing = self.get_transport()
76
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
77
result = self._make_repository_and_result()
78
self.assertEqual(result, request.execute(backing.local_abspath('')))
79
self.make_bzrdir('subdir')
80
self.assertEqual(SmartServerResponse(('norepository', )),
81
request.execute(backing.local_abspath('subdir')))
83
def _make_repository_and_result(self, shared=False, format=None):
84
"""Convenience function to setup a repository.
86
:result: The SmartServerResponse to expect when opening it.
88
repo = self.make_repository('.', shared=shared, format=format)
89
if repo.supports_rich_root():
93
if repo._format.supports_tree_reference:
97
return SmartServerResponse(('ok', '', rich_root, subtrees))
99
def test_shared_repository(self):
100
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
101
backing = self.get_transport()
102
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
103
result = self._make_repository_and_result(shared=True)
104
self.assertEqual(result, request.execute(backing.local_abspath('')))
105
self.make_bzrdir('subdir')
106
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
107
self.assertEqual(result2,
108
request.execute(backing.local_abspath('subdir')))
109
self.make_bzrdir('subdir/deeper')
110
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
111
self.assertEqual(result3,
112
request.execute(backing.local_abspath('subdir/deeper')))
114
def test_rich_root_and_subtree_encoding(self):
115
"""Test for the format attributes for rich root and subtree support."""
116
backing = self.get_transport()
117
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
118
result = self._make_repository_and_result(format='dirstate-with-subtree')
119
# check the test will be valid
120
self.assertEqual('yes', result.args[2])
121
self.assertEqual('yes', result.args[3])
122
self.assertEqual(result, request.execute(backing.local_abspath('')))
125
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
127
def test_empty_dir(self):
128
"""Initializing an empty dir should succeed and do it."""
129
backing = self.get_transport()
130
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
131
self.assertEqual(SmartServerResponse(('ok', )),
132
request.execute(backing.local_abspath('.')))
133
made_dir = bzrdir.BzrDir.open_from_transport(backing)
134
# no branch, tree or repository is expected with the current
136
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
137
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
138
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
140
def test_missing_dir(self):
141
"""Initializing a missing directory should fail like the bzrdir api."""
142
backing = self.get_transport()
143
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
144
self.assertRaises(errors.NoSuchFile,
145
request.execute, backing.local_abspath('subdir'))
147
def test_initialized_dir(self):
148
"""Initializing an extant bzrdir should fail like the bzrdir api."""
149
backing = self.get_transport()
150
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
151
self.make_bzrdir('subdir')
152
self.assertRaises(errors.FileExists,
153
request.execute, backing.local_abspath('subdir'))
156
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
158
def test_no_branch(self):
159
"""When there is no branch, ('nobranch', ) is returned."""
160
backing = self.get_transport()
161
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
162
self.make_bzrdir('.')
163
self.assertEqual(SmartServerResponse(('nobranch', )),
164
request.execute(backing.local_abspath('')))
166
def test_branch(self):
167
"""When there is a branch, 'ok' is returned."""
168
backing = self.get_transport()
169
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
170
self.make_branch('.')
171
self.assertEqual(SmartServerResponse(('ok', '')),
172
request.execute(backing.local_abspath('')))
174
def test_branch_reference(self):
175
"""When there is a branch reference, the reference URL is returned."""
176
backing = self.get_transport()
177
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
178
branch = self.make_branch('branch')
179
checkout = branch.create_checkout('reference',lightweight=True)
180
# TODO: once we have an API to probe for references of any sort, we
182
reference_url = backing.abspath('branch') + '/'
183
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
184
self.assertEqual(SmartServerResponse(('ok', reference_url)),
185
request.execute(backing.local_abspath('reference')))
188
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
190
def test_empty(self):
191
"""For an empty branch, the body is empty."""
192
backing = self.get_transport()
193
request = smart.branch.SmartServerRequestRevisionHistory(backing)
194
self.make_branch('.')
195
self.assertEqual(SmartServerResponse(('ok', ), ''),
196
request.execute(backing.local_abspath('')))
198
def test_not_empty(self):
199
"""For a non-empty branch, the body is empty."""
200
backing = self.get_transport()
201
request = smart.branch.SmartServerRequestRevisionHistory(backing)
202
tree = self.make_branch_and_memory_tree('.')
205
r1 = tree.commit('1st commit')
206
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
209
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
210
request.execute(backing.local_abspath('')))
213
class TestSmartServerBranchRequest(tests.TestCaseWithTransport):
215
def test_no_branch(self):
216
"""When there is a bzrdir and no branch, NotBranchError is raised."""
217
backing = self.get_transport()
218
request = smart.branch.SmartServerBranchRequest(backing)
219
self.make_bzrdir('.')
220
self.assertRaises(errors.NotBranchError,
221
request.execute, backing.local_abspath(''))
223
def test_branch_reference(self):
224
"""When there is a branch reference, NotBranchError is raised."""
225
backing = self.get_transport()
226
request = smart.branch.SmartServerBranchRequest(backing)
227
branch = self.make_branch('branch')
228
checkout = branch.create_checkout('reference',lightweight=True)
229
self.assertRaises(errors.NotBranchError,
230
request.execute, backing.local_abspath('checkout'))
233
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithTransport):
235
def test_empty(self):
236
"""For an empty branch, the result is ('ok', '0', 'null:')."""
237
backing = self.get_transport()
238
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
239
self.make_branch('.')
240
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
241
request.execute(backing.local_abspath('')))
243
def test_not_empty(self):
244
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
245
backing = self.get_transport()
246
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
247
tree = self.make_branch_and_memory_tree('.')
250
rev_id_utf8 = u'\xc8'.encode('utf-8')
251
r1 = tree.commit('1st commit')
252
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
255
SmartServerResponse(('ok', '2', rev_id_utf8)),
256
request.execute(backing.local_abspath('')))
259
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithTransport):
261
def test_default(self):
262
"""With no file, we get empty content."""
263
backing = self.get_transport()
264
request = smart.branch.SmartServerBranchGetConfigFile(backing)
265
branch = self.make_branch('.')
266
# there should be no file by default
268
self.assertEqual(SmartServerResponse(('ok', ), content),
269
request.execute(backing.local_abspath('')))
271
def test_with_content(self):
272
# SmartServerBranchGetConfigFile should return the content from
273
# branch.control_files.get('branch.conf') for now - in the future it may
274
# perform more complex processing.
275
backing = self.get_transport()
276
request = smart.branch.SmartServerBranchGetConfigFile(backing)
277
branch = self.make_branch('.')
278
branch.control_files.put_utf8('branch.conf', 'foo bar baz')
279
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
280
request.execute(backing.local_abspath('')))
283
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithTransport):
285
def test_empty(self):
286
backing = self.get_transport()
287
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
288
b = self.make_branch('.')
289
branch_token = b.lock_write()
290
repo_token = b.repository.lock_write()
291
b.repository.unlock()
293
self.assertEqual(SmartServerResponse(('ok',)),
295
backing.local_abspath(''), branch_token, repo_token,
300
def test_not_present_revision_id(self):
301
backing = self.get_transport()
302
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
303
b = self.make_branch('.')
304
branch_token = b.lock_write()
305
repo_token = b.repository.lock_write()
306
b.repository.unlock()
308
revision_id = 'non-existent revision'
310
SmartServerResponse(('NoSuchRevision', revision_id)),
312
backing.local_abspath(''), branch_token, repo_token,
317
def test_revision_id_present(self):
318
backing = self.get_transport()
319
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
320
tree = self.make_branch_and_memory_tree('.')
323
rev_id_utf8 = u'\xc8'.encode('utf-8')
324
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
325
r2 = tree.commit('2nd commit')
327
branch_token = tree.branch.lock_write()
328
repo_token = tree.branch.repository.lock_write()
329
tree.branch.repository.unlock()
332
SmartServerResponse(('ok',)),
334
backing.local_abspath(''), branch_token, repo_token,
336
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
340
def test_revision_id_present2(self):
341
backing = self.get_transport()
342
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
343
tree = self.make_branch_and_memory_tree('.')
346
rev_id_utf8 = u'\xc8'.encode('utf-8')
347
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
348
r2 = tree.commit('2nd commit')
350
tree.branch.set_revision_history([])
351
branch_token = tree.branch.lock_write()
352
repo_token = tree.branch.repository.lock_write()
353
tree.branch.repository.unlock()
356
SmartServerResponse(('ok',)),
358
backing.local_abspath(''), branch_token, repo_token,
360
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
365
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
368
tests.TestCaseWithTransport.setUp(self)
369
self.reduceLockdirTimeout()
371
def test_lock_write_on_unlocked_branch(self):
372
backing = self.get_transport()
373
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
374
branch = self.make_branch('.')
375
repository = branch.repository
376
response = request.execute(backing.local_abspath(''))
377
branch_nonce = branch.control_files._lock.peek().get('nonce')
378
repository_nonce = repository.control_files._lock.peek().get('nonce')
380
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
382
# The branch (and associated repository) is now locked. Verify that
383
# with a new branch object.
384
new_branch = repository.bzrdir.open_branch()
385
self.assertRaises(errors.LockContention, new_branch.lock_write)
387
def test_lock_write_on_locked_branch(self):
388
backing = self.get_transport()
389
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
390
branch = self.make_branch('.')
392
branch.leave_lock_in_place()
394
response = request.execute(backing.local_abspath(''))
396
SmartServerResponse(('LockContention',)), response)
398
def test_lock_write_with_tokens_on_locked_branch(self):
399
backing = self.get_transport()
400
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
401
branch = self.make_branch('.')
402
branch_token = branch.lock_write()
403
repo_token = branch.repository.lock_write()
404
branch.repository.unlock()
405
branch.leave_lock_in_place()
406
branch.repository.leave_lock_in_place()
408
response = request.execute(backing.local_abspath(''),
409
branch_token, repo_token)
411
SmartServerResponse(('ok', branch_token, repo_token)), response)
413
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
414
backing = self.get_transport()
415
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
416
branch = self.make_branch('.')
417
branch_token = branch.lock_write()
418
repo_token = branch.repository.lock_write()
419
branch.repository.unlock()
420
branch.leave_lock_in_place()
421
branch.repository.leave_lock_in_place()
423
response = request.execute(backing.local_abspath(''),
424
branch_token+'xxx', repo_token)
426
SmartServerResponse(('TokenMismatch',)), response)
428
def test_lock_write_on_locked_repo(self):
429
backing = self.get_transport()
430
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
431
branch = self.make_branch('.')
432
branch.repository.lock_write()
433
branch.repository.leave_lock_in_place()
434
branch.repository.unlock()
435
response = request.execute(backing.local_abspath(''))
437
SmartServerResponse(('LockContention',)), response)
439
def test_lock_write_on_readonly_transport(self):
440
backing = self.get_readonly_transport()
441
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
442
branch = self.make_branch('.')
443
response = request.execute('')
445
SmartServerResponse(('UnlockableTransport',)), response)
448
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
451
tests.TestCaseWithTransport.setUp(self)
452
self.reduceLockdirTimeout()
454
def test_unlock_on_locked_branch_and_repo(self):
455
backing = self.get_transport()
456
request = smart.branch.SmartServerBranchRequestUnlock(backing)
457
branch = self.make_branch('.')
459
branch_token = branch.lock_write()
460
repo_token = branch.repository.lock_write()
461
branch.repository.unlock()
462
# Unlock the branch (and repo) object, leaving the physical locks
464
branch.leave_lock_in_place()
465
branch.repository.leave_lock_in_place()
467
response = request.execute(backing.local_abspath(''),
468
branch_token, repo_token)
470
SmartServerResponse(('ok',)), response)
471
# The branch is now unlocked. Verify that with a new branch
473
new_branch = branch.bzrdir.open_branch()
474
new_branch.lock_write()
477
def test_unlock_on_unlocked_branch_unlocked_repo(self):
478
backing = self.get_transport()
479
request = smart.branch.SmartServerBranchRequestUnlock(backing)
480
branch = self.make_branch('.')
481
response = request.execute(
482
backing.local_abspath(''), 'branch token', 'repo token')
484
SmartServerResponse(('TokenMismatch',)), response)
486
def test_unlock_on_unlocked_branch_locked_repo(self):
487
backing = self.get_transport()
488
request = smart.branch.SmartServerBranchRequestUnlock(backing)
489
branch = self.make_branch('.')
490
# Lock the repository.
491
repo_token = branch.repository.lock_write()
492
branch.repository.leave_lock_in_place()
493
branch.repository.unlock()
494
# Issue branch lock_write request on the unlocked branch (with locked
496
response = request.execute(
497
backing.local_abspath(''), 'branch token', repo_token)
499
SmartServerResponse(('TokenMismatch',)), response)
502
class TestSmartServerRepositoryRequest(tests.TestCaseWithTransport):
504
def test_no_repository(self):
505
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
506
# we test this using a shared repository above the named path,
507
# thus checking the right search logic is used - that is, that
508
# its the exact path being looked at and the server is not
510
backing = self.get_transport()
511
request = smart.repository.SmartServerRepositoryRequest(backing)
512
self.make_repository('.', shared=True)
513
self.make_bzrdir('subdir')
514
self.assertRaises(errors.NoRepositoryPresent,
515
request.execute, backing.local_abspath('subdir'))
518
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
520
def test_none_argument(self):
521
backing = self.get_transport()
522
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
523
tree = self.make_branch_and_memory_tree('.')
526
r1 = tree.commit('1st commit')
527
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
530
# the lines of revision_id->revision_parent_list has no guaranteed
531
# order coming out of a dict, so sort both our test and response
532
lines = sorted([' '.join([r2, r1]), r1])
533
response = request.execute(backing.local_abspath(''), '')
534
response.body = '\n'.join(sorted(response.body.split('\n')))
537
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
539
def test_specific_revision_argument(self):
540
backing = self.get_transport()
541
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
542
tree = self.make_branch_and_memory_tree('.')
545
rev_id_utf8 = u'\xc9'.encode('utf-8')
546
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
547
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
550
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
551
request.execute(backing.local_abspath(''), rev_id_utf8))
553
def test_no_such_revision(self):
554
backing = self.get_transport()
555
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
556
tree = self.make_branch_and_memory_tree('.')
559
r1 = tree.commit('1st commit')
562
# Note that it still returns body (of zero bytes).
564
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
565
request.execute(backing.local_abspath(''), 'missingrevision'))
568
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
570
def test_missing_revision(self):
571
"""For a missing revision, ('no', ) is returned."""
572
backing = self.get_transport()
573
request = smart.repository.SmartServerRequestHasRevision(backing)
574
self.make_repository('.')
575
self.assertEqual(SmartServerResponse(('no', )),
576
request.execute(backing.local_abspath(''), 'revid'))
578
def test_present_revision(self):
579
"""For a present revision, ('yes', ) is returned."""
580
backing = self.get_transport()
581
request = smart.repository.SmartServerRequestHasRevision(backing)
582
tree = self.make_branch_and_memory_tree('.')
585
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
586
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
588
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
589
self.assertEqual(SmartServerResponse(('yes', )),
590
request.execute(backing.local_abspath(''), rev_id_utf8))
593
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithTransport):
595
def test_empty_revid(self):
596
"""With an empty revid, we get only size an number and revisions"""
597
backing = self.get_transport()
598
request = smart.repository.SmartServerRepositoryGatherStats(backing)
599
repository = self.make_repository('.')
600
stats = repository.gather_stats()
602
expected_body = 'revisions: 0\nsize: %d\n' % size
603
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
604
request.execute(backing.local_abspath(''), '', 'no'))
606
def test_revid_with_committers(self):
607
"""For a revid we get more infos."""
608
backing = self.get_transport()
609
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
610
request = smart.repository.SmartServerRepositoryGatherStats(backing)
611
tree = self.make_branch_and_memory_tree('.')
614
# Let's build a predictable result
615
tree.commit('a commit', timestamp=123456.2, timezone=3600)
616
tree.commit('a commit', timestamp=654321.4, timezone=0,
620
stats = tree.branch.repository.gather_stats()
622
expected_body = ('firstrev: 123456.200 3600\n'
623
'latestrev: 654321.400 0\n'
626
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
627
request.execute(backing.local_abspath(''),
630
def test_not_empty_repository_with_committers(self):
631
"""For a revid and requesting committers we get the whole thing."""
632
backing = self.get_transport()
633
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
634
request = smart.repository.SmartServerRepositoryGatherStats(backing)
635
tree = self.make_branch_and_memory_tree('.')
638
# Let's build a predictable result
639
tree.commit('a commit', timestamp=123456.2, timezone=3600,
641
tree.commit('a commit', timestamp=654321.4, timezone=0,
642
committer='bar', rev_id=rev_id_utf8)
644
stats = tree.branch.repository.gather_stats()
647
expected_body = ('committers: 2\n'
648
'firstrev: 123456.200 3600\n'
649
'latestrev: 654321.400 0\n'
652
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
653
request.execute(backing.local_abspath(''),
657
class TestSmartServerRepositoryIsShared(tests.TestCaseWithTransport):
659
def test_is_shared(self):
660
"""For a shared repository, ('yes', ) is returned."""
661
backing = self.get_transport()
662
request = smart.repository.SmartServerRepositoryIsShared(backing)
663
self.make_repository('.', shared=True)
664
self.assertEqual(SmartServerResponse(('yes', )),
665
request.execute(backing.local_abspath(''), ))
667
def test_is_not_shared(self):
668
"""For a shared repository, ('no', ) is returned."""
669
backing = self.get_transport()
670
request = smart.repository.SmartServerRepositoryIsShared(backing)
671
self.make_repository('.', shared=False)
672
self.assertEqual(SmartServerResponse(('no', )),
673
request.execute(backing.local_abspath(''), ))
676
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
679
tests.TestCaseWithTransport.setUp(self)
680
self.reduceLockdirTimeout()
682
def test_lock_write_on_unlocked_repo(self):
683
backing = self.get_transport()
684
request = smart.repository.SmartServerRepositoryLockWrite(backing)
685
repository = self.make_repository('.')
686
response = request.execute(backing.local_abspath(''))
687
nonce = repository.control_files._lock.peek().get('nonce')
688
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
689
# The repository is now locked. Verify that with a new repository
691
new_repo = repository.bzrdir.open_repository()
692
self.assertRaises(errors.LockContention, new_repo.lock_write)
694
def test_lock_write_on_locked_repo(self):
695
backing = self.get_transport()
696
request = smart.repository.SmartServerRepositoryLockWrite(backing)
697
repository = self.make_repository('.')
698
repository.lock_write()
699
repository.leave_lock_in_place()
701
response = request.execute(backing.local_abspath(''))
703
SmartServerResponse(('LockContention',)), response)
705
def test_lock_write_on_readonly_transport(self):
706
backing = self.get_readonly_transport()
707
request = smart.repository.SmartServerRepositoryLockWrite(backing)
708
repository = self.make_repository('.')
709
response = request.execute('')
711
SmartServerResponse(('UnlockableTransport',)), response)
714
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
717
tests.TestCaseWithTransport.setUp(self)
718
self.reduceLockdirTimeout()
720
def test_unlock_on_locked_repo(self):
721
backing = self.get_transport()
722
request = smart.repository.SmartServerRepositoryUnlock(backing)
723
repository = self.make_repository('.')
724
token = repository.lock_write()
725
repository.leave_lock_in_place()
727
response = request.execute(backing.local_abspath(''), token)
729
SmartServerResponse(('ok',)), response)
730
# The repository is now unlocked. Verify that with a new repository
732
new_repo = repository.bzrdir.open_repository()
733
new_repo.lock_write()
736
def test_unlock_on_unlocked_repo(self):
737
backing = self.get_transport()
738
request = smart.repository.SmartServerRepositoryUnlock(backing)
739
repository = self.make_repository('.')
740
response = request.execute(backing.local_abspath(''), 'some token')
742
SmartServerResponse(('TokenMismatch',)), response)
745
class TestSmartServerRepositoryTarball(tests.TestCaseWithTransport):
747
def test_repository_tarball(self):
748
backing = self.get_transport()
749
request = smart.repository.SmartServerRepositoryTarball(backing)
750
repository = self.make_repository('.')
751
# make some extraneous junk in the repository directory which should
753
self.build_tree(['.bzr/repository/extra-junk'])
754
response = request.execute(backing.local_abspath(''), 'bz2')
755
self.assertEqual(('ok',), response.args)
756
# body should be a tbz2
757
body_file = StringIO(response.body)
758
body_tar = tarfile.open('body_tar.tbz2', fileobj=body_file,
760
# let's make sure there are some key repository components inside it.
761
# the tarfile returns directories with trailing slashes...
762
names = set([n.rstrip('/') for n in body_tar.getnames()])
763
self.assertTrue('.bzr/repository/lock' in names)
764
self.assertTrue('.bzr/repository/format' in names)
765
self.assertTrue('.bzr/repository/extra-junk' not in names,
766
"extraneous file present in tar file")
769
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
771
def test_is_readonly_no(self):
772
backing = self.get_transport()
773
request = smart.request.SmartServerIsReadonly(backing)
774
response = request.execute()
776
SmartServerResponse(('no',)), response)
778
def test_is_readonly_yes(self):
779
backing = self.get_readonly_transport()
780
request = smart.request.SmartServerIsReadonly(backing)
781
response = request.execute()
783
SmartServerResponse(('yes',)), response)
786
class TestHandlers(tests.TestCase):
787
"""Tests for the request.request_handlers object."""
789
def test_registered_methods(self):
790
"""Test that known methods are registered to the correct object."""
792
smart.request.request_handlers.get('Branch.get_config_file'),
793
smart.branch.SmartServerBranchGetConfigFile)
795
smart.request.request_handlers.get('Branch.lock_write'),
796
smart.branch.SmartServerBranchRequestLockWrite)
798
smart.request.request_handlers.get('Branch.last_revision_info'),
799
smart.branch.SmartServerBranchRequestLastRevisionInfo)
801
smart.request.request_handlers.get('Branch.revision_history'),
802
smart.branch.SmartServerRequestRevisionHistory)
804
smart.request.request_handlers.get('Branch.set_last_revision'),
805
smart.branch.SmartServerBranchRequestSetLastRevision)
807
smart.request.request_handlers.get('Branch.unlock'),
808
smart.branch.SmartServerBranchRequestUnlock)
810
smart.request.request_handlers.get('BzrDir.find_repository'),
811
smart.bzrdir.SmartServerRequestFindRepository)
813
smart.request.request_handlers.get('BzrDirFormat.initialize'),
814
smart.bzrdir.SmartServerRequestInitializeBzrDir)
816
smart.request.request_handlers.get('BzrDir.open_branch'),
817
smart.bzrdir.SmartServerRequestOpenBranch)
819
smart.request.request_handlers.get('Repository.gather_stats'),
820
smart.repository.SmartServerRepositoryGatherStats)
822
smart.request.request_handlers.get('Repository.get_revision_graph'),
823
smart.repository.SmartServerRepositoryGetRevisionGraph)
825
smart.request.request_handlers.get('Repository.has_revision'),
826
smart.repository.SmartServerRequestHasRevision)
828
smart.request.request_handlers.get('Repository.is_shared'),
829
smart.repository.SmartServerRepositoryIsShared)
831
smart.request.request_handlers.get('Repository.lock_write'),
832
smart.repository.SmartServerRepositoryLockWrite)
834
smart.request.request_handlers.get('Repository.unlock'),
835
smart.repository.SmartServerRepositoryUnlock)
837
smart.request.request_handlers.get('Repository.tarball'),
838
smart.repository.SmartServerRepositoryTarball)
840
smart.request.request_handlers.get('Transport.is_readonly'),
841
smart.request.SmartServerIsReadonly)