45 Serial Communication Interface for the AFBR-S50 Device.
46 Connects to the device via a UART interface.
62 def __init__(self, port, baudrate):
64 Initializes the class and opens a serial port w/
65 "115200,8,N,1" serial settings and no timeout.
67 @param port (str): The port number string, e.g. "COM1"
68 @param baudrate (int): The baud rate in bauds per second, e.g. 115200
70 print(
"AFBR-S50: Open Serial Port " + port)
71 self.ser = serial.Serial(port, baudrate)
72 self.ser.timeout = 1.0
73 print(
"AFBR-S50: Serial Port is open " + port +
": " + str(self.ser.is_open))
76 if self.ser.inWaiting() > 0:
77 self.ser.read(self.ser.inWaiting())
81 Deletes the class and closes the opened serial port.
85 def write(self, tx: bytes):
87 Sends a SCI message and waits for an optional answer and
88 the mandatory acknowledge.
90 If any answer is received, it is returned as bytearray.
92 @param tx (bytes): The data message (incl. excape bytes) as byte array to be sent.
93 @return Returns the received answer (ACK or NAK) as byte array. None if no answer was received.
95 print(
"Sending: " + tx.hex())
97 return self.__wait_for_ack(tx[1])
99 def __wait_for_ack(self, txcmd):
101 Waits for an acknowledge signal for the specified command.
102 If an answer is received before the acknowledge is received,
103 the answer is returned as a bytearray.
104 If no acknowledge or any other command is received, an
106 @param txcmd (byte): The TX command byte to await an acknowledge for.
107 @return Returns the received answer (ACK or NAK) as byte array. None if no answer was received.
113 rx = bytearray(self.ser.read_until(self.stop_byte))
115 raise Exception(
"No data was read from the RX line.")
117 if rx[0] != self.start_byte[0]
or rx[-1] != self.stop_byte[0]:
118 raise Exception(
"Invalid data frame received (start or stop byte missing).")
120 rx = self.__remove_byte_stuffing(rx)
129 elif rxcmd == self.cmd_ack:
138 raise Exception(
"Invalid ACK received")
141 elif rxcmd == self.cmd_nak:
146 raise Exception(
"NAK received")
150 raise Exception(
"Invalid NAK received")
152 def __remove_byte_stuffing(self, rx: bytearray):
154 Removes escape bytes from the incoming message if any
155 @param rx (bytearray): The data message as byte array with escape bytes.
157 rxi = rx.split(self.esc_byte)
159 for i
in range(1, len(rxi)):
164 def __extract_1d_data(self, rx: bytearray):
166 Extracts the 1D data values from the 1D data message.
167 @param rx (bytearray): The 1D data message as byte array without escape bytes.
168 @return Returns the read data as dictionary.
173 s = (rx[3] << 8) + rx[4]
174 d[
"status"] = s
if s < 0x8000
else s - 0x10000
177 t_sec = (rx[5] << 24) + (rx[6] << 16) + (rx[7] << 8) + rx[8]
178 t_usec = (rx[9] << 8) + rx[10]
179 d[
"timestamp"] = t_sec + t_usec * 16.0 / 1.0e6
182 r = (rx[15] << 16) + (rx[16] << 8) + rx[17]
183 d[
"range"] = r / 16384.0
186 a = (rx[18] << 8) + rx[19]
187 d[
"amplitude"] = a / 16.0
191 d[
"signal quality"] = q
197 Reads the serial port and decodes the SCI data messages.
198 Currently only 1D data messages are supported.
199 If no data is pending to be read, the function immediately
200 return with None. If other data than measurement data was read,
201 the function returns with None.
202 Otherwise it returns a dictionary with the extracted data values.
203 @return Returns the read data as dictionary. None if no data has been read.
205 if self.ser.inWaiting() == 0:
209 rx = bytearray(self.ser.read_until(self.stop_byte))
211 raise Exception(
"No data was read from the RX line.")
213 if rx[0] != self.start_byte[0]
or rx[-1] != self.stop_byte[0]:
214 raise Exception(
"Invalid data frame received (start or stop byte missing).")
216 rx = self.__remove_byte_stuffing(rx)
222 print(
"Device Log: " + str(rx[8:-2]))
225 return self.__extract_1d_data(rx)
228 print(
"Received Unknown Data Frame: " + rx.hex())
231 if __name__ ==
"__main__":
235 s50 = AFBR_S50(port, baudrate)
246 print(
"setting data output mode to 1d data only")
247 s50.write(bytes.fromhex(
"02 41 07 F5 03"))
261 print(
"setting frame rate to 5 Hz (i.e. frame time to 0.2 sec)")
262 s50.write(bytes.fromhex(
"02 43 00 1B FC 0D 40 85 03"))
266 print(
"starting measurements in timer based auto mode")
267 s50.write(bytes.fromhex(
"02 11 D0 03"))
270 print(
"read measurement data")
275 f
"{d['timestamp']:10.6f} sec | "
276 + f
"range: {d['range']:6.3f} m | "
277 + f
"amplitude: {d['amplitude']:8.3f} | "
278 + f
"signal quality: {d['signal quality']:3d} | "
279 + f
"status: {d['status']:5d} |"
286 except KeyboardInterrupt:
289 print(
"stop measurements")
290 s50.write(bytes.fromhex(
"02 12 F7 03"))