Small fixes for MQTT Serializer and MQTT Keep-alive Demos (#336)

* Some update to the demo comments and other fixes.

- Delete the leading underscore in the milliseconds helper macros.
- Add articles to some comments.
- Add clarify to come comments.
- Delete the extraneous line endings of log statements.
- Delete unused headers in the keep-alive demo.

Co-authored-by: Oscar Michael Abrina <abrinao@amazon.com>
This commit is contained in:
SarenaAWS 2020-10-13 14:24:29 -07:00 committed by GitHub
parent c0591f4658
commit f1d80ffc35
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 121 additions and 112 deletions

View file

@ -25,13 +25,17 @@
*/ */
/* /*
* Demo for showing use of the managed MQTT API. * Demo that shows use of the MQTT API without its keep-alive feature.
* This demo instead implements the keep-alive functionality in the application.
* *
* The Example shown below uses this API to create MQTT messages and * The example shown below uses this API to create MQTT messages and
* send them over the connection established using FreeRTOS sockets. * send them over the TCP connection established using a FreeRTOS sockets
* The example is single threaded and uses statically allocated memory; * based transport interface implementation.
* it uses QOS0 and therefore does not implement any retransmission * It shows how the MQTT API can be used without the keep-alive feature,
* mechanism for Publish messages. * so that the application can implements its own keep-alive functionality
* for MQTT. The example is single threaded and uses statically allocated memory;
* it uses QOS0, and therefore it does not implement any retransmission
* mechanism for publish messages.
* *
* !!! NOTE !!! * !!! NOTE !!!
* This MQTT demo does not authenticate the server nor the client. * This MQTT demo does not authenticate the server nor the client.
@ -46,10 +50,6 @@
#include "FreeRTOS.h" #include "FreeRTOS.h"
#include "task.h" #include "task.h"
/* FreeRTOS+TCP includes. */
#include "FreeRTOS_IP.h"
#include "FreeRTOS_Sockets.h"
/* Demo Specific configs. */ /* Demo Specific configs. */
#include "demo_config.h" #include "demo_config.h"
@ -139,9 +139,9 @@
/** /**
* @brief Keep alive time reported to the broker while establishing an MQTT connection. * @brief Keep alive time reported to the broker while establishing an MQTT connection.
* *
* It is the responsibility of the Client to ensure that the interval between * It is the responsibility of the client to ensure that the interval between
* Control Packets being sent does not exceed the this Keep Alive value. In the * control packets being sent does not exceed the this keep-alive value. In the
* absence of sending any other Control Packets, the Client MUST send a * absence of sending any other control packets, the client MUST send a
* PINGREQ Packet. * PINGREQ Packet.
*/ */
#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) #define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U )
@ -176,8 +176,8 @@
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
#define _MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */ #define MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */
#define _MILLISECONDS_PER_TICK ( _MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
@ -379,7 +379,7 @@ static uint32_t ulPingReqSendTimeMs;
/** /**
* @brief Timeout for a pending PINGRESP from the MQTT broker. * @brief Timeout for a pending PINGRESP from the MQTT broker.
*/ */
static uint32_t ulPingRespTimeoutMs = ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * _MILLISECONDS_PER_SECOND; static uint32_t ulPingRespTimeoutMs = ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * MILLISECONDS_PER_SECOND;
/** /**
* @brief Static buffer used to hold an MQTT PINGREQ packet for keep-alive mechanism. * @brief Static buffer used to hold an MQTT PINGREQ packet for keep-alive mechanism.
@ -390,7 +390,9 @@ const static MQTTFixedBuffer_t xPingReqBuffer =
.size = MQTT_PACKET_PINGREQ_SIZE .size = MQTT_PACKET_PINGREQ_SIZE
}; };
/** @brief Static buffer used to hold MQTT messages being sent and received. */ /**
* @brief Static buffer used to hold MQTT messages being sent and received.
*/
static MQTTFixedBuffer_t xBuffer = static MQTTFixedBuffer_t xBuffer =
{ {
.pBuffer = ucSharedBuffer, .pBuffer = ucSharedBuffer,
@ -400,7 +402,8 @@ static MQTTFixedBuffer_t xBuffer =
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
/** /**
* @brief Create the task that demonstrates the Plain text MQTT API Demo. * @brief Create the task that demonstrates the coreMQTT API over a plaintext TCP
* connection.
*/ */
void vStartSimpleMQTTDemo( void ) void vStartSimpleMQTTDemo( void )
{ {
@ -440,17 +443,18 @@ static void prvMQTTDemoTask( void * pvParameters )
{ {
/****************************** Connect. ******************************/ /****************************** Connect. ******************************/
/* Attempt to connect to the MQTT broker. If connection fails, retry after /* Attempt to connect to the MQTT broker. If connection fails, retry
* a timeout. Timeout value will be exponentially increased until the maximum * after a timeout. The timeout value will be exponentially increased
* number of attempts are reached or the maximum timeout value is reached. * until the maximum number of attempts are reached or the maximum
* The function returns a failure status if the TCP connection cannot be * timeout value is reached. The function below returns a failure status
* established to the broker after the configured number of attempts. */ * if the TCP connection cannot be established to the broker after
* the configured number of attempts. */
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext ); xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ); configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
/* Sends an MQTT Connect packet over the already connected TCP socket, /* Sends an MQTT Connect packet over the already connected TCP socket,
* and waits for connection acknowledgment (CONNACK) packet. */ * and waits for connection acknowledgment (CONNACK) packet. */
LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
/* Create an auto-reload timer to handle keep-alive. */ /* Create an auto-reload timer to handle keep-alive. */
@ -468,44 +472,44 @@ static void prvMQTTDemoTask( void * pvParameters )
/**************************** Subscribe. ******************************/ /**************************** Subscribe. ******************************/
/* If server rejected the subscription request, attempt to resubscribe to /* If the server rejected the subscription request, attempt to resubscribe
* topic. Attempts are made according to the exponential backoff retry * to the topic. Attempts are made according to the exponential backoff retry
* strategy implemented in retryUtils. */ * strategy declared in retry_utils.h. */
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
/********************* Publish and Receive Loop. **********************/ /********************* Publish and Receive Loop. **********************/
/* Publish messages with QOS0, send and process Keep alive messages. */ /* Publish messages with QOS0, send and process keep-alive messages. */
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
{ {
LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTPublishToTopic( &xMQTTContext ); prvMQTTPublishToTopic( &xMQTTContext );
/* Process incoming publish echo, since application subscribed to the /* Process the incoming publish echo. Since the application subscribed to
* same topic the broker will send publish message back to the * the same topic, the broker will send the same publish message back
* application. */ * to the application. */
LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); LogInfo( ( "Attempt to receive publish message from broker." ) );
xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess ); configASSERT( xMQTTStatus == MQTTSuccess );
/* Leave Connection Idle for some time. */ /* Leave Connection Idle for some time. */
LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) ); LogInfo( ( "Keeping Connection Idle..." ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES ); vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES );
} }
/******************** Unsubscribe from the topic. *********************/ /******************** Unsubscribe from the topic. *********************/
LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTUnsubscribeFromTopic( &xMQTTContext ); prvMQTTUnsubscribeFromTopic( &xMQTTContext );
/* Process Incoming packet from the broker. */ /* Process an incoming packet from the broker. */
xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess ); configASSERT( xMQTTStatus == MQTTSuccess );
/**************************** Disconnect. *****************************/ /**************************** Disconnect. *****************************/
/* Send an MQTT Disconnect packet over the already connected TCP socket. /* Send an MQTT disconnect packet over the connected TCP socket.
* There is no corresponding response for the disconnect packet. After * There is no corresponding response for the disconnect packet. After
* sending disconnect, client must close the network connection. */ * sending the disconnect, the client must close the network connection. */
LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", LogInfo( ( "Disconnecting the MQTT connection with %s.",
democonfigMQTT_BROKER_ENDPOINT ) ); democonfigMQTT_BROKER_ENDPOINT ) );
xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
configASSERT( xMQTTStatus == MQTTSuccess ); configASSERT( xMQTTStatus == MQTTSuccess );
@ -518,8 +522,8 @@ static void prvMQTTDemoTask( void * pvParameters )
xNetworkStatus = Plaintext_FreeRTOS_Disconnect( &xNetworkContext ); xNetworkStatus = Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ); configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
/* Reset SUBACK status for each topic filter after completion of subscription /* Reset the SUBACK status for each topic filter after completion of the
* request cycle. */ * subscription request cycle. */
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{ {
xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure; xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
@ -528,10 +532,10 @@ static void prvMQTTDemoTask( void * pvParameters )
/* Wait for some time between two iterations to ensure that we do not /* Wait for some time between two iterations to ensure that we do not
* bombard the broker. */ * bombard the broker. */
LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. " LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. "
"Total free heap is %u.\r\n", "Total free heap is %u.",
xPortGetFreeHeapSize() ) ); xPortGetFreeHeapSize() ) );
LogInfo( ( "Demo completed successfully.\r\n" ) ); LogInfo( ( "Demo completed successfully." ) );
LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); LogInfo( ( "Short delay before starting the next iteration.... \r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ); vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
} }
} }
@ -608,20 +612,21 @@ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
/* Start with a clean session i.e. direct the MQTT broker to discard any /* Start with a clean session i.e. direct the MQTT broker to discard any
* previous session data. Also, establishing a connection with clean session * previous session data. Also, establishing a connection with a clean session
* will ensure that the broker does not store any data when this client * will ensure that the broker does not store any data when this client
* gets disconnected. */ * gets disconnected. */
xConnectInfo.cleanSession = true; xConnectInfo.cleanSession = true;
/* The client identifier is used to uniquely identify this MQTT client to /* The client identifier is used to uniquely identify this MQTT client to
* the MQTT broker. In a production device the identifier can be something * the MQTT broker. In a production device, the identifier can be something
* unique, such as a device serial number. */ * unique, such as a device serial number. */
xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );
/* Set MQTT keep-alive period. It is the responsibility of the application to ensure /* Set MQTT keep-alive period. It is the responsibility of the application
* that the interval between Control Packets being sent does not exceed the Keep Alive value. * to ensure that the interval between control packets being sent does not
* In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet. */ * exceed the keep-alive value. In the absence of sending any other control
* packets, the client MUST send a PINGREQ Packet. */
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;
/* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it
@ -664,7 +669,7 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
bool xFailedSubscribeToTopic = false; bool xFailedSubscribeToTopic = false;
uint32_t ulTopicCount = 0U; uint32_t ulTopicCount = 0U;
/* Some fields not used by this demo so start with everything at 0. */ /* Some fields are not used by this demo so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
/* Get a unique packet id. */ /* Get a unique packet id. */
@ -687,9 +692,9 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
* subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).
* This client will then publish to the same topic it subscribed to, so it * This client will then publish to the same topic it subscribed to, so it
* will expect all the messages it sends to the broker to be sent back to it * will expect all the messages it sends to the broker to be sent back to it
* from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish * from the broker. This demo uses QOS0 in Subscribe. Therefore, the publish
* messages received from the broker will have QOS0. */ * messages received from the broker will have QOS0. */
LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) );
xResult = MQTT_Subscribe( pxMQTTContext, xResult = MQTT_Subscribe( pxMQTTContext,
xMQTTSubscription, xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
@ -700,18 +705,19 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
/* Process incoming packet from the broker. After sending the subscribe, the /* Process incoming packet from the broker. After sending the subscribe, the
* client may receive a publish before it receives a subscribe ack. Therefore, * client may receive a publish before it receives a subscribe ack. Therefore,
* call generic incoming packet processing function. Since this demo is * call the generic incoming packet processing function. Since this demo is
* subscribing to the topic to which no one is publishing, probability of * subscribing to the topic to which no one is publishing, probability of
* receiving Publish message before subscribe ack is zero; but application * receiving a publish message before a subscribe ack is zero; but the application
* must be ready to receive any packet. This demo uses the generic packet * must be ready to receive any packet. This demo uses the generic packet
* processing function everywhere to highlight this fact. */ * processing function everywhere to highlight this fact. */
xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS );
configASSERT( xResult == MQTTSuccess ); configASSERT( xResult == MQTTSuccess );
/* Check if recent subscription request has been rejected. #xTopicFilterContext is updated /* Check if the recent subscription request has been rejected. #xTopicFilterContext
* in the event callback to reflect the status of the SUBACK sent by the broker. It represents * is updated in the event callback to reflect the status of the SUBACK
* either the QoS level granted by the server upon subscription, or acknowledgement of * sent by the broker. It represents either the QoS level granted by the
* server rejection of the subscription request. */ * server upon subscription or acknowledgement of server rejection of the
* subscription request. */
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{ {
if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure ) if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure )
@ -740,7 +746,7 @@ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
* asserts(). * asserts().
***/ ***/
/* Some fields not used by this demo so start with everything at 0. */ /* Some fields are not used by this demo so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );
/* This demo uses QoS0. */ /* This demo uses QoS0. */
@ -751,7 +757,7 @@ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
/* Send PUBLISH packet. Packet ID is not used for a QoS0 publish. */ /* Send a PUBLISH packet. Packet ID is not used for a QoS0 publish. */
xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U ); xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U );
configASSERT( xResult == MQTTSuccess ); configASSERT( xResult == MQTTSuccess );
@ -766,7 +772,7 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
MQTTStatus_t xResult; MQTTStatus_t xResult;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
/* Some fields not used by this demo so start with everything at 0. */ /* Some fields are not used by this demo, so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
/* Get a unique packet id. */ /* Get a unique packet id. */
@ -778,10 +784,10 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
/* Get next unique packet identifier. */ /* Get the next unique packet identifier. */
usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
/* Send UNSUBSCRIBE packet. */ /* Send the UNSUBSCRIBE packet. */
xResult = MQTT_Unsubscribe( pxMQTTContext, xResult = MQTT_Unsubscribe( pxMQTTContext,
xMQTTSubscription, xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
@ -810,7 +816,7 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
{ {
if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure ) if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure )
{ {
LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n", LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.",
xTopicFilterContext[ ulTopicCount ].pcTopicFilter, xTopicFilterContext[ ulTopicCount ].pcTopicFilter,
xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) ); xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) );
} }
@ -821,19 +827,19 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
break; break;
case MQTT_PACKET_TYPE_UNSUBACK: case MQTT_PACKET_TYPE_UNSUBACK:
LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) );
/* Make sure ACK packet identifier matches with Request packet identifier. */ /* Make sure ACK packet identifier matches with Request packet identifier. */
configASSERT( usUnsubscribePacketIdentifier == usPacketId ); configASSERT( usUnsubscribePacketIdentifier == usPacketId );
break; break;
case MQTT_PACKET_TYPE_PINGRESP: case MQTT_PACKET_TYPE_PINGRESP:
xWaitingForPingResp = false; xWaitingForPingResp = false;
LogInfo( ( "Ping Response successfully received.\r\n" ) ); LogInfo( ( "Ping Response successfully received." ) );
break; break;
/* Any other packet type is invalid. */ /* Any other packet type is invalid. */
default: default:
LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n", LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).",
pxIncomingPacket->type ) ); pxIncomingPacket->type ) );
} }
} }
@ -851,8 +857,8 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) && if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) &&
( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) ) ( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) )
{ {
LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
"Incoming Publish Message : %.*s\r\n", "Incoming Publish Message : %.*s",
pxPublishInfo->topicNameLength, pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName, pxPublishInfo->pTopicName,
pxPublishInfo->payloadLength, pxPublishInfo->payloadLength,
@ -860,7 +866,7 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
} }
else else
{ {
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.",
pxPublishInfo->topicNameLength, pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName ) ); pxPublishInfo->pTopicName ) );
} }
@ -901,7 +907,7 @@ static void prvKeepAliveTimerCallback( TimerHandle_t pxTimer )
else else
{ {
/* Send Ping Request to the broker. */ /* Send Ping Request to the broker. */
LogInfo( ( "Attempt to ping the MQTT broker.\r\n" ) ); LogInfo( ( "Attempt to ping the MQTT broker." ) );
xTransportStatus = pxTransport->send( pxTransport->pNetworkContext, xTransportStatus = pxTransport->send( pxTransport->pNetworkContext,
( void * ) xPingReqBuffer.pBuffer, ( void * ) xPingReqBuffer.pBuffer,
xPingReqBuffer.size ); xPingReqBuffer.size );
@ -942,7 +948,7 @@ static uint32_t prvGetTimeMs( void )
xTickCount = xTaskGetTickCount(); xTickCount = xTaskGetTickCount();
/* Convert the ticks to milliseconds. */ /* Convert the ticks to milliseconds. */
ulTimeMs = ( uint32_t ) xTickCount * _MILLISECONDS_PER_TICK; ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK;
/* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
* elapsed time in the application. */ * elapsed time in the application. */

View file

@ -153,8 +153,8 @@
*/ */
#define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 200U ) #define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 200U )
#define _MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */ #define MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */
#define _MILLISECONDS_PER_TICK ( _MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
@ -788,7 +788,7 @@ static uint32_t prvGetTimeMs( void )
xTickCount = xTaskGetTickCount(); xTickCount = xTaskGetTickCount();
/* Convert the ticks to milliseconds. */ /* Convert the ticks to milliseconds. */
ulTimeMs = ( uint32_t ) xTickCount * _MILLISECONDS_PER_TICK; ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK;
/* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
* elapsed time in the application. */ * elapsed time in the application. */

View file

@ -26,11 +26,11 @@
/* /*
* Demo for showing use of the MQTT serializer API. * Demo for showing use of the MQTT serializer API.
* The MQTT serializer API lets user to serialize and * The MQTT serializer API allows the user to serialize and
* deserialize MQTT messages into a user provided buffer. * deserialize MQTT messages into a user provided buffer.
* This API allows use of a statically allocated buffer. * This API allows use of a statically allocated buffer.
* *
* The Example shown below uses this API to create MQTT messages and * The example shown below uses this API to create MQTT messages and
* send them over the connection established using FreeRTOS sockets. * send them over the connection established using FreeRTOS sockets.
* The example is single threaded and uses statically allocated memory; * The example is single threaded and uses statically allocated memory;
* it uses QOS0 and therefore does not implement any retransmission * it uses QOS0 and therefore does not implement any retransmission
@ -128,11 +128,11 @@
#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) ) #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) )
/** /**
* @brief Keep alive time reported to the broker while establishing an MQTT connection. * @brief Keep-alive time reported to the broker while establishing an MQTT connection.
* *
* It is the responsibility of the Client to ensure that the interval between * It is the responsibility of the client to ensure that the interval between
* Control Packets being sent does not exceed the this Keep Alive value. In the * control packets being sent does not exceed the this keep-alive value. In the
* absence of sending any other Control Packets, the Client MUST send a * absence of sending any other control packets, the client MUST send a
* PINGREQ Packet. * PINGREQ Packet.
*/ */
#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 10U ) #define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 10U )
@ -414,34 +414,34 @@ static void prvMQTTDemoTask( void * pvParameters )
/* Sends an MQTT Connect packet over the already connected TCP socket /* Sends an MQTT Connect packet over the already connected TCP socket
* xMQTTSocket, and waits for connection acknowledgment (CONNACK) packet. */ * xMQTTSocket, and waits for connection acknowledgment (CONNACK) packet. */
LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( xMQTTSocket ); prvCreateMQTTConnectionWithBroker( xMQTTSocket );
/**************************** Subscribe. ******************************/ /**************************** Subscribe. ******************************/
/* If server rejected the subscription request, attempt to resubscribe to topic. /* If the server rejected the subscription request, attempt to resubscribe
* Attempts are made according to the exponential backoff retry strategy * to the topic. Attempts are made according to the exponential backoff
* implemented in retryUtils. */ * retry strategy declared in retry_utils.h. */
prvMQTTSubscribeWithBackoffRetries( xMQTTSocket ); prvMQTTSubscribeWithBackoffRetries( xMQTTSocket );
/**************************** Publish and Keep Alive Loop. ******************************/ /**************************** Publish and Keep-Alive Loop. ******************************/
/* Publish messages with QoS0, send and process Keep alive messages. */ /* Publish messages with QoS0, send and process keep-alive messages. */
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
{ {
LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTPublishToTopic( xMQTTSocket ); prvMQTTPublishToTopic( xMQTTSocket );
/* Process incoming publish echo, since application subscribed to the same /* Process incoming publish echo, since application subscribed to the same
* topic the broker will send publish message back to the application. */ * topic the broker will send publish message back to the application. */
LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); LogInfo( ( "Attempt to receive publish message from broker." ) );
prvMQTTProcessIncomingPacket( xMQTTSocket ); prvMQTTProcessIncomingPacket( xMQTTSocket );
/* Leave Connection Idle for some time */ /* Leave Connection Idle for some time */
LogInfo( ( "Keeping Connection Idle.\r\n\r\n" ) ); LogInfo( ( "Keeping Connection Idle.\r\n" ) );
vTaskDelay( mqttexampleKEEP_ALIVE_DELAY ); vTaskDelay( mqttexampleKEEP_ALIVE_DELAY );
/* Send Ping request to broker and receive ping response */ /* Send Ping request to broker and receive ping response */
LogInfo( ( "Sending Ping Request to the broker.\r\n" ) ); LogInfo( ( "Sending Ping Request to the broker." ) );
prvMQTTKeepAlive( xMQTTSocket ); prvMQTTKeepAlive( xMQTTSocket );
/* Process Incoming packet from the broker */ /* Process Incoming packet from the broker */
@ -449,7 +449,7 @@ static void prvMQTTDemoTask( void * pvParameters )
} }
/************************ Unsubscribe from the topic. **************************/ /************************ Unsubscribe from the topic. **************************/
LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTUnsubscribeFromTopic( xMQTTSocket ); prvMQTTUnsubscribeFromTopic( xMQTTSocket );
/* Process Incoming packet from the broker. */ /* Process Incoming packet from the broker. */
@ -460,7 +460,7 @@ static void prvMQTTDemoTask( void * pvParameters )
/* Send an MQTT Disconnect packet over the already connected TCP socket. /* Send an MQTT Disconnect packet over the already connected TCP socket.
* There is no corresponding response for the disconnect packet. After sending * There is no corresponding response for the disconnect packet. After sending
* disconnect, client must close the network connection. */ * disconnect, client must close the network connection. */
LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); LogInfo( ( "Disconnecting the MQTT connection with %s.", democonfigMQTT_BROKER_ENDPOINT ) );
prvMQTTDisconnect( xMQTTSocket ); prvMQTTDisconnect( xMQTTSocket );
/* Close the network connection. */ /* Close the network connection. */
@ -474,9 +474,9 @@ static void prvMQTTDemoTask( void * pvParameters )
/* Wait for some time between two iterations to ensure that we do not /* Wait for some time between two iterations to ensure that we do not
* bombard the public test mosquitto broker. */ * bombard the public test mosquitto broker. */
LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) ); LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.", xPortGetFreeHeapSize() ) );
LogInfo( ( "Demo completed successfully.\r\n" ) ); LogInfo( ( "Demo completed successfully." ) );
LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); LogInfo( ( "Short delay before starting the next iteration.... \r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ); vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
} }
} }
@ -582,17 +582,17 @@ static Socket_t prvCreateTCPConnectionToBroker( void )
} }
else else
{ {
LogInfo( ( "Located but could not connect to MQTT broker %s.\r\n\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); LogInfo( ( "Located but could not connect to MQTT broker %s.", democonfigMQTT_BROKER_ENDPOINT ) );
} }
} }
else else
{ {
LogInfo( ( "Could not locate MQTT broker %s.\r\n\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); LogInfo( ( "Could not locate MQTT broker %s.", democonfigMQTT_BROKER_ENDPOINT ) );
} }
} }
else else
{ {
LogInfo( ( "Could not create TCP socket.\r\n\r\n" ) ); LogInfo( ( "Could not create TCP socket." ) );
} }
/* If the socket was created but the connection was not successful then delete /* If the socket was created but the connection was not successful then delete
@ -684,8 +684,8 @@ static void prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket )
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );
/* Set MQTT keep-alive period. It is the responsibility of the application to ensure /* Set MQTT keep-alive period. It is the responsibility of the application to ensure
* that the interval between Control Packets being sent does not exceed the Keep Alive value. * that the interval between control Packets being sent does not exceed the keep-alive value.
* In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet. */ * In the absence of sending any other control packets, the client MUST send a PINGREQ Packet. */
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;
/* Get size requirement for the connect packet. /* Get size requirement for the connect packet.
@ -742,12 +742,15 @@ static void prvCreateMQTTConnectionWithBroker( Socket_t xMQTTSocket )
xResult = MQTT_DeserializeAck( &xIncomingPacket, xResult = MQTT_DeserializeAck( &xIncomingPacket,
&usPacketId, &usPacketId,
&xSessionPresent ); &xSessionPresent );
configASSERT( xResult == MQTTSuccess );
/* Log this convenient demo information before asserting if the result is
* successful. */
if( xResult != MQTTSuccess ) if( xResult != MQTTSuccess )
{ {
LogInfo( ( "Connection with MQTT broker failed.\r\n" ) ); LogError( ( "Connection with MQTT broker failed." ) );
} }
configASSERT( xResult == MQTTSuccess );
} }
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
@ -824,9 +827,9 @@ static void prvMQTTSubscribeWithBackoffRetries( Socket_t xMQTTSocket )
* subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).
* This client will then publish to the same topic it subscribed to, so it * This client will then publish to the same topic it subscribed to, so it
* will expect all the messages it sends to the broker to be sent back to it * will expect all the messages it sends to the broker to be sent back to it
* from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish * from the broker. This demo uses QOS0 in Subscribe, therefore, the publish
* messages received from the broker will have QOS0. */ * messages received from the broker will have QOS0. */
LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTSubscribeToTopic( xMQTTSocket ); prvMQTTSubscribeToTopic( xMQTTSocket );
LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) ); LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) );
@ -1064,7 +1067,7 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
{ {
if( xTopicFilterContext[ ulTopicCount ].xSubAckSuccess != false ) if( xTopicFilterContext[ ulTopicCount ].xSubAckSuccess != false )
{ {
LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n", LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.",
xTopicFilterContext[ ulTopicCount ].pcTopicFilter, xTopicFilterContext[ ulTopicCount ].pcTopicFilter,
xTopicFilterContext[ ulTopicCount ].xSubAckSuccess ) ); xTopicFilterContext[ ulTopicCount ].xSubAckSuccess ) );
} }
@ -1075,18 +1078,18 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
break; break;
case MQTT_PACKET_TYPE_UNSUBACK: case MQTT_PACKET_TYPE_UNSUBACK:
LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) ); LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) );
/* Make sure ACK packet identifier matches with Request packet identifier. */ /* Make sure ACK packet identifier matches with Request packet identifier. */
configASSERT( usUnsubscribePacketIdentifier == usPacketId ); configASSERT( usUnsubscribePacketIdentifier == usPacketId );
break; break;
case MQTT_PACKET_TYPE_PINGRESP: case MQTT_PACKET_TYPE_PINGRESP:
LogInfo( ( "Ping Response successfully received.\r\n" ) ); LogInfo( ( "Ping Response successfully received." ) );
break; break;
/* Any other packet type is invalid. */ /* Any other packet type is invalid. */
default: default:
LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n", LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).",
pxIncomingPacket->type ) ); pxIncomingPacket->type ) );
} }
} }
@ -1106,8 +1109,8 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
pxPublishInfo->pTopicName, pxPublishInfo->pTopicName,
pxPublishInfo->topicNameLength ) ) ) pxPublishInfo->topicNameLength ) ) )
{ {
LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
"Incoming Publish Message : %.*s\r\n", "Incoming Publish Message : %.*s",
pxPublishInfo->topicNameLength, pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName, pxPublishInfo->pTopicName,
pxPublishInfo->payloadLength, pxPublishInfo->payloadLength,
@ -1115,7 +1118,7 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
} }
else else
{ {
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.",
pxPublishInfo->topicNameLength, pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName ) ); pxPublishInfo->pTopicName ) );
} }