1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for remote bzrdir/branch/repo/etc
19
These are proxy objects which act on remote objects by sending messages
20
through a smart client. The proxies are to be created when attempting to open
21
the object given a transport that supports smartserver rpc operations.
23
These tests correspond to tests.test_smart, which exercises the server side.
26
from cStringIO import StringIO
35
from bzrlib.branch import Branch
36
from bzrlib.bzrdir import BzrDir, BzrDirFormat
37
from bzrlib.remote import (
43
from bzrlib.revision import NULL_REVISION
44
from bzrlib.smart import server, medium
45
from bzrlib.smart.client import _SmartClient
46
from bzrlib.transport.memory import MemoryTransport
47
from bzrlib.transport.remote import RemoteTransport
50
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
    """Smoke tests for the remote proxy objects over a smart TCP server."""

    def setUp(self):
        """Start the smart test server and create a standalone tree in '.'."""
        # The server class has to be chosen before the base setUp runs.
        self.transport_server = server.SmartTCPServer_for_testing
        super(BasicRemoteObjectTests, self).setUp()
        self.transport = self.get_transport()
        self.client = self.transport.get_smart_client()
        # make a branch that can be opened over the smart transport
        self.local_wt = BzrDir.create_standalone_workingtree('.')

    def tearDown(self):
        """Disconnect the transport before the base class tears down."""
        self.transport.disconnect()
        tests.TestCaseWithTransport.tearDown(self)

    def test_create_remote_bzrdir(self):
        b = remote.RemoteBzrDir(self.transport)
        self.assertIsInstance(b, BzrDir)

    def test_open_remote_branch(self):
        # open a standalone branch in the working directory
        b = remote.RemoteBzrDir(self.transport)
        branch = b.open_branch()
        self.assertIsInstance(branch, Branch)

    def test_remote_repository(self):
        b = BzrDir.open_from_transport(self.transport)
        repo = b.open_repository()
        revid = u'\xc823123123'.encode('utf8')
        # The revision only becomes visible remotely after the local commit.
        self.assertFalse(repo.has_revision(revid))
        self.local_wt.commit(message='test commit', rev_id=revid)
        self.assertTrue(repo.has_revision(revid))

    def test_remote_branch_revision_history(self):
        b = BzrDir.open_from_transport(self.transport).open_branch()
        self.assertEqual([], b.revision_history())
        r1 = self.local_wt.commit('1st commit')
        r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8'.encode('utf8'))
        self.assertEqual([r1, r2], b.revision_history())

    def test_find_correct_format(self):
        """Should open a RemoteBzrDir over a RemoteTransport"""
        fmt = BzrDirFormat.find_format(self.transport)
        self.assertTrue(RemoteBzrDirFormat
                        in BzrDirFormat._control_server_formats)
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)

    def test_open_detected_smart_format(self):
        fmt = BzrDirFormat.find_format(self.transport)
        d = fmt.open(self.transport)
        self.assertIsInstance(d, BzrDir)
102
class FakeProtocol(object):
    """Lookalike SmartClientRequestProtocolOne allowing body reading tests."""

    def __init__(self, body):
        """Wrap *body* in a file-like buffer that can be read back in pieces.

        :param body: The complete response body as a string.
        """
        self._body_buffer = StringIO(body)

    def read_body_bytes(self, count=-1):
        """Return up to *count* bytes of the body; the rest when count is -1."""
        return self._body_buffer.read(count)
112
class FakeClient(_SmartClient):
    """Lookalike for _SmartClient allowing testing."""

    def __init__(self, responses):
        """Create a FakeClient.

        :param responses: A list of tuples whose first item is the response
            tuple and whose optional second item is the body data. Each call
            made through this client consumes the first remaining entry.
        """
        # We don't call the super init because there is no medium.
        self.responses = responses
        # Every request is recorded here, in order, as (kind, method, args),
        # so tests can compare it against the expected wire traffic.
        self._calls = []

    def call(self, method, *args):
        """Record the call and return the next canned response tuple."""
        self._calls.append(('call', method, args))
        return self.responses.pop(0)[0]

    def call_expecting_body(self, method, *args):
        """Record the call; return the response tuple and a body reader."""
        self._calls.append(('call_expecting_body', method, args))
        result = self.responses.pop(0)
        return result[0], FakeProtocol(result[1])
135
class TestBzrDirOpenBranch(tests.TestCase):
    """Check the requests RemoteBzrDir sends when opening branches/repos."""

    def test_branch_present(self):
        client = FakeClient([(('ok', ''), ), (('ok', '', 'no', 'no'), )])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        bzrdir = RemoteBzrDir(transport, _client=client)
        result = bzrdir.open_branch()
        self.assertEqual(
            [('call', 'BzrDir.open_branch', ('///quack/',)),
             ('call', 'BzrDir.find_repository', ('///quack/',))],
            client._calls)
        self.assertIsInstance(result, RemoteBranch)
        self.assertEqual(bzrdir, result.bzrdir)

    def test_branch_missing(self):
        client = FakeClient([(('nobranch',), )])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        bzrdir = RemoteBzrDir(transport, _client=client)
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
        self.assertEqual(
            [('call', 'BzrDir.open_branch', ('///quack/',))],
            client._calls)

    def check_open_repository(self, rich_root, subtrees):
        """Open a repository from a canned reply and check its format flags.

        :param rich_root: Whether the canned reply advertises rich roots.
        :param subtrees: Whether the canned reply advertises tree references.
        """
        # Booleans travel as 'yes'/'no' strings in the response tuple.
        if rich_root:
            rich_response = 'yes'
        else:
            rich_response = 'no'
        if subtrees:
            subtree_response = 'yes'
        else:
            subtree_response = 'no'
        client = FakeClient([(('ok', '', rich_response, subtree_response), ),])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        bzrdir = RemoteBzrDir(transport, _client=client)
        result = bzrdir.open_repository()
        self.assertEqual(
            [('call', 'BzrDir.find_repository', ('///quack/',))],
            client._calls)
        self.assertIsInstance(result, RemoteRepository)
        self.assertEqual(bzrdir, result.bzrdir)
        self.assertEqual(rich_root, result._format.rich_root_data)
        self.assertEqual(subtrees, result._format.supports_tree_reference)

    def test_open_repository_sets_format_attributes(self):
        self.check_open_repository(True, True)
        self.check_open_repository(False, True)
        self.check_open_repository(True, False)
        self.check_open_repository(False, False)

    def test_old_server(self):
        """RemoteBzrDirFormat should fail to probe if the server version is too
        old.
        """
        self.assertRaises(errors.NotBranchError,
            RemoteBzrDirFormat.probe_transport, OldServerTransport())
199
class OldSmartClient(object):
    """A fake smart client for test_old_version that just returns a version one
    response to the 'hello' (query version) command.
    """

    def get_request(self):
        """Return a medium request whose input is a canned version-one reply."""
        # The reply is preloaded; whatever the caller writes goes to a
        # separate, initially empty buffer.
        input_file = StringIO('ok\x011\n')
        output_file = StringIO()
        client_medium = medium.SmartSimplePipesClientMedium(
            input_file, output_file)
        return medium.SmartClientStreamMediumRequest(client_medium)
212
class OldServerTransport(object):
213
"""A fake transport for test_old_server that reports its smart server
214
protocol version as version one.
220
def get_smart_client(self):
221
return OldSmartClient()
224
class TestBranchLastRevisionInfo(tests.TestCase):
    """Check the request and decoding for RemoteBranch.last_revision_info."""

    def test_empty_branch(self):
        # in an empty branch we decode the response properly
        client = FakeClient([(('ok', '0', 'null:'), )])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        result = branch.last_revision_info()

        self.assertEqual(
            [('call', 'Branch.last_revision_info', ('///quack/',))],
            client._calls)
        self.assertEqual((0, NULL_REVISION), result)

    def test_non_empty_branch(self):
        # in a non-empty branch we also decode the response properly
        revid = u'\xc8'.encode('utf8')
        client = FakeClient([(('ok', '2', revid), )])
        transport = MemoryTransport()
        transport.mkdir('kwaak')
        transport = transport.clone('kwaak')
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        result = branch.last_revision_info()

        self.assertEqual(
            [('call', 'Branch.last_revision_info', ('///kwaak/',))],
            client._calls)
        self.assertEqual((2, revid), result)
260
class TestBranchSetLastRevision(tests.TestCase):
262
def test_set_empty(self):
263
# set_revision_history([]) is translated to calling
264
# Branch.set_last_revision(path, '') on the wire.
265
client = FakeClient([
267
(('ok', 'branch token', 'repo token'), ),
272
transport = MemoryTransport()
273
transport.mkdir('branch')
274
transport = transport.clone('branch')
276
bzrdir = RemoteBzrDir(transport, _client=False)
277
branch = RemoteBranch(bzrdir, None, _client=client)
278
# This is a hack to work around the problem that RemoteBranch currently
279
# unnecessarily invokes _ensure_real upon a call to lock_write.
280
branch._ensure_real = lambda: None
283
result = branch.set_revision_history([])
285
[('call', 'Branch.set_last_revision',
286
('///branch/', 'branch token', 'repo token', 'null:'))],
289
self.assertEqual(None, result)
291
def test_set_nonempty(self):
292
# set_revision_history([rev-id1, ..., rev-idN]) is translated to calling
293
# Branch.set_last_revision(path, rev-idN) on the wire.
294
client = FakeClient([
296
(('ok', 'branch token', 'repo token'), ),
301
transport = MemoryTransport()
302
transport.mkdir('branch')
303
transport = transport.clone('branch')
305
bzrdir = RemoteBzrDir(transport, _client=False)
306
branch = RemoteBranch(bzrdir, None, _client=client)
307
# This is a hack to work around the problem that RemoteBranch currently
308
# unnecessarily invokes _ensure_real upon a call to lock_write.
309
branch._ensure_real = lambda: None
310
# Lock the branch, reset the record of remote calls.
314
result = branch.set_revision_history(['rev-id1', 'rev-id2'])
316
[('call', 'Branch.set_last_revision',
317
('///branch/', 'branch token', 'repo token', 'rev-id2'))],
320
self.assertEqual(None, result)
322
def test_no_such_revision(self):
323
# A response of 'NoSuchRevision' is translated into an exception.
324
client = FakeClient([
326
(('ok', 'branch token', 'repo token'), ),
328
(('NoSuchRevision', 'rev-id'), ),
331
transport = MemoryTransport()
332
transport.mkdir('branch')
333
transport = transport.clone('branch')
335
bzrdir = RemoteBzrDir(transport, _client=False)
336
branch = RemoteBranch(bzrdir, None, _client=client)
337
branch._ensure_real = lambda: None
342
errors.NoSuchRevision, branch.set_revision_history, ['rev-id'])
346
class TestBranchControlGetBranchConf(tests.TestCaseWithMemoryTransport):
    """Test branch.control_files api munging...

    We special case RemoteBranch.control_files.get('branch.conf') to
    call a specific API so that RemoteBranch's can intercept configuration
    file reading, allowing them to signal to the client about things like
    'email is configured for commits'.
    """

    def test_get_branch_conf(self):
        # in an empty branch we decode the response properly
        client = FakeClient([(('ok', ), 'config file body')])
        # we need to make a real branch because the remote_branch.control_files
        # will trigger _ensure_real.
        branch = self.make_branch('quack')
        transport = branch.bzrdir.root_transport
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        result = branch.control_files.get('branch.conf')
        self.assertEqual(
            [('call_expecting_body', 'Branch.get_config_file', ('///quack/',))],
            client._calls)
        # The returned object is file-like and yields the canned body.
        self.assertEqual('config file body', result.read())
372
class TestBranchLockWrite(tests.TestCase):
    """Check how RemoteBranch.lock_write reacts to an error response."""

    def test_lock_write_unlockable(self):
        client = FakeClient([(('UnlockableTransport', ), '')])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        self.assertRaises(errors.UnlockableTransport, branch.lock_write)
        self.assertEqual(
            [('call', 'Branch.lock_write', ('///quack/', '', ''))],
            client._calls)
388
class TestTransportIsReadonly(tests.TestCase):
391
client = FakeClient([(('yes',), '')])
392
transport = RemoteTransport('bzr://example.com/', medium=False,
394
self.assertEqual(True, transport.is_readonly())
396
[('call', 'Transport.is_readonly', ())],
399
def test_false(self):
400
client = FakeClient([(('no',), '')])
401
transport = RemoteTransport('bzr://example.com/', medium=False,
403
self.assertEqual(False, transport.is_readonly())
405
[('call', 'Transport.is_readonly', ())],
408
def test_error_from_old_server(self):
409
"""bzr 0.15 and earlier servers don't recognise the is_readonly verb.
411
Clients should treat it as a "no" response, because is_readonly is only
412
advisory anyway (a transport could be read-write, but then the
413
underlying filesystem could be readonly anyway).
415
client = FakeClient([(
416
('error', "Generic bzr smart protocol error: "
417
"bad request 'Transport.is_readonly'"), '')])
418
transport = RemoteTransport('bzr://example.com/', medium=False,
420
self.assertEqual(False, transport.is_readonly())
422
[('call', 'Transport.is_readonly', ())],
426
class TestRemoteRepository(tests.TestCase):
    """Base for testing RemoteRepository protocol usage.

    These tests contain frozen requests and responses.  We want any changes to
    what is sent or expected to require a thoughtful update to these tests
    because they might break compatibility with different-versioned servers.
    """

    def setup_fake_client_and_repository(self, responses, transport_path):
        """Create the fake client and repository for testing with.

        There's no real server here; the FakeClient hands back the canned
        responses it was given.

        :param responses: Canned responses passed straight to FakeClient.
        :param transport_path: Path below the root of the MemoryTransport
            where the repository will be created.
        :return: A (repository, client) tuple.
        """
        client = FakeClient(responses)
        transport = MemoryTransport()
        transport.mkdir(transport_path)
        transport = transport.clone(transport_path)
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        repo = RemoteRepository(bzrdir, None, _client=client)
        return repo, client
453
class TestRepositoryGatherStats(TestRemoteRepository):
455
def test_revid_none(self):
456
# ('ok',), body with revisions and size
457
responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
458
transport_path = 'quack'
459
repo, client = self.setup_fake_client_and_repository(
460
responses, transport_path)
461
result = repo.gather_stats(None)
463
[('call_expecting_body', 'Repository.gather_stats',
464
('///quack/','','no'))],
466
self.assertEqual({'revisions': 2, 'size': 18}, result)
468
def test_revid_no_committers(self):
469
# ('ok',), body without committers
470
responses = [(('ok', ),
471
'firstrev: 123456.300 3600\n'
472
'latestrev: 654231.400 0\n'
475
transport_path = 'quick'
476
revid = u'\xc8'.encode('utf8')
477
repo, client = self.setup_fake_client_and_repository(
478
responses, transport_path)
479
result = repo.gather_stats(revid)
481
[('call_expecting_body', 'Repository.gather_stats',
482
('///quick/', revid, 'no'))],
484
self.assertEqual({'revisions': 2, 'size': 18,
485
'firstrev': (123456.300, 3600),
486
'latestrev': (654231.400, 0),},
489
def test_revid_with_committers(self):
490
# ('ok',), body with committers
491
responses = [(('ok', ),
493
'firstrev: 123456.300 3600\n'
494
'latestrev: 654231.400 0\n'
497
transport_path = 'buick'
498
revid = u'\xc8'.encode('utf8')
499
repo, client = self.setup_fake_client_and_repository(
500
responses, transport_path)
501
result = repo.gather_stats(revid, True)
503
[('call_expecting_body', 'Repository.gather_stats',
504
('///buick/', revid, 'yes'))],
506
self.assertEqual({'revisions': 2, 'size': 18,
508
'firstrev': (123456.300, 3600),
509
'latestrev': (654231.400, 0),},
513
class TestRepositoryGetRevisionGraph(TestRemoteRepository):
515
def test_null_revision(self):
516
# a null revision has the predictable result {}, we should have no wire
517
# traffic when calling it with this argument
518
responses = [(('notused', ), '')]
519
transport_path = 'empty'
520
repo, client = self.setup_fake_client_and_repository(
521
responses, transport_path)
522
result = repo.get_revision_graph(NULL_REVISION)
523
self.assertEqual([], client._calls)
524
self.assertEqual({}, result)
526
def test_none_revision(self):
527
# with none we want the entire graph
528
r1 = u'\u0e33'.encode('utf8')
529
r2 = u'\u0dab'.encode('utf8')
530
lines = [' '.join([r2, r1]), r1]
531
encoded_body = '\n'.join(lines)
533
responses = [(('ok', ), encoded_body)]
534
transport_path = 'sinhala'
535
repo, client = self.setup_fake_client_and_repository(
536
responses, transport_path)
537
result = repo.get_revision_graph()
539
[('call_expecting_body', 'Repository.get_revision_graph',
540
('///sinhala/', ''))],
542
self.assertEqual({r1: [], r2: [r1]}, result)
544
def test_specific_revision(self):
545
# with a specific revision we want the graph for that
546
# with none we want the entire graph
547
r11 = u'\u0e33'.encode('utf8')
548
r12 = u'\xc9'.encode('utf8')
549
r2 = u'\u0dab'.encode('utf8')
550
lines = [' '.join([r2, r11, r12]), r11, r12]
551
encoded_body = '\n'.join(lines)
553
responses = [(('ok', ), encoded_body)]
554
transport_path = 'sinhala'
555
repo, client = self.setup_fake_client_and_repository(
556
responses, transport_path)
557
result = repo.get_revision_graph(r2)
559
[('call_expecting_body', 'Repository.get_revision_graph',
560
('///sinhala/', r2))],
562
self.assertEqual({r11: [], r12: [], r2: [r11, r12], }, result)
564
def test_no_such_revision(self):
566
responses = [(('nosuchrevision', revid), '')]
567
transport_path = 'sinhala'
568
repo, client = self.setup_fake_client_and_repository(
569
responses, transport_path)
570
# also check that the right revision is reported in the error
571
self.assertRaises(errors.NoSuchRevision,
572
repo.get_revision_graph, revid)
574
[('call_expecting_body', 'Repository.get_revision_graph',
575
('///sinhala/', revid))],
579
class TestRepositoryIsShared(TestRemoteRepository):
    """Check the request and decoding for RemoteRepository.is_shared."""

    def test_is_shared(self):
        # ('yes', ) for Repository.is_shared -> 'True'.
        responses = [(('yes', ), )]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.is_shared()
        self.assertEqual(
            [('call', 'Repository.is_shared', ('///quack/',))],
            client._calls)
        self.assertEqual(True, result)

    def test_is_not_shared(self):
        # ('no', ) for Repository.is_shared -> 'False'.
        responses = [(('no', ), )]
        transport_path = 'qwack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.is_shared()
        self.assertEqual(
            [('call', 'Repository.is_shared', ('///qwack/',))],
            client._calls)
        self.assertEqual(False, result)
606
class TestRepositoryLockWrite(TestRemoteRepository):
    """Check the request and response handling of lock_write."""

    def test_lock_write(self):
        responses = [(('ok', 'a token'), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.lock_write()
        self.assertEqual(
            [('call', 'Repository.lock_write', ('///quack/', ''))],
            client._calls)
        # The token from the response is handed back to the caller.
        self.assertEqual('a token', result)

    def test_lock_write_already_locked(self):
        responses = [(('LockContention', ), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        self.assertRaises(errors.LockContention, repo.lock_write)
        self.assertEqual(
            [('call', 'Repository.lock_write', ('///quack/', ''))],
            client._calls)

    def test_lock_write_unlockable(self):
        responses = [(('UnlockableTransport', ), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        self.assertRaises(errors.UnlockableTransport, repo.lock_write)
        self.assertEqual(
            [('call', 'Repository.lock_write', ('///quack/', ''))],
            client._calls)
640
class TestRepositoryUnlock(TestRemoteRepository):
642
def test_unlock(self):
643
responses = [(('ok', 'a token'), ''),
645
transport_path = 'quack'
646
repo, client = self.setup_fake_client_and_repository(
647
responses, transport_path)
651
[('call', 'Repository.lock_write', ('///quack/', '')),
652
('call', 'Repository.unlock', ('///quack/', 'a token'))],
655
def test_unlock_wrong_token(self):
656
# If somehow the token is wrong, unlock will raise TokenMismatch.
657
responses = [(('ok', 'a token'), ''),
658
(('TokenMismatch',), '')]
659
transport_path = 'quack'
660
repo, client = self.setup_fake_client_and_repository(
661
responses, transport_path)
663
self.assertRaises(errors.TokenMismatch, repo.unlock)
666
class TestRepositoryHasRevision(TestRemoteRepository):
669
# repo.has_revision(None) should not cause any traffic.
670
transport_path = 'quack'
672
repo, client = self.setup_fake_client_and_repository(
673
responses, transport_path)
675
# The null revision is always there, so has_revision(None) == True.
676
self.assertEqual(True, repo.has_revision(None))
678
# The remote repo shouldn't be accessed.
679
self.assertEqual([], client._calls)
682
class TestRepositoryTarball(TestRemoteRepository):
684
# This is a canned tarball response we can validate against
686
'QlpoOTFBWSZTWdGkj3wAAWF/k8aQACBIB//A9+8cIX/v33AACEAYABAECEACNz'
687
'JqsgJJFPTSnk1A3qh6mTQAAAANPUHkagkSTEkaA09QaNAAAGgAAAcwCYCZGAEY'
688
'mJhMJghpiaYBUkKammSHqNMZQ0NABkNAeo0AGneAevnlwQoGzEzNVzaYxp/1Uk'
689
'xXzA1CQX0BJMZZLcPBrluJir5SQyijWHYZ6ZUtVqqlYDdB2QoCwa9GyWwGYDMA'
690
'OQYhkpLt/OKFnnlT8E0PmO8+ZNSo2WWqeCzGB5fBXZ3IvV7uNJVE7DYnWj6qwB'
691
'k5DJDIrQ5OQHHIjkS9KqwG3mc3t+F1+iujb89ufyBNIKCgeZBWrl5cXxbMGoMs'
692
'c9JuUkg5YsiVcaZJurc6KLi6yKOkgCUOlIlOpOoXyrTJjK8ZgbklReDdwGmFgt'
693
'dkVsAIslSVCd4AtACSLbyhLHryfb14PKegrVDba+U8OL6KQtzdM5HLjAc8/p6n'
694
'0lgaWU8skgO7xupPTkyuwheSckejFLK5T4ZOo0Gda9viaIhpD1Qn7JqqlKAJqC'
695
'QplPKp2nqBWAfwBGaOwVrz3y1T+UZZNismXHsb2Jq18T+VaD9k4P8DqE3g70qV'
696
'JLurpnDI6VS5oqDDPVbtVjMxMxMg4rzQVipn2Bv1fVNK0iq3Gl0hhnnHKm/egy'
697
'nWQ7QH/F3JFOFCQ0aSPfA='
700
def test_repository_tarball(self):
701
# Test that Repository.tarball generates the right operations
702
transport_path = 'repo'
703
expected_responses = [(('ok',), self.tarball_content),
705
expected_calls = [('call_expecting_body', 'Repository.tarball',
706
('///repo/', 'bz2',),),
708
remote_repo, client = self.setup_fake_client_and_repository(
709
expected_responses, transport_path)
710
# Now actually ask for the tarball
711
tarball_file = remote_repo._get_tarball('bz2')
713
self.assertEqual(expected_calls, client._calls)
714
self.assertEqual(self.tarball_content, tarball_file.read())
718
def test_sprout_uses_tarball(self):
719
# RemoteRepository.sprout should try to use the
720
# tarball command rather than accessing all the files
721
transport_path = 'srcrepo'
722
expected_responses = [(('ok',), self.tarball_content),
724
expected_calls = [('call2', 'Repository.tarball', ('///srcrepo/', 'bz2',),),
726
remote_repo, client = self.setup_fake_client_and_repository(
727
expected_responses, transport_path)
728
# make a regular local repository to receive the results
729
dest_transport = MemoryTransport()
730
dest_transport.mkdir('destrepo')
731
bzrdir_format = bzrdir.format_registry.make_bzrdir('default')
732
dest_bzrdir = bzrdir_format.initialize_on_transport(dest_transport)
734
remote_repo.sprout(dest_bzrdir)
737
class TestRemoteRepositoryCopyContent(tests.TestCaseWithTransport):
    """RemoteRepository.copy_content_into optimizations"""

    def test_copy_content_remote_to_local(self):
        """Copy from a repository opened via the smart server to a local one."""
        self.transport_server = server.SmartTCPServer_for_testing
        src_repo = self.make_repository('repo1')
        # Re-open the repository through its URL; the assertion below checks
        # that this yields a RemoteRepository.
        src_repo = repository.Repository.open(self.get_url('repo1'))
        # At the moment the tarball-based copy_content_into can't write back
        # into a smart server.  It would be good if it could upload the
        # tarball; once that works we'd have to create repositories of
        # different formats. -- mbp 20070410
        dest_url = self.get_vfs_only_url('repo2')
        dest_bzrdir = BzrDir.create(dest_url)
        dest_repo = dest_bzrdir.create_repository()
        self.assertFalse(isinstance(dest_repo, RemoteRepository))
        self.assertTrue(isinstance(src_repo, RemoteRepository))
        src_repo.copy_content_into(dest_repo)