~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-20 10:08:54 UTC
  • mfrom: (4002.1.14 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20090220100854-p9g7snhipls2cj0z
(robertc) Add the ability to suspend and resume write groups,
        for use by smart server streaming push. (Andrew Bennetts,
        Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
 
17
import re
17
18
import sys
18
19
 
19
20
from bzrlib.lazy_import import lazy_import
127
128
    ExistingPack and NewPack are used.
128
129
    """
129
130
 
 
131
    # A map of index 'type' to the file extension and position in the
 
132
    # index_sizes array.
 
133
    index_definitions = {
 
134
        'revision': ('.rix', 0),
 
135
        'inventory': ('.iix', 1),
 
136
        'text': ('.tix', 2),
 
137
        'signature': ('.six', 3),
 
138
        }
 
139
 
130
140
    def __init__(self, revision_index, inventory_index, text_index,
131
141
        signature_index):
132
142
        """Create a pack instance.
151
161
        """Return a tuple (transport, name) for the pack content."""
152
162
        return self.pack_transport, self.file_name()
153
163
 
 
164
    def _check_references(self):
 
165
        """Make sure our external references are present.
 
166
        
 
167
        Packs are allowed to have deltas whose base is not in the pack, but it
 
168
        must be present somewhere in this collection.  It is not allowed to
 
169
        have deltas based on a fallback repository.
 
170
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
 
171
        """
 
172
        missing_items = {}
 
173
        for (index_name, external_refs, index) in [
 
174
            ('texts',
 
175
                self._get_external_refs(self.text_index),
 
176
                self._pack_collection.text_index.combined_index),
 
177
            ('inventories',
 
178
                self._get_external_refs(self.inventory_index),
 
179
                self._pack_collection.inventory_index.combined_index),
 
180
            ]:
 
181
            missing = external_refs.difference(
 
182
                k for (idx, k, v, r) in
 
183
                index.iter_entries(external_refs))
 
184
            if missing:
 
185
                missing_items[index_name] = sorted(list(missing))
 
186
        if missing_items:
 
187
            from pprint import pformat
 
188
            raise errors.BzrCheckError(
 
189
                "Newly created pack file %r has delta references to "
 
190
                "items not in its repository:\n%s"
 
191
                % (self, pformat(missing_items)))
 
192
 
154
193
    def file_name(self):
155
194
        """Get the file name for the pack on disk."""
156
195
        return self.name + '.pack'
158
197
    def get_revision_count(self):
159
198
        return self.revision_index.key_count()
160
199
 
 
200
    def index_name(self, index_type, name):
 
201
        """Get the disk name of an index type for pack name 'name'."""
 
202
        return name + Pack.index_definitions[index_type][0]
 
203
 
 
204
    def index_offset(self, index_type):
 
205
        """Get the position in a index_size array for a given index type."""
 
206
        return Pack.index_definitions[index_type][1]
 
207
 
161
208
    def inventory_index_name(self, name):
162
209
        """The inv index is the name + .iix."""
163
210
        return self.index_name('inventory', name)
174
221
        """The text index is the name + .tix."""
175
222
        return self.index_name('text', name)
176
223
 
 
224
    def _replace_index_with_readonly(self, index_type):
 
225
        setattr(self, index_type + '_index',
 
226
            self.index_class(self.index_transport,
 
227
                self.index_name(index_type, self.name),
 
228
                self.index_sizes[self.index_offset(index_type)]))
 
229
 
177
230
 
178
231
class ExistingPack(Pack):
179
232
    """An in memory proxy for an existing .pack and its disk indices."""
200
253
        return not self.__eq__(other)
201
254
 
202
255
    def __repr__(self):
203
 
        return "<bzrlib.repofmt.pack_repo.Pack object at 0x%x, %s, %s" % (
204
 
            id(self), self.pack_transport, self.name)
 
256
        return "<%s.%s object at 0x%x, %s, %s" % (
 
257
            self.__class__.__module__, self.__class__.__name__, id(self),
 
258
            self.pack_transport, self.name)
 
259
 
 
260
 
 
261
class ResumedPack(ExistingPack):
 
262
 
 
263
    def __init__(self, name, revision_index, inventory_index, text_index,
 
264
        signature_index, upload_transport, pack_transport, index_transport,
 
265
        pack_collection):
 
266
        """Create a ResumedPack object."""
 
267
        ExistingPack.__init__(self, pack_transport, name, revision_index,
 
268
            inventory_index, text_index, signature_index)
 
269
        self.upload_transport = upload_transport
 
270
        self.index_transport = index_transport
 
271
        self.index_sizes = [None, None, None, None]
 
272
        indices = [
 
273
            ('revision', revision_index),
 
274
            ('inventory', inventory_index),
 
275
            ('text', text_index),
 
276
            ('signature', signature_index),
 
277
            ]
 
278
        for index_type, index in indices:
 
279
            offset = self.index_offset(index_type)
 
280
            self.index_sizes[offset] = index._size
 
281
        self.index_class = pack_collection._index_class
 
282
        self._pack_collection = pack_collection
 
283
        self._state = 'resumed'
 
284
        # XXX: perhaps check that the .pack file exists?
 
285
 
 
286
    def access_tuple(self):
 
287
        if self._state == 'finished':
 
288
            return Pack.access_tuple(self)
 
289
        elif self._state == 'resumed':
 
290
            return self.upload_transport, self.file_name()
 
291
        else:
 
292
            raise AssertionError(self._state)
 
293
 
 
294
    def abort(self):
 
295
        self.upload_transport.delete(self.file_name())
 
296
        indices = [self.revision_index, self.inventory_index, self.text_index,
 
297
            self.signature_index]
 
298
        for index in indices:
 
299
            index._transport.delete(index._name)
 
300
 
 
301
    def finish(self):
 
302
        self._check_references()
 
303
        new_name = '../packs/' + self.file_name()
 
304
        self.upload_transport.rename(self.file_name(), new_name)
 
305
        for index_type in ['revision', 'inventory', 'text', 'signature']:
 
306
            old_name = self.index_name(index_type, self.name)
 
307
            new_name = '../indices/' + old_name
 
308
            self.upload_transport.rename(old_name, new_name)
 
309
            self._replace_index_with_readonly(index_type)
 
310
        self._state = 'finished'
 
311
 
 
312
    def _get_external_refs(self, index):
 
313
        return index.external_references(1)
205
314
 
206
315
 
207
316
class NewPack(Pack):
208
317
    """An in memory proxy for a pack which is being created."""
209
318
 
210
 
    # A map of index 'type' to the file extension and position in the
211
 
    # index_sizes array.
212
 
    index_definitions = {
213
 
        'revision': ('.rix', 0),
214
 
        'inventory': ('.iix', 1),
215
 
        'text': ('.tix', 2),
216
 
        'signature': ('.six', 3),
217
 
        }
218
 
 
219
319
    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
220
320
        """Create a NewPack instance.
221
321
 
318
418
        else:
319
419
            raise AssertionError(self._state)
320
420
 
321
 
    def _check_references(self):
322
 
        """Make sure our external references are present.
323
 
        
324
 
        Packs are allowed to have deltas whose base is not in the pack, but it
325
 
        must be present somewhere in this collection.  It is not allowed to
326
 
        have deltas based on a fallback repository. 
327
 
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
328
 
        """
329
 
        missing_items = {}
330
 
        for (index_name, external_refs, index) in [
331
 
            ('texts',
332
 
                self.text_index._external_references(),
333
 
                self._pack_collection.text_index.combined_index),
334
 
            ('inventories',
335
 
                self.inventory_index._external_references(),
336
 
                self._pack_collection.inventory_index.combined_index),
337
 
            ]:
338
 
            missing = external_refs.difference(
339
 
                k for (idx, k, v, r) in 
340
 
                index.iter_entries(external_refs))
341
 
            if missing:
342
 
                missing_items[index_name] = sorted(list(missing))
343
 
        if missing_items:
344
 
            from pprint import pformat
345
 
            raise errors.BzrCheckError(
346
 
                "Newly created pack file %r has delta references to "
347
 
                "items not in its repository:\n%s"
348
 
                % (self, pformat(missing_items)))
349
 
 
350
421
    def data_inserted(self):
351
422
        """True if data has been added to this pack."""
352
423
        return bool(self.get_revision_count() or
354
425
            self.text_index.key_count() or
355
426
            self.signature_index.key_count())
356
427
 
357
 
    def finish(self):
 
428
    def finish(self, suspend=False):
358
429
        """Finish the new pack.
359
430
 
360
431
        This:
369
440
        if self._buffer[1]:
370
441
            self._write_data('', flush=True)
371
442
        self.name = self._hash.hexdigest()
372
 
        self._check_references()
 
443
        if not suspend:
 
444
            self._check_references()
373
445
        # write indices
374
446
        # XXX: It'd be better to write them all to temporary names, then
375
447
        # rename them all into place, so that the window when only some are
376
448
        # visible is smaller.  On the other hand none will be seen until
377
449
        # they're in the names list.
378
450
        self.index_sizes = [None, None, None, None]
379
 
        self._write_index('revision', self.revision_index, 'revision')
380
 
        self._write_index('inventory', self.inventory_index, 'inventory')
381
 
        self._write_index('text', self.text_index, 'file texts')
 
451
        self._write_index('revision', self.revision_index, 'revision', suspend)
 
452
        self._write_index('inventory', self.inventory_index, 'inventory',
 
453
            suspend)
 
454
        self._write_index('text', self.text_index, 'file texts', suspend)
382
455
        self._write_index('signature', self.signature_index,
383
 
            'revision signatures')
 
456
            'revision signatures', suspend)
384
457
        self.write_stream.close()
385
458
        # Note that this will clobber an existing pack with the same name,
386
459
        # without checking for hash collisions. While this is undesirable this
393
466
        #  - try for HASH.pack
394
467
        #  - try for temporary-name
395
468
        #  - refresh the pack-list to see if the pack is now absent
396
 
        self.upload_transport.rename(self.random_name,
397
 
                '../packs/' + self.name + '.pack')
 
469
        new_name = self.name + '.pack'
 
470
        if not suspend:
 
471
            new_name = '../packs/' + new_name
 
472
        self.upload_transport.rename(self.random_name, new_name)
398
473
        self._state = 'finished'
399
474
        if 'pack' in debug.debug_flags:
400
475
            # XXX: size might be interesting?
401
 
            mutter('%s: create_pack: pack renamed into place: %s%s->%s%s t+%6.3fs',
 
476
            mutter('%s: create_pack: pack finished: %s%s->%s t+%6.3fs',
402
477
                time.ctime(), self.upload_transport.base, self.random_name,
403
 
                self.pack_transport, self.name,
404
 
                time.time() - self.start_time)
 
478
                new_name, time.time() - self.start_time)
405
479
 
406
480
    def flush(self):
407
481
        """Flush any current data."""
411
485
            self._hash.update(bytes)
412
486
            self._buffer[:] = [[], 0]
413
487
 
414
 
    def index_name(self, index_type, name):
415
 
        """Get the disk name of an index type for pack name 'name'."""
416
 
        return name + NewPack.index_definitions[index_type][0]
417
 
 
418
 
    def index_offset(self, index_type):
419
 
        """Get the position in a index_size array for a given index type."""
420
 
        return NewPack.index_definitions[index_type][1]
421
 
 
422
 
    def _replace_index_with_readonly(self, index_type):
423
 
        setattr(self, index_type + '_index',
424
 
            self.index_class(self.index_transport,
425
 
                self.index_name(index_type, self.name),
426
 
                self.index_sizes[self.index_offset(index_type)]))
 
488
    def _get_external_refs(self, index):
 
489
        return index._external_references()
427
490
 
428
491
    def set_write_cache_size(self, size):
429
492
        self._cache_limit = size
430
493
 
431
 
    def _write_index(self, index_type, index, label):
 
494
    def _write_index(self, index_type, index, label, suspend=False):
432
495
        """Write out an index.
433
496
 
434
497
        :param index_type: The type of index to write - e.g. 'revision'.
436
499
        :param label: What label to give the index e.g. 'revision'.
437
500
        """
438
501
        index_name = self.index_name(index_type, self.name)
439
 
        self.index_sizes[self.index_offset(index_type)] = \
440
 
            self.index_transport.put_file(index_name, index.finish(),
441
 
            mode=self._file_mode)
 
502
        if suspend:
 
503
            transport = self.upload_transport
 
504
        else:
 
505
            transport = self.index_transport
 
506
        self.index_sizes[self.index_offset(index_type)] = transport.put_file(
 
507
            index_name, index.finish(), mode=self._file_mode)
442
508
        if 'pack' in debug.debug_flags:
443
509
            # XXX: size might be interesting?
444
510
            mutter('%s: create_pack: wrote %s index: %s%s t+%6.3fs',
1250
1316
        self.inventory_index = AggregateIndex(self.reload_pack_names)
1251
1317
        self.text_index = AggregateIndex(self.reload_pack_names)
1252
1318
        self.signature_index = AggregateIndex(self.reload_pack_names)
 
1319
        # resumed packs
 
1320
        self._resumed_packs = []
1253
1321
 
1254
1322
    def add_pack_to_memory(self, pack):
1255
1323
        """Make a Pack object available to the repository to satisfy queries.
1257
1325
        :param pack: A Pack object.
1258
1326
        """
1259
1327
        if pack.name in self._packs_by_name:
1260
 
            raise AssertionError()
 
1328
            raise AssertionError(
 
1329
                'pack %s already in _packs_by_name' % (pack.name,))
1261
1330
        self.packs.append(pack)
1262
1331
        self._packs_by_name[pack.name] = pack
1263
1332
        self.revision_index.add_index(pack.revision_index, pack)
1493
1562
            self.add_pack_to_memory(result)
1494
1563
            return result
1495
1564
 
 
1565
    def _resume_pack(self, name):
 
1566
        """Get a suspended Pack object by name.
 
1567
 
 
1568
        :param name: The name of the pack - e.g. '123456'
 
1569
        :return: A Pack object.
 
1570
        """
 
1571
        if not re.match('[a-f0-9]{32}', name):
 
1572
            # Tokens should be md5sums of the suspended pack file, i.e. 32 hex
 
1573
            # digits.
 
1574
            raise errors.UnresumableWriteGroup(
 
1575
                self.repo, [name], 'Malformed write group token')
 
1576
        try:
 
1577
            rev_index = self._make_index(name, '.rix', resume=True)
 
1578
            inv_index = self._make_index(name, '.iix', resume=True)
 
1579
            txt_index = self._make_index(name, '.tix', resume=True)
 
1580
            sig_index = self._make_index(name, '.six', resume=True)
 
1581
            result = ResumedPack(name, rev_index, inv_index, txt_index,
 
1582
                sig_index, self._upload_transport, self._pack_transport,
 
1583
                self._index_transport, self)
 
1584
        except errors.NoSuchFile, e:
 
1585
            raise errors.UnresumableWriteGroup(self.repo, [name], str(e))
 
1586
        self.add_pack_to_memory(result)
 
1587
        self._resumed_packs.append(result)
 
1588
        return result
 
1589
 
1496
1590
    def allocate(self, a_new_pack):
1497
1591
        """Allocate name in the list of packs.
1498
1592
 
1516
1610
        return self._index_class(self.transport, 'pack-names', None
1517
1611
                ).iter_all_entries()
1518
1612
 
1519
 
    def _make_index(self, name, suffix):
 
1613
    def _make_index(self, name, suffix, resume=False):
1520
1614
        size_offset = self._suffix_offsets[suffix]
1521
1615
        index_name = name + suffix
1522
 
        index_size = self._names[name][size_offset]
1523
 
        return self._index_class(
1524
 
            self._index_transport, index_name, index_size)
 
1616
        if resume:
 
1617
            transport = self._upload_transport
 
1618
            index_size = transport.stat(index_name).st_size
 
1619
        else:
 
1620
            transport = self._index_transport
 
1621
            index_size = self._names[name][size_offset]
 
1622
        return self._index_class(transport, index_name, index_size)
1525
1623
 
1526
1624
    def _max_pack_count(self, total_revisions):
1527
1625
        """Return the maximum number of packs to use for total revisions.
1802
1900
                # case.  -- mbp 20081113
1803
1901
                self._remove_pack_indices(self._new_pack)
1804
1902
                self._new_pack = None
 
1903
        for resumed_pack in self._resumed_packs:
 
1904
            try:
 
1905
                resumed_pack.abort()
 
1906
            finally:
 
1907
                # See comment in previous finally block.
 
1908
                try:
 
1909
                    self._remove_pack_indices(resumed_pack)
 
1910
                except KeyError:
 
1911
                    pass
 
1912
        del self._resumed_packs[:]
1805
1913
        self.repo._text_knit = None
1806
1914
 
 
1915
    def _remove_resumed_pack_indices(self):
 
1916
        for resumed_pack in self._resumed_packs:
 
1917
            self._remove_pack_indices(resumed_pack)
 
1918
        del self._resumed_packs[:]
 
1919
 
1807
1920
    def _commit_write_group(self):
1808
1921
        all_missing = set()
1809
1922
        for prefix, versioned_file in (
1819
1932
                "Repository %s has missing compression parent(s) %r "
1820
1933
                 % (self.repo, sorted(all_missing)))
1821
1934
        self._remove_pack_indices(self._new_pack)
 
1935
        should_autopack = False
1822
1936
        if self._new_pack.data_inserted():
1823
1937
            # get all the data to disk and read to use
1824
1938
            self._new_pack.finish()
1825
1939
            self.allocate(self._new_pack)
1826
1940
            self._new_pack = None
 
1941
            should_autopack = True
 
1942
        else:
 
1943
            self._new_pack.abort()
 
1944
            self._new_pack = None
 
1945
        for resumed_pack in self._resumed_packs:
 
1946
            # XXX: this is a pretty ugly way to turn the resumed pack into a
 
1947
            # properly committed pack.
 
1948
            self._names[resumed_pack.name] = None
 
1949
            self._remove_pack_from_memory(resumed_pack)
 
1950
            resumed_pack.finish()
 
1951
            self.allocate(resumed_pack)
 
1952
            should_autopack = True
 
1953
        del self._resumed_packs[:]
 
1954
        if should_autopack:
1827
1955
            if not self.autopack():
1828
1956
                # when autopack takes no steps, the names list is still
1829
1957
                # unsaved.
1830
1958
                self._save_pack_names()
 
1959
        self.repo._text_knit = None
 
1960
 
 
1961
    def _suspend_write_group(self):
 
1962
        tokens = [pack.name for pack in self._resumed_packs]
 
1963
        self._remove_pack_indices(self._new_pack)
 
1964
        if self._new_pack.data_inserted():
 
1965
            # get all the data to disk and read to use
 
1966
            self._new_pack.finish(suspend=True)
 
1967
            tokens.append(self._new_pack.name)
 
1968
            self._new_pack = None
1831
1969
        else:
1832
1970
            self._new_pack.abort()
1833
1971
            self._new_pack = None
 
1972
        self._remove_resumed_pack_indices()
1834
1973
        self.repo._text_knit = None
 
1974
        return tokens
 
1975
 
 
1976
    def _resume_write_group(self, tokens):
 
1977
        for token in tokens:
 
1978
            self._resume_pack(token)
1835
1979
 
1836
1980
 
1837
1981
class KnitPackRepository(KnitRepository):
1986
2130
    def _commit_write_group(self):
1987
2131
        return self._pack_collection._commit_write_group()
1988
2132
 
 
2133
    def suspend_write_group(self):
 
2134
        # XXX check self._write_group is self.get_transaction()?
 
2135
        tokens = self._pack_collection._suspend_write_group()
 
2136
        self._write_group = None
 
2137
        return tokens
 
2138
 
 
2139
    def _resume_write_group(self, tokens):
 
2140
        self._start_write_group()
 
2141
        self._pack_collection._resume_write_group(tokens)
 
2142
 
1989
2143
    def get_transaction(self):
1990
2144
        if self._write_lock_count:
1991
2145
            return self._transaction