~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Andrew Bennetts
  • Date: 2009-02-13 07:34:40 UTC
  • mto: This revision was merged to the branch mainline in revision 4027.
  • Revision ID: andrew.bennetts@canonical.com-20090213073440-qzvli80q9729jvce
Implement suspend_write_group/resume_write_group.

Show diffs side-by-side

added added

removed removed

Lines of Context:
127
127
    ExistingPack and NewPack are used.
128
128
    """
129
129
 
 
130
    # A map of index 'type' to the file extension and position in the
 
131
    # index_sizes array.
 
132
    index_definitions = {
 
133
        'revision': ('.rix', 0),
 
134
        'inventory': ('.iix', 1),
 
135
        'text': ('.tix', 2),
 
136
        'signature': ('.six', 3),
 
137
        }
 
138
 
130
139
    def __init__(self, revision_index, inventory_index, text_index,
131
140
        signature_index):
132
141
        """Create a pack instance.
158
167
    def get_revision_count(self):
159
168
        return self.revision_index.key_count()
160
169
 
 
170
    def index_name(self, index_type, name):
 
171
        """Get the disk name of an index type for pack name 'name'."""
 
172
        return name + Pack.index_definitions[index_type][0]
 
173
 
 
174
    def index_offset(self, index_type):
 
175
        """Get the position in a index_size array for a given index type."""
 
176
        return Pack.index_definitions[index_type][1]
 
177
 
161
178
    def inventory_index_name(self, name):
162
179
        """The inv index is the name + .iix."""
163
180
        return self.index_name('inventory', name)
174
191
        """The text index is the name + .tix."""
175
192
        return self.index_name('text', name)
176
193
 
 
194
    def _replace_index_with_readonly(self, index_type):
 
195
        setattr(self, index_type + '_index',
 
196
            self.index_class(self.index_transport,
 
197
                self.index_name(index_type, self.name),
 
198
                self.index_sizes[self.index_offset(index_type)]))
 
199
 
177
200
 
178
201
class ExistingPack(Pack):
179
202
    """An in memory proxy for an existing .pack and its disk indices."""
200
223
        return not self.__eq__(other)
201
224
 
202
225
    def __repr__(self):
203
 
        return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s" % (
204
 
            id(self), self.pack_transport, self.name)
 
226
        return "<%s.%s object at 0x%x, %s, %s" % (
 
227
            self.__class__.__module__, self.__class__.__name__, id(self),
 
228
            self.pack_transport, self.name)
 
229
 
 
230
 
 
231
class ResumedPack(ExistingPack):
 
232
 
 
233
    def __init__(self, name, revision_index, inventory_index, text_index,
 
234
        signature_index, upload_transport, pack_transport, index_transport,
 
235
        pack_collection):
 
236
        """Create a ResumedPack object."""
 
237
        ExistingPack.__init__(self, pack_transport, name, revision_index,
 
238
            inventory_index, text_index, signature_index)
 
239
        self.upload_transport = upload_transport
 
240
        self.index_transport = index_transport
 
241
        self.index_sizes = [None, None, None, None]
 
242
        indices = [
 
243
            ('revision', revision_index),
 
244
            ('inventory', inventory_index),
 
245
            ('text', text_index),
 
246
            ('signature', signature_index),
 
247
            ]
 
248
        for index_type, index in indices:
 
249
            offset = self.index_offset(index_type)
 
250
            self.index_sizes[offset] = index._size
 
251
        self.index_class = pack_collection._index_class
 
252
        self._state = 'resumed'
 
253
        # XXX: perhaps check that the .pack file exists?
 
254
        # XXX: should sanity check name: what if a client uses a 'name' of
 
255
        #      "../../../something/private"?  could steal suspended wg from
 
256
        #      another repo!
 
257
 
 
258
    def access_tuple(self):
 
259
        if self._state == 'finished':
 
260
            return Pack.access_tuple(self)
 
261
        elif self._state == 'resumed':
 
262
            return self.upload_transport, self.file_name()
 
263
        else:
 
264
            raise AssertionError(self._state)
 
265
 
 
266
    def abort(self):
 
267
        self.upload_transport.delete(self.file_name())
 
268
        indices = [self.revision_index, self.inventory_index, self.text_index,
 
269
            self.signature_index]
 
270
        for index in indices:
 
271
            index._transport.delete(index._name)
 
272
 
 
273
    def finish(self):
 
274
        #XXX self._check_references()
 
275
        new_name = '../packs/' + self.file_name()
 
276
        self.upload_transport.rename(self.file_name(), new_name)
 
277
        for index_type in ['revision', 'inventory', 'text', 'signature']:
 
278
            old_name = self.index_name(index_type, self.name)
 
279
            new_name = '../indices/' + old_name
 
280
            self.upload_transport.rename(old_name, new_name)
 
281
            self._replace_index_with_readonly(index_type)
 
282
        self._state = 'finished'
205
283
 
206
284
 
207
285
class NewPack(Pack):
208
286
    """An in memory proxy for a pack which is being created."""
209
287
 
210
 
    # A map of index 'type' to the file extension and position in the
211
 
    # index_sizes array.
212
 
    index_definitions = {
213
 
        'revision': ('.rix', 0),
214
 
        'inventory': ('.iix', 1),
215
 
        'text': ('.tix', 2),
216
 
        'signature': ('.six', 3),
217
 
        }
218
 
 
219
288
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
220
289
        """Create a NewPack instance.
221
290
 
323
392
        
324
393
        Packs are allowed to have deltas whose base is not in the pack, but it
325
394
        must be present somewhere in this collection.  It is not allowed to
326
 
        have deltas based on a fallback repository. 
 
395
        have deltas based on a fallback repository.
327
396
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
328
397
        """
329
398
        missing_items = {}
336
405
                self._pack_collection.inventory_index.combined_index),
337
406
            ]:
338
407
            missing = external_refs.difference(
339
 
                k for (idx, k, v, r) in 
 
408
                k for (idx, k, v, r) in
340
409
                index.iter_entries(external_refs))
341
410
            if missing:
342
411
                missing_items[index_name] = sorted(list(missing))
354
423
            self.text_index.key_count() or
355
424
            self.signature_index.key_count())
356
425
 
357
 
    def finish(self):
 
426
    def finish(self, suspend=False):
358
427
        """Finish the new pack.
359
428
 
360
429
        This:
376
445
        # visible is smaller.  On the other hand none will be seen until
377
446
        # they're in the names list.
378
447
        self.index_sizes = [None, None, None, None]
379
 
        self._write_index('revision', self.revision_index, 'revision')
380
 
        self._write_index('inventory', self.inventory_index, 'inventory')
381
 
        self._write_index('text', self.text_index, 'file texts')
 
448
        self._write_index('revision', self.revision_index, 'revision', suspend)
 
449
        self._write_index('inventory', self.inventory_index, 'inventory',
 
450
            suspend)
 
451
        self._write_index('text', self.text_index, 'file texts', suspend)
382
452
        self._write_index('signature', self.signature_index,
383
 
            'revision signatures')
 
453
            'revision signatures', suspend)
384
454
        self.write_stream.close()
385
455
        # Note that this will clobber an existing pack with the same name,
386
456
        # without checking for hash collisions. While this is undesirable this
393
463
        #  - try for HASH.pack
394
464
        #  - try for temporary-name
395
465
        #  - refresh the pack-list to see if the pack is now absent
396
 
        self.upload_transport.rename(self.random_name,
397
 
                '../packs/' + self.name + '.pack')
 
466
        new_name = self.name + '.pack'
 
467
        if not suspend:
 
468
            new_name = '../packs/' + new_name
 
469
        self.upload_transport.rename(self.random_name, new_name)
398
470
        self._state = 'finished'
399
471
        if 'pack' in debug.debug_flags:
400
472
            # XXX: size might be interesting?
401
 
            mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
 
473
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
402
474
                time.ctime(), self.upload_transport.base, self.random_name,
403
 
                self.pack_transport, self.name,
404
 
                time.time() - self.start_time)
 
475
                new_name, time.time() - self.start_time)
405
476
 
406
477
    def flush(self):
407
478
        """Flush any current data."""
411
482
            self._hash.update(bytes)
412
483
            self._buffer[:] = [[], 0]
413
484
 
414
 
    def index_name(self, index_type, name):
415
 
        """Get the disk name of an index type for pack name 'name'."""
416
 
        return name + NewPack.index_definitions[index_type][0]
417
 
 
418
 
    def index_offset(self, index_type):
419
 
        """Get the position in a index_size array for a given index type."""
420
 
        return NewPack.index_definitions[index_type][1]
421
 
 
422
 
    def _replace_index_with_readonly(self, index_type):
423
 
        setattr(self, index_type + '_index',
424
 
            self.index_class(self.index_transport,
425
 
                self.index_name(index_type, self.name),
426
 
                self.index_sizes[self.index_offset(index_type)]))
427
 
 
428
485
    def set_write_cache_size(self, size):
429
486
        self._cache_limit = size
430
487
 
431
 
    def _write_index(self, index_type, index, label):
 
488
    def _write_index(self, index_type, index, label, suspend=False):
432
489
        """Write out an index.
433
490
 
434
491
        :param index_type: The type of index to write - e.g. 'revision'.
436
493
        :param label: What label to give the index e.g. 'revision'.
437
494
        """
438
495
        index_name = self.index_name(index_type, self.name)
439
 
        self.index_sizes[self.index_offset(index_type)] = \
440
 
            self.index_transport.put_file(index_name, index.finish(),
441
 
            mode=self._file_mode)
 
496
        if suspend:
 
497
            transport = self.upload_transport
 
498
        else:
 
499
            transport = self.index_transport
 
500
        self.index_sizes[self.index_offset(index_type)] = transport.put_file(
 
501
            index_name, index.finish(), mode=self._file_mode)
442
502
        if 'pack' in debug.debug_flags:
443
503
            # XXX: size might be interesting?
444
504
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
1250
1310
        self.inventory_index = AggregateIndex(self.reload_pack_names)
1251
1311
        self.text_index = AggregateIndex(self.reload_pack_names)
1252
1312
        self.signature_index = AggregateIndex(self.reload_pack_names)
 
1313
        # resumed packs
 
1314
        self._resumed_packs = []
1253
1315
 
1254
1316
    def add_pack_to_memory(self, pack):
1255
1317
        """Make a Pack object available to the repository to satisfy queries.
1257
1319
        :param pack: A Pack object.
1258
1320
        """
1259
1321
        if pack.name in self._packs_by_name:
1260
 
            raise AssertionError()
 
1322
            raise AssertionError(
 
1323
                'pack %s already in _packs_by_name' % (pack.name,))
1261
1324
        self.packs.append(pack)
1262
1325
        self._packs_by_name[pack.name] = pack
1263
1326
        self.revision_index.add_index(pack.revision_index, pack)
1493
1556
            self.add_pack_to_memory(result)
1494
1557
            return result
1495
1558
 
 
1559
    def _resume_pack(self, name):
 
1560
        """Get a suspended Pack object by name.
 
1561
 
 
1562
        :param name: The name of the pack - e.g. '123456'
 
1563
        :return: A Pack object.
 
1564
        """
 
1565
        try:
 
1566
            rev_index = self._make_index(name, '.rix', resume=True)
 
1567
            inv_index = self._make_index(name, '.iix', resume=True)
 
1568
            txt_index = self._make_index(name, '.tix', resume=True)
 
1569
            sig_index = self._make_index(name, '.six', resume=True)
 
1570
            result = ResumedPack(name, rev_index, inv_index, txt_index,
 
1571
                sig_index, self._upload_transport, self._pack_transport,
 
1572
                self._index_transport, self)
 
1573
        except errors.NoSuchFile, e:
 
1574
            raise errors.UnresumableWriteGroups(self.repo, [name], str(e))
 
1575
        self.add_pack_to_memory(result)
 
1576
        self._resumed_packs.append(result)
 
1577
        return result
 
1578
 
1496
1579
    def allocate(self, a_new_pack):
1497
1580
        """Allocate name in the list of packs.
1498
1581
 
1516
1599
        return self._index_class(self.transport, 'pack-names', None
1517
1600
                ).iter_all_entries()
1518
1601
 
1519
 
    def _make_index(self, name, suffix):
 
1602
    def _make_index(self, name, suffix, resume=False):
1520
1603
        size_offset = self._suffix_offsets[suffix]
1521
1604
        index_name = name + suffix
1522
 
        index_size = self._names[name][size_offset]
1523
 
        return self._index_class(
1524
 
            self._index_transport, index_name, index_size)
 
1605
        if resume:
 
1606
            transport = self._upload_transport
 
1607
            index_size = transport.stat(index_name).st_size
 
1608
        else:
 
1609
            transport = self._index_transport
 
1610
            index_size = self._names[name][size_offset]
 
1611
        return self._index_class(transport, index_name, index_size)
1525
1612
 
1526
1613
    def _max_pack_count(self, total_revisions):
1527
1614
        """Return the maximum number of packs to use for total revisions.
1792
1879
    def _abort_write_group(self):
1793
1880
        # FIXME: just drop the transient index.
1794
1881
        # forget what names there are
 
1882
        #import pdb; pdb.set_trace()
1795
1883
        if self._new_pack is not None:
1796
1884
            try:
1797
1885
                self._new_pack.abort()
1802
1890
                # case.  -- mbp 20081113
1803
1891
                self._remove_pack_indices(self._new_pack)
1804
1892
                self._new_pack = None
 
1893
        for resumed_pack in self._resumed_packs:
 
1894
            try:
 
1895
                resumed_pack.abort()
 
1896
            finally:
 
1897
                # See comment in previous finally block.
 
1898
                self._remove_pack_indices(resumed_pack)
 
1899
            del self._resumed_packs[:]
1805
1900
        self.repo._text_knit = None
1806
1901
 
 
1902
    def _remove_resumed_pack_indices(self):
 
1903
        for resumed_pack in self._resumed_packs:
 
1904
            self._remove_pack_indices(resumed_pack)
 
1905
        del self._resumed_packs[:]
 
1906
 
1807
1907
    def _commit_write_group(self):
1808
1908
        self._remove_pack_indices(self._new_pack)
 
1909
        should_autopack = False
1809
1910
        if self._new_pack.data_inserted():
1810
1911
            # get all the data to disk and read to use
1811
1912
            self._new_pack.finish()
1812
1913
            self.allocate(self._new_pack)
1813
1914
            self._new_pack = None
 
1915
            should_autopack = True
 
1916
        else:
 
1917
            self._new_pack.abort()
 
1918
            self._new_pack = None
 
1919
        for resumed_pack in self._resumed_packs:
 
1920
            # XXX: this is a pretty ugly way to turn the resumed pack into a
 
1921
            # properly committed pack.
 
1922
            self._names[resumed_pack.name] = None
 
1923
            self._remove_pack_from_memory(resumed_pack)
 
1924
            resumed_pack.finish()
 
1925
            self.allocate(resumed_pack)
 
1926
            should_autopack = True
 
1927
        del self._resumed_packs[:]
 
1928
        if should_autopack:
1814
1929
            if not self.autopack():
1815
1930
                # when autopack takes no steps, the names list is still
1816
1931
                # unsaved.
1817
1932
                self._save_pack_names()
 
1933
        self.repo._text_knit = None
 
1934
 
 
1935
    def _suspend_write_group(self):
 
1936
        tokens = [pack.name for pack in self._resumed_packs]
 
1937
        self._remove_pack_indices(self._new_pack)
 
1938
        if self._new_pack.data_inserted():
 
1939
            # get all the data to disk and read to use
 
1940
            self._new_pack.finish(suspend=True)
 
1941
            tokens.append(self._new_pack.name)
 
1942
            self._new_pack = None
1818
1943
        else:
1819
1944
            self._new_pack.abort()
1820
1945
            self._new_pack = None
 
1946
        self._remove_resumed_pack_indices()
1821
1947
        self.repo._text_knit = None
 
1948
        return tokens
 
1949
 
 
1950
    def _resume_write_group(self, tokens):
 
1951
        for token in tokens:
 
1952
            self._resume_pack(token)
1822
1953
 
1823
1954
 
1824
1955
class KnitPackRepository(KnitRepository):
1973
2104
    def _commit_write_group(self):
1974
2105
        return self._pack_collection._commit_write_group()
1975
2106
 
 
2107
    def suspend_write_group(self):
 
2108
        # XXX check self._write_group is self.get_transaction()?
 
2109
        tokens = self._pack_collection._suspend_write_group()
 
2110
        self._write_group = None
 
2111
        return tokens
 
2112
 
 
2113
    def _resume_write_group(self, tokens):
 
2114
        self._start_write_group()
 
2115
        self._pack_collection._resume_write_group(tokens)
 
2116
 
1976
2117
    def get_transaction(self):
1977
2118
        if self._write_lock_count:
1978
2119
            return self._transaction