Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Most of this work is copyright (C) 2013-2020 David R. MacIver 

5# (david@drmaciver.com), but it contains contributions by others. See 

6# CONTRIBUTING.rst for a full list of people who may hold copyright, and 

7# consult the git log if you need to determine who owns an individual 

8# contribution. 

9# 

10# This Source Code Form is subject to the terms of the Mozilla Public License, 

11# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

12# obtain one at https://mozilla.org/MPL/2.0/. 

13# 

14# END HEADER 

15 

16import binascii 

17import os 

18import warnings 

19from hashlib import sha384 

20 

21from hypothesis.configuration import mkdir_p, storage_directory 

22from hypothesis.errors import HypothesisException, HypothesisWarning 

23from hypothesis.utils.conventions import not_set 

24 

25 

def _usable_dir(path):
    """Return True iff ``path`` can be used as the database directory.

    That is the case when either the directory already exists and is
    readable, writable and searchable, or the nearest existing ancestor is
    such a directory, so the missing components can be created as needed.

    Returns False when the nearest existing ancestor is not a directory,
    lacks one of those permissions, or when no ancestor exists at all.
    """
    # Relative paths must be made absolute first: os.path.dirname() bottoms
    # out at "" for them, "" never exists, and the walk would never finish.
    path = os.path.abspath(path)
    while not os.path.exists(path):
        parent = os.path.dirname(path)
        if parent == path:
            # Reached a root that does not exist (e.g. a missing drive), so
            # there is nothing left to climb to.
            return False
        path = parent
    return os.path.isdir(path) and os.access(path, os.R_OK | os.W_OK | os.X_OK)

36 

37 

def _db_for_path(path=None):
    """Build the example database that corresponds to ``path``.

    * ``not_set``: use the default ``storage_directory("examples")``
      location, falling back (with a ``HypothesisWarning``) to an in-memory
      database when that location is not usable.  The retired
      ``HYPOTHESIS_DATABASE_FILE`` environment variable is rejected with a
      ``HypothesisException``.
    * ``None`` or ``":memory:"``: an ``InMemoryExampleDatabase``.
    * anything else: a ``DirectoryBasedExampleDatabase`` rooted at
      ``str(path)``.
    """
    if path is not_set:
        if "HYPOTHESIS_DATABASE_FILE" in os.environ:  # pragma: no cover
            message = (
                "The $HYPOTHESIS_DATABASE_FILE environment variable no longer has any "
                "effect.  Configure your database location via a settings profile instead.\n"
                "https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles"
            )
            raise HypothesisException(message)

        path = storage_directory("examples")
        if not _usable_dir(path):  # pragma: no cover
            fallback_notice = HypothesisWarning(
                "The database setting is not configured, and the default "
                "location is unusable - falling back to an in-memory "
                "database for this session.  path=%r" % (path,)
            )
            warnings.warn(fallback_notice)
            return InMemoryExampleDatabase()

    if path in (None, ":memory:"):
        database = InMemoryExampleDatabase()
    else:
        database = DirectoryBasedExampleDatabase(str(path))
    return database

60 

61 

class EDMeta(type):
    """Metaclass that turns ``ExampleDatabase(...)`` into a factory call.

    Calling ``ExampleDatabase`` itself is forwarded to ``_db_for_path``;
    calling any other class that uses this metaclass instantiates it in the
    normal way.
    """

    def __call__(self, *args, **kwargs):
        if self is not ExampleDatabase:
            # Concrete implementations are constructed as usual.
            return super().__call__(*args, **kwargs)
        return _db_for_path(*args, **kwargs)

67 

68 

class ExampleDatabase(metaclass=EDMeta):
    """Interface class for storage systems.

    Maps each key to a collection of distinct values.

    Both keys and values are binary data.
    """

    def save(self, key, value):
        """Store ``value`` under ``key``.

        Saving a value that is already present for this key is a silent
        no-op.
        """
        raise NotImplementedError("%s.save" % type(self).__name__)

    def delete(self, key, value):
        """Remove ``value`` from ``key``.

        Deleting a value that is not present is a silent no-op.
        """
        raise NotImplementedError("%s.delete" % type(self).__name__)

    def move(self, src, dest, value):
        """Move ``value`` from key ``src`` to key ``dest``.

        Equivalent to ``delete(src, value)`` followed by
        ``save(dest, value)``, but subclasses may provide a more efficient
        implementation.

        Note that ``value`` ends up stored at ``dest`` whether or not it was
        present at ``src`` beforehand.
        """
        if src == dest:
            # Same key on both sides: nothing to remove, just make sure the
            # value is stored.
            self.save(src, value)
        else:
            self.delete(src, value)
            self.save(dest, value)

    def fetch(self, key):
        """Return every value stored under ``key``."""
        raise NotImplementedError("%s.fetch" % type(self).__name__)

    def close(self):
        """Release any resources held by this database."""
        raise NotImplementedError("%s.close" % type(self).__name__)

113 

114 

class InMemoryExampleDatabase(ExampleDatabase):
    """Example database backed by a plain dict of sets; nothing is persisted."""

    def __init__(self):
        # key -> set of values, each value stored as ``bytes``
        self.data = {}

    def __repr__(self):
        return "InMemoryExampleDatabase(%r)" % (self.data,)

    def fetch(self, key):
        """Yield each value stored under ``key`` (nothing for unknown keys)."""
        for stored in self.data.get(key, ()):
            yield stored

    def save(self, key, value):
        """Add ``value`` (converted to ``bytes``) to the set kept for ``key``."""
        try:
            bucket = self.data[key]
        except KeyError:
            bucket = self.data[key] = set()
        bucket.add(bytes(value))

    def delete(self, key, value):
        """Drop ``value`` from the set kept for ``key``, if it is there."""
        bucket = self.data.get(key, set())
        bucket.discard(bytes(value))

    def close(self):
        """Nothing to release for an in-memory store."""
        pass

133 

134 

def _hash(key):
    """Return the first 16 hex characters of the SHA-384 digest of ``key``."""
    digest = sha384()
    digest.update(key)
    hex_digest = digest.hexdigest()
    return hex_digest[:16]

137 

138 

class DirectoryBasedExampleDatabase(ExampleDatabase):
    """Example database that stores each value in its own file.

    A value lives at ``<path>/<_hash(key)>/<_hash(value)>`` and the file
    content is the raw value.
    """

    def __init__(self, path):
        self.path = path
        # Cache of key -> directory, so each key is only hashed once.
        self.keypaths = {}

    def __repr__(self):
        return "DirectoryBasedExampleDatabase(%r)" % (self.path,)

    def close(self):
        """Nothing to release: no handles are kept open between calls."""
        pass

    def _key_path(self, key):
        """Return (and cache) the directory that holds the values of ``key``."""
        try:
            return self.keypaths[key]
        except KeyError:
            pass
        directory = os.path.join(self.path, _hash(key))
        self.keypaths[key] = directory
        return directory

    def _value_path(self, key, value):
        """Return the file path at which ``value`` is stored for ``key``."""
        return os.path.join(self._key_path(key), _hash(value))

    def fetch(self, key):
        """Yield the content of every readable file stored for ``key``.

        Unknown keys yield nothing; files that cannot be opened or read are
        skipped.
        """
        kp = self._key_path(key)
        if not os.path.exists(kp):
            return
        try:
            names = os.listdir(kp)
        except FileNotFoundError:
            # The directory vanished between the check above and the listing
            # (move() prunes emptied key directories via os.renames).
            return
        for name in names:
            try:
                with open(os.path.join(kp, name), "rb") as i:
                    data = i.read()
            except OSError:
                continue
            # Yield only after the file is closed, so no handle stays open
            # while the generator is suspended and the consumer is free to
            # delete or move the file it was just handed.
            yield data

    def save(self, key, value):
        """Write ``value`` under ``key`` unless it is already stored.

        The value is first written to a uniquely named temporary file next
        to its final location and then renamed into place.  Errors while
        creating the directory or writing the file propagate to the caller;
        a failing rename is ignored after the temporary file is removed.
        """
        # Note: we attempt to create the dir in question now. We
        # already checked for permissions, but there can still be other issues,
        # e.g. the disk is full
        mkdir_p(self._key_path(key))
        path = self._value_path(key, value)
        if os.path.exists(path):
            return
        suffix = binascii.hexlify(os.urandom(16)).decode("ascii")
        tmpname = path + "." + suffix
        try:
            with open(tmpname, "wb") as o:
                o.write(value)
        except BaseException:
            # Do not leave a partially written file behind: it sits in the
            # key directory and fetch() would hand it out as a value.
            try:
                os.unlink(tmpname)
            except OSError:
                pass
            raise
        try:
            os.rename(tmpname, path)
        except OSError:  # pragma: no cover
            os.unlink(tmpname)
        assert not os.path.exists(tmpname)

    def move(self, src, dest, value):
        """Move ``value`` from key ``src`` to key ``dest``.

        Tries a plain file rename first and falls back to delete-then-save
        when that fails, so the value ends up at ``dest`` even if it was not
        stored at ``src``.
        """
        if src == dest:
            self.save(src, value)
            return
        try:
            os.renames(self._value_path(src, value), self._value_path(dest, value))
        except OSError:
            self.delete(src, value)
            self.save(dest, value)

    def delete(self, key, value):
        """Remove the file for ``value`` under ``key``; OS errors are ignored."""
        try:
            os.unlink(self._value_path(key, value))
        except OSError:
            pass