~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/groupcompress_repo.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-13 03:27:18 UTC
  • mto: (3735.2.143 brisbane-core)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090313032718-kojgpztxqo0qo7sz
Finish refactoring everything into _copy_X_texts() style.

Show diffs side-by-side

added

removed

Lines of Context:
175
175
        self._pack_collection = pack_collection
176
176
        # ATM, We only support this for GCCHK repositories
177
177
        assert pack_collection.chk_index is not None
 
178
        self._filtered_fetch = (revision_ids is not None)
178
179
        self._chk_id_roots = []
179
180
        self._chk_p_id_roots = []
 
181
        self._referenced_texts = set()
 
182
        # set by .pack() if self.revision_ids is not None
 
183
        self.revision_keys = None
180
184
 
181
 
    def _get_progress_stream(self, source_vf, keys, index_name, pb):
 
185
    def _get_progress_stream(self, source_vf, keys, message, pb):
182
186
        def pb_stream():
183
187
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
184
188
            for idx, record in enumerate(substream):
185
189
                if pb is not None:
186
 
                    pb.update(index_name, idx + 1, len(keys))
 
190
                    pb.update(message, idx + 1, len(keys))
187
191
                yield record
188
192
        return pb_stream()
189
193
 
190
 
    def _get_filtered_inv_stream(self, source_vf, keys, pb=None):
 
194
    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
191
195
        """Filter the texts of inventories, to find the chk pages."""
192
196
        total_keys = len(keys)
193
197
        def _filtered_inv_stream():
261
265
                                    keys_by_search_prefix.setdefault(prefix,
262
266
                                        []).append(value)
263
267
                                    next_keys.add(value)
 
268
                        # XXX: We don't walk the chk map to determine
 
269
                        #      referenced (file_id, revision_id) keys.
 
270
                        #      We don't do it yet because you really need to
 
271
                        #      filter out the ones that are present in the
 
272
                        #      parents of the rev just before the ones you are
 
273
                        #      copying, otherwise the filter is grabbing too
 
274
                        #      many keys...
264
275
                        counter[0] += 1
265
276
                        if pb is not None:
266
277
                            pb.update('chk node', counter[0], total_keys)
273
284
                    cur_keys.extend(keys_by_search_prefix[prefix])
274
285
        for stream in _get_referenced_stream(self._chk_id_roots):
275
286
            yield stream
 
287
        del self._chk_id_roots
276
288
        for stream in _get_referenced_stream(self._chk_p_id_roots):
277
289
            yield stream
 
290
        del self._chk_p_id_roots
278
291
        if remaining_keys:
279
 
            trace.note('There were %d keys in the chk index, which were not'
280
 
                       ' referenced from inventories', len(remaining_keys))
281
 
            stream = source_vf.get_record_stream(remaining_keys, 'unordered',
282
 
                                                 True)
283
 
            yield stream
 
292
            trace.mutter('There were %d keys in the chk index, %d of which'
 
293
                         ' were not referenced', total_keys,
 
294
                         len(remaining_keys))
 
295
            if self.revision_ids is None:
 
296
                stream = source_vf.get_record_stream(remaining_keys,
 
297
                                                     'unordered', True)
 
298
                yield stream
284
299
 
285
300
    def _build_vf(self, index_name, parents, delta, for_write=False):
286
301
        """Build a VersionedFiles instance on top of this group of packs."""
321
336
                                   delta, for_write=True)
322
337
        return source_vf, target_vf
323
338
 
 
339
    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
 
340
                     pb_offset):
 
341
        trace.mutter('repacking %d %s', len(keys), message)
 
342
        self.pb.update('repacking %s', pb_offset)
 
343
        child_pb = ui.ui_factory.nested_progress_bar()
 
344
        try:
 
345
            stream = vf_to_stream(source_vf, keys, message, child_pb)
 
346
            target_vf.insert_record_stream(stream)
 
347
        finally:
 
348
            child_pb.finished()
 
349
 
 
350
    def _copy_revision_texts(self):
 
351
        source_vf, target_vf = self._build_vfs('revision', True, False)
 
352
        if not self.revision_keys:
 
353
            # We are doing a full fetch, aka 'pack'
 
354
            self.revision_keys = source_vf.keys()
 
355
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
356
                          'revisions', self._get_progress_stream, 1)
 
357
 
 
358
    def _copy_inventory_texts(self):
 
359
        source_vf, target_vf = self._build_vfs('inventory', True, True)
 
360
        self._copy_stream(source_vf, target_vf, self.revision_keys,
 
361
                          'revisions', self._get_filtered_inv_stream, 2)
 
362
 
 
363
    def _copy_chk_texts(self):
 
364
        source_vf, target_vf = self._build_vfs('chk', False, False)
 
365
        # TODO: This is technically spurious... if it is a performance issue,
 
366
        #       remove it
 
367
        total_keys = source_vf.keys()
 
368
        trace.mutter('repacking chk: %d id_to_entry roots,'
 
369
                     ' %d p_id_map roots, %d total keys',
 
370
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
 
371
                     len(total_keys))
 
372
        self.pb.update('repacking chk', 3)
 
373
        child_pb = ui.ui_factory.nested_progress_bar()
 
374
        try:
 
375
            for stream in self._get_chk_streams(source_vf, total_keys,
 
376
                                                pb=child_pb):
 
377
                target_vf.insert_record_stream(stream)
 
378
        finally:
 
379
            child_pb.finished()
 
380
 
 
381
    def _copy_text_texts(self):
 
382
        source_vf, target_vf = self._build_vfs('text', True, True)
 
383
        # XXX: We don't walk the chk map to determine referenced (file_id,
 
384
        #      revision_id) keys.  We don't do it yet because you really need
 
385
        #      to filter out the ones that are present in the parents of the
 
386
        #      rev just before the ones you are copying, otherwise the filter
 
387
        #      is grabbing too many keys...
 
388
        text_keys = source_vf.keys()
 
389
        self._copy_stream(source_vf, target_vf, text_keys,
 
390
                          'revisions', self._get_progress_stream, 4)
 
391
 
 
392
    def _copy_signature_texts(self):
 
393
        source_vf, target_vf = self._build_vfs('signature', False, False)
 
394
        signature_keys = source_vf.keys()
 
395
        signature_keys.intersection(self.revision_keys)
 
396
        self._copy_stream(source_vf, target_vf, signature_keys,
 
397
                          'signatures', self._get_progress_stream, 5)
 
398
 
324
399
    def _create_pack_from_packs(self):
325
 
        to_copy = [('revision', True, False),
326
 
                   ('inventory', True, True),
327
 
                   ('chk', False, True),
328
 
                   ('text', True, True),
329
 
                   ('signature', False, False),
330
 
                  ]
331
 
        num_steps = len(to_copy) + 2
332
 
        self.pb.update('repacking', 0, num_steps)
 
400
        self.pb.update('repacking', 0, 7)
333
401
        self.new_pack = self.open_pack()
334
402
        # Is this necessary for GC ?
335
403
        self.new_pack.set_write_cache_size(1024*1024)
336
 
        for idx, (index_name, parents, delta) in enumerate(to_copy):
337
 
            source_vf, target_vf = self._build_vfs(index_name, parents, delta)
338
 
            keys = source_vf.keys()
339
 
            self.pb.update('repacking %s' % (index_name,), idx + 1, num_steps)
340
 
            child_pb = ui.ui_factory.nested_progress_bar()
341
 
            try:
342
 
                if index_name == 'inventory':
343
 
                    stream = self._get_filtered_inv_stream(
344
 
                        source_vf, keys, pb=child_pb)
345
 
                    target_vf.insert_record_stream(stream)
346
 
                elif index_name == 'chk':
347
 
                    for stream in self._get_chk_streams(source_vf, keys,
348
 
                                        pb=child_pb):
349
 
                        target_vf.insert_record_stream(stream)
350
 
                else:
351
 
                    target_vf.insert_record_stream(self._get_progress_stream(
352
 
                        source_vf, keys, index_name, child_pb))
353
 
            finally:
354
 
                child_pb.finished()
355
 
        # XXX: This shouldn't be necessary, as GC packs don't have external
356
 
        #      references.
 
404
        self._copy_revision_texts()
 
405
        self._copy_inventory_texts()
 
406
        self._copy_chk_texts()
 
407
        self._copy_text_texts()
 
408
        self._copy_signature_texts()
357
409
        self.new_pack._check_references()
358
410
        if not self._use_pack(self.new_pack):
359
411
            self.new_pack.abort()
360
412
            return None
361
 
        self.pb.update('Finishing pack', idx + 1, num_steps)
 
413
        self.pb.update('finishing repack', 6, 7)
362
414
        self.new_pack.finish()
363
415
        self._pack_collection.allocate(self.new_pack)
364
416
        return self.new_pack