FreeRTOS-Kernel/FreeRTOS-Plus/Demo/AWS/Mqtt_Demo_Helpers/mqtt_demo_helpers.c
Soren Ptak 8424589ed1
FreeRTOS Windows Simulator Build Changes and LTS 2.0 Update (#872)
* Update mbedtls to version v3.2.1

* Adjust include paths for github workflow

* Update FreeRTOS+TCP to v3.1.0

* Add initial VS project files for Kernel, +TCP, and mbedtls

* winpcap: Consolidate to a single copy of WinPcap

* Downgrade library projects to VS 2019. Add heap_4 to kernel.

* Remove *.vcxproj.user files and add to gitignore

* Disable unwanted kernel config options

* Update FreeRTOS+TCP and Kernel include paths

* Update FreeRTOS+TCP Windows Minimal sln

* Remove .suo and .vcxproj.user files

* Update mbedtls transport implementations for mbedtls v3.2.1

* Fix typo in mbedtls_freertos_port.c

* Update vcxproj files for +TCP, Kernel, Mbedtls

* Fix typo in name WindowSimulator -> WindowsSimulator

* Add wpcap lib to FreeRTOS+TCP.vcxproj

* Update FreeRTOS+TCP Minimal Demo for Windows Simulator

* Mask MSVC string function warnings

Aad _CRT_SECURE_NO_WARNINGS preprocessor definition.

* Move projects to their own directories to make VisualStudio happy

* mbedtls_freertos_port.c: Fix formatting?

* Add coreHTTP, Logging libs. Adjust dependencies

* Disable FreeRTOS Kernel malloc failed hook

* Update coreHTTP Plaintext demo

* Rename / relocate transport interface implementations

* Remove old VS project files

* Remove extra core_http_config.h files

* Remove extra FreeRTOSConfig.h files

* Remove extra FreeRTOSIPConfig.h files

* Remove old mbedtls_config.h files

* Remove old FreeRTOSConfig and core_http_config files

* Update HTTP Mutual Auth Demo build files

* Fix transport_plaintext.h include name

* Update HTTP_Plaintext demo build files

* Update CoreHTTP_S3_Download VS project files

* Update HTTP_S3_Upload demo build files

* Update CoreHTTP_S3_Download_Multithreaded demo build files

* Add GithubActions builds for FreeRTOS+ CoreHTTP Demos

* Update S3DownloadHTTPExample.c to be compatible with mbedtls 3.x

* Update GithubActions FreeRTOS+ config file

* Combine core_pkcs11_config.h files into a single one

* Add corePKCS11 VS project file

* Update corePKCS11 WinSim demo project and build files

* Update corePKCS11 library to version 3.5.0

* Modifiying demos to build using static libraries

* Adding a header file

* Two more demos

* Update corePKCS11 demo code and auto-format

* Downgrade corePKCS11 library build file to v142 / VS 2019

* Speed up CI builds with selective submodule initialization

* Separate corePKCS11 demo into it's own job.

* Remove WIN32.vcxproj and WIN32.vcxproj.filters files

* Remove old configuration header files

Partially reverts 73829ced6061e4584e521185178a61b4a437c5e0

* Revert unwanted coreHTTP demo changes

* Changing include paths for demos

* Adding an include

* MQTT WoflSSL Demo update

* wolfSSL Demo changes

* Config changes for MQTT Demos

* Initial Device Defender demo update.

* Initial Device Shadow demo update.

* Update mbedtls_transport_pkcs11 for Mbedtls 3.x

* Update corePKCS11 MQTT Mutual Auth demo

* Add a default definition of SdkLog

* Update CorePKCS11 + MQTT Mutual Auth demo sln name

* Build all configs of coreHTTP demos

* Update coreSNTP Demo build files.

* Update coreSNTP Demo sln name

* Update FreeRTOS+TCP Posix demo

* Update FreeRTOS+TCP Qemu ARM MPS2 AN385 Demo

* Update FreeRTOS+TCP Demo Github Actions builds

* Update Fleet Provisioning WinSim Demo build files

* Remove mbedtls_pk_info_t references from mbedtls_pk_pkcs11.h

* Restore / update the FreeRTOS+TCP Minimal WinSim demo

* Initial Jobs demo update.

* Fix jobs demo build.

* Initial OTA over HTTP demo update.

* Initial OTA over MQTT demo update.

* Ota_Over_Mqtt_Demo build fix.

* OTA over MQTT demo fix.

* Update HTTP demo solution file name for CI.

* Update Github actions workflow to old HTTP demo names.

* Update coreSTNP demo to old solution name.

* Fix defender demo / fix mbedtls config to use threading alt.

* Add MBEDTLS_CONFIG_FILE definition to each config

* Fix config file name.

* MQTT Mutual auth fixes.

* Fix job demo.

* Device shadow demo fixes.

* Fix coreSNTP demo not setting alt threading functions for mbedtls.

* Enable Static allocation, Add default hooks for FreeRTOS Kernel and +TCP

* Add xPlatformIsNetworkUp platform function to FreeRTOS+TCP hooks

* Enable runtime statistics in the Windows Simualator Kernel config

* Revert "Fix coreSNTP demo not setting alt threading functions for mbedtls."

This reverts commit 9069707519561ca8136d58c0f18fb176c9050a1d.

* Revert mbedtls threading related config changes

* Add xPlatformIsNetworkUp function prototype

* Remove boileplate FreeRTOS kernel and +tcp hooks

* Refactor device defender demo for clarity

* Add wait loop calling xPlatformIsNetworkUp

* Add missing vPlatformInitLogging function

* Add vPlatformInitLogging and vLoggingPrintf defitions to logging headers

* Updating the FreeRTOS_Plus_CLI_with_Trace_Windows_Simulator

* Updating the FreeRTOS_Plus_Reliance_Edge_and_CLI_Windows_Simulator demo

* Updates to the FreeRTOS_Plus_WolfSSL_Windows_Simulator demo

* Fixing wrong include path

* Upating FreeRTOS_Plus_WolfSSL_FIPS_Ready_Windows_Simulator demo

* Update coreMQTT WinSim demos to print start and end condition.

* Modifiying repos updated as part of the CLI demo GitHub workflow

* Removing duplicate functions from the FreeRTOS_Plus_TCP_Minimal_Windows_Simulator demo

* Updated FreeRTOS_Plus_TCP_UDP_Mode_CLI_Windows_Simulator demo

* Updating corePKCS11_MQTT_Mutual_Auth_Windows_Simulator Demo to call the static function that creates task

* Fix log message using unitialized string in MQTT Multitask demo.

* Fixing a broken extern function

* Fixing a typo extern function name

* Added a reference to coreHTTP

* Fixing a pre-processor issue in the OTA_Over_Http_Demo

* Updating the MQTT_Mutual_Auth_Demo_with_BG96 demo

* Updating the MQTT_Mutual_Auth_Demo_with_HL7802 demo

* Changes to the MQTT_Mutual_Auth_Demo_with_SARA_R4 demo

* Fix demos for CI.

* Adding the source path to the CBMC proofs for FreeRTOS+TCP

* Spell check fixes, adding words to lexicons

* Fixing a typo

* Add arg to skip prompt in setup script.

* Update paths of script to be relative to the file.

* Changing manifest.yml file to point to corePKCS11 3.5.0

* Added CI markers to cellular demos.

* Fix cellular demo flow.

* Fix celullar demos.

* Initial TCP sockets wrapper rework - will break things.

* First cellular demo fix for new sockets wrapper.

* Minor fix to cellular sockets wrapper.

* Fix mbedtls bio using FreeRTOS Plus TCP call.

* Clean up BG96 demo project files.

* Update HL7802 demo.

* Fix SARA R4 demo for new sockets wrapper.

* Fix Device Defender, Device Shadow, and Fleet PRovisioning.

* Fix Jobs demo.

* Fix OTA over HTTP demo.

* Fix OTA over MQTT demo.

* Fix HTTP mutual auth demo.

* Fix OTA over MQTT demo endianness.

* Fix OTA over HTTP demo endianness.

* Fix HTTP Plaintext demo.

* Fix HTTP S3 download demo.

* Fix plaintext transport

* Fix OTA demos.

* Fix OTA demos.

* Fix OTA HTTP demo.

* Fix HTTP S3 Download multithreaded demo.

* Fix HTTP S3 Upload demo

* Fix corePKCS11 Mutual Auth demo.

* Updating MQTT_Mutual_Auth

* Update pkcs11 setup script.

* Updating the MQTT_Basic_TLS Demo

* Organize PKCS11 demos project.

* Updating MQTT_Keep_Alive demo

* Clean up SNTP demo.

* Updated MQTT_Multitask demo

* Updated MQTT_Plain_Text

* Updating the MQTT_Serializer Demo

* Updating corePKCS11_MQTT_Mutual_Auth_Windows_Simulator

* Updating coreSNTP_Windows_Simulator

* Clean up demo projects.

* Add markers to PKCS11 mutual auth demo.

* Fix Fleet Provisioning demo script.

* Fix SNTP demo solution.

* Fix coreSNTP project files.

* Fix Fleet Provisiong script.

* Fix fleet provisioning script.

* Fix demo config template.

* Fleet provisioning demo markers.

* Updating MQTT_Mutual_Auth_wolfSSL demo and the transport_wolfSSL file

* Fixing FreeRTOS_Plus_TCP_Echo_POSIX

* Fixing CLI and Trace Demos

* Fixing TCP_ECHO_POSIX demo

* Adding a word to the lexicon

* Remove unneeded files.

* Update github workflows to use Ubuntu 20.04.

* Change OTA demo target names to RTOSDemo for OTAE2E tests.

* Fixing Headers

* Updating headers

* Two more headers

* Adding words to the lexicon

* Whitepsace

* Ignore mbedtls config file for header check.

* Removing FreeRTOS Header from the mbedtls_config_v3.2.1.h file

* Fix bug in lPKCS11PkMbedtlsCloseSessionAndFree. Add doxygen api docs.

* Update lexicon.txt

* Fix spelling

* Apply suggestions from code review

Co-authored-by: jasonpcarroll <23126711+jasonpcarroll@users.noreply.github.com>

* Add return code comment for p11_ecdsa_ctx_init

* Rename WindowsSimulator folder to VisualStudio_StaticProjects.

* Remove references to coroutines

* Fix mbedtls_pk_pkcs11.c

* Update to LTS 2.0 submodule pointers (#880)

* Update submodule pointers to LTS 2.0

* Initial coreMQTT 2.1.1 update.

* Update AWS demos for coreMQTT 2.1.1

* Fix deprecated macro for coreMQTT demos.

* Fix keep alive demo.

* Fix plaintext demo.

* Fix MQTT wolfSSL demo.

* Fix MQTT PKCS11 demo.

* Remove duplicate functions.

* Fix Mutual auth demos for Cellular.

* Fix OTA demos.

* Fix header of plaintext demo config.

* Set writev to NULL for OTA demos.

* Fix mbedlts config for OTA demos.

* Fix spelling.

Co-authored-by: Jason Carroll <czjaso@amazon.com>

* Removing blank line

* Fix jobs demo race condition.

* Fix race condition from WinPCap network interface.

* Update lexicon.

Co-authored-by: Paul Bartell <pbartell@amazon.com>
Co-authored-by: Jason Carroll <czjaso@amazon.com>
Co-authored-by: Paul Bartell <paul.bartell@gmail.com>
Co-authored-by: jasonpcarroll <23126711+jasonpcarroll@users.noreply.github.com>
2022-11-29 14:21:09 -08:00

1103 lines
41 KiB
C

/*
* FreeRTOS V202112.00
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* https://www.FreeRTOS.org
* https://github.com/FreeRTOS
*
*/
/**
* @file mqtt_demo_helpers.c
*
* @brief This file provides helper functions used by the AWS demo applications to
* do MQTT operations over a mutually authenticated TLS connection.
*
* A mutually authenticated TLS connection is used to connect to the AWS IoT
* MQTT message broker in this example. Define democonfigCLIENT_PRIVATE_KEY_PEM,
* democonfigCLIENT_CERTIFICATE_PEM, and democonfigMQTT_BROKER_ENDPOINT in
* demo_config.h to achieve mutual authentication.
*/
/* Standard includes. */
#include <stdlib.h>
#include <string.h>
/* Kernel includes. */
#include "FreeRTOS.h"
#include "task.h"
/* Shadow includes */
#include "mqtt_demo_helpers.h"
/* MQTT library includes. */
#include "core_mqtt.h"
/* Exponential backoff retry include. */
#include "backoff_algorithm.h"
/* Transport interface implementation include header for TLS. */
#include "transport_mbedtls.h"
/* Demo specific config. */
#include "demo_config.h"
/*------------- Demo configurations -------------------------*/
/**
* Note: The TLS connection credentials for the server root CA certificate,
* and device client certificate and private key should be defined in the
* demo_config.h file.
*/
#ifndef democonfigROOT_CA_PEM
#error "Please define the AWS Root CA certificate (democonfigROOT_CA_PEM) in demo_config.h."
#endif
#ifndef democonfigCLIENT_PRIVATE_KEY_PEM
#error "Please define client private key (democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h."
#endif
#ifndef democonfigCLIENT_CERTIFICATE_PEM
#error "Please define client certificate (democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h."
#endif
#ifndef democonfigMQTT_BROKER_ENDPOINT
#error "Please define the AWS IoT broker endpoint (democonfigMQTT_BROKER_ENDPOINT) in demo_config.h."
#endif
/*-----------------------------------------------------------*/
/**
* @brief The maximum number of retries for network operation with server.
*/
#define RETRY_MAX_ATTEMPTS ( 5U )
/**
* @brief The maximum back-off delay (in milliseconds) for retrying failed operation
* with server.
*/
#define RETRY_MAX_BACKOFF_DELAY_MS ( 5000U )
/**
* @brief The base back-off delay (in milliseconds) to use for network operation retry
* attempts.
*/
#define RETRY_BACKOFF_BASE_MS ( 500U )
/**
* @brief Timeout for receiving CONNACK packet in milliseconds.
*/
#define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 1000U )
/**
* @brief The number of topic filters to subscribe.
*/
#define mqttexampleTOPIC_COUNT ( 1 )
/**
* @brief Time to wait between each cycle of the demo implemented by prvMQTTDemoTask().
*/
#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) )
/**
* @brief Timeout for MQTT_ProcessLoop in milliseconds.
*/
#define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 500U )
/**
* @brief Keep alive time reported to the broker while establishing an MQTT connection.
*
* It is the responsibility of the Client to ensure that the interval between
* Control Packets being sent does not exceed this Keep Alive value. In the
* absence of sending any other Control Packets, the Client MUST send a
* PINGREQ Packet.
*/
#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U )
/**
* @brief Delay between MQTT publishes. Note that the process loop also has a
* timeout, so the total time between publishes is the sum of the two delays.
*/
#define mqttexampleDELAY_BETWEEN_PUBLISHES ( pdMS_TO_TICKS( 500U ) )
/**
* @brief Transport timeout in milliseconds for transport send and receive.
*/
#define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 200U )
/**
* @brief The length of the outgoing publish records array used by the coreMQTT
* library to track QoS > 0 packet ACKS for outgoing publishes.
*/
#define mqttexampleOUTGOING_PUBLISH_RECORD_LEN ( 10U )
/**
* @brief The length of the incoming publish records array used by the coreMQTT
* library to track QoS > 0 packet ACKS for incoming publishes.
*/
#define mqttexampleINCOMING_PUBLISH_RECORD_LEN ( 10U )
/**
* @brief Maximum number of outgoing publishes maintained in the application
* until an ack is received from the broker.
*/
#define MAX_OUTGOING_PUBLISHES ( 1U )
/**
* @brief Milliseconds per second.
*/
#define MILLISECONDS_PER_SECOND ( 1000U )
/**
* @brief Milliseconds per FreeRTOS tick.
*/
#define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ )
/**
* @brief The MQTT metrics string expected by AWS IoT.
*/
#define AWS_IOT_METRICS_STRING \
"?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \
"&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB
/*-----------------------------------------------------------*/
/**
* @brief Structure to keep the MQTT publish packets until an ack is received
* for QoS1 publishes.
*/
typedef struct PublishPackets
{
/**
* @brief Packet identifier of the publish packet.
*/
uint16_t packetId;
/**
* @brief Publish info of the publish packet.
*/
MQTTPublishInfo_t pubInfo;
} PublishPackets_t;
/*-----------------------------------------------------------*/
/**
* @brief Each compilation unit that consumes the NetworkContext must define it.
* It should contain a single pointer to the type of your desired transport.
* When using multiple transports in the same compilation unit, define this pointer as void *.
*
* @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport.
*/
struct NetworkContext
{
TlsTransportParams_t * pParams;
};
/*-----------------------------------------------------------*/
/**
* @brief Global entry time into the application to use as a reference timestamp
* in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference
* between the current time and the global entry time. This will reduce the chances
* of overflow for the 32 bit unsigned integer used for holding the timestamp.
*/
static uint32_t ulGlobalEntryTimeMs;
/**
* @brief The flag to indicate the MQTT session changed.
*/
static BaseType_t xMqttSessionEstablished = pdFALSE;
/**
* @brief Packet Identifier generated when Subscribe request was sent to the broker;
* it is used to match received Subscribe ACK to the transmitted subscribe.
*/
static uint16_t globalSubscribePacketIdentifier = 0U;
/**
* @brief Packet Identifier generated when Unsubscribe request was sent to the broker;
* it is used to match received Unsubscribe ACK to the transmitted unsubscribe
* request.
*/
static uint16_t globalUnsubscribePacketIdentifier = 0U;
/**
* @brief Array to keep the outgoing publish messages.
* These stored outgoing publish messages are kept until a successful ack
* is received.
*/
static PublishPackets_t outgoingPublishPackets[ MAX_OUTGOING_PUBLISHES ] = { 0 };
/**
* @brief Array to track the outgoing publish records for outgoing publishes
* with QoS > 0.
*
* This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0.
*
*/
static MQTTPubAckInfo_t pOutgoingPublishRecords[ mqttexampleOUTGOING_PUBLISH_RECORD_LEN ];
/**
* @brief Array to track the incoming publish records for incoming publishes
* with QoS > 0.
*
* This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0.
*
*/
static MQTTPubAckInfo_t pIncomingPublishRecords[ mqttexampleINCOMING_PUBLISH_RECORD_LEN ];
/*-----------------------------------------------------------*/
/**
* @brief A wrapper to the "uxRand()" random number generator so that it
* can be passed to the backoffAlgorithm library for retry logic.
*
* This function implements the #BackoffAlgorithm_RNG_T type interface
* in the backoffAlgorithm library API.
*
* @note The "uxRand" function represents a pseudo random number generator.
* However, it is recommended to use a True Random Number Generator (TRNG)
* for generating unique device-specific random values to avoid possibility
* of network collisions from multiple devices retrying network operations.
*
* @return The generated random number. This function ALWAYS succeeds.
*/
static int32_t prvGenerateRandomNumber();
/**
* @brief Connect to MQTT broker with reconnection retries.
*
* If connection fails, retry is attempted after a timeout.
* Timeout value will exponentially increase until maximum
* timeout value is reached or the number of attempts are exhausted.
*
* @param[out] pxNetworkContext The output parameter to return the created network context.
*
* @return The status of the final connection attempt.
*/
static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext );
/**
* @brief Function to get the free index at which an outgoing publish
* can be stored.
*
* @param[out] pucIndex The output parameter to return the index at which an
* outgoing publish message can be stored.
*
* @return pdFAIL if no more publishes can be stored;
* pdTRUE if an index to store the next outgoing publish is obtained.
*/
static BaseType_t prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex );
/**
* @brief Function to clean up an outgoing publish at given index from the
* #outgoingPublishPackets array.
*
* @param[in] ucIndex The index at which a publish message has to be cleaned up.
*/
static void vCleanupOutgoingPublishAt( uint8_t ucIndex );
/**
* @brief Function to clean up all the outgoing publishes maintained in the
* array.
*/
static void vCleanupOutgoingPublishes( void );
/**
* @brief Function to clean up the publish packet with the given packet id.
*
* @param[in] usPacketId Packet identifier of the packet to be cleaned up from
* the array.
*/
static void vCleanupOutgoingPublishWithPacketID( uint16_t usPacketId );
/**
* @brief Function to resend the publishes if a session is re-established with
* the broker. This function handles the resending of the QoS1 publish packets,
* which are maintained locally.
*
* @param[in] pxMqttContext MQTT context pointer.
*/
static BaseType_t xHandlePublishResend( MQTTContext_t * pxMqttContext );
/**
* @brief The timer query function provided to the MQTT context.
*
* @return Time in milliseconds.
*/
static uint32_t prvGetTimeMs( void );
/**
* @brief Call #MQTT_ProcessLoop in a loop for the duration of a timeout or
* #MQTT_ProcessLoop returns a failure.
*
* @param[in] pMqttContext MQTT context pointer.
* @param[in] ulTimeoutMs Duration to call #MQTT_ProcessLoop for.
*
* @return Returns the return value of the last call to #MQTT_ProcessLoop.
*/
static MQTTStatus_t prvProcessLoopWithTimeout( MQTTContext_t * pMqttContext,
uint32_t ulTimeoutMs );
/*-----------------------------------------------------------*/
static int32_t prvGenerateRandomNumber()
{
return( uxRand() & INT32_MAX );
}
/*-----------------------------------------------------------*/
static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext )
{
TlsTransportStatus_t xNetworkStatus = TLS_TRANSPORT_SUCCESS;
BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess;
BackoffAlgorithmContext_t xReconnectParams = { 0 };
NetworkCredentials_t xNetworkCredentials = { 0 };
uint16_t usNextRetryBackOff = 0U;
#if defined( democonfigCLIENT_USERNAME )
/*
* When democonfigCLIENT_USERNAME is defined, use the "mqtt" alpn to connect
* to AWS IoT Core with Custom Authentication on port 443.
*
* Custom Authentication uses the contents of the username and password
* fields of the MQTT CONNECT packet to authenticate the client.
*
* For more information, refer to the documentation at:
* https://docs.aws.amazon.com/iot/latest/developerguide/custom-authentication.html
*/
static const char * ppcAlpnProtocols[] = { "mqtt", NULL };
#if democonfigMQTT_BROKER_PORT != 443U
#error "Connections to AWS IoT Core with custom authentication must connect to TCP port 443 with the \"mqtt\" alpn."
#endif /* democonfigMQTT_BROKER_PORT != 443U */
#else /* if !defined( democonfigCLIENT_USERNAME ) */
/*
* Otherwise, use the "x-amzn-mqtt-ca" alpn to connect to AWS IoT Core using
* x509 Certificate Authentication.
*/
static const char * ppcAlpnProtocols[] = { "x-amzn-mqtt-ca", NULL };
#endif /* !defined( democonfigCLIENT_USERNAME ) */
/*
* An ALPN identifier is only required when connecting to AWS IoT core on port 443.
* https://docs.aws.amazon.com/iot/latest/developerguide/protocols.html
*/
#if democonfigMQTT_BROKER_PORT == 443U
xNetworkCredentials.pAlpnProtos = ppcAlpnProtocols;
#elif democonfigMQTT_BROKER_PORT == 8883U
xNetworkCredentials.pAlpnProtos = NULL;
#else /* democonfigMQTT_BROKER_PORT != 8883U */
xNetworkCredentials.pAlpnProtos = NULL;
#error "MQTT connections to AWS IoT Core are only allowed on ports 443 and 8883."
#endif /* democonfigMQTT_BROKER_PORT != 443U */
configASSERT( pxNetworkContext != NULL );
/* Set the credentials for establishing a TLS connection. */
xNetworkCredentials.pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM;
xNetworkCredentials.rootCaSize = sizeof( democonfigROOT_CA_PEM );
#ifdef democonfigCLIENT_CERTIFICATE_PEM
xNetworkCredentials.pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM;
xNetworkCredentials.clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM );
xNetworkCredentials.pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM;
xNetworkCredentials.privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM );
#endif
xNetworkCredentials.disableSni = pdFALSE;
/* Initialize reconnect attempts and interval.*/
BackoffAlgorithm_InitializeParams( &xReconnectParams,
RETRY_BACKOFF_BASE_MS,
RETRY_MAX_BACKOFF_DELAY_MS,
RETRY_MAX_ATTEMPTS );
/* Attempt to connect to MQTT broker. If connection fails, retry after
* a timeout. Timeout value will exponentially increase until maximum
* attempts are reached.
*/
do
{
/* Establish a TCP connection with the MQTT broker. This example connects to
* the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and
* democonfigMQTT_BROKER_PORT at the top of this file. */
LogInfo( ( "Create a TCP connection to %s:%d.",
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT ) );
xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext,
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT,
&xNetworkCredentials,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS );
if( xNetworkStatus != TLS_TRANSPORT_SUCCESS )
{
/* Generate a random number and calculate backoff value (in milliseconds) for
* the next connection retry.
* Note: It is recommended to seed the random number generator with a device-specific
* entropy source so that possibility of multiple devices retrying failed network operations
* at similar intervals can be avoided. */
xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff );
if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted )
{
LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
}
else if( xBackoffAlgStatus == BackoffAlgorithmSuccess )
{
LogWarn( ( "Connection to the broker failed. "
"Retrying connection with backoff and jitter." ) );
vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) );
}
}
} while( ( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) );
return xNetworkStatus;
}
/*-----------------------------------------------------------*/
static BaseType_t prvGetNextFreeIndexForOutgoingPublishes( uint8_t * pucIndex )
{
BaseType_t xReturnStatus = pdFAIL;
uint8_t ucIndex = 0;
configASSERT( outgoingPublishPackets != NULL );
configASSERT( pucIndex != NULL );
for( ucIndex = 0; ucIndex < MAX_OUTGOING_PUBLISHES; ucIndex++ )
{
/* A free ucIndex is marked by invalid packet id.
* Check if the ucIndex has a free slot. */
if( outgoingPublishPackets[ ucIndex ].packetId == MQTT_PACKET_ID_INVALID )
{
xReturnStatus = pdPASS;
break;
}
}
/* Copy the available ucIndex into the output param. */
*pucIndex = ucIndex;
return xReturnStatus;
}
/*-----------------------------------------------------------*/
static MQTTStatus_t prvProcessLoopWithTimeout( MQTTContext_t * pMqttContext,
uint32_t ulTimeoutMs )
{
uint32_t ulMqttProcessLoopTimeoutTime;
uint32_t ulCurrentTime;
MQTTStatus_t eMqttStatus = MQTTSuccess;
ulCurrentTime = pMqttContext->getTime();
ulMqttProcessLoopTimeoutTime = ulCurrentTime + ulTimeoutMs;
/* Call MQTT_ProcessLoop multiple times a timeout happens, or
* MQTT_ProcessLoop fails. */
while( ( ulCurrentTime < ulMqttProcessLoopTimeoutTime ) &&
( eMqttStatus == MQTTSuccess || eMqttStatus == MQTTNeedMoreBytes ) )
{
eMqttStatus = MQTT_ProcessLoop( pMqttContext );
ulCurrentTime = pMqttContext->getTime();
}
if( eMqttStatus == MQTTNeedMoreBytes )
{
eMqttStatus = MQTTSuccess;
}
return eMqttStatus;
}
/*-----------------------------------------------------------*/
static void vCleanupOutgoingPublishAt( uint8_t ucIndex )
{
configASSERT( outgoingPublishPackets != NULL );
configASSERT( ucIndex < MAX_OUTGOING_PUBLISHES );
/* Clear the outgoing publish packet. */
( void ) memset( &( outgoingPublishPackets[ ucIndex ] ),
0x00,
sizeof( outgoingPublishPackets[ ucIndex ] ) );
}
/*-----------------------------------------------------------*/
static void vCleanupOutgoingPublishes( void )
{
configASSERT( outgoingPublishPackets != NULL );
/* Clean up all the outgoing publish packets. */
( void ) memset( outgoingPublishPackets, 0x00, sizeof( outgoingPublishPackets ) );
}
/*-----------------------------------------------------------*/
static void vCleanupOutgoingPublishWithPacketID( uint16_t usPacketId )
{
uint8_t ucIndex = 0;
configASSERT( outgoingPublishPackets != NULL );
configASSERT( usPacketId != MQTT_PACKET_ID_INVALID );
/* Clean up all the saved outgoing publishes. */
for( ; ucIndex < MAX_OUTGOING_PUBLISHES; ucIndex++ )
{
if( outgoingPublishPackets[ ucIndex ].packetId == usPacketId )
{
vCleanupOutgoingPublishAt( ucIndex );
LogInfo( ( "Cleaned up outgoing publish packet with packet id %u.\n\n",
usPacketId ) );
break;
}
}
}
/*-----------------------------------------------------------*/
void vHandleOtherIncomingPacket( MQTTPacketInfo_t * pxPacketInfo,
uint16_t usPacketIdentifier )
{
/* Handle other packets. */
switch( pxPacketInfo->type )
{
case MQTT_PACKET_TYPE_SUBACK:
LogInfo( ( "MQTT_PACKET_TYPE_SUBACK.\n\n" ) );
/* Make sure ACK packet identifier matches with Request packet identifier. */
configASSERT( globalSubscribePacketIdentifier == usPacketIdentifier );
break;
case MQTT_PACKET_TYPE_UNSUBACK:
LogInfo( ( "MQTT_PACKET_TYPE_UNSUBACK.\n\n" ) );
/* Make sure ACK packet identifier matches with Request packet identifier. */
configASSERT( globalUnsubscribePacketIdentifier == usPacketIdentifier );
break;
case MQTT_PACKET_TYPE_PINGRESP:
/* Nothing to be done from application as library handles
* PINGRESP with the use of MQTT_ProcessLoop API function. */
LogWarn( ( "PINGRESP should not be handled by the application "
"callback when using MQTT_ProcessLoop.\n" ) );
break;
case MQTT_PACKET_TYPE_PUBACK:
LogInfo( ( "PUBACK received for packet id %u.\n\n",
usPacketIdentifier ) );
/* Cleanup publish packet when a PUBACK is received. */
vCleanupOutgoingPublishWithPacketID( usPacketIdentifier );
break;
/* Any other packet type is invalid. */
default:
LogError( ( "Unknown packet type received:(%02x).\n\n",
pxPacketInfo->type ) );
}
}
/*-----------------------------------------------------------*/
static BaseType_t xHandlePublishResend( MQTTContext_t * pxMqttContext )
{
BaseType_t xReturnStatus = pdTRUE;
MQTTStatus_t xMQTTStatus = MQTTSuccess;
uint8_t ucIndex = 0U;
configASSERT( outgoingPublishPackets != NULL );
/* Resend all the QoS1 publishes still in the array. These are the
* publishes that haven't received a PUBACK. When a PUBACK is
* received, the publish is removed from the array. */
for( ucIndex = 0U; ucIndex < MAX_OUTGOING_PUBLISHES; ucIndex++ )
{
if( outgoingPublishPackets[ ucIndex ].packetId != MQTT_PACKET_ID_INVALID )
{
outgoingPublishPackets[ ucIndex ].pubInfo.dup = true;
LogInfo( ( "Sending duplicate PUBLISH with packet id %u.",
outgoingPublishPackets[ ucIndex ].packetId ) );
xMQTTStatus = MQTT_Publish( pxMqttContext,
&outgoingPublishPackets[ ucIndex ].pubInfo,
outgoingPublishPackets[ ucIndex ].packetId );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "Sending duplicate PUBLISH for packet id %u "
" failed with status %s.",
outgoingPublishPackets[ ucIndex ].packetId,
MQTT_Status_strerror( xMQTTStatus ) ) );
xReturnStatus = pdFAIL;
break;
}
else
{
LogInfo( ( "Sent duplicate PUBLISH successfully for packet id %u.\n\n",
outgoingPublishPackets[ ucIndex ].packetId ) );
}
}
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
BaseType_t xEstablishMqttSession( MQTTContext_t * pxMqttContext,
NetworkContext_t * pxNetworkContext,
MQTTFixedBuffer_t * pxNetworkBuffer,
MQTTEventCallback_t eventCallback )
{
BaseType_t xReturnStatus = pdTRUE;
MQTTStatus_t xMQTTStatus;
MQTTConnectInfo_t xConnectInfo;
TransportInterface_t xTransport;
bool sessionPresent = false;
configASSERT( pxMqttContext != NULL );
configASSERT( pxNetworkContext != NULL );
/* Initialize the mqtt context. */
( void ) memset( pxMqttContext, 0U, sizeof( MQTTContext_t ) );
if( prvConnectToServerWithBackoffRetries( pxNetworkContext ) != TLS_TRANSPORT_SUCCESS )
{
/* Log error to indicate connection failure after all
* reconnect attempts are over. */
LogError( ( "Failed to connect to MQTT broker %.*s.",
strlen( democonfigMQTT_BROKER_ENDPOINT ),
democonfigMQTT_BROKER_ENDPOINT ) );
xReturnStatus = pdFAIL;
}
else
{
/* Fill in Transport Interface send and receive function pointers. */
xTransport.pNetworkContext = pxNetworkContext;
xTransport.send = TLS_FreeRTOS_send;
xTransport.recv = TLS_FreeRTOS_recv;
xTransport.writev = NULL;
/* Initialize MQTT library. */
xMQTTStatus = MQTT_Init( pxMqttContext,
&xTransport,
prvGetTimeMs,
eventCallback,
pxNetworkBuffer );
if( xMQTTStatus != MQTTSuccess )
{
xReturnStatus = pdFAIL;
LogError( ( "MQTT_Init failed with status %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
}
else
{
xMQTTStatus = MQTT_InitStatefulQoS( pxMqttContext,
pOutgoingPublishRecords,
mqttexampleOUTGOING_PUBLISH_RECORD_LEN,
pIncomingPublishRecords,
mqttexampleINCOMING_PUBLISH_RECORD_LEN );
if( xMQTTStatus != MQTTSuccess )
{
xReturnStatus = pdFAIL;
LogError( ( "MQTT_InitStatefulQos failed with status %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
}
else
{
/* Establish MQTT session by sending a CONNECT packet. */
/* Many fields not used in this demo so start with everything at 0. */
( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
/* Start with a clean session i.e. direct the MQTT broker to discard any
* previous session data. Also, establishing a connection with clean session
* will ensure that the broker does not store any data when this client
* gets disconnected. */
xConnectInfo.cleanSession = true;
/* The client identifier is used to uniquely identify this MQTT client to
* the MQTT broker. In a production device the identifier can be something
* unique, such as a device serial number. */
xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );
/* The maximum time interval in seconds which is allowed to elapse
* between two Control Packets.
* It is the responsibility of the Client to ensure that the interval between
* Control Packets being sent does not exceed this Keep Alive value. In the
* absence of sending any other Control Packets, the Client MUST send a
* PINGREQ Packet. */
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;
#if defined( democonfigCLIENT_USERNAME )
/* Append metrics string when connecting to AWS IoT Core with custom auth */
xConnectInfo.pUserName = democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING;
xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING );
/* Use the provided password as-is */
xConnectInfo.pPassword = democonfigCLIENT_PASSWORD;
xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD );
#else
/* If no username is needed, only send the metrics string */
xConnectInfo.pUserName = AWS_IOT_METRICS_STRING;
xConnectInfo.userNameLength = ( uint16_t ) strlen( AWS_IOT_METRICS_STRING );
/* Password for authentication is not used. */
xConnectInfo.pPassword = NULL;
xConnectInfo.passwordLength = 0U;
#endif /* defined( democonfigCLIENT_USERNAME ) */
/* Send MQTT CONNECT packet to broker. */
xMQTTStatus = MQTT_Connect( pxMqttContext,
&xConnectInfo,
NULL,
mqttexampleCONNACK_RECV_TIMEOUT_MS,
&sessionPresent );
if( xMQTTStatus != MQTTSuccess )
{
xReturnStatus = pdFAIL;
LogError( ( "Connection with MQTT broker failed with status %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
}
else
{
LogInfo( ( "MQTT connection successfully established with broker.\n\n" ) );
}
}
}
if( xReturnStatus == pdFAIL )
{
/* Keep a flag for indicating if MQTT session is established. This
* flag will mark that an MQTT DISCONNECT has to be sent at the end
* of the demo even if there are intermediate failures. */
xMqttSessionEstablished = true;
}
if( xReturnStatus == pdFAIL )
{
/* Check if session is present and if there are any outgoing publishes
* that need to resend. This is only valid if the broker is
* re-establishing a session which was already present. */
if( sessionPresent == true )
{
LogInfo( ( "An MQTT session with broker is re-established. "
"Resending unacked publishes." ) );
/* Handle all the resend of publish messages. */
xReturnStatus = xHandlePublishResend( pxMqttContext );
}
else
{
LogInfo( ( "A clean MQTT connection is established."
" Cleaning up all the stored outgoing publishes.\n\n" ) );
/* Clean up the outgoing publishes waiting for ack as this new
* connection doesn't re-establish an existing session. */
vCleanupOutgoingPublishes();
}
}
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
BaseType_t xDisconnectMqttSession( MQTTContext_t * pxMqttContext,
NetworkContext_t * pxNetworkContext )
{
MQTTStatus_t xMQTTStatus = MQTTSuccess;
BaseType_t xReturnStatus = pdTRUE;
configASSERT( pxMqttContext != NULL );
configASSERT( pxNetworkContext != NULL );
if( xMqttSessionEstablished == true )
{
/* Send DISCONNECT. */
xMQTTStatus = MQTT_Disconnect( pxMqttContext );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "Sending MQTT DISCONNECT failed with status=%s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
xReturnStatus = pdFAIL;
}
}
/* Close the network connection. */
TLS_FreeRTOS_Disconnect( pxNetworkContext );
return xReturnStatus;
}
/*-----------------------------------------------------------*/
BaseType_t xSubscribeToTopic( MQTTContext_t * pxMqttContext,
const char * pcTopicFilter,
uint16_t usTopicFilterLength )
{
BaseType_t xReturnStatus = pdTRUE;
MQTTStatus_t xMQTTStatus;
MQTTSubscribeInfo_t pSubscriptionList[ mqttexampleTOPIC_COUNT ];
configASSERT( pxMqttContext != NULL );
configASSERT( pcTopicFilter != NULL );
configASSERT( usTopicFilterLength > 0 );
/* Start with everything at 0. */
( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) );
/* This example subscribes to only one topic and uses QOS1. */
pSubscriptionList[ 0 ].qos = MQTTQoS1;
pSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter;
pSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength;
/* Generate packet identifier for the SUBSCRIBE packet. */
globalSubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext );
/* Send SUBSCRIBE packet. */
xMQTTStatus = MQTT_Subscribe( pxMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalSubscribePacketIdentifier );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
xReturnStatus = pdFAIL;
}
else
{
LogInfo( ( "SUBSCRIBE topic %.*s to broker.\n\n",
usTopicFilterLength,
pcTopicFilter ) );
/* Process incoming packet from the broker. Acknowledgment for subscription
* ( SUBACK ) will be received here. However after sending the subscribe, the
* client may receive a publish before it receives a subscribe ack. Since this
* demo is subscribing to the topic to which no one is publishing, probability
* of receiving publish message before subscribe ack is zero; but application
* must be ready to receive any packet. This demo uses MQTT_ProcessLoop to
* receive packet from network. */
xMQTTStatus = prvProcessLoopWithTimeout( pxMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
if( xMQTTStatus != MQTTSuccess )
{
xReturnStatus = pdFAIL;
LogError( ( "MQTT_ProcessLoop returned with status = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
}
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
BaseType_t xUnsubscribeFromTopic( MQTTContext_t * pxMqttContext,
const char * pcTopicFilter,
uint16_t usTopicFilterLength )
{
BaseType_t xReturnStatus = pdTRUE;
MQTTStatus_t xMQTTStatus;
MQTTSubscribeInfo_t pSubscriptionList[ 1 ];
configASSERT( pxMqttContext != NULL );
configASSERT( pcTopicFilter != NULL );
configASSERT( usTopicFilterLength > 0 );
/* Start with everything at 0. */
( void ) memset( ( void * ) pSubscriptionList, 0x00, sizeof( pSubscriptionList ) );
/* This example subscribes to only one topic and uses QOS1. */
pSubscriptionList[ 0 ].qos = MQTTQoS1;
pSubscriptionList[ 0 ].pTopicFilter = pcTopicFilter;
pSubscriptionList[ 0 ].topicFilterLength = usTopicFilterLength;
/* Generate packet identifier for the UNSUBSCRIBE packet. */
globalUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMqttContext );
/* Send UNSUBSCRIBE packet. */
xMQTTStatus = MQTT_Unsubscribe( pxMqttContext,
pSubscriptionList,
sizeof( pSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
globalUnsubscribePacketIdentifier );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
xReturnStatus = pdFAIL;
}
else
{
LogInfo( ( "UNSUBSCRIBE sent topic %.*s to broker.\n\n",
usTopicFilterLength,
pcTopicFilter ) );
/* Process the incoming packet from the broker. */
xMQTTStatus = prvProcessLoopWithTimeout( pxMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
if( xMQTTStatus != MQTTSuccess )
{
xReturnStatus = pdFAIL;
LogError( ( "MQTT_ProcessLoop returned with status = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
}
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
BaseType_t xPublishToTopic( MQTTContext_t * pxMqttContext,
const char * pcTopicFilter,
int32_t topicFilterLength,
const char * pcPayload,
size_t payloadLength )
{
BaseType_t xReturnStatus = pdPASS;
MQTTStatus_t xMQTTStatus = MQTTSuccess;
uint8_t ucPublishIndex = MAX_OUTGOING_PUBLISHES;
configASSERT( pxMqttContext != NULL );
configASSERT( pcTopicFilter != NULL );
configASSERT( topicFilterLength > 0 );
/* Get the next free index for the outgoing publish. All QoS1 outgoing
* publishes are stored until a PUBACK is received. These messages are
* stored for supporting a resend if a network connection is broken before
* receiving a PUBACK. */
xReturnStatus = prvGetNextFreeIndexForOutgoingPublishes( &ucPublishIndex );
if( xReturnStatus == pdFAIL )
{
LogError( ( "Unable to find a free spot for outgoing PUBLISH message.\n\n" ) );
}
else
{
LogInfo( ( "the published payload:%.*s \r\n ", payloadLength, pcPayload ) );
/* This example publishes to only one topic and uses QOS1. */
outgoingPublishPackets[ ucPublishIndex ].pubInfo.qos = MQTTQoS1;
outgoingPublishPackets[ ucPublishIndex ].pubInfo.pTopicName = pcTopicFilter;
outgoingPublishPackets[ ucPublishIndex ].pubInfo.topicNameLength = topicFilterLength;
outgoingPublishPackets[ ucPublishIndex ].pubInfo.pPayload = pcPayload;
outgoingPublishPackets[ ucPublishIndex ].pubInfo.payloadLength = payloadLength;
/* Get a new packet id. */
outgoingPublishPackets[ ucPublishIndex ].packetId = MQTT_GetPacketId( pxMqttContext );
/* Send PUBLISH packet. */
xMQTTStatus = MQTT_Publish( pxMqttContext,
&outgoingPublishPackets[ ucPublishIndex ].pubInfo,
outgoingPublishPackets[ ucPublishIndex ].packetId );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "Failed to send PUBLISH packet to broker with error = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
vCleanupOutgoingPublishAt( ucPublishIndex );
xReturnStatus = pdFAIL;
}
else
{
LogInfo( ( "PUBLISH sent for topic %.*s to broker with packet ID %u.\n\n",
topicFilterLength,
pcTopicFilter,
outgoingPublishPackets[ ucPublishIndex ].packetId ) );
/* Calling MQTT_ProcessLoop to process incoming publish echo, since
* application subscribed to the same topic the broker will send
* publish message back to the application. This function also
* sends ping request to broker if MQTT_KEEP_ALIVE_INTERVAL_SECONDS
* has expired since the last MQTT packet sent and receive
* ping responses. */
xMQTTStatus = prvProcessLoopWithTimeout( pxMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "MQTT_ProcessLoop returned with status = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
xReturnStatus = pdFAIL;
}
}
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
BaseType_t xProcessLoop( MQTTContext_t * pxMqttContext,
uint32_t ulTimeoutMs )
{
BaseType_t xReturnStatus = pdFAIL;
MQTTStatus_t xMQTTStatus = MQTTSuccess;
xMQTTStatus = prvProcessLoopWithTimeout( pxMqttContext, ulTimeoutMs );
if( xMQTTStatus != MQTTSuccess )
{
LogError( ( "MQTT_ProcessLoop returned with status = %s.",
MQTT_Status_strerror( xMQTTStatus ) ) );
}
else
{
LogDebug( ( "MQTT_ProcessLoop successful." ) );
xReturnStatus = pdPASS;
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
static uint32_t prvGetTimeMs( void )
{
TickType_t xTickCount = 0;
uint32_t ulTimeMs = 0UL;
/* Get the current tick count. */
xTickCount = xTaskGetTickCount();
/* Convert the ticks to milliseconds. */
ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK;
/* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
* elapsed time in the application. */
ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs );
return ulTimeMs;
}
/*-----------------------------------------------------------*/