Continued to work on the MQTT demo project.

A few review comments added into the MQTT implementation.
This commit is contained in:
Richard Barry 2019-07-24 00:27:14 +00:00
parent 53842d4cac
commit fe4511b35e
21 changed files with 244 additions and 166 deletions

View file

@ -140,31 +140,31 @@ static void prvMQTTDemoTask( void *pvParameters );
/**
* @brief The callback invoked by the MQTT library when the MQTT connection gets
* disconnected.
*
*
* @param[in] pvCallbackContext Callback context as provided at the time of
* connect.
* @param[in] pxCallbackParams Contains the reason why the MQTT connection was
* disconnected.
*/
static void prvExample_DisconnectCallback( void * pvCallbackContext,
IotMqttCallbackParam_t * pxCallbackParams );
static void prvExample_OnDisconnect( void * pvCallbackContext,
IotMqttCallbackParam_t * pxCallbackParams );
/**
* @brief The callback invoked by the MQTT library when a message is received on
* a subscribed topic from the MQTT broker.
*
*
* @param[in] pvCallbackContext Callback context as provided at the time of
* subscribe.
* @param[in] pxCallbackParams Contain the details about the received message -
* @param[in] pxCallbackParams Contain the details about the received message -
* topic on which the message was received, the received message.
*/
static void prvExample_PublishCallback( void * pvCallbackContext,
IotMqttCallbackParam_t * pxCallbackParams );
static void prvExample_OnMessageReceived( void * pvCallbackContext,
IotMqttCallbackParam_t * pxCallbackParams );
/**
* @brief Connects to the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT
* and mqttexampleMQTT_BROKER_PORT.
*
*
* @note This example does not use TLS and therefore will not work with MQTT.
*/
static void prvMQTTConnect( void );
@ -191,7 +191,7 @@ static void prvMQTTUnsubscribe( void );
static void prvMQTTDisconnect( void );
/*-----------------------------------------------------------*/
static void prvExample_DisconnectCallback( void * pvCallbackContext,
static void prvExample_OnDisconnect( void * pvCallbackContext,
IotMqttCallbackParam_t * pxCallbackParams )
{
TaskHandle_t xDemoTaskHandle = ( TaskHandle_t ) pvCallbackContext;
@ -207,7 +207,7 @@ TaskHandle_t xDemoTaskHandle = ( TaskHandle_t ) pvCallbackContext;
}
/*-----------------------------------------------------------*/
static void prvExample_PublishCallback( void * pvCallbackContext,
static void prvExample_OnMessageReceived( void * pvCallbackContext,
IotMqttCallbackParam_t * pxCallbackParams )
{
TaskHandle_t xDemoTaskHandle = ( TaskHandle_t ) pvCallbackContext;
@ -227,6 +227,12 @@ TaskHandle_t xDemoTaskHandle = ( TaskHandle_t ) pvCallbackContext;
/* Ensure that the message QoS is as expected. */
configASSERT( pxCallbackParams->u.message.info.qos == IOT_MQTT_QOS_1 );
/* Although this print uses the constants rather than the data from the
message payload the asserts above have already checked the message payload
equals the constants, and it is more efficient not to have to worry about
lengths in the print. */
configPRINTF( ( "Received %s from topic %s\r\n", mqttexampleMESSAGE, mqttexampleTOPIC ) );
/* Inform the demo task about the message received from the MQTT broker. */
xTaskNotify( xDemoTaskHandle,
mqttexampleMESSAGE_RECEIVED_BIT,
@ -252,7 +258,8 @@ void vStartSimpleMQTTDemo( void )
static void prvMQTTDemoTask( void *pvParameters )
{
IotMqttError_t xResult;
uint32_t ulNotificationValue = 0;
uint32_t ulNotificationValue = 0, ulIterations;
const uint32_t ulMaxIterations = 5UL;
const TickType_t xNoDelay = ( TickType_t ) 0;
/* Remove compiler warnings about unused parameters. */
@ -272,55 +279,77 @@ const TickType_t xNoDelay = ( TickType_t ) 0;
/* Don't expect any notifications to be pending yet. */
configASSERT( ulTaskNotifyTake( pdTRUE, xNoDelay ) == 0 );
/******************** CONNECT ****************************************/
/* Establish a connection to the MQTT broker. This example connects to
* the MQTT broker as specified in mqttexampleMQTT_BROKER_ENDPOINT and
* mqttexampleMQTT_BROKER_PORT at the top of this file. Please change
* it to the MQTT broker you want to connect to. Note that this example
* does not use TLS and therefore will not work with AWS IoT. */
prvMQTTConnect();
configPRINTF( ( "Connected to %s\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
/* Subscribe to the topic as specified in mqttexampleTOPIC at the top
* of this file. */
/******************* SUBSCRIBE ***************************************/
/* The client is now connected to the broker. Subscribe to the topic
as specified in mqttexampleTOPIC at the top of this file. This client
will then publish to the same topic it subscribed to, so will expect
all the messages it sends to the broker to be sent back to it from the
broker. */
prvMQTTSubscribe();
configPRINTF( ( "Subscribed to %s\r\n", mqttexampleTOPIC ) );
/* Publish a message on the mqttexampleTOPIC topic as specified at the
* top of this file. */
prvMQTTPublish();
/* Since we are subscribed on the same topic, we will get the same
* message back from the MQTT broker. Wait for the message to be
* received which is informed to us by the publish callback
* (prvExample_PublishCallback) by setting the mqttexampleMESSAGE_RECEIVED_BIT
* in this task's notification value. */
xTaskNotifyWait( 0UL, /* Don't clear any bits on entry. */
0UL, /* Don't clear any bits on exit. */
&( ulNotificationValue ), /* Obtain the notification value. */
pdMS_TO_TICKS( mqttexampleMQTT_TIMEOUT_MS ) );
configASSERT( ( ulNotificationValue & mqttexampleMESSAGE_RECEIVED_BIT ) == mqttexampleMESSAGE_RECEIVED_BIT );
/******************* PUBLISH 5 TIMES *********************************/
/* Unsubscribe from the topic mqttexampleTOPIC. */
/* Publish a few messages while connected. */
for( ulIterations = 0; ulIterations < ulMaxIterations; ulIterations++ )
{
/* Publish a message on the mqttexampleTOPIC topic as specified at the
* top of this file. */
prvMQTTPublish();
configPRINTF( ( "Published %s to %s\r\n", mqttexampleMESSAGE, mqttexampleTOPIC ) );
/* Since we are subscribed on the same topic, we will get the same
* message back from the MQTT broker. Wait for the message to be
* received which is informed to us by the publish callback
* (prvExample_OnMessageReceived) by setting the
* mqttexampleMESSAGE_RECEIVED_BIT in this task's notification
value. */
xTaskNotifyWait( 0UL, /* Don't clear any bits on entry. */
mqttexampleMESSAGE_RECEIVED_BIT, /* Clear bit on exit. */
&( ulNotificationValue ), /* Obtain the notification value. */
pdMS_TO_TICKS( mqttexampleMQTT_TIMEOUT_MS ) );
configASSERT( ( ulNotificationValue & mqttexampleMESSAGE_RECEIVED_BIT ) == mqttexampleMESSAGE_RECEIVED_BIT );
}
/******************* UNSUBSCRIBE AND DISCONNECT **********************/
/* Unsubscribe from the topic mqttexampleTOPIC the disconnect
gracefully. */
prvMQTTUnsubscribe();
/* Gracefully disconnect from the MQTT broker by sending an MQTT
* DISCONNECT message. */
prvMQTTDisconnect();
configPRINTF( ( "Disconnected from from %s\r\n\r\n", mqttexampleMQTT_BROKER_ENDPOINT ) );
/* Wait for the disconnect operation to complete which is informed to us
* by the disconnect callback (prvExample_DisconnectCallback)by setting
* by the disconnect callback (prvExample_OnDisconnect)by setting
* the mqttexampleDISCONNECTED_BIT in this task's notification value.
* Note that all bits are cleared in the task's notification value to
* ensure that it is ready for the next run. */
xTaskNotifyWait( 0UL, /* Don't clear any bits on entry. */
portMAX_DELAY, /* Clear all bits on exit - portMAX_DELAY is used as it is a portable way of having all bits set. */
mqttexampleDISCONNECTED_BIT, /* Clear bit again on exit. */
&( ulNotificationValue ), /* Obtain the notification value. */
pdMS_TO_TICKS( mqttexampleMQTT_TIMEOUT_MS ) );
configASSERT( ( ulNotificationValue & mqttexampleDISCONNECTED_BIT ) == mqttexampleDISCONNECTED_BIT );
printf( "prvMQTTDemoTask() completed an iteration without hitting an assert.\r\n" );
fflush( stdout );
/* Wait for some time between two iterations to ensure that we do not
* bombard the public test mosquitto broker. */
configPRINTF( ( "prvMQTTDemoTask() completed an iteration without hitting an assert. Total free heap is %u\r\n\r\n", xPortGetFreeHeapSize() ) );
vTaskDelay( pdMS_TO_TICKS( 5000 ) );
}
}
@ -351,11 +380,12 @@ IotMqttConnectInfo_t xConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
xNetworkInfo.pNetworkInterface = IOT_NETWORK_INTERFACE_FREERTOS;
/* Setup the callback which is called when the MQTT connection is disconnected. */
xNetworkInfo.disconnectCallback.pCallbackContext = ( void * ) xTaskGetCurrentTaskHandle();
xNetworkInfo.disconnectCallback.function = prvExample_DisconnectCallback;
xNetworkInfo.disconnectCallback.pCallbackContext = ( void * ) xTaskGetCurrentTaskHandle();//_RB_ Why the task handle?
xNetworkInfo.disconnectCallback.function = prvExample_OnDisconnect;
/****************** MQTT Connection information setup. ********************/
/* This example does not use TLS and therefore won't work with AWS IoT. */
/* Set this flag to true if connecting to the AWS IoT MQTT broker. This
example does not use TLS and therefore won't work with AWS IoT. */
xConnectInfo.awsIotMqttMode = false;
/* Start with a clean session i.e. direct the MQTT broker to discard any
@ -373,11 +403,13 @@ IotMqttConnectInfo_t xConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
* client gets disconnected. */
xConnectInfo.pWillInfo = NULL;
/* Send an MQTT PING request every minute. */
/* Send an MQTT PING request every minute to keep the connection open if
there is no other MQTT trafic. */
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_SECONDS;
/* The client identifier is used to uniquely identify this MQTT client to
* the MQTT broker. */
* the MQTT broker. In a production device the identifier can be something
unique, such as a device serial number. */
xConnectInfo.pClientIdentifier = mqttexampleCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( mqttexampleCLIENT_IDENTIFIER );
@ -389,7 +421,7 @@ IotMqttConnectInfo_t xConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
xConnectInfo.passwordLength = 0;
/* Establish the connection to the MQTT broker - It is a blocking call and
will return only when connection is complete. */
will return only when connection is complete or a timeout occurrs. */
xResult = IotMqtt_Connect( &( xNetworkInfo ),
&( xConnectInfo ),
mqttexampleMQTT_TIMEOUT_MS,
@ -408,7 +440,7 @@ IotMqttSubscription_t xMQTTSubscription;
xMQTTSubscription.pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription.topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
xMQTTSubscription.callback.pCallbackContext = ( void * ) xTaskGetCurrentTaskHandle();
xMQTTSubscription.callback.function = prvExample_PublishCallback;
xMQTTSubscription.callback.function = prvExample_OnMessageReceived;
/* Use the synchronous API to subscribe - It is a blocking call and only
* returns when the subscribe operation is complete. */