# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""
import time

from bzrlib import (
    bzrdir,
    chk_map,
    chk_serializer,
    debug,
    errors,
    index as _mod_index,
    inventory,
    knit,
    osutils,
    pack,
    remote,
    repository,
    revision as _mod_revision,
    trace,
    ui,
    )
from bzrlib.btree_index import (
    BTreeGraphIndex,
    BTreeBuilder,
    )
from bzrlib.index import GraphIndex, GraphIndexBuilder
from bzrlib.groupcompress import (
    _GCGraphIndex,
    GroupCompressVersionedFiles,
    )
from bzrlib.repofmt.pack_repo import (
    Pack,
    NewPack,
    KnitPackRepository,
    RepositoryFormatPack,
    Packer,
    PackRootCommitBuilder,
    RepositoryPackCollection,
    )


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
            inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised

        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        else:
            chk_index = None
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=1),
            # Texts: compression and per file graph, for all fileids - so two
            # reference lists and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            # list.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index=chk_index,
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        # under creation.
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them. Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        #
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
                _write(bytes)
                _update(bytes)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        self._writer.begin()
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references
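        # (each groupcompress block carries its own compression base data, so
        # there is nothing external to verify here)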


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        def pb_stream():
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                if pb is not None:
                    pb.update(message, idx + 1, len(keys))
                yield record
        return pb_stream()

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return _filtered_inv_stream()

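    # The id_to_entry and parent_id_basename_to_file_id root keys collected by
    # _get_filtered_inv_stream above are what seed the chk page walk below.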
    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.

        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.

        # Note: We probably actually want multiple streams here, to help the
        #       client understand that the different levels won't compress well
        #       against each other.
        #       Test the difference between using one Group per level, and
        #       using 1 Group per prefix. (so '' (root) would get a group, then
        #       all the references to search-key 'a' would get a group, etc.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        counter = [0]
        if self._gather_text_refs:
            bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            cur_keys = root_keys
            while cur_keys:
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)
                next_keys = set()
                def handle_internal_node(node):
                    for prefix, value in node._items.iteritems():
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # packed.)
                        # TODO: consider how to treat externally referenced chk
                        #       pages as 'external_references' so that we
                        #       always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,
                                []).append(value)
                            next_keys.add(value)
                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        name_utf8, file_id, revision_id = bytes_to_info(bytes)
                        self._text_refs.add((file_id, revision_id))
                def next_stream():
                    stream = source_vf.get_record_stream(cur_keys,
                                                         'as-requested', True)
                    for record in stream:
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                                                    search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                        counter[0] += 1
                        if pb is not None:
                            pb.update('chk node', counter[0], total_keys)
                        yield record
                yield next_stream()
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                cur_keys = []
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        # available keys.
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        if remaining_keys:
            trace.mutter('There were %d keys in the chk index, %d of which'
                         ' were not referenced', total_keys,
                         len(remaining_keys))
            if self.revision_ids is None:
                stream = source_vf.get_record_stream(remaining_keys,
                                                     'unordered', True)
                yield stream

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = knit._DirectPackAccess(index_to_pack)
        if for_write:
            # Use the new pack being written as the target.
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                              self.new_pack.access_tuple())
            add_callback = index.add_nodes
        else:
            indices = []
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
            add_callback = None
        vf = GroupCompressVersionedFiles(
            _GCGraphIndex(index,
                          add_callback=add_callback,
                          parents=parents,
                          is_locked=self._pack_collection.repo.is_locked),
            access=access,
            delta=delta)
        return vf

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

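    # vf_to_stream is one of the _get_*_stream helpers above, and pb_offset is
    # the position of this phase on the top-level 'repacking' progress bar.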
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,
                                                     random_id=True):
                pass
        finally:
            child_pb.finished()

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._build_vfs('chk', False, False)
        # TODO: This is technically spurious... if it is a performance issue,
        #       remove it
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            for stream in self._get_chk_streams(source_vf, total_keys,
                                                pb=child_pb):
                for _ in target_vf._insert_record_stream(stream,
                                                         random_id=True):
                    pass
        finally:
            child_pb.finished()

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        #      revision_id) keys. We don't do it yet because you really need
        #      to filter out the ones that are present in the parents of the
        #      rev just before the ones you are copying, otherwise the filter
        #      is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'text', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys = signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    correct.
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in ancestor_keys.iteritems())
        del ancestor_keys
        # TODO: _generate_text_key_index should be much cheaper to generate from
        #       a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        #    be used as-is, with corrected parents.
        ok_keys = []
        new_parent_keys = {} # (key, parent_keys)
        discarded_keys = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            try:
                ideal_parents = tuple(ideal_index[key])
            except KeyError:
                discarded_keys.append(key)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # no change needed.
                    ok_keys.append(key)
                else:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del ideal_index
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                                                 'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack

    def _already_packed(self):
        """Is the collection already packed?"""
        # Always repack GC repositories for now
        return False

    def _execute_pack_operations(self, pack_operations,
                                 _packer_class=GCCHKPacker,
                                 reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        :param _packer_class: The class of packer to use (default: Packer).
        """
        # XXX: Copied across from RepositoryPackCollection simply because we
        #      want to override the _packer_class ... :(
        for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
            packer = GCCHKPacker(self, packs, '.autopack',
                                 reload_func=reload_func)
            try:
                packer.pack()
            except errors.RetryWithNewPacks:
                # An exception is propagating out of this context, make sure
                # this packer has cleaned up. Packer() doesn't set its new_pack
                # state into the RepositoryPackCollection object, so we only
                # have access to it directly here.
                if packer.new_pack is not None:
                    packer.new_pack.abort()
                raise
            for pack in packs:
                self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        # packs
        self._save_pack_names(clear_obsolete_packs=True)
        # Move the old packs out of the way now they are no longer referenced.
        for revision_count, packs in pack_operations:
            self._obsolete_packs(packs)


class CHKInventoryRepository(KnitPackRepository):
    """Subclass of KnitPackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
                                    _commit_builder_class, _serializer)
        # and now replace everything it did :)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            _format.index_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.revision_index.data_access,
            delta=False)
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked),
            access=self._pack_collection.signature_index.data_access,
            delta=False)
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked),
            access=self._pack_collection.chk_index.data_access)
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        # for tests
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

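    # Inventories are stored as CHK maps (id_to_entry and
    # parent_id_basename_to_file_id pages in self.chk_bytes) plus a small
    # serialised CHKInventory text per revision in self.inventories.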
    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
            against.
        :param delta: The inventory delta (see Inventory.apply_delta for
            details).
        :param new_revision_id: The revision id that the inventory is being
            added for.
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
            compression.
        :param basis_inv: The basis inventory if it is already known,
            otherwise None.
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
            resulting inventory.
        """
        if basis_revision_id == _mod_revision.NULL_REVISION:
            return KnitPackRepository.add_inventory_by_delta(self,
                basis_revision_id, delta, new_revision_id, parents)
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        basis_tree = self.revision_tree(basis_revision_id)
        basis_tree.lock_read()
        try:
            if basis_inv is None:
                basis_inv = basis_tree.inventory
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                inv_lines, check_content=False), result
        finally:
            basis_tree.unlock()

    def _iter_inventories(self, revision_ids):
        """Iterate over many inventory objects."""
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
        texts = {}
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
            else:
                raise errors.NoSuchRevision(self, record.key)
        for key in keys:
            yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)

    def _iter_inventory_xmls(self, revision_ids):
        # Without a native 'xml' inventory, this method doesn't make sense, so
        # make it raise to trap naughty direct users.
        raise NotImplementedError(self._iter_inventory_xmls)

    def _find_revision_outside_set(self, revision_ids):
        revision_set = frozenset(revision_ids)
        for revid in revision_ids:
            parent_ids = self.get_parent_map([revid]).get(revid, ())
            for parent in parent_ids:
                if parent in revision_set:
                    # Parent is not outside the set
                    continue
                if parent not in self.get_parent_map([parent]):
                    # Parent is a ghost
                    continue
                return parent
        return _mod_revision.NULL_REVISION

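    # chk_map.iter_interesting_nodes walks the id_to_entry pages reachable from
    # the requested revisions, pruning anything already reachable from the
    # single 'uninteresting' root found above, so only changed entries are seen.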
    def _find_file_keys_to_fetch(self, revision_ids, pb):
        rich_root = self.supports_rich_root()
        revision_outside_set = self._find_revision_outside_set(revision_ids)
        if revision_outside_set == _mod_revision.NULL_REVISION:
            uninteresting_root_keys = set()
        else:
            uninteresting_inv = self.get_inventory(revision_outside_set)
            uninteresting_root_keys = set([uninteresting_inv.id_to_entry.key()])
        interesting_root_keys = set()
        for idx, inv in enumerate(self.iter_inventories(revision_ids)):
            interesting_root_keys.add(inv.id_to_entry.key())
        revision_ids = frozenset(revision_ids)
        file_id_revisions = {}
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        for record, items in chk_map.iter_interesting_nodes(self.chk_bytes,
                    interesting_root_keys, uninteresting_root_keys,
                    pb=pb):
            # This is cheating a bit to use the last grabbed 'inv', but it
            # works
            for name, bytes in items:
                (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                if not rich_root and name_utf8 == '':
                    continue
                if revision_id in revision_ids:
                    # Would we rather build this up into file_id => revision
                    # maps?
                    try:
                        file_id_revisions[file_id].add(revision_id)
                    except KeyError:
                        file_id_revisions[file_id] = set([revision_id])
        for file_id, revisions in file_id_revisions.iteritems():
            yield ('file', file_id, revisions)

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revision_ids: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids
            that altered it listed explicitly.
        """
        rich_roots = self.supports_rich_root()
        result = {}
        pb = ui.ui_factory.nested_progress_bar()
        try:
            total = len(revision_ids)
            for pos, inv in enumerate(self.iter_inventories(revision_ids)):
                pb.update("Finding text references", pos, total)
                for entry in inv.iter_just_entries():
                    if entry.revision != inv.revision_id:
                        continue
                    if not rich_roots and entry.file_id == inv.root_id:
                        continue
                    alterations = result.setdefault(entry.file_id, set([]))
                    alterations.add(entry.revision)
            return result
        finally:
            pb.finished()

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
        try:
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                        continue
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:
                        result[key] = True
            return result
        finally:
            pb.finished()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if isinstance(to_format, remote.RemoteRepositoryFormat):
            # Can't just check attributes on to_format with the current code,
            # so force it to materialise the real format first.
            to_format._ensure_real()
            to_format = to_format._custom_format
        if to_format.__class__ is self._format.__class__:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def suspend_write_group(self):
        raise errors.UnsuspendableWriteGroup(self)

    def _resume_write_group(self, tokens):
        raise errors.UnsuspendableWriteGroup(self)


class GroupCHKStreamSource(repository.StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_filtered_inv_stream(self):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        will be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(self._revision_keys,
                                                 'groupcompress', True)
            for record in stream:
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())

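    # excluded_keys are revisions the target already has; their chk roots form
    # the 'uninteresting' sets below, so the chk_bytes streams only carry pages
    # the target is missing.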
    def _get_filtered_chk_streams(self, excluded_keys):
        self._text_keys = set()
        excluded_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(excluded_keys):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        chk_bytes = self.from_repository.chk_bytes
        def _filter_id_to_entry():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_id_roots, uninteresting_root_keys):
                for name, bytes in items:
                    # Note: we don't care about name_utf8, because we are always
                    # rich-root.
                    _, file_id, revision_id = bytes_to_info(bytes)
                    self._text_keys.add((file_id, revision_id))
                if record is not None:
                    yield record
        yield 'chk_bytes', _filter_id_to_entry()
        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
                    yield record
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are GCCHK, and those always support rich-roots
        # We may want to request as 'unordered', in case the source has done a
        # lot of repacking and the current groupcompress order is not ideal.
        return ('texts', self.from_repository.texts.get_record_stream(
            self._text_keys, 'groupcompress', False))

    def get_stream(self, search):
        revision_ids = search.get_keys()
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield stream_info
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        yield self._get_filtered_inv_stream()
        # The keys to exclude are part of the search recipe
        _, _, exclude_keys, _ = search.get_recipe()
        for stream_info in self._get_filtered_chk_streams(exclude_keys):
            yield stream_info
        yield self._get_text_stream()


class RepositoryFormatCHK1(RepositoryFormatPack):
    """A hashed CHK+group compress pack repository."""

    repository_class = CHKInventoryRepository
    supports_chks = True
    # For right now, setting this to True gives us InterModel1And2 rather
    # than InterDifferingSerializer
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_serializer_255_bigpage
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options, work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development6-rich-root')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - group compression and chk inventory'
                ' (needs bzr.dev from 1.14)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format - rich roots, group compression"
                " and chk inventories")

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)
        if not getattr(target_format, 'supports_tree_reference', False):
            raise errors.BadConversionTarget(
                'Does not support nested trees', target_format)