Select Git revision
UserStructuralData.cs
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
connect.py 9.27 KiB
#!/usr/bin/env python3
# Written by Kim Brose <kim.brose@rwth-aachen.de> for IENT of RWTH Aachen
# with inspiration by https://github.com/igomezal/bluetoothctl_helper
# Modified by Dominik Mehlem (11/2021) <mehlem@ient.rwth-aachen.de>
# note: EV3 uses Bluetooth SPP (Serial Port Profile) to connect to a virtual serial port.
# Your system must have this profile enabled. See the lmcd doc for more info.
import os.path
import subprocess as sp
from time import sleep
import re
from dataclasses import dataclass
import io
# structs
# bluetooth device
@dataclass
class BTDevice:
    """A Bluetooth device as listed by ``bluetoothctl devices``."""
    # device name/alias taken from the "Device <MAC> <name>" line
    name: str
    # Bluetooth address in colon-separated form (17 characters)
    MAC: str
# rfcomm binding
@dataclass
class RFCBinding:
    """One rfcomm binding as reported by the ``rfcomm`` status output."""
    # rfcomm device name without the /dev/ prefix, e.g. "rfcomm0"
    device: str
    # Bluetooth address of the bound remote device
    MAC: str
    # Bluetooth (RFCOMM) channel used by the binding
    channel: int
##### SP HELPERS #####
# https://stackoverflow.com/a/52544846
def write(process, message):
    """Send one command line to an interactive subprocess.

    The message is stripped of surrounding whitespace, terminated with a
    single newline, UTF-8 encoded and written to ``process.stdin``, which is
    flushed immediately so the child sees the command right away.
    """
    payload = f"{message.strip()}\n".encode("utf-8")
    pipe = process.stdin
    pipe.write(payload)
    pipe.flush()
##### CHECK FOR EXISTING DEVICES #####
def find_existing_binds():
    """Return the rfcomm bindings currently reported by the system.

    Runs ``sudo rfcomm`` (without arguments it displays the status of all
    bindings) and parses every line that contains
    ``rfcommN: <MAC> channel <C>``.

    Returns:
        list[RFCBinding]: one entry per parsed line, in output order, with
        ``channel`` converted to ``int``.  Blank lines and lines in any other
        format are skipped instead of raising ``IndexError``.
    """
    # rfcomm without arguments displays status
    result = sp.run(['sudo', 'rfcomm'], capture_output=True)
    text = result.stdout.decode("utf-8", errors="replace")

    pattern = re.compile(r'(rfcomm[0-9]+):\s+([A-Z0-9:]{17})\s+channel ([0-9]+)')
    status = []
    for line in text.splitlines():
        found = pattern.search(line)
        if found is None:
            # nothing we can interpret on this line
            continue
        dev, mac, chan = found.groups()
        status.append(RFCBinding(device=dev, MAC=mac, channel=int(chan)))
    return status
def print_existing_binds():
    """List all existing rfcomm bindings on stdout and return them.

    When at least one binding exists, a hint is printed that the user may
    simply reuse the listed rfcomm device and abort the script.

    Returns:
        The list produced by ``find_existing_binds()``.
    """
    existing = find_existing_binds()
    for bind in existing:
        print(f'/dev/{bind.device:8} bound to {bind.MAC} (using Bluetooth channel {bind.channel})')
    if existing:
        print('If the device you want to connect is already listed,')
        print('use the existing rfcomm device and abort this script.\n')
    return existing
def find_free_dev(device, ID):
    """Return the first path ``<device><ID>`` that does not exist yet.

    Starting at ``ID``, the numeric suffix is incremented until no file
    system entry with that name exists.
    """
    candidate = f"{device}{ID}"
    while os.path.exists(candidate):
        ID += 1
        candidate = f"{device}{ID}"
    return candidate
##### SCAN FOR DEVICES #####
def scan_devices(countdown=10):
    """Scan for Bluetooth devices via bluetoothctl and return what was found.

    The user is asked to make the EV3 visible and to press Enter; then
    ``scan on`` is sent, a countdown is shown, and the ``devices`` listing is
    collected and parsed.

    Args:
        countdown: number the on-screen countdown starts from; it counts
            down to 0 with one second per step.

    Returns:
        list[BTDevice]: devices named like an EV3 brick (``EV3-DD-L``) first,
        followed by all other listed devices.
    """
    # we will use the interactive bluetoothctl program
    args = ['bluetoothctl']
    with sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as btctl:
        print('Make sure your EV3 is on and Bluetooth is set to visible.')
        sleep(1)
        input('Then press Enter to start scanning.\n')

        # tell bluetoothctl to start scanning
        write(btctl, 'scan on')

        # display countdown during scanning; flush because the line has no
        # newline and would otherwise sit in the stdout buffer
        for c in range(countdown, -1, -1):
            print(f'\rScanning {c:02} secs...', end='', flush=True)
            sleep(1)
        print(' scan done')
        write(btctl, 'scan off')
        sleep(1)

        # save the list of devices from the scan
        write(btctl, 'devices')
        sleep(1)
        write(btctl, 'exit')
        try:
            output, _ = btctl.communicate(timeout=3)
        except sp.TimeoutExpired:
            # bluetoothctl did not quit on its own: stop it and take what it wrote
            btctl.kill()
            output, _ = btctl.communicate()

    # undecodable bytes must not abort the scan
    text = output.decode("utf-8", errors="replace").strip()

    ##### PREPARE DEVICE LIST FOR MENU #####
    found = _parse_device_listing(text)
    print()
    return found


def _parse_device_listing(text):
    """Turn bluetoothctl output into BTDevice objects, EV3-named devices first."""
    # looking for lines like "Device MAC-ADDR EV3-DD-L"
    # all our EV3 bricks are named EV3-DD-L where D are digits and L letters
    ev3 = re.compile(r'^Device\s+([A-Z0-9:]{17})\s+(EV3-[0-9]{2}-[A-Z])')
    unspecific = re.compile(r'^Device\s+([A-Z0-9:]{17})\s+(.*)$')

    l_ev3 = []
    l_uns = []
    for line in text.split('\n'):
        hit = ev3.search(line)
        if hit:
            l_ev3.append(BTDevice(MAC=hit.group(1), name=hit.group(2)))
            continue
        hit = unspecific.search(line)
        if hit:
            # strip a trailing carriage return / blanks from the free-form name
            l_uns.append(BTDevice(MAC=hit.group(1), name=hit.group(2).strip()))
    return l_ev3 + l_uns
##### PRESENT MENU #####
def remove_binds(allbricks, binds):
    """Return the devices from *allbricks* that are not bound to rfcomm yet.

    Args:
        allbricks: iterable of objects with a ``MAC`` attribute (scan result).
        binds: iterable of objects with a ``MAC`` attribute (existing binds).

    Returns:
        A new list with every device whose MAC does not occur in *binds*,
        in the original order.  *allbricks* itself is left unmodified, so no
        element is skipped and repeated MACs in *binds* are harmless.
    """
    bound_macs = {bi.MAC for bi in binds}
    return [br for br in allbricks if br.MAC not in bound_macs]
def print_menu(bricks):
    """Print the numbered selection menu.

    Entry 0 is always "scan again"; the devices in *bricks* follow, numbered
    from 1, each shown with its name and MAC address.
    """
    print('Found devices:')
    print(f'{0}) scan again')
    for number, brick in enumerate(bricks, start=1):
        print(f'{number}) {brick.name} ({brick.MAC})')
def menu(bricks):
    """Let the user pick one device from *bricks* and return it.

    The menu is printed and the user is prompted until a number between 1 and
    ``len(bricks)`` is entered.  Entering 0 triggers a new 15-step scan and
    replaces the list with the scan result.  Anything that is not an integer
    is ignored and the prompt is repeated.

    Args:
        bricks: list of devices offering ``name`` and ``MAC`` attributes.

    Returns:
        The selected element of the (possibly rescanned) device list.
    """
    print_menu(bricks)
    selection = None
    while selection not in range(1, len(bricks) + 1):
        answer = input('Enter number to select: ')
        try:
            selection = int(answer)
        except ValueError:
            # not a number: ask again
            selection = None
        if selection == 0:
            bricks = scan_devices(15)
            print_menu(bricks)
    # echo the accepted choice
    print(selection)
    brick = bricks[selection - 1]
    print()
    return brick
##### PAIR/AUTHENTICATE BT DEVICE #####
def print_success(brick, device):
    """Report the established connection and show how to use it from MATLAB.

    Args:
        brick: the connected device (``name`` and ``MAC`` are printed).
        device: path of the rfcomm device the brick is bound to.
    """
    ##### PRINT RESULTS #####
    summary = f'{brick.name} ({brick.MAC}) is now connected to {device}.'
    print(summary)

    # we can now use our bluetooth-EV3 with matlab
    matlab_hint = f'''
################ USE WITH MATLAB ############
1. b=EV3();
2. b.connect('bt','serPort','{device}');
3. b.beep();
#############################################
'''
    print(matlab_hint)
def is_paired(btctl, MAC):
    """Ask a running bluetoothctl process whether device *MAC* is paired.

    Sends ``info <MAC>`` followed by ``exit`` and collects everything the
    process wrote to stdout.  The process is finished afterwards and cannot
    be reused for further commands.

    Args:
        btctl: ``subprocess.Popen`` object of bluetoothctl with piped
            stdin/stdout.
        MAC: Bluetooth address of the device to query.

    Returns:
        tuple[bool, str]: whether the device is reported as paired, and the
        decoded, stripped output of the process.
    """
    write(btctl, f'info {MAC}')
    write(btctl, 'exit')
    try:
        out, _ = btctl.communicate(timeout=3)
    except sp.TimeoutExpired:
        # bluetoothctl did not quit on its own: stop it and take what it wrote
        btctl.kill()
        out, _ = btctl.communicate()
    output = out.decode("utf-8", errors="replace").strip()

    # event form, e.g. "[CHG] Device <MAC> Paired: yes"
    if f'{MAC} Paired: yes' in output:
        return True, output

    # "info" reply form: the property is listed on a line of its own below
    # the "Device <MAC>" header.  Only one device was queried, so a
    # stand-alone property line is taken to belong to it.
    # NOTE(review): relies on bluetoothctl's "info" layout - confirm on target
    in_info_block = re.search(r'^\s*Paired:\s*yes\s*$', output, re.MULTILINE)
    return in_info_block is not None, output
def pair(brick, PIN, device, channel=1):
    """Pair with *brick* through bluetoothctl, then bind it to an rfcomm device.

    Pairing is retried until the bluetoothctl output contains
    ``Paired: yes``.  Each attempt first queries the pairing state and, if the
    device is not paired yet, sends ``pair <MAC>`` and the PIN while guiding
    the user through the confirmation on the device.  Finally
    ``sudo rfcomm bind <device> <MAC> <channel>`` is executed; a failure of
    that command is reported on stdout.

    Args:
        brick: device to pair (``MAC`` and ``name`` are used).
        PIN: PIN string sent to bluetoothctl and shown to the user.
        device: rfcomm device path to bind to.
        channel: Bluetooth channel passed to ``rfcomm bind``.
    """
    MAC = brick.MAC
    name = brick.name

    # open btctl agent to catch anything going on
    args = ['bluetoothctl']
    paired = False
    while not paired:
        # first session: only query the current pairing state
        with sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as btctl:
            paired, output = is_paired(btctl, MAC)

        # second session: pair if necessary and collect the transcript
        with sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as btctl:
            if paired:
                print('Device is paired.')
            else:
                # initiate pairing. it takes a sec to appear on the EV3.
                # the purpose of doing this here is to do it in a controlled manner so it can be semi-automatic,
                # since the daemon does not automatically pop up a PIN request form upon receiving from the device.
                write(btctl, f'pair {MAC}')
                print('Connecting...')
                sleep(3)
                write(btctl, f'{PIN}')  # "enter" pin
                print(f'Pairing {name}: accept at the device AND confirm PIN. (PIN {PIN})')
                sleep(1)
                print('THEN press Enter to continue...')
                input('If the prompt does not appear in a few seconds, press Enter to retry pairing.')
                write(btctl, f'{PIN}')  # "enter" pin
                for _ in range(5):
                    print('.', end='', flush=True)
                    sleep(1)
                print()
            write(btctl, 'exit')

            # verify success
            try:
                out, _ = btctl.communicate(timeout=3)
            except sp.TimeoutExpired:
                # bluetoothctl did not quit on its own: stop it and take what it wrote
                btctl.kill()
                out, _ = btctl.communicate()
            # show the transcript as text rather than as a bytes repr
            out = out.decode("utf-8", errors="replace").strip()
            print(out)
            output += out
            paired = paired or 'Paired: yes' in output

    ##### BIND BT DEVICE TO RFCOMM DEVICE #####
    # rfcomm will initiate pairing and then keep the connection open
    bind_args = ['sudo', 'rfcomm', 'bind', str(device), str(MAC), str(channel)]
    rfcomm = sp.run(bind_args, capture_output=True)
    if rfcomm.returncode != 0:
        # do not swallow the failure: tell the user what rfcomm complained about
        reason = rfcomm.stderr.decode("utf-8", errors="replace").strip()
        print(f'rfcomm bind failed (exit code {rfcomm.returncode}): {reason}')
def main():
    """Run the interactive workflow: list binds, scan, select, pair and bind.

    Existing rfcomm bindings are listed, the first free ``/dev/rfcommN`` is
    chosen, and Bluetooth devices are scanned for (with a 10 longer countdown
    on every retry while nothing is found).  Devices that are already bound
    are removed from the list, the user selects one, and it is paired and
    bound.  Success is verified by looking the device up in the bindings
    afterwards.
    """
    ##### VARIABLES SETUP #####
    device = '/dev/rfcomm'  # "radio frequency communication" protocol, upon which SPP used by EV3 is based
    ID = 0  # start at 0
    channel = 1  # SPP channel
    PIN = '1234'  # EV3 default PIN

    # a certain amount of time is required before we receive the device name/alias from the EV3, so wait 10 secs
    scan = 10

    ##### RUN #####
    binds = print_existing_binds()
    device = find_free_dev(device, ID)

    bricks = scan_devices(scan)
    while not bricks:
        print('No EV3 bricks found, retrying for a little longer...')
        # if scanning failed, scan longer
        scan += 10
        bricks = scan_devices(scan)

    bricks = remove_binds(bricks, binds)
    brick = menu(bricks)

    # use the configured channel instead of repeating the literal
    pair(brick=brick, PIN=PIN, device=device, channel=channel)
    print()

    if any(b.MAC == brick.MAC for b in find_existing_binds()):
        print_success(brick, device)
    else:
        print('Something went wrong, please try again.')
# run the interactive workflow only when executed as a script, not on import
if __name__ == "__main__":
    main()