# Copyright (C) 2006 by Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Test read_bundle works properly across various transports."""

import cStringIO

import bzrlib.bundle
from bzrlib.bundle.serializer import write_bundle
import bzrlib.bzrdir
import bzrlib.errors as errors
from bzrlib.tests import TestCaseInTempDir
from bzrlib.tests.test_transport import TestTransportImplementation
import bzrlib.transport
from bzrlib.transport.memory import MemoryTransport
import bzrlib.urlutils


class TestReadBundleFromURL(TestTransportImplementation):
    """Test that read_bundle works properly across multiple transports."""

    def get_url(self, relpath=''):
        """Return the URL for ``relpath`` on the server under test.

        :param relpath: Path to join onto the URL reported by
            ``self._server.get_url()``.
        :return: The result of ``bzrlib.urlutils.join`` on the two.
        """
        return bzrlib.urlutils.join(self._server.get_url(), relpath)

    def create_test_bundle(self):
        """Commit a small tree and publish a bundle of it as 'test_bundle'.

        Builds 'tree/' containing 'a' and 'subdir/', commits it as revision
        'commit-1' and serializes that revision with write_bundle().  The
        bundle is uploaded through the transport under test when it is
        writable; otherwise it is written to 'test_bundle' in the current
        directory.

        :return: The working tree the bundle was generated from.
        """
        self._transport = self.get_transport()
        self.build_tree(['tree/', 'tree/a', 'tree/subdir/'])

        bzrdir_format = bzrlib.bzrdir.BzrDirFormat.get_default_format()
        bzrdir = bzrdir_format.initialize('tree')
        repo = bzrdir.create_repository()
        branch = repo.bzrdir.create_branch()
        wt = branch.bzrdir.create_workingtree()

        wt.add(['a', 'subdir/'])
        wt.commit('new project', rev_id='commit-1')

        out = cStringIO.StringIO()
        write_bundle(wt.branch.repository, wt.get_parent_ids()[0], None, out)

        if self._transport.is_readonly():
            # A readonly transport cannot take the upload, so the bundle is
            # written to the local directory instead.
            # NOTE(review): presumably the readonly server exposes this
            # directory -- confirm against the test server setup.
            bundle_file = open('test_bundle', 'wb')
            try:
                bundle_file.write(out.getvalue())
            finally:
                # Close even if the write fails so the handle is not leaked.
                bundle_file.close()
        else:
            # write_bundle() leaves the buffer positioned at its end; rewind
            # so that put_file() reads the bundle from the beginning.
            out.seek(0)
            self._transport.put_file('test_bundle', out)
            self.log('Put to: %s', self.get_url('test_bundle'))
        return wt

    def test_read_bundle_from_url(self):
        """A bundle read back by URL yields the tree of revision 'commit-1'."""
        wt = self.create_test_bundle()
        info = bzrlib.bundle.read_bundle_from_url(
            self.get_url('test_bundle'))
        bundle_tree = info.revision_tree(wt.branch.repository, info.target)
        self.assertEqual('commit-1', bundle_tree.revision_id)

    def test_read_fail(self):
        """Reading a directory or a non-bundle file raises NotABundle."""
        self.create_test_bundle()
        # 'tree' is a directory and 'tree/a' is a regular file that is not a
        # bundle; both were created by create_test_bundle().
        for relpath in ('tree', 'tree/a'):
            self.assertRaises(errors.NotABundle,
                              bzrlib.bundle.read_bundle_from_url,
                              self.get_url(relpath))