# Copyright (C) 2006-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Knit versionedfile implementation.

A knit is a versioned file implementation that supports efficient append-only
updates.

lifeless: the data file is made up of "delta records".  Each delta record has
a delta header that contains: (1) a version id, (2) the size of the delta (in
lines), and (3) the digest of the -expanded data- (i.e. the delta applied to
the parent).  The delta also ends with an end-marker; simply "end VERSION".

A delta can be line-based or full contents.
... the 8's there are the index number of the annotation.
version robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 7 c7d23b2a5bd6ca00e8e266cec0ec228158ee9f9e
8             e.set('executable', 'yes')
8         if elt.get('executable') == 'yes':
8             ie.executable = True
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad

09:33 < jrydberg> lifeless: each index is made up of a tuple of; version id, options, position, size, parents
09:33 < jrydberg> lifeless: the parents are currently dictionary compressed
09:33 < jrydberg> lifeless: (meaning it currently does not support ghosts)
09:33 < lifeless> right
09:33 < jrydberg> lifeless: the position and size is the range in the data file

so the index sequence is the dictionary compressed sequence number used
in the deltas to provide line annotation
"""

from cStringIO import StringIO
from itertools import izip

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """

from bzrlib.errors import (
    RevisionAlreadyPresent,
from bzrlib.osutils import (
from bzrlib.versionedfile import (
    AbsentContentFactory,
    ChunkedContentFactory,

# TODO: Split out code specific to this format into an associated object.

# TODO: Can we put in some kind of value to check that the index and data
# files belong together?

# TODO: accommodate binaries, perhaps by storing a byte count

# TODO: function to check whole file

# TODO: atomically append data, then measure backwards from the cursor
# position after writing to work out where it was located.  we may need to
# bypass python file buffering.

DATA_SUFFIX = '.knit'
INDEX_SUFFIX = '.kndx'
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
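

# Editor's note: the following tiny helper is an illustrative sketch, not part
# of the original module.  It only exists to show how the two suffixes above
# pair a knit's data file with its index file on disk (e.g. "inventory.knit"
# and "inventory.kndx"); the name _knit_store_paths is hypothetical.
def _knit_store_paths(name):
    """Return the (data, index) file names used for a knit called ``name``."""
    return (name + DATA_SUFFIX, name + INDEX_SUFFIX)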


class KnitAdapter(object):
    """Base class for knit record adaption."""

    def __init__(self, basis_vf):
        """Create an adapter which accesses full texts from basis_vf.

        :param basis_vf: A versioned file to access basis texts of deltas from.
            May be None for adapters that do not need to access basis texts.
        """
        self._data = KnitVersionedFiles(None, None)
        self._annotate_factory = KnitAnnotateFactory()
        self._plain_factory = KnitPlainFactory()
        self._basis_vf = basis_vf


class FTAnnotatedToUnannotated(KnitAdapter):
    """An adapter from FT annotated knits to unannotated ones."""

    def get_bytes(self, factory):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
        size, bytes = self._data._record_to_data((rec[1],), rec[3], content.text())


class DeltaAnnotatedToUnannotated(KnitAdapter):
    """An adapter for deltas from annotated to unannotated."""

    def get_bytes(self, factory):
        annotated_compressed_bytes = factory._raw_record
        rec, contents = \
            self._data._parse_record_unchecked(annotated_compressed_bytes)
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
            plain=True)
        contents = self._plain_factory.lower_line_delta(delta)
        size, bytes = self._data._record_to_data((rec[1],), rec[3], contents)
174
class FTAnnotatedToFullText(KnitAdapter):
175
"""An adapter from FT annotated knits to unannotated ones."""
177
def get_bytes(self, factory):
178
annotated_compressed_bytes = factory._raw_record
180
self._data._parse_record_unchecked(annotated_compressed_bytes)
181
content, delta = self._annotate_factory.parse_record(factory.key[-1],
182
contents, factory._build_details, None)
183
return ''.join(content.text())
186
class DeltaAnnotatedToFullText(KnitAdapter):
187
"""An adapter for deltas from annotated to unannotated."""
189
def get_bytes(self, factory):
190
annotated_compressed_bytes = factory._raw_record
192
self._data._parse_record_unchecked(annotated_compressed_bytes)
193
delta = self._annotate_factory.parse_line_delta(contents, rec[1],
195
compression_parent = factory.parents[0]
196
basis_entry = self._basis_vf.get_record_stream(
197
[compression_parent], 'unordered', True).next()
198
if basis_entry.storage_kind == 'absent':
199
raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
200
basis_chunks = basis_entry.get_bytes_as('chunked')
201
basis_lines = osutils.chunks_to_lines(basis_chunks)
202
# Manually apply the delta because we have one annotated content and
204
basis_content = PlainKnitContent(basis_lines, compression_parent)
205
basis_content.apply_delta(delta, rec[1])
206
basis_content._should_strip_eol = factory._build_details[1]
207
return ''.join(basis_content.text())
210
class FTPlainToFullText(KnitAdapter):
211
"""An adapter from FT plain knits to unannotated ones."""
213
def get_bytes(self, factory):
214
compressed_bytes = factory._raw_record
216
self._data._parse_record_unchecked(compressed_bytes)
217
content, delta = self._plain_factory.parse_record(factory.key[-1],
218
contents, factory._build_details, None)
219
return ''.join(content.text())
222
class DeltaPlainToFullText(KnitAdapter):
223
"""An adapter for deltas from annotated to unannotated."""
225
def get_bytes(self, factory):
226
compressed_bytes = factory._raw_record
228
self._data._parse_record_unchecked(compressed_bytes)
229
delta = self._plain_factory.parse_line_delta(contents, rec[1])
230
compression_parent = factory.parents[0]
231
# XXX: string splitting overhead.
232
basis_entry = self._basis_vf.get_record_stream(
233
[compression_parent], 'unordered', True).next()
234
if basis_entry.storage_kind == 'absent':
235
raise errors.RevisionNotPresent(compression_parent, self._basis_vf)
236
basis_chunks = basis_entry.get_bytes_as('chunked')
237
basis_lines = osutils.chunks_to_lines(basis_chunks)
238
basis_content = PlainKnitContent(basis_lines, compression_parent)
239
# Manually apply the delta because we have one annotated content and
241
content, _ = self._plain_factory.parse_record(rec[1], contents,
242
factory._build_details, basis_content)
243
return ''.join(content.text())
246
class KnitContentFactory(ContentFactory):
247
"""Content factory for streaming from knits.
249
:seealso ContentFactory:
252
def __init__(self, key, parents, build_details, sha1, raw_record,
253
annotated, knit=None, network_bytes=None):
254
"""Create a KnitContentFactory for key.
257
:param parents: The parents.
258
:param build_details: The build details as returned from
260
:param sha1: The sha1 expected from the full text of this object.
261
:param raw_record: The bytes of the knit data from disk.
262
:param annotated: True if the raw data is annotated.
263
:param network_bytes: None to calculate the network bytes on demand,
264
not-none if they are already known.
266
ContentFactory.__init__(self)
269
self.parents = parents
270
if build_details[0] == 'line-delta':
275
annotated_kind = 'annotated-'
278
self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
279
self._raw_record = raw_record
280
self._network_bytes = network_bytes
281
self._build_details = build_details
284
def _create_network_bytes(self):
285
"""Create a fully serialised network version for transmission."""
286
# storage_kind, key, parents, Noeol, raw_record
287
key_bytes = '\x00'.join(self.key)
288
if self.parents is None:
289
parent_bytes = 'None:'
291
parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
292
if self._build_details[1]:
296
network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
297
parent_bytes, noeol, self._raw_record)
298
self._network_bytes = network_bytes
300
def get_bytes_as(self, storage_kind):
301
if storage_kind == self.storage_kind:
302
if self._network_bytes is None:
303
self._create_network_bytes()
304
return self._network_bytes
305
if ('-ft-' in self.storage_kind and
306
storage_kind in ('chunked', 'fulltext')):
307
adapter_key = (self.storage_kind, 'fulltext')
308
adapter_factory = adapter_registry.get(adapter_key)
309
adapter = adapter_factory(None)
310
bytes = adapter.get_bytes(self)
311
if storage_kind == 'chunked':
315
if self._knit is not None:
316
# Not redundant with direct conversion above - that only handles
318
if storage_kind == 'chunked':
319
return self._knit.get_lines(self.key[0])
320
elif storage_kind == 'fulltext':
321
return self._knit.get_text(self.key[0])
322
raise errors.UnavailableRepresentation(self.key, storage_kind,
326
class LazyKnitContentFactory(ContentFactory):
327
"""A ContentFactory which can either generate full text or a wire form.
329
:seealso ContentFactory:
332
def __init__(self, key, parents, generator, first):
333
"""Create a LazyKnitContentFactory.
335
:param key: The key of the record.
336
:param parents: The parents of the record.
337
:param generator: A _ContentMapGenerator containing the record for this
339
:param first: Is this the first content object returned from generator?
340
if it is, its storage kind is knit-delta-closure, otherwise it is
341
knit-delta-closure-ref
344
self.parents = parents
346
self._generator = generator
347
self.storage_kind = "knit-delta-closure"
349
self.storage_kind = self.storage_kind + "-ref"
352
def get_bytes_as(self, storage_kind):
353
if storage_kind == self.storage_kind:
355
return self._generator._wire_bytes()
357
# all the keys etc are contained in the bytes returned in the
360
if storage_kind in ('chunked', 'fulltext'):
361
chunks = self._generator._get_one_work(self.key).text()
362
if storage_kind == 'chunked':
365
return ''.join(chunks)
366
raise errors.UnavailableRepresentation(self.key, storage_kind,
370
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
371
"""Convert a network record to a iterator over stream records.
373
:param storage_kind: The storage kind of the record.
374
Must be 'knit-delta-closure'.
375
:param bytes: The bytes of the record on the network.
377
generator = _NetworkContentMapGenerator(bytes, line_end)
378
return generator.get_record_stream()
381
def knit_network_to_record(storage_kind, bytes, line_end):
382
"""Convert a network record to a record object.
384
:param storage_kind: The storage kind of the record.
385
:param bytes: The bytes of the record on the network.
388
line_end = bytes.find('\n', start)
389
key = tuple(bytes[start:line_end].split('\x00'))
391
line_end = bytes.find('\n', start)
392
parent_line = bytes[start:line_end]
393
if parent_line == 'None:':
397
[tuple(segment.split('\x00')) for segment in parent_line.split('\t')
400
noeol = bytes[start] == 'N'
401
if 'ft' in storage_kind:
404
method = 'line-delta'
405
build_details = (method, noeol)
407
raw_record = bytes[start:]
408
annotated = 'annotated' in storage_kind
409
return [KnitContentFactory(key, parents, build_details, None, raw_record,
410
annotated, network_bytes=bytes)]
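

# Editor's note: _split_network_header below is an illustrative sketch, not
# part of the original module.  It mirrors the wire layout produced by
# KnitContentFactory._create_network_bytes and consumed above:
# "storage_kind\nkey\nparents\n<N-or-space><raw gzipped record>".
def _split_network_header(bytes):
    """Split a full knit network record into header fields and payload."""
    storage_kind, key_line, parent_line, rest = bytes.split('\n', 3)
    key = tuple(key_line.split('\x00'))
    noeol = rest[:1] == 'N'
    return storage_kind, key, parent_line, noeol, rest[1:]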
413
class KnitContent(object):
414
"""Content of a knit version to which deltas can be applied.
416
This is always stored in memory as a list of lines with \n at the end,
417
plus a flag saying if the final ending is really there or not, because that
418
corresponds to the on-disk knit representation.
422
self._should_strip_eol = False
424
def apply_delta(self, delta, new_version_id):
425
"""Apply delta to this object to become new_version_id."""
426
raise NotImplementedError(self.apply_delta)
428
def line_delta_iter(self, new_lines):
429
"""Generate line-based delta from this content to new_lines."""
430
new_texts = new_lines.text()
431
old_texts = self.text()
432
s = patiencediff.PatienceSequenceMatcher(None, old_texts, new_texts)
433
for tag, i1, i2, j1, j2 in s.get_opcodes():
436
# ofrom, oto, length, data
437
yield i1, i2, j2 - j1, new_lines._lines[j1:j2]
439
def line_delta(self, new_lines):
440
return list(self.line_delta_iter(new_lines))
443
def get_line_delta_blocks(knit_delta, source, target):
444
"""Extract SequenceMatcher.get_matching_blocks() from a knit delta"""
445
target_len = len(target)
448
for s_begin, s_end, t_len, new_text in knit_delta:
449
true_n = s_begin - s_pos
452
# knit deltas do not provide reliable info about whether the
453
# last line of a file matches, due to eol handling.
454
if source[s_pos + n -1] != target[t_pos + n -1]:
457
yield s_pos, t_pos, n
458
t_pos += t_len + true_n
460
n = target_len - t_pos
462
if source[s_pos + n -1] != target[t_pos + n -1]:
465
yield s_pos, t_pos, n
466
yield s_pos + (target_len - t_pos), target_len, 0
469
class AnnotatedKnitContent(KnitContent):
470
"""Annotated content."""
472
def __init__(self, lines):
473
KnitContent.__init__(self)
477
"""Return a list of (origin, text) for each content line."""
478
lines = self._lines[:]
479
if self._should_strip_eol:
480
origin, last_line = lines[-1]
481
lines[-1] = (origin, last_line.rstrip('\n'))
484
def apply_delta(self, delta, new_version_id):
485
"""Apply delta to this object to become new_version_id."""
488
for start, end, count, delta_lines in delta:
489
lines[offset+start:offset+end] = delta_lines
490
offset = offset + (start - end) + count
494
lines = [text for origin, text in self._lines]
495
except ValueError, e:
496
# most commonly (only?) caused by the internal form of the knit
497
# missing annotation information because of a bug - see thread
499
raise KnitCorrupt(self,
500
"line in annotated knit missing annotation information: %s"
502
if self._should_strip_eol:
503
lines[-1] = lines[-1].rstrip('\n')
507
return AnnotatedKnitContent(self._lines[:])
510
class PlainKnitContent(KnitContent):
511
"""Unannotated content.
513
When annotate[_iter] is called on this content, the same version is reported
514
for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
518
def __init__(self, lines, version_id):
519
KnitContent.__init__(self)
521
self._version_id = version_id
524
"""Return a list of (origin, text) for each content line."""
525
return [(self._version_id, line) for line in self._lines]
527
def apply_delta(self, delta, new_version_id):
528
"""Apply delta to this object to become new_version_id."""
531
for start, end, count, delta_lines in delta:
532
lines[offset+start:offset+end] = delta_lines
533
offset = offset + (start - end) + count
534
self._version_id = new_version_id
537
return PlainKnitContent(self._lines[:], self._version_id)
541
if self._should_strip_eol:
543
lines[-1] = lines[-1].rstrip('\n')
547
class _KnitFactory(object):
548
"""Base class for common Factory functions."""
550
def parse_record(self, version_id, record, record_details,
551
base_content, copy_base_content=True):
552
"""Parse a record into a full content object.
554
:param version_id: The official version id for this content
555
:param record: The data returned by read_records_iter()
556
:param record_details: Details about the record returned by
558
:param base_content: If get_build_details returns a compression_parent,
559
you must return a base_content here, else use None
560
:param copy_base_content: When building from the base_content, decide
561
you can either copy it and return a new object, or modify it in
563
:return: (content, delta) A Content object and possibly a line-delta,
566
method, noeol = record_details
567
if method == 'line-delta':
568
if copy_base_content:
569
content = base_content.copy()
571
content = base_content
572
delta = self.parse_line_delta(record, version_id)
573
content.apply_delta(delta, version_id)
575
content = self.parse_fulltext(record, version_id)
577
content._should_strip_eol = noeol
578
return (content, delta)
581
class KnitAnnotateFactory(_KnitFactory):
582
"""Factory for creating annotated Content objects."""
586
def make(self, lines, version_id):
587
num_lines = len(lines)
588
return AnnotatedKnitContent(zip([version_id] * num_lines, lines))
590
def parse_fulltext(self, content, version_id):
591
"""Convert fulltext to internal representation
593
fulltext content is of the format
594
revid(utf8) plaintext\n
595
internal representation is of the format:
598
# TODO: jam 20070209 The tests expect this to be returned as tuples,
599
# but the code itself doesn't really depend on that.
600
# Figure out a way to not require the overhead of turning the
601
# list back into tuples.
602
lines = [tuple(line.split(' ', 1)) for line in content]
603
return AnnotatedKnitContent(lines)
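
    # Editor's example (made-up revision id and text, not from the original
    # source): for the serialised fulltext lines
    #     ['rev-1 hello\n', 'rev-1 world\n']
    # parse_fulltext() returns an AnnotatedKnitContent whose internal lines are
    #     [('rev-1', 'hello\n'), ('rev-1', 'world\n')].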
605
def parse_line_delta_iter(self, lines):
606
return iter(self.parse_line_delta(lines))
608
def parse_line_delta(self, lines, version_id, plain=False):
609
"""Convert a line based delta into internal representation.
611
line delta is in the form of:
612
intstart intend intcount
614
revid(utf8) newline\n
615
internal representation is
616
(start, end, count, [1..count tuples (revid, newline)])
618
:param plain: If True, the lines are returned as a plain
619
list without annotations, not as a list of (origin, content) tuples, i.e.
620
(start, end, count, [1..count newline])
627
def cache_and_return(line):
628
origin, text = line.split(' ', 1)
629
return cache.setdefault(origin, origin), text
631
# walk through the lines parsing.
632
# Note that the plain test is explicitly pulled out of the
633
# loop to minimise any performance impact
636
start, end, count = [int(n) for n in header.split(',')]
637
contents = [next().split(' ', 1)[1] for i in xrange(count)]
638
result.append((start, end, count, contents))
641
start, end, count = [int(n) for n in header.split(',')]
642
contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
643
result.append((start, end, count, contents))
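
    # Editor's example of the delta form described above (schematic data): the
    # serialised hunk
    #     ['3,4,2\n', 'rev-2 first replacement\n', 'rev-2 second replacement\n']
    # parses to
    #     [(3, 4, 2, [('rev-2', 'first replacement\n'),
    #                 ('rev-2', 'second replacement\n')])]
    # or, with plain=True, to
    #     [(3, 4, 2, ['first replacement\n', 'second replacement\n'])].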
646
def get_fulltext_content(self, lines):
647
"""Extract just the content lines from a fulltext."""
648
return (line.split(' ', 1)[1] for line in lines)
650
def get_linedelta_content(self, lines):
651
"""Extract just the content from a line delta.
653
This doesn't return all of the extra information stored in a delta.
654
Only the actual content lines.
659
header = header.split(',')
660
count = int(header[2])
661
for i in xrange(count):
662
origin, text = next().split(' ', 1)
665
def lower_fulltext(self, content):
666
"""convert a fulltext content record into a serializable form.
668
see parse_fulltext which this inverts.
670
return ['%s %s' % (o, t) for o, t in content._lines]
672
def lower_line_delta(self, delta):
673
"""convert a delta into a serializable form.
675
See parse_line_delta which this inverts.
677
# TODO: jam 20070209 We only do the caching thing to make sure that
678
# the origin is a valid utf-8 line, eventually we could remove it
680
for start, end, c, lines in delta:
681
out.append('%d,%d,%d\n' % (start, end, c))
682
out.extend(origin + ' ' + text
683
for origin, text in lines)
686
def annotate(self, knit, key):
687
content = knit._get_content(key)
688
# adjust for the fact that serialised annotations are only key suffixes
690
if type(key) is tuple:
692
origins = content.annotate()
694
for origin, line in origins:
695
result.append((prefix + (origin,), line))
698
# XXX: This smells a bit. Why would key ever be a non-tuple here?
699
# Aren't keys defined to be tuples? -- spiv 20080618
700
return content.annotate()
703
class KnitPlainFactory(_KnitFactory):
704
"""Factory for creating plain Content objects."""
708
def make(self, lines, version_id):
709
return PlainKnitContent(lines, version_id)
711
def parse_fulltext(self, content, version_id):
712
"""This parses an unannotated fulltext.
714
Note that this is not a noop - the internal representation
715
        has (versionid, line) - it's just a constant versionid.
717
return self.make(content, version_id)
719
def parse_line_delta_iter(self, lines, version_id):
721
num_lines = len(lines)
722
while cur < num_lines:
725
start, end, c = [int(n) for n in header.split(',')]
726
yield start, end, c, lines[cur:cur+c]
729
def parse_line_delta(self, lines, version_id):
730
return list(self.parse_line_delta_iter(lines, version_id))
732
def get_fulltext_content(self, lines):
733
"""Extract just the content lines from a fulltext."""
736
def get_linedelta_content(self, lines):
737
"""Extract just the content from a line delta.
739
This doesn't return all of the extra information stored in a delta.
740
Only the actual content lines.
745
header = header.split(',')
746
count = int(header[2])
747
for i in xrange(count):
750
def lower_fulltext(self, content):
751
return content.text()
753
def lower_line_delta(self, delta):
755
for start, end, c, lines in delta:
756
out.append('%d,%d,%d\n' % (start, end, c))
760
def annotate(self, knit, key):
761
annotator = _KnitAnnotator(knit)
762
return annotator.annotate_flat(key)
766
def make_file_factory(annotated, mapper):
767
"""Create a factory for creating a file based KnitVersionedFiles.
769
    This is only functional enough to run interface tests; it doesn't try to
770
provide a full pack environment.
772
:param annotated: knit annotations are wanted.
773
:param mapper: The mapper from keys to paths.
775
def factory(transport):
776
index = _KndxIndex(transport, mapper, lambda:None, lambda:True, lambda:True)
777
access = _KnitKeyAccess(transport, mapper)
778
return KnitVersionedFiles(index, access, annotated=annotated)
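
    # Editor's usage sketch (not original code; the transport, mapper choice
    # and key are made up, but the calls are the real bzrlib APIs):
    #
    #     from bzrlib.transport import get_transport
    #     from bzrlib.versionedfile import ConstantMapper
    #     vf = make_file_factory(True, ConstantMapper('example'))(
    #         get_transport('.'))
    #     vf.add_lines(('rev-1',), (), ['one\n', 'two\n'])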
782
def make_pack_factory(graph, delta, keylength):
783
"""Create a factory for creating a pack based VersionedFiles.
785
    This is only functional enough to run interface tests; it doesn't try to
786
provide a full pack environment.
788
:param graph: Store a graph.
789
:param delta: Delta compress contents.
790
    :param keylength: How long keys should be.
792
def factory(transport):
793
parents = graph or delta
799
max_delta_chain = 200
802
graph_index = _mod_index.InMemoryGraphIndex(reference_lists=ref_length,
803
key_elements=keylength)
804
stream = transport.open_write_stream('newpack')
805
writer = pack.ContainerWriter(stream.write)
807
index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
808
deltas=delta, add_callback=graph_index.add_nodes)
809
access = _DirectPackAccess({})
810
access.set_writer(writer, graph_index, (transport, 'newpack'))
811
result = KnitVersionedFiles(index, access,
812
max_delta_chain=max_delta_chain)
813
result.stream = stream
814
result.writer = writer
819
def cleanup_pack_knit(versioned_files):
820
versioned_files.stream.close()
821
versioned_files.writer.end()
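

# Editor's usage sketch: a hypothetical helper (not part of the original
# module) showing how make_pack_factory and cleanup_pack_knit above are meant
# to be used together; the key and text are made up.
def _example_pack_knit(transport):
    """Create a pack-backed knit store, add one text, and close it."""
    vf = make_pack_factory(graph=True, delta=True, keylength=1)(transport)
    vf.add_lines(('rev-1',), (), ['an example line\n'])
    cleanup_pack_knit(vf)
    return vf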
824
def _get_total_build_size(self, keys, positions):
825
"""Determine the total bytes to build these keys.
827
(helper function because _KnitGraphIndex and _KndxIndex work the same, but
828
don't inherit from a common base.)
830
:param keys: Keys that we want to build
831
:param positions: dict of {key, (info, index_memo, comp_parent)} (such
832
as returned by _get_components_positions)
833
:return: Number of bytes to build those keys
835
all_build_index_memos = {}
839
for key in build_keys:
840
# This is mostly for the 'stacked' case
841
# Where we will be getting the data from a fallback
842
if key not in positions:
844
_, index_memo, compression_parent = positions[key]
845
all_build_index_memos[key] = index_memo
846
if compression_parent not in all_build_index_memos:
847
next_keys.add(compression_parent)
848
build_keys = next_keys
849
return sum([index_memo[2] for index_memo
850
in all_build_index_memos.itervalues()])
853
class KnitVersionedFiles(VersionedFiles):
854
"""Storage for many versioned files using knit compression.
856
Backend storage is managed by indices and data objects.
858
:ivar _index: A _KnitGraphIndex or similar that can describe the
859
parents, graph, compression and data location of entries in this
860
KnitVersionedFiles. Note that this is only the index for
861
*this* vfs; if there are fallbacks they must be queried separately.
864
def __init__(self, index, data_access, max_delta_chain=200,
865
annotated=False, reload_func=None):
866
"""Create a KnitVersionedFiles with index and data_access.
868
:param index: The index for the knit data.
869
:param data_access: The access object to store and retrieve knit
871
:param max_delta_chain: The maximum number of deltas to permit during
872
insertion. Set to 0 to prohibit the use of deltas.
873
:param annotated: Set to True to cause annotations to be calculated and
874
stored during insertion.
875
        :param reload_func: A function that can be called if we think we need
876
to reload the pack listing and try again. See
877
'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
880
self._access = data_access
881
self._max_delta_chain = max_delta_chain
883
self._factory = KnitAnnotateFactory()
885
self._factory = KnitPlainFactory()
886
self._fallback_vfs = []
887
self._reload_func = reload_func
890
return "%s(%r, %r)" % (
891
self.__class__.__name__,
895
def add_fallback_versioned_files(self, a_versioned_files):
896
"""Add a source of texts for texts not present in this knit.
898
:param a_versioned_files: A VersionedFiles object.
900
self._fallback_vfs.append(a_versioned_files)
902
def add_lines(self, key, parents, lines, parent_texts=None,
903
left_matching_blocks=None, nostore_sha=None, random_id=False,
905
"""See VersionedFiles.add_lines()."""
906
self._index._check_write_ok()
907
self._check_add(key, lines, random_id, check_content)
909
# The caller might pass None if there is no graph data, but kndx
910
# indexes can't directly store that, so we give them
911
# an empty tuple instead.
913
line_bytes = ''.join(lines)
914
return self._add(key, lines, parents,
915
parent_texts, left_matching_blocks, nostore_sha, random_id,
916
line_bytes=line_bytes)
918
def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
919
"""See VersionedFiles._add_text()."""
920
self._index._check_write_ok()
921
self._check_add(key, None, random_id, check_content=False)
922
if text.__class__ is not str:
923
raise errors.BzrBadParameterUnicode("text")
925
# The caller might pass None if there is no graph data, but kndx
926
# indexes can't directly store that, so we give them
927
# an empty tuple instead.
929
return self._add(key, None, parents,
930
None, None, nostore_sha, random_id,
933
def _add(self, key, lines, parents, parent_texts,
934
left_matching_blocks, nostore_sha, random_id,
936
"""Add a set of lines on top of version specified by parents.
938
Any versions not present will be converted into ghosts.
940
        :param lines: A list of strings where each one is a single line (has a
            single newline at the end of the string).  This is now optional
            (callers can pass None); it is left in this position for backwards
            compatibility.  ''.join(lines) must equal line_bytes.
944
:param line_bytes: A single string containing the content
946
We pass both lines and line_bytes because different routes bring the
947
values to this function. And for memory efficiency, we don't want to
948
have to split/join on-demand.
950
# first thing, if the content is something we don't need to store, find
952
digest = sha_string(line_bytes)
953
if nostore_sha == digest:
954
raise errors.ExistingContent
957
if parent_texts is None:
959
# Do a single query to ascertain parent presence; we only compress
960
# against parents in the same kvf.
961
present_parent_map = self._index.get_parent_map(parents)
962
for parent in parents:
963
if parent in present_parent_map:
964
present_parents.append(parent)
966
# Currently we can only compress against the left most present parent.
967
if (len(present_parents) == 0 or
968
present_parents[0] != parents[0]):
971
# To speed the extract of texts the delta chain is limited
972
# to a fixed number of deltas. This should minimize both
973
            # I/O and the time spent applying deltas.
974
delta = self._check_should_delta(present_parents[0])
976
text_length = len(line_bytes)
979
# Note: line_bytes is not modified to add a newline, that is tracked
980
# via the no_eol flag. 'lines' *is* modified, because that is the
981
# general values needed by the Content code.
982
if line_bytes and line_bytes[-1] != '\n':
983
options.append('no-eol')
985
# Copy the existing list, or create a new one
987
lines = osutils.split_lines(line_bytes)
990
# Replace the last line with one that ends in a final newline
991
lines[-1] = lines[-1] + '\n'
993
lines = osutils.split_lines(line_bytes)
995
for element in key[:-1]:
996
if type(element) is not str:
997
raise TypeError("key contains non-strings: %r" % (key,))
999
key = key[:-1] + ('sha1:' + digest,)
1000
elif type(key[-1]) is not str:
1001
raise TypeError("key contains non-strings: %r" % (key,))
1002
# Knit hunks are still last-element only
1003
version_id = key[-1]
1004
content = self._factory.make(lines, version_id)
1006
# Hint to the content object that its text() call should strip the
1008
content._should_strip_eol = True
1009
if delta or (self._factory.annotated and len(present_parents) > 0):
1010
# Merge annotations from parent texts if needed.
1011
delta_hunks = self._merge_annotations(content, present_parents,
1012
parent_texts, delta, self._factory.annotated,
1013
left_matching_blocks)
1016
options.append('line-delta')
1017
store_lines = self._factory.lower_line_delta(delta_hunks)
1018
size, bytes = self._record_to_data(key, digest,
1021
options.append('fulltext')
1022
# isinstance is slower and we have no hierarchy.
1023
if self._factory.__class__ is KnitPlainFactory:
1024
# Use the already joined bytes saving iteration time in
1026
dense_lines = [line_bytes]
1028
dense_lines.append('\n')
1029
size, bytes = self._record_to_data(key, digest,
1032
# get mixed annotation + content and feed it into the
1034
store_lines = self._factory.lower_fulltext(content)
1035
size, bytes = self._record_to_data(key, digest,
1038
access_memo = self._access.add_raw_records([(key, size)], bytes)[0]
1039
self._index.add_records(
1040
((key, options, access_memo, parents),),
1041
random_id=random_id)
1042
return digest, text_length, content
1044
def annotate(self, key):
1045
"""See VersionedFiles.annotate."""
1046
return self._factory.annotate(self, key)
1048
def get_annotator(self):
1049
return _KnitAnnotator(self)
1051
def check(self, progress_bar=None, keys=None):
1052
"""See VersionedFiles.check()."""
1054
return self._logical_check()
1056
# At the moment, check does not extra work over get_record_stream
1057
return self.get_record_stream(keys, 'unordered', True)
1059
def _logical_check(self):
1060
# This doesn't actually test extraction of everything, but that will
1061
# impact 'bzr check' substantially, and needs to be integrated with
1062
# care. However, it does check for the obvious problem of a delta with
1064
keys = self._index.keys()
1065
parent_map = self.get_parent_map(keys)
1067
if self._index.get_method(key) != 'fulltext':
1068
compression_parent = parent_map[key][0]
1069
if compression_parent not in parent_map:
1070
raise errors.KnitCorrupt(self,
1071
"Missing basis parent %s for %s" % (
1072
compression_parent, key))
1073
for fallback_vfs in self._fallback_vfs:
1074
fallback_vfs.check()
1076
def _check_add(self, key, lines, random_id, check_content):
1077
"""check that version_id and lines are safe to add."""
1078
version_id = key[-1]
1079
if version_id is not None:
1080
if contains_whitespace(version_id):
1081
raise InvalidRevisionId(version_id, self)
1082
self.check_not_reserved_id(version_id)
1083
# TODO: If random_id==False and the key is already present, we should
1084
# probably check that the existing content is identical to what is
1085
# being inserted, and otherwise raise an exception. This would make
1086
# the bundle code simpler.
1088
self._check_lines_not_unicode(lines)
1089
self._check_lines_are_lines(lines)
1091
def _check_header(self, key, line):
1092
rec = self._split_header(line)
1093
self._check_header_version(rec, key[-1])
1096
def _check_header_version(self, rec, version_id):
1097
"""Checks the header version on original format knit records.
1099
These have the last component of the key embedded in the record.
1101
if rec[1] != version_id:
1102
raise KnitCorrupt(self,
1103
'unexpected version, wanted %r, got %r' % (version_id, rec[1]))
1105
def _check_should_delta(self, parent):
1106
"""Iterate back through the parent listing, looking for a fulltext.
1108
This is used when we want to decide whether to add a delta or a new
1109
fulltext. It searches for _max_delta_chain parents. When it finds a
1110
fulltext parent, it sees if the total size of the deltas leading up to
1111
it is large enough to indicate that we want a new full text anyway.
1113
Return True if we should create a new delta, False if we should use a
1117
fulltext_size = None
1118
for count in xrange(self._max_delta_chain):
1120
# Note that this only looks in the index of this particular
1121
# KnitVersionedFiles, not in the fallbacks. This ensures that
1122
# we won't store a delta spanning physical repository
1124
build_details = self._index.get_build_details([parent])
1125
parent_details = build_details[parent]
1126
except (RevisionNotPresent, KeyError), e:
1127
# Some basis is not locally present: always fulltext
1129
index_memo, compression_parent, _, _ = parent_details
1130
_, _, size = index_memo
1131
if compression_parent is None:
1132
fulltext_size = size
1135
# We don't explicitly check for presence because this is in an
1136
# inner loop, and if it's missing it'll fail anyhow.
1137
parent = compression_parent
1139
# We couldn't find a fulltext, so we must create a new one
1141
        # Simple heuristic - if the total I/O would be greater as a delta than
1142
# the originally installed fulltext, we create a new fulltext.
1143
return fulltext_size > delta_size
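
        # Editor's worked example of the heuristic above (made-up sizes): for a
        # chain fulltext(10000 bytes) <- delta(300) <- delta(400), delta_size is
        # 700 and fulltext_size is 10000, so the new text is stored as another
        # delta; once the chain's deltas collectively outgrow the original
        # fulltext, a fresh fulltext is written instead.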
1145
def _build_details_to_components(self, build_details):
1146
"""Convert a build_details tuple to a position tuple."""
1147
# record_details, access_memo, compression_parent
1148
return build_details[3], build_details[0], build_details[1]
1150
def _get_components_positions(self, keys, allow_missing=False):
1151
"""Produce a map of position data for the components of keys.
1153
This data is intended to be used for retrieving the knit records.
1155
A dict of key to (record_details, index_memo, next, parents) is
1157
method is the way referenced data should be applied.
1158
index_memo is the handle to pass to the data access to actually get the
1160
next is the build-parent of the version, or None for fulltexts.
1161
parents is the version_ids of the parents of this version
1163
:param allow_missing: If True do not raise an error on a missing component,
1167
pending_components = keys
1168
while pending_components:
1169
build_details = self._index.get_build_details(pending_components)
1170
current_components = set(pending_components)
1171
pending_components = set()
1172
for key, details in build_details.iteritems():
1173
(index_memo, compression_parent, parents,
1174
record_details) = details
1175
method = record_details[0]
1176
if compression_parent is not None:
1177
pending_components.add(compression_parent)
1178
component_data[key] = self._build_details_to_components(details)
1179
missing = current_components.difference(build_details)
1180
if missing and not allow_missing:
1181
raise errors.RevisionNotPresent(missing.pop(), self)
1182
return component_data
1184
def _get_content(self, key, parent_texts={}):
1185
"""Returns a content object that makes up the specified
1187
cached_version = parent_texts.get(key, None)
1188
if cached_version is not None:
1189
# Ensure the cache dict is valid.
1190
if not self.get_parent_map([key]):
1191
raise RevisionNotPresent(key, self)
1192
return cached_version
1193
generator = _VFContentMapGenerator(self, [key])
1194
return generator._get_content(key)
1196
def get_known_graph_ancestry(self, keys):
1197
"""Get a KnownGraph instance with the ancestry of keys."""
1198
parent_map, missing_keys = self._index.find_ancestry(keys)
1199
for fallback in self._transitive_fallbacks():
1200
if not missing_keys:
1202
(f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1204
parent_map.update(f_parent_map)
1205
missing_keys = f_missing_keys
1206
kg = _mod_graph.KnownGraph(parent_map)
1209
def get_parent_map(self, keys):
1210
"""Get a map of the graph parents of keys.
1212
:param keys: The keys to look up parents for.
1213
:return: A mapping from keys to parents. Absent keys are absent from
1216
return self._get_parent_map_with_sources(keys)[0]
1218
def _get_parent_map_with_sources(self, keys):
1219
"""Get a map of the parents of keys.
1221
:param keys: The keys to look up parents for.
1222
:return: A tuple. The first element is a mapping from keys to parents.
1223
Absent keys are absent from the mapping. The second element is a
1224
list with the locations each key was found in. The first element
1225
is the in-this-knit parents, the second the first fallback source,
1229
sources = [self._index] + self._fallback_vfs
1232
for source in sources:
1235
new_result = source.get_parent_map(missing)
1236
source_results.append(new_result)
1237
result.update(new_result)
1238
missing.difference_update(set(new_result))
1239
return result, source_results
1241
def _get_record_map(self, keys, allow_missing=False):
1242
"""Produce a dictionary of knit records.
1244
:return: {key:(record, record_details, digest, next)}
1246
                data returned from read_records (a KnitContent object)
1248
opaque information to pass to parse_record
1250
SHA1 digest of the full text after all steps are done
1252
build-parent of the version, i.e. the leftmost ancestor.
1253
Will be None if the record is not a delta.
1254
:param keys: The keys to build a map for
1255
:param allow_missing: If some records are missing, rather than
1256
error, just return the data that could be generated.
1258
raw_map = self._get_record_map_unparsed(keys,
1259
allow_missing=allow_missing)
1260
return self._raw_map_to_record_map(raw_map)
1262
def _raw_map_to_record_map(self, raw_map):
1263
"""Parse the contents of _get_record_map_unparsed.
1265
:return: see _get_record_map.
1269
data, record_details, next = raw_map[key]
1270
content, digest = self._parse_record(key[-1], data)
1271
result[key] = content, record_details, digest, next
1274
def _get_record_map_unparsed(self, keys, allow_missing=False):
1275
"""Get the raw data for reconstructing keys without parsing it.
1277
:return: A dict suitable for parsing via _raw_map_to_record_map.
1278
key-> raw_bytes, (method, noeol), compression_parent
1280
# This retries the whole request if anything fails. Potentially we
1281
# could be a bit more selective. We could track the keys whose records
1282
# we have successfully found, and then only request the new records
1283
# from there. However, _get_components_positions grabs the whole build
1284
# chain, which means we'll likely try to grab the same records again
1285
# anyway. Also, can the build chains change as part of a pack
1286
# operation? We wouldn't want to end up with a broken chain.
1289
position_map = self._get_components_positions(keys,
1290
allow_missing=allow_missing)
1291
# key = component_id, r = record_details, i_m = index_memo,
1293
records = [(key, i_m) for key, (r, i_m, n)
1294
in position_map.iteritems()]
1295
# Sort by the index memo, so that we request records from the
1296
# same pack file together, and in forward-sorted order
1297
records.sort(key=operator.itemgetter(1))
1299
for key, data in self._read_records_iter_unchecked(records):
1300
(record_details, index_memo, next) = position_map[key]
1301
raw_record_map[key] = data, record_details, next
1302
return raw_record_map
1303
except errors.RetryWithNewPacks, e:
1304
self._access.reload_or_raise(e)
1307
def _split_by_prefix(cls, keys):
1308
"""For the given keys, split them up based on their prefix.
1310
To keep memory pressure somewhat under control, split the
1311
requests back into per-file-id requests, otherwise "bzr co"
1312
extracts the full tree into memory before writing it to disk.
1313
This should be revisited if _get_content_maps() can ever cross
1316
The keys for a given file_id are kept in the same relative order.
1317
Ordering between file_ids is not, though prefix_order will return the
1318
order that the key was first seen.
1320
:param keys: An iterable of key tuples
1321
:return: (split_map, prefix_order)
1322
split_map A dictionary mapping prefix => keys
1323
prefix_order The order that we saw the various prefixes
1325
split_by_prefix = {}
1333
if prefix in split_by_prefix:
1334
split_by_prefix[prefix].append(key)
1336
split_by_prefix[prefix] = [key]
1337
prefix_order.append(prefix)
1338
return split_by_prefix, prefix_order
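
        # Editor's example (hypothetical two-element (file_id, revision_id)
        # keys, where the prefix is the file id): for
        #     keys = [('f1', 'a'), ('f2', 'b'), ('f1', 'c')]
        # this returns
        #     split_map    == {'f1': [('f1', 'a'), ('f1', 'c')],
        #                      'f2': [('f2', 'b')]}
        #     prefix_order == ['f1', 'f2']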
1340
def _group_keys_for_io(self, keys, non_local_keys, positions,
1341
_min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
1342
"""For the given keys, group them into 'best-sized' requests.
1344
The idea is to avoid making 1 request per file, but to never try to
1345
unpack an entire 1.5GB source tree in a single pass. Also when
1346
possible, we should try to group requests to the same pack file
1349
:return: list of (keys, non_local) tuples that indicate what keys
1350
should be fetched next.
1352
# TODO: Ideally we would group on 2 factors. We want to extract texts
1353
# from the same pack file together, and we want to extract all
1354
# the texts for a given build-chain together. Ultimately it
1355
# probably needs a better global view.
1356
total_keys = len(keys)
1357
prefix_split_keys, prefix_order = self._split_by_prefix(keys)
1358
prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
1360
cur_non_local = set()
1364
for prefix in prefix_order:
1365
keys = prefix_split_keys[prefix]
1366
non_local = prefix_split_non_local_keys.get(prefix, [])
1368
this_size = self._index._get_total_build_size(keys, positions)
1369
cur_size += this_size
1370
cur_keys.extend(keys)
1371
cur_non_local.update(non_local)
1372
if cur_size > _min_buffer_size:
1373
result.append((cur_keys, cur_non_local))
1374
sizes.append(cur_size)
1376
cur_non_local = set()
1379
result.append((cur_keys, cur_non_local))
1380
sizes.append(cur_size)
1383
def get_record_stream(self, keys, ordering, include_delta_closure):
1384
"""Get a stream of records for keys.
1386
:param keys: The keys to include.
1387
:param ordering: Either 'unordered' or 'topological'. A topologically
1388
sorted stream has compression parents strictly before their
1390
:param include_delta_closure: If True then the closure across any
1391
compression parents will be included (in the opaque data).
1392
:return: An iterator of ContentFactory objects, each of which is only
1393
valid until the iterator is advanced.
1395
# keys might be a generator
1399
if not self._index.has_graph:
1400
# Cannot sort when no graph has been stored.
1401
ordering = 'unordered'
1403
remaining_keys = keys
1406
keys = set(remaining_keys)
1407
for content_factory in self._get_remaining_record_stream(keys,
1408
ordering, include_delta_closure):
1409
remaining_keys.discard(content_factory.key)
1410
yield content_factory
1412
except errors.RetryWithNewPacks, e:
1413
self._access.reload_or_raise(e)
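
        # Editor's usage sketch (hypothetical keys): callers typically consume
        # the stream like
        #     for record in vf.get_record_stream([('rev-1',), ('rev-2',)],
        #             'topological', True):
        #         if record.storage_kind != 'absent':
        #             text = record.get_bytes_as('fulltext')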
1415
def _get_remaining_record_stream(self, keys, ordering,
1416
include_delta_closure):
1417
"""This function is the 'retry' portion for get_record_stream."""
1418
if include_delta_closure:
1419
positions = self._get_components_positions(keys, allow_missing=True)
1421
build_details = self._index.get_build_details(keys)
1423
# (record_details, access_memo, compression_parent_key)
1424
positions = dict((key, self._build_details_to_components(details))
1425
for key, details in build_details.iteritems())
1426
absent_keys = keys.difference(set(positions))
1427
# There may be more absent keys : if we're missing the basis component
1428
# and are trying to include the delta closure.
1429
# XXX: We should not ever need to examine remote sources because we do
1430
# not permit deltas across versioned files boundaries.
1431
if include_delta_closure:
1432
needed_from_fallback = set()
1433
# Build up reconstructable_keys dict. key:True in this dict means
1434
# the key can be reconstructed.
1435
reconstructable_keys = {}
1439
chain = [key, positions[key][2]]
1441
needed_from_fallback.add(key)
1444
while chain[-1] is not None:
1445
if chain[-1] in reconstructable_keys:
1446
result = reconstructable_keys[chain[-1]]
1450
chain.append(positions[chain[-1]][2])
1452
# missing basis component
1453
needed_from_fallback.add(chain[-1])
1456
for chain_key in chain[:-1]:
1457
reconstructable_keys[chain_key] = result
1459
needed_from_fallback.add(key)
1460
# Double index lookups here : need a unified api ?
1461
global_map, parent_maps = self._get_parent_map_with_sources(keys)
1462
if ordering in ('topological', 'groupcompress'):
1463
if ordering == 'topological':
1464
# Global topological sort
1465
present_keys = tsort.topo_sort(global_map)
1467
present_keys = sort_groupcompress(global_map)
1468
# Now group by source:
1470
current_source = None
1471
for key in present_keys:
1472
for parent_map in parent_maps:
1473
if key in parent_map:
1474
key_source = parent_map
1476
if current_source is not key_source:
1477
source_keys.append((key_source, []))
1478
current_source = key_source
1479
source_keys[-1][1].append(key)
1481
if ordering != 'unordered':
1482
raise AssertionError('valid values for ordering are:'
1483
' "unordered", "groupcompress" or "topological" not: %r'
1485
# Just group by source; remote sources first.
1488
for parent_map in reversed(parent_maps):
1489
source_keys.append((parent_map, []))
1490
for key in parent_map:
1491
present_keys.append(key)
1492
source_keys[-1][1].append(key)
1493
# We have been requested to return these records in an order that
1494
# suits us. So we ask the index to give us an optimally sorted
1496
for source, sub_keys in source_keys:
1497
if source is parent_maps[0]:
1498
# Only sort the keys for this VF
1499
self._index._sort_keys_by_io(sub_keys, positions)
1500
absent_keys = keys - set(global_map)
1501
for key in absent_keys:
1502
yield AbsentContentFactory(key)
1503
# restrict our view to the keys we can answer.
1504
# XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
1505
# XXX: At that point we need to consider the impact of double reads by
1506
# utilising components multiple times.
1507
if include_delta_closure:
1508
# XXX: get_content_maps performs its own index queries; allow state
1510
non_local_keys = needed_from_fallback - absent_keys
1511
for keys, non_local_keys in self._group_keys_for_io(present_keys,
1514
generator = _VFContentMapGenerator(self, keys, non_local_keys,
1517
for record in generator.get_record_stream():
1520
for source, keys in source_keys:
1521
if source is parent_maps[0]:
1522
# this KnitVersionedFiles
1523
records = [(key, positions[key][1]) for key in keys]
1524
for key, raw_data in self._read_records_iter_unchecked(records):
1525
(record_details, index_memo, _) = positions[key]
1526
yield KnitContentFactory(key, global_map[key],
1527
record_details, None, raw_data, self._factory.annotated, None)
1529
vf = self._fallback_vfs[parent_maps.index(source) - 1]
1530
for record in vf.get_record_stream(keys, ordering,
1531
include_delta_closure):
1534
def get_sha1s(self, keys):
1535
"""See VersionedFiles.get_sha1s()."""
1537
record_map = self._get_record_map(missing, allow_missing=True)
1539
for key, details in record_map.iteritems():
1540
if key not in missing:
1542
# record entry 2 is the 'digest'.
1543
result[key] = details[2]
1544
missing.difference_update(set(result))
1545
for source in self._fallback_vfs:
1548
new_result = source.get_sha1s(missing)
1549
result.update(new_result)
1550
missing.difference_update(set(new_result))
1553
def insert_record_stream(self, stream):
1554
"""Insert a record stream into this container.
1556
:param stream: A stream of records to insert.
1558
:seealso VersionedFiles.get_record_stream:
1560
def get_adapter(adapter_key):
1562
return adapters[adapter_key]
1564
adapter_factory = adapter_registry.get(adapter_key)
1565
adapter = adapter_factory(self)
1566
adapters[adapter_key] = adapter
1569
if self._factory.annotated:
1570
# self is annotated, we need annotated knits to use directly.
1571
annotated = "annotated-"
1574
# self is not annotated, but we can strip annotations cheaply.
1576
convertibles = set(["knit-annotated-ft-gz"])
1577
if self._max_delta_chain:
1578
delta_types.add("knit-annotated-delta-gz")
1579
convertibles.add("knit-annotated-delta-gz")
1580
# The set of types we can cheaply adapt without needing basis texts.
1581
native_types = set()
1582
if self._max_delta_chain:
1583
native_types.add("knit-%sdelta-gz" % annotated)
1584
delta_types.add("knit-%sdelta-gz" % annotated)
1585
native_types.add("knit-%sft-gz" % annotated)
1586
knit_types = native_types.union(convertibles)
1588
# Buffer all index entries that we can't add immediately because their
1589
# basis parent is missing. We don't buffer all because generating
1590
# annotations may require access to some of the new records. However we
1591
# can't generate annotations from new deltas until their basis parent
1592
# is present anyway, so we get away with not needing an index that
1593
# includes the new keys.
1595
# See <http://launchpad.net/bugs/300177> about ordering of compression
1596
# parents in the records - to be conservative, we insist that all
1597
# parents must be present to avoid expanding to a fulltext.
1599
# key = basis_parent, value = index entry to add
1600
buffered_index_entries = {}
1601
for record in stream:
1602
kind = record.storage_kind
1603
if kind.startswith('knit-') and kind.endswith('-gz'):
1604
# Check that the ID in the header of the raw knit bytes matches
1605
# the record metadata.
1606
raw_data = record._raw_record
1607
df, rec = self._parse_record_header(record.key, raw_data)
1610
parents = record.parents
1611
if record.storage_kind in delta_types:
1612
# TODO: eventually the record itself should track
1613
# compression_parent
1614
compression_parent = parents[0]
1616
compression_parent = None
1617
# Raise an error when a record is missing.
1618
if record.storage_kind == 'absent':
1619
raise RevisionNotPresent([record.key], self)
1620
elif ((record.storage_kind in knit_types)
1621
and (compression_parent is None
1622
or not self._fallback_vfs
1623
or self._index.has_key(compression_parent)
1624
or not self.has_key(compression_parent))):
1625
# we can insert the knit record literally if either it has no
1626
# compression parent OR we already have its basis in this kvf
1627
# OR the basis is not present even in the fallbacks. In the
1628
# last case it will either turn up later in the stream and all
1629
# will be well, or it won't turn up at all and we'll raise an
1632
# TODO: self.has_key is somewhat redundant with
1633
# self._index.has_key; we really want something that directly
1634
# asks if it's only present in the fallbacks. -- mbp 20081119
1635
if record.storage_kind not in native_types:
1637
adapter_key = (record.storage_kind, "knit-delta-gz")
1638
adapter = get_adapter(adapter_key)
1640
adapter_key = (record.storage_kind, "knit-ft-gz")
1641
adapter = get_adapter(adapter_key)
1642
bytes = adapter.get_bytes(record)
1644
# It's a knit record, it has a _raw_record field (even if
1645
# it was reconstituted from a network stream).
1646
bytes = record._raw_record
1647
options = [record._build_details[0]]
1648
if record._build_details[1]:
1649
options.append('no-eol')
1650
# Just blat it across.
1651
# Note: This does end up adding data on duplicate keys. As
1652
# modern repositories use atomic insertions this should not
1653
# lead to excessive growth in the event of interrupted fetches.
1654
                # 'knit' repositories may suffer excessive growth, but as a
                # deprecated format this is tolerable. It can be fixed, if
                # needed, by having the kndx index support raise on a duplicate
                # add with identical parents and options.
1658
access_memo = self._access.add_raw_records(
1659
[(record.key, len(bytes))], bytes)[0]
1660
index_entry = (record.key, options, access_memo, parents)
1661
if 'fulltext' not in options:
1662
# Not a fulltext, so we need to make sure the compression
1663
# parent will also be present.
1664
# Note that pack backed knits don't need to buffer here
1665
# because they buffer all writes to the transaction level,
1666
# but we don't expose that difference at the index level. If
1667
# the query here has sufficient cost to show up in
1668
# profiling we should do that.
1670
# They're required to be physically in this
1671
# KnitVersionedFiles, not in a fallback.
1672
if not self._index.has_key(compression_parent):
1673
pending = buffered_index_entries.setdefault(
1674
compression_parent, [])
1675
pending.append(index_entry)
1678
self._index.add_records([index_entry])
1679
elif record.storage_kind == 'chunked':
1680
self.add_lines(record.key, parents,
1681
osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1683
# Not suitable for direct insertion as a
1684
# delta, either because it's not the right format, or this
1685
# KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1686
# 0) or because it depends on a base only present in the
1688
self._access.flush()
1690
# Try getting a fulltext directly from the record.
1691
bytes = record.get_bytes_as('fulltext')
1692
except errors.UnavailableRepresentation:
1693
adapter_key = record.storage_kind, 'fulltext'
1694
adapter = get_adapter(adapter_key)
1695
bytes = adapter.get_bytes(record)
1696
lines = split_lines(bytes)
1698
self.add_lines(record.key, parents, lines)
1699
except errors.RevisionAlreadyPresent:
1701
# Add any records whose basis parent is now available.
1703
added_keys = [record.key]
1705
key = added_keys.pop(0)
1706
if key in buffered_index_entries:
1707
index_entries = buffered_index_entries[key]
1708
self._index.add_records(index_entries)
1710
[index_entry[0] for index_entry in index_entries])
1711
del buffered_index_entries[key]
1712
if buffered_index_entries:
1713
            # There were index entries buffered at the end of the stream,
            # so these need to be added (if the index supports holding such
1715
# entries for later insertion)
1717
for key in buffered_index_entries:
1718
index_entries = buffered_index_entries[key]
1719
all_entries.extend(index_entries)
1720
self._index.add_records(
1721
all_entries, missing_compression_parents=True)
1723
def get_missing_compression_parent_keys(self):
1724
"""Return an iterable of keys of missing compression parents.
1726
Check this after calling insert_record_stream to find out if there are
1727
any missing compression parents. If there are, the records that
1728
depend on them are not able to be inserted safely. For atomic
1729
KnitVersionedFiles built on packs, the transaction should be aborted or
1730
suspended - commit will fail at this point. Nonatomic knits will error
1731
earlier because they have no staging area to put pending entries into.
1733
return self._index.get_missing_compression_parents()
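
        # Editor's usage sketch: after an insert_record_stream() call, callers
        # are expected to check for missing basis texts, e.g.
        #     target.insert_record_stream(source.get_record_stream(
        #         keys, 'unordered', False))
        #     if target.get_missing_compression_parent_keys():
        #         # abort or suspend: the data cannot be committed yet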
1735
def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1736
"""Iterate over the lines in the versioned files from keys.
1738
This may return lines from other keys. Each item the returned
1739
iterator yields is a tuple of a line and a text version that that line
1740
is present in (not introduced in).
1742
Ordering of results is in whatever order is most suitable for the
1743
underlying storage format.
1745
If a progress bar is supplied, it may be used to indicate progress.
1746
The caller is responsible for cleaning up progress bars (because this
1750
* Lines are normalised by the underlying store: they will all have \\n
1752
* Lines are returned in arbitrary order.
1753
* If a requested key did not change any lines (or didn't have any
1754
lines), it may not be mentioned at all in the result.
1756
:param pb: Progress bar supplied by caller.
1757
:return: An iterator over (line, key).
1760
pb = ui.ui_factory.nested_progress_bar()
1766
# we don't care about inclusions, the caller cares.
1767
# but we need to setup a list of records to visit.
1768
# we need key, position, length
1770
build_details = self._index.get_build_details(keys)
1771
for key, details in build_details.iteritems():
1773
key_records.append((key, details[0]))
1774
records_iter = enumerate(self._read_records_iter(key_records))
1775
for (key_idx, (key, data, sha_value)) in records_iter:
1776
pb.update('Walking content', key_idx, total)
1777
compression_parent = build_details[key][1]
1778
if compression_parent is None:
1780
line_iterator = self._factory.get_fulltext_content(data)
1783
line_iterator = self._factory.get_linedelta_content(data)
1784
# Now that we are yielding the data for this key, remove it
1787
# XXX: It might be more efficient to yield (key,
1788
# line_iterator) in the future. However for now, this is a
1789
# simpler change to integrate into the rest of the
1790
# codebase. RBC 20071110
1791
for line in line_iterator:
1794
except errors.RetryWithNewPacks, e:
1795
self._access.reload_or_raise(e)
1796
# If there are still keys we've not yet found, we look in the fallback
1797
# vfs, and hope to find them there. Note that if the keys are found
1798
# but had no changes or no content, the fallback may not return
1800
if keys and not self._fallback_vfs:
1801
# XXX: strictly the second parameter is meant to be the file id
1802
# but it's not easily accessible here.
1803
raise RevisionNotPresent(keys, repr(self))
1804
for source in self._fallback_vfs:
1808
for line, key in source.iter_lines_added_or_present_in_keys(keys):
1809
source_keys.add(key)
1811
keys.difference_update(source_keys)
1812
pb.update('Walking content', total, total)
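# Hedged usage sketch ('vf' and 'keys' are hypothetical names): gather the
# set of versions whose stored texts contain a particular kind of line.
#
#   interesting = set()
#   for line, key in vf.iter_lines_added_or_present_in_keys(keys):
#       if line.startswith('import '):
#           interesting.add(key)
#
# A requested key that contributed no lines may not appear in the output.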
def _make_line_delta(self, delta_seq, new_content):
1815
"""Generate a line delta from delta_seq and new_content."""
1817
for op in delta_seq.get_opcodes():
1818
if op[0] == 'equal':
1820
diff_hunks.append((op[1], op[2], op[4]-op[3], new_content._lines[op[3]:op[4]]))
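# Illustrative sketch (assumed example data): every non-'equal' opcode turns
# into a hunk of the form (old_start, old_end, new_line_count, new_lines).
#
#   old = ['a\n', 'b\n', 'c\n']
#   new = ['a\n', 'B\n', 'c\n']
#   # the opcode ('replace', 1, 2, 1, 2) yields the hunk
#   #   (1, 2, 1, new_content._lines[1:2])
#   # where the lines are (origin, text) pairs for annotated content and
#   # plain strings for plain content.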
def _merge_annotations(self, content, parents, parent_texts={},
1824
delta=None, annotated=None,
1825
left_matching_blocks=None):
1826
"""Merge annotations for content and generate deltas.
1828
This is done by comparing the annotations based on changes to the text
1829
and generating a delta on the resulting full texts. If annotations are
1830
not being created then a simple delta is created.
1832
if left_matching_blocks is not None:
1833
delta_seq = diff._PrematchedMatcher(left_matching_blocks)
1837
for parent_key in parents:
1838
merge_content = self._get_content(parent_key, parent_texts)
1839
if (parent_key == parents[0] and delta_seq is not None):
1842
seq = patiencediff.PatienceSequenceMatcher(
1843
None, merge_content.text(), content.text())
1844
for i, j, n in seq.get_matching_blocks():
1847
# this copies (origin, text) pairs across to the new
1848
# content for any line that matches the last-checked
1850
content._lines[j:j+n] = merge_content._lines[i:i+n]
1851
# XXX: Robert says the following block is a workaround for a
1852
# now-fixed bug and it can probably be deleted. -- mbp 20080618
1853
if content._lines and content._lines[-1][1][-1] != '\n':
1854
# The copied annotation was from a line without a trailing EOL,
1855
# reinstate one for the content object, to ensure correct
1857
line = content._lines[-1][1] + '\n'
1858
content._lines[-1] = (content._lines[-1][0], line)
1860
if delta_seq is None:
1861
reference_content = self._get_content(parents[0], parent_texts)
1862
new_texts = content.text()
1863
old_texts = reference_content.text()
1864
delta_seq = patiencediff.PatienceSequenceMatcher(
1865
None, old_texts, new_texts)
1866
return self._make_line_delta(delta_seq, content)
1868
def _parse_record(self, version_id, data):
1869
"""Parse an original format knit record.
1871
These have the last element of the key only present in the stored data.
1873
rec, record_contents = self._parse_record_unchecked(data)
1874
self._check_header_version(rec, version_id)
1875
return record_contents, rec[3]
1877
def _parse_record_header(self, key, raw_data):
1878
"""Parse a record header for consistency.
1880
:return: the header and the decompressor stream,
    as (stream, header_record).
"""
df = gzip.GzipFile(mode='rb', fileobj=StringIO(raw_data))
1886
rec = self._check_header(key, df.readline())
1887
except Exception, e:
1888
raise KnitCorrupt(self,
1889
"While reading {%s} got %s(%s)"
1890
% (key, e.__class__.__name__, str(e)))
1893
def _parse_record_unchecked(self, data):
1895
# 4168 calls in 2880 217 internal
1896
# 4168 calls to _parse_record_header in 2121
1897
# 4168 calls to readlines in 330
1898
df = gzip.GzipFile(mode='rb', fileobj=StringIO(data))
1900
record_contents = df.readlines()
1901
except Exception, e:
1902
raise KnitCorrupt(self, "Corrupt compressed record %r, got %s(%s)" %
1903
(data, e.__class__.__name__, str(e)))
1904
header = record_contents.pop(0)
1905
rec = self._split_header(header)
1906
last_line = record_contents.pop()
1907
if len(record_contents) != int(rec[2]):
1908
raise KnitCorrupt(self,
1909
'incorrect number of lines %s != %s'
1910
' for version {%s} %s'
1911
% (len(record_contents), int(rec[2]),
1912
rec[1], record_contents))
1913
if last_line != 'end %s\n' % rec[1]:
1914
raise KnitCorrupt(self,
1915
'unexpected version end line %r, wanted %r'
1916
% (last_line, rec[1]))
1918
return rec, record_contents
1920
def _read_records_iter(self, records):
1921
"""Read text records from data file and yield result.
1923
The result will be returned in whatever order is fastest to read,
not the order requested. Also, multiple requests for the same
1925
record will only yield 1 response.
1926
:param records: A list of (key, access_memo) entries
1927
:return: Yields (key, contents, digest) in the order
1928
read, not the order requested
1933
# XXX: This smells wrong, IO may not be getting ordered right.
1934
needed_records = sorted(set(records), key=operator.itemgetter(1))
1935
if not needed_records:
1938
# The transport optimizes the fetching as well
1939
# (ie, reads continuous ranges.)
1940
raw_data = self._access.get_raw_records(
1941
[index_memo for key, index_memo in needed_records])
1943
for (key, index_memo), data in \
1944
izip(iter(needed_records), raw_data):
1945
content, digest = self._parse_record(key[-1], data)
1946
yield key, content, digest
1948
def _read_records_iter_raw(self, records):
1949
"""Read text records from data file and yield raw data.
1951
This unpacks enough of the text record to validate the id is
1952
as expected, but that's all.
1954
Each item the iterator yields is (key, bytes,
1955
expected_sha1_of_full_text).
1957
for key, data in self._read_records_iter_unchecked(records):
1958
# validate the header (note that we can only use the suffix in
1959
# current knit records).
1960
df, rec = self._parse_record_header(key, data)
1962
yield key, data, rec[3]
1964
def _read_records_iter_unchecked(self, records):
1965
"""Read text records from data file and yield raw data.
1967
No validation is done.
1969
Yields tuples of (key, data).
1971
# setup an iterator of the external records:
1972
# uses readv so nice and fast we hope.
1974
# grab the disk data needed.
1975
needed_offsets = [index_memo for key, index_memo
1977
raw_records = self._access.get_raw_records(needed_offsets)
1979
for key, index_memo in records:
1980
data = raw_records.next()
1983
def _record_to_data(self, key, digest, lines, dense_lines=None):
1984
"""Convert key, digest, lines into a raw data block.
1986
:param key: The key of the record. Currently keys are always serialised
1987
using just the trailing component.
1988
:param dense_lines: The bytes of lines but in a denser form. For
1989
instance, if lines is a list of 1000 bytestrings each ending in \n,
1990
dense_lines may be a list with one line in it, containing all the
1000 lines and their \n's. Using dense_lines if it is already
1992
known is a win because the string join to create bytes in this
1993
function spends less time resizing the final string.
1994
:return: (len, a StringIO instance with the raw data ready to read.)
1996
chunks = ["version %s %d %s\n" % (key[-1], len(lines), digest)]
1997
chunks.extend(dense_lines or lines)
1998
chunks.append("end %s\n" % key[-1])
1999
for chunk in chunks:
2000
if type(chunk) is not str:
2001
raise AssertionError(
2002
'data must be plain bytes was %s' % type(chunk))
2003
if lines and lines[-1][-1] != '\n':
2004
raise ValueError('corrupt lines value %r' % lines)
2005
compressed_bytes = tuned_gzip.chunks_to_gzip(chunks)
2006
return len(compressed_bytes), compressed_bytes
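# Illustrative sketch (assumed example values): before gzip compression the
# record assembled above is laid out as
#
#   version rev-1 2 <sha1 of the full text>
#   first line of content
#   second line of content
#   end rev-1
#
# i.e. a 'version <id> <line count> <digest>' header, the content (or delta)
# lines, and an 'end <id>' trailer, which is exactly what
# _parse_record_unchecked checks for when reading the record back.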
def _split_header(self, line):
2011
raise KnitCorrupt(self,
2012
'unexpected number of elements in record header')
2016
"""See VersionedFiles.keys."""
2017
if 'evil' in debug.debug_flags:
2018
trace.mutter_callsite(2, "keys scales with size of history")
2019
sources = [self._index] + self._fallback_vfs
2021
for source in sources:
2022
result.update(source.keys())
2026
class _ContentMapGenerator(object):
2027
"""Generate texts or expose raw deltas for a set of texts."""
2029
def __init__(self, ordering='unordered'):
2030
self._ordering = ordering
2032
def _get_content(self, key):
2033
"""Get the content object for key."""
2034
# Note that _get_content is only called when the _ContentMapGenerator
2035
# has been constructed with just one key requested for reconstruction.
2036
if key in self.nonlocal_keys:
2037
record = self.get_record_stream().next()
2038
# Create a content object on the fly
2039
lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
2040
return PlainKnitContent(lines, record.key)
2042
# local keys we can ask for directly
2043
return self._get_one_work(key)
2045
def get_record_stream(self):
2046
"""Get a record stream for the keys requested during __init__."""
2047
for record in self._work():
2051
"""Produce maps of text and KnitContents as dicts.
2053
:return: (text_map, content_map) where text_map contains the texts for
2054
the requested versions and content_map contains the KnitContents.
2056
# NB: By definition we never need to read remote sources unless texts
2057
# are requested from them: we don't delta across stores - and we
# explicitly do not want to, to prevent data loss situations.
2059
if self.global_map is None:
2060
self.global_map = self.vf.get_parent_map(self.keys)
2061
nonlocal_keys = self.nonlocal_keys
2063
missing_keys = set(nonlocal_keys)
2064
# Read from remote versioned file instances and provide to our caller.
2065
for source in self.vf._fallback_vfs:
2066
if not missing_keys:
2068
# Loop over fallback repositories asking them for texts - ignore
2069
# any missing from a particular fallback.
2070
for record in source.get_record_stream(missing_keys,
2071
self._ordering, True):
2072
if record.storage_kind == 'absent':
2073
# Not in this particular stream, may be in one of the
# other fallback vfs objects.
2076
missing_keys.remove(record.key)
2079
if self._raw_record_map is None:
2080
raise AssertionError('_raw_record_map should have been filled')
2082
for key in self.keys:
2083
if key in self.nonlocal_keys:
2085
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
2088
def _get_one_work(self, requested_key):
2089
# Now, if we have calculated everything already, just return the
2091
if requested_key in self._contents_map:
2092
return self._contents_map[requested_key]
2093
# To simplify things, parse everything at once - code that wants one text
2094
# probably wants them all.
2095
# FUTURE: This function could be improved for the 'extract many' case
2096
# by tracking each component and only doing the copy when the number of
# children that need to apply deltas to it is > 1 or it is part of the
2099
multiple_versions = len(self.keys) != 1
2100
if self._record_map is None:
2101
self._record_map = self.vf._raw_map_to_record_map(
2102
self._raw_record_map)
2103
record_map = self._record_map
2104
# raw_record_map is key:
2105
# Have read and parsed records at this point.
2106
for key in self.keys:
2107
if key in self.nonlocal_keys:
2112
while cursor is not None:
2114
record, record_details, digest, next = record_map[cursor]
2116
raise RevisionNotPresent(cursor, self)
2117
components.append((cursor, record, record_details, digest))
2119
if cursor in self._contents_map:
2120
# no need to plan further back
2121
components.append((cursor, None, None, None))
2125
for (component_id, record, record_details,
2126
digest) in reversed(components):
2127
if component_id in self._contents_map:
2128
content = self._contents_map[component_id]
2130
content, delta = self._factory.parse_record(key[-1],
2131
record, record_details, content,
2132
copy_base_content=multiple_versions)
2133
if multiple_versions:
2134
self._contents_map[component_id] = content
2136
# digest here is the digest from the last applied component.
2137
text = content.text()
2138
actual_sha = sha_strings(text)
2139
if actual_sha != digest:
2140
raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
2141
if multiple_versions:
2142
return self._contents_map[requested_key]
2146
def _wire_bytes(self):
2147
"""Get the bytes to put on the wire for 'key'.
2149
The first collection of bytes asked for returns the serialised
2150
raw_record_map and the additional details (key, parent) for key.
2151
Subsequent calls return just the additional details (key, parent).
2152
The wire storage_kind given for the first key is 'knit-delta-closure';
for subsequent keys it is 'knit-delta-closure-ref'.
2155
:param key: A key from the content generator.
2156
:return: Bytes to put on the wire.
2159
# kind marker for dispatch on the far side,
2160
lines.append('knit-delta-closure')
2162
if self.vf._factory.annotated:
2163
lines.append('annotated')
2166
# then the list of keys
2167
lines.append('\t'.join(['\x00'.join(key) for key in self.keys
2168
if key not in self.nonlocal_keys]))
2169
# then the _raw_record_map in serialised form:
2171
# for each item in the map:
2173
# 1 line with parents if the key is to be yielded (None: for None, '' for ())
2174
# one line with method
2175
# one line with noeol
2176
# one line with next ('' for None)
2177
# one line with byte count of the record bytes
2179
for key, (record_bytes, (method, noeol), next) in \
2180
self._raw_record_map.iteritems():
2181
key_bytes = '\x00'.join(key)
2182
parents = self.global_map.get(key, None)
2184
parent_bytes = 'None:'
2186
parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
2187
method_bytes = method
2193
next_bytes = '\x00'.join(next)
2196
map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
2197
key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
2198
len(record_bytes), record_bytes))
2199
map_bytes = ''.join(map_byte_list)
2200
lines.append(map_bytes)
2201
bytes = '\n'.join(lines)
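# Illustrative sketch of the wire layout assembled above (values are assumed
# examples): a kind marker and factory flag, the '\t'-separated local keys,
# then one block per raw record, each introduced by its own header lines:
#
#   knit-delta-closure
#   annotated
#   file-id\x00rev-2
#   file-id\x00rev-2            <- record key ('\x00'-joined)
#   file-id\x00rev-1            <- parents ('\t'-separated), or 'None:'
#   line-delta                  <- method
#   ...                         <- noeol flag line and 'next' line
#   1234                        <- byte count of the raw record bytes
#   <1234 bytes of raw record data>
#
# _NetworkContentMapGenerator walks this layout line by line, using the byte
# count to skip over each record's body.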
class _VFContentMapGenerator(_ContentMapGenerator):
2206
"""Content map generator reading from a VersionedFiles object."""
2208
def __init__(self, versioned_files, keys, nonlocal_keys=None,
2209
global_map=None, raw_record_map=None, ordering='unordered'):
2210
"""Create a _ContentMapGenerator.
2212
:param versioned_files: The versioned files that the texts are being
2214
:param keys: The keys to produce content maps for.
2215
:param nonlocal_keys: An iterable of keys (possibly intersecting keys)
2216
which are known to not be in this knit, but rather in one of the
2218
:param global_map: The result of get_parent_map(keys) (or a supermap).
2219
This is required if get_record_stream() is to be used.
2220
:param raw_record_map: An unparsed raw record map to use for answering
2223
_ContentMapGenerator.__init__(self, ordering=ordering)
2224
# The vf to source data from
2225
self.vf = versioned_files
2227
self.keys = list(keys)
2228
# Keys known to be in fallback vfs objects
2229
if nonlocal_keys is None:
2230
self.nonlocal_keys = set()
2232
self.nonlocal_keys = frozenset(nonlocal_keys)
2233
# Parents data for keys to be returned in get_record_stream
2234
self.global_map = global_map
2235
# The chunked lists for self.keys in text form
2237
# A cache of KnitContent objects used in extracting texts.
2238
self._contents_map = {}
2239
# All the knit records needed to assemble the requested keys as full
2241
self._record_map = None
2242
if raw_record_map is None:
2243
self._raw_record_map = self.vf._get_record_map_unparsed(keys,
2246
self._raw_record_map = raw_record_map
2247
# the factory for parsing records
2248
self._factory = self.vf._factory
2251
class _NetworkContentMapGenerator(_ContentMapGenerator):
2252
"""Content map generator sourced from a network stream."""
2254
def __init__(self, bytes, line_end):
2255
"""Construct a _NetworkContentMapGenerator from a bytes block."""
2257
self.global_map = {}
2258
self._raw_record_map = {}
2259
self._contents_map = {}
2260
self._record_map = None
2261
self.nonlocal_keys = []
2262
# Get access to record parsing facilities
2263
self.vf = KnitVersionedFiles(None, None)
2266
line_end = bytes.find('\n', start)
2267
line = bytes[start:line_end]
2268
start = line_end + 1
2269
if line == 'annotated':
2270
self._factory = KnitAnnotateFactory()
2272
self._factory = KnitPlainFactory()
2273
# list of keys to emit in get_record_stream
2274
line_end = bytes.find('\n', start)
2275
line = bytes[start:line_end]
2276
start = line_end + 1
2278
tuple(segment.split('\x00')) for segment in line.split('\t')
2280
# now a loop until the end. XXX: It would be nice if this was just a
2281
# bunch of the same records as get_record_stream(..., False) gives, but
2282
# there is a decent sized gap stopping that at the moment.
2286
line_end = bytes.find('\n', start)
2287
key = tuple(bytes[start:line_end].split('\x00'))
2288
start = line_end + 1
2289
# 1 line with parents (None: for None, '' for ())
2290
line_end = bytes.find('\n', start)
2291
line = bytes[start:line_end]
2296
[tuple(segment.split('\x00')) for segment in line.split('\t')
2298
self.global_map[key] = parents
2299
start = line_end + 1
2300
# one line with method
2301
line_end = bytes.find('\n', start)
2302
line = bytes[start:line_end]
2304
start = line_end + 1
2305
# one line with noeol
2306
line_end = bytes.find('\n', start)
2307
line = bytes[start:line_end]
2309
start = line_end + 1
2310
# one line with next ('' for None)
2311
line_end = bytes.find('\n', start)
2312
line = bytes[start:line_end]
2316
next = tuple(bytes[start:line_end].split('\x00'))
2317
start = line_end + 1
2318
# one line with byte count of the record bytes
2319
line_end = bytes.find('\n', start)
2320
line = bytes[start:line_end]
2322
start = line_end + 1
2324
record_bytes = bytes[start:start+count]
2325
start = start + count
2327
self._raw_record_map[key] = (record_bytes, (method, noeol), next)
2329
def get_record_stream(self):
2330
"""Get a record stream for for keys requested by the bytestream."""
2332
for key in self.keys:
2333
yield LazyKnitContentFactory(key, self.global_map[key], self, first)
2336
def _wire_bytes(self):
2340
class _KndxIndex(object):
2341
"""Manages knit index files
2343
The index is kept in memory and read on startup, to enable
2344
fast lookups of revision information. The cursor of the index
2345
file is always pointing to the end, making it easy to append
2348
_cache is a cache for fast mapping from version id to a Index
2351
_history is a cache for fast mapping from indexes to version ids.
2353
The index data format is dictionary compressed when it comes to
2354
parent references; a index entry may only have parents that with a
2355
lover index number. As a result, the index is topological sorted.
2357
Duplicate entries may be written to the index for a single version id
2358
if this is done then the latter one completely replaces the former:
2359
this allows updates to correct version and parent information.
2360
Note that the two entries may share the delta, and that successive
2361
annotations and references MUST point to the first entry.
2363
The index file on disc contains a header, followed by one line per knit
2364
record. The same revision can be present in an index file more than once.
2365
The first occurrence gets assigned a sequence number starting from 0.
2367
The format of a single line is
2368
REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
2369
REVISION_ID is a utf8-encoded revision id
2370
FLAGS is a comma separated list of flags about the record. Values include
2371
no-eol, line-delta, fulltext.
2372
BYTE_OFFSET is the ascii representation of the byte offset in the data file
2373
that the compressed data starts at.
2374
LENGTH is the ascii representation of the length of the data file.
2375
PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
2377
PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
2378
revision id already in the knit that is a parent of REVISION_ID.
2379
The ' :' marker is the end of record marker.
2382
when a write is interrupted to the index file, it will result in a line
2383
that does not end in ' :'. If the ' :' is not present at the end of a line,
2384
or at the end of the file, then the record that is missing it will be
2385
ignored by the parser.
2387
When writing new records to the index file, the data is preceded by '\n'
2388
to ensure that records always start on new lines even if the last write was
2389
interrupted. As a result its normal for the last line in the index to be
2390
missing a trailing newline. One can be added with no harmful effects.
2392
:ivar _kndx_cache: dict from prefix to the old state of KnitIndex objects,
2393
where prefix is e.g. the (fileid,) for .texts instances or () for
2394
constant-mapped things like .revisions, and the old state is
2395
tuple(history_vector, cache_dict). This is used to prevent having an
2396
ABI change with the C extension that reads .kndx files.
2399
HEADER = "# bzr knit index 8\n"
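# Illustrative parsing sketch for the index line format described in the
# class docstring above (the example line is assumed, not from a real index):
#
#   line = "rev-2 line-delta 431 238 .ghost-rev 0 :\n"
#   rest = line[:-3].split(' ')        # strip the trailing ' :\n' marker
#   version_id = rest[0]
#   flags = rest[1].split(',')         # e.g. ['line-delta'] or ['fulltext', 'no-eol']
#   byte_offset, length = int(rest[2]), int(rest[3])
#   parents = rest[4:]                 # '.id' entries are literal revision ids,
#                                      # bare integers refer to earlier lines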
def __init__(self, transport, mapper, get_scope, allow_writes, is_locked):
2402
"""Create a _KndxIndex on transport using mapper."""
2403
self._transport = transport
2404
self._mapper = mapper
2405
self._get_scope = get_scope
2406
self._allow_writes = allow_writes
2407
self._is_locked = is_locked
2409
self.has_graph = True
2411
def add_records(self, records, random_id=False, missing_compression_parents=False):
2412
"""Add multiple records to the index.
2414
:param records: a list of tuples:
2415
(key, options, access_memo, parents).
2416
:param random_id: If True the ids being added were randomly generated
2417
and no check for existence will be performed.
2418
:param missing_compression_parents: If True the records being added are
2419
only compressed against texts already in the index (or inside
2420
records). If False the records all refer to unavailable texts (or
2421
texts inside records) as compression parents.
2423
if missing_compression_parents:
2424
# It might be nice to get the edge of the records. But keys isn't
2426
keys = sorted(record[0] for record in records)
2427
raise errors.RevisionNotPresent(keys, self)
2429
for record in records:
2432
path = self._mapper.map(key) + '.kndx'
2433
path_keys = paths.setdefault(path, (prefix, []))
2434
path_keys[1].append(record)
2435
for path in sorted(paths):
2436
prefix, path_keys = paths[path]
2437
self._load_prefixes([prefix])
2439
orig_history = self._kndx_cache[prefix][1][:]
2440
orig_cache = self._kndx_cache[prefix][0].copy()
2443
for key, options, (_, pos, size), parents in path_keys:
2445
# kndx indices cannot be parentless.
2447
line = "\n%s %s %s %s %s :" % (
2448
key[-1], ','.join(options), pos, size,
2449
self._dictionary_compress(parents))
2450
if type(line) is not str:
2451
raise AssertionError(
2452
'data must be utf8 was %s' % type(line))
2454
self._cache_key(key, options, pos, size, parents)
2455
if len(orig_history):
2456
self._transport.append_bytes(path, ''.join(lines))
2458
self._init_index(path, lines)
2460
# If any problems happen, restore the original values and re-raise
2461
self._kndx_cache[prefix] = (orig_cache, orig_history)
2464
def scan_unvalidated_index(self, graph_index):
2465
"""See _KnitGraphIndex.scan_unvalidated_index."""
2466
# Because kndx files do not support atomic insertion via separate index
2467
# files, they do not support this method.
2468
raise NotImplementedError(self.scan_unvalidated_index)
2470
def get_missing_compression_parents(self):
2471
"""See _KnitGraphIndex.get_missing_compression_parents."""
2472
# Because kndx files do not support atomic insertion via separate index
2473
# files, they do not support this method.
2474
raise NotImplementedError(self.get_missing_compression_parents)
2476
def _cache_key(self, key, options, pos, size, parent_keys):
2477
"""Cache a version record in the history array and index cache.
2479
This is inlined into _load_data for performance. KEEP IN SYNC.
2480
(It saves 60ms, 25% of the __init__ overhead on local 4000 record
2484
version_id = key[-1]
2485
# last-element only for compatibility with the C load_data.
2486
parents = tuple(parent[-1] for parent in parent_keys)
2487
for parent in parent_keys:
2488
if parent[:-1] != prefix:
2489
raise ValueError("mismatched prefixes for %r, %r" % (
2491
cache, history = self._kndx_cache[prefix]
2492
# only want the _history index to reference the 1st index entry
2494
if version_id not in cache:
2495
index = len(history)
2496
history.append(version_id)
2498
index = cache[version_id][5]
2499
cache[version_id] = (version_id,
2506
def check_header(self, fp):
2507
line = fp.readline()
2509
# An empty file can actually be treated as though the file doesn't
2511
raise errors.NoSuchFile(self)
2512
if line != self.HEADER:
2513
raise KnitHeaderError(badline=line, filename=self)
2515
def _check_read(self):
2516
if not self._is_locked():
2517
raise errors.ObjectNotLocked(self)
2518
if self._get_scope() != self._scope:
2521
def _check_write_ok(self):
2522
"""Assert if not writes are permitted."""
2523
if not self._is_locked():
2524
raise errors.ObjectNotLocked(self)
2525
if self._get_scope() != self._scope:
2527
if self._mode != 'w':
2528
raise errors.ReadOnlyObjectDirtiedError(self)
2530
def get_build_details(self, keys):
2531
"""Get the method, index_memo and compression parent for keys.
2533
Ghosts are omitted from the result.
2535
:param keys: An iterable of keys.
2536
:return: A dict of key:(index_memo, compression_parent, parents,
    record_details).
    index_memo
        opaque structure to pass to read_records to extract the raw data
    compression_parent
        Content that this record is built upon, may be None
    parents
        Logical parents of this node
    record_details
        extra information about the content which needs to be passed to
        Factory.parse_record
"""
parent_map = self.get_parent_map(keys)
2552
if key not in parent_map:
2554
method = self.get_method(key)
2555
parents = parent_map[key]
2556
if method == 'fulltext':
2557
compression_parent = None
2559
compression_parent = parents[0]
2560
noeol = 'no-eol' in self.get_options(key)
2561
index_memo = self.get_position(key)
2562
result[key] = (index_memo, compression_parent,
2563
parents, (method, noeol))
2566
def get_method(self, key):
2567
"""Return compression method of specified key."""
2568
options = self.get_options(key)
2569
if 'fulltext' in options:
2571
elif 'line-delta' in options:
2574
raise errors.KnitIndexUnknownMethod(self, options)
2576
def get_options(self, key):
2577
"""Return a list representing options.
2581
prefix, suffix = self._split_key(key)
2582
self._load_prefixes([prefix])
2584
return self._kndx_cache[prefix][0][suffix][1]
2586
raise RevisionNotPresent(key, self)
2588
def find_ancestry(self, keys):
2589
"""See CombinedGraphIndex.find_ancestry()"""
2590
prefixes = set(key[:-1] for key in keys)
2591
self._load_prefixes(prefixes)
2594
missing_keys = set()
2595
pending_keys = list(keys)
2596
# This assumes that keys will not reference parents in a different
2597
# prefix, which is accurate so far.
2599
key = pending_keys.pop()
2600
if key in parent_map:
2604
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
2606
missing_keys.add(key)
2608
parent_keys = tuple([prefix + (suffix,)
2609
for suffix in suffix_parents])
2610
parent_map[key] = parent_keys
2611
pending_keys.extend([p for p in parent_keys
2612
if p not in parent_map])
2613
return parent_map, missing_keys
2615
def get_parent_map(self, keys):
2616
"""Get a map of the parents of keys.
2618
:param keys: The keys to look up parents for.
2619
:return: A mapping from keys to parents. Absent keys are absent from
2622
# Parse what we need to up front, this potentially trades off I/O
2623
# locality (.kndx and .knit in the same block group for the same file
2624
# id) for less checking in inner loops.
2625
prefixes = set(key[:-1] for key in keys)
2626
self._load_prefixes(prefixes)
2631
suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
2635
result[key] = tuple(prefix + (suffix,) for
2636
suffix in suffix_parents)
2639
def get_position(self, key):
2640
"""Return details needed to access the version.
2642
:return: a tuple (key, data position, size) to hand to the access
2643
logic to get the record.
2645
prefix, suffix = self._split_key(key)
2646
self._load_prefixes([prefix])
2647
entry = self._kndx_cache[prefix][0][suffix]
2648
return key, entry[2], entry[3]
2650
has_key = _mod_index._has_key_from_parent_map
2652
def _init_index(self, path, extra_lines=[]):
2653
"""Initialize an index."""
2655
sio.write(self.HEADER)
2656
sio.writelines(extra_lines)
2658
self._transport.put_file_non_atomic(path, sio,
2659
create_parent_dir=True)
2660
# self._create_parent_dir)
2661
# mode=self._file_mode,
2662
# dir_mode=self._dir_mode)
2665
"""Get all the keys in the collection.
2667
The keys are not ordered.
2670
# Identify all key prefixes.
2671
# XXX: A bit hacky, needs polish.
2672
if type(self._mapper) is ConstantMapper:
2676
for quoted_relpath in self._transport.iter_files_recursive():
2677
path, ext = os.path.splitext(quoted_relpath)
2679
prefixes = [self._mapper.unmap(path) for path in relpaths]
2680
self._load_prefixes(prefixes)
2681
for prefix in prefixes:
2682
for suffix in self._kndx_cache[prefix][1]:
2683
result.add(prefix + (suffix,))
2686
def _load_prefixes(self, prefixes):
2687
"""Load the indices for prefixes."""
2689
for prefix in prefixes:
2690
if prefix not in self._kndx_cache:
2691
# the load_data interface writes to these variables.
2694
self._filename = prefix
2696
path = self._mapper.map(prefix) + '.kndx'
2697
fp = self._transport.get(path)
2699
# _load_data may raise NoSuchFile if the target knit is
2701
_load_data(self, fp)
2704
self._kndx_cache[prefix] = (self._cache, self._history)
2709
self._kndx_cache[prefix] = ({}, [])
2710
if type(self._mapper) is ConstantMapper:
2711
# preserve behaviour for revisions.kndx etc.
2712
self._init_index(path)
2717
missing_keys = _mod_index._missing_keys_from_parent_map
2719
def _partition_keys(self, keys):
2720
"""Turn keys into a dict of prefix:suffix_list."""
2723
prefix_keys = result.setdefault(key[:-1], [])
2724
prefix_keys.append(key[-1])
2727
def _dictionary_compress(self, keys):
2728
"""Dictionary compress keys.
2730
:param keys: The keys to generate references to.
2731
:return: A string representation of keys. keys which are present are
2732
dictionary compressed, and others are emitted as fulltext with a
2738
prefix = keys[0][:-1]
2739
cache = self._kndx_cache[prefix][0]
2741
if key[:-1] != prefix:
2742
# kndx indices cannot refer across partitioned storage.
2743
raise ValueError("mismatched prefixes for %r" % keys)
2744
if key[-1] in cache:
2745
# -- inlined lookup() --
2746
result_list.append(str(cache[key[-1]][5]))
2747
# -- end lookup () --
2749
result_list.append('.' + key[-1])
2750
return ' '.join(result_list)
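# Illustrative sketch (assumed example data): if 'rev-1' was the sixth entry
# loaded for this prefix (so cache['rev-1'][5] == 5) and 'ghost-rev' is not
# in the cache, then
#
#   _dictionary_compress([('file-id', 'rev-1'), ('file-id', 'ghost-rev')])
#
# returns "5 .ghost-rev": known parents are written as their sequence number,
# unknown ones as a full revision id with a '.' prefix.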
def _reset_cache(self):
2753
# Possibly this should be a LRU cache. A dictionary from key_prefix to
2754
# (cache_dict, history_vector) for parsed kndx files.
2755
self._kndx_cache = {}
2756
self._scope = self._get_scope()
2757
allow_writes = self._allow_writes()
2763
def _sort_keys_by_io(self, keys, positions):
2764
"""Figure out an optimal order to read the records for the given keys.
2766
Sort keys, grouped by index and sorted by position.
2768
:param keys: A list of keys whose records we want to read. This will be
2770
:param positions: A dict, such as the one returned by
2771
_get_components_positions()
2774
def get_sort_key(key):
2775
index_memo = positions[key][1]
2776
# Group by prefix and position. index_memo[0] is the key, so it is
2777
# (file_id, revision_id) and we don't want to sort on revision_id,
2778
# index_memo[1] is the position, and index_memo[2] is the size,
2779
# which doesn't matter for the sort
2780
return index_memo[0][:-1], index_memo[1]
2781
return keys.sort(key=get_sort_key)
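# Illustrative sketch (assumed example data; 'index' is a hypothetical
# _KndxIndex instance): positions[key][1] is the (key, pos, size) index memo,
# so keys end up grouped by prefix and ordered by byte position:
#
#   positions = {('f', 'a'): (None, (('f', 'a'), 700, 50)),
#                ('f', 'b'): (None, (('f', 'b'), 100, 50))}
#   keys = [('f', 'a'), ('f', 'b')]
#   index._sort_keys_by_io(keys, positions)
#   # keys is now [('f', 'b'), ('f', 'a')] (sorted in place)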
_get_total_build_size = _get_total_build_size
2785
def _split_key(self, key):
2786
"""Split key into a prefix and suffix."""
2787
return key[:-1], key[-1]
2790
class _KeyRefs(object):
2792
def __init__(self, track_new_keys=False):
2793
# dict mapping 'key' to 'set of keys referring to that key'
2796
# set remembering all new keys
2797
self.new_keys = set()
2799
self.new_keys = None
2805
self.new_keys.clear()
2807
def add_references(self, key, refs):
2808
# Record the new references
2809
for referenced in refs:
2811
needed_by = self.refs[referenced]
2813
needed_by = self.refs[referenced] = set()
2815
# Discard references satisfied by the new key
2818
def get_new_keys(self):
2819
return self.new_keys
2821
def get_unsatisfied_refs(self):
2822
return self.refs.iterkeys()
2824
def _satisfy_refs_for_key(self, key):
2828
# No keys depended on this key. That's ok.
2831
def add_key(self, key):
2832
# satisfy refs for key, and remember that we've seen this key.
2833
self._satisfy_refs_for_key(key)
2834
if self.new_keys is not None:
2835
self.new_keys.add(key)
2837
def satisfy_refs_for_keys(self, keys):
2839
self._satisfy_refs_for_key(key)
2841
def get_referrers(self):
2843
for referrers in self.refs.itervalues():
2844
result.update(referrers)
2848
class _KnitGraphIndex(object):
2849
"""A KnitVersionedFiles index layered on GraphIndex."""
2851
def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2852
add_callback=None, track_external_parent_refs=False):
2853
"""Construct a KnitGraphIndex on a graph_index.
2855
:param graph_index: An implementation of bzrlib.index.GraphIndex.
2856
:param is_locked: A callback to check whether the object should answer
2858
:param deltas: Allow delta-compressed records.
2859
:param parents: If True, record knits parents, if not do not record
2861
:param add_callback: If not None, allow additions to the index and call
2862
this callback with a list of added GraphIndex nodes:
2863
[(node, value, node_refs), ...]
2864
:param is_locked: A callback, returns True if the index is locked and
2866
:param track_external_parent_refs: If True, record all external parent
2867
references parents from added records. These can be retrieved
2868
later by calling get_missing_parents().
2870
self._add_callback = add_callback
2871
self._graph_index = graph_index
2872
self._deltas = deltas
2873
self._parents = parents
2874
if deltas and not parents:
2875
# XXX: TODO: Delta tree and parent graph should be conceptually
2877
raise KnitCorrupt(self, "Cannot do delta compression without "
2879
self.has_graph = parents
2880
self._is_locked = is_locked
2881
self._missing_compression_parents = set()
2882
if track_external_parent_refs:
2883
self._key_dependencies = _KeyRefs()
2885
self._key_dependencies = None
2888
return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2890
def add_records(self, records, random_id=False,
2891
missing_compression_parents=False):
2892
"""Add multiple records to the index.
2894
This function does not insert data into the Immutable GraphIndex
2895
backing the KnitGraphIndex, instead it prepares data for insertion by
2896
the caller and checks that it is safe to insert then calls
2897
self._add_callback with the prepared GraphIndex nodes.
2899
:param records: a list of tuples:
2900
(key, options, access_memo, parents).
2901
:param random_id: If True the ids being added were randomly generated
2902
and no check for existence will be performed.
2903
:param missing_compression_parents: If True the records being added are
2904
only compressed against texts already in the index (or inside
2905
records). If False the records all refer to unavailable texts (or
2906
texts inside records) as compression parents.
2908
if not self._add_callback:
2909
raise errors.ReadOnlyError(self)
2910
# we hope there are no repositories with inconsistent parentage
2914
compression_parents = set()
2915
key_dependencies = self._key_dependencies
2916
for (key, options, access_memo, parents) in records:
2918
parents = tuple(parents)
2919
if key_dependencies is not None:
2920
key_dependencies.add_references(key, parents)
2921
index, pos, size = access_memo
2922
if 'no-eol' in options:
2926
value += "%d %d" % (pos, size)
2927
if not self._deltas:
2928
if 'line-delta' in options:
2929
raise KnitCorrupt(self, "attempt to add line-delta in non-delta knit")
2932
if 'line-delta' in options:
2933
node_refs = (parents, (parents[0],))
2934
if missing_compression_parents:
2935
compression_parents.add(parents[0])
2937
node_refs = (parents, ())
2939
node_refs = (parents, )
2942
raise KnitCorrupt(self, "attempt to add node with parents "
2943
"in parentless index.")
2945
keys[key] = (value, node_refs)
2948
present_nodes = self._get_entries(keys)
2949
for (index, key, value, node_refs) in present_nodes:
2950
parents = node_refs[:1]
2951
# Sometimes these are passed as a list rather than a tuple
2952
passed = static_tuple.as_tuples(keys[key])
2953
passed_parents = passed[1][:1]
2954
if (value[0] != keys[key][0][0] or
2955
parents != passed_parents):
2956
node_refs = static_tuple.as_tuples(node_refs)
2957
raise KnitCorrupt(self, "inconsistent details in add_records"
2958
": %s %s" % ((value, node_refs), passed))
2962
for key, (value, node_refs) in keys.iteritems():
2963
result.append((key, value, node_refs))
2965
for key, (value, node_refs) in keys.iteritems():
2966
result.append((key, value))
2967
self._add_callback(result)
2968
if missing_compression_parents:
2969
# This may appear to be incorrect (it does not check for
2970
# compression parents that are in the existing graph index),
2971
# but such records won't have been buffered, so this is
2972
# actually correct: every entry when
2973
# missing_compression_parents==True either has a missing parent, or
2974
# a parent that is one of the keys in records.
2975
compression_parents.difference_update(keys)
2976
self._missing_compression_parents.update(compression_parents)
2977
# Adding records may have satisfied missing compression parents.
2978
self._missing_compression_parents.difference_update(keys)
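# Illustrative sketch (assumed example data): for a line-delta record without
# the no-eol option, the node prepared above looks roughly like
#
#   key       = ('file-id', 'rev-2')
#   value     = ' 431 238'                    # flag byte, then "pos size"
#   node_refs = ((('file-id', 'rev-1'),),     # logical parents
#                (('file-id', 'rev-1'),))     # compression parent reference
#
# and self._add_callback receives [(key, value, node_refs)], or (key, value)
# pairs when the index records no parents at all.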
def scan_unvalidated_index(self, graph_index):
2981
"""Inform this _KnitGraphIndex that there is an unvalidated index.
2983
This allows this _KnitGraphIndex to keep track of any missing
2984
compression parents we may want to have filled in to make those
2987
:param graph_index: A GraphIndex
2990
new_missing = graph_index.external_references(ref_list_num=1)
2991
new_missing.difference_update(self.get_parent_map(new_missing))
2992
self._missing_compression_parents.update(new_missing)
2993
if self._key_dependencies is not None:
2994
# Add parent refs from graph_index (and discard parent refs that
2995
# the graph_index has).
2996
for node in graph_index.iter_all_entries():
2997
self._key_dependencies.add_references(node[1], node[3][0])
2999
def get_missing_compression_parents(self):
3000
"""Return the keys of missing compression parents.
3002
Missing compression parents occur when a record stream was missing
3003
basis texts, or an index was scanned that had missing basis texts.
3005
return frozenset(self._missing_compression_parents)
3007
def get_missing_parents(self):
3008
"""Return the keys of missing parents."""
3009
# If updating this, you should also update
3010
# groupcompress._GCGraphIndex.get_missing_parents
3011
# We may have false positives, so filter those out.
3012
self._key_dependencies.satisfy_refs_for_keys(
3013
self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
3014
return frozenset(self._key_dependencies.get_unsatisfied_refs())
3016
def _check_read(self):
3017
"""raise if reads are not permitted."""
3018
if not self._is_locked():
3019
raise errors.ObjectNotLocked(self)
3021
def _check_write_ok(self):
3022
"""Assert if writes are not permitted."""
3023
if not self._is_locked():
3024
raise errors.ObjectNotLocked(self)
3026
def _compression_parent(self, an_entry):
3027
# return the key that an_entry is compressed against, or None
3028
# Grab the second parent list (as deltas implies parents currently)
3029
compression_parents = an_entry[3][1]
3030
if not compression_parents:
3032
if len(compression_parents) != 1:
3033
raise AssertionError(
3034
"Too many compression parents: %r" % compression_parents)
3035
return compression_parents[0]
3037
def get_build_details(self, keys):
3038
"""Get the method, index_memo and compression parent for version_ids.
3040
Ghosts are omitted from the result.
3042
:param keys: An iterable of keys.
3043
:return: A dict of key:
    (index_memo, compression_parent, parents, record_details).
    index_memo
        opaque structure to pass to read_records to extract the raw data
    compression_parent
        Content that this record is built upon, may be None
    parents
        Logical parents of this node
    record_details
        extra information about the content which needs to be passed to
        Factory.parse_record
"""
entries = self._get_entries(keys, False)
3059
for entry in entries:
3061
if not self._parents:
3064
parents = entry[3][0]
3065
if not self._deltas:
3066
compression_parent_key = None
3068
compression_parent_key = self._compression_parent(entry)
3069
noeol = (entry[2][0] == 'N')
3070
if compression_parent_key:
3071
method = 'line-delta'
3074
result[key] = (self._node_to_position(entry),
3075
compression_parent_key, parents,
3079
def _get_entries(self, keys, check_present=False):
3080
"""Get the entries for keys.
3082
:param keys: An iterable of index key tuples.
3087
for node in self._graph_index.iter_entries(keys):
3089
found_keys.add(node[1])
3091
# adapt parentless index to the rest of the code.
3092
for node in self._graph_index.iter_entries(keys):
3093
yield node[0], node[1], node[2], ()
3094
found_keys.add(node[1])
3096
missing_keys = keys.difference(found_keys)
3098
raise RevisionNotPresent(missing_keys.pop(), self)
3100
def get_method(self, key):
3101
"""Return compression method of specified key."""
3102
return self._get_method(self._get_node(key))
3104
def _get_method(self, node):
3105
if not self._deltas:
3107
if self._compression_parent(node):
3112
def _get_node(self, key):
3114
return list(self._get_entries([key]))[0]
3116
raise RevisionNotPresent(key, self)
3118
def get_options(self, key):
3119
"""Return a list representing options.
3123
node = self._get_node(key)
3124
options = [self._get_method(node)]
3125
if node[2][0] == 'N':
3126
options.append('no-eol')
3129
def find_ancestry(self, keys):
3130
"""See CombinedGraphIndex.find_ancestry()"""
3131
return self._graph_index.find_ancestry(keys, 0)
3133
def get_parent_map(self, keys):
3134
"""Get a map of the parents of keys.
3136
:param keys: The keys to look up parents for.
3137
:return: A mapping from keys to parents. Absent keys are absent from
3141
nodes = self._get_entries(keys)
3145
result[node[1]] = node[3][0]
3148
result[node[1]] = None
3151
def get_position(self, key):
3152
"""Return details needed to access the version.
3154
:return: a tuple (index, data position, size) to hand to the access
3155
logic to get the record.
3157
node = self._get_node(key)
3158
return self._node_to_position(node)
3160
has_key = _mod_index._has_key_from_parent_map
3163
"""Get all the keys in the collection.
3165
The keys are not ordered.
3168
return [node[1] for node in self._graph_index.iter_all_entries()]
3170
missing_keys = _mod_index._missing_keys_from_parent_map
3172
def _node_to_position(self, node):
3173
"""Convert an index value to position details."""
3174
bits = node[2][1:].split(' ')
3175
return node[0], int(bits[0]), int(bits[1])
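# Illustrative sketch (assumed example node): a node whose value string is
# 'N0 4321' ('N' no-eol flag byte, byte offset 0, length 4321) becomes
#
#   (graph_index, 0, 4321)
#
# node[0] is the owning GraphIndex, which _DirectPackAccess later uses to
# find the right pack file for the read.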
def _sort_keys_by_io(self, keys, positions):
3178
"""Figure out an optimal order to read the records for the given keys.
3180
Sort keys, grouped by index and sorted by position.
3182
:param keys: A list of keys whose records we want to read. This will be
3184
:param positions: A dict, such as the one returned by
3185
_get_components_positions()
3188
def get_index_memo(key):
3189
# index_memo is at offset [1]. It is made up of (GraphIndex,
3190
# position, size). GI is an object, which will be unique for each
3191
# pack file. This causes us to group by pack file, then sort by
3192
# position. Size doesn't matter, but it isn't worth breaking up the
3194
return positions[key][1]
3195
return keys.sort(key=get_index_memo)
3197
_get_total_build_size = _get_total_build_size
3200
class _KnitKeyAccess(object):
3201
"""Access to records in .knit files."""
3203
def __init__(self, transport, mapper):
3204
"""Create a _KnitKeyAccess with transport and mapper.
3206
:param transport: The transport the access object is rooted at.
3207
:param mapper: The mapper used to map keys to .knit files.
3209
self._transport = transport
3210
self._mapper = mapper
3212
def add_raw_records(self, key_sizes, raw_data):
3213
"""Add raw knit bytes to a storage area.
3215
The data is spooled to the container writer in one bytes-record per
3218
:param sizes: An iterable of tuples containing the key and size of each
3220
:param raw_data: A bytestring containing the data.
3221
:return: A list of memos to retrieve the record later. Each memo is an
3222
opaque index memo. For _KnitKeyAccess the memo is (key, pos,
3223
length), where the key is the record key.
3225
if type(raw_data) is not str:
3226
raise AssertionError(
3227
'data must be plain bytes was %s' % type(raw_data))
3230
# TODO: This can be tuned for writing to sftp and other servers where
3231
# append() is relatively expensive by grouping the writes to each key
3233
for key, size in key_sizes:
3234
path = self._mapper.map(key)
3236
base = self._transport.append_bytes(path + '.knit',
3237
raw_data[offset:offset+size])
3238
except errors.NoSuchFile:
3239
self._transport.mkdir(osutils.dirname(path))
3240
base = self._transport.append_bytes(path + '.knit',
3241
raw_data[offset:offset+size])
3245
result.append((key, base, size))
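# Illustrative sketch (assumed example data; exact paths depend on the
# mapper): writing two records of 10 and 20 bytes for keys ('f1', 'r1') and
# ('f2', 'r1') appends to the corresponding .knit files and returns memos like
#
#   [(('f1', 'r1'), 0, 10), (('f2', 'r1'), 0, 20)]
#
# where the middle value is the offset at which each record was appended (the
# value returned by transport.append_bytes).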
"""Flush pending writes on this access object.
3251
For .knit files this is a no-op.
3255
def get_raw_records(self, memos_for_retrieval):
3256
"""Get the raw bytes for a records.
3258
:param memos_for_retrieval: An iterable containing the access memo for
3259
retrieving the bytes.
3260
:return: An iterator over the bytes of the records.
3262
# first pass, group into same-index request to minimise readv's issued.
3264
current_prefix = None
3265
for (key, offset, length) in memos_for_retrieval:
3266
if current_prefix == key[:-1]:
3267
current_list.append((offset, length))
3269
if current_prefix is not None:
3270
request_lists.append((current_prefix, current_list))
3271
current_prefix = key[:-1]
3272
current_list = [(offset, length)]
3273
# handle the last entry
3274
if current_prefix is not None:
3275
request_lists.append((current_prefix, current_list))
3276
for prefix, read_vector in request_lists:
3277
path = self._mapper.map(prefix) + '.knit'
3278
for pos, data in self._transport.readv(path, read_vector):
3282
class _DirectPackAccess(object):
3283
"""Access to data in one or more packs with less translation."""
3285
def __init__(self, index_to_packs, reload_func=None, flush_func=None):
3286
"""Create a _DirectPackAccess object.
3288
:param index_to_packs: A dict mapping index objects to the transport
3289
and file names for obtaining data.
3290
:param reload_func: A function to call if we determine that the pack
3291
files have moved and we need to reload our caches. See
3292
bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
3294
self._container_writer = None
3295
self._write_index = None
3296
self._indices = index_to_packs
3297
self._reload_func = reload_func
3298
self._flush_func = flush_func
3300
def add_raw_records(self, key_sizes, raw_data):
3301
"""Add raw knit bytes to a storage area.
3303
The data is spooled to the container writer in one bytes-record per
3306
:param sizes: An iterable of tuples containing the key and size of each
3308
:param raw_data: A bytestring containing the data.
3309
:return: A list of memos to retrieve the record later. Each memo is an
3310
opaque index memo. For _DirectPackAccess the memo is (index, pos,
3311
length), where the index field is the write_index object supplied
3312
to the PackAccess object.
3314
if type(raw_data) is not str:
3315
raise AssertionError(
3316
'data must be plain bytes was %s' % type(raw_data))
3319
for key, size in key_sizes:
3320
p_offset, p_length = self._container_writer.add_bytes_record(
3321
raw_data[offset:offset+size], [])
3323
result.append((self._write_index, p_offset, p_length))
3327
"""Flush pending writes on this access object.
3329
This will flush any buffered writes to a NewPack.
3331
if self._flush_func is not None:
3334
def get_raw_records(self, memos_for_retrieval):
3335
"""Get the raw bytes for a records.
3337
:param memos_for_retrieval: An iterable containing the (index, pos,
3338
length) memo for retrieving the bytes. The Pack access method
3339
looks up the pack to use for a given record in its index_to_pack
3341
:return: An iterator over the bytes of the records.
3343
# first pass, group into same-index requests
3345
current_index = None
3346
for (index, offset, length) in memos_for_retrieval:
3347
if current_index == index:
3348
current_list.append((offset, length))
3350
if current_index is not None:
3351
request_lists.append((current_index, current_list))
3352
current_index = index
3353
current_list = [(offset, length)]
3354
# handle the last entry
3355
if current_index is not None:
3356
request_lists.append((current_index, current_list))
3357
for index, offsets in request_lists:
3359
transport, path = self._indices[index]
3361
# A KeyError here indicates that someone has triggered an index
3362
# reload, and this index has gone missing, we need to start
3364
if self._reload_func is None:
3365
# If we don't have a _reload_func there is nothing that can
3368
raise errors.RetryWithNewPacks(index,
3369
reload_occurred=True,
3370
exc_info=sys.exc_info())
3372
reader = pack.make_readv_reader(transport, path, offsets)
3373
for names, read_func in reader.iter_records():
3374
yield read_func(None)
3375
except errors.NoSuchFile:
3376
# A NoSuchFile error indicates that a pack file has gone
3377
# missing on disk, we need to trigger a reload, and start over.
3378
if self._reload_func is None:
3380
raise errors.RetryWithNewPacks(transport.abspath(path),
3381
reload_occurred=False,
3382
exc_info=sys.exc_info())
3384
def set_writer(self, writer, index, transport_packname):
3385
"""Set a writer to use for adding data."""
3386
if index is not None:
3387
self._indices[index] = transport_packname
3388
self._container_writer = writer
3389
self._write_index = index
3391
def reload_or_raise(self, retry_exc):
3392
"""Try calling the reload function, or re-raise the original exception.
3394
This should be called after _DirectPackAccess raises a
3395
RetryWithNewPacks exception. This function will handle the common logic
3396
of determining when the error is fatal versus being temporary.
3397
It will also make sure that the original exception is raised, rather
3398
than the RetryWithNewPacks exception.
3400
If this function returns, then the calling function should retry
3401
whatever operation was being performed. Otherwise an exception will
3404
:param retry_exc: A RetryWithNewPacks exception.
3407
if self._reload_func is None:
3409
elif not self._reload_func():
3410
# The reload claimed that nothing changed
3411
if not retry_exc.reload_occurred:
3412
# If there wasn't an earlier reload, then we really were
3413
# expecting to find changes. We didn't find them, so this is a
3417
exc_class, exc_value, exc_traceback = retry_exc.exc_info
3418
raise exc_class, exc_value, exc_traceback
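# Hedged usage sketch, mirroring the retry pattern already used in this
# module ('access' and 'do_read' are hypothetical names):
#
#   while True:
#       try:
#           return do_read()
#       except errors.RetryWithNewPacks, e:
#           # Either returns (so we loop and retry) or re-raises the
#           # original exception if reloading cannot help.
#           access.reload_or_raise(e)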
def annotate_knit(knit, revision_id):
3422
"""Annotate a knit with no cached annotations.
3424
This implementation is for knits with no cached annotations.
3425
It will work for knits with cached annotations, but this is not
3428
annotator = _KnitAnnotator(knit)
3429
return iter(annotator.annotate_flat(revision_id))
3432
class _KnitAnnotator(annotate.Annotator):
3433
"""Build up the annotations for a text."""
3435
def __init__(self, vf):
3436
annotate.Annotator.__init__(self, vf)
3438
# TODO: handle Nodes which cannot be extracted
3439
# self._ghosts = set()
3441
# Map from (key, parent_key) => matching_blocks, should be 'use once'
3442
self._matching_blocks = {}
3444
# KnitContent objects
3445
self._content_objects = {}
3446
# The number of children that depend on this fulltext content object
3447
self._num_compression_children = {}
3448
# Delta records that need their compression parent before they can be
3450
self._pending_deltas = {}
3451
# Fulltext records that are waiting for their parents fulltexts before
3452
# they can be yielded for annotation
3453
self._pending_annotation = {}
3455
self._all_build_details = {}
3457
def _get_build_graph(self, key):
3458
"""Get the graphs for building texts and annotations.
3460
The data you need for creating a full text may be different than the
3461
data you need to annotate that text. (At a minimum, you need both
3462
parents to create an annotation, but only need 1 parent to generate the
3465
:return: A list of (key, index_memo) records, suitable for
3466
passing to read_records_iter to start reading in the raw data from
3469
pending = set([key])
3472
self._num_needed_children[key] = 1
3474
# get all pending nodes
3475
this_iteration = pending
3476
build_details = self._vf._index.get_build_details(this_iteration)
3477
self._all_build_details.update(build_details)
3478
# new_nodes = self._vf._index._get_entries(this_iteration)
3480
for key, details in build_details.iteritems():
3481
(index_memo, compression_parent, parent_keys,
3482
record_details) = details
3483
self._parent_map[key] = parent_keys
3484
self._heads_provider = None
3485
records.append((key, index_memo))
3486
# Do we actually need to check _annotated_lines?
3487
pending.update([p for p in parent_keys
3488
if p not in self._all_build_details])
3490
for parent_key in parent_keys:
3491
if parent_key in self._num_needed_children:
3492
self._num_needed_children[parent_key] += 1
3494
self._num_needed_children[parent_key] = 1
3495
if compression_parent:
3496
if compression_parent in self._num_compression_children:
3497
self._num_compression_children[compression_parent] += 1
3499
self._num_compression_children[compression_parent] = 1
3501
missing_versions = this_iteration.difference(build_details.keys())
3502
if missing_versions:
3503
for key in missing_versions:
3504
if key in self._parent_map and key in self._text_cache:
3505
# We already have this text ready, we just need to
3506
# yield it later so we get it annotated
3508
parent_keys = self._parent_map[key]
3509
for parent_key in parent_keys:
3510
if parent_key in self._num_needed_children:
3511
self._num_needed_children[parent_key] += 1
3513
self._num_needed_children[parent_key] = 1
3514
pending.update([p for p in parent_keys
3515
if p not in self._all_build_details])
3517
raise errors.RevisionNotPresent(key, self._vf)
3518
# Generally we will want to read the records in reverse order, because
3519
# we find the parent nodes after the children
3521
return records, ann_keys
3523
def _get_needed_texts(self, key, pb=None):
3524
# if True or len(self._vf._fallback_vfs) > 0:
3525
if len(self._vf._fallback_vfs) > 0:
3526
# If we have fallbacks, go to the generic path
3527
for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
3532
records, ann_keys = self._get_build_graph(key)
3533
for idx, (sub_key, text, num_lines) in enumerate(
3534
self._extract_texts(records)):
3536
pb.update('annotating', idx, len(records))
3537
yield sub_key, text, num_lines
3538
for sub_key in ann_keys:
3539
text = self._text_cache[sub_key]
3540
num_lines = len(text) # bad assumption
3541
yield sub_key, text, num_lines
3543
except errors.RetryWithNewPacks, e:
3544
self._vf._access.reload_or_raise(e)
3545
# The cached build_details are no longer valid
3546
self._all_build_details.clear()
3548
def _cache_delta_blocks(self, key, compression_parent, delta, lines):
3549
parent_lines = self._text_cache[compression_parent]
3550
blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
3551
self._matching_blocks[(key, compression_parent)] = blocks
3553

    def _expand_record(self, key, parent_keys, compression_parent, record,
                       record_details):
        delta = None
        if compression_parent:
            if compression_parent not in self._content_objects:
                # Waiting for the parent
                self._pending_deltas.setdefault(compression_parent, []).append(
                    (key, parent_keys, record, record_details))
                return None
            # We have the basis parent, so expand the delta
            num = self._num_compression_children[compression_parent]
            num -= 1
            if num == 0:
                base_content = self._content_objects.pop(compression_parent)
                self._num_compression_children.pop(compression_parent)
            else:
                self._num_compression_children[compression_parent] = num
                base_content = self._content_objects[compression_parent]
            # It is tempting to want to copy_base_content=False for the last
            # child object. However, whenever noeol=False,
            # self._text_cache[parent_key] is content._lines. So mutating it
            # gives very bad results.
            # The alternative is to copy the lines into text cache, but then we
            # are copying anyway, so just do it here.
            content, delta = self._vf._factory.parse_record(
                key, record, record_details, base_content,
                copy_base_content=True)
        else:
            # Fulltext record
            content, _ = self._vf._factory.parse_record(
                key, record, record_details, None)
        if self._num_compression_children.get(key, 0) > 0:
            self._content_objects[key] = content
        lines = content.text()
        self._text_cache[key] = lines
        if delta is not None:
            self._cache_delta_blocks(key, compression_parent, delta, lines)
        return lines
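
    # Illustrative note (not from the original source): _num_compression_children
    # acts as a reference count on cached fulltext content.  If two deltas both
    # build on a hypothetical parent P, the count for P starts at 2; expanding
    # the first child decrements it and keeps P's content cached, expanding the
    # second drops the count to zero and pops P from _content_objects so the
    # memory can be freed.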

    def _get_parent_annotations_and_matches(self, key, text, parent_key):
        """Get the list of annotations for the parent, and the matching lines.

        :param text: The opaque value given by _get_needed_texts
        :param parent_key: The key for the parent text
        :return: (parent_annotations, matching_blocks)
            parent_annotations is a list as long as the number of lines in
                parent
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
                indicating which lines match between the two texts
        """
        block_key = (key, parent_key)
        if block_key in self._matching_blocks:
            blocks = self._matching_blocks.pop(block_key)
            parent_annotations = self._annotations_cache[parent_key]
            return parent_annotations, blocks
        return annotate.Annotator._get_parent_annotations_and_matches(self,
            key, text, parent_key)
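
    # The pop() above makes each cached block list single-use: once a child has
    # been matched against its compression parent the blocks are discarded, and
    # any other (key, parent_key) pair falls back to the generic matcher in
    # annotate.Annotator.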

    def _process_pending(self, key):
        """The content for 'key' was just processed.

        Determine if there is any more pending work to be processed.
        """
        to_return = []
        if key in self._pending_deltas:
            compression_parent = key
            children = self._pending_deltas.pop(key)
            for child_key, parent_keys, record, record_details in children:
                lines = self._expand_record(child_key, parent_keys,
                                            compression_parent,
                                            record, record_details)
                if self._check_ready_for_annotations(child_key, parent_keys):
                    to_return.append(child_key)
        # Also check any children that are waiting for this parent to be
        # annotation ready
        if key in self._pending_annotation:
            children = self._pending_annotation.pop(key)
            to_return.extend([c for c, p_keys in children
                              if self._check_ready_for_annotations(c, p_keys)])
        return to_return
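
    # _process_pending() is the "unlock" step referred to in _extract_texts()
    # below: finishing one text can both expand deltas that were waiting on it
    # as a compression parent and release texts that were waiting on it as an
    # annotation parent.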

    def _check_ready_for_annotations(self, key, parent_keys):
        """Return True if this text is ready to be yielded.

        Otherwise, this will return False, and queue the text into
        self._pending_annotation
        """
        for parent_key in parent_keys:
            if parent_key not in self._annotations_cache:
                # still waiting on at least one parent text, so queue it up
                # Note that if there are multiple parents, we need to wait
                # for all of them.
                self._pending_annotation.setdefault(parent_key,
                    []).append((key, parent_keys))
                return False
        return True
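
    # Note that a text with several parents is queued under the first missing
    # parent only; when that parent becomes available, _process_pending()
    # re-checks the text and, if another parent is still missing, queues it
    # again under that one.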

    def _extract_texts(self, records):
        """Extract the various texts needed based on records."""
        # We iterate in the order read, rather than a strict order requested
        # However, process what we can, and put off to the side things that
        # still need parents, cleaning them up when those parents are
        # processed.
        # Basic data flow:
        #   1) As 'records' are read, see if we can expand these records into
        #      Content objects (and thus lines)
        #   2) If a given line-delta is waiting on its compression parent, it
        #      gets queued up into self._pending_deltas, otherwise we expand
        #      it, and put it into self._text_cache and self._content_objects
        #   3) If we expanded the text, we will then check to see if all
        #      parents have also been processed. If so, this text gets yielded,
        #      else this record gets set aside into pending_annotation
        #   4) Further, if we expanded the text in (2), we will then check to
        #      see if there are any children in self._pending_deltas waiting to
        #      also be processed. If so, we go back to (2) for those
        #   5) Further again, if we yielded the text, we can then check if that
        #      'unlocks' any of the texts in pending_annotations, which should
        #      then get yielded as well
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
        # compression child could unlock yet another, and yielding a fulltext
        # will also 'unlock' the children that are waiting on that annotation.
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
        # if other parents are also waiting.)
        # We want to yield content before expanding child content objects, so
        # that we know when we can re-use the content lines, and the annotation
        # code can know when it can stop caching fulltexts, as well.

        # Children that are missing their compression parent
        for (key, record, digest) in self._vf._read_records_iter(records):
            details = self._all_build_details[key]
            (_, compression_parent, parent_keys, record_details) = details
            lines = self._expand_record(key, parent_keys, compression_parent,
                                        record, record_details)
            if lines is None:
                # Pending delta should be queued up
                continue
            # At this point, we may be able to yield this content, if all
            # parents are also finished
            yield_this_text = self._check_ready_for_annotations(key,
                                                                parent_keys)
            if yield_this_text:
                # All parents present
                yield key, lines, len(lines)
            to_process = self._process_pending(key)
            while to_process:
                this_process = to_process
                to_process = []
                for key in this_process:
                    lines = self._text_cache[key]
                    yield key, lines, len(lines)
                    to_process.extend(self._process_pending(key))
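
    # Illustrative walk-through (not from the original source): suppose
    # hypothetical key B is a line-delta whose compression and annotation
    # parent is A, and B's record happens to be read first.
    #   - B cannot be expanded yet, so _expand_record() queues it in
    #     self._pending_deltas and returns None (step 2 above).
    #   - A is then read and expanded; it has no parents, so it is yielded
    #     immediately (step 3).
    #   - _process_pending(A) then expands B (step 4); by the time the
    #     generator resumes, the caller has normally annotated A already, so
    #     _check_ready_for_annotations(B) succeeds and B is yielded too
    #     (step 5).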

try:
    from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._knit_load_data_py import _load_data_py as _load_data
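
# The try/except above follows the usual bzrlib extension pattern: prefer the
# compiled (pyrex/C) implementation of _load_data, report an import failure
# via osutils.failed_to_load_extension(), and fall back to the pure-Python
# implementation.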