~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-08-16 18:38:57 UTC
  • mfrom: (1934 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1937.
  • Revision ID: john@arbash-meinel.com-20060816183857-7307edffa7098bd2
[merge] bzr.dev 1934

Show diffs side-by-side

added added

removed removed

Lines of Context:
74
74
import warnings
75
75
 
76
76
import bzrlib
77
 
import bzrlib.errors as errors
 
77
from bzrlib import (
 
78
    cache_utf8,
 
79
    errors,
 
80
    )
78
81
from bzrlib.errors import FileExists, NoSuchFile, KnitError, \
79
82
        InvalidRevisionId, KnitCorrupt, KnitHeaderError, \
80
83
        RevisionNotPresent, RevisionAlreadyPresent
82
85
from bzrlib.trace import mutter
83
86
from bzrlib.osutils import contains_whitespace, contains_linebreaks, \
84
87
     sha_strings
85
 
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
86
88
from bzrlib.symbol_versioning import DEPRECATED_PARAMETER, deprecated_passed
87
89
from bzrlib.tsort import topo_sort
88
90
import bzrlib.weave
 
91
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
89
92
 
90
93
 
91
94
# TODO: Split out code specific to this format into an associated object.
162
165
        internal representation is of the format:
163
166
        (revid, plaintext)
164
167
        """
 
168
        decode_utf8 = cache_utf8.decode
165
169
        lines = []
166
170
        for line in content:
167
171
            origin, text = line.split(' ', 1)
168
 
            lines.append((origin.decode('utf-8'), text))
 
172
            lines.append((decode_utf8(origin), text))
169
173
        return KnitContent(lines)
170
174
 
171
175
    def parse_line_delta_iter(self, lines):
182
186
        internal representation is
183
187
        (start, end, count, [1..count tuples (revid, newline)])
184
188
        """
 
189
        decode_utf8 = cache_utf8.decode
185
190
        result = []
186
191
        lines = iter(lines)
187
192
        next = lines.next
193
198
            while remaining:
194
199
                origin, text = next().split(' ', 1)
195
200
                remaining -= 1
196
 
                contents.append((origin.decode('utf-8'), text))
 
201
                contents.append((decode_utf8(origin), text))
197
202
            result.append((start, end, count, contents))
198
203
        return result
199
204
 
202
207
 
203
208
        see parse_fulltext which this inverts.
204
209
        """
205
 
        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
 
210
        encode_utf8 = cache_utf8.encode
 
211
        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
206
212
 
207
213
    def lower_line_delta(self, delta):
208
214
        """convert a delta into a serializable form.
209
215
 
210
216
        See parse_line_delta which this inverts.
211
217
        """
 
218
        encode_utf8 = cache_utf8.encode
212
219
        out = []
213
220
        for start, end, c, lines in delta:
214
221
            out.append('%d,%d,%d\n' % (start, end, c))
215
 
            for origin, text in lines:
216
 
                out.append('%s %s' % (origin.encode('utf-8'), text))
 
222
            out.extend(encode_utf8(origin) + ' ' + text
 
223
                       for origin, text in lines)
217
224
        return out
218
225
 
219
226
 
374
381
        """
375
382
        # write all the data
376
383
        pos = self._data.add_raw_record(data)
 
384
        offset = 0
377
385
        index_entries = []
378
386
        for (version_id, options, parents, size) in records:
379
 
            index_entries.append((version_id, options, pos, size, parents))
380
 
            pos += size
 
387
            index_entries.append((version_id, options, pos+offset,
 
388
                                  size, parents))
 
389
            if self._data._do_cache:
 
390
                self._data._cache[version_id] = data[offset:offset+size]
 
391
            offset += size
381
392
        self._index.add_versions(index_entries)
382
393
 
 
394
    def enable_cache(self):
 
395
        """Start caching data for this knit"""
 
396
        self._data.enable_cache()
 
397
 
383
398
    def clear_cache(self):
384
399
        """Clear the data cache only."""
385
400
        self._data.clear_cache()
695
710
        # c = component_id, m = method, p = position, s = size, n = next
696
711
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
697
712
        record_map = {}
698
 
        for component_id, content, digest in\
699
 
            self._data.read_records_iter(records): 
 
713
        for component_id, content, digest in \
 
714
                self._data.read_records_iter(records):
700
715
            method, position, size, next = position_map[component_id]
701
716
            record_map[component_id] = method, content, digest, next
702
717
                          
1198
1213
        return self._cache[version_id][5]
1199
1214
 
1200
1215
    def _version_list_to_index(self, versions):
 
1216
        encode_utf8 = cache_utf8.encode
1201
1217
        result_list = []
1202
1218
        for version in versions:
1203
1219
            if version in self._cache:
1205
1221
                result_list.append(str(self._cache[version][5]))
1206
1222
                # -- end lookup () --
1207
1223
            else:
1208
 
                result_list.append('.' + version.encode('utf-8'))
 
1224
                result_list.append('.' + encode_utf8(version))
1209
1225
        return ' '.join(result_list)
1210
1226
 
1211
1227
    def add_version(self, version_id, options, pos, size, parents):
1219
1235
                         (version_id, options, pos, size, parents).
1220
1236
        """
1221
1237
        lines = []
 
1238
        encode_utf8 = cache_utf8.encode
1222
1239
        for version_id, options, pos, size, parents in versions:
1223
 
            line = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
 
1240
            line = "\n%s %s %s %s %s :" % (encode_utf8(version_id),
1224
1241
                                           ','.join(options),
1225
1242
                                           pos,
1226
1243
                                           size,
1284
1301
    def __init__(self, transport, filename, mode, create=False, file_mode=None):
1285
1302
        _KnitComponentFile.__init__(self, transport, filename, mode)
1286
1303
        self._checked = False
 
1304
        # TODO: jam 20060713 conceptually, this could spill to disk
 
1305
        #       if the cached size gets larger than a certain amount
 
1306
        #       but it complicates the model a bit, so for now just use
 
1307
        #       a simple dictionary
 
1308
        self._cache = {}
 
1309
        self._do_cache = False
1287
1310
        if create:
1288
1311
            self._transport.put(self._filename, StringIO(''), mode=file_mode)
1289
1312
 
 
1313
    def enable_cache(self):
 
1314
        """Enable caching of reads."""
 
1315
        self._do_cache = True
 
1316
 
1290
1317
    def clear_cache(self):
1291
1318
        """Clear the record cache."""
1292
 
        pass
 
1319
        self._do_cache = False
 
1320
        self._cache = {}
1293
1321
 
1294
1322
    def _open_file(self):
1295
1323
        try:
1305
1333
        """
1306
1334
        sio = StringIO()
1307
1335
        data_file = GzipFile(None, mode='wb', fileobj=sio)
 
1336
        version_id_utf8 = cache_utf8.encode(version_id)
1308
1337
        data_file.writelines(chain(
1309
 
            ["version %s %d %s\n" % (version_id.encode('utf-8'), 
 
1338
            ["version %s %d %s\n" % (version_id_utf8,
1310
1339
                                     len(lines),
1311
1340
                                     digest)],
1312
1341
            lines,
1313
 
            ["end %s\n" % version_id.encode('utf-8')]))
 
1342
            ["end %s\n" % version_id_utf8]))
1314
1343
        data_file.close()
1315
1344
        length= sio.tell()
1316
1345
 
1331
1360
        size, sio = self._record_to_data(version_id, digest, lines)
1332
1361
        # write to disk
1333
1362
        start_pos = self._transport.append(self._filename, sio)
 
1363
        if self._do_cache:
 
1364
            self._cache[version_id] = sio.getvalue()
1334
1365
        return start_pos, size
1335
1366
 
1336
1367
    def _parse_record_header(self, version_id, raw_data):
1343
1374
        rec = df.readline().split()
1344
1375
        if len(rec) != 4:
1345
1376
            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
1346
 
        if rec[1].decode('utf-8')!= version_id:
 
1377
        if cache_utf8.decode(rec[1]) != version_id:
1347
1378
            raise KnitCorrupt(self._filename, 
1348
1379
                              'unexpected version, wanted %r, got %r' % (
1349
1380
                                version_id, rec[1]))
1358
1389
        record_contents = df.readlines()
1359
1390
        l = record_contents.pop()
1360
1391
        assert len(record_contents) == int(rec[2])
1361
 
        if l.decode('utf-8') != 'end %s\n' % version_id:
 
1392
        if l != 'end %s\n' % cache_utf8.encode(version_id):
1362
1393
            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r' 
1363
1394
                        % (l, version_id))
1364
1395
        df.close()
1369
1400
 
1370
1401
        This unpacks enough of the text record to validate the id is
1371
1402
        as expected but thats all.
1372
 
 
1373
 
        It will actively recompress currently cached records on the
1374
 
        basis that that is cheaper than I/O activity.
1375
1403
        """
1376
1404
        # setup an iterator of the external records:
1377
1405
        # uses readv so nice and fast we hope.
1378
1406
        if len(records):
1379
1407
            # grab the disk data needed.
1380
 
            raw_records = self._transport.readv(self._filename,
1381
 
                [(pos, size) for version_id, pos, size in records])
 
1408
            if self._cache:
 
1409
                # Don't check _cache if it is empty
 
1410
                needed_offsets = [(pos, size) for version_id, pos, size
 
1411
                                              in records
 
1412
                                              if version_id not in self._cache]
 
1413
            else:
 
1414
                needed_offsets = [(pos, size) for version_id, pos, size
 
1415
                                               in records]
 
1416
 
 
1417
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
1418
                
1382
1419
 
1383
1420
        for version_id, pos, size in records:
1384
 
            pos, data = raw_records.next()
1385
 
            # validate the header
1386
 
            df, rec = self._parse_record_header(version_id, data)
1387
 
            df.close()
 
1421
            if version_id in self._cache:
 
1422
                # This data has already been validated
 
1423
                data = self._cache[version_id]
 
1424
            else:
 
1425
                pos, data = raw_records.next()
 
1426
                if self._do_cache:
 
1427
                    self._cache[version_id] = data
 
1428
 
 
1429
                # validate the header
 
1430
                df, rec = self._parse_record_header(version_id, data)
 
1431
                df.close()
1388
1432
            yield version_id, data
1389
1433
 
1390
1434
    def read_records_iter(self, records):
1391
1435
        """Read text records from data file and yield result.
1392
1436
 
1393
 
        Each passed record is a tuple of (version_id, pos, len) and
1394
 
        will be read in the given order.  Yields (version_id,
1395
 
        contents, digest).
 
1437
        The result will be returned in whatever is the fastest to read.
 
1438
        Not by the order requested. Also, multiple requests for the same
 
1439
        record will only yield 1 response.
 
1440
        :param records: A list of (version_id, pos, len) entries
 
1441
        :return: Yields (version_id, contents, digest) in the order
 
1442
                 read, not the order requested
1396
1443
        """
1397
 
        if len(records) == 0:
1398
 
            return
1399
 
        # profiling notes:
1400
 
        # 60890  calls for 4168 extractions in 5045, 683 internal.
1401
 
        # 4168   calls to readv              in 1411
1402
 
        # 4168   calls to parse_record       in 2880
1403
 
 
1404
 
        # Get unique records, sorted by position
1405
 
        needed_records = sorted(set(records), key=operator.itemgetter(1))
1406
 
 
1407
 
        # We take it that the transport optimizes the fetching as good
1408
 
        # as possible (ie, reads continuous ranges.)
1409
 
        response = self._transport.readv(self._filename,
 
1444
        if not records:
 
1445
            return
 
1446
 
 
1447
        if self._cache:
 
1448
            # Skip records we have alread seen
 
1449
            yielded_records = set()
 
1450
            needed_records = set()
 
1451
            for record in records:
 
1452
                if record[0] in self._cache:
 
1453
                    if record[0] in yielded_records:
 
1454
                        continue
 
1455
                    yielded_records.add(record[0])
 
1456
                    data = self._cache[record[0]]
 
1457
                    content, digest = self._parse_record(record[0], data)
 
1458
                    yield (record[0], content, digest)
 
1459
                else:
 
1460
                    needed_records.add(record)
 
1461
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
 
1462
        else:
 
1463
            needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1464
 
 
1465
        if not needed_records:
 
1466
            return
 
1467
 
 
1468
        # The transport optimizes the fetching as well 
 
1469
        # (ie, reads continuous ranges.)
 
1470
        readv_response = self._transport.readv(self._filename,
1410
1471
            [(pos, size) for version_id, pos, size in needed_records])
1411
1472
 
1412
 
        record_map = {}
1413
 
        for (record_id, pos, size), (pos, data) in \
1414
 
            izip(iter(needed_records), response):
1415
 
            content, digest = self._parse_record(record_id, data)
1416
 
            record_map[record_id] = (digest, content)
1417
 
 
1418
 
        for version_id, pos, size in records:
1419
 
            digest, content = record_map[version_id]
 
1473
        for (version_id, pos, size), (pos, data) in \
 
1474
                izip(iter(needed_records), readv_response):
 
1475
            content, digest = self._parse_record(version_id, data)
 
1476
            if self._do_cache:
 
1477
                self._cache[version_id] = data
1420
1478
            yield version_id, content, digest
1421
1479
 
1422
1480
    def read_records(self, records):
1423
1481
        """Read records into a dictionary."""
1424
1482
        components = {}
1425
 
        for record_id, content, digest in self.read_records_iter(records):
 
1483
        for record_id, content, digest in \
 
1484
                self.read_records_iter(records):
1426
1485
            components[record_id] = (content, digest)
1427
1486
        return components
1428
1487