~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Robert Collins
  • Date: 2007-10-12 05:37:36 UTC
  • mto: This revision was merged to the branch mainline in revision 2933.
  • Revision ID: robertc@robertcollins.net-20071012053736-y2su3gx25bhxzeos
Allow pack repositories to have multiple writers active at one time, for greater concurrency.

Show diffs side-by-side

added added

removed removed

Lines of Context:
34
34
    GraphIndexPrefixAdapter,
35
35
    )
36
36
from bzrlib.knit import KnitGraphIndex, _PackAccess, _KnitData
 
37
from bzrlib.osutils import rand_chars
37
38
from bzrlib.pack import ContainerWriter
38
39
from bzrlib.store import revision
39
40
""")
145
146
        self.packs = []
146
147
        # name:Pack mapping
147
148
        self._packs = {}
 
149
        # the previous pack-names content
 
150
        self._packs_at_load = None
148
151
 
149
152
    def add_pack_to_memory(self, pack):
150
153
        """Make a Pack object available to the repository to satisfy queries.
292
295
        if revision_ids is not None and len(revision_ids) == 0:
293
296
            # silly fetch request.
294
297
            return None
295
 
        random_name = self.repo.control_files._lock.nonce + suffix
 
298
        random_name = self._random_name() + suffix
296
299
        if 'fetch' in debug.debug_flags:
297
300
            plain_pack_list = ['%s%s' % (a_pack.transport.base, a_pack.name)
298
301
                for a_pack in packs]
506
509
        for revision_count, packs in pack_operations:
507
510
            self._obsolete_packs(packs)
508
511
 
 
512
    def lock_names(self):
 
513
        """Acquire the mutex around the pack-names index.
 
514
        
 
515
        This cannot be used in the middle of a read-only transaction on the
 
516
        repository.
 
517
        """
 
518
        self.repo.control_files.lock_write()
 
519
 
509
520
    def pack(self):
510
521
        """Pack the pack collection totally."""
511
522
        self.ensure_loaded()
667
678
    def ensure_loaded(self):
668
679
        if self._names is None:
669
680
            self._names = {}
670
 
            for index, key, value in \
671
 
                GraphIndex(self.transport, 'pack-names', None
672
 
                    ).iter_all_entries():
 
681
            self._packs_at_load = set()
 
682
            for index, key, value in self._iter_disk_pack_index():
673
683
                name = key[0]
674
684
                sizes = [int(digits) for digits in value.split(' ')]
675
685
                self._names[name] = sizes
 
686
                self._packs_at_load.add((key, value))
676
687
 
677
688
    def get_pack_by_name(self, name):
678
689
        """Get a Pack object by name.
711
722
        self._names[name] = (revision_index_length, inventory_index_length,
712
723
            text_index_length, signature_index_length)
713
724
 
 
725
    def _iter_disk_pack_index(self):
 
726
        """Iterate over the contents of the pack-names index.
 
727
        
 
728
        This is used when loading the list from disk, and before writing to
 
729
        detect updates from others during our write operation.
 
730
        :return: An iterator of the index contents.
 
731
        """
 
732
        return GraphIndex(self.transport, 'pack-names', None
 
733
                ).iter_all_entries()
 
734
 
714
735
    def _make_index_map(self, suffix):
715
736
        """Return information on existing indexes.
716
737
 
805
826
        self._names = None
806
827
        self.packs = []
807
828
        self._packs = {}
 
829
        self._packs_at_load = None
808
830
 
809
831
    def _make_index_to_pack_map(self, pack_details, index_suffix):
810
832
        """Given a list (transport,name), return a map of (index)->(transport, name)."""
854
876
        else:
855
877
            return all_index.iter_entries(key_filter)
856
878
 
 
879
    def _random_name(self):
 
880
        """Return a random name."""
 
881
        return rand_chars(20)
 
882
 
 
883
    def release_names(self):
 
884
        """Release the mutex around the pack-names index."""
 
885
        self.repo.control_files.unlock()
 
886
 
857
887
    def _save_pack_names(self):
858
 
        builder = GraphIndexBuilder()
859
 
        for name, sizes in self._names.iteritems():
860
 
            builder.add_node((name, ), ' '.join(str(size) for size in sizes))
861
 
        self.transport.put_file('pack-names', builder.finish())
 
888
        """Save the list of packs.
 
889
 
 
890
        This will take out the mutex around the pack names list for the
 
891
        duration of the method call. If concurrent updates have been made, a
 
892
        three-way merge between the current list and the current in memory list
 
893
        is performed.
 
894
        """
 
895
        self.lock_names()
 
896
        try:
 
897
            builder = GraphIndexBuilder()
 
898
            # load the disk nodes across
 
899
            disk_nodes = set()
 
900
            for index, key, value in self._iter_disk_pack_index():
 
901
                disk_nodes.add((key, value))
 
902
            # do a two-way diff against our original content
 
903
            current_nodes = set()
 
904
            for name, sizes in self._names.iteritems():
 
905
                current_nodes.add(
 
906
                    ((name, ), ' '.join(str(size) for size in sizes)))
 
907
            deleted_nodes = self._packs_at_load - current_nodes
 
908
            new_nodes = current_nodes - self._packs_at_load
 
909
            disk_nodes.difference_update(deleted_nodes)
 
910
            disk_nodes.update(new_nodes)
 
911
            for key, value in disk_nodes:
 
912
                builder.add_node(key, value)
 
913
            self.transport.put_file('pack-names', builder.finish())
 
914
        finally:
 
915
            self.release_names()
862
916
 
863
917
    def setup(self):
864
918
        # cannot add names if we're not in a 'write lock'.
865
 
        if self.repo.control_files._lock_mode != 'w':
 
919
        if not self.repo.is_write_locked():
866
920
            raise errors.NotWriteLocked(self)
867
921
 
868
922
    def _start_write_group(self):
869
 
        random_name = self.repo.control_files._lock.nonce
 
923
        random_name = self._random_name()
870
924
        self.repo._open_pack_tuple = (self._upload_transport, random_name + '.pack')
871
925
        write_stream = self._upload_transport.open_write_stream(random_name + '.pack')
872
926
        self._write_stream = write_stream
984
1038
        self.repo._revision_knit = knit.KnitVersionedFile(
985
1039
            'revisions', self.transport.clone('..'),
986
1040
            self.repo.control_files._file_mode,
987
 
            create=False, access_mode=self.repo.control_files._lock_mode,
 
1041
            create=False, access_mode=self.repo._access_mode(),
988
1042
            index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
989
1043
            access_method=knit_access)
990
1044
        return self.repo._revision_knit
1011
1065
        self.repo._signature_knit = knit.KnitVersionedFile(
1012
1066
            'signatures', self.transport.clone('..'),
1013
1067
            self.repo.control_files._file_mode,
1014
 
            create=False, access_mode=self.repo.control_files._lock_mode,
 
1068
            create=False, access_mode=self.repo._access_mode(),
1015
1069
            index=knit_index, delta=False, factory=knit.KnitPlainFactory(),
1016
1070
            access_method=knit_access)
1017
1071
        return self.repo._signature_knit
1376
1430
        self._revision_store = GraphKnitRevisionStore(self, index_transport, self._revision_store)
1377
1431
        self.weave_store = GraphKnitTextStore(self, index_transport, self.weave_store)
1378
1432
        self._inv_thunk = InventoryKnitThunk(self, index_transport)
 
1433
        # True when the repository object is 'write locked' (as opposed to the
 
1434
        # physical lock only taken out around changes to the pack-names list.) 
 
1435
        # Another way to represent this would be a decorator around the control
 
1436
        # files object that presents logical locks as physical ones - if this
 
1437
        # gets ugly consider that alternative design. RBC 20071011
 
1438
        self._write_lock_count = 0
 
1439
        self._transaction = None
1379
1440
        # for tests
1380
1441
        self._reconcile_does_inventory_gc = False
1381
1442
 
1382
1443
    def _abort_write_group(self):
1383
1444
        self._packs._abort_write_group()
1384
1445
 
 
1446
    def _access_mode(self):
 
1447
        """Return 'w' or 'r' for depending on whether a write lock is active.
 
1448
        
 
1449
        This method is a helper for the Knit-thunking support objects.
 
1450
        """
 
1451
        if self.is_write_locked():
 
1452
            return 'w'
 
1453
        return 'r'
 
1454
 
1385
1455
    def _refresh_data(self):
1386
 
        if self.control_files._lock_count==1:
 
1456
        if self._write_lock_count == 1 or self.control_files._lock_count==1:
1387
1457
            self._revision_store.reset()
1388
1458
            self.weave_store.reset()
1389
1459
            self._inv_thunk.reset()
1399
1469
    def get_inventory_weave(self):
1400
1470
        return self._inv_thunk.get_weave()
1401
1471
 
 
1472
    def get_transaction(self):
 
1473
        if self._write_lock_count:
 
1474
            return self._transaction
 
1475
        else:
 
1476
            return self.control_files.get_transaction()
 
1477
 
 
1478
    def is_locked(self):
 
1479
        return self._write_lock_count or self.control_files.is_locked()
 
1480
 
 
1481
    def is_write_locked(self):
 
1482
        return self._write_lock_count
 
1483
 
 
1484
    def lock_write(self, token=None):
 
1485
        if not self._write_lock_count and self.is_locked():
 
1486
            raise errors.ReadOnlyError(self)
 
1487
        self._write_lock_count += 1
 
1488
        if self._write_lock_count == 1:
 
1489
            from bzrlib import transactions
 
1490
            self._transaction = transactions.WriteTransaction()
 
1491
        self._refresh_data()
 
1492
 
 
1493
    def lock_read(self):
 
1494
        if self._write_lock_count:
 
1495
            self._write_lock_count += 1
 
1496
        else:
 
1497
            self.control_files.lock_read()
 
1498
        self._refresh_data()
 
1499
 
 
1500
    def leave_lock_in_place(self):
 
1501
        # not supported - raise an error
 
1502
        raise NotImplementedError(self.leave_lock_in_place)
 
1503
 
 
1504
    def dont_leave_lock_in_place(self):
 
1505
        # not supported - raise an error
 
1506
        raise NotImplementedError(self.dont_leave_lock_in_place)
 
1507
 
1402
1508
    @needs_write_lock
1403
1509
    def pack(self):
1404
1510
        """Compress the data within the repository.
1416
1522
        reconciler.reconcile()
1417
1523
        return reconciler
1418
1524
 
 
1525
    def unlock(self):
 
1526
        if self._write_lock_count == 1 and self._write_group is not None:
 
1527
            raise errors.BzrError(
 
1528
                'Must end write groups before releasing write locks.')
 
1529
        if self._write_lock_count:
 
1530
            self._write_lock_count -= 1
 
1531
            if not self._write_lock_count:
 
1532
                transaction = self._transaction
 
1533
                self._transaction = None
 
1534
                transaction.finish()
 
1535
        else:
 
1536
            self.control_files.unlock()
 
1537
 
1419
1538
 
1420
1539
class RepositoryFormatPack(MetaDirRepositoryFormat):
1421
1540
    """Format logic for pack structured repositories.