        internal representation is of the format:
        (revid, plaintext)
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return KnitContent(lines)
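# A minimal sketch (editor's illustration, not part of knit.py) of what the
# annotated internal representation looks like once a fulltext is parsed.
# The revision ids and line contents below are hypothetical.
serialized = [
    'rev-1 first line of the file\n',
    'rev-2 a line introduced later\n',
]
# Each stored line is "<revid> <text>"; splitting on the first space gives
# (revid, plaintext) tuples:
parsed = [tuple(line.split(' ', 1)) for line in serialized]
assert parsed == [('rev-1', 'first line of the file\n'),
                  ('rev-2', 'a line introduced later\n')]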

    def parse_line_delta_iter(self, lines):
        return iter(self.parse_line_delta(lines))

    def parse_line_delta(self, lines, version_id):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        1..count lines:
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])
        """
        result = []
        lines = iter(lines)
        next = lines.next

        cache = {}
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text

        # walk through the lines parsing.
        for header in lines:
            start, end, count = [int(n) for n in header.split(',')]
            contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
            result.append((start, end, count, contents))
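# A hedged worked example of the line-delta format described above; the
# revision id, offsets and text are invented for illustration only.
serialized_delta = [
    '8,11,2\n',                      # intstart intend intcount
    'rev-3 replacement line one\n',  # count lines of "revid text"
    'rev-3 replacement line two\n',
]
# parse_line_delta would turn this into one
# (start, end, count, [(revid, line), ...]) tuple, meaning roughly
# "replace parent lines [8:11) with the two lines below":
expected = [(8, 11, 2, [('rev-3', 'replacement line one\n'),
                        ('rev-3', 'replacement line two\n')])]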

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                origin, text = next().split(' ', 1)
                yield text

    def lower_fulltext(self, content):
        """convert a fulltext content record into a serializable form.

        see parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]

    def lower_line_delta(self, delta):
        """convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)
        return out

    annotated = False

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - it's just a constant versionid.
        """
        return self.make(content, version_id)

    def parse_line_delta_iter(self, lines, version_id):
        cur = 0
        num_lines = len(lines)
        while cur < num_lines:
            header = lines[cur]
            cur += 1
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, zip([version_id] * c, lines[cur:cur+c])
            cur += c

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))
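# Sketch of the difference from the annotated factory (hypothetical data):
# an unannotated delta stores bare content lines, so the plain factory pairs
# every line with the version id being parsed.
delta_lines = ['2,2,1\n', 'an inserted line\n']
version_id = 'rev-9'   # hypothetical
# KnitPlainFactory.parse_line_delta(delta_lines, version_id) would yield:
expected = [(2, 2, 1, [('rev-9', 'an inserted line\n')])]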

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return iter(lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                yield next()

    def lower_fulltext(self, content):
        return content.text()
    stored and retrieved.
    """

    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
                 create=False, create_parent_dir=False, delay_create=False,
                 dir_mode=None):
        """Construct a knit at location specified by relpath.

        :param create: If not True, only open an existing knit.
        :param create_parent_dir: If True, create the parent directory if
            creating the file fails. (This is used for stores with
            hash-prefixes that may not exist yet)
        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        """
        if deprecated_passed(basis_knit):
            warnings.warn("KnitVersionedFile.__(): The basis_knit parameter is"
                          " deprecated as of bzr 0.9.",
                          DeprecationWarning, stacklevel=2)
        if access_mode is None:
            access_mode = 'w'
        super(KnitVersionedFile, self).__init__(access_mode)
        assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
        self.transport = transport
        self.filename = relpath
        self.factory = factory or KnitAnnotateFactory()
        self.writable = (access_mode == 'w')
        self.delta = delta

        self._max_delta_chain = 200

        self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
            access_mode, create=create, file_mode=file_mode,
            create_parent_dir=create_parent_dir, delay_create=delay_create,
            dir_mode=dir_mode)
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
            access_mode, create=create and not len(self), file_mode=file_mode,
            create_parent_dir=create_parent_dir, delay_create=delay_create,
            dir_mode=dir_mode)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
                           self.transport.abspath(self.filename))
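# Usage sketch for the constructor above; the transport URL and knit name are
# hypothetical and not from the original source.  This opens (or lazily
# creates) a knit whose parent directory may not exist yet, as used for
# hash-prefixed stores.
from bzrlib.transport import get_transport

transport = get_transport('file:///tmp/example-store')
knit = KnitVersionedFile('inventory', transport,
                         access_mode='w', create=True,
                         create_parent_dir=True, delay_create=True)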

    def _check_should_delta(self, first_parents):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext. It searches for _max_delta_chain parents. When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        delta_size = 0
        fulltext_size = None
        delta_parents = first_parents
        for count in xrange(self._max_delta_chain):
            parent = delta_parents[0]
            method = self._index.get_method(parent)
            pos, size = self._index.get_position(parent)
            if method == 'fulltext':
                fulltext_size = size
                break
            delta_size += size
            delta_parents = self._index.get_parents(parent)
        else:
            # We couldn't find a fulltext, so we must create a new one
            return False

        return fulltext_size > delta_size
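# Rough illustration of the heuristic (the sizes are invented): suppose the
# candidate's parents form a chain of four line-deltas of 120, 80, 200 and
# 150 bytes on top of a 400-byte fulltext.
delta_sizes = [120, 80, 200, 150]
fulltext_size = 400
# A new delta is only worthwhile while the accumulated chain stays smaller
# than the fulltext it hangs off:
should_delta = fulltext_size > sum(delta_sizes)   # 400 > 550 -> False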

    def _add_delta(self, version_id, parents, delta_parent, sha1, noeol, delta):
        """See VersionedFile._add_delta()."""
        self._check_add(version_id, []) # should we check the lines ?

    def copy_to(self, name, transport):
        """See VersionedFile.copy_to()."""
        # copy the current index to a temp index to avoid racing with local
        # writes
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
                self.transport.get(self._index._filename))
        # copy the data file
        f = self._data._open_file()
        try:
            transport.put_file(name + DATA_SUFFIX, f)
        finally:
            f.close()
        # move the copied index into place
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)

    def create_empty(self, name, transport, mode=None):
        return KnitVersionedFile(name, transport, factory=self.factory,
                                 delta=self.delta, create=True)

    def _fix_parents(self, version_id, new_parents):
        """Fix the parents list for version.

        This is done by appending a new version to the index
        with identical data except for the parents list.
        """

    # ... (intervening, unchanged methods elided) ...

            diff_hunks.append((op[1], op[2], op[4]-op[3],
                               new_content._lines[op[3]:op[4]]))
        return diff_hunks

    def _get_components_positions(self, version_ids):
        """Produce a map of position data for the components of versions.

        This data is intended to be used for retrieving the knit records.

        A dict of version_id to (method, data_pos, data_size, next) is
        returned.
        method is the way referenced data should be applied.
        data_pos is the position of the data in the knit.
        data_size is the size of the data in the knit.
        next is the build-parent of the version, or None for fulltexts.
        """
        component_data = {}
        for version_id in version_ids:
            cursor = version_id
            while cursor is not None and cursor not in component_data:
                method = self._index.get_method(cursor)
                if method == 'fulltext':
                    next = None
                else:
                    next = self.get_parents(cursor)[0]
                data_pos, data_size = self._index.get_position(cursor)
                component_data[cursor] = (method, data_pos, data_size, next)
                cursor = next
        return component_data
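# Hedged sketch (invented ids, offsets and sizes) of the map returned for a
# two-version chain: 'rev-2' is a line-delta built on the fulltext 'rev-1'.
component_positions = {
    #  version : (method,      data_pos, data_size, next)
    'rev-1': ('fulltext',   0,    2200, None),
    'rev-2': ('line-delta', 2200, 310,  'rev-1'),
}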

    def _get_content(self, version_id, parent_texts={}):
        """Returns a content object that makes up the specified
        version."""
        cached_version = parent_texts.get(version_id, None)
        if cached_version is not None:
            return cached_version

        text_map, contents_map = self._get_content_maps([version_id])
        return contents_map[version_id]

    def _check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        self._index.check_versions_present(version_ids)

    def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts):
        """See VersionedFile.add_lines_with_ghosts()."""

    def _clone_text(self, new_version_id, old_version_id, parents):
        """See VersionedFile.clone_text()."""
        # FIXME RBC 20060228 make fast by only inserting an index with null
        # delta.
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))

    def get_lines(self, version_id):
        """See VersionedFile.get_lines()."""
        return self.get_line_list([version_id])[0]

    def _get_record_map(self, version_ids):
        """Produce a dictionary of knit records.

        The keys are version_ids, the values are tuples of (method, content,
        digest, next).
        method is the way the content should be applied.
        content is a KnitContent object.
        digest is the SHA1 digest of this version id after all steps are done
        next is the build-parent of the version, i.e. the leftmost ancestor.
        If the method is fulltext, next will be None.
        """
        position_map = self._get_components_positions(version_ids)
        # c = component_id, m = method, p = position, s = size, n = next
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
        record_map = {}
        for component_id, content, digest in \
                self._data.read_records_iter(records):
            method, position, size, next = position_map[component_id]
            record_map[component_id] = method, content, digest, next

        return record_map
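# Hedged illustration of the returned dictionary's shape for 'rev-2', a
# line-delta whose build-parent 'rev-1' is a fulltext (ids and digests are
# invented, KnitContent bodies omitted):
#
#   {'rev-1': ('fulltext',   <KnitContent>, '1f0e3d...', None),
#    'rev-2': ('line-delta', <KnitContent>, '9a3c41...', 'rev-1')}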

    def get_text(self, version_id):
        """See VersionedFile.get_text"""
        return self.get_texts([version_id])[0]

    def get_texts(self, version_ids):
        return [''.join(l) for l in self.get_line_list(version_ids)]

    def get_line_list(self, version_ids):
        """Return the texts of listed versions as a list of strings."""
        version_ids = [osutils.safe_revision_id(v) for v in version_ids]
        for version_id in version_ids:
            self.check_not_reserved_id(version_id)
        text_map, content_map = self._get_content_maps(version_ids)
        return [text_map[v] for v in version_ids]

    def _get_content_maps(self, version_ids):
        """Produce maps of text and KnitContents

        :return: (text_map, content_map) where text_map contains the texts for
        the requested versions and content_map contains the KnitContents.
        Both dicts take version_ids as their keys.
        """
        for version_id in version_ids:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        record_map = self._get_record_map(version_ids)

        text_map = {}
        content_map = {}
        final_content = {}
        for version_id in version_ids:
            components = []
            cursor = version_id
            while cursor is not None:
                method, data, digest, next = record_map[cursor]
                components.append((cursor, method, data, digest))
                if cursor in content_map:
                    break
                cursor = next

            content = None
            for component_id, method, data, digest in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                else:
                    if method == 'fulltext':
                        assert content is None
                        content = self.factory.parse_fulltext(data, version_id)
                    elif method == 'line-delta':
                        delta = self.factory.parse_line_delta(data, version_id)
                        content = content.copy()
                        content._lines = self._apply_delta(content._lines,
                                                           delta)
                    content_map[component_id] = content

            if 'no-eol' in self._index.get_options(version_id):
                content = content.copy()
                line = content._lines[-1][1].rstrip('\n')
                content._lines[-1] = (content._lines[-1][0], line)
            final_content[version_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            if sha_strings(text) != digest:
                raise KnitCorrupt(self.filename,
                                  'sha-1 does not match %s' % version_id)

            text_map[version_id] = text
        return text_map, final_content
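# To make the resolution order concrete (invented versions): if 'rev-3' is a
# line-delta on 'rev-2', itself a line-delta on the fulltext 'rev-1', the
# while-loop above collects the chain
#     [('rev-3', 'line-delta', ...), ('rev-2', 'line-delta', ...),
#      ('rev-1', 'fulltext', ...)]
# and the reversed() pass then parses 'rev-1', applies the 'rev-2' delta and
# then the 'rev-3' delta, caching each intermediate KnitContent in
# content_map so sibling requests can reuse it.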

    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        else:
            version_ids = [osutils.safe_revision_id(v) for v in version_ids]
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                data_pos, length = self._index.get_position(version_id)
                version_id_records.append((version_id, data_pos, length))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
                enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)

            assert method in ('fulltext', 'line-delta')
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            else:
                line_iterator = self.factory.get_linedelta_content(data)
            for line in line_iterator:
                yield line

        pb.update('Walking content.', total, total)

    def num_versions(self):
        """See VersionedFile.num_versions()."""

class _KnitComponentFile(object):
    """One of the files used to implement a knit database"""

    def __init__(self, transport, filename, mode, file_mode=None,
                 create_parent_dir=False, dir_mode=None):
        self._transport = transport
        self._filename = filename
        self._mode = mode
        self._file_mode = file_mode
        self._dir_mode = dir_mode
        self._create_parent_dir = create_parent_dir
        self._need_to_create = False

    def _full_path(self):
        """Return the full path to this file."""
        return self._transport.base + self._filename

    def check_header(self, fp):
        line = fp.readline()
        if line == '':
            # An empty file can actually be treated as though the file doesn't
            # exist yet.
            raise errors.NoSuchFile(self._full_path())
        if line != self.HEADER:
            raise KnitHeaderError(badline=line,
                      filename=self._transport.abspath(self._filename))

    def commit(self):
        """Commit is a nop."""

class _KnitIndex(_KnitComponentFile):
    # ... (class docstring, header constants and the start of _cache_version
    # are unchanged and elided) ...

            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options,
                                   pos,
                                   size,
                                   parents,
                                   index)

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                self._load_data(fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)

    def _load_data(self, fp):
        cache = self._cache
        history = self._history

        self.check_header(fp)
        # readlines reads the whole file at once:
        # bad for transports like http, good for local disk
        # we save 60 ms doing this one change (
        # from calling readline each time to calling
        # readlines once).
        # probably what we want for nice behaviour on
        # http is an incremental readlines that yields, or
        # a check for local vs non local indexes,
        history_top = len(history) - 1
        for line in fp.readlines():
            rec = line.split()
            if len(rec) < 5 or rec[-1] != ':':
                # FIXME: in the future we should determine if it's a
                #        short write - and ignore it
                #        or a different failure, and raise. RBC 20060407
                continue

            parents = []
            for value in rec[4:-1]:
                if '.' == value[0]:
                    # uncompressed reference
                    parent_id = value[1:]
                else:
                    parent_id = history[int(value)]
                parents.append(parent_id)

            version_id, options, pos, size = rec[:4]
            version_id = version_id

            # See self._cache_version
            # only want the _history index to reference the 1st
            # index entry for version_id
            if version_id not in cache:
                history_top += 1
                index = history_top
                history.append(version_id)
            else:
                index = cache[version_id][5]
            cache[version_id] = (version_id,
                                 options.split(','),
                                 int(pos),
                                 int(size),
                                 parents,
                                 index)
            # end self._cache_version

    def get_graph(self):
        return [(vid, idx[4]) for vid, idx in self._cache.iteritems()]

    def get_ancestry(self, versions):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        pending = set(versions)
        cache = self._cache
        while pending:
            version = pending.pop()
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        return topo_sort(graph.items())

    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = cache[version][4]
            except KeyError:
                # ghost, fake it
                graph[version] = []
            else:
                # got the parents ok
                pending.update([p for p in parents if p not in graph])
                graph[version] = parents
        return topo_sort(graph.items())

    def add_versions(self, versions):
        """Add multiple versions to the index.

        :param versions: a list of tuples:
                         (version_id, options, pos, size, parents).
        """
        lines = []
        orig_history = self._history[:]
        orig_cache = self._cache.copy()

        try:
            for version_id, options, pos, size, parents in versions:
                line = "\n%s %s %s %s %s :" % (version_id,
                                               ','.join(options),
                                               pos,
                                               size,
                                               self._version_list_to_index(parents))
                assert isinstance(line, str), \
                    'content must be utf-8 encoded: %r' % (line,)
                lines.append(line)
                self._cache_version(version_id, options, pos, size, parents)
            if not self._need_to_create:
                self._transport.append_bytes(self._filename, ''.join(lines))
            else:
                sio = StringIO()
                sio.write(self.HEADER)
                sio.writelines(lines)
                sio.seek(0)
                self._transport.put_file_non_atomic(self._filename, sio,
                                    create_parent_dir=self._create_parent_dir,
                                    mode=self._file_mode,
                                    dir_mode=self._dir_mode)
                self._need_to_create = False
        except:
            # If any problems happen, restore the original values and re-raise
            self._history = orig_history
            self._cache = orig_cache
            raise

    def has_version(self, version_id):
        """True if the version is in the index."""
        return version_id in self._cache

    def get_position(self, version_id):
        """Return data position and size of specified version."""
        entry = self._cache[version_id]
        return entry[2], entry[3]

    def get_method(self, version_id):
        """Return compression method of specified version."""
        # ... (body unchanged and elided) ...

    def get_parents(self, version_id):
        """Return parents of specified version ignoring ghosts."""
        return [parent for parent in self._cache[version_id][4]
                if parent in self._cache]

    def get_parents_with_ghosts(self, version_id):
        """Return parents of specified version with ghosts."""
        return self._cache[version_id][4]

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)

class _KnitData(_KnitComponentFile):
    """Contents of the knit data file"""

    HEADER = "# bzr knit data 8\n"

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False,
                 dir_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._checked = False
        # TODO: jam 20060713 conceptually, this could spill to disk
        #       if the cached size gets larger than a certain amount
        #       but it complicates the model a bit, so for now just use
        #       a simple dictionary
        self._cache = {}
        self._do_cache = False
        if create:
            if delay_create:
                self._need_to_create = create
            else:
                self._transport.put_bytes_non_atomic(self._filename, '',
                                                     mode=self._file_mode)

    def enable_cache(self):
        """Enable caching of reads."""
        self._do_cache = True

    def clear_cache(self):
        """Clear the record cache."""
        self._do_cache = False
        self._cache = {}

    def _open_file(self):
        try:
            return self._transport.get(self._filename)
        except NoSuchFile:
            pass
        return None

    def _record_to_data(self, version_id, digest, lines):
        """Convert version_id, digest, lines into a raw data block."""
        # ... (body unchanged and elided) ...

    def add_raw_record(self, raw_data):
        """Append a prepared record to the data file.

        :return: the offset in the data file raw_data was written.
        """
        assert isinstance(raw_data, str), 'data must be plain bytes'
        if not self._need_to_create:
            return self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                create_parent_dir=self._create_parent_dir,
                                mode=self._file_mode,
                                dir_mode=self._dir_mode)
            self._need_to_create = False
            return 0

    def add_record(self, version_id, digest, lines):
        """Write new text record to disk. Returns the position in the
        file where it was written."""
        size, sio = self._record_to_data(version_id, digest, lines)
        # write to disk
        if not self._need_to_create:
            start_pos = self._transport.append_file(self._filename, sio)
        else:
            self._transport.put_file_non_atomic(self._filename, sio,
                                create_parent_dir=self._create_parent_dir,
                                mode=self._file_mode,
                                dir_mode=self._dir_mode)
            self._need_to_create = False
            start_pos = 0
        if self._do_cache:
            self._cache[version_id] = sio.getvalue()
        return start_pos, size
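# The on-disk shape of a record is implied by _parse_record below; a hedged
# sketch of one record body (invented id, digest and text).  Each record is
# gzip-compressed before being appended to the .knit file, and decompresses
# to roughly:
#
#   version rev-7 2 3f786850e387550fdab836ed7e6dc881de23001b
#   rev-7 first content line
#   rev-7 second content line
#   end rev-7
#
# i.e. a four-field header (a marker word, the version id, the number of
# content lines, the sha1 digest), the serialized content lines (shown here
# in annotated "revid text" form), and a trailing "end <version_id>" line.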

    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream
                 as (stream, header_record)
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        rec = self._check_header(version_id, df.readline())
        return df, rec

    def _check_header(self, version_id, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._filename,
                              'unexpected number of elements in record header')
        if rec[1] != version_id:
            raise KnitCorrupt(self._filename,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))
        return rec

    def _parse_record(self, version_id, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))

        record_contents = df.readlines()
        header = record_contents.pop(0)
        rec = self._check_header(version_id, header)

        last_line = record_contents.pop()
        assert len(record_contents) == int(rec[2])
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._filename,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, version_id))
        df.close()
        return record_contents, rec[3]

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected but that's all.
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            if self._cache:
                # Don't check _cache if it is empty
                needed_offsets = [(pos, size) for version_id, pos, size
                                              in records
                                              if version_id not in self._cache]
            else:
                needed_offsets = [(pos, size) for version_id, pos, size
                                               in records]

            raw_records = self._transport.readv(self._filename, needed_offsets)

        for version_id, pos, size in records:
            if version_id in self._cache:
                # This data has already been validated
                data = self._cache[version_id]
            else:
                pos, data = raw_records.next()
                if self._do_cache:
                    self._cache[version_id] = data

                # validate the header
                df, rec = self._parse_record_header(version_id, data)
                df.close()
            yield version_id, data

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever is the fastest to read.
        Not by the order requested. Also, multiple requests for the same
        record will only yield 1 response.
        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
                 read, not the order requested
        """
        if not records:
            return

        if self._cache:
            # Skip records we have already seen
            yielded_records = set()
            needed_records = set()
            for record in records:
                if record[0] in self._cache:
                    if record[0] in yielded_records:
                        continue
                    yielded_records.add(record[0])
                    data = self._cache[record[0]]
                    content, digest = self._parse_record(record[0], data)
                    yield (record[0], content, digest)
                else:
                    needed_records.add(record)
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
        else:
            needed_records = sorted(set(records), key=operator.itemgetter(1))

        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        readv_response = self._transport.readv(self._filename,
            [(pos, size) for version_id, pos, size in needed_records])

        for (version_id, pos, size), (pos, data) in \
                izip(iter(needed_records), readv_response):
            content, digest = self._parse_record(version_id, data)
            if self._do_cache:
                self._cache[version_id] = data
            yield version_id, content, digest

    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components
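# Hedged usage sketch: the _KnitData instance, positions and sizes below are
# hypothetical (in practice they come from the index via get_position).
records = [('rev-1', 0, 2200), ('rev-2', 2200, 310)]
components = knit_data.read_records(records)
content_lines, digest = components['rev-2']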