Skip to main content
Sign in
Snippets Groups Projects
Select Git revision
  • f3865a3e4e7b9fe89b7d765dc64765c0689ed6be
  • master default protected
  • gitkeep
  • dev protected
  • Issue/2464-invalidateMeta
  • Issue/2309-docs
  • Issue/2462-removeTraces
  • Hotfix/2459-EncodingPath
  • Hotfix/2452-linkedDeletion
  • Issue/1792-newMetadataStructure
  • Hotfix/2371-fixGitLabinRCV
  • Fix/xxxx-activateGitlab
  • Issue/2349-gitlabHttps
  • Issue/2287-guestRole
  • Issue/2102-gitLabResTypeRCV
  • Hotfix/2254-fixContentLenghtCalculation
  • Fix/xxxx-resourceVisibility
  • Issue/1951-quotaImplementation
  • Issue/2162-fixFolderResponse
  • Issue/2158-emailServicedesk
  • Hotfix/2141-fileUploadErrors
  • v3.3.4
  • v3.3.3
  • v3.3.2
  • v3.3.1
  • v3.3.0
  • v3.2.3
  • v3.2.2
  • v3.2.1
  • v3.2.0
  • v3.1.2
  • v3.1.1
  • v3.1.0
  • v3.0.6
  • v3.0.5
  • v3.0.4
  • v3.0.3
  • v3.0.2
  • v3.0.1
  • v3.0.0
  • v2.8.2
41 results

BlobController.cs

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    connect.py 9.27 KiB
    #!/usr/bin/env python3
    
    # Written by Kim Brose <kim.brose@rwth-aachen.de> for IENT of RWTH Aachen
    # with inspiration by https://github.com/igomezal/bluetoothctl_helper
    # Modified by Dominik Mehlem (11/2021) <mehlem@ient.rwth-aachen.de>
    
    # note: EV3 uses Bluetooth SPP (Serial Port Profile) to connect to a virtual serial port.
    # Your system must have this profile enabled. See the lmcd doc for more info.
    
    import os.path
    import subprocess as sp
    from time import sleep
    import re
    from dataclasses import dataclass
    import io
    
    
    # structs
    # bluetooth device
    @dataclass
    class BTDevice:
        """A Bluetooth device taken from a bluetoothctl ``Device <MAC> <name>`` line."""
        name: str  # device name as listed by bluetoothctl, shown in the menu
        MAC: str  # Bluetooth address, 17 characters including the colons
    
    # rfcomm binding
    @dataclass
    class RFCBinding:
        """One existing rfcomm binding, parsed from the ``rfcomm`` status output."""
        device: str  # rfcomm device name without the /dev/ prefix, e.g. "rfcomm0"
        MAC: str  # Bluetooth address of the bound remote device
        channel: int  # Bluetooth channel used by the binding
    
    
    ##### SP HELPERS #####
    # https://stackoverflow.com/a/52544846
    
    def write(process, message):
        """Send a single command line to an interactive child process.

        Surrounding whitespace is stripped from ``message``, one newline is
        appended, and the UTF-8 encoded result is written to the stdin pipe of
        ``process``, which is flushed immediately.
        """
        line = message.strip() + "\n"
        pipe = process.stdin
        pipe.write(line.encode("utf-8"))
        pipe.flush()
    
    
    ##### CHECK FOR EXISTING DEVICES #####
    
    def find_existing_binds():
        """Return the rfcomm bindings currently reported by ``sudo rfcomm``.

        ``rfcomm`` without arguments prints one status line per binding. Every
        line containing ``rfcommN: <MAC> channel <C>`` becomes an
        ``RFCBinding``. Lines that do not have this shape (empty output, error
        text, other formats) are skipped instead of raising ``IndexError``.

        Returns:
            list[RFCBinding]: the parsed bindings in output order, with
            ``channel`` converted to ``int`` as the dataclass declares it.
        """
        # rfcomm without arguments displays status
        result = sp.run(['sudo', 'rfcomm'], capture_output=True)
        lines = result.stdout.decode("utf-8").strip().split(sep='\n')

        pattern = re.compile(r'(rfcomm[0-9]+):\s+([A-Z0-9:]{17})\s+channel ([0-9]+)')

        status = []
        for line in lines:
            found = pattern.search(line)
            if found is None:
                # nothing to parse on this line; ignore it rather than crash
                continue
            device, mac, channel = found.groups()
            status.append(RFCBinding(device=device, MAC=mac, channel=int(channel)))

        return status
    
    
    def print_existing_binds():
        """Print all existing rfcomm bindings and return them.

        One line is printed per binding found by ``find_existing_binds()``.
        When at least one binding exists, a hint follows telling the user to
        reuse a listed device instead of continuing with this script.

        Returns:
            The list of bindings exactly as returned by
            ``find_existing_binds()``.
        """
        existing = find_existing_binds()

        for b in existing:
            print(f'/dev/{b.device:8} bound to {b.MAC} (using Bluetooth channel {b.channel})')

        if existing:
            print('If the device you want to connect is already listed,')
            print('use the existing rfcomm device and abort this script.\n')

        return existing
    
    
    def find_free_dev(device, ID):
        """Return the first ``<device><ID>`` path that does not exist yet.

        Starting at ``ID``, the number is increased by one until the path
        built from the ``device`` prefix and the number is not present on the
        file system. That path is returned.
        """
        candidate = f"{device}{ID}"
        while os.path.exists(candidate):
            ID += 1
            candidate = f"{device}{ID}"
        return candidate
    
    
    ##### SCAN FOR DEVICES #####
    
    def _parse_device_lines(lines):
        """Turn ``Device <MAC> <name>`` lines into BTDevice objects, EV3 bricks first."""
        # looking for lines like "Device MAC-ADDR EV3-DD-L"
        # all our EV3 bricks are named EV3-DD-L where D are digits and L letters
        ev3 = re.compile(r'^Device\s+([A-Z0-9:]{17})\s+(EV3-[0-9]{2}-[A-Z])')
        unspecific = re.compile(r'^Device\s+([A-Z0-9:]{17})\s+(.*)$')

        l_ev3 = []
        l_uns = []
        for line in lines:
            # read the capture groups directly; splitting the line and
            # filtering empty strings breaks when a device has an empty name
            found = ev3.search(line)
            if found:
                l_ev3.append(BTDevice(MAC=found.group(1), name=found.group(2)))
                continue
            found = unspecific.search(line)
            if found:
                l_uns.append(BTDevice(MAC=found.group(1), name=found.group(2)))

        return l_ev3 + l_uns


    def scan_devices(countdown=10):
        """Interactively scan for Bluetooth devices using bluetoothctl.

        The user is asked to make the EV3 visible and to press Enter. Then
        bluetoothctl is told to scan while a countdown is displayed, the scan
        is stopped, and the ``devices`` listing is collected and parsed.

        Args:
            countdown: start value of the displayed countdown in seconds. The
                loop counts down to 0 inclusive, sleeping one second per step.

        Returns:
            list[BTDevice]: devices named like ``EV3-DD-L`` first, followed by
            all other listed devices.
        """
        # we will use the interactive bluetoothctl program
        args = ['bluetoothctl']
        with sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as btctl:

            print('Make sure your EV3 is on and Bluetooth is set to visible.')
            sleep(1)
            input('Then press Enter to start scanning.\n')

            # tell bluetoothctl to start scanning
            write(btctl, 'scan on')

            # display countdown during scanning; flush because the line has no
            # newline and would otherwise stay in the output buffer
            for c in range(countdown, -1, -1):
                print(f'\rScanning {c:02} secs...', end='', flush=True)
                sleep(1)

            print(' scan done')
            write(btctl, 'scan off')
            sleep(1)

            # save the list of devices from the scan
            write(btctl, 'devices')
            sleep(1)
            write(btctl, 'exit')
            try:
                output, _ = btctl.communicate(timeout=3)
            except sp.TimeoutExpired:
                # bluetoothctl did not exit in time: kill it and take what it wrote
                btctl.kill()
                output, _ = btctl.communicate()
            output = output.decode("utf-8").strip()

        ##### PREPARE DEVICE LIST FOR MENU #####

        # split into list at linebreaks and pick out the device lines
        devices = _parse_device_lines(output.split(sep='\n'))

        print()
        return devices
    
    
    ##### PRESENT MENU #####
    
    def remove_binds(allbricks, binds):
        """Return the bricks whose MAC is not already bound to an rfcomm device.

        Args:
            allbricks: iterable of objects with a ``MAC`` attribute.
            binds: iterable of objects with a ``MAC`` attribute; every brick
                whose MAC occurs here is left out of the result.

        Returns:
            A new list with the remaining bricks in their original order.
            ``allbricks`` itself is not modified. Removing from the list while
            looping over it skipped the element after each removal and failed
            when the same MAC appeared in ``binds`` twice.
        """
        bound_macs = {bi.MAC for bi in binds}
        return [br for br in allbricks if br.MAC not in bound_macs]
    
    
    def print_menu(bricks):
        """Print the numbered selection menu.

        Entry 0 is always "scan again". The devices in ``bricks`` follow,
        numbered from 1 in list order, each shown with its name and MAC.
        """
        print('Found devices:')
        print(f'{0}) scan again')
        for i, b in enumerate(bricks, start=1):
            print(f'{i}) {b.name} ({b.MAC})')
    
    
    def menu(bricks):
        """Let the user pick one device from ``bricks`` and return it.

        The menu is printed and the user is prompted until a number between 1
        and ``len(bricks)`` is entered. Entering 0 starts a new 15 second scan
        whose result replaces ``bricks`` and is printed as a fresh menu. Any
        input that is not an integer is ignored and the prompt is repeated.

        Args:
            bricks: list of devices with ``name`` and ``MAC`` attributes.

        Returns:
            The selected element of the current device list.
        """
        print_menu(bricks)

        selection = None
        while selection not in range(1, len(bricks) + 1):
            answer = input('Enter number to select: ')
            try:
                selection = int(answer)
            except ValueError:
                # not a number: treat as "no selection" and ask again
                selection = None
            if selection == 0:
                bricks = scan_devices(15)
                print_menu(bricks)

        print(selection)
        brick = bricks[selection - 1]  # menu numbering starts at 1

        print()

        return brick
    
    
    ##### PAIR/AUTHENTICATE BT DEVICE #####
    
    def print_success(brick, device):
        """Report the finished connection and show how to use it from MATLAB.

        Prints one line naming the brick (name and MAC) and the device it is
        connected to, followed by a short MATLAB snippet that uses ``device``
        as serial port.
        """
        ##### PRINT RESULTS #####

        print(f'{brick.name} ({brick.MAC}) is now connected to {device}.')

        # we can now use our bluetooth-EV3 with matlab
        matlab_hint = f'''
    ################ USE WITH MATLAB ############
    1. b=EV3();
    2. b.connect('bt','serPort','{device}');
    3. b.beep();
    #############################################
    '''
        print(matlab_hint)
    
    
    def is_paired(btctl, MAC):
        """Ask a running bluetoothctl process whether ``MAC`` is paired.

        Sends ``info <MAC>`` followed by ``exit`` to ``btctl`` and collects
        everything the process wrote to stdout. If it does not finish within
        3 seconds it is killed and the remaining output is read. The process
        is finished afterwards and cannot be reused.

        Returns:
            tuple: ``(paired, output)`` where ``output`` is the decoded,
            stripped stdout text and ``paired`` is True when that text
            contains ``"<MAC> Paired: yes"``.
        """
        # NOTE(review): the check needs the MAC and "Paired: yes" on one line;
        # presumably `info` prints them on separate lines - confirm.
        write(btctl, f'info {MAC}')
        write(btctl, 'exit')

        try:
            raw, _ = btctl.communicate(timeout=3)
        except sp.TimeoutExpired:
            btctl.kill()
            raw, _ = btctl.communicate()

        text = raw.decode("utf-8").strip()
        found = f'{MAC} Paired: yes' in text

        return found, text
    
    
    def pair(brick, PIN, device, channel=1):
        """Pair with ``brick`` through bluetoothctl, then bind it with rfcomm.

        Repeats the following until the device counts as paired: query the
        pairing state in one bluetoothctl session, then open a second session
        and, if not paired yet, send ``pair <MAC>``, send the PIN, wait for the
        user to confirm on the device and press Enter, and send the PIN again.
        Afterwards ``sudo rfcomm bind <device> <MAC> <channel>`` is run.

        Args:
            brick: device to pair; its ``MAC`` and ``name`` attributes are used.
            PIN: PIN written to bluetoothctl during pairing.
            device: rfcomm device path passed to ``rfcomm bind``.
            channel: Bluetooth channel passed to ``rfcomm bind``.

        Returns:
            None. The result of the ``rfcomm bind`` call is captured but not
            inspected here.
        """
        MAC = brick.MAC
        name = brick.name
    
        # open btctl agent to catch anything going on
        args = ['bluetoothctl']
        paired = False
    
        # one pass = one state query + (if needed) one pairing attempt
        while not paired:
            # first session: is_paired() sends 'exit', so this process is used up afterwards
            with sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as btctl:
                output = ''
                #print(output)
                paired, o = is_paired(btctl, MAC)
                #print(paired)
                output += o
    
            # second session: performs the actual pairing dialogue when required
            with sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE) as btctl:
                if paired:
                    print('Device is paired.')
                else:
                    # initiate pairing. it takes a sec to appear on the EV3.
                    # the purpose of doing this here is to do it in a controlled manner so it can be semi-automatic,
                    # since the daemon does not automatically pop up a PIN request form upon receiving from the device.
                    write(btctl, f'pair {MAC}')
                    print('Connecting...')
                    sleep(3)
                    write(btctl, f'{PIN}') # "enter" pin
    
                    print(f'Pairing {name}: accept at the device AND confirm PIN. (PIN {PIN})')
                    sleep(1)
                    print('THEN press Enter to continue...')
                    input('If the prompt does not appear in a few seconds, press Enter to retry pairing.')
                    
                    # the PIN is sent a second time after the user has pressed Enter
                    write(btctl, f'{PIN}') # "enter" pin
    
                    # wait 5 seconds, printing one dot per second as progress
                    for c in range(5):
                        print('.', end='', flush=True)
                        sleep(1)
                    print()
    
                write(btctl, 'exit')
    
                # verify success?
                try:
                    out, _ = btctl.communicate(timeout=3)
                except sp.TimeoutExpired:
                    # bluetoothctl did not exit in time: kill it and read what is left
                    btctl.kill()
                    out, _ = btctl.communicate()
                    print(out)
            out = out.decode("utf-8").strip()
            output += out
            #output += read(btctl.stdout)
            # paired if the query said so, or if either session's output contains 'Paired: yes'
            paired = paired or 'Paired: yes' in output
    
            # debug
            # print(output)
            # print(read(btctl.stderr))
    
        ##### BIND BT DEVICE TO RFCOMM DEVICE #####
    
        # rfcomm will initiate pairing and then keep the connection open
        args = f'sudo rfcomm bind {device} {MAC} {channel}'.split()
        rfcomm = sp.run(args, capture_output=True)
    
        # debug
        # print(rfcomm)
    
    
    def main():
        """Run the interactive workflow that connects an EV3 via Bluetooth.

        Steps: print the existing rfcomm bindings, pick the first unused
        ``/dev/rfcommN`` path, scan for devices (scanning 10 seconds longer on
        each retry until something is found), drop devices that are already
        bound, let the user choose one, pair and bind it, and finally report
        whether the chosen MAC now appears among the rfcomm bindings.
        """

        ##### VARIABLES SETUP #####

        device = '/dev/rfcomm'  # "radio frequency communication" protocol, upon which SPP used by EV3 is based
        ID = 0  # start at 0
        channel = 1  # SPP channel
        PIN = '1234'  # EV3 default PIN

        # MAC = '00:16:53:52:E3:5A' # debug: EV3-21-G
        # ctrl = '00:04:0E:8D:36:70' # debug: 20-A blue#2

        # a certain amount of time is required before we receive the device name/alias from the EV3, so wait 10 secs
        scan = 10

        ##### RUN #####

        binds = print_existing_binds()

        device = find_free_dev(device, ID)

        bricks = scan_devices(scan)

        while not bricks:
            print('No EV3 bricks found, retrying for a little longer...')
            # if scanning failed, scan longer
            scan += 10
            bricks = scan_devices(scan)

        bricks = remove_binds(bricks, binds)

        brick = menu(bricks)

        # pass the configured channel instead of repeating the literal
        pair(brick=brick, PIN=PIN, device=device, channel=channel)

        print()
        # success means the chosen MAC now shows up in the rfcomm status
        if any(b.MAC == brick.MAC for b in find_existing_binds()):
            print_success(brick, device)
        else:
            print('Something went wrong, please try again.')
    
    
    # start the interactive workflow only when this file is executed as a script
    if __name__ == "__main__":
        main()