veezer

BrickNil - A Python async library for PoweredUp/Boost

Recommended Posts

Posted (edited)

Hi all, although I briefly mentioned this in a thread over in the Lego Trains sub-forum, I thought it might be useful to create a new thread to announce my new Python library for controlling PoweredUp and Boost hubs/sensors/motors called BrickNil.  It's open-source and written to support the newest Python 3.7+ while taking full advantage of modern async programming features.  I mainly wrote it to have a cross-platform set of libraries in Python with a clean concurrent programming interface.  I've tested on Mac OS X and a Raspberry PI, although I'm pretty sure it should work on any Linux system. Win10 support was added by @David Lechner.  It should support all the sensors and motors that are provided in Boost, PoweredUp, and Duplo Trains, and I hope to have Wedo support at some point soon.

If anyone finds this useful and finds any bugs or has some feature-requests, please do let me know.  

You can find the project documentation here:

 BrickNil - Control LEGO Bluetooth Sensors and Motors with Python

Installation available as a standard Python library:  pip install bricknil

Examples: look under examples/ in the github source https://github.com/virantha/bricknil

And here's a quick but non-trivial example of what using the library looks like.  This program sets the speed of a train depending on how close your hand is to a vision sensor, and will quit the program if you wave your hand more than three times in front of it.

 

from curio import sleep
from bricknil import attach, start
from bricknil.hub import PoweredUpHub
from bricknil.sensor import TrainMotor, VisionSensor
from bricknil.process import Process

@attach(VisionSensor, name='train_sensor', capabilities=['sense_count', 'sense_distance'])
@attach(TrainMotor, name='motor')
class Train(PoweredUpHub):

    async def train_sensor_change(self):
        distance = self.train_sensor.value[VisionSensor.capability.sense_distance]
        count = self.train_sensor.value[VisionSensor.capability.sense_count]

        if count > 3:
            # Wave your hand more than three times in front of the sensor and the program ends
            self.keep_running = False

        # The closer your hand gets to the sensor, the faster the motor runs
        self.motor_speed = (10-distance)*10

        # Flag a change
        self.sensor_change = True

    async def run(self):
        self.motor_speed = 0
        self.keep_running = True
        self.sensor_change = False

        while self.keep_running:
            if self.sensor_change:
                await self.motor.ramp_speed(self.motor_speed, 900)  # Ramp to new speed in 0.9 seconds
                self.sensor_change = False
            await sleep(1)

async def system():
    train = Train('My Train')

if __name__ == '__main__':
    Process.level = Process.MSG_LEVEL.INFO
    start(system)

And here's a quick video of where it's being used to relay button presses from the PoweredUp remote to control Vernie the Robot (Boost hub).

And of course, I can't say enough about how helpful all the posts on Lego hardware in these forums have been to getting this library to work, and in particular the inspiration from the NodeJS PoweredUP library that forum member MrHobbles has created.

Edited by veezer
win10

Share this post


Link to post
Share on other sites
18 hours ago, David Lechner said:

This looks really great!

Thanks, David!  Saw your pull request adding support for Windows 10. I’ll get your additions merged and the updates into pypi later today. Appreciate the work!

Share this post


Link to post
Share on other sites

@David Lechner I've uploaded new pypi packages based on your changes (bricknil 0.8, bricknil-bleak 0.3.1).  Let me know if you see any issues.  

 

Share this post


Link to post
Share on other sites
Posted (edited)

Hiya, thank you (both) for this library! It works great (with the 0.8 version) on Windows 10. Once I get a grip on how the async/await pattern works in Python/curio I will definitely add this to my project (I am trying to send commands to my trains through MQTT so I can easily integrate it with other IoT-like devices). 

I have 2 minor remarks on the project and/or documentation, and they are:

1) On my machine (Win10) the UUID is not actually showing, instead a MAC-like address is given. This has made it hard to select a specific device. From inside the main run() method in Train I have added

self.message_info("BLE_ID:"+self.ble_id)

which outputs

INFO:GreenCargoTrain.2:BLE_ID:90:84:2B:0B:DD:BF

I have tried setting the logging to DEBUG, and using all the UUID's that are in there, including the Device advertised UUID, but no luck. For example, without setting a specific UUID it finds the train:

INFO:BLE Event Q.0:checking manufacturer ID for device named Unknown for HUB NO.4
INFO:BLE Event Q.0:found device Unknown
INFO:BLE Event Q.0:Device advertised: {'00002a00-0000-1000-8000-00805f9b34fb': <Windows.Devices.Bluetooth.GenericAttributeProfile.GattCharacteristic object at 0x071A27D0>, '00002a01-0000-1000-8000-00805f9b34fb': <Windows.Devices.Bluetooth.GenericAttributeProfile.GattCharacteristic object at 0x071A2D50>, '00002a04-0000-1000-8000-00805f9b34fb': <Windows.Devices.Bluetooth.GenericAttributeProfile.GattCharacteristic object at 0x071A2D70>, '00002a05-0000-1000-8000-00805f9b34fb': <Windows.Devices.Bluetooth.GenericAttributeProfile.GattCharacteristic object at 0x071A2A70>, '00001624-1212-efde-1623-785feabcd123': <Windows.Devices.Bluetooth.GenericAttributeProfile.GattCharacteristic object at 0x071A2870>}
INFO:BLE Event Q.0:Connected to device HUB NO.4:90:84:2B:0B:DD:BF

But when putting any of those UUID's in there (the first one in this example) I get:

hub = Train("GreenCargoTrain", ble_id="00002a05-0000-1000-8000-00805f9b34fb")
INFO:BLE Event Q.0:checking manufacturer ID for device named Unknown for HUB NO.4
INFO:BLE Event Q.0:Address 90:84:2B:0B:DD:BF is not a match

This feels like it might be a Windows issue...

2) The following line in the example does not seem to work on my machine.

Process.level = Process.MSG_LEVEL.INFO

I changed it to:

logging.basicConfig(level=logging.INFO)

That does work, and I noticed you have the same code in other examples on website.

Edited by Entalyan
Added a remark, and found evidence that my original remark was already in documentation.

Share this post


Link to post
Share on other sites

@Entalyan regarding 2), yes you're right, that line setting the log level is incorrect in the documentation.  I've fixed the README to reflect the proper way, which you've already figured out. Sorry about that. 

Regarding 1), I think it is a Windows 10 issue.  I don't have a windows box right now to debug this, so would you mind opening a bug report at https://github.com/virantha/bricknil/issues and pasting in everything that gets logged with logging.DEBUG setting?  From the logs you have there, it does seem like we need to fix where the windows-specific code is pulling in the broadcast UUID.

Share this post


Link to post
Share on other sites

@veezer As discussed on github, the new version now works perfectly with the new ID selector, thanks! :)

I will be moving the code onto a linux box in the future, then I will test there. I have also gotten MQTT set up to work with the library, so the basic version of what I want this code to do is ready:

import logging
import paho.mqtt.client as mqtt
from curio import  UniversalQueue, sleep, spawn
from bricknil import attach, start
from bricknil.hub import PoweredUpHub
from bricknil.sensor import TrainMotor
from bricknil.process import Process

def on_message(client, userdata, message):
    #print("message received " ,str(message.payload.decode("utf-8")))
    q.put(str(message.payload.decode("utf-8")))

async def start_MQTT():
    client = mqtt.Client("P1")
    client.on_message=on_message
    client.connect("127.0.0.1")
    client.loop_start()
    client.subscribe("trains/pup/cargo1")
    
q = UniversalQueue()

@attach(TrainMotor, name='motor')
class Train(PoweredUpHub):

    async def run(self):
        self.message_info("Running")
        m = await spawn(start_MQTT)
        self.message_info("Started MQTT")
        while True:
            item = await q.get()
            if item is not None:
                if item == "start":
                    await self.starttrain()
                if item == "stop":
                    await self.stoptrain()

    async def starttrain(self):
        self.message_info('Increasing speed')
        await self.motor.ramp_speed(80,5000)
            
    async def stoptrain(self):
        self.message_info('Coming to a stop')
        await self.motor.ramp_speed(0,1000)
        

async def system():
    hub = Train('GreenCargoTrain', ble_id="90:84:2B:0B:DD:BF")
    
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    start(system)

It is still ugly and could use some professionalization, but I will get to that later. Using this library and code I can control my trains using MQTT, which I chose to automate all my devices (sensors and switches mostly right now). This way I can create a simple UI interacting only with MQTT that can fully automate my train layout. I thought you might like to know what people would be using this for ;)

Share this post


Link to post
Share on other sites
Posted (edited)

@Entalyan Great to hear it's working!  Thanks for your feedback (and @David Lechner's help) to get the ble_id working on Win/Linux.   Very interesting to see how you're using it; I can see that it would be pretty neat to have Lego hubs integrated as part of a larger automation system with other devices!  Are you using some other software automation tool that's controlling and sending the MQTT messages, or is it your own code?

Oh, and just noticed that you probably need to add a `await q.task_done()` after you do the q.get(). 

 while True:
    item = await q.get()
    await q.task_done()		

This will probably be needed to have a clean exit if you implement a 'quit' command and you have a `await q.join()` statement to close out the queue processing.

Edited by veezer

Share this post


Link to post
Share on other sites

Updated:  Version 0.8.2 is now out, and adds support for external motors (which was the last outstanding peripheral from Pup/Boost).  Absolute positioning and relative rotation are now supported.

Share this post


Link to post
Share on other sites
Posted (edited)

 @veezer Thanks for the tip regarding the task_done(). Building a clean exit hasn't been a top priority so far, I had enough trouble getting it running in the first place ;) 

As for controlling this using MQTT: I will first write my own application for this, since that seems easiest. I only really started this whole project 2 weeks ago, and it involves a lot of things I am unfamiliar with (Python, integration async libraries with synchronous ones, Arduinos, wiring, etc.), so I am taking baby steps. The idea is to build a slowly growing ecosystem out of small parts, so I don't burn out on any single task. Doing all the things below (and whatever else I think of) in 1 single massive project would probably put too much pressure on it, making it feel more like work and less like a hobby. My list of ideas include (in no particular order):

  • Using NiFi to control the logic
  • Using Kafka instead of MQTT
  • Develop web or tablet app (instead of desktop)
  • Put NFC chips on the trains and cars to identify then while they pass
  • Automate a 'railyard', where I can reconfigure trains using NFC chips
  • Put the whole thing in containers on Kubernetes
  • Hosting the control software in the cloud for remote access
And that is just for the soft/hardware part. Of course I also want to add more trains and buildings in Lego ;)
 
By the way, I updated to 0.8.2, and my regression test was successful. The functionality I was using is still working. I have no way to test the new features though.
Edited by Entalyan

Share this post


Link to post
Share on other sites

Just wanted to note that v0.9 is out.  It adds support for using the Wedo peripherals (motor, tilt sensor, motion sensor).  Also working on a web dashboard for the library, which should come out once we hit 1.0.  Only thing to note is that according to @Mr Hobbles the virtual port behavior has changed on the PoweredUp hub with the new firmware, so if you're controlling two motors ganged into one virtual port, it won't work yet.  I'll add support for that at some point once the Boost hub gets a firmware update or I get my hands on another motor.

Share this post


Link to post
Share on other sites

I had a go with this last night, and got it working very quickly, controlling Boost. :thumbup:  

Much faster *for me* to understand than the alternatives I've seen (because I'm familiar with python and python modules).  Thanks. :classic:

Share this post


Link to post
Share on other sites
On 4/17/2019 at 12:48 PM, andythenorth said:

I had a go with this last night, and got it working very quickly, controlling Boost. :thumbup:  

Much faster *for me* to understand than the alternatives I've seen (because I'm familiar with python and python modules).  Thanks. :classic:

Cool! Great to hear!  Let me know if you spot any bugs or issues.  

Share this post


Link to post
Share on other sites

Coming in the next release is a web dashboard that will show the status of all the peripherals in your running BrickNil program.  It's display only right now, but I hope to add control capability to this dashboard at some point too.  Here's an example where the program is controlling my duplo train, with the motor ramping up at intervals, changing LED color, and displaying speed and wheel revolutions, as well as the reflectivity sensor reading.

 

 

Share this post


Link to post
Share on other sites

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now

  • Recently Browsing   0 members

    No registered users viewing this page.