Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

from http import HTTPStatus 

from concurrent.futures import ThreadPoolExecutor 

from datetime import date 

from tornado import web, escape, ioloop, httpclient, gen 

from tornado.concurrent import run_on_executor 

from drone import Altimeter, Hexacopter, LightEmittingDiode, Drone 

 

 

thread_pool_executor = ThreadPoolExecutor() 

drone = Drone() 

 

 

class AsyncHexacopterHandler(web.RequestHandler): 

SUPPORTED_METHODS = ("GET", "PATCH") 

HEXACOPTER_ID = 1 

_thread_pool_executor = thread_pool_executor 

 

@gen.coroutine 

def get(self, id): 

20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true if int(id) is not self.__class__.HEXACOPTER_ID: 

self.set_status(HTTPStatus.NOT_FOUND) 

self.finish() 

return 

print("I've started retrieving the hexacopter's status") 

hexacopter_status = yield self.retrieve_hexacopter_status() 

print("I've finished retrieving the hexacopter's status") 

response = { 

'motor_speed_in_rpm': hexacopter_status.motor_speed, 

'is_turned_on': hexacopter_status.is_turned_on,} 

self.set_status(HTTPStatus.OK) 

self.write(response) 

self.finish() 

 

@run_on_executor(executor="_thread_pool_executor") 

def retrieve_hexacopter_status(self): 

return drone.hexacopter.status 

 

@gen.coroutine 

def patch(self, id): 

40 ↛ 41line 40 didn't jump to line 41, because the condition on line 40 was never true if int(id) is not self.__class__.HEXACOPTER_ID: 

self.set_status(HTTPStatus.NOT_FOUND) 

self.finish() 

return 

request_data = escape.json_decode(self.request.body) 

45 ↛ 47line 45 didn't jump to line 47, because the condition on line 45 was never true if ('motor_speed_in_rpm' not in request_data.keys()) or \ 

(request_data['motor_speed_in_rpm'] is None): 

self.set_status(HTTPStatus.BAD_REQUEST) 

self.finish() 

return 

try: 

motor_speed = int(request_data['motor_speed_in_rpm']) 

print("I've started setting the hexacopter's motor speed") 

hexacopter_status = yield self.set_hexacopter_motor_speed(motor_speed) 

print("I've finished setting the hexacopter's motor speed") 

response = { 

'motor_speed_in_rpm': hexacopter_status.motor_speed, 

'is_turned_on': hexacopter_status.is_turned_on,} 

self.set_status(HTTPStatus.OK) 

self.write(response) 

self.finish() 

except ValueError as e: 

print("I've failed setting the hexacopter's motor speed") 

self.set_status(HTTPStatus.BAD_REQUEST) 

response = { 

'error': e.args[0]} 

self.write(response) 

self.finish() 

 

@run_on_executor(executor="_thread_pool_executor") 

def set_hexacopter_motor_speed(self, motor_speed): 

drone.hexacopter.motor_speed = motor_speed 

hexacopter_status = drone.hexacopter.status 

return hexacopter_status 

 

 

class AsyncLedHandler(web.RequestHandler): 

SUPPORTED_METHODS = ("GET", "PATCH") 

_thread_pool_executor = thread_pool_executor 

 

@gen.coroutine 

def get(self, id): 

int_id = int(id) 

83 ↛ 84line 83 didn't jump to line 84, because the condition on line 83 was never true if int_id not in drone.leds.keys(): 

self.set_status(HTTPStatus.NOT_FOUND) 

self.finish() 

return 

led = drone.leds[int_id] 

print("I've started retrieving {0}'s status".format(led.description)) 

brightness_level = yield self.retrieve_led_brightness_level(led) 

print("I've finished retrieving {0}'s status".format(led.description)) 

response = { 

'id': led.id, 

'description': led.description, 

'brightness_level': brightness_level} 

self.set_status(HTTPStatus.OK) 

self.write(response) 

self.finish() 

 

@run_on_executor(executor="_thread_pool_executor") 

def retrieve_led_brightness_level(self, led): 

return led.brightness_level 

 

@gen.coroutine 

def patch(self, id): 

int_id = int(id) 

106 ↛ 107line 106 didn't jump to line 107, because the condition on line 106 was never true if int_id not in drone.leds.keys(): 

self.set_status(HTTPStatus.NOT_FOUND) 

self.finish() 

return 

led = drone.leds[int_id] 

request_data = escape.json_decode(self.request.body) 

112 ↛ 114line 112 didn't jump to line 114, because the condition on line 112 was never true if ('brightness_level' not in request_data.keys()) or \ 

(request_data['brightness_level'] is None): 

self.set_status(HTTPStatus.BAD_REQUEST) 

self.finish() 

return 

try: 

brightness_level = int(request_data['brightness_level']) 

print("I've started setting the {0}'s brightness level".format(led.description)) 

yield self.set_led_brightness_level(led, brightness_level) 

print("I've finished setting the {0}'s brightness level".format(led.description)) 

response = { 

'id': led.id, 

'description': led.description, 

'brightness_level': brightness_level} 

self.set_status(HTTPStatus.OK) 

self.write(response) 

self.finish() 

except ValueError as e: 

print("I've failed setting the {0}'s brightness level".format(led.description)) 

self.set_status(HTTPStatus.BAD_REQUEST) 

response = { 

'error': e.args[0]} 

self.write(response) 

self.finish() 

 

@run_on_executor(executor="_thread_pool_executor") 

def set_led_brightness_level(self, led, brightness_level): 

led.brightness_level = brightness_level 

 

 

class AsyncAltimeterHandler(web.RequestHandler): 

SUPPORTED_METHODS = ("GET") 

ALTIMETER_ID = 1 

_thread_pool_executor = thread_pool_executor 

 

@gen.coroutine 

def get(self, id): 

print("Running the get method") 

150 ↛ 151line 150 didn't jump to line 151, because the condition on line 150 was never true if int(id) is not self.__class__.ALTIMETER_ID: 

self.set_status(HTTPStatus.NOT_FOUND) 

self.finish() 

return 

unit = self.get_arguments(name='unit') 

if 'meters' in unit: 

altitude_multiplier = 0.3048 

response_unit = 'meters' 

else: 

altitude_multiplier = 1 

response_unit = 'feet' 

print("I've started retrieving the altitude") 

altitude_in_feet = yield self.retrieve_altitude_in_feet() 

altitude = round(altitude_in_feet * altitude_multiplier, 4) 

print("I've finished retrieving the altitude") 

response = { 

'altitude': altitude, 

'unit': response_unit} 

self.set_status(HTTPStatus.OK) 

self.write(response) 

self.finish() 

 

@run_on_executor(executor="_thread_pool_executor") 

def retrieve_altitude_in_feet(self): 

return drone.altimeter.altitude 

 

 

class Application(web.Application): 

def __init__(self, **kwargs): 

handlers = [ 

(r"/hexacopters/([0-9]+)", AsyncHexacopterHandler), 

(r"/leds/([0-9]+)", AsyncLedHandler), 

(r"/altimeters/([0-9]+)", AsyncAltimeterHandler), 

] 

super(Application, self).__init__(handlers, **kwargs) 

 

 

187 ↛ 188line 187 didn't jump to line 188, because the condition on line 187 was never trueif __name__ == "__main__": 

application = Application() 

port = 8888 

print("Listening at port {0}".format(port)) 

application.listen(port) 

tornado_ioloop = ioloop.IOLoop.instance() 

periodic_callback = ioloop.PeriodicCallback(lambda: None, 500) 

periodic_callback.start() 

tornado_ioloop.start()