1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
35
35
from bzrlib.bundle import read_mergeable_from_url
36
36
from bzrlib.bundle.apply_bundle import install_bundle, merge_bundle
37
37
from bzrlib.bundle.bundle_data import BundleTree
38
from bzrlib.bzrdir import BzrDir
39
38
from bzrlib.directory_service import directories
40
39
from bzrlib.bundle.serializer import write_bundle, read_bundle, v09, v4
41
40
from bzrlib.bundle.serializer.v08 import BundleSerializerV08
42
41
from bzrlib.bundle.serializer.v09 import BundleSerializerV09
43
42
from bzrlib.bundle.serializer.v4 import BundleSerializerV4
44
from bzrlib.branch import Branch
45
43
from bzrlib.repofmt import knitrepo
46
44
from bzrlib.tests import (
50
50
from bzrlib.transform import TreeTransform
78
79
inventory = property(lambda x:x)
81
return self.paths.iterkeys()
81
def all_file_ids(self):
82
return set(self.paths.keys())
83
84
def __getitem__(self, file_id):
84
85
if file_id == self.root.file_id:
114
115
ie = InventoryDirectory(file_id, name, parent_id)
115
116
elif kind == 'file':
116
117
ie = InventoryFile(file_id, name, parent_id)
118
ie.text_sha1 = text_sha_1
119
ie.text_size = text_size
117
120
elif kind == 'symlink':
118
121
ie = InventoryLink(file_id, name, parent_id)
120
123
raise errors.BzrError('unknown kind %r' % kind)
121
ie.text_sha1 = text_sha_1
122
ie.text_size = text_size
125
126
def add_dir(self, file_id, path):
149
def get_file_revision(self, file_id):
150
return self.inventory[file_id].revision
148
152
def contents_stats(self, file_id):
149
153
if file_id not in self.contents:
150
154
return None, None
507
511
if not _mod_revision.is_null(rev_id):
508
rh = self.b1.revision_history()
509
tree.branch.set_revision_history(rh[:rh.index(rev_id)+1])
512
tree.branch.generate_revision_history(rev_id)
511
514
delta = tree.changes_from(self.b1.repository.revision_tree(rev_id))
512
515
self.assertFalse(delta.has_changed(),
679
682
def _test_symlink_bundle(self, link_name, link_target, new_link_target):
680
683
link_id = 'link-1'
682
self.requireFeature(tests.SymlinkFeature)
685
self.requireFeature(features.SymlinkFeature)
683
686
self.tree1 = self.make_branch_and_tree('b1')
684
687
self.b1 = self.tree1.branch
726
729
self._test_symlink_bundle('link', 'bar/foo', 'mars')
728
731
def test_unicode_symlink_bundle(self):
729
self.requireFeature(tests.UnicodeFilenameFeature)
732
self.requireFeature(features.UnicodeFilenameFeature)
730
733
self._test_symlink_bundle(u'\N{Euro Sign}link',
731
734
u'bar/\N{Euro Sign}foo',
732
735
u'mars\N{Euro Sign}')
833
836
return bundle_file.getvalue()
835
838
def test_unicode_bundle(self):
836
self.requireFeature(tests.UnicodeFilenameFeature)
839
self.requireFeature(features.UnicodeFilenameFeature)
837
840
# Handle international characters
839
842
f = open(u'b1/with Dod\N{Euro Sign}', 'wb')
1412
1415
branch = tree_a.branch
1413
1416
repo_a = branch.repository
1414
1417
tree_a.commit("base", allow_pointless=True, rev_id='A')
1415
self.failIf(branch.repository.has_signature_for_revision_id('A'))
1418
self.assertFalse(branch.repository.has_signature_for_revision_id('A'))
1417
1420
from bzrlib.testament import Testament
1418
1421
# monkey patch gpg signing mechanism
1839
1836
bundle, then the ConnectionReset error should be propagated.
1841
1838
# Instantiate a server that will provoke a ConnectionReset
1842
sock_server = _DisconnectingTCPServer()
1839
sock_server = DisconnectingServer()
1843
1840
self.start_server(sock_server)
1844
1841
# We don't really care what the url is since the server will close the
1845
1842
# connection without interpreting it
1847
1844
self.assertRaises(errors.ConnectionReset, read_mergeable_from_url, url)
1850
class _DisconnectingTCPServer(object):
1851
"""A TCP server that immediately closes any connection made to it."""
1853
def start_server(self):
1854
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1855
self.sock.bind(('127.0.0.1', 0))
1857
self.port = self.sock.getsockname()[1]
1858
self.thread = threading.Thread(
1859
name='%s (port %d)' % (self.__class__.__name__, self.port),
1860
target=self.accept_and_close)
1863
def accept_and_close(self):
1864
conn, addr = self.sock.accept()
1865
conn.shutdown(socket.SHUT_RDWR)
1847
class DisconnectingHandler(SocketServer.BaseRequestHandler):
1848
"""A request handler that immediately closes any connection made to it."""
1851
self.request.close()
1854
class DisconnectingServer(test_server.TestingTCPServerInAThread):
1857
super(DisconnectingServer, self).__init__(
1859
test_server.TestingTCPServer,
1860
DisconnectingHandler)
1868
1862
def get_url(self):
1869
return 'bzr://127.0.0.1:%d/' % (self.port,)
1871
def stop_server(self):
1873
# make sure the thread dies by connecting to the listening socket,
1874
# just in case the test failed to do so.
1875
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1876
conn.connect(self.sock.getsockname())
1878
except socket.error:
1863
"""Return the url of the server"""
1864
return "bzr://%s:%d/" % self.server.server_address