Source code for src.lambda_mqtt_manager

"""
.. module:: lambda_mqtt_manager
    :synopsis: define mqtt manager responsible for all communications with amazon aws mqtt server

.. moduleauthor:: Khoi Trinh
"""

import sys
import os
# Imports for v3 validation
sys.path.append(os.path.abspath(os.path.join(
    os.path.dirname(__file__), '../')))
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import DROP_OLDEST
import mqtt_constant
import lambda_master_handler
import iot_object
import logging
import time
import json
import copy

# source: https://github.com/aws/aws-iot-device-sdk-python/blob/master/samples/basicPubSub/basicPubSub.py

# TODO: implement a queue for message handling

# source: https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python


[docs]class MqttManager: def __init__(self): """ Constructor for Mqtt Manager, create logger, connect to AWS mqtt client and grab the list of IoT devices that are available :param self: """ self.awsDeviceList = [] """This of aws profiles of the supported IoT devices, used during discovery request to advertise available devices""" self.iotObjList = iot_object.IOT_OBJ_LIST """List of IoT mcu that this backend can talk to to fulfill Alexa request, it will also advertise this list to Alexa using Discovery Directives""" self.createLogger() self.createAWSClient()
[docs] def createLogger(self): """ Create a logger to log mqtt messages :param self: instance of MqttManager """ # Configure logging self._logger = logging.getLogger("AWSIoTPythonSDK.core") self._logger.setLevel(logging.DEBUG) streamHandler = logging.StreamHandler() formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) self._logger.addHandler(streamHandler)
[docs] def createAWSClient(self): """ Connect to AWS mqtt server using credentials included with the lambda deployment package, most settings are stored in the mqtt_constant module :param self: instance of MqttManager """ # configure AWS client with credentials and connection info with open(mqtt_constant.ENDPT_FILE_PATH, 'r') as idFile: iotID = idFile.read().replace('\n', '') self._myAWSIoTMQTTClient = None self._myAWSIoTMQTTClient = AWSIoTMQTTClient(mqtt_constant.CLIENT_ID) self._myAWSIoTMQTTClient.configureEndpoint( iotID + mqtt_constant.AWS_ENDPOINT, mqtt_constant.MQTT_PORT) self._myAWSIoTMQTTClient.configureCredentials( mqtt_constant.AUTHORITY_CERT_PATH, mqtt_constant.PRIVATE_KEY_PATH, mqtt_constant.CERTIFICATE_PATH) # AWSIoTMQTTClient connection configuration self._myAWSIoTMQTTClient.configureAutoReconnectBackoffTime( mqtt_constant.INITIAL_BACKOFF_TIME, mqtt_constant.MAX_BACKOFF_TIME, mqtt_constant.STABLE_TIME) self._myAWSIoTMQTTClient.configureOfflinePublishQueueing( mqtt_constant.OFFLINE_PUB_QUEUE, DROP_OLDEST) self._myAWSIoTMQTTClient.configureDrainingFrequency( mqtt_constant.DRAINING_FREQ) self._myAWSIoTMQTTClient.configureConnectDisconnectTimeout( mqtt_constant.CONNECT_DISCONNECT_TIMEOUT) # 10 sec self._myAWSIoTMQTTClient.configureMQTTOperationTimeout( mqtt_constant.OPERATION_TIMEOUT) while False == self._myAWSIoTMQTTClient.connect(mqtt_constant.KEEP_ALIVE_SECONDS): pass # AWS subscription configuration for iotObject in iot_object.IOT_OBJ_LIST: self._myAWSIoTMQTTClient.subscribe( iotObject.pubTopic, mqtt_constant.SUB_QOS, lambda_master_handler.MasterHandler.subCallBack) self.awsDeviceList.append(iotObject.awsObjectProfile)
[docs] def mqttPub(self, package, pubTopic): """ Publish the given package to the supplied mqtt topic :param self: instance of MqttManager :param package: json dict to be published :param pubTopic: topic to publish to """ message = {} # check if we are in debug mode, if not, proceed normally if "Alexa.Debug" == package["namespace"]: message["properties"] = [] message["properties"].append(package) else: message = package messageJson = json.dumps(message) self._myAWSIoTMQTTClient.publish( pubTopic, messageJson, mqtt_constant.PUB_QOS)