diff --git a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/DemoTasks/FleetProvisioningDemoExample.c b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/DemoTasks/FleetProvisioningDemoExample.c index fac2829f1..87fb6df0d 100644 --- a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/DemoTasks/FleetProvisioningDemoExample.c +++ b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/DemoTasks/FleetProvisioningDemoExample.c @@ -78,7 +78,7 @@ #include "fleet_provisioning.h" /* Demo includes. */ -#include "mqtt_operations.h" +#include "mqtt_pkcs11_demo_helpers.h" #include "pkcs11_operations.h" #include "tinycbor_serializer.h" #include "using_mbedtls_pkcs11.h" @@ -169,6 +169,19 @@ typedef enum ResponseRejected } ResponseStatus_t; + +/** + * @brief Each compilation unit that consumes the NetworkContext must define it. + * It should contain a single pointer to the type of your desired transport. + * When using multiple transports in the same compilation unit, define this pointer as void *. + * + * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport. + */ +struct NetworkContext +{ + TlsTransportParams_t * pxParams; +}; + /*-----------------------------------------------------------*/ /** @@ -199,18 +212,33 @@ static uint8_t pucPayloadBuffer[ democonfigNETWORK_BUFFER_SIZE ]; */ static size_t xPayloadLength; -/*-----------------------------------------------------------*/ +/** + * @brief The MQTT context used for MQTT operation. + */ +static MQTTContext_t xMqttContext; /** - * @brief Each compilation unit that consumes the NetworkContext must define it. - * It should contain a single pointer to the type of your desired transport. - * When using multiple transports in the same compilation unit, define this pointer as void *. - * - * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport. + * @brief The network context used for mbedTLS operation. */ -struct NetworkContext +static NetworkContext_t xNetworkContext; + +/** + * @brief The parameters for the network context using mbedTLS operation. + */ +static TlsTransportParams_t xTlsTransportParams; + +/** + * @brief Static buffer used to hold MQTT messages being sent and received. + */ +static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ]; + +/** + * @brief Static buffer used to hold MQTT messages being sent and received. + */ +static MQTTFixedBuffer_t xBuffer = { - TlsTransportParams_t * pxParams; + ucSharedBuffer, + democonfigNETWORK_BUFFER_SIZE }; /*-----------------------------------------------------------*/ @@ -224,13 +252,9 @@ struct NetworkContext * @param[in] pPublishInfo Pointer to publish info of the incoming publish. * @param[in] usPacketIdentifier Packet identifier of the incoming publish. */ -static void prvProvisioningPublishCallback( MQTTPublishInfo_t * pPublishInfo, - uint16_t usPacketIdentifier ); - -/** - * @brief Run the MQTT process loop to get a response. - */ -static bool prvWaitForResponse( void ); +static void prvProvisioningPublishCallback( MQTTContext_t * pxMqttContext, + MQTTPacketInfo_t * pxPacketInfo, + MQTTDeserializedInfo_t * pxDeserializedInfo ); /** * @brief Subscribe to the CreateCertificateFromCsr accepted and rejected topics. @@ -269,94 +293,93 @@ static int prvFleetProvisioningTask( void * pvParameters ); /*-----------------------------------------------------------*/ -static void prvProvisioningPublishCallback( MQTTPublishInfo_t * pPublishInfo, - uint16_t usPacketIdentifier ) +static void prvProvisioningPublishCallback( MQTTContext_t * pxMqttContext, + MQTTPacketInfo_t * pxPacketInfo, + MQTTDeserializedInfo_t * pxDeserializedInfo ) { - FleetProvisioningStatus_t status; - FleetProvisioningTopic_t api; + FleetProvisioningStatus_t xStatus; + FleetProvisioningTopic_t xApi; + MQTTPublishInfo_t * pxPublishInfo; - /* Silence compiler warnings about unused variables. */ - ( void ) usPacketIdentifier; + configASSERT( pxMqttContext != NULL ); + configASSERT( pxPacketInfo != NULL ); + configASSERT( pxDeserializedInfo != NULL ); - status = FleetProvisioning_MatchTopic( pPublishInfo->pTopicName, - pPublishInfo->topicNameLength, &api ); + /* Suppress the unused parameter warning when asserts are disabled in + * build. */ + ( void ) pxMqttContext; - if( status != FleetProvisioningSuccess ) + /* Handle an incoming publish. The lower 4 bits of the publish packet + * type is used for the dup, QoS, and retain flags. Hence masking + * out the lower bits to check if the packet is publish. */ + if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { - LogWarn( ( "Unexpected publish message received. Topic: %.*s.", - ( int ) pPublishInfo->topicNameLength, - ( const char * ) pPublishInfo->pTopicName ) ); - } - else - { - if( api == FleetProvCborCreateCertFromCsrAccepted ) + configASSERT( pxDeserializedInfo->pPublishInfo != NULL ); + pxPublishInfo = pxDeserializedInfo->pPublishInfo; + + xStatus = FleetProvisioning_MatchTopic(pxPublishInfo->pTopicName, + pxPublishInfo->topicNameLength, + &xApi); + + if (xStatus != FleetProvisioningSuccess) { - LogInfo( ( "Received accepted response from Fleet Provisioning CreateCertificateFromCsr API." ) ); - - xResponseStatus = ResponseAccepted; - - /* Copy the payload from the MQTT library's buffer to #pucPayloadBuffer. */ - ( void ) memcpy( ( void * ) pucPayloadBuffer, - ( const void * ) pPublishInfo->pPayload, - ( size_t ) pPublishInfo->payloadLength ); - - xPayloadLength = pPublishInfo->payloadLength; - } - else if( api == FleetProvCborCreateCertFromCsrRejected ) - { - LogError( ( "Received rejected response from Fleet Provisioning CreateCertificateFromCsr API." ) ); - - xResponseStatus = ResponseRejected; - } - else if( api == FleetProvCborRegisterThingAccepted ) - { - LogInfo( ( "Received accepted response from Fleet Provisioning RegisterThing API." ) ); - - xResponseStatus = ResponseAccepted; - - /* Copy the payload from the MQTT library's buffer to #pucPayloadBuffer. */ - ( void ) memcpy( ( void * ) pucPayloadBuffer, - ( const void * ) pPublishInfo->pPayload, - ( size_t ) pPublishInfo->payloadLength ); - - xPayloadLength = pPublishInfo->payloadLength; - } - else if( api == FleetProvCborRegisterThingRejected ) - { - LogError( ( "Received rejected response from Fleet Provisioning RegisterThing API." ) ); - - xResponseStatus = ResponseRejected; + LogWarn( ( "Unexpected publish message received. Topic: %.*s.", + ( int ) pxPublishInfo->topicNameLength, + ( const char * ) pxPublishInfo->pTopicName ) ); } else { - LogError( ( "Received message on unexpected Fleet Provisioning topic. Topic: %.*s.", - ( int ) pPublishInfo->topicNameLength, - ( const char * ) pPublishInfo->pTopicName ) ); + if (xApi == FleetProvCborCreateCertFromCsrAccepted) + { + LogInfo( ( "Received accepted response from Fleet Provisioning CreateCertificateFromCsr API." ) ); + + xResponseStatus = ResponseAccepted; + + /* Copy the payload from the MQTT library's buffer to #pucPayloadBuffer. */ + ( void ) memcpy( ( void * ) pucPayloadBuffer, + ( const void * ) pxPublishInfo->pPayload, + ( size_t ) pxPublishInfo->payloadLength ); + + xPayloadLength = pxPublishInfo->payloadLength; + } + else if (xApi == FleetProvCborCreateCertFromCsrRejected) + { + LogError( ( "Received rejected response from Fleet Provisioning CreateCertificateFromCsr API." ) ); + + xResponseStatus = ResponseRejected; + } + else if (xApi == FleetProvCborRegisterThingAccepted) + { + LogInfo( ( "Received accepted response from Fleet Provisioning RegisterThing API." ) ); + + xResponseStatus = ResponseAccepted; + + /* Copy the payload from the MQTT library's buffer to #pucPayloadBuffer. */ + ( void ) memcpy( ( void * ) pucPayloadBuffer, + ( const void * ) pxPublishInfo->pPayload, + ( size_t ) pxPublishInfo->payloadLength ); + + xPayloadLength = pxPublishInfo->payloadLength; + } + else if (xApi == FleetProvCborRegisterThingRejected) + { + LogError( ( "Received rejected response from Fleet Provisioning RegisterThing API." ) ); + + xResponseStatus = ResponseRejected; + } + else + { + LogError( ( "Received message on unexpected Fleet Provisioning topic. Topic: %.*s.", + ( int ) pxPublishInfo->topicNameLength, + ( const char * ) pxPublishInfo->pTopicName ) ); + } } } -} -/*-----------------------------------------------------------*/ - -static bool prvWaitForResponse( void ) -{ - bool xStatus = false; - - xResponseStatus = ResponseNotReceived; - - /* xResponseStatus is updated from the MQTT publish callback. */ - ( void ) xProcessLoop(); - - if( xResponseStatus == ResponseNotReceived ) + else { - LogError( ( "Timed out waiting for response." ) ); + vHandleOtherIncomingPacket( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); + xResponseStatus = ResponseAccepted; } - - if( xResponseStatus == ResponseAccepted ) - { - xStatus = true; - } - - return xStatus; } /*-----------------------------------------------------------*/ @@ -364,7 +387,8 @@ static bool prvSubscribeToCsrResponseTopics( void ) { bool xStatus; - xStatus = xSubscribeToTopic( FP_CBOR_CREATE_CERT_ACCEPTED_TOPIC, + xStatus = xSubscribeToTopic( &xMqttContext, + FP_CBOR_CREATE_CERT_ACCEPTED_TOPIC, FP_CBOR_CREATE_CERT_ACCEPTED_LENGTH ); if( xStatus == false ) @@ -376,7 +400,8 @@ static bool prvSubscribeToCsrResponseTopics( void ) if( xStatus == true ) { - xStatus = xSubscribeToTopic( FP_CBOR_CREATE_CERT_REJECTED_TOPIC, + xStatus = xSubscribeToTopic( &xMqttContext, + FP_CBOR_CREATE_CERT_REJECTED_TOPIC, FP_CBOR_CREATE_CERT_REJECTED_LENGTH ); if( xStatus == false ) @@ -395,7 +420,8 @@ static bool prvUnsubscribeFromCsrResponseTopics( void ) { bool xStatus; - xStatus = xUnsubscribeFromTopic( FP_CBOR_CREATE_CERT_ACCEPTED_TOPIC, + xStatus = xUnsubscribeFromTopic( &xMqttContext, + FP_CBOR_CREATE_CERT_ACCEPTED_TOPIC, FP_CBOR_CREATE_CERT_ACCEPTED_LENGTH ); if( xStatus == false ) @@ -407,7 +433,8 @@ static bool prvUnsubscribeFromCsrResponseTopics( void ) if( xStatus == true ) { - xStatus = xUnsubscribeFromTopic( FP_CBOR_CREATE_CERT_REJECTED_TOPIC, + xStatus = xUnsubscribeFromTopic( &xMqttContext, + FP_CBOR_CREATE_CERT_REJECTED_TOPIC, FP_CBOR_CREATE_CERT_REJECTED_LENGTH ); if( xStatus == false ) @@ -426,7 +453,8 @@ static bool prvSubscribeToRegisterThingResponseTopics( void ) { bool xStatus; - xStatus = xSubscribeToTopic( FP_CBOR_REGISTER_ACCEPTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), + xStatus = xSubscribeToTopic( &xMqttContext, + FP_CBOR_REGISTER_ACCEPTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), FP_CBOR_REGISTER_ACCEPTED_LENGTH( fpdemoPROVISIONING_TEMPLATE_NAME_LENGTH ) ); if( xStatus == false ) @@ -438,7 +466,8 @@ static bool prvSubscribeToRegisterThingResponseTopics( void ) if( xStatus == true ) { - xStatus = xSubscribeToTopic( FP_CBOR_REGISTER_REJECTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), + xStatus = xSubscribeToTopic( &xMqttContext, + FP_CBOR_REGISTER_REJECTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), FP_CBOR_REGISTER_REJECTED_LENGTH( fpdemoPROVISIONING_TEMPLATE_NAME_LENGTH ) ); if( xStatus == false ) @@ -457,7 +486,8 @@ static bool prvUnsubscribeFromRegisterThingResponseTopics( void ) { bool xStatus; - xStatus = xUnsubscribeFromTopic( FP_CBOR_REGISTER_ACCEPTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), + xStatus = xUnsubscribeFromTopic( &xMqttContext, + FP_CBOR_REGISTER_ACCEPTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), FP_CBOR_REGISTER_ACCEPTED_LENGTH( fpdemoPROVISIONING_TEMPLATE_NAME_LENGTH ) ); if( xStatus == false ) @@ -469,7 +499,8 @@ static bool prvUnsubscribeFromRegisterThingResponseTopics( void ) if( xStatus == true ) { - xStatus = xUnsubscribeFromTopic( FP_CBOR_REGISTER_REJECTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), + xStatus = xUnsubscribeFromTopic( &xMqttContext, + FP_CBOR_REGISTER_REJECTED_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), FP_CBOR_REGISTER_REJECTED_LENGTH( fpdemoPROVISIONING_TEMPLATE_NAME_LENGTH ) ); if( xStatus == false ) @@ -530,10 +561,6 @@ int prvFleetProvisioningTask( void * pvParameters ) uint32_t ulDemoRunCount = 0U; CK_RV xPkcs11Ret = CKR_OK; - - NetworkContext_t xNetworkContext = { 0 }; - TlsTransportParams_t xTlsTransportParams = { 0 }; - /* Silence compiler warnings about unused variables. */ ( void ) pvParameters; @@ -584,7 +611,10 @@ int prvFleetProvisioningTask( void * pvParameters ) * connection fails, retries after a timeout. Timeout value will * exponentially increase until maximum attempts are reached. */ LogInfo( ( "Establishing MQTT session with claim certificate..." ) ); - xStatus = xEstablishMqttSession( prvProvisioningPublishCallback, + xStatus = xEstablishMqttSession( &xMqttContext, + &xNetworkContext, + &xBuffer, + prvProvisioningPublishCallback, pkcs11configLABEL_CLAIM_CERTIFICATE, pkcs11configLABEL_CLAIM_PRIVATE_KEY ); @@ -626,7 +656,8 @@ int prvFleetProvisioningTask( void * pvParameters ) if( xStatus == true ) { /* Publish the CSR to the CreateCertificatefromCsr API. */ - xPublishToTopic( FP_CBOR_CREATE_CERT_PUBLISH_TOPIC, + xPublishToTopic( &xMqttContext, + FP_CBOR_CREATE_CERT_PUBLISH_TOPIC, FP_CBOR_CREATE_CERT_PUBLISH_LENGTH, ( char * ) pucPayloadBuffer, xPayloadLength ); @@ -639,12 +670,6 @@ int prvFleetProvisioningTask( void * pvParameters ) } } - if( xStatus == true ) - { - /* Get the response to the CreateCertificatefromCsr request. */ - xStatus = prvWaitForResponse(); - } - if( xStatus == true ) { /* From the response, extract the certificate, certificate ID, and @@ -705,7 +730,8 @@ int prvFleetProvisioningTask( void * pvParameters ) if( xStatus == true ) { /* Publish the RegisterThing request. */ - xPublishToTopic( FP_CBOR_REGISTER_PUBLISH_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), + xPublishToTopic( &xMqttContext, + FP_CBOR_REGISTER_PUBLISH_TOPIC( democonfigPROVISIONING_TEMPLATE_NAME ), FP_CBOR_REGISTER_PUBLISH_LENGTH( fpdemoPROVISIONING_TEMPLATE_NAME_LENGTH ), ( char * ) pucPayloadBuffer, xPayloadLength ); @@ -718,12 +744,6 @@ int prvFleetProvisioningTask( void * pvParameters ) } } - if( xStatus == true ) - { - /* Get the response to the RegisterThing request. */ - xStatus = prvWaitForResponse(); - } - if( xStatus == true ) { /* Extract the Thing name from the response. */ @@ -753,7 +773,7 @@ int prvFleetProvisioningTask( void * pvParameters ) * credentials. */ if( xConnectionEstablished == true ) { - xDisconnectMqttSession(); + xDisconnectMqttSession( &xMqttContext, &xNetworkContext ); xConnectionEstablished = false; } @@ -762,7 +782,10 @@ int prvFleetProvisioningTask( void * pvParameters ) if( xStatus == true ) { LogInfo( ( "Establishing MQTT session with provisioned certificate..." ) ); - xStatus = xEstablishMqttSession( prvProvisioningPublishCallback, + xStatus = xEstablishMqttSession( &xMqttContext, + &xNetworkContext, + &xBuffer, + prvProvisioningPublishCallback, pkcs11configLABEL_DEVICE_CERTIFICATE_FOR_TLS, pkcs11configLABEL_DEVICE_PRIVATE_KEY_FOR_TLS ); @@ -785,7 +808,7 @@ int prvFleetProvisioningTask( void * pvParameters ) if( xConnectionEstablished == true ) { /* Close the connection. */ - xDisconnectMqttSession(); + xDisconnectMqttSession( &xMqttContext, &xNetworkContext ); xConnectionEstablished = false; } diff --git a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj index 57b19e2b2..b1a8e7903 100644 --- a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj +++ b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj @@ -560,10 +560,10 @@ + - @@ -695,6 +695,7 @@ + @@ -703,7 +704,6 @@ - diff --git a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj.filters b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj.filters index c91ee6ecd..101ed194f 100644 --- a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj.filters +++ b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/WIN32.vcxproj.filters @@ -470,7 +470,6 @@ FreeRTOS+\mbedtls\library - FreeRTOS+\tinycbor @@ -537,6 +536,7 @@ FreeRTOS+\FreeRTOS IoT Libraries\platform\mbedtls + @@ -904,7 +904,6 @@ Config - FreeRTOS+\tinycbor @@ -950,6 +949,7 @@ FreeRTOS+\FreeRTOS IoT Libraries\platform\transport\include + diff --git a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c deleted file mode 100644 index 2860281b1..000000000 --- a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.c +++ /dev/null @@ -1,1069 +0,0 @@ -/* - * FreeRTOS V202112.00 - * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - * - * https://www.FreeRTOS.org - * https://github.com/FreeRTOS - * - */ - -/** - * @file mqtt_operations.c - * - * @brief This file provides wrapper functions for MQTT operations on a mutually - * authenticated TLS connection. - * - * A mutually authenticated TLS connection is used to connect to the AWS IoT - * MQTT message broker in this example. Run the setup script - * (fleet_provisioning_demo_setup.py) and define democonfigROOT_CA_PEM - * in demo_config.h to achieve mutual authentication. - */ - -/* Standard includes. */ -#include -#include -#include -#include - -/* Config include. */ -#include "demo_config.h" - -/* Interface include. */ -#include "mqtt_operations.h" - -/* MbedTLS transport include. */ -#include "using_mbedtls_pkcs11.h" - -/*Include backoff algorithm header for retry logic.*/ -#include "backoff_algorithm.h" - -/** - * These configurations are required. Throw compilation error if the below - * configs are not defined. - */ -#ifndef democonfigMQTT_BROKER_ENDPOINT - #error "Please define AWS IoT MQTT broker endpoint(democonfigMQTT_BROKER_ENDPOINT) in demo_config.h." -#endif -#ifndef democonfigROOT_CA_PEM - #error "Please define the PEM-encoded Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h." -#endif -#ifndef democonfigCLIENT_IDENTIFIER - #error "Please define a unique democonfigCLIENT_IDENTIFIER." -#endif - -/** - * Provide default values for undefined configuration settings. - */ -#ifndef democonfigMQTT_BROKER_PORT - #define democonfigMQTT_BROKER_PORT ( 8883 ) -#endif - -#ifndef democonfigNETWORK_BUFFER_SIZE - #define democonfigNETWORK_BUFFER_SIZE ( 1024U ) -#endif - -/** - * @brief Length of the AWS IoT endpoint. - */ -#define democonfigMQTT_BROKER_ENDPOINT_LENGTH ( ( uint16_t ) ( sizeof( democonfigMQTT_BROKER_ENDPOINT ) - 1 ) ) - -/** - * @brief Length of the client identifier. - */ -#define mqttopCLIENT_IDENTIFIER_LENGTH ( ( uint16_t ) ( sizeof( democonfigCLIENT_IDENTIFIER ) - 1 ) ) - -/** - * @brief ALPN protocol name for AWS IoT MQTT. - * - * This will be used if the democonfigMQTT_BROKER_PORT is configured as 443 for AWS IoT MQTT - * broker. Please see more details about the ALPN protocol for AWS IoT MQTT - * endpoint in the link below. - * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ - * - * @note OpenSSL requires that the protocol string passed to it for configuration be encoded - * with the prefix of 8-bit length information of the string. Thus, the 14 byte (0x0e) length - * information is prefixed to the string. - */ -#define mqttopALPN_PROTOCOL_NAME "\x0ex-amzn-mqtt-ca" - -/** - * @brief Length of ALPN protocol name. - */ -#define mqttopALPN_PROTOCOL_NAME_LENGTH ( ( uint16_t ) ( sizeof( mqttopALPN_PROTOCOL_NAME ) - 1 ) ) - -/** - * @brief The maximum number of retries for connecting to server. - */ -#define mqttopCONNECTION_RETRY_MAX_ATTEMPTS ( 5U ) - -/** - * @brief The maximum back-off delay (in milliseconds) for retrying connection to server. - */ -#define mqttopCONNECTION_RETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) - -/** - * @brief The base back-off delay (in milliseconds) to use for connection retry attempts. - */ -#define mqttopCONNECTION_RETRY_BACKOFF_BASE_MS ( 500U ) - -/** - * @brief Timeout for receiving CONNACK packet in milliseconds. - */ -#define mqttopCONNACK_RECV_TIMEOUT_MS ( 1000U ) - -/** - * @brief Maximum number of outgoing publishes maintained in the application - * until an ack is received from the broker. - */ -#define mqttopMAX_OUTGOING_PUBLISHES ( 5U ) - -/** - * @brief Invalid packet identifier for the MQTT packets. Zero is always an - * invalid packet identifier as per MQTT 3.1.1 spec. - */ -#define mqttopMQTT_PACKET_ID_INVALID ( ( uint16_t ) 0U ) - -/** - * @brief Timeout for MQTT_ProcessLoop function in milliseconds. - */ -#define mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS ( 1000U ) - -/** - * @brief The maximum time interval in seconds which is allowed to elapse - * between two Control Packets. - * - * It is the responsibility of the client to ensure that the interval between - * control packets being sent does not exceed the this keep-alive value. In the - * absence of sending any other control packets, the client MUST send a - * PINGREQ packet. - */ -#define mqttopMQTT_KEEP_ALIVE_INTERVAL_SECONDS ( 60U ) - -/** - * @brief Timeout in milliseconds for transport send and receive. - */ -#define mqttopTRANSPORT_SEND_RECV_TIMEOUT_MS ( 100U ) - -/** - * @brief Milliseconds per second. - */ -#define mqttopMILLISECONDS_PER_SECOND ( 1000U ) - -/** - * @brief Milliseconds per FreeRTOS tick. - */ -#define mqttopMILLISECONDS_PER_TICK ( mqttopMILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) - -/** - * @brief The MQTT metrics string expected by AWS IoT MQTT Broker. - */ -#define mqttopMETRICS_STRING "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB - -/** - * @brief The length of the MQTT metrics string. - */ -#define mqttopMETRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( mqttopMETRICS_STRING ) - 1 ) ) -/*-----------------------------------------------------------*/ - -/** - * @brief Structure to keep the MQTT publish packets until an ack is received - * for QoS1 publishes. - */ -typedef struct PublishPackets -{ - /** - * @brief Packet identifier of the publish packet. - */ - uint16_t usPacketId; - - /** - * @brief Publish info of the publish packet. - */ - MQTTPublishInfo_t xPubInfo; -} PublishPackets_t; - -/* Each compilation unit must define the NetworkContext struct. */ -struct NetworkContext -{ - SSLContext_t * pxParams; -}; -/*-----------------------------------------------------------*/ - -/** - * @brief Global entry time into the application to use as a reference timestamp - * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference - * between the current time and the global entry time. This will reduce the chances - * of overflow for the 32 bit unsigned integer used for holding the timestamp. - */ -static uint32_t ulGlobalEntryTimeMs; - -/** - * @brief Packet Identifier generated when Subscribe request was sent to the broker. - * - * It is used to match received Subscribe ACK to the transmitted subscribe - * request. - */ -static uint16_t usGlobalSubscribePacketIdentifier = 0U; - -/** - * @brief Packet Identifier generated when Unsubscribe request was sent to the broker. - * - * It is used to match received Unsubscribe ACK to the transmitted unsubscribe - * request. - */ -static uint16_t usGlobalUnsubscribePacketIdentifier = 0U; - -/** - * @brief Array to keep the outgoing publish messages. - * - * These stored outgoing publish messages are kept until a successful ack - * is received. - */ -static PublishPackets_t pxOutgoingPublishPackets[ mqttopMAX_OUTGOING_PUBLISHES ] = { 0 }; - -/** - * @brief The network buffer must remain valid for the lifetime of the MQTT context. - */ -static uint8_t pucBuffer[ democonfigNETWORK_BUFFER_SIZE ]; - -/** - * @brief The MQTT context used for MQTT operation. - */ -static MQTTContext_t xMqttContext = { 0 }; - -/** - * @brief The network context used for MbedTLS operation. - */ -static NetworkContext_t xNetworkContext = { 0 }; - -/** - * @brief The parameters for MbedTLS operation. - */ -static SSLContext_t xTlsContext = { 0 }; - -/** - * @brief The flag to indicate that the mqtt session is established. - */ -static bool xMqttSessionEstablished = false; - -/** - * @brief Callback registered when calling xEstablishMqttSession to get incoming - * publish messages. - */ -static MQTTPublishCallback_t xAppPublishCallback = NULL; -/*-----------------------------------------------------------*/ - -/** - * @brief The random number generator to use for exponential backoff with - * jitter retry logic. - * - * @return The generated random number. - */ -static uint32_t prvGenerateRandomNumber( void ); - -/** - * @brief Connect to the MQTT broker with reconnection retries. - * - * If connection fails, retry is attempted after a timeout. Timeout value - * exponentially increases until maximum timeout value is reached or the number - * of attempts are exhausted. - * - * @param[out] pxNetworkContext The created network context. - * @param[in] pcClientCertLabel The client certificate PKCS #11 label to use. - * @param[in] pcPrivateKeyLabel The private key PKCS #11 label for the client certificate. - * - * @return false on failure; true on successful connection. - */ -static bool prvConnectToBrokerWithBackoffRetries( NetworkContext_t * pxNetworkContext, - char * pcClientCertLabel, - char * pcPrivateKeyLabel ); - -/** - * @brief Get the free index in the #pxOutgoingPublishPackets array at which an - * outgoing publish can be stored. - * - * @param[out] pucIndex The index at which an outgoing publish can be stored. - * - * @return false if no more publishes can be stored; - * true if an index to store the next outgoing publish is obtained. - */ -static bool prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex ); - -/** - * @brief Clean up the outgoing publish at given index from the - * #pxOutgoingPublishPackets array. - * - * @param[in] ucIndex The ucIndex at which a publish message has to be cleaned up. - */ -static void prvCleanupOutgoingPublishAt( uint8_t ucIndex ); - -/** - * @brief Clean up all the outgoing publishes in the #pxOutgoingPublishPackets array. - */ -static void prvCleanupOutgoingPublishes( void ); - -/** - * @brief Clean up the publish packet with the given packet id in the - * #pxOutgoingPublishPackets array. - * - * @param[in] usPacketId Packet id of the packet to be clean. - */ -static void prvCleanupOutgoingPublishWithPacketID( uint16_t usPacketId ); - -/** - * @brief Callback registered with the MQTT library. - * - * @param[in] pxMqttContext MQTT context pointer. - * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. - * @param[in] pxDeserializedInfo Deserialized information from the incoming packet. - */ -static void prvMqttCallback( MQTTContext_t * pxMqttContext, - MQTTPacketInfo_t * pxPacketInfo, - MQTTDeserializedInfo_t * pxDeserializedInfo ); - -/** - * @brief Resend the publishes if a session is re-established with the broker. - * - * This function handles the resending of the QoS1 publish packets, which are - * maintained locally. - * - * @param[in] pxMqttContext The MQTT context pointer. - * - * @return true if all the unacknowledged QoS1 publishes are re-sent successfully; - * false otherwise. - */ -static bool prvHandlePublishResend( MQTTContext_t * pxMqttContext ); - -/** - * @brief The timer query function provided to the MQTT context. - * - * @return Time in milliseconds. - */ -static uint32_t prvGetTimeMs( void ); - -/*-----------------------------------------------------------*/ - -static uint32_t prvGenerateRandomNumber() -{ - return( ( uint32_t ) rand() ); -} - -/*-----------------------------------------------------------*/ - -static bool prvConnectToBrokerWithBackoffRetries( NetworkContext_t * pxNetworkContext, - char * pcClientCertLabel, - char * pcPrivateKeyLabel ) -{ - bool xReturnStatus = false; - BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; - TlsTransportStatus_t xTlsStatus = TLS_TRANSPORT_SUCCESS; - BackoffAlgorithmContext_t xReconnectParams; - NetworkCredentials_t xTlsCredentials = { 0 }; - uint16_t usNextRetryBackOff = 0U; - const char * pcAlpn[] = { mqttopALPN_PROTOCOL_NAME, NULL }; - - /* Set the pParams member of the network context with desired transport. */ - pxNetworkContext->pxParams = &xTlsContext; - - /* Initialize credentials for establishing TLS session. */ - xTlsCredentials.pRootCa = democonfigROOT_CA_PEM; - xTlsCredentials.rootCaSize = sizeof( democonfigROOT_CA_PEM ); - xTlsCredentials.pClientCertLabel = pcClientCertLabel; - xTlsCredentials.pPrivateKeyLabel = pcPrivateKeyLabel; - - /* AWS IoT requires devices to send the Server Name Indication (SNI) - * extension to the Transport Layer Security (TLS) protocol and provide - * the complete endpoint address in the host_name field. Details about - * SNI for AWS IoT can be found in the link below. - * https://docs.aws.amazon.com/iot/latest/developerguide/transport-security.html - */ - xTlsCredentials.disableSni = false; - - if( democonfigMQTT_BROKER_PORT == 443 ) - { - /* Pass the ALPN protocol name depending on the port being used. - * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint - * in the link below. - * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ - */ - xTlsCredentials.pAlpnProtos = pcAlpn; - } - - /* Initialize reconnect attempts and interval */ - BackoffAlgorithm_InitializeParams( &xReconnectParams, - mqttopCONNECTION_RETRY_BACKOFF_BASE_MS, - mqttopCONNECTION_RETRY_MAX_BACKOFF_DELAY_MS, - mqttopCONNECTION_RETRY_MAX_ATTEMPTS ); - - do - { - /* Establish a TLS session with the MQTT broker. This example connects - * to the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and democonfigMQTT_BROKER_PORT - * at the demo config header. */ - LogDebug( ( "Establishing a TLS session to %.*s:%d.", - democonfigMQTT_BROKER_ENDPOINT_LENGTH, - democonfigMQTT_BROKER_ENDPOINT, - democonfigMQTT_BROKER_PORT ) ); - - xTlsStatus = TLS_FreeRTOS_Connect( pxNetworkContext, - democonfigMQTT_BROKER_ENDPOINT, - democonfigMQTT_BROKER_PORT, - &xTlsCredentials, - mqttopTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttopTRANSPORT_SEND_RECV_TIMEOUT_MS ); - - if( xTlsStatus == TLS_TRANSPORT_SUCCESS ) - { - /* Connection successful. */ - xReturnStatus = true; - } - else - { - /* Generate a random number and get back-off value (in milliseconds) for the next connection retry. */ - xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, prvGenerateRandomNumber(), &usNextRetryBackOff ); - - if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) - { - LogError( ( "Connection to the broker failed, all attempts exhausted." ) ); - } - else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) - { - LogWarn( ( "Connection to the broker failed. Retrying connection " - "after %hu ms backoff.", - ( unsigned short ) usNextRetryBackOff ) ); - vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); - } - } - } while( ( xTlsStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -static bool prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex ) -{ - bool xReturnStatus = false; - uint8_t ucIndex = 0; - - configASSERT( pxOutgoingPublishPackets != NULL ); - configASSERT( pucIndex != NULL ); - - for( ucIndex = 0; ucIndex < mqttopMAX_OUTGOING_PUBLISHES; ucIndex++ ) - { - /* A free index is marked by invalid packet id. Check if the the index - * has a free slot. */ - if( pxOutgoingPublishPackets[ ucIndex ].usPacketId == mqttopMQTT_PACKET_ID_INVALID ) - { - xReturnStatus = true; - break; - } - } - - /* Copy the available index into the output param. */ - if( xReturnStatus == true ) - { - *pucIndex = ucIndex; - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -static void prvCleanupOutgoingPublishAt( uint8_t ucIndex ) -{ - configASSERT( pxOutgoingPublishPackets != NULL ); - configASSERT( ucIndex < mqttopMAX_OUTGOING_PUBLISHES ); - - /* Clear the outgoing publish packet. */ - ( void ) memset( &( pxOutgoingPublishPackets[ ucIndex ] ), - 0x00, - sizeof( pxOutgoingPublishPackets[ ucIndex ] ) ); -} -/*-----------------------------------------------------------*/ - -static void prvCleanupOutgoingPublishes( void ) -{ - configASSERT( pxOutgoingPublishPackets != NULL ); - - /* Clean up all the outgoing publish packets. */ - ( void ) memset( pxOutgoingPublishPackets, 0x00, sizeof( pxOutgoingPublishPackets ) ); -} -/*-----------------------------------------------------------*/ - -static void prvCleanupOutgoingPublishWithPacketID( uint16_t usPacketId ) -{ - uint8_t ucIndex = 0; - - configASSERT( pxOutgoingPublishPackets != NULL ); - configASSERT( usPacketId != mqttopMQTT_PACKET_ID_INVALID ); - - /* Clean up the saved outgoing publish with packet Id equal to usPacketId. */ - for( ucIndex = 0; ucIndex < mqttopMAX_OUTGOING_PUBLISHES; ucIndex++ ) - { - if( pxOutgoingPublishPackets[ ucIndex ].usPacketId == usPacketId ) - { - prvCleanupOutgoingPublishAt( ucIndex ); - - LogDebug( ( "Cleaned up outgoing publish packet with packet id %u.", - usPacketId ) ); - - break; - } - } -} -/*-----------------------------------------------------------*/ - -static void prvMqttCallback( MQTTContext_t * pxMqttContext, - MQTTPacketInfo_t * pxPacketInfo, - MQTTDeserializedInfo_t * pxDeserializedInfo ) -{ - uint16_t usPacketIdentifier; - - configASSERT( pxMqttContext != NULL ); - configASSERT( pxPacketInfo != NULL ); - configASSERT( pxDeserializedInfo != NULL ); - - /* Suppress the unused parameter warning when asserts are disabled in - * build. */ - ( void ) pxMqttContext; - - usPacketIdentifier = pxDeserializedInfo->packetIdentifier; - - /* Handle an incoming publish. The lower 4 bits of the publish packet - * type is used for the dup, QoS, and retain flags. Hence masking - * out the lower bits to check if the packet is publish. */ - if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) - { - configASSERT( pxDeserializedInfo->pPublishInfo != NULL ); - - /* Invoke the application callback for incoming publishes. */ - if( xAppPublishCallback != NULL ) - { - xAppPublishCallback( pxDeserializedInfo->pPublishInfo, usPacketIdentifier ); - } - } - else - { - /* Handle other packets. */ - switch( pxPacketInfo->type ) - { - case MQTT_PACKET_TYPE_SUBACK: - LogDebug( ( "MQTT Packet type SUBACK received." ) ); - - /* Make sure the ACK packet identifier matches with the request - * packet identifier. */ - configASSERT( usGlobalSubscribePacketIdentifier == usPacketIdentifier ); - break; - - case MQTT_PACKET_TYPE_UNSUBACK: - LogDebug( ( "MQTT Packet type UNSUBACK received." ) ); - - /* Make sure the ACK packet identifier matches with the request - * packet identifier. */ - configASSERT( usGlobalUnsubscribePacketIdentifier == usPacketIdentifier ); - break; - - case MQTT_PACKET_TYPE_PINGRESP: - - /* We do not expect to receive PINGRESP as we are using - * MQTT_ProcessLoop. */ - LogWarn( ( "PINGRESP should not be received by the application " - "callback when using MQTT_ProcessLoop." ) ); - break; - - case MQTT_PACKET_TYPE_PUBACK: - LogDebug( ( "PUBACK received for packet id %u.", - usPacketIdentifier ) ); - - /* Cleanup the publish packet from the #pxOutgoingPublishPackets - * array when a PUBACK is received. */ - prvCleanupOutgoingPublishWithPacketID( usPacketIdentifier ); - break; - - /* Any other packet type is invalid. */ - default: - LogError( ( "Unknown packet type received:(%02x).", - pxPacketInfo->type ) ); - } - } -} -/*-----------------------------------------------------------*/ - -static bool prvHandlePublishResend( MQTTContext_t * pxMqttContext ) -{ - bool xReturnStatus = false; - MQTTStatus_t xMqttStatus = MQTTSuccess; - uint8_t ucIndex = 0U; - - configASSERT( pxOutgoingPublishPackets != NULL ); - - /* Resend all the QoS1 publishes still in the #pxOutgoingPublishPackets array. - * These are the publishes that haven't received a PUBACK yet. When a PUBACK - * is received, the corresponding publish is removed from the array. */ - for( ucIndex = 0U; ucIndex < mqttopMAX_OUTGOING_PUBLISHES; ucIndex++ ) - { - if( pxOutgoingPublishPackets[ ucIndex ].usPacketId != mqttopMQTT_PACKET_ID_INVALID ) - { - pxOutgoingPublishPackets[ ucIndex ].xPubInfo.dup = true; - - LogDebug( ( "Sending duplicate PUBLISH with packet id %u.", - pxOutgoingPublishPackets[ ucIndex ].usPacketId ) ); - xMqttStatus = MQTT_Publish( pxMqttContext, - &pxOutgoingPublishPackets[ ucIndex ].xPubInfo, - pxOutgoingPublishPackets[ ucIndex ].usPacketId ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "Sending duplicate PUBLISH for packet id %u " - " failed with status %s.", - pxOutgoingPublishPackets[ ucIndex ].usPacketId, - MQTT_Status_strerror( xMqttStatus ) ) ); - break; - } - else - { - LogDebug( ( "Sent duplicate PUBLISH successfully for packet id %u.", - pxOutgoingPublishPackets[ ucIndex ].usPacketId ) ); - } - } - } - - /* Were all the unacknowledged QoS1 publishes successfully re-sent? */ - if( ucIndex == mqttopMAX_OUTGOING_PUBLISHES ) - { - xReturnStatus = true; - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -bool xEstablishMqttSession( MQTTPublishCallback_t xPublishCallback, - char * pcClientCertLabel, - char * pcPrivateKeyLabel ) -{ - bool xReturnStatus = false; - MQTTStatus_t xMqttStatus; - MQTTConnectInfo_t xConnectInfo; - MQTTFixedBuffer_t xNetworkBuffer; - TransportInterface_t xTransport; - bool xCreateCleanSession = false; - MQTTContext_t * pxMqttContext = &xMqttContext; - NetworkContext_t * pxNetworkContext = &xNetworkContext; - bool xSessionPresent = false; - - configASSERT( pxMqttContext != NULL ); - configASSERT( pxNetworkContext != NULL ); - - /* Initialize the mqtt context and network context. */ - ( void ) memset( pxMqttContext, 0U, sizeof( MQTTContext_t ) ); - ( void ) memset( pxNetworkContext, 0U, sizeof( NetworkContext_t ) ); - - xReturnStatus = prvConnectToBrokerWithBackoffRetries( pxNetworkContext, - pcClientCertLabel, - pcPrivateKeyLabel ); - - if( xReturnStatus != true ) - { - /* Log an error to indicate connection failure after all - * reconnect attempts are over. */ - LogError( ( "Failed to connect to MQTT broker %.*s.", - democonfigMQTT_BROKER_ENDPOINT_LENGTH, - democonfigMQTT_BROKER_ENDPOINT ) ); - } - else - { - /* Fill in TransportInterface send and receive function pointers. - * For this demo, TCP sockets are used to send and receive data - * from the network. pxNetworkContext is an SSL context for OpenSSL.*/ - xTransport.pNetworkContext = pxNetworkContext; - xTransport.send = TLS_FreeRTOS_send; - xTransport.recv = TLS_FreeRTOS_recv; - - /* Fill the values for network buffer. */ - xNetworkBuffer.pBuffer = pucBuffer; - xNetworkBuffer.size = democonfigNETWORK_BUFFER_SIZE; - - /* Remember the publish callback supplied. */ - xAppPublishCallback = xPublishCallback; - - /* Initialize the MQTT library. */ - xMqttStatus = MQTT_Init( pxMqttContext, - &xTransport, - prvGetTimeMs, - prvMqttCallback, - &xNetworkBuffer ); - - if( xMqttStatus != MQTTSuccess ) - { - xReturnStatus = false; - LogError( ( "MQTT init failed with status %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - /* Establish an MQTT session by sending a CONNECT packet. */ - - /* If #xCreateCleanSession is true, start with a clean session - * i.e. direct the MQTT broker to discard any previous session data. - * If #xCreateCleanSession is false, direct the broker to attempt to - * reestablish a session which was already present. */ - xConnectInfo.cleanSession = xCreateCleanSession; - - /* The client identifier is used to uniquely identify this MQTT client to - * the MQTT broker. In a production device the identifier can be something - * unique, such as a device serial number. */ - xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; - xConnectInfo.clientIdentifierLength = mqttopCLIENT_IDENTIFIER_LENGTH; - - /* The maximum time interval in seconds which is allowed to elapse - * between two Control Packets. - * It is the responsibility of the client to ensure that the interval between - * control packets being sent does not exceed the this keep-alive value. In the - * absence of sending any other control packets, the client MUST send a - * PINGREQ packet. */ - xConnectInfo.keepAliveSeconds = mqttopMQTT_KEEP_ALIVE_INTERVAL_SECONDS; - - /* Username and password for authentication. Not used in this demo. */ - xConnectInfo.pUserName = mqttopMETRICS_STRING; - xConnectInfo.userNameLength = mqttopMETRICS_STRING_LENGTH; - xConnectInfo.pPassword = NULL; - xConnectInfo.passwordLength = 0U; - - /* Send an MQTT CONNECT packet to the broker. */ - xMqttStatus = MQTT_Connect( pxMqttContext, - &xConnectInfo, - NULL, - mqttopCONNACK_RECV_TIMEOUT_MS, - &xSessionPresent ); - - if( xMqttStatus != MQTTSuccess ) - { - xReturnStatus = false; - LogError( ( "Connection with MQTT broker failed with status %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - LogDebug( ( "MQTT connection successfully established with broker." ) ); - } - } - - if( xReturnStatus == true ) - { - /* Keep a flag for indicating if MQTT session is established. This - * flag will mark that an MQTT DISCONNECT has to be sent at the end - * of the demo even if there are intermediate failures. */ - xMqttSessionEstablished = true; - } - - if( xReturnStatus == true ) - { - /* Check if a session is present and if there are any outgoing - * publishes that need to be resent. Resending unacknowledged - * publishes is needed only if the broker is re-establishing a - * session that was already present. */ - if( xSessionPresent == true ) - { - LogDebug( ( "An MQTT session with broker is re-established. " - "Resending unacked publishes." ) ); - - /* Handle all the resend of publish messages. */ - xReturnStatus = prvHandlePublishResend( &xMqttContext ); - } - else - { - LogDebug( ( "A clean MQTT connection is established." - " Cleaning up all the stored outgoing publishes." ) ); - - /* Clean up the outgoing publishes waiting for ack as this new - * connection doesn't re-establish an existing session. */ - prvCleanupOutgoingPublishes(); - } - } - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -bool xDisconnectMqttSession( void ) -{ - MQTTStatus_t xMqttStatus = MQTTSuccess; - bool xReturnStatus = false; - MQTTContext_t * pxMqttContext = &xMqttContext; - NetworkContext_t * pxNetworkContext = &xNetworkContext; - - configASSERT( pxMqttContext != NULL ); - configASSERT( pxNetworkContext != NULL ); - - if( xMqttSessionEstablished == true ) - { - /* Send DISCONNECT. */ - xMqttStatus = MQTT_Disconnect( pxMqttContext ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "Sending MQTT DISCONNECT failed with status=%u.", - xMqttStatus ) ); - } - else - { - /* MQTT DISCONNECT sent successfully. */ - xReturnStatus = true; - } - } - - /* End TLS session, then close TCP connection. */ - ( void ) TLS_FreeRTOS_Disconnect( pxNetworkContext ); - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -bool xSubscribeToTopic( const char * pcTopicFilter, - uint16_t usTopicFilterLength ) -{ - bool xReturnStatus = false; - MQTTStatus_t xMqttStatus; - MQTTContext_t * pxMqttContext = &xMqttContext; - MQTTSubscribeInfo_t pxSubscriptionList[ 1 ]; - - configASSERT( pxMqttContext != NULL ); - configASSERT( pcTopicFilter != NULL ); - configASSERT( usTopicFilterLength > 0 ); - - /* Start with everything at 0. */ - ( void ) memset( ( void * ) pxSubscriptionList, 0x00, sizeof( pxSubscriptionList ) ); - - /* This example subscribes to only one topic and uses QOS1. */ - pxSubscriptionList[ 0 ].qos = MQTTQoS1; - pxSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter; - pxSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength; - - /* Generate packet identifier for the SUBSCRIBE packet. */ - usGlobalSubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext ); - - /* Send SUBSCRIBE packet. */ - xMqttStatus = MQTT_Subscribe( pxMqttContext, - pxSubscriptionList, - sizeof( pxSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), - usGlobalSubscribePacketIdentifier ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - LogDebug( ( "SUBSCRIBE topic %.*s to broker.", - usTopicFilterLength, - pcTopicFilter ) ); - - /* Process incoming packet from the broker. Acknowledgment for subscription - * ( SUBACK ) will be received here. However after sending the subscribe, the - * client may receive a publish before it receives a subscribe ack. Since this - * demo is subscribing to the topic to which no one is publishing, probability - * of receiving publish message before subscribe ack is zero; but application - * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to - * receive packet from network. */ - xMqttStatus = MQTT_ProcessLoop( pxMqttContext, mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "MQTT_ProcessLoop returned with status = %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - xReturnStatus = true; - } - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -bool xUnsubscribeFromTopic( const char * pcTopicFilter, - uint16_t usTopicFilterLength ) -{ - bool xReturnStatus = false; - MQTTStatus_t xMqttStatus; - MQTTContext_t * pxMqttContext = &xMqttContext; - MQTTSubscribeInfo_t pxSubscriptionList[ 1 ]; - - configASSERT( pxMqttContext != NULL ); - configASSERT( pcTopicFilter != NULL ); - configASSERT( usTopicFilterLength > 0 ); - - /* Start with everything at 0. */ - ( void ) memset( ( void * ) pxSubscriptionList, 0x00, sizeof( pxSubscriptionList ) ); - - /* This example subscribes to only one topic and uses QOS1. */ - pxSubscriptionList[ 0 ].qos = MQTTQoS1; - pxSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter; - pxSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength; - - /* Generate packet identifier for the UNSUBSCRIBE packet. */ - usGlobalUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext ); - - /* Send UNSUBSCRIBE packet. */ - xMqttStatus = MQTT_Unsubscribe( pxMqttContext, - pxSubscriptionList, - sizeof( pxSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), - usGlobalUnsubscribePacketIdentifier ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - LogDebug( ( "UNSUBSCRIBE sent topic %.*s to broker.", - usTopicFilterLength, - pcTopicFilter ) ); - - /* Process incoming packet from the broker. Acknowledgment for unsubscribe - * operation ( UNSUBACK ) will be received here. This demo uses - * MQTT_ProcessLoop to receive packet from network. */ - xMqttStatus = MQTT_ProcessLoop( pxMqttContext, mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "MQTT_ProcessLoop returned with status = %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - xReturnStatus = true; - } - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -bool xPublishToTopic( const char * pcTopicFilter, - uint16_t usTopicFilterLength, - const char * pcPayload, - size_t xPayloadLength ) -{ - bool xReturnStatus = false; - MQTTStatus_t xMqttStatus = MQTTSuccess; - uint8_t ucPublishIndex = mqttopMAX_OUTGOING_PUBLISHES; - MQTTContext_t * pxMqttContext = &xMqttContext; - - configASSERT( pxMqttContext != NULL ); - configASSERT( pcTopicFilter != NULL ); - configASSERT( usTopicFilterLength > 0 ); - - /* Get the next free index for the outgoing publish. All QoS1 outgoing - * publishes are stored until a PUBACK is received. These messages are - * stored for supporting a resend if a network connection is broken before - * receiving a PUBACK. */ - xReturnStatus = prvGetNextFreeIndexForOutgoingPublishes( &ucPublishIndex ); - - if( xReturnStatus == false ) - { - LogError( ( "Unable to find a free spot for outgoing PUBLISH message." ) ); - } - else - { - LogDebug( ( "Published payload: %.*s", - ( int ) xPayloadLength, - ( const char * ) pcPayload ) ); - - /* This example publishes to only one topic and uses QOS1. */ - pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.qos = MQTTQoS1; - pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.pTopicName = pcTopicFilter; - pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.topicNameLength = usTopicFilterLength; - pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.pPayload = pcPayload; - pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo.payloadLength = xPayloadLength; - - /* Get a new packet id. */ - pxOutgoingPublishPackets[ ucPublishIndex ].usPacketId = MQTT_GetPacketId( pxMqttContext ); - - /* Send PUBLISH packet. */ - xMqttStatus = MQTT_Publish( pxMqttContext, - &pxOutgoingPublishPackets[ ucPublishIndex ].xPubInfo, - pxOutgoingPublishPackets[ ucPublishIndex ].usPacketId ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "Failed to send PUBLISH packet to broker with error = %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - prvCleanupOutgoingPublishAt( ucPublishIndex ); - xReturnStatus = false; - } - else - { - LogDebug( ( "PUBLISH sent for topic %.*s to broker with packet ID %u.", - usTopicFilterLength, - pcTopicFilter, - pxOutgoingPublishPackets[ ucPublishIndex ].usPacketId ) ); - } - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -bool xProcessLoop( void ) -{ - bool xReturnStatus = false; - MQTTStatus_t xMqttStatus = MQTTSuccess; - - xMqttStatus = MQTT_ProcessLoop( &xMqttContext, mqttopMQTT_PROCESS_LOOP_TIMEOUT_MS ); - - if( xMqttStatus != MQTTSuccess ) - { - LogError( ( "MQTT_ProcessLoop returned with status = %s.", - MQTT_Status_strerror( xMqttStatus ) ) ); - } - else - { - LogDebug( ( "MQTT_ProcessLoop successful." ) ); - xReturnStatus = true; - } - - return xReturnStatus; -} -/*-----------------------------------------------------------*/ - -static uint32_t prvGetTimeMs( void ) -{ - TickType_t xTickCount = 0; - uint32_t ulTimeMs = 0UL; - - /* Get the current tick count. */ - xTickCount = xTaskGetTickCount(); - - /* Convert the ticks to milliseconds. */ - ulTimeMs = ( uint32_t ) xTickCount * mqttopMILLISECONDS_PER_TICK; - - /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the - * elapsed time in the application. */ - ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); - - return ulTimeMs; -} - -/*-----------------------------------------------------------*/ diff --git a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.h b/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.h deleted file mode 100644 index 8be89c25e..000000000 --- a/FreeRTOS-Plus/Demo/AWS/Fleet_Provisioning_Windows_Simulator/Fleet_Provisioning_With_CSR_Demo/mqtt_operations.h +++ /dev/null @@ -1,116 +0,0 @@ -/* - * FreeRTOS V202112.00 - * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - * - * https://www.FreeRTOS.org - * https://github.com/FreeRTOS - * - */ - -#ifndef MQTT_OPERATIONS_H_ -#define MQTT_OPERATIONS_H_ - -/* MQTT API header. */ -#include "core_mqtt.h" - -/* corePKCS11 include. */ -#include "core_pkcs11.h" - -/** - * @brief Application callback type to handle the incoming publishes. - * - * @param[in] pxPublishInfo Pointer to publish info of the incoming publish. - * @param[in] usPacketIdentifier Packet identifier of the incoming publish. - */ -typedef void (* MQTTPublishCallback_t )( MQTTPublishInfo_t * pxPublishInfo, - uint16_t usPacketIdentifier ); - -/** - * @brief Establish a MQTT connection. - * - * @param[in] xPublishCallback The callback function to receive incoming - * publishes from the MQTT broker. - * @param[in] pcClientCertLabel The client certificate PKCS #11 label to use. - * @param[in] pcPrivateKeyLabel The private key PKCS #11 label for the client certificate. - * - * @return true if an MQTT session is established; - * false otherwise. - */ -bool xEstablishMqttSession( MQTTPublishCallback_t xPublishCallback, - char * pcClientCertLabel, - char * pcPrivateKeyLabel ); - -/** - * @brief Disconnect the MQTT connection. - * - * @return true if the MQTT session was successfully disconnected; - * false otherwise. - */ -bool xDisconnectMqttSession( void ); - -/** - * @brief Subscribe to a MQTT topic filter. - * - * @param[in] pcTopicFilter The topic filter to subscribe to. - * @param[in] usTopicFilterLength Length of the topic buffer. - * - * @return true if subscribe operation was successful; - * false otherwise. - */ -bool xSubscribeToTopic( const char * pcTopicFilter, - uint16_t usTopicFilterLength ); - -/** - * @brief Unsubscribe from a MQTT topic filter. - * - * @param[in] pcTopicFilter The topic filter to unsubscribe from. - * @param[in] usTopicFilterLength Length of the topic buffer. - * - * @return true if unsubscribe operation was successful; - * false otherwise. - */ -bool xUnsubscribeFromTopic( const char * pcTopicFilter, - uint16_t usTopicFilterLength ); - -/** - * @brief Publish a message to a MQTT topic. - * - * @param[in] pcTopic The topic to publish the message on. - * @param[in] usTopicLength Length of the topic. - * @param[in] pcMessage The message to publish. - * @param[in] xMessageLength Length of the message. - * - * @return true if PUBLISH was successfully sent; - * false otherwise. - */ -bool xPublishToTopic( const char * pcTopic, - uint16_t usTopicLength, - const char * pcMessage, - size_t xMessageLength ); - -/** - * @brief Invoke the core MQTT library's process loop function. - * - * @return true if process loop was successful; - * false otherwise. - */ -bool xProcessLoop( void ); - -#endif /* ifndef MQTT_OPERATIONS_H_ */ diff --git a/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_demo_helpers.c b/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_demo_helpers.c index 775c3a348..dc3ea2405 100644 --- a/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_demo_helpers.c +++ b/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_demo_helpers.c @@ -202,8 +202,8 @@ typedef struct PublishPackets /*-----------------------------------------------------------*/ -/** - * @brief Each compilation unit that consumes the NetworkContext must define it. +/** + * @brief Each compilation unit that consumes the NetworkContext must define it. * It should contain a single pointer to the type of your desired transport. * When using multiple transports in the same compilation unit, define this pointer as void *. * diff --git a/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_pkcs11_demo_helpers.c b/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_pkcs11_demo_helpers.c new file mode 100644 index 000000000..8431388bc --- /dev/null +++ b/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_pkcs11_demo_helpers.c @@ -0,0 +1,1013 @@ +/* + * FreeRTOS V202112.00 + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * https://www.FreeRTOS.org + * https://github.com/FreeRTOS + * + */ + +/** + * @file mqtt_demo_helpers.c + * + * @brief This file provides helper functions used by the AWS demo applications to + * do MQTT operations over a mutually authenticated TLS connection. + * + * A mutually authenticated TLS connection is used to connect to the AWS IoT + * MQTT message broker in this example. Define democonfigCLIENT_PRIVATE_KEY_PEM, + * democonfigCLIENT_CERTIFICATE_PEM, and democonfigMQTT_BROKER_ENDPOINT in + * demo_config.h to achieve mutual authentication. + */ + +/* Standard includes. */ +#include +#include + +/* Kernel includes. */ +#include "FreeRTOS.h" +#include "task.h" + +/* Header file include */ +#include "mqtt_pkcs11_demo_helpers.h" + +/* MQTT library includes. */ +#include "core_mqtt.h" + +/* Exponential backoff retry include. */ +#include "backoff_algorithm.h" + +/* Transport interface implementation include header for TLS. */ +#include "using_mbedtls_pkcs11.h" + +/* Demo specific config. */ +#include "demo_config.h" + +/*------------- Demo configurations -------------------------*/ + +/** + * Note: The TLS connection credentials for the server root CA certificate, + * and device client certificate and private key should be defined in the + * demo_config.h file. + */ + +#ifndef democonfigROOT_CA_PEM + #error "Please define the AWS Root CA certificate (democonfigROOT_CA_PEM) in demo_config.h." +#endif + +#ifndef democonfigMQTT_BROKER_ENDPOINT + #error "Please define the AWS IoT broker endpoint (democonfigMQTT_BROKER_ENDPOINT) in demo_config.h." +#endif + +/*-----------------------------------------------------------*/ + +/** + * @brief The maximum number of retries for network operation with server. + */ +#define RETRY_MAX_ATTEMPTS ( 5U ) + +/** + * @brief The maximum back-off delay (in milliseconds) for retrying failed operation + * with server. + */ +#define RETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) + +/** + * @brief The base back-off delay (in milliseconds) to use for network operation retry + * attempts. + */ +#define RETRY_BACKOFF_BASE_MS ( 500U ) + +/** + * @brief Timeout for receiving CONNACK packet in milliseconds. + */ +#define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 1000U ) + +/** + * @brief The number of topic filters to subscribe. + */ +#define mqttexampleTOPIC_COUNT ( 1 ) + +/** + * @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask(). + */ +#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) ) + +/** + * @brief Timeout for MQTT_ProcessLoop in milliseconds. + */ +#define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 500U ) + +/** + * @brief Keep alive time reported to the broker while establishing an MQTT connection. + * + * It is the responsibility of the Client to ensure that the interval between + * Control Packets being sent does not exceed this Keep Alive value. In the + * absence of sending any other Control Packets, the Client MUST send a + * PINGREQ Packet. + */ +#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) + +/** + * @brief Delay between MQTT publishes. Note that the process loop also has a + * timeout, so the total time between publishes is the sum of the two delays. + */ +#define mqttexampleDELAY_BETWEEN_PUBLISHES ( pdMS_TO_TICKS( 500U ) ) + +/** + * @brief Transport timeout in milliseconds for transport send and receive. + */ +#define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 200U ) + +/** + * @brief Maximum number of outgoing publishes maintained in the application + * until an ack is received from the broker. + */ +#define MAX_OUTGOING_PUBLISHES ( 1U ) + +/** + * @brief Milliseconds per second. + */ +#define MILLISECONDS_PER_SECOND ( 1000U ) + +/** + * @brief Milliseconds per FreeRTOS tick. + */ +#define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) + +/** + * @brief The MQTT metrics string expected by AWS IoT. + */ +#define AWS_IOT_METRICS_STRING \ + "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \ + "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB + +/** + * @brief The length of the MQTT metrics string expected by AWS IoT. + */ +#define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) ) + +/** + * @brief ALPN protocol name for AWS IoT MQTT. + * + * This will be used if the democonfigMQTT_BROKER_PORT is configured as 443 for AWS IoT MQTT + * broker. Please see more details about the ALPN protocol for AWS IoT MQTT + * endpoint in the link below. + * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ + * + * @note OpenSSL requires that the protocol string passed to it for configuration be encoded + * with the prefix of 8-bit length information of the string. Thus, the 14 byte (0x0e) length + * information is prefixed to the string. + */ +#define mqttopALPN_PROTOCOL_NAME "\x0ex-amzn-mqtt-ca" + + +/*-----------------------------------------------------------*/ + +/** + * @brief Structure to keep the MQTT publish packets until an ack is received + * for QoS1 publishes. + */ +typedef struct PublishPackets +{ + /** + * @brief Packet identifier of the publish packet. + */ + uint16_t packetId; + + /** + * @brief Publish info of the publish packet. + */ + MQTTPublishInfo_t pubInfo; +} PublishPackets_t; + +/*-----------------------------------------------------------*/ + +/** + * @brief Each compilation unit that consumes the NetworkContext must define it. + * It should contain a single pointer to the type of your desired transport. + * When using multiple transports in the same compilation unit, define this pointer as void *. + * + * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport. + */ +struct NetworkContext +{ + TlsTransportParams_t * pParams; +}; + +/*-----------------------------------------------------------*/ + +/** + * @brief Global entry time into the application to use as a reference timestamp + * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference + * between the current time and the global entry time. This will reduce the chances + * of overflow for the 32 bit unsigned integer used for holding the timestamp. + */ +static uint32_t ulGlobalEntryTimeMs; + +/** + * @brief The flag to indicate the MQTT session changed. + */ +static BaseType_t xMqttSessionEstablished = pdFALSE; + +/** + * @brief Packet Identifier generated when Subscribe request was sent to the broker; + * it is used to match received Subscribe ACK to the transmitted subscribe. + */ +static uint16_t globalSubscribePacketIdentifier = 0U; + +/** + * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; + * it is used to match received Unsubscribe ACK to the transmitted unsubscribe + * request. + */ +static uint16_t globalUnsubscribePacketIdentifier = 0U; + +/** + * @brief Array to keep the outgoing publish messages. + * These stored outgoing publish messages are kept until a successful ack + * is received. + */ +static PublishPackets_t outgoingPublishPackets[ MAX_OUTGOING_PUBLISHES ] = { 0 }; + +/*-----------------------------------------------------------*/ + +/** + * @brief A wrapper to the "uxRand()" random number generator so that it + * can be passed to the backoffAlgorithm library for retry logic. + * + * This function implements the #BackoffAlgorithm_RNG_T type interface + * in the backoffAlgorithm library API. + * + * @note The "uxRand" function represents a pseudo random number generator. + * However, it is recommended to use a True Random Number Generator (TRNG) + * for generating unique device-specific random values to avoid possibility + * of network collisions from multiple devices retrying network operations. + * + * @return The generated random number. This function ALWAYS succeeds. + */ +static int32_t prvGenerateRandomNumber(); + +/** + * @brief Connect to MQTT broker with reconnection retries. + * + * If connection fails, retry is attempted after a timeout. + * Timeout value will exponentially increase until maximum + * timeout value is reached or the number of attempts are exhausted. + * + * @param[out] pxNetworkContext The output parameter to return the created network context. + * @param[in] pcClientCertLabel The client certificate PKCS #11 label to use. + * @param[in] pcPrivateKeyLabel The private key PKCS #11 label for the client certificate. + * + * @return The status of the final connection attempt. + */ +static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext, + char * pcClientCertLabel, + char * pcPrivateKeyLabel ); + +/** + * @brief Function to get the free index at which an outgoing publish + * can be stored. + * + * @param[out] pucIndex The output parameter to return the index at which an + * outgoing publish message can be stored. + * + * @return pdFAIL if no more publishes can be stored; + * pdTRUE if an index to store the next outgoing publish is obtained. + */ +static BaseType_t prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex ); + +/** + * @brief Function to clean up an outgoing publish at given index from the + * #outgoingPublishPackets array. + * + * @param[in] ucIndex The index at which a publish message has to be cleaned up. + */ +static void vCleanupOutgoingPublishAt( uint8_t ucIndex ); + +/** + * @brief Function to clean up all the outgoing publishes maintained in the + * array. + */ +static void vCleanupOutgoingPublishes( void ); + +/** + * @brief Function to clean up the publish packet with the given packet id. + * + * @param[in] usPacketId Packet identifier of the packet to be cleaned up from + * the array. + */ +static void vCleanupOutgoingPublishWithPacketID( uint16_t usPacketId ); + +/** + * @brief Function to resend the publishes if a session is re-established with + * the broker. This function handles the resending of the QoS1 publish packets, + * which are maintained locally. + * + * @param[in] pxMqttContext MQTT context pointer. + */ +static BaseType_t xHandlePublishResend( MQTTContext_t * pxMqttContext ); + +/** + * @brief The timer query function provided to the MQTT context. + * + * @return Time in milliseconds. + */ +static uint32_t prvGetTimeMs( void ); + +/*-----------------------------------------------------------*/ + +static int32_t prvGenerateRandomNumber() +{ + return( uxRand() & INT32_MAX ); +} + +/*-----------------------------------------------------------*/ + +static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext, + char * pcClientCertLabel, + char * pcPrivateKeyLabel ) +{ + TlsTransportStatus_t xNetworkStatus = TLS_TRANSPORT_SUCCESS; + BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; + BackoffAlgorithmContext_t xReconnectParams = { 0 }; + NetworkCredentials_t xNetworkCredentials = { 0 }; + uint16_t usNextRetryBackOff = 0U; + const char * pcAlpn[] = { mqttopALPN_PROTOCOL_NAME, NULL }; + + /* ALPN protocols must be a NULL-terminated list of strings. Therefore, + * the first entry will contain the actual ALPN protocol string while the + * second entry must remain NULL. */ + char * pcAlpnProtocols[] = { NULL, NULL }; + + configASSERT( pxNetworkContext != NULL ); + + /* Set the credentials for establishing a TLS connection. */ + xNetworkCredentials.pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM; + xNetworkCredentials.rootCaSize = sizeof( democonfigROOT_CA_PEM ); + xNetworkCredentials.pClientCertLabel = pcClientCertLabel; + xNetworkCredentials.pPrivateKeyLabel = pcPrivateKeyLabel; + + /* AWS IoT requires devices to send the Server Name Indication (SNI) + * extension to the Transport Layer Security (TLS) protocol and provide + * the complete endpoint address in the host_name field. Details about + * SNI for AWS IoT can be found in the link below. + * https://docs.aws.amazon.com/iot/latest/developerguide/transport-security.html + */ + xNetworkCredentials.disableSni = pdFALSE; + + if( democonfigMQTT_BROKER_PORT == 443 ) + { + /* Pass the ALPN protocol name depending on the port being used. + * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint + * in the link below. + * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ + */ + xNetworkCredentials.pAlpnProtos = pcAlpn; + } + + /* Initialize reconnect attempts and interval.*/ + BackoffAlgorithm_InitializeParams( &xReconnectParams, + RETRY_BACKOFF_BASE_MS, + RETRY_MAX_BACKOFF_DELAY_MS, + RETRY_MAX_ATTEMPTS ); + + /* Attempt to connect to MQTT broker. If connection fails, retry after + * a timeout. Timeout value will exponentially increase until maximum + * attempts are reached. + */ + do + { + /* Establish a TCP connection with the MQTT broker. This example connects to + * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and + * democonfigMQTT_BROKER_PORT at the top of this file. */ + LogInfo( ( "Create a TCP connection to %s:%d.", + democonfigMQTT_BROKER_ENDPOINT, + democonfigMQTT_BROKER_PORT ) ); + xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext, + democonfigMQTT_BROKER_ENDPOINT, + democonfigMQTT_BROKER_PORT, + &xNetworkCredentials, + mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, + mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); + + if( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) + { + /* Generate a random number and calculate backoff value (in milliseconds) for + * the next connection retry. + * Note: It is recommended to seed the random number generator with a device-specific + * entropy source so that possibility of multiple devices retrying failed network operations + * at similar intervals can be avoided. */ + xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff ); + + if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) + { + LogError( ( "Connection to the broker failed, all attempts exhausted." ) ); + } + else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) + { + LogWarn( ( "Connection to the broker failed. " + "Retrying connection with backoff and jitter." ) ); + vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); + } + } + } while( ( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); + + return xNetworkStatus; +} + +/*-----------------------------------------------------------*/ + +static BaseType_t prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex ) +{ + BaseType_t xReturnStatus = pdFAIL; + uint8_t ucIndex = 0; + + configASSERT( outgoingPublishPackets != NULL ); + configASSERT( pucIndex != NULL ); + + for( ucIndex = 0; ucIndex < MAX_OUTGOING_PUBLISHES; ucIndex++ ) + { + /* A free ucIndex is marked by invalid packet id. + * Check if the ucIndex has a free slot. */ + if( outgoingPublishPackets[ ucIndex ].packetId == MQTT_PACKET_ID_INVALID ) + { + xReturnStatus = pdPASS; + break; + } + } + + /* Copy the available ucIndex into the output param. */ + *pucIndex = ucIndex; + + return xReturnStatus; +} +/*-----------------------------------------------------------*/ + +static void vCleanupOutgoingPublishAt( uint8_t ucIndex ) +{ + configASSERT( outgoingPublishPackets != NULL ); + configASSERT( ucIndex < MAX_OUTGOING_PUBLISHES ); + + /* Clear the outgoing publish packet. */ + ( void ) memset( &( outgoingPublishPackets[ ucIndex ] ), + 0x00, + sizeof( outgoingPublishPackets[ ucIndex ] ) ); +} + +/*-----------------------------------------------------------*/ + +static void vCleanupOutgoingPublishes( void ) +{ + configASSERT( outgoingPublishPackets != NULL ); + + /* Clean up all the outgoing publish packets. */ + ( void ) memset( outgoingPublishPackets, 0x00, sizeof( outgoingPublishPackets ) ); +} + +/*-----------------------------------------------------------*/ + +static void vCleanupOutgoingPublishWithPacketID( uint16_t usPacketId ) +{ + uint8_t ucIndex = 0; + + configASSERT( outgoingPublishPackets != NULL ); + configASSERT( usPacketId != MQTT_PACKET_ID_INVALID ); + + /* Clean up all the saved outgoing publishes. */ + for( ; ucIndex < MAX_OUTGOING_PUBLISHES; ucIndex++ ) + { + if( outgoingPublishPackets[ ucIndex ].packetId == usPacketId ) + { + vCleanupOutgoingPublishAt( ucIndex ); + LogInfo( ( "Cleaned up outgoing publish packet with packet id %u.\n\n", + usPacketId ) ); + break; + } + } +} + +/*-----------------------------------------------------------*/ + +void vHandleOtherIncomingPacket( MQTTPacketInfo_t * pxPacketInfo, + uint16_t usPacketIdentifier ) +{ + /* Handle other packets. */ + switch( pxPacketInfo->type ) + { + case MQTT_PACKET_TYPE_SUBACK: + LogInfo( ( "MQTT_PACKET_TYPE_SUBACK.\n\n" ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( globalSubscribePacketIdentifier == usPacketIdentifier ); + break; + + case MQTT_PACKET_TYPE_UNSUBACK: + LogInfo( ( "MQTT_PACKET_TYPE_UNSUBACK.\n\n" ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( globalUnsubscribePacketIdentifier == usPacketIdentifier ); + break; + + case MQTT_PACKET_TYPE_PINGRESP: + + /* Nothing to be done from application as library handles + * PINGRESP with the use of MQTT_ProcessLoop API function. */ + LogWarn( ( "PINGRESP should not be handled by the application " + "callback when using MQTT_ProcessLoop.\n" ) ); + break; + + case MQTT_PACKET_TYPE_PUBACK: + LogInfo( ( "PUBACK received for packet id %u.\n\n", + usPacketIdentifier ) ); + /* Cleanup publish packet when a PUBACK is received. */ + vCleanupOutgoingPublishWithPacketID( usPacketIdentifier ); + break; + + /* Any other packet type is invalid. */ + default: + LogError( ( "Unknown packet type received:(%02x).\n\n", + pxPacketInfo->type ) ); + } +} + +/*-----------------------------------------------------------*/ + +static BaseType_t xHandlePublishResend( MQTTContext_t * pxMqttContext ) +{ + BaseType_t xReturnStatus = pdTRUE; + MQTTStatus_t xMQTTStatus = MQTTSuccess; + uint8_t ucIndex = 0U; + + configASSERT( outgoingPublishPackets != NULL ); + + /* Resend all the QoS1 publishes still in the array. These are the + * publishes that haven't received a PUBACK. When a PUBACK is + * received, the publish is removed from the array. */ + for( ucIndex = 0U; ucIndex < MAX_OUTGOING_PUBLISHES; ucIndex++ ) + { + if( outgoingPublishPackets[ ucIndex ].packetId != MQTT_PACKET_ID_INVALID ) + { + outgoingPublishPackets[ ucIndex ].pubInfo.dup = true; + + LogInfo( ( "Sending duplicate PUBLISH with packet id %u.", + outgoingPublishPackets[ ucIndex ].packetId ) ); + xMQTTStatus = MQTT_Publish( pxMqttContext, + &outgoingPublishPackets[ ucIndex ].pubInfo, + outgoingPublishPackets[ ucIndex ].packetId ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "Sending duplicate PUBLISH for packet id %u " + " failed with status %s.", + outgoingPublishPackets[ ucIndex ].packetId, + MQTT_Status_strerror( xMQTTStatus ) ) ); + xReturnStatus = pdFAIL; + break; + } + else + { + LogInfo( ( "Sent duplicate PUBLISH successfully for packet id %u.\n\n", + outgoingPublishPackets[ ucIndex ].packetId ) ); + } + } + } + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +BaseType_t xEstablishMqttSession( MQTTContext_t * pxMqttContext, + NetworkContext_t * pxNetworkContext, + MQTTFixedBuffer_t * pxNetworkBuffer, + MQTTEventCallback_t eventCallback, + char * pcClientCertLabel, + char * pcPrivateKeyLabel ) +{ + BaseType_t xReturnStatus = pdTRUE; + MQTTStatus_t xMQTTStatus; + MQTTConnectInfo_t xConnectInfo; + TransportInterface_t xTransport; + bool sessionPresent = false; + + configASSERT( pxMqttContext != NULL ); + configASSERT( pxNetworkContext != NULL ); + + /* Initialize the mqtt context. */ + ( void ) memset( pxMqttContext, 0U, sizeof( MQTTContext_t ) ); + + if( prvConnectToServerWithBackoffRetries( pxNetworkContext, + pcClientCertLabel, + pcPrivateKeyLabel ) != TLS_TRANSPORT_SUCCESS ) + { + /* Log error to indicate connection failure after all + * reconnect attempts are over. */ + LogError( ( "Failed to connect to MQTT broker %.*s.", + strlen( democonfigMQTT_BROKER_ENDPOINT ), + democonfigMQTT_BROKER_ENDPOINT ) ); + xReturnStatus = pdFAIL; + } + else + { + /* Fill in Transport Interface send and receive function pointers. */ + xTransport.pNetworkContext = pxNetworkContext; + xTransport.send = TLS_FreeRTOS_send; + xTransport.recv = TLS_FreeRTOS_recv; + + /* Initialize MQTT library. */ + xMQTTStatus = MQTT_Init( pxMqttContext, + &xTransport, + prvGetTimeMs, + eventCallback, + pxNetworkBuffer ); + + if( xMQTTStatus != MQTTSuccess ) + { + xReturnStatus = pdFAIL; + LogError( ( "MQTT init failed with status %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + } + else + { + /* Establish MQTT session by sending a CONNECT packet. */ + + /* Many fields not used in this demo so start with everything at 0. */ + ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); + + /* Start with a clean session i.e. direct the MQTT broker to discard any + * previous session data. Also, establishing a connection with clean session + * will ensure that the broker does not store any data when this client + * gets disconnected. */ + xConnectInfo.cleanSession = true; + + /* The client identifier is used to uniquely identify this MQTT client to + * the MQTT broker. In a production device the identifier can be something + * unique, such as a device serial number. */ + xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; + xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); + + /* The maximum time interval in seconds which is allowed to elapse + * between two Control Packets. + * It is the responsibility of the Client to ensure that the interval between + * Control Packets being sent does not exceed this Keep Alive value. In the + * absence of sending any other Control Packets, the Client MUST send a + * PINGREQ Packet. */ + xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; + + /* Append metrics when connecting to the AWS IoT Core broker. */ + #ifdef democonfigCLIENT_USERNAME + xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS; + xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS ); + xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; + xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); + #else + xConnectInfo.pUserName = AWS_IOT_METRICS_STRING; + xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH; + /* Password for authentication is not used. */ + xConnectInfo.pPassword = NULL; + xConnectInfo.passwordLength = 0U; + #endif /* ifdef democonfigCLIENT_USERNAME */ + + /* Send MQTT CONNECT packet to broker. */ + xMQTTStatus = MQTT_Connect( pxMqttContext, + &xConnectInfo, + NULL, + mqttexampleCONNACK_RECV_TIMEOUT_MS, + &sessionPresent ); + + if( xMQTTStatus != MQTTSuccess ) + { + xReturnStatus = pdFAIL; + LogError( ( "Connection with MQTT broker failed with status %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + } + else + { + LogInfo( ( "MQTT connection successfully established with broker.\n\n" ) ); + } + } + + if( xReturnStatus == pdFAIL ) + { + /* Keep a flag for indicating if MQTT session is established. This + * flag will mark that an MQTT DISCONNECT has to be sent at the end + * of the demo even if there are intermediate failures. */ + xMqttSessionEstablished = true; + } + + if( xReturnStatus == pdFAIL ) + { + /* Check if session is present and if there are any outgoing publishes + * that need to resend. This is only valid if the broker is + * re-establishing a session which was already present. */ + if( sessionPresent == true ) + { + LogInfo( ( "An MQTT session with broker is re-established. " + "Resending unacked publishes." ) ); + + /* Handle all the resend of publish messages. */ + xReturnStatus = xHandlePublishResend( pxMqttContext ); + } + else + { + LogInfo( ( "A clean MQTT connection is established." + " Cleaning up all the stored outgoing publishes.\n\n" ) ); + + /* Clean up the outgoing publishes waiting for ack as this new + * connection doesn't re-establish an existing session. */ + vCleanupOutgoingPublishes(); + } + } + } + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +BaseType_t xDisconnectMqttSession( MQTTContext_t * pxMqttContext, + NetworkContext_t * pxNetworkContext ) +{ + MQTTStatus_t xMQTTStatus = MQTTSuccess; + BaseType_t xReturnStatus = pdTRUE; + + configASSERT( pxMqttContext != NULL ); + configASSERT( pxNetworkContext != NULL ); + + if( xMqttSessionEstablished == true ) + { + /* Send DISCONNECT. */ + xMQTTStatus = MQTT_Disconnect( pxMqttContext ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "Sending MQTT DISCONNECT failed with status=%s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + xReturnStatus = pdFAIL; + } + } + + /* Close the network connection. */ + TLS_FreeRTOS_Disconnect( pxNetworkContext ); + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +BaseType_t xSubscribeToTopic( MQTTContext_t * pxMqttContext, + const char * pcTopicFilter, + uint16_t usTopicFilterLength ) +{ + BaseType_t xReturnStatus = pdTRUE; + MQTTStatus_t xMQTTStatus; + MQTTSubscribeInfo_t pSubscriptionList[ mqttexampleTOPIC_COUNT ]; + + configASSERT( pxMqttContext != NULL ); + configASSERT( pcTopicFilter != NULL ); + configASSERT( usTopicFilterLength > 0 ); + + /* Start with everything at 0. */ + ( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) ); + + /* This example subscribes to only one topic and uses QOS1. */ + pSubscriptionList[ 0 ].qos = MQTTQoS1; + pSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter; + pSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength; + + /* Generate packet identifier for the SUBSCRIBE packet. */ + globalSubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext ); + + /* Send SUBSCRIBE packet. */ + xMQTTStatus = MQTT_Subscribe( pxMqttContext, + pSubscriptionList, + sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), + globalSubscribePacketIdentifier ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + xReturnStatus = pdFAIL; + } + else + { + LogInfo( ( "SUBSCRIBE topic %.*s to broker.\n\n", + usTopicFilterLength, + pcTopicFilter ) ); + + /* Process incoming packet from the broker. Acknowledgment for subscription + * ( SUBACK ) will be received here. However after sending the subscribe, the + * client may receive a publish before it receives a subscribe ack. Since this + * demo is subscribing to the topic to which no one is publishing, probability + * of receiving publish message before subscribe ack is zero; but application + * must be ready to receive any packet. This demo uses MQTT_ProcessLoop to + * receive packet from network. */ + xMQTTStatus = MQTT_ProcessLoop( pxMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + + if( xMQTTStatus != MQTTSuccess ) + { + xReturnStatus = pdFAIL; + LogError( ( "MQTT_ProcessLoop returned with status = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + } + } + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +BaseType_t xUnsubscribeFromTopic( MQTTContext_t * pxMqttContext, + const char * pcTopicFilter, + uint16_t usTopicFilterLength ) +{ + BaseType_t xReturnStatus = pdTRUE; + MQTTStatus_t xMQTTStatus; + MQTTSubscribeInfo_t pSubscriptionList[ 1 ]; + + configASSERT( pxMqttContext != NULL ); + configASSERT( pcTopicFilter != NULL ); + configASSERT( usTopicFilterLength > 0 ); + + /* Start with everything at 0. */ + ( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) ); + + /* This example subscribes to only one topic and uses QOS1. */ + pSubscriptionList[ 0 ].qos = MQTTQoS1; + pSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter; + pSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength; + + /* Generate packet identifier for the UNSUBSCRIBE packet. */ + globalUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext ); + + /* Send UNSUBSCRIBE packet. */ + xMQTTStatus = MQTT_Unsubscribe( pxMqttContext, + pSubscriptionList, + sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ), + globalUnsubscribePacketIdentifier ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + xReturnStatus = pdFAIL; + } + else + { + LogInfo( ( "UNSUBSCRIBE sent topic %.*s to broker.\n\n", + usTopicFilterLength, + pcTopicFilter ) ); + + /* Process the incoming packet from the broker. */ + xMQTTStatus = MQTT_ProcessLoop( pxMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + + if( xMQTTStatus != MQTTSuccess ) + { + xReturnStatus = pdFAIL; + LogError( ( "MQTT_ProcessLoop returned with status = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + } + } + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +BaseType_t xPublishToTopic( MQTTContext_t * pxMqttContext, + const char * pcTopicFilter, + int32_t topicFilterLength, + const char * pcPayload, + size_t payloadLength ) +{ + BaseType_t xReturnStatus = pdPASS; + MQTTStatus_t xMQTTStatus = MQTTSuccess; + uint8_t ucPublishIndex = MAX_OUTGOING_PUBLISHES; + + configASSERT( pxMqttContext != NULL ); + configASSERT( pcTopicFilter != NULL ); + configASSERT( topicFilterLength > 0 ); + + /* Get the next free index for the outgoing publish. All QoS1 outgoing + * publishes are stored until a PUBACK is received. These messages are + * stored for supporting a resend if a network connection is broken before + * receiving a PUBACK. */ + xReturnStatus = prvGetNextFreeIndexForOutgoingPublishes( &ucPublishIndex ); + + if( xReturnStatus == pdFAIL ) + { + LogError( ( "Unable to find a free spot for outgoing PUBLISH message.\n\n" ) ); + } + else + { + LogInfo( ( "the published payload:%.*s \r\n ", payloadLength, pcPayload ) ); + /* This example publishes to only one topic and uses QOS1. */ + outgoingPublishPackets[ ucPublishIndex ].pubInfo.qos = MQTTQoS1; + outgoingPublishPackets[ ucPublishIndex ].pubInfo.pTopicName = pcTopicFilter; + outgoingPublishPackets[ ucPublishIndex ].pubInfo.topicNameLength = topicFilterLength; + outgoingPublishPackets[ ucPublishIndex ].pubInfo.pPayload = pcPayload; + outgoingPublishPackets[ ucPublishIndex ].pubInfo.payloadLength = payloadLength; + + /* Get a new packet id. */ + outgoingPublishPackets[ ucPublishIndex ].packetId = MQTT_GetPacketId( pxMqttContext ); + + /* Send PUBLISH packet. */ + xMQTTStatus = MQTT_Publish( pxMqttContext, + &outgoingPublishPackets[ ucPublishIndex ].pubInfo, + outgoingPublishPackets[ ucPublishIndex ].packetId ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "Failed to send PUBLISH packet to broker with error = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + vCleanupOutgoingPublishAt( ucPublishIndex ); + xReturnStatus = pdFAIL; + } + else + { + LogInfo( ( "PUBLISH sent for topic %.*s to broker with packet ID %u.\n\n", + topicFilterLength, + pcTopicFilter, + outgoingPublishPackets[ ucPublishIndex ].packetId ) ); + + /* Calling MQTT_ProcessLoop to process incoming publish echo, since + * application subscribed to the same topic the broker will send + * publish message back to the application. This function also + * sends ping request to broker if MQTT_KEEP_ALIVE_INTERVAL_SECONDS + * has expired since the last MQTT packet sent and receive + * ping responses. */ + xMQTTStatus = MQTT_ProcessLoop( pxMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "MQTT_ProcessLoop returned with status = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + xReturnStatus = pdFAIL; + } + } + } + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +BaseType_t xProcessLoop( MQTTContext_t * pxMqttContext, + uint32_t ulTimeoutMs ) +{ + BaseType_t xReturnStatus = pdFAIL; + MQTTStatus_t xMQTTStatus = MQTTSuccess; + + xMQTTStatus = MQTT_ProcessLoop( pxMqttContext, ulTimeoutMs ); + + if( xMQTTStatus != MQTTSuccess ) + { + LogError( ( "MQTT_ProcessLoop returned with status = %s.", + MQTT_Status_strerror( xMQTTStatus ) ) ); + } + else + { + LogDebug( ( "MQTT_ProcessLoop successful." ) ); + xReturnStatus = pdPASS; + } + + return xReturnStatus; +} + +/*-----------------------------------------------------------*/ + +static uint32_t prvGetTimeMs( void ) +{ + TickType_t xTickCount = 0; + uint32_t ulTimeMs = 0UL; + + /* Get the current tick count. */ + xTickCount = xTaskGetTickCount(); + + /* Convert the ticks to milliseconds. */ + ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK; + + /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the + * elapsed time in the application. */ + ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); + + return ulTimeMs; +} + +/*-----------------------------------------------------------*/ diff --git a/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_pkcs11_demo_helpers.h b/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_pkcs11_demo_helpers.h new file mode 100644 index 000000000..2d37d4351 --- /dev/null +++ b/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_pkcs11_demo_helpers.h @@ -0,0 +1,141 @@ +/* + * FreeRTOS V202112.00 + * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + * https://www.FreeRTOS.org + * https://github.com/FreeRTOS + * + */ + +#ifndef MQTT_PKCS11_DEMO_HELPERS_H +#define MQTT_PKCS11_DEMO_HELPERS_H + +/* MQTT API header. */ +#include "core_mqtt.h" + +/* Transport interface implementation include header for TLS. */ +#include "using_mbedtls_pkcs11.h" + +/** + * @brief Establish a MQTT connection. + * + * @param[in, out] pxMqttContext The memory for the MQTTContext_t that will be used for the + * MQTT connection. + * @param[out] pxNetworkContext The memory for the NetworkContext_t required for the + * MQTT connection. + * @param[in] pxNetworkBuffer The buffer space for initializing the @p pxMqttContext MQTT + * context used in the MQTT connection. + * @param[in] eventCallback The callback function used to receive incoming + * publishes and incoming acks from MQTT library. + * @param[in] pcClientCertLabel The client certificate PKCS #11 label to use. + * @param[in] pcPrivateKeyLabel The private key PKCS #11 label for the client certificate. + * + * @return The status of the final connection attempt. + */ +BaseType_t xEstablishMqttSession( MQTTContext_t * pxMqttContext, + NetworkContext_t * pxNetworkContext, + MQTTFixedBuffer_t * pxNetworkBuffer, + MQTTEventCallback_t eventCallback, + char * pcClientCertLabel, + char * pcPrivateKeyLabel ); + +/** + * @brief Handle the incoming packet if it's not related to the device shadow. + * + * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. + * @param[in] usPacketIdentifier Packet identifier of the incoming packet. + */ +void vHandleOtherIncomingPacket( MQTTPacketInfo_t * pxPacketInfo, + uint16_t usPacketIdentifier ); + +/** + * @brief Close the MQTT connection. + * + * @param[in, out] pxMqttContext The MQTT context for the MQTT connection to close. + * @param[in, out] pxNetworkContext The network context for the TLS session to + * terminate. + * + * @return pdPASS if DISCONNECT was successfully sent; + * pdFAIL otherwise. + */ +BaseType_t xDisconnectMqttSession( MQTTContext_t * pxMqttContext, + NetworkContext_t * pxNetworkContext ); + +/** + * @brief Subscribe to a MQTT topic filter. + * + * @param[in] pxMqttContext The MQTT context for the MQTT connection. + * @param[in] pcTopicFilter Pointer to the shadow topic buffer. + * @param[in] usTopicFilterLength Indicates the length of the shadow + * topic buffer. + * + * @return pdPASS if SUBSCRIBE was successfully sent; + * pdFAIL otherwise. + */ +BaseType_t xSubscribeToTopic( MQTTContext_t * pxMqttContext, + const char * pcTopicFilter, + uint16_t usTopicFilterLength ); + +/** + * @brief Sends an MQTT UNSUBSCRIBE to unsubscribe from the shadow + * topic. + * + * @param[in] pxMqttContext The MQTT context for the MQTT connection. + * @param[in] pcTopicFilter Pointer to the MQTT topic filter. + * @param[in] usTopicFilterLength Indicates the length of the topic filter. + * + * @return pdPASS if UNSUBSCRIBE was successfully sent; + * pdFAIL otherwise. + */ +BaseType_t xUnsubscribeFromTopic( MQTTContext_t * pxMqttContext, + const char * pcTopicFilter, + uint16_t usTopicFilterLength ); + +/** + * @brief Publish a message to a MQTT topic. + * + * @param[in] pxMqttContext The MQTT context for the MQTT connection. + * @param[in] pcTopicFilter Points to the topic. + * @param[in] topicFilterLength The length of the topic. + * @param[in] pcPayload Points to the payload. + * @param[in] payloadLength The length of the payload. + * + * @return pdPASS if PUBLISH was successfully sent; + * pdFAIL otherwise. + */ +BaseType_t xPublishToTopic( MQTTContext_t * pxMqttContext, + const char * pcTopicFilter, + int32_t topicFilterLength, + const char * pcPayload, + size_t payloadLength ); + +/** + * @brief Invoke the core MQTT library's process loop function. + * + * @param[in] pxMqttContext The MQTT context for the MQTT connection. + * @param[in] ulTimeoutMs Minimum time for the loop to run, if no error occurs. + * + * @return pdPASS if process loop was successful; + * pdFAIL otherwise. + */ +BaseType_t xProcessLoop( MQTTContext_t * pxMqttContext, + uint32_t ulTimeoutMs ); + +#endif /* ifndef MQTT_PKCS11_DEMO_HELPERS_H */