~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/groupcompress_repo.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-03-16 14:01:20 UTC
  • mfrom: (3280.2.5 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20080316140120-i3yq8yr1l66m11h7
Start 1.4 development

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Repository formats using CHK inventories and groupcompress compression."""
18
 
 
19
 
import time
20
 
 
21
 
from bzrlib import (
22
 
    bzrdir,
23
 
    chk_map,
24
 
    chk_serializer,
25
 
    debug,
26
 
    errors,
27
 
    index as _mod_index,
28
 
    inventory,
29
 
    knit,
30
 
    osutils,
31
 
    pack,
32
 
    remote,
33
 
    repository,
34
 
    revision as _mod_revision,
35
 
    trace,
36
 
    ui,
37
 
    )
38
 
from bzrlib.btree_index import (
39
 
    BTreeGraphIndex,
40
 
    BTreeBuilder,
41
 
    )
42
 
from bzrlib.index import GraphIndex, GraphIndexBuilder
43
 
from bzrlib.groupcompress import (
44
 
    _GCGraphIndex,
45
 
    GroupCompressVersionedFiles,
46
 
    )
47
 
from bzrlib.repofmt.pack_repo import (
48
 
    Pack,
49
 
    NewPack,
50
 
    KnitPackRepository,
51
 
    KnitPackStreamSource,
52
 
    PackRootCommitBuilder,
53
 
    RepositoryPackCollection,
54
 
    RepositoryFormatPack,
55
 
    ResumedPack,
56
 
    Packer,
57
 
    )
58
 
 
59
 
 
60
 
class GCPack(NewPack):
61
 
 
62
 
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
63
 
        """Create a NewPack instance.
64
 
 
65
 
        :param pack_collection: A PackCollection into which this is being
66
 
            inserted.
67
 
        :param upload_suffix: An optional suffix to be given to any temporary
68
 
            files created during the pack creation. e.g '.autopack'
69
 
        :param file_mode: An optional file mode to create the new files with.
70
 
        """
71
 
        # replaced from NewPack to:
72
 
        # - change inventory reference list length to 1
73
 
        # - change texts reference lists to 1
74
 
        # TODO: patch this to be parameterised
75
 
 
76
 
        # The relative locations of the packs are constrained, but all are
77
 
        # passed in because the caller has them, so as to avoid object churn.
78
 
        index_builder_class = pack_collection._index_builder_class
79
 
        # from brisbane-core
80
 
        if pack_collection.chk_index is not None:
81
 
            chk_index = index_builder_class(reference_lists=0)
82
 
        else:
83
 
            chk_index = None
84
 
        Pack.__init__(self,
85
 
            # Revisions: parents list, no text compression.
86
 
            index_builder_class(reference_lists=1),
87
 
            # Inventory: We want to map compression only, but currently the
88
 
            # knit code hasn't been updated enough to understand that, so we
89
 
            # have a regular 2-list index giving parents and compression
90
 
            # source.
91
 
            index_builder_class(reference_lists=1),
92
 
            # Texts: per file graph, for all fileids - so one reference list
93
 
            # and two elements in the key tuple.
94
 
            index_builder_class(reference_lists=1, key_elements=2),
95
 
            # Signatures: Just blobs to store, no compression, no parents
96
 
            # listing.
97
 
            index_builder_class(reference_lists=0),
98
 
            # CHK based storage - just blobs, no compression or parents.
99
 
            chk_index=chk_index
100
 
            )
101
 
        self._pack_collection = pack_collection
102
 
        # When we make readonly indices, we need this.
103
 
        self.index_class = pack_collection._index_class
104
 
        # where should the new pack be opened
105
 
        self.upload_transport = pack_collection._upload_transport
106
 
        # where are indices written out to
107
 
        self.index_transport = pack_collection._index_transport
108
 
        # where is the pack renamed to when it is finished?
109
 
        self.pack_transport = pack_collection._pack_transport
110
 
        # What file mode to upload the pack and indices with.
111
 
        self._file_mode = file_mode
112
 
        # tracks the content written to the .pack file.
113
 
        self._hash = osutils.md5()
114
 
        # a four-tuple with the length in bytes of the indices, once the pack
115
 
        # is finalised. (rev, inv, text, sigs)
116
 
        self.index_sizes = None
117
 
        # How much data to cache when writing packs. Note that this is not
118
 
        # synchronised with reads, because it's not in the transport layer, so
119
 
        # is not safe unless the client knows it won't be reading from the pack
120
 
        # under creation.
121
 
        self._cache_limit = 0
122
 
        # the temporary pack file name.
123
 
        self.random_name = osutils.rand_chars(20) + upload_suffix
124
 
        # when was this pack started ?
125
 
        self.start_time = time.time()
126
 
        # open an output stream for the data added to the pack.
127
 
        self.write_stream = self.upload_transport.open_write_stream(
128
 
            self.random_name, mode=self._file_mode)
129
 
        if 'pack' in debug.debug_flags:
130
 
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
131
 
                time.ctime(), self.upload_transport.base, self.random_name,
132
 
                time.time() - self.start_time)
133
 
        # A list of byte sequences to be written to the new pack, and the
134
 
        # aggregate size of them.  Stored as a list rather than separate
135
 
        # variables so that the _write_data closure below can update them.
136
 
        self._buffer = [[], 0]
137
 
        # create a callable for adding data
138
 
        #
139
 
        # robertc says- this is a closure rather than a method on the object
140
 
        # so that the variables are locals, and faster than accessing object
141
 
        # members.
142
 
        def _write_data(bytes, flush=False, _buffer=self._buffer,
143
 
            _write=self.write_stream.write, _update=self._hash.update):
144
 
            _buffer[0].append(bytes)
145
 
            _buffer[1] += len(bytes)
146
 
            # buffer cap
147
 
            if _buffer[1] > self._cache_limit or flush:
148
 
                bytes = ''.join(_buffer[0])
149
 
                _write(bytes)
150
 
                _update(bytes)
151
 
                _buffer[:] = [[], 0]
152
 
        # expose this on self, for the occasion when clients want to add data.
153
 
        self._write_data = _write_data
154
 
        # a pack writer object to serialise pack records.
155
 
        self._writer = pack.ContainerWriter(self._write_data)
156
 
        self._writer.begin()
157
 
        # what state is the pack in? (open, finished, aborted)
158
 
        self._state = 'open'
159
 
 
160
 
    def _check_references(self):
161
 
        """Make sure our external references are present.
162
 
 
163
 
        Packs are allowed to have deltas whose base is not in the pack, but it
164
 
        must be present somewhere in this collection.  It is not allowed to
165
 
        have deltas based on a fallback repository.
166
 
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
167
 
        """
168
 
        # Groupcompress packs don't have any external references, arguably CHK
169
 
        # pages have external references, but we cannot 'cheaply' determine
170
 
        # them without actually walking all of the chk pages.
171
 
 
172
 
 
173
 
class ResumedGCPack(ResumedPack):
174
 
 
175
 
    def _check_references(self):
176
 
        """Make sure our external compression parents are present."""
177
 
        # See GCPack._check_references for why this is empty
178
 
 
179
 
    def _get_external_refs(self, index):
180
 
        # GC repositories don't have compression parents external to a given
181
 
        # pack file
182
 
        return set()
183
 
 
184
 
 
185
 
class GCCHKPacker(Packer):
186
 
    """This class understand what it takes to collect a GCCHK repo."""
187
 
 
188
 
    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
189
 
                 reload_func=None):
190
 
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
191
 
                                          revision_ids=revision_ids,
192
 
                                          reload_func=reload_func)
193
 
        self._pack_collection = pack_collection
194
 
        # ATM, We only support this for GCCHK repositories
195
 
        if pack_collection.chk_index is None:
196
 
            raise AssertionError('pack_collection.chk_index should not be None')
197
 
        self._gather_text_refs = False
198
 
        self._chk_id_roots = []
199
 
        self._chk_p_id_roots = []
200
 
        self._text_refs = None
201
 
        # set by .pack() if self.revision_ids is not None
202
 
        self.revision_keys = None
203
 
 
204
 
    def _get_progress_stream(self, source_vf, keys, message, pb):
205
 
        def pb_stream():
206
 
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
207
 
            for idx, record in enumerate(substream):
208
 
                if pb is not None:
209
 
                    pb.update(message, idx + 1, len(keys))
210
 
                yield record
211
 
        return pb_stream()
212
 
 
213
 
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
214
 
        """Filter the texts of inventories, to find the chk pages."""
215
 
        total_keys = len(keys)
216
 
        def _filtered_inv_stream():
217
 
            id_roots_set = set()
218
 
            p_id_roots_set = set()
219
 
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
220
 
            for idx, record in enumerate(stream):
221
 
                bytes = record.get_bytes_as('fulltext')
222
 
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
223
 
                                                             record.key)
224
 
                if pb is not None:
225
 
                    pb.update('inv', idx, total_keys)
226
 
                key = chk_inv.id_to_entry.key()
227
 
                if key not in id_roots_set:
228
 
                    self._chk_id_roots.append(key)
229
 
                    id_roots_set.add(key)
230
 
                p_id_map = chk_inv.parent_id_basename_to_file_id
231
 
                if p_id_map is None:
232
 
                    raise AssertionError('Parent id -> file_id map not set')
233
 
                key = p_id_map.key()
234
 
                if key not in p_id_roots_set:
235
 
                    p_id_roots_set.add(key)
236
 
                    self._chk_p_id_roots.append(key)
237
 
                yield record
238
 
            # We have finished processing all of the inventory records, we
239
 
            # don't need these sets anymore
240
 
            id_roots_set.clear()
241
 
            p_id_roots_set.clear()
242
 
        return _filtered_inv_stream()
243
 
 
244
 
    def _get_chk_streams(self, source_vf, keys, pb=None):
245
 
        # We want to stream the keys from 'id_roots', and things they
246
 
        # reference, and then stream things from p_id_roots and things they
247
 
        # reference, and then any remaining keys that we didn't get to.
248
 
 
249
 
        # We also group referenced texts together, so if one root references a
250
 
        # text with prefix 'a', and another root references a node with prefix
251
 
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
252
 
        # This keeps 'similar' nodes together.
253
 
 
254
 
        # Note: We probably actually want multiple streams here, to help the
255
 
        #       client understand that the different levels won't compress well
256
 
        #       against each other.
257
 
        #       Test the difference between using one Group per level, and
258
 
        #       using 1 Group per prefix. (so '' (root) would get a group, then
259
 
        #       all the references to search-key 'a' would get a group, etc.)
260
 
        total_keys = len(keys)
261
 
        remaining_keys = set(keys)
262
 
        counter = [0]
263
 
        if self._gather_text_refs:
264
 
            bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
265
 
            self._text_refs = set()
266
 
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
267
 
            cur_keys = root_keys
268
 
            while cur_keys:
269
 
                keys_by_search_prefix = {}
270
 
                remaining_keys.difference_update(cur_keys)
271
 
                next_keys = set()
272
 
                def handle_internal_node(node):
273
 
                    for prefix, value in node._items.iteritems():
274
 
                        # We don't want to request the same key twice, and we
275
 
                        # want to order it by the first time it is seen.
276
 
                        # Even further, we don't want to request a key which is
277
 
                        # not in this group of pack files (it should be in the
278
 
                        # repo, but it doesn't have to be in the group being
279
 
                        # packed.)
280
 
                        # TODO: consider how to treat externally referenced chk
281
 
                        #       pages as 'external_references' so that we
282
 
                        #       always fill them in for stacked branches
283
 
                        if value not in next_keys and value in remaining_keys:
284
 
                            keys_by_search_prefix.setdefault(prefix,
285
 
                                []).append(value)
286
 
                            next_keys.add(value)
287
 
                def handle_leaf_node(node):
288
 
                    # Store is None, because we know we have a LeafNode, and we
289
 
                    # just want its entries
290
 
                    for file_id, bytes in node.iteritems(None):
291
 
                        name_utf8, file_id, revision_id = bytes_to_info(bytes)
292
 
                        self._text_refs.add((file_id, revision_id))
293
 
                def next_stream():
294
 
                    stream = source_vf.get_record_stream(cur_keys,
295
 
                                                         'as-requested', True)
296
 
                    for record in stream:
297
 
                        bytes = record.get_bytes_as('fulltext')
298
 
                        # We don't care about search_key_func for this code,
299
 
                        # because we only care about external references.
300
 
                        node = chk_map._deserialise(bytes, record.key,
301
 
                                                    search_key_func=None)
302
 
                        common_base = node._search_prefix
303
 
                        if isinstance(node, chk_map.InternalNode):
304
 
                            handle_internal_node(node)
305
 
                        elif parse_leaf_nodes:
306
 
                            handle_leaf_node(node)
307
 
                        counter[0] += 1
308
 
                        if pb is not None:
309
 
                            pb.update('chk node', counter[0], total_keys)
310
 
                        yield record
311
 
                yield next_stream()
312
 
                # Double check that we won't be emitting any keys twice
313
 
                # If we get rid of the pre-calculation of all keys, we could
314
 
                # turn this around and do
315
 
                # next_keys.difference_update(seen_keys)
316
 
                # However, we also may have references to chk pages in another
317
 
                # pack file during autopack. We filter earlier, so we should no
318
 
                # longer need to do this
319
 
                # next_keys = next_keys.intersection(remaining_keys)
320
 
                cur_keys = []
321
 
                for prefix in sorted(keys_by_search_prefix):
322
 
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
323
 
        for stream in _get_referenced_stream(self._chk_id_roots,
324
 
                                             self._gather_text_refs):
325
 
            yield stream
326
 
        del self._chk_id_roots
327
 
        # while it isn't really possible for chk_id_roots to not be in the
328
 
        # local group of packs, it is possible that the tree shape has not
329
 
        # changed recently, so we need to filter _chk_p_id_roots by the
330
 
        # available keys
331
 
        chk_p_id_roots = [key for key in self._chk_p_id_roots
332
 
                          if key in remaining_keys]
333
 
        del self._chk_p_id_roots
334
 
        for stream in _get_referenced_stream(chk_p_id_roots, False):
335
 
            yield stream
336
 
        if remaining_keys:
337
 
            trace.mutter('There were %d keys in the chk index, %d of which'
338
 
                         ' were not referenced', total_keys,
339
 
                         len(remaining_keys))
340
 
            if self.revision_ids is None:
341
 
                stream = source_vf.get_record_stream(remaining_keys,
342
 
                                                     'unordered', True)
343
 
                yield stream
344
 
 
345
 
    def _build_vf(self, index_name, parents, delta, for_write=False):
346
 
        """Build a VersionedFiles instance on top of this group of packs."""
347
 
        index_name = index_name + '_index'
348
 
        index_to_pack = {}
349
 
        access = knit._DirectPackAccess(index_to_pack)
350
 
        if for_write:
351
 
            # Use new_pack
352
 
            if self.new_pack is None:
353
 
                raise AssertionError('No new pack has been set')
354
 
            index = getattr(self.new_pack, index_name)
355
 
            index_to_pack[index] = self.new_pack.access_tuple()
356
 
            index.set_optimize(for_size=True)
357
 
            access.set_writer(self.new_pack._writer, index,
358
 
                              self.new_pack.access_tuple())
359
 
            add_callback = index.add_nodes
360
 
        else:
361
 
            indices = []
362
 
            for pack in self.packs:
363
 
                sub_index = getattr(pack, index_name)
364
 
                index_to_pack[sub_index] = pack.access_tuple()
365
 
                indices.append(sub_index)
366
 
            index = _mod_index.CombinedGraphIndex(indices)
367
 
            add_callback = None
368
 
        vf = GroupCompressVersionedFiles(
369
 
            _GCGraphIndex(index,
370
 
                          add_callback=add_callback,
371
 
                          parents=parents,
372
 
                          is_locked=self._pack_collection.repo.is_locked),
373
 
            access=access,
374
 
            delta=delta)
375
 
        return vf
376
 
 
377
 
    def _build_vfs(self, index_name, parents, delta):
378
 
        """Build the source and target VersionedFiles."""
379
 
        source_vf = self._build_vf(index_name, parents,
380
 
                                   delta, for_write=False)
381
 
        target_vf = self._build_vf(index_name, parents,
382
 
                                   delta, for_write=True)
383
 
        return source_vf, target_vf
384
 
 
385
 
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
386
 
                     pb_offset):
387
 
        trace.mutter('repacking %d %s', len(keys), message)
388
 
        self.pb.update('repacking %s' % (message,), pb_offset)
389
 
        child_pb = ui.ui_factory.nested_progress_bar()
390
 
        try:
391
 
            stream = vf_to_stream(source_vf, keys, message, child_pb)
392
 
            for _ in target_vf._insert_record_stream(stream,
393
 
                                                     random_id=True,
394
 
                                                     reuse_blocks=False):
395
 
                pass
396
 
        finally:
397
 
            child_pb.finished()
398
 
 
399
 
    def _copy_revision_texts(self):
400
 
        source_vf, target_vf = self._build_vfs('revision', True, False)
401
 
        if not self.revision_keys:
402
 
            # We are doing a full fetch, aka 'pack'
403
 
            self.revision_keys = source_vf.keys()
404
 
        self._copy_stream(source_vf, target_vf, self.revision_keys,
405
 
                          'revisions', self._get_progress_stream, 1)
406
 
 
407
 
    def _copy_inventory_texts(self):
408
 
        source_vf, target_vf = self._build_vfs('inventory', True, True)
409
 
        self._copy_stream(source_vf, target_vf, self.revision_keys,
410
 
                          'inventories', self._get_filtered_inv_stream, 2)
411
 
 
412
 
    def _copy_chk_texts(self):
413
 
        source_vf, target_vf = self._build_vfs('chk', False, False)
414
 
        # TODO: This is technically spurious... if it is a performance issue,
415
 
        #       remove it
416
 
        total_keys = source_vf.keys()
417
 
        trace.mutter('repacking chk: %d id_to_entry roots,'
418
 
                     ' %d p_id_map roots, %d total keys',
419
 
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
420
 
                     len(total_keys))
421
 
        self.pb.update('repacking chk', 3)
422
 
        child_pb = ui.ui_factory.nested_progress_bar()
423
 
        try:
424
 
            for stream in self._get_chk_streams(source_vf, total_keys,
425
 
                                                pb=child_pb):
426
 
                for _ in target_vf._insert_record_stream(stream,
427
 
                                                         random_id=True,
428
 
                                                         reuse_blocks=False):
429
 
                    pass
430
 
        finally:
431
 
            child_pb.finished()
432
 
 
433
 
    def _copy_text_texts(self):
434
 
        source_vf, target_vf = self._build_vfs('text', True, True)
435
 
        # XXX: We don't walk the chk map to determine referenced (file_id,
436
 
        #      revision_id) keys.  We don't do it yet because you really need
437
 
        #      to filter out the ones that are present in the parents of the
438
 
        #      rev just before the ones you are copying, otherwise the filter
439
 
        #      is grabbing too many keys...
440
 
        text_keys = source_vf.keys()
441
 
        self._copy_stream(source_vf, target_vf, text_keys,
442
 
                          'text', self._get_progress_stream, 4)
443
 
 
444
 
    def _copy_signature_texts(self):
445
 
        source_vf, target_vf = self._build_vfs('signature', False, False)
446
 
        signature_keys = source_vf.keys()
447
 
        signature_keys.intersection(self.revision_keys)
448
 
        self._copy_stream(source_vf, target_vf, signature_keys,
449
 
                          'signatures', self._get_progress_stream, 5)
450
 
 
451
 
    def _create_pack_from_packs(self):
452
 
        self.pb.update('repacking', 0, 7)
453
 
        self.new_pack = self.open_pack()
454
 
        # Is this necessary for GC ?
455
 
        self.new_pack.set_write_cache_size(1024*1024)
456
 
        self._copy_revision_texts()
457
 
        self._copy_inventory_texts()
458
 
        self._copy_chk_texts()
459
 
        self._copy_text_texts()
460
 
        self._copy_signature_texts()
461
 
        self.new_pack._check_references()
462
 
        if not self._use_pack(self.new_pack):
463
 
            self.new_pack.abort()
464
 
            return None
465
 
        self.pb.update('finishing repack', 6, 7)
466
 
        self.new_pack.finish()
467
 
        self._pack_collection.allocate(self.new_pack)
468
 
        return self.new_pack
469
 
 
470
 
 
471
 
class GCCHKReconcilePacker(GCCHKPacker):
472
 
    """A packer which regenerates indices etc as it copies.
473
 
 
474
 
    This is used by ``bzr reconcile`` to cause parent text pointers to be
475
 
    regenerated.
476
 
    """
477
 
 
478
 
    def __init__(self, *args, **kwargs):
479
 
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
480
 
        self._data_changed = False
481
 
        self._gather_text_refs = True
482
 
 
483
 
    def _copy_inventory_texts(self):
484
 
        source_vf, target_vf = self._build_vfs('inventory', True, True)
485
 
        self._copy_stream(source_vf, target_vf, self.revision_keys,
486
 
                          'inventories', self._get_filtered_inv_stream, 2)
487
 
        if source_vf.keys() != self.revision_keys:
488
 
            self._data_changed = True
489
 
 
490
 
    def _copy_text_texts(self):
491
 
        """generate what texts we should have and then copy."""
492
 
        source_vf, target_vf = self._build_vfs('text', True, True)
493
 
        trace.mutter('repacking %d texts', len(self._text_refs))
494
 
        self.pb.update("repacking texts", 4)
495
 
        # we have three major tasks here:
496
 
        # 1) generate the ideal index
497
 
        repo = self._pack_collection.repo
498
 
        # We want the one we just wrote, so base it on self.new_pack
499
 
        revision_vf = self._build_vf('revision', True, False, for_write=True)
500
 
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
501
 
        # Strip keys back into revision_ids.
502
 
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
503
 
                         for k, parents in ancestor_keys.iteritems())
504
 
        del ancestor_keys
505
 
        # TODO: _generate_text_key_index should be much cheaper to generate from
506
 
        #       a chk repository, rather than the current implementation
507
 
        ideal_index = repo._generate_text_key_index(None, ancestors)
508
 
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
509
 
        # 2) generate a keys list that contains all the entries that can
510
 
        #    be used as-is, with corrected parents.
511
 
        ok_keys = []
512
 
        new_parent_keys = {} # (key, parent_keys)
513
 
        discarded_keys = []
514
 
        NULL_REVISION = _mod_revision.NULL_REVISION
515
 
        for key in self._text_refs:
516
 
            # 0 - index
517
 
            # 1 - key
518
 
            # 2 - value
519
 
            # 3 - refs
520
 
            try:
521
 
                ideal_parents = tuple(ideal_index[key])
522
 
            except KeyError:
523
 
                discarded_keys.append(key)
524
 
                self._data_changed = True
525
 
            else:
526
 
                if ideal_parents == (NULL_REVISION,):
527
 
                    ideal_parents = ()
528
 
                source_parents = file_id_parent_map[key]
529
 
                if ideal_parents == source_parents:
530
 
                    # no change needed.
531
 
                    ok_keys.append(key)
532
 
                else:
533
 
                    # We need to change the parent graph, but we don't need to
534
 
                    # re-insert the text (since we don't pun the compression
535
 
                    # parent with the parents list)
536
 
                    self._data_changed = True
537
 
                    new_parent_keys[key] = ideal_parents
538
 
        # we're finished with some data.
539
 
        del ideal_index
540
 
        del file_id_parent_map
541
 
        # 3) bulk copy the data, updating records than need it
542
 
        def _update_parents_for_texts():
543
 
            stream = source_vf.get_record_stream(self._text_refs,
544
 
                'groupcompress', False)
545
 
            for record in stream:
546
 
                if record.key in new_parent_keys:
547
 
                    record.parents = new_parent_keys[record.key]
548
 
                yield record
549
 
        target_vf.insert_record_stream(_update_parents_for_texts())
550
 
 
551
 
    def _use_pack(self, new_pack):
552
 
        """Override _use_pack to check for reconcile having changed content."""
553
 
        return new_pack.data_inserted() and self._data_changed
554
 
 
555
 
 
556
 
class GCRepositoryPackCollection(RepositoryPackCollection):
557
 
 
558
 
    pack_factory = GCPack
559
 
    resumed_pack_factory = ResumedGCPack
560
 
 
561
 
    def _already_packed(self):
562
 
        """Is the collection already packed?"""
563
 
        # Always repack GC repositories for now
564
 
        return False
565
 
 
566
 
    def _execute_pack_operations(self, pack_operations,
567
 
                                 _packer_class=GCCHKPacker,
568
 
                                 reload_func=None):
569
 
        """Execute a series of pack operations.
570
 
 
571
 
        :param pack_operations: A list of [revision_count, packs_to_combine].
572
 
        :param _packer_class: The class of packer to use (default: Packer).
573
 
        :return: None.
574
 
        """
575
 
        # XXX: Copied across from RepositoryPackCollection simply because we
576
 
        #      want to override the _packer_class ... :(
577
 
        for revision_count, packs in pack_operations:
578
 
            # we may have no-ops from the setup logic
579
 
            if len(packs) == 0:
580
 
                continue
581
 
            packer = GCCHKPacker(self, packs, '.autopack',
582
 
                                 reload_func=reload_func)
583
 
            try:
584
 
                packer.pack()
585
 
            except errors.RetryWithNewPacks:
586
 
                # An exception is propagating out of this context, make sure
587
 
                # this packer has cleaned up. Packer() doesn't set its new_pack
588
 
                # state into the RepositoryPackCollection object, so we only
589
 
                # have access to it directly here.
590
 
                if packer.new_pack is not None:
591
 
                    packer.new_pack.abort()
592
 
                raise
593
 
            for pack in packs:
594
 
                self._remove_pack_from_memory(pack)
595
 
        # record the newly available packs and stop advertising the old
596
 
        # packs
597
 
        self._save_pack_names(clear_obsolete_packs=True)
598
 
        # Move the old packs out of the way now they are no longer referenced.
599
 
        for revision_count, packs in pack_operations:
600
 
            self._obsolete_packs(packs)
601
 
 
602
 
 
603
 
class CHKInventoryRepository(KnitPackRepository):
604
 
    """subclass of KnitPackRepository that uses CHK based inventories."""
605
 
 
606
 
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
607
 
        _serializer):
608
 
        """Overridden to change pack collection class."""
609
 
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
610
 
            _commit_builder_class, _serializer)
611
 
        # and now replace everything it did :)
612
 
        index_transport = self._transport.clone('indices')
613
 
        self._pack_collection = GCRepositoryPackCollection(self,
614
 
            self._transport, index_transport,
615
 
            self._transport.clone('upload'),
616
 
            self._transport.clone('packs'),
617
 
            _format.index_builder_class,
618
 
            _format.index_class,
619
 
            use_chk_index=self._format.supports_chks,
620
 
            )
621
 
        self.inventories = GroupCompressVersionedFiles(
622
 
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
623
 
                add_callback=self._pack_collection.inventory_index.add_callback,
624
 
                parents=True, is_locked=self.is_locked),
625
 
            access=self._pack_collection.inventory_index.data_access)
626
 
        self.revisions = GroupCompressVersionedFiles(
627
 
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
628
 
                add_callback=self._pack_collection.revision_index.add_callback,
629
 
                parents=True, is_locked=self.is_locked,
630
 
                track_external_parent_refs=True),
631
 
            access=self._pack_collection.revision_index.data_access,
632
 
            delta=False)
633
 
        self.signatures = GroupCompressVersionedFiles(
634
 
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
635
 
                add_callback=self._pack_collection.signature_index.add_callback,
636
 
                parents=False, is_locked=self.is_locked),
637
 
            access=self._pack_collection.signature_index.data_access,
638
 
            delta=False)
639
 
        self.texts = GroupCompressVersionedFiles(
640
 
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
641
 
                add_callback=self._pack_collection.text_index.add_callback,
642
 
                parents=True, is_locked=self.is_locked),
643
 
            access=self._pack_collection.text_index.data_access)
644
 
        # No parents, individual CHK pages don't have specific ancestry
645
 
        self.chk_bytes = GroupCompressVersionedFiles(
646
 
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
647
 
                add_callback=self._pack_collection.chk_index.add_callback,
648
 
                parents=False, is_locked=self.is_locked),
649
 
            access=self._pack_collection.chk_index.data_access)
650
 
        # True when the repository object is 'write locked' (as opposed to the
651
 
        # physical lock only taken out around changes to the pack-names list.)
652
 
        # Another way to represent this would be a decorator around the control
653
 
        # files object that presents logical locks as physical ones - if this
654
 
        # gets ugly consider that alternative design. RBC 20071011
655
 
        self._write_lock_count = 0
656
 
        self._transaction = None
657
 
        # for tests
658
 
        self._reconcile_does_inventory_gc = True
659
 
        self._reconcile_fixes_text_parents = True
660
 
        self._reconcile_backsup_inventory = False
661
 
 
662
 
    def _add_inventory_checked(self, revision_id, inv, parents):
663
 
        """Add inv to the repository after checking the inputs.
664
 
 
665
 
        This function can be overridden to allow different inventory styles.
666
 
 
667
 
        :seealso: add_inventory, for the contract.
668
 
        """
669
 
        # make inventory
670
 
        serializer = self._format._serializer
671
 
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
672
 
            maximum_size=serializer.maximum_size,
673
 
            search_key_name=serializer.search_key_name)
674
 
        inv_lines = result.to_lines()
675
 
        return self._inventory_add_lines(revision_id, parents,
676
 
            inv_lines, check_content=False)
677
 
 
678
 
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
679
 
                               parents, basis_inv=None, propagate_caches=False):
680
 
        """Add a new inventory expressed as a delta against another revision.
681
 
 
682
 
        :param basis_revision_id: The inventory id the delta was created
683
 
            against.
684
 
        :param delta: The inventory delta (see Inventory.apply_delta for
685
 
            details).
686
 
        :param new_revision_id: The revision id that the inventory is being
687
 
            added for.
688
 
        :param parents: The revision ids of the parents that revision_id is
689
 
            known to have and are in the repository already. These are supplied
690
 
            for repositories that depend on the inventory graph for revision
691
 
            graph access, as well as for those that pun ancestry with delta
692
 
            compression.
693
 
        :param basis_inv: The basis inventory if it is already known,
694
 
            otherwise None.
695
 
        :param propagate_caches: If True, the caches for this inventory are
696
 
          copied to and updated for the result if possible.
697
 
 
698
 
        :returns: (validator, new_inv)
699
 
            The validator(which is a sha1 digest, though what is sha'd is
700
 
            repository format specific) of the serialized inventory, and the
701
 
            resulting inventory.
702
 
        """
703
 
        if basis_revision_id == _mod_revision.NULL_REVISION:
704
 
            return KnitPackRepository.add_inventory_by_delta(self,
705
 
                basis_revision_id, delta, new_revision_id, parents)
706
 
        if not self.is_in_write_group():
707
 
            raise AssertionError("%r not in write group" % (self,))
708
 
        _mod_revision.check_not_reserved_id(new_revision_id)
709
 
        basis_tree = self.revision_tree(basis_revision_id)
710
 
        basis_tree.lock_read()
711
 
        try:
712
 
            if basis_inv is None:
713
 
                basis_inv = basis_tree.inventory
714
 
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
715
 
                propagate_caches=propagate_caches)
716
 
            inv_lines = result.to_lines()
717
 
            return self._inventory_add_lines(new_revision_id, parents,
718
 
                inv_lines, check_content=False), result
719
 
        finally:
720
 
            basis_tree.unlock()
721
 
 
722
 
    def _iter_inventories(self, revision_ids):
723
 
        """Iterate over many inventory objects."""
724
 
        keys = [(revision_id,) for revision_id in revision_ids]
725
 
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
726
 
        texts = {}
727
 
        for record in stream:
728
 
            if record.storage_kind != 'absent':
729
 
                texts[record.key] = record.get_bytes_as('fulltext')
730
 
            else:
731
 
                raise errors.NoSuchRevision(self, record.key)
732
 
        for key in keys:
733
 
            yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)
734
 
 
735
 
    def _iter_inventory_xmls(self, revision_ids):
736
 
        # Without a native 'xml' inventory, this method doesn't make sense, so
737
 
        # make it raise to trap naughty direct users.
738
 
        raise NotImplementedError(self._iter_inventory_xmls)
739
 
 
740
 
    def _find_present_inventory_ids(self, revision_ids):
741
 
        keys = [(r,) for r in revision_ids]
742
 
        parent_map = self.inventories.get_parent_map(keys)
743
 
        present_inventory_ids = set(k[-1] for k in parent_map)
744
 
        return present_inventory_ids
745
 
 
746
 
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
747
 
        """Find the file ids and versions affected by revisions.
748
 
 
749
 
        :param revisions: an iterable containing revision ids.
750
 
        :param _inv_weave: The inventory weave from this repository or None.
751
 
            If None, the inventory weave will be opened automatically.
752
 
        :return: a dictionary mapping altered file-ids to an iterable of
753
 
            revision_ids. Each altered file-ids has the exact revision_ids that
754
 
            altered it listed explicitly.
755
 
        """
756
 
        rich_root = self.supports_rich_root()
757
 
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
758
 
        file_id_revisions = {}
759
 
        pb = ui.ui_factory.nested_progress_bar()
760
 
        try:
761
 
            parent_ids = self._find_parent_ids_of_revisions(revision_ids)
762
 
            present_parent_inv_ids = self._find_present_inventory_ids(parent_ids)
763
 
            uninteresting_root_keys = set()
764
 
            interesting_root_keys = set()
765
 
            inventories_to_read = set(present_parent_inv_ids)
766
 
            inventories_to_read.update(revision_ids)
767
 
            for inv in self.iter_inventories(inventories_to_read):
768
 
                entry_chk_root_key = inv.id_to_entry.key()
769
 
                if inv.revision_id in present_parent_inv_ids:
770
 
                    uninteresting_root_keys.add(entry_chk_root_key)
771
 
                else:
772
 
                    interesting_root_keys.add(entry_chk_root_key)
773
 
 
774
 
            chk_bytes = self.chk_bytes
775
 
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
776
 
                        interesting_root_keys, uninteresting_root_keys,
777
 
                        pb=pb):
778
 
                for name, bytes in items:
779
 
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
780
 
                    if not rich_root and name_utf8 == '':
781
 
                        continue
782
 
                    try:
783
 
                        file_id_revisions[file_id].add(revision_id)
784
 
                    except KeyError:
785
 
                        file_id_revisions[file_id] = set([revision_id])
786
 
        finally:
787
 
            pb.finished()
788
 
        return file_id_revisions
789
 
 
790
 
    def find_text_key_references(self):
791
 
        """Find the text key references within the repository.
792
 
 
793
 
        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
794
 
            to whether they were referred to by the inventory of the
795
 
            revision_id that they contain. The inventory texts from all present
796
 
            revision ids are assessed to generate this report.
797
 
        """
798
 
        # XXX: Slow version but correct: rewrite as a series of delta
799
 
        # examinations/direct tree traversal. Note that that will require care
800
 
        # as a common node is reachable both from the inventory that added it,
801
 
        # and others afterwards.
802
 
        revision_keys = self.revisions.keys()
803
 
        result = {}
804
 
        rich_roots = self.supports_rich_root()
805
 
        pb = ui.ui_factory.nested_progress_bar()
806
 
        try:
807
 
            all_revs = self.all_revision_ids()
808
 
            total = len(all_revs)
809
 
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
810
 
                pb.update("Finding text references", pos, total)
811
 
                for _, entry in inv.iter_entries():
812
 
                    if not rich_roots and entry.file_id == inv.root_id:
813
 
                        continue
814
 
                    key = (entry.file_id, entry.revision)
815
 
                    result.setdefault(key, False)
816
 
                    if entry.revision == inv.revision_id:
817
 
                        result[key] = True
818
 
            return result
819
 
        finally:
820
 
            pb.finished()
821
 
 
822
 
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
823
 
        packer = GCCHKReconcilePacker(collection, packs, extension)
824
 
        return packer.pack(pb)
825
 
 
826
 
    def _get_source(self, to_format):
827
 
        """Return a source for streaming from this repository."""
828
 
        if isinstance(to_format, remote.RemoteRepositoryFormat):
829
 
            # Can't just check attributes on to_format with the current code,
830
 
            # work around this:
831
 
            to_format._ensure_real()
832
 
            to_format = to_format._custom_format
833
 
        if to_format.__class__ is self._format.__class__:
834
 
            # We must be exactly the same format, otherwise stuff like the chk
835
 
            # page layout might be different
836
 
            return GroupCHKStreamSource(self, to_format)
837
 
        return super(CHKInventoryRepository, self)._get_source(to_format)
838
 
 
839
 
 
840
 
class GroupCHKStreamSource(KnitPackStreamSource):
841
 
    """Used when both the source and target repo are GroupCHK repos."""
842
 
 
843
 
    def __init__(self, from_repository, to_format):
844
 
        """Create a StreamSource streaming from from_repository."""
845
 
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
846
 
        self._revision_keys = None
847
 
        self._text_keys = None
848
 
        self._text_fetch_order = 'groupcompress'
849
 
        self._chk_id_roots = None
850
 
        self._chk_p_id_roots = None
851
 
 
852
 
    def _get_inventory_stream(self, inventory_keys):
853
 
        """Get a stream of inventory texts.
854
 
 
855
 
        When this function returns, self._chk_id_roots and self._chk_p_id_roots
856
 
        should be populated.
857
 
        """
858
 
        self._chk_id_roots = []
859
 
        self._chk_p_id_roots = []
860
 
        def _filtered_inv_stream():
861
 
            id_roots_set = set()
862
 
            p_id_roots_set = set()
863
 
            source_vf = self.from_repository.inventories
864
 
            stream = source_vf.get_record_stream(inventory_keys,
865
 
                                                 'groupcompress', True)
866
 
            for record in stream:
867
 
                bytes = record.get_bytes_as('fulltext')
868
 
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
869
 
                                                             record.key)
870
 
                key = chk_inv.id_to_entry.key()
871
 
                if key not in id_roots_set:
872
 
                    self._chk_id_roots.append(key)
873
 
                    id_roots_set.add(key)
874
 
                p_id_map = chk_inv.parent_id_basename_to_file_id
875
 
                if p_id_map is None:
876
 
                    raise AssertionError('Parent id -> file_id map not set')
877
 
                key = p_id_map.key()
878
 
                if key not in p_id_roots_set:
879
 
                    p_id_roots_set.add(key)
880
 
                    self._chk_p_id_roots.append(key)
881
 
                yield record
882
 
            # We have finished processing all of the inventory records, we
883
 
            # don't need these sets anymore
884
 
            id_roots_set.clear()
885
 
            p_id_roots_set.clear()
886
 
        return ('inventories', _filtered_inv_stream())
887
 
 
888
 
    def _get_filtered_chk_streams(self, excluded_revision_ids):
889
 
        self._text_keys = set()
890
 
        excluded_revision_ids.discard(_mod_revision.NULL_REVISION)
891
 
        if not excluded_revision_ids:
892
 
            uninteresting_root_keys = set()
893
 
            uninteresting_pid_root_keys = set()
894
 
        else:
895
 
            # filter out any excluded revisions whose inventories are not
896
 
            # actually present
897
 
            # TODO: Update Repository.iter_inventories() to add
898
 
            #       ignore_missing=True
899
 
            present_ids = self.from_repository._find_present_inventory_ids(
900
 
                            excluded_revision_ids)
901
 
            uninteresting_root_keys = set()
902
 
            uninteresting_pid_root_keys = set()
903
 
            for inv in self.from_repository.iter_inventories(present_ids):
904
 
                uninteresting_root_keys.add(inv.id_to_entry.key())
905
 
                uninteresting_pid_root_keys.add(
906
 
                    inv.parent_id_basename_to_file_id.key())
907
 
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
908
 
        chk_bytes = self.from_repository.chk_bytes
909
 
        def _filter_id_to_entry():
910
 
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
911
 
                        self._chk_id_roots, uninteresting_root_keys):
912
 
                for name, bytes in items:
913
 
                    # Note: we don't care about name_utf8, because we are always
914
 
                    # rich-root = True
915
 
                    _, file_id, revision_id = bytes_to_info(bytes)
916
 
                    self._text_keys.add((file_id, revision_id))
917
 
                if record is not None:
918
 
                    yield record
919
 
            # Consumed
920
 
            self._chk_id_roots = None
921
 
        yield 'chk_bytes', _filter_id_to_entry()
922
 
        def _get_parent_id_basename_to_file_id_pages():
923
 
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
924
 
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
925
 
                if record is not None:
926
 
                    yield record
927
 
            # Consumed
928
 
            self._chk_p_id_roots = None
929
 
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
930
 
 
931
 
    def get_stream(self, search):
932
 
        revision_ids = search.get_keys()
933
 
        for stream_info in self._fetch_revision_texts(revision_ids):
934
 
            yield stream_info
935
 
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
936
 
        yield self._get_inventory_stream(self._revision_keys)
937
 
        # TODO: The keys to exclude might be part of the search recipe
938
 
        # For now, exclude all parents that are at the edge of ancestry, for
939
 
        # which we have inventories
940
 
        from_repo = self.from_repository
941
 
        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
942
 
        for stream_info in self._get_filtered_chk_streams(parent_ids):
943
 
            yield stream_info
944
 
        yield self._get_text_stream()
945
 
 
946
 
    def get_stream_for_missing_keys(self, missing_keys):
947
 
        # missing keys can only occur when we are byte copying and not
948
 
        # translating (because translation means we don't send
949
 
        # unreconstructable deltas ever).
950
 
        missing_inventory_keys = set()
951
 
        for key in missing_keys:
952
 
            if key[0] != 'inventories':
953
 
                raise AssertionError('The only missing keys we should'
954
 
                    ' be filling in are inventory keys, not %s'
955
 
                    % (key[0],))
956
 
            missing_inventory_keys.add(key[1:])
957
 
        if self._chk_id_roots or self._chk_p_id_roots:
958
 
            raise AssertionError('Cannot call get_stream_for_missing_keys'
959
 
                ' untill all of get_stream() has been consumed.')
960
 
        # Yield the inventory stream, so we can find the chk stream
961
 
        yield self._get_inventory_stream(missing_inventory_keys)
962
 
        # We use the empty set for excluded_revision_ids, to make it clear that
963
 
        # we want to transmit all referenced chk pages.
964
 
        for stream_info in self._get_filtered_chk_streams(set()):
965
 
            yield stream_info
966
 
 
967
 
 
968
 
class RepositoryFormatCHK1(RepositoryFormatPack):
969
 
    """A hashed CHK+group compress pack repository."""
970
 
 
971
 
    repository_class = CHKInventoryRepository
972
 
    supports_external_lookups = True
973
 
    supports_chks = True
974
 
    # For right now, setting this to True gives us InterModel1And2 rather
975
 
    # than InterDifferingSerializer
976
 
    _commit_builder_class = PackRootCommitBuilder
977
 
    rich_root_data = True
978
 
    _serializer = chk_serializer.chk_serializer_255_bigpage
979
 
    _commit_inv_deltas = True
980
 
    # What index classes to use
981
 
    index_builder_class = BTreeBuilder
982
 
    index_class = BTreeGraphIndex
983
 
    # Note: We cannot unpack a delta that references a text we haven't
984
 
    # seen yet. There are 2 options, work in fulltexts, or require
985
 
    # topological sorting. Using fulltexts is more optimal for local
986
 
    # operations, because the source can be smart about extracting
987
 
    # multiple in-a-row (and sharing strings). Topological is better
988
 
    # for remote, because we access less data.
989
 
    _fetch_order = 'unordered'
990
 
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
991
 
    fast_deltas = True
992
 
 
993
 
    def _get_matching_bzrdir(self):
994
 
        return bzrdir.format_registry.make_bzrdir('development6-rich-root')
995
 
 
996
 
    def _ignore_setting_bzrdir(self, format):
997
 
        pass
998
 
 
999
 
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1000
 
 
1001
 
    def get_format_string(self):
1002
 
        """See RepositoryFormat.get_format_string()."""
1003
 
        return ('Bazaar development format - group compression and chk inventory'
1004
 
                ' (needs bzr.dev from 1.14)\n')
1005
 
 
1006
 
    def get_format_description(self):
1007
 
        """See RepositoryFormat.get_format_description()."""
1008
 
        return ("Development repository format - rich roots, group compression"
1009
 
            " and chk inventories")
1010
 
 
1011
 
    def check_conversion_target(self, target_format):
1012
 
        if not target_format.rich_root_data:
1013
 
            raise errors.BadConversionTarget(
1014
 
                'Does not support rich root data.', target_format)
1015
 
        if not getattr(target_format, 'supports_tree_reference', False):
1016
 
            raise errors.BadConversionTarget(
1017
 
                'Does not support nested trees', target_format)
1018
 
 
1019