1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35import time
36import serial
37
38
# Name of the serial port handed to AFBR_S50() by the script entry point below.
port = "COM4"
# Baud rate (bauds per second) handed to AFBR_S50() together with the port name.
baudrate = 115200
41
42
class AFBR_S50:
    """
    Serial Communication Interface for the AFBR-S50 Device.
    Connects to the device via a UART interface.

    Frames on the wire start with `start_byte` and end with `stop_byte`.
    A byte that follows `esc_byte` is transmitted bit-inverted and is
    restored on reception (see `__remove_byte_stuffing`).
    """

    # First byte of every frame.
    start_byte = b"\x02"

    # Last byte of every frame.
    stop_byte = b"\x03"

    # Escape marker: the byte following it is sent bit-inverted.
    esc_byte = b"\x1B"

    # Command byte of an acknowledge frame.
    cmd_ack = 0x0A

    # Command byte of a not-acknowledge frame.
    cmd_nak = 0x0B

    # Command byte of a device log message frame.
    cmd_log = 0x06

    # Command byte of a 1D measurement data frame.
    cmd_data_1d = 0xB6

    # Serial port handle; stays None until __init__ has opened the port.
    ser = None

    def __init__(self, port, baudrate):
        """!
        Initializes the class and opens a serial port with a read
        timeout of 1 second. Any bytes already pending in the receive
        buffer are read and discarded.

        @param port (str): The port number string, e.g. "COM1"
        @param baudrate (int): The baud rate in bauds per second, e.g. 115200
        """
        print("AFBR-S50: Open Serial Port " + port)
        self.ser = serial.Serial(port, baudrate, timeout=1.0)
        print("AFBR-S50: Serial Port is open " + port + ": " + str(self.ser.is_open))

        # Drop stale bytes so the first frame read starts at a frame boundary.
        pending = self.ser.in_waiting
        if pending > 0:
            self.ser.read(pending)

    def __del__(self):
        """!
        Deletes the class and closes the opened serial port.
        Does nothing if the port was never opened.
        """
        # `ser` is still the class-level None if opening the port failed.
        if self.ser is not None:
            self.ser.close()

    def write(self, tx: bytes):
        """!
        Sends a SCI message and waits for an optional answer and
        the mandatory acknowledge.

        If any answer is received, it is returned as byte array.

        @param tx (bytes): The data message (incl. escape bytes) as byte array to be sent.
        @return Returns the answer frame received before the acknowledge,
                with escape bytes removed. None if only the acknowledge
                was received.
        @exception Exception if no or invalid data is received, or if the
                   device answers with a NAK or a mismatching ACK/NAK.
        """
        print("Sending: " + tx.hex())
        self.ser.write(tx)
        # tx[1] is the command byte the device has to acknowledge.
        return self.__wait_for_ack(tx[1])

    def __read_frame(self):
        """!
        Reads one frame (up to and including the stop byte) from the
        serial port, validates its delimiters and removes escape bytes.

        @return Returns the frame as byte array without escape bytes.
        @exception Exception if nothing was read before the timeout or
                   if the start or stop byte is missing.
        """
        rx = bytearray(self.ser.read_until(self.stop_byte))
        if len(rx) == 0:
            raise Exception("No data was read from the RX line.")

        if rx[0] != self.start_byte[0] or rx[-1] != self.stop_byte[0]:
            raise Exception("Invalid data frame received (start or stop byte missing).")

        return self.__remove_byte_stuffing(rx)

    def __wait_for_ack(self, txcmd):
        """!
        Waits for an acknowledge signal for the specified command.
        If an answer (a frame carrying the same command byte) is received
        before the acknowledge, it is returned as byte array.
        Frames with any other command byte are skipped while waiting.

        @param txcmd (byte): The TX command byte to await an acknowledge for.
        @return Returns the received answer as byte array. None if no answer was received.
        @exception Exception if no or invalid data is received, if a NAK is
                   received, or if an ACK/NAK for another command is received.
        """
        answer = None

        while True:
            rx = self.__read_frame()
            rxcmd = rx[1]

            if rxcmd == txcmd:
                # Answer to our command; keep it until the ACK arrives.
                answer = rx

            elif rxcmd == self.cmd_ack:
                # rx[2] names the command that is being acknowledged.
                if rx[2] == txcmd:
                    return answer
                raise Exception("Invalid ACK received")

            elif rxcmd == self.cmd_nak:
                # rx[2] names the command that is being rejected.
                if rx[2] == txcmd:
                    raise Exception("NAK received")
                raise Exception("Invalid NAK received")

    def __remove_byte_stuffing(self, rx: bytearray):
        """!
        Removes escape bytes from the incoming message if any.
        Every byte that directly follows an escape byte is bit-inverted
        to restore its original value.

        @param rx (bytearray): The data message as byte array with escape bytes.
        @return Returns the message as bytes without escape bytes.
        """
        # Work on a mutable copy so that immutable `bytes` input works too.
        segments = bytearray(rx).split(self.esc_byte)

        # Every segment but the first one started right after an escape byte.
        for segment in segments[1:]:
            segment[0] ^= 0xFF

        return b"".join(segments)

    def __extract_1d_data(self, rx: bytearray):
        """!
        Extracts the 1D data values from the 1D data message.
        All multi-byte fields are decoded as big-endian integers.

        @param rx (bytearray): The 1D data message as byte array without escape bytes.
        @return Returns the read data as dictionary with the keys "status",
                "timestamp", "range", "amplitude" and "signal quality".
        @exception IndexError if the message is shorter than 21 bytes.
        """
        # The highest byte index decoded below is 20.
        if len(rx) < 21:
            raise IndexError("1D data frame too short: " + str(len(rx)) + " bytes")

        d = dict()

        # Bytes 3..4: status as signed 16-bit value.
        d["status"] = int.from_bytes(rx[3:5], "big", signed=True)

        # Bytes 5..8: seconds; bytes 9..10: sub-second part in steps of 16 usec.
        t_sec = int.from_bytes(rx[5:9], "big")
        t_usec = int.from_bytes(rx[9:11], "big")
        d["timestamp"] = t_sec + t_usec * 16.0 / 1.0e6

        # Bytes 15..17: range, scaled by 1/16384.
        d["range"] = int.from_bytes(rx[15:18], "big") / 16384.0

        # Bytes 18..19: amplitude, scaled by 1/16.
        d["amplitude"] = int.from_bytes(rx[18:20], "big") / 16.0

        # Byte 20: signal quality.
        d["signal quality"] = rx[20]

        return d

    def read_data(self):
        """!
        Reads the serial port and decodes the SCI data messages.
        Currently only 1D data messages are supported.
        If no data is pending to be read, the function immediately
        returns None. If other data than measurement data was read
        (device log or unknown frame), it is printed and the function
        returns None.
        Otherwise it returns a dictionary with the extracted data values.

        @return Returns the read data as dictionary. None if no data has been read.
        @exception Exception if the pending data is not a valid frame.
        """
        if self.ser.in_waiting == 0:
            return None

        rx = self.__read_frame()
        cmd = rx[1]

        if cmd == self.cmd_data_1d:
            return self.__extract_1d_data(rx)

        if cmd == self.cmd_log:
            print("Device Log: " + str(rx[8:-2]))
        else:
            print("Received Unknown Data Frame: " + rx.hex())

        return None
229
230
if __name__ == "__main__":
    # Example session: configure the device, start timer based measurements
    # and print every 1D result until the user presses Ctrl+C.
    #
    # Commands are given as raw frames in hex: start byte (02), command byte,
    # optional parameters, one trailing byte (presumably a checksum - confirm
    # against the SCI specification) and stop byte (03).

    # Bound up front so the interrupt handler can tell whether the port
    # was opened at all.
    s50 = None

    try:
        s50 = AFBR_S50(port, baudrate)

        # Command 0x41 with parameter 0x07.
        print("setting data output mode to 1d data only")
        s50.write(bytes.fromhex("02 41 07 F5 03"))

        # Command 0x43. The parameter bytes "00 1B FC 0D 40" contain an
        # escape sequence: 1B marks FC as bit-inverted (FC ^ FF = 03), so the
        # value sent is 0x00030D40 = 200000, which matches the 0.2 sec frame
        # time if the unit is microseconds.
        print("setting frame rate to 5 Hz (i.e. frame time to 0.2 sec)")
        s50.write(bytes.fromhex("02 43 00 1B FC 0D 40 85 03"))

        # Command 0x11 without parameters.
        print("starting measurements in timer based auto mode")
        s50.write(bytes.fromhex("02 11 D0 03"))

        print("read measurement data")
        while True:
            d = s50.read_data()
            if d is not None:
                print(
                    f"{d['timestamp']:10.6f} sec | "
                    + f"range: {d['range']:6.3f} m | "
                    + f"amplitude: {d['amplitude']:8.3f} | "
                    + f"signal quality: {d['signal quality']:3d} | "
                    + f"status: {d['status']:5d} |"
                )

            else:
                # Nothing decoded this round; wait before polling again.
                time.sleep(0.1)

    except KeyboardInterrupt:
        # Ctrl+C: stop the measurements (command 0x12) before leaving,
        # unless the interrupt arrived before the port was opened.
        if s50 is not None:
            print("stop measurements")
            s50.write(bytes.fromhex("02 12 F7 03"))
291
status_t print(const char *fmt_s,...)
A printf-like function to print formatted data to a debugging interface.
Definition sci_log.c:106