71
71
from copy import copy
72
72
from cStringIO import StringIO
77
from bzrlib.lazy_import import lazy_import
78
lazy_import(globals(), """
79
from bzrlib import tsort
81
78
from bzrlib import (
81
from bzrlib.trace import mutter
86
82
from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
87
83
RevisionAlreadyPresent,
88
84
RevisionNotPresent,
89
UnavailableRepresentation,
90
85
WeaveRevisionAlreadyPresent,
91
86
WeaveRevisionNotPresent,
93
from bzrlib.osutils import dirname, sha, sha_strings, split_lines
88
import bzrlib.errors as errors
89
from bzrlib.osutils import sha_strings
94
90
import bzrlib.patiencediff
95
from bzrlib.revision import NULL_REVISION
96
from bzrlib.symbol_versioning import *
97
from bzrlib.trace import mutter
98
from bzrlib.versionedfile import (
91
from bzrlib.tsort import topo_sort
92
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
104
93
from bzrlib.weavefile import _read_weave_v5, write_weave_v5
107
class WeaveContentFactory(ContentFactory):
108
"""Content factory for streaming from weaves.
110
:seealso ContentFactory:
113
def __init__(self, version, weave):
114
"""Create a WeaveContentFactory for version from weave."""
115
ContentFactory.__init__(self)
116
self.sha1 = weave.get_sha1s([version])[version]
117
self.key = (version,)
118
parents = weave.get_parent_map([version])[version]
119
self.parents = tuple((parent,) for parent in parents)
120
self.storage_kind = 'fulltext'
123
def get_bytes_as(self, storage_kind):
124
if storage_kind == 'fulltext':
125
return self._weave.get_text(self.key[-1])
126
elif storage_kind == 'chunked':
127
return self._weave.get_lines(self.key[-1])
129
raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
132
96
class Weave(VersionedFile):
133
97
"""weave - versioned text file storage.
221
185
__slots__ = ['_weave', '_parents', '_sha1s', '_names', '_name_map',
222
'_weave_name', '_matcher', '_allow_reserved']
224
def __init__(self, weave_name=None, access_mode='w', matcher=None,
225
get_scope=None, allow_reserved=False):
228
:param get_scope: A callable that returns an opaque object to be used
229
for detecting when this weave goes out of scope (should stop
230
answering requests or allowing mutation).
232
super(Weave, self).__init__()
186
'_weave_name', '_matcher']
188
def __init__(self, weave_name=None, access_mode='w', matcher=None):
189
super(Weave, self).__init__(access_mode)
234
191
self._parents = []
240
197
self._matcher = bzrlib.patiencediff.PatienceSequenceMatcher
242
199
self._matcher = matcher
243
if get_scope is None:
244
get_scope = lambda:None
245
self._get_scope = get_scope
246
self._scope = get_scope()
247
self._access_mode = access_mode
248
self._allow_reserved = allow_reserved
250
201
def __repr__(self):
251
202
return "Weave(%r)" % self._weave_name
253
def _check_write_ok(self):
254
"""Is the versioned file marked as 'finished' ? Raise if it is."""
255
if self._get_scope() != self._scope:
256
raise errors.OutSideTransaction()
257
if self._access_mode != 'w':
258
raise errors.ReadOnlyObjectDirtiedError(self)
261
205
"""Return a deep copy of self.
303
246
__contains__ = has_version
305
def get_record_stream(self, versions, ordering, include_delta_closure):
306
"""Get a stream of records for versions.
308
:param versions: The versions to include. Each version is a tuple
310
:param ordering: Either 'unordered' or 'topological'. A topologically
311
sorted stream has compression parents strictly before their
313
:param include_delta_closure: If True then the closure across any
314
compression parents will be included (in the opaque data).
315
:return: An iterator of ContentFactory objects, each of which is only
316
valid until the iterator is advanced.
318
versions = [version[-1] for version in versions]
319
if ordering == 'topological':
320
parents = self.get_parent_map(versions)
321
new_versions = tsort.topo_sort(parents)
322
new_versions.extend(set(versions).difference(set(parents)))
323
versions = new_versions
324
for version in versions:
326
yield WeaveContentFactory(version, self)
328
yield AbsentContentFactory((version,))
330
def get_parent_map(self, version_ids):
331
"""See VersionedFile.get_parent_map."""
333
for version_id in version_ids:
334
if version_id == NULL_REVISION:
339
map(self._idx_to_name,
340
self._parents[self._lookup(version_id)]))
341
except RevisionNotPresent:
343
result[version_id] = parents
346
def get_parents_with_ghosts(self, version_id):
347
raise NotImplementedError(self.get_parents_with_ghosts)
349
def insert_record_stream(self, stream):
350
"""Insert a record stream into this versioned file.
352
:param stream: A stream of records to insert.
354
:seealso VersionedFile.get_record_stream:
357
for record in stream:
358
# Raise an error when a record is missing.
359
if record.storage_kind == 'absent':
360
raise RevisionNotPresent([record.key[0]], self)
361
# adapt to non-tuple interface
362
parents = [parent[0] for parent in record.parents]
363
if (record.storage_kind == 'fulltext'
364
or record.storage_kind == 'chunked'):
365
self.add_lines(record.key[0], parents,
366
osutils.chunks_to_lines(record.get_bytes_as('chunked')))
368
adapter_key = record.storage_kind, 'fulltext'
370
adapter = adapters[adapter_key]
372
adapter_factory = adapter_registry.get(adapter_key)
373
adapter = adapter_factory(self)
374
adapters[adapter_key] = adapter
375
lines = split_lines(adapter.get_bytes(
376
record, record.get_bytes_as(record.storage_kind)))
378
self.add_lines(record.key[0], parents, lines)
379
except RevisionAlreadyPresent:
248
def get_parents(self, version_id):
249
"""See VersionedFile.get_parent."""
250
return map(self._idx_to_name, self._parents[self._lookup(version_id)])
382
252
def _check_repeated_add(self, name, parents, text, sha1):
383
253
"""Check that a duplicated add is OK.
562
444
return len(other_parents.difference(my_parents)) == 0
564
def annotate(self, version_id):
565
"""Return a list of (version-id, line) tuples for version_id.
446
def annotate_iter(self, version_id):
447
"""Yield list of (version-id, line) pairs for the specified version.
567
449
The index indicates when the line originated in the weave."""
568
450
incls = [self._lookup(version_id)]
569
return [(self._idx_to_name(origin), text) for origin, lineno, text in
570
self._extract(incls)]
451
for origin, lineno, text in self._extract(incls):
452
yield self._idx_to_name(origin), text
572
454
def iter_lines_added_or_present_in_versions(self, version_ids=None,
770
660
expected_sha1, measured_sha1))
663
def get_sha1(self, version_id):
664
"""See VersionedFile.get_sha1()."""
665
return self._sha1s[self._lookup(version_id)]
773
667
def get_sha1s(self, version_ids):
774
668
"""See VersionedFile.get_sha1s()."""
776
for v in version_ids:
777
result[v] = self._sha1s[self._lookup(v)]
669
return [self._sha1s[self._lookup(v)] for v in version_ids]
780
671
def num_versions(self):
781
672
"""How many versions are in this weave?"""
782
673
l = len(self._parents)
674
assert l == len(self._sha1s)
785
677
__len__ = num_versions
805
697
# For creating the ancestry, IntSet is much faster (3.7s vs 0.17s)
806
698
# The problem is that set membership is much more expensive
807
699
name = self._idx_to_name(i)
700
sha1s[name] = sha.new()
810
702
new_inc = set([name])
811
703
for p in self._parents[i]:
812
704
new_inc.update(inclusions[self._idx_to_name(p)])
814
if set(new_inc) != set(self.get_ancestry(name)):
815
raise AssertionError(
817
% (set(new_inc), set(self.get_ancestry(name))))
706
assert set(new_inc) == set(self.get_ancestry(name)), \
707
'failed %s != %s' % (set(new_inc), set(self.get_ancestry(name)))
818
708
inclusions[name] = new_inc
820
710
nlines = len(self._weave)
850
740
# no lines outside of insertion blocks, that deletions are
851
741
# properly paired, etc.
743
def _join(self, other, pb, msg, version_ids, ignore_missing):
744
"""Worker routine for join()."""
745
if not other.versions():
746
return # nothing to update, easy
749
# versions is never none, InterWeave checks this.
752
# two loops so that we do not change ourselves before verifying it
754
# work through in index order to make sure we get all dependencies
757
# get the selected versions only that are in other.versions.
758
version_ids = set(other.versions()).intersection(set(version_ids))
759
# pull in the referenced graph.
760
version_ids = other.get_ancestry(version_ids)
761
pending_graph = [(version, other.get_parents(version)) for
762
version in version_ids]
763
for name in topo_sort(pending_graph):
764
other_idx = other._name_map[name]
765
# returns True if we have it, False if we need it.
766
if not self._check_version_consistent(other, other_idx, name):
767
names_to_join.append((other_idx, name))
776
for other_idx, name in names_to_join:
777
# TODO: If all the parents of the other version are already
778
# present then we can avoid some work by just taking the delta
779
# and adjusting the offsets.
780
new_parents = self._imported_parents(other, other_idx)
781
sha1 = other._sha1s[other_idx]
786
pb.update(msg, merged, len(names_to_join))
788
lines = other.get_lines(other_idx)
789
self._add(name, lines, new_parents, sha1)
791
mutter("merged = %d, processed = %d, file_id=%s; deltat=%d"%(
792
merged, processed, self._weave_name, time.time()-time0))
853
794
def _imported_parents(self, other, other_idx):
854
795
"""Return list of parents in self corresponding to indexes in other."""
913
854
WEAVE_SUFFIX = '.weave'
915
def __init__(self, name, transport, filemode=None, create=False, access_mode='w', get_scope=None):
856
def __init__(self, name, transport, filemode=None, create=False, access_mode='w'):
916
857
"""Create a WeaveFile.
918
859
:param create: If not True, only open an existing knit.
920
super(WeaveFile, self).__init__(name, access_mode, get_scope=get_scope,
921
allow_reserved=False)
861
super(WeaveFile, self).__init__(name, access_mode)
922
862
self._transport = transport
923
863
self._filemode = filemode
882
def _clone_text(self, new_version_id, old_version_id, parents):
883
"""See VersionedFile.clone_text."""
884
super(WeaveFile, self)._clone_text(new_version_id, old_version_id, parents)
942
887
def copy_to(self, name, transport):
943
888
"""See VersionedFile.copy_to()."""
944
889
# as we are all in memory always, just serialise to the new place.
948
893
transport.put_file(name + WeaveFile.WEAVE_SUFFIX, sio, self._filemode)
895
def create_empty(self, name, transport, filemode=None):
896
return WeaveFile(name, transport, filemode, create=True)
951
899
"""Save the weave."""
952
900
self._check_write_ok()
954
902
write_weave_v5(self, sio)
956
bytes = sio.getvalue()
957
path = self._weave_name + WeaveFile.WEAVE_SUFFIX
959
self._transport.put_bytes(path, bytes, self._filemode)
960
except errors.NoSuchFile:
961
self._transport.mkdir(dirname(path))
962
self._transport.put_bytes(path, bytes, self._filemode)
904
self._transport.put_file(self._weave_name + WeaveFile.WEAVE_SUFFIX,
965
909
def get_suffixes():
966
910
"""See VersionedFile.get_suffixes()."""
967
911
return [WeaveFile.WEAVE_SUFFIX]
969
def insert_record_stream(self, stream):
970
super(WeaveFile, self).insert_record_stream(stream)
973
@deprecated_method(one_five)
974
913
def join(self, other, pb=None, msg=None, version_ids=None,
975
914
ignore_missing=False):
976
915
"""Join other into self and save."""
1240
1179
if __name__ == '__main__':
1242
1181
sys.exit(main(sys.argv))
1184
class InterWeave(InterVersionedFile):
1185
"""Optimised code paths for weave to weave operations."""
1187
_matching_file_from_factory = staticmethod(WeaveFile)
1188
_matching_file_to_factory = staticmethod(WeaveFile)
1191
def is_compatible(source, target):
1192
"""Be compatible with weaves."""
1194
return (isinstance(source, Weave) and
1195
isinstance(target, Weave))
1196
except AttributeError:
1199
def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
1200
"""See InterVersionedFile.join."""
1201
version_ids = self._get_source_version_ids(version_ids, ignore_missing)
1202
if self.target.versions() == [] and version_ids is None:
1203
self.target._copy_weave_content(self.source)
1205
self.target._join(self.source, pb, msg, version_ids, ignore_missing)
1208
InterVersionedFile.register_optimiser(InterWeave)