# Copyright (C) 2006 by Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Test read_bundle works properly across various transports."""
import cStringIO

import bzrlib.bundle
from bzrlib.bundle.serializer import write_bundle
import bzrlib.bzrdir
import bzrlib.errors as errors
from bzrlib.tests import TestCaseInTempDir
import bzrlib.transport
from bzrlib.transport.memory import MemoryTransport
import bzrlib.urlutils

# jam 20060615: Originally I thought I should use TestCaseWithTransport,
# but it turns out that doesn't play well with the transport adapter.
class TestReadBundleFromURL(TestCaseInTempDir):
    """Test that read_bundle works properly across multiple transports.

    The tests read two attributes that are not defined in this class:
    ``self.transport_server`` (called with no arguments to build the server
    under test) and ``self.transport_class`` (the transport type the tests
    insist on).  Presumably a transport test adapter injects both -- TODO
    confirm against the test loader.
    """

    # NOTE(review): this class was restored from a listing that had lost its
    # indentation and a number of lines (method headers, ``return``
    # statements, an ``else:``).  Every reconstructed line is marked with a
    # NOTE(review) comment below and should be checked against history.

    def setUp(self):
        """Create the transport server used by the test."""
        # NOTE(review): the ``def setUp`` header was missing from the
        # listing; it is implied by the super().setUp() call.
        super(TestReadBundleFromURL, self).setUp()
        self._server = self.transport_server()
        # NOTE(review): reconstructed -- one line was lost here, and
        # tearDown() stops the server, so it is presumably started here.
        self._server.setUp()
        self._transport = None

    def tearDown(self):
        """Drop the cached transport and shut the server down."""
        # NOTE(review): the ``def tearDown`` header was missing from the
        # listing; it is implied by the super().tearDown() call.
        self._transport = None
        self._server.tearDown()
        super(TestReadBundleFromURL, self).tearDown()

    def get_transport(self):
        """Return a connected transport to the local directory.

        :return: a transport for the server's base URL that is an instance
            of ``self.transport_class``.
        """
        base_url = self._server.get_url()
        t = bzrlib.transport.get_transport(base_url)
        if not isinstance(t, self.transport_class):
            # we want to make sure to construct one particular class, even if
            # there are several available implementations of this transport;
            # therefore construct it by hand rather than through the regular
            # get_transport method
            t = self.transport_class(base_url)
        # NOTE(review): reconstructed -- the listing had no return, but
        # create_test_bundle() calls methods on the result.
        return t

    def get_url(self, relpath=''):
        """Return the server URL joined with ``relpath``."""
        return bzrlib.urlutils.join(self._server.get_url(), relpath)

    def create_test_bundle(self):
        """Build a one-commit tree and publish a bundle of it.

        A working tree is created in ``tree/``, revision ``commit-1`` is
        committed, and a bundle of that revision is stored as
        ``test_bundle``: written to the local directory when the transport
        is readonly, otherwise uploaded through the transport.

        :return: the working tree the bundle was generated from.
        """
        # Can't use self.get_transport() because that asserts that
        # it is not readonly, so just skip tests where the server is readonly
        # NOTE(review): the comment above looks stale -- the code does call
        # self.get_transport() and handles the readonly case further down.
        self._transport = self.get_transport()
        # NOTE(review): the listing carried a commented-out
        # ``isinstance(self._transport, MemoryTransport)`` check here whose
        # body was lost; it has been left out.

        self.build_tree(['tree/', 'tree/a', 'tree/subdir/'])

        # NOTE(review): two lines were lost from the listing between the
        # next two statements; their content is unknown.
        bzrdir_format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
        tree_bzrdir = bzrdir_format.initialize('tree')
        repo = tree_bzrdir.create_repository()
        branch = repo.bzrdir.create_branch()
        wt = branch.bzrdir.create_workingtree()

        wt.add(['a', 'subdir/'])
        wt.commit('new project', rev_id='commit-1')

        out = cStringIO.StringIO()
        write_bundle(wt.branch.repository, wt.last_revision(), None, out)
        # Rewind so a consumer that reads from the current position (the
        # transport upload below) sees the whole bundle.
        out.seek(0)

        if self._transport.is_readonly():
            # The transport cannot be written through, so drop the bundle
            # straight into the local directory instead.
            f = open('test_bundle', 'wb')
            try:
                f.write(out.getvalue())
            finally:
                f.close()
        else:
            # NOTE(review): the ``else:`` was missing from the listing; it is
            # assumed because uploading through a readonly transport would
            # contradict the branch above.
            self._transport.put('test_bundle', out)
            self.log('Put to: %s', self.get_url('test_bundle'))
        # NOTE(review): reconstructed -- both tests use the return value as a
        # working tree.
        return wt

    def test_read_bundle_from_url(self):
        """A bundle fetched by URL exposes the committed revision."""
        wt = self.create_test_bundle()
        # NOTE(review): up to three lines were lost from the listing here
        # (possibly a skip guard); confirm against history.
        reader = bzrlib.bundle.read_bundle_from_url(
            self.get_url('test_bundle'))
        info, bundle_tree = reader.get_bundle(wt.branch.repository)
        self.assertEqual('commit-1', bundle_tree.revision_id)

    def test_read_fail(self):
        """Reading a directory or a plain file raises NotABundle."""
        # Trying to read from a directory, or non-bundle file
        # should fail with NotABundle
        self.create_test_bundle()
        # NOTE(review): up to three lines were lost from the listing here
        # (possibly a skip guard); confirm against history.
        self.assertRaises(errors.NotABundle,
                          bzrlib.bundle.read_bundle_from_url,
                          self.get_url('tree'))
        self.assertRaises(errors.NotABundle,
                          bzrlib.bundle.read_bundle_from_url,
                          self.get_url('tree/a'))