14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
17
"""Tests for the smart wire/domain protococl."""
27
19
from StringIO import StringIO
31
from bzrlib import bzrdir, errors, pack, smart, tests
32
from bzrlib.smart.request import (
33
FailedSmartServerResponse,
35
SuccessfulSmartServerResponse,
23
from bzrlib import bzrdir, errors, smart, tests
24
from bzrlib.smart.request import SmartServerResponse
37
25
import bzrlib.smart.bzrdir
38
26
import bzrlib.smart.branch
39
27
import bzrlib.smart.repository
40
from bzrlib.util import bencode
43
30
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
454
441
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
455
442
branch = self.make_branch('.')
456
443
response = request.execute('')
457
error_name, lock_str, why_str = response.args
458
self.assertFalse(response.is_successful())
459
self.assertEqual('LockFailed', error_name)
445
SmartServerResponse(('UnlockableTransport',)), response)
462
448
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
719
705
def test_lock_write_on_readonly_transport(self):
720
706
backing = self.get_readonly_transport()
721
707
request = smart.repository.SmartServerRepositoryLockWrite(backing)
722
repository = self.make_repository('.', format='knit')
708
repository = self.make_repository('.')
723
709
response = request.execute('')
724
self.assertFalse(response.is_successful())
725
self.assertEqual('LockFailed', response.args[0])
711
SmartServerResponse(('UnlockableTransport',)), response)
728
714
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
780
766
"extraneous file present in tar file")
783
class TestSmartServerRepositoryStreamKnitData(tests.TestCaseWithTransport):
785
def test_fetch_revisions(self):
786
backing = self.get_transport()
787
request = smart.repository.SmartServerRepositoryStreamKnitDataForRevisions(backing)
788
tree = self.make_branch_and_memory_tree('.')
791
rev_id1_utf8 = u'\xc8'.encode('utf-8')
792
rev_id2_utf8 = u'\xc9'.encode('utf-8')
793
r1 = tree.commit('1st commit', rev_id=rev_id1_utf8)
794
r1 = tree.commit('2nd commit', rev_id=rev_id2_utf8)
797
response = request.execute(backing.local_abspath(''), rev_id2_utf8)
798
self.assertEqual(('ok',), response.args)
799
from cStringIO import StringIO
800
unpacker = pack.ContainerReader(StringIO(response.body))
802
for [name], read_bytes in unpacker.iter_records():
804
bytes = read_bytes(None)
805
# The bytes should be a valid bencoded string.
806
bencode.bdecode(bytes)
807
# XXX: assert that the bencoded knit records have the right
810
def test_no_such_revision_error(self):
811
backing = self.get_transport()
812
request = smart.repository.SmartServerRepositoryStreamKnitDataForRevisions(backing)
813
repo = self.make_repository('.')
814
rev_id1_utf8 = u'\xc8'.encode('utf-8')
815
response = request.execute(backing.local_abspath(''), rev_id1_utf8)
817
SmartServerResponse(('NoSuchRevision', rev_id1_utf8)),
821
class TestSmartServerRepositoryStreamRevisionsChunked(tests.TestCaseWithTransport):
823
def test_fetch_revisions(self):
824
backing = self.get_transport()
825
request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
827
tree = self.make_branch_and_memory_tree('.')
830
rev_id1_utf8 = u'\xc8'.encode('utf-8')
831
rev_id2_utf8 = u'\xc9'.encode('utf-8')
832
r1 = tree.commit('1st commit', rev_id=rev_id1_utf8)
833
r1 = tree.commit('2nd commit', rev_id=rev_id2_utf8)
836
response = request.execute(backing.local_abspath(''), rev_id2_utf8)
837
self.assertEqual(('ok',), response.args)
838
from cStringIO import StringIO
839
parser = pack.ContainerPushParser()
841
for stream_bytes in response.body_stream:
842
parser.accept_bytes(stream_bytes)
843
for [name], record_bytes in parser.read_pending_records():
845
# The bytes should be a valid bencoded string.
846
bencode.bdecode(record_bytes)
847
# XXX: assert that the bencoded knit records have the right
850
def test_no_such_revision_error(self):
851
backing = self.get_transport()
852
request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
854
repo = self.make_repository('.')
855
rev_id1_utf8 = u'\xc8'.encode('utf-8')
856
response = request.execute(backing.local_abspath(''), rev_id1_utf8)
857
# There's no error initially.
858
self.assertTrue(response.is_successful())
859
self.assertEqual(('ok',), response.args)
860
# We only get an error while streaming the body.
861
body = list(response.body_stream)
862
last_chunk = body[-1]
863
self.assertIsInstance(last_chunk, FailedSmartServerResponse)
866
FailedSmartServerResponse(('NoSuchRevision', rev_id1_utf8)))
869
769
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
871
771
def test_is_readonly_no(self):
932
831
smart.request.request_handlers.get('Repository.lock_write'),
933
832
smart.repository.SmartServerRepositoryLockWrite)
934
833
self.assertEqual(
935
smart.request.request_handlers.get(
936
'Repository.chunked_stream_knit_data_for_revisions'),
937
smart.repository.SmartServerRepositoryStreamKnitDataForRevisions)
834
smart.request.request_handlers.get('Repository.unlock'),
835
smart.repository.SmartServerRepositoryUnlock)
938
836
self.assertEqual(
939
837
smart.request.request_handlers.get('Repository.tarball'),
940
838
smart.repository.SmartServerRepositoryTarball)
941
839
self.assertEqual(
942
smart.request.request_handlers.get('Repository.unlock'),
943
smart.repository.SmartServerRepositoryUnlock)
945
840
smart.request.request_handlers.get('Transport.is_readonly'),
946
841
smart.request.SmartServerIsReadonly)