95
90
self._format = _format
96
91
self._base = a_bzrdir.transport.base
97
92
self._ignore_fallbacks = False
98
self.bzrdir = a_bzrdir
99
foreign.ForeignBranch.__init__(self,
93
foreign.ForeignBranch.__init__(self,
100
94
DummyForeignVcsMapping(DummyForeignVcs()))
101
branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
95
branch.BzrBranch6.__init__(self, _format, _control_files, a_bzrdir,
104
def _get_checkout_format(self):
105
"""Return the most suitable metadir for a checkout of this branch.
106
Weaves are used if this branch's repository uses weaves.
108
return self.bzrdir.checkout_metadir()
110
def import_last_revision_info_and_tags(self, source, revno, revid,
112
interbranch = InterToDummyVcsBranch(source, self)
113
result = interbranch.push(stop_revision=revid, lossy=True)
115
revid = result.revidmap[revid]
116
return (revno, revid)
119
class DummyForeignCommitBuilder(vf_repository.VersionedFileRootCommitBuilder):
121
def _generate_revision_if_needed(self):
122
mapping = DummyForeignVcsMapping(DummyForeignVcs())
124
self._new_revision_id = mapping.revision_id_foreign_to_bzr(
125
(str(self._timestamp), str(self._timezone), "UNKNOWN"))
126
self.random_revid = False
127
elif self._new_revision_id is not None:
128
self.random_revid = False
130
self._new_revision_id = self._gen_revision_id()
131
self.random_revid = True
134
class DummyForeignVcsRepository(groupcompress_repo.CHKInventoryRepository,
135
foreign.ForeignRepository):
136
"""Dummy foreign vcs repository."""
139
class DummyForeignVcsRepositoryFormat(groupcompress_repo.RepositoryFormat2a):
141
repository_class = DummyForeignVcsRepository
142
_commit_builder_class = DummyForeignCommitBuilder
144
def get_format_string(self):
145
return "Dummy Foreign Vcs Repository"
147
def get_format_description(self):
148
return "Dummy Foreign Vcs Repository"
151
class InterToDummyVcsBranch(branch.GenericInterBranch):
99
class InterToDummyVcsBranch(branch.GenericInterBranch,
100
foreign.InterToForeignBranch):
154
103
def is_compatible(source, target):
155
104
return isinstance(target, DummyForeignVcsBranch)
157
def push(self, overwrite=False, stop_revision=None, lossy=False):
159
raise errors.NoRoundtrippingSupport(self.source, self.target)
106
def push(self, overwrite=False, stop_revision=None):
107
raise errors.NoRoundtrippingSupport(self.source, self.target)
109
def lossy_push(self, stop_revision=None):
160
110
result = branch.BranchPushResult()
161
111
result.source_branch = self.source
162
112
result.target_branch = self.target
163
113
result.old_revno, result.old_revid = self.target.last_revision_info()
164
114
self.source.lock_read()
166
graph = self.source.repository.get_graph()
167
116
# This just handles simple cases, but that's good enough for tests
168
117
my_history = self.target.revision_history()
169
if stop_revision is None:
170
stop_revision = self.source.last_revision()
171
their_history = list(graph.iter_lefthand_ancestry(stop_revision,
172
(revision.NULL_REVISION,)))
173
their_history.reverse()
118
their_history = self.source.revision_history()
174
119
if their_history[:min(len(my_history), len(their_history))] != my_history:
175
120
raise errors.DivergedBranches(self.target, self.source)
176
121
todo = their_history[len(my_history):]
226
171
super(DummyForeignVcsBranchFormat, self).__init__()
227
172
self._matchingbzrdir = DummyForeignVcsDirFormat()
229
def open(self, a_bzrdir, name=None, _found=False, ignore_fallbacks=False,
230
found_repository=None):
174
def open(self, a_bzrdir, _found=False):
232
176
raise NotImplementedError
234
transport = a_bzrdir.get_branch_transport(None, name=name)
178
transport = a_bzrdir.get_branch_transport(None)
235
179
control_files = lockable_files.LockableFiles(transport, 'lock',
237
if found_repository is None:
238
found_repository = a_bzrdir.find_repository()
239
181
return DummyForeignVcsBranch(_format=self,
240
182
_control_files=control_files,
241
183
a_bzrdir=a_bzrdir,
242
_repository=found_repository)
184
_repository=a_bzrdir.find_repository())
243
185
except errors.NoSuchFile:
244
186
raise errors.NotBranchError(path=transport.base)
298
243
self._control_files = lockable_files.LockableFiles(self.transport,
299
244
"lock", lockable_files.TransportLock)
301
def create_workingtree(self):
302
# dirstate requires a ".bzr" entry to exist
303
self.root_transport.put_bytes(".bzr", "foo")
304
return super(DummyForeignVcsDir, self).create_workingtree()
306
def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
308
raise errors.NoColocatedBranchSupport(self)
246
def open_branch(self, ignore_fallbacks=True):
309
247
return self._format.get_branch_format().open(self, _found=True)
311
249
def cloning_metadir(self, stacked=False):
312
250
"""Produce a metadir suitable for cloning with."""
313
251
return bzrdir.format_registry.make_bzrdir("default")
315
def checkout_metadir(self):
316
return self.cloning_metadir()
318
253
def sprout(self, url, revision_id=None, force_new_repo=False,
319
254
recurse='down', possible_transports=None,
320
255
accelerator_tree=None, hardlink=False, stacked=False,
330
265
def register_dummy_foreign_for_test(testcase):
331
controldir.ControlDirFormat.register_prober(DummyForeignProber)
332
testcase.addCleanup(controldir.ControlDirFormat.unregister_prober,
334
repository.format_registry.register(DummyForeignVcsRepositoryFormat())
335
testcase.addCleanup(repository.format_registry.remove,
336
DummyForeignVcsRepositoryFormat())
337
branch.format_registry.register(DummyForeignVcsBranchFormat())
338
testcase.addCleanup(branch.format_registry.remove,
339
DummyForeignVcsBranchFormat())
266
bzrdir.BzrDirFormat.register_control_format(DummyForeignVcsDirFormat)
267
testcase.addCleanup(bzrdir.BzrDirFormat.unregister_control_format,
268
DummyForeignVcsDirFormat)
340
269
# We need to register the optimiser to make the dummy appears really
341
270
# different from a regular bzr repository.
342
271
branch.InterBranch.register_optimiser(InterToDummyVcsBranch)