~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/arch/tests/test_arch.py

  • Committer: Robert Collins
  • Date: 2005-09-13 15:11:39 UTC
  • mto: (147.2.6) (364.1.3 bzrtools)
  • mto: This revision was merged to the branch mainline in revision 324.
  • Revision ID: robertc@robertcollins.net-20050913151139-9ac920fc9d7bda31
TODOification

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
# arch-tag: 6e2d3c18-a7b2-4d3c-a4ae-0fb1773e7214
 
3
# Copyright (C) 2003 Ben Burns <bburns@mailops.com>
 
4
#               2004 David Allouche <david@allouche.net>
 
5
#
 
6
#    This program is free software; you can redistribute it and/or modify
 
7
#    it under the terms of the GNU General Public License as published by
 
8
#    the Free Software Foundation; either version 2 of the License, or
 
9
#    (at your option) any later version.
 
10
#
 
11
#    This program is distributed in the hope that it will be useful,
 
12
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
#    GNU General Public License for more details.
 
15
#
 
16
#    You should have received a copy of the GNU General Public License
 
17
#    along with this program; if not, write to the Free Software
 
18
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
"""Main test suite
 
21
"""
 
22
 
 
23
import types
 
24
import sys
 
25
import os
 
26
import shutil
 
27
import arch
 
28
from arch import errors
 
29
from arch.pathname import DirName, FileName, PathName
 
30
import framework
 
31
from framework import TestCase
 
32
 
 
33
# import arch.backends.logger
 
34
# arch.backend.logger = backends.logger.DebugLogger()
 
35
 
 
36
class ArchBasic(TestCase):
 
37
 
 
38
    tests = []
 
39
 
 
40
    def default_my_id_empty(self):
 
41
        """my_id returns None when unset."""
 
42
        my_id = arch.my_id()
 
43
        self.failUnlessEqual(None, my_id)
 
44
    tests.append('default_my_id_empty')
 
45
 
 
46
    def default_archive_empty(self):
 
47
        """archives is empty when no archive is registered."""
 
48
        archives = list(arch.iter_archives())
 
49
        self.failUnlessEqual(len(archives), 0)
 
50
    tests.append('default_archive_empty')
 
51
 
 
52
    def set_my_id(self):
 
53
        """set_my_id effects arch.my_id()."""
 
54
        arch.set_my_id(self.my_id)
 
55
        my_id = arch.my_id()
 
56
        self.failUnlessEqual(my_id, self.my_id)
 
57
    tests.append('set_my_id')
 
58
 
 
59
    def set_invalid_myid(self):
 
60
        """set_my_id fails for invalid id and preserves current id."""
 
61
        arch.set_my_id(self.my_id)
 
62
        try:
 
63
            arch.set_my_id("not a valid email address")
 
64
        except arch.ExecProblem:
 
65
            my_id = arch.my_id()
 
66
            self.failUnlessEqual(my_id, self.my_id)
 
67
        else:
 
68
            self.fail("invalid email address was accepted")
 
69
    tests.append('set_invalid_myid')
 
70
 
 
71
    def make_archive(self):
 
72
        """make_archive creates and registers archive at location."""
 
73
        name = self.arch_name
 
74
        location = self.arch_dir/name
 
75
        newArchive = arch.make_archive(name, location)
 
76
        self.failUnlessEqual(newArchive.name, name)
 
77
        self.failUnlessEqual(newArchive.location, location)
 
78
        archives = list(arch.iter_archives())
 
79
        self.failUnlessEqual(len(archives), 1)
 
80
        archive = archives[0]
 
81
        self.failUnlessEqual(archive.name, name)
 
82
        self.failUnless(os.path.samefile(archive.location, location))
 
83
        return archive
 
84
    tests.append('make_archive')
 
85
 
 
86
 
 
87
class Archive(TestCase):
 
88
 
 
89
    def extraSetup(self):
 
90
        self.set_my_id()
 
91
        self.create_archive()
 
92
 
 
93
    tests = []
 
94
 
 
95
    def archive_location_readonly(self):
 
96
        """Archive.location is present and read-only."""
 
97
        self.failUnless(hasattr(self.archive, 'location'))
 
98
        def setArchiveLocation(): self.archive.location = 'error'
 
99
        self.failUnlessRaises(AttributeError, setArchiveLocation)
 
100
    tests.append('archive_location_readonly')
 
101
 
 
102
    def archive_name_readonly(self):
 
103
        """Archive.name is present and read-only."""
 
104
        self.failUnless(hasattr(self.archive, 'name'))
 
105
        def setArchiveName(): self.archive.name = 'error'
 
106
        self.failUnlessRaises(AttributeError, setArchiveName)
 
107
    tests.append('archive_name_readonly')
 
108
 
 
109
    def make_already_made_archive(self):
 
110
        """Making the same archive twice fails."""
 
111
        try: arch.make_archive(self.archive.name, self.archive.location)
 
112
        except arch.ExecProblem: pass
 
113
        else: self.fail("could create same archive twice")
 
114
    tests.append('make_already_made_archive')
 
115
 
 
116
    def archive_setup(self):
 
117
        """Version.setup works."""
 
118
        arch_name = self.arch_name
 
119
        archive = self.archive
 
120
        version_id = arch_name + "/example--test--0.1"
 
121
        version = arch.Version(version_id)
 
122
        version.setup()
 
123
        cats = list(archive.iter_categories())
 
124
        self.failUnlessEqual(len(cats), 1)
 
125
        self.failUnlessEqual(cats[0].fullname, arch_name + '/example')
 
126
        branches = list(cats[0].iter_branches())
 
127
        self.failUnlessEqual(len(branches), 1)
 
128
        self.failUnlessEqual(branches[0].fullname,
 
129
                             arch_name + '/example--test')
 
130
        versions = list(branches[0].iter_versions())
 
131
        self.failUnlessEqual(len(versions), 1)
 
132
        self.failUnlessEqual(versions[0].fullname, version_id)
 
133
    tests.append('archive_setup')
 
134
 
 
135
    def archive_setup_nameless_branch(self):
 
136
        """Version.setup (nameless branch) works."""
 
137
        arch_name = self.arch_name
 
138
        archive = self.archive
 
139
        version_id = arch_name + "/example--0.1"
 
140
        version = arch.Version(version_id)
 
141
        version.setup()
 
142
        cats = list(archive.iter_categories())
 
143
        self.failUnlessEqual(len(cats), 1)
 
144
        self.failUnlessEqual(cats[0].fullname, arch_name + '/example')
 
145
        branches = list(cats[0].iter_branches())
 
146
        self.failUnlessEqual(len(branches), 1)
 
147
        self.failUnlessEqual(list(cats[0].iter_branches())[0].fullname,
 
148
                             arch_name + '/example')
 
149
        versions = list(branches[0].iter_versions())
 
150
        self.failUnlessEqual(len(versions), 1)
 
151
        self.failUnlessEqual(versions[0].fullname, version_id)
 
152
    tests.append('archive_setup_nameless_branch')
 
153
 
 
154
    def _help_make_mirror(self, kwargs, meta_name, meta_expected):
 
155
        master = self.archive
 
156
        mirror_name = master.name + '-MIRROR'
 
157
        location = self.arch_dir/mirror_name
 
158
        mirror = master.make_mirror(mirror_name, location, **kwargs)
 
159
        meta_file = DirName(mirror.location)/'=meta-info'/meta_name
 
160
        self.failUnless(os.path.isfile(meta_file))
 
161
        meta_data = open(meta_file).read()
 
162
        self.failUnlessEqual(meta_expected, meta_data)
 
163
 
 
164
    def make_mirror(self):
 
165
        """Archive.make_mirror works for creating a -MIRROR."""
 
166
        self._help_make_mirror({}, 'mirror', self.arch_name + '\n')
 
167
    tests.append('make_mirror')
 
168
 
 
169
    def make_mirror_listing(self):
 
170
        """Archive.make_mirror can set up http-blows."""
 
171
        self._help_make_mirror({'listing': True}, 'http-blows',
 
172
                               'it sure does\n')
 
173
    tests.append('make_mirror_listing')
 
174
 
 
175
    def make_mirror_signed(self):
 
176
        """Archive.make_mirror can set up signed-archive."""
 
177
        self._help_make_mirror({'signed': True}, 'signed-archive',
 
178
                               'system cracking is (nearly always) lame\n')
 
179
    tests.append('make_mirror_signed')
 
180
 
 
181
    def meta_info(self):
 
182
        """Archive._meta_info works."""
 
183
        self.assertEqual(self.archive.name, self.archive._meta_info('name'))
 
184
    tests.append('meta_info')
 
185
 
 
186
    def open_meta_info(self, name, flags):
 
187
        return open(DirName(self.archive.location)/'=meta-info'/name, flags)
 
188
 
 
189
    def meta_info_missing(self):
 
190
        """arch.tla_tells_empty_meta_info is set correctly."""
 
191
        if arch._builtin.tla_tells_empty_meta_info:
 
192
            try:
 
193
                self.archive._meta_info('not_here')
 
194
                self.fail("arch.tla_tells_empty_meta_info is True, but"
 
195
                          " archive-meta-info exit(0) on missing meta-info.")
 
196
            except KeyError:
 
197
                pass
 
198
        else:
 
199
            try:
 
200
                self.assertEqual(None, self.archive._meta_info('not_here'))
 
201
            except KeyError:
 
202
                self.fail("arch.tla_tells_empty_meta_info is False, but"
 
203
                          " archive-meta-info exit(1) on missing meta-info.")
 
204
        self.open_meta_info('empty', 'w')
 
205
        self.assertEqual(None, self.archive._meta_info('empty'))
 
206
        print >> self.open_meta_info('empty-line', 'w')
 
207
        self.assertEqual(str(), self.archive._meta_info('empty-line'))
 
208
    tests.append('meta_info_missing')
 
209
 
 
210
    def has_meta_info(self):
 
211
        """Archive._has_meta_info works."""
 
212
        self.failIf(self.archive._has_meta_info('not_here'))
 
213
        self.open_meta_info('empty', 'w')
 
214
        if arch._builtin.tla_tells_empty_meta_info:
 
215
            self.assert_(self.archive._has_meta_info('empty'))
 
216
        else:
 
217
            self.failIf(self.archive._has_meta_info('empty'))
 
218
        print >> self.open_meta_info('empty-line', 'w')
 
219
        self.assert_(self.archive._has_meta_info('empty-line'))
 
220
        print >> self.open_meta_info('something', 'w'), 'hello'
 
221
        self.assert_(self.archive._has_meta_info('something'))
 
222
    tests.append('has_meta_info')
 
223
 
 
224
    def is_signed(self):
 
225
        """Archive.is_signed works."""
 
226
        mirror_name = self.archive.name + '-MIRROR'
 
227
        location = self.arch_dir/mirror_name
 
228
        mirror = self.archive.make_mirror(mirror_name, location, signed=True)
 
229
        self.assertEqual(False, self.archive.is_signed)
 
230
        self.assertEqual(True, mirror.is_signed)
 
231
    tests.append('is_signed')
 
232
 
 
233
    def has_listings(self):
 
234
        """Archive.has_listings works."""
 
235
        mirror_name = self.archive.name + '-MIRROR'
 
236
        location = self.arch_dir/mirror_name
 
237
        mirror = self.archive.make_mirror(mirror_name, location, listing=True)
 
238
        self.assertEqual(False, self.archive.has_listings)
 
239
        self.assertEqual(True, mirror.has_listings)
 
240
    tests.append('has_listings')
 
241
 
 
242
    def open_version(self, flags):
 
243
        return open(DirName(self.archive.location)/'.archive-version', flags)
 
244
 
 
245
    def version_string(self):
 
246
        """Archive.version_string works."""
 
247
        version_file = self.open_version('r')
 
248
        version_string = version_file.read().rstrip('\n')
 
249
        version_file.close()
 
250
        self.assertEqual(version_string, self.archive.version_string)
 
251
    tests.append('version_string')
 
252
 
 
253
 
 
254
class MirrorArchive(TestCase):
 
255
 
 
256
    def extraSetup(self):
 
257
        self.set_my_id()
 
258
        self.create_archive_and_mirror()
 
259
 
 
260
    tests = []
 
261
 
 
262
    def is_mirror(self):
 
263
        """Archive.is_mirror works."""
 
264
        self.assert_(not self.archive.is_mirror)
 
265
        self.assert_(self.mirror.is_mirror)
 
266
    tests.append('is_mirror')
 
267
 
 
268
    def official_name(self):
 
269
        """Archive.official_name works."""
 
270
        self.assertEqual(self.archive.official_name, self.archive.name)
 
271
        self.assertEqual(self.mirror.official_name, self.archive.name)
 
272
    tests.append('official_name')
 
273
 
 
274
    def mirror_official_name(self):
 
275
        """Archive.make mirror use official name."""
 
276
        mirror2_name = self.mirror.name + '-2'
 
277
        location2 = self.arch_dir/mirror2_name
 
278
        mirror2 = self.mirror.make_mirror(mirror2_name, location2)
 
279
        self.assertEqual(mirror2.official_name, self.archive.name)
 
280
    tests.append('mirror_official_name')
 
281
 
 
282
 
 
283
class InventoryTree(TestCase):
 
284
 
 
285
    """Test cases that work on a tree but need no associated archive"""
 
286
 
 
287
    def extraSetup(self):
 
288
        self.set_my_id()
 
289
        self.create_working_tree()
 
290
 
 
291
    tests = []
 
292
 
 
293
    def inventory(self):
 
294
        """Basic inventory."""
 
295
        wt = self.working_tree
 
296
        open(wt/'ahoy', 'w').write('')
 
297
        wt.add_tag(wt/'ahoy')
 
298
        os.mkdir(wt/'bar')
 
299
        wt.add_tag(wt/'bar')
 
300
        open(wt/'bar'/'baz', 'w').write('')
 
301
        wt.add_tag(wt/'bar'/'baz')
 
302
        inv = list(wt.iter_inventory(source=True, both=True))
 
303
        result = map(type, inv)
 
304
        expected = [FileName, DirName, FileName]
 
305
        self.assertEqual(expected, result)
 
306
        result = map(str, inv)
 
307
        expected = ['ahoy', 'bar', 'bar/baz']
 
308
        self.assertEqual(expected, result)
 
309
        inv = list(wt.iter_inventory(source=True, files=True))
 
310
        self.assertEqual([FileName, FileName], map(type, inv))
 
311
        self.assertEqual(['ahoy', 'bar/baz'], map(str, inv))
 
312
        inv = list(wt.iter_inventory(source=True, directories=True))
 
313
        self.assertEqual([DirName], map(type, inv))
 
314
        self.assertEqual(['bar'], map(str, inv))
 
315
    tests.append('inventory')
 
316
 
 
317
    def inventory_ids(self):
 
318
        """Inventory with ids"""
 
319
        wt = self.working_tree
 
320
        wt.tagging_method = 'names'
 
321
        open(wt/'ahoy', 'w').write('')
 
322
        os.mkdir(wt/'bar')
 
323
        open(wt/'bar'/'baz', 'w').write('')
 
324
        inv = list(wt.iter_inventory_ids(source=True, both=True))
 
325
        result = map(lambda L: type(L[1]), inv)
 
326
        expected = [FileName, DirName, FileName]
 
327
        self.assertEqual(expected, result)
 
328
        result = map(lambda L: str(L[1]), inv)
 
329
        expected = ['ahoy', 'bar', 'bar/baz']
 
330
        self.assertEqual(expected, result)
 
331
        result = map(lambda L: L[0], inv)
 
332
        expected = ['?./ahoy', '?./bar', '?./bar/baz']
 
333
        self.assertEqual(expected, result)
 
334
        inv = list(wt.iter_inventory_ids(source=True, files=True))
 
335
        self.assertEqual([FileName, FileName], map(lambda L: type(L[1]), inv))
 
336
        self.assertEqual(['ahoy', 'bar/baz'], map(lambda L: str(L[1]), inv))
 
337
        self.assertEqual(['?./ahoy', '?./bar/baz'], map(lambda L: L[0], inv))
 
338
        inv = list(wt.iter_inventory_ids(source=True, directories=True))
 
339
        self.assertEqual([DirName], map(lambda L: type(L[1]), inv))
 
340
        self.assertEqual(['bar'], map(lambda L: str(L[1]), inv))
 
341
        self.assertEqual(['?./bar'], map(lambda L: L[0], inv))
 
342
    tests.append('inventory_ids')
 
343
 
 
344
    def inventory_names(self):
 
345
        """iter_inventory names keyword overrides untagged-source setting."""
 
346
        wt = self.working_tree
 
347
        print >> open(wt/'{arch}'/'=tagging-method', 'w'),\
 
348
              '\n'.join(('explicit', 'untagged-source precious',''))
 
349
        foo = FileName('foo')
 
350
        print >> open(wt/foo, 'w')
 
351
        result = list(wt.iter_inventory(precious=True, files=True))
 
352
        self.assertEqual([foo], result) # foo is untagged-source precious
 
353
        result = list(wt.iter_inventory(source=True, files=True))
 
354
        self.assertEqual([], result) # thus foo is not source
 
355
        result = list(wt.iter_inventory(source=True, files=True, names=True))
 
356
        self.assertEqual([foo], result) # however it has a source name
 
357
    tests.append('inventory_names')
 
358
 
 
359
    def get_tag(self):
 
360
        """ArchSourceTree.get_tag works."""
 
361
        wt = self.working_tree
 
362
        wt.tagging_method = 'tagline'
 
363
        foo = FileName('foo')
 
364
        print >> open(wt/foo, 'w')
 
365
        self.assertEqual(None, wt.get_tag(foo))
 
366
        print >> open(wt/foo, 'w'), 'arch-tag: yadda'
 
367
        self.assertEqual('i_yadda', wt.get_tag(foo))
 
368
        wt.add_tag(foo)
 
369
        print >> open(wt/'.arch-ids'/foo+'.id', 'w'), 'yadda'
 
370
        self.assertEqual('x_yadda', wt.get_tag(foo))
 
371
    tests.append('get_tag')
 
372
 
 
373
    def inventory_file_space(self):
 
374
        """iter_inventory handles spaces in file and dir names correctly."""
 
375
        if tla_skip_whitespace_tests(): return
 
376
        wt = self.working_tree
 
377
        filename = FileName('file name')
 
378
        dirname = DirName('dir name')
 
379
        open(wt/filename, 'w').write('')
 
380
        wt.add_tag(filename)
 
381
        os.mkdir(wt/dirname)
 
382
        wt.add_tag(dirname)
 
383
        expected = [filename]
 
384
        result = list(wt.iter_inventory(source=True, files=True))
 
385
        self.assertEqual(expected, result) # inventory -s
 
386
        expected = [dirname]
 
387
        result = list(wt.iter_inventory(source=True, directories=True))
 
388
        self.assertEqual(expected, result) # inventory -s -d
 
389
        expected = [filename, dirname]
 
390
        expected.sort()
 
391
        result = list(wt.iter_inventory(source=True, both=True))
 
392
        result.sort()
 
393
        self.assertEqual(expected, result) # inventory -s -B
 
394
        nestedname = 'nested tree'
 
395
        os.mkdir(wt/nestedname)
 
396
        nested = arch.init_tree(wt/nestedname, nested=True)
 
397
        expected = [nested]
 
398
        result = list(wt.iter_inventory(trees=True))
 
399
        self.assertEqual(expected, result) # inventory -t
 
400
    tests.append('inventory_file_space')
 
401
 
 
402
    def inventory_limit(self):
 
403
        """Inventory with a limit"""
 
404
        wt = self.working_tree
 
405
        open(wt/'ahoy', 'w').write('')
 
406
        wt.add_tag(wt/'ahoy')
 
407
        os.mkdir(wt/'bar')
 
408
        wt.add_tag(wt/'bar')
 
409
        open(wt/'bar'/'baz', 'w').write('')
 
410
        wt.add_tag(wt/'bar'/'baz')
 
411
        inv = list(wt.iter_inventory(limit='bar', source=True, both=True))
 
412
        self.assertEqual([FileName], map(type, inv))
 
413
        self.assertEqual(['bar/baz'], map(str, inv))
 
414
        def thunk():
 
415
            return wt.iter_inventory(limit=('a', 'b'), source=True, both=True)
 
416
        self.assertRaises(TypeError, thunk)
 
417
        def thunk():
 
418
            return wt.iter_inventory(limit='/tmp', source=True, both=True)
 
419
        self.assertRaises(ValueError, thunk)
 
420
    tests.append('inventory_limit')
 
421
 
 
422
    def inventory_ids_limit(self):
 
423
        """Inventory with ids"""
 
424
        wt = self.working_tree
 
425
        wt.tagging_method = 'names'
 
426
        open(wt/'ahoy', 'w').write('')
 
427
        os.mkdir(wt/'bar')
 
428
        open(wt/'bar'/'baz', 'w').write('')
 
429
        inv = list(wt.iter_inventory_ids(limit='bar', source=True, both=True))
 
430
        self.assertEqual([FileName], map(lambda L: type(L[1]), inv))
 
431
        self.assertEqual(['bar/baz'], map(lambda L: str(L[1]), inv))
 
432
        expected, result = ['?bar/baz'], map(lambda L: L[0], inv)
 
433
        self.assertEqual(expected, result) # XXX will fail when tla is fixed
 
434
        def thunk():
 
435
            return wt.iter_inventory_ids(
 
436
                limit=('a', 'b'), source=True, both=True)
 
437
        self.assertRaises(TypeError, thunk)
 
438
        def thunk():
 
439
            return wt.iter_inventory_ids(limit='/tmp', source=True, both=True)
 
440
        self.assertRaises(ValueError, thunk)
 
441
    tests.append('inventory_ids_limit')
 
442
 
 
443
 
 
444
class InitTree(TestCase):
 
445
 
 
446
    tests = []
 
447
 
 
448
    def init_tree(self):
 
449
        """init_tree works (w/o version) for unnested and nested trees."""
 
450
        os.mkdir(self.working_dir)
 
451
        arch.init_tree(self.working_dir)
 
452
        os.mkdir(self.nested_dir)
 
453
        arch.init_tree(self.nested_dir, nested=True)
 
454
    tests.append('init_tree')
 
455
 
 
456
    def init_tree_not_nested(self):
 
457
        """init_tree (w/o version) fails for nested if "nested" is not set."""
 
458
        os.mkdir(self.working_dir)
 
459
        arch.init_tree(self.working_dir)
 
460
        os.mkdir(self.nested_dir)
 
461
        self.failUnlessRaises(errors.ExecProblem,
 
462
                              lambda: arch.init_tree(self.nested_dir))
 
463
    tests.append('init_tree_not_nested')
 
464
 
 
465
    def init_tree_with_version(self):
 
466
        """init_tree with version set tree-version and create log-version."""
 
467
        os.mkdir(self.working_dir)
 
468
        wt = arch.init_tree(self.working_dir, self.version)
 
469
        self.failUnlessEqual(self.version, wt.tree_version)
 
470
        self.failUnlessEqual(1, len(list(wt.iter_log_versions())))
 
471
        self.failUnlessEqual(self.version, wt.iter_log_versions().next())
 
472
    tests.append('init_tree_with_version')
 
473
 
 
474
    def tree_root(self):
 
475
        """tree_root works"""
 
476
        os.mkdir(self.working_dir)
 
477
        tree = arch.init_tree(self.working_dir)
 
478
        subdir = tree/'subdir'
 
479
        os.mkdir(subdir)
 
480
        subdir_tree = arch.tree_root(subdir)
 
481
        self.assertEqual(tree, subdir_tree)
 
482
    tests.append('tree_root')
 
483
 
 
484
 
 
485
class WorkingTree(TestCase):
 
486
 
 
487
    def extraSetup(self):
 
488
        self.set_my_id()
 
489
        self.create_archive()
 
490
        self.create_working_tree()
 
491
 
 
492
    tests = []
 
493
 
 
494
    def log_versions(self):
 
495
        """Log version addition, iteration and deletion."""
 
496
        wt = self.working_tree
 
497
        self.failUnlessEqual(0, len(list(wt.iter_log_versions())))
 
498
        wt.add_log_version(self.version)
 
499
        self.failUnlessEqual(1, len(list(wt.iter_log_versions())))
 
500
        self.failUnless(isinstance(wt.iter_log_versions().next(),
 
501
                                   arch.Version))
 
502
        self.failUnlessEqual(self.version, wt.iter_log_versions().next())
 
503
        wt.remove_log_version(self.version)
 
504
        self.failUnlessEqual(0, len(list(wt.iter_log_versions())))
 
505
    tests.append('log_versions')
 
506
 
 
507
    def log_versions_limit(self):
 
508
        """Log version listing with a limit."""
 
509
        wt = self.working_tree
 
510
        vsn = self.version
 
511
        jdoe_cat_brn_10 = vsn
 
512
        other_arch = arch.Version('alice@example/%s' % vsn.nonarch)
 
513
        other_cat = vsn.archive['dog']['brn']['1.0']
 
514
        other_brn = vsn.category['trk']['1.0']
 
515
        other_vsn = vsn.branch['2']
 
516
        all_versions = [vsn, other_arch, other_cat, other_brn, other_vsn]
 
517
        for V in all_versions: wt.add_log_version(V)
 
518
        all_versions.sort()
 
519
        expected = map(str, all_versions)
 
520
        expected.sort()
 
521
        result = map(str, wt.iter_log_versions())
 
522
        result.sort()
 
523
        self.assertEqual(expected, result)
 
524
        result = list(wt.iter_log_versions(other_arch.archive))
 
525
        self.assertEqual([other_arch], result)
 
526
        result = map(str, wt.iter_log_versions(vsn.archive))
 
527
        result.sort()
 
528
        expected = map(str, (vsn, other_cat, other_brn, other_vsn))
 
529
        expected.sort()
 
530
        self.assertEqual(expected, result)
 
531
        result = list(wt.iter_log_versions(other_cat.category))
 
532
        self.assertEqual([other_cat], result)
 
533
        result = map(str, wt.iter_log_versions(vsn.category))
 
534
        result.sort()
 
535
        expected = map(str, (vsn, other_brn, other_vsn))
 
536
        expected.sort()
 
537
        self.assertEqual(expected, result)
 
538
        result = list(wt.iter_log_versions(other_brn.branch))
 
539
        self.assertEqual([other_brn], result)
 
540
        result = map(str, wt.iter_log_versions(vsn.branch))
 
541
        result.sort()
 
542
        expected = map(str, (vsn, other_vsn))
 
543
        expected.sort()
 
544
        self.assertEqual(expected, result)
 
545
        result = list(wt.iter_log_versions(vsn))
 
546
        self.assertEqual([vsn], result)
 
547
    tests.append('log_versions_limit')
 
548
 
 
549
    def tree_version(self):
 
550
        """Setting and getting WorkingTree.version."""
 
551
        wt = self.working_tree
 
552
        self.failUnlessRaises(arch.errors.TreeVersionError,
 
553
                              lambda: wt.tree_version)
 
554
        try: wt.tree_version
 
555
        except arch.errors.TreeVersionError, E:
 
556
            self.assertEqual(E.bad_version, None)
 
557
        open(wt/'{arch}'/'++default-version', 'w').write('fooo!!!\n')
 
558
        try: wt.tree_version
 
559
        except arch.errors.TreeVersionError, E:
 
560
            self.assertEqual(E.bad_version, 'fooo!!!')
 
561
        self.failUnlessRaises(arch.errors.TreeVersionError,
 
562
                              lambda: wt.tree_version)
 
563
        wt.tree_version = self.version
 
564
        self.failUnless(isinstance(wt.tree_version, arch.Version))
 
565
        self.failUnlessEqual(self.version, wt.tree_version)
 
566
    tests.append('tree_version')
 
567
 
 
568
 
 
569
class TreeRevision(TestCase):
 
570
 
 
571
    def extraSetup(self):
 
572
        self.set_my_id()
 
573
        self.create_archive()
 
574
        self.create_version()
 
575
 
 
576
    tests = []
 
577
 
 
578
    def tree_revision(self):
 
579
        """tree_revision works"""
 
580
        os.mkdir(self.working_dir)
 
581
        wt = arch.init_tree(self.working_dir, self.version)
 
582
        wt.import_()
 
583
        self.assertEqual(self.version['base-0'], wt.tree_revision)
 
584
        m = wt.log_message()
 
585
        m['Summary'] = m.description = 'first revision'
 
586
        wt.commit(m)
 
587
        self.assertEqual(self.version['patch-1'], wt.tree_revision)
 
588
    tests.append('tree_revision')
 
589
 
 
590
 
 
591
class LogMessage(TestCase):
 
592
 
 
593
    def extraSetup(self):
 
594
        self.set_my_id()
 
595
        self.create_archive()
 
596
        self.create_working_tree(self.version)
 
597
        nonarch = self.version.nonarch
 
598
        archname = self.version.archive.name
 
599
        self.file_name = '++log.%s--%s' % (nonarch, archname)
 
600
 
 
601
    tests = []
 
602
 
 
603
    def create(self):
 
604
        """LogMessage creation."""
 
605
        wt = self.working_tree
 
606
        # message is None if not created
 
607
        m = wt.log_message(create=False)
 
608
        self.failUnlessEqual(m, None)
 
609
        # message is created by default
 
610
        m = wt.log_message()
 
611
        self.failIfEqual(m, None)
 
612
    tests.append('create')
 
613
 
 
614
    def name(self):
 
615
        """LogMessage.name attribute is read-only."""
 
616
        m = self.working_tree.log_message()
 
617
        self.assertEqual(m.name, self.working_tree.abspath()/m.name)
 
618
        def setLogName(): m.name = 'error'
 
619
        self.failUnlessRaises(AttributeError, setLogName)
 
620
    tests.append('name')
 
621
 
 
622
    def template(self):
 
623
        """LogMessage template has expected value and can be parsed."""
 
624
        m = self.working_tree.log_message()
 
625
        template = '\n'.join(('Summary: ', 'Keywords: ', '', ''))
 
626
        self.failUnlessEqual(file(m.name).read(), template)
 
627
        self.failUnlessEqual(m['Summary'], '')
 
628
        self.failUnlessEqual(m['Keywords'], '')
 
629
        self.failUnlessEqual(m.description, '')
 
630
    tests.append('template')
 
631
 
 
632
    def setup_message(self):
 
633
        self.description = '\n'.join(("Hello,  ", "  World!"))
 
634
        # leading space in summary dropped, trailing preserved
 
635
        self.summary = "Short summary  "
 
636
        self.message = '\n'.join(('Summary: ' + self.summary,
 
637
                                  'Keywords: ', '', self.description))
 
638
        m = self.message
 
639
        m = self.working_tree.log_message()
 
640
        m.description = self.description
 
641
        m['Summary'] = self.summary
 
642
        return m
 
643
 
 
644
    def check_message_file(self, m):
 
645
        self.failUnlessEqual(file(m.name).read(), self.message)
 
646
 
 
647
    def check_message_object(self, m):
 
648
        self.failUnlessEqual(m['Summary'], self.summary)
 
649
        self.failUnlessEqual(m['Keywords'], '')
 
650
        self.failUnlessEqual(m.description, self.description)
 
651
 
 
652
    def save_load(self):
 
653
        """LogMessage save() and load() behave consistently."""
 
654
        m = self.setup_message()
 
655
        self.check_message_object(m)
 
656
        m.save()
 
657
        self.check_message_file(m)
 
658
        m.load()
 
659
        self.check_message_object(m)
 
660
    tests.append('save_load')
 
661
 
 
662
    def recreate(self):
 
663
        """Recreating LogMessage does not overwrite existing message."""
 
664
        m = self.setup_message()
 
665
        m.save()
 
666
        del m
 
667
        m = self.working_tree.log_message()
 
668
        self.check_message_file(m)
 
669
        self.check_message_object(m)
 
670
    tests.append('recreate')
 
671
 
 
672
    def reload_(self):
 
673
        """Reloading LogMessage discards unsaved changes."""
 
674
        m = self.setup_message()
 
675
        m.save()
 
676
        m['Summary'] = 'foo'
 
677
        m.load()
 
678
        self.check_message_file(m)
 
679
        self.check_message_object(m)
 
680
    tests.append('reload_')
 
681
 
 
682
    def clear(self):
 
683
        """LogMessage.clear() method."""
 
684
        m = self.setup_message()
 
685
        m.clear()
 
686
        self.failUnlessEqual(m['Summary'], None)
 
687
        self.failUnlessEqual(m.description, None)
 
688
        m.save()
 
689
        self.failUnlessEqual(file(m.name).read(), '\n')
 
690
    tests.append('clear')
 
691
 
 
692
 
 
693
class LogMessageWhitespace(TestCase):
 
694
 
 
695
    def extraSetup(self):
 
696
        self.set_my_id()
 
697
        self.create_archive()
 
698
 
 
699
    tests = []
 
700
 
 
701
    def log_message_whitespace(self):
 
702
        """LogMessage files created in a path containing a space."""
 
703
        os.mkdir(self.working_dir_newline)
 
704
        wt = arch.init_tree(self.working_dir_newline, self.version)
 
705
        m = wt.log_message()
 
706
        template = '\n'.join(('Summary: ', 'Keywords: ', '', ''))
 
707
        self.failUnlessEqual(file(m.name).read(), template)
 
708
    tests.append('log_message_whitespace')
 
709
 
 
710
 
 
711
class Import(TestCase):
 
712
 
 
713
    def extraSetup(self):
 
714
        self.set_my_id()
 
715
        self.create_archive()
 
716
        self.create_version()
 
717
        self.create_working_tree(self.version)
 
718
 
 
719
    tests = []
 
720
 
 
721
    def import_without_log(self):
 
722
        """WorkingTree.import w/o log message produces expected message."""
 
723
        wt = self.working_tree
 
724
        wt.import_()
 
725
        logs = tuple(wt.iter_logs())
 
726
        self.failUnlessEqual(1, len(logs))
 
727
        log = logs[0]
 
728
        self.failUnlessEqual(log.revision, self.version['base-0'])
 
729
        self.failUnlessEqual(log.summary, 'initial import')
 
730
        self.failUnlessEqual(log.description,
 
731
                             '\n(automatically generated log message)\n')
 
732
    tests.append('import_without_log')
 
733
 
 
734
    def _import_with_log_helper(self, m, import_):
 
735
        wt = arch.WorkingTree(self.working_dir)
 
736
        s = "empty initial import"
 
737
        m['Summary'] = s
 
738
        d = "Empty initial import.\n"
 
739
        m.description = d
 
740
        import_()
 
741
        logs = tuple(wt.iter_logs())
 
742
        self.failUnlessEqual(1, len(logs))
 
743
        log = logs[0]
 
744
        self.failUnlessEqual(log.revision, self.version['base-0'])
 
745
        self.failUnlessEqual(log.summary, s)
 
746
        self.failUnlessEqual(log.description, d)
 
747
 
 
748
    def import_with_log(self):
 
749
        """WorkingTree.import with default log message file name."""
 
750
        wt = self.working_tree
 
751
        m = wt.log_message()
 
752
        def import_():
 
753
            m.save()
 
754
            wt.import_()
 
755
        self._import_with_log_helper(m, import_)
 
756
    tests.append('import_with_log')
 
757
 
 
758
    def import_with_custom_log(self):
 
759
        """WorkingTree.import with specified log message file name."""
 
760
        wt = self.working_tree
 
761
        m = arch.LogMessage(wt/'+custom_log_message')
 
762
        m.clear()
 
763
        def import_():
 
764
            wt.import_(m)
 
765
        self._import_with_log_helper(m, import_)
 
766
    tests.append('import_with_custom_log')
 
767
 
 
768
    def import_not_empty(self):
 
769
        """WorkingTree.import fails on unrecognized and adds source."""
 
770
        wt = self.working_tree
 
771
        # unrecognized file
 
772
        print >> file(wt/'!foo', 'w'), 'bar'
 
773
        def import_default(): wt.import_()
 
774
        self.failUnlessRaises(errors.ExecProblem, import_default)
 
775
        os.remove(wt/'!foo')
 
776
        # source file
 
777
        print >> file(wt/'source', 'w'), 'hello'
 
778
        wt.add_tag('source')
 
779
        wt.import_()
 
780
        log = wt.iter_logs().next()
 
781
        self.failUnlessEqual(list(log.new_files), [FileName('source')])
 
782
    tests.append('import_not_empty')
 
783
 
 
784
 
 
785
class Commit(TestCase):
 
786
 
 
787
    def extraSetup(self):
 
788
        self.set_my_id()
 
789
        self.create_archive()
 
790
        self.create_version()
 
791
        self.create_working_tree(self.version)
 
792
 
 
793
    tests = []
 
794
 
 
795
    def commit_seal_fix(self):
 
796
        """Commit, seal, fix, create expected patchlevels."""
 
797
        wt = self.working_tree
 
798
        wt.import_()
 
799
        # commit w/o log message
 
800
        self.failUnlessRaises(errors.ExecProblem, lambda: wt.commit())
 
801
        # commit revision
 
802
        m = wt.log_message()
 
803
        m['Summary'] = m.description = 'first revision'
 
804
        wt.commit(m)
 
805
        self.failUnlessEqual(map(lambda(x): x.revision, wt.iter_logs()),
 
806
                             map(lambda(x): self.version[x],
 
807
                                 ('base-0', 'patch-1')))
 
808
        # fix non-sealed, then seal
 
809
        m['Summary'] = m.description = 'second revision'
 
810
        self.failUnlessRaises(errors.ExecProblem,
 
811
                              lambda: wt.commit(fix=True, log=m))
 
812
        wt.commit(seal=True, log=m)
 
813
        self.failUnlessEqual(map(lambda(x): x.revision, wt.iter_logs()),
 
814
                             map(lambda(x): self.version[x],
 
815
                                 ('base-0', 'patch-1', 'version-0')))
 
816
        # commit sealed, then fix
 
817
        m['Summary'] = m.description = 'third revision'
 
818
        self.failUnlessRaises(errors.ExecProblem,
 
819
                              lambda: wt.commit(m))
 
820
        wt.commit(fix=True, log=m)
 
821
        self.failUnlessEqual(map(lambda(x): x.revision, wt.iter_logs()),
 
822
                             map(lambda(x): self.version[x],
 
823
                                 ('base-0', 'patch-1',
 
824
                                  'version-0', 'versionfix-1')))
 
825
        # sanity catches trying to fix and seal at the same time
 
826
        m['Summary'] = m.description = 'error'
 
827
        self.failUnlessRaises(AssertionError,
 
828
                              lambda: wt.commit(seal=True, fix=True, log=m))
 
829
    tests.append('commit_seal_fix')
 
830
 
 
831
    def commit_relative_log(self):
 
832
        """Commit with a log file given as a relative path."""
 
833
        wt = self.working_tree
 
834
        wt.import_()
 
835
        tree_dir, tree_name = map(DirName, os.path.split(wt))
 
836
        os.chdir(tree_dir)
 
837
        log_name = tree_name/'+log'
 
838
        f = open(log_name, 'w')
 
839
        summary = 'first rev'
 
840
        f.write('Summary: %s\n\n' % summary)
 
841
        f.close()
 
842
        wt.commit(log=log_name)
 
843
        def lastlog(): return wt.iter_logs(reverse=True).next()
 
844
        self.failUnlessEqual(lastlog().summary, summary)
 
845
    tests.append('commit_relative_log')
 
846
 
 
847
    def commit_files(self):
 
848
        """Commit (maybe with limit) and Patchlog are consistent."""
 
849
        wt = self.working_tree
 
850
        # import two files
 
851
        print >> file(wt/'f1', 'w'), 'hello'
 
852
        print >> file(wt/'f2', 'w'), 'world'
 
853
        map(wt.add_tag, ('f1', 'f2'))
 
854
        wt.import_()
 
855
        def lastlog(): return wt.iter_logs(reverse=True).next()
 
856
        self.failUnlessEqual(lastlog().new_files, map(FileName, ('f1','f2')))
 
857
        # modify both files
 
858
        print >> file(wt/'f1', 'w'), 'Hello,'
 
859
        print >> file(wt/'f2', 'w'), 'World!'
 
860
        # sanity catches non-null but empty file-list
 
861
        m = wt.log_message()
 
862
        m['Summary'] = m.description = 'first revision'
 
863
        def commit_empty(): wt.commit(log=m, file_list=[].__iter__())
 
864
        self.failUnlessRaises(AssertionError, commit_empty)
 
865
        # commit one file, absolute path
 
866
        wt.commit(m, file_list=[wt/'f2'].__iter__())
 
867
        self.failUnlessEqual(lastlog().modified_files, [FileName('f2')])
 
868
        # commit the other file, relative path
 
869
        m['Summary'] = m.description = 'third revision'
 
870
        wt.commit(m, file_list=['f1'])
 
871
        self.failUnlessEqual(lastlog().modified_files, [FileName('f1')])
 
872
    tests.append('commit_files')
 
873
 
 
874
    def commit_out_of_date(self):
 
875
        """Committing out-of-date fails unless forced."""
 
876
        wt = self.working_tree
 
877
        wt.import_()
 
878
        m = wt.log_message()
 
879
        m['Summary'] = m.description = 'first revision'
 
880
        wt.commit(m)
 
881
        shutil.rmtree(self.working_dir)
 
882
        wt = self.version['base-0'].get(self.working_dir)
 
883
        m['Summary'] = m.description = 'out of date revision'
 
884
        # trying to commit an out-of-date tree fails
 
885
        self.failUnlessRaises(errors.ExecProblem, lambda: wt.commit(m))
 
886
        # unless the right option is set
 
887
        wt.commit(m, out_of_date_ok=True)
 
888
        self.failUnlessEqual(map(lambda(x): x.revision, wt.iter_logs()),
 
889
                             map(lambda(x): self.version[x],
 
890
                                 ('base-0', 'patch-2')))
 
891
    tests.append('commit_out_of_date')
 
892
 
 
893
    def commit_version(self):
 
894
        """Commit on a specified version."""
 
895
        wt = self.working_tree
 
896
        wt.import_()
 
897
        def think(**kw):
 
898
            def thunk():
 
899
                m = wt.log_message()
 
900
                m['Summary'] = m.description = 'first revision'
 
901
                wt.commit(m, **kw)
 
902
            return thunk
 
903
        wt.tree_version = self.other_version
 
904
        self.assertRaises(errors.ExecProblem, think())
 
905
        think(version=self.version)()
 
906
        wt.tree_version = self.version
 
907
        self.failUnlessEqual(map(lambda(x): x.revision, wt.iter_logs()),
 
908
                             map(lambda(x): self.version[x],
 
909
                                 ('base-0', 'patch-1')))
 
910
    tests.append('commit_version')
 
911
 
 
912
    def iter_commit(self):
 
913
        """iter_commit works"""
 
914
        wt = self.working_tree
 
915
        wt.tagging_method = 'names'
 
916
        wt.import_()
 
917
        print >> file(wt/'f1', 'w'), 'hello'
 
918
        m = wt.log_message()
 
919
        m['Summary'] = m.description = 'first revision'
 
920
        lines = wt.iter_commit(m)
 
921
        changes = [L for L in lines if isinstance(L, arch.TreeChange)]
 
922
        self.assertEqual(1, len(changes))
 
923
        self.assertEqual(arch.FileAddition, type(changes[0]))
 
924
        self.assertEqual('f1', changes[0].name)
 
925
    tests.append('iter_commit')
 
926
 
 
927
    def iter_commit_files(self):
 
928
        """Commit with limit print the right path."""
 
929
        wt = self.working_tree
 
930
        print >> file(wt/'f1', 'w'), 'hello'
 
931
        print >> file(wt/'f2', 'w'), 'world'
 
932
        map(wt.add_tag, ('f1', 'f2'))
 
933
        wt.import_()
 
934
        print >> file(wt/'f1', 'w'), 'Hello,'
 
935
        print >> file(wt/'f2', 'w'), 'World!'
 
936
        m = wt.log_message()
 
937
        m['Summary'] = m.description = 'first revision'
 
938
        lines = wt.iter_commit(m, file_list=['f2'].__iter__())
 
939
        changes = [L for L in lines if isinstance(L, arch.TreeChange)]
 
940
        self.assertEqual(1, len(changes))
 
941
        self.assertEqual(arch.FileModification, type(changes[0]))
 
942
        self.assertEqual('f2', changes[0].name)
 
943
        # commit the other file, relative path
 
944
        m['Summary'] = m.description = 'first revision'
 
945
        lines = wt.iter_commit(m, file_list=['f1'])
 
946
        changes = [L for L in lines if isinstance(L, arch.TreeChange)]
 
947
        self.assertEqual(1, len(changes))
 
948
        self.assertEqual(arch.FileModification, type(changes[0]))
 
949
        self.assertEqual('f1', changes[0].name)
 
950
    tests.append('iter_commit_files')
 
951
 
 
952
 
 
953
def tla_skip_whitespace_tests(quiet=False):
 
954
    from arch.backends import tla
 
955
    status = tla.status_cmd(['escape'], expected=(0,1))
 
956
    if status == 1 and not quiet:
 
957
        sys.stdout.write('[SKIP] ')
 
958
        sys.stdout.flush()
 
959
    return status == 1
 
960
 
 
961
 
 
962
class EscapedWhitespace(TestCase):
 
963
 
 
964
    tests = []
 
965
 
 
966
    def extraSetup(self):
 
967
        self.set_my_id()
 
968
        self.create_archive()
 
969
        self.create_version()
 
970
        self.create_working_tree(self.version)
 
971
 
 
972
    def setUp(self):
 
973
        TestCase.setUp(self)
 
974
        if tla_skip_whitespace_tests(quiet=True): return
 
975
        self.tree = self.working_tree
 
976
        self.file_name = FileName('foo bar')
 
977
        open(self.tree/self.file_name, 'w').write('')
 
978
        self.tree.add_tag(self.file_name)
 
979
        tagfile = self.tree/'.arch-ids'/self.file_name+'.id'
 
980
        print >> open(tagfile, 'w'), 'baby jesus cries'
 
981
        self.tree.import_()
 
982
 
 
983
    def new_files_escaped(self):
 
984
        """New-files from patchlog with spaces"""
 
985
        if tla_skip_whitespace_tests(): return
 
986
        expected = [self.file_name]
 
987
        result = list(self.tree.iter_logs())[0].new_files
 
988
        self.assertEqual(expected, result)
 
989
    tests.append('new_files_escaped')
 
990
 
 
991
    def modified_files_escaped(self):
 
992
        """Modified-files from patchlog with spaces"""
 
993
        if tla_skip_whitespace_tests(): return
 
994
        open(self.tree/self.file_name, 'w').write('Hello, World!')
 
995
        m = self.tree.log_message()
 
996
        m['Summary'] = 'first revision'
 
997
        self.tree.commit(m)
 
998
        expected = [self.file_name]
 
999
        result = list(self.tree.iter_logs())[1].modified_files
 
1000
        self.assertEqual(expected, result)
 
1001
    tests.append('modified_files_escaped')
 
1002
 
 
1003
    def removed_files_escaped(self):
 
1004
        """Removed-files from patchlog with spaces"""
 
1005
        if tla_skip_whitespace_tests(): return
 
1006
        name = self.file_name
 
1007
        self.tree.delete(name)
 
1008
        m = self.tree.log_message()
 
1009
        m['Summary'] = 'first revision'
 
1010
        self.tree.commit(m)
 
1011
        expected = [name, name.dirname()/'.arch-ids'/name.basename()+'.id']
 
1012
        result = list(self.tree.iter_logs())[1].removed_files
 
1013
        expected.sort()
 
1014
        result.sort()
 
1015
        self.assertEqual(expected, result)
 
1016
    tests.append('removed_files_escaped')
 
1017
 
 
1018
    def get_index_escaped(self):
 
1019
        """Changeset.get_index supports escaped file names."""
 
1020
        if tla_skip_whitespace_tests(): return
 
1021
        open(self.tree/self.file_name, 'w').write('Hello, World!')
 
1022
        m = self.tree.log_message()
 
1023
        m['Summary'] = 'first revision'
 
1024
        self.tree.commit(m)
 
1025
        cset = self.version['patch-1'].get_patch(self.tree/',cset')
 
1026
        expected = { 'x_baby_jesus_cries': self.file_name }
 
1027
        result = cset.get_index('orig-files')
 
1028
        self.assertEqual(expected, result)
 
1029
    tests.append('get_index_escaped')
 
1030
 
 
1031
    def file_find_pristine_escaped(self):
 
1032
        """file_find with pristine supports escaped names."""
 
1033
        if tla_skip_whitespace_tests(): return
 
1034
        revision = self.version['base-0']
 
1035
        result = self.tree.file_find(self.file_name, revision)
 
1036
        pristine = self.tree.find_pristine(revision)
 
1037
        expected = pristine/self.file_name
 
1038
        self.assertEqual(expected, result)
 
1039
    tests.append('file_find_pristine_escaped')
 
1040
 
 
1041
 
 
1042
class LatestVersion(TestCase):
 
1043
 
 
1044
    def extraSetup(self):
 
1045
        self.set_my_id()
 
1046
 
 
1047
    tests = []
 
1048
 
 
1049
    def latest_version(self):
 
1050
        return self.version.branch.latest_version()
 
1051
 
 
1052
    def not_registered(self):
 
1053
        """Branch.latest_version ValueError if archive is not registered."""
 
1054
        self.assertRaises(ValueError, self.latest_version)
 
1055
    tests.append('not_registered')
 
1056
 
 
1057
    def missing_branch(self):
 
1058
        """Branch.latest_version ValueError if branch does not exist."""
 
1059
        self.create_archive()
 
1060
        self.assertRaises(ValueError, self.latest_version)
 
1061
    tests.append('missing_branch')
 
1062
 
 
1063
    def empty_branch(self):
 
1064
        """Branch.latest_version ValueError if branch is empty."""
 
1065
        self.create_archive()
 
1066
        self.create_branch()
 
1067
        self.assertRaises(ValueError, self.latest_version)
 
1068
    tests.append('empty_branch')
 
1069
 
 
1070
    def works(self):
 
1071
        """Branch.latest_version works."""
 
1072
        self.create_archive()
 
1073
        self.create_version()
 
1074
        version = self.version
 
1075
        branch = version.branch
 
1076
        self.assertEqual(version, branch.latest_version())
 
1077
        next_version = arch.Version(version.fullname + '.1')
 
1078
        next_version.setup()
 
1079
        self.assertEqual(next_version, branch.latest_version())
 
1080
    tests.append('works')
 
1081
 
 
1082
 
 
1083
class LatestRevision(TestCase):
 
1084
 
 
1085
    def extraSetup(self):
 
1086
        self.set_my_id()
 
1087
 
 
1088
    tests = []
 
1089
 
 
1090
    def latest_revision(self):
 
1091
        return self.version.latest_revision()
 
1092
 
 
1093
    def not_registered(self):
 
1094
        """Version.latest_revision ValueError if archive is not registered."""
 
1095
        self.assertRaises(ValueError, self.latest_revision)
 
1096
    tests.append('not_registered')
 
1097
 
 
1098
    def missing_version(self):
 
1099
        """Version.latest_revision ValueError if version does not exist."""
 
1100
        self.create_archive()
 
1101
        self.assertRaises(ValueError, self.latest_revision)
 
1102
    tests.append('missing_version')
 
1103
 
 
1104
    def empty_version(self):
 
1105
        """Version.latest_revision ValueError if version is empty."""
 
1106
        self.create_archive()
 
1107
        self.create_version()
 
1108
        self.assertRaises(ValueError, self.latest_revision)
 
1109
    tests.append('empty_version')
 
1110
 
 
1111
    def works(self):
 
1112
        """Version.latest_revision works."""
 
1113
        self.create_archive()
 
1114
        self.create_version()
 
1115
        self.create_working_tree(self.version)
 
1116
        tree = self.working_tree
 
1117
        tree.import_()
 
1118
        version = self.version
 
1119
        self.assertEqual(version['base-0'], version.latest_revision())
 
1120
        self.commit_summary(tree, 'first revision')
 
1121
        self.assertEqual(version['patch-1'], version.latest_revision())
 
1122
    tests.append('works')
 
1123
 
 
1124
 
 
1125
class PopulatedArchive(TestCase):
 
1126
 
 
1127
    def extraSetup(self):
 
1128
        self.set_my_id()
 
1129
        self.create_archive()
 
1130
        self.create_version()
 
1131
        self.create_working_tree(self.version)
 
1132
 
 
1133
    def setUp(self):
 
1134
        TestCase.setUp(self)
 
1135
        self.tree = self.working_tree
 
1136
        self.tree.import_()
 
1137
        self.commit_summary(self.tree, 'first revision')
 
1138
 
 
1139
    tests = []
 
1140
 
 
1141
    def cachedrevs(self):
 
1142
        """Cached revisions."""
 
1143
        def ls_cached():
 
1144
            gen = self.version.iter_cachedrevs()
 
1145
            self.assert_(isinstance(gen, types.GeneratorType))
 
1146
            return list(gen)
 
1147
        self.assertEqual(ls_cached(), [])
 
1148
        self.version['base-0'].cache()
 
1149
        self.assertEqual(ls_cached(), [self.version['base-0']])
 
1150
        self.version['patch-1'].cache()
 
1151
        self.assertEqual(ls_cached(),
 
1152
                         map(lambda(x): self.version[x],
 
1153
                             ('base-0', 'patch-1')))
 
1154
        self.version['base-0'].uncache()
 
1155
        self.assertEqual(ls_cached(), [self.version['patch-1']])
 
1156
        self.version['patch-1'].uncache()
 
1157
        self.assertEqual(ls_cached(), [])
 
1158
    tests.append('cachedrevs')
 
1159
 
 
1160
    def iter_logs(self):
 
1161
        """SourceTree.iter_log for the tree-version."""
 
1162
        def ls_logs_revision(vsn=None):
 
1163
            gen = self.tree.iter_logs(vsn)
 
1164
            self.assert_(isinstance(gen, types.GeneratorType))
 
1165
            return map(lambda(x): x.revision, gen)
 
1166
        rvsns = map(lambda(x): self.version[x], ('base-0', 'patch-1'))
 
1167
        self.assertEqual(ls_logs_revision(), rvsns)
 
1168
        self.assertEqual(ls_logs_revision(self.version), rvsns)
 
1169
        self.assertEqual(ls_logs_revision(self.version.fullname), rvsns)
 
1170
    tests.append('iter_logs')
 
1171
 
 
1172
    def continuation_from_version(self):
 
1173
        """Create a continuation version (from version)."""
 
1174
        version = self.version
 
1175
        other_version = self.other_version
 
1176
        assert not other_version.exists()
 
1177
        version.latest_revision().make_continuation(other_version)
 
1178
        self.failUnless(other_version.exists())
 
1179
        revisions = tuple(other_version.iter_revisions())
 
1180
        other_base = other_version['base-0']
 
1181
        self.failUnlessEqual(revisions, (other_base,))
 
1182
        summary = other_base.patchlog.summary
 
1183
        self.failUnlessEqual(summary, 'tag of %s' % version.latest_revision())
 
1184
    tests.append('continuation_from_version')
 
1185
 
 
1186
    def mirror_basic(self):
 
1187
        """Archive.mirror without fromto, pushing."""
 
1188
        master = self.archive
 
1189
        mirror_name = master.name + '-MIRROR'
 
1190
        location = self.arch_dir/mirror_name
 
1191
        mirror = master.make_mirror(mirror_name, location)
 
1192
        master.mirror()
 
1193
        def nonarch(x): return x.nonarch
 
1194
        master_revisions = map(nonarch, master.iter_revisions())
 
1195
        mirror_revisions = map(nonarch, mirror.iter_revisions())
 
1196
        self.failUnlessEqual(master_revisions, mirror_revisions)
 
1197
    tests.append('mirror_basic')
 
1198
 
 
1199
    def mirror_fromto(self):
 
1200
        """Archive.mirror with fromto."""
 
1201
        master = self.archive
 
1202
        mirror_name = master.name + '-MIRROR'
 
1203
        mirror = master.make_mirror(mirror_name, self.arch_dir/mirror_name)
 
1204
        master.mirror()
 
1205
        third_name = master.name + '-third'
 
1206
        third = master.make_mirror(third_name, self.arch_dir/third_name)
 
1207
        master.mirror(fromto=(mirror, third))
 
1208
        def nonarch(x): return x.nonarch
 
1209
        mirror_revisions = map(nonarch, mirror.iter_revisions())
 
1210
        third_revisions = map(nonarch, third.iter_revisions())
 
1211
        self.failUnlessEqual(mirror_revisions, third_revisions)
 
1212
    tests.append('mirror_fromto')
 
1213
 
 
1214
    def mirror_limit(self):
 
1215
        """Archive.mirror with limit version"""
 
1216
        version = self.version
 
1217
        other_version = self.other_version
 
1218
        version.latest_revision().make_continuation(other_version)
 
1219
        master = self.archive
 
1220
        mirror_name = master.name + '-MIRROR'
 
1221
        location = self.arch_dir/mirror_name
 
1222
        mirror = master.make_mirror(mirror_name, location)
 
1223
        master.mirror(limit=(version,))
 
1224
        def nonarch(x): return x.nonarch
 
1225
        mirror_versions = tuple(map(nonarch, mirror.iter_versions()))
 
1226
        self.failUnlessEqual(mirror_versions, (version.nonarch,))
 
1227
    tests.append('mirror_limit')
 
1228
 
 
1229
    def _check_logs(self, tree, levels):
 
1230
        expected_revs = map(lambda (x): self.version[x], levels)
 
1231
        found_revs = map(lambda (x): x.revision, tree.iter_logs())
 
1232
        self.assertEqual(expected_revs, found_revs)
 
1233
 
 
1234
    def tree_get(self):
 
1235
        """Get a working tree, given a revision."""
 
1236
        shutil.rmtree(self.working_dir)
 
1237
        tree = self.version['base-0'].get(self.working_dir)
 
1238
        self._check_logs(tree, ['base-0'])
 
1239
    tests.append('tree_get')
 
1240
 
 
1241
    def tree_update(self):
 
1242
        """WorkingTree.update, without options."""
 
1243
        shutil.rmtree(self.working_dir)
 
1244
        tree = self.version['base-0'].get(self.working_dir)
 
1245
        tree.update()
 
1246
        self._check_logs(tree, ['base-0', 'patch-1'])
 
1247
    tests.append('tree_update')
 
1248
 
 
1249
 
 
1250
class Merges(PopulatedArchive):
 
1251
 
 
1252
    tests = []
 
1253
 
 
1254
    def setup_first_merge(self):
 
1255
        version = self.version
 
1256
        other_version = self.other_version
 
1257
        tree = self.tree
 
1258
        version.latest_revision().make_continuation(other_version)
 
1259
        other_tree = other_version['base-0'].get(self.other_working_dir)
 
1260
        self.other_tree = other_tree
 
1261
        self.commit_summary(tree, 'second rev')
 
1262
        self.commit_summary(tree, 'third rev')
 
1263
 
 
1264
    def star_merge(self):
 
1265
        """star-merge with a version"""
 
1266
        self.setup_first_merge()
 
1267
        other_tree = self.other_tree
 
1268
        other_tree.star_merge(self.version)
 
1269
        msg = 'merge'
 
1270
        self.commit_summary(other_tree, msg)
 
1271
        log = self.other_version['patch-1'].patchlog
 
1272
        self.assertEqual(log.summary, msg)
 
1273
        vsn = self.version
 
1274
        expected = [vsn['patch-2'], vsn['patch-3']]
 
1275
        self.assertEqual(log.merged_patches, expected)
 
1276
    tests.append('star_merge')
 
1277
 
 
1278
    def setup_star_merge(self):
 
1279
        version, other_version = self.version, self.other_version
 
1280
        open(self.tree/'foo', 'w').write('hello')
 
1281
        self.tree.add_tag('foo')
 
1282
        self.commit_summary(self.tree, 'second rev')
 
1283
        version.latest_revision().make_continuation(other_version)
 
1284
        other_tree = other_version['base-0'].get(self.other_working_dir)
 
1285
        self.other_tree = other_tree
 
1286
        open(self.tree/'foo', 'w').write('hello world')
 
1287
        self.commit_summary(self.tree, 'third rev')
 
1288
 
 
1289
    def assertObject(self, obj, class_, **attrs):
 
1290
        self.assert_(isinstance(obj, class_))
 
1291
        for k, v in attrs.items():
 
1292
            self.assertEqual(getattr(obj, k), v)
 
1293
 
 
1294
    def iter_star_merge(self):
 
1295
        """star-merge incremental output"""
 
1296
        self.setup_star_merge()
 
1297
        changes = {}
 
1298
        for line in self.other_tree.iter_star_merge(self.version):
 
1299
            if isinstance(line, arch.Chatter):
 
1300
                continue
 
1301
            changes[line.name] = line
 
1302
        expected = (('foo', arch.FileModification, {'binary': False}),
 
1303
                    ('{arch}/cat/cat--brn/cat--brn--1.0/'
 
1304
                     'jdoe@example.com--example--9999/patch-log/patch-3',
 
1305
                     arch.FileAddition, {'directory': False}))
 
1306
        self.assertEqual(len(expected), len(changes))
 
1307
        for name, class_, attrs in expected:
 
1308
            self.assert_(name in changes)
 
1309
            change = changes[name]
 
1310
            self.assertObject(change, class_, **attrs)
 
1311
    tests.append('iter_star_merge')
 
1312
 
 
1313
    def iter_star_merge_conflict(self):
 
1314
        """incremental star-merge conflict"""
 
1315
        self.setup_star_merge()
 
1316
        open(self.tree/'foo', 'w').write('hello world')
 
1317
        self.commit_summary(self.tree, 'third rev')
 
1318
        open(self.other_tree/'foo', 'w').write('Hello, World!')
 
1319
        self.commit_summary(self.other_tree, 'first rev')
 
1320
        cset_app = self.other_tree.iter_star_merge(self.version)
 
1321
        for unused in cset_app: pass
 
1322
        self.assert_(cset_app.conflicting)
 
1323
    tests.append('iter_star_merge_conflict')
 
1324
 
 
1325
    def setup_merges(self):
 
1326
        self.setup_first_merge()
 
1327
        version, other_version = self.version, self.other_version
 
1328
        tree, other_tree = self.tree, self.other_tree
 
1329
        other_tree.star_merge(version)
 
1330
        self.commit_summary(other_tree, 'merge patch-3')
 
1331
        self.commit_summary(other_tree, 'empty merge')
 
1332
 
 
1333
    def depth_first(self, merges_iter):
 
1334
        merges = []
 
1335
        for i, F in merges_iter:
 
1336
            for f in F:
 
1337
                merges.append([i, f])
 
1338
        return merges
 
1339
 
 
1340
    def iter_depth(self, reverse=False):
 
1341
        """Version.iter_merges depth first"""
 
1342
        self.setup_merges()
 
1343
        vsn, oth = self.version, self.other_version
 
1344
        expected = [[oth['base-0'], vsn['base-0']],
 
1345
                    [oth['base-0'], vsn['patch-1']],
 
1346
                    [oth['base-0'], oth['base-0']],
 
1347
                    [oth['patch-1'], vsn['patch-2']],
 
1348
                    [oth['patch-1'], vsn['patch-3']],
 
1349
                    [oth['patch-1'], oth['patch-1']],
 
1350
                    [oth['patch-2'], oth['patch-2']]]
 
1351
        if reverse: expected.reverse()
 
1352
        merges_iter = self.other_version.iter_merges(reverse=reverse)
 
1353
        merges = self.depth_first(merges_iter)
 
1354
        self.assertEqual(expected, merges)
 
1355
    tests.append('iter_depth')
 
1356
 
 
1357
    def iter_depth_reverse(self):
 
1358
        """Version.iter_merges depth first in reverse"""
 
1359
        self.iter_depth(reverse=True)
 
1360
    tests.append('iter_depth_reverse')
 
1361
 
 
1362
    def iter_not_me(self):
 
1363
        """Version.iter_merges metoo=False"""
 
1364
        self.setup_merges()
 
1365
        vsn, oth = self.version, self.other_version
 
1366
        expected = [[oth['base-0'], vsn['base-0']],
 
1367
                    [oth['base-0'], vsn['patch-1']],
 
1368
                    [oth['patch-1'], vsn['patch-2']],
 
1369
                    [oth['patch-1'], vsn['patch-3']]]
 
1370
        merges = self.depth_first(self.other_version.iter_merges(metoo=False))
 
1371
        self.assertEqual(expected, merges)
 
1372
    tests.append('iter_not_me')
 
1373
 
 
1374
    def iter_version(self):
 
1375
        """Version.iter_merges limited to a single source version"""
 
1376
        self.setup_merges()
 
1377
        vsn, oth = self.version, self.other_version
 
1378
        expected = [[oth['base-0'], oth['base-0']],
 
1379
                    [oth['patch-1'], oth['patch-1']],
 
1380
                    [oth['patch-2'], oth['patch-2']]]
 
1381
        merges_iter = self.other_version.iter_merges(self.other_version)
 
1382
        merges = self.depth_first(merges_iter)
 
1383
        self.assertEqual(expected, merges)
 
1384
    tests.append('iter_version')
 
1385
 
 
1386
    def breadth_first(self, iterator):
 
1387
        merges = map(list, iterator)
 
1388
        for step in merges:
 
1389
            step[1] = list(step[1])
 
1390
        return merges
 
1391
 
 
1392
    def iter_breadth(self):
 
1393
        """Version.iter_merges with breadth first traversal."""
 
1394
        self.setup_merges()
 
1395
        merges_iter = self.other_version.iter_merges()
 
1396
        merges = self.breadth_first(merges_iter)
 
1397
        vsn, oth = self.version, self.other_version
 
1398
        expected = [[oth['base-0'], [vsn['base-0'],
 
1399
                                     vsn['patch-1'],
 
1400
                                     oth['base-0']]],
 
1401
                    [oth['patch-1'], [vsn['patch-2'],
 
1402
                                      vsn['patch-3'],
 
1403
                                      oth['patch-1']]],
 
1404
                    [oth['patch-2'], [oth['patch-2']]]]
 
1405
        self.assertEqual(expected, merges)
 
1406
    tests.append('iter_breadth')
 
1407
 
 
1408
 
 
1409
class RevisionFiles(PopulatedArchive):
 
1410
 
 
1411
    tests = []
 
1412
 
 
1413
    def parse_checksum(self):
 
1414
        """Parsing checksum file data"""
 
1415
        checksum = '\n'.join((
 
1416
            "-----BEGIN PGP SIGNED MESSAGE-----",
 
1417
            "Hash: SHA1",
 
1418
            "",
 
1419
            "Signature-for: ddaa@ddaa.net--2004/pyarch--ddaa--0.5--base-0",
 
1420
            "md5 pyarch--ddaa--0.5--base-0.tar.gz"
 
1421
            " f1998e8294e50070993471cec6b66b52",
 
1422
            "foo bar-baz-blah 0123456789abcdef",
 
1423
            "sha1 pyarch--ddaa--0.5--base-0.tar.gz"
 
1424
            " 633c0efcceab4e6588a04a9ce73233bc206bcdf9",
 
1425
            "-----BEGIN PGP SIGNATURE-----",
 
1426
            "Version: GnuPG v1.2.4 (GNU/Linux)",
 
1427
            "",
 
1428
            "iD8DBQFAUQ/bZEH9AkgfRL0RAiiFAKCnV9jenFvCsckbIigtQmHYfZgcKwCgk5FY",
 
1429
            "XQwvigYzMiiR9lFZh6vYUJM=",
 
1430
            "=PCU/",
 
1431
            "-----END PGP SIGNATURE-----",
 
1432
            ""))
 
1433
        rvsn  = arch.Revision('ddaa@ddaa.net--2004/pyarch--ddaa--0.5--base-0')
 
1434
        checksums = rvsn._parse_checksum(checksum)
 
1435
        expected = {'pyarch--ddaa--0.5--base-0.tar.gz':
 
1436
                    {'md5': 'f1998e8294e50070993471cec6b66b52',
 
1437
                     'sha1': '633c0efcceab4e6588a04a9ce73233bc206bcdf9'},
 
1438
                    'bar-baz-blah': {'foo': '0123456789abcdef'}}
 
1439
        self.assertEqual(expected, checksums)
 
1440
    tests.append('parse_checksum')
 
1441
 
 
1442
    def _help_file_names(self, rvsn, expected_names):
 
1443
        files = rvsn.iter_files()
 
1444
        file_names = map(lambda (x): x.name, files)
 
1445
        file_names.sort()
 
1446
        expected_names.sort()
 
1447
        self.assertEqual(file_names, expected_names)
 
1448
 
 
1449
    def import_files(self):
 
1450
        """Import revisions have expected files."""
 
1451
        rvsn  = self.version['base-0']
 
1452
        expected_names = ['checksum', 'log', rvsn.nonarch + '.src.tar.gz' ]
 
1453
        self._help_file_names(rvsn, expected_names)
 
1454
    tests.append('import_files')
 
1455
 
 
1456
    def commit_files(self):
 
1457
        """Commit revisions (uncached) have expected files."""
 
1458
        rvsn = self.version['patch-1']
 
1459
        expected_names = ['checksum', 'log', rvsn.nonarch + '.patches.tar.gz']
 
1460
        self._help_file_names(rvsn, expected_names)
 
1461
    tests.append('commit_files')
 
1462
 
 
1463
    def cachedrev_files(self):
 
1464
        """Commit revisions (cached) have expected files."""
 
1465
        rvsn = self.version['patch-1']
 
1466
        rvsn.cache()
 
1467
        expected_names = ['checksum', 'log', rvsn.nonarch + '.patches.tar.gz',
 
1468
                          'checksum.cacherev', rvsn.nonarch + '.tar.gz']
 
1469
        self._help_file_names(rvsn, expected_names)
 
1470
    tests.append('cachedrev_files')
 
1471
 
 
1472
    def revision_files_data(self):
 
1473
        """Accessing the data of Revision files."""
 
1474
        rvsn = self.version['patch-1']
 
1475
        rvsn.cache()
 
1476
        rev_dir = reduce(
 
1477
            lambda a,b: a/DirName(b), \
 
1478
            (self.arch_dir, self.arch_name,
 
1479
             'cat', 'cat--brn', 'cat--brn--1.0', 'patch-1'))
 
1480
        for rev_file in rvsn.iter_files():
 
1481
            self.assert_(rev_file.data == open(rev_dir/rev_file.name).read())
 
1482
    tests.append('revision_files_data')
 
1483
 
 
1484
 
 
1485
class PristineTrees(PopulatedArchive):
 
1486
 
 
1487
    tests = []
 
1488
 
 
1489
    def iter_pristines_one(self):
 
1490
        """iter_pristines works"""
 
1491
        result = list(self.tree.iter_pristines())
 
1492
        expected = [self.version['patch-1']]
 
1493
        self.assertEqual(expected, result)
 
1494
    tests.append('iter_pristines_one')
 
1495
 
 
1496
    def add_pristine(self):
 
1497
        """add_pristine works"""
 
1498
        self.tree.add_pristine(self.version['base-0'])
 
1499
        result = list(self.tree.iter_pristines())
 
1500
        expected = [self.version['base-0'], self.version['patch-1']]
 
1501
        self.assertEqual(expected, result)
 
1502
    tests.append('add_pristine')
 
1503
 
 
1504
    def find_pristine(self):
 
1505
        """find_pristine works"""
 
1506
        revision = self.version['patch-1']
 
1507
        result = self.tree.find_pristine(revision)
 
1508
        expected = (self.tree / '{arch}' / '++pristine-trees' / 'unlocked' /
 
1509
                    self.version.category.nonarch /
 
1510
                    self.version.branch.nonarch / self.version.nonarch /
 
1511
                    self.archive.name / revision.nonarch)
 
1512
        self.assertEqual(expected, result)
 
1513
        self.assertEqual(arch.ArchSourceTree, type(result))
 
1514
    tests.append('find_pristine')
 
1515
 
 
1516
    def find_pristine_missing(self):
 
1517
        """find_pristine raises NoPristineFoundError"""
 
1518
        revision = self.version['base-0']
 
1519
        self.assert_(revision not in self.tree.iter_pristines())
 
1520
        def thunk(): return self.tree.find_pristine(revision)
 
1521
        self.assertRaises(arch.errors.NoPristineFoundError, thunk)
 
1522
    tests.append('find_pristine_missing')
 
1523
 
 
1524
 
 
1525
class Changeset(PopulatedArchive):
 
1526
 
 
1527
    tests = []
 
1528
 
 
1529
    def setUp(self):
 
1530
        PopulatedArchive.setUp(self)
 
1531
        self.base0 = self.version['base-0']
 
1532
        self.patch1 = self.version['patch-1']
 
1533
        self.base0_tree = self.base0.get(self.arch_dir/'base0')
 
1534
        self.patch1_tree = self.patch1.get(self.arch_dir/'patch1')
 
1535
        self.changeset_name = self.arch_dir/'cset'
 
1536
 
 
1537
    def changeset_does_nothing(self):
 
1538
        """Changeset.does_nothing works."""
 
1539
        cset = arch.delta(
 
1540
            self.patch1_tree, self.patch1_tree, self.changeset_name)
 
1541
        self.assertEqual(True, cset.does_nothing)
 
1542
        shutil.rmtree(cset)
 
1543
        cset = arch.delta(
 
1544
            self.base0_tree, self.patch1_tree, self.changeset_name)
 
1545
        self.assertEqual(False, cset.does_nothing)
 
1546
    tests.append('changeset_does_nothing')
 
1547
 
 
1548
 
 
1549
class Delta(Changeset):
 
1550
 
 
1551
    tests = []
 
1552
 
 
1553
    def _patchlog_name(self, rvsn):
 
1554
        return '{arch}/%s/%s/%s/%s/patch-log/%s' % \
 
1555
               (rvsn.category.nonarch, rvsn.branch.nonarch,
 
1556
                rvsn.version.nonarch, rvsn.archive.name, rvsn.patchlevel)
 
1557
 
 
1558
    def _delta_helper(self, orig, mod):
 
1559
        name = self._patchlog_name(self.patch1)
 
1560
        expected_created = [('A_./'+name, name)]
 
1561
        cset = arch.delta(orig, mod, self.changeset_name)
 
1562
        created = list(cset.iter_created_files(True))
 
1563
        self.assertEqual(expected_created, created)
 
1564
 
 
1565
    def delta_tree_tree(self):
 
1566
        """delta(tree, tree) works."""
 
1567
        self._delta_helper(self.base0_tree, self.patch1_tree)
 
1568
    tests.append('delta_tree_tree')
 
1569
 
 
1570
    def delta_tree_revision(self):
 
1571
        """delta(tree, revision) works."""
 
1572
        self._delta_helper(self.base0_tree, self.patch1)
 
1573
    tests.append('delta_tree_revision')
 
1574
 
 
1575
    def delta_revision_tree(self):
 
1576
        """delta(revision, tree) works."""
 
1577
        self._delta_helper(self.base0, self.patch1_tree)
 
1578
    tests.append('delta_revision_tree')
 
1579
 
 
1580
    def delta_revision_revision(self):
 
1581
        """delta(revision, revision) works."""
 
1582
        self._delta_helper(self.base0, self.patch1)
 
1583
    tests.append('delta_revision_revision')
 
1584
 
 
1585
    def iter_delta(self):
 
1586
        """iter_delta works."""
 
1587
        orig, mod = self.base0, self.patch1
 
1588
        name = self._patchlog_name(self.patch1)
 
1589
        idelta = arch.iter_delta(orig, mod, self.changeset_name)
 
1590
        def get_changeset(): return idelta.changeset
 
1591
        self.assertRaises(AttributeError, get_changeset)
 
1592
        lines = [ L for L in idelta if not isinstance(L, arch.Chatter)]
 
1593
        self.assertEqual(1, len(lines))
 
1594
        self.assertEqual(arch.FileAddition, type(lines[0]))
 
1595
        self.assertEqual(name, lines[0].name)
 
1596
        cset = get_changeset()
 
1597
        created = list(cset.iter_created_files(True))
 
1598
        expected_created = [('A_./'+name, name)]
 
1599
        self.assertEqual(expected_created, created)
 
1600
    tests.append('iter_delta')
 
1601
 
 
1602
 
 
1603
class TreeChanges(Delta):
 
1604
    tests = []
 
1605
 
 
1606
    def changes(self):
 
1607
        """WorkingTree.changes works"""
 
1608
        cset = self.tree.changes(None, self.changeset_name)
 
1609
        self.assertEqual(True, cset.does_nothing)
 
1610
        shutil.rmtree(cset)
 
1611
        cset = self.tree.changes(self.base0, self.changeset_name)
 
1612
        log_name = self._patchlog_name(self.patch1)
 
1613
        expected = [('A_./%s' % log_name, log_name)]
 
1614
        newfiles = list(cset.iter_created_files(True))
 
1615
        self.assertEqual(expected, newfiles)
 
1616
        shutil.rmtree(cset)
 
1617
        open(self.tree/'foo', 'w').write("hello\n")
 
1618
        self.tree.add_tag('foo')
 
1619
        cset = self.tree.changes(None, self.changeset_name)
 
1620
        expected = ['.arch-ids/foo.id', 'foo']
 
1621
        newfiles = [C[1] for C in cset.iter_created_files(True)]
 
1622
        newfiles.sort()
 
1623
        self.assertEqual(expected, newfiles)
 
1624
    tests.append('changes')
 
1625
 
 
1626
 
 
1627
class FileHistory(TestCase):
 
1628
 
 
1629
    file_history = [
 
1630
        { '%add': ['foo'], 'foo': "hello world\n" },
 
1631
        { 'foo': "Hello, World\n" } ]
 
1632
 
 
1633
    def extraSetup(self):
 
1634
        self.set_my_id()
 
1635
        self.create_archive()
 
1636
        self.create_version()
 
1637
        self.create_working_tree(self.version)
 
1638
 
 
1639
    def setUp(self):
 
1640
        TestCase.setUp(self)
 
1641
        self.tree = self.working_tree
 
1642
        self.make_history(self.tree, self.file_history)
 
1643
 
 
1644
 
 
1645
class Patchlog(TestCase):
 
1646
 
 
1647
    def extraSetup(self):
 
1648
        self.set_my_id()
 
1649
        self.create_archive()
 
1650
        self.create_version()
 
1651
        self.create_working_tree(self.version)
 
1652
 
 
1653
    def setUp(self):
 
1654
        TestCase.setUp(self)
 
1655
        self.tree = self.working_tree
 
1656
 
 
1657
    tests = []
 
1658
 
 
1659
    def renames(self):
 
1660
        """Patchlog.renamed_files works."""
 
1661
        file_history = [
 
1662
            { '%add': ['foo', 'bar'], 'foo': "hello\n", 'bar': "world\n" },
 
1663
            { '%mv': [['foo', 'spam'], ['bar', 'eggs']] } ]
 
1664
        self.make_history(self.tree, file_history)
 
1665
        log = list(self.tree.iter_logs())[1]
 
1666
        def filetag(name):
 
1667
            name = FileName(name)
 
1668
            return name.dirname()/'.arch-ids'/name.basename()+'.id'
 
1669
        expected = {'foo':'spam', filetag('foo'): filetag('spam'),
 
1670
                    'bar':'eggs', filetag('bar'): filetag('eggs')}
 
1671
        self.assertEqual(expected, log.renamed_files)
 
1672
    tests.append('renames')
 
1673
 
 
1674
    def get_missing(self):
 
1675
        """Patchlog getitem on a missing header returns None."""
 
1676
        self.tree.import_()
 
1677
        log = self.tree.tree_revision.patchlog
 
1678
        self.assertEqual(None, log['Not-Present-Header'])
 
1679
    tests.append('get_missing')
 
1680
 
 
1681
    def revision(self):
 
1682
        """Patchlog.revision works."""
 
1683
        self.tree.import_()
 
1684
        rvsn = self.tree.tree_revision
 
1685
        self.assertEqual(rvsn, rvsn.patchlog.revision)
 
1686
    tests.append('revision')
 
1687
 
 
1688
    def summary(self):
 
1689
        """Patchlog.summary works."""
 
1690
        self.tree.import_()
 
1691
        rvsn = self.tree.tree_revision
 
1692
        self.assertEqual("initial import", rvsn.patchlog.summary)
 
1693
    tests.append('summary')
 
1694
 
 
1695
 
 
1696
class FileFind(FileHistory):
 
1697
 
 
1698
    tests = []
 
1699
 
 
1700
    def file_find_pristine(self):
 
1701
        """file_find works with pristines"""
 
1702
        revision = self.version['patch-1']
 
1703
        path = self.tree.file_find('foo', revision)
 
1704
        self.assertEqual(PathName, type(path))
 
1705
        pristine = self.tree.find_pristine(revision)
 
1706
        expected = pristine/'foo'
 
1707
        self.assertEqual(expected, path)
 
1708
        result = open(path).read()
 
1709
        expected = self.file_history[1]['foo']
 
1710
        self.assertEqual(expected, result)
 
1711
    tests.append('file_find_pristine')
 
1712
 
 
1713
    def file_find_revlib(self):
 
1714
        """file_find works with the revision library"""
 
1715
        revlib_dir = self.arch_dir/'revlib'
 
1716
        os.mkdir(revlib_dir)
 
1717
        arch.register_revision_library(revlib_dir)
 
1718
        revision = self.version['patch-1']
 
1719
        shutil.rmtree(self.tree/'{arch}'/'++pristine-trees')
 
1720
        self.assertEqual([], list(self.tree.iter_pristines()))
 
1721
        revision.library_add()
 
1722
        path = self.tree.file_find('foo', revision)
 
1723
        self.assertEqual(PathName, type(path))
 
1724
        pristine = revision.library_find()
 
1725
        expected = pristine/'foo'
 
1726
        self.assertEqual(expected, path)
 
1727
        result = open(path).read()
 
1728
        expected = self.file_history[1]['foo']
 
1729
        self.assertEqual(expected, result)
 
1730
    tests.append('file_find_revlib')
 
1731
 
 
1732
    def file_find_pristine_missing(self):
 
1733
        """file_find works with missing pristine"""
 
1734
        revision = self.version['base-0']
 
1735
        path = self.tree.file_find('foo', revision)
 
1736
        self.assertEqual(PathName, type(path))
 
1737
        pristine = self.tree.find_pristine(revision)
 
1738
        expected = pristine/'foo'
 
1739
        self.assertEqual(expected, path)
 
1740
        result = open(path).read()
 
1741
        expected = self.file_history[0]['foo']
 
1742
        self.assertEqual(expected, result)
 
1743
    tests.append('file_find_pristine_missing')
 
1744
 
 
1745
    def _file_find_raises_helper(self, flavour):
 
1746
        revision = self.version['patch-1']
 
1747
        name = {'source': 'bar', 'junk': ',bar'}[flavour]
 
1748
        open(self.tree/name, 'w').write('Yadda!\n')
 
1749
        if flavour == 'source': self.tree.add_tag(name)
 
1750
        def thunk(): return self.tree.file_find(name, revision)
 
1751
        self.assertRaises(arch.errors.MissingFileError, thunk)
 
1752
 
 
1753
    def file_find_new_raises(self):
 
1754
        """file_find raises MissingFileError for new source files"""
 
1755
        self._file_find_raises_helper('source')
 
1756
    tests.append('file_find_new_raises')
 
1757
 
 
1758
    def file_find_junk_raises(self):
 
1759
        """file_find raises MissingFileError for junk files"""
 
1760
        self._file_find_raises_helper('junk')
 
1761
    tests.append('file_find_junk_raises')
 
1762
 
 
1763
 
 
1764
class ApplyChangeset(FileHistory, Delta):
 
1765
 
 
1766
    tests = []
 
1767
 
 
1768
    def setUp(self):
 
1769
        FileHistory.setUp(self)
 
1770
        shutil.rmtree(self.working_dir)
 
1771
        self.tree = None
 
1772
        self.base0 = self.version['base-0']
 
1773
        self.patch1 = self.version['patch-1']
 
1774
        self.changeset_name = self.arch_dir/'cset'
 
1775
        self.cset = arch.delta(self.base0, self.patch1, self.changeset_name)
 
1776
 
 
1777
    def iter_apply(self):
 
1778
        """Changeset.iter_apply works"""
 
1779
        tree = self.base0.get(self.working_dir)
 
1780
        lines = list(self.cset.iter_apply(tree))
 
1781
        self.assertEqual(2, len(lines))
 
1782
        self.assertEqual(arch.FileAddition, type(lines[0]))
 
1783
        self.assertEqual(self._patchlog_name(self.patch1), lines[0].name)
 
1784
        self.assertEqual(arch.FileModification, type(lines[1]))
 
1785
        self.assertEqual('foo', lines[1].name)
 
1786
        self.assertEqual(self.patch1, list(tree.iter_logs())[-1].revision)
 
1787
        self.assert_(not tree.has_changes())
 
1788
    tests.append('iter_apply')
 
1789
 
 
1790
    def iter_apply_conflict(self):
 
1791
        """Changeset.iter_apply works with conflicts."""
 
1792
        tree = self.patch1.get(self.working_dir)
 
1793
        lines = list(self.cset.iter_apply(tree))
 
1794
        self.assertEqual(2, len(lines))
 
1795
        self.assertEqual(arch.FileAddition, type(lines[0]))
 
1796
        self.assertEqual(self._patchlog_name(self.patch1), lines[0].name)
 
1797
        self.assertEqual(arch.PatchConflict, type(lines[1]))
 
1798
        self.assertEqual('foo', lines[1].name)
 
1799
        self.assertEqual(self.patch1, list(tree.iter_logs())[-1].revision)
 
1800
        self.assert_(not tree.has_changes())
 
1801
    tests.append('iter_apply_conflict')
 
1802
 
 
1803
    def iter_apply_reverse(self):
 
1804
        """Changeset.iter_apply reverse=True works."""
 
1805
        tree = self.patch1.get(self.working_dir)
 
1806
        lines = list(self.cset.iter_apply(tree, reverse=True))
 
1807
        self.assertEqual(arch.FileDeletion, type(lines[0]))
 
1808
        self.assertEqual(self._patchlog_name(self.patch1), lines[0].name)
 
1809
        self.assertEqual(arch.FileModification, type(lines[1]))
 
1810
        self.assertEqual('foo', lines[1].name)
 
1811
        self.assertEqual(self.base0, list(tree.iter_logs())[-1].revision)
 
1812
        self.assert_(not tree.has_changes())
 
1813
    tests.append('iter_apply_reverse')
 
1814
 
 
1815
 
 
1816
def test_suite(limit=()):
 
1817
    classes = (
 
1818
        'ArchBasic',
 
1819
        'Archive',
 
1820
        'MirrorArchive',
 
1821
        'InitTree',
 
1822
        'InventoryTree',
 
1823
        'WorkingTree',
 
1824
        'TreeRevision',
 
1825
        'LogMessage',
 
1826
        'LogMessageWhitespace',
 
1827
        'Import',
 
1828
        'Commit',
 
1829
        'EscapedWhitespace',
 
1830
        'LatestVersion',
 
1831
        'LatestRevision',
 
1832
        'PopulatedArchive',
 
1833
        'Merges',
 
1834
        'RevisionFiles',
 
1835
        'PristineTrees',
 
1836
        'Changeset',
 
1837
        'Delta',
 
1838
        'TreeChanges',
 
1839
        'Patchlog',
 
1840
        'FileFind',
 
1841
        'ApplyChangeset',
 
1842
        )
 
1843
    return framework.make_test_suite(globals(), classes, limit)
 
1844
 
 
1845
def main(argv):
 
1846
    return framework.run_test_suite(argv, test_suite)
 
1847
 
 
1848
if __name__ == "__main__":
 
1849
    sys.exit(main(sys.argv))