~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to upstream_import.py

  • Committer: Michael Ellerman
  • Date: 2005-11-29 07:12:26 UTC
  • mto: (0.3.1 shelf-dev) (325.1.2 bzrtools)
  • mto: This revision was merged to the branch mainline in revision 334.
  • Revision ID: michael@ellerman.id.au-20051129071226-a04b3f827880025d
Unshelve --pick was broken, because we deleted the whole patch, even when only
part of it was unshelved. So now if we unshelve part of a patch, the patch is
replaced with a new patch that has just the unshelved parts. That's a long way
of saying it does what you'd expect.

Implementing this required changing HunkSelector to return both the selected
and unselected hunks (i.e. patches to shelve, and patches to keep).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""Import upstream source into a branch"""
2
 
 
3
 
from bz2 import BZ2File
4
 
import errno
5
 
import os
6
 
from shutil import rmtree
7
 
from StringIO import StringIO
8
 
import tarfile
9
 
from unittest import makeSuite
10
 
 
11
 
from bzrlib.bzrdir import BzrDir
12
 
from bzrlib.delta import compare_trees
13
 
from bzrlib.errors import NoSuchFile, BzrCommandError, NotBranchError
14
 
from bzrlib.osutils import pathjoin, isdir, file_iterator
15
 
from bzrlib.tests import TestCaseInTempDir
16
 
from bzrlib.trace import warning
17
 
from bzrlib.transform import TreeTransform, resolve_conflicts, cook_conflicts
18
 
from bzrlib.workingtree import WorkingTree
19
 
 
20
 
 
21
 
def top_directory(path):
    """Return the outermost directory component of ``path``.

    The directory part of ``path`` is reduced one level at a time with
    ``os.path.dirname`` until the next reduction would either be empty or
    stop changing (the latter happens at a filesystem root).  The last
    value before that point is returned, so ``'ab/b/c'`` gives ``'ab'``,
    ``'/etc'`` gives ``'/'`` and a bare file name gives ``''``.
    """
    current = os.path.dirname(path)
    parent = os.path.dirname(current)
    # Keep climbing while there is still a distinct, non-empty level above.
    while parent != '' and parent != current:
        current = parent
        parent = os.path.dirname(current)
    return current
30
 
 
31
 
 
32
 
def common_directory(names):
    """Determine a single directory prefix from a list of names.

    Returns the top-level directory (as computed by ``top_directory``)
    shared by every name, or None when there is no such directory:

    * ``names`` is empty,
    * two names have different top-level directories, or
    * some name has no directory component at all (its top directory is
      the empty string).

    The last case matters because callers strip ``len(prefix) + 1``
    characters from each name; an empty-string prefix would make them chop
    the first character off every name instead of stripping nothing.
    """
    possible_prefix = None
    for name in names:
        name_top = top_directory(name)
        if name_top == '':
            # A member sitting directly at the top level means the archive
            # has no common wrapping directory.
            return None
        if possible_prefix is None:
            possible_prefix = name_top
        elif name_top != possible_prefix:
            return None
    return possible_prefix
43
 
 
44
 
 
45
 
def do_directory(tt, trans_id, tree, relative_path, path):
    """Ensure the transform will leave a directory at ``trans_id``.

    If ``path`` is already a directory on disk and ``relative_path`` has a
    file id in ``tree``, the pending deletion of ``trans_id`` is cancelled
    on ``tt``.  Otherwise a new directory is created for ``trans_id``.
    """
    keep_existing = isdir(path) and tree.path2id(relative_path) is not None
    if not keep_existing:
        tt.create_directory(trans_id)
        return
    tt.cancel_deletion(trans_id)
50
 
 
51
 
 
52
 
def add_implied_parents(implied_parents, path):
    """Update the set of implied parents from a path.

    Every ancestor directory of ``path`` (as produced by repeated
    ``os.path.dirname``) is added to ``implied_parents`` in place.  The
    walk stops at the first ancestor that is already in the set.
    """
    ancestor = os.path.dirname(path)
    while ancestor not in implied_parents:
        implied_parents.add(ancestor)
        ancestor = os.path.dirname(ancestor)
59
 
 
60
 
 
61
 
def names_of_files(tar_file):
    """Yield the name of each member of ``tar_file`` whose type is not "g".

    Members of type "g" are header records rather than real entries, so
    they are left out.
    """
    for entry in tar_file.getmembers():
        if entry.type == "g":
            continue
        yield entry.name
65
 
 
66
 
 
67
 
def import_tar(tree, tar_input):
    """Replace the contents of a working directory with tarfile contents.

    ``tar_input`` is a file-like object holding the archive; it may be a
    gzipped stream.  Everything currently in the tree's inventory is
    scheduled for deletion, the archive members are then created through a
    TreeTransform (with any single common top-level directory stripped from
    their names), and finally file ids are updated via ``update_ids``.
    Conflicts reported by the transform are emitted as warnings.
    """
    archive = tarfile.open('lala', 'r', tar_input)
    prefix = common_directory(names_of_files(archive))
    tt = TreeTransform(tree)

    # Phase 1: schedule every inventory entry's contents for deletion.
    removed = set()
    for old_path, entry in tree.inventory.iter_entries():
        tt.delete_contents(tt.trans_id_tree_path(old_path))
        removed.add(old_path)

    # Phase 2: create contents for each archive member.
    added = set()
    implied_parents = set()
    seen = set()
    for member in archive.getmembers():
        if member.type == 'g':
            # type 'g' is a header
            continue
        rel = member.name
        if prefix is not None:
            # Drop the common top-level directory and its separator.
            rel = rel[len(prefix)+1:]
        if rel == '':
            continue
        add_implied_parents(implied_parents, rel)
        trans_id = tt.trans_id_tree_path(rel)
        added.add(rel.rstrip('/'))
        abs_path = tree.abspath(rel)
        # A name occurring more than once in the archive: discard what the
        # earlier occurrence created before creating it again.
        if member.name in seen:
            tt.cancel_creation(trans_id)
        seen.add(member.name)
        if member.isreg():
            contents = file_iterator(archive.extractfile(member))
            tt.create_file(contents, trans_id)
        elif member.isdir():
            do_directory(tt, trans_id, tree, rel, abs_path)
        elif member.issym():
            tt.create_symlink(member.linkname, trans_id)

    # Phase 3: directories that only appear as parents of other members.
    for parent_rel in implied_parents.difference(added):
        if parent_rel == "":
            continue
        parent_id = tt.trans_id_tree_path(parent_rel)
        parent_abs = tree.abspath(parent_rel)
        do_directory(tt, parent_id, tree, parent_rel, parent_abs)
        added.add(parent_rel)

    # Phase 4: resolve/report conflicts, apply, and fix up file ids.
    raw_conflicts = resolve_conflicts(tt)
    for conflict in cook_conflicts(raw_conflicts, tt):
        warning(conflict)
    tt.apply()
    update_ids(tree, added, removed)
120
 
 
121
 
 
122
 
def update_ids(tree, added, removed):
    """Make sure that all present files have file_ids.

    Paths in ``added`` but not in ``removed`` are passed to ``tree.add`` in
    sorted order; paths in ``removed`` but not in ``added`` are passed to
    ``tree.remove`` in reverse sorted order.  Paths present in both sets
    are left alone.
    """
    # XXX detect renames
    to_add = sorted(added.difference(removed))
    to_remove = sorted(removed.difference(added), reverse=True)
    tree.add(to_add)
    tree.remove(to_remove)
130
 
 
131
 
 
132
 
def do_import(source, tree_directory=None):
133
 
    """Implementation of import command.  Intended for UI only"""
134
 
    if tree_directory is not None:
135
 
        try:
136
 
            tree = WorkingTree.open(tree_directory)
137
 
        except NotBranchError:
138
 
            if not os.path.exists(tree_directory):
139
 
                os.mkdir(tree_directory)
140
 
            branch = BzrDir.create_branch_convenience(tree_directory)
141
 
            tree = branch.bzrdir.open_workingtree()
142
 
    else:
143
 
        tree = WorkingTree.open_containing('.')[0]
144
 
    tree.lock_write()
145
 
    try:
146
 
        if compare_trees(tree, tree.basis_tree()).has_changed():
147
 
            raise BzrCommandError("Working tree has uncommitted changes.")
148
 
 
149
 
        if (source.endswith('.tar') or source.endswith('.tar.gz') or 
150
 
            source.endswith('.tar.bz2')) or source.endswith('.tgz'):
151
 
            try:
152
 
                if source.endswith('.bz2'):
153
 
                    tar_input = BZ2File(source, 'r')
154
 
                    tar_input = StringIO(tar_input.read())
155
 
                else:
156
 
                    tar_input = file(source, 'rb')
157
 
            except IOError, e:
158
 
                if e.errno == errno.ENOENT:
159
 
                    raise NoSuchFile(source)
160
 
            try:
161
 
                import_tar(tree, tar_input)
162
 
            finally:
163
 
                tar_input.close()
164
 
    finally:
165
 
        tree.unlock()
166
 
 
167
 
class TestImport(TestCaseInTempDir):
    """Tests for the upstream import helpers, using in-memory tarballs."""

    def make_tar(self, mode='w'):
        """Return a rewound StringIO containing a 'project-0.1' archive.

        The archive holds the directories project-0.1 and project-0.1/junk
        and the files README and FEEDME.  ``mode`` is passed to
        tarfile.open, e.g. 'w:gz' for a gzipped archive.
        """
        result = StringIO()
        tar_file = tarfile.open('project-0.1.tar', mode, result)
        os.mkdir('project-0.1')
        tar_file.add('project-0.1')
        os.mkdir('project-0.1/junk')
        tar_file.add('project-0.1/junk')

        f = file('project-0.1/README', 'wb')
        f.write('What?')
        f.close()
        tar_file.add('project-0.1/README')

        f = file('project-0.1/FEEDME', 'wb')
        f.write('Hungry!!')
        f.close()
        tar_file.add('project-0.1/FEEDME')

        tar_file.close()
        rmtree('project-0.1')
        result.seek(0)
        return result

    def make_tar2(self):
        """Return a rewound StringIO containing a 'project-0.2' archive.

        The archive has no FEEDME, and README is added a second time
        through an append-mode reopen, so the same member name occurs
        twice (presumably to exercise import_tar's handling of repeated
        names).
        """
        result = StringIO()
        tar_file = tarfile.open('project-0.2.tar', 'w', result)
        os.mkdir('project-0.2')
        tar_file.add('project-0.2')

        os.mkdir('project-0.2/junk')
        tar_file.add('project-0.2/junk')

        f = file('project-0.2/README', 'wb')
        f.write('Now?')
        f.close()
        tar_file.add('project-0.2/README')
        tar_file.close()

        # Start the append scan at the beginning of the archive rather
        # than wherever the previous close() left the stream.
        result.seek(0)
        tar_file = tarfile.open('project-0.2.tar', 'a', result)
        tar_file.add('project-0.2/README')
        # Close so the appended member and the end-of-archive marker are
        # actually written to the buffer.
        tar_file.close()

        rmtree('project-0.2')
        # Rewind, consistent with make_tar and make_messed_tar.
        result.seek(0)
        return result

    def make_messed_tar(self):
        """Return a rewound StringIO whose archive has two top directories.

        Both project-0.1 and project-0.2 are top-level members, so there
        is no single common prefix to strip.
        """
        result = StringIO()
        tar_file = tarfile.open('project-0.1.tar', 'w', result)
        os.mkdir('project-0.1')
        tar_file.add('project-0.1')

        os.mkdir('project-0.2')
        tar_file.add('project-0.2')

        f = file('project-0.1/README', 'wb')
        f.write('What?')
        f.close()
        tar_file.add('project-0.1/README')
        tar_file.close()
        rmtree('project-0.1')
        # Clean up the second scratch directory too, so it does not linger
        # in the test's working directory.
        rmtree('project-0.2')
        result.seek(0)
        return result

    def test_top_directory(self):
        self.assertEqual(top_directory('ab/b/c'), 'ab')
        self.assertEqual(top_directory('/etc'), '/')

    def test_common_directory(self):
        self.assertEqual(common_directory(['ab/c/d', 'ab/c/e']), 'ab')
        self.assertIs(common_directory(['ab/c/d', 'ac/c/e']), None)

    def test_untar(self):
        """Import one archive, then a second one that drops FEEDME."""
        tar_file = self.make_tar()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('README') is not None)
        self.assertTrue(tree.path2id('FEEDME') is not None)
        self.assertTrue(os.path.isfile(tree.abspath('README')))
        self.assertEqual(tree.inventory[tree.path2id('README')].kind, 'file')
        self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind, 'file')

        # An unversioned file inside a directory that the next archive
        # also contains.
        f = file(tree.abspath('junk/food'), 'wb')
        f.write('I like food\n')
        f.close()

        tar_file = self.make_tar2()
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('README') is not None)
        self.assertTrue(not os.path.exists(tree.abspath('FEEDME')))

    def test_untar2(self):
        """With no common prefix, member paths are imported unstripped."""
        tar_file = self.make_messed_tar()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('project-0.1/README') is not None)

    def test_untar_gzip(self):
        """A gzipped archive stream is imported like a plain one."""
        tar_file = self.make_tar(mode='w:gz')
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('README') is not None)
270
 
 
271
 
 
272
 
def test_suite():
    """Return a unittest suite built from the TestImport test case."""
    suite = makeSuite(TestImport)
    return suite