78
78
from bzrlib import (
81
from bzrlib.trace import mutter
81
82
from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
82
83
RevisionAlreadyPresent,
83
84
RevisionNotPresent,
84
UnavailableRepresentation,
85
85
WeaveRevisionAlreadyPresent,
86
86
WeaveRevisionNotPresent,
88
88
import bzrlib.errors as errors
89
from bzrlib.osutils import dirname, sha_strings, split_lines
89
from bzrlib.osutils import sha_strings
90
90
import bzrlib.patiencediff
91
from bzrlib.revision import NULL_REVISION
92
from bzrlib.symbol_versioning import *
93
from bzrlib.trace import mutter
94
91
from bzrlib.tsort import topo_sort
95
from bzrlib.versionedfile import (
92
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
101
93
from bzrlib.weavefile import _read_weave_v5, write_weave_v5
104
class WeaveContentFactory(ContentFactory):
105
"""Content factory for streaming from weaves.
107
:seealso ContentFactory:
110
def __init__(self, version, weave):
111
"""Create a WeaveContentFactory for version from weave."""
112
ContentFactory.__init__(self)
113
self.sha1 = weave.get_sha1s([version])[version]
114
self.key = (version,)
115
parents = weave.get_parent_map([version])[version]
116
self.parents = tuple((parent,) for parent in parents)
117
self.storage_kind = 'fulltext'
120
def get_bytes_as(self, storage_kind):
121
if storage_kind == 'fulltext':
122
return self._weave.get_text(self.key[-1])
124
raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
127
96
class Weave(VersionedFile):
128
97
"""weave - versioned text file storage.
216
185
__slots__ = ['_weave', '_parents', '_sha1s', '_names', '_name_map',
217
'_weave_name', '_matcher', '_allow_reserved']
186
'_weave_name', '_matcher']
219
def __init__(self, weave_name=None, access_mode='w', matcher=None,
220
get_scope=None, allow_reserved=False):
223
:param get_scope: A callable that returns an opaque object to be used
224
for detecting when this weave goes out of scope (should stop
225
answering requests or allowing mutation).
188
def __init__(self, weave_name=None, access_mode='w', matcher=None):
227
189
super(Weave, self).__init__(access_mode)
229
191
self._parents = []
235
197
self._matcher = bzrlib.patiencediff.PatienceSequenceMatcher
237
199
self._matcher = matcher
238
if get_scope is None:
239
get_scope = lambda:None
240
self._get_scope = get_scope
241
self._scope = get_scope()
242
self._access_mode = access_mode
243
self._allow_reserved = allow_reserved
245
201
def __repr__(self):
246
202
return "Weave(%r)" % self._weave_name
248
def _check_write_ok(self):
249
"""Is the versioned file marked as 'finished' ? Raise if it is."""
250
if self._get_scope() != self._scope:
251
raise errors.OutSideTransaction()
252
if self._access_mode != 'w':
253
raise errors.ReadOnlyObjectDirtiedError(self)
256
205
"""Return a deep copy of self.
298
246
__contains__ = has_version
300
def get_record_stream(self, versions, ordering, include_delta_closure):
301
"""Get a stream of records for versions.
303
:param versions: The versions to include. Each version is a tuple
305
:param ordering: Either 'unordered' or 'topological'. A topologically
306
sorted stream has compression parents strictly before their
308
:param include_delta_closure: If True then the closure across any
309
compression parents will be included (in the opaque data).
310
:return: An iterator of ContentFactory objects, each of which is only
311
valid until the iterator is advanced.
313
versions = [version[-1] for version in versions]
314
if ordering == 'topological':
315
parents = self.get_parent_map(versions)
316
new_versions = topo_sort(parents)
317
new_versions.extend(set(versions).difference(set(parents)))
318
versions = new_versions
319
for version in versions:
321
yield WeaveContentFactory(version, self)
323
yield AbsentContentFactory((version,))
325
def get_parent_map(self, version_ids):
326
"""See VersionedFile.get_parent_map."""
328
for version_id in version_ids:
329
if version_id == NULL_REVISION:
334
map(self._idx_to_name,
335
self._parents[self._lookup(version_id)]))
336
except RevisionNotPresent:
338
result[version_id] = parents
341
def get_parents_with_ghosts(self, version_id):
342
raise NotImplementedError(self.get_parents_with_ghosts)
344
def insert_record_stream(self, stream):
345
"""Insert a record stream into this versioned file.
347
:param stream: A stream of records to insert.
349
:seealso VersionedFile.get_record_stream:
352
for record in stream:
353
# Raise an error when a record is missing.
354
if record.storage_kind == 'absent':
355
raise RevisionNotPresent([record.key[0]], self)
356
# adapt to non-tuple interface
357
parents = [parent[0] for parent in record.parents]
358
if record.storage_kind == 'fulltext':
359
self.add_lines(record.key[0], parents,
360
split_lines(record.get_bytes_as('fulltext')))
362
adapter_key = record.storage_kind, 'fulltext'
364
adapter = adapters[adapter_key]
366
adapter_factory = adapter_registry.get(adapter_key)
367
adapter = adapter_factory(self)
368
adapters[adapter_key] = adapter
369
lines = split_lines(adapter.get_bytes(
370
record, record.get_bytes_as(record.storage_kind)))
372
self.add_lines(record.key[0], parents, lines)
373
except RevisionAlreadyPresent:
248
def get_parents(self, version_id):
249
"""See VersionedFile.get_parent."""
250
return map(self._idx_to_name, self._parents[self._lookup(version_id)])
376
252
def _check_repeated_add(self, name, parents, text, sha1):
377
253
"""Check that a duplicated add is OK.
556
444
return len(other_parents.difference(my_parents)) == 0
558
def annotate(self, version_id):
559
"""Return a list of (version-id, line) tuples for version_id.
446
def annotate_iter(self, version_id):
447
"""Yield list of (version-id, line) pairs for the specified version.
561
449
The index indicates when the line originated in the weave."""
562
450
incls = [self._lookup(version_id)]
563
return [(self._idx_to_name(origin), text) for origin, lineno, text in
564
self._extract(incls)]
451
for origin, lineno, text in self._extract(incls):
452
yield self._idx_to_name(origin), text
566
454
def iter_lines_added_or_present_in_versions(self, version_ids=None,
764
660
expected_sha1, measured_sha1))
663
def get_sha1(self, version_id):
664
"""See VersionedFile.get_sha1()."""
665
return self._sha1s[self._lookup(version_id)]
767
667
def get_sha1s(self, version_ids):
768
668
"""See VersionedFile.get_sha1s()."""
770
for v in version_ids:
771
result[v] = self._sha1s[self._lookup(v)]
669
return [self._sha1s[self._lookup(v)] for v in version_ids]
774
671
def num_versions(self):
775
672
"""How many versions are in this weave?"""
776
673
l = len(self._parents)
674
assert l == len(self._sha1s)
779
677
__len__ = num_versions
805
703
for p in self._parents[i]:
806
704
new_inc.update(inclusions[self._idx_to_name(p)])
808
if set(new_inc) != set(self.get_ancestry(name)):
809
raise AssertionError(
811
% (set(new_inc), set(self.get_ancestry(name))))
706
assert set(new_inc) == set(self.get_ancestry(name)), \
707
'failed %s != %s' % (set(new_inc), set(self.get_ancestry(name)))
812
708
inclusions[name] = new_inc
814
710
nlines = len(self._weave)
844
740
# no lines outside of insertion blocks, that deletions are
845
741
# properly paired, etc.
743
def _join(self, other, pb, msg, version_ids, ignore_missing):
744
"""Worker routine for join()."""
745
if not other.versions():
746
return # nothing to update, easy
749
# versions is never none, InterWeave checks this.
752
# two loops so that we do not change ourselves before verifying it
754
# work through in index order to make sure we get all dependencies
757
# get the selected versions only that are in other.versions.
758
version_ids = set(other.versions()).intersection(set(version_ids))
759
# pull in the referenced graph.
760
version_ids = other.get_ancestry(version_ids)
761
pending_graph = [(version, other.get_parents(version)) for
762
version in version_ids]
763
for name in topo_sort(pending_graph):
764
other_idx = other._name_map[name]
765
# returns True if we have it, False if we need it.
766
if not self._check_version_consistent(other, other_idx, name):
767
names_to_join.append((other_idx, name))
776
for other_idx, name in names_to_join:
777
# TODO: If all the parents of the other version are already
778
# present then we can avoid some work by just taking the delta
779
# and adjusting the offsets.
780
new_parents = self._imported_parents(other, other_idx)
781
sha1 = other._sha1s[other_idx]
786
pb.update(msg, merged, len(names_to_join))
788
lines = other.get_lines(other_idx)
789
self._add(name, lines, new_parents, sha1)
791
mutter("merged = %d, processed = %d, file_id=%s; deltat=%d"%(
792
merged, processed, self._weave_name, time.time()-time0))
847
794
def _imported_parents(self, other, other_idx):
848
795
"""Return list of parents in self corresponding to indexes in other."""
907
854
WEAVE_SUFFIX = '.weave'
909
def __init__(self, name, transport, filemode=None, create=False, access_mode='w', get_scope=None):
856
def __init__(self, name, transport, filemode=None, create=False, access_mode='w'):
910
857
"""Create a WeaveFile.
912
859
:param create: If not True, only open an existing knit.
914
super(WeaveFile, self).__init__(name, access_mode, get_scope=get_scope,
915
allow_reserved=False)
861
super(WeaveFile, self).__init__(name, access_mode)
916
862
self._transport = transport
917
863
self._filemode = filemode
882
def _clone_text(self, new_version_id, old_version_id, parents):
883
"""See VersionedFile.clone_text."""
884
super(WeaveFile, self)._clone_text(new_version_id, old_version_id, parents)
936
887
def copy_to(self, name, transport):
937
888
"""See VersionedFile.copy_to()."""
938
889
# as we are all in memory always, just serialise to the new place.
942
893
transport.put_file(name + WeaveFile.WEAVE_SUFFIX, sio, self._filemode)
895
def create_empty(self, name, transport, filemode=None):
896
return WeaveFile(name, transport, filemode, create=True)
945
899
"""Save the weave."""
946
900
self._check_write_ok()
948
902
write_weave_v5(self, sio)
950
bytes = sio.getvalue()
951
path = self._weave_name + WeaveFile.WEAVE_SUFFIX
953
self._transport.put_bytes(path, bytes, self._filemode)
954
except errors.NoSuchFile:
955
self._transport.mkdir(dirname(path))
956
self._transport.put_bytes(path, bytes, self._filemode)
904
self._transport.put_file(self._weave_name + WeaveFile.WEAVE_SUFFIX,
959
909
def get_suffixes():
960
910
"""See VersionedFile.get_suffixes()."""
961
911
return [WeaveFile.WEAVE_SUFFIX]
963
def insert_record_stream(self, stream):
964
super(WeaveFile, self).insert_record_stream(stream)
967
@deprecated_method(one_five)
968
913
def join(self, other, pb=None, msg=None, version_ids=None,
969
914
ignore_missing=False):
970
915
"""Join other into self and save."""
1234
1179
if __name__ == '__main__':
1236
1181
sys.exit(main(sys.argv))
1184
class InterWeave(InterVersionedFile):
1185
"""Optimised code paths for weave to weave operations."""
1187
_matching_file_from_factory = staticmethod(WeaveFile)
1188
_matching_file_to_factory = staticmethod(WeaveFile)
1191
def is_compatible(source, target):
1192
"""Be compatible with weaves."""
1194
return (isinstance(source, Weave) and
1195
isinstance(target, Weave))
1196
except AttributeError:
1199
def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
1200
"""See InterVersionedFile.join."""
1201
version_ids = self._get_source_version_ids(version_ids, ignore_missing)
1202
if self.target.versions() == [] and version_ids is None:
1203
self.target._copy_weave_content(self.source)
1205
self.target._join(self.source, pb, msg, version_ids, ignore_missing)
1208
InterVersionedFile.register_optimiser(InterWeave)