~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Robert Collins
  • Date: 2007-09-25 01:47:51 UTC
  • mfrom: (2592.5.17 pack-repository)
  • mto: This revision was merged to the branch mainline in revision 2933.
  • Revision ID: robertc@robertcollins.net-20070925014751-skcw309yznr5czka
Merge Martins refactoring work.

Show diffs side-by-side

added added

removed removed

Lines of Context:
106
106
 
107
107
 
108
108
class RepositoryPackCollection(object):
109
 
 
110
 
    def __init__(self, repo, transport):
 
109
    """Management of packs within a repository."""
 
110
 
 
111
    def __init__(self, repo, transport, index_transport, upload_transport,
 
112
                 pack_transport):
 
113
        """Create a new RepositoryPackCollection.
 
114
 
 
115
        :param transport: Addresses the repository base directory 
 
116
            (typically .bzr/repository/).
 
117
        :param index_transport: Addresses the directory containing indexes.
 
118
        :param upload_transport: Addresses the directory into which packs are written
 
119
            while they're being created.
 
120
        :param pack_transport: Addresses the directory of existing complete packs.
 
121
        """
111
122
        self.repo = repo
112
123
        self.transport = transport
 
124
        self._index_transport = index_transport
 
125
        self._upload_transport = upload_transport
 
126
        self._pack_transport = pack_transport
113
127
        self.packs = []
114
128
 
115
129
    def add_pack_to_memory(self, pack):
259
273
                rev_count = 'all'
260
274
            mutter('%s: create_pack: creating pack from source packs: '
261
275
                '%s%s %s revisions wanted %s t=0',
262
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
276
                time.ctime(), self._upload_transport.base, random_name,
263
277
                plain_pack_list, rev_count)
264
278
            start_time = time.time()
265
 
        write_stream = self.repo._upload_transport.open_write_stream(random_name)
 
279
        write_stream = self._upload_transport.open_write_stream(random_name)
266
280
        if 'fetch' in debug.debug_flags:
267
281
            mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
268
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
282
                time.ctime(), self._upload_transport.base, random_name,
269
283
                time.time() - start_time)
270
284
        pack_hash = md5.new()
271
285
        buffer = []
294
308
            revision_index))
295
309
        if 'fetch' in debug.debug_flags:
296
310
            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
297
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
311
                time.ctime(), self._upload_transport.base, random_name,
298
312
                revision_index.key_count(),
299
313
                time.time() - start_time)
300
314
        # select inventory keys
321
335
            text_filter = None
322
336
        if 'fetch' in debug.debug_flags:
323
337
            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
324
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
338
                time.ctime(), self._upload_transport.base, random_name,
325
339
                inv_index.key_count(),
326
340
                time.time() - start_time)
327
341
        # select text keys
347
361
            text_index))
348
362
        if 'fetch' in debug.debug_flags:
349
363
            mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
350
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
364
                time.ctime(), self._upload_transport.base, random_name,
351
365
                text_index.key_count(),
352
366
                time.time() - start_time)
353
367
        # select signature keys
358
372
        self._copy_nodes(signature_nodes, signature_index_map, writer, signature_index)
359
373
        if 'fetch' in debug.debug_flags:
360
374
            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
361
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
375
                time.ctime(), self._upload_transport.base, random_name,
362
376
                signature_index.key_count(),
363
377
                time.time() - start_time)
364
378
        # finish the pack
374
388
            text_index.key_count(),
375
389
            signature_index.key_count(),
376
390
            )):
377
 
            self.repo._upload_transport.delete(random_name)
 
391
            self._upload_transport.delete(random_name)
378
392
            return None
379
393
        result = Pack()
380
394
        result.name = new_name
381
 
        result.transport = self.repo._upload_transport.clone('../packs/')
 
395
        result.transport = self._upload_transport.clone('../packs/')
382
396
        # write indices
383
 
        index_transport = self.repo._upload_transport.clone('../indices')
 
397
        index_transport = self._index_transport
384
398
        rev_index_name = self.repo._revision_store.name_to_revision_index_name(new_name)
385
399
        revision_index_length = index_transport.put_file(rev_index_name,
386
400
            revision_index.finish())
387
401
        if 'fetch' in debug.debug_flags:
388
402
            # XXX: size might be interesting?
389
403
            mutter('%s: create_pack: wrote revision index: %s%s t+%6.3fs',
390
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
404
                time.ctime(), self._upload_transport.base, random_name,
391
405
                time.time() - start_time)
392
406
        inv_index_name = self.repo._inv_thunk.name_to_inv_index_name(new_name)
393
407
        inventory_index_length = index_transport.put_file(inv_index_name,
395
409
        if 'fetch' in debug.debug_flags:
396
410
            # XXX: size might be interesting?
397
411
            mutter('%s: create_pack: wrote inventory index: %s%s t+%6.3fs',
398
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
412
                time.ctime(), self._upload_transport.base, random_name,
399
413
                time.time() - start_time)
400
414
        text_index_name = self.repo.weave_store.name_to_text_index_name(new_name)
401
415
        text_index_length = index_transport.put_file(text_index_name,
403
417
        if 'fetch' in debug.debug_flags:
404
418
            # XXX: size might be interesting?
405
419
            mutter('%s: create_pack: wrote file texts index: %s%s t+%6.3fs',
406
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
420
                time.ctime(), self._upload_transport.base, random_name,
407
421
                time.time() - start_time)
408
422
        signature_index_name = self.repo._revision_store.name_to_signature_index_name(new_name)
409
423
        signature_index_length = index_transport.put_file(signature_index_name,
411
425
        if 'fetch' in debug.debug_flags:
412
426
            # XXX: size might be interesting?
413
427
            mutter('%s: create_pack: wrote revision signatures index: %s%s t+%6.3fs',
414
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
428
                time.ctime(), self._upload_transport.base, random_name,
415
429
                time.time() - start_time)
416
430
        # add to name
417
431
        self.allocate(new_name, revision_index_length, inventory_index_length,
419
433
        # rename into place. XXX: should rename each index too rather than just
420
434
        # uploading blind under the chosen name.
421
435
        write_stream.close()
422
 
        self.repo._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
 
436
        self._upload_transport.rename(random_name, '../packs/' + new_name + '.pack')
423
437
        if 'fetch' in debug.debug_flags:
424
438
            # XXX: size might be interesting?
425
439
            mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
426
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
440
                time.ctime(), self._upload_transport.base, random_name,
427
441
                result.transport, result.name,
428
442
                time.time() - start_time)
429
443
        result.revision_index = revision_index
433
447
        if 'fetch' in debug.debug_flags:
434
448
            # XXX: size might be interesting?
435
449
            mutter('%s: create_pack: finished: %s%s t+%6.3fs',
436
 
                time.ctime(), self.repo._upload_transport.base, random_name,
 
450
                time.ctime(), self._upload_transport.base, random_name,
437
451
                time.time() - start_time)
438
452
        return result
439
453
 
453
467
                self._remove_pack_by_name(pack_detail[1])
454
468
        # record the newly available packs and stop advertising the old
455
469
        # packs
456
 
        self.save()
 
470
        self._save_pack_names()
457
471
        # move the old packs out of the way
458
472
        for revision_count, pack_details in pack_operations:
459
473
            self._obsolete_packs(pack_details)
663
677
        self._names[name] = (revision_index_length, inventory_index_length,
664
678
            text_index_length, signature_index_length)
665
679
 
 
680
    def _make_index_map(self, suffix):
 
681
        """Return information on existing indexes.
 
682
 
 
683
        :param suffix: Index suffix added to pack name.
 
684
 
 
685
        :returns: (pack_map, indices) where indices is a list of GraphIndex 
 
686
        objects, and pack_map is a mapping from those objects to the 
 
687
        pack tuple they describe.
 
688
        """
 
689
        indices = []
 
690
        pack_map = {}
 
691
        self.ensure_loaded()
 
692
        for name in self.names():
 
693
            # TODO: maybe this should expose size to us  to allow
 
694
            # sorting of the indices for better performance ?
 
695
            index_name = name + suffix
 
696
            new_index = GraphIndex(self._index_transport, index_name)
 
697
            indices.append(new_index)
 
698
            pack_map[new_index] = self._pack_tuple(name)
 
699
        return pack_map, indices
 
700
 
666
701
    def _max_pack_count(self, total_revisions):
667
702
        """Return the maximum number of packs to use for total revisions.
668
703
        
698
733
            pack_detail[0].rename(pack_detail[1],
699
734
                '../obsolete_packs/' + pack_detail[1])
700
735
            basename = pack_detail[1][:-4]
701
 
            index_transport = pack_detail[0].clone('../indices')
 
736
            # TODO: Probably needs to know all possible indexes for this pack
 
737
            # - or maybe list the directory and move all indexes matching this
 
738
            # name whether we recognize it or not?
702
739
            for suffix in ('iix', 'six', 'tix', 'rix'):
703
 
                index_transport.rename(basename + suffix,
 
740
                self._index_transport.rename(basename + suffix,
704
741
                    '../obsolete_packs/' + basename + suffix)
705
742
 
706
743
    def pack_distribution(self, total_revisions):
719
756
                result.append(size)
720
757
        return list(reversed(result))
721
758
 
 
759
    def _pack_tuple(self, name):
 
760
        """Return a tuple with the transport and file name for a pack name."""
 
761
        return self._pack_transport, name + '.pack'
 
762
 
722
763
    def _remove_pack_by_name(self, name):
723
764
        # strip .pack
724
765
        self._names.pop(name[:-5])
727
768
        self._names = None
728
769
        self.packs = []
729
770
 
 
771
    def _make_index_to_pack_map(self, pack_details, index_suffix):
 
772
        """Given a list (transport,name), return a map of (index)->(transport, name)."""
 
773
        # the simplest thing for now is to create new index objects.
 
774
        # this should really reuse the existing index objects for these 
 
775
        # packs - this means making the way they are managed in the repo be 
 
776
        # more sane.
 
777
        indices = {}
 
778
        for transport, name in pack_details:
 
779
            index_name = name[:-5] + index_suffix
 
780
            indices[GraphIndex(self._index_transport, index_name)] = \
 
781
                (transport, name)
 
782
        return indices
 
783
 
730
784
    def _inv_index_map(self, pack_details):
731
785
        """Get a map of inv index -> packs for pack_details."""
732
 
        # the simplest thing for now is to create new index objects.
733
 
        # this should really reuse the existing index objects for these 
734
 
        # packs - this means making the way they are managed in the repo be 
735
 
        # more sane.
736
 
        indices = {}
737
 
        for transport, name in pack_details:
738
 
            index_name = name[:-5]
739
 
            index_name = self.repo._inv_thunk.name_to_inv_index_name(index_name)
740
 
            indices[GraphIndex(transport.clone('../indices'), index_name)] = \
741
 
                (transport, name)
742
 
        return indices
 
786
        return self._make_index_to_pack_map(pack_details, '.iix')
743
787
 
744
788
    def _revision_index_map(self, pack_details):
745
789
        """Get a map of revision index -> packs for pack_details."""
746
 
        # the simplest thing for now is to create new index objects.
747
 
        # this should really reuse the existing index objects for these 
748
 
        # packs - this means making the way they are managed in the repo be 
749
 
        # more sane.
750
 
        indices = {}
751
 
        for transport, name in pack_details:
752
 
            index_name = name[:-5]
753
 
            index_name = self.repo._revision_store.name_to_revision_index_name(index_name)
754
 
            indices[GraphIndex(transport.clone('../indices'), index_name)] = \
755
 
                (transport, name)
756
 
        return indices
 
790
        return self._make_index_to_pack_map(pack_details, '.rix')
757
791
 
758
792
    def _signature_index_map(self, pack_details):
759
793
        """Get a map of signature index -> packs for pack_details."""
760
 
        # the simplest thing for now is to create new index objects.
761
 
        # this should really reuse the existing index objects for these 
762
 
        # packs - this means making the way they are managed in the repo be 
763
 
        # more sane.
764
 
        indices = {}
765
 
        for transport, name in pack_details:
766
 
            index_name = name[:-5]
767
 
            index_name = self.repo._revision_store.name_to_signature_index_name(index_name)
768
 
            indices[GraphIndex(transport.clone('../indices'), index_name)] = \
769
 
                (transport, name)
770
 
        return indices
 
794
        return self._make_index_to_pack_map(pack_details, '.six')
771
795
 
772
796
    def _text_index_map(self, pack_details):
773
797
        """Get a map of text index -> packs for pack_details."""
774
 
        # the simplest thing for now is to create new index objects.
775
 
        # this should really reuse the existing index objects for these 
776
 
        # packs - this means making the way they are managed in the repo be 
777
 
        # more sane.
778
 
        indices = {}
779
 
        for transport, name in pack_details:
780
 
            index_name = name[:-5]
781
 
            index_name = self.repo.weave_store.name_to_text_index_name(index_name)
782
 
            indices[GraphIndex(transport.clone('../indices'), index_name)] = \
783
 
                (transport, name)
784
 
        return indices
 
798
        return self._make_index_to_pack_map(pack_details, '.tix')
785
799
 
786
800
    def _index_contents(self, pack_map, key_filter=None):
787
801
        """Get an iterable of the index contents from a pack_map.
797
811
        else:
798
812
            return all_index.iter_entries(key_filter)
799
813
 
800
 
    def save(self):
 
814
    def _save_pack_names(self):
801
815
        builder = GraphIndexBuilder()
802
816
        for name, sizes in self._names.iteritems():
803
817
            builder.add_node((name, ), ' '.join(str(size) for size in sizes))
808
822
        if self.repo.control_files._lock_mode != 'w':
809
823
            raise errors.NotWriteLocked(self)
810
824
 
 
825
    def _start_write_group(self):
 
826
        random_name = self.repo.control_files._lock.nonce
 
827
        self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
 
828
        write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
 
829
        self._write_stream = write_stream
 
830
        self._open_pack_hash = md5.new()
 
831
        def write_data(bytes, write=write_stream.write,
 
832
                       update=self._open_pack_hash.update):
 
833
            write(bytes)
 
834
            update(bytes)
 
835
        self._open_pack_writer = pack.ContainerWriter(write_data)
 
836
        self._open_pack_writer.begin()
 
837
        self.setup()
 
838
        self.repo._revision_store.setup()
 
839
        self.repo.weave_store.setup()
 
840
        self.repo._inv_thunk.setup()
 
841
 
 
842
    def _abort_write_group(self):
 
843
        # FIXME: just drop the transient index.
 
844
        self.repo._revision_store.reset()
 
845
        self.repo.weave_store.reset()
 
846
        self.repo._inv_thunk.reset()
 
847
        # forget what names there are
 
848
        self.reset()
 
849
        self._open_pack_hash = None
 
850
 
 
851
    def _commit_write_group(self):
 
852
        data_inserted = (self.repo._revision_store.data_inserted() or
 
853
            self.repo.weave_store.data_inserted() or 
 
854
            self.repo._inv_thunk.data_inserted())
 
855
        if data_inserted:
 
856
            self._open_pack_writer.end()
 
857
            new_name = self._open_pack_hash.hexdigest()
 
858
            new_pack = Pack()
 
859
            new_pack.name = new_name
 
860
            new_pack.transport = self._upload_transport.clone('../packs/')
 
861
            # To populate:
 
862
            # new_pack.revision_index = 
 
863
            # new_pack.inventory_index = 
 
864
            # new_pack.text_index = 
 
865
            # new_pack.signature_index = 
 
866
            self.repo.weave_store.flush(new_name, new_pack)
 
867
            self.repo._inv_thunk.flush(new_name, new_pack)
 
868
            self.repo._revision_store.flush(new_name, new_pack)
 
869
            self._write_stream.close()
 
870
            self._upload_transport.rename(self.repo._open_pack_tuple[1],
 
871
                '../packs/' + new_name + '.pack')
 
872
            # If this fails, its a hash collision. We should:
 
873
            # - determine if its a collision or
 
874
            # - the same content or
 
875
            # - the existing name is not the actual hash - e.g.
 
876
            #   its a deliberate attack or data corruption has
 
877
            #   occuring during the write of that file.
 
878
            self.allocate(new_name, new_pack.revision_index_length,
 
879
                new_pack.inventory_index_length, new_pack.text_index_length,
 
880
                new_pack.signature_index_length)
 
881
            self.repo._open_pack_tuple = None
 
882
            if not self.autopack():
 
883
                self._save_pack_names()
 
884
        else:
 
885
            # remove the pending upload
 
886
            self._upload_transport.delete(self.repo._open_pack_tuple[1])
 
887
        self.repo._revision_store.reset()
 
888
        self.repo.weave_store.reset()
 
889
        self.repo._inv_thunk.reset()
 
890
        # forget what names there are - should just refresh and deal with the
 
891
        # delta.
 
892
        self.reset()
 
893
        self._open_pack_hash = None
 
894
        self._write_stream = None
 
895
 
811
896
 
812
897
class GraphKnitRevisionStore(KnitRevisionStore):
813
898
    """An object to adapt access from RevisionStore's to use GraphKnits.
839
924
        """Get the revision versioned file object."""
840
925
        if getattr(self.repo, '_revision_knit', None) is not None:
841
926
            return self.repo._revision_knit
842
 
        self.repo._packs.ensure_loaded()
843
 
        pack_map, indices = self._make_rev_pack_map()
 
927
        pack_map, indices = self.repo._packs._make_index_map('.rix')
844
928
        if self.repo.is_in_write_group():
845
929
            # allow writing: queue writes to a new index
846
930
            indices.insert(0, self.repo._revision_write_index)
847
931
            pack_map[self.repo._revision_write_index] = self.repo._open_pack_tuple
848
 
            writer = self.repo._open_pack_writer, self.repo._revision_write_index
 
932
            writer = self.repo._packs._open_pack_writer, self.repo._revision_write_index
849
933
            add_callback = self.repo._revision_write_index.add_nodes
850
934
        else:
851
935
            writer = None
864
948
            access_method=knit_access)
865
949
        return self.repo._revision_knit
866
950
 
867
 
    def _make_rev_pack_map(self):
868
 
        indices = []
869
 
        pack_map = {}
870
 
        for name in self.repo._packs.names():
871
 
            # TODO: maybe this should expose size to us  to allow
872
 
            # sorting of the indices for better performance ?
873
 
            index_name = self.name_to_revision_index_name(name)
874
 
            indices.append(GraphIndex(self.transport, index_name))
875
 
            pack_map[indices[-1]] = (self.repo._pack_tuple(name))
876
 
        return pack_map, indices
877
 
 
878
951
    def get_signature_file(self, transaction):
879
952
        """Get the signature versioned file object."""
880
953
        if getattr(self.repo, '_signature_knit', None) is not None:
881
954
            return self.repo._signature_knit
882
 
        indices = []
883
 
        self.repo._packs.ensure_loaded()
884
 
        pack_map = {}
885
 
        for name in self.repo._packs.names():
886
 
            # TODO: maybe this should expose size to us  to allow
887
 
            # sorting of the indices for better performance ?
888
 
            index_name = self.name_to_signature_index_name(name)
889
 
            indices.append(GraphIndex(self.transport, index_name))
890
 
            pack_map[indices[-1]] = (self.repo._pack_tuple(name))
 
955
        pack_map, indices = self.repo._packs._make_index_map('.six')
891
956
        if self.repo.is_in_write_group():
892
957
            # allow writing: queue writes to a new index
893
958
            indices.insert(0, self.repo._signature_write_index)
894
959
            pack_map[self.repo._signature_write_index] = self.repo._open_pack_tuple
895
 
            writer = self.repo._open_pack_writer, self.repo._signature_write_index
 
960
            writer = self.repo._packs._open_pack_writer, self.repo._signature_write_index
896
961
            add_callback = self.repo._signature_write_index.add_nodes
897
962
        else:
898
963
            writer = None
929
994
            # create a pack map for the autopack code - XXX finish
930
995
            # making a clear managed list of packs, indices and use
931
996
            # that in these mapping classes
932
 
            self.repo._revision_pack_map = self._make_rev_pack_map()[0]
 
997
            self.repo._revision_pack_map = self.repo._packs._make_index_map('.rix')[0]
933
998
        else:
934
999
            del self.repo._revision_pack_map[self.repo._revision_write_index]
935
1000
            self.repo._revision_write_index = None
936
1001
            new_index = GraphIndex(self.transport, new_index_name)
937
 
            self.repo._revision_pack_map[new_index] = (self.repo._pack_tuple(new_name))
 
1002
            self.repo._revision_pack_map[new_index] = (self.repo._packs._pack_tuple(new_name))
938
1003
            # revisions 'knit' accessed : update it.
939
1004
            self.repo._revision_all_indices.insert_index(0, new_index)
940
1005
            # remove the write buffering index. XXX: API break
988
1053
        if self.repo._revision_knit is not None:
989
1054
            self.repo._revision_all_indices.insert_index(0, self.repo._revision_write_index)
990
1055
            self.repo._revision_knit._index._add_callback = self.repo._revision_write_index.add_nodes
991
 
            self.repo._revision_knit_access.set_writer(self.repo._open_pack_writer,
 
1056
            self.repo._revision_knit_access.set_writer(
 
1057
                self.repo._packs._open_pack_writer,
992
1058
                self.repo._revision_write_index, self.repo._open_pack_tuple)
993
1059
        if self.repo._signature_knit is not None:
994
1060
            self.repo._signature_all_indices.insert_index(0, self.repo._signature_write_index)
995
1061
            self.repo._signature_knit._index._add_callback = self.repo._signature_write_index.add_nodes
996
 
            self.repo._signature_knit_access.set_writer(self.repo._open_pack_writer,
 
1062
            self.repo._signature_knit_access.set_writer(
 
1063
                self.repo._packs._open_pack_writer,
997
1064
                self.repo._signature_write_index, self.repo._open_pack_tuple)
998
1065
 
999
1066
 
1039
1106
        """Create the combined index for all texts."""
1040
1107
        if getattr(self.repo, '_text_all_indices', None) is not None:
1041
1108
            return
1042
 
        indices = []
1043
 
        self.repo._packs.ensure_loaded()
1044
 
        self.repo._text_pack_map = {}
1045
 
        for name in self.repo._packs.names():
1046
 
            # TODO: maybe this should expose size to us  to allow
1047
 
            # sorting of the indices for better performance ?
1048
 
            index_name = self.name_to_text_index_name(name)
1049
 
            indices.append(GraphIndex(self.transport, index_name))
1050
 
            self.repo._text_pack_map[indices[-1]] = (self.repo._pack_tuple(name))
 
1109
        pack_map, indices = self.repo._packs._make_index_map('.tix')
 
1110
        self.repo._text_pack_map = pack_map
1051
1111
        if for_write or self.repo.is_in_write_group():
1052
1112
            # allow writing: queue writes to a new index
1053
1113
            indices.insert(0, self.repo._text_write_index)
1135
1195
    
1136
1196
    def _setup_knit(self, for_write):
1137
1197
        if for_write:
1138
 
            writer = (self.repo._open_pack_writer, self.repo._text_write_index)
 
1198
            writer = (self.repo._packs._open_pack_writer, self.repo._text_write_index)
1139
1199
        else:
1140
1200
            writer = None
1141
1201
        self.repo._text_knit_access = _PackAccess(
1171
1231
        """Create the combined index for all inventories."""
1172
1232
        if getattr(self.repo, '_inv_all_indices', None) is not None:
1173
1233
            return
1174
 
        indices = []
1175
 
        self.repo._packs.ensure_loaded()
1176
 
        pack_map = {}
1177
 
        for name in self.repo._packs.names():
1178
 
            # TODO: maybe this should expose size to us  to allow
1179
 
            # sorting of the indices for better performance ?
1180
 
            index_name = self.name_to_inv_index_name(name)
1181
 
            indices.append(GraphIndex(self.transport, index_name))
1182
 
            pack_map[indices[-1]] = (self.repo._pack_tuple(name))
 
1234
        pack_map, indices = self.repo._packs._make_index_map('.iix')
1183
1235
        if self.repo.is_in_write_group():
1184
1236
            # allow writing: queue writes to a new index
1185
1237
            indices.append(self.repo._inv_write_index)
1212
1264
        if self.repo.is_in_write_group():
1213
1265
            add_callback = self.repo._inv_write_index.add_nodes
1214
1266
            self.repo._inv_pack_map[self.repo._inv_write_index] = self.repo._open_pack_tuple
1215
 
            writer = self.repo._open_pack_writer, self.repo._inv_write_index
 
1267
            writer = self.repo._packs._open_pack_writer, self.repo._inv_write_index
1216
1268
        else:
1217
1269
            add_callback = None # no data-adding permitted.
1218
1270
            writer = None
1266
1318
        KnitRepository.__init__(self, _format, a_bzrdir, control_files,
1267
1319
                              _revision_store, control_store, text_store)
1268
1320
        index_transport = control_files._transport.clone('indices')
1269
 
        self._packs = RepositoryPackCollection(self, control_files._transport)
 
1321
        self._packs = RepositoryPackCollection(self, control_files._transport,
 
1322
            index_transport,
 
1323
            control_files._transport.clone('upload'),
 
1324
            control_files._transport.clone('packs'))
1270
1325
        self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
1271
1326
        self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
1272
1327
        self._inv_thunk = InventoryKnitThunk(self, index_transport)
1273
 
        self._upload_transport = control_files._transport.clone('upload')
1274
 
        self._pack_transport = control_files._transport.clone('packs')
1275
1328
        # for tests
1276
1329
        self._reconcile_does_inventory_gc = False
1277
1330
 
1278
1331
    def _abort_write_group(self):
1279
 
        # FIXME: just drop the transient index.
1280
 
        self._revision_store.reset()
1281
 
        self.weave_store.reset()
1282
 
        self._inv_thunk.reset()
1283
 
        # forget what names there are
1284
 
        self._packs.reset()
1285
 
        self._open_pack_hash = None
1286
 
 
1287
 
    def _pack_tuple(self, name):
1288
 
        """Return a tuple with the transport and file name for a pack name."""
1289
 
        return self._pack_transport, name + '.pack'
 
1332
        self._packs._abort_write_group()
1290
1333
 
1291
1334
    def _refresh_data(self):
1292
1335
        if self.control_files._lock_count==1:
1297
1340
            self._packs.reset()
1298
1341
 
1299
1342
    def _start_write_group(self):
1300
 
        random_name = self.control_files._lock.nonce
1301
 
        self._open_pack_tuple = (self._upload_transport, random_name + '.pack')
1302
 
        write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
1303
 
        self._write_stream = write_stream
1304
 
        self._open_pack_hash = md5.new()
1305
 
        def write_data(bytes, write=write_stream.write, update=self._open_pack_hash.update):
1306
 
            write(bytes)
1307
 
            update(bytes)
1308
 
        self._open_pack_writer = pack.ContainerWriter(write_data)
1309
 
        self._open_pack_writer.begin()
1310
 
        self._packs.setup()
1311
 
        self._revision_store.setup()
1312
 
        self.weave_store.setup()
1313
 
        self._inv_thunk.setup()
 
1343
        self._packs._start_write_group()
1314
1344
 
1315
1345
    def _commit_write_group(self):
1316
 
        data_inserted = (self._revision_store.data_inserted() or
1317
 
            self.weave_store.data_inserted() or 
1318
 
            self._inv_thunk.data_inserted())
1319
 
        if data_inserted:
1320
 
            self._open_pack_writer.end()
1321
 
            new_name = self._open_pack_hash.hexdigest()
1322
 
            new_pack = Pack()
1323
 
            new_pack.name = new_name
1324
 
            new_pack.transport = self._upload_transport.clone('../packs/')
1325
 
            # To populate:
1326
 
            # new_pack.revision_index = 
1327
 
            # new_pack.inventory_index = 
1328
 
            # new_pack.text_index = 
1329
 
            # new_pack.signature_index = 
1330
 
            self.weave_store.flush(new_name, new_pack)
1331
 
            self._inv_thunk.flush(new_name, new_pack)
1332
 
            self._revision_store.flush(new_name, new_pack)
1333
 
            self._write_stream.close()
1334
 
            self._upload_transport.rename(self._open_pack_tuple[1],
1335
 
                '../packs/' + new_name + '.pack')
1336
 
            # If this fails, its a hash collision. We should:
1337
 
            # - determine if its a collision or
1338
 
            # - the same content or
1339
 
            # - the existing name is not the actual hash - e.g.
1340
 
            #   its a deliberate attack or data corruption has
1341
 
            #   occuring during the write of that file.
1342
 
            self._packs.allocate(new_name, new_pack.revision_index_length,
1343
 
                new_pack.inventory_index_length, new_pack.text_index_length,
1344
 
                new_pack.signature_index_length)
1345
 
            self._open_pack_tuple = None
1346
 
            if not self._packs.autopack():
1347
 
                self._packs.save()
1348
 
        else:
1349
 
            # remove the pending upload
1350
 
            self._upload_transport.delete(self._open_pack_tuple[1])
1351
 
        self._revision_store.reset()
1352
 
        self.weave_store.reset()
1353
 
        self._inv_thunk.reset()
1354
 
        # forget what names there are - should just refresh and deal with the
1355
 
        # delta.
1356
 
        self._packs.reset()
1357
 
        self._open_pack_hash = None
1358
 
        self._write_stream = None
 
1346
        return self._packs._commit_write_group()
1359
1347
 
1360
1348
    def get_inventory_weave(self):
1361
1349
        return self._inv_thunk.get_weave()
1397
1385
        KnitRepository3.__init__(self, _format, a_bzrdir, control_files,
1398
1386
                              _revision_store, control_store, text_store)
1399
1387
        index_transport = control_files._transport.clone('indices')
1400
 
        self._packs = RepositoryPackCollection(self, control_files._transport)
 
1388
        self._packs = RepositoryPackCollection(self, control_files._transport,
 
1389
            index_transport,
 
1390
            control_files._transport.clone('upload'),
 
1391
            control_files._transport.clone('packs'))
1401
1392
        self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
1402
1393
        self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
1403
1394
        self._inv_thunk = InventoryKnitThunk(self, index_transport)
1404
 
        self._upload_transport = control_files._transport.clone('upload')
1405
 
        self._pack_transport = control_files._transport.clone('packs')
1406
1395
        # for tests
1407
1396
        self._reconcile_does_inventory_gc = False
1408
1397
 
1409
1398
    def _abort_write_group(self):
1410
 
        # FIXME: just drop the transient index.
1411
 
        self._revision_store.reset()
1412
 
        self.weave_store.reset()
1413
 
        self._inv_thunk.reset()
1414
 
        # forget what names there are
1415
 
        self._packs.reset()
1416
 
        self._open_pack_hash = None
1417
 
 
1418
 
    def _pack_tuple(self, name):
1419
 
        """Return a tuple with the transport and file name for a pack name."""
1420
 
        return self._pack_transport, name + '.pack'
 
1399
        return self._packs._abort_write_group()
1421
1400
 
1422
1401
    def _refresh_data(self):
1423
1402
        if self.control_files._lock_count==1:
1428
1407
            self._packs.reset()
1429
1408
 
1430
1409
    def _start_write_group(self):
1431
 
        random_name = self.control_files._lock.nonce
1432
 
        self._open_pack_tuple = (self._upload_transport, random_name + '.pack')
1433
 
        write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
1434
 
        self._write_stream = write_stream
1435
 
        self._open_pack_hash = md5.new()
1436
 
        def write_data(bytes, write=write_stream.write, update=self._open_pack_hash.update):
1437
 
            write(bytes)
1438
 
            update(bytes)
1439
 
        self._open_pack_writer = pack.ContainerWriter(write_data)
1440
 
        self._open_pack_writer.begin()
1441
 
        self._packs.setup()
1442
 
        self._revision_store.setup()
1443
 
        self.weave_store.setup()
1444
 
        self._inv_thunk.setup()
 
1410
        self._packs._start_write_group()
1445
1411
 
1446
1412
    def _commit_write_group(self):
1447
 
        data_inserted = (self._revision_store.data_inserted() or
1448
 
            self.weave_store.data_inserted() or 
1449
 
            self._inv_thunk.data_inserted())
1450
 
        if data_inserted:
1451
 
            self._open_pack_writer.end()
1452
 
            new_name = self._open_pack_hash.hexdigest()
1453
 
            new_pack = Pack()
1454
 
            new_pack.name = new_name
1455
 
            new_pack.transport = self._upload_transport.clone('../packs/')
1456
 
            # To populate:
1457
 
            # new_pack.revision_index = 
1458
 
            # new_pack.inventory_index = 
1459
 
            # new_pack.text_index = 
1460
 
            # new_pack.signature_index = 
1461
 
            self.weave_store.flush(new_name, new_pack)
1462
 
            self._inv_thunk.flush(new_name, new_pack)
1463
 
            self._revision_store.flush(new_name, new_pack)
1464
 
            self._write_stream.close()
1465
 
            self._upload_transport.rename(self._open_pack_tuple[1],
1466
 
                '../packs/' + new_name + '.pack')
1467
 
            # If this fails, its a hash collision. We should:
1468
 
            # - determine if its a collision or
1469
 
            # - the same content or
1470
 
            # - the existing name is not the actual hash - e.g.
1471
 
            #   its a deliberate attack or data corruption has
1472
 
            #   occuring during the write of that file.
1473
 
            self._packs.allocate(new_name, new_pack.revision_index_length,
1474
 
                new_pack.inventory_index_length, new_pack.text_index_length,
1475
 
                new_pack.signature_index_length)
1476
 
            self._open_pack_tuple = None
1477
 
            if not self._packs.autopack():
1478
 
                self._packs.save()
1479
 
        else:
1480
 
            # remove the pending upload
1481
 
            self._upload_transport.delete(self._open_pack_tuple[1])
1482
 
        self._revision_store.reset()
1483
 
        self.weave_store.reset()
1484
 
        self._inv_thunk.reset()
1485
 
        # forget what names there are - should just refresh and deal with the
1486
 
        # delta.
1487
 
        self._packs.reset()
1488
 
        self._open_pack_hash = None
1489
 
        self._write_stream = None
 
1413
        return self._packs._commit_write_group()
1490
1414
 
1491
1415
    def get_inventory_weave(self):
1492
1416
        return self._inv_thunk.get_weave()