~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_remote.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for remote bzrdir/branch/repo/etc
 
18
 
 
19
These are proxy objects which act on remote objects by sending messages
 
20
through a smart client.  The proxies are to be created when attempting to open
 
21
the object given a transport that supports smartserver rpc operations. 
 
22
"""
 
23
 
 
24
from cStringIO import StringIO
 
25
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    remote,
 
29
    tests,
 
30
    )
 
31
from bzrlib.branch import Branch
 
32
from bzrlib.bzrdir import BzrDir, BzrDirFormat
 
33
from bzrlib.remote import (
 
34
    RemoteBranch,
 
35
    RemoteBzrDir,
 
36
    RemoteBzrDirFormat,
 
37
    RemoteRepository,
 
38
    )
 
39
from bzrlib.revision import NULL_REVISION
 
40
from bzrlib.smart import server
 
41
from bzrlib.smart.client import _SmartClient
 
42
from bzrlib.transport.memory import MemoryTransport
 
43
 
 
44
 
 
45
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
 
46
 
 
47
    def setUp(self):
 
48
        self.transport_server = server.SmartTCPServer_for_testing
 
49
        super(BasicRemoteObjectTests, self).setUp()
 
50
        self.transport = self.get_transport()
 
51
        self.client = self.transport.get_smart_client()
 
52
        # make a branch that can be opened over the smart transport
 
53
        self.local_wt = BzrDir.create_standalone_workingtree('.')
 
54
 
 
55
    def tearDown(self):
 
56
        self.transport.disconnect()
 
57
        tests.TestCaseWithTransport.tearDown(self)
 
58
 
 
59
    def test_is_readonly(self):
 
60
        # XXX: this is a poor way to test RemoteTransport, but currently there's
 
61
        # no easy way to substitute in a fake client on a transport like we can
 
62
        # with RemoteBzrDir/Branch/Repository.
 
63
        self.assertEqual(self.transport.is_readonly(), False)
 
64
 
 
65
    def test_create_remote_bzrdir(self):
 
66
        b = remote.RemoteBzrDir(self.transport)
 
67
        self.assertIsInstance(b, BzrDir)
 
68
 
 
69
    def test_open_remote_branch(self):
 
70
        # open a standalone branch in the working directory
 
71
        b = remote.RemoteBzrDir(self.transport)
 
72
        branch = b.open_branch()
 
73
        self.assertIsInstance(branch, Branch)
 
74
 
 
75
    def test_remote_repository(self):
 
76
        b = BzrDir.open_from_transport(self.transport)
 
77
        repo = b.open_repository()
 
78
        revid = u'\xc823123123'.encode('utf8')
 
79
        self.assertFalse(repo.has_revision(revid))
 
80
        self.local_wt.commit(message='test commit', rev_id=revid)
 
81
        self.assertTrue(repo.has_revision(revid))
 
82
 
 
83
    def test_remote_branch_revision_history(self):
 
84
        b = BzrDir.open_from_transport(self.transport).open_branch()
 
85
        self.assertEqual([], b.revision_history())
 
86
        r1 = self.local_wt.commit('1st commit')
 
87
        r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8'.encode('utf8'))
 
88
        self.assertEqual([r1, r2], b.revision_history())
 
89
 
 
90
    def test_find_correct_format(self):
 
91
        """Should open a RemoteBzrDir over a RemoteTransport"""
 
92
        fmt = BzrDirFormat.find_format(self.transport)
 
93
        self.assertTrue(RemoteBzrDirFormat
 
94
                        in BzrDirFormat._control_server_formats)
 
95
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)
 
96
 
 
97
    def test_open_detected_smart_format(self):
 
98
        fmt = BzrDirFormat.find_format(self.transport)
 
99
        d = fmt.open(self.transport)
 
100
        self.assertIsInstance(d, BzrDir)
 
101
 
 
102
 
 
103
class ReadonlyRemoteTransportTests(tests.TestCaseWithTransport):
 
104
 
 
105
    def setUp(self):
 
106
        self.transport_server = server.ReadonlySmartTCPServer_for_testing
 
107
        super(ReadonlyRemoteTransportTests, self).setUp()
 
108
 
 
109
    def test_is_readonly_yes(self):
 
110
        # XXX: this is a poor way to test RemoteTransport, but currently there's
 
111
        # no easy way to substitute in a fake client on a transport like we can
 
112
        # with RemoteBzrDir/Branch/Repository.
 
113
        transport = self.get_readonly_transport()
 
114
        self.assertEqual(transport.is_readonly(), True)
 
115
 
 
116
 
 
117
class FakeProtocol(object):
 
118
    """Lookalike SmartClientRequestProtocolOne allowing body reading tests."""
 
119
 
 
120
    def __init__(self, body):
 
121
        self._body_buffer = StringIO(body)
 
122
 
 
123
    def read_body_bytes(self, count=-1):
 
124
        return self._body_buffer.read(count)
 
125
 
 
126
 
 
127
class FakeClient(_SmartClient):
 
128
    """Lookalike for _SmartClient allowing testing."""
 
129
    
 
130
    def __init__(self, responses):
 
131
        # We don't call the super init because there is no medium.
 
132
        """create a FakeClient.
 
133
 
 
134
        :param respones: A list of response-tuple, body-data pairs to be sent
 
135
            back to callers.
 
136
        """
 
137
        self.responses = responses
 
138
        self._calls = []
 
139
 
 
140
    def call(self, method, *args):
 
141
        self._calls.append(('call', method, args))
 
142
        return self.responses.pop(0)[0]
 
143
 
 
144
    def call_expecting_body(self, method, *args):
 
145
        self._calls.append(('call_expecting_body', method, args))
 
146
        result = self.responses.pop(0)
 
147
        return result[0], FakeProtocol(result[1])
 
148
 
 
149
 
 
150
class TestBzrDirOpenBranch(tests.TestCase):
 
151
 
 
152
    def test_branch_present(self):
 
153
        client = FakeClient([(('ok', ''), ), (('ok', '', 'no', 'no'), )])
 
154
        transport = MemoryTransport()
 
155
        transport.mkdir('quack')
 
156
        transport = transport.clone('quack')
 
157
        bzrdir = RemoteBzrDir(transport, _client=client)
 
158
        result = bzrdir.open_branch()
 
159
        self.assertEqual(
 
160
            [('call', 'BzrDir.open_branch', ('///quack/',)),
 
161
             ('call', 'BzrDir.find_repository', ('///quack/',))],
 
162
            client._calls)
 
163
        self.assertIsInstance(result, RemoteBranch)
 
164
        self.assertEqual(bzrdir, result.bzrdir)
 
165
 
 
166
    def test_branch_missing(self):
 
167
        client = FakeClient([(('nobranch',), )])
 
168
        transport = MemoryTransport()
 
169
        transport.mkdir('quack')
 
170
        transport = transport.clone('quack')
 
171
        bzrdir = RemoteBzrDir(transport, _client=client)
 
172
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
 
173
        self.assertEqual(
 
174
            [('call', 'BzrDir.open_branch', ('///quack/',))],
 
175
            client._calls)
 
176
 
 
177
    def check_open_repository(self, rich_root, subtrees):
 
178
        if rich_root:
 
179
            rich_response = 'yes'
 
180
        else:
 
181
            rich_response = 'no'
 
182
        if subtrees:
 
183
            subtree_response = 'yes'
 
184
        else:
 
185
            subtree_response = 'no'
 
186
        client = FakeClient([(('ok', '', rich_response, subtree_response), ),])
 
187
        transport = MemoryTransport()
 
188
        transport.mkdir('quack')
 
189
        transport = transport.clone('quack')
 
190
        bzrdir = RemoteBzrDir(transport, _client=client)
 
191
        result = bzrdir.open_repository()
 
192
        self.assertEqual(
 
193
            [('call', 'BzrDir.find_repository', ('///quack/',))],
 
194
            client._calls)
 
195
        self.assertIsInstance(result, RemoteRepository)
 
196
        self.assertEqual(bzrdir, result.bzrdir)
 
197
        self.assertEqual(rich_root, result._format.rich_root_data)
 
198
        self.assertEqual(subtrees, result._format.supports_tree_reference)
 
199
 
 
200
    def test_open_repository_sets_format_attributes(self):
 
201
        self.check_open_repository(True, True)
 
202
        self.check_open_repository(False, True)
 
203
        self.check_open_repository(True, False)
 
204
        self.check_open_repository(False, False)
 
205
 
 
206
 
 
207
class TestBranchLastRevisionInfo(tests.TestCase):
 
208
 
 
209
    def test_empty_branch(self):
 
210
        # in an empty branch we decode the response properly
 
211
        client = FakeClient([(('ok', '0', 'null:'), )])
 
212
        transport = MemoryTransport()
 
213
        transport.mkdir('quack')
 
214
        transport = transport.clone('quack')
 
215
        # we do not want bzrdir to make any remote calls
 
216
        bzrdir = RemoteBzrDir(transport, _client=False)
 
217
        branch = RemoteBranch(bzrdir, None, _client=client)
 
218
        result = branch.last_revision_info()
 
219
 
 
220
        self.assertEqual(
 
221
            [('call', 'Branch.last_revision_info', ('///quack/',))],
 
222
            client._calls)
 
223
        self.assertEqual((0, NULL_REVISION), result)
 
224
 
 
225
    def test_non_empty_branch(self):
 
226
        # in a non-empty branch we also decode the response properly
 
227
        revid = u'\xc8'.encode('utf8')
 
228
        client = FakeClient([(('ok', '2', revid), )])
 
229
        transport = MemoryTransport()
 
230
        transport.mkdir('kwaak')
 
231
        transport = transport.clone('kwaak')
 
232
        # we do not want bzrdir to make any remote calls
 
233
        bzrdir = RemoteBzrDir(transport, _client=False)
 
234
        branch = RemoteBranch(bzrdir, None, _client=client)
 
235
        result = branch.last_revision_info()
 
236
 
 
237
        self.assertEqual(
 
238
            [('call', 'Branch.last_revision_info', ('///kwaak/',))],
 
239
            client._calls)
 
240
        self.assertEqual((2, revid), result)
 
241
 
 
242
 
 
243
class TestBranchSetLastRevision(tests.TestCase):
 
244
 
 
245
    def test_set_empty(self):
 
246
        # set_revision_history([]) is translated to calling
 
247
        # Branch.set_last_revision(path, '') on the wire.
 
248
        client = FakeClient([
 
249
            # lock_write
 
250
            (('ok', 'branch token', 'repo token'), ),
 
251
            # set_last_revision
 
252
            (('ok',), ),
 
253
            # unlock
 
254
            (('ok',), )])
 
255
        transport = MemoryTransport()
 
256
        transport.mkdir('branch')
 
257
        transport = transport.clone('branch')
 
258
 
 
259
        bzrdir = RemoteBzrDir(transport, _client=False)
 
260
        branch = RemoteBranch(bzrdir, None, _client=client)
 
261
        # This is a hack to work around the problem that RemoteBranch currently
 
262
        # unnecessarily invokes _ensure_real upon a call to lock_write.
 
263
        branch._ensure_real = lambda: None
 
264
        branch.lock_write()
 
265
        client._calls = []
 
266
        result = branch.set_revision_history([])
 
267
        self.assertEqual(
 
268
            [('call', 'Branch.set_last_revision',
 
269
                ('///branch/', 'branch token', 'repo token', 'null:'))],
 
270
            client._calls)
 
271
        branch.unlock()
 
272
        self.assertEqual(None, result)
 
273
 
 
274
    def test_set_nonempty(self):
 
275
        # set_revision_history([rev-id1, ..., rev-idN]) is translated to calling
 
276
        # Branch.set_last_revision(path, rev-idN) on the wire.
 
277
        client = FakeClient([
 
278
            # lock_write
 
279
            (('ok', 'branch token', 'repo token'), ),
 
280
            # set_last_revision
 
281
            (('ok',), ),
 
282
            # unlock
 
283
            (('ok',), )])
 
284
        transport = MemoryTransport()
 
285
        transport.mkdir('branch')
 
286
        transport = transport.clone('branch')
 
287
 
 
288
        bzrdir = RemoteBzrDir(transport, _client=False)
 
289
        branch = RemoteBranch(bzrdir, None, _client=client)
 
290
        # This is a hack to work around the problem that RemoteBranch currently
 
291
        # unnecessarily invokes _ensure_real upon a call to lock_write.
 
292
        branch._ensure_real = lambda: None
 
293
        # Lock the branch, reset the record of remote calls.
 
294
        branch.lock_write()
 
295
        client._calls = []
 
296
 
 
297
        result = branch.set_revision_history(['rev-id1', 'rev-id2'])
 
298
        self.assertEqual(
 
299
            [('call', 'Branch.set_last_revision',
 
300
                ('///branch/', 'branch token', 'repo token', 'rev-id2'))],
 
301
            client._calls)
 
302
        branch.unlock()
 
303
        self.assertEqual(None, result)
 
304
 
 
305
    def test_no_such_revision(self):
 
306
        # A response of 'NoSuchRevision' is translated into an exception.
 
307
        client = FakeClient([
 
308
            # lock_write
 
309
            (('ok', 'branch token', 'repo token'), ),
 
310
            # set_last_revision
 
311
            (('NoSuchRevision', 'rev-id'), ),
 
312
            # unlock
 
313
            (('ok',), )])
 
314
        transport = MemoryTransport()
 
315
        transport.mkdir('branch')
 
316
        transport = transport.clone('branch')
 
317
 
 
318
        bzrdir = RemoteBzrDir(transport, _client=False)
 
319
        branch = RemoteBranch(bzrdir, None, _client=client)
 
320
        branch._ensure_real = lambda: None
 
321
        branch.lock_write()
 
322
        client._calls = []
 
323
 
 
324
        self.assertRaises(
 
325
            errors.NoSuchRevision, branch.set_revision_history, ['rev-id'])
 
326
        branch.unlock()
 
327
 
 
328
 
 
329
class TestBranchControlGetBranchConf(tests.TestCaseWithMemoryTransport):
 
330
    """Test branch.control_files api munging...
 
331
 
 
332
    We special case RemoteBranch.control_files.get('branch.conf') to
 
333
    call a specific API so that RemoteBranch's can intercept configuration
 
334
    file reading, allowing them to signal to the client about things like
 
335
    'email is configured for commits'.
 
336
    """
 
337
 
 
338
    def test_get_branch_conf(self):
 
339
        # in an empty branch we decode the response properly
 
340
        client = FakeClient([(('ok', ), 'config file body')])
 
341
        # we need to make a real branch because the remote_branch.control_files
 
342
        # will trigger _ensure_real.
 
343
        branch = self.make_branch('quack')
 
344
        transport = branch.bzrdir.root_transport
 
345
        # we do not want bzrdir to make any remote calls
 
346
        bzrdir = RemoteBzrDir(transport, _client=False)
 
347
        branch = RemoteBranch(bzrdir, None, _client=client)
 
348
        result = branch.control_files.get('branch.conf')
 
349
        self.assertEqual(
 
350
            [('call_expecting_body', 'Branch.get_config_file', ('///quack/',))],
 
351
            client._calls)
 
352
        self.assertEqual('config file body', result.read())
 
353
 
 
354
 
 
355
class TestBranchLockWrite(tests.TestCase):
 
356
 
 
357
    def test_lock_write_unlockable(self):
 
358
        client = FakeClient([(('UnlockableTransport', ), '')])
 
359
        transport = MemoryTransport()
 
360
        transport.mkdir('quack')
 
361
        transport = transport.clone('quack')
 
362
        # we do not want bzrdir to make any remote calls
 
363
        bzrdir = RemoteBzrDir(transport, _client=False)
 
364
        branch = RemoteBranch(bzrdir, None, _client=client)
 
365
        self.assertRaises(errors.UnlockableTransport, branch.lock_write)
 
366
        self.assertEqual(
 
367
            [('call', 'Branch.lock_write', ('///quack/', '', ''))],
 
368
            client._calls)
 
369
 
 
370
 
 
371
class TestRemoteRepository(tests.TestCase):
 
372
 
 
373
    def setup_fake_client_and_repository(self, responses, transport_path):
 
374
        """Create the fake client and repository for testing with."""
 
375
        client = FakeClient(responses)
 
376
        transport = MemoryTransport()
 
377
        transport.mkdir(transport_path)
 
378
        transport = transport.clone(transport_path)
 
379
        # we do not want bzrdir to make any remote calls
 
380
        bzrdir = RemoteBzrDir(transport, _client=False)
 
381
        repo = RemoteRepository(bzrdir, None, _client=client)
 
382
        return repo, client
 
383
 
 
384
 
 
385
class TestRepositoryGatherStats(TestRemoteRepository):
 
386
 
 
387
    def test_revid_none(self):
 
388
        # ('ok',), body with revisions and size
 
389
        responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
 
390
        transport_path = 'quack'
 
391
        repo, client = self.setup_fake_client_and_repository(
 
392
            responses, transport_path)
 
393
        result = repo.gather_stats(None)
 
394
        self.assertEqual(
 
395
            [('call_expecting_body', 'Repository.gather_stats',
 
396
             ('///quack/','','no'))],
 
397
            client._calls)
 
398
        self.assertEqual({'revisions': 2, 'size': 18}, result)
 
399
 
 
400
    def test_revid_no_committers(self):
 
401
        # ('ok',), body without committers
 
402
        responses = [(('ok', ),
 
403
                      'firstrev: 123456.300 3600\n'
 
404
                      'latestrev: 654231.400 0\n'
 
405
                      'revisions: 2\n'
 
406
                      'size: 18\n')]
 
407
        transport_path = 'quick'
 
408
        revid = u'\xc8'.encode('utf8')
 
409
        repo, client = self.setup_fake_client_and_repository(
 
410
            responses, transport_path)
 
411
        result = repo.gather_stats(revid)
 
412
        self.assertEqual(
 
413
            [('call_expecting_body', 'Repository.gather_stats',
 
414
              ('///quick/', revid, 'no'))],
 
415
            client._calls)
 
416
        self.assertEqual({'revisions': 2, 'size': 18,
 
417
                          'firstrev': (123456.300, 3600),
 
418
                          'latestrev': (654231.400, 0),},
 
419
                         result)
 
420
 
 
421
    def test_revid_with_committers(self):
 
422
        # ('ok',), body with committers
 
423
        responses = [(('ok', ),
 
424
                      'committers: 128\n'
 
425
                      'firstrev: 123456.300 3600\n'
 
426
                      'latestrev: 654231.400 0\n'
 
427
                      'revisions: 2\n'
 
428
                      'size: 18\n')]
 
429
        transport_path = 'buick'
 
430
        revid = u'\xc8'.encode('utf8')
 
431
        repo, client = self.setup_fake_client_and_repository(
 
432
            responses, transport_path)
 
433
        result = repo.gather_stats(revid, True)
 
434
        self.assertEqual(
 
435
            [('call_expecting_body', 'Repository.gather_stats',
 
436
              ('///buick/', revid, 'yes'))],
 
437
            client._calls)
 
438
        self.assertEqual({'revisions': 2, 'size': 18,
 
439
                          'committers': 128,
 
440
                          'firstrev': (123456.300, 3600),
 
441
                          'latestrev': (654231.400, 0),},
 
442
                         result)
 
443
 
 
444
 
 
445
class TestRepositoryGetRevisionGraph(TestRemoteRepository):
 
446
    
 
447
    def test_null_revision(self):
 
448
        # a null revision has the predictable result {}, we should have no wire
 
449
        # traffic when calling it with this argument
 
450
        responses = [(('notused', ), '')]
 
451
        transport_path = 'empty'
 
452
        repo, client = self.setup_fake_client_and_repository(
 
453
            responses, transport_path)
 
454
        result = repo.get_revision_graph(NULL_REVISION)
 
455
        self.assertEqual([], client._calls)
 
456
        self.assertEqual({}, result)
 
457
 
 
458
    def test_none_revision(self):
 
459
        # with none we want the entire graph
 
460
        r1 = u'\u0e33'.encode('utf8')
 
461
        r2 = u'\u0dab'.encode('utf8')
 
462
        lines = [' '.join([r2, r1]), r1]
 
463
        encoded_body = '\n'.join(lines)
 
464
 
 
465
        responses = [(('ok', ), encoded_body)]
 
466
        transport_path = 'sinhala'
 
467
        repo, client = self.setup_fake_client_and_repository(
 
468
            responses, transport_path)
 
469
        result = repo.get_revision_graph()
 
470
        self.assertEqual(
 
471
            [('call_expecting_body', 'Repository.get_revision_graph',
 
472
             ('///sinhala/', ''))],
 
473
            client._calls)
 
474
        self.assertEqual({r1: [], r2: [r1]}, result)
 
475
 
 
476
    def test_specific_revision(self):
 
477
        # with a specific revision we want the graph for that
 
478
        # with none we want the entire graph
 
479
        r11 = u'\u0e33'.encode('utf8')
 
480
        r12 = u'\xc9'.encode('utf8')
 
481
        r2 = u'\u0dab'.encode('utf8')
 
482
        lines = [' '.join([r2, r11, r12]), r11, r12]
 
483
        encoded_body = '\n'.join(lines)
 
484
 
 
485
        responses = [(('ok', ), encoded_body)]
 
486
        transport_path = 'sinhala'
 
487
        repo, client = self.setup_fake_client_and_repository(
 
488
            responses, transport_path)
 
489
        result = repo.get_revision_graph(r2)
 
490
        self.assertEqual(
 
491
            [('call_expecting_body', 'Repository.get_revision_graph',
 
492
             ('///sinhala/', r2))],
 
493
            client._calls)
 
494
        self.assertEqual({r11: [], r12: [], r2: [r11, r12], }, result)
 
495
 
 
496
    def test_no_such_revision(self):
 
497
        revid = '123'
 
498
        responses = [(('nosuchrevision', revid), '')]
 
499
        transport_path = 'sinhala'
 
500
        repo, client = self.setup_fake_client_and_repository(
 
501
            responses, transport_path)
 
502
        # also check that the right revision is reported in the error
 
503
        self.assertRaises(errors.NoSuchRevision,
 
504
            repo.get_revision_graph, revid)
 
505
        self.assertEqual(
 
506
            [('call_expecting_body', 'Repository.get_revision_graph',
 
507
             ('///sinhala/', revid))],
 
508
            client._calls)
 
509
 
 
510
        
 
511
class TestRepositoryIsShared(TestRemoteRepository):
 
512
 
 
513
    def test_is_shared(self):
 
514
        # ('yes', ) for Repository.is_shared -> 'True'.
 
515
        responses = [(('yes', ), )]
 
516
        transport_path = 'quack'
 
517
        repo, client = self.setup_fake_client_and_repository(
 
518
            responses, transport_path)
 
519
        result = repo.is_shared()
 
520
        self.assertEqual(
 
521
            [('call', 'Repository.is_shared', ('///quack/',))],
 
522
            client._calls)
 
523
        self.assertEqual(True, result)
 
524
 
 
525
    def test_is_not_shared(self):
 
526
        # ('no', ) for Repository.is_shared -> 'False'.
 
527
        responses = [(('no', ), )]
 
528
        transport_path = 'qwack'
 
529
        repo, client = self.setup_fake_client_and_repository(
 
530
            responses, transport_path)
 
531
        result = repo.is_shared()
 
532
        self.assertEqual(
 
533
            [('call', 'Repository.is_shared', ('///qwack/',))],
 
534
            client._calls)
 
535
        self.assertEqual(False, result)
 
536
 
 
537
 
 
538
class TestRepositoryLockWrite(TestRemoteRepository):
 
539
 
 
540
    def test_lock_write(self):
 
541
        responses = [(('ok', 'a token'), '')]
 
542
        transport_path = 'quack'
 
543
        repo, client = self.setup_fake_client_and_repository(
 
544
            responses, transport_path)
 
545
        result = repo.lock_write()
 
546
        self.assertEqual(
 
547
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
548
            client._calls)
 
549
        self.assertEqual('a token', result)
 
550
 
 
551
    def test_lock_write_already_locked(self):
 
552
        responses = [(('LockContention', ), '')]
 
553
        transport_path = 'quack'
 
554
        repo, client = self.setup_fake_client_and_repository(
 
555
            responses, transport_path)
 
556
        self.assertRaises(errors.LockContention, repo.lock_write)
 
557
        self.assertEqual(
 
558
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
559
            client._calls)
 
560
 
 
561
    def test_lock_write_unlockable(self):
 
562
        responses = [(('UnlockableTransport', ), '')]
 
563
        transport_path = 'quack'
 
564
        repo, client = self.setup_fake_client_and_repository(
 
565
            responses, transport_path)
 
566
        self.assertRaises(errors.UnlockableTransport, repo.lock_write)
 
567
        self.assertEqual(
 
568
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
569
            client._calls)
 
570
 
 
571
 
 
572
class TestRepositoryUnlock(TestRemoteRepository):
 
573
 
 
574
    def test_unlock(self):
 
575
        responses = [(('ok', 'a token'), ''),
 
576
                     (('ok',), '')]
 
577
        transport_path = 'quack'
 
578
        repo, client = self.setup_fake_client_and_repository(
 
579
            responses, transport_path)
 
580
        repo.lock_write()
 
581
        repo.unlock()
 
582
        self.assertEqual(
 
583
            [('call', 'Repository.lock_write', ('///quack/', '')),
 
584
             ('call', 'Repository.unlock', ('///quack/', 'a token'))],
 
585
            client._calls)
 
586
 
 
587
    def test_unlock_wrong_token(self):
 
588
        # If somehow the token is wrong, unlock will raise TokenMismatch.
 
589
        responses = [(('ok', 'a token'), ''),
 
590
                     (('TokenMismatch',), '')]
 
591
        transport_path = 'quack'
 
592
        repo, client = self.setup_fake_client_and_repository(
 
593
            responses, transport_path)
 
594
        repo.lock_write()
 
595
        self.assertRaises(errors.TokenMismatch, repo.unlock)
 
596
 
 
597
 
 
598
class TestRepositoryHasRevision(TestRemoteRepository):
 
599
 
 
600
    def test_none(self):
 
601
        # repo.has_revision(None) should not cause any traffic.
 
602
        transport_path = 'quack'
 
603
        responses = None
 
604
        repo, client = self.setup_fake_client_and_repository(
 
605
            responses, transport_path)
 
606
 
 
607
        # The null revision is always there, so has_revision(None) == True.
 
608
        self.assertEqual(True, repo.has_revision(None))
 
609
 
 
610
        # The remote repo shouldn't be accessed.
 
611
        self.assertEqual([], client._calls)