         internal representation is of the format:
         (revid, plaintext)
+        # TODO: jam 20070209 The tests expect this to be returned as tuples,
+        # but the code itself doesn't really depend on that.
+        # Figure out a way to not require the overhead of turning the
+        # list back into tuples.
+        lines = [tuple(line.split(' ', 1)) for line in content]
-            origin, text = line.split(' ', 1)
-            lines.append((origin.decode('utf-8'), text))
         return KnitContent(lines)
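
An illustrative sketch (revision ids and text invented, not part of the change)
of the annotated fulltext encoding that parse_fulltext and lower_fulltext
round-trip, where each stored line is "revid SP text":

    # Each serialised line carries its origin revision before the text.
    serialised = ['rev-1 first line\n', 'rev-2 second line\n']
    # parse_fulltext keeps the split as (revid, plaintext) tuples.
    parsed = [tuple(line.split(' ', 1)) for line in serialised]
    assert parsed == [('rev-1', 'first line\n'), ('rev-2', 'second line\n')]
    # lower_fulltext inverts the representation back to the stored form.
    assert ['%s %s' % (o, t) for o, t in parsed] == serialised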

     def parse_line_delta_iter(self, lines):
+        return iter(self.parse_line_delta(lines))
-        for result_item in self.parse_line_delta[lines]:

+    def parse_line_delta(self, lines, version_id):
-    def parse_line_delta(self, lines, version):
         """Convert a line based delta into internal representation.

         line delta is in the form of:
         intstart intend intcount
         revid(utf8) newline\n
+        internal representation is
-        internal represnetation is
         (start, end, count, [1..count tuples (revid, newline)])

         lines = iter(lines)
         next = lines.next
+        def cache_and_return(line):
+            origin, text = line.split(' ', 1)
+            return cache.setdefault(origin, origin), text
         # walk through the lines parsing.
         for header in lines:
             start, end, count = [int(n) for n in header.split(',')]
+            contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
-                origin, text = next().split(' ', 1)
-                contents.append((origin.decode('utf-8'), text))
             result.append((start, end, count, contents))
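
A hedged, self-contained sketch (revision ids and text invented) of the
line-delta encoding described in the docstring above: a "start,end,count"
header followed by count annotated lines:

    # One delta hunk: replace old lines [2:3) with two new annotated lines.
    delta_lines = iter(['2,3,2\n',
                        'rev-9 new line one\n',
                        'rev-9 new line two\n'])
    result = []
    for header in delta_lines:
        start, end, count = [int(n) for n in header.split(',')]
        contents = [tuple(next(delta_lines).split(' ', 1)) for _ in range(count)]
        result.append((start, end, count, contents))
    assert result == [(2, 3, 2, [('rev-9', 'new line one\n'),
                                 ('rev-9', 'new line two\n')])]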

+    def get_fulltext_content(self, lines):
+        """Extract just the content lines from a fulltext."""
+        return (line.split(' ', 1)[1] for line in lines)

+    def get_linedelta_content(self, lines):
+        """Extract just the content from a line delta.
+        This doesn't return all of the extra information stored in a delta.
+        Only the actual content lines.
+            header = header.split(',')
+            count = int(header[2])
+            for i in xrange(count):
+                origin, text = next().split(' ', 1)

     def lower_fulltext(self, content):
         """convert a fulltext content record into a serializable form.

         see parse_fulltext which this inverts.
+        # TODO: jam 20070209 We only do the caching thing to make sure that
+        # the origin is a valid utf-8 line, eventually we could remove it
+        return ['%s %s' % (o, t) for o, t in content._lines]
-        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]

     def lower_line_delta(self, delta):
         """convert a delta into a serializable form.

         See parse_line_delta which this inverts.
+        # TODO: jam 20070209 We only do the caching thing to make sure that
+        # the origin is a valid utf-8 line, eventually we could remove it
         for start, end, c, lines in delta:
             out.append('%d,%d,%d\n' % (start, end, c))
+            out.extend(origin + ' ' + text
+                       for origin, text in lines)
-            for origin, text in lines:
-                out.append('%s %s' % (origin.encode('utf-8'), text))

     annotated = False

+    def parse_fulltext(self, content, version_id):
-    def parse_fulltext(self, content, version):
         """This parses an unannotated fulltext.

         Note that this is not a noop - the internal representation
         has (versionid, line) - it's just a constant versionid.
+        return self.make(content, version_id)
-        return self.make(content, version)

+    def parse_line_delta_iter(self, lines, version_id):
+        num_lines = len(lines)
+        while cur < num_lines:
-    def parse_line_delta_iter(self, lines, version):
-        header = lines.pop(0)
             start, end, c = [int(n) for n in header.split(',')]
+            yield start, end, c, zip([version_id] * c, lines[cur:cur+c])

+    def parse_line_delta(self, lines, version_id):
+        return list(self.parse_line_delta_iter(lines, version_id))

+    def get_fulltext_content(self, lines):
+        """Extract just the content lines from a fulltext."""

+    def get_linedelta_content(self, lines):
+        """Extract just the content from a line delta.
+        This doesn't return all of the extra information stored in a delta.
+        Only the actual content lines.
+            header = header.split(',')
+            count = int(header[2])
+            for i in xrange(count):
-            yield start, end, c, zip([version] * c, lines[:c])
-    def parse_line_delta(self, lines, version):
-        return list(self.parse_line_delta_iter(lines, version))

     def lower_fulltext(self, content):
         return content.text()

     stored and retrieved.

+    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
+                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
+                 create=False, create_parent_dir=False, delay_create=False,
-    def __init__(self, relpath, transport, file_mode=None, access_mode=None, factory=None,
-                 basis_knit=None, delta=True, create=False):
         """Construct a knit at location specified by relpath.

         :param create: If not True, only open an existing knit.
+        :param create_parent_dir: If True, create the parent directory if
+            creating the file fails. (This is used for stores with
+            hash-prefixes that may not exist yet)
+        :param delay_create: The calling code is aware that the knit won't
+            actually be created until the first data is stored.
+        if deprecated_passed(basis_knit):
+            warnings.warn("KnitVersionedFile.__(): The basis_knit parameter is"
+                          " deprecated as of bzr 0.9.",
+                          DeprecationWarning, stacklevel=2)
         if access_mode is None:
             access_mode = 'w'
         super(KnitVersionedFile, self).__init__(access_mode)
         assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
-        assert not basis_knit or isinstance(basis_knit, KnitVersionedFile), \
         self.transport = transport
         self.filename = relpath
-        self.basis_knit = basis_knit
         self.factory = factory or KnitAnnotateFactory()
         self.writable = (access_mode == 'w')
         self.delta = delta
+        self._max_delta_chain = 200

         self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
+            access_mode, create=create, file_mode=file_mode,
+            create_parent_dir=create_parent_dir, delay_create=delay_create,
-            access_mode, create=create, file_mode=file_mode)
         self._data = _KnitData(transport, relpath + DATA_SUFFIX,
+            access_mode, create=create and not len(self), file_mode=file_mode,
+            create_parent_dir=create_parent_dir, delay_create=delay_create,
-            access_mode, create=create and not len(self), file_mode=file_mode)

     def __repr__(self):
         return '%s(%s)' % (self.__class__.__name__,
                            self.transport.abspath(self.filename))

+    def _check_should_delta(self, first_parents):
+        """Iterate back through the parent listing, looking for a fulltext.
+        This is used when we want to decide whether to add a delta or a new
+        fulltext. It searches for _max_delta_chain parents. When it finds a
+        fulltext parent, it sees if the total size of the deltas leading up to
+        it is large enough to indicate that we want a new full text anyway.
+        Return True if we should create a new delta, False if we should use a
+        delta_parents = first_parents
+        for count in xrange(self._max_delta_chain):
+            parent = delta_parents[0]
+            method = self._index.get_method(parent)
+            pos, size = self._index.get_position(parent)
+            if method == 'fulltext':
+            delta_parents = self._index.get_parents(parent)
+            # We couldn't find a fulltext, so we must create a new one
+        return fulltext_size > delta_size
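
An illustrative reading of the rule above, with invented sizes: walk back
through build-parents until a fulltext is found, sum the delta sizes along the
way, and keep adding deltas only while the chain is still cheaper than a fresh
fulltext:

    # Sketch of the comparison _check_should_delta ends with.
    fulltext_size = 1000              # size of the nearest fulltext ancestor
    delta_size = 120 + 80 + 50        # sizes of the deltas between us and it
    should_delta = fulltext_size > delta_size
    assert should_delta               # 1000 > 250: a delta is still worthwhile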

     def _add_delta(self, version_id, parents, delta_parent, sha1, noeol, delta):
         """See VersionedFile._add_delta()."""
         self._check_add(version_id, []) # should we check the lines ?

         """See VersionedFile.copy_to()."""
         # copy the current index to a temp index to avoid racing with local
+        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
+                self.transport.get(self._index._filename))
-        transport.put(name + INDEX_SUFFIX + '.tmp', self.transport.get(self._index._filename),)
         # copy the data file
+        f = self._data._open_file()
+            transport.put_file(name + DATA_SUFFIX, f)
+        # move the copied index into place
+        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
-        transport.put(name + DATA_SUFFIX, self._data._open_file())
-        # rename the copied index into place
-        transport.rename(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)

     def create_empty(self, name, transport, mode=None):
+        return KnitVersionedFile(name, transport, factory=self.factory,
+                                 delta=self.delta, create=True)
-        return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)

+    def _fix_parents(self, version_id, new_parents):
-    def _fix_parents(self, version, new_parents):
         """Fix the parents list for version.

         This is done by appending a new version to the index

             diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
         return diff_hunks

+    def _get_components_positions(self, version_ids):
+        """Produce a map of position data for the components of versions.
+        This data is intended to be used for retrieving the knit records.
+        A dict of version_id to (method, data_pos, data_size, next) is
+        method is the way referenced data should be applied.
+        data_pos is the position of the data in the knit.
+        data_size is the size of the data in the knit.
+        next is the build-parent of the version, or None for fulltexts.
-    def _get_components(self, version_id):
-        """Return a list of (version_id, method, data) tuples that
-        makes up version specified by version_id of the knit.
-        The components should be applied in the order of the returned
-        The basis knit will be used to the largest extent possible
-        since it is assumed that accesses to it is faster.
+        for version_id in version_ids:
+            while cursor is not None and cursor not in component_data:
+                method = self._index.get_method(cursor)
+                if method == 'fulltext':
+                    next = self.get_parents(cursor)[0]
+                data_pos, data_size = self._index.get_position(cursor)
+                component_data[cursor] = (method, data_pos, data_size, next)
+        return component_data
-        # 4168 calls in 14912, 2289 internal
-        # 4168 in 9711 to read_records
-        # 52554 in 1250 to get_parents
-        # 170166 in 865 to list.append
-        # needed_revisions holds a list of (method, version_id) of
-        # versions that is needed to be fetched to construct the final
-        # version of the file.
-        # basis_revisions is a list of versions that needs to be
-        # fetched but exists in the basis knit.
-        basis = self.basis_knit
-            if basis and basis._index.has_version(cursor):
-                basis_versions.append(cursor)
-            method = picked_knit._index.get_method(cursor)
-            needed_versions.append((method, cursor))
-            if method == 'fulltext':
-            cursor = picked_knit.get_parents(cursor)[0]
-            for comp_id in basis_versions:
-                data_pos, data_size = basis._index.get_data_position(comp_id)
-                records.append((piece_id, data_pos, data_size))
-            components.update(basis._data.read_records(records))
-        for comp_id in [vid for method, vid in needed_versions
-                        if vid not in basis_versions]:
-            data_pos, data_size = self._index.get_position(comp_id)
-            records.append((comp_id, data_pos, data_size))
-        components.update(self._data.read_records(records))
-        # get_data_records returns a mapping with the version id as
-        # index and the value as data. The order the components need
-        # to be applied is held by needed_versions (reversed).
-        for method, comp_id in reversed(needed_versions):
-            out.append((comp_id, method, components[comp_id]))
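
For orientation, a sketch (invented values) of the shape of the map the new
_get_components_positions returns, keyed by version with the build-parent in
the last slot:

    # version_id -> (method, data_pos, data_size, next)
    component_data = {
        'rev-3': ('line-delta', 2048, 96, 'rev-2'),
        'rev-2': ('line-delta', 1024, 80, 'rev-1'),
        'rev-1': ('fulltext', 0, 512, None),   # fulltexts have no build-parent
    }
    method, data_pos, data_size, next_rev = component_data['rev-3']
    assert method == 'line-delta' and next_rev == 'rev-2'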

     def _get_content(self, version_id, parent_texts={}):
         """Returns a content object that makes up the specified
         if cached_version is not None:
             return cached_version
+        text_map, contents_map = self._get_content_maps([version_id])
+        return contents_map[version_id]
-        if self.basis_knit and version_id in self.basis_knit:
-            return self.basis_knit._get_content(version_id)
-        components = self._get_components(version_id)
-        for component_id, method, (data, digest) in components:
-            version_idx = self._index.lookup(component_id)
-            if method == 'fulltext':
-                assert content is None
-                content = self.factory.parse_fulltext(data, version_idx)
-            elif method == 'line-delta':
-                delta = self.factory.parse_line_delta(data, version_idx)
-                content._lines = self._apply_delta(content._lines, delta)
-        if 'no-eol' in self._index.get_options(version_id):
-            line = content._lines[-1][1].rstrip('\n')
-            content._lines[-1] = (content._lines[-1][0], line)
-        # digest here is the digest from the last applied component.
-        if sha_strings(content.text()) != digest:
-            import pdb;pdb.set_trace()
-            raise KnitCorrupt(self.filename, 'sha-1 does not match %s' % version_id)

     def _check_versions_present(self, version_ids):
         """Check that all specified versions are present."""
+        self._index.check_versions_present(version_ids)
-        version_ids = set(version_ids)
-        for r in list(version_ids):
-            if self._index.has_version(r):
-                version_ids.remove(r)
-        raise RevisionNotPresent(list(version_ids)[0], self.filename)

     def _add_lines_with_ghosts(self, version_id, parents, lines, parent_texts):
         """See VersionedFile.add_lines_with_ghosts()."""

     def _clone_text(self, new_version_id, old_version_id, parents):
         """See VersionedFile.clone_text()."""
+        # FIXME RBC 20060228 make fast by only inserting an index with null
-        # FIXME RBC 20060228 make fast by only inserting an index with null delta.
         self.add_lines(new_version_id, parents, self.get_lines(old_version_id))

     def get_lines(self, version_id):
         """See VersionedFile.get_lines()."""
+        return self.get_line_list([version_id])[0]

+    def _get_record_map(self, version_ids):
+        """Produce a dictionary of knit records.
+        The keys are version_ids, the values are tuples of (method, content,
+        method is the way the content should be applied.
+        content is a KnitContent object.
+        digest is the SHA1 digest of this version id after all steps are done
+        next is the build-parent of the version, i.e. the leftmost ancestor.
+        If the method is fulltext, next will be None.

+        position_map = self._get_components_positions(version_ids)
+        # c = component_id, m = method, p = position, s = size, n = next
+        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
+        for component_id, content, digest in \
+                self._data.read_records_iter(records):
+            method, position, size, next = position_map[component_id]
+            record_map[component_id] = method, content, digest, next

+    def get_text(self, version_id):
+        """See VersionedFile.get_text"""
+        return self.get_texts([version_id])[0]

+    def get_texts(self, version_ids):
+        return [''.join(l) for l in self.get_line_list(version_ids)]

+    def get_line_list(self, version_ids):
+        """Return the texts of listed versions as a list of strings."""
+        version_ids = [osutils.safe_revision_id(v) for v in version_ids]
+        for version_id in version_ids:
+            self.check_not_reserved_id(version_id)
+        text_map, content_map = self._get_content_maps(version_ids)
+        return [text_map[v] for v in version_ids]
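
A small usage-style sketch (invented text) of how these helpers relate:
get_line_list returns lists of lines and get_texts simply joins them, which is
also all get_text([...])[0] relies on:

    line_list = [['hello\n', 'world\n']]      # one entry per requested version
    texts = [''.join(l) for l in line_list]   # what get_texts builds from it
    assert texts == ['hello\nworld\n']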

+    def _get_content_maps(self, version_ids):
+        """Produce maps of text and KnitContents
+        :return: (text_map, content_map) where text_map contains the texts for
+            the requested versions and content_map contains the KnitContents.
+            Both dicts take version_ids as their keys.

+        for version_id in version_ids:
+            if not self.has_version(version_id):
+                raise RevisionNotPresent(version_id, self.filename)
+        record_map = self._get_record_map(version_ids)
+        for version_id in version_ids:
+            while cursor is not None:
+                method, data, digest, next = record_map[cursor]
+                components.append((cursor, method, data, digest))
+                if cursor in content_map:
+            for component_id, method, data, digest in reversed(components):
+                if component_id in content_map:
+                    content = content_map[component_id]
+                    if method == 'fulltext':
+                        assert content is None
+                        content = self.factory.parse_fulltext(data, version_id)
+                    elif method == 'line-delta':
+                        delta = self.factory.parse_line_delta(data, version_id)
+                        content = content.copy()
+                        content._lines = self._apply_delta(content._lines,
+                    content_map[component_id] = content
+            if 'no-eol' in self._index.get_options(version_id):
+                content = content.copy()
+                line = content._lines[-1][1].rstrip('\n')
+                content._lines[-1] = (content._lines[-1][0], line)
+            final_content[version_id] = content
+            # digest here is the digest from the last applied component.
+            text = content.text()
+            if sha_strings(text) != digest:
+                raise KnitCorrupt(self.filename,
+                    'sha-1 does not match %s' % version_id)
+            text_map[version_id] = text
+        return text_map, final_content
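
An illustrative sketch (invented lines) of the 'no-eol' fixup performed above,
which strips the trailing newline from the last stored line when the version
was recorded without a final end-of-line:

    lines = [('rev-1', 'first\n'), ('rev-1', 'last\n')]
    origin, text = lines[-1]
    lines[-1] = (origin, text.rstrip('\n'))
    assert lines[-1] == ('rev-1', 'last')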

+    def iter_lines_added_or_present_in_versions(self, version_ids=None,
-        return self._get_content(version_id).text()

-    def iter_lines_added_or_present_in_versions(self, version_ids=None):
         """See VersionedFile.iter_lines_added_or_present_in_versions()."""
         if version_ids is None:
             version_ids = self.versions()
+        version_ids = [osutils.safe_revision_id(v) for v in version_ids]
+        pb = progress.DummyProgress()
+        # we don't care about inclusions, the caller cares.
-        # we dont care about inclusions, the caller cares.
         # but we need to setup a list of records to visit.
         # we need version_id, position, length
         version_id_records = []
+        requested_versions = set(version_ids)
-        requested_versions = list(version_ids)
         # filter for available versions
         for version_id in requested_versions:
             if not self.has_version(version_id):
                 raise RevisionNotPresent(version_id, self.filename)
         # get an in-component-order queue:
         for version_id in self.versions():
             if version_id in requested_versions:
-                version_ids.append(version_id)
                 data_pos, length = self._index.get_position(version_id)
                 version_id_records.append((version_id, data_pos, length))

-        pb = bzrlib.ui.ui_factory.nested_progress_bar()
         total = len(version_id_records)
+        for version_idx, (version_id, data, sha_value) in \
+            enumerate(self._data.read_records_iter(version_id_records)):
+            pb.update('Walking content.', version_idx, total)
+            method = self._index.get_method(version_id)
+            assert method in ('fulltext', 'line-delta')
+            if method == 'fulltext':
+                line_iterator = self.factory.get_fulltext_content(data)
+                line_iterator = self.factory.get_linedelta_content(data)
+            for line in line_iterator:
+        pb.update('Walking content.', total, total)
-            pb.update('Walking content.', count, total)
-            for version_id, data, sha_value in \
-                self._data.read_records_iter(version_id_records):
-                pb.update('Walking content.', count, total)
-                method = self._index.get_method(version_id)
-                version_idx = self._index.lookup(version_id)
-                assert method in ('fulltext', 'line-delta')
-                if method == 'fulltext':
-                    content = self.factory.parse_fulltext(data, version_idx)
-                    for line in content.text():
-                    delta = self.factory.parse_line_delta(data, version_idx)
-                    for start, end, count, lines in delta:
-                        for origin, line in lines:
-            pb.update('Walking content.', total, total)
-        pb.update('Walking content.', total, total)

     def num_versions(self):
         """See VersionedFile.num_versions()."""

 class _KnitComponentFile(object):
     """One of the files used to implement a knit database"""

+    def __init__(self, transport, filename, mode, file_mode=None,
+                 create_parent_dir=False, dir_mode=None):
-    def __init__(self, transport, filename, mode, file_mode=None):
         self._transport = transport
         self._filename = filename
         self._mode = mode
+        self._file_mode = file_mode
+        self._dir_mode = dir_mode
+        self._create_parent_dir = create_parent_dir
+        self._need_to_create = False
-        self._file_mode=file_mode

+    def _full_path(self):
+        """Return the full path to this file."""
+        return self._transport.base + self._filename

-    def write_header(self):
-        if self._transport.append(self._filename, StringIO(self.HEADER),
-                mode=self._file_mode):
-            raise KnitCorrupt(self._filename, 'misaligned after writing header')

     def check_header(self, fp):
         line = fp.readline()
+            # An empty file can actually be treated as though the file doesn't
+            raise errors.NoSuchFile(self._full_path())
         if line != self.HEADER:
+            raise KnitHeaderError(badline=line,
+                filename=self._transport.abspath(self._filename))
-            raise KnitHeaderError(badline=line)

     def commit(self):
         """Commit is a nop."""

             self._history.append(version_id)
             index = self._cache[version_id][5]
+            self._cache[version_id] = (version_id,
-            self._cache[version_id] = (version_id,

+    def __init__(self, transport, filename, mode, create=False, file_mode=None,
+                 create_parent_dir=False, delay_create=False, dir_mode=None):
+        _KnitComponentFile.__init__(self, transport, filename, mode,
+                                    file_mode=file_mode,
+                                    create_parent_dir=create_parent_dir,
-    def __init__(self, transport, filename, mode, create=False, file_mode=None):
-        _KnitComponentFile.__init__(self, transport, filename, mode, file_mode)
         self._cache = {}
         # position in _history is the 'official' index for a revision
         # but the values may have come from a newer entry.
+        # so - wc -l of a knit index is != the number of unique names
-        # so - wc -l of a knit index is != the number of uniqe names
         self._history = []
-        pb = bzrlib.ui.ui_factory.nested_progress_bar()
+            fp = self._transport.get(self._filename)
+                # _load_data may raise NoSuchFile if the target knit is
+            if mode != 'w' or not create:
+                self._need_to_create = True
+                self._transport.put_bytes_non_atomic(
+                    self._filename, self.HEADER, mode=self._file_mode)

+    def _load_data(self, fp):
+        history = self._history
+        self.check_header(fp)
+        # readlines reads the whole file at once:
+        # bad for transports like http, good for local disk
+        # we save 60 ms doing this one change (
+        # from calling readline each time to calling
+        # probably what we want for nice behaviour on
+        # http is an incremental readlines that yields, or
+        # a check for local vs non local indexes,
+        history_top = len(history) - 1
+        for line in fp.readlines():
+            if len(rec) < 5 or rec[-1] != ':':
+                # FIXME: in the future we should determine if it's a
+                # short write - and ignore it
+                # or a different failure, and raise. RBC 20060407
+            for value in rec[4:-1]:
+                    # uncompressed reference
+                    parent_id = value[1:]
-            pb.update('read knit index', count, total)
-            fp = self._transport.get(self._filename)
-            self.check_header(fp)
-            # readlines reads the whole file at once:
-            # bad for transports like http, good for local disk
-            # we save 60 ms doing this one change (
-            # from calling readline each time to calling
-            # probably what we want for nice behaviour on
-            # http is a incremental readlines that yields, or
-            # a check for local vs non local indexes,
-            for l in fp.readlines():
-                if len(rec) < 5 or rec[-1] != ':':
-                    # FIXME: in the future we should determine if its a
-                    # short write - and ignore it
-                    # or a different failure, and raise. RBC 20060407
-                #pb.update('read knit index', count, total)
-                # See self._parse_parents
-                for value in rec[4:-1]:
-                        # uncompressed reference
-                        parents.append(value[1:])
-                        # this is 15/4000ms faster than isinstance,
-                        # this function is called thousands of times a
-                        # second so small variations add up.
-                        assert value.__class__ is str
-                        parents.append(self._history[int(value)])
-                # end self._parse_parents
-                # self._cache_version(rec[0],
-                #                     rec[1].split(','),
-                # --- self._cache_version
-                # only want the _history index to reference the 1st
-                # index entry for version_id
-                if version_id not in self._cache:
-                    index = len(self._history)
-                    self._history.append(version_id)
+                    parent_id = history[int(value)]
+                parents.append(parent_id)
+            except (IndexError, ValueError), e:
+                # The parent could not be decoded to get its parent row. This
+                # at a minimum will cause this row to have wrong parents, or
+                # even to apply a delta to the wrong base and decode
+                # incorrectly. it's therefore not usable, and because we have
+                # encountered a situation where a new knit index had this
+                # corrupt we can't assume that no other rows referring to the
+                # index of this record actually mean the subsequent uncorrupt
+                raise errors.KnitCorrupt(self._filename,
+                    "line %r: %s" % (rec, e))

+            version_id, options, pos, size = rec[:4]
+            version_id = version_id

+            # See self._cache_version
+            # only want the _history index to reference the 1st
+            # index entry for version_id
+            if version_id not in cache:
+                history.append(version_id)
-                    index = self._cache[version_id][5]
-                self._cache[version_id] = (version_id,
-                # --- self._cache_version
-        except NoSuchFile, e:
-            if mode != 'w' or not create:
-            pb.update('read knit index', total, total)

-    def _parse_parents(self, compressed_parents):
-        """convert a list of string parent values into version ids.
-        ints are looked up in the index.
-        .FOO values are ghosts and converted in to FOO.
-        NOTE: the function is retained here for clarity, and for possible
-        use in partial index reads. However bulk processing now has
-        it inlined in __init__ for inner-loop optimisation.
-        for value in compressed_parents:
-            if value[-1] == '.':
-                # uncompressed reference
-                result.append(value[1:])
+                index = cache[version_id][5]
+            cache[version_id] = (version_id,
+            # end self._cache_version
-                # this is 15/4000ms faster than isinstance,
-                # this function is called thousands of times a
-                # second so small variations add up.
-                assert value.__class__ is str
-                result.append(self._history[int(value)])
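
A hedged sketch of the index line format both loops above parse (values
invented): "version_id options pos size parents :", where each parent is
either a '.'-prefixed revision id (possibly a ghost) or an integer reference
into the list of versions already seen:

    history = ['rev-1']
    line = 'rev-2 line-delta 1234 88 0 .ghost-rev :\n'
    rec = line.split()
    assert len(rec) >= 5 and rec[-1] == ':'
    version_id, options, pos, size = rec[:4]
    parents = []
    for value in rec[4:-1]:
        if value.startswith('.'):
            parents.append(value[1:])            # literal (possibly ghost) id
        else:
            parents.append(history[int(value)])  # compressed row reference
    assert parents == ['rev-1', 'ghost-rev']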

     def get_graph(self):
+        return [(vid, idx[4]) for vid, idx in self._cache.iteritems()]
-        for version_id, index in self._cache.iteritems():
-            graph.append((version_id, index[4]))

+    def get_ancestry(self, versions, topo_sorted=True):
-    def get_ancestry(self, versions):
         """See VersionedFile.get_ancestry."""
         # get a graph of all the mentioned versions:
         pending = set(versions)
             version = pending.pop()
-            parents = self._cache[version][4]
-            # got the parents ok
+                parents = [p for p in cache[version][4] if p in cache]
+                raise RevisionNotPresent(version, self._filename)
+            # if not completed and not a ghost
+            pending.update([p for p in parents if p not in graph])
-            parents = [parent for parent in parents if parent in self._cache]
-            for parent in parents:
-                # if not completed and not a ghost
-                if parent not in graph:
             graph[version] = parents
         return topo_sort(graph.items())
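
An illustrative, self-contained sketch (invented revision ids) of the ancestry
walk above, using a plain dict in place of the index cache (whose slot 4 holds
the parents list) and leaving out topo_sort itself:

    parents_of = {'C': ['B'], 'B': ['A'], 'A': [], 'D': ['ghost']}
    graph = {}
    pending = set(['C', 'D'])
    while pending:
        version = pending.pop()
        parents = [p for p in parents_of[version] if p in parents_of]
        pending.update([p for p in parents if p not in graph])
        graph[version] = parents
    assert graph['D'] == []                       # the ghost parent is trimmed
    assert set(graph) == set(['A', 'B', 'C', 'D'])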

     def get_ancestry_with_ghosts(self, versions):
         """See VersionedFile.get_ancestry_with_ghosts."""
         # get a graph of all the mentioned versions:
+        self.check_versions_present(versions)
         pending = set(versions)
             version = pending.pop()
+                parents = cache[version][4]
-                parents = self._cache[version][4]
             except KeyError:
                 # ghost, fake it
                 graph[version] = []
+                pending.update([p for p in parents if p not in graph])
-                # got the parents ok
-                for parent in parents:
-                    if parent not in graph:
                 graph[version] = parents
         return topo_sort(graph.items())

         (version_id, options, pos, size, parents).
+        orig_history = self._history[:]
+        orig_cache = self._cache.copy()

+            for version_id, options, pos, size, parents in versions:
+                line = "\n%s %s %s %s %s :" % (version_id,
+                    self._version_list_to_index(parents))
+                assert isinstance(line, str), \
+                    'content must be utf-8 encoded: %r' % (line,)
+                self._cache_version(version_id, options, pos, size, parents)
+            if not self._need_to_create:
+                self._transport.append_bytes(self._filename, ''.join(lines))
+                sio.write(self.HEADER)
+                sio.writelines(lines)
+                self._transport.put_file_non_atomic(self._filename, sio,
+                    create_parent_dir=self._create_parent_dir,
+                    mode=self._file_mode,
+                    dir_mode=self._dir_mode)
+                self._need_to_create = False
+            # If any problems happen, restore the original values and re-raise
+            self._history = orig_history
+            self._cache = orig_cache
-        for version_id, options, pos, size, parents in versions:
-            line = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
-                self._version_list_to_index(parents))
-            assert isinstance(line, str), \
-                'content must be utf-8 encoded: %r' % (line,)
-        self._transport.append(self._filename, StringIO(''.join(lines)))
-        # cache after writing, so that a failed write leads to missing cache
-        # entries not extra ones. XXX TODO: RBC 20060502 in the event of a
-        # failure, reload the index or flush it or some such, to prevent
-        # writing records that did complete twice.
-        for version_id, options, pos, size, parents in versions:
-            self._cache_version(version_id, options, pos, size, parents)

     def has_version(self, version_id):
         """True if the version is in the index."""
+        return version_id in self._cache
-        return self._cache.has_key(version_id)

     def get_position(self, version_id):
         """Return data position and size of specified version."""
+        entry = self._cache[version_id]
+        return entry[2], entry[3]
-        return (self._cache[version_id][2], \
-                self._cache[version_id][3])

     def get_method(self, version_id):
         """Return compression method of specified version."""

                 if parent in self._cache]

     def get_parents_with_ghosts(self, version_id):
+        """Return parents of specified version with ghosts."""
-        """Return parents of specified version wth ghosts."""
         return self._cache[version_id][4]

     def check_versions_present(self, version_ids):
         """Check that all specified versions are present."""
+        for version_id in version_ids:
+            if version_id not in cache:
+                raise RevisionNotPresent(version_id, self._filename)
-        version_ids = set(version_ids)
-        for version_id in list(version_ids):
-            if version_id in self._cache:
-                version_ids.remove(version_id)
-        raise RevisionNotPresent(list(version_ids)[0], self.filename)

 class _KnitData(_KnitComponentFile):
     """Contents of the knit data file"""

+    def __init__(self, transport, filename, mode, create=False, file_mode=None,
+                 create_parent_dir=False, delay_create=False,
+        _KnitComponentFile.__init__(self, transport, filename, mode,
+                                    file_mode=file_mode,
+                                    create_parent_dir=create_parent_dir,
-    HEADER = "# bzr knit data 8\n"
-    def __init__(self, transport, filename, mode, create=False, file_mode=None):
-        _KnitComponentFile.__init__(self, transport, filename, mode)
         self._checked = False
+        # TODO: jam 20060713 conceptually, this could spill to disk
+        # if the cached size gets larger than a certain amount
+        # but it complicates the model a bit, so for now just use
+        # a simple dictionary
+        self._do_cache = False
+            self._need_to_create = create
+                self._transport.put_bytes_non_atomic(self._filename, '',
+                    mode=self._file_mode)

+    def enable_cache(self):
+        """Enable caching of reads."""
+        self._do_cache = True
-            self._transport.put(self._filename, StringIO(''), mode=file_mode)

     def clear_cache(self):
         """Clear the record cache."""
+        self._do_cache = False

     def _open_file(self):
+            return self._transport.get(self._filename)
-        if self._file is None:
-                self._file = self._transport.get(self._filename)

     def _record_to_data(self, version_id, digest, lines):
         """Convert version_id, digest, lines into a raw data block.

         :return: the offset in the data file raw_data was written.
         assert isinstance(raw_data, str), 'data must be plain bytes'
+        if not self._need_to_create:
+            return self._transport.append_bytes(self._filename, raw_data)
+            self._transport.put_bytes_non_atomic(self._filename, raw_data,
+                create_parent_dir=self._create_parent_dir,
+                mode=self._file_mode,
+                dir_mode=self._dir_mode)
+            self._need_to_create = False
-        return self._transport.append(self._filename, StringIO(raw_data))

     def add_record(self, version_id, digest, lines):
         """Write new text record to disk. Returns the position in the
         file where it was written."""
         size, sio = self._record_to_data(version_id, digest, lines)
-        self._records[version_id] = (digest, lines)
         # write to disk
+        if not self._need_to_create:
+            start_pos = self._transport.append_file(self._filename, sio)
+            self._transport.put_file_non_atomic(self._filename, sio,
+                create_parent_dir=self._create_parent_dir,
+                mode=self._file_mode,
+                dir_mode=self._dir_mode)
+            self._need_to_create = False
+            self._cache[version_id] = sio.getvalue()
-        start_pos = self._transport.append(self._filename, sio)
         return start_pos, size

     def _parse_record_header(self, version_id, raw_data):
         as (stream, header_record)
         df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
+            rec = self._check_header(version_id, df.readline())
+        except Exception, e:
+            raise KnitCorrupt(self._filename,
+                "While reading {%s} got %s(%s)"
+                % (version_id, e.__class__.__name__, str(e)))
-        rec = df.readline().split()
-            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
-        if rec[1].decode('utf-8')!= version_id:
-            raise KnitCorrupt(self._filename,
-                'unexpected version, wanted %r, got %r' % (
-                version_id, rec[1]))

+    def _check_header(self, version_id, line):
+            raise KnitCorrupt(self._filename,
+                'unexpected number of elements in record header')
+        if rec[1] != version_id:
+            raise KnitCorrupt(self._filename,
+                'unexpected version, wanted %r, got %r'
+                % (version_id, rec[1]))

     def _parse_record(self, version_id, data):
         # profiling notes:
         # 4168 calls in 2880 217 internal
         # 4168 calls to _parse_record_header in 2121
         # 4168 calls to readlines in 330
+        df = GzipFile(mode='rb', fileobj=StringIO(data))
+            record_contents = df.readlines()
+        except Exception, e:
+            raise KnitCorrupt(self._filename,
+                "While reading {%s} got %s(%s)"
+                % (version_id, e.__class__.__name__, str(e)))
+        header = record_contents.pop(0)
+        rec = self._check_header(version_id, header)
+        last_line = record_contents.pop()
+        if len(record_contents) != int(rec[2]):
+            raise KnitCorrupt(self._filename,
+                'incorrect number of lines %s != %s'
+                % (len(record_contents), int(rec[2]),
+        if last_line != 'end %s\n' % rec[1]:
+            raise KnitCorrupt(self._filename,
+                'unexpected version end line %r, wanted %r'
+                % (last_line, version_id))
-        df, rec = self._parse_record_header(version_id, data)
-        record_contents = df.readlines()
-        l = record_contents.pop()
-        assert len(record_contents) == int(rec[2])
-        if l.decode('utf-8') != 'end %s\n' % version_id:
-            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r'
         return record_contents, rec[3]
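
A hedged, runnable sketch (Python 2 style, invented version id and digest) of
the gzipped record framing these methods read back: a "version <id> <numlines>
<digest>" header line, the content lines, then an "end <id>" trailer:

    from cStringIO import StringIO
    from gzip import GzipFile

    sio = StringIO()
    writer = GzipFile(mode='wb', fileobj=sio)
    writer.writelines(['version rev-1 2 fakesha1\n',
                       'rev-1 line one\n',
                       'rev-1 line two\n',
                       'end rev-1\n'])
    writer.close()

    reader = GzipFile(mode='rb', fileobj=StringIO(sio.getvalue()))
    rec = reader.readline().split()
    assert rec[1] == 'rev-1' and int(rec[2]) == 2   # rec[3] is the digest
    contents = reader.readlines()
    assert contents.pop() == 'end rev-1\n'
    assert len(contents) == int(rec[2])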

         This unpacks enough of the text record to validate the id is
         as expected but that's all.
-        It will actively recompress currently cached records on the
-        basis that that is cheaper than I/O activity.
-        for version_id, pos, size in records:
-            if version_id not in self._records:
-                needed_records.append((version_id, pos, size))
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
-        if len(needed_records):
             # grab the disk data needed.
+            # Don't check _cache if it is empty
+            needed_offsets = [(pos, size) for version_id, pos, size
+                              if version_id not in self._cache]
+            needed_offsets = [(pos, size) for version_id, pos, size
+        raw_records = self._transport.readv(self._filename, needed_offsets)
-            raw_records = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in needed_records])

         for version_id, pos, size in records:
+            if version_id in self._cache:
+                # This data has already been validated
+                data = self._cache[version_id]
-            if version_id in self._records:
-                # compress a new version
-                size, sio = self._record_to_data(version_id,
-                    self._records[version_id][0],
-                    self._records[version_id][1])
-                yield version_id, sio.getvalue()
                 pos, data = raw_records.next()
+                self._cache[version_id] = data
                 # validate the header
                 df, rec = self._parse_record_header(version_id, data)
+                yield version_id, data
-                yield version_id, data

     def read_records_iter(self, records):
         """Read text records from data file and yield result.
+        The result will be returned in whatever is the fastest to read.
+        Not by the order requested. Also, multiple requests for the same
+        record will only yield 1 response.
+        :param records: A list of (version_id, pos, len) entries
+        :return: Yields (version_id, contents, digest) in the order
+            read, not the order requested
-        Each passed record is a tuple of (version_id, pos, len) and
-        will be read in the given order. Yields (version_id,

+            # Skip records we have already seen
+            yielded_records = set()
+            needed_records = set()
+            for record in records:
+                if record[0] in self._cache:
+                    if record[0] in yielded_records:
+                    yielded_records.add(record[0])
+                    data = self._cache[record[0]]
+                    content, digest = self._parse_record(record[0], data)
+                    yield (record[0], content, digest)
+                    needed_records.add(record)
+            needed_records = sorted(needed_records, key=operator.itemgetter(1))
+            needed_records = sorted(set(records), key=operator.itemgetter(1))

+        if not needed_records:
+        # The transport optimizes the fetching as well
+        # (ie, reads continuous ranges.)
+        readv_response = self._transport.readv(self._filename,
+            [(pos, size) for version_id, pos, size in needed_records])
+        for (version_id, pos, size), (pos, data) in \
+                izip(iter(needed_records), readv_response):
+            content, digest = self._parse_record(version_id, data)
+                self._cache[version_id] = data
+            yield version_id, content, digest
-        # 60890 calls for 4168 extractions in 5045, 683 internal.
-        # 4168 calls to readv in 1411
-        # 4168 calls to parse_record in 2880
-        for version_id, pos, size in records:
-            if version_id not in self._records:
-                needed_records.append((version_id, pos, size))
-        if len(needed_records):
-            # We take it that the transport optimizes the fetching as good
-            # as possible (ie, reads continous ranges.)
-            response = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in needed_records])
-            for (record_id, pos, size), (pos, data) in izip(iter(needed_records), response):
-                content, digest = self._parse_record(record_id, data)
-                self._records[record_id] = (digest, content)
-        for version_id, pos, size in records:
-            yield version_id, list(self._records[version_id][1]), self._records[version_id][0]
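
A small sketch (invented offsets) of the request handling described above:
duplicate requests collapse into one, and reads are issued in file-offset
order rather than in the order asked for:

    import operator
    records = [('rev-2', 120, 40), ('rev-1', 0, 80), ('rev-2', 120, 40)]
    needed = sorted(set(records), key=operator.itemgetter(1))
    assert needed == [('rev-1', 0, 80), ('rev-2', 120, 40)]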

     def read_records(self, records):
         """Read records into a dictionary."""
         components = {}
+        for record_id, content, digest in \
+                self.read_records_iter(records):
-        for record_id, content, digest in self.read_records_iter(records):
             components[record_id] = (content, digest)
         return components