        self.knit_access.set_writer(None, None, (None, None))
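

# Packer (below) builds one new pack out of an existing set of packs by bulk
# copying their records; it is handed the owning RepositoryPackCollection, the
# source packs, a suffix for the temporary upload file name and an optional
# set of revision ids to limit the copy. A hypothetical call site (the real
# callers live on RepositoryPackCollection) might look like:
#
#   new_pack = Packer(collection, packs_to_combine, '.autopack').pack()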
class Packer(object):
    """Create a pack from packs."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None):
        self.packs = packs
        self.suffix = suffix
        self.revision_ids = revision_ids
        self._pack_collection = pack_collection
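
    # 'suffix' is appended to the temporary upload file name (it is passed to
    # NewPack as upload_suffix in open_pack below); 'revision_ids', when
    # given, restricts the copy to just those revisions.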

    def pack(self):
        """Create a new pack by reading data from other packs.

        This does little more than a bulk copy of data. One key difference
        is that data with the same item key across multiple packs is elided
        from the output.

        :return: A Pack object, or None if nothing was copied.
        """
        # open a pack - using the same name as the last temporary file
        # - which has already been flushed, so it's safe.
        # XXX: - duplicate code warning with start_write_group; fix before
        #      considering 'done'.
        if self._pack_collection._new_pack is not None:
            raise errors.BzrError('call to create_pack_from_packs while '
                'another pack is being written.')
        if self.revision_ids is not None:
            if len(self.revision_ids) == 0:
                # silly fetch request.
                return None
            else:
                self.revision_ids = frozenset(self.revision_ids)
        self.pb = ui.ui_factory.nested_progress_bar()
        return self._create_pack_from_packs()
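
    # open_pack writes into the collection's upload directory (see the
    # transport docstrings on RepositoryPackCollection below); the finished
    # pack is handed back to the collection via allocate() at the end of
    # _create_pack_from_packs.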

    def open_pack(self):
        """Open a pack for the pack we are creating."""
        return NewPack(self._pack_collection._upload_transport,
            self._pack_collection._index_transport,
            self._pack_collection._pack_transport, upload_suffix=self.suffix)

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        new_pack = self.open_pack()
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'revision_index')[0]
        revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
            new_pack._writer, new_pack.revision_index))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.revision_index.key_count(),
                time.time() - new_pack.start_time)
        # select inventory keys
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        list(self._copy_nodes_graph(text_nodes, text_index_map,
            new_pack._writer, new_pack.text_index))
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.text_index.key_count(),
                time.time() - new_pack.start_time)
        # select signature keys
        signature_filter = revision_keys # same keyspace
        signature_index_map = self._pack_collection._packs_list_to_pack_map_and_index_list(
            self.packs, 'signature_index')[0]
        signature_nodes = self._pack_collection._index_contents(signature_index_map,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        if not new_pack.data_inserted():
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack
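
    # _copy_nodes / _do_copy_nodes below stream raw knit records between packs
    # without recompressing them: nodes are grouped by source pack, each index
    # value (a flags byte followed by "offset length") becomes a readv request,
    # and every record is re-added to the new pack's writer and index after a
    # header sanity check.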

    def _copy_nodes(self, nodes, index_map, writer, write_index):
        """Copy knit nodes between packs with no graph references."""
        pb = ui.ui_factory.nested_progress_bar()
        return self._do_copy_nodes(nodes, index_map, writer,
            write_index, pb)

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb):
        # for record verification
        knit_data = _KnitData(None)
        # plan a readv on each source pack:
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path,
                [offset[0:2] for offset in pack_readv_requests])
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                df, _ = knit_data._parse_record_header(key[-1], raw_data)
                df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1

    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
        output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator.
        """
        pb = ui.ui_factory.nested_progress_bar()
        return self._do_copy_nodes_graph(nodes, index_map, writer,
            write_index, output_lines, pb)

    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
        output_lines, pb):
        # for record verification
        knit_data = _KnitData(None)
        # for line extraction when requested (inventories only)
        factory = knit.KnitPlainFactory()
        # plan a readv on each source pack:
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0], references)))
            # linear scan up the pack
            pack_readv_requests.sort()
            transport, path = index_map[index]
            reader = pack.make_readv_reader(transport, path,
                [offset[0:2] for offset in pack_readv_requests])
            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                if output_lines:
                    # read the entire thing
                    content, _ = knit_data._parse_record(key[-1], raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line
                else:
                    # check the header only
                    df, _ = knit_data._parse_record_header(key[-1], raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1


class RepositoryPackCollection(object):
    """Management of packs within a repository."""

    def __init__(self, repo, transport, index_transport, upload_transport,
                 pack_transport):
        """Create a new RepositoryPackCollection.

        :param transport: Addresses the repository base directory
            (typically .bzr/repository/).
        :param index_transport: Addresses the directory containing indices.
        :param upload_transport: Addresses the directory into which packs are written
            while they're being created.
        :param pack_transport: Addresses the directory of existing complete packs.
        """
        self.repo = repo
        self.transport = transport
        self._index_transport = index_transport
        self._upload_transport = upload_transport
        self._pack_transport = pack_transport
        self._suffix_offsets = {'.rix': 0, '.iix': 1, '.tix': 2, '.six': 3}
        self.packs = []
        self._packs_by_name = {}
        # the previous pack-names content
        self._packs_at_load = None
        # when a pack is being created by this object, the state of that pack.
        self._new_pack = None
        # aggregated revision index data
        self.revision_index = AggregateIndex()
        self.inventory_index = AggregateIndex()
        self.text_index = AggregateIndex()
        self.signature_index = AggregateIndex()
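        # Each AggregateIndex above exposes a single combined view over the
        # matching per-pack index of every pack registered through
        # add_pack_to_memory, so queries do not need to know which physical
        # pack holds a particular record.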

    def add_pack_to_memory(self, pack):
        """Make a Pack object available to the repository to satisfy queries.

        :param pack: A Pack object.
        """
        assert pack.name not in self._packs_by_name
        self.packs.append(pack)
        self._packs_by_name[pack.name] = pack
        self.revision_index.add_index(pack.revision_index, pack)
        self.inventory_index.add_index(pack.inventory_index, pack)
        self.text_index.add_index(pack.text_index, pack)
        self.signature_index.add_index(pack.signature_index, pack)
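
    # _add_text_to_weave points the repository's text knit at a per-file view
    # of the combined text index (via GraphIndexPrefixAdapter) before handing
    # the actual insertion off to the knit layer.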

    def _add_text_to_weave(self, file_id, revision_id, new_lines, parents,
        nostore_sha, random_revid):
        file_id_index = GraphIndexPrefixAdapter(
            self.text_index.combined_index,
            (file_id, ), 1,
            add_nodes_callback=self.text_index.add_callback)
        self.repo._text_knit._index._graph_index = file_id_index
        self.repo._text_knit._index._add_callback = file_id_index.add_nodes
        return self.repo._text_knit.add_lines_with_ghosts(
            revision_id, parents, new_lines, nostore_sha=nostore_sha,
            random_id=random_revid, check_content=False)[0:2]

    def all_packs(self):
        """Return a list of all the Pack objects this repository has.

        Note that an in-progress pack being created is not returned.

        :return: A list of Pack objects for all the packs in the repository.
        """
        result = []
        for name in self.names():
            result.append(self.get_pack_by_name(name))
        return result

    def autopack(self):
        """Pack the pack collection incrementally.

        This will not attempt global reorganisation or recompression,
        rather it will just ensure that the total number of packs does
        not grow without bound. It uses the _max_pack_count method to
        determine if autopacking is needed, and the pack_distribution
        method to determine the number of revisions in each pack.

        If autopacking takes place then the packs name collection will have
        been flushed to disk - packing requires updating the name collection
        in synchronisation with certain steps. Otherwise the names collection
        is not flushed.

        :return: True if packing took place.
        """
        # XXX: Should not be needed when the management of indices is sane.
        total_revisions = self.revision_index.combined_index.key_count()
        total_packs = len(self._names)
        if self._max_pack_count(total_revisions) >= total_packs:
            return False
        # XXX: the following may want to be a class, to pack with a given
        #      policy.
        mutter('Auto-packing repository %s, which has %d pack files, '
            'containing %d revisions into %d packs.', self, total_packs,
            total_revisions, self._max_pack_count(total_revisions))
        # determine which packs need changing
        pack_distribution = self.pack_distribution(total_revisions)
        existing_packs = []
        for pack in self.all_packs():
            revision_count = pack.get_revision_count()
            if revision_count == 0:
                # revision-less packs are not generated by normal operation,
                # only by operations like sign-my-commits, and thus will not
                # tend to grow rapidly or without bound like commit-containing
                # packs do - leave them alone as packing them really should
                # group their data with the relevant commit, and that may
                # involve rewriting ancient history - which autopack tries to
                # avoid. Alternatively we could not group the data but treat
                # each of these as having a single revision, and thus add
                # one revision for each to the total revision count, to get
                # a matching distribution.
                continue
            existing_packs.append((revision_count, pack))
        pack_operations = self.plan_autopack_combinations(
            existing_packs, pack_distribution)
        self._execute_pack_operations(pack_operations)
        return True

    def _execute_pack_operations(self, pack_operations):
        """Execute a series of pack operations."""
        return pack_operations

    def ensure_loaded(self):
        # NB: if you see an assertion error here, it's probably access against
        # an unlocked repo. Naughty.