~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/bundle/serializer/v4.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-08-09 15:19:06 UTC
  • mfrom: (2681.1.7 send-bundle)
  • Revision ID: pqm@pqm.ubuntu.com-20070809151906-hdn9oyslf2qib2op
Allow omitting -o for bundle, add --format

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
from cStringIO import StringIO
18
18
import bz2
27
27
    pack,
28
28
    revision as _mod_revision,
29
29
    trace,
30
 
    serializer,
 
30
    xml_serializer,
31
31
    )
32
 
from bzrlib.bundle import bundle_data, serializer as bundle_serializer
 
32
from bzrlib.bundle import bundle_data, serializer
33
33
from bzrlib.util import bencode
34
34
 
35
35
 
54
54
 
55
55
    def begin(self):
56
56
        """Start writing the bundle"""
57
 
        self._fileobj.write(bundle_serializer._get_bundle_header(
58
 
            bundle_serializer.v4_string))
 
57
        self._fileobj.write(serializer._get_bundle_header(
 
58
            serializer.v4_string))
59
59
        self._fileobj.write('#\n')
60
60
        self._container.begin()
61
61
 
107
107
    @staticmethod
108
108
    def encode_name(content_kind, revision_id, file_id=None):
109
109
        """Encode semantic ids as a container name"""
110
 
        if content_kind not in ('revision', 'file', 'inventory', 'signature',
111
 
                'info'):
112
 
            raise ValueError(content_kind)
 
110
        assert content_kind in ('revision', 'file', 'inventory', 'signature',
 
111
                                'info')
 
112
 
113
113
        if content_kind == 'file':
114
 
            if file_id is None:
115
 
                raise AssertionError()
 
114
            assert file_id is not None
116
115
        else:
117
 
            if file_id is not None:
118
 
                raise AssertionError()
 
116
            assert file_id is None
119
117
        if content_kind == 'info':
120
 
            if revision_id is not None:
121
 
                raise AssertionError()
122
 
        elif revision_id is None:
123
 
            raise AssertionError()
 
118
            assert revision_id is None
 
119
        else:
 
120
            assert revision_id is not None
124
121
        names = [n.replace('/', '//') for n in
125
122
                 (content_kind, revision_id, file_id) if n is not None]
126
123
        return '/'.join(names)
147
144
    body
148
145
    """
149
146
 
150
 
    def __init__(self, fileobj, stream_input=True):
151
 
        """Constructor
152
 
 
153
 
        :param fileobj: a file containing a bzip-encoded container
154
 
        :param stream_input: If True, the BundleReader stream input rather than
155
 
            reading it all into memory at once.  Reading it into memory all at
156
 
            once is (currently) faster.
157
 
        """
 
147
    def __init__(self, fileobj):
158
148
        line = fileobj.readline()
159
149
        if line != '\n':
160
150
            fileobj.readline()
161
151
        self.patch_lines = []
162
 
        if stream_input:
163
 
            source_file = iterablefile.IterableFile(self.iter_decode(fileobj))
164
 
        else:
165
 
            source_file = StringIO(bz2.decompress(fileobj.read()))
166
 
        self._container_file = source_file
 
152
        self._container = pack.ContainerReader(
 
153
            iterablefile.IterableFile(self.iter_decode(fileobj)))
167
154
 
168
155
    @staticmethod
169
156
    def iter_decode(fileobj):
170
157
        """Iterate through decoded fragments of the file"""
171
158
        decompressor = bz2.BZ2Decompressor()
172
159
        for line in fileobj:
173
 
            try:
174
 
                yield decompressor.decompress(line)
175
 
            except EOFError:
176
 
                return
 
160
            yield decompressor.decompress(line)
177
161
 
178
162
    @staticmethod
179
163
    def decode_name(name):
205
189
        :return: a generator of (bytes, metadata, content_kind, revision_id,
206
190
            file_id)
207
191
        """
208
 
        iterator = pack.iter_records_from_file(self._container_file)
209
 
        for names, bytes in iterator:
 
192
        iterator = self._container.iter_records()
 
193
        for names, meta_bytes in iterator:
210
194
            if len(names) != 1:
211
195
                raise errors.BadBundle('Record has %d names instead of 1'
212
196
                                       % len(names))
213
 
            metadata = bencode.bdecode(bytes)
 
197
            metadata = bencode.bdecode(meta_bytes(None))
214
198
            if metadata['storage_kind'] == 'header':
215
199
                bytes = None
216
200
            else:
217
201
                _unused, bytes = iterator.next()
 
202
                bytes = bytes(None)
218
203
            yield (bytes, metadata) + self.decode_name(names[0][0])
219
204
 
220
205
 
221
 
class BundleSerializerV4(bundle_serializer.BundleSerializer):
 
206
class BundleSerializerV4(serializer.BundleSerializer):
222
207
    """Implement the high-level bundle interface"""
223
208
 
224
209
    def write(self, repository, revision_ids, forced_bases, fileobj):
250
235
    @staticmethod
251
236
    def get_source_serializer(info):
252
237
        """Retrieve the serializer for a given info object"""
253
 
        return serializer.format_registry.get(info['serializer'])
 
238
        return xml_serializer.format_registry.get(info['serializer'])
254
239
 
255
240
 
256
241
class BundleWriteOperation(object):
270
255
        self.repository = repository
271
256
        bundle = BundleWriter(fileobj)
272
257
        self.bundle = bundle
 
258
        self.base_ancestry = set(repository.get_ancestry(base,
 
259
                                                         topo_sorted=False))
273
260
        if revision_ids is not None:
274
261
            self.revision_ids = revision_ids
275
262
        else:
276
 
            graph = repository.get_graph()
277
 
            revision_ids = graph.find_unique_ancestors(target, [base])
278
 
            # Strip ghosts
279
 
            parents = graph.get_parent_map(revision_ids)
280
 
            self.revision_ids = [r for r in revision_ids if r in parents]
281
 
        self.revision_keys = set([(revid,) for revid in self.revision_ids])
 
263
            revision_ids = set(repository.get_ancestry(target,
 
264
                                                       topo_sorted=False))
 
265
            self.revision_ids = revision_ids.difference(self.base_ancestry)
282
266
 
283
267
    def do_write(self):
284
268
        """Write all data to the bundle"""
285
 
        trace.note('Bundling %d revision(s).', len(self.revision_ids))
286
 
        self.repository.lock_read()
287
 
        try:
288
 
            self.bundle.begin()
289
 
            self.write_info()
290
 
            self.write_files()
291
 
            self.write_revisions()
292
 
            self.bundle.end()
293
 
        finally:
294
 
            self.repository.unlock()
 
269
        self.bundle.begin()
 
270
        self.write_info()
 
271
        self.write_files()
 
272
        self.write_revisions()
 
273
        self.bundle.end()
295
274
        return self.revision_ids
296
275
 
297
276
    def write_info(self):
302
281
        self.bundle.add_info_record(serializer=serializer_format,
303
282
                                    supports_rich_root=supports_rich_root)
304
283
 
 
284
    def iter_file_revisions(self):
 
285
        """Iterate through all relevant revisions of all files.
 
286
 
 
287
        This is the correct implementation, but is not compatible with bzr.dev,
 
288
        because certain old revisions were not converted correctly, and have
 
289
        the wrong "revision" marker in inventories.
 
290
        """
 
291
        transaction = self.repository.get_transaction()
 
292
        altered = self.repository.fileids_altered_by_revision_ids(
 
293
            self.revision_ids)
 
294
        for file_id, file_revision_ids in altered.iteritems():
 
295
            vf = self.repository.weave_store.get_weave(file_id, transaction)
 
296
            yield vf, file_id, file_revision_ids
 
297
 
 
298
    def iter_file_revisions_aggressive(self):
 
299
        """Iterate through all relevant revisions of all files.
 
300
 
 
301
        This uses the standard iter_file_revisions to determine what revisions
 
302
        are referred to by inventories, but then uses the versionedfile to
 
303
        determine what the build-dependencies of each required revision.
 
304
 
 
305
        All build dependencies which are not ancestors of the base revision
 
306
        are emitted.
 
307
        """
 
308
        for vf, file_id, file_revision_ids in self.iter_file_revisions():
 
309
            new_revision_ids = set()
 
310
            pending = list(file_revision_ids)
 
311
            while len(pending) > 0:
 
312
                revision_id = pending.pop()
 
313
                if revision_id in new_revision_ids:
 
314
                    continue
 
315
                if revision_id in self.base_ancestry:
 
316
                    continue
 
317
                new_revision_ids.add(revision_id)
 
318
                pending.extend(vf.get_parents(revision_id))
 
319
            yield vf, file_id, new_revision_ids
 
320
 
305
321
    def write_files(self):
306
322
        """Write bundle records for all revisions of all files"""
307
 
        text_keys = []
308
 
        altered_fileids = self.repository.fileids_altered_by_revision_ids(
309
 
                self.revision_ids)
310
 
        for file_id, revision_ids in altered_fileids.iteritems():
311
 
            for revision_id in revision_ids:
312
 
                text_keys.append((file_id, revision_id))
313
 
        self._add_mp_records_keys('file', self.repository.texts, text_keys)
 
323
        for vf, file_id, revision_ids in self.iter_file_revisions_aggressive():
 
324
            self.add_mp_records('file', file_id, vf, revision_ids)
314
325
 
315
326
    def write_revisions(self):
316
327
        """Write bundle records for all revisions and signatures"""
317
 
        inv_vf = self.repository.inventories
318
 
        revision_order = [key[-1] for key in multiparent.topo_iter_keys(inv_vf,
319
 
            self.revision_keys)]
 
328
        inv_vf = self.repository.get_inventory_weave()
 
329
        revision_order = list(multiparent.topo_iter(inv_vf, self.revision_ids))
320
330
        if self.target is not None and self.target in self.revision_ids:
321
331
            revision_order.remove(self.target)
322
332
            revision_order.append(self.target)
323
 
        self._add_mp_records_keys('inventory', inv_vf, [(revid,) for revid in revision_order])
324
 
        parent_map = self.repository.get_parent_map(revision_order)
325
 
        revision_to_str = self.repository._serializer.write_revision_to_string
326
 
        revisions = self.repository.get_revisions(revision_order)
327
 
        for revision in revisions:
328
 
            revision_id = revision.revision_id
329
 
            parents = parent_map.get(revision_id, None)
330
 
            revision_text = revision_to_str(revision)
 
333
        self.add_mp_records('inventory', None, inv_vf, revision_order)
 
334
        parents_list = self.repository.get_parents(revision_order)
 
335
        for parents, revision_id in zip(parents_list, revision_order):
 
336
            revision_text = self.repository.get_revision_xml(revision_id)
331
337
            self.bundle.add_fulltext_record(revision_text, parents,
332
338
                                       'revision', revision_id)
333
339
            try:
352
358
                base = parents[0]
353
359
        return base, target
354
360
 
355
 
    def _add_mp_records_keys(self, repo_kind, vf, keys):
 
361
    def add_mp_records(self, repo_kind, file_id, vf, revision_ids):
356
362
        """Add multi-parent diff records to a bundle"""
357
 
        ordered_keys = list(multiparent.topo_iter_keys(vf, keys))
358
 
        mpdiffs = vf.make_mpdiffs(ordered_keys)
359
 
        sha1s = vf.get_sha1s(ordered_keys)
360
 
        parent_map = vf.get_parent_map(ordered_keys)
361
 
        for mpdiff, item_key, in zip(mpdiffs, ordered_keys):
362
 
            sha1 = sha1s[item_key]
363
 
            parents = [key[-1] for key in parent_map[item_key]]
 
363
        revision_ids = list(multiparent.topo_iter(vf, revision_ids))
 
364
        mpdiffs = vf.make_mpdiffs(revision_ids)
 
365
        sha1s = vf.get_sha1s(revision_ids)
 
366
        for mpdiff, revision_id, sha1, in zip(mpdiffs, revision_ids, sha1s):
 
367
            parents = vf.get_parents(revision_id)
364
368
            text = ''.join(mpdiff.to_patch())
365
 
            # Infer file id records as appropriate.
366
 
            if len(item_key) == 2:
367
 
                file_id = item_key[0]
368
 
            else:
369
 
                file_id = None
370
369
            self.bundle.add_multiparent_record(text, sha1, parents, repo_kind,
371
 
                                               item_key[-1], file_id)
 
370
                                               revision_id, file_id)
372
371
 
373
372
 
374
373
class BundleInfoV4(object):
383
382
    def install(self, repository):
384
383
        return self.install_revisions(repository)
385
384
 
386
 
    def install_revisions(self, repository, stream_input=True):
387
 
        """Install this bundle's revisions into the specified repository
388
 
 
389
 
        :param target_repo: The repository to install into
390
 
        :param stream_input: If True, will stream input rather than reading it
391
 
            all into memory at once.  Reading it into memory all at once is
392
 
            (currently) faster.
393
 
        """
 
385
    def install_revisions(self, repository):
 
386
        """Install this bundle's revisions into the specified repository"""
394
387
        repository.lock_write()
395
388
        try:
396
 
            ri = RevisionInstaller(self.get_bundle_reader(stream_input),
 
389
            ri = RevisionInstaller(self.get_bundle_reader(),
397
390
                                   self._serializer, repository)
398
391
            return ri.install()
399
392
        finally:
406
399
        """
407
400
        return None, self.target, 'inapplicable'
408
401
 
409
 
    def get_bundle_reader(self, stream_input=True):
410
 
        """Return a new BundleReader for the associated bundle
411
 
 
412
 
        :param stream_input: If True, the BundleReader stream input rather than
413
 
            reading it all into memory at once.  Reading it into memory all at
414
 
            once is (currently) faster.
415
 
        """
 
402
    def get_bundle_reader(self):
416
403
        self._fileobj.seek(0)
417
 
        return BundleReader(self._fileobj, stream_input)
 
404
        return BundleReader(self._fileobj)
418
405
 
419
406
    def _get_real_revisions(self):
420
407
        if self.__real_revisions is None:
457
444
        self._info = None
458
445
 
459
446
    def install(self):
460
 
        """Perform the installation.
461
 
 
462
 
        Must be called with the Repository locked.
463
 
        """
464
 
        self._repository.start_write_group()
465
 
        try:
466
 
            result = self._install_in_write_group()
467
 
        except:
468
 
            self._repository.abort_write_group()
469
 
            raise
470
 
        self._repository.commit_write_group()
471
 
        return result
472
 
 
473
 
    def _install_in_write_group(self):
 
447
        """Perform the installation"""
474
448
        current_file = None
475
449
        current_versionedfile = None
476
450
        pending_file_records = []
477
 
        inventory_vf = None
478
 
        pending_inventory_records = []
479
451
        added_inv = set()
480
452
        target_revision = None
481
453
        for bytes, metadata, repo_kind, revision_id, file_id in\
482
454
            self._container.iter_records():
483
455
            if repo_kind == 'info':
484
 
                if self._info is not None:
485
 
                    raise AssertionError()
 
456
                assert self._info is None
486
457
                self._handle_info(metadata)
487
 
            if (pending_file_records and
488
 
                (repo_kind, file_id) != ('file', current_file)):
489
 
                # Flush the data for a single file - prevents memory
490
 
                # spiking due to buffering all files in memory.
491
 
                self._install_mp_records_keys(self._repository.texts,
 
458
            if repo_kind != 'file':
 
459
                self._install_mp_records(current_versionedfile,
492
460
                    pending_file_records)
493
461
                current_file = None
494
 
                del pending_file_records[:]
495
 
            if len(pending_inventory_records) > 0 and repo_kind != 'inventory':
496
 
                self._install_inventory_records(pending_inventory_records)
497
 
                pending_inventory_records = []
498
 
            if repo_kind == 'inventory':
499
 
                pending_inventory_records.append(((revision_id,), metadata, bytes))
500
 
            if repo_kind == 'revision':
501
 
                target_revision = revision_id
502
 
                self._install_revision(revision_id, metadata, bytes)
503
 
            if repo_kind == 'signature':
504
 
                self._install_signature(revision_id, metadata, bytes)
 
462
                current_versionedfile = None
 
463
                pending_file_records = []
 
464
                if repo_kind == 'inventory':
 
465
                    self._install_inventory(revision_id, metadata, bytes)
 
466
                if repo_kind == 'revision':
 
467
                    target_revision = revision_id
 
468
                    self._install_revision(revision_id, metadata, bytes)
 
469
                if repo_kind == 'signature':
 
470
                    self._install_signature(revision_id, metadata, bytes)
505
471
            if repo_kind == 'file':
506
 
                current_file = file_id
507
 
                pending_file_records.append(((file_id, revision_id), metadata, bytes))
508
 
        self._install_mp_records_keys(self._repository.texts, pending_file_records)
 
472
                if file_id != current_file:
 
473
                    self._install_mp_records(current_versionedfile,
 
474
                        pending_file_records)
 
475
                    current_file = file_id
 
476
                    current_versionedfile = \
 
477
                        self._repository.weave_store.get_weave_or_empty(
 
478
                        file_id, self._repository.get_transaction())
 
479
                    pending_file_records = []
 
480
                if revision_id in current_versionedfile:
 
481
                    continue
 
482
                pending_file_records.append((revision_id, metadata, bytes))
 
483
        self._install_mp_records(current_versionedfile, pending_file_records)
509
484
        return target_revision
510
485
 
511
486
    def _handle_info(self, info):
526
501
                      records if r not in versionedfile]
527
502
        versionedfile.add_mpdiffs(vf_records)
528
503
 
529
 
    def _install_mp_records_keys(self, versionedfile, records):
530
 
        d_func = multiparent.MultiParent.from_patch
531
 
        vf_records = []
532
 
        for key, meta, text in records:
533
 
            # Adapt to tuple interface: A length two key is a file_id,
534
 
            # revision_id pair, a length 1 key is a
535
 
            # revision/signature/inventory. We need to do this because
536
 
            # the metadata extraction from the bundle has not yet been updated
537
 
            # to use the consistent tuple interface itself.
538
 
            if len(key) == 2:
539
 
                prefix = key[:1]
540
 
            else:
541
 
                prefix = ()
542
 
            parents = [prefix + (parent,) for parent in meta['parents']]
543
 
            vf_records.append((key, parents, meta['sha1'], d_func(text)))
544
 
        versionedfile.add_mpdiffs(vf_records)
545
 
 
546
 
    def _install_inventory_records(self, records):
 
504
    def _install_inventory(self, revision_id, metadata, text):
 
505
        vf = self._repository.get_inventory_weave()
 
506
        if revision_id in vf:
 
507
            return
 
508
        parent_ids = metadata['parents']
547
509
        if self._info['serializer'] == self._repository._serializer.format_num:
548
 
            return self._install_mp_records_keys(self._repository.inventories,
549
 
                records)
550
 
        for key, metadata, bytes in records:
551
 
            revision_id = key[-1]
552
 
            parent_ids = metadata['parents']
553
 
            parents = [self._repository.get_inventory(p)
554
 
                       for p in parent_ids]
555
 
            p_texts = [self._source_serializer.write_inventory_to_string(p)
556
 
                       for p in parents]
557
 
            target_lines = multiparent.MultiParent.from_patch(bytes).to_lines(
558
 
                p_texts)
559
 
            sha1 = osutils.sha_strings(target_lines)
560
 
            if sha1 != metadata['sha1']:
561
 
                raise errors.BadBundle("Can't convert to target format")
562
 
            target_inv = self._source_serializer.read_inventory_from_string(
563
 
                ''.join(target_lines))
564
 
            self._handle_root(target_inv, parent_ids)
565
 
            try:
566
 
                self._repository.add_inventory(revision_id, target_inv,
567
 
                                               parent_ids)
568
 
            except errors.UnsupportedInventoryKind:
569
 
                raise errors.IncompatibleRevision(repr(self._repository))
 
510
            return self._install_mp_records(vf, [(revision_id, metadata,
 
511
                                                  text)])
 
512
        parents = [self._repository.get_inventory(p)
 
513
                   for p in parent_ids]
 
514
        parent_texts = [self._source_serializer.write_inventory_to_string(p)
 
515
                        for p in parents]
 
516
        target_lines = multiparent.MultiParent.from_patch(text).to_lines(
 
517
            parent_texts)
 
518
        sha1 = osutils.sha_strings(target_lines)
 
519
        if sha1 != metadata['sha1']:
 
520
            raise errors.BadBundle("Can't convert to target format")
 
521
        target_inv = self._source_serializer.read_inventory_from_string(
 
522
            ''.join(target_lines))
 
523
        self._handle_root(target_inv, parent_ids)
 
524
        try:
 
525
            self._repository.add_inventory(revision_id, target_inv, parent_ids)
 
526
        except errors.UnsupportedInventoryKind:
 
527
            raise errors.IncompatibleRevision(repr(self._repository))
570
528
 
571
529
    def _handle_root(self, target_inv, parent_ids):
572
530
        revision_id = target_inv.revision_id
573
531
        if self.update_root:
574
 
            text_key = (target_inv.root.file_id, revision_id)
575
 
            parent_keys = [(target_inv.root.file_id, parent) for
576
 
                parent in parent_ids]
577
 
            self._repository.texts.add_lines(text_key, parent_keys, [])
 
532
            target_inv.root.revision = revision_id
 
533
            store = self._repository.weave_store
 
534
            transaction = self._repository.get_transaction()
 
535
            vf = store.get_weave_or_empty(target_inv.root.file_id, transaction)
 
536
            vf.add_lines(revision_id, parent_ids, [])
578
537
        elif not self._repository.supports_rich_root():
579
538
            if target_inv.root.revision != revision_id:
580
539
                raise errors.IncompatibleRevision(repr(self._repository))
581
540
 
 
541
 
582
542
    def _install_revision(self, revision_id, metadata, text):
583
543
        if self._repository.has_revision(revision_id):
584
544
            return
585
 
        revision = self._source_serializer.read_revision_from_string(text)
586
 
        self._repository.add_revision(revision.revision_id, revision)
 
545
        self._repository._add_revision_text(revision_id, text)
587
546
 
588
547
    def _install_signature(self, revision_id, metadata, text):
589
548
        transaction = self._repository.get_transaction()
590
 
        if self._repository.has_signature_for_revision_id(revision_id):
 
549
        if self._repository._revision_store.has_signature(revision_id,
 
550
                                                          transaction):
591
551
            return
592
 
        self._repository.add_signature_text(revision_id, text)
 
552
        self._repository._revision_store.add_revision_signature_text(
 
553
            revision_id, text, transaction)