    def _commit_write_group(self):
        """Template method for per-repository write group cleanup.

        This is called before the write group is considered to be
        finished and should ensure that all data handed to the repository
        for writing during the write group is safely committed (to the
        extent possible considering file system caching etc).
        """

    def suspend_write_group(self):
        raise errors.UnsuspendableWriteGroup(self)

    def resume_write_group(self, tokens):
        if not self.is_write_locked():
            raise errors.NotWriteLocked(self)
        if self._write_group:
            raise errors.BzrError('already in a write group')
        self._resume_write_group(tokens)
        # so we can detect unlock/relock - the write group is now entered.
        self._write_group = self.get_transaction()

    def _resume_write_group(self, tokens):
        raise errors.UnsuspendableWriteGroup(self)
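
    # Hedged illustration (an editorial sketch, not part of the original
    # source): the intended calling pattern for suspend/resume, assuming
    # ``repo`` is a write-locked repository whose format supports
    # suspending write groups:
    #
    #   repo.start_write_group()
    #   # ... hand data to the repository ...
    #   tokens = repo.suspend_write_group()
    #   # later, under a write lock again:
    #   repo.resume_write_group(tokens)
    #   repo.commit_write_group()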

    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
        """Fetch the content required to construct revision_id from source.
network_format_registry = registry.FormatRegistry()
"""Registry of formats indexed by their network name.

The network name for a repository format is an identifier that can be used when
referring to formats with smart server operations. See
RepositoryFormat.network_name() for more detail.
"""


format_registry = registry.FormatRegistry(network_format_registry)
"""Registry of formats, indexed by their BzrDirMetaFormat format string.

This can contain either format instances themselves, or classes/factories that
can be called to obtain one.
"""
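

# A hedged, minimal sketch (an editorial addition, not part of bzrlib) of how
# these registries are consulted; ``format_string`` stands in for a real
# format marker read from disk or received over the wire.
def _example_lookup_format(format_string):
    # network_format_registry maps the strings used in smart-server RPCs;
    # for metadir formats these match the on-disk format strings.
    factory = network_format_registry.get(format_string)
    # Per the docstring above, entries may be format instances themselves
    # or classes/factories; handle both.
    return factory() if callable(factory) else factory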


class RepositoryFormat(object):
    """A repository format.

    Formats provide four things:
     * An initialization routine to construct repository data on disk.
     * an optional format string which is used when the BzrDir supports
       versioned children.
     * an open routine which returns a Repository instance.
     * A network name for referring to the format in smart server RPC
       methods.

    There is one and only one Format subclass for each on-disk format. But
    there can be one Repository subclass that is used for several different
    formats. The _format attribute on a Repository instance can be used to
    determine the disk format.

    Formats are placed in a registry by their format string for reference
    during opening. These should be subclasses of RepositoryFormat for
    consistency.

    Once a format is deprecated, just deprecate the initialize and open
    methods on the format class. Do not deprecate the object, as the
    object may be created even when a repository instance hasn't been
    created.

    Common instance attributes:
    _matchingbzrdir - the bzrdir format that the repository format was
        originally written to work with.
    """
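
    # Hedged sketch of the contract above (illustrative only; the class and
    # strings are hypothetical, not a real bzrlib format):
    #
    #   class RepositoryFormatExample(RepositoryFormat):
    #       def get_format_string(self):
    #           return "Example repository format 1\n"
    #       def network_name(self):
    #           return self.get_format_string()
    #       def initialize(self, a_bzrdir, shared=False):
    #           ...  # construct repository data under a_bzrdir
    #       def open(self, a_bzrdir, _found=False):
    #           ...  # return a Repository instance for a_bzrdir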

    def is_supported(self):
        """Is this format supported?

        Supported formats must be initializable and openable.
        Unsupported formats may not support initialization or committing or
        some other features depending on the reason for not being supported.
        """

    def network_name(self):
        """A simple byte string uniquely identifying this format for RPC calls.

        MetaDir repository formats use their disk format string to identify the
        repository over the wire. All in one formats such as bzr < 0.8, and
        foreign formats like svn/git and hg should use some marker which is
        unique and immutable.
        """
        raise NotImplementedError(self.network_name)

    def check_conversion_target(self, target_format):
        raise NotImplementedError(self.check_conversion_target)

    def open(self, a_bzrdir, _found=False):
        """Return an instance of this format for the bzrdir a_bzrdir.

        _found is a private parameter, do not use it.
        """
        raise NotImplementedError(self.open)

            control_files.unlock()

    def network_name(self):
        """Metadir formats have matching disk and network format strings."""
        return self.get_format_string()


# Pre-0.8 formats that don't have a disk format string (because they are
# versioned by the matching control directory). We use the control directory's
# disk format string as a key for the network_name because they meet the
# constraints (simple string, unique, immutable).
network_format_registry.register_lazy(
    "Bazaar-NG branch, format 5\n",
    'bzrlib.repofmt.weaverepo',
    'RepositoryFormat5',
)
network_format_registry.register_lazy(
    "Bazaar-NG branch, format 6\n",
    'bzrlib.repofmt.weaverepo',
    'RepositoryFormat6',
)

# formats which have no format string are not discoverable or independently
# creatable on disk, so are not registered in format_registry. They're
# all in bzrlib.repofmt.weaverepo now. When an instance of one of these is
# needed, it's constructed directly by the BzrDir. Non-native formats where
# the repository is not separately opened are similar.

        """See InterRepository.missing_revision_ids()."""
        # we want all revisions to satisfy revision_id in source.
        # but we don't want to stat every file here and there.
        # what we want, then, is all the revisions the other repository
        # needs in order to satisfy revision_id, but not those that we
        # already have locally.
        # so the first thing is to get a subset of the revisions to
        # satisfy revision_id in source, and then eliminate those that
        # we do already have.
        # this is slow on a high latency connection to self, but as this
        # disk format scales terribly for push anyway due to rewriting
        # inventory.weave, this is considered acceptable.
        # - RBC 20060209
        if revision_id is not None:

    def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
        """Get the best delta and base for this revision.

        :return: (basis_id, delta)
        """
        possible_trees = [(parent_id, cache[parent_id])
                          for parent_id in parent_ids
                          if parent_id in cache]
        if len(possible_trees) == 0:
            # There either aren't any parents, or the parents aren't in the
            # cache, so just use the last converted tree
            possible_trees.append((basis_id, cache[basis_id]))
        deltas = []
        # Compute a delta against each candidate basis and keep the shortest.
        for basis_id, basis_tree in possible_trees:
            delta = tree.inventory._make_delta(basis_tree.inventory)
            deltas.append((len(delta), basis_id, delta))
        deltas.sort()
        return deltas[0][1:]
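
    # Worked illustration of the selection above (assumed values, not from
    # the original source): with parents p1 and p2 both in the cache and
    # deltas of length 5 and 2 respectively, ``deltas`` sorts to
    # [(2, 'p2-id', delta2), (5, 'p1-id', delta1)], so ('p2-id', delta2)
    # is returned and p2 becomes the basis.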

    def _fetch_batch(self, revision_ids, basis_id, cache):
        """Fetch across a few revisions.

        :param revision_ids: The revisions to copy
        :param basis_id: The revision_id of a tree that must be in cache, used
            as a basis for delta when no other base is available
        :param cache: A cache of RevisionTrees that we can use.
        :return: The revision_id of the last converted tree. The RevisionTree
            for it will be in cache
        """
        # Walk through all revisions; get inventory deltas, copy referenced
        # texts that delta references, insert the delta, revision and
        # signature.
        text_keys = set()
        pending_deltas = []
        pending_revisions = []
        parent_map = self.source.get_parent_map(revision_ids)
        for tree in self.source.revision_trees(revision_ids):
            current_revision_id = tree.get_revision_id()
            parent_ids = parent_map.get(current_revision_id, ())
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
                basis_id, cache)
            # Find text entries that need to be copied
            for old_path, new_path, file_id, entry in delta:
                if new_path is not None:
                    if not (new_path or self.target.supports_rich_root()):
                        # We don't copy the text for the root node unless the
                        # target supports_rich_root.
                        continue
                    # TODO: Do we need:
                    #       "if entry.revision == current_revision_id" ?

            revision_graph[key] = tuple(parent for parent in parents if parent
                in revision_graph)
        return revision_graph


class StreamSink(object):
    """An object that can insert a stream into a repository.

    This interface handles the complexity of reserialising inventories and
    revisions from different formats, and allows unidirectional insertion into
    stacked repositories without looking for the missing basis parents
    beforehand.
    """

    def __init__(self, target_repo):
        self.target_repo = target_repo

    def insert_stream(self, stream, src_format, resume_tokens):
        """Insert a stream's content into the target repository.

        :param src_format: a bzr repository format.

        :return: a list of resume tokens and an iterable of additional keys
            required before the insertion can be completed.
        """
        self.target_repo.lock_write()
        try:
            if resume_tokens:
                self.target_repo.resume_write_group(resume_tokens)
            else:
                self.target_repo.start_write_group()
            try:
                # locked_insert_stream performs a commit|suspend.
                return self._locked_insert_stream(stream, src_format)
            except:
                self.target_repo.abort_write_group(suppress_errors=True)
                raise
        finally:
            self.target_repo.unlock()
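
    # Hedged sketch of the caller's side of this protocol (editorial
    # illustration; ``source`` and its method are assumptions, not shown in
    # this excerpt): keep feeding streams until no keys are reported missing:
    #
    #   tokens, missing = sink.insert_stream(stream, src_format, [])
    #   while missing:
    #       stream = source.get_stream_for_missing_keys(missing)
    #       tokens, missing = sink.insert_stream(stream, src_format, tokens)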

    def _locked_insert_stream(self, stream, src_format):
        to_serializer = self.target_repo._format._serializer
        src_serializer = src_format._serializer
        for substream_type, substream in stream:
            if substream_type == 'texts':
                self.target_repo.texts.insert_record_stream(substream)
            elif substream_type == 'inventories':
                if src_serializer == to_serializer:
                    self.target_repo.inventories.insert_record_stream(
                        substream)
                else:
                    self._extract_and_insert_inventories(
                        substream, src_serializer)
            elif substream_type == 'revisions':
                # This may fall back to extract-and-insert more often than
                # required if the serializers are different only in terms of
                # the inventory.
                if src_serializer == to_serializer:
                    self.target_repo.revisions.insert_record_stream(
                        substream)
                else:
                    self._extract_and_insert_revisions(substream,
                        src_serializer)
            elif substream_type == 'signatures':
                self.target_repo.signatures.insert_record_stream(substream)
            else:
                raise AssertionError('kaboom! %s' % (substream_type,))
        missing_keys = set()
        for prefix, versioned_file in (
                ('texts', self.target_repo.texts),
                ('inventories', self.target_repo.inventories),
                ('revisions', self.target_repo.revisions),
                ('signatures', self.target_repo.signatures),
                ):
            try:
                missing_keys.update((prefix,) + key for key in
                    versioned_file.get_missing_compression_parent_keys())
            except NotImplementedError:
                # cannot even attempt suspending, and missing would have failed
                # during stream insertion.
                missing_keys = set()
                break
        if missing_keys:
            # suspend the write group and tell the caller what is missing. We
            # know we can suspend or else we would not have entered this code
            # path. (All repositories that can handle missing keys can handle
            # suspending a write group).
            write_group_tokens = self.target_repo.suspend_write_group()
            return write_group_tokens, missing_keys
        self.target_repo.commit_write_group()
        return [], set()

    def _extract_and_insert_inventories(self, substream, serializer):
        """Generate a new inventory versionedfile in target, converting data.

        The inventory is retrieved from the source (deserializing it), and
        stored in the target (reserializing it in a different format).
        """
        for record in substream:
            bytes = record.get_bytes_as('fulltext')
            revision_id = record.key[0]
            inv = serializer.read_inventory_from_string(bytes, revision_id)
            parents = [key[0] for key in record.parents]
            self.target_repo.add_inventory(revision_id, inv, parents)

    def _extract_and_insert_revisions(self, substream, serializer):
        for record in substream:
            bytes = record.get_bytes_as('fulltext')
            revision_id = record.key[0]
            rev = serializer.read_revision_from_string(bytes)
            if rev.revision_id != revision_id:
                raise AssertionError('wtf: %s != %s' % (rev, revision_id))
            self.target_repo.add_revision(revision_id, rev)

    def finished(self):
        if self.target_repo._fetch_reconcile:
            self.target_repo.reconcile()