~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/groupcompress_repo.py

create thread for bbc

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Repostory formats using B+Tree indices and groupcompress compression."""
 
18
 
 
19
import time
 
20
 
 
21
from bzrlib import (
 
22
    bzrdir,
 
23
    chk_map,
 
24
    chk_serializer,
 
25
    debug,
 
26
    errors,
 
27
    index as _mod_index,
 
28
    inventory,
 
29
    knit,
 
30
    osutils,
 
31
    pack,
 
32
    repository,
 
33
    revision as _mod_revision,
 
34
    trace,
 
35
    ui,
 
36
    )
 
37
from bzrlib.index import GraphIndex, GraphIndexBuilder
 
38
from bzrlib.groupcompress import (
 
39
    _GCGraphIndex,
 
40
    GroupCompressVersionedFiles,
 
41
    )
 
42
from bzrlib.repofmt.pack_repo import (
 
43
    Pack,
 
44
    NewPack,
 
45
    KnitPackRepository,
 
46
    PackRootCommitBuilder,
 
47
    RepositoryPackCollection,
 
48
    RepositoryFormatKnitPack6,
 
49
    Packer,
 
50
    CHKInventoryRepository,
 
51
    RepositoryFormatPackDevelopment5Hash16,
 
52
    RepositoryFormatPackDevelopment5Hash255,
 
53
    )
 
54
 
 
55
 
 
56
 
 
57
class GCPack(NewPack):
 
58
 
 
59
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
 
60
        """Create a NewPack instance.
 
61
 
 
62
        :param pack_collection: A PackCollection into which this is being
 
63
            inserted.
 
64
        :param upload_suffix: An optional suffix to be given to any temporary
 
65
            files created during the pack creation. e.g '.autopack'
 
66
        :param file_mode: An optional file mode to create the new files with.
 
67
        """
 
68
        # replaced from NewPack to:
 
69
        # - change inventory reference list length to 1
 
70
        # - change texts reference lists to 1
 
71
        # TODO: patch this to be parameterised
 
72
 
 
73
        # The relative locations of the packs are constrained, but all are
 
74
        # passed in because the caller has them, so as to avoid object churn.
 
75
        index_builder_class = pack_collection._index_builder_class
 
76
        # from brisbane-core
 
77
        if pack_collection.chk_index is not None:
 
78
            chk_index = index_builder_class(reference_lists=0)
 
79
        else:
 
80
            chk_index = None
 
81
        Pack.__init__(self,
 
82
            # Revisions: parents list, no text compression.
 
83
            index_builder_class(reference_lists=1),
 
84
            # Inventory: We want to map compression only, but currently the
 
85
            # knit code hasn't been updated enough to understand that, so we
 
86
            # have a regular 2-list index giving parents and compression
 
87
            # source.
 
88
            index_builder_class(reference_lists=1),
 
89
            # Texts: compression and per file graph, for all fileids - so two
 
90
            # reference lists and two elements in the key tuple.
 
91
            index_builder_class(reference_lists=1, key_elements=2),
 
92
            # Signatures: Just blobs to store, no compression, no parents
 
93
            # listing.
 
94
            index_builder_class(reference_lists=0),
 
95
            # CHK based storage - just blobs, no compression or parents.
 
96
            chk_index=chk_index
 
97
            )
 
98
        self._pack_collection = pack_collection
 
99
        # When we make readonly indices, we need this.
 
100
        self.index_class = pack_collection._index_class
 
101
        # where should the new pack be opened
 
102
        self.upload_transport = pack_collection._upload_transport
 
103
        # where are indices written out to
 
104
        self.index_transport = pack_collection._index_transport
 
105
        # where is the pack renamed to when it is finished?
 
106
        self.pack_transport = pack_collection._pack_transport
 
107
        # What file mode to upload the pack and indices with.
 
108
        self._file_mode = file_mode
 
109
        # tracks the content written to the .pack file.
 
110
        self._hash = osutils.md5()
 
111
        # a four-tuple with the length in bytes of the indices, once the pack
 
112
        # is finalised. (rev, inv, text, sigs)
 
113
        self.index_sizes = None
 
114
        # How much data to cache when writing packs. Note that this is not
 
115
        # synchronised with reads, because it's not in the transport layer, so
 
116
        # is not safe unless the client knows it won't be reading from the pack
 
117
        # under creation.
 
118
        self._cache_limit = 0
 
119
        # the temporary pack file name.
 
120
        self.random_name = osutils.rand_chars(20) + upload_suffix
 
121
        # when was this pack started ?
 
122
        self.start_time = time.time()
 
123
        # open an output stream for the data added to the pack.
 
124
        self.write_stream = self.upload_transport.open_write_stream(
 
125
            self.random_name, mode=self._file_mode)
 
126
        if 'pack' in debug.debug_flags:
 
127
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
 
128
                time.ctime(), self.upload_transport.base, self.random_name,
 
129
                time.time() - self.start_time)
 
130
        # A list of byte sequences to be written to the new pack, and the
 
131
        # aggregate size of them.  Stored as a list rather than separate
 
132
        # variables so that the _write_data closure below can update them.
 
133
        self._buffer = [[], 0]
 
134
        # create a callable for adding data
 
135
        #
 
136
        # robertc says- this is a closure rather than a method on the object
 
137
        # so that the variables are locals, and faster than accessing object
 
138
        # members.
 
139
        def _write_data(bytes, flush=False, _buffer=self._buffer,
 
140
            _write=self.write_stream.write, _update=self._hash.update):
 
141
            _buffer[0].append(bytes)
 
142
            _buffer[1] += len(bytes)
 
143
            # buffer cap
 
144
            if _buffer[1] > self._cache_limit or flush:
 
145
                bytes = ''.join(_buffer[0])
 
146
                _write(bytes)
 
147
                _update(bytes)
 
148
                _buffer[:] = [[], 0]
 
149
        # expose this on self, for the occasion when clients want to add data.
 
150
        self._write_data = _write_data
 
151
        # a pack writer object to serialise pack records.
 
152
        self._writer = pack.ContainerWriter(self._write_data)
 
153
        self._writer.begin()
 
154
        # what state is the pack in? (open, finished, aborted)
 
155
        self._state = 'open'
 
156
 
 
157
    def _check_references(self):
 
158
        """Make sure our external references are present.
 
159
 
 
160
        Packs are allowed to have deltas whose base is not in the pack, but it
 
161
        must be present somewhere in this collection.  It is not allowed to
 
162
        have deltas based on a fallback repository.
 
163
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
 
164
        """
 
165
        # Groupcompress packs don't have any external references
 
166
 
 
167
 
 
168
class GCCHKPacker(Packer):
 
169
    """This class understand what it takes to collect a GCCHK repo."""
 
170
 
 
171
    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
 
172
                 reload_func=None):
 
173
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
 
174
                                          revision_ids=revision_ids,
 
175
                                          reload_func=reload_func)
 
176
        self._pack_collection = pack_collection
 
177
        # ATM, We only support this for GCCHK repositories
 
178
        assert pack_collection.chk_index is not None
 
179
        self._gather_text_refs = False
 
180
        self._chk_id_roots = []
 
181
        self._chk_p_id_roots = []
 
182
        self._text_refs = None
 
183
        # set by .pack() if self.revision_ids is not None
 
184
        self.revision_keys = None
 
185
 
 
186
    def _get_progress_stream(self, source_vf, keys, message, pb):
 
187
        def pb_stream():
 
188
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
189
            for idx, record in enumerate(substream):
 
190
                if pb is not None:
 
191
                    pb.update(message, idx + 1, len(keys))
 
192
                yield record
 
193
        return pb_stream()
 
194
 
 
195
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
 
196
        """Filter the texts of inventories, to find the chk pages."""
 
197
        total_keys = len(keys)
 
198
        def _filtered_inv_stream():
 
199
            id_roots_set = set()
 
200
            p_id_roots_set = set()
 
201
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
 
202
            for idx, record in enumerate(stream):
 
203
                bytes = record.get_bytes_as('fulltext')
 
204
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
 
205
                                                             record.key)
 
206
                if pb is not None:
 
207
                    pb.update('inv', idx, total_keys)
 
208
                key = chk_inv.id_to_entry.key()
 
209
                if key not in id_roots_set:
 
210
                    self._chk_id_roots.append(key)
 
211
                    id_roots_set.add(key)
 
212
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
213
                assert p_id_map is not None
 
214
                key = p_id_map.key()
 
215
                if key not in p_id_roots_set:
 
216
                    p_id_roots_set.add(key)
 
217
                    self._chk_p_id_roots.append(key)
 
218
                yield record
 
219
            # We have finished processing all of the inventory records, we
 
220
            # don't need these sets anymore
 
221
            id_roots_set.clear()
 
222
            p_id_roots_set.clear()
 
223
        return _filtered_inv_stream()
 
224
 
 
225
    def _get_chk_streams(self, source_vf, keys, pb=None):
 
226
        # We want to stream the keys from 'id_roots', and things they
 
227
        # reference, and then stream things from p_id_roots and things they
 
228
        # reference, and then any remaining keys that we didn't get to.
 
229
 
 
230
        # We also group referenced texts together, so if one root references a
 
231
        # text with prefix 'a', and another root references a node with prefix
 
232
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
 
233
        # This keeps 'similar' nodes together.
 
234
 
 
235
        # Note: We probably actually want multiple streams here, to help the
 
236
        #       client understand that the different levels won't compress well
 
237
        #       against each other.
 
238
        #       Test the difference between using one Group per level, and
 
239
        #       using 1 Group per prefix. (so '' (root) would get a group, then
 
240
        #       all the references to search-key 'a' would get a group, etc.)
 
241
        total_keys = len(keys)
 
242
        remaining_keys = set(keys)
 
243
        counter = [0]
 
244
        if self._gather_text_refs:
 
245
            # Just to get _bytes_to_entry, so we don't care about the
 
246
            # search_key_name
 
247
            inv = inventory.CHKInventory(None)
 
248
            self._text_refs = set()
 
249
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
 
250
            cur_keys = root_keys
 
251
            while cur_keys:
 
252
                keys_by_search_prefix = {}
 
253
                remaining_keys.difference_update(cur_keys)
 
254
                next_keys = set()
 
255
                def handle_internal_node(node):
 
256
                    for prefix, value in node._items.iteritems():
 
257
                        # We don't want to request the same key twice, and we
 
258
                        # want to order it by the first time it is seen.
 
259
                        # Even further, we don't want to request a key which is
 
260
                        # not in this group of pack files (it should be in the
 
261
                        # repo, but it doesn't have to be in the group being
 
262
                        # packed.)
 
263
                        # TODO: consider how to treat externally referenced chk
 
264
                        #       pages as 'external_references' so that we
 
265
                        #       always fill them in for stacked branches
 
266
                        if value not in next_keys and value in remaining_keys:
 
267
                            keys_by_search_prefix.setdefault(prefix,
 
268
                                []).append(value)
 
269
                            next_keys.add(value)
 
270
                def handle_leaf_node(node):
 
271
                    # Store is None, because we know we have a LeafNode, and we
 
272
                    # just want its entries
 
273
                    for file_id, bytes in node.iteritems(None):
 
274
                        entry = inv._bytes_to_entry(bytes)
 
275
                        self._text_refs.add((entry.file_id, entry.revision))
 
276
                def next_stream():
 
277
                    stream = source_vf.get_record_stream(cur_keys,
 
278
                                                         'as-requested', True)
 
279
                    for record in stream:
 
280
                        bytes = record.get_bytes_as('fulltext')
 
281
                        # We don't care about search_key_func for this code,
 
282
                        # because we only care about external references.
 
283
                        node = chk_map._deserialise(bytes, record.key,
 
284
                                                    search_key_func=None)
 
285
                        common_base = node._search_prefix
 
286
                        if isinstance(node, chk_map.InternalNode):
 
287
                            handle_internal_node(node)
 
288
                        elif parse_leaf_nodes:
 
289
                            handle_leaf_node(node)
 
290
                        counter[0] += 1
 
291
                        if pb is not None:
 
292
                            pb.update('chk node', counter[0], total_keys)
 
293
                        yield record
 
294
                yield next_stream()
 
295
                # Double check that we won't be emitting any keys twice
 
296
                # If we get rid of the pre-calculation of all keys, we could
 
297
                # turn this around and do
 
298
                # next_keys.difference_update(seen_keys)
 
299
                # However, we also may have references to chk pages in another
 
300
                # pack file during autopack. We filter earlier, so we should no
 
301
                # longer need to do this
 
302
                # next_keys = next_keys.intersection(remaining_keys)
 
303
                cur_keys = []
 
304
                for prefix in sorted(keys_by_search_prefix):
 
305
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
 
306
        for stream in _get_referenced_stream(self._chk_id_roots,
 
307
                                             self._gather_text_refs):
 
308
            yield stream
 
309
        del self._chk_id_roots
 
310
        # while it isn't really possible for chk_id_roots to not be in the
 
311
        # local group of packs, it is possible that the tree shape has not
 
312
        # changed recently, so we need to filter _chk_p_id_roots by the
 
313
        # available keys
 
314
        chk_p_id_roots = [key for key in self._chk_p_id_roots
 
315
                          if key in remaining_keys]
 
316
        del self._chk_p_id_roots
 
317
        for stream in _get_referenced_stream(chk_p_id_roots, False):
 
318
            yield stream
 
319
        if remaining_keys:
 
320
            trace.mutter('There were %d keys in the chk index, %d of which'
 
321
                         ' were not referenced', total_keys,
 
322
                         len(remaining_keys))
 
323
            if self.revision_ids is None:
 
324
                stream = source_vf.get_record_stream(remaining_keys,
 
325
                                                     'unordered', True)
 
326
                yield stream
 
327
 
 
328
    def _build_vf(self, index_name, parents, delta, for_write=False):
 
329
        """Build a VersionedFiles instance on top of this group of packs."""
 
330
        index_name = index_name + '_index'
 
331
        index_to_pack = {}
 
332
        access = knit._DirectPackAccess(index_to_pack)
 
333
        if for_write:
 
334
            # Use new_pack
 
335
            assert self.new_pack is not None
 
336
            index = getattr(self.new_pack, index_name)
 
337
            index_to_pack[index] = self.new_pack.access_tuple()
 
338
            index.set_optimize(for_size=True)
 
339
            access.set_writer(self.new_pack._writer, index,
 
340
                              self.new_pack.access_tuple())
 
341
            add_callback = index.add_nodes
 
342
        else:
 
343
            indices = []
 
344
            for pack in self.packs:
 
345
                sub_index = getattr(pack, index_name)
 
346
                index_to_pack[sub_index] = pack.access_tuple()
 
347
                indices.append(sub_index)
 
348
            index = _mod_index.CombinedGraphIndex(indices)
 
349
            add_callback = None
 
350
        vf = GroupCompressVersionedFiles(
 
351
            _GCGraphIndex(index,
 
352
                          add_callback=add_callback,
 
353
                          parents=parents,
 
354
                          is_locked=self._pack_collection.repo.is_locked),
 
355
            access=access,
 
356
            delta=delta)
 
357
        return vf
 
358
 
 
359
    def _build_vfs(self, index_name, parents, delta):
 
360
        """Build the source and target VersionedFiles."""
 
361
        source_vf = self._build_vf(index_name, parents,
 
362
                                   delta, for_write=False)
 
363
        target_vf = self._build_vf(index_name, parents,
 
364
                                   delta, for_write=True)
 
365
        return source_vf, target_vf
 
366
 
 
367
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
 
368
                     pb_offset):
 
369
        trace.mutter('repacking %d %s', len(keys), message)
 
370
        self.pb.update('repacking %s' % (message,), pb_offset)
 
371
        child_pb = ui.ui_factory.nested_progress_bar()
 
372
        try:
 
373
            stream = vf_to_stream(source_vf, keys, message, child_pb)
 
374
            for _ in target_vf._insert_record_stream(stream,
 
375
                                                     random_id=True,
 
376
                                                     reuse_blocks=False):
 
377
                pass
 
378
        finally:
 
379
            child_pb.finished()
 
380
 
 
381
    def _copy_revision_texts(self):
 
382
        source_vf, target_vf = self._build_vfs('revision', True, False)
 
383
        if not self.revision_keys:
 
384
            # We are doing a full fetch, aka 'pack'
 
385
            self.revision_keys = source_vf.keys()
 
386
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
387
                          'revisions', self._get_progress_stream, 1)
 
388
 
 
389
    def _copy_inventory_texts(self):
 
390
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
391
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
392
                          'inventories', self._get_filtered_inv_stream, 2)
 
393
 
 
394
    def _copy_chk_texts(self):
 
395
        source_vf, target_vf = self._build_vfs('chk', False, False)
 
396
        # TODO: This is technically spurious... if it is a performance issue,
 
397
        #       remove it
 
398
        total_keys = source_vf.keys()
 
399
        trace.mutter('repacking chk: %d id_to_entry roots,'
 
400
                     ' %d p_id_map roots, %d total keys',
 
401
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
 
402
                     len(total_keys))
 
403
        self.pb.update('repacking chk', 3)
 
404
        child_pb = ui.ui_factory.nested_progress_bar()
 
405
        try:
 
406
            for stream in self._get_chk_streams(source_vf, total_keys,
 
407
                                                pb=child_pb):
 
408
                for _ in target_vf._insert_record_stream(stream,
 
409
                                                         random_id=True,
 
410
                                                         reuse_blocks=False):
 
411
                    pass
 
412
        finally:
 
413
            child_pb.finished()
 
414
 
 
415
    def _copy_text_texts(self):
 
416
        source_vf, target_vf = self._build_vfs('text', True, True)
 
417
        # XXX: We don't walk the chk map to determine referenced (file_id,
 
418
        #      revision_id) keys.  We don't do it yet because you really need
 
419
        #      to filter out the ones that are present in the parents of the
 
420
        #      rev just before the ones you are copying, otherwise the filter
 
421
        #      is grabbing too many keys...
 
422
        text_keys = source_vf.keys()
 
423
        self._copy_stream(source_vf, target_vf, text_keys,
 
424
                          'text', self._get_progress_stream, 4)
 
425
 
 
426
    def _copy_signature_texts(self):
 
427
        source_vf, target_vf = self._build_vfs('signature', False, False)
 
428
        signature_keys = source_vf.keys()
 
429
        signature_keys.intersection(self.revision_keys)
 
430
        self._copy_stream(source_vf, target_vf, signature_keys,
 
431
                          'signatures', self._get_progress_stream, 5)
 
432
 
 
433
    def _create_pack_from_packs(self):
 
434
        self.pb.update('repacking', 0, 7)
 
435
        self.new_pack = self.open_pack()
 
436
        # Is this necessary for GC ?
 
437
        self.new_pack.set_write_cache_size(1024*1024)
 
438
        self._copy_revision_texts()
 
439
        self._copy_inventory_texts()
 
440
        self._copy_chk_texts()
 
441
        self._copy_text_texts()
 
442
        self._copy_signature_texts()
 
443
        self.new_pack._check_references()
 
444
        if not self._use_pack(self.new_pack):
 
445
            self.new_pack.abort()
 
446
            return None
 
447
        self.pb.update('finishing repack', 6, 7)
 
448
        self.new_pack.finish()
 
449
        self._pack_collection.allocate(self.new_pack)
 
450
        return self.new_pack
 
451
 
 
452
 
 
453
class GCCHKReconcilePacker(GCCHKPacker):
 
454
    """A packer which regenerates indices etc as it copies.
 
455
 
 
456
    This is used by ``bzr reconcile`` to cause parent text pointers to be
 
457
    regenerated.
 
458
    """
 
459
 
 
460
    def __init__(self, *args, **kwargs):
 
461
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
 
462
        self._data_changed = False
 
463
        self._gather_text_refs = True
 
464
 
 
465
    def _copy_inventory_texts(self):
 
466
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
467
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
468
                          'inventories', self._get_filtered_inv_stream, 2)
 
469
        if source_vf.keys() != self.revision_keys:
 
470
            self._data_changed = True
 
471
 
 
472
    def _copy_text_texts(self):
 
473
        """generate what texts we should have and then copy."""
 
474
        source_vf, target_vf = self._build_vfs('text', True, True)
 
475
        trace.mutter('repacking %d texts', len(self._text_refs))
 
476
        self.pb.update("repacking texts", 4)
 
477
        # we have three major tasks here:
 
478
        # 1) generate the ideal index
 
479
        repo = self._pack_collection.repo
 
480
        # We want the one we just wrote, so base it on self.new_pack
 
481
        revision_vf = self._build_vf('revision', True, False, for_write=True)
 
482
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
 
483
        # Strip keys back into revision_ids.
 
484
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
 
485
                         for k, parents in ancestor_keys.iteritems())
 
486
        del ancestor_keys
 
487
        # TODO: _generate_text_key_index should be much cheaper to generate from
 
488
        #       a chk repository, rather than the current implementation
 
489
        ideal_index = repo._generate_text_key_index(None, ancestors)
 
490
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
 
491
        # 2) generate a keys list that contains all the entries that can
 
492
        #    be used as-is, with corrected parents.
 
493
        ok_keys = []
 
494
        new_parent_keys = {} # (key, parent_keys)
 
495
        discarded_keys = []
 
496
        NULL_REVISION = _mod_revision.NULL_REVISION
 
497
        for key in self._text_refs:
 
498
            # 0 - index
 
499
            # 1 - key
 
500
            # 2 - value
 
501
            # 3 - refs
 
502
            try:
 
503
                ideal_parents = tuple(ideal_index[key])
 
504
            except KeyError:
 
505
                discarded_keys.append(key)
 
506
                self._data_changed = True
 
507
            else:
 
508
                if ideal_parents == (NULL_REVISION,):
 
509
                    ideal_parents = ()
 
510
                source_parents = file_id_parent_map[key]
 
511
                if ideal_parents == source_parents:
 
512
                    # no change needed.
 
513
                    ok_keys.append(key)
 
514
                else:
 
515
                    # We need to change the parent graph, but we don't need to
 
516
                    # re-insert the text (since we don't pun the compression
 
517
                    # parent with the parents list)
 
518
                    self._data_changed = True
 
519
                    new_parent_keys[key] = ideal_parents
 
520
        # we're finished with some data.
 
521
        del ideal_index
 
522
        del file_id_parent_map
 
523
        # 3) bulk copy the data, updating records than need it
 
524
        def _update_parents_for_texts():
 
525
            stream = source_vf.get_record_stream(self._text_refs,
 
526
                'groupcompress', False)
 
527
            for record in stream:
 
528
                if record.key in new_parent_keys:
 
529
                    record.parents = new_parent_keys[record.key]
 
530
                yield record
 
531
        target_vf.insert_record_stream(_update_parents_for_texts())
 
532
 
 
533
    def _use_pack(self, new_pack):
 
534
        """Override _use_pack to check for reconcile having changed content."""
 
535
        return new_pack.data_inserted() and self._data_changed
 
536
 
 
537
 
 
538
class GCRepositoryPackCollection(RepositoryPackCollection):
 
539
 
 
540
    pack_factory = GCPack
 
541
 
 
542
    def _already_packed(self):
 
543
        """Is the collection already packed?"""
 
544
        # Always repack GC repositories for now
 
545
        return False
 
546
 
 
547
    def _execute_pack_operations(self, pack_operations,
 
548
                                 _packer_class=GCCHKPacker,
 
549
                                 reload_func=None):
 
550
        """Execute a series of pack operations.
 
551
 
 
552
        :param pack_operations: A list of [revision_count, packs_to_combine].
 
553
        :param _packer_class: The class of packer to use (default: Packer).
 
554
        :return: None.
 
555
        """
 
556
        # XXX: Copied across from RepositoryPackCollection simply because we
 
557
        #      want to override the _packer_class ... :(
 
558
        for revision_count, packs in pack_operations:
 
559
            # we may have no-ops from the setup logic
 
560
            if len(packs) == 0:
 
561
                continue
 
562
            packer = GCCHKPacker(self, packs, '.autopack',
 
563
                                 reload_func=reload_func)
 
564
            try:
 
565
                packer.pack()
 
566
            except errors.RetryWithNewPacks:
 
567
                # An exception is propagating out of this context, make sure
 
568
                # this packer has cleaned up. Packer() doesn't set its new_pack
 
569
                # state into the RepositoryPackCollection object, so we only
 
570
                # have access to it directly here.
 
571
                if packer.new_pack is not None:
 
572
                    packer.new_pack.abort()
 
573
                raise
 
574
            for pack in packs:
 
575
                self._remove_pack_from_memory(pack)
 
576
        # record the newly available packs and stop advertising the old
 
577
        # packs
 
578
        self._save_pack_names(clear_obsolete_packs=True)
 
579
        # Move the old packs out of the way now they are no longer referenced.
 
580
        for revision_count, packs in pack_operations:
 
581
            self._obsolete_packs(packs)
 
582
 
 
583
 
 
584
# XXX: This format is scheduled for termination
 
585
#
 
586
# class GCPackRepository(KnitPackRepository):
 
587
#     """GC customisation of KnitPackRepository."""
 
588
#
 
589
#     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
 
590
#         _serializer):
 
591
#         """Overridden to change pack collection class."""
 
592
#         KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
 
593
#             _commit_builder_class, _serializer)
 
594
#         # and now replace everything it did :)
 
595
#         index_transport = self._transport.clone('indices')
 
596
#         self._pack_collection = GCRepositoryPackCollection(self,
 
597
#             self._transport, index_transport,
 
598
#             self._transport.clone('upload'),
 
599
#             self._transport.clone('packs'),
 
600
#             _format.index_builder_class,
 
601
#             _format.index_class,
 
602
#             use_chk_index=self._format.supports_chks,
 
603
#             )
 
604
#         self.inventories = GroupCompressVersionedFiles(
 
605
#             _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 
606
#                 add_callback=self._pack_collection.inventory_index.add_callback,
 
607
#                 parents=True, is_locked=self.is_locked),
 
608
#             access=self._pack_collection.inventory_index.data_access)
 
609
#         self.revisions = GroupCompressVersionedFiles(
 
610
#             _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 
611
#                 add_callback=self._pack_collection.revision_index.add_callback,
 
612
#                 parents=True, is_locked=self.is_locked),
 
613
#             access=self._pack_collection.revision_index.data_access,
 
614
#             delta=False)
 
615
#         self.signatures = GroupCompressVersionedFiles(
 
616
#             _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 
617
#                 add_callback=self._pack_collection.signature_index.add_callback,
 
618
#                 parents=False, is_locked=self.is_locked),
 
619
#             access=self._pack_collection.signature_index.data_access,
 
620
#             delta=False)
 
621
#         self.texts = GroupCompressVersionedFiles(
 
622
#             _GCGraphIndex(self._pack_collection.text_index.combined_index,
 
623
#                 add_callback=self._pack_collection.text_index.add_callback,
 
624
#                 parents=True, is_locked=self.is_locked),
 
625
#             access=self._pack_collection.text_index.data_access)
 
626
#         if _format.supports_chks:
 
627
#             # No graph, no compression:- references from chks are between
 
628
#             # different objects not temporal versions of the same; and without
 
629
#             # some sort of temporal structure knit compression will just fail.
 
630
#             self.chk_bytes = GroupCompressVersionedFiles(
 
631
#                 _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 
632
#                     add_callback=self._pack_collection.chk_index.add_callback,
 
633
#                     parents=False, is_locked=self.is_locked),
 
634
#                 access=self._pack_collection.chk_index.data_access)
 
635
#         else:
 
636
#             self.chk_bytes = None
 
637
#         # True when the repository object is 'write locked' (as opposed to the
 
638
#         # physical lock only taken out around changes to the pack-names list.)
 
639
#         # Another way to represent this would be a decorator around the control
 
640
#         # files object that presents logical locks as physical ones - if this
 
641
#         # gets ugly consider that alternative design. RBC 20071011
 
642
#         self._write_lock_count = 0
 
643
#         self._transaction = None
 
644
#         # for tests
 
645
#         self._reconcile_does_inventory_gc = True
 
646
#         self._reconcile_fixes_text_parents = True
 
647
#         self._reconcile_backsup_inventory = False
 
648
#
 
649
#     def suspend_write_group(self):
 
650
#         raise errors.UnsuspendableWriteGroup(self)
 
651
#
 
652
#     def _resume_write_group(self, tokens):
 
653
#         raise errors.UnsuspendableWriteGroup(self)
 
654
#
 
655
#     def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
656
#         bork
 
657
#         return packer.pack(pb)
 
658
 
 
659
 
 
660
class GCCHKPackRepository(CHKInventoryRepository):
 
661
    """GC customisation of CHKInventoryRepository."""
 
662
 
 
663
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
 
664
        _serializer):
 
665
        """Overridden to change pack collection class."""
 
666
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
 
667
            _commit_builder_class, _serializer)
 
668
        # and now replace everything it did :)
 
669
        index_transport = self._transport.clone('indices')
 
670
        self._pack_collection = GCRepositoryPackCollection(self,
 
671
            self._transport, index_transport,
 
672
            self._transport.clone('upload'),
 
673
            self._transport.clone('packs'),
 
674
            _format.index_builder_class,
 
675
            _format.index_class,
 
676
            use_chk_index=self._format.supports_chks,
 
677
            )
 
678
        self.inventories = GroupCompressVersionedFiles(
 
679
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
 
680
                add_callback=self._pack_collection.inventory_index.add_callback,
 
681
                parents=True, is_locked=self.is_locked),
 
682
            access=self._pack_collection.inventory_index.data_access)
 
683
        self.revisions = GroupCompressVersionedFiles(
 
684
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
 
685
                add_callback=self._pack_collection.revision_index.add_callback,
 
686
                parents=True, is_locked=self.is_locked),
 
687
            access=self._pack_collection.revision_index.data_access,
 
688
            delta=False)
 
689
        self.signatures = GroupCompressVersionedFiles(
 
690
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
 
691
                add_callback=self._pack_collection.signature_index.add_callback,
 
692
                parents=False, is_locked=self.is_locked),
 
693
            access=self._pack_collection.signature_index.data_access,
 
694
            delta=False)
 
695
        self.texts = GroupCompressVersionedFiles(
 
696
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
 
697
                add_callback=self._pack_collection.text_index.add_callback,
 
698
                parents=True, is_locked=self.is_locked),
 
699
            access=self._pack_collection.text_index.data_access)
 
700
        # No parents, individual CHK pages don't have specific ancestry
 
701
        self.chk_bytes = GroupCompressVersionedFiles(
 
702
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
 
703
                add_callback=self._pack_collection.chk_index.add_callback,
 
704
                parents=False, is_locked=self.is_locked),
 
705
            access=self._pack_collection.chk_index.data_access)
 
706
        # True when the repository object is 'write locked' (as opposed to the
 
707
        # physical lock only taken out around changes to the pack-names list.)
 
708
        # Another way to represent this would be a decorator around the control
 
709
        # files object that presents logical locks as physical ones - if this
 
710
        # gets ugly consider that alternative design. RBC 20071011
 
711
        self._write_lock_count = 0
 
712
        self._transaction = None
 
713
        # for tests
 
714
        self._reconcile_does_inventory_gc = True
 
715
        self._reconcile_fixes_text_parents = True
 
716
        self._reconcile_backsup_inventory = False
 
717
 
 
718
    def suspend_write_group(self):
 
719
        raise errors.UnsuspendableWriteGroup(self)
 
720
 
 
721
    def _resume_write_group(self, tokens):
 
722
        raise errors.UnsuspendableWriteGroup(self)
 
723
 
 
724
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
 
725
        # assert revs is None
 
726
        packer = GCCHKReconcilePacker(collection, packs, extension)
 
727
        return packer.pack(pb)
 
728
 
 
729
 
 
730
# This format has been disabled for now. It is not expected that this will be a
 
731
# useful next-generation format.
 
732
#
 
733
# class RepositoryFormatPackGCPlain(RepositoryFormatKnitPack6):
 
734
#     """A B+Tree index using pack repository."""
 
735
#
 
736
#     repository_class = GCPackRepository
 
737
#     rich_root_data = False
 
738
#     # Note: We cannot unpack a delta that references a text we haven't
 
739
#     # seen yet. There are 2 options, work in fulltexts, or require
 
740
#     # topological sorting. Using fulltexts is more optimal for local
 
741
#     # operations, because the source can be smart about extracting
 
742
#     # multiple in-a-row (and sharing strings). Topological is better
 
743
#     # for remote, because we access less data.
 
744
#     _fetch_order = 'unordered'
 
745
#     _fetch_uses_deltas = False
 
746
#
 
747
#     def _get_matching_bzrdir(self):
 
748
#         return bzrdir.format_registry.make_bzrdir('gc-no-rich-root')
 
749
#
 
750
#     def _ignore_setting_bzrdir(self, format):
 
751
#         pass
 
752
#
 
753
#     _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
754
#
 
755
#     def get_format_string(self):
 
756
#         """See RepositoryFormat.get_format_string()."""
 
757
#         return ("Bazaar development format - btree+gc "
 
758
#             "(needs bzr.dev from 1.13)\n")
 
759
#
 
760
#     def get_format_description(self):
 
761
#         """See RepositoryFormat.get_format_description()."""
 
762
#         return ("Development repository format - btree+groupcompress "
 
763
#             ", interoperates with pack-0.92\n")
 
764
#
 
765
 
 
766
class RepositoryFormatPackGCCHK16(RepositoryFormatPackDevelopment5Hash16):
 
767
    """A hashed CHK+group compress pack repository."""
 
768
 
 
769
    repository_class = GCCHKPackRepository
 
770
    _commit_builder_class = PackRootCommitBuilder
 
771
    rich_root_data = True
 
772
    supports_external_lookups = True
 
773
    supports_tree_reference = True
 
774
    supports_chks = True
 
775
    # Note: We cannot unpack a delta that references a text we haven't
 
776
    # seen yet. There are 2 options, work in fulltexts, or require
 
777
    # topological sorting. Using fulltexts is more optimal for local
 
778
    # operations, because the source can be smart about extracting
 
779
    # multiple in-a-row (and sharing strings). Topological is better
 
780
    # for remote, because we access less data.
 
781
    _fetch_order = 'unordered'
 
782
    _fetch_uses_deltas = False
 
783
 
 
784
    def _get_matching_bzrdir(self):
 
785
        return bzrdir.format_registry.make_bzrdir('gc-chk16')
 
786
 
 
787
    def _ignore_setting_bzrdir(self, format):
 
788
        pass
 
789
 
 
790
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
791
 
 
792
    def get_format_string(self):
 
793
        """See RepositoryFormat.get_format_string()."""
 
794
        return ('Bazaar development format - hash16chk+gc rich-root'
 
795
                ' (needs bzr.dev from 1.13)\n')
 
796
 
 
797
    def get_format_description(self):
 
798
        """See RepositoryFormat.get_format_description()."""
 
799
        return ("Development repository format - hash16chk+groupcompress")
 
800
 
 
801
    def check_conversion_target(self, target_format):
 
802
        if not target_format.rich_root_data:
 
803
            raise errors.BadConversionTarget(
 
804
                'Does not support rich root data.', target_format)
 
805
        if not getattr(target_format, 'supports_tree_reference', False):
 
806
            raise errors.BadConversionTarget(
 
807
                'Does not support nested trees', target_format)
 
808
 
 
809
 
 
810
class RepositoryFormatPackGCCHK255(RepositoryFormatPackDevelopment5Hash255):
 
811
    """A hashed CHK+group compress pack repository."""
 
812
 
 
813
    repository_class = GCCHKPackRepository
 
814
    supports_chks = True
 
815
    # Setting this to True causes us to use InterModel1And2, so for now set
 
816
    # it to False which uses InterDifferingSerializer. When IM1&2 is
 
817
    # removed (as it is in bzr.dev) we can set this back to True.
 
818
    _commit_builder_class = PackRootCommitBuilder
 
819
    rich_root_data = True
 
820
 
 
821
    def _get_matching_bzrdir(self):
 
822
        return bzrdir.format_registry.make_bzrdir('gc-chk255')
 
823
 
 
824
    def _ignore_setting_bzrdir(self, format):
 
825
        pass
 
826
 
 
827
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
828
 
 
829
    def get_format_string(self):
 
830
        """See RepositoryFormat.get_format_string()."""
 
831
        return ('Bazaar development format - hash255chk+gc rich-root'
 
832
                ' (needs bzr.dev from 1.13)\n')
 
833
 
 
834
    def get_format_description(self):
 
835
        """See RepositoryFormat.get_format_description()."""
 
836
        return ("Development repository format - hash255chk+groupcompress")
 
837
 
 
838
    def check_conversion_target(self, target_format):
 
839
        if not target_format.rich_root_data:
 
840
            raise errors.BadConversionTarget(
 
841
                'Does not support rich root data.', target_format)
 
842
        if not getattr(target_format, 'supports_tree_reference', False):
 
843
            raise errors.BadConversionTarget(
 
844
                'Does not support nested trees', target_format)
 
845
 
 
846
 
 
847
class RepositoryFormatPackGCCHK255Big(RepositoryFormatPackGCCHK255):
 
848
    """A hashed CHK+group compress pack repository."""
 
849
 
 
850
    repository_class = GCCHKPackRepository
 
851
    supports_chks = True
 
852
    # For right now, setting this to True gives us InterModel1And2 rather
 
853
    # than InterDifferingSerializer
 
854
    _commit_builder_class = PackRootCommitBuilder
 
855
    rich_root_data = True
 
856
    _serializer = chk_serializer.chk_serializer_255_bigpage
 
857
    # Note: We cannot unpack a delta that references a text we haven't
 
858
    # seen yet. There are 2 options, work in fulltexts, or require
 
859
    # topological sorting. Using fulltexts is more optimal for local
 
860
    # operations, because the source can be smart about extracting
 
861
    # multiple in-a-row (and sharing strings). Topological is better
 
862
    # for remote, because we access less data.
 
863
    _fetch_order = 'unordered'
 
864
    _fetch_uses_deltas = False
 
865
 
 
866
    def _get_matching_bzrdir(self):
 
867
        return bzrdir.format_registry.make_bzrdir('gc-chk255-big')
 
868
 
 
869
    def _ignore_setting_bzrdir(self, format):
 
870
        pass
 
871
 
 
872
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
 
873
 
 
874
    def get_format_string(self):
 
875
        """See RepositoryFormat.get_format_string()."""
 
876
        return ('Bazaar development format - hash255chk+gc rich-root bigpage'
 
877
                ' (needs bzr.dev from 1.13)\n')
 
878
 
 
879
    def get_format_description(self):
 
880
        """See RepositoryFormat.get_format_description()."""
 
881
        return ("Development repository format - hash255chk+groupcompress + bigpage")
 
882
 
 
883
    def check_conversion_target(self, target_format):
 
884
        if not target_format.rich_root_data:
 
885
            raise errors.BadConversionTarget(
 
886
                'Does not support rich root data.', target_format)
 
887
        if not getattr(target_format, 'supports_tree_reference', False):
 
888
            raise errors.BadConversionTarget(
 
889
                'Does not support nested trees', target_format)