Rename \FreeRTOS-Plus\Source\FreeRTOS-Plus-IoT-SDK to \FreeRTOS-Plus\Source\FreeRTOS-IoT-Libraries.

This commit is contained in:
Gaurav Aggarwal 2019-07-23 03:41:27 +00:00
parent 7af8756c97
commit 9dd72d4b44
45 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,794 @@
/*
* Amazon FreeRTOS MQTT V2.0.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_agent.c
* @brief MQTT Agent implementation. Provides backwards compatibility between
* MQTT v2 and MQTT v1.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* FreeRTOS includes. */
#include "FreeRTOS.h"
#include "semphr.h"
/* MQTT v1 includes. */
#include "iot_mqtt_agent.h"
#include "iot_mqtt_agent_config.h"
#include "iot_mqtt_agent_config_defaults.h"
/* MQTT v2 include. */
#include "iot_mqtt.h"
/* Platform network include. */
#include "platform/iot_network_freertos.h"
/*-----------------------------------------------------------*/
/**
* @brief Converts FreeRTOS ticks to milliseconds.
*/
#define mqttTICKS_TO_MS( xTicks ) ( xTicks * 1000 / configTICK_RATE_HZ )
/*-----------------------------------------------------------*/
/**
* @brief Stores data to convert between the MQTT v1 subscription callback
* and the MQTT v2 subscription callback.
*/
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
typedef struct MQTTCallback
{
BaseType_t xInUse; /**< Whether this instance is in-use. */
MQTTPublishCallback_t xFunction; /**< MQTT v1 callback function. */
void * pvParameter; /**< Parameter to xFunction. */
uint16_t usTopicFilterLength; /**< Length of pcTopicFilter. */
char pcTopicFilter[ mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH ]; /**< Topic filter. */
} MQTTCallback_t;
#endif
/**
* @brief Stores data on an active MQTT connection.
*/
typedef struct MQTTConnection
{
IotMqttConnection_t xMQTTConnection; /**< MQTT v2 connection handle. */
MQTTAgentCallback_t pxCallback; /**< MQTT v1 global callback. */
void * pvUserData; /**< Parameter to pxCallback. */
StaticSemaphore_t xConnectionMutex; /**< Protects from concurrent accesses. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
MQTTCallback_t xCallbacks /**< Conversion table of MQTT v1 to MQTT v2 subscription callbacks. */
[ mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS ];
#endif
} MQTTConnection_t;
/*-----------------------------------------------------------*/
/**
* @brief Convert an MQTT v2 return code to an MQTT v1 return code.
*
* @param[in] xMqttStatus The MQTT v2 return code.
*
* @return An equivalent MQTT v1 return code.
*/
static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus );
/**
* @brief Wraps an MQTT v1 publish callback.
*
* @param[in] pvParameter The MQTT connection.
* @param[in] pxPublish Information about the incoming publish.
*/
static void prvPublishCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * const pxPublish );
/**
* @brief Wraps an MQTT v1 disconnect callback.
*
* @param[in] pvCallbackContext The MQTT connection.
* @param[in] pxDisconnect Information about the disconnect.
*/
static void prvDisconnectCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * pxDisconnect );
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
/**
* @brief Store an MQTT v1 callback in the conversion table.
*
* @param[in] pxConnection Where to store the callback.
* @param[in] pcTopicFilter Topic filter to store.
* @param[in] usTopicFilterLength Length of pcTopicFilter.
* @param[in] xCallback MQTT v1 callback to store.
* @param[in] pvParameter Parameter to xCallback.
*
* @return pdPASS if the callback was successfully stored; pdFAIL otherwise.
*/
static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength,
MQTTPublishCallback_t xCallback,
void * pvParameter );
/**
* @brief Search the callback conversion table for the given topic filter.
*
* @param[in] pxConnection The connection containing the conversion table.
* @param[in] pcTopicFilter The topic filter to search for.
* @param[in] usTopicFilterLength The length of pcTopicFilter.
*
* @return A pointer to the callback entry if found; NULL otherwise.
* @note This function should be called with pxConnection->xConnectionMutex
* locked.
*/
static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength );
/**
* @brief Remove a topic filter from the callback conversion table.
*
* @param[in] pxConnection The connection containing the conversion table.
* @param[in] pcTopicFilter The topic filter to remove.
* @param[in] usTopicFilterLength The length of pcTopic.
*/
static void prvRemoveCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength );
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/*-----------------------------------------------------------*/
/**
* @brief The number of available MQTT brokers, controlled by the constant
* mqttconfigMAX_BROKERS;
*/
static UBaseType_t uxAvailableBrokers = mqttconfigMAX_BROKERS;
/*-----------------------------------------------------------*/
static inline MQTTAgentReturnCode_t prvConvertReturnCode( IotMqttError_t xMqttStatus )
{
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
switch( xMqttStatus )
{
case IOT_MQTT_SUCCESS:
case IOT_MQTT_STATUS_PENDING:
xStatus = eMQTTAgentSuccess;
break;
case IOT_MQTT_TIMEOUT:
xStatus = eMQTTAgentTimeout;
break;
default:
xStatus = eMQTTAgentFailure;
break;
}
return xStatus;
}
/*-----------------------------------------------------------*/
static void prvPublishCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * const pxPublish )
{
BaseType_t xStatus = pdPASS;
size_t xBufferSize = 0;
uint8_t * pucMqttBuffer = NULL;
MQTTBool_t xCallbackReturn = eMQTTFalse;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;
MQTTAgentCallbackParams_t xPublishData = { .xMQTTEvent = eMQTTAgentPublish };
/* Calculate the size of the MQTT buffer that must be allocated. */
if( xStatus == pdPASS )
{
xBufferSize = pxPublish->u.message.info.topicNameLength +
pxPublish->u.message.info.payloadLength;
/* Check for overflow. */
if( ( xBufferSize < pxPublish->u.message.info.topicNameLength ) ||
( xBufferSize < pxPublish->u.message.info.payloadLength ) )
{
mqttconfigDEBUG_LOG( ( "Incoming PUBLISH message and topic name length too large.\r\n" ) );
xStatus = pdFAIL;
}
}
/* Allocate an MQTT buffer for the callback. */
if( xStatus == pdPASS )
{
pucMqttBuffer = pvPortMalloc( xBufferSize );
if( pucMqttBuffer == NULL )
{
mqttconfigDEBUG_LOG( ( "Failed to allocate memory for MQTT buffer.\r\n" ) );
xStatus = pdFAIL;
}
else
{
/* Copy the topic name and payload. The topic name and payload must be
* copied in case the user decides to take ownership of the MQTT buffer.
* The original buffer containing the MQTT topic name and payload may
* contain further unprocessed packets and must remain property of the
* MQTT library. Therefore, the topic name and payload are copied into
* another buffer for the user. */
( void ) memcpy( pucMqttBuffer,
pxPublish->u.message.info.pTopicName,
pxPublish->u.message.info.topicNameLength );
( void ) memcpy( pucMqttBuffer + pxPublish->u.message.info.topicNameLength,
pxPublish->u.message.info.pPayload,
pxPublish->u.message.info.payloadLength );
/* Set the members of the callback parameter. */
xPublishData.xMQTTEvent = eMQTTAgentPublish;
xPublishData.u.xPublishData.pucTopic = pucMqttBuffer;
xPublishData.u.xPublishData.usTopicLength = pxPublish->u.message.info.topicNameLength;
xPublishData.u.xPublishData.pvData = pucMqttBuffer + pxPublish->u.message.info.topicNameLength;
xPublishData.u.xPublishData.ulDataLength = ( uint32_t ) pxPublish->u.message.info.payloadLength;
xPublishData.u.xPublishData.xQos = ( MQTTQoS_t ) pxPublish->u.message.info.qos;
xPublishData.u.xPublishData.xBuffer = pucMqttBuffer;
}
}
if( xStatus == pdPASS )
{
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
/* When subscription management is enabled, search for a matching subscription. */
MQTTCallback_t * pxCallbackEntry = prvFindCallback( pxConnection,
pxPublish->u.message.pTopicFilter,
pxPublish->u.message.topicFilterLength );
/* Check if a matching MQTT v1 subscription was found. */
if( pxCallbackEntry != NULL )
{
/* Invoke the topic-specific callback if it exists. */
if( pxCallbackEntry->xFunction != NULL )
{
xCallbackReturn = pxCallbackEntry->xFunction( pxCallbackEntry->pvParameter,
&( xPublishData.u.xPublishData ) );
}
else
{
/* Otherwise, invoke the global callback. */
if( pxConnection->pxCallback != NULL )
{
xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,
&xPublishData );
}
}
}
#else /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/* When subscription management is disabled, invoke the global callback
* if one exists. */
/* When subscription management is disabled, the topic filter must be "#". */
mqttconfigASSERT( *( xPublish.message.pTopicFilter ) == '#' );
mqttconfigASSERT( xPublish.message.topicFilterLength == 1 );
if( pxConnection->pxCallback != NULL )
{
xCallbackReturn = ( MQTTBool_t ) pxConnection->pxCallback( pxConnection->pvUserData,
&xPublishData );
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
}
/* Free the MQTT buffer if the user did not take ownership of it. */
if( ( xCallbackReturn == eMQTTFalse ) && ( pucMqttBuffer != NULL ) )
{
vPortFree( pucMqttBuffer );
}
}
/*-----------------------------------------------------------*/
static void prvDisconnectCallbackWrapper( void * pvParameter,
IotMqttCallbackParam_t * pxDisconnect )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) pvParameter;
MQTTAgentCallbackParams_t xCallbackParams = { .xMQTTEvent = eMQTTAgentDisconnect };
( void ) pxDisconnect;
/* This function should only be called if a callback was set. */
mqttconfigASSERT( pxConnection->pxCallback != NULL );
/* Invoke the MQTT v1 callback. Ignore the return value. */
pxConnection->pxCallback( pxConnection->pvUserData,
&xCallbackParams );
}
/*-----------------------------------------------------------*/
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
static BaseType_t prvStoreCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength,
MQTTPublishCallback_t xCallback,
void * pvParameter )
{
MQTTCallback_t * pxCallback = NULL;
BaseType_t xStatus = pdFAIL, i = 0;
/* Prevent other tasks from modifying stored callbacks while this function
* runs. */
if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),
portMAX_DELAY ) == pdTRUE )
{
/* Check if the topic filter already has an entry. */
pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );
if( pxCallback == NULL )
{
/* If no entry was found, find a free entry. */
for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )
{
if( pxConnection->xCallbacks[ i ].xInUse == pdFALSE )
{
pxConnection->xCallbacks[ i ].xInUse = pdTRUE;
pxCallback = &( pxConnection->xCallbacks[ i ] );
break;
}
}
}
/* Set the members of the callback entry. */
if( i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS )
{
pxCallback->pvParameter = pvParameter;
pxCallback->usTopicFilterLength = usTopicFilterLength;
pxCallback->xFunction = xCallback;
( void ) strncpy( pxCallback->pcTopicFilter, pcTopicFilter, usTopicFilterLength );
xStatus = pdPASS;
}
( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );
}
return xStatus;
}
/*-----------------------------------------------------------*/
static MQTTCallback_t * prvFindCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength )
{
BaseType_t i = 0;
MQTTCallback_t * pxResult = NULL;
/* Search the callback conversion table for the topic filter. */
for( i = 0; i < mqttconfigSUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; i++ )
{
if( ( pxConnection->xCallbacks[ i ].usTopicFilterLength == usTopicFilterLength ) &&
( strncmp( pxConnection->xCallbacks[ i ].pcTopicFilter,
pcTopicFilter,
usTopicFilterLength ) == 0 ) )
{
pxResult = &( pxConnection->xCallbacks[ i ] );
break;
}
}
return pxResult;
}
/*-----------------------------------------------------------*/
static void prvRemoveCallback( MQTTConnection_t * const pxConnection,
const char * const pcTopicFilter,
uint16_t usTopicFilterLength )
{
MQTTCallback_t * pxCallback = NULL;
/* Prevent other tasks from modifying stored callbacks while this function
* runs. */
if( xSemaphoreTake( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ),
portMAX_DELAY ) == pdTRUE )
{
/* Find the given topic filter. */
pxCallback = prvFindCallback( pxConnection, pcTopicFilter, usTopicFilterLength );
if( pxCallback != NULL )
{
/* Clear the callback entry. */
mqttconfigASSERT( pxCallback->xInUse == pdTRUE );
( void ) memset( pxCallback, 0x00, sizeof( MQTTCallback_t ) );
}
( void ) xSemaphoreGive( ( QueueHandle_t ) &( pxConnection->xConnectionMutex ) );
}
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/*-----------------------------------------------------------*/
IotMqttConnection_t MQTT_AGENT_Getv2Connection( MQTTAgentHandle_t xMQTTHandle )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
return pxConnection->xMQTTConnection;
}
/*-----------------------------------------------------------*/
BaseType_t MQTT_AGENT_Init( void )
{
BaseType_t xStatus = pdFALSE;
/* Call the initialization function of MQTT v2. */
if( IotMqtt_Init() == IOT_MQTT_SUCCESS )
{
xStatus = pdTRUE;
}
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Create( MQTTAgentHandle_t * const pxMQTTHandle )
{
MQTTConnection_t * pxNewConnection = NULL;
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
/* Check how many brokers are available; fail if all brokers are in use. */
taskENTER_CRITICAL();
{
if( uxAvailableBrokers == 0 )
{
xStatus = eMQTTAgentFailure;
}
else
{
uxAvailableBrokers--;
mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
}
}
taskEXIT_CRITICAL();
/* Allocate memory for an MQTT connection. */
if( xStatus == eMQTTAgentSuccess )
{
pxNewConnection = pvPortMalloc( sizeof( MQTTConnection_t ) );
if( pxNewConnection == NULL )
{
xStatus = eMQTTAgentFailure;
taskENTER_CRITICAL();
{
uxAvailableBrokers++;
mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
}
taskEXIT_CRITICAL();
}
else
{
( void ) memset( pxNewConnection, 0x00, sizeof( MQTTConnection_t ) );
pxNewConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
}
}
/* Create the connection mutex and set the output parameter. */
if( xStatus == eMQTTAgentSuccess )
{
( void ) xSemaphoreCreateMutexStatic( &( pxNewConnection->xConnectionMutex ) );
*pxMQTTHandle = ( MQTTAgentHandle_t ) pxNewConnection;
}
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Delete( MQTTAgentHandle_t xMQTTHandle )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
/* Clean up any allocated MQTT or network resources. */
if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )
{
IotMqtt_Disconnect( pxConnection->xMQTTConnection, IOT_MQTT_FLAG_CLEANUP_ONLY );
pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
}
/* Free memory used by the MQTT connection. */
vPortFree( pxConnection );
/* Increment the number of available brokers. */
taskENTER_CRITICAL();
{
uxAvailableBrokers++;
mqttconfigASSERT( uxAvailableBrokers <= mqttconfigMAX_BROKERS );
}
taskEXIT_CRITICAL();
return eMQTTAgentSuccess;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Connect( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentConnectParams_t * const pxConnectParams,
TickType_t xTimeoutTicks )
{
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotNetworkServerInfo_t xServerInfo = { 0 };
IotNetworkCredentials_t xCredentials = AWS_IOT_NETWORK_CREDENTIALS_AFR_INITIALIZER, * pxCredentials = NULL;
IotMqttNetworkInfo_t xNetworkInfo = IOT_MQTT_NETWORK_INFO_INITIALIZER;
IotMqttConnectInfo_t xMqttConnectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
/* Copy the global callback and parameter. */
pxConnection->pxCallback = pxConnectParams->pxCallback;
pxConnection->pvUserData = pxConnectParams->pvUserData;
/* Set the TLS info for a secured connection. */
if( ( pxConnectParams->xSecuredConnection == pdTRUE ) ||
( ( pxConnectParams->xFlags & mqttagentREQUIRE_TLS ) == mqttagentREQUIRE_TLS ) )
{
pxCredentials = &xCredentials;
/* Set the server certificate. Other credentials are set by the initializer. */
xCredentials.pRootCa = pxConnectParams->pcCertificate;
xCredentials.rootCaSize = ( size_t ) pxConnectParams->ulCertificateSize;
/* Disable ALPN if requested. */
if( ( pxConnectParams->xFlags & mqttagentUSE_AWS_IOT_ALPN_443 ) == 0 )
{
xCredentials.pAlpnProtos = NULL;
}
/* Disable SNI if requested. */
if( ( pxConnectParams->xURLIsIPAddress == pdTRUE ) ||
( ( pxConnectParams->xFlags & mqttagentURL_IS_IP_ADDRESS ) == mqttagentURL_IS_IP_ADDRESS ) )
{
xCredentials.disableSni = true;
}
}
/* Set the server info. */
xServerInfo.pHostName = pxConnectParams->pcURL;
xServerInfo.port = pxConnectParams->usPort;
/* Set the members of the network info. */
xNetworkInfo.createNetworkConnection = true;
xNetworkInfo.u.setup.pNetworkServerInfo = &xServerInfo;
xNetworkInfo.u.setup.pNetworkCredentialInfo = pxCredentials;
xNetworkInfo.pNetworkInterface = IOT_NETWORK_INTERFACE_AFR;
if( pxConnectParams->pxCallback != NULL )
{
xNetworkInfo.disconnectCallback.function = prvDisconnectCallbackWrapper;
xNetworkInfo.disconnectCallback.pCallbackContext = pxConnection;
}
/* Set the members of the MQTT connect info. */
xMqttConnectInfo.awsIotMqttMode = true;
xMqttConnectInfo.cleanSession = true;
xMqttConnectInfo.pClientIdentifier = ( const char * ) ( pxConnectParams->pucClientId );
xMqttConnectInfo.clientIdentifierLength = pxConnectParams->usClientIdLength;
xMqttConnectInfo.keepAliveSeconds = mqttconfigKEEP_ALIVE_INTERVAL_SECONDS;
/* Call MQTT v2's CONNECT function. */
xMqttStatus = IotMqtt_Connect( &xNetworkInfo,
&xMqttConnectInfo,
mqttTICKS_TO_MS( xTimeoutTicks ),
&( pxConnection->xMQTTConnection ) );
xStatus = prvConvertReturnCode( xMqttStatus );
/* Add a subscription to "#" to support the global callback when subscription
* manager is disabled. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 0 )
IotMqttSubscription_t xGlobalSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
IotMqttReference_t xGlobalSubscriptionRef = IOT_MQTT_REFERENCE_INITIALIZER;
if( xStatus == eMQTTAgentSuccess )
{
xGlobalSubscription.pTopicFilter = "#";
xGlobalSubscription.topicFilterLength = 1;
xGlobalSubscription.qos = 0;
xGlobalSubscription.callback.param1 = pxConnection;
xGlobalSubscription.callback.function = prvPublishCallbackWrapper;
xMqttStatus = IotMqtt_Subscribe( pxConnection->xMQTTConnection,
&xGlobalSubscription,
1,
IOT_MQTT_FLAG_WAITABLE,
NULL,
&xGlobalSubscriptionRef );
xStatus = prvConvertReturnCode( xMqttStatus );
}
/* Wait for the subscription to "#" to complete. */
if( xStatus == eMQTTAgentSuccess )
{
xMqttStatus = IotMqtt_Wait( xGlobalSubscriptionRef,
mqttTICKS_TO_MS( xTimeoutTicks ) );
xStatus = prvConvertReturnCode( xMqttStatus );
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Disconnect( MQTTAgentHandle_t xMQTTHandle,
TickType_t xTimeoutTicks )
{
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
/* MQTT v2's DISCONNECT function does not have a timeout argument. */
( void ) xTimeoutTicks;
/* Check that the connection is established. */
if( pxConnection->xMQTTConnection != IOT_MQTT_CONNECTION_INITIALIZER )
{
/* Call MQTT v2's DISCONNECT function. */
IotMqtt_Disconnect( pxConnection->xMQTTConnection,
0 );
pxConnection->xMQTTConnection = IOT_MQTT_CONNECTION_INITIALIZER;
}
return eMQTTAgentSuccess;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Subscribe( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentSubscribeParams_t * const pxSubscribeParams,
TickType_t xTimeoutTicks )
{
MQTTAgentReturnCode_t xStatus = eMQTTAgentSuccess;
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
/* Store the topic filter if subscription management is enabled. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
/* Check topic filter length. */
if( pxSubscribeParams->usTopicLength > mqttconfigSUBSCRIPTION_MANAGER_MAX_TOPIC_LENGTH )
{
xStatus = eMQTTAgentFailure;
}
/* Store the subscription. */
if( prvStoreCallback( pxConnection,
( const char * ) pxSubscribeParams->pucTopic,
pxSubscribeParams->usTopicLength,
pxSubscribeParams->pxPublishCallback,
pxSubscribeParams->pvPublishCallbackContext ) == pdFAIL )
{
xStatus = eMQTTAgentFailure;
}
#endif /* if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 ) */
/* Call MQTT v2 blocking SUBSCRIBE function. */
if( xStatus == eMQTTAgentSuccess )
{
/* Set the members of the MQTT subscription. */
xSubscription.pTopicFilter = ( const char * ) ( pxSubscribeParams->pucTopic );
xSubscription.topicFilterLength = pxSubscribeParams->usTopicLength;
xSubscription.qos = ( IotMqttQos_t ) pxSubscribeParams->xQoS;
xSubscription.callback.pCallbackContext = pxConnection;
xSubscription.callback.function = prvPublishCallbackWrapper;
xMqttStatus = IotMqtt_TimedSubscribe( pxConnection->xMQTTConnection,
&xSubscription,
1,
0,
mqttTICKS_TO_MS( xTimeoutTicks ) );
xStatus = prvConvertReturnCode( xMqttStatus );
}
return xStatus;
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Unsubscribe( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentUnsubscribeParams_t * const pxUnsubscribeParams,
TickType_t xTimeoutTicks )
{
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotMqttSubscription_t xSubscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
/* Remove any subscription callback that may be registered. */
#if ( mqttconfigENABLE_SUBSCRIPTION_MANAGEMENT == 1 )
prvRemoveCallback( pxConnection,
( const char * ) ( pxUnsubscribeParams->pucTopic ),
pxUnsubscribeParams->usTopicLength );
#endif
/* Set the members of the subscription to remove. */
xSubscription.pTopicFilter = ( const char * ) ( pxUnsubscribeParams->pucTopic );
xSubscription.topicFilterLength = pxUnsubscribeParams->usTopicLength;
xSubscription.callback.pCallbackContext = pxConnection;
xSubscription.callback.function = prvPublishCallbackWrapper;
/* Call MQTT v2 blocking UNSUBSCRIBE function. */
xMqttStatus = IotMqtt_TimedUnsubscribe( pxConnection->xMQTTConnection,
&xSubscription,
1,
0,
mqttTICKS_TO_MS( xTimeoutTicks ) );
return prvConvertReturnCode( xMqttStatus );
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_Publish( MQTTAgentHandle_t xMQTTHandle,
const MQTTAgentPublishParams_t * const pxPublishParams,
TickType_t xTimeoutTicks )
{
IotMqttError_t xMqttStatus = IOT_MQTT_STATUS_PENDING;
MQTTConnection_t * pxConnection = ( MQTTConnection_t * ) xMQTTHandle;
IotMqttPublishInfo_t xPublishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;
/* Set the members of the publish info. */
xPublishInfo.pTopicName = ( const char * ) pxPublishParams->pucTopic;
xPublishInfo.topicNameLength = pxPublishParams->usTopicLength;
xPublishInfo.qos = ( IotMqttQos_t ) pxPublishParams->xQoS;
xPublishInfo.pPayload = ( const void * ) pxPublishParams->pvData;
xPublishInfo.payloadLength = pxPublishParams->ulDataLength;
/* Call the MQTT v2 blocking PUBLISH function. */
xMqttStatus = IotMqtt_TimedPublish( pxConnection->xMQTTConnection,
&xPublishInfo,
0,
mqttTICKS_TO_MS( xTimeoutTicks ) );
return prvConvertReturnCode( xMqttStatus );
}
/*-----------------------------------------------------------*/
MQTTAgentReturnCode_t MQTT_AGENT_ReturnBuffer( MQTTAgentHandle_t xMQTTHandle,
MQTTBufferHandle_t xBufferHandle )
{
( void ) xMQTTHandle;
/* Free the MQTT buffer. */
vPortFree( xBufferHandle );
return eMQTTAgentSuccess;
}
/*-----------------------------------------------------------*/

View file

@ -0,0 +1,912 @@
/*
* Amazon FreeRTOS MQTT V2.0.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_network.c
* @brief Implements functions involving transport layer connections.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* Error handling include. */
#include "private/iot_error.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/*-----------------------------------------------------------*/
/**
* @brief Check if an incoming packet type is valid.
*
* @param[in] packetType The packet type to check.
*
* @return `true` if the packet type is valid; `false` otherwise.
*/
static bool _incomingPacketValid( uint8_t packetType );
/**
* @brief Get an incoming MQTT packet from the network.
*
* @param[in] pNetworkConnection Network connection to use for receive, which
* may be different from the network connection associated with the MQTT connection.
* @param[in] pMqttConnection The associated MQTT connection.
* @param[out] pIncomingPacket Output parameter for the incoming packet.
*
* @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE.
*/
static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket );
/**
* @brief Deserialize a packet received from the network.
*
* @param[in] pMqttConnection The associated MQTT connection.
* @param[in] pIncomingPacket The packet received from the network.
*
* @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR,
* #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED.
*/
static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket );
/**
* @brief Send a PUBACK for a received QoS 1 PUBLISH packet.
*
* @param[in] pMqttConnection Which connection the PUBACK should be sent over.
* @param[in] packetIdentifier Which packet identifier to include in PUBACK.
*/
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier );
/*-----------------------------------------------------------*/
static bool _incomingPacketValid( uint8_t packetType )
{
bool status = true;
/* Check packet type. Mask out lower bits to ignore flags. */
switch( packetType & 0xf0 )
{
/* Valid incoming packet types. */
case MQTT_PACKET_TYPE_CONNACK:
case MQTT_PACKET_TYPE_PUBLISH:
case MQTT_PACKET_TYPE_PUBACK:
case MQTT_PACKET_TYPE_SUBACK:
case MQTT_PACKET_TYPE_UNSUBACK:
case MQTT_PACKET_TYPE_PINGRESP:
break;
/* Any other packet type is invalid. */
default:
status = false;
break;
}
return status;
}
/*-----------------------------------------------------------*/
static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
size_t dataBytesRead = 0;
/* Default functions for retrieving packet type and length. */
uint8_t ( * getPacketType )( void *,
const IotNetworkInterface_t * ) = _IotMqtt_GetPacketType;
size_t ( * getRemainingLength )( void *,
const IotNetworkInterface_t * ) = _IotMqtt_GetRemainingLength;
/* No buffer for remaining data should be allocated. */
IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
IotMqtt_Assert( pIncomingPacket->remainingLength == 0 );
/* Choose packet type and length functions. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->getPacketType != NULL )
{
getPacketType = pMqttConnection->pSerializer->getPacketType;
}
else
{
EMPTY_ELSE_MARKER;
}
if( pMqttConnection->pSerializer->getRemainingLength != NULL )
{
getRemainingLength = pMqttConnection->pSerializer->getRemainingLength;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Read the packet type, which is the first byte available. */
pIncomingPacket->type = getPacketType( pNetworkConnection,
pMqttConnection->pNetworkInterface );
/* Check that the incoming packet type is valid. */
if( _incomingPacketValid( pIncomingPacket->type ) == false )
{
IotLogError( "(MQTT connection %p) Unknown packet type %02x received.",
pMqttConnection,
pIncomingPacket->type );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Read the remaining length. */
pIncomingPacket->remainingLength = getRemainingLength( pNetworkConnection,
pMqttConnection->pNetworkInterface );
if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Allocate a buffer for the remaining data and read the data. */
if( pIncomingPacket->remainingLength > 0 )
{
pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength );
if( pIncomingPacket->pRemainingData == NULL )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
}
else
{
EMPTY_ELSE_MARKER;
}
dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection,
pIncomingPacket->pRemainingData,
pIncomingPacket->remainingLength );
if( dataBytesRead != pIncomingPacket->remainingLength )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Clean up on error. */
IOT_FUNCTION_CLEANUP_BEGIN();
if( status != IOT_MQTT_SUCCESS )
{
if( pIncomingPacket->pRemainingData != NULL )
{
IotMqtt_FreeMessage( pIncomingPacket->pRemainingData );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/
static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket )
{
IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
_mqttOperation_t * pOperation = NULL;
/* Deserializer function. */
IotMqttError_t ( * deserialize )( _mqttPacket_t * ) = NULL;
/* A buffer for remaining data must be allocated if remaining length is not 0. */
IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) ==
( pIncomingPacket->pRemainingData != NULL ) );
/* Only valid packets should be given to this function. */
IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true );
/* Mask out the low bits of packet type to ignore flags. */
switch( ( pIncomingPacket->type & 0xf0 ) )
{
case MQTT_PACKET_TYPE_CONNACK:
IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection );
/* Choose CONNACK deserializer. */
deserialize = _IotMqtt_DeserializeConnack;
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->deserialize.connack != NULL )
{
deserialize = pMqttConnection->pSerializer->deserialize.connack;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Deserialize CONNACK and notify of result. */
status = deserialize( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_CONNECT,
NULL );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_PUBLISH:
IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection );
/* Allocate memory to handle the incoming PUBLISH. */
pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
if( pOperation == NULL )
{
IotLogWarn( "Failed to allocate memory for incoming PUBLISH." );
status = IOT_MQTT_NO_MEMORY;
break;
}
else
{
/* Set the members of the incoming PUBLISH operation. */
( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
pOperation->incomingPublish = true;
pOperation->pMqttConnection = pMqttConnection;
pIncomingPacket->u.pIncomingPublish = pOperation;
}
/* Choose a PUBLISH deserializer. */
deserialize = _IotMqtt_DeserializePublish;
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->deserialize.publish != NULL )
{
deserialize = pMqttConnection->pSerializer->deserialize.publish;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Deserialize incoming PUBLISH. */
status = deserialize( pIncomingPacket );
if( status == IOT_MQTT_SUCCESS )
{
/* Send a PUBACK for QoS 1 PUBLISH. */
if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 )
{
_sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Transfer ownership of the received MQTT packet to the PUBLISH operation. */
pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData;
pIncomingPacket->pRemainingData = NULL;
/* Add the PUBLISH to the list of operations pending processing. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
&( pOperation->link ) );
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Increment the MQTT connection reference count before scheduling an
* incoming PUBLISH. */
if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true )
{
/* Schedule PUBLISH for callback invocation. */
status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 );
}
else
{
status = IOT_MQTT_NETWORK_ERROR;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Free PUBLISH operation on error. */
if( status != IOT_MQTT_SUCCESS )
{
/* Check ownership of the received MQTT packet. */
if( pOperation->u.publish.pReceivedData != NULL )
{
/* Retrieve the pointer MQTT packet pointer so it may be freed later. */
IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData;
}
else
{
EMPTY_ELSE_MARKER;
}
/* Remove operation from pending processing list. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
if( IotLink_IsLinked( &( pOperation->link ) ) == true )
{
IotListDouble_Remove( &( pOperation->link ) );
}
else
{
EMPTY_ELSE_MARKER;
}
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
IotMqtt_Assert( pOperation != NULL );
IotMqtt_FreeOperation( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_PUBACK:
IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection );
/* Choose PUBACK deserializer. */
deserialize = _IotMqtt_DeserializePuback;
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->deserialize.puback != NULL )
{
deserialize = pMqttConnection->pSerializer->deserialize.puback;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Deserialize PUBACK and notify of result. */
status = deserialize( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_PUBLISH_TO_SERVER,
&( pIncomingPacket->packetIdentifier ) );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_SUBACK:
IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection );
/* Choose SUBACK deserializer. */
deserialize = _IotMqtt_DeserializeSuback;
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->deserialize.suback != NULL )
{
deserialize = pMqttConnection->pSerializer->deserialize.suback;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Deserialize SUBACK and notify of result. */
pIncomingPacket->u.pMqttConnection = pMqttConnection;
status = deserialize( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_SUBSCRIBE,
&( pIncomingPacket->packetIdentifier ) );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_UNSUBACK:
IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection );
/* Choose UNSUBACK deserializer. */
deserialize = _IotMqtt_DeserializeUnsuback;
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->deserialize.unsuback != NULL )
{
deserialize = pMqttConnection->pSerializer->deserialize.unsuback;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Deserialize UNSUBACK and notify of result. */
status = deserialize( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_UNSUBSCRIBE,
&( pIncomingPacket->packetIdentifier ) );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
default:
/* The only remaining valid type is PINGRESP. */
IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP );
IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection );
/* Choose PINGRESP deserializer. */
deserialize = _IotMqtt_DeserializePingresp;
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->deserialize.pingresp != NULL )
{
deserialize = pMqttConnection->pSerializer->deserialize.pingresp;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Deserialize PINGRESP. */
status = deserialize( pIncomingPacket );
if( status == IOT_MQTT_SUCCESS )
{
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
if( pMqttConnection->keepAliveFailure == false )
{
IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
pMqttConnection );
}
else
{
IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
pMqttConnection );
pMqttConnection->keepAliveFailure = false;
}
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
}
if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "(MQTT connection %p) Packet parser status %s.",
pMqttConnection,
IotMqtt_strerror( status ) );
}
else
{
EMPTY_ELSE_MARKER;
}
return status;
}
/*-----------------------------------------------------------*/
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier )
{
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;
uint8_t * pPuback = NULL;
size_t pubackSize = 0, bytesSent = 0;
/* Default PUBACK serializer and free packet functions. */
IotMqttError_t ( * serializePuback )( uint16_t,
uint8_t **,
size_t * ) = _IotMqtt_SerializePuback;
void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
pMqttConnection,
packetIdentifier );
/* Choose PUBACK serializer and free packet functions. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->serialize.puback != NULL )
{
serializePuback = pMqttConnection->pSerializer->serialize.puback;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( pMqttConnection->pSerializer != NULL )
{
if( pMqttConnection->pSerializer->freePacket != NULL )
{
freePacket = pMqttConnection->pSerializer->freePacket;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* Generate a PUBACK packet from the packet identifier. */
serializeStatus = serializePuback( packetIdentifier,
&pPuback,
&pubackSize );
if( serializeStatus != IOT_MQTT_SUCCESS )
{
IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "
"received PUBLISH %hu.",
pMqttConnection,
packetIdentifier );
}
else
{
bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,
pPuback,
pubackSize );
if( bytesSent != pubackSize )
{
IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"
" PUBLISH %hu.",
pMqttConnection,
packetIdentifier );
}
else
{
IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",
pMqttConnection,
packetIdentifier );
}
freePacket( pPuback );
}
}
/*-----------------------------------------------------------*/
bool _IotMqtt_GetNextByte( void * pNetworkConnection,
const IotNetworkInterface_t * pNetworkInterface,
uint8_t * pIncomingByte )
{
bool status = false;
uint8_t incomingByte = 0;
size_t bytesReceived = 0;
/* Attempt to read 1 byte. */
bytesReceived = pNetworkInterface->receive( pNetworkConnection,
&incomingByte,
1 );
/* Set the output parameter and return success if 1 byte was read. */
if( bytesReceived == 1 )
{
*pIncomingByte = incomingByte;
status = true;
}
else
{
/* Network receive must return 0 on failure. */
IotMqtt_Assert( bytesReceived == 0 );
}
return status;
}
/*-----------------------------------------------------------*/
void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
_mqttConnection_t * pMqttConnection )
{
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
pMqttConnection->disconnected = true;
pMqttConnection->keepAliveFailure = true;
if( pMqttConnection->keepAliveMs != 0 )
{
/* Keep-alive must have a PINGREQ allocated. */
IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );
IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );
/* PINGREQ provides a reference to the connection, so reference count must
* be nonzero. */
IotMqtt_Assert( pMqttConnection->references > 0 );
/* Attempt to cancel the keep-alive job. */
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pMqttConnection->keepAliveJob,
NULL );
/* If the keep-alive job was not canceled, it must be already executing.
* Any other return value is invalid. */
IotMqtt_Assert( ( taskPoolStatus == IOT_TASKPOOL_SUCCESS ) ||
( taskPoolStatus == IOT_TASKPOOL_CANCEL_FAILED ) );
/* Clean up keep-alive if its job was successfully canceled. Otherwise,
* the executing keep-alive job will clean up itself. */
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
/* Clean up PINGREQ packet and job. */
_IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );
/* Clear data about the keep-alive. */
pMqttConnection->keepAliveMs = 0;
pMqttConnection->pPingreqPacket = NULL;
pMqttConnection->pingreqPacketSize = 0;
/* Keep-alive is cleaned up; decrement reference count. Since this
* function must be followed with a call to DISCONNECT, a check to
* destroy the connection is not done here. */
pMqttConnection->references--;
IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.",
pMqttConnection );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Close the network connection. */
if( pMqttConnection->pNetworkInterface->close != NULL )
{
closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );
if( closeStatus == IOT_NETWORK_SUCCESS )
{
IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection );
}
else
{
IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.",
pMqttConnection,
closeStatus );
}
}
else
{
IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection"
" not closed.", pMqttConnection );
}
/* Invoke the disconnect callback. */
if( pMqttConnection->disconnectCallback.function != NULL )
{
/* Set the members of the callback parameter. */
callbackParam.mqttConnection = pMqttConnection;
callbackParam.u.disconnectReason = disconnectReason;
pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,
&callbackParam );
}
else
{
EMPTY_ELSE_MARKER;
}
}
/*-----------------------------------------------------------*/
void IotMqtt_ReceiveCallback( void * pNetworkConnection,
void * pReceiveContext )
{
IotMqttError_t status = IOT_MQTT_SUCCESS;
_mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL };
/* Cast context to correct type. */
_mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext;
/* Read an MQTT packet from the network. */
status = _getIncomingPacket( pNetworkConnection,
pMqttConnection,
&incomingPacket );
if( status == IOT_MQTT_SUCCESS )
{
/* Deserialize the received packet. */
status = _deserializeIncomingPacket( pMqttConnection,
&incomingPacket );
/* Free any buffers allocated for the MQTT packet. */
if( incomingPacket.pRemainingData != NULL )
{
IotMqtt_FreeMessage( incomingPacket.pRemainingData );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Close the network connection on a bad response. */
if( status == IOT_MQTT_BAD_RESPONSE )
{
IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.",
pMqttConnection );
_IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED,
pMqttConnection );
}
else
{
EMPTY_ELSE_MARKER;
}
}
/*-----------------------------------------------------------*/

View file

@ -0,0 +1,207 @@
/*
* Amazon FreeRTOS MQTT V2.0.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_static_memory.c
* @brief Implementation of MQTT static memory functions.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* This file should only be compiled if dynamic memory allocation is forbidden. */
#if IOT_STATIC_MEMORY_ONLY == 1
/* Standard includes. */
#include <stdbool.h>
#include <stddef.h>
#include <string.h>
/* Static memory include. */
#include "private/iot_static_memory.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/*-----------------------------------------------------------*/
/**
* @cond DOXYGEN_IGNORE
* Doxygen should ignore this section.
*
* Provide default values for undefined configuration constants.
*/
#ifndef IOT_MQTT_CONNECTIONS
#define IOT_MQTT_CONNECTIONS ( 1 )
#endif
#ifndef IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS
#define IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS ( 10 )
#endif
#ifndef IOT_MQTT_SUBSCRIPTIONS
#define IOT_MQTT_SUBSCRIPTIONS ( 8 )
#endif
/** @endcond */
/* Validate static memory configuration settings. */
#if IOT_MQTT_CONNECTIONS <= 0
#error "IOT_MQTT_CONNECTIONS cannot be 0 or negative."
#endif
#if IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS <= 0
#error "IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS cannot be 0 or negative."
#endif
#if IOT_MQTT_SUBSCRIPTIONS <= 0
#error "IOT_MQTT_SUBSCRIPTIONS cannot be 0 or negative."
#endif
/**
* @brief The size of a static memory MQTT subscription.
*
* Since the pTopic member of #_mqttSubscription_t is variable-length, the constant
* #AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH is used for the length of
* #_mqttSubscription_t.pTopicFilter.
*/
#define MQTT_SUBSCRIPTION_SIZE ( sizeof( _mqttSubscription_t ) + AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH )
/*-----------------------------------------------------------*/
/*
* Static memory buffers and flags, allocated and zeroed at compile-time.
*/
static bool _pInUseMqttConnections[ IOT_MQTT_CONNECTIONS ] = { 0 }; /**< @brief MQTT connection in-use flags. */
static _mqttConnection_t _pMqttConnections[ IOT_MQTT_CONNECTIONS ] = { { 0 } }; /**< @brief MQTT connections. */
static bool _pInUseMqttOperations[ IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS ] = { 0 }; /**< @brief MQTT operation in-use flags. */
static _mqttOperation_t _pMqttOperations[ IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS ] = { { .link = { 0 } } }; /**< @brief MQTT operations. */
static bool _pInUseMqttSubscriptions[ IOT_MQTT_SUBSCRIPTIONS ] = { 0 }; /**< @brief MQTT subscription in-use flags. */
static char _pMqttSubscriptions[ IOT_MQTT_SUBSCRIPTIONS ][ MQTT_SUBSCRIPTION_SIZE ] = { { 0 } }; /**< @brief MQTT subscriptions. */
/*-----------------------------------------------------------*/
void * IotMqtt_MallocConnection( size_t size )
{
int32_t freeIndex = -1;
void * pNewConnection = NULL;
/* Check size argument. */
if( size == sizeof( _mqttConnection_t ) )
{
/* Find a free MQTT connection. */
freeIndex = IotStaticMemory_FindFree( _pInUseMqttConnections,
IOT_MQTT_CONNECTIONS );
if( freeIndex != -1 )
{
pNewConnection = &( _pMqttConnections[ freeIndex ] );
}
}
return pNewConnection;
}
/*-----------------------------------------------------------*/
void IotMqtt_FreeConnection( void * ptr )
{
/* Return the in-use MQTT connection. */
IotStaticMemory_ReturnInUse( ptr,
_pMqttConnections,
_pInUseMqttConnections,
IOT_MQTT_CONNECTIONS,
sizeof( _mqttConnection_t ) );
}
/*-----------------------------------------------------------*/
void * IotMqtt_MallocOperation( size_t size )
{
int32_t freeIndex = -1;
void * pNewOperation = NULL;
/* Check size argument. */
if( size == sizeof( _mqttOperation_t ) )
{
/* Find a free MQTT operation. */
freeIndex = IotStaticMemory_FindFree( _pInUseMqttOperations,
IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS );
if( freeIndex != -1 )
{
pNewOperation = &( _pMqttOperations[ freeIndex ] );
}
}
return pNewOperation;
}
/*-----------------------------------------------------------*/
void IotMqtt_FreeOperation( void * ptr )
{
/* Return the in-use MQTT operation. */
IotStaticMemory_ReturnInUse( ptr,
_pMqttOperations,
_pInUseMqttOperations,
IOT_MQTT_MAX_IN_PROGRESS_OPERATIONS,
sizeof( _mqttOperation_t ) );
}
/*-----------------------------------------------------------*/
void * IotMqtt_MallocSubscription( size_t size )
{
int32_t freeIndex = -1;
void * pNewSubscription = NULL;
if( size <= MQTT_SUBSCRIPTION_SIZE )
{
/* Get the index of a free MQTT subscription. */
freeIndex = IotStaticMemory_FindFree( _pInUseMqttSubscriptions,
IOT_MQTT_SUBSCRIPTIONS );
if( freeIndex != -1 )
{
pNewSubscription = &( _pMqttSubscriptions[ freeIndex ][ 0 ] );
}
}
return pNewSubscription;
}
/*-----------------------------------------------------------*/
void IotMqtt_FreeSubscription( void * ptr )
{
/* Return the in-use MQTT subscription. */
IotStaticMemory_ReturnInUse( ptr,
_pMqttSubscriptions,
_pInUseMqttSubscriptions,
IOT_MQTT_SUBSCRIPTIONS,
MQTT_SUBSCRIPTION_SIZE );
}
/*-----------------------------------------------------------*/
#endif

View file

@ -0,0 +1,648 @@
/*
* Amazon FreeRTOS MQTT V2.0.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_subscription.c
* @brief Implements functions that manage subscriptions for an MQTT connection.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <stdbool.h>
#include <string.h>
/* Error handling include. */
#include "private/iot_error.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/*-----------------------------------------------------------*/
/**
* @brief First parameter to #_topicMatch.
*/
typedef struct _topicMatchParams
{
const char * pTopicName; /**< @brief The topic name to parse. */
uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */
bool exactMatchOnly; /**< @brief Whether to allow wildcards or require exact matches. */
} _topicMatchParams_t;
/**
* @brief First parameter to #_packetMatch.
*/
typedef struct _packetMatchParams
{
uint16_t packetIdentifier; /**< Packet identifier to match. */
int32_t order; /**< Order to match. Set to `-1` to ignore. */
} _packetMatchParams_t;
/*-----------------------------------------------------------*/
/**
* @brief Matches a topic name (from a publish) with a topic filter (from a
* subscription).
*
* @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
* @param[in] pMatch Pointer to a #_topicMatchParams_t.
*
* @return `true` if the arguments match the subscription topic filter; `false`
* otherwise.
*/
static bool _topicMatch( const IotLink_t * pSubscriptionLink,
void * pMatch );
/**
* @brief Matches a packet identifier and order.
*
* @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
* @param[in] pMatch Pointer to a #_packetMatchParams_t.
*
* @return `true` if the arguments match the subscription's packet info; `false`
* otherwise.
*/
static bool _packetMatch( const IotLink_t * pSubscriptionLink,
void * pMatch );
/*-----------------------------------------------------------*/
static bool _topicMatch( const IotLink_t * pSubscriptionLink,
void * pMatch )
{
IOT_FUNCTION_ENTRY( bool, false );
uint16_t nameIndex = 0, filterIndex = 0;
/* Because this function is called from a container function, the given link
* must never be NULL. */
IotMqtt_Assert( pSubscriptionLink != NULL );
_mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
pSubscriptionLink,
link );
_topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;
/* Extract the relevant strings and lengths from parameters. */
const char * pTopicName = pParam->pTopicName;
const char * pTopicFilter = pSubscription->pTopicFilter;
const uint16_t topicNameLength = pParam->topicNameLength;
const uint16_t topicFilterLength = pSubscription->topicFilterLength;
/* Check for an exact match. */
if( topicNameLength == topicFilterLength )
{
status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* If the topic lengths are different but an exact match is required, return
* false. */
if( pParam->exactMatchOnly == true )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
{
/* Check if the character in the topic name matches the corresponding
* character in the topic filter string. */
if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
{
/* Handle special corner cases as documented by the MQTT protocol spec. */
/* Filter "sport/#" also matches "sport" since # includes the parent level. */
if( nameIndex == topicNameLength - 1 )
{
if( filterIndex == topicFilterLength - 3 )
{
if( pTopicFilter[ filterIndex + 1 ] == '/' )
{
if( pTopicFilter[ filterIndex + 2 ] == '#' )
{
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Filter "sport/+" also matches the "sport/" but not "sport". */
if( nameIndex == topicNameLength - 1 )
{
if( filterIndex == topicFilterLength - 2 )
{
if( pTopicFilter[ filterIndex + 1 ] == '+' )
{
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
/* Check for wildcards. */
if( pTopicFilter[ filterIndex ] == '+' )
{
/* Move topic name index to the end of the current level.
* This is identified by '/'. */
while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )
{
nameIndex++;
}
/* Increment filter index to skip '/'. */
filterIndex++;
continue;
}
else if( pTopicFilter[ filterIndex ] == '#' )
{
/* Subsequent characters don't need to be checked if the for the
* multi-level wildcard. */
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
/* Any character mismatch other than '+' or '#' means the topic
* name does not match the topic filter. */
IOT_SET_AND_GOTO_CLEANUP( false );
}
}
/* Increment indexes. */
nameIndex++;
filterIndex++;
}
/* If the end of both strings has been reached, they match. */
if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )
{
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
static bool _packetMatch( const IotLink_t * pSubscriptionLink,
void * pMatch )
{
bool match = false;
/* Because this function is called from a container function, the given link
* must never be NULL. */
IotMqtt_Assert( pSubscriptionLink != NULL );
_mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
pSubscriptionLink,
link );
_packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;
/* Compare packet identifiers. */
if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )
{
/* Compare orders if order is not -1. */
if( pParam->order == -1 )
{
match = true;
}
else
{
match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;
}
}
/* If this subscription should be removed, check the reference count. */
if( match == true )
{
/* Reference count must not be negative. */
IotMqtt_Assert( pSubscription->references >= 0 );
/* If the reference count is positive, this subscription cannot be
* removed yet because there are subscription callbacks using it. */
if( pSubscription->references > 0 )
{
match = false;
/* Set the unsubscribed flag. The last active subscription callback
* will remove and clean up this subscription. */
pSubscription->unsubscribed = true;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
return match;
}
/*-----------------------------------------------------------*/
IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
uint16_t subscribePacketIdentifier,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount )
{
IotMqttError_t status = IOT_MQTT_SUCCESS;
size_t i = 0;
_mqttSubscription_t * pNewSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
_topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
for( i = 0; i < subscriptionCount; i++ )
{
/* Check if this topic filter is already registered. */
topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
NULL,
_topicMatch,
&topicMatchParams );
if( pSubscriptionLink != NULL )
{
pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
/* The lengths of exactly matching topic filters must match. */
IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );
/* Replace the callback and packet info with the new parameters. */
pNewSubscription->callback = pSubscriptionList[ i ].callback;
pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
pNewSubscription->packetInfo.order = i;
}
else
{
/* Allocate memory for a new subscription. */
pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +
pSubscriptionList[ i ].topicFilterLength );
if( pNewSubscription == NULL )
{
status = IOT_MQTT_NO_MEMORY;
break;
}
else
{
/* Clear the new subscription. */
( void ) memset( pNewSubscription,
0x00,
sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );
/* Set the members of the new subscription and add it to the list. */
pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
pNewSubscription->packetInfo.order = i;
pNewSubscription->callback = pSubscriptionList[ i ].callback;
pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;
( void ) memcpy( pNewSubscription->pTopicFilter,
pSubscriptionList[ i ].pTopicFilter,
( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );
IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),
&( pNewSubscription->link ) );
}
}
}
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
/* If memory allocation failed, remove all previously added subscriptions. */
if( status != IOT_MQTT_SUCCESS )
{
_IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,
pSubscriptionList,
i );
}
else
{
EMPTY_ELSE_MARKER;
}
return status;
}
/*-----------------------------------------------------------*/
void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
IotMqttCallbackParam_t * pCallbackParam )
{
_mqttSubscription_t * pSubscription = NULL;
IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;
void * pCallbackContext = NULL;
void ( * callbackFunction )( void *,
IotMqttCallbackParam_t * ) = NULL;
_topicMatchParams_t topicMatchParams =
{
.pTopicName = pCallbackParam->u.message.info.pTopicName,
.topicNameLength = pCallbackParam->u.message.info.topicNameLength,
.exactMatchOnly = false
};
/* Prevent any other thread from modifying the subscription list while this
* function is searching. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
/* Search the subscription list for all matching subscriptions starting at
* the list head. */
while( true )
{
pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
pCurrentLink,
_topicMatch,
&topicMatchParams );
/* No subscription found. Exit loop. */
if( pCurrentLink == NULL )
{
break;
}
else
{
EMPTY_ELSE_MARKER;
}
/* Subscription found. Calculate pointer to subscription object. */
pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );
/* Subscription validation should not have allowed a NULL callback function. */
IotMqtt_Assert( pSubscription->callback.function != NULL );
/* Increment the subscription's reference count. */
( pSubscription->references )++;
/* Copy the necessary members of the subscription before releasing the
* subscription list mutex. */
pCallbackContext = pSubscription->callback.pCallbackContext;
callbackFunction = pSubscription->callback.function;
/* Unlock the subscription list mutex. */
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
/* Set the members of the callback parameter. */
pCallbackParam->mqttConnection = pMqttConnection;
pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;
pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;
/* Invoke the subscription callback. */
callbackFunction( pCallbackContext, pCallbackParam );
/* Lock the subscription list mutex to decrement the reference count. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
/* Decrement the reference count. It must still be positive. */
( pSubscription->references )--;
IotMqtt_Assert( pSubscription->references >= 0 );
/* Save the pointer to the next link in case this subscription is freed. */
pNextLink = pCurrentLink->pNext;
/* Remove this subscription if it has no references and the unsubscribed
* flag is set. */
if( pSubscription->unsubscribed == true )
{
/* An unsubscribed subscription should have been removed from the list. */
IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );
/* Free subscriptions with no references. */
if( pSubscription->references == 0 )
{
IotMqtt_FreeSubscription( pSubscription );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Move current link pointer. */
pCurrentLink = pNextLink;
}
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
_IotMqtt_DecrementConnectionReferences( pMqttConnection );
}
/*-----------------------------------------------------------*/
void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier,
int32_t order )
{
const _packetMatchParams_t packetMatchParams =
{
.packetIdentifier = packetIdentifier,
.order = order
};
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
_packetMatch,
( void * ) ( &packetMatchParams ),
IotMqtt_FreeSubscription,
offsetof( _mqttSubscription_t, link ) );
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
}
/*-----------------------------------------------------------*/
void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount )
{
size_t i = 0;
_mqttSubscription_t * pSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
_topicMatchParams_t topicMatchParams = { 0 };
/* Prevent any other thread from modifying the subscription list while this
* function is running. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
/* Find and remove each topic filter from the list. */
for( i = 0; i < subscriptionCount; i++ )
{
topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
topicMatchParams.exactMatchOnly = true;
pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
NULL,
_topicMatch,
&topicMatchParams );
if( pSubscriptionLink != NULL )
{
pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
/* Reference count must not be negative. */
IotMqtt_Assert( pSubscription->references >= 0 );
/* Remove subscription from list. */
IotListDouble_Remove( pSubscriptionLink );
/* Check the reference count. This subscription cannot be removed if
* there are subscription callbacks using it. */
if( pSubscription->references > 0 )
{
/* Set the unsubscribed flag. The last active subscription callback
* will remove and clean up this subscription. */
pSubscription->unsubscribed = true;
}
else
{
/* Free a subscription with no references. */
IotMqtt_FreeSubscription( pSubscription );
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
}
/*-----------------------------------------------------------*/
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
IotMqttSubscription_t * pCurrentSubscription )
{
bool status = false;
_mqttSubscription_t * pSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
_topicMatchParams_t topicMatchParams =
{
.pTopicName = pTopicFilter,
.topicNameLength = topicFilterLength,
.exactMatchOnly = true
};
/* Prevent any other thread from modifying the subscription list while this
* function is running. */
IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );
/* Search for a matching subscription. */
pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),
NULL,
_topicMatch,
&topicMatchParams );
/* Check if a matching subscription was found. */
if( pSubscriptionLink != NULL )
{
pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
/* Copy the matching subscription to the output parameter. */
if( pCurrentSubscription != NULL )
{
pCurrentSubscription->pTopicFilter = pTopicFilter;
pCurrentSubscription->topicFilterLength = topicFilterLength;
pCurrentSubscription->qos = IOT_MQTT_QOS_0;
pCurrentSubscription->callback = pSubscription->callback;
}
else
{
EMPTY_ELSE_MARKER;
}
status = true;
}
else
{
EMPTY_ELSE_MARKER;
}
IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );
return status;
}
/*-----------------------------------------------------------*/
/* Provide access to internal functions and variables if testing. */
#if IOT_BUILD_TESTS == 1
#include "iot_test_access_mqtt_subscription.c"
#endif

View file

@ -0,0 +1,593 @@
/*
* Amazon FreeRTOS MQTT V2.0.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_validate.c
* @brief Implements functions that validate the structs of the MQTT library.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Error handling include. */
#include "private/iot_error.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateConnect( const IotMqttConnectInfo_t * pConnectInfo )
{
IOT_FUNCTION_ENTRY( bool, true );
/* Check for NULL. */
if( pConnectInfo == NULL )
{
IotLogError( "MQTT connection information cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that a client identifier was set. */
if( pConnectInfo->pClientIdentifier == NULL )
{
IotLogError( "Client identifier must be set." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for a zero-length client identifier. Zero-length client identifiers
* are not allowed with clean sessions. */
if( pConnectInfo->clientIdentifierLength == 0 )
{
IotLogWarn( "A zero-length client identifier was provided." );
if( pConnectInfo->cleanSession == true )
{
IotLogError( "A zero-length client identifier cannot be used with a clean session." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that the number of persistent session subscriptions is valid. */
if( pConnectInfo->cleanSession == false )
{
if( pConnectInfo->pPreviousSubscriptions != NULL )
{
if( pConnectInfo->previousSubscriptionCount == 0 )
{
IotLogError( "Previous subscription count cannot be 0." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* In MQTT 3.1.1, servers are not obligated to accept client identifiers longer
* than 23 characters. */
if( pConnectInfo->clientIdentifierLength > 23 )
{
IotLogWarn( "A client identifier length of %hu is longer than 23, which is "
"the longest client identifier a server must accept.",
pConnectInfo->clientIdentifierLength );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for compatibility with the AWS IoT MQTT service limits. */
if( pConnectInfo->awsIotMqttMode == true )
{
/* Check that client identifier is within the service limit. */
if( pConnectInfo->clientIdentifierLength > AWS_IOT_MQTT_SERVER_MAX_CLIENTID )
{
IotLogError( "AWS IoT does not support client identifiers longer than %d bytes.",
AWS_IOT_MQTT_SERVER_MAX_CLIENTID );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for compliance with AWS IoT keep-alive limits. */
if( pConnectInfo->keepAliveSeconds == 0 )
{
IotLogWarn( "AWS IoT does not support disabling keep-alive. Default keep-alive "
"of %d seconds will be used.",
AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE );
}
else if( pConnectInfo->keepAliveSeconds < AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE )
{
IotLogWarn( "AWS IoT does not support keep-alive intervals less than %d seconds. "
"An interval of %d seconds will be used.",
AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE,
AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE );
}
else if( pConnectInfo->keepAliveSeconds > AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE )
{
IotLogWarn( "AWS IoT does not support keep-alive intervals greater than %d seconds. "
"An interval of %d seconds will be used.",
AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE,
AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidatePublish( bool awsIotMqttMode,
const IotMqttPublishInfo_t * pPublishInfo )
{
IOT_FUNCTION_ENTRY( bool, true );
/* Check for NULL. */
if( pPublishInfo == NULL )
{
IotLogError( "Publish information cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check topic name for NULL or zero-length. */
if( pPublishInfo->pTopicName == NULL )
{
IotLogError( "Publish topic name must be set." );
}
else
{
EMPTY_ELSE_MARKER;
}
if( pPublishInfo->topicNameLength == 0 )
{
IotLogError( "Publish topic name length cannot be 0." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Only allow NULL payloads with zero length. */
if( pPublishInfo->pPayload == NULL )
{
if( pPublishInfo->payloadLength != 0 )
{
IotLogError( "Nonzero payload length cannot have a NULL payload." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for a valid QoS. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
if( pPublishInfo->qos != IOT_MQTT_QOS_1 )
{
IotLogError( "Publish QoS must be either 0 or 1." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check the retry parameters. */
if( pPublishInfo->retryLimit > 0 )
{
if( pPublishInfo->retryMs == 0 )
{
IotLogError( "Publish retry time must be positive." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for compatibility with AWS IoT MQTT server. */
if( awsIotMqttMode == true )
{
/* Check for retained message. */
if( pPublishInfo->retain == true )
{
IotLogError( "AWS IoT does not support retained publish messages." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check topic name length. */
if( pPublishInfo->topicNameLength > AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH )
{
IotLogError( "AWS IoT does not support topic names longer than %d bytes.",
AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateOperation( IotMqttOperation_t operation )
{
IOT_FUNCTION_ENTRY( bool, true );
/* Check for NULL. */
if( operation == NULL )
{
IotLogError( "Operation reference cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that reference is waitable. */
if( ( operation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) != IOT_MQTT_FLAG_WAITABLE )
{
IotLogError( "Operation is not waitable." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateSubscriptionList( IotMqttOperationType_t operation,
bool awsIotMqttMode,
const IotMqttSubscription_t * pListStart,
size_t listSize )
{
IOT_FUNCTION_ENTRY( bool, true );
size_t i = 0;
uint16_t j = 0;
const IotMqttSubscription_t * pListElement = NULL;
/* Operation must be either subscribe or unsubscribe. */
IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
( operation == IOT_MQTT_UNSUBSCRIBE ) );
/* Check for empty list. */
if( pListStart == NULL )
{
IotLogError( "Subscription list pointer cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
if( listSize == 0 )
{
IotLogError( "Empty subscription list." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* AWS IoT supports at most 8 topic filters in a single SUBSCRIBE packet. */
if( awsIotMqttMode == true )
{
if( listSize > AWS_IOT_MQTT_SERVER_MAX_TOPIC_FILTERS_PER_SUBSCRIBE )
{
IotLogError( "AWS IoT does not support more than %d topic filters per "
"subscription request.",
AWS_IOT_MQTT_SERVER_MAX_TOPIC_FILTERS_PER_SUBSCRIBE );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
for( i = 0; i < listSize; i++ )
{
pListElement = &( pListStart[ i ] );
/* Check for a valid QoS and callback function when subscribing. */
if( operation == IOT_MQTT_SUBSCRIBE )
{
if( pListElement->qos != IOT_MQTT_QOS_0 )
{
if( pListElement->qos != IOT_MQTT_QOS_1 )
{
IotLogError( "Subscription QoS must be either 0 or 1." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
if( pListElement->callback.function == NULL )
{
IotLogError( "Callback function must be set." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check subscription topic filter. */
if( pListElement->pTopicFilter == NULL )
{
IotLogError( "Subscription topic filter cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
if( pListElement->topicFilterLength == 0 )
{
IotLogError( "Subscription topic filter length cannot be 0." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for compatibility with AWS IoT MQTT server. */
if( awsIotMqttMode == true )
{
/* Check topic filter length. */
if( pListElement->topicFilterLength > AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH )
{
IotLogError( "AWS IoT does not support topic filters longer than %d bytes.",
AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that the wildcards '+' and '#' are being used correctly. */
for( j = 0; j < pListElement->topicFilterLength; j++ )
{
switch( pListElement->pTopicFilter[ j ] )
{
/* Check that the single level wildcard '+' is used correctly. */
case '+':
/* Unless '+' is the first character in the filter, it must be preceded by '/'. */
if( j > 0 )
{
if( pListElement->pTopicFilter[ j - 1 ] != '/' )
{
IotLogError( "Invalid topic filter %.*s -- '+' must be preceded by '/'.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Unless '+' is the last character in the filter, it must be succeeded by '/'. */
if( j < pListElement->topicFilterLength - 1 )
{
if( pListElement->pTopicFilter[ j + 1 ] != '/' )
{
IotLogError( "Invalid topic filter %.*s -- '+' must be succeeded by '/'.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
break;
/* Check that the multi-level wildcard '#' is used correctly. */
case '#':
/* '#' must be the last character in the filter. */
if( j != pListElement->topicFilterLength - 1 )
{
IotLogError( "Invalid topic filter %.*s -- '#' must be the last character.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Unless '#' is standalone, it must be preceded by '/'. */
if( pListElement->topicFilterLength > 1 )
{
if( pListElement->pTopicFilter[ j - 1 ] != '/' )
{
IotLogError( "Invalid topic filter %.*s -- '#' must be preceded by '/'.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
break;
default:
break;
}
}
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/

View file

@ -0,0 +1,915 @@
/*
* Amazon FreeRTOS MQTT V2.0.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://aws.amazon.com/freertos
* http://www.FreeRTOS.org
*/
/**
* @file iot_mqtt_internal.h
* @brief Internal header of MQTT library. This header should not be included in
* typical application code.
*/
#ifndef IOT_MQTT_INTERNAL_H_
#define IOT_MQTT_INTERNAL_H_
/* The config header is always included first. */
#include "iot_config.h"
/* Linear containers (lists and queues) include. */
#include "iot_linear_containers.h"
/* MQTT include. */
#include "iot_mqtt.h"
/* Task pool include. */
#include "iot_taskpool.h"
/**
* @def IotMqtt_Assert( expression )
* @brief Assertion macro for the MQTT library.
*
* Set @ref IOT_MQTT_ENABLE_ASSERTS to `1` to enable assertions in the MQTT
* library.
*
* @param[in] expression Expression to be evaluated.
*/
#if IOT_MQTT_ENABLE_ASSERTS == 1
#ifndef IotMqtt_Assert
#include <assert.h>
#define IotMqtt_Assert( expression ) assert( expression )
#endif
#else
#define IotMqtt_Assert( expression )
#endif
/* Configure logs for MQTT functions. */
#ifdef IOT_LOG_LEVEL_MQTT
#define LIBRARY_LOG_LEVEL IOT_LOG_LEVEL_MQTT
#else
#ifdef IOT_LOG_LEVEL_GLOBAL
#define LIBRARY_LOG_LEVEL IOT_LOG_LEVEL_GLOBAL
#else
#define LIBRARY_LOG_LEVEL IOT_LOG_NONE
#endif
#endif
#define LIBRARY_LOG_NAME ( "MQTT" )
#include "iot_logging_setup.h"
/*
* Provide default values for undefined memory allocation functions based on
* the usage of dynamic memory allocation.
*/
#if IOT_STATIC_MEMORY_ONLY == 1
#include "private/iot_static_memory.h"
/**
* @brief Allocate an #_mqttConnection_t. This function should have the same
* signature as [malloc]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
*/
void * IotMqtt_MallocConnection( size_t size );
/**
* @brief Free an #_mqttConnection_t. This function should have the same
* signature as [free]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
*/
void IotMqtt_FreeConnection( void * ptr );
/**
* @brief Allocate memory for an MQTT packet. This function should have the
* same signature as [malloc]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
*/
#define IotMqtt_MallocMessage Iot_MallocMessageBuffer
/**
* @brief Free an MQTT packet. This function should have the same signature
* as [free]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
*/
#define IotMqtt_FreeMessage Iot_FreeMessageBuffer
/**
* @brief Allocate an #_mqttOperation_t. This function should have the same
* signature as [malloc]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
*/
void * IotMqtt_MallocOperation( size_t size );
/**
* @brief Free an #_mqttOperation_t. This function should have the same
* signature as [free]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
*/
void IotMqtt_FreeOperation( void * ptr );
/**
* @brief Allocate an #_mqttSubscription_t. This function should have the
* same signature as [malloc]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
*/
void * IotMqtt_MallocSubscription( size_t size );
/**
* @brief Free an #_mqttSubscription_t. This function should have the same
* signature as [free]
* (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
*/
void IotMqtt_FreeSubscription( void * ptr );
#else /* if IOT_STATIC_MEMORY_ONLY == 1 */
#include <stdlib.h>
#ifndef IotMqtt_MallocConnection
#define IotMqtt_MallocConnection malloc
#endif
#ifndef IotMqtt_FreeConnection
#define IotMqtt_FreeConnection free
#endif
#ifndef IotMqtt_MallocMessage
#define IotMqtt_MallocMessage malloc
#endif
#ifndef IotMqtt_FreeMessage
#define IotMqtt_FreeMessage free
#endif
#ifndef IotMqtt_MallocOperation
#define IotMqtt_MallocOperation malloc
#endif
#ifndef IotMqtt_FreeOperation
#define IotMqtt_FreeOperation free
#endif
#ifndef IotMqtt_MallocSubscription
#define IotMqtt_MallocSubscription malloc
#endif
#ifndef IotMqtt_FreeSubscription
#define IotMqtt_FreeSubscription free
#endif
#endif /* if IOT_STATIC_MEMORY_ONLY == 1 */
/**
* @cond DOXYGEN_IGNORE
* Doxygen should ignore this section.
*
* Provide default values for undefined configuration constants.
*/
#ifndef AWS_IOT_MQTT_ENABLE_METRICS
#define AWS_IOT_MQTT_ENABLE_METRICS ( 1 )
#endif
#ifndef IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES
#define IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES ( 0 )
#endif
#ifndef IOT_MQTT_RESPONSE_WAIT_MS
#define IOT_MQTT_RESPONSE_WAIT_MS ( 1000 )
#endif
#ifndef IOT_MQTT_RETRY_MS_CEILING
#define IOT_MQTT_RETRY_MS_CEILING ( 60000 )
#endif
/** @endcond */
/**
* @brief Marks the empty statement of an `else` branch.
*
* Does nothing, but allows test coverage to detect branches not taken. By default,
* this is defined to nothing. When running code coverage testing, this is defined
* to an assembly NOP.
*/
#ifndef EMPTY_ELSE_MARKER
#define EMPTY_ELSE_MARKER
#endif
/*
* Constants related to limits defined in AWS Service Limits.
*
* For details, see
* https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html
*
* Used to validate parameters if when connecting to an AWS IoT MQTT server.
*/
#define AWS_IOT_MQTT_SERVER_MIN_KEEPALIVE ( 30 ) /**< @brief Minumum keep-alive interval accepted by AWS IoT. */
#define AWS_IOT_MQTT_SERVER_MAX_KEEPALIVE ( 1200 ) /**< @brief Maximum keep-alive interval accepted by AWS IoT. */
#define AWS_IOT_MQTT_SERVER_MAX_CLIENTID ( 128 ) /**< @brief Maximum length of client identifier accepted by AWS IoT. */
#define AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH ( 256 ) /**< @brief Maximum length of topic names or filters accepted by AWS IoT. */
#define AWS_IOT_MQTT_SERVER_MAX_TOPIC_FILTERS_PER_SUBSCRIBE ( 8 ) /**< @brief Maximum number of topic filters in a single SUBSCRIBE packet. */
/*
* MQTT control packet type and flags. Always the first byte of an MQTT
* packet.
*
* For details, see
* http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/csprd02/mqtt-v3.1.1-csprd02.html#_Toc385349757
*/
#define MQTT_PACKET_TYPE_CONNECT ( ( uint8_t ) 0x10U ) /**< @brief CONNECT (client-to-server). */
#define MQTT_PACKET_TYPE_CONNACK ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */
#define MQTT_PACKET_TYPE_PUBLISH ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bi-directional). */
#define MQTT_PACKET_TYPE_PUBACK ( ( uint8_t ) 0x40U ) /**< @brief PUBACK (server-to-client). */
#define MQTT_PACKET_TYPE_SUBSCRIBE ( ( uint8_t ) 0x82U ) /**< @brief SUBSCRIBE (client-to-server). */
#define MQTT_PACKET_TYPE_SUBACK ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */
#define MQTT_PACKET_TYPE_UNSUBSCRIBE ( ( uint8_t ) 0xa2U ) /**< @brief UNSUBSCRIBE (client-to-server). */
#define MQTT_PACKET_TYPE_UNSUBACK ( ( uint8_t ) 0xb0U ) /**< @brief UNSUBACK (server-to-client). */
#define MQTT_PACKET_TYPE_PINGREQ ( ( uint8_t ) 0xc0U ) /**< @brief PINGREQ (client-to-server). */
#define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xd0U ) /**< @brief PINGRESP (server-to-client). */
#define MQTT_PACKET_TYPE_DISCONNECT ( ( uint8_t ) 0xe0U ) /**< @brief DISCONNECT (client-to-server). */
/**
* @brief A value that represents an invalid remaining length.
*
* This value is greater than what is allowed by the MQTT specification.
*/
#define MQTT_REMAINING_LENGTH_INVALID ( ( size_t ) 268435456 )
/*---------------------- MQTT internal data structures ----------------------*/
/**
* @brief Represents an MQTT connection.
*/
typedef struct _mqttConnection
{
bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */
bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */
void * pNetworkConnection; /**< @brief References the transport-layer network connection. */
const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */
IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */
#endif
bool disconnected; /**< @brief Tracks if this connection has been disconnected. */
IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */
int32_t references; /**< @brief Counts callbacks and operations using this connection. */
IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */
IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */
IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */
IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */
bool keepAliveFailure; /**< @brief Failure flag for keep-alive operation. */
uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */
uint32_t nextKeepAliveMs; /**< @brief Relative delay for next keep-alive job. */
IotTaskPoolJobStorage_t keepAliveJobStorage; /**< @brief Task pool job for processing this connection's keep-alive. */
IotTaskPoolJob_t keepAliveJob; /**< @brief Task pool job for processing this connection's keep-alive. */
uint8_t * pPingreqPacket; /**< @brief An MQTT PINGREQ packet, allocated if keep-alive is active. */
size_t pingreqPacketSize; /**< @brief The size of an allocated PINGREQ packet. */
} _mqttConnection_t;
/**
* @brief Represents a subscription stored in an MQTT connection.
*/
typedef struct _mqttSubscription
{
IotLink_t link; /**< @brief List link member. */
int32_t references; /**< @brief How many subscription callbacks are using this subscription. */
/**
* @brief Tracks whether @ref mqtt_function_unsubscribe has been called for
* this subscription.
*
* If there are active subscription callbacks, @ref mqtt_function_unsubscribe
* cannot remove this subscription. Instead, it will set this flag, which
* schedules the removal of this subscription once all subscription callbacks
* terminate.
*/
bool unsubscribed;
struct
{
uint16_t identifier; /**< @brief Packet identifier. */
size_t order; /**< @brief Order in the packet's list of subscriptions. */
} packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */
IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */
uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */
char pTopicFilter[]; /**< @brief The subscription topic filter. */
} _mqttSubscription_t;
/**
* @brief Internal structure representing a single MQTT operation, such as
* CONNECT, SUBSCRIBE, PUBLISH, etc.
*
* Queues of these structures keeps track of all in-progress MQTT operations.
*/
typedef struct _mqttOperation
{
/* Pointers to neighboring queue elements. */
IotLink_t link; /**< @brief List link member. */
bool incomingPublish; /**< @brief Set to true if this operation an incoming PUBLISH. */
_mqttConnection_t * pMqttConnection; /**< @brief MQTT connection associated with this operation. */
IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */
IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */
union
{
/* If incomingPublish is false, this struct is valid. */
struct
{
/* Basic operation information. */
int32_t jobReference; /**< @brief Tracks if a job is using this operation. Must always be 0, 1, or 2. */
IotMqttOperationType_t type; /**< @brief What operation this structure represents. */
uint32_t flags; /**< @brief Flags passed to the function that created this operation. */
uint16_t packetIdentifier; /**< @brief The packet identifier used with this operation. */
/* Serialized packet and size. */
uint8_t * pMqttPacket; /**< @brief The MQTT packet to send over the network. */
uint8_t * pPacketIdentifierHigh; /**< @brief The location of the high byte of the packet identifier in the MQTT packet. */
size_t packetSize; /**< @brief Size of `pMqttPacket`. */
/* How to notify of an operation's completion. */
union
{
IotSemaphore_t waitSemaphore; /**< @brief Semaphore to be used with @ref mqtt_function_wait. */
IotMqttCallbackInfo_t callback; /**< @brief User-provided callback function and parameter. */
} notify; /**< @brief How to notify of this operation's completion. */
IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */
struct
{
uint32_t count;
uint32_t limit;
uint32_t nextPeriod;
} retry;
} operation;
/* If incomingPublish is true, this struct is valid. */
struct
{
IotMqttPublishInfo_t publishInfo; /**< @brief Deserialized PUBLISH. */
const void * pReceivedData; /**< @brief Any buffer associated with this PUBLISH that should be freed. */
} publish;
} u; /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */
} _mqttOperation_t;
/**
* @brief Represents an MQTT packet received from the network.
*
* This struct is used to hold parameters for the deserializers so that all
* deserializers have the same function signature.
*/
typedef struct _mqttPacket
{
union
{
/**
* @brief (Input) MQTT connection associated with this packet. Only used
* when deserializing SUBACKs.
*/
_mqttConnection_t * pMqttConnection;
/**
* @brief (Output) Operation representing an incoming PUBLISH. Only used
* when deserializing PUBLISHes.
*/
_mqttOperation_t * pIncomingPublish;
} u; /**< @brief Valid member depends on packet being decoded. */
uint8_t * pRemainingData; /**< @brief (Input) The remaining data in MQTT packet. */
size_t remainingLength; /**< @brief (Input) Length of the remaining data in the MQTT packet. */
uint16_t packetIdentifier; /**< @brief (Output) MQTT packet identifier. */
uint8_t type; /**< @brief (Input) A value identifying the packet type. */
} _mqttPacket_t;
/*-------------------- MQTT struct validation functions ---------------------*/
/**
* @brief Check that an #IotMqttConnectInfo_t is valid.
*
* @param[in] pConnectInfo The #IotMqttConnectInfo_t to validate.
*
* @return `true` if `pConnectInfo` is valid; `false` otherwise.
*/
bool _IotMqtt_ValidateConnect( const IotMqttConnectInfo_t * pConnectInfo );
/**
* @brief Check that an #IotMqttPublishInfo_t is valid.
*
* @param[in] awsIotMqttMode Specifies if this PUBLISH packet is being sent to
* an AWS IoT MQTT server.
* @param[in] pPublishInfo The #IotMqttPublishInfo_t to validate.
*
* @return `true` if `pPublishInfo` is valid; `false` otherwise.
*/
bool _IotMqtt_ValidatePublish( bool awsIotMqttMode,
const IotMqttPublishInfo_t * pPublishInfo );
/**
* @brief Check that an #IotMqttOperation_t is valid and waitable.
*
* @param[in] operation The #IotMqttOperation_t to validate.
*
* @return `true` if `operation` is valid; `false` otherwise.
*/
bool _IotMqtt_ValidateOperation( IotMqttOperation_t operation );
/**
* @brief Check that a list of #IotMqttSubscription_t is valid.
*
* @param[in] operation Either #IOT_MQTT_SUBSCRIBE or #IOT_MQTT_UNSUBSCRIBE.
* Some parameters are not validated for #IOT_MQTT_UNSUBSCRIBE.
* @param[in] awsIotMqttMode Specifies if this SUBSCRIBE packet is being sent to
* an AWS IoT MQTT server.
* @param[in] pListStart First element of the list to validate.
* @param[in] listSize Number of elements in the subscription list.
*
* @return `true` if every element in the list is valid; `false` otherwise.
*/
bool _IotMqtt_ValidateSubscriptionList( IotMqttOperationType_t operation,
bool awsIotMqttMode,
const IotMqttSubscription_t * pListStart,
size_t listSize );
/*-------------------- MQTT packet serializer functions ---------------------*/
/**
* @brief Get the MQTT packet type from a stream of bytes off the network.
*
* @param[in] pNetworkConnection Reference to the network connection.
* @param[in] pNetworkInterface Function pointers used to interact with the
* network.
*
* @return One of the server-to-client MQTT packet types.
*
* @note This function is only used for incoming packets, and may not work
* correctly for outgoing packets.
*/
uint8_t _IotMqtt_GetPacketType( void * pNetworkConnection,
const IotNetworkInterface_t * pNetworkInterface );
/**
* @brief Get the remaining length from a stream of bytes off the network.
*
* @param[in] pNetworkConnection Reference to the network connection.
* @param[in] pNetworkInterface Function pointers used to interact with the
* network.
*
* @return The remaining length; #MQTT_REMAINING_LENGTH_INVALID on error.
*/
size_t _IotMqtt_GetRemainingLength( void * pNetworkConnection,
const IotNetworkInterface_t * pNetworkInterface );
/**
* @brief Generate a CONNECT packet from the given parameters.
*
* @param[in] pConnectInfo User-provided CONNECT information.
* @param[out] pConnectPacket Where the CONNECT packet is written.
* @param[out] pPacketSize Size of the packet written to `pConnectPacket`.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_SerializeConnect( const IotMqttConnectInfo_t * pConnectInfo,
uint8_t ** pConnectPacket,
size_t * pPacketSize );
/**
* @brief Deserialize a CONNACK packet.
*
* Converts the packet from a stream of bytes to an #IotMqttError_t. Also
* prints out debug log messages about the packet.
*
* @param[in,out] pConnack Pointer to an MQTT packet struct representing a CONNACK.
*
* @return #IOT_MQTT_SUCCESS if CONNACK specifies that CONNECT was accepted;
* #IOT_MQTT_SERVER_REFUSED if CONNACK specifies that CONNECT was rejected;
* #IOT_MQTT_BAD_RESPONSE if the CONNACK packet doesn't follow MQTT spec.
*/
IotMqttError_t _IotMqtt_DeserializeConnack( _mqttPacket_t * pConnack );
/**
* @brief Generate a PUBLISH packet from the given parameters.
*
* @param[in] pPublishInfo User-provided PUBLISH information.
* @param[out] pPublishPacket Where the PUBLISH packet is written.
* @param[out] pPacketSize Size of the packet written to `pPublishPacket`.
* @param[out] pPacketIdentifier The packet identifier generated for this PUBLISH.
* @param[out] pPacketIdentifierHigh Where the high byte of the packet identifier
* is written.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_SerializePublish( const IotMqttPublishInfo_t * pPublishInfo,
uint8_t ** pPublishPacket,
size_t * pPacketSize,
uint16_t * pPacketIdentifier,
uint8_t ** pPacketIdentifierHigh );
/**
* @brief Set the DUP bit in a QoS 1 PUBLISH packet.
*
* @param[in] pPublishPacket Pointer to the PUBLISH packet to modify.
* @param[in] pPacketIdentifierHigh The high byte of any packet identifier to modify.
* @param[out] pNewPacketIdentifier Since AWS IoT does not support the DUP flag,
* a new packet identifier is generated and should be written here. This parameter
* is only used when connected to an AWS IoT MQTT server.
*
* @note See #IotMqttPublishInfo_t for caveats with retransmission to the
* AWS IoT MQTT server.
*/
void _IotMqtt_PublishSetDup( uint8_t * pPublishPacket,
uint8_t * pPacketIdentifierHigh,
uint16_t * pNewPacketIdentifier );
/**
* @brief Deserialize a PUBLISH packet received from the server.
*
* Converts the packet from a stream of bytes to an #IotMqttPublishInfo_t and
* extracts the packet identifier. Also prints out debug log messages about the
* packet.
*
* @param[in,out] pPublish Pointer to an MQTT packet struct representing a PUBLISH.
*
* @return #IOT_MQTT_SUCCESS if PUBLISH is valid; #IOT_MQTT_BAD_RESPONSE
* if the PUBLISH packet doesn't follow MQTT spec.
*/
IotMqttError_t _IotMqtt_DeserializePublish( _mqttPacket_t * pPublish );
/**
* @brief Generate a PUBACK packet for the given packet identifier.
*
* @param[in] packetIdentifier The packet identifier to place in PUBACK.
* @param[out] pPubackPacket Where the PUBACK packet is written.
* @param[out] pPacketSize Size of the packet written to `pPubackPacket`.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_SerializePuback( uint16_t packetIdentifier,
uint8_t ** pPubackPacket,
size_t * pPacketSize );
/**
* @brief Deserialize a PUBACK packet.
*
* Converts the packet from a stream of bytes to an #IotMqttError_t and extracts
* the packet identifier. Also prints out debug log messages about the packet.
*
* @param[in,out] pPuback Pointer to an MQTT packet struct representing a PUBACK.
*
* @return #IOT_MQTT_SUCCESS if PUBACK is valid; #IOT_MQTT_BAD_RESPONSE
* if the PUBACK packet doesn't follow MQTT spec.
*/
IotMqttError_t _IotMqtt_DeserializePuback( _mqttPacket_t * pPuback );
/**
* @brief Generate a SUBSCRIBE packet from the given parameters.
*
* @param[in] pSubscriptionList User-provided array of subscriptions.
* @param[in] subscriptionCount Size of `pSubscriptionList`.
* @param[out] pSubscribePacket Where the SUBSCRIBE packet is written.
* @param[out] pPacketSize Size of the packet written to `pSubscribePacket`.
* @param[out] pPacketIdentifier The packet identifier generated for this SUBSCRIBE.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_SerializeSubscribe( const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
uint8_t ** pSubscribePacket,
size_t * pPacketSize,
uint16_t * pPacketIdentifier );
/**
* @brief Deserialize a SUBACK packet.
*
* Converts the packet from a stream of bytes to an #IotMqttError_t and extracts
* the packet identifier. Also prints out debug log messages about the packet.
*
* @param[in,out] pSuback Pointer to an MQTT packet struct representing a SUBACK.
*
* @return #IOT_MQTT_SUCCESS if SUBACK is valid; #IOT_MQTT_BAD_RESPONSE
* if the SUBACK packet doesn't follow MQTT spec.
*/
IotMqttError_t _IotMqtt_DeserializeSuback( _mqttPacket_t * pSuback );
/**
* @brief Generate an UNSUBSCRIBE packet from the given parameters.
*
* @param[in] pSubscriptionList User-provided array of subscriptions to remove.
* @param[in] subscriptionCount Size of `pSubscriptionList`.
* @param[out] pUnsubscribePacket Where the UNSUBSCRIBE packet is written.
* @param[out] pPacketSize Size of the packet written to `pUnsubscribePacket`.
* @param[out] pPacketIdentifier The packet identifier generated for this UNSUBSCRIBE.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_SerializeUnsubscribe( const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
uint8_t ** pUnsubscribePacket,
size_t * pPacketSize,
uint16_t * pPacketIdentifier );
/**
* @brief Deserialize a UNSUBACK packet.
*
* Converts the packet from a stream of bytes to an #IotMqttError_t and extracts
* the packet identifier. Also prints out debug log messages about the packet.
*
* @param[in,out] pUnsuback Pointer to an MQTT packet struct representing an UNSUBACK.
*
* @return #IOT_MQTT_SUCCESS if UNSUBACK is valid; #IOT_MQTT_BAD_RESPONSE
* if the UNSUBACK packet doesn't follow MQTT spec.
*/
IotMqttError_t _IotMqtt_DeserializeUnsuback( _mqttPacket_t * pUnsuback );
/**
* @brief Generate a PINGREQ packet.
*
* @param[out] pPingreqPacket Where the PINGREQ packet is written.
* @param[out] pPacketSize Size of the packet written to `pPingreqPacket`.
*
* @return Always returns #IOT_MQTT_SUCCESS.
*/
IotMqttError_t _IotMqtt_SerializePingreq( uint8_t ** pPingreqPacket,
size_t * pPacketSize );
/**
* @brief Deserialize a PINGRESP packet.
*
* Converts the packet from a stream of bytes to an #IotMqttError_t. Also
* prints out debug log messages about the packet.
*
* @param[in,out] pPingresp Pointer to an MQTT packet struct representing a PINGRESP.
*
* @return #IOT_MQTT_SUCCESS if PINGRESP is valid; #IOT_MQTT_BAD_RESPONSE
* if the PINGRESP packet doesn't follow MQTT spec.
*/
IotMqttError_t _IotMqtt_DeserializePingresp( _mqttPacket_t * pPingresp );
/**
* @brief Generate a DISCONNECT packet.
*
* @param[out] pDisconnectPacket Where the DISCONNECT packet is written.
* @param[out] pPacketSize Size of the packet written to `pDisconnectPacket`.
*
* @return Always returns #IOT_MQTT_SUCCESS.
*/
IotMqttError_t _IotMqtt_SerializeDisconnect( uint8_t ** pDisconnectPacket,
size_t * pPacketSize );
/**
* @brief Free a packet generated by the serializer.
*
* @param[in] pPacket The packet to free.
*/
void _IotMqtt_FreePacket( uint8_t * pPacket );
/*-------------------- MQTT operation record functions ----------------------*/
/**
* @brief Create a record for a new in-progress MQTT operation.
*
* @param[in] pMqttConnection The MQTT connection to associate with the operation.
* @param[in] flags Flags variable passed to a user-facing MQTT function.
* @param[in] pCallbackInfo User-provided callback function and parameter.
* @param[out] pNewOperation Set to point to the new operation on success.
*
* @return #IOT_MQTT_SUCCESS, #IOT_MQTT_BAD_PARAMETER, or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
_mqttOperation_t ** pNewOperation );
/**
* @brief Decrement the job reference count of an MQTT operation and optionally
* cancel its job.
*
* Checks if the operation may be destroyed afterwards.
*
* @param[in] pOperation The MQTT operation with the job to cancel.
* @param[in] cancelJob Whether to attempt cancellation of the operation's job.
*
* @return `true` if the the operation may be safely destroyed; `false` otherwise.
*/
bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
bool cancelJob );
/**
* @brief Free resources used to record an MQTT operation. This is called when
* the operation completes.
*
* @param[in] pOperation The operation which completed.
*/
void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation );
/**
* @brief Task pool routine for processing an MQTT connection's keep-alive.
*
* @param[in] pTaskPool Pointer to the system task pool.
* @param[in] pKeepAliveJob Pointer the an MQTT connection's keep-alive job.
* @param[in] pContext Pointer to an MQTT connection, passed as an opaque context.
*/
void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
IotTaskPoolJob_t pKeepAliveJob,
void * pContext );
/**
* @brief Task pool routine for processing an incoming PUBLISH message.
*
* @param[in] pTaskPool Pointer to the system task pool.
* @param[in] pPublishJob Pointer to the incoming PUBLISH operation's job.
* @param[in] pContext Pointer to the incoming PUBLISH operation, passed as an
* opaque context.
*/
void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
IotTaskPoolJob_t pPublishJob,
void * pContext );
/**
* @brief Task pool routine for processing an MQTT operation to send.
*
* @param[in] pTaskPool Pointer to the system task pool.
* @param[in] pSendJob Pointer to an operation's job.
* @param[in] pContext Pointer to the operation to send, passed as an opaque
* context.
*/
void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
IotTaskPoolJob_t pSendJob,
void * pContext );
/**
* @brief Task pool routine for processing a completed MQTT operation.
*
* @param[in] pTaskPool Pointer to the system task pool.
* @param[in] pOperationJob Pointer to the completed operation's job.
* @param[in] pContext Pointer to the completed operation, passed as an opaque
* context.
*/
void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
IotTaskPoolJob_t pOperationJob,
void * pContext );
/**
* @brief Schedule an operation for immediate processing.
*
* @param[in] pOperation The operation to schedule.
* @param[in] jobRoutine The routine to run for the job. Must be either
* #_IotMqtt_ProcessSend, #_IotMqtt_ProcessCompletedOperation, or
* #_IotMqtt_ProcessIncomingPublish.
* @param[in] delay A delay before the operation job should be executed. Pass
* `0` to execute ASAP.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_SCHEDULING_ERROR.
*/
IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,
IotTaskPoolRoutine_t jobRoutine,
uint32_t delay );
/**
* @brief Search a list of MQTT operations pending responses using an operation
* name and packet identifier. Removes a matching operation from the list if found.
*
* @param[in] pMqttConnection The connection associated with the operation.
* @param[in] type The operation type to look for.
* @param[in] pPacketIdentifier A packet identifier to match. Pass `NULL` to ignore.
*
* @return Pointer to any matching operation; `NULL` if no match was found.
*/
_mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
IotMqttOperationType_t type,
const uint16_t * pPacketIdentifier );
/**
* @brief Notify of a completed MQTT operation.
*
* @param[in] pOperation The MQTT operation which completed.
*
* Depending on the parameters passed to a user-facing MQTT function, the
* notification will cause @ref mqtt_function_wait to return or invoke a
* user-provided callback.
*/
void _IotMqtt_Notify( _mqttOperation_t * pOperation );
/*----------------- MQTT subscription management functions ------------------*/
/**
* @brief Add an array of subscriptions to the subscription manager.
*
* @param[in] pMqttConnection The MQTT connection associated with the subscriptions.
* @param[in] subscribePacketIdentifier Packet identifier for the subscriptions'
* SUBSCRIBE packet.
* @param[in] pSubscriptionList The first element in the array.
* @param[in] subscriptionCount Number of elements in `pSubscriptionList`.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*/
IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
uint16_t subscribePacketIdentifier,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount );
/**
* @brief Process a received PUBLISH from the server, invoking any subscription
* callbacks that have a matching topic filter.
*
* @param[in] pMqttConnection The MQTT connection associated with the received
* PUBLISH.
* @param[in] pCallbackParam The parameter to pass to a PUBLISH callback.
*/
void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
IotMqttCallbackParam_t * pCallbackParam );
/**
* @brief Remove a single subscription from the subscription manager by
* packetIdentifier and order.
*
* @param[in] pMqttConnection The MQTT connection associated with the subscriptions.
* @param[in] packetIdentifier The packet identifier associated with the subscription's
* SUBSCRIBE packet.
* @param[in] order The order of the subscription in the SUBSCRIBE packet.
* Pass `-1` to ignore order and remove all subscriptions for `packetIdentifier`.
*/
void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier,
int32_t order );
/**
* @brief Remove an array of subscriptions from the subscription manager by
* topic filter.
*
* @param[in] pMqttConnection The MQTT connection associated with the subscriptions.
* @param[in] pSubscriptionList The first element in the array.
* @param[in] subscriptionCount Number of elements in `pSubscriptionList`.
*/
void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount );
/*------------------ MQTT connection management functions -------------------*/
/**
* @brief Attempt to increment the reference count of an MQTT connection.
*
* @param[in] pMqttConnection The referenced MQTT connection.
*
* @return `true` if the reference count was incremented; `false` otherwise. The
* reference count will not be incremented for a disconnected connection.
*/
bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection );
/**
* @brief Decrement the reference count of an MQTT connection.
*
* Also destroys an unreferenced MQTT connection.
*
* @param[in] pMqttConnection The referenced MQTT connection.
*/
void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection );
/**
* @brief Read the next available byte on a network connection.
*
* @param[in] pNetworkConnection Reference to the network connection.
* @param[in] pNetworkInterface Function pointers used to interact with the
* network.
* @param[out] pIncomingByte The byte read from the network.
*
* @return `true` if a byte was successfully received from the network; `false`
* otherwise.
*/
bool _IotMqtt_GetNextByte( void * pNetworkConnection,
const IotNetworkInterface_t * pNetworkInterface,
uint8_t * pIncomingByte );
/**
* @brief Closes the network connection associated with an MQTT connection.
*
* A network disconnect function must be set in the network interface for the
* network connection to be closed.
*
* @param[in] disconnectReason A reason to pass to the connection's disconnect
* callback.
* @param[in] pMqttConnection The MQTT connection with the network connection
* to close.
*/
void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
_mqttConnection_t * pMqttConnection );
#endif /* ifndef IOT_MQTT_INTERNAL_H_ */