~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to tests/upstream_import.py

  • Committer: Aaron Bentley
  • Date: 2005-09-11 03:28:31 UTC
  • Revision ID: aaron.bentley@utoronto.ca-20050911032831-7df69b7bdc4cfefd
Added new bzr patch command

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import os
2
 
from StringIO import StringIO
3
 
from shutil import rmtree, copy2, copytree
4
 
import tarfile
5
 
import tempfile
6
 
from unittest import makeSuite
7
 
 
8
 
from bzrlib import osutils
9
 
from bzrlib.bzrdir import BzrDir
10
 
try:
11
 
    from bzrlib.plugins.bzrtools.upstream_import import (
12
 
        common_directory,
13
 
        import_tar,
14
 
        import_zip,
15
 
        import_dir,
16
 
        top_directory,
17
 
        ZipFileWrapper, 
18
 
    )
19
 
except ImportError:
20
 
    from bzrtools.upstream_import import (
21
 
        common_directory,
22
 
        import_tar,
23
 
        import_zip,
24
 
        import_dir,
25
 
        top_directory,
26
 
        ZipFileWrapper, 
27
 
    )
28
 
from bzrlib.tests import TestCaseInTempDir
29
 
 
30
 
 
31
 
class DirFileWriter(object):
    """Archive-writer stand-in whose "archive" is a plain directory.

    It offers the ``add``/``close`` calls that the tests in this module make
    on their tar and zip writers.  The entries are copied into a freshly
    created directory, and the path of that directory is what gets stored in
    the file object handed to the constructor.
    """

    def __init__(self, fileobj, mode):
        """Create the output directory and record its path in ``fileobj``.

        :param fileobj: seekable, readable and writable file-like object.
            Empty contents mean "start a new directory"; non-empty contents
            are taken as the path of an existing directory to extend.
        :param mode: accepted so the constructor matches the other archive
            builders' signature; it is not consulted.  Whether this is an
            append is decided from the contents of ``fileobj`` alone.
        """
        # We may be asked to 'append'.  If so, fileobj already has a path.
        # So we copy the existing tree, and overwrite afterward.
        fileobj.seek(0)
        existing = fileobj.read()
        fileobj.seek(0)
        path = tempfile.mkdtemp(dir=os.getcwd())
        if existing:
            # copytree requires the directory not to exist
            os.rmdir(path)
            copytree(existing, path)
        # The position is still 0 here.  Discard the previous contents before
        # writing, otherwise a previous path longer than the new one would
        # leave stale characters after it.
        fileobj.truncate()
        fileobj.write(path)
        self.root = path

    def add(self, path):
        """Copy the file or directory entry ``path`` under ``self.root``.

        Missing parent directories of the target are created.  A regular
        file is copied with ``copy2``; a directory is created empty (its
        contents are not copied).  Any other kind is ignored.

        :param path: path of the entry, also used as its location relative
            to ``self.root``.
        """
        target_path = os.path.join(self.root, path)
        parent = osutils.dirname(target_path)
        if not os.path.exists(parent):
            os.makedirs(parent)
        kind = osutils.file_kind(path)
        if kind == 'file':
            copy2(path, target_path)
        elif kind == 'directory' and not os.path.isdir(target_path):
            # The directory can already be present, e.g. when it was copied
            # in from the tree being appended to; re-adding it is a no-op
            # rather than an OSError from mkdir.
            os.mkdir(target_path)

    def close(self):
        """Do nothing; every entry is already on disk once ``add`` returns."""
        pass
60
 
 
61
 
 
62
 
class TestImport(TestCaseInTempDir):
    """Tests for the upstream_import helpers.

    The ``make_*`` methods build small archives in memory (or, for the
    directory builder, on disk) and return a file-like object rewound to
    position 0; the ``test_*`` methods hand those objects to the importers.
    """

    def _write_file(self, path, content):
        """Create (or overwrite) ``path`` in binary mode holding ``content``.

        The handle is closed even when the write raises.
        """
        f = open(path, 'wb')
        try:
            f.write(content)
        finally:
            f.close()

    def make_tar(self, mode='w'):
        """Return the 'project-0.1' archive as a tar stream.

        :param mode: tarfile mode string passed to ``tarfile.open``
            (e.g. ``'w'`` or ``'w:gz'``).
        """
        def maker(fileobj):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        return self.make_archive(maker)

    def make_archive(self, maker, subdir=True):
        """Build the first archive: ``junk/``, ``README`` and ``FEEDME``.

        :param maker: callable taking a file-like object and returning a
            writer that provides ``add(path)`` and ``close()``.
        :param subdir: when true the entries are added under a
            ``project-0.1/`` prefix; when false the entries are added by
            bare name from inside the ``project-0.1`` directory.
        :return: the file-like object the writer was given, rewound to 0.
        """
        result = StringIO()
        archive_file = maker(result)
        # Create the scratch directory before entering the try block so the
        # cleanup below only ever undoes steps that actually happened.
        os.mkdir('project-0.1')
        entered = False
        try:
            if subdir:
                prefix = 'project-0.1/'
                archive_file.add('project-0.1')
            else:
                prefix = ''
                os.chdir('project-0.1')
                entered = True
            os.mkdir(prefix + 'junk')
            archive_file.add(prefix + 'junk')

            self._write_file(prefix + 'README', 'What?')
            archive_file.add(prefix + 'README')

            self._write_file(prefix + 'FEEDME', 'Hungry!!')
            archive_file.add(prefix + 'FEEDME')

            archive_file.close()
        finally:
            # Only step back out if we really changed directory.
            if entered:
                os.chdir('..')
            rmtree('project-0.1')
        result.seek(0)
        return result

    def make_archive2(self, builder, subdir):
        """Build the second archive: ``junk/`` and ``README``, no ``FEEDME``.

        After the first writer is closed, a second writer is opened on the
        same file object in ``'a'`` mode and ``README`` is added again.

        :param builder: callable ``builder(fileobj, mode='w')`` returning a
            writer that provides ``add(path)`` and ``close()``.
        :param subdir: when true the entries are added under a
            ``project-0.2/`` prefix; when false the entries are added by
            bare name from inside the ``project-0.2`` directory.
        :return: the file-like object the writers were given, rewound to 0.
        """
        result = StringIO()
        archive_file = builder(result)
        os.mkdir('project-0.2')
        entered = False
        try:
            if subdir:
                prefix = 'project-0.2/'
                archive_file.add('project-0.2')
            else:
                prefix = ''
                os.chdir('project-0.2')
                entered = True

            os.mkdir(prefix + 'junk')
            archive_file.add(prefix + 'junk')

            self._write_file(prefix + 'README', 'Now?')
            archive_file.add(prefix + 'README')
            archive_file.close()

            archive_file = builder(result, 'a')
            archive_file.add(prefix + 'README')
            archive_file.close()
        finally:
            if entered:
                os.chdir('..')
        # 'project-0.2' is deliberately left on disk.  With subdir false the
        # append-mode builder above runs while the current directory is
        # project-0.2, and DirFileWriter creates its output directory under
        # os.getcwd(), so removing project-0.2 would remove that output too.
        result.seek(0)
        return result

    def make_messed_tar(self):
        """Return a tar stream with two top-level directories.

        The archive holds ``project-0.1/``, ``project-0.2/`` and
        ``project-0.1/README``.  Both scratch directories are removed from
        disk afterwards; the tar data lives only in the returned object.
        """
        result = StringIO()
        tar_file = tarfile.open('project-0.1.tar', 'w', result)
        scratch_dirs = []
        try:
            for name in ('project-0.1', 'project-0.2'):
                os.mkdir(name)
                scratch_dirs.append(name)
                tar_file.add(name)

            self._write_file('project-0.1/README', 'What?')
            tar_file.add('project-0.1/README')
            tar_file.close()
        finally:
            for name in scratch_dirs:
                rmtree(name)
        result.seek(0)
        return result

    def make_zip(self):
        """Return the 'project-0.1' archive written through ZipFileWrapper."""
        def maker(fileobj):
            return ZipFileWrapper(fileobj, 'w')
        return self.make_archive(maker)

    def test_top_directory(self):
        self.assertEqual(top_directory('ab/b/c'), 'ab')
        self.assertEqual(top_directory('/etc'), '/')

    def test_common_directory(self):
        self.assertEqual(common_directory(['ab/c/d', 'ab/c/e']), 'ab')
        self.assertIs(common_directory(['ab/c/d', 'ac/c/e']), None)
        self.assertIs(None, common_directory(['FEEDME']))

    def test_untar(self):
        def builder(fileobj, mode='w'):
            return tarfile.open('project-0.1.tar', mode, fileobj)
        self.archive_test(builder, import_tar)

    def test_unzip(self):
        def builder(fileobj, mode='w'):
            return ZipFileWrapper(fileobj, mode)
        self.archive_test(builder, import_zip)

    def test_copydir_nosub(self):
        def builder(fileobj, mode='w'):
            return DirFileWriter(fileobj, mode)
        # It would be bogus to test with the result in a subdirectory,
        # because for directories, the input root is always the output root.
        self.archive_test(builder, import_dir)

    def archive_test(self, builder, importer, subdir=False):
        """Import the first archive, then the second, into one tree.

        :param builder: callable ``builder(fileobj, mode='w')`` returning an
            archive writer; passed to make_archive and make_archive2.
        :param importer: callable ``importer(tree, archive_file)``.
        :param subdir: forwarded to make_archive and make_archive2.
        """
        archive_file = self.make_archive(builder, subdir)
        tree = BzrDir.create_standalone_workingtree('tree')
        importer(tree, archive_file)
        self.assertTrue(tree.path2id('README') is not None)
        self.assertTrue(tree.path2id('FEEDME') is not None)
        self.assertTrue(os.path.isfile(tree.abspath('README')))
        self.assertEqual(tree.inventory[tree.path2id('README')].kind, 'file')
        self.assertEqual(tree.inventory[tree.path2id('FEEDME')].kind, 'file')

        # Written straight to disk between the two imports; it is not
        # registered with the tree here.
        self._write_file(tree.abspath('junk/food'), 'I like food\n')

        archive_file = self.make_archive2(builder, subdir)
        importer(tree, archive_file)
        self.assertTrue(tree.path2id('README') is not None)
        # The second archive has no FEEDME, so it must be gone from disk.
        self.assertFalse(os.path.exists(tree.abspath('FEEDME')))

    def test_untar2(self):
        tar_file = self.make_messed_tar()
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('project-0.1/README') is not None)

    def test_untar_gzip(self):
        tar_file = self.make_tar(mode='w:gz')
        tree = BzrDir.create_standalone_workingtree('tree')
        import_tar(tree, tar_file)
        self.assertTrue(tree.path2id('README') is not None)
212
 
 
213
 
def test_suite():
    """Return a suite built by makeSuite from the TestImport test case."""
    suite = makeSuite(TestImport)
    return suite