~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-08-16 22:00:19 UTC
  • mto: This revision was merged to the branch mainline in revision 1942.
  • Revision ID: john@arbash-meinel.com-20060816220019-541cb90093258ac3
Using real utf8 and cache_utf8 has similar performance, 272ms, and 363ms

Show diffs side-by-side

added added

removed removed

Lines of Context:
74
74
import warnings
75
75
 
76
76
import bzrlib
77
 
import bzrlib.errors as errors
 
77
from bzrlib import (
 
78
    cache_utf8,
 
79
    errors,
 
80
    )
78
81
from bzrlib.errors import FileExists, NoSuchFile, KnitError, \
79
82
        InvalidRevisionId, KnitCorrupt, KnitHeaderError, \
80
83
        RevisionNotPresent, RevisionAlreadyPresent
82
85
from bzrlib.trace import mutter
83
86
from bzrlib.osutils import contains_whitespace, contains_linebreaks, \
84
87
     sha_strings
85
 
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
86
88
from bzrlib.symbol_versioning import DEPRECATED_PARAMETER, deprecated_passed
87
89
from bzrlib.tsort import topo_sort
88
90
import bzrlib.weave
 
91
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
89
92
 
90
93
 
91
94
# TODO: Split out code specific to this format into an associated object.
162
165
        internal representation is of the format:
163
166
        (revid, plaintext)
164
167
        """
 
168
        decode_utf8 = cache_utf8.decode
165
169
        lines = []
166
170
        for line in content:
167
171
            origin, text = line.split(' ', 1)
168
 
            lines.append((origin.decode('utf-8'), text))
 
172
            lines.append((decode_utf8(origin), text))
169
173
        return KnitContent(lines)
170
174
 
171
175
    def parse_line_delta_iter(self, lines):
182
186
        internal representation is
183
187
        (start, end, count, [1..count tuples (revid, newline)])
184
188
        """
 
189
        decode_utf8 = cache_utf8.decode
185
190
        result = []
186
191
        lines = iter(lines)
187
192
        next = lines.next
193
198
            while remaining:
194
199
                origin, text = next().split(' ', 1)
195
200
                remaining -= 1
196
 
                contents.append((origin.decode('utf-8'), text))
 
201
                contents.append((decode_utf8(origin), text))
197
202
            result.append((start, end, count, contents))
198
203
        return result
199
204
 
202
207
 
203
208
        see parse_fulltext which this inverts.
204
209
        """
205
 
        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
 
210
        encode_utf8 = cache_utf8.encode
 
211
        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
206
212
 
207
213
    def lower_line_delta(self, delta):
208
214
        """convert a delta into a serializable form.
209
215
 
210
216
        See parse_line_delta which this inverts.
211
217
        """
 
218
        encode_utf8 = cache_utf8.encode
212
219
        out = []
213
220
        for start, end, c, lines in delta:
214
221
            out.append('%d,%d,%d\n' % (start, end, c))
215
 
            for origin, text in lines:
216
 
                out.append('%s %s' % (origin.encode('utf-8'), text))
 
222
            out.extend(encode_utf8(origin) + ' ' + text
 
223
                       for origin, text in lines)
217
224
        return out
218
225
 
219
226
 
374
381
        """
375
382
        # write all the data
376
383
        pos = self._data.add_raw_record(data)
 
384
        offset = 0
377
385
        index_entries = []
378
386
        for (version_id, options, parents, size) in records:
379
 
            index_entries.append((version_id, options, pos, size, parents))
380
 
            pos += size
 
387
            index_entries.append((version_id, options, pos+offset,
 
388
                                  size, parents))
 
389
            if self._data._do_cache:
 
390
                self._data._cache[version_id] = data[offset:offset+size]
 
391
            offset += size
381
392
        self._index.add_versions(index_entries)
382
393
 
 
394
    def enable_cache(self):
 
395
        """Start caching data for this knit"""
 
396
        self._data.enable_cache()
 
397
 
383
398
    def clear_cache(self):
384
399
        """Clear the data cache only."""
385
400
        self._data.clear_cache()
695
710
        # c = component_id, m = method, p = position, s = size, n = next
696
711
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
697
712
        record_map = {}
698
 
        for component_id, content, digest in\
699
 
            self._data.read_records_iter(records): 
 
713
        for component_id, content, digest in \
 
714
                self._data.read_records_iter(records):
700
715
            method, position, size, next = position_map[component_id]
701
716
            record_map[component_id] = method, content, digest, next
702
717
                          
1198
1213
        return self._cache[version_id][5]
1199
1214
 
1200
1215
    def _version_list_to_index(self, versions):
 
1216
        encode_utf8 = cache_utf8.encode
1201
1217
        result_list = []
1202
1218
        for version in versions:
1203
1219
            if version in self._cache:
1205
1221
                result_list.append(str(self._cache[version][5]))
1206
1222
                # -- end lookup () --
1207
1223
            else:
1208
 
                result_list.append('.' + version.encode('utf-8'))
 
1224
                result_list.append('.' + encode_utf8(version))
1209
1225
        return ' '.join(result_list)
1210
1226
 
1211
1227
    def add_version(self, version_id, options, pos, size, parents):
1219
1235
                         (version_id, options, pos, size, parents).
1220
1236
        """
1221
1237
        lines = []
 
1238
        encode_utf8 = cache_utf8.encode
1222
1239
        for version_id, options, pos, size, parents in versions:
1223
 
            line = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
 
1240
            line = "\n%s %s %s %s %s :" % (encode_utf8(version_id),
1224
1241
                                           ','.join(options),
1225
1242
                                           pos,
1226
1243
                                           size,
1284
1301
    def __init__(self, transport, filename, mode, create=False, file_mode=None):
1285
1302
        _KnitComponentFile.__init__(self, transport, filename, mode)
1286
1303
        self._checked = False
 
1304
        # TODO: jam 20060713 conceptually, this could spill to disk
 
1305
        #       if the cached size gets larger than a certain amount
 
1306
        #       but it complicates the model a bit, so for now just use
 
1307
        #       a simple dictionary
 
1308
        self._cache = {}
 
1309
        self._do_cache = False
1287
1310
        if create:
1288
1311
            self._transport.put(self._filename, StringIO(''), mode=file_mode)
1289
1312
 
 
1313
    def enable_cache(self):
 
1314
        """Enable caching of reads."""
 
1315
        self._do_cache = True
 
1316
 
1290
1317
    def clear_cache(self):
1291
1318
        """Clear the record cache."""
1292
 
        pass
 
1319
        self._do_cache = False
 
1320
        self._cache = {}
1293
1321
 
1294
1322
    def _open_file(self):
1295
1323
        try:
1305
1333
        """
1306
1334
        sio = StringIO()
1307
1335
        data_file = GzipFile(None, mode='wb', fileobj=sio)
 
1336
 
 
1337
        version_id_utf8 = cache_utf8.encode(version_id)
1308
1338
        data_file.writelines(chain(
1309
 
            ["version %s %d %s\n" % (version_id.encode('utf-8'), 
 
1339
            ["version %s %d %s\n" % (version_id_utf8,
1310
1340
                                     len(lines),
1311
1341
                                     digest)],
1312
1342
            lines,
1313
 
            ["end %s\n" % version_id.encode('utf-8')]))
 
1343
            ["end %s\n" % version_id_utf8]))
1314
1344
        data_file.close()
1315
1345
        length= sio.tell()
1316
1346
 
1331
1361
        size, sio = self._record_to_data(version_id, digest, lines)
1332
1362
        # write to disk
1333
1363
        start_pos = self._transport.append(self._filename, sio)
 
1364
        if self._do_cache:
 
1365
            self._cache[version_id] = sio.getvalue()
1334
1366
        return start_pos, size
1335
1367
 
1336
1368
    def _parse_record_header(self, version_id, raw_data):
1343
1375
        rec = df.readline().split()
1344
1376
        if len(rec) != 4:
1345
1377
            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
1346
 
        if rec[1].decode('utf-8')!= version_id:
 
1378
        if cache_utf8.decode(rec[1]) != version_id:
1347
1379
            raise KnitCorrupt(self._filename, 
1348
1380
                              'unexpected version, wanted %r, got %r' % (
1349
1381
                                version_id, rec[1]))
1358
1390
        record_contents = df.readlines()
1359
1391
        l = record_contents.pop()
1360
1392
        assert len(record_contents) == int(rec[2])
1361
 
        if l.decode('utf-8') != 'end %s\n' % version_id:
 
1393
        if l != 'end %s\n' % cache_utf8.encode(version_id):
1362
1394
            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r' 
1363
1395
                        % (l, version_id))
1364
1396
        df.close()
1369
1401
 
1370
1402
        This unpacks enough of the text record to validate the id is
1371
1403
        as expected but thats all.
1372
 
 
1373
 
        It will actively recompress currently cached records on the
1374
 
        basis that that is cheaper than I/O activity.
1375
1404
        """
1376
1405
        # setup an iterator of the external records:
1377
1406
        # uses readv so nice and fast we hope.
1378
1407
        if len(records):
1379
1408
            # grab the disk data needed.
1380
 
            raw_records = self._transport.readv(self._filename,
1381
 
                [(pos, size) for version_id, pos, size in records])
 
1409
            if self._cache:
 
1410
                # Don't check _cache if it is empty
 
1411
                needed_offsets = [(pos, size) for version_id, pos, size
 
1412
                                              in records
 
1413
                                              if version_id not in self._cache]
 
1414
            else:
 
1415
                needed_offsets = [(pos, size) for version_id, pos, size
 
1416
                                               in records]
 
1417
 
 
1418
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
1419
                
1382
1420
 
1383
1421
        for version_id, pos, size in records:
1384
 
            pos, data = raw_records.next()
1385
 
            # validate the header
1386
 
            df, rec = self._parse_record_header(version_id, data)
1387
 
            df.close()
 
1422
            if version_id in self._cache:
 
1423
                # This data has already been validated
 
1424
                data = self._cache[version_id]
 
1425
            else:
 
1426
                pos, data = raw_records.next()
 
1427
                if self._do_cache:
 
1428
                    self._cache[version_id] = data
 
1429
 
 
1430
                # validate the header
 
1431
                df, rec = self._parse_record_header(version_id, data)
 
1432
                df.close()
1388
1433
            yield version_id, data
1389
1434
 
1390
1435
    def read_records_iter(self, records):
1391
1436
        """Read text records from data file and yield result.
1392
1437
 
1393
 
        Each passed record is a tuple of (version_id, pos, len) and
1394
 
        will be read in the given order.  Yields (version_id,
1395
 
        contents, digest).
 
1438
        The result will be returned in whatever is the fastest to read.
 
1439
        Not by the order requested. Also, multiple requests for the same
 
1440
        record will only yield 1 response.
 
1441
        :param records: A list of (version_id, pos, len) entries
 
1442
        :return: Yields (version_id, contents, digest) in the order
 
1443
                 read, not the order requested
1396
1444
        """
1397
 
        if len(records) == 0:
1398
 
            return
1399
 
        # profiling notes:
1400
 
        # 60890  calls for 4168 extractions in 5045, 683 internal.
1401
 
        # 4168   calls to readv              in 1411
1402
 
        # 4168   calls to parse_record       in 2880
1403
 
 
1404
 
        # Get unique records, sorted by position
1405
 
        needed_records = sorted(set(records), key=operator.itemgetter(1))
1406
 
 
1407
 
        # We take it that the transport optimizes the fetching as good
1408
 
        # as possible (ie, reads continuous ranges.)
1409
 
        response = self._transport.readv(self._filename,
 
1445
        if not records:
 
1446
            return
 
1447
 
 
1448
        if self._cache:
 
1449
            # Skip records we have alread seen
 
1450
            yielded_records = set()
 
1451
            needed_records = set()
 
1452
            for record in records:
 
1453
                if record[0] in self._cache:
 
1454
                    if record[0] in yielded_records:
 
1455
                        continue
 
1456
                    yielded_records.add(record[0])
 
1457
                    data = self._cache[record[0]]
 
1458
                    content, digest = self._parse_record(record[0], data)
 
1459
                    yield (record[0], content, digest)
 
1460
                else:
 
1461
                    needed_records.add(record)
 
1462
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
 
1463
        else:
 
1464
            needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1465
 
 
1466
        if not needed_records:
 
1467
            return
 
1468
 
 
1469
        # The transport optimizes the fetching as well 
 
1470
        # (ie, reads continuous ranges.)
 
1471
        readv_response = self._transport.readv(self._filename,
1410
1472
            [(pos, size) for version_id, pos, size in needed_records])
1411
1473
 
1412
 
        record_map = {}
1413
 
        for (record_id, pos, size), (pos, data) in \
1414
 
            izip(iter(needed_records), response):
1415
 
            content, digest = self._parse_record(record_id, data)
1416
 
            record_map[record_id] = (digest, content)
1417
 
 
1418
 
        for version_id, pos, size in records:
1419
 
            digest, content = record_map[version_id]
 
1474
        for (version_id, pos, size), (pos, data) in \
 
1475
                izip(iter(needed_records), readv_response):
 
1476
            content, digest = self._parse_record(version_id, data)
 
1477
            if self._do_cache:
 
1478
                self._cache[version_id] = data
1420
1479
            yield version_id, content, digest
1421
1480
 
1422
1481
    def read_records(self, records):
1423
1482
        """Read records into a dictionary."""
1424
1483
        components = {}
1425
 
        for record_id, content, digest in self.read_records_iter(records):
 
1484
        for record_id, content, digest in \
 
1485
                self.read_records_iter(records):
1426
1486
            components[record_id] = (content, digest)
1427
1487
        return components
1428
1488