~abentley/bzrtools/bzrtools.dev

« back to all changes in this revision

Viewing changes to fai/arch/tests/test_arch.py

  • Committer: Aaron Bentley
  • Date: 2005-06-07 18:52:04 UTC
  • Revision ID: abentley@panoramicfeedback.com-20050607185204-5fc1f0e3d393b909
Added NEWS, obsoleted bzr-pull

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
 
# arch-tag: 6e2d3c18-a7b2-4d3c-a4ae-0fb1773e7214
3
 
# Copyright (C) 2003 Ben Burns <bburns@mailops.com>
4
 
#               2004 David Allouche <david@allouche.net>
5
 
#
6
 
#    This program is free software; you can redistribute it and/or modify
7
 
#    it under the terms of the GNU General Public License as published by
8
 
#    the Free Software Foundation; either version 2 of the License, or
9
 
#    (at your option) any later version.
10
 
#
11
 
#    This program is distributed in the hope that it will be useful,
12
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
#    GNU General Public License for more details.
15
 
#
16
 
#    You should have received a copy of the GNU General Public License
17
 
#    along with this program; if not, write to the Free Software
18
 
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
 
 
20
 
"""Main test suite
21
 
"""
22
 
 
23
 
import types
24
 
import sys
25
 
import os
26
 
import shutil
27
 
import arch
28
 
from arch import errors
29
 
from arch.pathname import DirName, FileName, PathName
30
 
import framework
31
 
from framework import TestCase
32
 
 
33
 
# import arch.backends.logger
34
 
# arch.backend.logger = backends.logger.DebugLogger()
35
 
 
36
 
class ArchBasic(TestCase):

    """Checks of the module-level arch API: user id and archive registry."""

    # Names of the methods to run as tests, in definition order.
    tests = []

    def default_my_id_empty(self):
        """my_id returns None when unset."""
        self.failUnlessEqual(None, arch.my_id())
    tests.append('default_my_id_empty')

    def default_archive_empty(self):
        """archives is empty when no archive is registered."""
        registered = list(arch.iter_archives())
        self.failUnlessEqual(len(registered), 0)
    tests.append('default_archive_empty')

    def set_my_id(self):
        """set_my_id affects arch.my_id()."""
        arch.set_my_id(self.my_id)
        self.failUnlessEqual(arch.my_id(), self.my_id)
    tests.append('set_my_id')

    def set_invalid_myid(self):
        """set_my_id fails for invalid id and preserves current id."""
        arch.set_my_id(self.my_id)
        try:
            arch.set_my_id("not a valid email address")
        except arch.ExecProblem:
            # Expected: the invalid id was rejected.
            pass
        else:
            self.fail("invalid email address was accepted")
        # Only reached after a rejection: the previous id must still be set.
        self.failUnlessEqual(arch.my_id(), self.my_id)
    tests.append('set_invalid_myid')

    def make_archive(self):
        """make_archive creates and registers archive at location."""
        name = self.arch_name
        location = self.arch_dir/name
        # The object handed back by make_archive describes the new archive.
        created = arch.make_archive(name, location)
        self.failUnlessEqual(created.name, name)
        self.failUnlessEqual(created.location, location)
        # The registry now lists exactly that archive.
        registered = list(arch.iter_archives())
        self.failUnlessEqual(len(registered), 1)
        archive = registered[0]
        self.failUnlessEqual(archive.name, name)
        self.failUnless(os.path.samefile(archive.location, location))
        return archive
    tests.append('make_archive')
85
 
 
86
 
 
87
 
class Archive(TestCase):

    """Tests of arch.Archive that run against one freshly created archive."""

    def extraSetup(self):
        """Set the user id and create the archive the tests read as
        self.archive."""
        self.set_my_id()
        self.create_archive()

    # Names of the methods to run as tests, in definition order.
    tests = []

    def archive_location_readonly(self):
        """Archive.location is present and read-only."""
        self.failUnless(hasattr(self.archive, 'location'))
        self.failUnlessRaises(AttributeError,
                              setattr, self.archive, 'location', 'error')
    tests.append('archive_location_readonly')

    def archive_name_readonly(self):
        """Archive.name is present and read-only."""
        self.failUnless(hasattr(self.archive, 'name'))
        self.failUnlessRaises(AttributeError,
                              setattr, self.archive, 'name', 'error')
    tests.append('archive_name_readonly')

    def make_already_made_archive(self):
        """Making the same archive twice fails."""
        try:
            arch.make_archive(self.archive.name, self.archive.location)
        except arch.ExecProblem:
            pass
        else:
            self.fail("could create same archive twice")
    tests.append('make_already_made_archive')

    def _check_version_setup(self, version_nonarch, branch_nonarch):
        """Set up a version and check what the archive then contains.

        version_nonarch is the version name without the archive part.
        branch_nonarch is the expected name of the only branch, also
        without the archive part.  The category is expected to be
        'example' in every case.
        """
        arch_name = self.arch_name
        version_id = arch_name + '/' + version_nonarch
        arch.Version(version_id).setup()
        cats = list(self.archive.iter_categories())
        self.failUnlessEqual(len(cats), 1)
        self.failUnlessEqual(cats[0].fullname, arch_name + '/example')
        branches = list(cats[0].iter_branches())
        self.failUnlessEqual(len(branches), 1)
        self.failUnlessEqual(branches[0].fullname,
                             arch_name + '/' + branch_nonarch)
        versions = list(branches[0].iter_versions())
        self.failUnlessEqual(len(versions), 1)
        self.failUnlessEqual(versions[0].fullname, version_id)

    def archive_setup(self):
        """Version.setup works."""
        self._check_version_setup('example--test--0.1', 'example--test')
    tests.append('archive_setup')

    def archive_setup_nameless_branch(self):
        """Version.setup (nameless branch) works."""
        # With no branch name, the branch is named after the category.
        self._check_version_setup('example--0.1', 'example')
    tests.append('archive_setup_nameless_branch')

    def _create_mirror(self, **kwargs):
        """Mirror self.archive as '<name>-MIRROR' under self.arch_dir.

        Keyword arguments are forwarded to Archive.make_mirror, whose
        return value is returned.
        """
        mirror_name = self.archive.name + '-MIRROR'
        location = self.arch_dir/mirror_name
        return self.archive.make_mirror(mirror_name, location, **kwargs)

    def _help_make_mirror(self, kwargs, meta_name, meta_expected):
        """Create a mirror and check one of its =meta-info files.

        kwargs is a dict of extra keyword arguments for make_mirror,
        meta_name the file to look for in the mirror's '=meta-info'
        directory, and meta_expected the exact content it must have.
        """
        mirror = self._create_mirror(**kwargs)
        meta_path = DirName(mirror.location)/'=meta-info'/meta_name
        self.failUnless(os.path.isfile(meta_path))
        meta_file = open(meta_path)
        try:
            meta_data = meta_file.read()
        finally:
            meta_file.close()
        self.failUnlessEqual(meta_expected, meta_data)

    def make_mirror(self):
        """Archive.make_mirror works for creating a -MIRROR."""
        self._help_make_mirror({}, 'mirror', self.arch_name + '\n')
    tests.append('make_mirror')

    def make_mirror_listing(self):
        """Archive.make_mirror can set up http-blows."""
        self._help_make_mirror({'listing': True}, 'http-blows',
                               'it sure does\n')
    tests.append('make_mirror_listing')

    def make_mirror_signed(self):
        """Archive.make_mirror can set up signed-archive."""
        self._help_make_mirror({'signed': True}, 'signed-archive',
                               'system cracking is (nearly always) lame\n')
    tests.append('make_mirror_signed')

    def meta_info(self):
        """Archive._meta_info works."""
        self.assertEqual(self.archive.name, self.archive._meta_info('name'))
    tests.append('meta_info')

    def open_meta_info(self, name, flags):
        """Open the file 'name' of the archive's =meta-info directory.

        flags is the mode string handed to open(); the caller owns the
        returned file object and must close it.
        """
        return open(DirName(self.archive.location)/'=meta-info'/name, flags)

    def _write_meta_info(self, name, content):
        """Write content to the =meta-info file 'name' and close it.

        Closing explicitly ensures the data is on disk before tla is
        asked about the file, instead of relying on the file object
        being finalized as soon as it goes out of scope.
        """
        meta_file = self.open_meta_info(name, 'w')
        try:
            meta_file.write(content)
        finally:
            meta_file.close()

    def meta_info_missing(self):
        """arch.tla_tells_empty_meta_info is set correctly."""
        if arch._builtin.tla_tells_empty_meta_info:
            try:
                self.archive._meta_info('not_here')
            except KeyError:
                pass
            else:
                self.fail("arch.tla_tells_empty_meta_info is True, but"
                          " archive-meta-info exit(0) on missing meta-info.")
        else:
            try:
                missing = self.archive._meta_info('not_here')
            except KeyError:
                self.fail("arch.tla_tells_empty_meta_info is False, but"
                          " archive-meta-info exit(1) on missing meta-info.")
            self.assertEqual(None, missing)
        # A zero-length file reads back as None ...
        self._write_meta_info('empty', '')
        self.assertEqual(None, self.archive._meta_info('empty'))
        # ... whereas a file holding a single newline reads as ''.
        self._write_meta_info('empty-line', '\n')
        self.assertEqual('', self.archive._meta_info('empty-line'))
    tests.append('meta_info_missing')

    def has_meta_info(self):
        """Archive._has_meta_info works."""
        self.failIf(self.archive._has_meta_info('not_here'))
        self._write_meta_info('empty', '')
        # Whether a zero-length file counts as present depends on the
        # capability flag detected for the installed tla.
        if arch._builtin.tla_tells_empty_meta_info:
            self.assert_(self.archive._has_meta_info('empty'))
        else:
            self.failIf(self.archive._has_meta_info('empty'))
        self._write_meta_info('empty-line', '\n')
        self.assert_(self.archive._has_meta_info('empty-line'))
        self._write_meta_info('something', 'hello\n')
        self.assert_(self.archive._has_meta_info('something'))
    tests.append('has_meta_info')

    def is_signed(self):
        """Archive.is_signed works."""
        mirror = self._create_mirror(signed=True)
        self.assertEqual(False, self.archive.is_signed)
        self.assertEqual(True, mirror.is_signed)
    tests.append('is_signed')

    def has_listings(self):
        """Archive.has_listings works."""
        mirror = self._create_mirror(listing=True)
        self.assertEqual(False, self.archive.has_listings)
        self.assertEqual(True, mirror.has_listings)
    tests.append('has_listings')

    def open_version(self, flags):
        """Open the archive's '.archive-version' file with mode flags.

        The caller owns the returned file object and must close it.
        """
        return open(DirName(self.archive.location)/'.archive-version', flags)

    def version_string(self):
        """Archive.version_string works."""
        version_file = self.open_version('r')
        try:
            version_string = version_file.read().rstrip('\n')
        finally:
            version_file.close()
        self.assertEqual(version_string, self.archive.version_string)
    tests.append('version_string')
252
 
 
253
 
 
254
 
class MirrorArchive(TestCase):

    """Tests that need both a master archive and a mirror of it."""

    def extraSetup(self):
        """Set the user id, then create the archive and its mirror (read
        by the tests as self.archive and self.mirror)."""
        self.set_my_id()
        self.create_archive_and_mirror()

    # Names of the methods to run as tests, in definition order.
    tests = []

    def is_mirror(self):
        """Archive.is_mirror works."""
        self.failIf(self.archive.is_mirror)
        self.failUnless(self.mirror.is_mirror)
    tests.append('is_mirror')

    def official_name(self):
        """Archive.official_name works."""
        # Both the master and its mirror report the master's name.
        self.assertEqual(self.archive.official_name, self.archive.name)
        self.assertEqual(self.mirror.official_name, self.archive.name)
    tests.append('official_name')

    def mirror_official_name(self):
        """Archive.make mirror use official name."""
        # Mirror the mirror: the result must still point at the master.
        second_name = self.mirror.name + '-2'
        second_location = self.arch_dir/second_name
        second = self.mirror.make_mirror(second_name, second_location)
        self.assertEqual(second.official_name, self.archive.name)
    tests.append('mirror_official_name')
281
 
 
282
 
 
283
 
class InventoryTree(TestCase):

    """Test cases that work on a tree but need no associated archive"""

    def extraSetup(self):
        """Set the user id and create the tree the tests read as
        self.working_tree."""
        self.set_my_id()
        self.create_working_tree()

    # Names of the methods to run as tests, in definition order.
    tests = []

    def _write_file(self, path, content=''):
        """Create (or truncate) path, write content, and close the file.

        Closing explicitly ensures the data is on disk before tla
        inspects the tree, instead of relying on the file object being
        finalized as soon as it goes out of scope.
        """
        handle = open(path, 'w')
        try:
            handle.write(content)
        finally:
            handle.close()

    def _populate_tree(self, add_tags):
        """Create 'ahoy', 'bar/' and 'bar/baz' in the working tree.

        When add_tags is true, each entry gets an explicit tag right
        after it is created.  Returns the working tree.
        """
        wt = self.working_tree
        self._write_file(wt/'ahoy')
        if add_tags:
            wt.add_tag(wt/'ahoy')
        os.mkdir(wt/'bar')
        if add_tags:
            wt.add_tag(wt/'bar')
        self._write_file(wt/'bar'/'baz')
        if add_tags:
            wt.add_tag(wt/'bar'/'baz')
        return wt

    def _check_inventory(self, inv, expected_types, expected_names):
        """Check the types and the str() values of a list of inventory
        items."""
        self.assertEqual(expected_types, [type(item) for item in inv])
        self.assertEqual(expected_names, [str(item) for item in inv])

    def _check_inventory_ids(self, inv, expected_types, expected_names,
                             expected_ids):
        """Check a list of inventory items as yielded by
        iter_inventory_ids: index 0 holds the id, index 1 the path."""
        self._check_inventory([item[1] for item in inv],
                              expected_types, expected_names)
        self.assertEqual(expected_ids, [item[0] for item in inv])

    def inventory(self):
        """Basic inventory."""
        wt = self._populate_tree(add_tags=True)
        inv = list(wt.iter_inventory(source=True, both=True))
        self._check_inventory(inv, [FileName, DirName, FileName],
                              ['ahoy', 'bar', 'bar/baz'])
        inv = list(wt.iter_inventory(source=True, files=True))
        self._check_inventory(inv, [FileName, FileName],
                              ['ahoy', 'bar/baz'])
        inv = list(wt.iter_inventory(source=True, directories=True))
        self._check_inventory(inv, [DirName], ['bar'])
    tests.append('inventory')

    def inventory_ids(self):
        """Inventory with ids"""
        self.working_tree.tagging_method = 'names'
        wt = self._populate_tree(add_tags=False)
        inv = list(wt.iter_inventory_ids(source=True, both=True))
        self._check_inventory_ids(inv, [FileName, DirName, FileName],
                                  ['ahoy', 'bar', 'bar/baz'],
                                  ['?./ahoy', '?./bar', '?./bar/baz'])
        inv = list(wt.iter_inventory_ids(source=True, files=True))
        self._check_inventory_ids(inv, [FileName, FileName],
                                  ['ahoy', 'bar/baz'],
                                  ['?./ahoy', '?./bar/baz'])
        inv = list(wt.iter_inventory_ids(source=True, directories=True))
        self._check_inventory_ids(inv, [DirName], ['bar'], ['?./bar'])
    tests.append('inventory_ids')

    def inventory_names(self):
        """iter_inventory names keyword overrides untagged-source setting."""
        wt = self.working_tree
        self._write_file(wt/'{arch}'/'=tagging-method',
                         '\n'.join(('explicit', 'untagged-source precious',
                                    '')) + '\n')
        foo = FileName('foo')
        self._write_file(wt/foo, '\n')
        result = list(wt.iter_inventory(precious=True, files=True))
        self.assertEqual([foo], result) # foo is untagged-source precious
        result = list(wt.iter_inventory(source=True, files=True))
        self.assertEqual([], result) # thus foo is not source
        result = list(wt.iter_inventory(source=True, files=True, names=True))
        self.assertEqual([foo], result) # however it has a source name
    tests.append('inventory_names')

    def get_tag(self):
        """ArchSourceTree.get_tag works."""
        wt = self.working_tree
        wt.tagging_method = 'tagline'
        foo = FileName('foo')
        # No tagline and no explicit id: no tag at all.
        self._write_file(wt/foo, '\n')
        self.assertEqual(None, wt.get_tag(foo))
        # A tagline in the file yields an 'i_' tag.
        self._write_file(wt/foo, 'arch-tag: yadda\n')
        self.assertEqual('i_yadda', wt.get_tag(foo))
        # After add_tag and rewriting the id file, an 'x_' tag is reported.
        wt.add_tag(foo)
        self._write_file(wt/'.arch-ids'/foo+'.id', 'yadda\n')
        self.assertEqual('x_yadda', wt.get_tag(foo))
    tests.append('get_tag')

    def inventory_file_space(self):
        """iter_inventory handles spaces in file and dir names correctly."""
        # NOTE(review): tla_skip_whitespace_tests is neither defined nor
        # imported in the part of the module visible here -- confirm it
        # is provided elsewhere in the file.
        if tla_skip_whitespace_tests(): return
        wt = self.working_tree
        filename = FileName('file name')
        dirname = DirName('dir name')
        self._write_file(wt/filename)
        wt.add_tag(filename)
        os.mkdir(wt/dirname)
        wt.add_tag(dirname)
        result = list(wt.iter_inventory(source=True, files=True))
        self.assertEqual([filename], result) # inventory -s
        result = list(wt.iter_inventory(source=True, directories=True))
        self.assertEqual([dirname], result) # inventory -s -d
        expected = [filename, dirname]
        expected.sort()
        result = list(wt.iter_inventory(source=True, both=True))
        result.sort()
        self.assertEqual(expected, result) # inventory -s -B
        nestedname = 'nested tree'
        os.mkdir(wt/nestedname)
        nested = arch.init_tree(wt/nestedname, nested=True)
        result = list(wt.iter_inventory(trees=True))
        self.assertEqual([nested], result) # inventory -t
    tests.append('inventory_file_space')

    def inventory_limit(self):
        """Inventory with a limit"""
        wt = self._populate_tree(add_tags=True)
        inv = list(wt.iter_inventory(limit='bar', source=True, both=True))
        self._check_inventory(inv, [FileName], ['bar/baz'])
        # A limit that is not a single path is a TypeError ...
        self.assertRaises(TypeError, wt.iter_inventory,
                          limit=('a', 'b'), source=True, both=True)
        # ... and the limit '/tmp' is a ValueError.
        self.assertRaises(ValueError, wt.iter_inventory,
                          limit='/tmp', source=True, both=True)
    tests.append('inventory_limit')

    def inventory_ids_limit(self):
        """Inventory with ids and a limit"""
        self.working_tree.tagging_method = 'names'
        wt = self._populate_tree(add_tags=False)
        inv = list(wt.iter_inventory_ids(limit='bar', source=True, both=True))
        # XXX the '?bar/baz' id will fail when tla is fixed
        self._check_inventory_ids(inv, [FileName], ['bar/baz'],
                                  ['?bar/baz'])
        self.assertRaises(TypeError, wt.iter_inventory_ids,
                          limit=('a', 'b'), source=True, both=True)
        self.assertRaises(ValueError, wt.iter_inventory_ids,
                          limit='/tmp', source=True, both=True)
    tests.append('inventory_ids_limit')
442
 
 
443
 
 
444
 
class InitTree(TestCase):

    """Tests of arch.init_tree and arch.tree_root on fresh directories."""

    # Names of the methods to run as tests, in definition order.
    tests = []

    def init_tree(self):
        """init_tree works (w/o version) for unnested and nested trees."""
        # Outer tree first, then a tree inside it, explicitly nested.
        os.mkdir(self.working_dir)
        arch.init_tree(self.working_dir)
        os.mkdir(self.nested_dir)
        arch.init_tree(self.nested_dir, nested=True)
    tests.append('init_tree')

    def init_tree_not_nested(self):
        """init_tree (w/o version) fails for nested if "nested" is not set."""
        os.mkdir(self.working_dir)
        arch.init_tree(self.working_dir)
        os.mkdir(self.nested_dir)

        def init_nested_without_flag():
            return arch.init_tree(self.nested_dir)

        self.failUnlessRaises(errors.ExecProblem, init_nested_without_flag)
    tests.append('init_tree_not_nested')

    def init_tree_with_version(self):
        """init_tree with version set tree-version and create log-version."""
        os.mkdir(self.working_dir)
        tree = arch.init_tree(self.working_dir, self.version)
        self.failUnlessEqual(self.version, tree.tree_version)
        log_versions = list(tree.iter_log_versions())
        self.failUnlessEqual(1, len(log_versions))
        first_log_version = tree.iter_log_versions().next()
        self.failUnlessEqual(self.version, first_log_version)
    tests.append('init_tree_with_version')

    def tree_root(self):
        """tree_root works"""
        os.mkdir(self.working_dir)
        root = arch.init_tree(self.working_dir)
        inner_dir = root/'subdir'
        os.mkdir(inner_dir)
        # Asking from inside a subdirectory must give back the same tree.
        self.assertEqual(root, arch.tree_root(inner_dir))
    tests.append('tree_root')
483
 
 
484
 
 
485
 
class WorkingTree(TestCase):

    """Tests of working-tree log versions and tree version."""

    def extraSetup(self):
        """Set the user id, create the archive, then create the tree the
        tests read as self.working_tree."""
        self.set_my_id()
        self.create_archive()
        self.create_working_tree()

    # Names of the methods to run as tests, in definition order.
    tests = []

    def log_versions(self):
        """Log version addition, iteration and deletion."""
        wt = self.working_tree
        self.failUnlessEqual(0, len(list(wt.iter_log_versions())))
        wt.add_log_version(self.version)
        self.failUnlessEqual(1, len(list(wt.iter_log_versions())))
        self.failUnless(isinstance(wt.iter_log_versions().next(),
                                   arch.Version))
        self.failUnlessEqual(self.version, wt.iter_log_versions().next())
        wt.remove_log_version(self.version)
        self.failUnlessEqual(0, len(list(wt.iter_log_versions())))
    tests.append('log_versions')

    def _sorted_names(self, versions):
        """Return the str() of every item of versions, as a sorted list."""
        names = [str(each) for each in versions]
        names.sort()
        return names

    def log_versions_limit(self):
        """Log version listing with a limit."""
        wt = self.working_tree
        vsn = self.version
        # Four more versions, each differing from vsn at one level of
        # the name: archive, category, branch and version.
        other_arch = arch.Version('alice@example/%s' % vsn.nonarch)
        other_cat = vsn.archive['dog']['brn']['1.0']
        other_brn = vsn.category['trk']['1.0']
        other_vsn = vsn.branch['2']
        all_versions = [vsn, other_arch, other_cat, other_brn, other_vsn]
        for each in all_versions:
            wt.add_log_version(each)
        names = self._sorted_names
        # Without a limit every log version is listed.
        self.assertEqual(names(all_versions),
                         names(wt.iter_log_versions()))
        # Limit to an archive.
        self.assertEqual([other_arch],
                         list(wt.iter_log_versions(other_arch.archive)))
        self.assertEqual(names((vsn, other_cat, other_brn, other_vsn)),
                         names(wt.iter_log_versions(vsn.archive)))
        # Limit to a category.
        self.assertEqual([other_cat],
                         list(wt.iter_log_versions(other_cat.category)))
        self.assertEqual(names((vsn, other_brn, other_vsn)),
                         names(wt.iter_log_versions(vsn.category)))
        # Limit to a branch.
        self.assertEqual([other_brn],
                         list(wt.iter_log_versions(other_brn.branch)))
        self.assertEqual(names((vsn, other_vsn)),
                         names(wt.iter_log_versions(vsn.branch)))
        # Limit to a single version.
        self.assertEqual([vsn], list(wt.iter_log_versions(vsn)))
    tests.append('log_versions_limit')

    def _tree_version_error(self, wt):
        """Return the TreeVersionError raised by reading wt.tree_version.

        The test fails if reading the attribute raises nothing, so the
        caller can always inspect the returned exception.
        """
        try:
            wt.tree_version
        except arch.errors.TreeVersionError:
            # sys.exc_info() avoids the version-specific syntax for
            # binding the caught exception to a name.
            return sys.exc_info()[1]
        self.fail("TreeVersionError not raised")

    def tree_version(self):
        """Setting and getting WorkingTree.version."""
        wt = self.working_tree
        # No tree version recorded yet.
        error = self._tree_version_error(wt)
        self.assertEqual(error.bad_version, None)
        # A default-version file whose content is not a version name.
        version_file = open(wt/'{arch}'/'++default-version', 'w')
        try:
            version_file.write('fooo!!!\n')
        finally:
            version_file.close()
        error = self._tree_version_error(wt)
        self.assertEqual(error.bad_version, 'fooo!!!')
        # Setting a proper version makes the attribute readable again.
        wt.tree_version = self.version
        self.failUnless(isinstance(wt.tree_version, arch.Version))
        self.failUnlessEqual(self.version, wt.tree_version)
    tests.append('tree_version')
567
 
 
568
 
 
569
 
class TreeRevision(TestCase):

    """Test of WorkingTree.tree_revision across an import and a commit."""

    def extraSetup(self):
        """Set the user id, create the archive, then create the version."""
        self.set_my_id()
        self.create_archive()
        self.create_version()

    # Names of the methods to run as tests, in definition order.
    tests = []

    def tree_revision(self):
        """tree_revision works"""
        os.mkdir(self.working_dir)
        tree = arch.init_tree(self.working_dir, self.version)
        # After the initial import the tree is at base-0.
        tree.import_()
        self.assertEqual(self.version['base-0'], tree.tree_revision)
        # After committing one change it is at patch-1.
        message = tree.log_message()
        text = 'first revision'
        message.description = text
        message['Summary'] = text
        tree.commit(message)
        self.assertEqual(self.version['patch-1'], tree.tree_revision)
    tests.append('tree_revision')
589
 
 
590
 
 
591
 
class LogMessage(TestCase):

    """Tests of the log message object returned by
    WorkingTree.log_message()."""

    def extraSetup(self):
        """Set the user id, create the archive and a working tree for
        self.version, and store a log file name in self.file_name."""
        self.set_my_id()
        self.create_archive()
        self.create_working_tree(self.version)
        nonarch = self.version.nonarch
        archname = self.version.archive.name
        self.file_name = '++log.%s--%s' % (nonarch, archname)

    # Names of the methods to run as tests, in definition order.
    tests = []

    def _read_file(self, path):
        """Return the whole content of the file at path, closing it
        afterwards."""
        handle = open(path)
        try:
            return handle.read()
        finally:
            handle.close()

    def create(self):
        """LogMessage creation."""
        wt = self.working_tree
        # message is None if not created
        m = wt.log_message(create=False)
        self.failUnlessEqual(m, None)
        # message is created by default
        m = wt.log_message()
        self.failIfEqual(m, None)
    tests.append('create')

    def name(self):
        """LogMessage.name attribute is read-only."""
        m = self.working_tree.log_message()
        # NOTE(review): presumably joining onto an absolute path gives
        # back that path, so this checks m.name is absolute -- confirm
        # against arch.pathname.
        self.assertEqual(m.name, self.working_tree.abspath()/m.name)
        self.failUnlessRaises(AttributeError, setattr, m, 'name', 'error')
    tests.append('name')

    def template(self):
        """LogMessage template has expected value and can be parsed."""
        m = self.working_tree.log_message()
        template = '\n'.join(('Summary: ', 'Keywords: ', '', ''))
        self.failUnlessEqual(self._read_file(m.name), template)
        self.failUnlessEqual(m['Summary'], '')
        self.failUnlessEqual(m['Keywords'], '')
        self.failUnlessEqual(m.description, '')
    tests.append('template')

    def setup_message(self):
        """Fill in a log message and return it, without saving it.

        Stores the summary, the description and the expected file
        content on self as 'summary', 'description' and 'message', for
        use by check_message_file and check_message_object.
        """
        self.description = '\n'.join(("Hello,  ", "  World!"))
        # leading space in summary dropped, trailing preserved
        self.summary = "Short summary  "
        self.message = '\n'.join(('Summary: ' + self.summary,
                                  'Keywords: ', '', self.description))
        m = self.working_tree.log_message()
        m.description = self.description
        m['Summary'] = self.summary
        return m

    def check_message_file(self, m):
        """Check that the file behind m holds the expected message text."""
        self.failUnlessEqual(self._read_file(m.name), self.message)

    def check_message_object(self, m):
        """Check that m carries the expected headers and description."""
        self.failUnlessEqual(m['Summary'], self.summary)
        self.failUnlessEqual(m['Keywords'], '')
        self.failUnlessEqual(m.description, self.description)

    def save_load(self):
        """LogMessage save() and load() behave consistently."""
        m = self.setup_message()
        self.check_message_object(m)
        m.save()
        self.check_message_file(m)
        m.load()
        self.check_message_object(m)
    tests.append('save_load')

    def recreate(self):
        """Recreating LogMessage does not overwrite existing message."""
        m = self.setup_message()
        m.save()
        del m
        m = self.working_tree.log_message()
        self.check_message_file(m)
        self.check_message_object(m)
    tests.append('recreate')

    def reload_(self):
        """Reloading LogMessage discards unsaved changes."""
        m = self.setup_message()
        m.save()
        m['Summary'] = 'foo'
        m.load()
        self.check_message_file(m)
        self.check_message_object(m)
    tests.append('reload_')

    def clear(self):
        """LogMessage.clear() method."""
        m = self.setup_message()
        m.clear()
        self.failUnlessEqual(m['Summary'], None)
        self.failUnlessEqual(m.description, None)
        # A cleared message is saved as a single empty line.
        m.save()
        self.failUnlessEqual(self._read_file(m.name), '\n')
    tests.append('clear')
691
 
 
692
 
 
693
 
class LogMessageWhitespace(TestCase):

    """Log message creation in a tree whose path contains whitespace."""

    def extraSetup(self):
        """Set the user id and create the archive."""
        self.set_my_id()
        self.create_archive()

    # Names of the methods to run as tests, in definition order.
    tests = []

    def log_message_whitespace(self):
        """LogMessage files created in a path containing a space."""
        # NOTE(review): the docstring says "space" but the directory used
        # is self.working_dir_newline -- confirm which whitespace
        # character the fixture actually puts in the path.
        os.mkdir(self.working_dir_newline)
        wt = arch.init_tree(self.working_dir_newline, self.version)
        m = wt.log_message()
        template = '\n'.join(('Summary: ', 'Keywords: ', '', ''))
        # Read the new log file back, closing it explicitly.
        log_file = open(m.name)
        try:
            content = log_file.read()
        finally:
            log_file.close()
        self.failUnlessEqual(content, template)
    tests.append('log_message_whitespace')
709
 
 
710
 
 
711
 
class Import(TestCase):
    # Tests for WorkingTree.import_() run against a newly created working
    # tree (see extraSetup).

    def extraSetup(self):
        self.set_my_id()
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)

    tests = []

    def _put_line(self, path, text):
        """Write text followed by a newline to path and close the file.

        Closing explicitly guarantees the data is on disk before any later
        operation inspects the tree, instead of depending on when the file
        object happens to be garbage collected.
        """
        out = open(path, 'w')
        try:
            out.write(text + '\n')
        finally:
            out.close()

    def import_without_log(self):
        """WorkingTree.import w/o log message produces expected message."""
        wt = self.working_tree
        wt.import_()
        logs = tuple(wt.iter_logs())
        self.failUnlessEqual(1, len(logs))
        log = logs[0]
        self.failUnlessEqual(log.revision, self.version['base-0'])
        self.failUnlessEqual(log.summary, 'initial import')
        self.failUnlessEqual(log.description,
                             '\n(automatically generated log message)\n')
    tests.append('import_without_log')

    def _import_with_log_helper(self, m, import_):
        """Fill in log message m, run import_() and check the resulting log.

        m       -- log message object; its summary and description are set
                   here before the import is triggered.
        import_ -- callable taking no arguments that performs the import.
        """
        wt = arch.WorkingTree(self.working_dir)
        s = "empty initial import"
        m['Summary'] = s
        d = "Empty initial import.\n"
        m.description = d
        import_()
        logs = tuple(wt.iter_logs())
        self.failUnlessEqual(1, len(logs))
        log = logs[0]
        self.failUnlessEqual(log.revision, self.version['base-0'])
        self.failUnlessEqual(log.summary, s)
        self.failUnlessEqual(log.description, d)

    def import_with_log(self):
        """WorkingTree.import with default log message file name."""
        wt = self.working_tree
        m = wt.log_message()
        def import_():
            # The message is saved first, then import_() is called without
            # being handed the message object.
            m.save()
            wt.import_()
        self._import_with_log_helper(m, import_)
    tests.append('import_with_log')

    def import_with_custom_log(self):
        """WorkingTree.import with specified log message file name."""
        wt = self.working_tree
        m = arch.LogMessage(wt/'+custom_log_message')
        m.clear()
        def import_():
            wt.import_(m)
        self._import_with_log_helper(m, import_)
    tests.append('import_with_custom_log')

    def import_not_empty(self):
        """WorkingTree.import fails on unrecognized and adds source."""
        wt = self.working_tree
        # unrecognized file
        self._put_line(wt/'!foo', 'bar')
        self.failUnlessRaises(errors.ExecProblem, wt.import_)
        os.remove(wt/'!foo')
        # source file
        self._put_line(wt/'source', 'hello')
        wt.add_tag('source')
        wt.import_()
        log = wt.iter_logs().next()
        self.failUnlessEqual(list(log.new_files), [FileName('source')])
    tests.append('import_not_empty')
783
 
 
784
 
 
785
 
class Commit(TestCase):
    # Tests for WorkingTree.commit() and WorkingTree.iter_commit(), run
    # against a newly created working tree (see extraSetup).

    def extraSetup(self):
        self.set_my_id()
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)

    tests = []

    def _put_line(self, path, text):
        """Write text followed by a newline to path and close the file.

        Closing explicitly guarantees the data is on disk before the tree is
        imported or committed, instead of depending on when the file object
        happens to be garbage collected.
        """
        out = open(path, 'w')
        try:
            out.write(text + '\n')
        finally:
            out.close()

    def _check_revisions(self, wt, levels):
        """Assert the revisions of the logs in wt match the patch levels.

        wt     -- tree whose iter_logs() output is inspected.
        levels -- patch level names looked up in self.version, in the
                  expected order.
        """
        found = [log.revision for log in wt.iter_logs()]
        expected = [self.version[level] for level in levels]
        self.failUnlessEqual(found, expected)

    def commit_seal_fix(self):
        """Commit, seal, fix, create expected patchlevels."""
        wt = self.working_tree
        wt.import_()
        # commit w/o log message
        self.failUnlessRaises(errors.ExecProblem, lambda: wt.commit())
        # commit revision
        m = wt.log_message()
        m['Summary'] = m.description = 'first revision'
        wt.commit(m)
        self._check_revisions(wt, ('base-0', 'patch-1'))
        # fix non-sealed, then seal
        m['Summary'] = m.description = 'second revision'
        self.failUnlessRaises(errors.ExecProblem,
                              lambda: wt.commit(fix=True, log=m))
        wt.commit(seal=True, log=m)
        self._check_revisions(wt, ('base-0', 'patch-1', 'version-0'))
        # commit sealed, then fix
        m['Summary'] = m.description = 'third revision'
        self.failUnlessRaises(errors.ExecProblem,
                              lambda: wt.commit(m))
        wt.commit(fix=True, log=m)
        self._check_revisions(wt, ('base-0', 'patch-1',
                                   'version-0', 'versionfix-1'))
        # sanity catches trying to fix and seal at the same time
        m['Summary'] = m.description = 'error'
        self.failUnlessRaises(AssertionError,
                              lambda: wt.commit(seal=True, fix=True, log=m))
    tests.append('commit_seal_fix')

    def commit_relative_log(self):
        """Commit with a log file given as a relative path."""
        wt = self.working_tree
        wt.import_()
        tree_dir, tree_name = [DirName(part) for part in os.path.split(wt)]
        # Remember where we were so the process-wide working directory does
        # not stay changed for whatever runs after this test.
        try:
            previous_dir = os.getcwd()
        except OSError:
            # The current directory no longer exists; nothing to restore.
            previous_dir = None
        os.chdir(tree_dir)
        try:
            log_name = tree_name/'+log'
            f = open(log_name, 'w')
            summary = 'first rev'
            try:
                f.write('Summary: %s\n\n' % summary)
            finally:
                f.close()
            wt.commit(log=log_name)
            lastlog = wt.iter_logs(reverse=True).next()
            self.failUnlessEqual(lastlog.summary, summary)
        finally:
            if previous_dir is not None:
                os.chdir(previous_dir)
    tests.append('commit_relative_log')

    def commit_files(self):
        """Commit (maybe with limit) and Patchlog are consistent."""
        wt = self.working_tree
        # import two files
        self._put_line(wt/'f1', 'hello')
        self._put_line(wt/'f2', 'world')
        for name in ('f1', 'f2'):
            wt.add_tag(name)
        wt.import_()
        def lastlog():
            return wt.iter_logs(reverse=True).next()
        self.failUnlessEqual(lastlog().new_files,
                             [FileName(name) for name in ('f1', 'f2')])
        # modify both files
        self._put_line(wt/'f1', 'Hello,')
        self._put_line(wt/'f2', 'World!')
        # sanity catches non-null but empty file-list
        m = wt.log_message()
        m['Summary'] = m.description = 'first revision'
        def commit_empty():
            wt.commit(log=m, file_list=iter([]))
        self.failUnlessRaises(AssertionError, commit_empty)
        # commit one file, absolute path
        wt.commit(m, file_list=iter([wt/'f2']))
        self.failUnlessEqual(lastlog().modified_files, [FileName('f2')])
        # commit the other file, relative path
        m['Summary'] = m.description = 'third revision'
        wt.commit(m, file_list=['f1'])
        self.failUnlessEqual(lastlog().modified_files, [FileName('f1')])
    tests.append('commit_files')

    def commit_out_of_date(self):
        """Committing out-of-date fails unless forced."""
        wt = self.working_tree
        wt.import_()
        m = wt.log_message()
        m['Summary'] = m.description = 'first revision'
        wt.commit(m)
        # Replace the working tree by a checkout of base-0, which is now
        # behind the archive.
        shutil.rmtree(self.working_dir)
        wt = self.version['base-0'].get(self.working_dir)
        m['Summary'] = m.description = 'out of date revision'
        # trying to commit an out-of-date tree fails
        self.failUnlessRaises(errors.ExecProblem, lambda: wt.commit(m))
        # unless the right option is set
        wt.commit(m, out_of_date_ok=True)
        self._check_revisions(wt, ('base-0', 'patch-2'))
    tests.append('commit_out_of_date')

    def commit_version(self):
        """Commit on a specified version."""
        wt = self.working_tree
        wt.import_()
        def think(**kw):
            # Build a no-argument callable that commits with the given
            # keyword arguments.
            def thunk():
                m = wt.log_message()
                m['Summary'] = m.description = 'first revision'
                wt.commit(m, **kw)
            return thunk
        wt.tree_version = self.other_version
        self.assertRaises(errors.ExecProblem, think())
        think(version=self.version)()
        wt.tree_version = self.version
        self._check_revisions(wt, ('base-0', 'patch-1'))
    tests.append('commit_version')

    def iter_commit(self):
        """iter_commit works"""
        wt = self.working_tree
        wt.tagging_method = 'names'
        wt.import_()
        self._put_line(wt/'f1', 'hello')
        m = wt.log_message()
        m['Summary'] = m.description = 'first revision'
        lines = wt.iter_commit(m)
        changes = [L for L in lines if isinstance(L, arch.TreeChange)]
        self.assertEqual(1, len(changes))
        self.assertEqual(arch.FileAddition, type(changes[0]))
        self.assertEqual('f1', changes[0].name)
    tests.append('iter_commit')

    def iter_commit_files(self):
        """Commit with limit print the right path."""
        wt = self.working_tree
        self._put_line(wt/'f1', 'hello')
        self._put_line(wt/'f2', 'world')
        for name in ('f1', 'f2'):
            wt.add_tag(name)
        wt.import_()
        self._put_line(wt/'f1', 'Hello,')
        self._put_line(wt/'f2', 'World!')
        m = wt.log_message()
        m['Summary'] = m.description = 'first revision'
        lines = wt.iter_commit(m, file_list=iter(['f2']))
        changes = [L for L in lines if isinstance(L, arch.TreeChange)]
        self.assertEqual(1, len(changes))
        self.assertEqual(arch.FileModification, type(changes[0]))
        self.assertEqual('f2', changes[0].name)
        # commit the other file, relative path
        m['Summary'] = m.description = 'first revision'
        lines = wt.iter_commit(m, file_list=['f1'])
        changes = [L for L in lines if isinstance(L, arch.TreeChange)]
        self.assertEqual(1, len(changes))
        self.assertEqual(arch.FileModification, type(changes[0]))
        self.assertEqual('f1', changes[0].name)
    tests.append('iter_commit_files')
951
 
 
952
 
 
953
 
def tla_skip_whitespace_tests(quiet=False):
    """Tell whether the escaped-whitespace tests must be skipped.

    Calls tla.status_cmd(['escape'], expected=(0,1)) and returns True when
    the status is 1 (presumably meaning the installed tla has no 'escape'
    command -- confirm against the tla backend).  Unless quiet is true, a
    '[SKIP] ' marker is written to standard output when skipping.
    """
    from arch.backends import tla
    must_skip = tla.status_cmd(['escape'], expected=(0,1)) == 1
    if must_skip and not quiet:
        sys.stdout.write('[SKIP] ')
        sys.stdout.flush()
    return must_skip
960
 
 
961
 
 
962
 
class EscapedWhitespace(TestCase):
    # Tests using a file whose name contains a space.  Every test returns
    # early when tla_skip_whitespace_tests() reports that they must be
    # skipped.

    tests = []

    def extraSetup(self):
        self.set_my_id()
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)

    def _write_file(self, path, data):
        """Write data to path and close the file.

        Closing explicitly guarantees the content is on disk before the
        tree is imported or committed, instead of depending on when the
        file object happens to be garbage collected.
        """
        out = open(path, 'w')
        try:
            out.write(data)
        finally:
            out.close()

    def setUp(self):
        TestCase.setUp(self)
        # When skipping, self.tree and self.file_name are left unset; the
        # tests perform the same check before using them.
        if tla_skip_whitespace_tests(quiet=True): return
        self.tree = self.working_tree
        self.file_name = FileName('foo bar')
        self._write_file(self.tree/self.file_name, '')
        self.tree.add_tag(self.file_name)
        # Overwrite the id file so the file gets a known id.
        tagfile = self.tree/'.arch-ids'/self.file_name+'.id'
        self._write_file(tagfile, 'baby jesus cries\n')
        self.tree.import_()

    def new_files_escaped(self):
        """New-files from patchlog with spaces"""
        if tla_skip_whitespace_tests(): return
        expected = [self.file_name]
        result = list(self.tree.iter_logs())[0].new_files
        self.assertEqual(expected, result)
    tests.append('new_files_escaped')

    def modified_files_escaped(self):
        """Modified-files from patchlog with spaces"""
        if tla_skip_whitespace_tests(): return
        self._write_file(self.tree/self.file_name, 'Hello, World!')
        m = self.tree.log_message()
        m['Summary'] = 'first revision'
        self.tree.commit(m)
        expected = [self.file_name]
        result = list(self.tree.iter_logs())[1].modified_files
        self.assertEqual(expected, result)
    tests.append('modified_files_escaped')

    def removed_files_escaped(self):
        """Removed-files from patchlog with spaces"""
        if tla_skip_whitespace_tests(): return
        name = self.file_name
        self.tree.delete(name)
        m = self.tree.log_message()
        m['Summary'] = 'first revision'
        self.tree.commit(m)
        # Both the file itself and its id file are expected to be listed.
        expected = [name, name.dirname()/'.arch-ids'/name.basename()+'.id']
        result = list(self.tree.iter_logs())[1].removed_files
        expected.sort()
        result.sort()
        self.assertEqual(expected, result)
    tests.append('removed_files_escaped')

    def get_index_escaped(self):
        """Changeset.get_index supports escaped file names."""
        if tla_skip_whitespace_tests(): return
        self._write_file(self.tree/self.file_name, 'Hello, World!')
        m = self.tree.log_message()
        m['Summary'] = 'first revision'
        self.tree.commit(m)
        cset = self.version['patch-1'].get_patch(self.tree/',cset')
        # The key presumably derives from the id written in setUp
        # ('baby jesus cries') -- confirm against the changeset format.
        expected = { 'x_baby_jesus_cries': self.file_name }
        result = cset.get_index('orig-files')
        self.assertEqual(expected, result)
    tests.append('get_index_escaped')

    def file_find_pristine_escaped(self):
        """file_find with pristine supports escaped names."""
        if tla_skip_whitespace_tests(): return
        revision = self.version['base-0']
        result = self.tree.file_find(self.file_name, revision)
        pristine = self.tree.find_pristine(revision)
        expected = pristine/self.file_name
        self.assertEqual(expected, result)
    tests.append('file_find_pristine_escaped')
1040
 
 
1041
 
 
1042
 
class LatestVersion(TestCase):
    # Tests for Branch.latest_version().  Only the user id is configured
    # up front; each test creates the archive objects it needs.

    tests = []

    def extraSetup(self):
        self.set_my_id()

    def latest_version(self):
        """Return latest_version() of the branch of self.version."""
        branch = self.version.branch
        return branch.latest_version()

    def not_registered(self):
        """Branch.latest_version ValueError if archive is not registered."""
        self.assertRaises(ValueError, self.latest_version)
    tests.append('not_registered')

    def missing_branch(self):
        """Branch.latest_version ValueError if branch does not exist."""
        self.create_archive()
        self.assertRaises(ValueError, self.latest_version)
    tests.append('missing_branch')

    def empty_branch(self):
        """Branch.latest_version ValueError if branch is empty."""
        self.create_archive()
        self.create_branch()
        self.assertRaises(ValueError, self.latest_version)
    tests.append('empty_branch')

    def works(self):
        """Branch.latest_version works."""
        self.create_archive()
        self.create_version()
        current = self.version
        branch = current.branch
        self.assertEqual(current, branch.latest_version())
        # Set up a second version named after the first with '.1' appended;
        # it must then be reported as the latest one.
        successor = arch.Version(current.fullname + '.1')
        successor.setup()
        self.assertEqual(successor, branch.latest_version())
    tests.append('works')
1081
 
 
1082
 
 
1083
 
class LatestRevision(TestCase):
    # Tests for Version.latest_revision().  Only the user id is configured
    # up front; each test creates the archive objects it needs.

    tests = []

    def extraSetup(self):
        self.set_my_id()

    def latest_revision(self):
        """Return latest_revision() of self.version."""
        version = self.version
        return version.latest_revision()

    def not_registered(self):
        """Version.latest_revision ValueError if archive is not registered."""
        self.assertRaises(ValueError, self.latest_revision)
    tests.append('not_registered')

    def missing_version(self):
        """Version.latest_revision ValueError if version does not exist."""
        self.create_archive()
        self.assertRaises(ValueError, self.latest_revision)
    tests.append('missing_version')

    def empty_version(self):
        """Version.latest_revision ValueError if version is empty."""
        self.create_archive()
        self.create_version()
        self.assertRaises(ValueError, self.latest_revision)
    tests.append('empty_version')

    def works(self):
        """Version.latest_revision works."""
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)
        wt = self.working_tree
        wt.import_()
        vsn = self.version
        # After the import the latest revision is base-0 ...
        self.assertEqual(vsn['base-0'], vsn.latest_revision())
        # ... and after one commit it is patch-1.
        self.commit_summary(wt, 'first revision')
        self.assertEqual(vsn['patch-1'], vsn.latest_revision())
    tests.append('works')
1123
 
 
1124
 
 
1125
 
class PopulatedArchive(TestCase):
    # Tests run against an archive that already holds two revisions: setUp
    # imports the working tree and commits one revision on top of it.

    tests = []

    def extraSetup(self):
        self.set_my_id()
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)

    def setUp(self):
        TestCase.setUp(self)
        self.tree = self.working_tree
        self.tree.import_()
        self.commit_summary(self.tree, 'first revision')

    def cachedrevs(self):
        """Cached revisions."""
        def ls_cached():
            # iter_cachedrevs() must be a real generator; return its items.
            cached = self.version.iter_cachedrevs()
            self.assert_(isinstance(cached, types.GeneratorType))
            return list(cached)
        self.assertEqual(ls_cached(), [])
        self.version['base-0'].cache()
        self.assertEqual(ls_cached(), [self.version['base-0']])
        self.version['patch-1'].cache()
        self.assertEqual(ls_cached(),
                         [self.version[level]
                          for level in ('base-0', 'patch-1')])
        self.version['base-0'].uncache()
        self.assertEqual(ls_cached(), [self.version['patch-1']])
        self.version['patch-1'].uncache()
        self.assertEqual(ls_cached(), [])
    tests.append('cachedrevs')

    def iter_logs(self):
        """SourceTree.iter_log for the tree-version."""
        def ls_logs_revision(vsn=None):
            # iter_logs() must be a real generator; return the revision of
            # each log it yields.
            logs = self.tree.iter_logs(vsn)
            self.assert_(isinstance(logs, types.GeneratorType))
            return [log.revision for log in logs]
        rvsns = [self.version[level] for level in ('base-0', 'patch-1')]
        # The version may be left out, or given as an object or by name.
        self.assertEqual(ls_logs_revision(), rvsns)
        self.assertEqual(ls_logs_revision(self.version), rvsns)
        self.assertEqual(ls_logs_revision(self.version.fullname), rvsns)
    tests.append('iter_logs')

    def continuation_from_version(self):
        """Create a continuation version (from version)."""
        source = self.version
        target = self.other_version
        assert not target.exists()
        source.latest_revision().make_continuation(target)
        self.failUnless(target.exists())
        revisions = tuple(target.iter_revisions())
        target_base = target['base-0']
        self.failUnlessEqual(revisions, (target_base,))
        summary = target_base.patchlog.summary
        self.failUnlessEqual(summary, 'tag of %s' % source.latest_revision())
    tests.append('continuation_from_version')

    def mirror_basic(self):
        """Archive.mirror without fromto, pushing."""
        master = self.archive
        mirror_name = master.name + '-MIRROR'
        mirror = master.make_mirror(mirror_name, self.arch_dir/mirror_name)
        master.mirror()
        # Compare revisions by their nonarch attribute.
        master_revisions = [rev.nonarch for rev in master.iter_revisions()]
        mirror_revisions = [rev.nonarch for rev in mirror.iter_revisions()]
        self.failUnlessEqual(master_revisions, mirror_revisions)
    tests.append('mirror_basic')

    def mirror_fromto(self):
        """Archive.mirror with fromto."""
        master = self.archive
        mirror_name = master.name + '-MIRROR'
        mirror = master.make_mirror(mirror_name, self.arch_dir/mirror_name)
        master.mirror()
        third_name = master.name + '-third'
        third = master.make_mirror(third_name, self.arch_dir/third_name)
        master.mirror(fromto=(mirror, third))
        # Compare revisions by their nonarch attribute.
        mirror_revisions = [rev.nonarch for rev in mirror.iter_revisions()]
        third_revisions = [rev.nonarch for rev in third.iter_revisions()]
        self.failUnlessEqual(mirror_revisions, third_revisions)
    tests.append('mirror_fromto')

    def mirror_limit(self):
        """Archive.mirror with limit version"""
        version = self.version
        other_version = self.other_version
        version.latest_revision().make_continuation(other_version)
        master = self.archive
        mirror_name = master.name + '-MIRROR'
        mirror = master.make_mirror(mirror_name, self.arch_dir/mirror_name)
        master.mirror(limit=(version,))
        # Only the version named in the limit is expected in the mirror.
        mirror_versions = tuple([vsn.nonarch
                                 for vsn in mirror.iter_versions()])
        self.failUnlessEqual(mirror_versions, (version.nonarch,))
    tests.append('mirror_limit')

    def _check_logs(self, tree, levels):
        """Assert the logs of tree are the given patch levels of self.version."""
        expected_revs = [self.version[level] for level in levels]
        found_revs = [log.revision for log in tree.iter_logs()]
        self.assertEqual(expected_revs, found_revs)

    def tree_get(self):
        """Get a working tree, given a revision."""
        shutil.rmtree(self.working_dir)
        fetched = self.version['base-0'].get(self.working_dir)
        self._check_logs(fetched, ['base-0'])
    tests.append('tree_get')

    def tree_update(self):
        """WorkingTree.update, without options."""
        shutil.rmtree(self.working_dir)
        fetched = self.version['base-0'].get(self.working_dir)
        fetched.update()
        self._check_logs(fetched, ['base-0', 'patch-1'])
    tests.append('tree_update')
1248
 
 
1249
 
 
1250
 
class Merges(PopulatedArchive):
    # Tests for star-merge and Version.iter_merges, run between
    # self.version and a continuation of it, self.other_version.

    tests = []

    def _write_file(self, path, data):
        """Write data to path and close the file.

        Closing explicitly guarantees the content is on disk before the
        tree is committed, instead of depending on when the file object
        happens to be garbage collected.
        """
        out = open(path, 'w')
        try:
            out.write(data)
        finally:
            out.close()

    def setup_first_merge(self):
        """Branch other_version off version, then commit twice on version.

        The checkout of the new branch is stored in self.other_tree.
        """
        version = self.version
        other_version = self.other_version
        tree = self.tree
        version.latest_revision().make_continuation(other_version)
        other_tree = other_version['base-0'].get(self.other_working_dir)
        self.other_tree = other_tree
        self.commit_summary(tree, 'second rev')
        self.commit_summary(tree, 'third rev')

    def star_merge(self):
        """star-merge with a version"""
        self.setup_first_merge()
        other_tree = self.other_tree
        other_tree.star_merge(self.version)
        msg = 'merge'
        self.commit_summary(other_tree, msg)
        log = self.other_version['patch-1'].patchlog
        self.assertEqual(log.summary, msg)
        vsn = self.version
        expected = [vsn['patch-2'], vsn['patch-3']]
        self.assertEqual(log.merged_patches, expected)
    tests.append('star_merge')

    def setup_star_merge(self):
        """Prepare a merge in which file 'foo' changed on self.version.

        'foo' is added and committed, other_version is branched off and
        checked out into self.other_tree, then 'foo' is modified and
        committed again on self.version.
        """
        version, other_version = self.version, self.other_version
        self._write_file(self.tree/'foo', 'hello')
        self.tree.add_tag('foo')
        self.commit_summary(self.tree, 'second rev')
        version.latest_revision().make_continuation(other_version)
        other_tree = other_version['base-0'].get(self.other_working_dir)
        self.other_tree = other_tree
        self._write_file(self.tree/'foo', 'hello world')
        self.commit_summary(self.tree, 'third rev')

    def assertObject(self, obj, class_, **attrs):
        """Assert obj is an instance of class_ with the given attributes."""
        self.assert_(isinstance(obj, class_))
        for k, v in attrs.items():
            self.assertEqual(getattr(obj, k), v)

    def iter_star_merge(self):
        """star-merge incremental output"""
        self.setup_star_merge()
        # Index every non-Chatter item by its name.
        changes = {}
        for line in self.other_tree.iter_star_merge(self.version):
            if isinstance(line, arch.Chatter):
                continue
            changes[line.name] = line
        expected = (('foo', arch.FileModification, {'binary': False}),
                    ('{arch}/cat/cat--brn/cat--brn--1.0/'
                     'jdoe@example.com--example--9999/patch-log/patch-3',
                     arch.FileAddition, {'directory': False}))
        self.assertEqual(len(expected), len(changes))
        for name, class_, attrs in expected:
            self.assert_(name in changes)
            change = changes[name]
            self.assertObject(change, class_, **attrs)
    tests.append('iter_star_merge')

    def iter_star_merge_conflict(self):
        """incremental star-merge conflict"""
        self.setup_star_merge()
        self._write_file(self.tree/'foo', 'hello world')
        self.commit_summary(self.tree, 'third rev')
        self._write_file(self.other_tree/'foo', 'Hello, World!')
        self.commit_summary(self.other_tree, 'first rev')
        cset_app = self.other_tree.iter_star_merge(self.version)
        # Run the merge to completion; only the final state is checked.
        for unused in cset_app: pass
        self.assert_(cset_app.conflicting)
    tests.append('iter_star_merge_conflict')

    def setup_merges(self):
        """Star-merge version into other_tree, then commit twice there."""
        self.setup_first_merge()
        version, other_version = self.version, self.other_version
        tree, other_tree = self.tree, self.other_tree
        other_tree.star_merge(version)
        self.commit_summary(other_tree, 'merge patch-3')
        self.commit_summary(other_tree, 'empty merge')

    def depth_first(self, merges_iter):
        """Flatten (revision, iterable) pairs into [revision, item] lists.

        Each inner iterable is consumed completely before the outer
        iterator is advanced to the next pair.
        """
        return [[i, f] for i, F in merges_iter for f in F]

    def iter_depth(self, reverse=False):
        """Version.iter_merges depth first"""
        self.setup_merges()
        vsn, oth = self.version, self.other_version
        expected = [[oth['base-0'], vsn['base-0']],
                    [oth['base-0'], vsn['patch-1']],
                    [oth['base-0'], oth['base-0']],
                    [oth['patch-1'], vsn['patch-2']],
                    [oth['patch-1'], vsn['patch-3']],
                    [oth['patch-1'], oth['patch-1']],
                    [oth['patch-2'], oth['patch-2']]]
        if reverse: expected.reverse()
        merges_iter = self.other_version.iter_merges(reverse=reverse)
        merges = self.depth_first(merges_iter)
        self.assertEqual(expected, merges)
    tests.append('iter_depth')

    def iter_depth_reverse(self):
        """Version.iter_merges depth first in reverse"""
        self.iter_depth(reverse=True)
    tests.append('iter_depth_reverse')

    def iter_not_me(self):
        """Version.iter_merges metoo=False"""
        self.setup_merges()
        vsn, oth = self.version, self.other_version
        expected = [[oth['base-0'], vsn['base-0']],
                    [oth['base-0'], vsn['patch-1']],
                    [oth['patch-1'], vsn['patch-2']],
                    [oth['patch-1'], vsn['patch-3']]]
        merges = self.depth_first(self.other_version.iter_merges(metoo=False))
        self.assertEqual(expected, merges)
    tests.append('iter_not_me')

    def iter_version(self):
        """Version.iter_merges limited to a single source version"""
        self.setup_merges()
        vsn, oth = self.version, self.other_version
        expected = [[oth['base-0'], oth['base-0']],
                    [oth['patch-1'], oth['patch-1']],
                    [oth['patch-2'], oth['patch-2']]]
        merges_iter = self.other_version.iter_merges(self.other_version)
        merges = self.depth_first(merges_iter)
        self.assertEqual(expected, merges)
    tests.append('iter_version')

    def breadth_first(self, iterator):
        """Turn (revision, iterable) pairs into [revision, list] lists.

        The outer iterator is exhausted first; the inner iterables are
        only consumed afterwards.  That order is the point of this helper,
        so the two passes must not be merged.
        """
        merges = [list(step) for step in iterator]
        for step in merges:
            step[1] = list(step[1])
        return merges

    def iter_breadth(self):
        """Version.iter_merges with breadth first traversal."""
        self.setup_merges()
        merges_iter = self.other_version.iter_merges()
        merges = self.breadth_first(merges_iter)
        vsn, oth = self.version, self.other_version
        expected = [[oth['base-0'], [vsn['base-0'],
                                     vsn['patch-1'],
                                     oth['base-0']]],
                    [oth['patch-1'], [vsn['patch-2'],
                                      vsn['patch-3'],
                                      oth['patch-1']]],
                    [oth['patch-2'], [oth['patch-2']]]]
        self.assertEqual(expected, merges)
    tests.append('iter_breadth')
1407
 
 
1408
 
 
1409
 
class RevisionFiles(PopulatedArchive):

    """Tests for the files attached to a revision (Revision.iter_files)."""

    tests = []

    def parse_checksum(self):
        """Parsing checksum file data"""
        checksum = '\n'.join((
            "-----BEGIN PGP SIGNED MESSAGE-----",
            "Hash: SHA1",
            "",
            "Signature-for: ddaa@ddaa.net--2004/pyarch--ddaa--0.5--base-0",
            "md5 pyarch--ddaa--0.5--base-0.tar.gz"
            " f1998e8294e50070993471cec6b66b52",
            "foo bar-baz-blah 0123456789abcdef",
            "sha1 pyarch--ddaa--0.5--base-0.tar.gz"
            " 633c0efcceab4e6588a04a9ce73233bc206bcdf9",
            "-----BEGIN PGP SIGNATURE-----",
            "Version: GnuPG v1.2.4 (GNU/Linux)",
            "",
            "iD8DBQFAUQ/bZEH9AkgfRL0RAiiFAKCnV9jenFvCsckbIigtQmHYfZgcKwCgk5FY",
            "XQwvigYzMiiR9lFZh6vYUJM=",
            "=PCU/",
            "-----END PGP SIGNATURE-----",
            ""))
        rvsn = arch.Revision('ddaa@ddaa.net--2004/pyarch--ddaa--0.5--base-0')
        checksums = rvsn._parse_checksum(checksum)
        # The result is keyed by file name, then by checksum kind.
        expected = {'pyarch--ddaa--0.5--base-0.tar.gz':
                    {'md5': 'f1998e8294e50070993471cec6b66b52',
                     'sha1': '633c0efcceab4e6588a04a9ce73233bc206bcdf9'},
                    'bar-baz-blah': {'foo': '0123456789abcdef'}}
        self.assertEqual(expected, checksums)
    tests.append('parse_checksum')

    def _help_file_names(self, rvsn, expected_names):
        """Check that the files of rvsn have exactly the expected names.

        The comparison ignores order.  expected_names is not modified:
        a sorted copy is compared instead.
        """
        file_names = [rev_file.name for rev_file in rvsn.iter_files()]
        file_names.sort()
        expected_names = list(expected_names)
        expected_names.sort()
        self.assertEqual(file_names, expected_names)

    def import_files(self):
        """Import revisions have expected files."""
        rvsn = self.version['base-0']
        expected_names = ['checksum', 'log', rvsn.nonarch + '.src.tar.gz']
        self._help_file_names(rvsn, expected_names)
    tests.append('import_files')

    def commit_files(self):
        """Commit revisions (uncached) have expected files."""
        rvsn = self.version['patch-1']
        expected_names = ['checksum', 'log', rvsn.nonarch + '.patches.tar.gz']
        self._help_file_names(rvsn, expected_names)
    tests.append('commit_files')

    def cachedrev_files(self):
        """Commit revisions (cached) have expected files."""
        rvsn = self.version['patch-1']
        rvsn.cache()
        expected_names = ['checksum', 'log', rvsn.nonarch + '.patches.tar.gz',
                          'checksum.cacherev', rvsn.nonarch + '.tar.gz']
        self._help_file_names(rvsn, expected_names)
    tests.append('cachedrev_files')

    def revision_files_data(self):
        """Accessing the data of Revision files."""
        rvsn = self.version['patch-1']
        rvsn.cache()
        # Presumably the on-disk directory of patch-1 inside the test
        # archive; each component is appended as a DirName.
        rev_dir = self.arch_dir
        for part in (self.arch_name,
                     'cat', 'cat--brn', 'cat--brn--1.0', 'patch-1'):
            rev_dir = rev_dir/DirName(part)
        for rev_file in rvsn.iter_files():
            stored = open(rev_dir/rev_file.name)
            try:
                on_disk = stored.read()
            finally:
                # Close explicitly rather than relying on garbage collection.
                stored.close()
            # An explicit message names the offending file and keeps the
            # (possibly binary) file content out of the failure report.
            self.assertEqual(rev_file.data, on_disk,
                             'data mismatch for %s' % (rev_file.name,))
    tests.append('revision_files_data')
1483
 
 
1484
 
 
1485
 
class PristineTrees(PopulatedArchive):

    """Tests for the pristine tree methods of self.tree."""

    tests = []

    def iter_pristines_one(self):
        """iter_pristines works"""
        pristines = list(self.tree.iter_pristines())
        self.assertEqual([self.version['patch-1']], pristines)
    tests.append('iter_pristines_one')

    def add_pristine(self):
        """add_pristine works"""
        self.tree.add_pristine(self.version['base-0'])
        pristines = list(self.tree.iter_pristines())
        self.assertEqual(
            [self.version['base-0'], self.version['patch-1']], pristines)
    tests.append('add_pristine')

    def find_pristine(self):
        """find_pristine works"""
        revision = self.version['patch-1']
        found = self.tree.find_pristine(revision)
        version = self.version
        # Expected location, built one path component at a time.
        pristine_root = self.tree / '{arch}' / '++pristine-trees' / 'unlocked'
        expected = (pristine_root /
                    version.category.nonarch /
                    version.branch.nonarch /
                    version.nonarch /
                    self.archive.name /
                    revision.nonarch)
        self.assertEqual(expected, found)
        self.assertEqual(arch.ArchSourceTree, type(found))
    tests.append('find_pristine')

    def find_pristine_missing(self):
        """find_pristine raises NoPristineFoundError"""
        revision = self.version['base-0']
        # Precondition: base-0 is not among the pristines of the tree.
        self.assert_(revision not in self.tree.iter_pristines())
        self.assertRaises(arch.errors.NoPristineFoundError,
                          lambda: self.tree.find_pristine(revision))
    tests.append('find_pristine_missing')
1523
 
 
1524
 
 
1525
 
class Changeset(PopulatedArchive):

    """Tests for changesets computed with arch.delta."""

    tests = []

    def setUp(self):
        """Store base-0 and patch-1, the trees returned by get() for each,
        and the path used for changesets."""
        PopulatedArchive.setUp(self)
        scratch = self.arch_dir
        self.base0 = self.version['base-0']
        self.patch1 = self.version['patch-1']
        self.base0_tree = self.base0.get(scratch/'base0')
        self.patch1_tree = self.patch1.get(scratch/'patch1')
        self.changeset_name = scratch/'cset'

    def changeset_does_nothing(self):
        """Changeset.does_nothing works."""
        same_tree = arch.delta(
            self.patch1_tree, self.patch1_tree, self.changeset_name)
        self.assertEqual(True, same_tree.does_nothing)
        # The same changeset path is used again below, so remove it first.
        shutil.rmtree(same_tree)
        other_tree = arch.delta(
            self.base0_tree, self.patch1_tree, self.changeset_name)
        self.assertEqual(False, other_tree.does_nothing)
    tests.append('changeset_does_nothing')
1547
 
 
1548
 
 
1549
 
class Delta(Changeset):

    """Tests for arch.delta and arch.iter_delta between base-0 and patch-1."""

    tests = []

    def _patchlog_name(self, rvsn):
        """Return the name of the patch log of rvsn below the '{arch}'
        directory."""
        components = (rvsn.category.nonarch, rvsn.branch.nonarch,
                      rvsn.version.nonarch, rvsn.archive.name,
                      rvsn.patchlevel)
        return '{arch}/%s/%s/%s/%s/patch-log/%s' % components

    def _delta_helper(self, orig, mod):
        """Check that delta(orig, mod) creates only the patch-1 patch log."""
        log_name = self._patchlog_name(self.patch1)
        cset = arch.delta(orig, mod, self.changeset_name)
        self.assertEqual([('A_./'+log_name, log_name)],
                         list(cset.iter_created_files(True)))

    def delta_tree_tree(self):
        """delta(tree, tree) works."""
        orig, mod = self.base0_tree, self.patch1_tree
        self._delta_helper(orig, mod)
    tests.append('delta_tree_tree')

    def delta_tree_revision(self):
        """delta(tree, revision) works."""
        orig, mod = self.base0_tree, self.patch1
        self._delta_helper(orig, mod)
    tests.append('delta_tree_revision')

    def delta_revision_tree(self):
        """delta(revision, tree) works."""
        orig, mod = self.base0, self.patch1_tree
        self._delta_helper(orig, mod)
    tests.append('delta_revision_tree')

    def delta_revision_revision(self):
        """delta(revision, revision) works."""
        orig, mod = self.base0, self.patch1
        self._delta_helper(orig, mod)
    tests.append('delta_revision_revision')

    def iter_delta(self):
        """iter_delta works."""
        log_name = self._patchlog_name(self.patch1)
        idelta = arch.iter_delta(self.base0, self.patch1, self.changeset_name)
        # Reading the changeset is expected to raise AttributeError until
        # the iterator has been consumed.
        self.assertRaises(AttributeError, lambda: idelta.changeset)
        changes = []
        for item in idelta:
            if not isinstance(item, arch.Chatter):
                changes.append(item)
        self.assertEqual(1, len(changes))
        self.assertEqual(arch.FileAddition, type(changes[0]))
        self.assertEqual(log_name, changes[0].name)
        created = list(idelta.changeset.iter_created_files(True))
        self.assertEqual([('A_./'+log_name, log_name)], created)
    tests.append('iter_delta')
1601
 
 
1602
 
 
1603
 
class TreeChanges(Delta):

    """Tests for the changes method of self.tree."""

    tests = []

    def changes(self):
        """WorkingTree.changes works"""
        # First with None as the reference revision (presumably meaning the
        # revision of the tree itself): nothing should have changed.
        cset = self.tree.changes(None, self.changeset_name)
        self.assertEqual(True, cset.does_nothing)
        shutil.rmtree(cset)

        # Against base-0, the only created file is the patch-1 patch log.
        cset = self.tree.changes(self.base0, self.changeset_name)
        log_name = self._patchlog_name(self.patch1)
        expected = [('A_./%s' % log_name, log_name)]
        newfiles = list(cset.iter_created_files(True))
        self.assertEqual(expected, newfiles)
        shutil.rmtree(cset)

        # Close the new file explicitly so that its content is on disk
        # before the tree is inspected, instead of relying on garbage
        # collection to flush it.
        new_file = open(self.tree/'foo', 'w')
        try:
            new_file.write("hello\n")
        finally:
            new_file.close()
        self.tree.add_tag('foo')
        cset = self.tree.changes(None, self.changeset_name)
        expected = ['.arch-ids/foo.id', 'foo']
        newfiles = [created[1] for created in cset.iter_created_files(True)]
        newfiles.sort()
        self.assertEqual(expected, newfiles)
    tests.append('changes')
1625
 
 
1626
 
 
1627
 
class FileHistory(TestCase):

    """Fixture whose working tree gets the history listed in file_history."""

    # One dictionary per entry handed to make_history.  The '%add' key
    # lists names; the other keys map a file name to its content.
    file_history = [
        {'%add': ['foo'], 'foo': "hello world\n"},
        {'foo': "Hello, World\n"},
    ]

    def extraSetup(self):
        """Set the user id, then create the archive, the version and a
        working tree for that version."""
        self.set_my_id()
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)

    def setUp(self):
        """Expose the working tree as self.tree and record file_history
        in it."""
        TestCase.setUp(self)
        tree = self.tree = self.working_tree
        self.make_history(tree, self.file_history)
1643
 
 
1644
 
 
1645
 
class Patchlog(TestCase):

    """Tests for the patch logs of a working tree."""

    tests = []

    def extraSetup(self):
        """Set the user id, then create the archive, the version and a
        working tree for that version."""
        self.set_my_id()
        self.create_archive()
        self.create_version()
        self.create_working_tree(self.version)

    def setUp(self):
        """Expose the working tree as self.tree."""
        TestCase.setUp(self)
        self.tree = self.working_tree

    def renames(self):
        """Patchlog.renamed_files works."""
        history = [
            {'%add': ['foo', 'bar'], 'foo': "hello\n", 'bar': "world\n"},
            {'%mv': [['foo', 'spam'], ['bar', 'eggs']]},
        ]
        self.make_history(self.tree, history)
        # The second log is the one produced by the '%mv' entry.
        rename_log = list(self.tree.iter_logs())[1]

        def filetag(name):
            path = FileName(name)
            tag_path = path.dirname()/'.arch-ids'/path.basename()
            return tag_path+'.id'

        # Each rename is expected twice: for the file and for its id file.
        expected = {}
        for old, new in (('foo', 'spam'), ('bar', 'eggs')):
            expected[old] = new
            expected[filetag(old)] = filetag(new)
        self.assertEqual(expected, rename_log.renamed_files)
    tests.append('renames')

    def get_missing(self):
        """Patchlog getitem on a missing header returns None."""
        self.tree.import_()
        patchlog = self.tree.tree_revision.patchlog
        self.assertEqual(None, patchlog['Not-Present-Header'])
    tests.append('get_missing')

    def revision(self):
        """Patchlog.revision works."""
        self.tree.import_()
        tree_revision = self.tree.tree_revision
        self.assertEqual(tree_revision, tree_revision.patchlog.revision)
    tests.append('revision')

    def summary(self):
        """Patchlog.summary works."""
        self.tree.import_()
        tree_revision = self.tree.tree_revision
        self.assertEqual("initial import", tree_revision.patchlog.summary)
    tests.append('summary')
1694
 
 
1695
 
 
1696
 
class FileFind(FileHistory):

    """Tests for the file_find method of self.tree."""

    tests = []

    def _read_found_file(self, path):
        """Return the whole content of the file at path, then close it."""
        found = open(path)
        try:
            return found.read()
        finally:
            found.close()

    def _write_new_file(self, path, content):
        """Write content to the file at path, then close it so the data is
        on disk before the tree is inspected."""
        new_file = open(path, 'w')
        try:
            new_file.write(content)
        finally:
            new_file.close()

    def file_find_pristine(self):
        """file_find works with pristines"""
        revision = self.version['patch-1']
        path = self.tree.file_find('foo', revision)
        self.assertEqual(PathName, type(path))
        pristine = self.tree.find_pristine(revision)
        expected = pristine/'foo'
        self.assertEqual(expected, path)
        result = self._read_found_file(path)
        expected = self.file_history[1]['foo']
        self.assertEqual(expected, result)
    tests.append('file_find_pristine')

    def file_find_revlib(self):
        """file_find works with the revision library"""
        revlib_dir = self.arch_dir/'revlib'
        os.mkdir(revlib_dir)
        arch.register_revision_library(revlib_dir)
        revision = self.version['patch-1']
        # Remove the pristine trees and check that none is left before the
        # revision is added to the library.
        shutil.rmtree(self.tree/'{arch}'/'++pristine-trees')
        self.assertEqual([], list(self.tree.iter_pristines()))
        revision.library_add()
        path = self.tree.file_find('foo', revision)
        self.assertEqual(PathName, type(path))
        pristine = revision.library_find()
        expected = pristine/'foo'
        self.assertEqual(expected, path)
        result = self._read_found_file(path)
        expected = self.file_history[1]['foo']
        self.assertEqual(expected, result)
    tests.append('file_find_revlib')

    def file_find_pristine_missing(self):
        """file_find works with missing pristine"""
        revision = self.version['base-0']
        # find_pristine is only called after file_find has returned.
        path = self.tree.file_find('foo', revision)
        self.assertEqual(PathName, type(path))
        pristine = self.tree.find_pristine(revision)
        expected = pristine/'foo'
        self.assertEqual(expected, path)
        result = self._read_found_file(path)
        expected = self.file_history[0]['foo']
        self.assertEqual(expected, result)
    tests.append('file_find_pristine_missing')

    def _file_find_raises_helper(self, flavour):
        """Check that file_find raises MissingFileError for a new file.

        flavour selects the file created in the tree: 'source' creates
        'bar' and adds a tag for it, 'junk' creates ',bar' and leaves it
        untagged.  Any other flavour raises KeyError.
        """
        revision = self.version['patch-1']
        name = {'source': 'bar', 'junk': ',bar'}[flavour]
        self._write_new_file(self.tree/name, 'Yadda!\n')
        if flavour == 'source':
            self.tree.add_tag(name)
        self.assertRaises(arch.errors.MissingFileError,
                          self.tree.file_find, name, revision)

    def file_find_new_raises(self):
        """file_find raises MissingFileError for new source files"""
        self._file_find_raises_helper('source')
    tests.append('file_find_new_raises')

    def file_find_junk_raises(self):
        """file_find raises MissingFileError for junk files"""
        self._file_find_raises_helper('junk')
    tests.append('file_find_junk_raises')
1762
 
 
1763
 
 
1764
 
class ApplyChangeset(FileHistory, Delta):

    """Tests for Changeset.iter_apply with the base-0 to patch-1 delta."""

    tests = []

    def setUp(self):
        """Compute the base-0 to patch-1 changeset as self.cset."""
        FileHistory.setUp(self)
        # working_dir is removed and self.tree dropped: every test fetches
        # the revision it needs into working_dir with get().
        shutil.rmtree(self.working_dir)
        self.tree = None
        self.base0 = self.version['base-0']
        self.patch1 = self.version['patch-1']
        self.changeset_name = self.arch_dir/'cset'
        self.cset = arch.delta(self.base0, self.patch1, self.changeset_name)

    def _check_apply_report(self, lines, log_kind, foo_kind):
        """Check the two reported items: the patch-1 patch log, then 'foo'."""
        self.assertEqual(log_kind, type(lines[0]))
        self.assertEqual(self._patchlog_name(self.patch1), lines[0].name)
        self.assertEqual(foo_kind, type(lines[1]))
        self.assertEqual('foo', lines[1].name)

    def _check_applied_tree(self, tree, revision):
        """Check that the last log of tree is for revision and that tree
        has no changes."""
        self.assertEqual(revision, list(tree.iter_logs())[-1].revision)
        self.assert_(not tree.has_changes())

    def iter_apply(self):
        """Changeset.iter_apply works"""
        tree = self.base0.get(self.working_dir)
        lines = list(self.cset.iter_apply(tree))
        self.assertEqual(2, len(lines))
        self._check_apply_report(
            lines, arch.FileAddition, arch.FileModification)
        self._check_applied_tree(tree, self.patch1)
    tests.append('iter_apply')

    def iter_apply_conflict(self):
        """Changeset.iter_apply works with conflicts."""
        # The changeset is applied to patch-1, which it already leads to.
        tree = self.patch1.get(self.working_dir)
        lines = list(self.cset.iter_apply(tree))
        self.assertEqual(2, len(lines))
        self._check_apply_report(lines, arch.FileAddition, arch.PatchConflict)
        self._check_applied_tree(tree, self.patch1)
    tests.append('iter_apply_conflict')

    def iter_apply_reverse(self):
        """Changeset.iter_apply reverse=True works."""
        tree = self.patch1.get(self.working_dir)
        lines = list(self.cset.iter_apply(tree, reverse=True))
        self._check_apply_report(
            lines, arch.FileDeletion, arch.FileModification)
        self._check_applied_tree(tree, self.base0)
    tests.append('iter_apply_reverse')
1814
 
 
1815
 
 
1816
 
def test_suite(limit=()):
    """Return the suite built by framework.make_test_suite from the test
    case classes of this module.

    limit is passed through to framework.make_test_suite unchanged.
    """
    # Class names, looked up in globals() by the framework.
    class_names = (
        'ArchBasic', 'Archive', 'MirrorArchive',
        'InitTree', 'InventoryTree', 'WorkingTree', 'TreeRevision',
        'LogMessage', 'LogMessageWhitespace',
        'Import', 'Commit', 'EscapedWhitespace',
        'LatestVersion', 'LatestRevision',
        'PopulatedArchive', 'Merges', 'RevisionFiles', 'PristineTrees',
        'Changeset', 'Delta', 'TreeChanges',
        'Patchlog', 'FileFind', 'ApplyChangeset',
    )
    return framework.make_test_suite(globals(), class_names, limit)
1844
 
 
1845
 
def main(argv):
    """Run test_suite through framework.run_test_suite with the command
    line argv, and return what the framework returns."""
    status = framework.run_test_suite(argv, test_suite)
    return status
1847
 
 
1848
 
# Script entry point: the value returned by main() is the exit status.
if __name__ == "__main__":
    exit_status = main(sys.argv)
    sys.exit(exit_status)