1
# groupcompress, a bzr plugin providing improved disk utilisation
2
# Copyright (C) 2008 Canonical Limited.
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License version 2 as published
6
# by the Free Software Foundation.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
"""Repostory formats using B+Tree indices and groupcompress compression."""
31
from bzrlib.btree_index import (
35
from bzrlib.index import GraphIndex, GraphIndexBuilder
36
from bzrlib.repository import InterPackRepo
37
from bzrlib.plugins.groupcompress.groupcompress import (
39
GroupCompressVersionedFiles,
41
from bzrlib.osutils import rand_chars
42
from bzrlib.repofmt.pack_repo import (
46
RepositoryPackCollection,
47
RepositoryFormatPackDevelopment2,
48
RepositoryFormatPackDevelopment2Subtree,
49
RepositoryFormatKnitPack1,
50
RepositoryFormatKnitPack3,
51
RepositoryFormatKnitPack4,
57
from bzrlib.repofmt.pack_repo import (
58
CHKInventoryRepository,
59
RepositoryFormatPackDevelopment5,
60
RepositoryFormatPackDevelopment5Hash16,
61
RepositoryFormatPackDevelopment5Hash255,
70
# NOTE(review): this is the tail of a replacement ``open_pack`` method for
# bzrlib's Packer class; the ``def open_pack(self):`` line (and any preceding
# lines) are missing from this extraction — restore from the upstream
# groupcompress plugin history before use.  The interleaved bare integers
# throughout this file are the original file's line numbers, an artifact of
# the extraction, not code.
# Delegates pack creation to the collection's pack_factory so that GCPack
# (below) is used instead of the stock NewPack.
return self._pack_collection.pack_factory(self._pack_collection,
71
upload_suffix=self.suffix,
72
file_mode=self._pack_collection.repo.bzrdir._get_file_mode())
75
# Monkeypatch: make every Packer open packs through the collection's factory.
Packer.open_pack = open_pack
78
# NOTE(review): this class body is garbled by extraction — indentation is
# stripped, bare integers are interleaved original line numbers, and several
# original lines are missing (e.g. the docstring's closing quotes and the
# ``self.revision_index = ...`` style assignment targets for the index-builder
# lists below).  Restore from upstream before editing logic.
class GCPack(NewPack):
80
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
81
"""Create a NewPack instance.
83
:param upload_transport: A writable transport for the pack to be
84
incrementally uploaded to.
85
:param index_transport: A writable transport for the pack's indices to
86
be written to when the pack is finished.
87
:param pack_transport: A writable transport for the pack to be renamed
88
to when the upload is complete. This *must* be the same as
89
upload_transport.clone('../packs').
90
:param upload_suffix: An optional suffix to be given to any temporary
91
files created during the pack creation. e.g '.autopack'
92
:param file_mode: An optional file mode to create the new files with.
94
# replaced from bzr.dev to:
95
# - change inventory reference list length to 1
96
# - change texts reference lists to 1
97
# TODO: patch this to be parameterised upstream
99
# The relative locations of the packs are constrained, but all are
100
# passed in because the caller has them, so as to avoid object churn.
101
index_builder_class = pack_collection._index_builder_class
104
if pack_collection.chk_index is not None:
105
chk_index = index_builder_class(reference_lists=0)
109
# NOTE(review): the assignments that consumed the following builders
# (presumably a NewPack.__init__-style call or per-index attributes) are
# missing between the line-number gaps — confirm against plugin history.
# Revisions: parents list, no text compression.
110
index_builder_class(reference_lists=1),
111
# Inventory: We want to map compression only, but currently the
112
# knit code hasn't been updated enough to understand that, so we
113
# have a regular 2-list index giving parents and compression
115
index_builder_class(reference_lists=1),
116
# Texts: compression and per file graph, for all fileids - so two
117
# reference lists and two elements in the key tuple.
118
index_builder_class(reference_lists=1, key_elements=2),
119
# Signatures: Just blobs to store, no compression, no parents
121
index_builder_class(reference_lists=0),
122
# CHK based storage - just blobs, no compression or parents.
128
# Revisions: parents list, no text compression.
129
index_builder_class(reference_lists=1),
130
# Inventory: compressed, with graph for compatibility with other
131
# existing bzrlib code.
132
index_builder_class(reference_lists=1),
133
# Texts: per file graph:
134
index_builder_class(reference_lists=1, key_elements=2),
135
# Signatures: Just blobs to store, no compression, no parents
137
index_builder_class(reference_lists=0),
139
self._pack_collection = pack_collection
140
# When we make readonly indices, we need this.
141
self.index_class = pack_collection._index_class
142
# where should the new pack be opened
143
self.upload_transport = pack_collection._upload_transport
144
# where are indices written out to
145
self.index_transport = pack_collection._index_transport
146
# where is the pack renamed to when it is finished?
147
self.pack_transport = pack_collection._pack_transport
148
# What file mode to upload the pack and indices with.
149
self._file_mode = file_mode
150
# tracks the content written to the .pack file.
151
self._hash = md5.new()
152
# a four-tuple with the length in bytes of the indices, once the pack
153
# is finalised. (rev, inv, text, sigs)
154
self.index_sizes = None
155
# How much data to cache when writing packs. Note that this is not
156
# synchronised with reads, because it's not in the transport layer, so
157
# is not safe unless the client knows it won't be reading from the pack
159
self._cache_limit = 0
160
# the temporary pack file name.
161
self.random_name = rand_chars(20) + upload_suffix
162
# when was this pack started ?
163
self.start_time = time.time()
164
# open an output stream for the data added to the pack.
165
self.write_stream = self.upload_transport.open_write_stream(
166
self.random_name, mode=self._file_mode)
167
if 'pack' in debug.debug_flags:
168
mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
169
time.ctime(), self.upload_transport.base, self.random_name,
170
time.time() - self.start_time)
171
# A list of byte sequences to be written to the new pack, and the
172
# aggregate size of them. Stored as a list rather than separate
173
# variables so that the _write_data closure below can update them.
174
self._buffer = [[], 0]
175
# create a callable for adding data
177
# robertc says- this is a closure rather than a method on the object
178
# so that the variables are locals, and faster than accessing object
180
def _write_data(bytes, flush=False, _buffer=self._buffer,
181
_write=self.write_stream.write, _update=self._hash.update):
182
_buffer[0].append(bytes)
183
_buffer[1] += len(bytes)
185
if _buffer[1] > self._cache_limit or flush:
186
bytes = ''.join(_buffer[0])
# NOTE(review): the lines that flush ``bytes`` to the write stream and
# reset the buffer are missing here (original lines 187-189) — confirm.
190
# expose this on self, for the occasion when clients want to add data.
191
self._write_data = _write_data
192
# a pack writer object to serialise pack records.
193
self._writer = pack.ContainerWriter(self._write_data)
195
# what state is the pack in? (open, finished, aborted)
199
# Monkeypatch: give the stock RepositoryPackCollection a pack_factory hook
# (defaulting to the unmodified NewPack) so the patched Packer.open_pack above
# and GCRepositoryPackCollection below can override which pack class is used.
RepositoryPackCollection.pack_factory = NewPack
201
# NOTE(review): body garbled by extraction — indentation stripped, bare
# integers are original line numbers, and several continuation/closing lines
# are missing (line-number gaps).  Restore from upstream before editing logic.
class GCRepositoryPackCollection(RepositoryPackCollection):
203
# Use the group-compress pack class for all new packs in this collection.
pack_factory = GCPack
205
    def _make_index(self, name, suffix):
206
"""Overridden to use BTreeGraphIndex objects."""
207
size_offset = self._suffix_offsets[suffix]
208
index_name = name + suffix
209
index_size = self._names[name][size_offset]
210
return BTreeGraphIndex(
211
self._index_transport, index_name, index_size)
213
    def _start_write_group(self):
214
# Overridden to add 'self.pack_factory()'
215
# Do not permit preparation for writing if we're not in a 'write lock'.
216
if not self.repo.is_write_locked():
217
raise errors.NotWriteLocked(self)
218
self._new_pack = self.pack_factory(self, upload_suffix='.pack',
219
file_mode=self.repo.bzrdir._get_file_mode())
220
# allow writing: queue writes to a new index
221
# NOTE(review): each add_writable_index call below is missing its second
# argument line (access object) — original even-numbered lines dropped.
self.revision_index.add_writable_index(self._new_pack.revision_index,
223
self.inventory_index.add_writable_index(self._new_pack.inventory_index,
225
self.text_index.add_writable_index(self._new_pack.text_index,
227
self.signature_index.add_writable_index(self._new_pack.signature_index,
229
if chk_support and self.chk_index is not None:
230
self.chk_index.add_writable_index(self._new_pack.chk_index,
232
self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
234
self.repo.inventories._index._add_callback = self.inventory_index.add_callback
235
self.repo.revisions._index._add_callback = self.revision_index.add_callback
236
self.repo.signatures._index._add_callback = self.signature_index.add_callback
237
self.repo.texts._index._add_callback = self.text_index.add_callback
239
    def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
241
"""Execute a series of pack operations.
243
:param pack_operations: A list of [revision_count, packs_to_combine].
244
:param _packer_class: The class of packer to use (default: Packer).
247
for revision_count, packs in pack_operations:
248
# we may have no-ops from the setup logic
251
# Create a new temp VersionedFile instance based on these packs,
252
# and then just fetch everything into the target
254
# XXX: Find a way to 'set_optimize' on the newly created pack
256
# def open_pack(self):
257
#     """Open a pack for the pack we are creating."""
258
#     new_pack = super(OptimisingPacker, self).open_pack()
259
#     # Turn on the optimization flags for all the index builders.
260
#     new_pack.revision_index.set_optimize(for_size=True)
261
#     new_pack.inventory_index.set_optimize(for_size=True)
262
#     new_pack.text_index.set_optimize(for_size=True)
263
#     new_pack.signature_index.set_optimize(for_size=True)
265
# NOTE(review): the enclosing method header for the following body is
# missing — by the original comments this is the collection's pack() /
# repack path; confirm against plugin history.
to_copy = [('revision_index', 'revisions'),
266
('inventory_index', 'inventories'),
267
('text_index', 'texts'),
268
('signature_index', 'signatures'),
270
if getattr(self, 'chk_index', None) is not None:
271
to_copy.insert(2, ('chk_index', 'chk_bytes'))
273
# Shouldn't we start_write_group around this?
274
if self._new_pack is not None:
275
raise errors.BzrError('call to %s.pack() while another pack is'
277
% (self.__class__.__name__,))
278
new_pack = self.pack_factory(self, 'autopack',
279
self.repo.bzrdir._get_file_mode())
280
new_pack.set_write_cache_size(1024*1024)
281
# TODO: A better alternative is to probably use Packer.open_pack(), and
282
#       then create a GroupCompressVersionedFiles() around the
283
#       target pack to insert into.
284
pb = ui.ui_factory.nested_progress_bar()
286
for idx, (index_name, vf_name) in enumerate(to_copy):
287
pb.update('repacking %s' % (vf_name,), idx + 1, len(to_copy))
289
new_index = getattr(new_pack, index_name)
290
new_index.set_optimize(for_size=True)
292
source_index = getattr(pack, index_name)
293
keys.update(e[1] for e in source_index.iter_all_entries())
294
source_vf = getattr(self.repo, vf_name)
295
target_access = knit._DirectPackAccess({})
296
target_access.set_writer(new_pack._writer, new_index,
297
new_pack.access_tuple())
298
target_vf = GroupCompressVersionedFiles(
299
_GCGraphIndex(new_index,
300
add_callback=new_index.add_nodes,
301
parents=source_vf._index._parents,
302
is_locked=self.repo.is_locked),
303
access=target_access,
304
delta=source_vf._delta)
305
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
306
target_vf.insert_record_stream(stream)
307
new_pack._check_references() # shouldn't be needed
314
if not new_pack.data_inserted():
315
raise AssertionError('We copied from pack files,'
316
' but had no data copied')
317
# we need to abort somehow, because we don't want to remove
320
self.allocate(new_pack)
322
self._remove_pack_from_memory(pack)
323
# record the newly available packs and stop advertising the old
325
self._save_pack_names(clear_obsolete_packs=True)
326
# Move the old packs out of the way now they are no longer referenced.
327
for revision_count, packs in pack_operations:
328
self._obsolete_packs(packs)
332
# NOTE(review): body garbled by extraction — indentation stripped, bare
# integers are original line numbers.  The duplicated GCRepositoryPackCollection
# construction below was almost certainly an ``if chk_support: ... else: ...``
# pair whose guard lines (original 342/348/350-351) are missing — confirm
# against the upstream plugin before editing.
class GCPackRepository(KnitPackRepository):
333
"""GC customisation of KnitPackRepository."""
335
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
337
"""Overridden to change pack collection class."""
338
KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
339
_commit_builder_class, _serializer)
340
# and now replace everything it did :)
341
index_transport = self._transport.clone('indices')
343
self._pack_collection = GCRepositoryPackCollection(self,
344
self._transport, index_transport,
345
self._transport.clone('upload'),
346
self._transport.clone('packs'),
347
_format.index_builder_class,
349
use_chk_index=self._format.supports_chks,
352
self._pack_collection = GCRepositoryPackCollection(self,
353
self._transport, index_transport,
354
self._transport.clone('upload'),
355
self._transport.clone('packs'),
356
_format.index_builder_class,
358
self.inventories = GroupCompressVersionedFiles(
359
_GCGraphIndex(self._pack_collection.inventory_index.combined_index,
360
add_callback=self._pack_collection.inventory_index.add_callback,
361
parents=True, is_locked=self.is_locked),
362
access=self._pack_collection.inventory_index.data_access)
363
self.revisions = GroupCompressVersionedFiles(
364
_GCGraphIndex(self._pack_collection.revision_index.combined_index,
365
add_callback=self._pack_collection.revision_index.add_callback,
366
parents=True, is_locked=self.is_locked),
367
access=self._pack_collection.revision_index.data_access,
369
self.signatures = GroupCompressVersionedFiles(
370
_GCGraphIndex(self._pack_collection.signature_index.combined_index,
371
add_callback=self._pack_collection.signature_index.add_callback,
372
parents=False, is_locked=self.is_locked),
373
access=self._pack_collection.signature_index.data_access,
375
self.texts = GroupCompressVersionedFiles(
376
_GCGraphIndex(self._pack_collection.text_index.combined_index,
377
add_callback=self._pack_collection.text_index.add_callback,
378
parents=True, is_locked=self.is_locked),
379
access=self._pack_collection.text_index.data_access)
380
if chk_support and _format.supports_chks:
381
# No graph, no compression:- references from chks are between
382
# different objects not temporal versions of the same; and without
383
# some sort of temporal structure knit compression will just fail.
384
self.chk_bytes = GroupCompressVersionedFiles(
385
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
386
add_callback=self._pack_collection.chk_index.add_callback,
387
parents=False, is_locked=self.is_locked),
388
access=self._pack_collection.chk_index.data_access)
390
# NOTE(review): the ``else:`` introducing this branch is missing.
self.chk_bytes = None
391
# True when the repository object is 'write locked' (as opposed to the
392
# physical lock only taken out around changes to the pack-names list.)
393
# Another way to represent this would be a decorator around the control
394
# files object that presents logical locks as physical ones - if this
395
# gets ugly consider that alternative design. RBC 20071011
396
self._write_lock_count = 0
397
self._transaction = None
399
self._reconcile_does_inventory_gc = True
400
self._reconcile_fixes_text_parents = True
401
self._reconcile_backsup_inventory = False
402
# Note: We cannot unpack a delta that references a text we haven't seen yet.
403
# there are 2 options, work in fulltexts, or require topological
404
# sorting. Using fulltexts is more optimal for local operations,
405
# because the source can be smart about extracting multiple
406
# in-a-row (and sharing strings). Topological is better for
407
# remote, because we access less data.
408
self._fetch_order = 'unordered'
409
self._fetch_gc_optimal = True
410
self._fetch_uses_deltas = False
414
# NOTE(review): body garbled by extraction — indentation stripped, bare
# integers are original line numbers, and some continuation/closing lines are
# missing (e.g. the closing of the pack-collection constructor call and the
# trailing arguments of revisions/signatures).  Restore from upstream before
# editing logic.
class GCCHKPackRepository(CHKInventoryRepository):
415
"""GC customisation of CHKInventoryRepository."""
417
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
419
"""Overridden to change pack collection class."""
420
KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
421
_commit_builder_class, _serializer)
422
# and now replace everything it did :)
423
index_transport = self._transport.clone('indices')
424
self._pack_collection = GCRepositoryPackCollection(self,
425
self._transport, index_transport,
426
self._transport.clone('upload'),
427
self._transport.clone('packs'),
428
_format.index_builder_class,
430
use_chk_index=self._format.supports_chks,
432
self.inventories = GroupCompressVersionedFiles(
433
_GCGraphIndex(self._pack_collection.inventory_index.combined_index,
434
add_callback=self._pack_collection.inventory_index.add_callback,
435
parents=True, is_locked=self.is_locked),
436
access=self._pack_collection.inventory_index.data_access)
437
self.revisions = GroupCompressVersionedFiles(
438
_GCGraphIndex(self._pack_collection.revision_index.combined_index,
439
add_callback=self._pack_collection.revision_index.add_callback,
440
parents=True, is_locked=self.is_locked),
441
access=self._pack_collection.revision_index.data_access,
443
self.signatures = GroupCompressVersionedFiles(
444
_GCGraphIndex(self._pack_collection.signature_index.combined_index,
445
add_callback=self._pack_collection.signature_index.add_callback,
446
parents=False, is_locked=self.is_locked),
447
access=self._pack_collection.signature_index.data_access,
449
self.texts = GroupCompressVersionedFiles(
450
_GCGraphIndex(self._pack_collection.text_index.combined_index,
451
add_callback=self._pack_collection.text_index.add_callback,
452
parents=True, is_locked=self.is_locked),
453
access=self._pack_collection.text_index.data_access)
454
# NOTE(review): assert is stripped under -O; upstream may want an
# explicit raise here.  Left unchanged — this extraction is incomplete.
assert _format.supports_chks
455
# No parents, individual CHK pages don't have specific ancestry
456
self.chk_bytes = GroupCompressVersionedFiles(
457
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
458
add_callback=self._pack_collection.chk_index.add_callback,
459
parents=False, is_locked=self.is_locked),
460
access=self._pack_collection.chk_index.data_access)
461
# True when the repository object is 'write locked' (as opposed to the
462
# physical lock only taken out around changes to the pack-names list.)
463
# Another way to represent this would be a decorator around the control
464
# files object that presents logical locks as physical ones - if this
465
# gets ugly consider that alternative design. RBC 20071011
466
self._write_lock_count = 0
467
self._transaction = None
469
self._reconcile_does_inventory_gc = True
470
self._reconcile_fixes_text_parents = True
471
self._reconcile_backsup_inventory = False
472
self._fetch_order = 'unordered'
473
self._fetch_gc_optimal = True
474
self._fetch_uses_deltas = False
477
class RepositoryFormatPackGCPlain(RepositoryFormatPackDevelopment2):
    """Development2 pack format, backed by B+Tree indices and groupcompress.

    Identical on-disk layout to its base format, but repositories are
    opened as GCPackRepository so data is stored group-compressed.
    """

    repository_class = GCPackRepository

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Bazaar development format - btree+gc (needs bzr.dev from 1.6)\n"

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format - btree+groupcompress "
            ", interoperates with pack-0.92\n")
493
class RepositoryFormatPackGCRichRoot(RepositoryFormatKnitPack4):
    """Rich-root pack format, backed by B+Tree indices and groupcompress.

    Same disk structure as KnitPack4, but repositories open as
    GCPackRepository so their contents are group-compressed.
    """

    repository_class = GCPackRepository

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar development format - btree+gc-rich-root "
            "(needs bzr.dev from 1.6)\n")

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format - btree+groupcompress "
            ", interoperates with rich-root-pack\n")
509
class RepositoryFormatPackGCSubtrees(RepositoryFormatPackDevelopment2Subtree):
    """Subtree-capable pack format using B+Tree indices and groupcompress.

    Shares the Development2Subtree layout while opening repositories as
    GCPackRepository for group-compressed storage.
    """

    repository_class = GCPackRepository

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ("Bazaar development format - btree+gc-subtrees "
            "(needs bzr.dev from 1.6)\n")

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format - btree+groupcompress "
            ", interoperates with pack-0.92-subtrees\n")
525
class RepositoryFormatPackGCPlainCHK(RepositoryFormatPackDevelopment5):
    """CHK-inventory pack format combined with group compression.

    Uses the Development5 CHK layout but opens repositories as
    GCCHKPackRepository.
    """

    repository_class = GCCHKPackRepository

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return 'Bazaar development format - chk+gc (needs bzr.dev from 1.13)\n'

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Development repository format - chk+groupcompress"
540
class RepositoryFormatPackGCPlainCHK16(RepositoryFormatPackDevelopment5Hash16):
    """16-way hashed CHK-inventory pack format with group compression.

    Uses the Development5Hash16 CHK layout but opens repositories as
    GCCHKPackRepository.
    """

    repository_class = GCCHKPackRepository

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - hash16chk+gc'
            ' (needs bzr.dev from 1.13)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Development repository format - hash16chk+groupcompress"
555
class RepositoryFormatPackGCPlainCHK255(RepositoryFormatPackDevelopment5Hash255):
    """255-way hashed CHK-inventory pack format with group compression.

    Uses the Development5Hash255 CHK layout but opens repositories as
    GCCHKPackRepository.
    """

    repository_class = GCCHKPackRepository

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - hash255chk+gc'
            ' (needs bzr.dev from 1.13)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return "Development repository format - hash255chk+groupcompress"
570
# NOTE(review): garbled by extraction — indentation stripped, bare integers
# are original line numbers.  Two gaps: original line 574 (most likely an
# ``if chk_support:`` guard around the CHK formats tuple) and lines 579-580
# (most likely ``return False`` when either format is a GC format).  Confirm
# against the upstream plugin history before use.
# Purpose: wrap InterPackRepo.is_compatible so the stock pack-to-pack fetch
# path refuses GC-format repositories, forcing the generic fetch code instead.
def pack_incompatible(source, target, orig_method=InterPackRepo.is_compatible):
571
"""Be incompatible with the regular fetch code."""
572
formats = (RepositoryFormatPackGCPlain, RepositoryFormatPackGCRichRoot,
573
RepositoryFormatPackGCSubtrees)
575
formats = formats + (RepositoryFormatPackGCPlainCHK,
576
RepositoryFormatPackGCPlainCHK16,
577
RepositoryFormatPackGCPlainCHK255)
578
if isinstance(source._format, formats) or isinstance(target._format, formats):
581
return orig_method(source, target)
584
# Monkeypatch the wrapped check back onto InterPackRepo.
InterPackRepo.is_compatible = staticmethod(pack_incompatible)