AFBR-S50 API Reference Manual v1.5.6
AFBR-S50 Time-of-Flight Sensor SDK for Embedded Software
Loading...
Searching...
No Matches
sci_python_example.py
1# #############################################################################
2# ### Example for using the AFBR-S50 API with UART interface ###
3# #############################################################################
4#
5# Prepare your evaluation kit (w/ NXP MKL46z MCU) by flashing the UART binary
6# to the device. Connect the OpenSDA USB port (NOT the one labeled with KL46Z)
7# to your computer. Go to the Device/Binary folder in the install directory of your SDK
8# (default: C:\Program Files (x86)\Broadcom\AFBR-S50 SDK\Device\Binary) and copy
9# the AFBR.S50.ExplorerApp.vX.X.X_KL46z_UART.bin (not the *_USB.*!!) file to
10# the OpenSDA USB drive.
11#
12# After flashing, the device is ready to receive commands via the OpenSDA serial
13# port. Go to your device manager to find out which COM port is assigned to the
14# device. Type it to the "port" variable below, before starting the script.
15#
16# Use Python 3 to run the script. The script requires the pySerial module which
17# might need to be installed.
18# To install, run: "pip install pyserial"
19# See: https://pyserial.readthedocs.io/en/latest/index.html
20#
21#
22# The script sends configuration commands to set the data output mode to 1D data
23# only and the frame rate to 5 Hz. After setting the configuration, the
24# measurements are started and the data is extracted from the received data
25# frames and printed to the console.
26#
27#
28# Note: The CRC values are calculated manually and added before the frames are
29# sent. You can use the online calculator from the following link w/
30# CRC8_SAE_J1850_ZERO to obtain the CRC values for a frame:
31# http://www.sunshine2k.de/coding/javascript/crc/crc_js.html
32#
33# #############################################################################
34
35import time
36import serial
37
38# input parameters: set "port" to the COM port assigned to the device; "baudrate" is the UART speed in baud
39port = "COM4"
40baudrate = 115200
41
42
class AFBR_S50:
    """
    Serial Communication Interface for the AFBR-S50 Device.
    Connects to the device via a UART interface.

    Frames are delimited by a start and a stop byte. A payload byte that
    is preceded by the escape byte is transmitted inverted (XOR 0xFF) and
    is restored by this class when a frame is received.
    """

    # Frame delimiter that opens every SCI frame.
    start_byte = b"\x02"

    # Frame delimiter that closes every SCI frame.
    stop_byte = b"\x03"

    # Escape marker: the byte following it is transmitted inverted.
    esc_byte = b"\x1B"

    # Command byte of an acknowledge frame.
    cmd_ack = 0x0A

    # Command byte of a not-acknowledge frame.
    cmd_nak = 0x0B

    # Command byte of a log message frame.
    cmd_log = 0x06

    # Command byte of a 1D measurement data frame.
    cmd_data_1d = 0xB6

    # The pySerial port object; None until __init__ has opened the port.
    ser = None

    def __init__(self, port, baudrate):
        """!
        Initializes the class and opens a serial port w/
        "<baudrate>,8,N,1" serial settings and a read timeout of 1 second.

        @param port (str): The port number string, e.g. "COM1"
        @param baudrate (int): The baud rate in bauds per second, e.g. 115200
        """
        print("AFBR-S50: Open Serial Port " + port)
        self.ser = serial.Serial(port, baudrate)
        self.ser.timeout = 1.0  # seconds
        print("AFBR-S50: Serial Port is open " + port + ": " + str(self.ser.is_open))

        # Discard stale data that was received before the script started.
        pending = self.ser.in_waiting
        if pending > 0:
            self.ser.read(pending)

    def __del__(self):
        """!
        Deletes the class and closes the opened serial port (if any).
        """
        # `ser` is still the class default None if opening the port failed.
        if self.ser is not None:
            self.ser.close()

    def write(self, tx: bytes):
        """!
        Sends a SCI message and waits for an optional answer and
        the mandatory acknowledge.

        @param tx (bytes): The complete frame (incl. start/stop and escape bytes) to be sent.
        @return Returns the answer frame (escape bytes removed) that was received before
                the acknowledge. None if no answer was received.
        """
        print("Sending: " + tx.hex())
        self.ser.write(tx)
        # tx[1] is the command byte (first byte after the start byte).
        return self.__wait_for_ack(tx[1])

    def __read_frame(self):
        """!
        Reads one frame (up to and including the next stop byte) from the
        serial port, validates its delimiters and removes the escape bytes.

        @return Returns the received frame as bytes without escape bytes.
        """
        rx = bytearray(self.ser.read_until(self.stop_byte))
        if len(rx) == 0:
            raise Exception("No data was read from the RX line.")

        # A read timeout yields a truncated frame without the stop byte.
        if rx[0] != self.start_byte[0] or rx[-1] != self.stop_byte[0]:
            raise Exception("Invalid data frame received (start or stop byte missing).")

        return self.__remove_byte_stuffing(rx)

    def __wait_for_ack(self, txcmd):
        """!
        Waits for an acknowledge signal for the specified command.
        If an answer (i.e. a frame with the same command byte) is received
        before the acknowledge, it is returned.
        Frames with any other command byte are silently skipped.
        An exception is raised if a not-acknowledge is received, if the
        (not-)acknowledge belongs to another command, or if no valid frame
        could be read (e.g. due to a read timeout).

        @param txcmd (int): The TX command byte to await an acknowledge for.
        @return Returns the received answer frame. None if no answer was received.
        """
        answer = None

        while True:
            rx = self.__read_frame()

            # Command byte is the first byte after the start byte.
            rxcmd = rx[1]

            if rxcmd == txcmd:
                # Response to the command; keep it until the ACK arrives.
                answer = rx

            elif rxcmd == self.cmd_ack:
                # rx[2] holds the command that is being acknowledged.
                if rx[2] != txcmd:
                    raise Exception("Invalid ACK received")
                return answer

            elif rxcmd == self.cmd_nak:
                # rx[2] holds the command that is being rejected.
                if rx[2] != txcmd:
                    raise Exception("Invalid NAK received")
                raise Exception("NAK received")

    def __remove_byte_stuffing(self, rx: bytearray):
        """!
        Removes escape bytes from the incoming message if any.
        Every escape byte is dropped and the byte following it is inverted.

        @param rx (bytearray): The data message as byte array with escape bytes.
        @return Returns the data message as bytes without escape bytes.
        """
        esc = self.esc_byte[0]
        out = bytearray()
        escaped = False

        for value in rx:
            if escaped:
                # Byte after an escape byte is payload, restore its original value.
                out.append(value ^ 0xFF)
                escaped = False
            elif value == esc:
                escaped = True
            else:
                out.append(value)

        if escaped:
            raise Exception("Invalid data frame received (dangling escape byte).")

        return bytes(out)

    def __extract_1d_data(self, rx: bytearray):
        """!
        Extracts the 1D data values from the 1D data message.

        @param rx (bytearray): The 1D data message as byte array without escape bytes.
        @return Returns the read data as dictionary with the keys "status",
                "timestamp", "range", "amplitude" and "signal quality".
        """
        # Byte 20 (signal quality) is the last one that is accessed.
        if len(rx) < 21:
            raise IndexError("1D data frame is too short")

        # Time stamp: 32-bit seconds followed by a 16-bit sub-second counter.
        t_sec = int.from_bytes(rx[5:9], "big")
        t_usec = int.from_bytes(rx[9:11], "big")

        # NOTE(review): the Q9.14 notation suggests a signed 24-bit value, but
        # the value is decoded as unsigned here - confirm against the protocol.
        r = int.from_bytes(rx[15:18], "big")

        return {
            # signed 16-bit status
            "status": int.from_bytes(rx[3:5], "big", signed=True),
            "timestamp": t_sec + t_usec * 16.0 / 1.0e6,
            "range": r / 16384.0,  # convert from Q9.14
            "amplitude": int.from_bytes(rx[18:20], "big") / 16.0,  # convert from UQ12.4
            "signal quality": rx[20],
        }

    def read_data(self):
        """!
        Reads the serial port and decodes the SCI data messages.
        Currently only 1D data messages are supported.
        If no data is pending to be read, the function immediately
        returns with None. If other data than measurement data was read
        (log messages and unknown frames are printed to the console),
        the function returns with None.
        Otherwise it returns a dictionary with the extracted data values.

        @return Returns the read data as dictionary. None if no data has been read.
        """
        if self.ser.in_waiting == 0:
            return None

        rx = self.__read_frame()

        # Command byte is the first byte after the start byte.
        cmd = rx[1]

        if cmd == self.cmd_data_1d:
            return self.__extract_1d_data(rx)

        if cmd == self.cmd_log:
            print("Device Log: " + str(rx[8:-2]))
        else:  # Unknown or not handled here
            print("Received Unknown Data Frame: " + rx.hex())

        return None
229
230
if __name__ == "__main__":

    # Create a new instance and open a serial port connection to the device.
    # This happens before the try block so that the KeyboardInterrupt handler
    # below never runs while "s50" is still undefined.
    s50 = AFBR_S50(port, baudrate)

    try:
        # Setting data output mode to 1D data only
        # The message is composed of:
        # [START][CMD][PARAM][CRC][STOP]
        # where:
        # [START] = 0x02; start byte
        # [CMD] = 0x41; command byte: data streaming mode
        # [PARAM] = 0x07; parameter of command: 1d data streaming
        # [CRC] = 0xF5; checksum: pre-calculated in online calculator
        # [STOP] = 0x03; stop byte
        print("setting data output mode to 1d data only")
        s50.write(bytes.fromhex("02 41 07 F5 03"))

        # Setting frame time to 200000 µsec = 0x00030D40 µsec
        # The message is composed of:
        # [START][CMD][PARAM(0)]...[PARAM(N)][CRC][STOP]
        # where:
        # [START] = 0x02; start byte
        # [CMD] = 0x43; command byte: measurement frame time
        # [PARAM(x)] = 0x001BFC0D40; 4-byte parameter of command: 0x00 03 0D 40 w/ escape bytes
        # [CRC] = 0x85; checksum: pre-calculated in online calculator
        # [STOP] = 0x03; stop byte
        #
        # NOTE: the 0x03 byte must be escaped and inverted (i.e. use 0x1BFC instead of 0x03)
        # The CRC is calculated on the original data, i.e. 0x43 00 03 0D 40 => 0x85
        print("setting frame rate to 5 Hz (i.e. frame time to 0.2 sec)")
        s50.write(bytes.fromhex("02 43 00 1B FC 0D 40 85 03"))

        # Starting measurements
        # [CMD] = 0x11; command byte: start timer based measurements
        print("starting measurements in timer based auto mode")
        s50.write(bytes.fromhex("02 11 D0 03"))

        # Read measurement data until the user interrupts the script (Ctrl-C).
        print("read measurement data")
        while True:
            d = s50.read_data()
            if d is not None:
                print(
                    f"{d['timestamp']:10.6f} sec | "
                    f"range: {d['range']:6.3f} m | "
                    f"amplitude: {d['amplitude']:8.3f} | "
                    f"signal quality: {d['signal quality']:3d} | "
                    f"status: {d['status']:5d} |"
                )

            else:
                # No measurement frame available yet: do other stuff
                time.sleep(0.1)

    except KeyboardInterrupt:
        # Try to stop measurements
        # [CMD] = 0x12; command byte: stop timer based measurements
        print("stop measurements")
        s50.write(bytes.fromhex("02 12 F7 03"))
291
status_t print(const char *fmt_s,...)
A printf-like function to print formatted data to a debugging interface.
Definition sci_log.c:106