Vinz1911

Spike Prime + PowerUP Remote in Python - Noob needs Help


Hi Everyone :)

My name is Vinz and I'm new to this forum. I found it while digging deeper into coding with my Spike Prime.

With the latest firmware for Spike Prime we got full access to the MicroPython ubluetooth library, and I started trying out some things to make my PowerUP Remote work with my Spike Prime.

The current state is that I can successfully connect the remote to the Spike Prime and read some data. But this is where I started struggling: I'm not an expert in BLE, and even with the help of the LWP 3.0 documentation I don't understand what I need to do now to read out which button I pressed on the remote. I also keep reading the same bytes from my remote. Is it necessary to send a "start" command to the PowerUP Remote to get the data about which button is pressed?

Maybe someone can enlighten me :)

Here is my current WIP code that I run on my Spike Prime.

 

Thank you and best regards

Vinz

 

import utime
import ubluetooth
import ubinascii
import struct
from micropython import const

_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
_IRQ_GATTS_WRITE = const(1 << 2)
_IRQ_GATTS_READ_REQUEST = const(1 << 3)
_IRQ_SCAN_RESULT = const(1 << 4)
_IRQ_SCAN_COMPLETE = const(1 << 5)
_IRQ_PERIPHERAL_CONNECT = const(1 << 6)
_IRQ_PERIPHERAL_DISCONNECT = const(1 << 7)
_IRQ_GATTC_SERVICE_RESULT = const(1 << 8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(1 << 10)
_IRQ_GATTC_READ_RESULT = const(1 << 11)
_IRQ_GATTC_WRITE_STATUS = const(1 << 12)
_IRQ_GATTC_NOTIFY = const(1 << 13)
_IRQ_GATTC_INDICATE = const(1 << 14)

_IRQ_GATTC_SERVICE_DONE = const(10)


_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)

_COMPANY_IDENTIFIER_CODES = {
    "0397": "LEGO System A/S"
}

_LEGO_SERVICE_UUID = ubluetooth.UUID("00001623-1212-EFDE-1623-785FEABCD123")
_LEGO_SERVICE_CHAR = ubluetooth.UUID("00001624-1212-EFDE-1623-785FEABCD123")

class PowerUPRemote:
    """Class to deal with LEGO(R) PowerUp(TM) Remote Control for Spike Prime"""

    def __init__(self):
        self._ble = ubluetooth.BLE()
        self._ble.active(True)
        self._ble.irq(handler=self._irq)
        self._decoder = Decoder()
        self._reset()

    def _reset(self):
        self._addr = None
        self._addr_type = None
        self._adv_type = None
        self._services = None
        self._man_data = None
        self._name = None
        self._conn = None
        self._value = None

    # start scan for ble devices
    def scan_start(self, timeout):
        self._ble.gap_scan(timeout, 30000, 30000)

    # stop current scan
    def scan_stop(self):
        self._ble.gap_scan(None)

    # connect to ble device
    def connect(self, addr_type, addr):
        self._ble.gap_connect(addr_type, addr)

    # disconnect from ble device
    def disconnect(self):
        if self._conn is None:
            return
        # use the stored connection handle instead of a hard-coded value
        self._ble.gap_disconnect(self._conn)
        self._reset()

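    def read(self):
        # nothing to read until the connection and characteristic are known
        if self._conn is None or self._value is None:
            return
        self._ble.gattc_read(self._conn, self._value)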

    # ble event handler
    def _irq(self, event, data):
        # called for every result of a ble scan
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            self._addr_type = addr_type
            self._addr = bytes(addr)
            self._adv_type = adv_type
            self._name = self._decoder.decode_name(adv_data)
            self._services = self._decoder.decode_services(adv_data)
            self._man_data = self._decoder.decode_manufacturer(adv_data)

        # called after a ble scan is finished
        elif event == _IRQ_SCAN_COMPLETE:
            print("scan finished!")

        # called if a peripheral device is connected
        elif event == _IRQ_PERIPHERAL_CONNECT:
            print("connected peripheral device")
            conn, addr_type, addr = data
            self._conn = conn
            self._ble.gattc_discover_services(self._conn)

        # called if a peripheral device is disconnected
        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            conn, _, _ = data
            print("disconnected peripheral device")

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn, start_handle, end_handle, uuid = data
            print("service", data, "uuid", uuid, "Conn", conn)
            if conn == self._conn and uuid == _LEGO_SERVICE_UUID:
                self._ble.gattc_discover_characteristics(self._conn, start_handle, end_handle)

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn, def_handle, value_handle, properties, uuid = data
            print("Got Characteristic", uuid, value_handle)
            if conn == self._conn and uuid == _LEGO_SERVICE_CHAR:
                self._value = value_handle

        elif event == _IRQ_GATTC_READ_RESULT:
            # A read completed successfully.
            conn_handle, value_handle, char_data = data
            print("Got Data Read", data, ubinascii.hexlify(char_data))

        # called on notification
        elif event == _IRQ_GATTC_NOTIFY:
            conn, value_handle, notify_data = data
            print("Notification")


class Decoder:
    """Class to decode BLE adv_data"""

    def decode_manufacturer(self, payload):
        man_data = []
        n = self._decode_field(payload, const(0xff))
        if not n:
            return []
        company_identifier = ubinascii.hexlify(struct.pack('<h', *struct.unpack('>h', n[0])))
        company_name = _COMPANY_IDENTIFIER_CODES.get(company_identifier.decode(), "?")
        company_data = ubinascii.hexlify(n[0][2:])
        man_data.append(company_identifier.decode())
        man_data.append(company_name)
        man_data.append(company_data)
        return man_data

    def decode_name(self, payload):
        n = self._decode_field(payload, const(0x09))
        return str(n[0], "utf-8") if n else "parsing failed!"

    def decode_services(self, payload):
        services = []
        for u in self._decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
            services.append(ubluetooth.UUID(struct.unpack("<H", u)[0]))
        for u in self._decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
            # 32-bit UUIDs are 4 bytes: unpack as unsigned int, not as a double
            services.append(ubluetooth.UUID(struct.unpack("<I", u)[0]))
        for u in self._decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
            services.append(ubluetooth.UUID(u))
        return services

    def _decode_field(self, payload, adv_type):
        i = 0
        result = []
        while i + 1 < len(payload):
            if payload[i + 1] == adv_type:
                result.append(payload[i + 2: i + payload[i] + 1])
            i += 1 + payload[i]
        return result


remote = PowerUPRemote()
remote.connect(0, b'\x00\x81\xf9\xea\xc1\x9f')
utime.sleep(10)
print("Start Read")
while True:
    remote.read()
    utime.sleep(0.100)
#remote.disconnect()
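
For readers following the Decoder class above: the advertising payload is a sequence of records, each made of a length byte, a type byte and the value bytes. Here is a minimal standalone sketch of that walk in plain Python. The function name iter_ad_structures and the sample payload are made up for illustration and are not part of the code above.

```python
def iter_ad_structures(payload):
    """Yield (ad_type, value) pairs from a raw BLE advertising payload."""
    i = 0
    while i + 1 < len(payload):
        length = payload[i]      # counts the type byte plus the value bytes
        if length == 0:          # a zero length means padding, stop here
            break
        ad_type = payload[i + 1]
        value = bytes(payload[i + 2 : i + 1 + length])
        yield ad_type, value
        i += 1 + length          # jump to the next record


# Hypothetical payload: flags (type 0x01) followed by a complete local name (type 0x09)
sample = bytes([0x02, 0x01, 0x06, 0x05, 0x09]) + b"Vinz"
fields = list(iter_ad_structures(sample))
print(fields)
```

Type 0x09 is the complete local name and type 0xFF is the manufacturer data, which are the two types the Decoder looks up.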

 


Good morning @Vinz1911

Here you can find an example of using a Spike Prime Hub in the BLE central role, with a notify subscription to the BLE peripheral (in this case, the PU Remote Control):
https://github.com/GianCann/SpikePrimeHub/tree/master/ble

After downloading the code to the Spike, you can connect to a specific remote control with this command:

bt.gap_connect(0,b'\xa4\x34\xf1\x9b\x07\x9e',2000)

Here \xa4\x34\xf1\x9b\x07\x9e is the BLE address of the device you want to connect to (you can find it with the command bt.gap_scan(10000, 30000, 30000) or with the nRF Connect app).

Once connected to the remote, if you push any button, you get a value in this format:

\x05\x00\x0E\x00\x01

- The first byte (x05) is the length of the entire message (5 bytes)
- The second byte (x00) is not used at the moment
- The third byte (x0E) is related to the "Button device" (you can ignore this)
- The fourth byte can be 0x00 (Buttons A) or 0x01 (Buttons B)
- The last byte can be: x01 = Button '+' pressed, xFF = Button '-' pressed, x7F = Button 'red' pressed, x00 = (any) Button released

So, if you press and hold the B+ button, you receive \x05\x00\x0E\x01\x01, and when you release it you receive \x05\x00\x0E\x01\x00.
Or, if you press and hold the A- button, you receive \x05\x00\x0E\x00\xFF, and when you release it you receive \x05\x00\x0E\x00\x00.

For the central green button you receive:
\x05\x00\x08\x02\x01 when pressed
\x05\x00\x08\x02\x00 when released

Note: with my example you don't need to read the data in a loop, because the code subscribes to notifications from the remote, so you receive the data with the _IRQ_GATTC_NOTIFY event.

Tell me if you need more info ;)
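
The byte layout described above can be turned into a small decoder. This is only a sketch in plain Python: the names PORT_NAMES, BUTTON_STATES and decode_button are invented for illustration, and the third byte is deliberately not checked because the post says it can be ignored.

```python
PORT_NAMES = {0x00: "A", 0x01: "B"}
BUTTON_STATES = {0x01: "+", 0xFF: "-", 0x7F: "red", 0x00: "released"}


def decode_button(message):
    """Return (side, state) for a 5-byte button message, or None otherwise."""
    if len(message) != 5 or message[0] != 5:
        return None                         # first byte must match the length
    side = PORT_NAMES.get(message[3])       # fourth byte: Buttons A or B
    state = BUTTON_STATES.get(message[4])   # last byte: which button, or released
    if side is None or state is None:
        return None                         # e.g. the central green button
    return side, state


print(decode_button(b"\x05\x00\x0e\x01\x01"))
print(decode_button(b"\x05\x00\x0e\x00\xff"))
```

The green button message uses 0x02 in the fourth byte, so this sketch returns None for it and it would need its own branch.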

3 hours ago, GianCann said:

...

Once connected to the remote, if you push any button, you get a value in this format:

\x05\x00\x0E\x00\x01

- The first byte (x05) is the length of the entire message (5 bytes)
- The second byte (x00) is not used at this moment
- The third byte (x0E) is related to "Button device" (you can ignore this)
- The fourth byte can be 0x00 (Buttons A) or 0x01 (Buttons B)
- The last byte can be: x01 = Button '+' pressed, xFF = Button '-' pressed, x7F = Button 'red' pressed, x00 = (any) Button released

Hello,

I could not find command 0x0E in the LWP 3.0.0 documentation.

Where is this command documented?

I would expect that with command 0x41 the Spike requests notifications for (button) port 0x00 (and/or 0x01) of the remote,

and that you then get a 0x45 notification when a button changes.

So in my understanding, command 0x0E is (still) undocumented.

Can you shed a little light on this documentation darkness?


Hi @GianCann

I tried out your example and ran into some problems.

I uploaded it with the Spike Prime app and connected successfully to my remote. The first problem is that I'm receiving notifications very slowly. The second problem is that after I press a button I get the bytes in the format you described, but then it stops working :( I don't receive anything after that.

Here are my logs, maybe you can help me again?

Thanks

[13:57:43.801] > event == _IRQ_PERIPHERAL_CONNECT
[13:57:44.804] > 1025 0 b'\x00\x81\xf9\xea\xc1\x9f'
[13:57:47.805] > aaaaaa complete..0x0B write 0x0A,0x0,0x41,0x00,0x00,0x01,0x00,0x00,0x00,0x01
[13:57:50.809] > aaaaaa complete..0x0Bwrite 0x0A,0x0,0x41,0x01,0x00,0x01,0x00,0x00,0x00,0x01
[13:57:51.812] > connect complete.activate notifications.write Handel 0x0c data0100... starting write
[13:57:54.815] > event == _IRQ_GATTC_WRITE_STATUS
[13:57:55.818] > 0
[13:57:56.819] > event == _IRQ_GATTC_WRITE_STATUS
[13:57:57.820] > 0
[13:57:58.821] > event == _IRQ_GATTC_WRITE_STATUS
[13:57:59.824] > 0
[13:58:00.825] > event == _IRQ_GATTC_NOTIFY
[13:58:01.827] > b'\n\x00G\x00\x00\x01\x00\x00\x00\x01'
[13:58:02.828] > event == _IRQ_GATTC_NOTIFY
[13:58:03.831] > b'\x0f\x00\x04\x00\x017\x00\x00\x00\x00\x10\x00\x00\x00\x10'
[13:58:04.833] > event == _IRQ_GATTC_NOTIFY
[13:58:05.837] > b'\x0f\x00\x04\x01\x017\x00\x00\x00\x00\x10\x00\x00\x00\x10'
[13:58:06.839] > event == _IRQ_GATTC_NOTIFY
[13:58:07.842] > b'\x0f\x00\x044\x01\x17\x00\x00\x00\x00\x10\x00\x00\x00\x10'
[13:58:08.843] > event == _IRQ_GATTC_NOTIFY
[13:58:09.846] > b'\n\x00G\x01\x00\x01\x00\x00\x00\x01'
[13:58:10.847] > event == _IRQ_GATTC_NOTIFY
[13:58:11.849] > b'\x05\x00E\x00\xff'

 


@GianCann 

Yes, I connected it directly to my PC with the USB cable. I tried it on my Mac and on my Windows 10 PC, both with the latest Spike Prime app.


I figured out the problem. It's related to the bus over which the LEGO app reads the console output.

I wrote a little code that should light up a light matrix dot when I press a button on the remote.

After uploading it to the Spike, disconnecting it from my PC and executing it directly on the Prime, it works as intended.

This makes debugging a bit more complicated, but now it works *_*

Just one question: can you explain this block to me, the packets you send after the connection? Or do you have a reference to the part of the LWP 3.0 documentation that describes what is done here?

 

ar=struct.pack('<BBBBBBBBBB', 0x0A,0x0,0x41,0x00,0x00,0x01,0x00,0x00,0x00,0x01)
bt.gattc_write(conn_handle,0x0B,ar,1)
utime.sleep(2)
print('aaaaaa complete..0x0B write 0x0A,0x0,0x41,0x00,0x00,0x01,0x00,0x00,0x00,0x01')

al=struct.pack('<BBBBBBBBBB', 0x0A,0x0,0x41,0x01,0x00,0x01,0x00,0x00,0x00,0x01)
bt.gattc_write(conn_handle,0x0B,al,1)
utime.sleep(2)
print('aaaaaa complete..0x0Bwrite 0x0A,0x0,0x41,0x01,0x00,0x01,0x00,0x00,0x00,0x01')

bt.gattc_write(conn_handle,0x0C,struct.pack('<BB',0x01,0x00),1)
print('connect complete.activate notifications.write Handel 0x0c data0100... starting write')

 

11 minutes ago, Vinz1911 said:

I figured out the problem. It's related to the bus where the lego app reads out the console output.

OK, I don't use the official app, but uPyCraft ;)

12 minutes ago, Vinz1911 said:

Or have you any reference to the LWP 3.0 Documentation what is done here ?

These are the LWP commands to enable the "Port value changes notification", and the Hub properties from the Hub:

https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#format-of-port-input-format-setup-single

With this notification enabled, the Hub sends a message every time a port changes its value (button pressed, button released), and you receive this message in the BLE callback function, without reading the port value in a loop.
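
To make the ten bytes of that command easier to read, here is a sketch that builds the same message from named fields. The field names (port, mode, delta interval, notification flag) follow my reading of the linked Port Input Format Setup (Single) page, and the function name port_input_format_setup is hypothetical.

```python
import struct


def port_input_format_setup(port, mode=0, delta=1, notify=True):
    """Build a 0x41 message that asks a port to report value changes."""
    # message type, port ID, mode, delta interval (uint32, little-endian), notify flag
    body = struct.pack("<BBBIB", 0x41, port, mode, delta, 1 if notify else 0)
    # common header: total length (including itself and the hub ID), hub ID 0x00
    return bytes([len(body) + 2, 0x00]) + body


left = port_input_format_setup(0x00)
right = port_input_format_setup(0x01)
print(left.hex(), right.hex())
```

Both results match the packets written to handle 0x0B in the code quoted earlier in the thread.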

13 minutes ago, GianCann said:

Ok, I don't use official App, but uPyCraft ;)

They are the LWP command to enable the "Port value changes notification" , and the Hub properties from the Hub:

https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#format-of-port-input-format-setup-single

Enable this notification, the Hub, send a message every time a port change his value (button pressed, button released) and you obtain this message with the BLE callback function, without reading in a loop the port value.

Ah, all right, thank you :)

Is there a tutorial on how to read the console output with uPyCraft, or is this possible if I put the Spike in REPL mode via the console?

1 hour ago, BrickTronic said:

So in my understanding, Command 0x0E is (still) undocumented

Can you light here a little bit the documentation darkness ?

I believe this is a message type rather than a command, isn't it? 05/00/MessageType/ and then it becomes specific to the type. 0E is not documented, like a good number of other things. At least not in chapter 3.3 "Message Types" (Hub and port related).

The LWP 3.0 document was outdated the day it was introduced, as is always the case with evolving software/firmware when the document is not updated. Which hasn't been done.

As far as I can tell, the "new" motors (e.g., PUp L) are also not in the device list, etc.

Best
Thorsten 


@GianCann

With your help I finished my little class today. It automatically detects the PowerUP Remote and connects to it, and the Spike gets notified about the pressed buttons.
With the help of the LWP 3.0 documentation I could also understand what the bytes enable in the remote. But I have one last question, about something I could not find in the docs.

It's about the last packet you send:

bt.gattc_write(conn_handle,0x0C,struct.pack('<BB',0x01,0x00),1)

What is enabled here? The 0x0C is the "value handle" byte, but I can't find any reference to it. Can you explain what this triggers in the remote?
This seems to be a necessary part, because if I don't send this packet, my program does not start receiving updates from the remote.

Greetings
Vinz 


For everyone who is interested in LEGO Spike Prime + PowerUP Remote, I created an example here:

Spike Prime + Power UP Remote

 

- The remote is automatically detected and connected (just copy this to your Spike Prime, run it, and start advertising on the remote at the same moment)
- After a successful connection, the remote's LED changes to GREEN
- Every button lights up a different dot on the Spike Prime; releasing it turns the dot off
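
As an illustration of the LED change mentioned in the list above, this sketch builds the kind of message that the code later in the thread writes to the remote to set its LED colour. The byte values (0x81, port 0x34, 0x11, 0x51, mode 0x00, green = 0x06) are taken from that code. The field names in the comments are my reading of the LWP 3.0 documentation, and led_color_message is a made-up name.

```python
COLOR_GREEN = 0x06


def led_color_message(color, port=0x34):
    """Build a Port Output Command that writes a colour to the LED port."""
    payload = bytes([
        0x81,    # message type: Port Output Command
        port,    # port ID of the remote's LED, as used in the thread's code
        0x11,    # startup and completion information
        0x51,    # sub-command: WriteDirectModeData
        0x00,    # mode
        color,   # colour index
    ])
    # common header: total length and hub ID (0x00)
    return bytes([len(payload) + 2, 0x00]) + payload


green = led_color_message(COLOR_GREEN)
print(green.hex())
```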

On 9/5/2020 at 3:20 AM, Vinz1911 said:

bt.gattc_write(conn_handle,0x0C,struct.pack('<BB',0x01,0x00),1)

What is enabled here ? The 0x0C is the "Value Handle" byte but I can't find any reference about it. Can you explain me what is triggered here in the remote ?

This command enables notifications for the specific characteristic.
It is part of the BLE stack:

https://devzone.nordicsemi.com/nordic/short-range-guides/b/bluetooth-low-energy/posts/ble-characteristics-a-beginners-tutorial
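
In BLE terms, the two bytes 0x01, 0x00 are a little-endian 16-bit flag field written to the characteristic's Client Characteristic Configuration Descriptor (CCCD). A minimal sketch of how that value is formed, in plain Python. It assumes that handle 0x0C on this remote is that descriptor, which matches the behaviour described in the thread but is not confirmed there. The names below are illustrative only.

```python
import struct

CCCD_NOTIFY = 0x0001     # bit 0: enable notifications
CCCD_INDICATE = 0x0002   # bit 1: enable indications


def cccd_value(notify=True, indicate=False):
    """Return the 2-byte little-endian value to write to a CCCD."""
    flags = (CCCD_NOTIFY if notify else 0) | (CCCD_INDICATE if indicate else 0)
    return struct.pack("<H", flags)


print(cccd_value())
```

Writing a value of zero to the same descriptor switches the notifications off again.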

On 9/4/2020 at 2:46 PM, BrickTronic said:

- The third byte (x0E) is related to "Button device" (you can ignore this)

Sorry for the mistake: the correct value is x45 (Port Value)


Hello,

I made some not very nice modifications :)

I observe different behavior depending on the print function, or maybe on the connection with the Spike app.

When I try to use copy-paste terminal mode, the following error is shown:

 Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "spike/__init__.py", line 1, in <module>
  File "_api/__init__.py", line 1, in <module>
MemoryError: memory allocation failed, allocating 1280 bytes

BR,

Teodor

from spike import PrimeHub, LightMatrix, Button, StatusLight, App
from spike.control import wait_for_seconds, wait_until, Timer
import utime
import ubluetooth
import ubinascii
import struct
from micropython import const
 
# LEGO(R) Wireless Protocol 3.0
# [0x00, 0x00, 0x00][0x00, 0x00 ... n]
# 1. Message Length
# 2. HUB ID (Unused, use 0x00)
# 3. Message Type
# 4. Port ID
# 5. - n. Specific Type Codes
e_addr = None
e_addr_type = None
e_state = 0
class PowerUPHandler:
    """Class to deal with LEGO(R) PowerUp(TM) over BLE"""
 
    def __init__(self):
        # constants
        self.__IRQ_SCAN_RESULT = const(1 << 4)
        self.__IRQ_SCAN_COMPLETE = const(1 << 5)
        self.__IRQ_PERIPHERAL_CONNECT = const(1 << 6)
        self.__IRQ_PERIPHERAL_DISCONNECT = const(1 << 7)
        self.__IRQ_GATTC_SERVICE_RESULT = const(1 << 8)
        self.__IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9)
        self.__IRQ_GATTC_READ_RESULT = const(1 << 11)
        self.__IRQ_GATTC_WRITE_STATUS = const(1 << 12)
 
        self.__IRQ_GATTC_NOTIFY = const(1 << 13)
 
        self.__LEGO_SERVICE_UUID = ubluetooth.UUID("00001623-1212-EFDE-1623-785FEABCD123")
        self.__LEGO_SERVICE_CHAR = ubluetooth.UUID("00001624-1212-EFDE-1623-785FEABCD123")
 
        # class specific
        self.__ble = ubluetooth.BLE()
        self.__ble.active(True)
        self.__ble.irq(handler=self.__irq)
        self.__decoder = Decoder()
        self.__reset()
 
    def __reset(self):
        # cached data
        self.__addr = None
        self.__addr_type = None
        self.__adv_type = None
        self.__services = None
        self.__man_data = None
        self.__name = None
        self.__conn_handle = None
        self.__value_handle = None
 
        # reserved callbacks
        self.__scan_callback = None
        self.__read_callback = None
        self.__notify_callback = None
        self.__connected_callback = None
        self.__disconnected_callback = None
 
    # start scan for ble devices
    def scan_start(self, timeout, callback):
        self.__scan_callback = callback
        hub.light_matrix.show_image('HAPPY')
        self.__ble.gap_scan(timeout, 30000, 30000)
        #self.__ble.gap_scan(6000)
 
    # stop current scan
    def scan_stop(self):
        self.__ble.gap_scan(0)
 
    # write gatt client data
    def write(self, data, adv_value=None):
        if not self.__is_connected():
            return
        if adv_value:
            self.__ble.gattc_write(self.__conn_handle, adv_value, data)
        else:
            self.__ble.gattc_write(self.__conn_handle, self.__value_handle, data)
 
    # read gatt client
    def read(self, callback):
        if not self.__is_connected():
            return
        self.__read_callback = callback
        self.__ble.gattc_read(self.__conn_handle, self.__value_handle)
 
    # connect to ble device
    def connect(self, addr_type, addr):
        self.__ble.gap_connect(addr_type, addr)
 
    # disconnect from ble device
    def disconnect(self):
        if not self.__is_connected():
            return
        self.__ble.gap_disconnect(self.__conn_handle)
        self.__reset()
 
    # get notification
    def on_notify(self, callback):
        self.__notify_callback = callback
 
    # get callback on connect
    def on_connect(self, callback):
        self.__connected_callback = callback
 
    # get callback on disconnect
    def on_disconnect(self, callback):
        self.__disconnected_callback = callback
 
    # +-------------------+
    # | Private Functions |
    # +-------------------+
 
    # connection status
    def __is_connected(self):
        return self.__conn_handle is not None
 
    # ble event handler
    def __irq(self, event, data):
        global e_addr
        global e_addr_type
        global e_state
 
        # called for every result of a ble scan
        if event == self.__IRQ_SCAN_RESULT:
            #print('IRQ_SCAN_RESULT')
            addr_type, addr, adv_type, rssi, adv_data = data
            #print(data)
            #print(self.__decoder.decode_services(adv_data), addr_type)
            if self.__LEGO_SERVICE_UUID in self.__decoder.decode_services(adv_data):
                if  e_state == 0:
                    #print('IRQ_SCAN_RESULT state 0')
                    #print('init_addr')
                    self.__addr_type = addr_type
                    self.__addr = bytes(addr)
                    e_addr = bytes(addr)
                    e_addr_type = addr_type
                    #print(bytes(e_addr))
                    self.__adv_type = adv_type
                    self.__name = self.__decoder.decode_name(adv_data)
                    self.__services = self.__decoder.decode_services(adv_data)
                    self.__man_data = self.__decoder.decode_manufacturer(adv_data)
                    self.scan_stop()
                    utime.sleep(1)
                    #print('SCAN_STOP')
                    e_state = 1
                    #self.__ble.gap_connect(addr_type, addr)
 
        # called after a ble scan is finished
        elif event == self.__IRQ_SCAN_COMPLETE:
            #print('IRQ_SCAN_COMPLETE')
            if e_state == 1:
                if  e_addr:
                        #print('SCAN_COMPLETE - CONNECT')
                        self.__ble.gap_connect(e_addr_type,e_addr)
                        e_state = 2
                        utime.sleep(0.2)
                else:
                    print('SCAN_COMPLETE - AGAIN SCAN')
                    self.__ble.gap_scan(2000)
                    e_addr= None
                    e_state = 0    
                    utime.sleep(0.5)
            
        # called if a peripheral device is connected
        elif event == self.__IRQ_PERIPHERAL_CONNECT:
            #print('PERIFERAL CONNECT')
            if e_state == 2:
                #print('PERIFERAL CONNECT in')
                conn_handle, addr_type, addr = data
                self.__conn_handle = conn_handle
                self.__connected_callback()
                e_state = 3
            else:
                pass
        # called if a peripheral device is disconnected
        elif event == self.__IRQ_PERIPHERAL_DISCONNECT:
            print('DISCONNECT')
            conn_handle, _, _ = data
            self.__disconnected_callback()
            if conn_handle == self.__conn_handle:
                self.__reset()
 
        # called if a service is returned
 
        # called if a characteristic is returned
    
        # called if data was successfully read
 
        # called if a notification appears
        elif event == self.__IRQ_GATTC_NOTIFY:
            conn_handle, value_handle, notify_data = data
            #print(notify_data)
            print('n')
 
            if self.__notify_callback:
                self.__notify_callback(notify_data)
            else:
                print('problem notify callback')
        elif event == self.__IRQ_GATTC_WRITE_STATUS:
            print('write status')
 
class Decoder:
    """Class to decode BLE adv_data"""
 
    def __init__(self):
        self.__COMPANY_IDENTIFIER_CODES = {"0397": "LEGO System A/S"}

    def decode_manufacturer(self, payload):
        man_data = []
        n = self.__decode_field(payload, const(0xff))
        if not n:
            return []
        company_identifier = ubinascii.hexlify(struct.pack('<h', *struct.unpack('>h', n[0])))
        company_name = self.__COMPANY_IDENTIFIER_CODES.get(company_identifier.decode(), "?")
        company_data = n[0][2:]
        man_data.append(company_identifier.decode())
        man_data.append(company_name)
        man_data.append(company_data)
        return man_data
 
    def decode_name(self, payload):
        n = self.__decode_field(payload, const(0x09))
        return str(n[0], "utf-8") if n else "parsing failed!"
 
    def decode_services(self, payload):
        services = []
        for u in self.__decode_field(payload, const(0x3)):
            services.append(ubluetooth.UUID(struct.unpack("<H", u)[0]))
        for u in self.__decode_field(payload, const(0x5)):
            # 32-bit UUIDs are 4 bytes: unpack as unsigned int, not as a double
            services.append(ubluetooth.UUID(struct.unpack("<I", u)[0]))
        for u in self.__decode_field(payload, const(0x7)):
            services.append(ubluetooth.UUID(u))
        return services
 
    def __decode_field(self, payload, adv_type):
        i = 0
        result = []
        while i + 1 < len(payload):
            if payload[i + 1] == adv_type:
                result.append(payload[i + 2: i + payload[i] + 1])
            i += 1 + payload[i]
        return result
 
class PowerUPRemote:
    """Class to handle Lego(R) PowerUP(TM) Remote"""
 
    def __init__(self):
        # constants
        self.__POWERED_UP_REMOTE_ID = 66
        self.__COLOR_BLUE = 0x03
        self.__COLOR_LIGHT_BLUE = 0x04
        self.__COLOR_LIGHT_GREEN = 0x05
        self.__COLOR_GREEN = 0x06
 
        self.BUTTON_A_PLUS = self.__create_message([0x05, 0x00, 0x45, 0x00, 0x01])
        self.BUTTON_A_RED = self.__create_message([0x05, 0x00, 0x45, 0x00, 0x7F])
        self.BUTTON_A_MINUS = self.__create_message([0x05, 0x00, 0x45, 0x00, 0xFF])
        self.BUTTON_A_RELEASED = self.__create_message([0x05, 0x00, 0x45, 0x00, 0x00])

        self.BUTTON_B_PLUS = self.__create_message([0x05, 0x00, 0x45, 0x01, 0x01])
        self.BUTTON_B_RED = self.__create_message([0x05, 0x00, 0x45, 0x01, 0x7F])
        self.BUTTON_B_MINUS = self.__create_message([0x05, 0x00, 0x45, 0x01, 0xFF])
        self.BUTTON_B_RELEASED = self.__create_message([0x05, 0x00, 0x45, 0x01, 0x00])

        self.BUTTON_CENTER_GREEN = self.__create_message([0x05, 0x00, 0x08, 0x02, 0x01])
        self.BUTTON_CENTER_RELEASED = self.__create_message([0x05, 0x00, 0x08, 0x02, 0x00])
 
        # class specific
        self.__handler = PowerUPHandler()
        self.__buttons = [self.BUTTON_A_RELEASED, self.BUTTON_B_RELEASED, self.BUTTON_CENTER_RELEASED]
 
        # callbacks
        self.__button_callback = None
        self.__connect_callback = None
        self.__disconnect_callback = None
 
    def connect(self, timeout=3000):
        self.__handler.on_connect(callback=self.__on_connect)
        self.__handler.on_disconnect(callback=self.__on_disconnect)
        self.__handler.on_notify(callback=self.__on_notify)
        self.__handler.scan_start(timeout, callback=self.__on_scan)
 
    def disconnect(self):
        self.__handler.disconnect()
 
    def on_button(self, callback):
        self.__button_callback = callback

    def on_connect(self, callback):
        self.__connect_callback = callback

    def on_disconnect(self, callback):
        self.__disconnect_callback = callback
 
    # +-------------------+
    # | Private Functions |
    # +-------------------+
 
    def __create_message(self, byte_array):
        # 'B' packs unsigned bytes, so values such as 0x81 and 0xFF stay in range
        message = struct.pack('%sB' % len(byte_array), *byte_array)
        return message

    def __set_remote_color(self, color_byte):
        color = self.__create_message([0x08, 0x00, 0x81, 0x34, 0x11, 0x51, 0x00, color_byte])
        self.__handler.write(color)
 
    # callback for scan result
    def __on_scan(self, addr_type, addr, man_data):
        if addr and man_data[2][1] == self.__POWERED_UP_REMOTE_ID:
            self.__handler.connect(addr_type, addr)
 
    def __on_connect(self):
        #print('print_connect_callback')
        # enables remote left port notification
        left_port = self.__create_message([0x0A, 0x00, 0x41, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01])
        # enables remote right port notification
        right_port = self.__create_message([0x0A, 0x00, 0x41, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01])

        color = self.__create_message([0x08, 0x00, 0x81, 0x34, 0x11, 0x51, 0x00, self.__COLOR_GREEN])
        # enables notifier
        notifier = self.__create_message([0x01, 0x00])
        
        self.__handler.__ble.gattc_write(self.__handler.__conn_handle,0x0B,color,1)
        utime.sleep(0.5)
        self.__handler.__ble.gattc_write(self.__handler.__conn_handle,0x0B,left_port,1)
        utime.sleep(0.5)
        self.__handler.__ble.gattc_write(self.__handler.__conn_handle,0x0B,right_port,1)
        utime.sleep(0.5)
        self.__handler.__ble.gattc_write(self.__handler.__conn_handle,0x0C,notifier,1)
        utime.sleep(0.5)
        #print('print_connect_callback')
        #print(self.__connect_callback)
        hub.light_matrix.off()
        if self.__connect_callback:
            self.__connect_callback()
        else:
            print('problem')
 
    def __on_disconnect(self):
        if self.__disconnect_callback:
            self.__disconnect_callback()
 
    def __on_notify(self, data):
        if data == self.BUTTON_A_PLUS:
            self.__buttons[0] = self.BUTTON_A_PLUS
        if data == self.BUTTON_A_RED:
            self.__buttons[0] = self.BUTTON_A_RED
        if data == self.BUTTON_A_MINUS:
            self.__buttons[0] = self.BUTTON_A_MINUS
        if data == self.BUTTON_A_RELEASED:
            self.__buttons[0] = self.BUTTON_A_RELEASED
        if data == self.BUTTON_B_PLUS:
            self.__buttons[1] = self.BUTTON_B_PLUS
        if data == self.BUTTON_B_RED:
            self.__buttons[1] = self.BUTTON_B_RED
        if data == self.BUTTON_B_MINUS:
            self.__buttons[1] = self.BUTTON_B_MINUS
        if data == self.BUTTON_B_RELEASED:
            self.__buttons[1] = self.BUTTON_B_RELEASED
        if data == self.BUTTON_CENTER_GREEN:
            self.__buttons[2] = self.BUTTON_CENTER_GREEN
        if data == self.BUTTON_CENTER_RELEASED:
            self.__buttons[2] = self.BUTTON_CENTER_RELEASED
 
        # callback the button data
        if self.__button_callback:
            self.__button_callback(self.__buttons)
        else:
            print('problem bc')
 
def on_connect():
    hub.status_light.on("green")
 
def on_disconnect():
    hub.status_light.on("white")
 
def on_button(buttons):
    #print('on button')
    #print(buttons[_BUTTON_A])
    #print(buttons[_BUTTON_B])
    #print(remote.BUTTON_A_RELEASED)
    #hub.light_matrix.off()
    # a buttons from the remote
    if buttons[_BUTTON_A] == remote.BUTTON_A_RELEASED:
        hub.light_matrix.set_pixel(1, 1, brightness=0)
        hub.light_matrix.set_pixel(2, 1, brightness=0)
        hub.light_matrix.set_pixel(3, 1, brightness=0)
    if buttons[_BUTTON_B] == remote.BUTTON_B_RELEASED:
        hub.light_matrix.set_pixel(1, 3, brightness=0)
        hub.light_matrix.set_pixel(2, 3, brightness=0)
        hub.light_matrix.set_pixel(3, 3, brightness=0)

    if buttons[_BUTTON_A] == remote.BUTTON_A_PLUS: # and buttons[_BUTTON_B] == remote.BUTTON_B_RELEASED:
        hub.light_matrix.set_pixel(1, 1, brightness=100)
    if buttons[_BUTTON_A] == remote.BUTTON_A_RED: # and buttons[_BUTTON_B] == remote.BUTTON_B_RELEASED:
        hub.light_matrix.set_pixel(2, 1, brightness=100)
    if buttons[_BUTTON_A] == remote.BUTTON_A_MINUS: # and buttons[_BUTTON_B] == remote.BUTTON_B_RELEASED:
        hub.light_matrix.set_pixel(3, 1, brightness=100)

    # b buttons from the remote
    if buttons[_BUTTON_B] == remote.BUTTON_B_PLUS: # and buttons[_BUTTON_A] == remote.BUTTON_A_RELEASED:
        hub.light_matrix.set_pixel(1, 3, brightness=100)
    if buttons[_BUTTON_B] == remote.BUTTON_B_RED: # and buttons[_BUTTON_A] == remote.BUTTON_A_RELEASED:
        hub.light_matrix.set_pixel(2, 3, brightness=100)
    if buttons[_BUTTON_B] == remote.BUTTON_B_MINUS: # and buttons[_BUTTON_A] == remote.BUTTON_A_RELEASED:
        hub.light_matrix.set_pixel(3, 3, brightness=100)
    # combined buttons from the remote


 
_BUTTON_A = 0
_BUTTON_B = 1
_BUTTON_CENTER = 2
hub = PrimeHub()
print('first remote ...')
remote = PowerUPRemote()
remote.on_connect(callback=on_connect)
remote.on_disconnect(callback=on_disconnect)
remote.on_button(callback=on_button)
remote.connect()

On 6/25/2020 at 8:04 PM, BrickTronic said:

Hello,

It is written there that the Spike contains an ARM Cortex M4 processor at 100 MHz with 1M flash and 320K RAM.

Any idea which processor is really used here?

Maybe an STM32L4P5RG in LQFP64?

But how is BLE/BTC done then?
Which coprocessor?
A successor of the CC2640 or nRF52840?


Hi everyone....

This is a cool project and just what I need!!! 

But I'm wondering how it's possible to run the code on Spike Prime. When I upload it to the hub (via mpfshell) and try to import it in the mPy REPL, I get a ModuleNotFound error.

Do you precompile the script before uploading it to the hub, or what's the process for making it work?

Best Regards

Thomas

On 9/5/2020 at 10:59 PM, Vinz1911 said:

For Everyone who is interested in Lego Spike Prime + PowerUP Remote, I created an example here:

Spike Prime + Power UP Remote

 

- The Remote is automatically detected and connected (just copy this to your spike prime, run it and start at the same moment the advertising on the remote)
- After a successfully created connection, the remote‘s LED is changed to GREEN
- Every button let's light up a different dot on the Spike Prime, releasing it turn it off

 


Figured out I needed to update my Spike Prime Hub to the latest version. Dooooh...... Now it's running smoothly. Sorry for the trouble.

BR 

Thomas
