Examples¶
The following are a list of examples for lib.zerynth.zdm.
Simple ZDM¶
In this example you can see how to create a ZDM device and connect it to the Zerynth Device Manager and send periodically data. It simulates a weather sensor that sends temperature and pressure values to the ZDM periodically.
You can use this example to test ZDM Webhooks and Ubidots integration.
To check if the ZDM is correctly receiving data sent by your device, use the data ZDM command.
main.py
################################################################################
# Zerynth Device Manager
#
# Created by Zerynth Team 2020 CC
# Authors: E.Neri, D.Neri
###############################################################################
from zdm import zdm
from wireless import wifi
# Please uncomment the line related to the wi-fi chip architecture you are using
# from microchip.winc1500 import winc1500 as wifi_driver
# from espressif.esp32net import esp32wifi as wifi_driver
import streams
import json
# paste here your device id and password generated by the ZDM
# the password is a jwt generated from a device's key
device_id = 'your device id'
password = 'your device password'
def pub_random():
# this function is called periodically to publish to ZDM random int value labeled with tags values
print('------ publish random ------')
tags = ['tag1', 'tag2', 'tag3']
payload = {
'value': 0
}
for t in tags:
payload['value'] = random(0, 100)
# publish payload to ZDM
device.publish(json.dumps(payload), t)
print('published on tag:', t, ':', payload)
print('pub_random done')
def pub_temp_pressure():
# this function publish another payload with two random int values
print('---- publish temp_pressure ----')
tag = 'tag4'
temp = random(19, 23)
pressure = random(50, 60)
payload = {'temp': temp, 'pressure': pressure}
device.publish(json.dumps(payload), tag)
print('published on tag: ', tag, ':', payload)
streams.serial()
try:
wifi_driver.auto_init()
for _ in range(5):
try:
print("connect wifi")
# Write here your wifi SSID and password
wifi.link("***Wifi-name***", wifi.WIFI_WPA2, "***Wifi-password***")
print("connect wifi done")
break
except Exception as e:
print("wifi connect err", e)
# create a ZDM Device instance with your device id
device = zdm.Device(device_id)
# set the device's password (jwt)
device.set_password(password)
# connect your device to ZDM enabling the device to receive incoming messages
device.connect()
while True:
sleep(5000)
pub_random()
pub_temp_pressure()
except Exception as e:
print("main", e)
ZDM Jobs¶
A basic example showing ZDM Jobs and how to handle them. Write your own jobs, then add them in the jobs dictionary with a custom key.
Once your device is connected to the ZDM, you can send it job commands using the key you defined and your device will execute functions remotely.
main.py
################################################################################
# Zerynth Device Manager
#
# Created by Zerynth Team 2020 CC
# Authors: E.Neri, D.Neri
###############################################################################
from zdm import zdm
from wireless import wifi
# Please uncomment the line related to the wi-fi chip architecture you are using
# from microchip.winc1500 import winc1500 as wifi_driver
# from espressif.esp32net import esp32wifi as wifi_driver
import streams
import json
# paste here your device id and password generated by the ZDM
# the password is a jwt generated from a device's key
device_id = 'your device id'
password = 'your device password'
def custom_job_1(obj, arg):
print("custom_job_1")
return {
'value 1': random(0, 100),
'value 2': random(100, 200)
}
def custom_job_2(obj, arg):
print("custom_job_2")
return 'this is an example result'
# you can call job1 and job2 method using rpc
my_jobs = {
'job1': custom_job_1,
'job2': custom_job_2,
}
streams.serial()
try:
wifi_driver.auto_init()
for _ in range(5):
try:
print("connect wifi")
# Write here your wifi SSID and password
wifi.link("***Wifi-name***", wifi.WIFI_WPA2, "***Wifi-password***")
print("connect wifi done")
break
except Exception as e:
print("wifi connect err", e)
# create a ZDM Device instance with your id and the jobs dictionary
device = zdm.Device(device_id, jobs_dict=my_jobs)
# set the device's password (jwt)
device.set_password(password)
# connect your device to ZDM enabling the device to receive incoming messages
device.connect()
while True:
sleep(1000)
except Exception as e:
print("main", e)
FOTA Updates¶
Connect your device to ZDM and start updating the firmware seamlessly. In this example you can see how to create a ZDM device and connect it to the Zerynth Device Manager.
In this example, a FOTA callback function is defined, which is called during the FOTA update steps. The FOTA callback allows you to accept or refuse a FOTA from your devices using the return value. If the callback returns True the device will accept the FOTA update requests, if the callback return False the device will refuse it.
Try to edit the function e do your tests using ZDM FOTA commands.
main.py
################################################################################
# Zerynth Device Manager
#
# Created by Zerynth Team 2020 CC
# Authors: E.Neri, D.Neri
###############################################################################
from zdm import zdm
from wireless import wifi
# choose a wifi chip supporting secure sockets and client certificates
# from microchip.winc1500 import winc1500 as wifi_driver
# from espressif.esp32net import esp32wifi as wifi_driver
import streams
# paste here your device id and password generated by the ZDM
# the password is a jwt generated from a device's key
device_id = 'your device id'
password = 'your device password'
def fota_callback():
# This function is called many times during a FOTA update. If it returns True the FOTA update will
# be accepted by the device, refused if it returns False
print("Fota callback called")
return True
streams.serial()
try:
wifi_driver.auto_init()
for _ in range(5):
try:
print("connect wifi")
# Write here your wifi SSID and password
wifi.link("***Wifi-name***", wifi.WIFI_WPA2, "***Wifi-password***")
print("connect wifi done")
break
except Exception as e:
print("wifi connect err", e)
# create a ZDM Device instance with your id and the fota callback
device = zdm.Device(device_id, fota_callback=fota_callback)
# set the device's password (jwt)
device.set_password(password)
# connect your device to ZDM enabling the device to receive incoming messages
device.connect()
while True:
sleep(1000)
except Exception as e:
print("main", e)
ZDM_events¶
ZDM_events example show you how to send events from your device. You could use events to let the device notify a specific status or condition with a message, for example when a variable reach a threshold value.
This example creates a ZDM Device instance that starts a loop and sends an example event every 10 seconds.
main.py
################################################################################
# Zerynth Device Manager
#
# Created by Zerynth Team 2020 CC
# Authors: E.Neri
###############################################################################
from zdm import zdm
from wireless import wifi
import streams
# Please uncomment the line related to the wi-fi chip architecture you are using
# from microchip.winc1500 import winc1500 as wifi_driver
# from espressif.esp32net import esp32wifi as wifi_driver
# paste here your device id and password generated by the ZDM
# the password is a jwt generated from a device's key
device_id = 'your device id'
password = 'your device password'
streams.serial()
try:
wifi_driver.auto_init()
for _ in range(5):
try:
print("connect wifi")
# Write here your wifi SSID and password
wifi.link("***Wifi-name***", wifi.WIFI_WPA2, "***Wifi-password***")
print("connect wifi done")
break
except Exception as e:
print("wifi connect err", e)
# create a ZDM Device instance with your device id
device = zdm.Device(device_id)
# set the device's password (jwt)
device.set_password(password)
# connect your device to ZDM enabling the device to receive incoming messages
device.connect()
while True:
sleep(10000)
value = {
'name': 'optional event name',
'message': 'this is an event'
}
device.send_event(value)
except Exception as e:
print("main", e)
ZDM_Advanced¶
This is an advanced example aimed at showing all the ZDM functionalities integrated in an embedded firmware. The example simulates the use of the ZDM in an industrial IOT scenario where industrial machines are monitored via sensors interfaced with zerynth powered boards
In this example we simulate the acquisition of 3 variables from 2 industrial machines (a CNC and an industrial pump). The setup also includes two relays used to control a Red and a Green light indicators placed near the machine. The lights are used to notify the operator on remote alarms sent directly by the ZDM as jobs.
main.py
################################################################################
# Zerynth Device Manager
#
# Created by Zerynth Team 2020 CC
# Author: E.Neri, D.Mazzei
###############################################################################
from zdm import zdm
from wireless import wifi
import streams
import json
import monitor
# *** Please uncomment the line related to the wi-fi chip architecture you are using ***
# from microchip.winc1500 import winc1500 as wifi_driver
# from espressif.esp32net import esp32wifi as wifi_driver
# *** Paste here your device id and password generated by the ZDM ***
device_id = 'device-id'
password = 'device-password'
# *** Write here your wifi ssid and password
wifi_ssid = ''
wifi_password = ''
monitor = monitor.machinesMonitor()
# cnc and pump period indicates the interval in seconds between two data publish for each machine
cncPeriod = 10
pumpPeriod = 10
# this bool variables [default=false] are set remotely using jobs below
# when True, device sends also instantaneous values for the associated machine
instantCnc = False
instantPump = False
# acceptFOTA [default True] indicates if the device Fota si enabled or not
acceptFOTA = True
# jobs
def enable_instant_cnc():
global instantCnc
instantCnc = True
return "instant cnc enabled"
def disable_instant_cnc():
global instantCnc
instantCnc = False
return "instant cnc disabled"
def enable_instant_pump():
global instantPump
instantPump = True
return "instant pump enabled"
def disable_instant_pump():
global instantPump
instantPump = False
return "instant pump disabled"
# to set cnc period use the command set_cnc_period --arg period 12 device_id
def set_cnc_period(obj, args):
if not('period' in args['args']) or args['args']['period'] is None:
return "use zmd set_cnc_period --arg period [int] [device_id]"
global cncPeriod
monitor.resetCncVariables()
monitor.resetCncCounter()
cncPeriod = args['args']['period']
print("CNC period set remotely. New value:", cncPeriod)
return "CNC period set"
# to set pump period use the command set_pump_period --arg period 12 device_id
def set_pump_period(obj, args):
if not('period' in args['args']) or args['args']['period'] is None:
return "use zmd set_pump_period --arg period [int] [device_id]"
global pumpPeriod
monitor.resetPumpVariables()
monitor.resetPumpCounter()
pumpPeriod = args['args']['period']
print("Pump period set remotely. New value:", pumpPeriod)
return "Pump period set"
# turn on green lamp
def green_lamp_on():
print("###### GREEN LAMP IS ON ######")
return "green lamp ON"
# turn off green lamp
def green_lamp_off():
print("###### GREEN LAMP IS OFF ######")
return "green lamp OFF"
# turn on red lamp
def red_lamp_on():
print("###### RED LAMP IS ON ######")
return "red lamp ON"
#turn off red lamp
def red_lamp_off():
print("###### RED LAMP IS OFF ######")
return "red lamp OFF"
# to set cnc temp threshold use the command zdm cnc_temp_threshold --arg threshold [int] [device_id]
def cnc_temp_threshold(obj, args):
if not('threshold' in args['args']) or args['args']['threshold'] is None:
return "use zdm cnc_temp_threshold --arg threshold [int] [device_id]"
monitor.setCncTempThreshold(args['args']['threshold'])
print("CNC temp threshold set")
return "CNC temp threshold set"
# to set cnc amp threshold use the command zdm cnc_amp_threshold --arg threshold [int] [device_id]
def cnc_amp_threshold(obj, args):
if not('threshold' in args['args']) or args['args']['threshold'] is None:
return "use zdm cnc_amp_threshold --arg threshold [int] [device_id]"
monitor.setCncAmpThreshold(args['args']['threshold'])
print("CNC amp threshold set")
return "CNC amp threshold set"
# to set pump temp threshold use the command zdm pump_temp_threshold --arg threshold [int] [device_id]
def pump_temp_threshold(obj, args):
if not('threshold' in args['args']) or args['args']['threshold'] is None:
return "use zdm pump_temp_threshold --arg threshold [int] [device_id]"
monitor.setPumpTempThreshold(args['args']['threshold'])
print("Pump temp threshold set")
return "Pump temp threshold set"
# to set cnc amp threshold use the command zdm cnc_amp_threshold --arg threshold [int] [device_id]
def pump_amp_threshold(obj, args):
if not('threshold' in args['args']) or args['args']['threshold'] is None:
return "use zdm pump_amp_threshold --arg threshold [int] [device_id]"
monitor.setPumpAmpThreshold(args['args']['threshold'])
print("Pump amp threshold set")
return "Pump amp threshold set"
# toggleAcceptFota is a job used to change the acceptFOTA variable
def toggleAcceptFota():
global acceptFOTA
if acceptFOTA == True:
acceptFOTA = False
return "acceptFOTA: false"
else:
acceptFOTA = True
return "acceptFOTA: true"
# you can call your jobs using ZDM using names enable_instant_cnc,
# disable_instant_cnc, enable_instant_pump, disable_instant_pump
jobs = {
'enable_instant_cnc': enable_instant_cnc,
'disable_instant_cnc': disable_instant_cnc,
'enable_instant_pump': enable_instant_pump,
'disable_instant_pump': disable_instant_pump,
'set_cnc_period': set_cnc_period,
'set_pump_period': set_pump_period,
'green_lamp_on': green_lamp_on,
'green_lamp_off': green_lamp_off,
'red_lamp_on': red_lamp_on,
'red_lamp_off': red_lamp_off,
'cnc_temp_threshold': cnc_temp_threshold,
'cnc_amp_threshold': cnc_amp_threshold,
'pump_temp_threshold': pump_temp_threshold,
'pump_amp_threshold': pump_amp_threshold,
'toggle_accept_fota': toggleAcceptFota
}
# collectCNCData - returns a dict with three random values [temp, vibration, ampere]
def collectCNCData():
temp = random(10, 100)
vibration = random(-3, 3)
ampere = random(0, 15)
data = {
'temp': temp,
'vibration': vibration,
'ampere': ampere
}
if temp >= monitor.cncTempThres:
value = {"name":"CNC temp threshold", "message": "cnc temp treshold reached" }
device.send_event(value)
print("CNC Temp threshold reached! Event sent")
if ampere >= monitor.cncAmpThres:
value = {"name":"CNC amp threshold", "message": "cnc amp treshold reached" }
device.send_event(value)
print("CNC Temp threshold reached! Event sent")
return data
# collectPumpData - returns a dict with three random values [temp, press, ampere]
def collectPumpData():
temp = random(10, 100)
press = random(0, 20)
ampere = random(0, 15)
data = {
'temp': temp,
'press': press,
'ampere': ampere
}
if temp >= monitor.pumpTempThres:
value = {"name":"Pump temp threshold", "message": "pump temp treshold reached" }
device.send_event(value)
print("Pump Temp threshold reached! Event sent")
if ampere >= monitor.pumpAmpThres:
value = {"name":"Pump amp threshold", "message": "Pump amp treshold reached" }
device.send_event(value)
print("Pump Temp threshold reached! Event sent")
return data
# publishCncData publishes to the ZDM cnc payload generated by the machines monitor
def publishCncData():
cncPayload = monitor.getCncPayload()
device.publish(cncPayload, monitor.cncTag)
print("CNC data published:")
print(cncPayload)
def publishPumpData():
pumpPayload = monitor.getPumpPayload()
device.publish(pumpPayload, monitor.pumpTag)
print("Pump data published:")
print(pumpPayload)
# publishInstantCnc publishes to the ZDM instantenous CNC values
def publishInstantCnc(cnc):
cncPayload = {
'instCncTemp': cnc['temp'],
'instCncVibration': cnc['vibration'],
'instCncAmpere': cnc['ampere']
}
device.publish(cncPayload, monitor.cncTag)
print("Instant CNC values published:", cncPayload)
# publishInstantPump publishes to the ZDM instantenous pump values
def publishInstantPump(pump):
pumpPayload = {
'instPumpTemp': pump['temp'],
'instPumpPress': pump['press'],
'instPumpAmpere': pump['ampere']
}
device.publish(pumpPayload, monitor.pumpTag)
print("Instant pump values published:", pumpPayload)
def fotaCallback():
global acceptFOTA
if acceptFOTA == True:
return True
else:
return False
streams.serial()
try:
wifi_driver.auto_init()
for _ in range(5):
try:
print("connect wifi")
wifi.link(wifi_ssid, wifi.WIFI_WPA2, wifi_password)
print("connect wifi done")
break
except Exception as e:
print("wifi connect err", e)
# create a ZDM Device instance with id and the custom jobs
device = zdm.Device(device_id, jobs_dict=jobs, fota_callback=fotaCallback)
# set the device's password (jwt)
device.set_password(password)
# connect your device to ZDM enabling the device to receive incoming messages
device.connect()
while True:
sleep(1000)
global cncPeriod
global pumpPeriod
# read cnc values
cnc = collectCNCData()
# read pump values
pump = collectPumpData()
# increment cnc counter
monitor.incCncCounter()
# increment pump counter
monitor.incPumpCounter()
# update min max and avg values
monitor.updateStatisticalData(cnc, pump)
# get cnc and pump counter values. if it's equal to the period, device will send data
cncCnt = monitor.getCncCounter()
pumpCnt = monitor.getPumpCounter()
# after CncPeriod iterations, device will send to the ZDM two payloads with min, max and avg values
# using the corresponding tag for each machine
if cncCnt == cncPeriod:
publishCncData()
# call reset variables and counter after the data has been published to avoid zero divisions
monitor.resetCncVariables()
monitor.resetCncCounter()
if pumpCnt == pumpPeriod:
publishPumpData()
# call reset variables and counter after the data has been published to avoid zero divisions
monitor.resetPumpVariables()
monitor.resetPumpCounter()
# if instantCnc is True (set remotely with job), the device sends instantaneout CNC values
if instantCnc == True:
publishInstantCnc(cnc)
# if instantPump is True (set remotely with job), the device sends instantaneout Pump values
if instantPump == True:
publishInstantPump(pump)
except Exception as e:
print("main", e)