AFBR-S50 API Reference Manual  v1.6.5
AFBR-S50 Time-of-Flight Sensor SDK for Embedded Software
sci_python_example.py
1 # #############################################################################
2 # ### Example for using the AFBR-S50 API with UART interface ###
3 # #############################################################################
4 #
5 # Prepare your evaluation kit (w/ NXP MKL46z MCU) by flashing the UART binary
6 # to the device. Connect the OpenSDA USB port (NOT the one labeled with KL46Z)
7 # to your computer. Go to the Device/Binary folder in the install directory of your SDK
8 # (default: C:\Program Files (x86)\Broadcom\AFBR-S50 SDK\Device\Binary) and copy
9 # the AFBR.S50.ExplorerApp.vX.X.X_KL46z_UART.bin (not the *_USB.*!!) file to
10 # the OpenSDA USB drive.
11 #
12 # After flashing, the device is ready to receive commands via the OpenSDA serial
13 # port. Go to your device manager to find out which COM port is assigned to the
14 # device. Assign it to the "port" variable below before starting the script.
15 #
16 # Use Python 3 to run the script. The script requires the pySerial module which
17 # might need to be installed.
18 # To install, run: "pip install pyserial"
19 # See: https://pyserial.readthedocs.io/en/latest/index.html
20 #
21 #
22 # The script sends configuration commands to set the data output mode to 1D data
23 # only and the frame rate to 5 Hz. After setting the configuration, the
24 # measurements are started and the data is extracted from the received data
25 # frames and printed to the console.
26 #
27 #
28 # Note: The CRC values are calculated manually and added before the frames are
29 # sent. You can use the online calculator from the following link w/
30 # CRC8_SAE_J1850_ZERO to obtain the CRC values for a frame:
31 # http://www.sunshine2k.de/coding/javascript/crc/crc_js.html
32 #
33 # #############################################################################
34 
35 import time
36 import serial
37 
# Input parameters: adjust these to match your setup before running the script.
port = "COM4"  # name of the serial (COM) port assigned to the device
baudrate = 1_000_000  # bps
41 
42 
class AFBR_S50:
    """
    Serial Communication Interface for the AFBR-S50 Device.
    Connects to the device via a UART interface.

    Frames are delimited by a start and a stop byte. Inside a frame, an
    escape byte announces that the following byte is transmitted inverted.
    Note: this class does not verify the checksum of received frames.
    """

    # Frame delimiter: first byte of every frame.
    start_byte = b"\x02"

    # Frame delimiter: last byte of every frame.
    stop_byte = b"\x03"

    # Escape byte: the byte following it is sent bit-inverted.
    esc_byte = b"\x1B"

    # Command byte of an acknowledge frame.
    cmd_ack = 0x0A

    # Command byte of a not-acknowledge frame.
    cmd_nak = 0x0B

    # The pySerial port object; None until the port was opened successfully.
    ser = None

    def __init__(self, port, baudrate):
        """!
        Initializes the class and opens the serial port with the given
        baud rate and a read timeout of 1 second. Any data already pending
        in the receive buffer is discarded.

        @param port (str): The port number string, e.g. "COM1"
        @param baudrate (int): The baud rate in bauds per second, e.g. 115200
        """
        print("AFBR-S50: Open Serial Port " + port)
        self.ser = serial.Serial(port, baudrate)
        self.ser.timeout = 1.0  # seconds
        print("AFBR-S50: Serial Port is open " + port + ": " + str(self.ser.is_open))

        # discard old data
        pending = self.ser.in_waiting
        if pending > 0:
            self.ser.read(pending)

    def __del__(self):
        """!
        Deletes the class and closes the opened serial port.
        Does nothing if the port was never opened (e.g. opening failed).
        """
        if self.ser is not None:
            self.ser.close()

    def write(self, tx: bytes):
        """!
        Sends a SCI message and waits for an optional answer and
        the mandatory acknowledge.

        The byte at index 1 of the message is taken as the command byte
        whose acknowledge is awaited.

        @param tx (bytes): The data message (incl. escape bytes) as byte array to be sent.
        @return Returns the received answer frame (escape bytes removed).
                None if only the acknowledge but no answer was received.
        """
        print("Sending: " + tx.hex())
        self.ser.write(tx)
        return self.__wait_for_ack(tx[1])  # read acknowledge

    def __read_frame(self):
        """!
        Reads the next frame from the serial port, checks its start and
        stop bytes and removes the escape bytes.

        @return Returns the frame (incl. start and stop byte) without escape bytes.
        """
        rx = bytearray(self.ser.read_until(self.stop_byte))
        if len(rx) == 0:
            raise Exception("No data was read from the RX line.")

        if rx[0] != self.start_byte[0] or rx[-1] != self.stop_byte[0]:
            raise Exception("Invalid data frame received (start or stop byte missing).")

        return self.__remove_byte_stuffing(rx)

    def __wait_for_ack(self, txcmd):
        """!
        Waits for an acknowledge signal for the specified command.
        If an answer is received before the acknowledge is received,
        the answer is returned.
        If a not-acknowledge, or an acknowledge for a different command,
        is received, or reading from the port fails, an exception is raised.
        Frames with any other command byte are ignored while waiting.

        @param txcmd (byte): The TX command byte to await an acknowledge for.
        @return Returns the received answer frame (escape bytes removed).
                None if no answer was received before the acknowledge.
        """
        answer = None

        while True:
            rx = self.__read_frame()

            # Extract command byte (first after start byte)
            rxcmd = rx[1]

            if rxcmd == txcmd:
                # response received; keep it until the acknowledge arrives
                answer = rx

            elif rxcmd == self.cmd_ack:
                # acknowledge signal: the next byte names the acknowledged command
                if rx[2] != txcmd:
                    raise Exception("Invalid ACK received")
                return answer

            elif rxcmd == self.cmd_nak:
                # not-acknowledge signal: the next byte names the rejected command
                if rx[2] != txcmd:
                    raise Exception("Invalid NAK received")
                raise Exception("NAK received")

    def __remove_byte_stuffing(self, rx: bytearray):
        """!
        Removes escape bytes from the incoming message if any.
        Every escape byte is dropped and the byte following it is inverted.

        @param rx (bytearray): The data message as byte array with escape bytes.
        @return Returns the message without escape bytes as bytes object.
        """
        esc = self.esc_byte[0]
        decoded = bytearray()
        escaped = False

        for value in rx:
            if escaped:
                # byte after an escape byte is transmitted inverted
                decoded.append(value ^ 0xFF)
                escaped = False
            elif value == esc:
                escaped = True
            else:
                decoded.append(value)

        if escaped:
            raise Exception("Invalid data frame received (escape byte at end of data).")

        return bytes(decoded)

    def __extract_1d_data(self, rx: bytearray):
        """!
        Extracts the 1D data values from the 1D data message.
        All multi-byte fields are read most significant byte first.

        @param rx (bytearray): The 1D data message as byte array without escape bytes.
        @return Returns the read data as dictionary with the keys "status",
                "timestamp", "range", "amplitude" and "signal quality".
        """
        # The last field read is at index 20.
        if len(rx) < 21:
            raise IndexError("1D data frame too short")

        d = dict()

        # Extract Status: signed 16-bit int
        d["status"] = int.from_bytes(rx[3:5], "big", signed=True)

        # Extract Time Stamp: seconds plus a sub-second part in steps of 16 usec
        t_sec = int.from_bytes(rx[5:9], "big")
        t_usec = int.from_bytes(rx[9:11], "big")
        d["timestamp"] = t_sec + t_usec * 16.0 / 1.0e6

        # Extract Range:
        # NOTE(review): the 24-bit value is decoded as unsigned; confirm
        # whether negative range values can occur in this field.
        r = int.from_bytes(rx[15:18], "big")
        d["range"] = r / 16384.0  # convert from Q9.14

        # Extract Amplitude:
        a = int.from_bytes(rx[18:20], "big")
        d["amplitude"] = a / 16.0  # convert from UQ12.4

        # Extract Signal Quality:
        d["signal quality"] = rx[20]

        return d

    def read_data(self):
        """!
        Reads the serial port and decodes the SCI data messages.
        Currently only 1D data messages are supported.
        If no data is pending to be read, the function immediately
        returns None. If other data than measurement data was read
        (e.g. a log message, which is printed), the function returns None.
        Otherwise it returns a dictionary with the extracted data values.

        @return Returns the read data as dictionary. None if no data has been read.
        """
        if self.ser.in_waiting == 0:
            return None

        rx = self.__read_frame()

        # extract command byte (first after start byte)
        cmd = rx[1]

        if cmd == 0xB6:  # 1D Data Set
            return self.__extract_1d_data(rx)

        if cmd == 0x06:  # Log Message
            print("Device Log: " + str(rx[8:-2]))
        else:  # Unknown or not handled here
            print("Received Unknown Data Frame: " + rx.hex())

        return None
229 
230 
if __name__ == "__main__":

    # Stays None until the port is open, so the interrupt handler below can
    # tell whether there is a device to send the stop command to.
    s50 = None

    try:
        # Create a new instance and open a serial port connection to the device.
        s50 = AFBR_S50(port, baudrate)

        # Setting data output mode to 1D data only
        # The message is composed of:
        # [START][CMD][PARAM][CRC][STOP]
        # where:
        # [START] = 0x02; start byte
        # [CMD] = 0x41; command byte: data streaming mode
        # [PARAM] = 0x07; parameter of command: 1d data streaming
        # [CRC] = 0xF5; checksum: pre-calculated in online calculator
        # [STOP] = 0x03; stop byte
        print("setting data output mode to 1d data only")
        s50.write(bytes.fromhex("02 41 07 F5 03"))

        # Setting frame time to 200000 µsec = 0x00030D40 µsec
        # The message is composed of:
        # [START][CMD][PARAM(0)]...[PARAM(N)][CRC][STOP]
        # where:
        # [START] = 0x02; start byte
        # [CMD] = 0x43; command byte: measurement frame time
        # [PARAM(x)] = 0x001BFC0D40; 4-byte parameter of command: 0x00 03 0D 40 w/ escape bytes
        # [CRC] = 0x85; checksum: pre-calculated in online calculator
        # [STOP] = 0x03; stop byte
        #
        # NOTE: the 0x03 byte must be escaped and inverted (i.e. use 0x1BFC instead of 0x03)
        # The CRC is calculated on the original data, i.e. 0x43 00 03 0D 40 => 0x85
        print("setting frame rate to 5 Hz (i.e. frame time to 0.2 sec)")
        s50.write(bytes.fromhex("02 43 00 1B FC 0D 40 85 03"))

        # Starting measurements
        # [CMD] = 0x11; command byte: start timer based measurements
        print("starting measurements in timer based auto mode")
        s50.write(bytes.fromhex("02 11 D0 03"))

        # Read measurement data until the user interrupts the script (Ctrl+C)
        print("read measurement data")
        while True:
            d = s50.read_data()
            if d is not None:
                print(
                    f"{d['timestamp']:10.6f} sec | "
                    + f"range: {d['range']:6.3f} m | "
                    + f"amplitude: {d['amplitude']:8.3f} | "
                    + f"signal quality: {d['signal quality']:3d} | "
                    + f"status: {d['status']:5d} |"
                )

            else:
                # do other stuff
                time.sleep(0.1)

    except KeyboardInterrupt:
        # Try to stop measurements; skipped if the interrupt arrived before
        # the serial port was opened.
        # [CMD] = 0x12; command byte: stop timer based measurements
        if s50 is not None:
            print("stop measurements")
            s50.write(bytes.fromhex("02 12 F7 03"))
print
status_t print(const char *fmt_s,...)
A printf-like function to print formatted data to an debugging interface.
Definition: sci_log.c:106