~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/groupcompress_repo.py

merge bzr.dev@4126 into brisbane-core

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Repostory formats using B+Tree indices and groupcompress compression."""
 
18
 
 
19
import time
 
20
 
 
21
from bzrlib import (
 
22
    bzrdir,
 
23
    chk_map,
 
24
    chk_serializer,
 
25
    debug,
 
26
    errors,
 
27
    index as _mod_index,
 
28
    inventory,
 
29
    knit,
 
30
    osutils,
 
31
    pack,
 
32
    repository,
 
33
    revision as _mod_revision,
 
34
    trace,
 
35
    ui,
 
36
    )
 
37
from bzrlib.index import GraphIndex, GraphIndexBuilder
 
38
from bzrlib.groupcompress import (
 
39
    _GCGraphIndex,
 
40
    GroupCompressVersionedFiles,
 
41
    )
 
42
from bzrlib.repofmt.pack_repo import (
 
43
    Pack,
 
44
    NewPack,
 
45
    KnitPackRepository,
 
46
    PackRootCommitBuilder,
 
47
    RepositoryPackCollection,
 
48
    RepositoryFormatKnitPack6,
 
49
    Packer,
 
50
    CHKInventoryRepository,
 
51
    RepositoryFormatPackDevelopment5Hash16,
 
52
    RepositoryFormatPackDevelopment5Hash255,
 
53
    )
 
54
 
 
55
 
 
56
 
 
57
class GCPack(NewPack):
 
58
 
 
59
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
 
60
        """Create a NewPack instance.
 
61
 
 
62
        :param pack_collection: A PackCollection into which this is being
 
63
            inserted.
 
64
        :param upload_suffix: An optional suffix to be given to any temporary
 
65
            files created during the pack creation. e.g '.autopack'
 
66
        :param file_mode: An optional file mode to create the new files with.
 
67
        """
 
68
        # replaced from NewPack to:
 
69
        # - change inventory reference list length to 1
 
70
        # - change texts reference lists to 1
 
71
        # TODO: patch this to be parameterised
 
72
 
 
73
        # The relative locations of the packs are constrained, but all are
 
74
        # passed in because the caller has them, so as to avoid object churn.
 
75
        index_builder_class = pack_collection._index_builder_class
 
76
        # from brisbane-core
 
77
        if pack_collection.chk_index is not None:
 
78
            chk_index = index_builder_class(reference_lists=0)
 
79
        else:
 
80
            chk_index = None
 
81
        Pack.__init__(self,
 
82
            # Revisions: parents list, no text compression.
 
83
            index_builder_class(reference_lists=1),
 
84
            # Inventory: We want to map compression only, but currently the
 
85
            # knit code hasn't been updated enough to understand that, so we
 
86
            # have a regular 2-list index giving parents and compression
 
87
            # source.
 
88
            index_builder_class(reference_lists=1),
 
89
            # Texts: compression and per file graph, for all fileids - so two
 
90
            # reference lists and two elements in the key tuple.
 
91
            index_builder_class(reference_lists=1, key_elements=2),
 
92
            # Signatures: Just blobs to store, no compression, no parents
 
93
            # listing.
 
94
            index_builder_class(reference_lists=0),
 
95
            # CHK based storage - just blobs, no compression or parents.
 
96
            chk_index=chk_index
 
97
            )
 
98
        self._pack_collection = pack_collection
 
99
        # When we make readonly indices, we need this.
 
100
        self.index_class = pack_collection._index_class
 
101
        # where should the new pack be opened
 
102
        self.upload_transport = pack_collection._upload_transport
 
103
        # where are indices written out to
 
104
        self.index_transport = pack_collection._index_transport
 
105
        # where is the pack renamed to when it is finished?
 
106
        self.pack_transport = pack_collection._pack_transport
 
107
        # What file mode to upload the pack and indices with.
 
108
        self._file_mode = file_mode
 
109
        # tracks the content written to the .pack file.
 
110
        self._hash = osutils.md5()
 
111
        # a four-tuple with the length in bytes of the indices, once the pack
 
112
        # is finalised. (rev, inv, text, sigs)
 
113
        self.index_sizes = None
 
114
        # How much data to cache when writing packs. Note that this is not
 
115
        # synchronised with reads, because it's not in the transport layer, so
 
116
        # is not safe unless the client knows it won't be reading from the pack
 
117
        # under creation.
 
118
        self._cache_limit = 0
 
119
        # the temporary pack file name.
 
120
        self.random_name = osutils.rand_chars(20) + upload_suffix
 
121
        # when was this pack started ?
 
122
        self.start_time = time.time()
 
123
        # open an output stream for the data added to the pack.
 
124
        self.write_stream = self.upload_transport.open_write_stream(
 
125
            self.random_name, mode=self._file_mode)
 
126
        if 'pack' in debug.debug_flags:
 
127
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
 
128
                time.ctime(), self.upload_transport.base, self.random_name,
 
129
                time.time() - self.start_time)
 
130
        # A list of byte sequences to be written to the new pack, and the
 
131
        # aggregate size of them.  Stored as a list rather than separate
 
132
        # variables so that the _write_data closure below can update them.
 
133
        self._buffer = [[], 0]
 
134
        # create a callable for adding data
 
135
        #
 
136
        # robertc says- this is a closure rather than a method on the object
 
137
        # so that the variables are locals, and faster than accessing object
 
138
        # members.
 
139
        def _write_data(bytes, flush=False, _buffer=self._buffer,
 
140
            _write=self.write_stream.write, _update=self._hash.update):
 
141
            _buffer[0].append(bytes)
 
142
            _buffer[1] += len(bytes)
 
143
            # buffer cap
 
144
            if _buffer[1] > self._cache_limit or flush:
 
145
                bytes = ''.join(_buffer[0])
 
146
                _write(bytes)
 
147
                _update(bytes)
 
148
                _buffer[:] = [[], 0]
 
149
        # expose this on self, for the occasion when clients want to add data.
 
150
        self._write_data = _write_data
 
151
        # a pack writer object to serialise pack records.
 
152
        self._writer = pack.ContainerWriter(self._write_data)
 
153
        self._writer.begin()
 
154
        # what state is the pack in? (open, finished, aborted)
 
155
        self._state = 'open'
 
156
 
 
157
    def _check_references(self):
 
158
        """Make sure our external references are present.
 
159
 
 
160
        Packs are allowed to have deltas whose base is not in the pack, but it
 
161
        must be present somewhere in this collection.  It is not allowed to
 
162
        have deltas based on a fallback repository.
 
163
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
 
164
        """
 
165
        # Groupcompress packs don't have any external references
 
166
 
 
167
 
 
168
class GCCHKPacker(Packer):
 
169
    """This class understand what it takes to collect a GCCHK repo."""
 
170
 
 
171
    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
 
172
                 reload_func=None):
 
173
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
 
174
                                          revision_ids=revision_ids,
 
175
                                          reload_func=reload_func)
 
176
        self._pack_collection = pack_collection
 
177
        # ATM, We only support this for GCCHK repositories
 
178
        assert pack_collection.chk_index is not None
 
179
        self._gather_text_refs = False
 
180
        self._chk_id_roots = []
 
181
        self._chk_p_id_roots = []
 
182
        self._text_refs = None
 
183
        # set by .pack() if self.revision_ids is not None
 
184
        self.revision_keys = None
 
185
 
 
186
    def _get_progress_stream(self, source_vf, keys, message, pb):
 
187
        def pb_stream():
 
188
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
189
            for idx, record in enumerate(substream):
 
190
                if pb is not None:
 
191
                    pb.update(message, idx + 1, len(keys))
 
192
                yield record
 
193
        return pb_stream()
 
194
 
 
195
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
 
196
        """Filter the texts of inventories, to find the chk pages."""
 
197
        total_keys = len(keys)
 
198
        def _filtered_inv_stream():
 
199
            id_roots_set = set()
 
200
            p_id_roots_set = set()
 
201
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
202
            for idx, record in enumerate(stream):
 
203
                bytes = record.get_bytes_as('fulltext')
 
204
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
 
205
                                                             record.key)
 
206
                if pb is not None:
 
207
                    pb.update('inv', idx, total_keys)
 
208
                key = chk_inv.id_to_entry.key()
 
209
                if key not in id_roots_set:
 
210
                    self._chk_id_roots.append(key)
 
211
                    id_roots_set.add(key)
 
212
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
213
                assert p_id_map is not None
 
214
                key = p_id_map.key()
 
215
                if key not in p_id_roots_set:
 
216
                    p_id_roots_set.add(key)
 
217
                    self._chk_p_id_roots.append(key)
 
218
                yield record
 
219
            # We have finished processing all of the inventory records, we
 
220
            # don't need these sets anymore
 
221
            id_roots_set.clear()
 
222
            p_id_roots_set.clear()
 
223
        return _filtered_inv_stream()
 
224
 
 
225
    def _get_chk_streams(self, source_vf, keys, pb=None):
 
226
        # We want to stream the keys from 'id_roots', and things they
 
227
        # reference, and then stream things from p_id_roots and things they
 
228
        # reference, and then any remaining keys that we didn't get to.
 
229
 
 
230
        # We also group referenced texts together, so if one root references a
 
231
        # text with prefix 'a', and another root references a node with prefix
 
232
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
 
233
        # This keeps 'similar' nodes together.
 
234
 
 
235
        # Note: We probably actually want multiple streams here, to help the
 
236
        #       client understand that the different levels won't compress well
 
237
        #       against each other.
 
238
        #       Test the difference between using one Group per level, and
 
239
        #       using 1 Group per prefix. (so '' (root) would get a group, then
 
240
        #       all the references to search-key 'a' would get a group, etc.)
 
241
        total_keys = len(keys)
 
242
        remaining_keys = set(keys)
 
243
        counter = [0]
 
244
        if self._gather_text_refs:
 
245
            bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
 
246
            self._text_refs = set()
 
247
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
 
248
            cur_keys = root_keys
 
249
            while cur_keys:
 
250
                keys_by_search_prefix = {}
 
251
                remaining_keys.difference_update(cur_keys)
 
252
                next_keys = set()
 
253
                def handle_internal_node(node):
 
254
                    for prefix, value in node._items.iteritems():
 
255
                        # We don't want to request the same key twice, and we
 
256
                        # want to order it by the first time it is seen.
 
257
                        # Even further, we don't want to request a key which is
 
258
                        # not in this group of pack files (it should be in the
 
259
                        # repo, but it doesn't have to be in the group being
 
260
                        # packed.)
 
261
                        # TODO: consider how to treat externally referenced chk
 
262
                        #       pages as 'external_references' so that we
 
263
                        #       always fill them in for stacked branches
 
264
                        if value not in next_keys and value in remaining_keys:
 
265
                            keys_by_search_prefix.setdefault(prefix,
 
266
                                []).append(value)
 
267
                            next_keys.add(value)
 
268
                def handle_leaf_node(node):
 
269
                    # Store is None, because we know we have a LeafNode, and we
 
270
                    # just want its entries
 
271
                    for file_id, bytes in node.iteritems(None):
 
272
                        name_utf8, file_id, revision_id = bytes_to_info(bytes)
 
273
                        self._text_refs.add((file_id, revision_id))
 
274
                def next_stream():
 
275
                    stream = source_vf.get_record_stream(cur_keys,
 
276
                                                         'as-requested', True)
 
277
                    for record in stream:
 
278
                        bytes = record.get_bytes_as('fulltext')
 
279
                        # We don't care about search_key_func for this code,
 
280
                        # because we only care about external references.
 
281
                        node = chk_map._deserialise(bytes, record.key,
 
282
                                                    search_key_func=None)
 
283
                        common_base = node._search_prefix
 
284
                        if isinstance(node, chk_map.InternalNode):
 
285
                            handle_internal_node(node)
 
286
                        elif parse_leaf_nodes:
 
287
                            handle_leaf_node(node)
 
288
                        counter[0] += 1
 
289
                        if pb is not None:
 
290
                            pb.update('chk node', counter[0], total_keys)
 
291
                        yield record
 
292
                yield next_stream()
 
293
                # Double check that we won't be emitting any keys twice
 
294
                # If we get rid of the pre-calculation of all keys, we could
 
295
                # turn this around and do
 
296
                # next_keys.difference_update(seen_keys)
 
297
                # However, we also may have references to chk pages in another
 
298
                # pack file during autopack. We filter earlier, so we should no
 
299
                # longer need to do this
 
300
                # next_keys = next_keys.intersection(remaining_keys)
 
301
                cur_keys = []
 
302
                for prefix in sorted(keys_by_search_prefix):
 
303
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
 
304
        for stream in _get_referenced_stream(self._chk_id_roots,
 
305
                                             self._gather_text_refs):
 
306
            yield stream
 
307
        del self._chk_id_roots
 
308
        # while it isn't really possible for chk_id_roots to not be in the
 
309
        # local group of packs, it is possible that the tree shape has not
 
310
        # changed recently, so we need to filter _chk_p_id_roots by the
 
311
        # available keys
 
312
        chk_p_id_roots = [key for key in self._chk_p_id_roots
 
313
                          if key in remaining_keys]
 
314
        del self._chk_p_id_roots
 
315
        for stream in _get_referenced_stream(chk_p_id_roots, False):
 
316
            yield stream
 
317
        if remaining_keys:
 
318
            trace.mutter('There were %d keys in the chk index, %d of which'
 
319
                         ' were not referenced', total_keys,
 
320
                         len(remaining_keys))
 
321
            if self.revision_ids is None:
 
322
                stream = source_vf.get_record_stream(remaining_keys,
 
323
                                                     'unordered', True)
 
324
                yield stream
 
325
 
 
326
    def _build_vf(self, index_name, parents, delta, for_write=False):
 
327
        """Build a VersionedFiles instance on top of this group of packs."""
 
328
        index_name = index_name + '_index'
 
329
        index_to_pack = {}
 
330
        access = knit._DirectPackAccess(index_to_pack)
 
331
        if for_write:
 
332
            # Use new_pack
 
333
            assert self.new_pack is not None
 
334
            index = getattr(self.new_pack, index_name)
 
335
            index_to_pack[index] = self.new_pack.access_tuple()
 
336
            index.set_optimize(for_size=True)
 
337
            access.set_writer(self.new_pack._writer, index,
 
338
                              self.new_pack.access_tuple())
 
339
            add_callback = index.add_nodes
 
340
        else:
 
341
            indices = []
 
342
            for pack in self.packs:
 
343
                sub_index = getattr(pack, index_name)
 
344
                index_to_pack[sub_index] = pack.access_tuple()
 
345
                indices.append(sub_index)
 
346
            index = _mod_index.CombinedGraphIndex(indices)
 
347
            add_callback = None
 
348
        vf = GroupCompressVersionedFiles(
 
349
            _GCGraphIndex(index,
 
350
                          add_callback=add_callback,
 
351
                          parents=parents,
 
352
                          is_locked=self._pack_collection.repo.is_locked),
 
353
            access=access,
 
354
            delta=delta)
 
355
        return vf
 
356
 
 
357
    def _build_vfs(self, index_name, parents, delta):
 
358
        """Build the source and target VersionedFiles."""
 
359
        source_vf = self._build_vf(index_name, parents,
 
360
                                   delta, for_write=False)
 
361
        target_vf = self._build_vf(index_name, parents,
 
362
                                   delta, for_write=True)
 
363
        return source_vf, target_vf
 
364
 
 
365
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
 
366
                     pb_offset):
 
367
        trace.mutter('repacking %d %s', len(keys), message)
 
368
        self.pb.update('repacking %s' % (message,), pb_offset)
 
369
        child_pb = ui.ui_factory.nested_progress_bar()
 
370
        try:
 
371
            stream = vf_to_stream(source_vf, keys, message, child_pb)
 
372
            for _ in target_vf._insert_record_stream(stream,
 
373
                                                     random_id=True,
 
374
                                                     reuse_blocks=False):
 
375
                pass
 
376
        finally:
 
377
            child_pb.finished()
 
378
 
 
379
    def _copy_revision_texts(self):
 
380
        source_vf, target_vf = self._build_vfs('revision', True, False)
 
381
        if not self.revision_keys:
 
382
            # We are doing a full fetch, aka 'pack'
 
383
            self.revision_keys = source_vf.keys()
 
384
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
385
                          'revisions', self._get_progress_stream, 1)
 
386
 
 
387
    def _copy_inventory_texts(self):
 
388
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
389
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
390
                          'inventories', self._get_filtered_inv_stream, 2)
 
391
 
 
392
    def _copy_chk_texts(self):
 
393
        source_vf, target_vf = self._build_vfs('chk', False, False)
 
394
        # TODO: This is technically spurious... if it is a performance issue,
 
395
        #       remove it
 
396
        total_keys = source_vf.keys()
 
397
        trace.mutter('repacking chk: %d id_to_entry roots,'
 
398
                     ' %d p_id_map roots, %d total keys',
 
399
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
 
400
                     len(total_keys))
 
401
        self.pb.update('repacking chk', 3)
 
402
        child_pb = ui.ui_factory.nested_progress_bar()
 
403
        try:
 
404
            for stream in self._get_chk_streams(source_vf, total_keys,
 
405
                                                pb=child_pb):
 
406
                for _ in target_vf._insert_record_stream(stream,
 
407
                                                         random_id=True,
 
408
                                                         reuse_blocks=False):
 
409
                    pass
 
410
        finally:
 
411
            child_pb.finished()
 
412
 
 
413
    def _copy_text_texts(self):
 
414
        source_vf, target_vf = self._build_vfs('text', True, True)
 
415
        # XXX: We don't walk the chk map to determine referenced (file_id,
 
416
        #      revision_id) keys.  We don't do it yet because you really need
 
417
        #      to filter out the ones that are present in the parents of the
 
418
        #      rev just before the ones you are copying, otherwise the filter
 
419
        #      is grabbing too many keys...
 
420
        text_keys = source_vf.keys()
 
421
        self._copy_stream(source_vf, target_vf, text_keys,
 
422
                          'text', self._get_progress_stream, 4)
 
423
 
 
424
    def _copy_signature_texts(self):
 
425
        source_vf, target_vf = self._build_vfs('signature', False, False)
 
426
        signature_keys = source_vf.keys()
 
427
        signature_keys.intersection(self.revision_keys)
 
428
        self._copy_stream(source_vf, target_vf, signature_keys,
 
429
                          'signatures', self._get_progress_stream, 5)
 
430
 
 
431
    def _create_pack_from_packs(self):
 
432
        self.pb.update('repacking', 0, 7)
 
433
        self.new_pack = self.open_pack()
 
434
        # Is this necessary for GC ?
 
435
        self.new_pack.set_write_cache_size(1024*1024)
 
436
        self._copy_revision_texts()
 
437
        self._copy_inventory_texts()
 
438
        self._copy_chk_texts()
 
439
        self._copy_text_texts()
 
440
        self._copy_signature_texts()
 
441
        self.new_pack._check_references()
 
442
        if not self._use_pack(self.new_pack):
 
443
            self.new_pack.abort()
 
444
            return None
 
445
        self.pb.update('finishing repack', 6, 7)
 
446
        self.new_pack.finish()
 
447
        self._pack_collection.allocate(self.new_pack)
 
448
        return self.new_pack
 
449
 
 
450
 
 
451
class GCCHKReconcilePacker(GCCHKPacker):
 
452
    """A packer which regenerates indices etc as it copies.
 
453
 
 
454
    This is used by ``bzr reconcile`` to cause parent text pointers to be
 
455
    regenerated.
 
456
    """
 
457
 
 
458
    def __init__(self, *args, **kwargs):
 
459
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
 
460
        self._data_changed = False
 
461
        self._gather_text_refs = True
 
462
 
 
463
    def _copy_inventory_texts(self):
 
464
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
465
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
466
                          'inventories', self._get_filtered_inv_stream, 2)
 
467
        if source_vf.keys() != self.revision_keys:
 
468
            self._data_changed = True
 
469
 
 
470
    def _copy_text_texts(self):
 
471
        """generate what texts we should have and then copy."""
 
472
        source_vf, target_vf = self._build_vfs('text', True, True)
 
473
        trace.mutter('repacking %d texts', len(self._text_refs))
 
474
        self.pb.update("repacking texts", 4)
 
475
        # we have three major tasks here:
 
476
        # 1) generate the ideal index
 
477
        repo = self._pack_collection.repo
 
478
        # We want the one we just wrote, so base it on self.new_pack
 
479
        revision_vf = self._build_vf('revision', True, False, for_write=True)
 
480
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
 
481
        # Strip keys back into revision_ids.
 
482
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
 
483
                         for k, parents in ancestor_keys.iteritems())
 
484
        del ancestor_keys
 
485
        # TODO: _generate_text_key_index should be much cheaper to generate from
 
486
        #       a chk repository, rather than the current implementation
 
487
        ideal_index = repo._generate_text_key_index(None, ancestors)
 
488
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
 
489
        # 2) generate a keys list that contains all the entries that can
 
490
        #    be used as-is, with corrected parents.
 
491
        ok_keys = []
 
492
        new_parent_keys = {} # (key, parent_keys)
 
493
        discarded_keys = []
 
494
        NULL_REVISION = _mod_revision.NULL_REVISION
 
495
        for key in self._text_refs:
 
496
            # 0 - index
 
497
            # 1 - key
 
498
            # 2 - value
 
499
            # 3 - refs
 
500
            try:
 
501
                ideal_parents = tuple(ideal_index[key])
 
502
            except KeyError:
 
503
                discarded_keys.append(key)
 
504
                self._data_changed = True
 
505
            else:
 
506
                if ideal_parents == (NULL_REVISION,):
 
507
                    ideal_parents = ()
 
508
                source_parents = file_id_parent_map[key]
 
509
                if ideal_parents == source_parents:
 
510
                    # no change needed.
 
511
                    ok_keys.append(key)
 
512
                else:
 
513
                    # We need to change the parent graph, but we don't need to
 
514
                    # re-insert the text (since we don't pun the compression
 
515
                    # parent with the parents list)
 
516
                    self._data_changed = True
 
517
                    new_parent_keys[key] = ideal_parents
 
518
        # we're finished with some data.
 
519
        del ideal_index
 
520
        del file_id_parent_map
 
521
        # 3) bulk copy the data, updating records than need it
 
522
        def _update_parents_for_texts():
 
523
            stream = source_vf.get_record_stream(self._text_refs,
 
524
                'groupcompress', False)
 
525
            for record in stream:
 
526
                if record.key in new_parent_keys:
 
527
                    record.parents = new_parent_keys[record.key]
 
528
                yield record
 
529
        target_vf.insert_record_stream(_update_parents_for_texts())
 
530
 
 
531
    def _use_pack(self, new_pack):
 
532
        """Override _use_pack to check for reconcile having changed content."""
 
533
        return new_pack.data_inserted() and self._data_changed
 
534
 
 
535
 
 
536
class GCRepositoryPackCollection(RepositoryPackCollection):
 
537
 
 
538
    pack_factory = GCPack
 
539
 
 
540
    def _already_packed(self):
 
541
        """Is the collection already packed?"""
 
542
        # Always repack GC repositories for now
 
543
        return False
 
544
 
 
545
    def _execute_pack_operations(self, pack_operations,
 
546
                                 _packer_class=GCCHKPacker,
 
547
                                 reload_func=None):
 
548
        """Execute a series of pack operations.
 
549
 
 
550
        :param pack_operations: A list of [revision_count, packs_to_combine].
 
551
        :param _packer_class: The class of packer to use (default: Packer).
 
552
        :return: None.
 
553
        """
 
554
        # XXX: Copied across from RepositoryPackCollection simply because we
 
555
        #      want to override the _packer_class ... :(
 
556
        for revision_count, packs in pack_operations:
 
557
            # we may have no-ops from the setup logic
 
558
            if len(packs) == 0:
 
559
                continue
 
560
            packer = GCCHKPacker(self, packs, '.autopack',
 
561
                                 reload_func=reload_func)
 
562
            try:
 
563
                packer.pack()
 
564
            except errors.RetryWithNewPacks:
 
565
                # An exception is propagating out of this context, make sure
 
566
                # this packer has cleaned up. Packer() doesn't set its new_pack
 
567
                # state into the RepositoryPackCollection object, so we only
 
568
                # have access to it directly here.
 
569
                if packer.new_pack is not None:
 
570
                    packer.new_pack.abort()
 
571
                raise
 
572
            for pack in packs:
 
573
                self._remove_pack_from_memory(pack)
 
574
        # record the newly available packs and stop advertising the old
 
575
        # packs
 
576
        self._save_pack_names(clear_obsolete_packs=True)
 
577
        # Move the old packs out of the way now they are no longer referenced.
 
578
        for revision_count, packs in pack_operations:
 
579
            self._obsolete_packs(packs)
 
580
 
 
581
 
 
582
# XXX: This format is scheduled for termination
 
583
#
 
584
# class GCPackRepository(KnitPackRepository):
 
585
#     """GC customisation of KnitPackRepository."""
 
586
#
 
587
#     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
 
588
#         _serializer):
 
589
#         """Overridden to change pack collection class."""
 
590
#         KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
 
591
#             _commit_builder_class, _serializer)
 
592
#         # and now replace everything it did :)
 
593
#         index_transport = self._transport.clone('indices')
 
594
#         self._pack_collection = GCRepositoryPackCollection(self,
 
595
#             self._transport, index_transport,
 
596
#             self._transport.clone('upload'),
 
597
#             self._transport.clone('packs'),
 
598
#             _format.index_builder_class,
 
599
#             _format.index_class,
 
600
#             use_chk_index=self._format.supports_chks,
 
601
#             )
 
602
#         self.inventories = GroupCompressVersionedFiles(
 
603
#             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 
604
#                 add_callback=self._pack_collection.inventory_index.add_callback,
 
605
#                 parents=True, is_locked=self.is_locked),
 
606
#             access=self._pack_collection.inventory_index.data_access)
 
607
#         self.revisions = GroupCompressVersionedFiles(
 
608
#             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 
609
#                 add_callback=self._pack_collection.revision_index.add_callback,
 
610
#                 parents=True, is_locked=self.is_locked),
 
611
#             access=self._pack_collection.revision_index.data_access,
 
612
#             delta=False)
 
613
#         self.signatures = GroupCompressVersionedFiles(
 
614
#             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 
615
#                 add_callback=self._pack_collection.signature_index.add_callback,
 
616
#                 parents=False, is_locked=self.is_locked),
 
617
#             access=self._pack_collection.signature_index.data_access,
 
618
#             delta=False)
 
619
#         self.texts = GroupCompressVersionedFiles(
 
620
#             _GCGraphIndex(self._pack_collection.text_index.combined_index,
 
621
#                 add_callback=self._pack_collection.text_index.add_callback,
 
622
#                 parents=True, is_locked=self.is_locked),
 
623
#             access=self._pack_collection.text_index.data_access)
 
624
#         if _format.supports_chks:
 
625
#             # No graph, no compression:- references from chks are between
 
626
#             # different objects not temporal versions of the same; and without
 
627
#             # some sort of temporal structure knit compression will just fail.
 
628
#             self.chk_bytes = GroupCompressVersionedFiles(
 
629
#                 _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 
630
#                     add_callback=self._pack_collection.chk_index.add_callback,
 
631
#                     parents=False, is_locked=self.is_locked),
 
632
#                 access=self._pack_collection.chk_index.data_access)
 
633
#         else:
 
634
#             self.chk_bytes = None
 
635
#         # True when the repository object is 'write locked' (as opposed to the
 
636
#         # physical lock only taken out around changes to the pack-names list.)
 
637
#         # Another way to represent this would be a decorator around the control
 
638
#         # files object that presents logical locks as physical ones - if this
 
639
#         # gets ugly consider that alternative design. RBC 20071011
 
640
#         self._write_lock_count = 0
 
641
#         self._transaction = None
 
642
#         # for tests
 
643
#         self._reconcile_does_inventory_gc = True
 
644
#         self._reconcile_fixes_text_parents = True
 
645
#         self._reconcile_backsup_inventory = False
 
646
#
 
647
#     def suspend_write_group(self):
 
648
#         raise errors.UnsuspendableWriteGroup(self)
 
649
#
 
650
#     def _resume_write_group(self, tokens):
 
651
#         raise errors.UnsuspendableWriteGroup(self)
 
652
#
 
653
#     def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
654
#         bork
 
655
#         return packer.pack(pb)
 
656
 
 
657
 
 
658
class GCCHKPackRepository(CHKInventoryRepository):
 
659
    """GC customisation of CHKInventoryRepository."""
 
660
 
 
661
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
 
662
        _serializer):
 
663
        """Overridden to change pack collection class."""
 
664
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
 
665
            _commit_builder_class, _serializer)
 
666
        # and now replace everything it did :)
 
667
        index_transport = self._transport.clone('indices')
 
668
        self._pack_collection = GCRepositoryPackCollection(self,
 
669
            self._transport, index_transport,
 
670
            self._transport.clone('upload'),
 
671
            self._transport.clone('packs'),
 
672
            _format.index_builder_class,
 
673
            _format.index_class,
 
674
            use_chk_index=self._format.supports_chks,
 
675
            )
 
676
        self.inventories = GroupCompressVersionedFiles(
 
677
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 
678
                add_callback=self._pack_collection.inventory_index.add_callback,
 
679
                parents=True, is_locked=self.is_locked),
 
680
            access=self._pack_collection.inventory_index.data_access)
 
681
        self.revisions = GroupCompressVersionedFiles(
 
682
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 
683
                add_callback=self._pack_collection.revision_index.add_callback,
 
684
                parents=True, is_locked=self.is_locked),
 
685
            access=self._pack_collection.revision_index.data_access,
 
686
            delta=False)
 
687
        self.signatures = GroupCompressVersionedFiles(
 
688
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 
689
                add_callback=self._pack_collection.signature_index.add_callback,
 
690
                parents=False, is_locked=self.is_locked),
 
691
            access=self._pack_collection.signature_index.data_access,
 
692
            delta=False)
 
693
        self.texts = GroupCompressVersionedFiles(
 
694
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
 
695
                add_callback=self._pack_collection.text_index.add_callback,
 
696
                parents=True, is_locked=self.is_locked),
 
697
            access=self._pack_collection.text_index.data_access)
 
698
        # No parents, individual CHK pages don't have specific ancestry
 
699
        self.chk_bytes = GroupCompressVersionedFiles(
 
700
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 
701
                add_callback=self._pack_collection.chk_index.add_callback,
 
702
                parents=False, is_locked=self.is_locked),
 
703
            access=self._pack_collection.chk_index.data_access)
 
704
        # True when the repository object is 'write locked' (as opposed to the
 
705
        # physical lock only taken out around changes to the pack-names list.)
 
706
        # Another way to represent this would be a decorator around the control
 
707
        # files object that presents logical locks as physical ones - if this
 
708
        # gets ugly consider that alternative design. RBC 20071011
 
709
        self._write_lock_count = 0
 
710
        self._transaction = None
 
711
        # for tests
 
712
        self._reconcile_does_inventory_gc = True
 
713
        self._reconcile_fixes_text_parents = True
 
714
        self._reconcile_backsup_inventory = False
 
715
 
 
716
    def suspend_write_group(self):
 
717
        raise errors.UnsuspendableWriteGroup(self)
 
718
 
 
719
    def _resume_write_group(self, tokens):
 
720
        raise errors.UnsuspendableWriteGroup(self)
 
721
 
 
722
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
723
        # assert revs is None
 
724
        packer = GCCHKReconcilePacker(collection, packs, extension)
 
725
        return packer.pack(pb)
 
726
 
 
727
 
 
728
# This format has been disabled for now. It is not expected that this will be a
 
729
# useful next-generation format.
 
730
#
 
731
# class RepositoryFormatPackGCPlain(RepositoryFormatKnitPack6):
 
732
#     """A B+Tree index using pack repository."""
 
733
#
 
734
#     repository_class = GCPackRepository
 
735
#     rich_root_data = False
 
736
#     # Note: We cannot unpack a delta that references a text we haven't
 
737
#     # seen yet. There are 2 options, work in fulltexts, or require
 
738
#     # topological sorting. Using fulltexts is more optimal for local
 
739
#     # operations, because the source can be smart about extracting
 
740
#     # multiple in-a-row (and sharing strings). Topological is better
 
741
#     # for remote, because we access less data.
 
742
#     _fetch_order = 'unordered'
 
743
#     _fetch_uses_deltas = False
 
744
#
 
745
#     def _get_matching_bzrdir(self):
 
746
#         return bzrdir.format_registry.make_bzrdir('gc-no-rich-root')
 
747
#
 
748
#     def _ignore_setting_bzrdir(self, format):
 
749
#         pass
 
750
#
 
751
#     _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
752
#
 
753
#     def get_format_string(self):
 
754
#         """See RepositoryFormat.get_format_string()."""
 
755
#         return ("Bazaar development format - btree+gc "
 
756
#             "(needs bzr.dev from 1.13)\n")
 
757
#
 
758
#     def get_format_description(self):
 
759
#         """See RepositoryFormat.get_format_description()."""
 
760
#         return ("Development repository format - btree+groupcompress "
 
761
#             ", interoperates with pack-0.92\n")
 
762
#
 
763
 
 
764
class RepositoryFormatPackGCCHK16(RepositoryFormatPackDevelopment5Hash16):
 
765
    """A hashed CHK+group compress pack repository."""
 
766
 
 
767
    repository_class = GCCHKPackRepository
 
768
    _commit_builder_class = PackRootCommitBuilder
 
769
    rich_root_data = True
 
770
    supports_external_lookups = True
 
771
    supports_tree_reference = True
 
772
    supports_chks = True
 
773
    # Note: We cannot unpack a delta that references a text we haven't
 
774
    # seen yet. There are 2 options, work in fulltexts, or require
 
775
    # topological sorting. Using fulltexts is more optimal for local
 
776
    # operations, because the source can be smart about extracting
 
777
    # multiple in-a-row (and sharing strings). Topological is better
 
778
    # for remote, because we access less data.
 
779
    _fetch_order = 'unordered'
 
780
    _fetch_uses_deltas = False
 
781
 
 
782
    def _get_matching_bzrdir(self):
 
783
        return bzrdir.format_registry.make_bzrdir('gc-chk16')
 
784
 
 
785
    def _ignore_setting_bzrdir(self, format):
 
786
        pass
 
787
 
 
788
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
789
 
 
790
    def get_format_string(self):
 
791
        """See RepositoryFormat.get_format_string()."""
 
792
        return ('Bazaar development format - hash16chk+gc rich-root'
 
793
                ' (needs bzr.dev from 1.13)\n')
 
794
 
 
795
    def get_format_description(self):
 
796
        """See RepositoryFormat.get_format_description()."""
 
797
        return ("Development repository format - hash16chk+groupcompress")
 
798
 
 
799
    def check_conversion_target(self, target_format):
 
800
        if not target_format.rich_root_data:
 
801
            raise errors.BadConversionTarget(
 
802
                'Does not support rich root data.', target_format)
 
803
        if not getattr(target_format, 'supports_tree_reference', False):
 
804
            raise errors.BadConversionTarget(
 
805
                'Does not support nested trees', target_format)
 
806
 
 
807
 
 
808
class RepositoryFormatPackGCCHK255(RepositoryFormatPackDevelopment5Hash255):
 
809
    """A hashed CHK+group compress pack repository."""
 
810
 
 
811
    repository_class = GCCHKPackRepository
 
812
    supports_chks = True
 
813
    # Setting this to True causes us to use InterModel1And2, so for now set
 
814
    # it to False which uses InterDifferingSerializer. When IM1&2 is
 
815
    # removed (as it is in bzr.dev) we can set this back to True.
 
816
    _commit_builder_class = PackRootCommitBuilder
 
817
    rich_root_data = True
 
818
 
 
819
    def _get_matching_bzrdir(self):
 
820
        return bzrdir.format_registry.make_bzrdir('gc-chk255')
 
821
 
 
822
    def _ignore_setting_bzrdir(self, format):
 
823
        pass
 
824
 
 
825
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
826
 
 
827
    def get_format_string(self):
 
828
        """See RepositoryFormat.get_format_string()."""
 
829
        return ('Bazaar development format - hash255chk+gc rich-root'
 
830
                ' (needs bzr.dev from 1.13)\n')
 
831
 
 
832
    def get_format_description(self):
 
833
        """See RepositoryFormat.get_format_description()."""
 
834
        return ("Development repository format - hash255chk+groupcompress")
 
835
 
 
836
    def check_conversion_target(self, target_format):
 
837
        if not target_format.rich_root_data:
 
838
            raise errors.BadConversionTarget(
 
839
                'Does not support rich root data.', target_format)
 
840
        if not getattr(target_format, 'supports_tree_reference', False):
 
841
            raise errors.BadConversionTarget(
 
842
                'Does not support nested trees', target_format)
 
843
 
 
844
 
 
845
class RepositoryFormatPackGCCHK255Big(RepositoryFormatPackGCCHK255):
 
846
    """A hashed CHK+group compress pack repository."""
 
847
 
 
848
    repository_class = GCCHKPackRepository
 
849
    supports_chks = True
 
850
    # For right now, setting this to True gives us InterModel1And2 rather
 
851
    # than InterDifferingSerializer
 
852
    _commit_builder_class = PackRootCommitBuilder
 
853
    rich_root_data = True
 
854
    _serializer = chk_serializer.chk_serializer_255_bigpage
 
855
    # Note: We cannot unpack a delta that references a text we haven't
 
856
    # seen yet. There are 2 options, work in fulltexts, or require
 
857
    # topological sorting. Using fulltexts is more optimal for local
 
858
    # operations, because the source can be smart about extracting
 
859
    # multiple in-a-row (and sharing strings). Topological is better
 
860
    # for remote, because we access less data.
 
861
    _fetch_order = 'unordered'
 
862
    _fetch_uses_deltas = False
 
863
 
 
864
    def _get_matching_bzrdir(self):
 
865
        return bzrdir.format_registry.make_bzrdir('gc-chk255-big')
 
866
 
 
867
    def _ignore_setting_bzrdir(self, format):
 
868
        pass
 
869
 
 
870
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
871
 
 
872
    def get_format_string(self):
 
873
        """See RepositoryFormat.get_format_string()."""
 
874
        return ('Bazaar development format - hash255chk+gc rich-root bigpage'
 
875
                ' (needs bzr.dev from 1.13)\n')
 
876
 
 
877
    def get_format_description(self):
 
878
        """See RepositoryFormat.get_format_description()."""
 
879
        return ("Development repository format - hash255chk+groupcompress + bigpage")
 
880
 
 
881
    def check_conversion_target(self, target_format):
 
882
        if not target_format.rich_root_data:
 
883
            raise errors.BadConversionTarget(
 
884
                'Does not support rich root data.', target_format)
 
885
        if not getattr(target_format, 'supports_tree_reference', False):
 
886
            raise errors.BadConversionTarget(
 
887
                'Does not support nested trees', target_format)