78
78
from bzrlib import (
81
from bzrlib.trace import mutter
82
81
from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
83
82
RevisionAlreadyPresent,
84
83
RevisionNotPresent,
84
UnavailableRepresentation,
85
85
WeaveRevisionAlreadyPresent,
86
86
WeaveRevisionNotPresent,
88
88
import bzrlib.errors as errors
89
from bzrlib.osutils import sha_strings
89
from bzrlib.osutils import dirname, sha_strings, split_lines
90
90
import bzrlib.patiencediff
91
from bzrlib.revision import NULL_REVISION
92
from bzrlib.symbol_versioning import *
93
from bzrlib.trace import mutter
91
94
from bzrlib.tsort import topo_sort
92
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
95
from bzrlib.versionedfile import (
93
101
from bzrlib.weavefile import _read_weave_v5, write_weave_v5
104
class WeaveContentFactory(ContentFactory):
    """Content factory for streaming from weaves.

    Each instance describes a single version of a weave and advertises it
    with a storage kind of 'fulltext'.

    :seealso ContentFactory:
    """

    def __init__(self, version, weave):
        """Create a WeaveContentFactory for version from weave.

        :param version: The version id to describe.
        :param weave: The weave containing ``version``. It is queried here
            for the sha1 and the parents, and retained so that the text can
            be extracted later by get_bytes_as().
        """
        ContentFactory.__init__(self)
        self.sha1 = weave.get_sha1s([version])[version]
        # Expose the key, and each parent, as a 1-tuple.
        self.key = (version,)
        parents = weave.get_parent_map([version])[version]
        self.parents = tuple((parent,) for parent in parents)
        self.storage_kind = 'fulltext'
        # get_bytes_as() reads the text through this reference, so it has to
        # be stored on the instance.
        self._weave = weave

    def get_bytes_as(self, storage_kind):
        """Return this record's content in the requested representation.

        :param storage_kind: The representation wanted; only 'fulltext' is
            available.
        :return: The result of the weave's get_text() for this version.
        :raises UnavailableRepresentation: If storage_kind is anything other
            than 'fulltext'.
        """
        if storage_kind == 'fulltext':
            return self._weave.get_text(self.key[-1])
        raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
96
127
class Weave(VersionedFile):
97
128
"""weave - versioned text file storage.
185
216
__slots__ = ['_weave', '_parents', '_sha1s', '_names', '_name_map',
186
'_weave_name', '_matcher']
217
'_weave_name', '_matcher', '_allow_reserved']
188
def __init__(self, weave_name=None, access_mode='w', matcher=None):
219
def __init__(self, weave_name=None, access_mode='w', matcher=None,
220
get_scope=None, allow_reserved=False):
223
:param get_scope: A callable that returns an opaque object to be used
224
for detecting when this weave goes out of scope (should stop
225
answering requests or allowing mutation).
189
227
super(Weave, self).__init__(access_mode)
191
229
self._parents = []
197
235
self._matcher = bzrlib.patiencediff.PatienceSequenceMatcher
199
237
self._matcher = matcher
238
if get_scope is None:
239
get_scope = lambda:None
240
self._get_scope = get_scope
241
self._scope = get_scope()
242
self._access_mode = access_mode
243
self._allow_reserved = allow_reserved
201
245
def __repr__(self):
    """Return a debugging representation that shows the weave's name."""
    # Wrap the name in a 1-tuple: the % operator would otherwise unpack a
    # tuple-valued name as several format arguments and either print the
    # wrong thing or raise TypeError.
    return "Weave(%r)" % (self._weave_name,)
248
def _check_write_ok(self):
    """Raise unless this weave may currently be modified.

    :raises errors.OutSideTransaction: If the object now returned by
        self._get_scope() differs from the scope captured in self._scope.
    :raises errors.ReadOnlyObjectDirtiedError: If the access mode is
        anything other than 'w'.
    """
    # The scope check comes first, so an out-of-scope weave reports that
    # rather than its access mode.
    if self._get_scope() != self._scope:
        raise errors.OutSideTransaction()
    if self._access_mode != 'w':
        raise errors.ReadOnlyObjectDirtiedError(self)
205
256
"""Return a deep copy of self.
246
298
__contains__ = has_version
248
def get_parents(self, version_id):
    """Return the names of the parents of version_id, as a list.

    :param version_id: Name of the version to look up; resolved to an index
        with self._lookup().
    :return: A list with one name per parent index recorded for the version.
    """
    parent_indexes = self._parents[self._lookup(version_id)]
    # Build a real list rather than relying on map() returning one.
    return [self._idx_to_name(index) for index in parent_indexes]
300
def get_record_stream(self, versions, ordering, include_delta_closure):
301
"""Get a stream of records for versions.
303
:param versions: The versions to include. Each version is a tuple
305
:param ordering: Either 'unordered' or 'topological'. A topologically
306
sorted stream has compression parents strictly before their
308
:param include_delta_closure: If True then the closure across any
309
compression parents will be included (in the opaque data).
310
:return: An iterator of ContentFactory objects, each of which is only
311
valid until the iterator is advanced.
313
versions = [version[-1] for version in versions]
314
if ordering == 'topological':
315
parents = self.get_parent_map(versions)
316
new_versions = topo_sort(parents)
317
new_versions.extend(set(versions).difference(set(parents)))
318
versions = new_versions
319
for version in versions:
321
yield WeaveContentFactory(version, self)
323
yield AbsentContentFactory((version,))
325
def get_parent_map(self, version_ids):
326
"""See VersionedFile.get_parent_map."""
328
for version_id in version_ids:
329
if version_id == NULL_REVISION:
334
map(self._idx_to_name,
335
self._parents[self._lookup(version_id)]))
336
except RevisionNotPresent:
338
result[version_id] = parents
341
def get_parents_with_ghosts(self, version_id):
    """Unsupported here: always raises NotImplementedError.

    :param version_id: Ignored.
    :raises NotImplementedError: Always; the bound method itself is passed
        as the exception argument.
    """
    raise NotImplementedError(self.get_parents_with_ghosts)
344
def insert_record_stream(self, stream):
345
"""Insert a record stream into this versioned file.
347
:param stream: A stream of records to insert.
349
:seealso VersionedFile.get_record_stream:
352
for record in stream:
353
# Raise an error when a record is missing.
354
if record.storage_kind == 'absent':
355
raise RevisionNotPresent([record.key[0]], self)
356
# adapt to non-tuple interface
357
parents = [parent[0] for parent in record.parents]
358
if record.storage_kind == 'fulltext':
359
self.add_lines(record.key[0], parents,
360
split_lines(record.get_bytes_as('fulltext')))
362
adapter_key = record.storage_kind, 'fulltext'
364
adapter = adapters[adapter_key]
366
adapter_factory = adapter_registry.get(adapter_key)
367
adapter = adapter_factory(self)
368
adapters[adapter_key] = adapter
369
lines = split_lines(adapter.get_bytes(
370
record, record.get_bytes_as(record.storage_kind)))
372
self.add_lines(record.key[0], parents, lines)
373
except RevisionAlreadyPresent:
252
376
def _check_repeated_add(self, name, parents, text, sha1):
253
377
"""Check that a duplicated add is OK.
444
556
return len(other_parents.difference(my_parents)) == 0
446
def annotate_iter(self, version_id):
447
"""Yield list of (version-id, line) pairs for the specified version.
558
def annotate(self, version_id):
559
"""Return a list of (version-id, line) tuples for version_id.
449
561
The index indicates when the line originated in the weave."""
450
562
incls = [self._lookup(version_id)]
451
for origin, lineno, text in self._extract(incls):
452
yield self._idx_to_name(origin), text
563
return [(self._idx_to_name(origin), text) for origin, lineno, text in
564
self._extract(incls)]
454
566
def iter_lines_added_or_present_in_versions(self, version_ids=None,
660
764
expected_sha1, measured_sha1))
663
def get_sha1(self, version_id):
    """See VersionedFile.get_sha1().

    :param version_id: Name of the version; resolved to an index with
        self._lookup().
    :return: The entry of self._sha1s stored at that index.
    """
    index = self._lookup(version_id)
    return self._sha1s[index]
667
767
def get_sha1s(self, version_ids):
668
768
"""See VersionedFile.get_sha1s()."""
669
return [self._sha1s[self._lookup(v)] for v in version_ids]
770
for v in version_ids:
771
result[v] = self._sha1s[self._lookup(v)]
671
774
def num_versions(self):
672
775
"""How many versions are in this weave?"""
673
776
l = len(self._parents)
674
assert l == len(self._sha1s)
677
779
__len__ = num_versions
703
805
for p in self._parents[i]:
704
806
new_inc.update(inclusions[self._idx_to_name(p)])
706
assert set(new_inc) == set(self.get_ancestry(name)), \
707
'failed %s != %s' % (set(new_inc), set(self.get_ancestry(name)))
808
if set(new_inc) != set(self.get_ancestry(name)):
809
raise AssertionError(
811
% (set(new_inc), set(self.get_ancestry(name))))
708
812
inclusions[name] = new_inc
710
814
nlines = len(self._weave)
740
844
# no lines outside of insertion blocks, that deletions are
741
845
# properly paired, etc.
743
def _join(self, other, pb, msg, version_ids, ignore_missing):
744
"""Worker routine for join()."""
745
if not other.versions():
746
return # nothing to update, easy
749
# versions is never none, InterWeave checks this.
752
# two loops so that we do not change ourselves before verifying it
754
# work through in index order to make sure we get all dependencies
757
# get the selected versions only that are in other.versions.
758
version_ids = set(other.versions()).intersection(set(version_ids))
759
# pull in the referenced graph.
760
version_ids = other.get_ancestry(version_ids)
761
pending_graph = [(version, other.get_parents(version)) for
762
version in version_ids]
763
for name in topo_sort(pending_graph):
764
other_idx = other._name_map[name]
765
# returns True if we have it, False if we need it.
766
if not self._check_version_consistent(other, other_idx, name):
767
names_to_join.append((other_idx, name))
776
for other_idx, name in names_to_join:
777
# TODO: If all the parents of the other version are already
778
# present then we can avoid some work by just taking the delta
779
# and adjusting the offsets.
780
new_parents = self._imported_parents(other, other_idx)
781
sha1 = other._sha1s[other_idx]
786
pb.update(msg, merged, len(names_to_join))
788
lines = other.get_lines(other_idx)
789
self._add(name, lines, new_parents, sha1)
791
mutter("merged = %d, processed = %d, file_id=%s; deltat=%d"%(
792
merged, processed, self._weave_name, time.time()-time0))
794
847
def _imported_parents(self, other, other_idx):
795
848
"""Return list of parents in self corresponding to indexes in other."""
854
907
WEAVE_SUFFIX = '.weave'
856
def __init__(self, name, transport, filemode=None, create=False, access_mode='w'):
909
def __init__(self, name, transport, filemode=None, create=False, access_mode='w', get_scope=None):
857
910
"""Create a WeaveFile.
859
912
:param create: If not True, only open an existing knit.
861
super(WeaveFile, self).__init__(name, access_mode)
914
super(WeaveFile, self).__init__(name, access_mode, get_scope=get_scope,
915
allow_reserved=False)
862
916
self._transport = transport
863
917
self._filemode = filemode
882
def _clone_text(self, new_version_id, old_version_id, parents):
883
"""See VersionedFile.clone_text."""
884
super(WeaveFile, self)._clone_text(new_version_id, old_version_id, parents)
887
936
def copy_to(self, name, transport):
888
937
"""See VersionedFile.copy_to()."""
889
938
# as we are all in memory always, just serialise to the new place.
893
942
transport.put_file(name + WeaveFile.WEAVE_SUFFIX, sio, self._filemode)
895
def create_empty(self, name, transport, filemode=None):
    """Return a new WeaveFile for name on transport, opened with create=True.

    :param name: Passed through as the WeaveFile name.
    :param transport: Passed through as the WeaveFile transport.
    :param filemode: Passed through as the WeaveFile filemode.
    :return: The newly constructed WeaveFile; self is not consulted.
    """
    return WeaveFile(name, transport, filemode, create=True)
899
945
"""Save the weave."""
900
946
self._check_write_ok()
902
948
write_weave_v5(self, sio)
904
self._transport.put_file(self._weave_name + WeaveFile.WEAVE_SUFFIX,
950
bytes = sio.getvalue()
951
path = self._weave_name + WeaveFile.WEAVE_SUFFIX
953
self._transport.put_bytes(path, bytes, self._filemode)
954
except errors.NoSuchFile:
955
self._transport.mkdir(dirname(path))
956
self._transport.put_bytes(path, bytes, self._filemode)
909
959
def get_suffixes():
    """See VersionedFile.get_suffixes().

    :return: A one-element list containing WeaveFile.WEAVE_SUFFIX.
    """
    # NOTE(review): there is no self parameter, so this presumably sits
    # under a @staticmethod decorator that is outside this view -- confirm.
    return [WeaveFile.WEAVE_SUFFIX]
963
def insert_record_stream(self, stream):
964
super(WeaveFile, self).insert_record_stream(stream)
967
@deprecated_method(one_five)
913
968
def join(self, other, pb=None, msg=None, version_ids=None,
914
969
ignore_missing=False):
915
970
"""Join other into self and save."""
1179
1234
if __name__ == '__main__':
1181
1236
sys.exit(main(sys.argv))
1184
class InterWeave(InterVersionedFile):
1185
"""Optimised code paths for weave to weave operations."""
1187
_matching_file_from_factory = staticmethod(WeaveFile)
1188
_matching_file_to_factory = staticmethod(WeaveFile)
1191
def is_compatible(source, target):
1192
"""Be compatible with weaves."""
1194
return (isinstance(source, Weave) and
1195
isinstance(target, Weave))
1196
except AttributeError:
1199
def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
1200
"""See InterVersionedFile.join."""
1201
version_ids = self._get_source_version_ids(version_ids, ignore_missing)
1202
if self.target.versions() == [] and version_ids is None:
1203
self.target._copy_weave_content(self.source)
1205
self.target._join(self.source, pb, msg, version_ids, ignore_missing)
1208
InterVersionedFile.register_optimiser(InterWeave)