~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/groupcompress_repo.py

  • Committer: Martin Pool
  • Date: 2007-04-04 06:17:31 UTC
  • mto: This revision was merged to the branch mainline in revision 2397.
  • Revision ID: mbp@sourcefrog.net-20070404061731-tt2xrzllqhbodn83
Contents of TODO file moved into bug tracker

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Repository formats using CHK inventories and groupcompress compression."""
18
 
 
19
 
import time
20
 
 
21
 
from bzrlib import (
22
 
    bzrdir,
23
 
    chk_map,
24
 
    chk_serializer,
25
 
    debug,
26
 
    errors,
27
 
    index as _mod_index,
28
 
    inventory,
29
 
    knit,
30
 
    osutils,
31
 
    pack,
32
 
    remote,
33
 
    repository,
34
 
    revision as _mod_revision,
35
 
    trace,
36
 
    ui,
37
 
    )
38
 
from bzrlib.btree_index import (
39
 
    BTreeGraphIndex,
40
 
    BTreeBuilder,
41
 
    )
42
 
from bzrlib.index import GraphIndex, GraphIndexBuilder
43
 
from bzrlib.groupcompress import (
44
 
    _GCGraphIndex,
45
 
    GroupCompressVersionedFiles,
46
 
    )
47
 
from bzrlib.repofmt.pack_repo import (
48
 
    Pack,
49
 
    NewPack,
50
 
    KnitPackRepository,
51
 
    PackRootCommitBuilder,
52
 
    RepositoryPackCollection,
53
 
    RepositoryFormatPack,
54
 
    Packer,
55
 
    )
56
 
 
57
 
 
58
 
class GCPack(NewPack):
59
 
 
60
 
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
61
 
        """Create a NewPack instance.
62
 
 
63
 
        :param pack_collection: A PackCollection into which this is being
64
 
            inserted.
65
 
        :param upload_suffix: An optional suffix to be given to any temporary
66
 
            files created during the pack creation. e.g '.autopack'
67
 
        :param file_mode: An optional file mode to create the new files with.
68
 
        """
69
 
        # replaced from NewPack to:
70
 
        # - change inventory reference list length to 1
71
 
        # - change texts reference lists to 1
72
 
        # TODO: patch this to be parameterised
73
 
 
74
 
        # The relative locations of the packs are constrained, but all are
75
 
        # passed in because the caller has them, so as to avoid object churn.
76
 
        index_builder_class = pack_collection._index_builder_class
77
 
        # from brisbane-core
78
 
        if pack_collection.chk_index is not None:
79
 
            chk_index = index_builder_class(reference_lists=0)
80
 
        else:
81
 
            chk_index = None
82
 
        Pack.__init__(self,
83
 
            # Revisions: parents list, no text compression.
84
 
            index_builder_class(reference_lists=1),
85
 
            # Inventory: We want to map compression only, but currently the
86
 
            # knit code hasn't been updated enough to understand that, so we
87
 
            # have a regular 2-list index giving parents and compression
88
 
            # source.
89
 
            index_builder_class(reference_lists=1),
90
 
            # Texts: compression and per file graph, for all fileids - so two
91
 
            # reference lists and two elements in the key tuple.
92
 
            index_builder_class(reference_lists=1, key_elements=2),
93
 
            # Signatures: Just blobs to store, no compression, no parents
94
 
            # listing.
95
 
            index_builder_class(reference_lists=0),
96
 
            # CHK based storage - just blobs, no compression or parents.
97
 
            chk_index=chk_index
98
 
            )
99
 
        self._pack_collection = pack_collection
100
 
        # When we make readonly indices, we need this.
101
 
        self.index_class = pack_collection._index_class
102
 
        # where should the new pack be opened
103
 
        self.upload_transport = pack_collection._upload_transport
104
 
        # where are indices written out to
105
 
        self.index_transport = pack_collection._index_transport
106
 
        # where is the pack renamed to when it is finished?
107
 
        self.pack_transport = pack_collection._pack_transport
108
 
        # What file mode to upload the pack and indices with.
109
 
        self._file_mode = file_mode
110
 
        # tracks the content written to the .pack file.
111
 
        self._hash = osutils.md5()
112
 
        # a four-tuple with the length in bytes of the indices, once the pack
113
 
        # is finalised. (rev, inv, text, sigs)
114
 
        self.index_sizes = None
115
 
        # How much data to cache when writing packs. Note that this is not
116
 
        # synchronised with reads, because it's not in the transport layer, so
117
 
        # is not safe unless the client knows it won't be reading from the pack
118
 
        # under creation.
119
 
        self._cache_limit = 0
120
 
        # the temporary pack file name.
121
 
        self.random_name = osutils.rand_chars(20) + upload_suffix
122
 
        # when was this pack started ?
123
 
        self.start_time = time.time()
124
 
        # open an output stream for the data added to the pack.
125
 
        self.write_stream = self.upload_transport.open_write_stream(
126
 
            self.random_name, mode=self._file_mode)
127
 
        if 'pack' in debug.debug_flags:
128
 
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
129
 
                time.ctime(), self.upload_transport.base, self.random_name,
130
 
                time.time() - self.start_time)
131
 
        # A list of byte sequences to be written to the new pack, and the
132
 
        # aggregate size of them.  Stored as a list rather than separate
133
 
        # variables so that the _write_data closure below can update them.
134
 
        self._buffer = [[], 0]
135
 
        # create a callable for adding data
136
 
        #
137
 
        # robertc says- this is a closure rather than a method on the object
138
 
        # so that the variables are locals, and faster than accessing object
139
 
        # members.
140
 
        def _write_data(bytes, flush=False, _buffer=self._buffer,
141
 
            _write=self.write_stream.write, _update=self._hash.update):
142
 
            _buffer[0].append(bytes)
143
 
            _buffer[1] += len(bytes)
144
 
            # buffer cap
145
 
            if _buffer[1] > self._cache_limit or flush:
146
 
                bytes = ''.join(_buffer[0])
147
 
                _write(bytes)
148
 
                _update(bytes)
149
 
                _buffer[:] = [[], 0]
150
 
        # expose this on self, for the occasion when clients want to add data.
151
 
        self._write_data = _write_data
152
 
        # a pack writer object to serialise pack records.
153
 
        self._writer = pack.ContainerWriter(self._write_data)
154
 
        self._writer.begin()
155
 
        # what state is the pack in? (open, finished, aborted)
156
 
        self._state = 'open'
157
 
 
158
 
    def _check_references(self):
159
 
        """Make sure our external references are present.
160
 
 
161
 
        Packs are allowed to have deltas whose base is not in the pack, but it
162
 
        must be present somewhere in this collection.  It is not allowed to
163
 
        have deltas based on a fallback repository.
164
 
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
165
 
        """
166
 
        # Groupcompress packs don't have any external references
167
 
 
168
 
 
169
 
class GCCHKPacker(Packer):
170
 
    """This class understand what it takes to collect a GCCHK repo."""
171
 
 
172
 
    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
173
 
                 reload_func=None):
174
 
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
175
 
                                          revision_ids=revision_ids,
176
 
                                          reload_func=reload_func)
177
 
        self._pack_collection = pack_collection
178
 
        # ATM, We only support this for GCCHK repositories
179
 
        if pack_collection.chk_index is None:
180
 
            raise AssertionError('pack_collection.chk_index should not be None')
181
 
        self._gather_text_refs = False
182
 
        self._chk_id_roots = []
183
 
        self._chk_p_id_roots = []
184
 
        self._text_refs = None
185
 
        # set by .pack() if self.revision_ids is not None
186
 
        self.revision_keys = None
187
 
 
188
 
    def _get_progress_stream(self, source_vf, keys, message, pb):
189
 
        def pb_stream():
190
 
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
191
 
            for idx, record in enumerate(substream):
192
 
                if pb is not None:
193
 
                    pb.update(message, idx + 1, len(keys))
194
 
                yield record
195
 
        return pb_stream()
196
 
 
197
 
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
198
 
        """Filter the texts of inventories, to find the chk pages."""
199
 
        total_keys = len(keys)
200
 
        def _filtered_inv_stream():
201
 
            id_roots_set = set()
202
 
            p_id_roots_set = set()
203
 
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
204
 
            for idx, record in enumerate(stream):
205
 
                bytes = record.get_bytes_as('fulltext')
206
 
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
207
 
                                                             record.key)
208
 
                if pb is not None:
209
 
                    pb.update('inv', idx, total_keys)
210
 
                key = chk_inv.id_to_entry.key()
211
 
                if key not in id_roots_set:
212
 
                    self._chk_id_roots.append(key)
213
 
                    id_roots_set.add(key)
214
 
                p_id_map = chk_inv.parent_id_basename_to_file_id
215
 
                if p_id_map is None:
216
 
                    raise AssertionError('Parent id -> file_id map not set')
217
 
                key = p_id_map.key()
218
 
                if key not in p_id_roots_set:
219
 
                    p_id_roots_set.add(key)
220
 
                    self._chk_p_id_roots.append(key)
221
 
                yield record
222
 
            # We have finished processing all of the inventory records, we
223
 
            # don't need these sets anymore
224
 
            id_roots_set.clear()
225
 
            p_id_roots_set.clear()
226
 
        return _filtered_inv_stream()
227
 
 
228
 
    def _get_chk_streams(self, source_vf, keys, pb=None):
229
 
        # We want to stream the keys from 'id_roots', and things they
230
 
        # reference, and then stream things from p_id_roots and things they
231
 
        # reference, and then any remaining keys that we didn't get to.
232
 
 
233
 
        # We also group referenced texts together, so if one root references a
234
 
        # text with prefix 'a', and another root references a node with prefix
235
 
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
236
 
        # This keeps 'similar' nodes together.
237
 
 
238
 
        # Note: We probably actually want multiple streams here, to help the
239
 
        #       client understand that the different levels won't compress well
240
 
        #       against each other.
241
 
        #       Test the difference between using one Group per level, and
242
 
        #       using 1 Group per prefix. (so '' (root) would get a group, then
243
 
        #       all the references to search-key 'a' would get a group, etc.)
244
 
        total_keys = len(keys)
245
 
        remaining_keys = set(keys)
246
 
        counter = [0]
247
 
        if self._gather_text_refs:
248
 
            bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
249
 
            self._text_refs = set()
250
 
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
251
 
            cur_keys = root_keys
252
 
            while cur_keys:
253
 
                keys_by_search_prefix = {}
254
 
                remaining_keys.difference_update(cur_keys)
255
 
                next_keys = set()
256
 
                def handle_internal_node(node):
257
 
                    for prefix, value in node._items.iteritems():
258
 
                        # We don't want to request the same key twice, and we
259
 
                        # want to order it by the first time it is seen.
260
 
                        # Even further, we don't want to request a key which is
261
 
                        # not in this group of pack files (it should be in the
262
 
                        # repo, but it doesn't have to be in the group being
263
 
                        # packed.)
264
 
                        # TODO: consider how to treat externally referenced chk
265
 
                        #       pages as 'external_references' so that we
266
 
                        #       always fill them in for stacked branches
267
 
                        if value not in next_keys and value in remaining_keys:
268
 
                            keys_by_search_prefix.setdefault(prefix,
269
 
                                []).append(value)
270
 
                            next_keys.add(value)
271
 
                def handle_leaf_node(node):
272
 
                    # Store is None, because we know we have a LeafNode, and we
273
 
                    # just want its entries
274
 
                    for file_id, bytes in node.iteritems(None):
275
 
                        name_utf8, file_id, revision_id = bytes_to_info(bytes)
276
 
                        self._text_refs.add((file_id, revision_id))
277
 
                def next_stream():
278
 
                    stream = source_vf.get_record_stream(cur_keys,
279
 
                                                         'as-requested', True)
280
 
                    for record in stream:
281
 
                        bytes = record.get_bytes_as('fulltext')
282
 
                        # We don't care about search_key_func for this code,
283
 
                        # because we only care about external references.
284
 
                        node = chk_map._deserialise(bytes, record.key,
285
 
                                                    search_key_func=None)
286
 
                        common_base = node._search_prefix
287
 
                        if isinstance(node, chk_map.InternalNode):
288
 
                            handle_internal_node(node)
289
 
                        elif parse_leaf_nodes:
290
 
                            handle_leaf_node(node)
291
 
                        counter[0] += 1
292
 
                        if pb is not None:
293
 
                            pb.update('chk node', counter[0], total_keys)
294
 
                        yield record
295
 
                yield next_stream()
296
 
                # Double check that we won't be emitting any keys twice
297
 
                # If we get rid of the pre-calculation of all keys, we could
298
 
                # turn this around and do
299
 
                # next_keys.difference_update(seen_keys)
300
 
                # However, we also may have references to chk pages in another
301
 
                # pack file during autopack. We filter earlier, so we should no
302
 
                # longer need to do this
303
 
                # next_keys = next_keys.intersection(remaining_keys)
304
 
                cur_keys = []
305
 
                for prefix in sorted(keys_by_search_prefix):
306
 
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
307
 
        for stream in _get_referenced_stream(self._chk_id_roots,
308
 
                                             self._gather_text_refs):
309
 
            yield stream
310
 
        del self._chk_id_roots
311
 
        # while it isn't really possible for chk_id_roots to not be in the
312
 
        # local group of packs, it is possible that the tree shape has not
313
 
        # changed recently, so we need to filter _chk_p_id_roots by the
314
 
        # available keys
315
 
        chk_p_id_roots = [key for key in self._chk_p_id_roots
316
 
                          if key in remaining_keys]
317
 
        del self._chk_p_id_roots
318
 
        for stream in _get_referenced_stream(chk_p_id_roots, False):
319
 
            yield stream
320
 
        if remaining_keys:
321
 
            trace.mutter('There were %d keys in the chk index, %d of which'
322
 
                         ' were not referenced', total_keys,
323
 
                         len(remaining_keys))
324
 
            if self.revision_ids is None:
325
 
                stream = source_vf.get_record_stream(remaining_keys,
326
 
                                                     'unordered', True)
327
 
                yield stream
328
 
 
329
 
    def _build_vf(self, index_name, parents, delta, for_write=False):
330
 
        """Build a VersionedFiles instance on top of this group of packs."""
331
 
        index_name = index_name + '_index'
332
 
        index_to_pack = {}
333
 
        access = knit._DirectPackAccess(index_to_pack)
334
 
        if for_write:
335
 
            # Use new_pack
336
 
            if self.new_pack is None:
337
 
                raise AssertionError('No new pack has been set')
338
 
            index = getattr(self.new_pack, index_name)
339
 
            index_to_pack[index] = self.new_pack.access_tuple()
340
 
            index.set_optimize(for_size=True)
341
 
            access.set_writer(self.new_pack._writer, index,
342
 
                              self.new_pack.access_tuple())
343
 
            add_callback = index.add_nodes
344
 
        else:
345
 
            indices = []
346
 
            for pack in self.packs:
347
 
                sub_index = getattr(pack, index_name)
348
 
                index_to_pack[sub_index] = pack.access_tuple()
349
 
                indices.append(sub_index)
350
 
            index = _mod_index.CombinedGraphIndex(indices)
351
 
            add_callback = None
352
 
        vf = GroupCompressVersionedFiles(
353
 
            _GCGraphIndex(index,
354
 
                          add_callback=add_callback,
355
 
                          parents=parents,
356
 
                          is_locked=self._pack_collection.repo.is_locked),
357
 
            access=access,
358
 
            delta=delta)
359
 
        return vf
360
 
 
361
 
    def _build_vfs(self, index_name, parents, delta):
362
 
        """Build the source and target VersionedFiles."""
363
 
        source_vf = self._build_vf(index_name, parents,
364
 
                                   delta, for_write=False)
365
 
        target_vf = self._build_vf(index_name, parents,
366
 
                                   delta, for_write=True)
367
 
        return source_vf, target_vf
368
 
 
369
 
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
370
 
                     pb_offset):
371
 
        trace.mutter('repacking %d %s', len(keys), message)
372
 
        self.pb.update('repacking %s' % (message,), pb_offset)
373
 
        child_pb = ui.ui_factory.nested_progress_bar()
374
 
        try:
375
 
            stream = vf_to_stream(source_vf, keys, message, child_pb)
376
 
            for _ in target_vf._insert_record_stream(stream,
377
 
                                                     random_id=True,
378
 
                                                     reuse_blocks=False):
379
 
                pass
380
 
        finally:
381
 
            child_pb.finished()
382
 
 
383
 
    def _copy_revision_texts(self):
384
 
        source_vf, target_vf = self._build_vfs('revision', True, False)
385
 
        if not self.revision_keys:
386
 
            # We are doing a full fetch, aka 'pack'
387
 
            self.revision_keys = source_vf.keys()
388
 
        self._copy_stream(source_vf, target_vf, self.revision_keys,
389
 
                          'revisions', self._get_progress_stream, 1)
390
 
 
391
 
    def _copy_inventory_texts(self):
392
 
        source_vf, target_vf = self._build_vfs('inventory', True, True)
393
 
        self._copy_stream(source_vf, target_vf, self.revision_keys,
394
 
                          'inventories', self._get_filtered_inv_stream, 2)
395
 
 
396
 
    def _copy_chk_texts(self):
397
 
        source_vf, target_vf = self._build_vfs('chk', False, False)
398
 
        # TODO: This is technically spurious... if it is a performance issue,
399
 
        #       remove it
400
 
        total_keys = source_vf.keys()
401
 
        trace.mutter('repacking chk: %d id_to_entry roots,'
402
 
                     ' %d p_id_map roots, %d total keys',
403
 
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
404
 
                     len(total_keys))
405
 
        self.pb.update('repacking chk', 3)
406
 
        child_pb = ui.ui_factory.nested_progress_bar()
407
 
        try:
408
 
            for stream in self._get_chk_streams(source_vf, total_keys,
409
 
                                                pb=child_pb):
410
 
                for _ in target_vf._insert_record_stream(stream,
411
 
                                                         random_id=True,
412
 
                                                         reuse_blocks=False):
413
 
                    pass
414
 
        finally:
415
 
            child_pb.finished()
416
 
 
417
 
    def _copy_text_texts(self):
418
 
        source_vf, target_vf = self._build_vfs('text', True, True)
419
 
        # XXX: We don't walk the chk map to determine referenced (file_id,
420
 
        #      revision_id) keys.  We don't do it yet because you really need
421
 
        #      to filter out the ones that are present in the parents of the
422
 
        #      rev just before the ones you are copying, otherwise the filter
423
 
        #      is grabbing too many keys...
424
 
        text_keys = source_vf.keys()
425
 
        self._copy_stream(source_vf, target_vf, text_keys,
426
 
                          'text', self._get_progress_stream, 4)
427
 
 
428
 
    def _copy_signature_texts(self):
429
 
        source_vf, target_vf = self._build_vfs('signature', False, False)
430
 
        signature_keys = source_vf.keys()
431
 
        signature_keys.intersection(self.revision_keys)
432
 
        self._copy_stream(source_vf, target_vf, signature_keys,
433
 
                          'signatures', self._get_progress_stream, 5)
434
 
 
435
 
    def _create_pack_from_packs(self):
436
 
        self.pb.update('repacking', 0, 7)
437
 
        self.new_pack = self.open_pack()
438
 
        # Is this necessary for GC ?
439
 
        self.new_pack.set_write_cache_size(1024*1024)
440
 
        self._copy_revision_texts()
441
 
        self._copy_inventory_texts()
442
 
        self._copy_chk_texts()
443
 
        self._copy_text_texts()
444
 
        self._copy_signature_texts()
445
 
        self.new_pack._check_references()
446
 
        if not self._use_pack(self.new_pack):
447
 
            self.new_pack.abort()
448
 
            return None
449
 
        self.pb.update('finishing repack', 6, 7)
450
 
        self.new_pack.finish()
451
 
        self._pack_collection.allocate(self.new_pack)
452
 
        return self.new_pack
453
 
 
454
 
 
455
 
class GCCHKReconcilePacker(GCCHKPacker):
456
 
    """A packer which regenerates indices etc as it copies.
457
 
 
458
 
    This is used by ``bzr reconcile`` to cause parent text pointers to be
459
 
    regenerated.
460
 
    """
461
 
 
462
 
    def __init__(self, *args, **kwargs):
463
 
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
464
 
        self._data_changed = False
465
 
        self._gather_text_refs = True
466
 
 
467
 
    def _copy_inventory_texts(self):
468
 
        source_vf, target_vf = self._build_vfs('inventory', True, True)
469
 
        self._copy_stream(source_vf, target_vf, self.revision_keys,
470
 
                          'inventories', self._get_filtered_inv_stream, 2)
471
 
        if source_vf.keys() != self.revision_keys:
472
 
            self._data_changed = True
473
 
 
474
 
    def _copy_text_texts(self):
475
 
        """generate what texts we should have and then copy."""
476
 
        source_vf, target_vf = self._build_vfs('text', True, True)
477
 
        trace.mutter('repacking %d texts', len(self._text_refs))
478
 
        self.pb.update("repacking texts", 4)
479
 
        # we have three major tasks here:
480
 
        # 1) generate the ideal index
481
 
        repo = self._pack_collection.repo
482
 
        # We want the one we just wrote, so base it on self.new_pack
483
 
        revision_vf = self._build_vf('revision', True, False, for_write=True)
484
 
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
485
 
        # Strip keys back into revision_ids.
486
 
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
487
 
                         for k, parents in ancestor_keys.iteritems())
488
 
        del ancestor_keys
489
 
        # TODO: _generate_text_key_index should be much cheaper to generate from
490
 
        #       a chk repository, rather than the current implementation
491
 
        ideal_index = repo._generate_text_key_index(None, ancestors)
492
 
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
493
 
        # 2) generate a keys list that contains all the entries that can
494
 
        #    be used as-is, with corrected parents.
495
 
        ok_keys = []
496
 
        new_parent_keys = {} # (key, parent_keys)
497
 
        discarded_keys = []
498
 
        NULL_REVISION = _mod_revision.NULL_REVISION
499
 
        for key in self._text_refs:
500
 
            # 0 - index
501
 
            # 1 - key
502
 
            # 2 - value
503
 
            # 3 - refs
504
 
            try:
505
 
                ideal_parents = tuple(ideal_index[key])
506
 
            except KeyError:
507
 
                discarded_keys.append(key)
508
 
                self._data_changed = True
509
 
            else:
510
 
                if ideal_parents == (NULL_REVISION,):
511
 
                    ideal_parents = ()
512
 
                source_parents = file_id_parent_map[key]
513
 
                if ideal_parents == source_parents:
514
 
                    # no change needed.
515
 
                    ok_keys.append(key)
516
 
                else:
517
 
                    # We need to change the parent graph, but we don't need to
518
 
                    # re-insert the text (since we don't pun the compression
519
 
                    # parent with the parents list)
520
 
                    self._data_changed = True
521
 
                    new_parent_keys[key] = ideal_parents
522
 
        # we're finished with some data.
523
 
        del ideal_index
524
 
        del file_id_parent_map
525
 
        # 3) bulk copy the data, updating records than need it
526
 
        def _update_parents_for_texts():
527
 
            stream = source_vf.get_record_stream(self._text_refs,
528
 
                'groupcompress', False)
529
 
            for record in stream:
530
 
                if record.key in new_parent_keys:
531
 
                    record.parents = new_parent_keys[record.key]
532
 
                yield record
533
 
        target_vf.insert_record_stream(_update_parents_for_texts())
534
 
 
535
 
    def _use_pack(self, new_pack):
536
 
        """Override _use_pack to check for reconcile having changed content."""
537
 
        return new_pack.data_inserted() and self._data_changed
538
 
 
539
 
 
540
 
class GCRepositoryPackCollection(RepositoryPackCollection):
541
 
 
542
 
    pack_factory = GCPack
543
 
 
544
 
    def _already_packed(self):
545
 
        """Is the collection already packed?"""
546
 
        # Always repack GC repositories for now
547
 
        return False
548
 
 
549
 
    def _execute_pack_operations(self, pack_operations,
550
 
                                 _packer_class=GCCHKPacker,
551
 
                                 reload_func=None):
552
 
        """Execute a series of pack operations.
553
 
 
554
 
        :param pack_operations: A list of [revision_count, packs_to_combine].
555
 
        :param _packer_class: The class of packer to use (default: Packer).
556
 
        :return: None.
557
 
        """
558
 
        # XXX: Copied across from RepositoryPackCollection simply because we
559
 
        #      want to override the _packer_class ... :(
560
 
        for revision_count, packs in pack_operations:
561
 
            # we may have no-ops from the setup logic
562
 
            if len(packs) == 0:
563
 
                continue
564
 
            packer = GCCHKPacker(self, packs, '.autopack',
565
 
                                 reload_func=reload_func)
566
 
            try:
567
 
                packer.pack()
568
 
            except errors.RetryWithNewPacks:
569
 
                # An exception is propagating out of this context, make sure
570
 
                # this packer has cleaned up. Packer() doesn't set its new_pack
571
 
                # state into the RepositoryPackCollection object, so we only
572
 
                # have access to it directly here.
573
 
                if packer.new_pack is not None:
574
 
                    packer.new_pack.abort()
575
 
                raise
576
 
            for pack in packs:
577
 
                self._remove_pack_from_memory(pack)
578
 
        # record the newly available packs and stop advertising the old
579
 
        # packs
580
 
        self._save_pack_names(clear_obsolete_packs=True)
581
 
        # Move the old packs out of the way now they are no longer referenced.
582
 
        for revision_count, packs in pack_operations:
583
 
            self._obsolete_packs(packs)
584
 
 
585
 
 
586
 
class CHKInventoryRepository(KnitPackRepository):
587
 
    """subclass of KnitPackRepository that uses CHK based inventories."""
588
 
 
589
 
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
590
 
        _serializer):
591
 
        """Overridden to change pack collection class."""
592
 
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
593
 
            _commit_builder_class, _serializer)
594
 
        # and now replace everything it did :)
595
 
        index_transport = self._transport.clone('indices')
596
 
        self._pack_collection = GCRepositoryPackCollection(self,
597
 
            self._transport, index_transport,
598
 
            self._transport.clone('upload'),
599
 
            self._transport.clone('packs'),
600
 
            _format.index_builder_class,
601
 
            _format.index_class,
602
 
            use_chk_index=self._format.supports_chks,
603
 
            )
604
 
        self.inventories = GroupCompressVersionedFiles(
605
 
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
606
 
                add_callback=self._pack_collection.inventory_index.add_callback,
607
 
                parents=True, is_locked=self.is_locked),
608
 
            access=self._pack_collection.inventory_index.data_access)
609
 
        self.revisions = GroupCompressVersionedFiles(
610
 
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
611
 
                add_callback=self._pack_collection.revision_index.add_callback,
612
 
                parents=True, is_locked=self.is_locked),
613
 
            access=self._pack_collection.revision_index.data_access,
614
 
            delta=False)
615
 
        self.signatures = GroupCompressVersionedFiles(
616
 
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
617
 
                add_callback=self._pack_collection.signature_index.add_callback,
618
 
                parents=False, is_locked=self.is_locked),
619
 
            access=self._pack_collection.signature_index.data_access,
620
 
            delta=False)
621
 
        self.texts = GroupCompressVersionedFiles(
622
 
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
623
 
                add_callback=self._pack_collection.text_index.add_callback,
624
 
                parents=True, is_locked=self.is_locked),
625
 
            access=self._pack_collection.text_index.data_access)
626
 
        # No parents, individual CHK pages don't have specific ancestry
627
 
        self.chk_bytes = GroupCompressVersionedFiles(
628
 
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
629
 
                add_callback=self._pack_collection.chk_index.add_callback,
630
 
                parents=False, is_locked=self.is_locked),
631
 
            access=self._pack_collection.chk_index.data_access)
632
 
        # True when the repository object is 'write locked' (as opposed to the
633
 
        # physical lock only taken out around changes to the pack-names list.)
634
 
        # Another way to represent this would be a decorator around the control
635
 
        # files object that presents logical locks as physical ones - if this
636
 
        # gets ugly consider that alternative design. RBC 20071011
637
 
        self._write_lock_count = 0
638
 
        self._transaction = None
639
 
        # for tests
640
 
        self._reconcile_does_inventory_gc = True
641
 
        self._reconcile_fixes_text_parents = True
642
 
        self._reconcile_backsup_inventory = False
643
 
 
644
 
    def _add_inventory_checked(self, revision_id, inv, parents):
645
 
        """Add inv to the repository after checking the inputs.
646
 
 
647
 
        This function can be overridden to allow different inventory styles.
648
 
 
649
 
        :seealso: add_inventory, for the contract.
650
 
        """
651
 
        # make inventory
652
 
        serializer = self._format._serializer
653
 
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
654
 
            maximum_size=serializer.maximum_size,
655
 
            search_key_name=serializer.search_key_name)
656
 
        inv_lines = result.to_lines()
657
 
        return self._inventory_add_lines(revision_id, parents,
658
 
            inv_lines, check_content=False)
659
 
 
660
 
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
661
 
                               parents, basis_inv=None, propagate_caches=False):
662
 
        """Add a new inventory expressed as a delta against another revision.
663
 
 
664
 
        :param basis_revision_id: The inventory id the delta was created
665
 
            against.
666
 
        :param delta: The inventory delta (see Inventory.apply_delta for
667
 
            details).
668
 
        :param new_revision_id: The revision id that the inventory is being
669
 
            added for.
670
 
        :param parents: The revision ids of the parents that revision_id is
671
 
            known to have and are in the repository already. These are supplied
672
 
            for repositories that depend on the inventory graph for revision
673
 
            graph access, as well as for those that pun ancestry with delta
674
 
            compression.
675
 
        :param basis_inv: The basis inventory if it is already known,
676
 
            otherwise None.
677
 
        :param propagate_caches: If True, the caches for this inventory are
678
 
          copied to and updated for the result if possible.
679
 
 
680
 
        :returns: (validator, new_inv)
681
 
            The validator(which is a sha1 digest, though what is sha'd is
682
 
            repository format specific) of the serialized inventory, and the
683
 
            resulting inventory.
684
 
        """
685
 
        if basis_revision_id == _mod_revision.NULL_REVISION:
686
 
            return KnitPackRepository.add_inventory_by_delta(self,
687
 
                basis_revision_id, delta, new_revision_id, parents)
688
 
        if not self.is_in_write_group():
689
 
            raise AssertionError("%r not in write group" % (self,))
690
 
        _mod_revision.check_not_reserved_id(new_revision_id)
691
 
        basis_tree = self.revision_tree(basis_revision_id)
692
 
        basis_tree.lock_read()
693
 
        try:
694
 
            if basis_inv is None:
695
 
                basis_inv = basis_tree.inventory
696
 
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
697
 
                propagate_caches=propagate_caches)
698
 
            inv_lines = result.to_lines()
699
 
            return self._inventory_add_lines(new_revision_id, parents,
700
 
                inv_lines, check_content=False), result
701
 
        finally:
702
 
            basis_tree.unlock()
703
 
 
704
 
    def _iter_inventories(self, revision_ids):
705
 
        """Iterate over many inventory objects."""
706
 
        keys = [(revision_id,) for revision_id in revision_ids]
707
 
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
708
 
        texts = {}
709
 
        for record in stream:
710
 
            if record.storage_kind != 'absent':
711
 
                texts[record.key] = record.get_bytes_as('fulltext')
712
 
            else:
713
 
                raise errors.NoSuchRevision(self, record.key)
714
 
        for key in keys:
715
 
            yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)
716
 
 
717
 
    def _iter_inventory_xmls(self, revision_ids):
718
 
        # Without a native 'xml' inventory, this method doesn't make sense, so
719
 
        # make it raise to trap naughty direct users.
720
 
        raise NotImplementedError(self._iter_inventory_xmls)
721
 
 
722
 
    def _find_revision_outside_set(self, revision_ids):
723
 
        revision_set = frozenset(revision_ids)
724
 
        for revid in revision_ids:
725
 
            parent_ids = self.get_parent_map([revid]).get(revid, ())
726
 
            for parent in parent_ids:
727
 
                if parent in revision_set:
728
 
                    # Parent is not outside the set
729
 
                    continue
730
 
                if parent not in self.get_parent_map([parent]):
731
 
                    # Parent is a ghost
732
 
                    continue
733
 
                return parent
734
 
        return _mod_revision.NULL_REVISION
735
 
 
736
 
    def _find_file_keys_to_fetch(self, revision_ids, pb):
737
 
        rich_root = self.supports_rich_root()
738
 
        revision_outside_set = self._find_revision_outside_set(revision_ids)
739
 
        if revision_outside_set == _mod_revision.NULL_REVISION:
740
 
            uninteresting_root_keys = set()
741
 
        else:
742
 
            uninteresting_inv = self.get_inventory(revision_outside_set)
743
 
            uninteresting_root_keys = set([uninteresting_inv.id_to_entry.key()])
744
 
        interesting_root_keys = set()
745
 
        for idx, inv in enumerate(self.iter_inventories(revision_ids)):
746
 
            interesting_root_keys.add(inv.id_to_entry.key())
747
 
        revision_ids = frozenset(revision_ids)
748
 
        file_id_revisions = {}
749
 
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
750
 
        for record, items in chk_map.iter_interesting_nodes(self.chk_bytes,
751
 
                    interesting_root_keys, uninteresting_root_keys,
752
 
                    pb=pb):
753
 
            # This is cheating a bit to use the last grabbed 'inv', but it
754
 
            # works
755
 
            for name, bytes in items:
756
 
                (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
757
 
                if not rich_root and name_utf8 == '':
758
 
                    continue
759
 
                if revision_id in revision_ids:
760
 
                    # Would we rather build this up into file_id => revision
761
 
                    # maps?
762
 
                    try:
763
 
                        file_id_revisions[file_id].add(revision_id)
764
 
                    except KeyError:
765
 
                        file_id_revisions[file_id] = set([revision_id])
766
 
        for file_id, revisions in file_id_revisions.iteritems():
767
 
            yield ('file', file_id, revisions)
768
 
 
769
 
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
770
 
        """Find the file ids and versions affected by revisions.
771
 
 
772
 
        :param revisions: an iterable containing revision ids.
773
 
        :param _inv_weave: The inventory weave from this repository or None.
774
 
            If None, the inventory weave will be opened automatically.
775
 
        :return: a dictionary mapping altered file-ids to an iterable of
776
 
            revision_ids. Each altered file-ids has the exact revision_ids that
777
 
            altered it listed explicitly.
778
 
        """
779
 
        rich_roots = self.supports_rich_root()
780
 
        result = {}
781
 
        pb = ui.ui_factory.nested_progress_bar()
782
 
        try:
783
 
            total = len(revision_ids)
784
 
            for pos, inv in enumerate(self.iter_inventories(revision_ids)):
785
 
                pb.update("Finding text references", pos, total)
786
 
                for entry in inv.iter_just_entries():
787
 
                    if entry.revision != inv.revision_id:
788
 
                        continue
789
 
                    if not rich_roots and entry.file_id == inv.root_id:
790
 
                        continue
791
 
                    alterations = result.setdefault(entry.file_id, set([]))
792
 
                    alterations.add(entry.revision)
793
 
            return result
794
 
        finally:
795
 
            pb.finished()
796
 
 
797
 
    def find_text_key_references(self):
798
 
        """Find the text key references within the repository.
799
 
 
800
 
        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
801
 
            to whether they were referred to by the inventory of the
802
 
            revision_id that they contain. The inventory texts from all present
803
 
            revision ids are assessed to generate this report.
804
 
        """
805
 
        # XXX: Slow version but correct: rewrite as a series of delta
806
 
        # examinations/direct tree traversal. Note that that will require care
807
 
        # as a common node is reachable both from the inventory that added it,
808
 
        # and others afterwards.
809
 
        revision_keys = self.revisions.keys()
810
 
        result = {}
811
 
        rich_roots = self.supports_rich_root()
812
 
        pb = ui.ui_factory.nested_progress_bar()
813
 
        try:
814
 
            all_revs = self.all_revision_ids()
815
 
            total = len(all_revs)
816
 
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
817
 
                pb.update("Finding text references", pos, total)
818
 
                for _, entry in inv.iter_entries():
819
 
                    if not rich_roots and entry.file_id == inv.root_id:
820
 
                        continue
821
 
                    key = (entry.file_id, entry.revision)
822
 
                    result.setdefault(key, False)
823
 
                    if entry.revision == inv.revision_id:
824
 
                        result[key] = True
825
 
            return result
826
 
        finally:
827
 
            pb.finished()
828
 
 
829
 
    def _reconcile_pack(self, collection, packs, extension, revs, pb):
830
 
        packer = GCCHKReconcilePacker(collection, packs, extension)
831
 
        return packer.pack(pb)
832
 
 
833
 
    def _get_source(self, to_format):
834
 
        """Return a source for streaming from this repository."""
835
 
        if isinstance(to_format, remote.RemoteRepositoryFormat):
836
 
            # Can't just check attributes on to_format with the current code,
837
 
            # work around this:
838
 
            to_format._ensure_real()
839
 
            to_format = to_format._custom_format
840
 
        if to_format.__class__ is self._format.__class__:
841
 
            # We must be exactly the same format, otherwise stuff like the chk
842
 
            # page layout might be different
843
 
            return GroupCHKStreamSource(self, to_format)
844
 
        return super(CHKInventoryRepository, self)._get_source(to_format)
845
 
 
846
 
    def suspend_write_group(self):
847
 
        raise errors.UnsuspendableWriteGroup(self)
848
 
 
849
 
    def _resume_write_group(self, tokens):
850
 
        raise errors.UnsuspendableWriteGroup(self)
851
 
 
852
 
 
853
 
class GroupCHKStreamSource(repository.StreamSource):
854
 
    """Used when both the source and target repo are GroupCHK repos."""
855
 
 
856
 
    def __init__(self, from_repository, to_format):
857
 
        """Create a StreamSource streaming from from_repository."""
858
 
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
859
 
        self._revision_keys = None
860
 
        self._text_keys = None
861
 
        self._chk_id_roots = None
862
 
        self._chk_p_id_roots = None
863
 
 
864
 
    def _get_filtered_inv_stream(self):
865
 
        """Get a stream of inventory texts.
866
 
 
867
 
        When this function returns, self._chk_id_roots and self._chk_p_id_roots
868
 
        should be populated.
869
 
        """
870
 
        self._chk_id_roots = []
871
 
        self._chk_p_id_roots = []
872
 
        def _filtered_inv_stream():
873
 
            id_roots_set = set()
874
 
            p_id_roots_set = set()
875
 
            source_vf = self.from_repository.inventories
876
 
            stream = source_vf.get_record_stream(self._revision_keys,
877
 
                                                 'groupcompress', True)
878
 
            for record in stream:
879
 
                bytes = record.get_bytes_as('fulltext')
880
 
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
881
 
                                                             record.key)
882
 
                key = chk_inv.id_to_entry.key()
883
 
                if key not in id_roots_set:
884
 
                    self._chk_id_roots.append(key)
885
 
                    id_roots_set.add(key)
886
 
                p_id_map = chk_inv.parent_id_basename_to_file_id
887
 
                if p_id_map is None:
888
 
                    raise AssertionError('Parent id -> file_id map not set')
889
 
                key = p_id_map.key()
890
 
                if key not in p_id_roots_set:
891
 
                    p_id_roots_set.add(key)
892
 
                    self._chk_p_id_roots.append(key)
893
 
                yield record
894
 
            # We have finished processing all of the inventory records, we
895
 
            # don't need these sets anymore
896
 
            id_roots_set.clear()
897
 
            p_id_roots_set.clear()
898
 
        return ('inventories', _filtered_inv_stream())
899
 
 
900
 
    def _get_filtered_chk_streams(self, excluded_keys):
901
 
        self._text_keys = set()
902
 
        excluded_keys.discard(_mod_revision.NULL_REVISION)
903
 
        if not excluded_keys:
904
 
            uninteresting_root_keys = set()
905
 
            uninteresting_pid_root_keys = set()
906
 
        else:
907
 
            uninteresting_root_keys = set()
908
 
            uninteresting_pid_root_keys = set()
909
 
            for inv in self.from_repository.iter_inventories(excluded_keys):
910
 
                uninteresting_root_keys.add(inv.id_to_entry.key())
911
 
                uninteresting_pid_root_keys.add(
912
 
                    inv.parent_id_basename_to_file_id.key())
913
 
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
914
 
        chk_bytes = self.from_repository.chk_bytes
915
 
        def _filter_id_to_entry():
916
 
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
917
 
                        self._chk_id_roots, uninteresting_root_keys):
918
 
                for name, bytes in items:
919
 
                    # Note: we don't care about name_utf8, because we are always
920
 
                    # rich-root = True
921
 
                    _, file_id, revision_id = bytes_to_info(bytes)
922
 
                    self._text_keys.add((file_id, revision_id))
923
 
                if record is not None:
924
 
                    yield record
925
 
        yield 'chk_bytes', _filter_id_to_entry()
926
 
        def _get_parent_id_basename_to_file_id_pages():
927
 
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
928
 
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
929
 
                if record is not None:
930
 
                    yield record
931
 
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
932
 
 
933
 
    def _get_text_stream(self):
934
 
        # Note: We know we don't have to handle adding root keys, because both
935
 
        # the source and target are GCCHK, and those always support rich-roots
936
 
        # We may want to request as 'unordered', in case the source has done a
937
 
        # 'split' packing
938
 
        return ('texts', self.from_repository.texts.get_record_stream(
939
 
                            self._text_keys, 'groupcompress', False))
940
 
 
941
 
    def get_stream(self, search):
942
 
        revision_ids = search.get_keys()
943
 
        for stream_info in self._fetch_revision_texts(revision_ids):
944
 
            yield stream_info
945
 
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
946
 
        yield self._get_filtered_inv_stream()
947
 
        # The keys to exclude are part of the search recipe
948
 
        _, _, exclude_keys, _ = search.get_recipe()
949
 
        for stream_info in self._get_filtered_chk_streams(exclude_keys):
950
 
            yield stream_info
951
 
        yield self._get_text_stream()
952
 
 
953
 
 
954
 
class RepositoryFormatCHK1(RepositoryFormatPack):
955
 
    """A hashed CHK+group compress pack repository."""
956
 
 
957
 
    repository_class = CHKInventoryRepository
958
 
    supports_chks = True
959
 
    # For right now, setting this to True gives us InterModel1And2 rather
960
 
    # than InterDifferingSerializer
961
 
    _commit_builder_class = PackRootCommitBuilder
962
 
    rich_root_data = True
963
 
    _serializer = chk_serializer.chk_serializer_255_bigpage
964
 
    _commit_inv_deltas = True
965
 
    # What index classes to use
966
 
    index_builder_class = BTreeBuilder
967
 
    index_class = BTreeGraphIndex
968
 
    # Note: We cannot unpack a delta that references a text we haven't
969
 
    # seen yet. There are 2 options, work in fulltexts, or require
970
 
    # topological sorting. Using fulltexts is more optimal for local
971
 
    # operations, because the source can be smart about extracting
972
 
    # multiple in-a-row (and sharing strings). Topological is better
973
 
    # for remote, because we access less data.
974
 
    _fetch_order = 'unordered'
975
 
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
976
 
    fast_deltas = True
977
 
 
978
 
    def _get_matching_bzrdir(self):
979
 
        return bzrdir.format_registry.make_bzrdir('development6-rich-root')
980
 
 
981
 
    def _ignore_setting_bzrdir(self, format):
982
 
        pass
983
 
 
984
 
    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
985
 
 
986
 
    def get_format_string(self):
987
 
        """See RepositoryFormat.get_format_string()."""
988
 
        return ('Bazaar development format - group compression and chk inventory'
989
 
                ' (needs bzr.dev from 1.14)\n')
990
 
 
991
 
    def get_format_description(self):
992
 
        """See RepositoryFormat.get_format_description()."""
993
 
        return ("Development repository format - rich roots, group compression"
994
 
            " and chk inventories")
995
 
 
996
 
    def check_conversion_target(self, target_format):
997
 
        if not target_format.rich_root_data:
998
 
            raise errors.BadConversionTarget(
999
 
                'Does not support rich root data.', target_format)
1000
 
        if not getattr(target_format, 'supports_tree_reference', False):
1001
 
            raise errors.BadConversionTarget(
1002
 
                'Does not support nested trees', target_format)
1003
 
 
1004