# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from itertools import izip

from bzrlib.index import (
    GraphIndexPrefixAdapter,
from bzrlib.knit import (
from bzrlib import tsort
    revision as _mod_revision,
from bzrlib.decorators import needs_write_lock
from bzrlib.btree_index import (
from bzrlib.index import (
from bzrlib.repofmt.knitrepo import KnitRepository
from bzrlib.repository import (
    MetaDirRepositoryFormat,
from bzrlib.trace import (
class PackCommitBuilder(CommitBuilder):
    """A subclass of CommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])
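
# Illustrative note (added, not from the original source): _heads maps per-file
# graph keys of the form (file_id, revision_id) through the combined text
# index and keeps only the revision ids that are heads.  With a builder `cb`
# and hypothetical ids:
#
#   cb._heads('file-id-1', ['rev-a', 'rev-b'])
#   # -> set(['rev-b']) if 'rev-a' is an ancestor of 'rev-b'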


class PackRootCommitBuilder(RootCommitBuilder):
    """A subclass of RootCommitBuilder to add texts with pack semantics.

    Specifically this uses one knit object rather than one knit object per
    added text, reducing memory and object pressure.
    """

    def __init__(self, repository, parents, config, timestamp=None,
                 timezone=None, committer=None, revprops=None,
                 revision_id=None):
        CommitBuilder.__init__(self, repository, parents, config,
            timestamp=timestamp, timezone=timezone, committer=committer,
            revprops=revprops, revision_id=revision_id)
        self._file_graph = graph.Graph(
            repository._pack_collection.text_index.combined_index)

    def _heads(self, file_id, revision_ids):
        keys = [(file_id, revision_id) for revision_id in revision_ids]
        return set([key[1] for key in self._file_graph.heads(keys)])


class Pack(object):
    """An in memory proxy for a pack and its indices.

    This is a base class that is not directly used, instead the classes
    ExistingPack and NewPack are used.
    """

    # A map of index 'type' to the file extension and position in the
    # index_sizes array.
    index_definitions = {
        'chk': ('.cix', 4),
        'revision': ('.rix', 0),
        'inventory': ('.iix', 1),
        'text': ('.tix', 2),
        'signature': ('.six', 3),
        }

    def __init__(self, revision_index, inventory_index, text_index,
        signature_index, chk_index=None):
        """Create a pack instance.

        :param revision_index: A GraphIndex for determining what revisions are
            present in the Pack and accessing the locations of their texts.
        :param inventory_index: A GraphIndex for determining what inventories are
            present in the Pack and accessing the locations of their
            texts/deltas.
        :param text_index: A GraphIndex for determining what file texts
            are present in the pack and accessing the locations of their
            texts/deltas (via (fileid, revisionid) tuples).
        :param signature_index: A GraphIndex for determining what signatures are
            present in the Pack and accessing the locations of their texts.
        :param chk_index: A GraphIndex for accessing content by CHK, if the
            pack has one.
        """
        self.revision_index = revision_index
        self.inventory_index = inventory_index
        self.text_index = text_index
        self.signature_index = signature_index
        self.chk_index = chk_index

    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        return self.pack_transport, self.file_name()

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        missing_items = {}
        for (index_name, external_refs, index) in [
            ('texts',
                self._get_external_refs(self.text_index),
                self._pack_collection.text_index.combined_index),
            ('inventories',
                self._get_external_refs(self.inventory_index),
                self._pack_collection.inventory_index.combined_index),
            ]:
            missing = external_refs.difference(
                k for (idx, k, v, r) in
                index.iter_entries(external_refs))
            if missing:
                missing_items[index_name] = sorted(list(missing))
        if missing_items:
            from pprint import pformat
            raise errors.BzrCheckError(
                "Newly created pack file %r has delta references to "
                "items not in its repository:\n%s"
                % (self, pformat(missing_items)))

    def file_name(self):
        """Get the file name for the pack on disk."""
        return self.name + '.pack'

    def get_revision_count(self):
        return self.revision_index.key_count()

    def index_name(self, index_type, name):
        """Get the disk name of an index type for pack name 'name'."""
        return name + Pack.index_definitions[index_type][0]

    def index_offset(self, index_type):
        """Get the position in an index_sizes array for a given index type."""
        return Pack.index_definitions[index_type][1]
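
    # Clarifying example (added): index_definitions maps an index type to its
    # on-disk suffix and its slot in index_sizes, so for a pack named 'abcd':
    #
    #   Pack.index_definitions['revision']   # -> ('.rix', 0)
    #   pack.index_name('revision', 'abcd')  # -> 'abcd.rix'
    #   pack.index_offset('revision')        # -> 0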

    def inventory_index_name(self, name):
        """The inv index is the name + .iix."""
        return self.index_name('inventory', name)

    def revision_index_name(self, name):
        """The revision index is the name + .rix."""
        return self.index_name('revision', name)

    def signature_index_name(self, name):
        """The signature index is the name + .six."""
        return self.index_name('signature', name)

    def text_index_name(self, name):
        """The text index is the name + .tix."""
        return self.index_name('text', name)

    def _replace_index_with_readonly(self, index_type):
        unlimited_cache = False
        if index_type == 'chk':
            unlimited_cache = True
        setattr(self, index_type + '_index',
            self.index_class(self.index_transport,
                self.index_name(index_type, self.name),
                self.index_sizes[self.index_offset(index_type)],
                unlimited_cache=unlimited_cache))


class ExistingPack(Pack):
    """An in memory proxy for an existing .pack and its disk indices."""

    def __init__(self, pack_transport, name, revision_index, inventory_index,
        text_index, signature_index, chk_index=None):
        """Create an ExistingPack object.

        :param pack_transport: The transport where the pack file resides.
        :param name: The name of the pack on disk in the pack_transport.
        """
        Pack.__init__(self, revision_index, inventory_index, text_index,
            signature_index, chk_index)
        self.name = name
        self.pack_transport = pack_transport
        if None in (revision_index, inventory_index, text_index,
                signature_index, name, pack_transport):
            raise AssertionError()

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "<%s.%s object at 0x%x, %s, %s" % (
            self.__class__.__module__, self.__class__.__name__, id(self),
            self.pack_transport, self.name)


class ResumedPack(ExistingPack):

    def __init__(self, name, revision_index, inventory_index, text_index,
        signature_index, upload_transport, pack_transport, index_transport,
        pack_collection, chk_index=None):
        """Create a ResumedPack object."""
        ExistingPack.__init__(self, pack_transport, name, revision_index,
            inventory_index, text_index, signature_index,
            chk_index=chk_index)
        self.upload_transport = upload_transport
        self.index_transport = index_transport
        self.index_sizes = [None, None, None, None]
        indices = [
            ('revision', revision_index),
            ('inventory', inventory_index),
            ('text', text_index),
            ('signature', signature_index),
            ]
        if chk_index is not None:
            indices.append(('chk', chk_index))
            self.index_sizes.append(None)
        for index_type, index in indices:
            offset = self.index_offset(index_type)
            self.index_sizes[offset] = index._size
        self.index_class = pack_collection._index_class
        self._pack_collection = pack_collection
        self._state = 'resumed'
        # XXX: perhaps check that the .pack file exists?

    def access_tuple(self):
        if self._state == 'finished':
            return Pack.access_tuple(self)
        elif self._state == 'resumed':
            return self.upload_transport, self.file_name()
        else:
            raise AssertionError(self._state)

    def abort(self):
        self.upload_transport.delete(self.file_name())
        indices = [self.revision_index, self.inventory_index, self.text_index,
            self.signature_index]
        if self.chk_index is not None:
            indices.append(self.chk_index)
        for index in indices:
            index._transport.delete(index._name)

    def finish(self):
        self._check_references()
        index_types = ['revision', 'inventory', 'text', 'signature']
        if self.chk_index is not None:
            index_types.append('chk')
        for index_type in index_types:
            old_name = self.index_name(index_type, self.name)
            new_name = '../indices/' + old_name
            self.upload_transport.rename(old_name, new_name)
            self._replace_index_with_readonly(index_type)
        new_name = '../packs/' + self.file_name()
        self.upload_transport.rename(self.file_name(), new_name)
        self._state = 'finished'

    def _get_external_refs(self, index):
        """Return compression parents for this index that are not present.

        This returns any compression parents that are referenced by this index,
        which are not contained *in* this index. They may be present elsewhere.
        """
        return index.external_references(1)
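
    # Note (added for clarity, not in the original): external_references(1)
    # asks the index for keys named in reference list 1 (the compression
    # parent list) that have no entry of their own in this index, i.e. delta
    # bases that some other pack in the collection must supply.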


class NewPack(Pack):
    """An in memory proxy for a pack which is being created."""

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: Unix permissions for newly created file.
        """
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        else:
            chk_index = None
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=2),
            # Texts: compression and per file graph, for all fileids - so two
            # reference lists and two elements in the key tuple.
            index_builder_class(reference_lists=2, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            # list.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index=chk_index
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs, chk_if_in_use)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        # under creation.
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        #
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
                _write(bytes)
                _update(bytes)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'
        # no name until we finish writing the content
        self.name = None

    def abort(self):
        """Cancel creating this pack."""
        self._state = 'aborted'
        self.write_stream.close()
        # Remove the temporary pack file.
        self.upload_transport.delete(self.random_name)
        # The indices have no state on disk.

    def access_tuple(self):
        """Return a tuple (transport, name) for the pack content."""
        if self._state == 'finished':
            return Pack.access_tuple(self)
        elif self._state == 'open':
            return self.upload_transport, self.random_name
        else:
            raise AssertionError(self._state)

    def data_inserted(self):
        """True if data has been added to this pack."""
        return bool(self.get_revision_count() or
            self.inventory_index.key_count() or
            self.text_index.key_count() or
            self.signature_index.key_count() or
            (self.chk_index is not None and self.chk_index.key_count()))

    def finish_content(self):
        if self.name is not None:
            return
        self._write_data('', flush=True)
        self.name = self._hash.hexdigest()

    def finish(self, suspend=False):
        """Finish the new pack.

        - finalises the content
        - assigns a name (the md5 of the content, currently)
        - writes out the associated indices
        - renames the pack into place.
        - stores the index size tuple for the pack in the index_sizes
          attribute.
        """
        self.finish_content()
        if not suspend:
            self._check_references()
        # XXX: It'd be better to write them all to temporary names, then
        # rename them all into place, so that the window when only some are
        # visible is smaller.  On the other hand none will be seen until
        # they're in the names list.
        self.index_sizes = [None, None, None, None]
        self._write_index('revision', self.revision_index, 'revision', suspend)
        self._write_index('inventory', self.inventory_index, 'inventory',
            suspend)
        self._write_index('text', self.text_index, 'file texts', suspend)
        self._write_index('signature', self.signature_index,
            'revision signatures', suspend)
        if self.chk_index is not None:
            self.index_sizes.append(None)
            self._write_index('chk', self.chk_index,
                'content hash bytes', suspend)
        self.write_stream.close()
        # Note that this will clobber an existing pack with the same name,
        # without checking for hash collisions. While this is undesirable this
        # is something that can be rectified in a subsequent release. One way
        # to rectify it may be to leave the pack at the original name, writing
        # its pack-names entry as something like 'HASH: index-sizes
        # temporary-name'. Allocate that and check for collisions, if it is
        # collision free then rename it into place. If clients know this scheme
        # they can handle missing-file errors by:
        #  - try for HASH.pack
        #  - try for temporary-name
        #  - refresh the pack-list to see if the pack is now absent
        new_name = self.name + '.pack'
        if not suspend:
            new_name = '../packs/' + new_name
        self.upload_transport.rename(self.random_name, new_name)
        self._state = 'finished'
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                new_name, time.time() - self.start_time)

    def flush(self):
        """Flush any current data."""
        if self._buffer[1]:
            bytes = ''.join(self._buffer[0])
            self.write_stream.write(bytes)
            self._hash.update(bytes)
            self._buffer[:] = [[], 0]
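
    # Design note (added): _write_data buffers appended byte strings in
    # self._buffer until their combined size exceeds _cache_limit (set via
    # set_write_cache_size), then writes the joined bytes to write_stream and
    # feeds them to the md5 hash.  flush() drains the same buffer explicitly,
    # e.g. before something needs to read back from the partially written pack.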

    def _get_external_refs(self, index):
        return index._external_references()

    def set_write_cache_size(self, size):
        self._cache_limit = size

    def _write_index(self, index_type, index, label, suspend=False):
        """Write out an index.

        :param index_type: The type of index to write - e.g. 'revision'.
        :param index: The index object to serialise.
        :param label: What label to give the index e.g. 'revision'.
        """
        index_name = self.index_name(index_type, self.name)
        if suspend:
            transport = self.upload_transport
        else:
            transport = self.index_transport
        self.index_sizes[self.index_offset(index_type)] = transport.put_file(
            index_name, index.finish(), mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            # XXX: size might be interesting?
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
                time.ctime(), label, self.upload_transport.base,
                self.random_name, time.time() - self.start_time)
        # Replace the writable index on this object with a readonly,
        # presently unloaded index. We should alter
        # the index layer to make its finish() error if add_node is
        # subsequently used. RBC
        self._replace_index_with_readonly(index_type)


class AggregateIndex(object):
    """An aggregated index for the RepositoryPackCollection.

    AggregateIndex is responsible for managing the PackAccess object,
    Index-To-Pack mapping, and all indices list for a specific type of index
    such as 'revision index'.

    A CombinedIndex provides an index on a single key space built up
    from several on-disk indices.  The AggregateIndex builds on this
    to provide a knit access layer, and allows having up to one writable
    index within the collection.
    """
    # XXX: Probably 'can be written to' could/should be separated from 'acts
    # like a knit index' -- mbp 20071024

    def __init__(self, reload_func=None, flush_func=None):
        """Create an AggregateIndex.

        :param reload_func: A function to call if we find we are missing an
            index. Should have the form reload_func() => True if the list of
            active pack files has changed.
        """
        self._reload_func = reload_func
        self.index_to_pack = {}
        self.combined_index = CombinedGraphIndex([], reload_func=reload_func)
        self.data_access = _DirectPackAccess(self.index_to_pack,
                                             reload_func=reload_func,
                                             flush_func=flush_func)
        self.add_callback = None

    def replace_indices(self, index_to_pack, indices):
        """Replace the current mappings with fresh ones.

        This should eventually be replaced by incremental addition and removal
        of indices; it was added during refactoring of existing code.

        :param index_to_pack: A mapping from index objects to
            (transport, name) tuples for the pack file data.
        :param indices: A list of indices.
        """
        # refresh the revision pack map dict without replacing the instance.
        self.index_to_pack.clear()
        self.index_to_pack.update(index_to_pack)
        # XXX: API break - clearly a 'replace' method would be good?
        self.combined_index._indices[:] = indices
        # the current add nodes callback for the current writable index if
        # any.
        self.add_callback = None

    def add_index(self, index, pack):
        """Add index to the aggregate, which is an index for Pack pack.

        Future searches on the aggregate index will search this new index
        before all previously inserted indices.

        :param index: An Index for the pack.
        :param pack: A Pack instance.
        """
        # expose it to the index map
        self.index_to_pack[index] = pack.access_tuple()
        # put it at the front of the linear index list
        self.combined_index.insert_index(0, index)

    def add_writable_index(self, index, pack):
        """Add an index which is able to have data added to it.

        There can be at most one writable index at any time.  Any
        modifications made to the knit are put into this index.

        :param index: An index from the pack parameter.
        :param pack: A Pack instance.
        """
        if self.add_callback is not None:
            raise AssertionError(
                "%s already has a writable index through %s" %
                (self, self.add_callback))
        # allow writing: queue writes to a new index
        self.add_index(index, pack)
        # Updates the index to packs mapping as a side effect,
        self.data_access.set_writer(pack._writer, index, pack.access_tuple())
        self.add_callback = index.add_nodes
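
    # Usage sketch (added; the calls are illustrative, not from this module):
    #
    #   agg = AggregateIndex(reload_func=None, flush_func=None)
    #   agg.add_index(old_pack.revision_index, old_pack)
    #   agg.add_writable_index(new_pack.revision_index, new_pack)
    #   # queries consult new_pack's index first, then old_pack's
    #   agg.combined_index.key_count()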

    def clear(self):
        """Reset all the aggregate data to nothing."""
        self.data_access.set_writer(None, None, (None, None))
        self.index_to_pack.clear()
        del self.combined_index._indices[:]
        self.add_callback = None

    def remove_index(self, index, pack):
        """Remove index from the indices used to answer queries.

        :param index: An index from the pack parameter.
        :param pack: A Pack instance.
        """
        del self.index_to_pack[index]
        self.combined_index._indices.remove(index)
        if (self.add_callback is not None and
            getattr(index, 'add_nodes', None) == self.add_callback):
            self.add_callback = None
            self.data_access.set_writer(None, None, (None, None))


class Packer(object):
    """Create a pack from packs."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        """Create a Packer.

        :param pack_collection: A RepositoryPackCollection object where the
            new pack is being written to.
        :param packs: The packs to combine.
        :param suffix: The suffix to use on the temporary files for the pack.
        :param revision_ids: Revision ids to limit the pack to.
        :param reload_func: A function to call if a pack file/index goes
            missing. The side effect of calling this function should be to
            update self.packs. See also AggregateIndex
        """
        self.packs = packs
        self.suffix = suffix
        self.revision_ids = revision_ids
        # The pack object we are creating.
        self.new_pack = None
        self._pack_collection = pack_collection
        self._reload_func = reload_func
        # The index layer keys for the revisions being copied. None for 'all
        # revisions'.
        self._revision_keys = None
        # What text keys to copy. None for 'all texts'. This is set by
        # _copy_inventory_texts
        self._text_filter = None
        self._extra_init()

    def _extra_init(self):
        """A template hook to allow extending the constructor trivially."""

    def _pack_map_and_index_list(self, index_attribute):
        """Convert a list of packs to an index pack map and index list.

        :param index_attribute: The attribute that the desired index is found
            on.
        :return: A tuple (map, list) where map contains the dict from
            index:pack_tuple, and list contains the indices in the preferred
            access order.
        """
        indices = []
        pack_map = {}
        for pack_obj in self.packs:
            index = getattr(pack_obj, index_attribute)
            indices.append(index)
            pack_map[index] = pack_obj
        return pack_map, indices
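
    # Illustrative example (added): with self.packs = [pack_a, pack_b],
    # _pack_map_and_index_list('revision_index') returns roughly
    #
    #   ({pack_a.revision_index: pack_a, pack_b.revision_index: pack_b},
    #    [pack_a.revision_index, pack_b.revision_index])
    #
    # i.e. a reverse lookup from index object to owning pack, plus the indices
    # in preferred access order.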

    def _index_contents(self, indices, key_filter=None):
        """Get an iterable of the index contents from a pack_map.

        :param indices: The list of indices to query
        :param key_filter: An optional filter to limit the keys returned.
        """
        all_index = CombinedGraphIndex(indices)
        if key_filter is None:
            return all_index.iter_all_entries()
        else:
            return all_index.iter_entries(key_filter)

    def pack(self, pb=None):
        """Create a new pack by reading data from other packs.

        This does little more than a bulk copy of data. One key difference
        is that data with the same item key across multiple packs is elided
        from the output. The new pack is written into the current pack store
        along with its indices, and the name added to the pack names. The
        source packs are not altered and are not required to be in the current
        pack collection.

        :param pb: An optional progress bar to use. A nested bar is created if
            this is None.
        :return: A Pack object, or None if nothing was copied.
        """
        # open a pack - using the same name as the last temporary file
        # - which has already been flushed, so it's safe.
        # XXX: - duplicate code warning with start_write_group; fix before
        #      considering 'done'.
        if self._pack_collection._new_pack is not None:
            raise errors.BzrError('call to %s.pack() while another pack is'
                                  ' being written.'
                                  % (self.__class__.__name__,))
        if self.revision_ids is not None:
            if len(self.revision_ids) == 0:
                # silly fetch request.
                return None
            else:
                self.revision_ids = frozenset(self.revision_ids)
                self.revision_keys = frozenset((revid,) for revid in
                    self.revision_ids)
        if pb is None:
            self.pb = ui.ui_factory.nested_progress_bar()
        else:
            self.pb = pb
        try:
            return self._create_pack_from_packs()
        finally:
            if pb is None:
                self.pb.finished()

    def open_pack(self):
        """Open a pack for the pack we are creating."""
        new_pack = self._pack_collection.pack_factory(self._pack_collection,
            upload_suffix=self.suffix,
            file_mode=self._pack_collection.repo.bzrdir._get_file_mode())
        # We know that we will process all nodes in order, and don't need to
        # query, so don't combine any indices spilled to disk until we are done
        new_pack.revision_index.set_optimize(combine_backing_indices=False)
        new_pack.inventory_index.set_optimize(combine_backing_indices=False)
        new_pack.text_index.set_optimize(combine_backing_indices=False)
        new_pack.signature_index.set_optimize(combine_backing_indices=False)
        return new_pack

    def _update_pack_order(self, entries, index_to_pack_map):
        """Determine how we want our packs to be ordered.

        This changes the sort order of the self.packs list so that packs unused
        by 'entries' will be at the end of the list, so that future requests
        can avoid probing them.  Used packs will be at the front of the
        self.packs list, in the order of their first use in 'entries'.

        :param entries: A list of (index, ...) tuples
        :param index_to_pack_map: A mapping from index objects to pack objects.
        """
        packs = []
        seen_indexes = set()
        for entry in entries:
            index = entry[0]
            if index not in seen_indexes:
                packs.append(index_to_pack_map[index])
                seen_indexes.add(index)
        if len(packs) == len(self.packs):
            if 'pack' in debug.debug_flags:
                mutter('Not changing pack list, all packs used.')
            return
        seen_packs = set(packs)
        for pack in self.packs:
            if pack not in seen_packs:
                packs.append(pack)
        if 'pack' in debug.debug_flags:
            old_names = [p.access_tuple()[1] for p in self.packs]
            new_names = [p.access_tuple()[1] for p in packs]
            mutter('Reordering packs\nfrom: %s\n to: %s',
                   old_names, new_names)
        self.packs = packs

    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map, revision_indices = self._pack_map_and_index_list(
            'revision_index')
        revision_nodes = self._index_contents(revision_indices, revision_keys)
        revision_nodes = list(revision_nodes)
        self._update_pack_order(revision_nodes, revision_index_map)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
            self.new_pack.revision_index, readv_group_iter, total_items))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys

    def _copy_inventory_texts(self):
        """Copy the inventory texts to the new pack.

        self._revision_keys is used to determine what inventories to copy.

        Sets self._text_filter appropriately.
        """
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
            'inventory_index')
        inv_nodes = self._index_contents(inventory_indices, inv_keys)
        # copy inventory keys and adjust values
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
        # Only grab the output lines if we will be processing them
        output_lines = bool(self.revision_ids)
        inv_lines = self._copy_nodes_graph(inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index,
            readv_group_iter, total_items, output_lines=output_lines)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _copy_text_texts(self):
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # going to buffer everything anyway, we check beforehand, which
            # saves reading knit data over the wire when we know there are
            # missing texts.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            missing_text_keys = set(self._text_filter) - present_text_keys
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                mutter("missing keys during fetch: %r", missing_text_keys)
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        self._log_copied_texts()

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map, signature_indices = self._pack_map_and_index_list(
            'signature_index')
        signature_nodes = self._index_contents(signature_indices,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        # NB XXX: how to check CHK references are present? perhaps by yielding
        # the items? How should that interact with stacked repos?
        if new_pack.chk_index is not None:
            self._copy_chks()
            if 'pack' in debug.debug_flags:
                mutter('%s: create_pack: chk content copied: %s%s %d items t+%6.3fs',
                    time.ctime(), self._pack_collection._upload_transport.base,
                    new_pack.random_name,
                    new_pack.chk_index.key_count(),
                    time.time() - new_pack.start_time)
        new_pack._check_references()
        if not self._use_pack(new_pack):
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack

    def _copy_chks(self, refs=None):
        # XXX: Todo, recursive follow-pointers facility when fetching some
        # revisions only.
        chk_index_map, chk_indices = self._pack_map_and_index_list(
            'chk_index')
        chk_nodes = self._index_contents(chk_indices, refs)
        new_refs = set()
        # TODO: This isn't strictly tasteful as we are accessing some private
        #       variables (_serializer). Perhaps a better way would be to have
        #       Repository._deserialise_chk_node()
        search_key_func = chk_map.search_key_registry.get(
            self._pack_collection.repo._serializer.search_key_name)
        def accumulate_refs(lines):
            # XXX: move to a generic location
            bytes = ''.join(lines)
            node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
            new_refs.update(node.refs())
        self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
            self.new_pack.chk_index, output_lines=accumulate_refs)
        return new_refs

    def _copy_nodes(self, nodes, index_map, writer, write_index,
        output_lines=None):
        """Copy knit nodes between packs with no graph references.

        :param output_lines: Output full texts of copied items.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb, output_lines=output_lines)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
        output_lines=None):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # plan a readv on each source pack:
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path,
                    [offset[0:2] for offset in pack_readv_requests])
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                if output_lines is not None:
                    output_lines(knit._parse_record(key[-1], raw_data)[0])
                else:
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1

    def _copy_nodes_graph(self, index_map, writer, write_index,
        readv_group_iter, total_items, output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator of line,version_id.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for result in self._do_copy_nodes_graph(index_map, writer,
                write_index, output_lines, pb, readv_group_iter, total_items):
                yield result
        except Exception:
            # Python 2.4 does not permit try:finally: in a generator.
            pb.finished()
            raise
        else:
            pb.finished()

    def _do_copy_nodes_graph(self, index_map, writer, write_index,
        output_lines, pb, readv_group_iter, total_items):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # for line extraction when requested (inventories only)
        if output_lines:
            factory = KnitPlainFactory()
        record_index = 0
        pb.update("Copied record", record_index, total_items)
        for index, readv_vector, node_vector in readv_group_iter:
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path, readv_vector)
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (key, eol_flag, references) in \
                izip(reader.iter_records(), node_vector):
                raw_data = read_func(None)
                if output_lines:
                    # read the entire thing
                    content, _ = knit._parse_record(key[-1], raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line, key
                else:
                    # check the header only
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1

    def _get_text_nodes(self):
        text_index_map, text_indices = self._pack_map_and_index_list(
            'text_index')
        return text_index_map, self._index_contents(text_indices,
            self._text_filter)

    def _least_readv_node_readv(self, nodes):
        """Generate request groups for nodes using the least readv's.

        :param nodes: An iterable of graph index nodes.
        :return: Total node count and an iterator of the data needed to perform
            readvs to obtain the data for nodes. Each item yielded by the
            iterator is a tuple with:
            index, readv_vector, node_vector. readv_vector is a list ready to
            hand to the transport readv method, and node_vector is a list of
            (key, eol_flag, references) for the node retrieved by the
            matching readv_vector.
        """
        # group by pack so we do one readv per pack
        nodes = sorted(nodes)
        total = len(nodes)
        request_groups = {}
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        result = []
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append(
                    ((offset, length), (key, value[0], references)))
            # linear scan up the pack to maximum range combining.
            pack_readv_requests.sort()
            # split out the readv and the node data.
            pack_readv = [readv for readv, node in pack_readv_requests]
            node_vector = [node for readv, node in pack_readv_requests]
            result.append((index, pack_readv, node_vector))
        return total, result
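
    # Worked example (added; the values are illustrative only): for nodes drawn
    # from two packs the result looks roughly like
    #
    #   total = 3
    #   result = [
    #       (index_a, [(0, 120), (120, 80)],
    #                 [(key1, flag1, refs1), (key2, flag2, refs2)]),
    #       (index_b, [(0, 200)], [(key3, flag3, refs3)]),
    #   ]
    #
    # one entry per source pack, with the byte ranges sorted so each pack can
    # be read in a single forward readv pass.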

    def _log_copied_texts(self):
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.text_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and set up a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_keys)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        return self._least_readv_node_readv(revision_nodes)

    def _use_pack(self, new_pack):
        """Return True if new_pack should be used.

        :param new_pack: The pack that has just been created.
        :return: True if the pack should be used.
        """
        return new_pack.data_inserted()


class OptimisingPacker(Packer):
    """A packer which spends more time to create better disk layouts."""

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        This sort places revisions in topological order with the ancestors
        after the children.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        # build an ancestors dict
        ancestors = {}
        by_key = {}
        for index, key, value, references in revision_nodes:
            ancestors[key] = references[0]
            by_key[key] = (index, value, references)
        order = tsort.topo_sort(ancestors)
        total = len(order)
        # Single IO is pathological, but it will work as a starting point.
        requests = []
        for key in reversed(order):
            index, value, references = by_key[key]
            # ---- KnitGraphIndex.get_position
            bits = value[1:].split(' ')
            offset, length = int(bits[0]), int(bits[1])
            requests.append(
                (index, [(offset, length)], [(key, value[0], references)]))
        # TODO: combine requests in the same index that are in ascending order.
        return total, requests

    def open_pack(self):
        """Open a pack for the pack we are creating."""
        new_pack = super(OptimisingPacker, self).open_pack()
        # Turn on the optimization flags for all the index builders.
        new_pack.revision_index.set_optimize(for_size=True)
        new_pack.inventory_index.set_optimize(for_size=True)
        new_pack.text_index.set_optimize(for_size=True)
        new_pack.signature_index.set_optimize(for_size=True)
        return new_pack


class ReconcilePacker(Packer):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    correct.
    """

    def _extra_init(self):
        self._data_changed = False

    def _process_inventory_lines(self, inv_lines):
        """Generate a text key reference map for use in reconciling."""
        repo = self._pack_collection.repo
        refs = repo._find_text_key_references_from_xml_inventory_lines(
            inv_lines)
        self._text_refs = refs
        # during reconcile we:
        #  - convert unreferenced texts to full texts
        #  - correct texts which reference a text not copied to be full texts
        #  - copy all others as-is but with corrected parents.
        #  - so at this point we don't know enough to decide what becomes a full
        #    text.
        self._text_filter = None

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        self.pb.update("Copying content texts", 3)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        ancestors = dict([(key[0], tuple(ref[0] for ref in refs[0])) for
            _1, key, _2, refs in
            self.new_pack.revision_index.iter_all_entries()])
        ideal_index = repo._generate_text_key_index(self._text_refs, ancestors)
        # 2) generate a text_nodes list that contains all the deltas that can
        # be used as-is, with corrected parents.
        ok_nodes = []
        bad_texts = []
        discarded_nodes = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        text_index_map, text_nodes = self._get_text_nodes()
        for node in text_nodes:
            try:
                ideal_parents = tuple(ideal_index[node[1]])
            except KeyError:
                discarded_nodes.append(node)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                if ideal_parents == node[3][0]:
                    # no change needed.
                    ok_nodes.append(node)
                elif ideal_parents[0:1] == node[3][0][0:1]:
                    # the left most parent is the same, or there are no parents
                    # today. Either way, we can preserve the representation as
                    # long as we change the refs to be inserted.
                    self._data_changed = True
                    ok_nodes.append((node[0], node[1], node[2],
                        (ideal_parents, node[3][1])))
                    self._data_changed = True
                else:
                    # Reinsert this text completely
                    bad_texts.append((node[1], ideal_parents))
                    self._data_changed = True
        # we're finished with some data.
        del ideal_index
        del text_nodes
        # 3) bulk copy the ok data
        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        # 4) adhoc copy all the other texts.
        # We have to topologically insert all texts otherwise we can fail to
        # reconcile when parts of a single delta chain are preserved intact,
        # and other parts are not. E.g. Discarded->d1->d2->d3. d1 will be
        # reinserted, and if d3 has incorrect parents it will also be
        # reinserted. If we insert d3 first, d2 is present (as it was bulk
        # copied), so we will try to delta, but d2 is not currently able to be
        # extracted because its basis d1 is not present. Topologically sorting
        # addresses this. The following generates a sort for all the texts that
        # are being inserted without having to reference the entire text key
        # space (we only topo sort the revisions, which is smaller).
        topo_order = tsort.topo_sort(ancestors)
        rev_order = dict(zip(topo_order, range(len(topo_order))))
        bad_texts.sort(key=lambda key:rev_order.get(key[0][1], 0))
        transaction = repo.get_transaction()
        file_id_index = GraphIndexPrefixAdapter(
            self.new_pack.text_index,
            ('blank', ), 1,
            add_nodes_callback=self.new_pack.text_index.add_nodes)
        data_access = _DirectPackAccess(
            {self.new_pack.text_index:self.new_pack.access_tuple()})
        data_access.set_writer(self.new_pack._writer, self.new_pack.text_index,
            self.new_pack.access_tuple())
        output_texts = KnitVersionedFiles(
            _KnitGraphIndex(self.new_pack.text_index,
                add_callback=self.new_pack.text_index.add_nodes,
                deltas=True, parents=True, is_locked=repo.is_locked),
            data_access=data_access, max_delta_chain=200)
        for key, parent_keys in bad_texts:
            # We refer to the new pack to delta data being output.
            # A possible improvement would be to catch errors on short reads
            # and only flush then.
            self.new_pack.flush()
            parents = []
            for parent_key in parent_keys:
                if parent_key[0] != key[0]:
                    # Graph parents must match the fileid
                    raise errors.BzrError('Mismatched key parent %r:%r' %
                        (key, parent_keys))
                parents.append(parent_key[1])
            text_lines = osutils.split_lines(repo.texts.get_record_stream(
                [key], 'unordered', True).next().get_bytes_as('fulltext'))
            output_texts.add_lines(key, parent_keys, text_lines,
                random_id=True, check_content=False)
        # 5) check that nothing inserted has a reference outside the keyspace.
        missing_text_keys = self.new_pack.text_index._external_references()
        if missing_text_keys:
            raise errors.BzrCheckError('Reference to missing compression parents %r'
                % (missing_text_keys,))
        self._log_copied_texts()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        # XXX: we might be better checking this at the copy time.
        original_inventory_keys = set()
        inv_index = self._pack_collection.inventory_index.combined_index
        for entry in inv_index.iter_all_entries():
            original_inventory_keys.add(entry[1])
        new_inventory_keys = set()
        for entry in new_pack.inventory_index.iter_all_entries():
            new_inventory_keys.add(entry[1])
        if new_inventory_keys != original_inventory_keys:
            self._data_changed = True
        return new_pack.data_inserted() and self._data_changed


class RepositoryPackCollection(object):
    """Management of packs within a repository.

    :ivar _names: map of {pack_name: (index_size,)}
    """

    pack_factory = NewPack
    resumed_pack_factory = ResumedPack

    def __init__(self, repo, transport, index_transport, upload_transport,
            pack_transport, index_builder_class, index_class,
            use_chk_index):
        """Create a new RepositoryPackCollection.

        :param transport: Addresses the repository base directory
            (typically .bzr/repository/).
        :param index_transport: Addresses the directory containing indices.
        :param upload_transport: Addresses the directory into which packs are written
            while they're being created.
        :param pack_transport: Addresses the directory of existing complete packs.
        :param index_builder_class: The index builder class to use.
        :param index_class: The index class to use.
        :param use_chk_index: Whether to set up and manage a CHK index.
        """
        # XXX: This should call self.reset()
        self.repo = repo
        self.transport = transport
        self._index_transport = index_transport
        self._upload_transport = upload_transport
        self._pack_transport = pack_transport
        self._index_builder_class = index_builder_class
        self._index_class = index_class
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3,
            '.cix': 4}
        self.packs = []
        # name:Pack mapping
        self._names = None
        self._packs_by_name = {}
        # the previous pack-names content
        self._packs_at_load = None
        # when a pack is being created by this object, the state of that pack.
        self._new_pack = None
        # aggregated revision index data
        flush = self._flush_new_pack
        self.revision_index = AggregateIndex(self.reload_pack_names, flush)
        self.inventory_index = AggregateIndex(self.reload_pack_names, flush)
        self.text_index = AggregateIndex(self.reload_pack_names, flush)
        self.signature_index = AggregateIndex(self.reload_pack_names, flush)
        if use_chk_index:
            self.chk_index = AggregateIndex(self.reload_pack_names, flush)
        else:
            # used to determine if we're using a chk_index elsewhere.
            self.chk_index = None
        # resumed packs
        self._resumed_packs = []

    def add_pack_to_memory(self, pack):
        """Make a Pack object available to the repository to satisfy queries.

        :param pack: A Pack object.
        """
        if pack.name in self._packs_by_name:
            raise AssertionError(
                'pack %s already in _packs_by_name' % (pack.name,))
        self.packs.append(pack)
        self._packs_by_name[pack.name] = pack
        self.revision_index.add_index(pack.revision_index, pack)
        self.inventory_index.add_index(pack.inventory_index, pack)
        self.text_index.add_index(pack.text_index, pack)
        self.signature_index.add_index(pack.signature_index, pack)
        if self.chk_index is not None:
            self.chk_index.add_index(pack.chk_index, pack)

    def all_packs(self):
        """Return a list of all the Pack objects this repository has.

        Note that an in-progress pack being created is not returned.

        :return: A list of Pack objects for all the packs in the repository.
        """
        result = []
        for name in self.names():
            result.append(self.get_pack_by_name(name))
        return result

    def autopack(self):
        """Pack the pack collection incrementally.

        This will not attempt global reorganisation or recompression,
        rather it will just ensure that the total number of packs does
        not grow without bound. It uses the _max_pack_count method to
        determine if autopacking is needed, and the pack_distribution
        method to determine the number of revisions in each pack.

        If autopacking takes place then the packs name collection will have
        been flushed to disk - packing requires updating the name collection
        in synchronisation with certain steps. Otherwise the names collection
        is not flushed.

        :return: Something evaluating true if packing took place.
        """
        while True:
            try:
                return self._do_autopack()
            except errors.RetryAutopack:
                # If we get a RetryAutopack exception, we should abort the
                # current action, and retry.
                pass

    def _do_autopack(self):
        # XXX: Should not be needed when the management of indices is sane.
        total_revisions = self.revision_index.combined_index.key_count()
        total_packs = len(self._names)
        if self._max_pack_count(total_revisions) >= total_packs:
            return None
        # determine which packs need changing
        pack_distribution = self.pack_distribution(total_revisions)
        existing_packs = []
        for pack in self.all_packs():
            revision_count = pack.get_revision_count()
            if revision_count == 0:
                # revision-less packs are not generated by normal operation,
                # only by operations like sign-my-commits, and thus will not
                # tend to grow rapidly or without bound like commit containing
                # packs do - leave them alone as packing them really should
                # group their data with the relevant commit, and that may
                # involve rewriting ancient history - which autopack tries to
                # avoid. Alternatively we could not group the data but treat
                # each of these as having a single revision, and thus add
                # one revision for each to the total revision count, to get
                # a matching distribution.
                continue
            existing_packs.append((revision_count, pack))
        pack_operations = self.plan_autopack_combinations(
            existing_packs, pack_distribution)
        num_new_packs = len(pack_operations)
        num_old_packs = sum([len(po[1]) for po in pack_operations])
        num_revs_affected = sum([po[0] for po in pack_operations])
        mutter('Auto-packing repository %s, which has %d pack files, '
            'containing %d revisions. Packing %d files into %d affecting %d'
            ' revisions', self, total_packs, total_revisions, num_old_packs,
            num_new_packs, num_revs_affected)
        result = self._execute_pack_operations(pack_operations,
            reload_func=self._restart_autopack)
        mutter('Auto-packing repository %s completed', self)
        return result

    def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
                                 reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        :param _packer_class: The class of packer to use (default: Packer).
        :return: The new pack names.
        """
        for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
            packer = _packer_class(self, packs, '.autopack',
                                   reload_func=reload_func)
            try:
                packer.pack()
            except errors.RetryWithNewPacks:
                # An exception is propagating out of this context, make sure
                # this packer has cleaned up. Packer() doesn't set its new_pack
                # state into the RepositoryPackCollection object, so we only
                # have access to it directly here.
                if packer.new_pack is not None:
                    packer.new_pack.abort()
                raise
            for pack in packs:
                self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        # packs
        result = self._save_pack_names(clear_obsolete_packs=True)
        # Move the old packs out of the way now they are no longer referenced.
        for revision_count, packs in pack_operations:
            self._obsolete_packs(packs)
        return result

    def _flush_new_pack(self):
        if self._new_pack is not None:
            self._new_pack.flush()

    def lock_names(self):
        """Acquire the mutex around the pack-names index.

        This cannot be used in the middle of a read-only transaction on the
        repository.
        """
        self.repo.control_files.lock_write()

    def _already_packed(self):
        """Is the collection already packed?"""
        return not (self.repo._format.pack_compresses or (len(self._names) > 1))

    def pack(self, hint=None):
        """Pack the pack collection totally."""
        self.ensure_loaded()
        total_packs = len(self._names)
        if self._already_packed():
            return
        total_revisions = self.revision_index.combined_index.key_count()
        # XXX: the following may want to be a class, to pack with a given
        # policy.
        mutter('Packing repository %s, which has %d pack files, '
            'containing %d revisions with hint %r.', self, total_packs,
            total_revisions, hint)
        # determine which packs need changing
        pack_operations = [[0, []]]
        for pack in self.all_packs():
            if hint is None or pack.name in hint:
                # Either no hint was provided (so we are packing everything),
                # or this pack was included in the hint.
                pack_operations[-1][0] += pack.get_revision_count()
                pack_operations[-1][1].append(pack)
        self._execute_pack_operations(pack_operations, OptimisingPacker)
1588
    def plan_autopack_combinations(self, existing_packs, pack_distribution):
        """Plan a pack operation.

        :param existing_packs: The packs to pack. (A list of (revcount, Pack)
            tuples).
        :param pack_distribution: A list with the number of revisions desired
            in each pack.
        """
        if len(existing_packs) <= len(pack_distribution):
            return []
        existing_packs.sort(reverse=True)
        pack_operations = [[0, []]]
        # plan out what packs to keep, and what to reorganise
        while len(existing_packs):
            # take the largest pack, and if it's less than the head of the
            # distribution chart we will include its contents in the new pack
            # for that position. If it's larger, we remove its size from the
            # distribution chart
            next_pack_rev_count, next_pack = existing_packs.pop(0)
            if next_pack_rev_count >= pack_distribution[0]:
                # this is already packed 'better' than this, so we can
                # not waste time packing it.
                while next_pack_rev_count > 0:
                    next_pack_rev_count -= pack_distribution[0]
                    if next_pack_rev_count >= 0:
                        # more to go
                        del pack_distribution[0]
                    else:
                        # didn't use that entire bucket up
                        pack_distribution[0] = -next_pack_rev_count
            else:
                # add the revisions we're going to add to the next output pack
                pack_operations[-1][0] += next_pack_rev_count
                # allocate this pack to the next pack sub operation
                pack_operations[-1][1].append(next_pack)
                if pack_operations[-1][0] >= pack_distribution[0]:
                    # this pack is used up, shift left.
                    del pack_distribution[0]
                    pack_operations.append([0, []])
        # Now that we know which pack files we want to move, shove them all
        # into a single pack file.
        final_rev_count = 0
        final_pack_list = []
        for num_revs, pack_files in pack_operations:
            final_rev_count += num_revs
            final_pack_list.extend(pack_files)
        if len(final_pack_list) == 1:
            raise AssertionError('We somehow generated an autopack with a'
                ' single pack file being moved.')
        return [[final_rev_count, final_pack_list]]
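    # Worked example (illustrative only): with ten single-revision packs and
    # pack_distribution(10) == [10], the loop above drains existing_packs into
    # one pending operation, so the method returns something like
    # [[10, [p1, p2, ..., p10]]] - "combine these ten packs into one pack
    # holding 10 revisions". A pack that is already at least as large as the
    # current bucket (e.g. a 100-revision pack against a [100, 10, 1]
    # distribution) is popped without being scheduled; it only consumes
    # buckets from pack_distribution.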
    def ensure_loaded(self):
        """Ensure we have read names from disk.

        :return: True if the disk names had not been previously read.
        """
        # NB: if you see an assertion error here, it's probably access against
        # an unlocked repo. Naughty.
        if not self.repo.is_locked():
            raise errors.ObjectNotLocked(self.repo)
        if self._names is None:
            self._names = {}
            self._packs_at_load = set()
            for index, key, value in self._iter_disk_pack_index():
                name = key[0]
                self._names[name] = self._parse_index_sizes(value)
                self._packs_at_load.add((key, value))
            result = True
        else:
            result = False
        # populate all the metadata.
        self.all_packs()
        return result

    def _parse_index_sizes(self, value):
        """Parse a string of index sizes."""
        return tuple([int(digits) for digits in value.split(' ')])
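    # For reference (illustrative, matching the parser above): the value
    # stored against each pack name in the pack-names index is a space
    # separated list of index sizes in the order given by
    # Pack.index_definitions, so a value such as '1234 5678 910 1112'
    # parses to (1234, 5678, 910, 1112) for the .rix, .iix, .tix and .six
    # files of that pack.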
    def get_pack_by_name(self, name):
        """Get a Pack object by name.

        :param name: The name of the pack - e.g. '123456'
        :return: A Pack object.
        """
        try:
            return self._packs_by_name[name]
        except KeyError:
            rev_index = self._make_index(name, '.rix')
            inv_index = self._make_index(name, '.iix')
            txt_index = self._make_index(name, '.tix')
            sig_index = self._make_index(name, '.six')
            if self.chk_index is not None:
                chk_index = self._make_index(name, '.cix', unlimited_cache=True)
            else:
                chk_index = None
            result = ExistingPack(self._pack_transport, name, rev_index,
                inv_index, txt_index, sig_index, chk_index)
            self.add_pack_to_memory(result)
            return result
    def _resume_pack(self, name):
        """Get a suspended Pack object by name.

        :param name: The name of the pack - e.g. '123456'
        :return: A Pack object.
        """
        if not re.match('[a-f0-9]{32}', name):
            # Tokens should be md5sums of the suspended pack file, i.e. 32 hex
            # digits.
            raise errors.UnresumableWriteGroup(
                self.repo, [name], 'Malformed write group token')
        try:
            rev_index = self._make_index(name, '.rix', resume=True)
            inv_index = self._make_index(name, '.iix', resume=True)
            txt_index = self._make_index(name, '.tix', resume=True)
            sig_index = self._make_index(name, '.six', resume=True)
            if self.chk_index is not None:
                chk_index = self._make_index(name, '.cix', resume=True,
                                             unlimited_cache=True)
            else:
                chk_index = None
            result = self.resumed_pack_factory(name, rev_index, inv_index,
                txt_index, sig_index, self._upload_transport,
                self._pack_transport, self._index_transport, self,
                chk_index=chk_index)
        except errors.NoSuchFile, e:
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
        self.add_pack_to_memory(result)
        self._resumed_packs.append(result)
        return result
    def allocate(self, a_new_pack):
        """Allocate name in the list of packs.

        :param a_new_pack: A NewPack instance to be added to the collection of
            packs for this repository.
        """
        self.ensure_loaded()
        if a_new_pack.name in self._names:
            raise errors.BzrError(
                'Pack %r already exists in %s' % (a_new_pack.name, self))
        self._names[a_new_pack.name] = tuple(a_new_pack.index_sizes)
        self.add_pack_to_memory(a_new_pack)

    def _iter_disk_pack_index(self):
        """Iterate over the contents of the pack-names index.

        This is used when loading the list from disk, and before writing to
        detect updates from others during our write operation.
        :return: An iterator of the index contents.
        """
        return self._index_class(self.transport, 'pack-names', None
                ).iter_all_entries()

    def _make_index(self, name, suffix, resume=False, unlimited_cache=False):
        size_offset = self._suffix_offsets[suffix]
        index_name = name + suffix
        if resume:
            transport = self._upload_transport
            index_size = transport.stat(index_name).st_size
        else:
            transport = self._index_transport
            index_size = self._names[name][size_offset]
        return self._index_class(transport, index_name, index_size,
                                 unlimited_cache=unlimited_cache)
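    # Illustrative usage (assumed, not from the original source): for a pack
    # named '123456' that is already committed, _make_index('123456', '.rix')
    # opens '123456.rix' on the index transport using the size recorded in
    # self._names; with resume=True it instead stats the file on the upload
    # transport, because a suspended pack is not yet listed in pack-names.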
    def _max_pack_count(self, total_revisions):
        """Return the maximum number of packs to use for total revisions.

        :param total_revisions: The total number of revisions in the
            repository.
        """
        if not total_revisions:
            return 1
        digits = str(total_revisions)
        result = 0
        for digit in digits:
            result += int(digit)
        return result

    def names(self):
        """Provide an order to the underlying names."""
        return sorted(self._names.keys())
    def _obsolete_packs(self, packs):
        """Move a number of packs which have been obsoleted out of the way.

        Each pack and its associated indices are moved out of the way.

        Note: for correctness this function should only be called after a new
        pack names index has been written without these pack names, and with
        the names of packs that contain the data previously available via these
        packs.

        :param packs: The packs to obsolete.
        :return: None.
        """
        for pack in packs:
            pack.pack_transport.rename(pack.file_name(),
                '../obsolete_packs/' + pack.file_name())
            # TODO: Probably needs to know all possible indices for this pack
            # - or maybe list the directory and move all indices matching this
            # name whether we recognize it or not?
            suffixes = ['.iix', '.six', '.tix', '.rix']
            if self.chk_index is not None:
                suffixes.append('.cix')
            for suffix in suffixes:
                self._index_transport.rename(pack.name + suffix,
                    '../obsolete_packs/' + pack.name + suffix)
    def pack_distribution(self, total_revisions):
        """Generate a list of the number of revisions to put in each pack.

        :param total_revisions: The total number of revisions in the
            repository.
        """
        if total_revisions == 0:
            return [0]
        digits = reversed(str(total_revisions))
        result = []
        for exponent, count in enumerate(digits):
            size = 10 ** exponent
            for pos in range(int(count)):
                result.append(size)
        return list(reversed(result))
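    # Worked example (illustrative only): for total_revisions = 2403 the
    # decimal digits give
    #   pack_distribution(2403) == [1000, 1000, 100, 100, 100, 100, 1, 1, 1]
    # and _max_pack_count(2403) == 2 + 4 + 0 + 3 == 9, i.e. autopack aims for
    # at most one pack per unit of each decimal digit of the revision count.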
    def _pack_tuple(self, name):
        """Return a tuple with the transport and file name for a pack name."""
        return self._pack_transport, name + '.pack'
    def _remove_pack_from_memory(self, pack):
        """Remove pack from the packs accessed by this repository.

        Only affects memory state, until self._save_pack_names() is invoked.
        """
        self._names.pop(pack.name)
        self._packs_by_name.pop(pack.name)
        self._remove_pack_indices(pack)
        self.packs.remove(pack)

    def _remove_pack_indices(self, pack):
        """Remove the indices for pack from the aggregated indices."""
        self.revision_index.remove_index(pack.revision_index, pack)
        self.inventory_index.remove_index(pack.inventory_index, pack)
        self.text_index.remove_index(pack.text_index, pack)
        self.signature_index.remove_index(pack.signature_index, pack)
        if self.chk_index is not None:
            self.chk_index.remove_index(pack.chk_index, pack)
    def reset(self):
        """Clear all cached data."""
        # cached revision data
        self.revision_index.clear()
        # cached signature data
        self.signature_index.clear()
        # cached file text data
        self.text_index.clear()
        # cached inventory data
        self.inventory_index.clear()
        # cached chk data
        if self.chk_index is not None:
            self.chk_index.clear()
        # remove the open pack
        self._new_pack = None
        # information about packs.
        self._names = None
        self.packs = []
        self._packs_by_name = {}
        self._packs_at_load = None

    def _unlock_names(self):
        """Release the mutex around the pack-names index."""
        self.repo.control_files.unlock()
    def _diff_pack_names(self):
        """Read the pack names from disk, and compare it to the one in memory.

        :return: (disk_nodes, deleted_nodes, new_nodes)
            disk_nodes    The final set of nodes that should be referenced
            deleted_nodes Nodes which have been removed from when we started
            new_nodes     Nodes that are newly introduced
        """
        # load the disk nodes across
        disk_nodes = set()
        for index, key, value in self._iter_disk_pack_index():
            disk_nodes.add((key, value))

        # do a two-way diff against our original content
        current_nodes = set()
        for name, sizes in self._names.iteritems():
            current_nodes.add(
                ((name, ), ' '.join(str(size) for size in sizes)))

        # Packs no longer present in the repository, which were present when we
        # locked the repository
        deleted_nodes = self._packs_at_load - current_nodes
        # Packs which this process is adding
        new_nodes = current_nodes - self._packs_at_load

        # Update the disk_nodes set to include the ones we are adding, and
        # remove the ones which were removed by someone else
        disk_nodes.difference_update(deleted_nodes)
        disk_nodes.update(new_nodes)

        return disk_nodes, deleted_nodes, new_nodes
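    # Illustrative example (not from the original source): suppose we loaded
    # packs {A, B}, combined them into D in memory, and another process added
    # pack C on disk in the meantime. Then disk_nodes starts as {A, B, C},
    # deleted_nodes is {A, B}, new_nodes is {D}, and the merged disk_nodes
    # {C, D} is what _save_pack_names() will write back out.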
    def _syncronize_pack_names_from_disk_nodes(self, disk_nodes):
        """Given the correct set of pack files, update our saved info.

        :return: (removed, added, modified)
            removed     pack names removed from self._names
            added       pack names added to self._names
            modified    pack names that had changed value
        """
        removed = []
        added = []
        modified = []
        ## self._packs_at_load = disk_nodes
        new_names = dict(disk_nodes)
        # drop no longer present nodes
        for pack in self.all_packs():
            if (pack.name,) not in new_names:
                removed.append(pack.name)
                self._remove_pack_from_memory(pack)
        # add new nodes/refresh existing ones
        for key, value in disk_nodes:
            name = key[0]
            sizes = self._parse_index_sizes(value)
            if name in self._names:
                # existing
                if sizes != self._names[name]:
                    # the pack for name has had its indices replaced - rare but
                    # important to handle. XXX: probably can never happen today
                    # because the three-way merge code above does not handle it
                    # - you may end up adding the same key twice to the new
                    # disk index because the set values are the same, unless
                    # the only index shows up as deleted by the set difference
                    # - which it may. Until there is a specific test for this,
                    # assume it's broken. RBC 20071017.
                    self._remove_pack_from_memory(self.get_pack_by_name(name))
                    self._names[name] = sizes
                    self.get_pack_by_name(name)
                    modified.append(name)
            else:
                # new
                self._names[name] = sizes
                self.get_pack_by_name(name)
                added.append(name)
        return removed, added, modified
    def _save_pack_names(self, clear_obsolete_packs=False):
        """Save the list of packs.

        This will take out the mutex around the pack names list for the
        duration of the method call. If concurrent updates have been made, a
        three-way merge between the current list and the current in memory list
        is performed.

        :param clear_obsolete_packs: If True, clear out the contents of the
            obsolete_packs directory.
        :return: A list of the names saved that were not previously on disk.
        """
        self.lock_names()
        try:
            builder = self._index_builder_class()
            disk_nodes, deleted_nodes, new_nodes = self._diff_pack_names()
            # TODO: handle same-name, index-size-changes here -
            # e.g. use the value from disk, not ours, *unless* we're the one
            # changing it.
            for key, value in disk_nodes:
                builder.add_node(key, value)
            self.transport.put_file('pack-names', builder.finish(),
                mode=self.repo.bzrdir._get_file_mode())
            # move the baseline forward
            self._packs_at_load = disk_nodes
            if clear_obsolete_packs:
                self._clear_obsolete_packs()
        finally:
            self._unlock_names()
        # synchronise the memory packs list with what we just wrote:
        self._syncronize_pack_names_from_disk_nodes(disk_nodes)
        return [new_node[0][0] for new_node in new_nodes]
    def reload_pack_names(self):
        """Sync our pack listing with what is present in the repository.

        This should be called when we find out that something we thought was
        present is now missing. This happens when another process re-packs the
        repository, etc.

        :return: True if the in-memory list of packs has been altered at all.
        """
        # The ensure_loaded call is to handle the case where the first call
        # made involving the collection was to reload_pack_names, where we
        # don't have a view of disk contents. It's a bit of a band-aid, and
        # causes two reads of pack-names, but it's a rare corner case not
        # struck with regular push/pull etc.
        first_read = self.ensure_loaded()
        if first_read:
            return True
        # Work out the new value from what is now on disk.
        disk_nodes, _, _ = self._diff_pack_names()
        self._packs_at_load = disk_nodes
        (removed, added,
         modified) = self._syncronize_pack_names_from_disk_nodes(disk_nodes)
        if removed or added or modified:
            return True
        return False

    def _restart_autopack(self):
        """Reload the pack names list, and restart the autopack code."""
        if not self.reload_pack_names():
            # Re-raise the original exception, because something went missing
            # and a restart didn't find it
            raise
        raise errors.RetryAutopack(self.repo, False, sys.exc_info())
    def _clear_obsolete_packs(self):
        """Delete everything from the obsolete-packs directory."""
        obsolete_pack_transport = self.transport.clone('obsolete_packs')
        for filename in obsolete_pack_transport.list_dir('.'):
            try:
                obsolete_pack_transport.delete(filename)
            except (errors.PathError, errors.TransportError), e:
                warning("couldn't delete obsolete pack, skipping it:\n%s" % (e,))
    def _start_write_group(self):
        # Do not permit preparation for writing if we're not in a 'write lock'.
        if not self.repo.is_write_locked():
            raise errors.NotWriteLocked(self)
        self._new_pack = self.pack_factory(self, upload_suffix='.pack',
            file_mode=self.repo.bzrdir._get_file_mode())
        # allow writing: queue writes to a new index
        self.revision_index.add_writable_index(self._new_pack.revision_index,
            self._new_pack)
        self.inventory_index.add_writable_index(self._new_pack.inventory_index,
            self._new_pack)
        self.text_index.add_writable_index(self._new_pack.text_index,
            self._new_pack)
        self._new_pack.text_index.set_optimize(combine_backing_indices=False)
        self.signature_index.add_writable_index(self._new_pack.signature_index,
            self._new_pack)
        if self.chk_index is not None:
            self.chk_index.add_writable_index(self._new_pack.chk_index,
                self._new_pack)
            self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
            self._new_pack.chk_index.set_optimize(combine_backing_indices=False)

        self.repo.inventories._index._add_callback = self.inventory_index.add_callback
        self.repo.revisions._index._add_callback = self.revision_index.add_callback
        self.repo.signatures._index._add_callback = self.signature_index.add_callback
        self.repo.texts._index._add_callback = self.text_index.add_callback
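    # Note (explanatory, not from the original source): after this point the
    # aggregate indices route new records into self._new_pack. For example,
    # a commit that adds a text via self.repo.texts hits
    # text_index.add_callback and therefore writes into the open '.pack'
    # file, until _commit_write_group(), abort or suspend detaches it again.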
    def _abort_write_group(self):
        # FIXME: just drop the transient index.
        # forget what names there are
        if self._new_pack is not None:
            try:
                self._new_pack.abort()
            finally:
                # XXX: If we aborted while in the middle of finishing the write
                # group, _remove_pack_indices can fail because the indexes are
                # already gone. If they're not there we shouldn't fail in this
                # case. -- mbp 20081113
                self._remove_pack_indices(self._new_pack)
                self._new_pack = None
        for resumed_pack in self._resumed_packs:
            try:
                resumed_pack.abort()
            finally:
                # See comment in previous finally block.
                try:
                    self._remove_pack_indices(resumed_pack)
                except KeyError:
                    pass
        del self._resumed_packs[:]

    def _remove_resumed_pack_indices(self):
        for resumed_pack in self._resumed_packs:
            self._remove_pack_indices(resumed_pack)
        del self._resumed_packs[:]
    def _check_new_inventories(self):
        """Detect missing inventories in this write group.

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        """
        # The base implementation does no checks. GCRepositoryPackCollection
        # overrides this.
        return []

    def _commit_write_group(self):
        all_missing = set()
        for prefix, versioned_file in (
                ('revisions', self.repo.revisions),
                ('inventories', self.repo.inventories),
                ('texts', self.repo.texts),
                ('signatures', self.repo.signatures),
                ):
            missing = versioned_file.get_missing_compression_parent_keys()
            all_missing.update([(prefix,) + key for key in missing])
        if all_missing:
            raise errors.BzrCheckError(
                "Repository %s has missing compression parent(s) %r "
                % (self.repo, sorted(all_missing)))
        problems = self._check_new_inventories()
        if problems:
            problems_summary = '\n'.join(problems)
            raise errors.BzrCheckError(
                "Cannot add revision(s) to repository: " + problems_summary)
        self._remove_pack_indices(self._new_pack)
        any_new_content = False
        if self._new_pack.data_inserted():
            # get all the data to disk and ready to use
            self._new_pack.finish()
            self.allocate(self._new_pack)
            self._new_pack = None
            any_new_content = True
        else:
            self._new_pack.abort()
            self._new_pack = None
        for resumed_pack in self._resumed_packs:
            # XXX: this is a pretty ugly way to turn the resumed pack into a
            # properly committed pack.
            self._names[resumed_pack.name] = None
            self._remove_pack_from_memory(resumed_pack)
            resumed_pack.finish()
            self.allocate(resumed_pack)
            any_new_content = True
        del self._resumed_packs[:]
        if any_new_content:
            result = self.autopack()
            if not result:
                # when autopack takes no steps, the names list is still
                # unsaved.
                return self._save_pack_names()
            return result
        return []
    def _suspend_write_group(self):
        tokens = [pack.name for pack in self._resumed_packs]
        self._remove_pack_indices(self._new_pack)
        if self._new_pack.data_inserted():
            # get all the data to disk and ready to use
            self._new_pack.finish(suspend=True)
            tokens.append(self._new_pack.name)
            self._new_pack = None
        else:
            self._new_pack.abort()
            self._new_pack = None
        self._remove_resumed_pack_indices()
        return tokens

    def _resume_write_group(self, tokens):
        for token in tokens:
            self._resume_pack(token)

class KnitPackRepository(KnitRepository):
    """Repository with knit objects stored inside pack containers.

    The layering for a KnitPackRepository is:

    Graph | HPSS | Repository public layer |
    ===================================================
    Tuple based apis below, string based, and key based apis above
    ---------------------------------------------------
    KnitVersionedFiles
      Provides .texts, .revisions etc
      This adapts the N-tuple keys to physical knit records which only have a
      single string identifier (for historical reasons), which in older formats
      was always the revision_id, and in the mapped code for packs is always
      the last element of key tuples.
    ---------------------------------------------------
    GraphIndex
      A separate GraphIndex is used for each of the
      texts/inventories/revisions/signatures contained within each individual
      pack file. The GraphIndex layer works in N-tuples and is unaware of any
      fallbacks.
    ===================================================

    """

def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
2176
KnitRepository.__init__(self, _format, a_bzrdir, control_files,
2177
_commit_builder_class, _serializer)
2178
index_transport = self._transport.clone('indices')
2179
self._pack_collection = RepositoryPackCollection(self, self._transport,
2181
self._transport.clone('upload'),
2182
self._transport.clone('packs'),
2183
_format.index_builder_class,
2184
_format.index_class,
2185
use_chk_index=self._format.supports_chks,
2187
self.inventories = KnitVersionedFiles(
2188
_KnitGraphIndex(self._pack_collection.inventory_index.combined_index,
2189
add_callback=self._pack_collection.inventory_index.add_callback,
2190
deltas=True, parents=True, is_locked=self.is_locked),
2191
data_access=self._pack_collection.inventory_index.data_access,
2192
max_delta_chain=200)
2193
self.revisions = KnitVersionedFiles(
2194
_KnitGraphIndex(self._pack_collection.revision_index.combined_index,
2195
add_callback=self._pack_collection.revision_index.add_callback,
2196
deltas=False, parents=True, is_locked=self.is_locked,
2197
track_external_parent_refs=True),
2198
data_access=self._pack_collection.revision_index.data_access,
2200
self.signatures = KnitVersionedFiles(
2201
_KnitGraphIndex(self._pack_collection.signature_index.combined_index,
2202
add_callback=self._pack_collection.signature_index.add_callback,
2203
deltas=False, parents=False, is_locked=self.is_locked),
2204
data_access=self._pack_collection.signature_index.data_access,
2206
self.texts = KnitVersionedFiles(
2207
_KnitGraphIndex(self._pack_collection.text_index.combined_index,
2208
add_callback=self._pack_collection.text_index.add_callback,
2209
deltas=True, parents=True, is_locked=self.is_locked),
2210
data_access=self._pack_collection.text_index.data_access,
2211
max_delta_chain=200)
2212
if _format.supports_chks:
2213
# No graph, no compression:- references from chks are between
2214
# different objects not temporal versions of the same; and without
2215
# some sort of temporal structure knit compression will just fail.
2216
self.chk_bytes = KnitVersionedFiles(
2217
_KnitGraphIndex(self._pack_collection.chk_index.combined_index,
2218
add_callback=self._pack_collection.chk_index.add_callback,
2219
deltas=False, parents=False, is_locked=self.is_locked),
2220
data_access=self._pack_collection.chk_index.data_access,
2223
self.chk_bytes = None
2224
# True when the repository object is 'write locked' (as opposed to the
2225
# physical lock only taken out around changes to the pack-names list.)
2226
# Another way to represent this would be a decorator around the control
2227
# files object that presents logical locks as physical ones - if this
2228
# gets ugly consider that alternative design. RBC 20071011
2229
self._write_lock_count = 0
2230
self._transaction = None
2232
self._reconcile_does_inventory_gc = True
2233
self._reconcile_fixes_text_parents = True
2234
self._reconcile_backsup_inventory = False
2236
def _warn_if_deprecated(self):
2237
# This class isn't deprecated, but one sub-format is
2238
if isinstance(self._format, RepositoryFormatKnitPack5RichRootBroken):
2239
from bzrlib import repository
2240
if repository._deprecation_warning_done:
2242
repository._deprecation_warning_done = True
2243
warning("Format %s for %s is deprecated - please use"
2244
" 'bzr upgrade --1.6.1-rich-root'"
2245
% (self._format, self.bzrdir.transport.base))
2247
def _abort_write_group(self):
2248
self.revisions._index._key_dependencies.clear()
2249
self._pack_collection._abort_write_group()
2251
def _get_source(self, to_format):
2252
if to_format.network_name() == self._format.network_name():
2253
return KnitPackStreamSource(self, to_format)
2254
return super(KnitPackRepository, self)._get_source(to_format)
2256
def _make_parents_provider(self):
2257
return graph.CachingParentsProvider(self)
2259
def _refresh_data(self):
2260
if not self.is_locked():
2262
self._pack_collection.reload_pack_names()
2264
def _start_write_group(self):
2265
self._pack_collection._start_write_group()
2267
def _commit_write_group(self):
2268
hint = self._pack_collection._commit_write_group()
2269
self.revisions._index._key_dependencies.clear()
2272
def suspend_write_group(self):
2273
# XXX check self._write_group is self.get_transaction()?
2274
tokens = self._pack_collection._suspend_write_group()
2275
self.revisions._index._key_dependencies.clear()
2276
self._write_group = None
2279
def _resume_write_group(self, tokens):
2280
self._start_write_group()
2282
self._pack_collection._resume_write_group(tokens)
2283
except errors.UnresumableWriteGroup:
2284
self._abort_write_group()
2286
for pack in self._pack_collection._resumed_packs:
2287
self.revisions._index.scan_unvalidated_index(pack.revision_index)
2289
def get_transaction(self):
2290
if self._write_lock_count:
2291
return self._transaction
2293
return self.control_files.get_transaction()
2295
def is_locked(self):
2296
return self._write_lock_count or self.control_files.is_locked()
2298
def is_write_locked(self):
2299
return self._write_lock_count
2301
def lock_write(self, token=None):
2302
locked = self.is_locked()
2303
if not self._write_lock_count and locked:
2304
raise errors.ReadOnlyError(self)
2305
self._write_lock_count += 1
2306
if self._write_lock_count == 1:
2307
self._transaction = transactions.WriteTransaction()
2309
for repo in self._fallback_repositories:
2310
# Writes don't affect fallback repos
2312
self._refresh_data()
2314
def lock_read(self):
2315
locked = self.is_locked()
2316
if self._write_lock_count:
2317
self._write_lock_count += 1
2319
self.control_files.lock_read()
2321
for repo in self._fallback_repositories:
2323
self._refresh_data()
2325
def leave_lock_in_place(self):
2326
# not supported - raise an error
2327
raise NotImplementedError(self.leave_lock_in_place)
2329
def dont_leave_lock_in_place(self):
2330
# not supported - raise an error
2331
raise NotImplementedError(self.dont_leave_lock_in_place)
2334
def pack(self, hint=None):
2335
"""Compress the data within the repository.
2337
This will pack all the data to a single pack. In future it may
2338
recompress deltas or do other such expensive operations.
2340
self._pack_collection.pack(hint=hint)
2343
def reconcile(self, other=None, thorough=False):
2344
"""Reconcile this repository."""
2345
from bzrlib.reconcile import PackReconciler
2346
reconciler = PackReconciler(self, thorough=thorough)
2347
reconciler.reconcile()
2350
def _reconcile_pack(self, collection, packs, extension, revs, pb):
2351
packer = ReconcilePacker(collection, packs, extension, revs)
2352
return packer.pack(pb)
2355
if self._write_lock_count == 1 and self._write_group is not None:
2356
self.abort_write_group()
2357
self._transaction = None
2358
self._write_lock_count = 0
2359
raise errors.BzrError(
2360
'Must end write group before releasing write lock on %s'
2362
if self._write_lock_count:
2363
self._write_lock_count -= 1
2364
if not self._write_lock_count:
2365
transaction = self._transaction
2366
self._transaction = None
2367
transaction.finish()
2369
self.control_files.unlock()
2371
if not self.is_locked():
2372
for repo in self._fallback_repositories:
2376
class KnitPackStreamSource(StreamSource):
2377
"""A StreamSource used to transfer data between same-format KnitPack repos.
2379
This source assumes:
2380
1) Same serialization format for all objects
2381
2) Same root information
2382
3) XML format inventories
2383
4) Atomic inserts (so we can stream inventory texts before text
2388
def __init__(self, from_repository, to_format):
2389
super(KnitPackStreamSource, self).__init__(from_repository, to_format)
2390
self._text_keys = None
2391
self._text_fetch_order = 'unordered'
2393
def _get_filtered_inv_stream(self, revision_ids):
2394
from_repo = self.from_repository
2395
parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
2396
parent_keys = [(p,) for p in parent_ids]
2397
find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
2398
parent_text_keys = set(find_text_keys(
2399
from_repo._inventory_xml_lines_for_keys(parent_keys)))
2400
content_text_keys = set()
2401
knit = KnitVersionedFiles(None, None)
2402
factory = KnitPlainFactory()
2403
def find_text_keys_from_content(record):
2404
if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
2405
raise ValueError("Unknown content storage kind for"
2406
" inventory text: %s" % (record.storage_kind,))
2407
# It's a knit record, it has a _raw_record field (even if it was
2408
# reconstituted from a network stream).
2409
raw_data = record._raw_record
2410
# read the entire thing
2411
revision_id = record.key[-1]
2412
content, _ = knit._parse_record(revision_id, raw_data)
2413
if record.storage_kind == 'knit-delta-gz':
2414
line_iterator = factory.get_linedelta_content(content)
2415
elif record.storage_kind == 'knit-ft-gz':
2416
line_iterator = factory.get_fulltext_content(content)
2417
content_text_keys.update(find_text_keys(
2418
[(line, revision_id) for line in line_iterator]))
2419
revision_keys = [(r,) for r in revision_ids]
2420
def _filtered_inv_stream():
2421
source_vf = from_repo.inventories
2422
stream = source_vf.get_record_stream(revision_keys,
2424
for record in stream:
2425
if record.storage_kind == 'absent':
2426
raise errors.NoSuchRevision(from_repo, record.key)
2427
find_text_keys_from_content(record)
2429
self._text_keys = content_text_keys - parent_text_keys
2430
return ('inventories', _filtered_inv_stream())
2432
def _get_text_stream(self):
2433
# Note: We know we don't have to handle adding root keys, because both
2434
# the source and target are the identical network name.
2435
text_stream = self.from_repository.texts.get_record_stream(
2436
self._text_keys, self._text_fetch_order, False)
2437
return ('texts', text_stream)
2439
def get_stream(self, search):
2440
revision_ids = search.get_keys()
2441
for stream_info in self._fetch_revision_texts(revision_ids):
2443
self._revision_keys = [(rev_id,) for rev_id in revision_ids]
2444
yield self._get_filtered_inv_stream(revision_ids)
2445
yield self._get_text_stream()
2449
class RepositoryFormatPack(MetaDirRepositoryFormat):
2450
"""Format logic for pack structured repositories.
2452
This repository format has:
2453
- a list of packs in pack-names
2454
- packs in packs/NAME.pack
2455
- indices in indices/NAME.{iix,six,tix,rix}
2456
- knit deltas in the packs, knit indices mapped to the indices.
2457
- thunk objects to support the knits programming API.
2458
- a format marker of its own
2459
- an optional 'shared-storage' flag
2460
- an optional 'no-working-trees' flag
2464
# Set this attribute in derived classes to control the repository class
2465
# created by open and initialize.
2466
repository_class = None
2467
# Set this attribute in derived classes to control the
2468
# _commit_builder_class that the repository objects will have passed to
2469
# their constructor.
2470
_commit_builder_class = None
2471
# Set this attribute in derived clases to control the _serializer that the
2472
# repository objects will have passed to their constructor.
2474
# Packs are not confused by ghosts.
2475
supports_ghosts = True
2476
# External references are not supported in pack repositories yet.
2477
supports_external_lookups = False
2478
# Most pack formats do not use chk lookups.
2479
supports_chks = False
2480
# What index classes to use
2481
index_builder_class = None
2483
_fetch_uses_deltas = True
2486
def initialize(self, a_bzrdir, shared=False):
2487
"""Create a pack based repository.
2489
:param a_bzrdir: bzrdir to contain the new repository; must already
2491
:param shared: If true the repository will be initialized as a shared
2494
mutter('creating repository in %s.', a_bzrdir.transport.base)
2495
dirs = ['indices', 'obsolete_packs', 'packs', 'upload']
2496
builder = self.index_builder_class()
2497
files = [('pack-names', builder.finish())]
2498
utf8_files = [('format', self.get_format_string())]
2500
self._upload_blank_content(a_bzrdir, dirs, files, utf8_files, shared)
2501
return self.open(a_bzrdir=a_bzrdir, _found=True)
2503
def open(self, a_bzrdir, _found=False, _override_transport=None):
2504
"""See RepositoryFormat.open().
2506
:param _override_transport: INTERNAL USE ONLY. Allows opening the
2507
repository at a slightly different url
2508
than normal. I.e. during 'upgrade'.
2511
format = RepositoryFormat.find_format(a_bzrdir)
2512
if _override_transport is not None:
2513
repo_transport = _override_transport
2515
repo_transport = a_bzrdir.get_repository_transport(None)
2516
control_files = lockable_files.LockableFiles(repo_transport,
2517
'lock', lockdir.LockDir)
2518
return self.repository_class(_format=self,
2520
control_files=control_files,
2521
_commit_builder_class=self._commit_builder_class,
2522
_serializer=self._serializer)
2525
class RepositoryFormatKnitPack1(RepositoryFormatPack):
2526
"""A no-subtrees parameterized Pack repository.
2528
This format was introduced in 0.92.
2531
repository_class = KnitPackRepository
2532
_commit_builder_class = PackCommitBuilder
2534
def _serializer(self):
2535
return xml5.serializer_v5
2536
# What index classes to use
2537
index_builder_class = InMemoryGraphIndex
2538
index_class = GraphIndex
2540
def _get_matching_bzrdir(self):
2541
return bzrdir.format_registry.make_bzrdir('pack-0.92')
2543
def _ignore_setting_bzrdir(self, format):
2546
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2548
def get_format_string(self):
2549
"""See RepositoryFormat.get_format_string()."""
2550
return "Bazaar pack repository format 1 (needs bzr 0.92)\n"
2552
def get_format_description(self):
2553
"""See RepositoryFormat.get_format_description()."""
2554
return "Packs containing knits without subtree support"
2557
class RepositoryFormatKnitPack3(RepositoryFormatPack):
2558
"""A subtrees parameterized Pack repository.
2560
This repository format uses the xml7 serializer to get:
2561
- support for recording full info about the tree root
2562
- support for recording tree-references
2564
This format was introduced in 0.92.
2567
repository_class = KnitPackRepository
2568
_commit_builder_class = PackRootCommitBuilder
2569
rich_root_data = True
2570
supports_tree_reference = True
2572
def _serializer(self):
2573
return xml7.serializer_v7
2574
# What index classes to use
2575
index_builder_class = InMemoryGraphIndex
2576
index_class = GraphIndex
2578
def _get_matching_bzrdir(self):
2579
return bzrdir.format_registry.make_bzrdir(
2580
'pack-0.92-subtree')
2582
def _ignore_setting_bzrdir(self, format):
2585
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2587
def get_format_string(self):
2588
"""See RepositoryFormat.get_format_string()."""
2589
return "Bazaar pack repository format 1 with subtree support (needs bzr 0.92)\n"
2591
def get_format_description(self):
2592
"""See RepositoryFormat.get_format_description()."""
2593
return "Packs containing knits with subtree support\n"
2596
class RepositoryFormatKnitPack4(RepositoryFormatPack):
2597
"""A rich-root, no subtrees parameterized Pack repository.
2599
This repository format uses the xml6 serializer to get:
2600
- support for recording full info about the tree root
2602
This format was introduced in 1.0.
2605
repository_class = KnitPackRepository
2606
_commit_builder_class = PackRootCommitBuilder
2607
rich_root_data = True
2608
supports_tree_reference = False
2610
def _serializer(self):
2611
return xml6.serializer_v6
2612
# What index classes to use
2613
index_builder_class = InMemoryGraphIndex
2614
index_class = GraphIndex
2616
def _get_matching_bzrdir(self):
2617
return bzrdir.format_registry.make_bzrdir(
2620
def _ignore_setting_bzrdir(self, format):
2623
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2625
def get_format_string(self):
2626
"""See RepositoryFormat.get_format_string()."""
2627
return ("Bazaar pack repository format 1 with rich root"
2628
" (needs bzr 1.0)\n")
2630
def get_format_description(self):
2631
"""See RepositoryFormat.get_format_description()."""
2632
return "Packs containing knits with rich root support\n"
2635
class RepositoryFormatKnitPack5(RepositoryFormatPack):
2636
"""Repository that supports external references to allow stacking.
2640
Supports external lookups, which results in non-truncated ghosts after
2641
reconcile compared to pack-0.92 formats.
2644
repository_class = KnitPackRepository
2645
_commit_builder_class = PackCommitBuilder
2646
supports_external_lookups = True
2647
# What index classes to use
2648
index_builder_class = InMemoryGraphIndex
2649
index_class = GraphIndex
2652
def _serializer(self):
2653
return xml5.serializer_v5
2655
def _get_matching_bzrdir(self):
2656
return bzrdir.format_registry.make_bzrdir('1.6')
2658
def _ignore_setting_bzrdir(self, format):
2661
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2663
def get_format_string(self):
2664
"""See RepositoryFormat.get_format_string()."""
2665
return "Bazaar RepositoryFormatKnitPack5 (bzr 1.6)\n"
2667
def get_format_description(self):
2668
"""See RepositoryFormat.get_format_description()."""
2669
return "Packs 5 (adds stacking support, requires bzr 1.6)"
2672
class RepositoryFormatKnitPack5RichRoot(RepositoryFormatPack):
2673
"""A repository with rich roots and stacking.
2675
New in release 1.6.1.
2677
Supports stacking on other repositories, allowing data to be accessed
2678
without being stored locally.
2681
repository_class = KnitPackRepository
2682
_commit_builder_class = PackRootCommitBuilder
2683
rich_root_data = True
2684
supports_tree_reference = False # no subtrees
2685
supports_external_lookups = True
2686
# What index classes to use
2687
index_builder_class = InMemoryGraphIndex
2688
index_class = GraphIndex
2691
def _serializer(self):
2692
return xml6.serializer_v6
2694
def _get_matching_bzrdir(self):
2695
return bzrdir.format_registry.make_bzrdir(
2698
def _ignore_setting_bzrdir(self, format):
2701
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2703
def get_format_string(self):
2704
"""See RepositoryFormat.get_format_string()."""
2705
return "Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6.1)\n"
2707
def get_format_description(self):
2708
return "Packs 5 rich-root (adds stacking support, requires bzr 1.6.1)"
2711
class RepositoryFormatKnitPack5RichRootBroken(RepositoryFormatPack):
2712
"""A repository with rich roots and external references.
2716
Supports external lookups, which results in non-truncated ghosts after
2717
reconcile compared to pack-0.92 formats.
2719
This format was deprecated because the serializer it uses accidentally
2720
supported subtrees, when the format was not intended to. This meant that
2721
someone could accidentally fetch from an incorrect repository.
2724
repository_class = KnitPackRepository
2725
_commit_builder_class = PackRootCommitBuilder
2726
rich_root_data = True
2727
supports_tree_reference = False # no subtrees
2729
supports_external_lookups = True
2730
# What index classes to use
2731
index_builder_class = InMemoryGraphIndex
2732
index_class = GraphIndex
2735
def _serializer(self):
2736
return xml7.serializer_v7
2738
def _get_matching_bzrdir(self):
2739
matching = bzrdir.format_registry.make_bzrdir(
2741
matching.repository_format = self
2744
def _ignore_setting_bzrdir(self, format):
2747
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2749
def get_format_string(self):
2750
"""See RepositoryFormat.get_format_string()."""
2751
return "Bazaar RepositoryFormatKnitPack5RichRoot (bzr 1.6)\n"
2753
def get_format_description(self):
2754
return ("Packs 5 rich-root (adds stacking support, requires bzr 1.6)"
2758
class RepositoryFormatKnitPack6(RepositoryFormatPack):
2759
"""A repository with stacking and btree indexes,
2760
without rich roots or subtrees.
2762
This is equivalent to pack-1.6 with B+Tree indices.
2765
repository_class = KnitPackRepository
2766
_commit_builder_class = PackCommitBuilder
2767
supports_external_lookups = True
2768
# What index classes to use
2769
index_builder_class = BTreeBuilder
2770
index_class = BTreeGraphIndex
2773
def _serializer(self):
2774
return xml5.serializer_v5
2776
def _get_matching_bzrdir(self):
2777
return bzrdir.format_registry.make_bzrdir('1.9')
2779
def _ignore_setting_bzrdir(self, format):
2782
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2784
def get_format_string(self):
2785
"""See RepositoryFormat.get_format_string()."""
2786
return "Bazaar RepositoryFormatKnitPack6 (bzr 1.9)\n"
2788
def get_format_description(self):
2789
"""See RepositoryFormat.get_format_description()."""
2790
return "Packs 6 (uses btree indexes, requires bzr 1.9)"
2793
class RepositoryFormatKnitPack6RichRoot(RepositoryFormatPack):
2794
"""A repository with rich roots, no subtrees, stacking and btree indexes.
2796
1.6-rich-root with B+Tree indices.
2799
repository_class = KnitPackRepository
2800
_commit_builder_class = PackRootCommitBuilder
2801
rich_root_data = True
2802
supports_tree_reference = False # no subtrees
2803
supports_external_lookups = True
2804
# What index classes to use
2805
index_builder_class = BTreeBuilder
2806
index_class = BTreeGraphIndex
2809
def _serializer(self):
2810
return xml6.serializer_v6
2812
def _get_matching_bzrdir(self):
2813
return bzrdir.format_registry.make_bzrdir(
2816
def _ignore_setting_bzrdir(self, format):
2819
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2821
def get_format_string(self):
2822
"""See RepositoryFormat.get_format_string()."""
2823
return "Bazaar RepositoryFormatKnitPack6RichRoot (bzr 1.9)\n"
2825
def get_format_description(self):
2826
return "Packs 6 rich-root (uses btree indexes, requires bzr 1.9)"
2829
class RepositoryFormatPackDevelopment2Subtree(RepositoryFormatPack):
2830
"""A subtrees development repository.
2832
This format should be retained until the second release after bzr 1.7.
2834
1.6.1-subtree[as it might have been] with B+Tree indices.
2836
This is [now] retained until we have a CHK based subtree format in
2840
repository_class = KnitPackRepository
2841
_commit_builder_class = PackRootCommitBuilder
2842
rich_root_data = True
2843
supports_tree_reference = True
2844
supports_external_lookups = True
2845
# What index classes to use
2846
index_builder_class = BTreeBuilder
2847
index_class = BTreeGraphIndex
2850
def _serializer(self):
2851
return xml7.serializer_v7
2853
def _get_matching_bzrdir(self):
2854
return bzrdir.format_registry.make_bzrdir(
2855
'development-subtree')
2857
def _ignore_setting_bzrdir(self, format):
2860
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
2862
def get_format_string(self):
2863
"""See RepositoryFormat.get_format_string()."""
2864
return ("Bazaar development format 2 with subtree support "
2865
"(needs bzr.dev from before 1.8)\n")
2867
def get_format_description(self):
2868
"""See RepositoryFormat.get_format_description()."""
2869
return ("Development repository format, currently the same as "
2870
"1.6.1-subtree with B+Tree indices.\n")