~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_dirstate.py

merge dirstate

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
import bisect
20
20
import os
 
21
import time
21
22
 
22
23
from bzrlib import (
23
24
    dirstate,
1005
1006
        self.assertEqual(expected, dirblock_names)
1006
1007
 
1007
1008
 
 
1009
class InstrumentedDirState(dirstate.DirState):
 
1010
    """An DirState with instrumented sha1 functionality."""
 
1011
 
 
1012
    def __init__(self, path):
 
1013
        super(InstrumentedDirState, self).__init__(path)
 
1014
        self._time_offset = 0
 
1015
        self._log = []
 
1016
 
 
1017
    def _sha_cutoff_time(self):
 
1018
        timestamp = super(InstrumentedDirState, self)._sha_cutoff_time()
 
1019
        self._cutoff_time = timestamp + self._time_offset
 
1020
 
 
1021
    def _sha1_file(self, abspath, entry):
 
1022
        self._log.append(('sha1', abspath))
 
1023
        return super(InstrumentedDirState, self)._sha1_file(abspath, entry)
 
1024
 
 
1025
    def _read_link(self, abspath, old_link):
 
1026
        self._log.append(('read_link', abspath, old_link))
 
1027
        return super(InstrumentedDirState, self)._read_link(abspath, old_link)
 
1028
 
 
1029
    def _lstat(self, abspath, entry):
 
1030
        self._log.append(('lstat', abspath))
 
1031
        return super(InstrumentedDirState, self)._lstat(abspath, entry)
 
1032
 
 
1033
    def _is_executable(self, mode, old_executable):
 
1034
        self._log.append(('is_exec', mode, old_executable))
 
1035
        return super(InstrumentedDirState, self)._is_executable(mode,
 
1036
                                                                old_executable)
 
1037
 
 
1038
    def adjust_time(self, secs):
 
1039
        """Move the clock forward or back.
 
1040
 
 
1041
        :param secs: The amount to adjust the clock by. Positive values make it
 
1042
        seem as if we are in the future, negative values make it seem like we
 
1043
        are in the past.
 
1044
        """
 
1045
        self._time_offset += secs
 
1046
        self._cutoff_time = None
 
1047
 
 
1048
 
 
1049
class _FakeStat(object):
 
1050
    """A class with the same attributes as a real stat result."""
 
1051
 
 
1052
    def __init__(self, size, mtime, ctime, dev, ino, mode):
 
1053
        self.st_size = size
 
1054
        self.st_mtime = mtime
 
1055
        self.st_ctime = ctime
 
1056
        self.st_dev = dev
 
1057
        self.st_ino = ino
 
1058
        self.st_mode = mode
 
1059
 
 
1060
 
 
1061
class TestUpdateEntry(TestCaseWithDirState):
 
1062
    """Test the DirState.update_entry functions"""
 
1063
 
 
1064
    def get_state_with_a(self):
 
1065
        """Create a DirState tracking a single object named 'a'"""
 
1066
        state = InstrumentedDirState.initialize('dirstate')
 
1067
        self.addCleanup(state.unlock)
 
1068
        state.add('a', 'a-id', 'file', None, '')
 
1069
        entry = state._get_entry(0, path_utf8='a')
 
1070
        return state, entry
 
1071
 
 
1072
    def test_update_entry(self):
 
1073
        state, entry = self.get_state_with_a()
 
1074
        self.build_tree(['a'])
 
1075
        # Add one where we don't provide the stat or sha already
 
1076
        self.assertEqual(('', 'a', 'a-id'), entry[0])
 
1077
        self.assertEqual([('f', '', 0, False, dirstate.DirState.NULLSTAT)],
 
1078
                         entry[1])
 
1079
        # Flush the buffers to disk
 
1080
        state.save()
 
1081
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1082
                         state._dirblock_state)
 
1083
 
 
1084
        stat_value = os.lstat('a')
 
1085
        packed_stat = dirstate.pack_stat(stat_value)
 
1086
        link_or_sha1 = state.update_entry(entry, abspath='a',
 
1087
                                          stat_value=stat_value)
 
1088
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
1089
                         link_or_sha1)
 
1090
 
 
1091
        # The dirblock entry should be updated with the new info
 
1092
        self.assertEqual([('f', link_or_sha1, 14, False, packed_stat)],
 
1093
                         entry[1])
 
1094
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1095
                         state._dirblock_state)
 
1096
        mode = stat_value.st_mode
 
1097
        self.assertEqual([('sha1', 'a'), ('is_exec', mode, False)], state._log)
 
1098
 
 
1099
        state.save()
 
1100
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1101
                         state._dirblock_state)
 
1102
 
 
1103
        # If we do it again right away, we don't know if the file has changed
 
1104
        # so we will re-read the file. Roll the clock back so the file is
 
1105
        # guaranteed to look too new.
 
1106
        state.adjust_time(-10)
 
1107
 
 
1108
        link_or_sha1 = state.update_entry(entry, abspath='a',
 
1109
                                          stat_value=stat_value)
 
1110
        self.assertEqual([('sha1', 'a'), ('is_exec', mode, False),
 
1111
                          ('sha1', 'a'), ('is_exec', mode, False),
 
1112
                         ], state._log)
 
1113
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
1114
                         link_or_sha1)
 
1115
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1116
                         state._dirblock_state)
 
1117
        state.save()
 
1118
 
 
1119
        # However, if we move the clock forward so the file is considered
 
1120
        # "stable", it should just returned the cached value.
 
1121
        state.adjust_time(20)
 
1122
        link_or_sha1 = state.update_entry(entry, abspath='a',
 
1123
                                          stat_value=stat_value)
 
1124
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
1125
                         link_or_sha1)
 
1126
        self.assertEqual([('sha1', 'a'), ('is_exec', mode, False),
 
1127
                          ('sha1', 'a'), ('is_exec', mode, False),
 
1128
                         ], state._log)
 
1129
 
 
1130
    def test_update_entry_no_stat_value(self):
 
1131
        """Passing the stat_value is optional."""
 
1132
        state, entry = self.get_state_with_a()
 
1133
        state.adjust_time(-10) # Make sure the file looks new
 
1134
        self.build_tree(['a'])
 
1135
        # Add one where we don't provide the stat or sha already
 
1136
        link_or_sha1 = state.update_entry(entry, abspath='a')
 
1137
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
1138
                         link_or_sha1)
 
1139
        stat_value = os.lstat('a')
 
1140
        self.assertEqual([('lstat', 'a'), ('sha1', 'a'),
 
1141
                          ('is_exec', stat_value.st_mode, False),
 
1142
                         ], state._log)
 
1143
 
 
1144
    def test_update_entry_symlink(self):
 
1145
        """Update entry should read symlinks."""
 
1146
        if not osutils.has_symlinks():
 
1147
            return # PlatformDeficiency / TestSkipped
 
1148
        state, entry = self.get_state_with_a()
 
1149
        state.save()
 
1150
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
 
1151
                         state._dirblock_state)
 
1152
        os.symlink('target', 'a')
 
1153
 
 
1154
        state.adjust_time(-10) # Make the symlink look new
 
1155
        stat_value = os.lstat('a')
 
1156
        packed_stat = dirstate.pack_stat(stat_value)
 
1157
        link_or_sha1 = state.update_entry(entry, abspath='a',
 
1158
                                          stat_value=stat_value)
 
1159
        self.assertEqual('target', link_or_sha1)
 
1160
        self.assertEqual([('read_link', 'a', '')], state._log)
 
1161
        # Dirblock is updated
 
1162
        self.assertEqual([('l', link_or_sha1, 6, False, packed_stat)],
 
1163
                         entry[1])
 
1164
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
 
1165
                         state._dirblock_state)
 
1166
 
 
1167
        # Because the stat_value looks new, we should re-read the target
 
1168
        link_or_sha1 = state.update_entry(entry, abspath='a',
 
1169
                                          stat_value=stat_value)
 
1170
        self.assertEqual('target', link_or_sha1)
 
1171
        self.assertEqual([('read_link', 'a', ''),
 
1172
                          ('read_link', 'a', 'target'),
 
1173
                         ], state._log)
 
1174
        state.adjust_time(+20) # Skip into the future, all files look old
 
1175
        link_or_sha1 = state.update_entry(entry, abspath='a',
 
1176
                                          stat_value=stat_value)
 
1177
        self.assertEqual('target', link_or_sha1)
 
1178
        # There should not be a new read_link call.
 
1179
        # (this is a weak assertion, because read_link is fairly inexpensive,
 
1180
        # versus the number of symlinks that we would have)
 
1181
        self.assertEqual([('read_link', 'a', ''),
 
1182
                          ('read_link', 'a', 'target'),
 
1183
                         ], state._log)
 
1184
 
 
1185
    def test_update_entry_dir(self):
 
1186
        state, entry = self.get_state_with_a()
 
1187
        self.build_tree(['a/'])
 
1188
        self.assertIs(None, state.update_entry(entry, 'a'))
 
1189
 
 
1190
    def create_and_test_file(self, state, entry):
 
1191
        """Create a file at 'a' and verify the state finds it.
 
1192
 
 
1193
        The state should already be versioning *something* at 'a'. This makes
 
1194
        sure that state.update_entry recognizes it as a file.
 
1195
        """
 
1196
        self.build_tree(['a'])
 
1197
        stat_value = os.lstat('a')
 
1198
        packed_stat = dirstate.pack_stat(stat_value)
 
1199
 
 
1200
        link_or_sha1 = state.update_entry(entry, abspath='a')
 
1201
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
 
1202
                         link_or_sha1)
 
1203
        self.assertEqual([('f', link_or_sha1, 14, False, packed_stat)],
 
1204
                         entry[1])
 
1205
        return packed_stat
 
1206
 
 
1207
    def create_and_test_dir(self, state, entry):
 
1208
        """Create a directory at 'a' and verify the state finds it.
 
1209
 
 
1210
        The state should already be versioning *something* at 'a'. This makes
 
1211
        sure that state.update_entry recognizes it as a directory.
 
1212
        """
 
1213
        self.build_tree(['a/'])
 
1214
        stat_value = os.lstat('a')
 
1215
        packed_stat = dirstate.pack_stat(stat_value)
 
1216
 
 
1217
        link_or_sha1 = state.update_entry(entry, abspath='a')
 
1218
        self.assertIs(None, link_or_sha1)
 
1219
        self.assertEqual([('d', '', 0, False, packed_stat)], entry[1])
 
1220
 
 
1221
        return packed_stat
 
1222
 
 
1223
    def create_and_test_symlink(self, state, entry):
 
1224
        """Create a symlink at 'a' and verify the state finds it.
 
1225
 
 
1226
        The state should already be versioning *something* at 'a'. This makes
 
1227
        sure that state.update_entry recognizes it as a symlink.
 
1228
 
 
1229
        This should not be called if this platform does not have symlink
 
1230
        support.
 
1231
        """
 
1232
        os.symlink('path/to/foo', 'a')
 
1233
 
 
1234
        stat_value = os.lstat('a')
 
1235
        packed_stat = dirstate.pack_stat(stat_value)
 
1236
 
 
1237
        link_or_sha1 = state.update_entry(entry, abspath='a')
 
1238
        self.assertEqual('path/to/foo', link_or_sha1)
 
1239
        self.assertEqual([('l', 'path/to/foo', 11, False, packed_stat)],
 
1240
                         entry[1])
 
1241
        return packed_stat
 
1242
 
 
1243
    def test_update_missing_file(self):
 
1244
        state, entry = self.get_state_with_a()
 
1245
        packed_stat = self.create_and_test_file(state, entry)
 
1246
        # Now if we delete the file, update_entry should recover and
 
1247
        # return None.
 
1248
        os.remove('a')
 
1249
        self.assertIs(None, state.update_entry(entry, abspath='a'))
 
1250
        # And the record shouldn't be changed.
 
1251
        digest = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1252
        self.assertEqual([('f', digest, 14, False, packed_stat)],
 
1253
                         entry[1])
 
1254
 
 
1255
    def test_update_missing_dir(self):
 
1256
        state, entry = self.get_state_with_a()
 
1257
        packed_stat = self.create_and_test_dir(state, entry)
 
1258
        # Now if we delete the directory, update_entry should recover and
 
1259
        # return None.
 
1260
        os.rmdir('a')
 
1261
        self.assertIs(None, state.update_entry(entry, abspath='a'))
 
1262
        self.assertEqual([('d', '', 0, False, packed_stat)], entry[1])
 
1263
 
 
1264
    def test_update_missing_symlink(self):
 
1265
        if not osutils.has_symlinks():
 
1266
            return # PlatformDeficiency / TestSkipped
 
1267
        state, entry = self.get_state_with_a()
 
1268
        packed_stat = self.create_and_test_symlink(state, entry)
 
1269
        os.remove('a')
 
1270
        self.assertIs(None, state.update_entry(entry, abspath='a'))
 
1271
        # And the record shouldn't be changed.
 
1272
        self.assertEqual([('l', 'path/to/foo', 11, False, packed_stat)],
 
1273
                         entry[1])
 
1274
 
 
1275
    def test_update_file_to_dir(self):
 
1276
        """If a file changes to a directory we return None for the sha.
 
1277
        We also update the inventory record.
 
1278
        """
 
1279
        state, entry = self.get_state_with_a()
 
1280
        self.create_and_test_file(state, entry)
 
1281
        os.remove('a')
 
1282
        self.create_and_test_dir(state, entry)
 
1283
 
 
1284
    def test_update_file_to_symlink(self):
 
1285
        """File becomes a symlink"""
 
1286
        if not osutils.has_symlinks():
 
1287
            return # PlatformDeficiency / TestSkipped
 
1288
        state, entry = self.get_state_with_a()
 
1289
        self.create_and_test_file(state, entry)
 
1290
        os.remove('a')
 
1291
        self.create_and_test_symlink(state, entry)
 
1292
 
 
1293
    def test_update_dir_to_file(self):
 
1294
        """Directory becoming a file updates the entry."""
 
1295
        state, entry = self.get_state_with_a()
 
1296
        self.create_and_test_dir(state, entry)
 
1297
        os.rmdir('a')
 
1298
        self.create_and_test_file(state, entry)
 
1299
 
 
1300
    def test_update_dir_to_symlink(self):
 
1301
        """Directory becomes a symlink"""
 
1302
        if not osutils.has_symlinks():
 
1303
            return # PlatformDeficiency / TestSkipped
 
1304
        state, entry = self.get_state_with_a()
 
1305
        self.create_and_test_dir(state, entry)
 
1306
        os.rmdir('a')
 
1307
        self.create_and_test_symlink(state, entry)
 
1308
 
 
1309
    def test_update_symlink_to_file(self):
 
1310
        """Symlink becomes a file"""
 
1311
        state, entry = self.get_state_with_a()
 
1312
        self.create_and_test_symlink(state, entry)
 
1313
        os.remove('a')
 
1314
        self.create_and_test_file(state, entry)
 
1315
 
 
1316
    def test_update_symlink_to_dir(self):
 
1317
        """Symlink becomes a directory"""
 
1318
        state, entry = self.get_state_with_a()
 
1319
        self.create_and_test_symlink(state, entry)
 
1320
        os.remove('a')
 
1321
        self.create_and_test_dir(state, entry)
 
1322
 
 
1323
    def test__is_executable_win32(self):
 
1324
        state, entry = self.get_state_with_a()
 
1325
        self.build_tree(['a'])
 
1326
 
 
1327
        # Make sure we are using the win32 implementation of _is_executable
 
1328
        state._is_executable = state._is_executable_win32
 
1329
 
 
1330
        # The file on disk is not executable, but we are marking it as though
 
1331
        # it is. With _is_executable_win32 we ignore what is on disk.
 
1332
        entry[1][0] = ('f', '', 0, True, dirstate.DirState.NULLSTAT)
 
1333
 
 
1334
        stat_value = os.lstat('a')
 
1335
        packed_stat = dirstate.pack_stat(stat_value)
 
1336
 
 
1337
        state.adjust_time(-10) # Make sure everything is new
 
1338
        # Make sure it wants to kkkkkkkk
 
1339
        state.update_entry(entry, abspath='a', stat_value=stat_value)
 
1340
 
 
1341
        # The row is updated, but the executable bit stays set.
 
1342
        digest = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
 
1343
        self.assertEqual([('f', digest, 14, True, packed_stat)], entry[1])
 
1344
 
 
1345
 
 
1346
class TestPackStat(TestCaseWithTransport):
 
1347
 
 
1348
    def assertPackStat(self, expected, stat_value):
 
1349
        """Check the packed and serialized form of a stat value."""
 
1350
        self.assertEqual(expected, dirstate.pack_stat(stat_value))
 
1351
 
 
1352
    def test_pack_stat_int(self):
 
1353
        st = _FakeStat(6859L, 1172758614, 1172758617, 777L, 6499538L, 0100644)
 
1354
        # Make sure that all parameters have an impact on the packed stat.
 
1355
        self.assertPackStat('AAAay0Xm4FZF5uBZAAADCQBjLNIAAIGk', st)
 
1356
        st.st_size = 7000L
 
1357
        #                ay0 => bWE
 
1358
        self.assertPackStat('AAAbWEXm4FZF5uBZAAADCQBjLNIAAIGk', st)
 
1359
        st.st_mtime = 1172758620
 
1360
        #                     4FZ => 4Fx
 
1361
        self.assertPackStat('AAAbWEXm4FxF5uBZAAADCQBjLNIAAIGk', st)
 
1362
        st.st_ctime = 1172758630
 
1363
        #                          uBZ => uBm
 
1364
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADCQBjLNIAAIGk', st)
 
1365
        st.st_dev = 888L
 
1366
        #                                DCQ => DeA
 
1367
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADeABjLNIAAIGk', st)
 
1368
        st.st_ino = 6499540L
 
1369
        #                                     LNI => LNQ
 
1370
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADeABjLNQAAIGk', st)
 
1371
        st.st_mode = 0100744
 
1372
        #                                          IGk => IHk
 
1373
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADeABjLNQAAIHk', st)
 
1374
 
 
1375
    def test_pack_stat_float(self):
 
1376
        """On some platforms mtime and ctime are floats.
 
1377
 
 
1378
        Make sure we don't get warnings or errors, and that we ignore changes <
 
1379
        1s
 
1380
        """
 
1381
        st = _FakeStat(7000L, 1172758614.0, 1172758617.0,
 
1382
                       777L, 6499538L, 0100644)
 
1383
        # These should all be the same as the integer counterparts
 
1384
        self.assertPackStat('AAAbWEXm4FZF5uBZAAADCQBjLNIAAIGk', st)
 
1385
        st.st_mtime = 1172758620.0
 
1386
        #                     FZF5 => FxF5
 
1387
        self.assertPackStat('AAAbWEXm4FxF5uBZAAADCQBjLNIAAIGk', st)
 
1388
        st.st_ctime = 1172758630.0
 
1389
        #                          uBZ => uBm
 
1390
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADCQBjLNIAAIGk', st)
 
1391
        # fractional seconds are discarded, so no change from above
 
1392
        st.st_mtime = 1172758620.453
 
1393
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADCQBjLNIAAIGk', st)
 
1394
        st.st_ctime = 1172758630.228
 
1395
        self.assertPackStat('AAAbWEXm4FxF5uBmAAADCQBjLNIAAIGk', st)
 
1396
 
 
1397
 
1008
1398
class TestBisect(TestCaseWithTransport):
1009
1399
    """Test the ability to bisect into the disk format."""
1010
1400