Add the Labs projects provided in the V10.2.1_191129 zip file.

This commit is contained in:
Richard Barry 2019-12-02 23:39:25 +00:00
parent 46e5937529
commit e5708b38e9
801 changed files with 356576 additions and 0 deletions

View file

@ -0,0 +1,859 @@
/*
* IoT MQTT V2.1.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file iot_mqtt.h
* @brief User-facing functions of the MQTT 3.1.1 library.
*/
#ifndef IOT_MQTT_H_
#define IOT_MQTT_H_
/* The config header is always included first. */
#include "iot_config.h"
/* MQTT types include. */
#include "types/iot_mqtt_types.h"
/*------------------------- MQTT library functions --------------------------*/
/**
* @functionspage{mqtt,MQTT library}
* - @functionname{mqtt_function_init}
* - @functionname{mqtt_function_cleanup}
* - @functionname{mqtt_function_receivecallback}
* - @functionname{mqtt_function_connect}
* - @functionname{mqtt_function_disconnect}
* - @functionname{mqtt_function_subscribeasync}
* - @functionname{mqtt_function_subscribesync}
* - @functionname{mqtt_function_unsubscribeasync}
* - @functionname{mqtt_function_unsubscribesync}
* - @functionname{mqtt_function_publishasync}
* - @functionname{mqtt_function_publishsync}
* - @functionname{mqtt_function_wait}
* - @functionname{mqtt_function_strerror}
* - @functionname{mqtt_function_operationtype}
* - @functionname{mqtt_function_issubscribed}
*/
/**
* @functionpage{IotMqtt_Init,mqtt,init}
* @functionpage{IotMqtt_Cleanup,mqtt,cleanup}
* @functionpage{IotMqtt_ReceiveCallback,mqtt,receivecallback}
* @functionpage{IotMqtt_Connect,mqtt,connect}
* @functionpage{IotMqtt_Disconnect,mqtt,disconnect}
* @functionpage{IotMqtt_SubscribeAsync,mqtt,subscribeasync}
* @functionpage{IotMqtt_SubscribeSync,mqtt,subscribesync}
* @functionpage{IotMqtt_UnsubscribeAsync,mqtt,unsubscribeasync}
* @functionpage{IotMqtt_UnsubscribeSync,mqtt,unsubscribesync}
* @functionpage{IotMqtt_PublishAsync,mqtt,publishasync}
* @functionpage{IotMqtt_PublishSync,mqtt,publishsync}
* @functionpage{IotMqtt_Wait,mqtt,wait}
* @functionpage{IotMqtt_strerror,mqtt,strerror}
* @functionpage{IotMqtt_OperationType,mqtt,operationtype}
* @functionpage{IotMqtt_IsSubscribed,mqtt,issubscribed}
*/
/**
* @brief One-time initialization function for the MQTT library.
*
* This function performs setup of the MQTT library. <b>It must be called
* once (and only once) before calling any other MQTT function.</b> Calling this
* function more than once without first calling @ref mqtt_function_cleanup
* may result in a crash.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NOT_INITIALIZED
*
* @warning No thread-safety guarantees are provided for this function.
*
* @see @ref mqtt_function_cleanup
*/
/* @[declare_mqtt_init] */
IotMqttError_t IotMqtt_Init( void );
/* @[declare_mqtt_init] */
/**
* @brief One-time deinitialization function for the MQTT library.
*
* This function frees resources taken in @ref mqtt_function_init. It should be
* called after [closing all MQTT connections](@ref mqtt_function_disconnect) to
* clean up the MQTT library. After this function returns, @ref mqtt_function_init
* must be called again before calling any other MQTT function.
*
* @warning No thread-safety guarantees are provided for this function. Do not
* call this function if any MQTT connections are open!
*
* @see @ref mqtt_function_init
*/
/* @[declare_mqtt_cleanup] */
void IotMqtt_Cleanup( void );
/* @[declare_mqtt_cleanup] */
/**
* @brief Network receive callback for the MQTT library.
*
* This function should be called by the system whenever data is available for
* the MQTT library.
*
* @param[in] pNetworkConnection The network connection associated with the MQTT
* connection, passed by the network stack.
* @param[in] pReceiveContext A pointer to the MQTT connection handle for which
* the packet was received.
*/
/* @[declare_mqtt_receivecallback] */
void IotMqtt_ReceiveCallback( IotNetworkConnection_t pNetworkConnection,
void * pReceiveContext );
/* @[declare_mqtt_receivecallback] */
/**
* @brief Establish a new MQTT connection.
*
* This function opens a connection between a new MQTT client and an MQTT server
* (also called a <i>broker</i>). MQTT connections are established on top of transport
* layer protocols (such as TCP/IP), and optionally, application layer security
* protocols (such as TLS). The MQTT packet that establishes a connection is called
* the MQTT CONNECT packet. After @ref mqtt_function_init, this function must be
* called before any other MQTT library function.
*
* If [pConnectInfo->cleanSession](@ref IotMqttConnectInfo_t.cleanSession) is `true`,
* this function establishes a clean MQTT session. Subscriptions and unacknowledged
* PUBLISH messages will be discarded when the connection is closed.
*
* If [pConnectInfo->cleanSession](@ref IotMqttConnectInfo_t.cleanSession) is `false`,
* this function establishes (or re-establishes) a persistent MQTT session. The parameters
* [pConnectInfo->pPreviousSubscriptions](@ref IotMqttConnectInfo_t.pPreviousSubscriptions)
* and [pConnectInfo->previousSubscriptionCount](@ref IotMqttConnectInfo_t.previousSubscriptionCount)
* may be used to restore subscriptions present in a re-established persistent session.
* Any restored subscriptions <b>MUST</b> have been present in the persistent session;
* <b>this function does not send an MQTT SUBSCRIBE packet!</b>
*
* [pConnectInfo->pPreviousSubscriptions](@ref IotMqttConnectInfo_t.pPreviousSubscriptions)
* and [pConnectInfo->previousSubscriptionCount](@ref IotMqttConnectInfo_t.previousSubscriptionCount) can
* also be used to pass a list of subscriptions to be stored locally without a SUBSCRIBE packet being
* sent to the broker. These subscriptions are useful to invoke application level callbacks for messages received
* on unsolicited topics from the broker.
*
* This MQTT library is network agnostic, meaning it has no knowledge of the
* underlying network protocol carrying the MQTT packets. It interacts with the
* network through a network abstraction layer, allowing it to be used with many
* different network stacks. The network abstraction layer is established
* per-connection, allowing every #IotMqttConnection_t to use a different network
* stack. The parameter `pNetworkInterface` sets up the network abstraction layer
* for an MQTT connection; see the documentation on #IotMqttNetworkInfo_t for details
* on its members.
*
* The `pConnectInfo` parameter provides the contents of the MQTT CONNECT packet.
* Most members [are defined by the MQTT spec.](@ref IotMqttConnectInfo_t). The
* [pConnectInfo->pWillInfo](@ref IotMqttConnectInfo_t.pWillInfo) member provides
* information on a Last Will and Testament (LWT) message to be published if the
* MQTT connection is closed without [sending a DISCONNECT packet]
* (@ref mqtt_function_disconnect). Unlike other PUBLISH
* messages, a LWT message payload is limited to 65535 bytes in length. Additionally,
* the retry [interval](@ref IotMqttPublishInfo_t.retryMs) and [limit]
* (@ref IotMqttPublishInfo_t.retryLimit) members of #IotMqttPublishInfo_t
* are ignored for LWT messages. The LWT message is optional; `pWillInfo` may be NULL.
*
* Unlike @ref mqtt_function_publishasync, @ref mqtt_function_subscribeasync, and
* @ref mqtt_function_unsubscribeasync, this function is always blocking. Additionally,
* because the MQTT connection acknowledgement packet (CONNACK packet) does not
* contain any information on <i>which</i> CONNECT packet it acknowledges, only one
* CONNECT operation may be in progress at any time. This means that parallel
* threads making calls to @ref mqtt_function_connect will be serialized to send
* their CONNECT packets one-by-one.
*
* @param[in] pNetworkInfo Information on the transport-layer network connection
* to use with the MQTT connection.
* @param[in] pConnectInfo MQTT connection setup parameters.
* @param[in] timeoutMs If the MQTT server does not accept the connection within
* this timeout in milliseconds, this function returns #IOT_MQTT_TIMEOUT.
* @param[out] pMqttConnection Set to a newly-initialized MQTT connection handle
* if this function succeeds.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_TIMEOUT
* - #IOT_MQTT_SERVER_REFUSED
*
* <b>Example</b>
* @code{c}
*
* // Callback function to receive messages from the broker on an unsolicited topic.
* void unsolicitedMessageCallback( void * pArgument, IotMqttCallbackParam_t * pPublish );
*
* // Parameters to MQTT connect.
* IotMqttConnection_t mqttConnection = IOT_MQTT_CONNECTION_INITIALIZER;
* IotMqttNetworkInfo_t networkInfo = IOT_MQTT_NETWORK_INFO_INITIALIZER;
* IotMqttConnectInfo_t connectInfo = IOT_MQTT_CONNECT_INFO_INITIALIZER;
* IotMqttPublishInfo_t willInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;
*
* // A local subscription to receive messages from the broker on an unsolicited topic.
* IotMqttSubscription_t subscription = IOT_MQTT_SUBSCRIPTION_INITIALIZER;
*
* // Example network abstraction types.
* IotNetworkServerInfo_t serverInfo = { ... };
* IotNetworkCredentials_t credentialInfo = { ... };
* IotNetworkInterface_t networkInterface = { ... };
*
* // Example using a generic network implementation.
* networkInfo.createNetworkConnection = true;
* networkInfo.u.setup.pNetworkServerInfo = &serverInfo;
* networkInfo.u.setup.pNetworkCredentialInfo = &credentialInfo;
* networkInfo.pNetworkInterface = &networkInterface;
*
* // Set the members of the connection info (password and username not used).
* connectInfo.cleanSession = true;
* connectInfo.keepAliveSeconds = 30;
* connectInfo.pClientIdentifier = "uniqueclientidentifier";
* connectInfo.clientIdentifierLength = 22;
*
* // Set the members of the will info (retain and retry not used).
* willInfo.qos = IOT_MQTT_QOS_1;
* willInfo.pTopicName = "will/topic/name";
* willInfo.topicNameLength = ( uint16_t ) strlen( willInfo.pTopicName );
* willInfo.pPayload = "MQTT client unexpectedly disconnected.";
* willInfo.payloadLength = strlen( willInfo.pPayload );
*
* // Set the pointer to the will info.
* connectInfo.pWillInfo = &willInfo;
*
* // [Optional] Set a local subscription to receive the broker messages on an unsolicited topic.
* subscription.qos = IOT_MQTT_QOS_0;
* subscription.pTopicFilter = "some/unsolicited/topic";
* subscription.topicLength = ( uint16_t ) strlen( subscription.pTopicFilter );
* subscription.callback.function = unsolicitedMessageCallback;
* connectInfo.pPreviousSubscriptions = &subscription;
* connectInfo.previousSubscriptionCount = 1;
*
*
* // Call CONNECT with a 5 second block time. Should return
* // IOT_MQTT_SUCCESS when successful.
* IotMqttError_t result = IotMqtt_Connect( &networkInfo,
* &connectInfo,
* 5000,
* &mqttConnection );
*
* if( result == IOT_MQTT_SUCCESS )
* {
* // Do something with the MQTT connection...
*
* // Clean up and close the MQTT connection once it's no longer needed.
* IotMqtt_Disconnect( mqttConnection, 0 );
* }
* @endcode
*/
/* @[declare_mqtt_connect] */
IotMqttError_t IotMqtt_Connect( const IotMqttNetworkInfo_t * pNetworkInfo,
const IotMqttConnectInfo_t * pConnectInfo,
uint32_t timeoutMs,
IotMqttConnection_t * const pMqttConnection );
/* @[declare_mqtt_connect] */
/**
* @brief Closes an MQTT connection and frees resources.
*
* This function closes an MQTT connection and should only be called once
* the MQTT connection is no longer needed. Its exact behavior depends on the
* `flags` parameter.
*
* Normally, `flags` should be `0`. This gracefully shuts down an MQTT
* connection by sending an MQTT DISCONNECT packet. Any [network close function]
* (@ref IotNetworkInterface_t::close) provided [when the connection was established]
* (@ref mqtt_function_connect) will also be called. Note that because the MQTT server
* will not acknowledge a DISCONNECT packet, the client has no way of knowing if
* the server received the DISCONNECT packet. In the case where the DISCONNECT
* packet is lost in transport, any Last Will and Testament (LWT) message established
* with the connection may be published. However, if the DISCONNECT reaches the
* MQTT server, the LWT message will be discarded and not published.
*
* Should the underlying network connection become unusable, this function should
* be called with `flags` set to #IOT_MQTT_FLAG_CLEANUP_ONLY. In this case, no
* DISCONNECT packet will be sent, though the [network close function](@ref IotNetworkInterface_t::close)
* will still be called. This function will only free the resources used by the MQTT
* connection; it still must be called even if the network is offline to avoid leaking
* resources.
*
* @ref mqtt_function_disconnect modifies `mqttConnection`, so it shouldn't
* be used after calling this function.
*
* @param[in] mqttConnection The MQTT connection to close and clean up.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
*/
/* @[declare_mqtt_disconnect] */
void IotMqtt_Disconnect( IotMqttConnection_t mqttConnection,
uint32_t flags );
/* @[declare_mqtt_disconnect] */
/**
* @brief Subscribes to the given array of topic filters and optionally
* receive an asynchronous notification when the subscribe completes.
*
* This function sends an MQTT SUBSCRIBE packet to the server. A SUBSCRIBE
* packet notifies the server to send any matching PUBLISH messages to this client.
* A single SUBSCRIBE packet may carry more than one topic filter, hence the
* parameters to this function include an array of [subscriptions]
* (@ref IotMqttSubscription_t).
*
* An MQTT subscription has two pieces:
* 1. The subscription topic filter registered with the MQTT server. The MQTT
* SUBSCRIBE packet sent from this client to server notifies the server to send
* messages matching the given topic filters to this client.
* 2. The [callback function](@ref IotMqttCallbackInfo_t.function) that this
* client will invoke when an incoming message is received. The callback function
* notifies applications of an incoming PUBLISH message.
*
* The helper function @ref mqtt_function_issubscribed can be used to check if a
* [callback function](@ref IotMqttCallbackInfo_t.function) is registered for
* a particular topic filter.
*
* To modify an already-registered subscription callback, call this function with
* a new `pSubscriptionList`. Any topic filters in `pSubscriptionList` that already
* have a registered callback will be replaced with the new values in `pSubscriptionList`.
*
* @attention QoS 2 subscriptions are currently unsupported. Only 0 or 1 are valid
* for subscription QoS.
*
* @param[in] mqttConnection The MQTT connection to use for the subscription.
* @param[in] pSubscriptionList Pointer to the first element in the array of
* subscriptions.
* @param[in] subscriptionCount The number of elements in pSubscriptionList.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
* @param[in] pCallbackInfo Asynchronous notification of this function's completion (`NULL` to disable).
* @param[out] pSubscribeOperation Set to a handle by which this operation may be
* referenced after this function returns. This reference is invalidated once
* the subscription operation completes.
*
* @return This function will return #IOT_MQTT_STATUS_PENDING upon success.
* @return Upon completion of the subscription (either through an
* #IotMqttCallbackInfo_t or @ref mqtt_function_wait), the status will be one of:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_SERVER_REFUSED
* @return If this function fails before queuing a subscribe operation, it will return
* one of:
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
*
* @see @ref mqtt_function_subscribesync for a blocking variant of this function.
* @see @ref mqtt_function_unsubscribeasync for the function that removes subscriptions.
*
* <b>Example</b>
* @code{c}
* #define NUMBER_OF_SUBSCRIPTIONS ...
*
* // Subscription callback function.
* void subscriptionCallback( void * pArgument, IotMqttCallbackParam_t * pPublish );
*
* // An initialized and connected MQTT connection.
* IotMqttConnection_t mqttConnection;
*
* // Subscription information.
* pSubscriptions[ NUMBER_OF_SUBSCRIPTIONS ] = { IOT_MQTT_SUBSCRIPTION_INITIALIZER };
* IotMqttOperation_t lastOperation = IOT_MQTT_OPERATION_INITIALIZER;
*
* // Set the subscription information.
* for( int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++ )
* {
* pSubscriptions[ i ].qos = IOT_MQTT_QOS_1;
* pSubscriptions[ i ].pTopicFilter = "some/topic/filter";
* pSubscriptions[ i ].topicLength = ( uint16_t ) strlen( pSubscriptions[ i ].pTopicFilter );
* pSubscriptions[ i ].callback.function = subscriptionCallback;
* }
*
* IotMqttError_t result = IotMqtt_SubscribeAsync( mqttConnection,
* pSubscriptions,
* NUMBER_OF_SUBSCRIPTIONS,
* IOT_MQTT_FLAG_WAITABLE,
* NULL,
* &lastOperation );
*
* // Subscribe returns IOT_MQTT_STATUS_PENDING when successful. Wait up to
* // 5 seconds for the operation to complete.
* if( result == IOT_MQTT_STATUS_PENDING )
* {
* result = IotMqtt_Wait( subscriptionRef, 5000 );
* }
*
* // Check that the subscriptions were successful.
* if( result == IOT_MQTT_SUCCESS )
* {
* // Wait for messages on the subscription topic filters...
*
* // Unsubscribe once the subscriptions are no longer needed.
* result = IotMqtt_UnsubscribeAsync( mqttConnection,
* pSubscriptions,
* NUMBER_OF_SUBSCRIPTIONS,
* IOT_MQTT_FLAG_WAITABLE,
* NULL,
* &lastOperation );
*
* // UNSUBSCRIBE returns IOT_MQTT_STATUS_PENDING when successful.
* // Wait up to 5 seconds for the operation to complete.
* if( result == IOT_MQTT_STATUS_PENDING )
* {
* result = IotMqtt_Wait( lastOperation, 5000 );
* }
* }
* // Check which subscriptions were rejected by the server.
* else if( result == IOT_MQTT_SERVER_REFUSED )
* {
* for( int i = 0; i < NUMBER_OF_SUBSCRIPTIONS; i++ )
* {
* if( IotMqtt_IsSubscribed( mqttConnection,
* pSubscriptions[ i ].pTopicFilter,
* pSubscriptions[ i ].topicFilterLength,
* NULL ) == false )
* {
* // This subscription was rejected.
* }
* }
* }
* @endcode
*/
/* @[declare_mqtt_subscribeasync] */
IotMqttError_t IotMqtt_SubscribeAsync( IotMqttConnection_t mqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * const pSubscribeOperation );
/* @[declare_mqtt_subscribeasync] */
/**
* @brief Subscribes to the given array of topic filters with a timeout.
*
* This function sends an MQTT SUBSCRIBE packet to the server, then waits for
* a server response to the packet. Internally, this function is a call to @ref
* mqtt_function_subscribeasync followed by @ref mqtt_function_wait. See @ref
* mqtt_function_subscribeasync for more information about the MQTT SUBSCRIBE operation.
*
* @attention QoS 2 subscriptions are currently unsupported. Only 0 or 1 are valid
* for subscription QoS.
*
* @param[in] mqttConnection The MQTT connection to use for the subscription.
* @param[in] pSubscriptionList Pointer to the first element in the array of
* subscriptions.
* @param[in] subscriptionCount The number of elements in pSubscriptionList.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
* Currently, flags are ignored by this function; this parameter is for
* future-compatibility.
* @param[in] timeoutMs If the MQTT server does not acknowledge the subscriptions within
* this timeout in milliseconds, this function returns #IOT_MQTT_TIMEOUT.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_TIMEOUT
* - #IOT_MQTT_SERVER_REFUSED
*/
/* @[declare_mqtt_subscribesync] */
IotMqttError_t IotMqtt_SubscribeSync( IotMqttConnection_t mqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
uint32_t flags,
uint32_t timeoutMs );
/* @[declare_mqtt_subscribesync] */
/**
* @brief Unsubscribes from the given array of topic filters and optionally
* receive an asynchronous notification when the unsubscribe completes.
*
* This function sends an MQTT UNSUBSCRIBE packet to the server. An UNSUBSCRIBE
* packet removes registered topic filters from the server. After unsubscribing,
* the server will no longer send messages on these topic filters to the client.
*
* Corresponding [subscription callback functions](@ref IotMqttCallbackInfo_t.function)
* are also removed from the MQTT connection. These subscription callback functions
* will be removed even if the MQTT UNSUBSCRIBE packet fails to send.
*
* @param[in] mqttConnection The MQTT connection used for the subscription.
* @param[in] pSubscriptionList Pointer to the first element in the array of
* subscriptions.
* @param[in] subscriptionCount The number of elements in pSubscriptionList.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
* @param[in] pCallbackInfo Asynchronous notification of this function's completion (`NULL` to disable).
* @param[out] pUnsubscribeOperation Set to a handle by which this operation may be
* referenced after this function returns. This reference is invalidated once
* the unsubscribe operation completes.
*
* @return This function will return #IOT_MQTT_STATUS_PENDING upon success.
* @return Upon completion of the unsubscribe (either through an
* #IotMqttCallbackInfo_t or @ref mqtt_function_wait), the status will be one of:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
* @return If this function fails before queuing an unsubscribe operation, it will return
* one of:
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
*
* @see @ref mqtt_function_unsubscribesync for a blocking variant of this function.
* @see @ref mqtt_function_subscribeasync for the function that adds subscriptions.
*/
/* @[declare_mqtt_unsubscribeasync] */
IotMqttError_t IotMqtt_UnsubscribeAsync( IotMqttConnection_t mqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * const pUnsubscribeOperation );
/* @[declare_mqtt_unsubscribeasync] */
/**
* @brief Unsubscribes from a given array of topic filters with a timeout.
*
* This function sends an MQTT UNSUBSCRIBE packet to the server, then waits
* for a server response to the packet. Internally, this function is a call to
* @ref mqtt_function_unsubscribeasync followed by @ref mqtt_function_wait. See @ref
* mqtt_function_unsubscribeasync for more information about the MQTT UNSUBSCRIBE
* operation.
*
* @param[in] mqttConnection The MQTT connection used for the subscription.
* @param[in] pSubscriptionList Pointer to the first element in the array of
* subscriptions.
* @param[in] subscriptionCount The number of elements in pSubscriptionList.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
* Flags are currently ignored but reserved for future use.
* @param[in] timeoutMs If the MQTT server does not acknowledge the UNSUBSCRIBE within
* this timeout in milliseconds, this function returns #IOT_MQTT_TIMEOUT.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
*/
/* @[declare_mqtt_unsubscribesync] */
IotMqttError_t IotMqtt_UnsubscribeSync( IotMqttConnection_t mqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
uint32_t flags,
uint32_t timeoutMs );
/* @[declare_mqtt_unsubscribesync] */
/**
* @brief Publishes a message to the given topic name and optionally
* receive an asynchronous notification when the publish completes.
*
* This function sends an MQTT PUBLISH packet to the server. A PUBLISH packet
* contains a payload and a topic name. Any clients with a subscription on a
* topic filter matching the PUBLISH topic name will receive a copy of the
* PUBLISH packet from the server.
*
* If a PUBLISH packet fails to reach the server and it is not a QoS 0 message,
* it will be retransmitted. See #IotMqttPublishInfo_t for a description
* of the retransmission strategy.
*
* @attention QoS 2 messages are currently unsupported. Only 0 or 1 are valid
* for message QoS.
*
* @param[in] mqttConnection The MQTT connection to use for the publish.
* @param[in] pPublishInfo MQTT publish parameters.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
* @param[in] pCallbackInfo Asynchronous notification of this function's completion (`NULL` to disable).
* @param[out] pPublishOperation Set to a handle by which this operation may be
* referenced after this function returns. This reference is invalidated once
* the publish operation completes.
*
* @return This function will return #IOT_MQTT_STATUS_PENDING upon success for
* QoS 1 publishes. For a QoS 0 publish it returns #IOT_MQTT_SUCCESS upon
* success.
* @return Upon completion of a QoS 1 publish (either through an
* #IotMqttCallbackInfo_t or @ref mqtt_function_wait), the status will be one of:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_RETRY_NO_RESPONSE (if [pPublishInfo->retryMs](@ref IotMqttPublishInfo_t.retryMs)
* and [pPublishInfo->retryLimit](@ref IotMqttPublishInfo_t.retryLimit) were set).
* @return If this function fails before queuing an publish operation (regardless
* of QoS), it will return one of:
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
*
* @note The parameters `pCallbackInfo` and `pPublishOperation` should only be used for QoS
* 1 publishes. For QoS 0, they should both be `NULL`.
*
* @see @ref mqtt_function_publishsync for a blocking variant of this function.
*
* <b>Example</b>
* @code{c}
* // An initialized and connected MQTT connection.
* IotMqttConnection_t mqttConnection;
*
* // Publish information.
* IotMqttPublishInfo_t publishInfo = IOT_MQTT_PUBLISH_INFO_INITIALIZER;
*
* // Set the publish information. QoS 0 example (retain not used):
* publishInfo.qos = IOT_MQTT_QOS_0;
* publishInfo.pTopicName = "some/topic/name";
* publishInfo.topicNameLength = ( uint16_t ) strlen( publishInfo.pTopicName );
* publishInfo.pPayload = "payload";
* publishInfo.payloadLength = strlen( publishInfo.pPayload );
*
* // QoS 0 publish should return IOT_MQTT_SUCCESS upon success.
* IotMqttError_t qos0Result = IotMqtt_PublishAsync( mqttConnection,
* &publishInfo,
* 0,
* NULL,
* NULL );
*
* // QoS 1 with retry example (using same topic name and payload as QoS 0 example):
* IotMqttOperation_t qos1Operation = IOT_MQTT_OPERATION_INITIALIZER;
* publishInfo.qos = IOT_MQTT_QOS_1;
* publishInfo.retryMs = 1000; // Retry if no response is received in 1 second.
* publishInfo.retryLimit = 5; // Retry up to 5 times.
*
* // QoS 1 publish should return IOT_MQTT_STATUS_PENDING upon success.
* IotMqttError_t qos1Result = IotMqtt_PublishAsync( mqttConnection,
* &publishInfo,
* IOT_MQTT_FLAG_WAITABLE,
* NULL,
* &qos1Operation );
*
* // Wait up to 5 seconds for the publish to complete.
* if( qos1Result == IOT_MQTT_STATUS_PENDING )
* {
* qos1Result = IotMqtt_Wait( qos1Operation, 5000 );
* }
* @endcode
*/
/* @[declare_mqtt_publishasync] */
IotMqttError_t IotMqtt_PublishAsync( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * const pPublishOperation );
/* @[declare_mqtt_publishasync] */
/**
* @brief Publish a message to the given topic name with a timeout.
*
* This function sends an MQTT PUBLISH packet to the server, then waits for
* a server response to the packet. Internally, this function is a call to @ref
* mqtt_function_publishasync followed by @ref mqtt_function_wait. See @ref
* mqtt_function_publishasync for more information about the MQTT PUBLISH operation.
*
* @attention QoS 2 messages are currently unsupported. Only 0 or 1 are valid
* for message QoS.
*
* @param[in] mqttConnection The MQTT connection to use for the publish.
* @param[in] pPublishInfo MQTT publish parameters.
* @param[in] flags Flags which modify the behavior of this function. See @ref mqtt_constants_flags.
* Currently, flags are ignored by this function; this parameter is for
* future-compatibility.
* @param[in] timeoutMs If the MQTT server does not acknowledge a QoS 1 PUBLISH
* within this timeout in milliseconds, this function returns #IOT_MQTT_TIMEOUT.
* This parameter is ignored for QoS 0 PUBLISH messages.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_NOT_INITIALIZED
* - #IOT_MQTT_BAD_PARAMETER
* - #IOT_MQTT_NO_MEMORY
* - #IOT_MQTT_NETWORK_ERROR
* - #IOT_MQTT_SCHEDULING_ERROR
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_RETRY_NO_RESPONSE (if [pPublishInfo->retryMs](@ref IotMqttPublishInfo_t.retryMs)
* and [pPublishInfo->retryLimit](@ref IotMqttPublishInfo_t.retryLimit) were set).
*/
/* @[declare_mqtt_publishsync] */
IotMqttError_t IotMqtt_PublishSync( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
uint32_t timeoutMs );
/* @[declare_mqtt_publishsync] */
/**
* @brief Waits for an operation to complete.
*
* This function blocks to wait for a [subscribe](@ref mqtt_function_subscribeasync),
* [unsubscribe](@ref mqtt_function_unsubscribeasync), or [publish]
* (@ref mqtt_function_publishasync) to complete. These operations are by default
* asynchronous; the function calls queue an operation for processing, and a
* callback is invoked once the operation is complete.
*
* To use this function, the flag #IOT_MQTT_FLAG_WAITABLE must have been
* set in the operation's function call. Additionally, this function must always
* be called with any waitable operation to clean up resources.
*
* Regardless of its return value, this function always clean up resources used
* by the waitable operation. This means `reference` is invalidated as soon as
* this function returns, even if it returns #IOT_MQTT_TIMEOUT or another error.
*
* @param[in] operation Reference to the operation to wait for. The flag
* #IOT_MQTT_FLAG_WAITABLE must have been set for this operation.
* @param[in] timeoutMs How many milliseconds to wait before returning
* #IOT_MQTT_TIMEOUT.
*
* @return The return value of this function depends on the MQTT operation associated
* with `reference`. See #IotMqttError_t for possible return values.
*
* <b>Example</b>
* @code{c}
* // Operation reference and timeout.
* IotMqttOperation_t publishOperation = IOT_MQTT_OPERATION_INITIALIZER;
* uint32_t timeoutMs = 5000; // 5 seconds
*
* // MQTT operation to wait for.
* IotMqttError_t result = IotMqtt_PublishAsync( mqttConnection,
* &publishInfo,
* IOT_MQTT_FLAG_WAITABLE,
* NULL,
* &publishOperation );
*
* // Publish should have returned IOT_MQTT_STATUS_PENDING. The call to wait
* // returns once the result of the publish is available or the timeout expires.
* if( result == IOT_MQTT_STATUS_PENDING )
* {
* result = IotMqtt_Wait( publishOperation, timeoutMs );
*
* // After the call to wait, the result of the publish is known
* // (not IOT_MQTT_STATUS_PENDING).
* assert( result != IOT_MQTT_STATUS_PENDING );
* }
* @endcode
*/
/* @[declare_mqtt_wait] */
IotMqttError_t IotMqtt_Wait( IotMqttOperation_t operation,
uint32_t timeoutMs );
/* @[declare_mqtt_wait] */
/*-------------------------- MQTT helper functions --------------------------*/
/**
* @brief Returns a string that describes an #IotMqttError_t.
*
* Like the POSIX `strerror`, this function returns a string describing a
* return code. In this case, the return code is an MQTT library error code,
* `status`.
*
* The string returned by this function <b>MUST</b> be treated as read-only: any
* attempt to modify its contents may result in a crash. Therefore, this function
* is limited to usage in logging.
*
* @param[in] status The status to describe.
*
* @return A read-only string that describes `status`.
*
* @warning The string returned by this function must never be modified.
*/
/* @[declare_mqtt_strerror] */
const char * IotMqtt_strerror( IotMqttError_t status );
/* @[declare_mqtt_strerror] */
/**
* @brief Returns a string that describes an #IotMqttOperationType_t.
*
* This function returns a string describing an MQTT operation type, `operation`.
*
* The string returned by this function <b>MUST</b> be treated as read-only: any
* attempt to modify its contents may result in a crash. Therefore, this function
* is limited to usage in logging.
*
* @param[in] operation The operation to describe.
*
* @return A read-only string that describes `operation`.
*
* @warning The string returned by this function must never be modified.
*/
/* @[declare_mqtt_operationtype] */
const char * IotMqtt_OperationType( IotMqttOperationType_t operation );
/* @[declare_mqtt_operationtype] */
/**
* @brief Check if an MQTT connection has a subscription for a topic filter.
*
* This function checks whether an MQTT connection `mqttConnection` has a
* subscription callback registered for a topic filter `pTopicFilter`. If a
* subscription callback is found, its details are copied into the output parameter
* `pCurrentSubscription`. This subscription callback will be invoked for incoming
* PUBLISH messages on `pTopicFilter`.
*
* <b>The check for a matching subscription is only performed client-side</b>;
* therefore, this function should not be relied upon for perfect accuracy. For
* example, this function may return an incorrect result if the MQTT server
* crashes and drops subscriptions without informing the client.
*
* Note that an MQTT connection's subscriptions might change between the time this
* function checks the subscription list and its caller tests the return value.
* This function certainly should not be used concurrently with any pending SUBSCRIBE
* or UNSUBSCRIBE operations.
*
* One suitable use of this function is to check <i>which</i> subscriptions were rejected
* if @ref mqtt_function_subscribeasync returns #IOT_MQTT_SERVER_REFUSED; that return
* code only means that <i>at least one</i> subscription was rejected.
*
* @param[in] mqttConnection The MQTT connection to check.
* @param[in] pTopicFilter The topic filter to check.
* @param[in] topicFilterLength Length of `pTopicFilter`.
* @param[out] pCurrentSubscription If a subscription is found, its details are
* copied here. This output parameter is only valid if this function returns `true` (`NULL` to disable).
*
* @return `true` if a subscription was found; `false` otherwise.
*
* @note The subscription QoS is not stored by the MQTT library; therefore,
* `pCurrentSubscription->qos` will always be set to #IOT_MQTT_QOS_0.
*/
/* @[declare_mqtt_issubscribed] */
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
IotMqttSubscription_t * const pCurrentSubscription );
/* @[declare_mqtt_issubscribed] */
/**
* @cond DOXYGEN_IGNORE
* Doxygen should ignore this section.
*
* Backwards compatibility macros for previous function names.
*/
#define IotMqtt_Subscribe IotMqtt_SubscribeAsync
#define IotMqtt_TimedSubscribe IotMqtt_SubscribeSync
#define IotMqtt_Unsubscribe IotMqtt_UnsubscribeAsync
#define IotMqtt_TimedUnsubscribe IotMqtt_UnsubscribeSync
#define IotMqtt_Publish IotMqtt_PublishAsync
#define IotMqtt_TimedPublish IotMqtt_PublishSync
/** @endcond */
#endif /* ifndef IOT_MQTT_H_ */

View file

@ -0,0 +1,702 @@
/*
* IoT MQTT V2.1.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file iot_mqtt_serialize.h
* @brief User-facing functions for serializing MQTT 3.1.1 packets. This header should
* be included for building a single threaded light-weight MQTT client bypassing
* stateful CSDK MQTT library.
*/
#ifndef _IOT_MQTT_SERIALIZE_H_
#define _IOT_MQTT_SERIALIZE_H_
/* The config header is always included first. */
#include "iot_config.h"
/* MQTT types include. */
#include "types/iot_mqtt_types.h"
/*------------------------- MQTT library functions --------------------------*/
/**
* @functionspage{mqtt,MQTT library}
* - @functionname{mqtt_function_getconnectpacketsize}
* - @functionname{mqtt_function_serializeconnect}
* - @functionname{mqtt_function_getsubscriptionpacketsize}
* - @functionname{mqtt_function_serializesubscribe}
* - @functionname{mqtt_function_serializeunsubscribe}
* - @functionname{mqtt_function_getpublishpacketsize}
* - @functionname{mqtt_function_serializepublish}
* - @functionname{mqtt_function_serializedisconnect}
* - @functionname{mqtt_function_serializepingreq}
* - @functionname{mqtt_function_getincomingmqttpackettypeandlength}
* - @functionname{mqtt_function_deserializeresponse}
* - @functionname{mqtt_function_deserializepublish}
*/
/**
* @functionpage{IotMqtt_GetConnectPacketSize,mqtt,getconnectpacketsize}
* @functionpage{IotMqtt_SerializeConnect,mqtt,serializeconnect}
* @functionpage{IotMqtt_GetSubscriptionPacketSize,mqtt,getsubscriptionpacketsize}
* @functionpage{IotMqtt_SerializeSubscribe,mqtt,serializesubscribe}
* @functionpage{IotMqtt_SerializeUnsubscribe,mqtt,serializeunsubscribe}
* @functionpage{IotMqtt_GetPublishPacketSize,mqtt,getpublishpacketsize}
* @functionpage{IotMqtt_SerializePublish,mqtt,serializepublish}
* @functionpage{IotMqtt_SerializeDisconnect,mqtt,serializedisconnect}
* @functionpage{IotMqtt_SerializePingreq,mqtt,serializepingreq}
* @functionpage{IotMqtt_GetIncomingMQTTPacketTypeAndLength,mqtt,getincomingmqttpackettypeandlength}
* @functionpage{IotMqtt_DeserializeResponse,mqtt,deserializeresponse}
* @functionpage{IotMqtt_DeserializePublish,mqtt,deserializepublish}
*/
/**
* @brief Calculate the size and "Remaining length" of a CONNECT packet generated
* from the given parameters.
*
* @param[in] pConnectInfo User-provided CONNECT information struct.
* @param[out] pRemainingLength Output for calculated "Remaining length" field.
* @param[out] pPacketSize Output for calculated total packet size.
*
* @return IOT_MQTT_SUCCESS if the packet is within the length allowed by MQTT 3.1.1 spec;
* IOT_MQTT_BAD_PARAMETER otherwise. If this function returns `IOT_MQTT_BAD_PARAMETER`,
* the output parameters should be ignored.
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_GetConnectPacketSize() should be used to calculate
* // the size of connect request.
*
* IotMqttConnectInfo_t xConnectInfo;
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
* IotMqttError_t xResult;
*
* // start with everything set to zero
* memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
*
* // Initialize connection info, details are out of scope for this example.
* _initializeConnectInfo( &xConnectInfo );
* // Get size requirement for the connect packet
* xResult = IotMqtt_GetConnectPacketSize( &xConnectInfo, &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
*
* // Application should allocate buffer with size == xPacketSize or use static buffer
* // with size >= xPacketSize to serialize connect request.
* @endcode
*/
/* @[declare_mqtt_getconnectpacketsize] */
IotMqttError_t IotMqtt_GetConnectPacketSize( const IotMqttConnectInfo_t * pConnectInfo,
size_t * pRemainingLength,
size_t * pPacketSize );
/* @[declare_mqtt_getconnectpacketsize] */
/**
* @brief Generate a CONNECT packet from the given parameters.
*
* @param[in] pConnectInfo User-provided CONNECT information.
* @param[in] remainingLength remaining length of the packet to be serialized.
* @param[in, out] pBuffer User provided buffer where the CONNECT packet is written.
* @param[in] bufferSize Size of the buffer pointed to by pBuffer.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*
* @note pBuffer must be allocated by caller. Use @ref mqtt_function_getconnectpacketsize
* to determine the required size.
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_SerializeConnect() should be used to serialize
* // MQTT connect packet and send it to MQTT broker.
* // Example uses static memory but dynamically allocated memory can be used as well.
* // Get size requirement for the connect packet.
*
* #define mqttexampleSHARED_BUFFER_SIZE 100
* static ucSharedBuffer[mqttexampleSHARED_BUFFER_SIZE];
* void sendConnectPacket( int xMQTTSocket )
* {
* IotMqttConnectInfo_t xConnectInfo;
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
* IotMqttError_t xResult;
* size_t xSentBytes = 0;
* // Get size requirement for MQTT connect packet.
* xResult = IotMqtt_GetConnectPacketSize( &xConnectInfo, &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // Make sure the packet size is less than static buffer size
* IotMqtt_Assert( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
* // Serialize MQTT connect packet into provided buffer
* xResult = IotMqtt_SerializeConnect( &xConnectInfo, xRemainingLength, ucSharedBuffer, xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* xSentBytes = send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 );
* IotMqtt_Assert( xSentBytes == xPacketSize );
* }
* @endcode
*/
/* @[declare_mqtt_serializeconnect] */
IotMqttError_t IotMqtt_SerializeConnect( const IotMqttConnectInfo_t * pConnectInfo,
size_t remainingLength,
uint8_t * pBuffer,
size_t bufferSize );
/* @[declare_mqtt_serializeconnect] */
/**
* @brief Calculate the size and "Remaining length" of a SUBSCRIBE or UNSUBSCRIBE
* packet generated from the given parameters.
*
* @param[in] type Either IOT_MQTT_SUBSCRIBE or IOT_MQTT_UNSUBSCRIBE.
* @param[in] pSubscriptionList User-provided array of subscriptions.
* @param[in] subscriptionCount Size of `pSubscriptionList`.
* @param[out] pRemainingLength Output for calculated "Remaining length" field.
* @param[out] pPacketSize Output for calculated total packet size.
*
* @return IOT_MQTT_SUCCESS if the packet is within the length allowed by MQTT 3.1.1 spec;
* IOT_MQTT_BAD_PARAMETER otherwise. If this function returns IOT_MQTT_BAD_PARAMETER,
* the output parameters should be ignored.
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_GetSubscriptionPacketSize() should be used to calculate
* // the size of subscribe or unsubscribe request.
*
* IotMqttError_t xResult;
* IotMqttSubscription_t xMQTTSubscription[ 1 ];
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
*
* // Initialize Subscribe parameters. Details are out of scope for this example.
* // It will involve setting QOS, topic filter and topic filter length.
* _initializeSubscribe( xMQTTSubscription );
*
* xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_SUBSCRIBE,
* xMQTTSubscription,
* sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
* &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
*
* // Application should allocate buffer with size == xPacketSize or use static buffer
* // with size >= xPacketSize to serialize connect request.
* @endcode
*/
/* @[declare_mqtt_getsubscriptionpacketsize] */
IotMqttError_t IotMqtt_GetSubscriptionPacketSize( IotMqttOperationType_t type,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
size_t * pRemainingLength,
size_t * pPacketSize );
/* @[declare_mqtt_getsubscriptionpacketsize] */
/**
* @brief Generate a SUBSCRIBE packet from the given parameters.
*
* @param[in] pSubscriptionList User-provided array of subscriptions.
* @param[in] subscriptionCount Size of `pSubscriptionList`.
* @param[in] remainingLength remaining length of the packet to be serialized.
* @param[out] pPacketIdentifier The packet identifier generated for this SUBSCRIBE.
* @param[in, out] pBuffer User provide buffer where the SUBSCRIBE packet is written.
* @param[in] bufferSize Size of the buffer pointed to by pBuffer.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*
* @note pBuffer must be allocated by caller.
* @note This call is part of serializer API used for implementing light-weight MQTT client.
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_SerializeSubscribe() should be used to serialize
* // MQTT Subscribe packet and send it to MQTT broker.
* // Example uses static memory, but dynamically allocated memory can be used as well.
* // Get size requirement for the MQTT subscribe packet.
*
* #define mqttexampleSHARED_BUFFER_SIZE 100
* static ucSharedBuffer[mqttexampleSHARED_BUFFER_SIZE];
* void sendSubscribePacket( int xMQTTSocket )
* {
* IotMqttSubscription_t xMQTTSubscription[ 1 ];
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
* IotMqttError_t xResult;
* size_t xSentBytes = 0;
*
* // Initialize Subscribe parameters. Details are out of scope for this example.
* // It will involve setting QOS, topic filter and topic filter length.
* _initializeSubscribe( xMQTTSubscription );
* // Get size requirement for MQTT Subscribe packet.
* xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_SUBSCRIBE,
* xMQTTSubscription,
* sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
* &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // Make sure the packet size is less than static buffer size.
* IotMqtt_Assert( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
*
* // Serialize subscribe into statically allocated ucSharedBuffer.
* xResult = IotMqtt_SerializeSubscribe( xMQTTSubscription,
* sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
* xRemainingLength,
* &usPacketIdentifier,
* ucSharedBuffer,
* xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* xSentBytes = send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 );
* IotMqtt_Assert( xSentBytes == xPacketSize );
* }
* @endcode
*/
/* @[declare_mqtt_serializesubscribe] */
IotMqttError_t IotMqtt_SerializeSubscribe( const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
size_t remainingLength,
uint16_t * pPacketIdentifier,
uint8_t * pBuffer,
size_t bufferSize );
/* @[declare_mqtt_serializesubscribe] */
/**
* @brief Generate a UNSUBSCRIBE packet from the given parameters.
*
* @param[in] pSubscriptionList User-provided array of subscriptions to remove.
* @param[in] subscriptionCount Size of `pSubscriptionList`.
* @param[in] remainingLength remaining length of the packet to be serialized.
* @param[out] pPacketIdentifier The packet identifier generated for this UNSUBSCRIBE.
* @param[in, out] pBuffer User provide buffer where the UNSUBSCRIBE packet is written.
* @param[in] bufferSize Size of the buffer pointed to by pBuffer.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
*
* @note pBuffer must be allocated by caller.
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_SerializeUnsubscribe() should be used to serialize
* // MQTT unsubscribe packet and send it to MQTT broker.
* // Example uses static memory, but dynamically allocated memory can be used as well.
* // Get size requirement for the Unsubscribe packet.
*
* #define mqttexampleSHARED_BUFFER_SIZE 100
* static ucSharedBuffer[mqttexampleSHARED_BUFFER_SIZE];
* void sendUnsubscribePacket( int xMQTTSocket )
* {
* // Following example shows one topic example.
* IotMqttSubscription_t xMQTTSubscription[ 1 ];
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
* IotMqttError_t xResult;
* size_t xSentBytes = 0;
* // Get size requirement for MQTT unsubscribe packet.
* xResult = IotMqtt_GetSubscriptionPacketSize( IOT_MQTT_UNSUBSCRIBE,
* xMQTTSubscription,
* sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
* &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // Make sure the packet size is less than static buffer size.
* IotMqtt_Assert( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
* // Serialize subscribe into statically allocated ucSharedBuffer.
* xResult = IotMqtt_SerializeUnsubscribe( xMQTTSubscription,
* sizeof( xMQTTSubscription ) / sizeof( IotMqttSubscription_t ),
* xRemainingLength,
* &usPacketIdentifier,
* ucSharedBuffer,
* xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* xSentBytes = send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 );
* IotMqtt_Assert( xSentBytes == xPacketSize );
* }
* @endcode
*/
/* @[declare_mqtt_serializeunsubscribe] */
IotMqttError_t IotMqtt_SerializeUnsubscribe( const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount,
size_t remainingLength,
uint16_t * pPacketIdentifier,
uint8_t * pBuffer,
size_t bufferSize );
/* @[declare_mqtt_serializeunsubscribe] */
/**
* @brief Calculate the size and "Remaining length" of a PUBLISH packet generated
* from the given parameters.
*
* @param[in] pPublishInfo User-provided PUBLISH information struct.
* @param[out] pRemainingLength Output for calculated "Remaining length" field.
* @param[out] pPacketSize Output for calculated total packet size.
*
* @return IOT_MQTT_SUCCESS if the packet is within the length allowed by MQTT 3.1.1 spec;
* IOT_MQTT_BAD_PARAMETER otherwise. If this function returns IOT_MQTT_BAD_PARAMETER,
* the output parameters should be ignored.
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_GetPublishPacketSize() should be used to calculate
* // the size of MQTT publish request.
*
* IotMqttError_t xResult;
* IotMqttPublishInfo_t xMQTTPublishInfo;
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
*
* // Initialize Publish parameters. Details are out of scope for this example.
* // It will involve setting QOS, topic filter, topic filter length, payload
* // payload length
* _initializePublish( &xMQTTPublishInfo );
*
* // Find out length of Publish packet size.
* xResult = IotMqtt_GetPublishPacketSize( &xMQTTPublishInfo, &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
*
* // Application should allocate buffer with size == xPacketSize or use static buffer
* // with size >= xPacketSize to serialize connect request.
* @endcode
*/
/* @[declare_mqtt_getpublishpacketsize] */
IotMqttError_t IotMqtt_GetPublishPacketSize( IotMqttPublishInfo_t * pPublishInfo,
size_t * pRemainingLength,
size_t * pPacketSize );
/* @[declare_mqtt_getpublishpacketsize] */
/**
* @brief Generate a PUBLISH packet from the given parameters.
*
* @param[in] pPublishInfo User-provided PUBLISH information.
* @param[in] remainingLength remaining length of the packet to be serialized.
* @param[out] pPacketIdentifier The packet identifier generated for this PUBLISH.
* @param[out] pPacketIdentifierHigh Where the high byte of the packet identifier
* is written.
* @param[in, out] pBuffer User provide buffer where the PUBLISH packet is written.
* @param[in] bufferSize Size of the buffer pointed to by pBuffer.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_BAD_PARAMETER.
*
* @note pBuffer must be allocated by caller.
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how IotMqtt_SerializePublish() should be used to serialize
* // MQTT Publish packet and send it to broker.
* // Example uses static memory, but dynamically allocated memory can be used as well.
*
* #define mqttexampleSHARED_BUFFER_SIZE 100
* static ucSharedBuffer[mqttexampleSHARED_BUFFER_SIZE];
* void sendUnsubscribePacket( int xMQTTSocket )
* {
* IotMqttError_t xResult;
* IotMqttPublishInfo_t xMQTTPublishInfo;
* size_t xRemainingLength = 0;
* size_t xPacketSize = 0;
* size_t xSentBytes = 0;
* uint16_t usPacketIdentifier;
* uint8_t * pusPacketIdentifierHigh;
*
* // Initialize Publish parameters. Details are out of scope for this example.
* // It will involve setting QOS, topic filter, topic filter length, payload
* // payload length.
* _initializePublish( &xMQTTPublishInfo );
*
* // Find out length of Publish packet size.
* xResult = IotMqtt_GetPublishPacketSize( &xMQTTPublishInfo, &xRemainingLength, &xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* // Make sure the packet size is less than static buffer size
* IotMqtt_Assert( xPacketSize < mqttexampleSHARED_BUFFER_SIZE );
*
* xResult = IotMqtt_SerializePublish( &xMQTTPublishInfo,
* xRemainingLength,
* &usPacketIdentifier,
* &pusPacketIdentifierHigh,
* ucSharedBuffer,
* xPacketSize );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
*
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* xSentBytes = send( xMQTTSocket, ( void * ) ucSharedBuffer, xPacketSize, 0 );
* IotMqtt_Assert( xSentBytes == xPacketSize );
* }
* @endcode
*/
/* @[declare_mqtt_serializepublish] */
IotMqttError_t IotMqtt_SerializePublish( IotMqttPublishInfo_t * pPublishInfo,
size_t remainingLength,
uint16_t * pPacketIdentifier,
uint8_t ** pPacketIdentifierHigh,
uint8_t * pBuffer,
size_t bufferSize );
/* @[declare_mqtt_serializepublish] */
/**
* @brief Generate a DISCONNECT packet
*
* @param[in, out] pBuffer User provide buffer where the DISCONNECT packet is written.
* @param[in] bufferSize Size of the buffer pointed to by pBuffer.
*
* @return returns #IOT_MQTT_SUCCESS or #IOT_MQTT_BAD_PARAMETER
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example below shows how IotMqtt_SerializeDisconnect() should be used.
*
* #define mqttexampleSHARED_BUFFER_SIZE 100
* static ucSharedBuffer[mqttexampleSHARED_BUFFER_SIZE];
* void sendDisconnectRequest( int xMQTTSocket )
* {
* size_t xSentBytes = 0;
*
* // Disconnect is fixed length packet, therefore there is no need to calculate the size,
* // just makes sure static buffer can accommodate disconnect request.
* IotMqtt_Assert( MQTT_PACKET_DISCONNECT_SIZE <= mqttexampleSHARED_BUFFER_SIZE );
*
* // Serialize Disconnect packet into static buffer (dynamically allocated buffer can be used as well)
* xResult = IotMqtt_SerializeDisconnect( ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
*
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* xSentByte = send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE, 0 );
* IotMqtt_Assert( xSentByte == MQTT_PACKET_DISCONNECT_SIZE );
* }
*
* @endcode
*/
/* @[declare_mqtt_serializedisconnect] */
IotMqttError_t IotMqtt_SerializeDisconnect( uint8_t * pBuffer,
size_t bufferSize );
/* @[declare_mqtt_serializedisconnect] */
/**
* @brief Generate a PINGREQ packet.
*
* @param[in, out] pBuffer User provide buffer where the PINGREQ packet is written.
* @param[in] bufferSize Size of the buffer pointed to by pBuffer.
*
* @return #IOT_MQTT_SUCCESS or #IOT_MQTT_BAD_PARAMETER.
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example below shows how IotMqtt_SerializePingReq() should be used.
*
* #define mqttexampleSHARED_BUFFER_SIZE 100
* static ucSharedBuffer[mqttexampleSHARED_BUFFER_SIZE];
* void sendPingRequest( int xMQTTSocket )
* {
* size_t xSentBytes = 0;
*
* // PingReq is fixed length packet, therefore there is no need to calculate the size,
* // just makes sure static buffer can accommodate Ping request.
* IotMqtt_Assert( MQTT_PACKET_PINGREQ_SIZE <= mqttexampleSHARED_BUFFER_SIZE );
*
* xResult = IotMqtt_SerializePingreq( ucSharedBuffer, MQTT_PACKET_PINGREQ_SIZE );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
*
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* xSentByte = send( xMQTTSocket, ( void * ) ucSharedBuffer, MQTT_PACKET_DISCONNECT_SIZE, 0 );
* IotMqtt_Assert( xSentByte == MQTT_PACKET_PINGREQ_SIZE);
* }
* @endcode
*/
/* @[declare_mqtt_serializepingreq] */
IotMqttError_t IotMqtt_SerializePingreq( uint8_t * pBuffer,
size_t bufferSize );
/* @[declare_mqtt_serializepingreq] */
/**
* @brief Extract MQTT packet type and length from incoming packet
*
* @param[in, out] pIncomingPacket Pointer to IotMqttPacketInfo_t structure
* where type, remaining length and packet identifier are stored.
* @param[in] getNextByte Pointer to platform specific function which is used
* to extract type and length from incoming received stream (see example ).
* @param[in] pNetworkConnection Pointer to platform specific network connection
* which is used by getNextByte to receive network data
*
* @return #IOT_MQTT_SUCCESS on successful extraction of type and length,
* #IOT_MQTT_BAD_RESPONSE on failure.
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example code below shows how to implement getNetxByte function with posix sockets.
* // Note: IotMqttGetNextByte_t typedef IotMqttError_t (* IotMqttGetNextByte_t)( void * pNetworkContext,
* // uint8_t * pNextByte );
* // Note: It is assumed that socket is already created and connected,
*
* IotMqttError_t getNextByte( void * pContext,
* uint8_t * pNextByte )
* {
* int socket = ( int ) ( *pvContext );
* int receivedBytes;
* IotMqttError_t result;
*
* receivedBytes = recv( socket, ( void * ) pNextByte, sizeof( uint8_t ), 0 );
*
* if( receivedBytes == sizeof( uint8_t ) )
* {
* result = IOT_MQTT_SUCCESS;
* }
* else
* {
* result = IOT_MQTT_TIMEOUT;
* }
*
* return result;
* }
*
* // Example below shows how IotMqtt_GetIncomingMQTTPacketTypeAndLength() is used to extract type
* // and length from incoming ping response.
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* void getTypeAndLengthFromIncomingMQTTPingResponse( int xMQTTSocket )
* {
* IotMqttPacketInfo_t xIncomingPacket;
* IotMqttError_t xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* IotMqtt_Assert( xIncomingPacket.type == MQTT_PACKET_TYPE_PINGRESP );
* }
* @endcode
*
*/
/* @[declare_mqtt_getincomingmqttpackettypeandlength] */
IotMqttError_t IotMqtt_GetIncomingMQTTPacketTypeAndLength( IotMqttPacketInfo_t * pIncomingPacket,
IotMqttGetNextByte_t getNextByte,
void * pNetworkConnection );
/* @[declare_mqtt_getincomingmqttpackettypeandlength] */
/**
* @brief Deserialize incoming publish packet.
*
* @param[in, out] pMqttPacket The caller of this API sets type, remainingLength and pRemainingData.
* On success, packetIdentifier and pubInfo will be set by the function.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_SERVER_REFUSED
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example below shows how IotMqtt_DeserializePublish() used to extract contents of incoming
* // Publish. xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* void processIncomingPublish( int xMQTTSocket )
* {
* IotMqttError_t xResult;
* IotMqttPacketInfo_t xIncomingPacket;
*
* xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* IotMqtt_Assert( ( xIncomingPacket.type & 0xf0 ) == MQTT_PACKET_TYPE_PUBLISH );
* IotMqtt_Assert( xIncomingPacket.remainingLength <= mqttexampleSHARED_BUFFER_SIZE );
*
* // Receive the remaining bytes.
* if( recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == xIncomingPacket.remainingLength )
* {
* xIncomingPacket.pRemainingData = ucSharedBuffer;
*
* if( IotMqtt_DeserializePublish( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
* {
* xResult = IOT_MQTT_BAD_RESPONSE;
* }
* else
* {
* // Process incoming Publish.
* IotLogInfo( "Incoming QOS : %d\n", xIncomingPacket.pubInfo.qos );
* IotLogInfo( "Incoming Publish Topic Name: %.*s\n", xIncomingPacket.pubInfo.topicNameLength, xIncomingPacket.pubInfo.pTopicName );
* IotLogInfo( "Incoming Publish Message : %.*s\n", xIncomingPacket.pubInfo.payloadLength, xIncomingPacket.pubInfo.pPayload );
* }
* }
* else
* {
* xResult = IOT_MQTT_NETWORK_ERROR;
* }
*
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* }
* @endcode
*/
/* @[declare_mqtt_deserializepublish] */
IotMqttError_t IotMqtt_DeserializePublish( IotMqttPacketInfo_t * pMqttPacket );
/* @[declare_mqtt_deserializepublish] */
/**
* @brief Deserialize incoming ack packets.
*
* @param[in, out] pMqttPacket The caller of this API sets type, remainingLength and pRemainingData.
* On success, packetIdentifier will be set.
*
* @return One of the following:
* - #IOT_MQTT_SUCCESS
* - #IOT_MQTT_BAD_RESPONSE
* - #IOT_MQTT_SERVER_REFUSED
*
* @note This call is part of serializer API used for implementing light-weight MQTT client.
*
* <b>Example</b>
* @code{c}
* // Example below shows how IotMqtt_DeserializeResponse() is used to process unsubscribe ack.
* // xMQTTSocket here is posix socket created and connected to MQTT broker outside of this function.
* void processUnsubscribeAck( int xMQTTSocket )
* {
* IotMqttError_t xResult;
* IotMqttPacketInfo_t xIncomingPacket;
*
* xResult = IotMqtt_GetIncomingMQTTPacketTypeAndLength( &xIncomingPacket, getNextByte, ( void * ) xMQTTSocket );
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* IotMqtt_Assert( xIncomingPacket.type == MQTT_PACKET_TYPE_UNSUBACK );
* IotMqtt_Assert( xIncomingPacket.remainingLength <= sizeof( ucSharedBuffer ) );
*
* // Receive the remaining bytes.
* if( recv( xMQTTSocket, ( void * ) ucSharedBuffer, xIncomingPacket.remainingLength, 0 ) == xIncomingPacket.remainingLength )
* {
* xIncomingPacket.pRemainingData = ucSharedBuffer;
*
* if( IotMqtt_DeserializeResponse( &xIncomingPacket ) != IOT_MQTT_SUCCESS )
* {
* xResult = IOT_MQTT_BAD_RESPONSE;
* }
* }
* else
* {
* xResult = IOT_MQTT_NETWORK_ERROR;
* }
* IotMqtt_Assert( xResult == IOT_MQTT_SUCCESS );
* }
* @endcode
*/
/* @[declare_mqtt_deserializeresponse] */
IotMqttError_t IotMqtt_DeserializeResponse( IotMqttPacketInfo_t * pMqttPacket );
/* @[declare_mqtt_deserializeresponse] */
#endif /* ifndef IOT_MQTT_SERIALIZE_H_ */

View file

@ -0,0 +1,884 @@
/*
* IoT MQTT V2.1.0
* Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file iot_mqtt_network.c
* @brief Implements functions involving transport layer connections.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <string.h>
/* Error handling include. */
#include "iot_error.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/* Atomics include. */
#include "iot_atomic.h"
/*-----------------------------------------------------------*/
/**
* @brief Check if an incoming packet type is valid.
*
* @param[in] packetType The packet type to check.
*
* @return `true` if the packet type is valid; `false` otherwise.
*/
static bool _incomingPacketValid( uint8_t packetType );
/**
* @brief Get an incoming MQTT packet from the network.
*
* @param[in] pNetworkConnection Network connection to use for receive, which
* may be different from the network connection associated with the MQTT connection.
* @param[in] pMqttConnection The associated MQTT connection.
* @param[out] pIncomingPacket Output parameter for the incoming packet.
*
* @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY or #IOT_MQTT_BAD_RESPONSE.
*/
static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket );
/**
* @brief Deserialize a packet received from the network.
*
* @param[in] pMqttConnection The associated MQTT connection.
* @param[in] pIncomingPacket The packet received from the network.
*
* @return #IOT_MQTT_SUCCESS, #IOT_MQTT_NO_MEMORY, #IOT_MQTT_NETWORK_ERROR,
* #IOT_MQTT_SCHEDULING_ERROR, #IOT_MQTT_BAD_RESPONSE, or #IOT_MQTT_SERVER_REFUSED.
*/
static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket );
/**
* @brief Send a PUBACK for a received QoS 1 PUBLISH packet.
*
* @param[in] pMqttConnection Which connection the PUBACK should be sent over.
* @param[in] packetIdentifier Which packet identifier to include in PUBACK.
*/
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier );
/**
* @brief Flush a packet from the stream of incoming data.
*
* This function is called when memory for a packet cannot be allocated. The
* packet is flushed from the stream of incoming data so that the next packet
* may be read.
*
* @param[in] pNetworkConnection Network connection to use for receive, which
* may be different from the network connection associated with the MQTT connection.
* @param[in] pMqttConnection The associated MQTT connection.
* @param[in] length The length of the packet to flush.
*/
static void _flushPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
size_t length );
/**
* @cond DOXYGEN_IGNORE
* Doxygen should ignore this section.
*
* Declaration of local MQTT serializer override selectors
*/
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttGetPacketType_t,
_getPacketTypeFunc,
_IotMqtt_GetPacketType,
getPacketType )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttGetRemainingLength_t,
_getRemainingLengthFunc,
_IotMqtt_GetRemainingLength,
getRemainingLength )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
_getConnackDeserializer,
_IotMqtt_DeserializeConnack,
deserialize.connack )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
_getPublishDeserializer,
_IotMqtt_DeserializePublish,
deserialize.publish )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
_getPubackDeserializer,
_IotMqtt_DeserializePuback,
deserialize.puback )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
_getSubackDeserializer,
_IotMqtt_DeserializeSuback,
deserialize.suback )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
_getUnsubackDeserializer,
_IotMqtt_DeserializeUnsuback,
deserialize.unsuback )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttDeserialize_t,
_getPingrespDeserializer,
_IotMqtt_DeserializePingresp,
deserialize.pingresp )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttSerializePuback_t,
_getMqttPubackSerializer,
_IotMqtt_SerializePuback,
serialize.puback )
_SERIALIZER_OVERRIDE_SELECTOR( IotMqttFreePacket_t,
_getMqttFreePacketFunc,
_IotMqtt_FreePacket,
freePacket )
#else /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
#define _getPacketTypeFunc( pSerializer ) _IotMqtt_GetPacketType
#define _getRemainingLengthFunc( pSerializer ) _IotMqtt_GetRemainingLength
#define _getConnackDeserializer( pSerializer ) _IotMqtt_DeserializeConnack
#define _getPublishDeserializer( pSerializer ) _IotMqtt_DeserializePublish
#define _getPubackDeserializer( pSerializer ) _IotMqtt_DeserializePuback
#define _getSubackDeserializer( pSerializer ) _IotMqtt_DeserializeSuback
#define _getUnsubackDeserializer( pSerializer ) _IotMqtt_DeserializeUnsuback
#define _getPingrespDeserializer( pSerializer ) _IotMqtt_DeserializePingresp
#define _getMqttPubackSerializer( pSerializer ) _IotMqtt_SerializePuback
#define _getMqttFreePacketFunc( pSerializer ) _IotMqtt_FreePacket
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/** @endcond */
/*-----------------------------------------------------------*/
static bool _incomingPacketValid( uint8_t packetType )
{
bool status = true;
/* Check packet type. Mask out lower bits to ignore flags. */
switch( packetType & 0xf0 )
{
/* Valid incoming packet types. */
case MQTT_PACKET_TYPE_CONNACK:
case MQTT_PACKET_TYPE_PUBLISH:
case MQTT_PACKET_TYPE_PUBACK:
case MQTT_PACKET_TYPE_SUBACK:
case MQTT_PACKET_TYPE_UNSUBACK:
case MQTT_PACKET_TYPE_PINGRESP:
break;
/* Any other packet type is invalid. */
default:
status = false;
break;
}
return status;
}
/*-----------------------------------------------------------*/
static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
size_t dataBytesRead = 0;
/* No buffer for remaining data should be allocated. */
IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
IotMqtt_Assert( pIncomingPacket->remainingLength == 0 );
/* Read the packet type, which is the first byte available. */
pIncomingPacket->type = _getPacketTypeFunc( pMqttConnection->pSerializer )( pNetworkConnection,
pMqttConnection->pNetworkInterface );
/* Check that the incoming packet type is valid. */
if( _incomingPacketValid( pIncomingPacket->type ) == false )
{
IotLogError( "(MQTT connection %p) Unknown packet type %02x received.",
pMqttConnection,
pIncomingPacket->type );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Read the remaining length. */
pIncomingPacket->remainingLength = _getRemainingLengthFunc( pMqttConnection->pSerializer )( pNetworkConnection,
pMqttConnection->pNetworkInterface );
if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Allocate a buffer for the remaining data and read the data. */
if( pIncomingPacket->remainingLength > 0 )
{
pIncomingPacket->pRemainingData = IotMqtt_MallocMessage( pIncomingPacket->remainingLength );
if( pIncomingPacket->pRemainingData == NULL )
{
IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "
"%lu for incoming packet type %lu.",
pMqttConnection,
( unsigned long ) pIncomingPacket->remainingLength,
( unsigned long ) pIncomingPacket->type );
_flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );
}
else
{
EMPTY_ELSE_MARKER;
}
dataBytesRead = pMqttConnection->pNetworkInterface->receive( pNetworkConnection,
pIncomingPacket->pRemainingData,
pIncomingPacket->remainingLength );
if( dataBytesRead != pIncomingPacket->remainingLength )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_RESPONSE );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Clean up on error. */
IOT_FUNCTION_CLEANUP_BEGIN();
if( status != IOT_MQTT_SUCCESS )
{
if( pIncomingPacket->pRemainingData != NULL )
{
IotMqtt_FreeMessage( pIncomingPacket->pRemainingData );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_CLEANUP_END();
}
/*-----------------------------------------------------------*/
static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConnection,
_mqttPacket_t * pIncomingPacket )
{
IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
_mqttOperation_t * pOperation = NULL;
/* A buffer for remaining data must be allocated if remaining length is not 0. */
IotMqtt_Assert( ( pIncomingPacket->remainingLength > 0 ) ==
( pIncomingPacket->pRemainingData != NULL ) );
/* Only valid packets should be given to this function. */
IotMqtt_Assert( _incomingPacketValid( pIncomingPacket->type ) == true );
/* Mask out the low bits of packet type to ignore flags. */
switch( ( pIncomingPacket->type & 0xf0 ) )
{
case MQTT_PACKET_TYPE_CONNACK:
IotLogDebug( "(MQTT connection %p) CONNACK in data stream.", pMqttConnection );
/* Deserialize CONNACK and notify of result. */
status = _getConnackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_CONNECT,
NULL );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_PUBLISH:
IotLogDebug( "(MQTT connection %p) PUBLISH in data stream.", pMqttConnection );
/* Allocate memory to handle the incoming PUBLISH. */
pOperation = IotMqtt_MallocOperation( sizeof( _mqttOperation_t ) );
if( pOperation == NULL )
{
IotLogWarn( "Failed to allocate memory for incoming PUBLISH." );
status = IOT_MQTT_NO_MEMORY;
break;
}
else
{
/* Set the members of the incoming PUBLISH operation. */
( void ) memset( pOperation, 0x00, sizeof( _mqttOperation_t ) );
pOperation->incomingPublish = true;
pOperation->pMqttConnection = pMqttConnection;
pIncomingPacket->u.pIncomingPublish = pOperation;
}
/* Deserialize incoming PUBLISH. */
status = _getPublishDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
if( status == IOT_MQTT_SUCCESS )
{
/* Send a PUBACK for QoS 1 PUBLISH. */
if( pOperation->u.publish.publishInfo.qos == IOT_MQTT_QOS_1 )
{
_sendPuback( pMqttConnection, pIncomingPacket->packetIdentifier );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Transfer ownership of the received MQTT packet to the PUBLISH operation. */
pOperation->u.publish.pReceivedData = pIncomingPacket->pRemainingData;
pIncomingPacket->pRemainingData = NULL;
/* Add the PUBLISH to the list of operations pending processing. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
IotListDouble_InsertHead( &( pMqttConnection->pendingProcessing ),
&( pOperation->link ) );
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Increment the MQTT connection reference count before scheduling an
* incoming PUBLISH. */
if( _IotMqtt_IncrementConnectionReferences( pMqttConnection ) == true )
{
/* Schedule PUBLISH for callback invocation. */
status = _IotMqtt_ScheduleOperation( pOperation, _IotMqtt_ProcessIncomingPublish, 0 );
}
else
{
status = IOT_MQTT_NETWORK_ERROR;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Free PUBLISH operation on error. */
if( status != IOT_MQTT_SUCCESS )
{
/* Check ownership of the received MQTT packet. */
if( pOperation->u.publish.pReceivedData != NULL )
{
/* Retrieve the pointer MQTT packet pointer so it may be freed later. */
IotMqtt_Assert( pIncomingPacket->pRemainingData == NULL );
pIncomingPacket->pRemainingData = ( uint8_t * ) pOperation->u.publish.pReceivedData;
}
else
{
EMPTY_ELSE_MARKER;
}
/* Remove operation from pending processing list. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
if( IotLink_IsLinked( &( pOperation->link ) ) == true )
{
IotListDouble_Remove( &( pOperation->link ) );
}
else
{
EMPTY_ELSE_MARKER;
}
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
IotMqtt_Assert( pOperation != NULL );
IotMqtt_FreeOperation( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_PUBACK:
IotLogDebug( "(MQTT connection %p) PUBACK in data stream.", pMqttConnection );
/* Deserialize PUBACK and notify of result. */
status = _getPubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_PUBLISH_TO_SERVER,
&( pIncomingPacket->packetIdentifier ) );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_SUBACK:
IotLogDebug( "(MQTT connection %p) SUBACK in data stream.", pMqttConnection );
/* Deserialize SUBACK and notify of result. */
pIncomingPacket->u.pMqttConnection = pMqttConnection;
status = _getSubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_SUBSCRIBE,
&( pIncomingPacket->packetIdentifier ) );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
case MQTT_PACKET_TYPE_UNSUBACK:
IotLogDebug( "(MQTT connection %p) UNSUBACK in data stream.", pMqttConnection );
/* Deserialize UNSUBACK and notify of result. */
status = _getUnsubackDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
pOperation = _IotMqtt_FindOperation( pMqttConnection,
IOT_MQTT_UNSUBSCRIBE,
&( pIncomingPacket->packetIdentifier ) );
if( pOperation != NULL )
{
pOperation->u.operation.status = status;
_IotMqtt_Notify( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
break;
default:
/* The only remaining valid type is PINGRESP. */
IotMqtt_Assert( ( pIncomingPacket->type & 0xf0 ) == MQTT_PACKET_TYPE_PINGRESP );
IotLogDebug( "(MQTT connection %p) PINGRESP in data stream.", pMqttConnection );
/* Deserialize PINGRESP. */
status = _getPingrespDeserializer( pMqttConnection->pSerializer )( pIncomingPacket );
if( status == IOT_MQTT_SUCCESS )
{
if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),
0,
1 ) == 1 )
{
IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",
pMqttConnection );
}
else
{
IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",
pMqttConnection );
}
}
else
{
EMPTY_ELSE_MARKER;
}
break;
}
if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "(MQTT connection %p) Packet parser status %s.",
pMqttConnection,
IotMqtt_strerror( status ) );
}
else
{
EMPTY_ELSE_MARKER;
}
return status;
}
/*-----------------------------------------------------------*/
static void _sendPuback( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier )
{
IotMqttError_t status = IOT_MQTT_STATUS_PENDING;
_mqttOperation_t * pPubackOperation = NULL;
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",
pMqttConnection,
packetIdentifier );
/* Create a PUBACK operation. */
status = _IotMqtt_CreateOperation( pMqttConnection,
0,
NULL,
&pPubackOperation );
if( status != IOT_MQTT_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
/* Set the operation type. */
pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;
/* Generate a PUBACK packet from the packet identifier. */
status = _getMqttPubackSerializer( pMqttConnection->pSerializer )( packetIdentifier,
&( pPubackOperation->u.operation.pMqttPacket ),
&( pPubackOperation->u.operation.packetSize ) );
if( status != IOT_MQTT_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
/* Add the PUBACK operation to the send queue for network transmission. */
status = _IotMqtt_ScheduleOperation( pPubackOperation,
_IotMqtt_ProcessSend,
0 );
if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",
pMqttConnection );
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* Clean up on error. */
IOT_FUNCTION_CLEANUP_BEGIN();
if( status != IOT_MQTT_SUCCESS )
{
if( pPubackOperation != NULL )
{
_IotMqtt_DestroyOperation( pPubackOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
/*-----------------------------------------------------------*/
static void _flushPacket( void * pNetworkConnection,
const _mqttConnection_t * pMqttConnection,
size_t length )
{
size_t bytesFlushed = 0;
uint8_t receivedByte = 0;
for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )
{
( void ) _IotMqtt_GetNextByte( pNetworkConnection,
pMqttConnection->pNetworkInterface,
&receivedByte );
}
}
/*-----------------------------------------------------------*/
bool _IotMqtt_GetNextByte( void * pNetworkConnection,
const IotNetworkInterface_t * pNetworkInterface,
uint8_t * pIncomingByte )
{
bool status = false;
uint8_t incomingByte = 0;
size_t bytesReceived = 0;
/* Attempt to read 1 byte. */
bytesReceived = pNetworkInterface->receive( pNetworkConnection,
&incomingByte,
1 );
/* Set the output parameter and return success if 1 byte was read. */
if( bytesReceived == 1 )
{
*pIncomingByte = incomingByte;
status = true;
}
else
{
/* Network receive must return 0 on failure. */
IotMqtt_Assert( bytesReceived == 0 );
}
return status;
}
/*-----------------------------------------------------------*/
void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
_mqttConnection_t * pMqttConnection )
{
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };
void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;
/* Disconnect callback function. */
void ( * disconnectCallback )( void *,
IotMqttCallbackParam_t * ) = NULL;
/* Network close function. */
IotNetworkError_t ( * closeConnection) ( IotNetworkConnection_t ) = NULL;
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );
pMqttConnection->disconnected = true;
if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )
{
/* Keep-alive must have a PINGREQ allocated. */
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );
IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );
/* PINGREQ provides a reference to the connection, so reference count must
* be nonzero. */
IotMqtt_Assert( pMqttConnection->references > 0 );
/* Attempt to cancel the keep-alive job. */
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,
pMqttConnection->pingreq.job,
NULL );
/* Clean up keep-alive if its job was successfully canceled. Otherwise,
* the executing keep-alive job will clean up itself. */
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )
{
/* Free the packet */
_getMqttFreePacketFunc( pMqttConnection->pSerializer )( pMqttConnection->pingreq.u.operation.pMqttPacket );
/* Clear data about the keep-alive. */
pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;
pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;
pMqttConnection->pingreq.u.operation.packetSize = 0;
/* Keep-alive is cleaned up; decrement reference count. Since this
* function must be followed with a call to DISCONNECT, a check to
* destroy the connection is not done here. */
pMqttConnection->references--;
IotLogDebug( "(MQTT connection %p) Keep-alive job canceled and cleaned up.",
pMqttConnection );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Copy the function pointers and contexts, as the MQTT connection may be
* modified after the mutex is released. */
disconnectCallback = pMqttConnection->disconnectCallback.function;
pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;
closeConnection = pMqttConnection->pNetworkInterface->close;
pNetworkConnection = pMqttConnection->pNetworkConnection;
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );
/* Close the network connection. */
if( closeConnection != NULL )
{
closeStatus = closeConnection( pNetworkConnection );
if( closeStatus == IOT_NETWORK_SUCCESS )
{
IotLogInfo( "(MQTT connection %p) Network connection closed.", pMqttConnection );
}
else
{
IotLogWarn( "(MQTT connection %p) Failed to close network connection, error %d.",
pMqttConnection,
closeStatus );
}
}
else
{
IotLogWarn( "(MQTT connection %p) No network close function was set. Network connection"
" not closed.", pMqttConnection );
}
/* Invoke the disconnect callback. */
if( disconnectCallback != NULL )
{
/* Set the members of the callback parameter. */
callbackParam.mqttConnection = pMqttConnection;
callbackParam.u.disconnectReason = disconnectReason;
disconnectCallback( pDisconnectCallbackContext,
&callbackParam );
}
else
{
EMPTY_ELSE_MARKER;
}
}
/*-----------------------------------------------------------*/
void IotMqtt_ReceiveCallback( IotNetworkConnection_t pNetworkConnection,
void * pReceiveContext )
{
IotMqttError_t status = IOT_MQTT_SUCCESS;
_mqttPacket_t incomingPacket = { .u.pMqttConnection = NULL };
/* Cast context to correct type. */
_mqttConnection_t * pMqttConnection = ( _mqttConnection_t * ) pReceiveContext;
/* Read an MQTT packet from the network. */
status = _getIncomingPacket( pNetworkConnection,
pMqttConnection,
&incomingPacket );
if( status == IOT_MQTT_SUCCESS )
{
/* Deserialize the received packet. */
status = _deserializeIncomingPacket( pMqttConnection,
&incomingPacket );
/* Free any buffers allocated for the MQTT packet. */
if( incomingPacket.pRemainingData != NULL )
{
IotMqtt_FreeMessage( incomingPacket.pRemainingData );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Close the network connection on a bad response. */
if( status == IOT_MQTT_BAD_RESPONSE )
{
IotLogError( "(MQTT connection %p) Error processing incoming data. Closing connection.",
pMqttConnection );
_IotMqtt_CloseNetworkConnection( IOT_MQTT_BAD_PACKET_RECEIVED,
pMqttConnection );
}
else
{
EMPTY_ELSE_MARKER;
}
}
/*-----------------------------------------------------------*/
IotMqttError_t IotMqtt_GetIncomingMQTTPacketTypeAndLength( IotMqttPacketInfo_t * pIncomingPacket,
IotMqttGetNextByte_t getNextByte,
void * pNetworkConnection )
{
IotMqttError_t status = IOT_MQTT_SUCCESS;
/* Read the packet type, which is the first byte available. */
if( getNextByte( pNetworkConnection, &( pIncomingPacket->type ) ) == IOT_MQTT_SUCCESS )
{
/* Check that the incoming packet type is valid. */
if( _incomingPacketValid( pIncomingPacket->type ) == false )
{
IotLogError( "(MQTT connection) Unknown packet type %02x received.",
pIncomingPacket->type );
status = IOT_MQTT_BAD_RESPONSE;
}
else
{
/* Read the remaining length. */
pIncomingPacket->remainingLength = _IotMqtt_GetRemainingLength_Generic( pNetworkConnection,
getNextByte );
if( pIncomingPacket->remainingLength == MQTT_REMAINING_LENGTH_INVALID )
{
status = IOT_MQTT_BAD_RESPONSE;
}
}
}
else
{
status = IOT_MQTT_NETWORK_ERROR;
}
return status;
}
/*-----------------------------------------------------------*/

View file

@ -0,0 +1,645 @@
/*
* IoT MQTT V2.1.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file iot_mqtt_subscription.c
* @brief Implements functions that manage subscriptions for an MQTT connection.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Standard includes. */
#include <stdbool.h>
#include <string.h>
/* Error handling include. */
#include "iot_error.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/* Platform layer includes. */
#include "platform/iot_threads.h"
/*-----------------------------------------------------------*/
/**
* @brief First parameter to #_topicMatch.
*/
typedef struct _topicMatchParams
{
const char * pTopicName; /**< @brief The topic name to parse. */
uint16_t topicNameLength; /**< @brief Length of #_topicMatchParams_t.pTopicName. */
bool exactMatchOnly; /**< @brief Whether to allow wildcards or require exact matches. */
} _topicMatchParams_t;
/**
* @brief First parameter to #_packetMatch.
*/
typedef struct _packetMatchParams
{
uint16_t packetIdentifier; /**< Packet identifier to match. */
int32_t order; /**< Order to match. Set to #MQTT_REMOVE_ALL_SUBSCRIPTIONS to ignore. */
} _packetMatchParams_t;
/*-----------------------------------------------------------*/
/**
* @brief Matches a topic name (from a publish) with a topic filter (from a
* subscription).
*
* @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
* @param[in] pMatch Pointer to a #_topicMatchParams_t.
*
* @return `true` if the arguments match the subscription topic filter; `false`
* otherwise.
*/
static bool _topicMatch( const IotLink_t * pSubscriptionLink,
void * pMatch );
/**
* @brief Matches a packet identifier and order.
*
* @param[in] pSubscriptionLink Pointer to the link member of an #_mqttSubscription_t.
* @param[in] pMatch Pointer to a #_packetMatchParams_t.
*
* @return `true` if the arguments match the subscription's packet info; `false`
* otherwise.
*/
static bool _packetMatch( const IotLink_t * pSubscriptionLink,
void * pMatch );
/*-----------------------------------------------------------*/
static bool _topicMatch( const IotLink_t * pSubscriptionLink,
void * pMatch )
{
IOT_FUNCTION_ENTRY( bool, false );
uint16_t nameIndex = 0, filterIndex = 0;
/* Because this function is called from a container function, the given link
* must never be NULL. */
IotMqtt_Assert( pSubscriptionLink != NULL );
_mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
pSubscriptionLink,
link );
_topicMatchParams_t * pParam = ( _topicMatchParams_t * ) pMatch;
/* Extract the relevant strings and lengths from parameters. */
const char * pTopicName = pParam->pTopicName;
const char * pTopicFilter = pSubscription->pTopicFilter;
const uint16_t topicNameLength = pParam->topicNameLength;
const uint16_t topicFilterLength = pSubscription->topicFilterLength;
/* Check for an exact match. */
if( topicNameLength == topicFilterLength )
{
status = ( strncmp( pTopicName, pTopicFilter, topicNameLength ) == 0 );
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* If the topic lengths are different but an exact match is required, return
* false. */
if( pParam->exactMatchOnly == true )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
while( ( nameIndex < topicNameLength ) && ( filterIndex < topicFilterLength ) )
{
/* Check if the character in the topic name matches the corresponding
* character in the topic filter string. */
if( pTopicName[ nameIndex ] == pTopicFilter[ filterIndex ] )
{
/* Handle special corner cases as documented by the MQTT protocol spec. */
/* Filter "sport/#" also matches "sport" since # includes the parent level. */
if( nameIndex == topicNameLength - 1 )
{
if( filterIndex == topicFilterLength - 3 )
{
if( pTopicFilter[ filterIndex + 1 ] == '/' )
{
if( pTopicFilter[ filterIndex + 2 ] == '#' )
{
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Filter "sport/+" also matches the "sport/" but not "sport". */
if( nameIndex == topicNameLength - 1 )
{
if( filterIndex == topicFilterLength - 2 )
{
if( pTopicFilter[ filterIndex + 1 ] == '+' )
{
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
/* Check for wildcards. */
if( pTopicFilter[ filterIndex ] == '+' )
{
/* Move topic name index to the end of the current level.
* This is identified by '/'. */
while( nameIndex < topicNameLength && pTopicName[ nameIndex ] != '/' )
{
nameIndex++;
}
/* Increment filter index to skip '/'. */
filterIndex++;
continue;
}
else if( pTopicFilter[ filterIndex ] == '#' )
{
/* Subsequent characters don't need to be checked if the for the
* multi-level wildcard. */
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
/* Any character mismatch other than '+' or '#' means the topic
* name does not match the topic filter. */
IOT_SET_AND_GOTO_CLEANUP( false );
}
}
/* Increment indexes. */
nameIndex++;
filterIndex++;
}
/* If the end of both strings has been reached, they match. */
if( ( nameIndex == topicNameLength ) && ( filterIndex == topicFilterLength ) )
{
IOT_SET_AND_GOTO_CLEANUP( true );
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
static bool _packetMatch( const IotLink_t * pSubscriptionLink,
void * pMatch )
{
bool match = false;
/* Because this function is called from a container function, the given link
* must never be NULL. */
IotMqtt_Assert( pSubscriptionLink != NULL );
_mqttSubscription_t * pSubscription = IotLink_Container( _mqttSubscription_t,
pSubscriptionLink,
link );
_packetMatchParams_t * pParam = ( _packetMatchParams_t * ) pMatch;
/* Compare packet identifiers. */
if( pParam->packetIdentifier == pSubscription->packetInfo.identifier )
{
/* Compare orders if order is not MQTT_REMOVE_ALL_SUBSCRIPTIONS. */
if( pParam->order == MQTT_REMOVE_ALL_SUBSCRIPTIONS )
{
match = true;
}
else
{
match = ( ( size_t ) pParam->order ) == pSubscription->packetInfo.order;
}
}
/* If this subscription should be removed, check the reference count. */
if( match == true )
{
/* Reference count must not be negative. */
IotMqtt_Assert( pSubscription->references >= 0 );
/* If the reference count is positive, this subscription cannot be
* removed yet because there are subscription callbacks using it. */
if( pSubscription->references > 0 )
{
match = false;
/* Set the unsubscribed flag. The last active subscription callback
* will remove and clean up this subscription. */
pSubscription->unsubscribed = true;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
return match;
}
/*-----------------------------------------------------------*/
IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
uint16_t subscribePacketIdentifier,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount )
{
IotMqttError_t status = IOT_MQTT_SUCCESS;
size_t i = 0;
_mqttSubscription_t * pNewSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
_topicMatchParams_t topicMatchParams = { .exactMatchOnly = true };
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
for( i = 0; i < subscriptionCount; i++ )
{
/* Check if this topic filter is already registered. */
topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
NULL,
_topicMatch,
&topicMatchParams );
if( pSubscriptionLink != NULL )
{
pNewSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
/* The lengths of exactly matching topic filters must match. */
IotMqtt_Assert( pNewSubscription->topicFilterLength == pSubscriptionList[ i ].topicFilterLength );
/* Replace the callback and packet info with the new parameters. */
pNewSubscription->callback = pSubscriptionList[ i ].callback;
pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
pNewSubscription->packetInfo.order = i;
}
else
{
/* Allocate memory for a new subscription. */
pNewSubscription = IotMqtt_MallocSubscription( sizeof( _mqttSubscription_t ) +
pSubscriptionList[ i ].topicFilterLength );
if( pNewSubscription == NULL )
{
status = IOT_MQTT_NO_MEMORY;
break;
}
else
{
/* Clear the new subscription. */
( void ) memset( pNewSubscription,
0x00,
sizeof( _mqttSubscription_t ) + pSubscriptionList[ i ].topicFilterLength );
/* Set the members of the new subscription and add it to the list. */
pNewSubscription->packetInfo.identifier = subscribePacketIdentifier;
pNewSubscription->packetInfo.order = i;
pNewSubscription->callback = pSubscriptionList[ i ].callback;
pNewSubscription->topicFilterLength = pSubscriptionList[ i ].topicFilterLength;
( void ) memcpy( pNewSubscription->pTopicFilter,
pSubscriptionList[ i ].pTopicFilter,
( size_t ) ( pSubscriptionList[ i ].topicFilterLength ) );
IotListDouble_InsertHead( &( pMqttConnection->subscriptionList ),
&( pNewSubscription->link ) );
}
}
}
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
/* If memory allocation failed, remove all previously added subscriptions. */
if( status != IOT_MQTT_SUCCESS )
{
_IotMqtt_RemoveSubscriptionByTopicFilter( pMqttConnection,
pSubscriptionList,
i );
}
else
{
EMPTY_ELSE_MARKER;
}
return status;
}
/*-----------------------------------------------------------*/
void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
IotMqttCallbackParam_t * pCallbackParam )
{
_mqttSubscription_t * pSubscription = NULL;
IotLink_t * pCurrentLink = NULL, * pNextLink = NULL;
void * pCallbackContext = NULL;
void ( * callbackFunction )( void *,
IotMqttCallbackParam_t * ) = NULL;
_topicMatchParams_t topicMatchParams = { 0 };
/* Set the members of the search parameter. */
topicMatchParams.pTopicName = pCallbackParam->u.message.info.pTopicName;
topicMatchParams.topicNameLength = pCallbackParam->u.message.info.topicNameLength;
topicMatchParams.exactMatchOnly = false;
/* Prevent any other thread from modifying the subscription list while this
* function is searching. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
/* Search the subscription list for all matching subscriptions starting at
* the list head. */
while( true )
{
pCurrentLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
pCurrentLink,
_topicMatch,
&topicMatchParams );
/* No subscription found. Exit loop. */
if( pCurrentLink == NULL )
{
break;
}
else
{
EMPTY_ELSE_MARKER;
}
/* Subscription found. Calculate pointer to subscription object. */
pSubscription = IotLink_Container( _mqttSubscription_t, pCurrentLink, link );
/* Subscription validation should not have allowed a NULL callback function. */
IotMqtt_Assert( pSubscription->callback.function != NULL );
/* Increment the subscription's reference count. */
( pSubscription->references )++;
/* Copy the necessary members of the subscription before releasing the
* subscription list mutex. */
pCallbackContext = pSubscription->callback.pCallbackContext;
callbackFunction = pSubscription->callback.function;
/* Unlock the subscription list mutex. */
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
/* Set the members of the callback parameter. */
pCallbackParam->mqttConnection = pMqttConnection;
pCallbackParam->u.message.pTopicFilter = pSubscription->pTopicFilter;
pCallbackParam->u.message.topicFilterLength = pSubscription->topicFilterLength;
/* Invoke the subscription callback. */
callbackFunction( pCallbackContext, pCallbackParam );
/* Lock the subscription list mutex to decrement the reference count. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
/* Decrement the reference count. It must still be positive. */
( pSubscription->references )--;
IotMqtt_Assert( pSubscription->references >= 0 );
/* Save the pointer to the next link in case this subscription is freed. */
pNextLink = pCurrentLink->pNext;
/* Remove this subscription if it has no references and the unsubscribed
* flag is set. */
if( pSubscription->unsubscribed == true )
{
/* An unsubscribed subscription should have been removed from the list. */
IotMqtt_Assert( IotLink_IsLinked( &( pSubscription->link ) ) == false );
/* Free subscriptions with no references. */
if( pSubscription->references == 0 )
{
IotMqtt_FreeSubscription( pSubscription );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Move current link pointer. */
pCurrentLink = pNextLink;
}
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
_IotMqtt_DecrementConnectionReferences( pMqttConnection );
}
/*-----------------------------------------------------------*/
void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
uint16_t packetIdentifier,
int32_t order )
{
_packetMatchParams_t packetMatchParams = { 0 };
/* Set the members of the search parameter. */
packetMatchParams.packetIdentifier = packetIdentifier;
packetMatchParams.order = order;
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
IotListDouble_RemoveAllMatches( &( pMqttConnection->subscriptionList ),
_packetMatch,
( void * ) ( &packetMatchParams ),
IotMqtt_FreeSubscription,
offsetof( _mqttSubscription_t, link ) );
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
}
/*-----------------------------------------------------------*/
void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
const IotMqttSubscription_t * pSubscriptionList,
size_t subscriptionCount )
{
size_t i = 0;
_mqttSubscription_t * pSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
_topicMatchParams_t topicMatchParams = { 0 };
/* Prevent any other thread from modifying the subscription list while this
* function is running. */
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );
/* Find and remove each topic filter from the list. */
for( i = 0; i < subscriptionCount; i++ )
{
topicMatchParams.pTopicName = pSubscriptionList[ i ].pTopicFilter;
topicMatchParams.topicNameLength = pSubscriptionList[ i ].topicFilterLength;
topicMatchParams.exactMatchOnly = true;
pSubscriptionLink = IotListDouble_FindFirstMatch( &( pMqttConnection->subscriptionList ),
NULL,
_topicMatch,
&topicMatchParams );
if( pSubscriptionLink != NULL )
{
pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
/* Reference count must not be negative. */
IotMqtt_Assert( pSubscription->references >= 0 );
/* Remove subscription from list. */
IotListDouble_Remove( pSubscriptionLink );
/* Check the reference count. This subscription cannot be removed if
* there are subscription callbacks using it. */
if( pSubscription->references > 0 )
{
/* Set the unsubscribed flag. The last active subscription callback
* will remove and clean up this subscription. */
pSubscription->unsubscribed = true;
}
else
{
/* Free a subscription with no references. */
IotMqtt_FreeSubscription( pSubscription );
}
}
else
{
EMPTY_ELSE_MARKER;
}
}
IotMutex_Unlock( &( pMqttConnection->subscriptionMutex ) );
}
/*-----------------------------------------------------------*/
bool IotMqtt_IsSubscribed( IotMqttConnection_t mqttConnection,
const char * pTopicFilter,
uint16_t topicFilterLength,
IotMqttSubscription_t * const pCurrentSubscription )
{
bool status = false;
_mqttSubscription_t * pSubscription = NULL;
IotLink_t * pSubscriptionLink = NULL;
_topicMatchParams_t topicMatchParams = { 0 };
/* Set the members of the search parameter. */
topicMatchParams.pTopicName = pTopicFilter;
topicMatchParams.topicNameLength = topicFilterLength;
topicMatchParams.exactMatchOnly = true;
/* Prevent any other thread from modifying the subscription list while this
* function is running. */
IotMutex_Lock( &( mqttConnection->subscriptionMutex ) );
/* Search for a matching subscription. */
pSubscriptionLink = IotListDouble_FindFirstMatch( &( mqttConnection->subscriptionList ),
NULL,
_topicMatch,
&topicMatchParams );
/* Check if a matching subscription was found. */
if( pSubscriptionLink != NULL )
{
pSubscription = IotLink_Container( _mqttSubscription_t, pSubscriptionLink, link );
/* Copy the matching subscription to the output parameter. */
if( pCurrentSubscription != NULL )
{
pCurrentSubscription->pTopicFilter = pTopicFilter;
pCurrentSubscription->topicFilterLength = topicFilterLength;
pCurrentSubscription->qos = IOT_MQTT_QOS_0;
pCurrentSubscription->callback = pSubscription->callback;
}
else
{
EMPTY_ELSE_MARKER;
}
status = true;
}
else
{
EMPTY_ELSE_MARKER;
}
IotMutex_Unlock( &( mqttConnection->subscriptionMutex ) );
return status;
}
/*-----------------------------------------------------------*/
/* Provide access to internal functions and variables if testing. */
#if IOT_BUILD_TESTS == 1
#include "iot_test_access_mqtt_subscription.c"
#endif

View file

@ -0,0 +1,637 @@
/*
* IoT MQTT V2.1.0
* Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* @file iot_mqtt_validate.c
* @brief Implements functions that validate the structs of the MQTT library.
*/
/* The config header is always included first. */
#include "iot_config.h"
/* Error handling include. */
#include "iot_error.h"
/* MQTT internal include. */
#include "private/iot_mqtt_internal.h"
/**
* @brief Check that an #IotMqttPublishInfo_t is valid.
*
* @param[in] awsIotMqttMode Specifies if this PUBLISH packet is being sent to
* an AWS IoT MQTT server.
* @param[in] maximumPayloadLength Maximum payload length.
* @param[in] pPublishTypeDescription String describing the publish type.
* @param[in] pPublishInfo The #IotMqttPublishInfo_t to validate.
*
* @return `true` if `pPublishInfo` is valid; `false` otherwise.
*/
static bool _validatePublish( bool awsIotMqttMode,
size_t maximumPayloadLength,
const char * pPublishTypeDescription,
const IotMqttPublishInfo_t * pPublishInfo );
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateConnect( const IotMqttConnectInfo_t * pConnectInfo )
{
IOT_FUNCTION_ENTRY( bool, true );
uint16_t maxClientIdLength = MQTT_SERVER_MAX_CLIENTID_LENGTH;
bool enforceMaxClientIdLength = false;
/* Check for NULL. */
if( pConnectInfo == NULL )
{
IotLogError( "MQTT connection information cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that a client identifier was set. */
if( pConnectInfo->pClientIdentifier == NULL )
{
IotLogError( "Client identifier must be set." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for a zero-length client identifier. Zero-length client identifiers
* are not allowed with clean sessions. */
if( pConnectInfo->clientIdentifierLength == 0 )
{
IotLogWarn( "A zero-length client identifier was provided." );
if( pConnectInfo->cleanSession == true )
{
IotLogError( "A zero-length client identifier cannot be used with a clean session." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that the number of persistent session subscriptions is valid. */
if( pConnectInfo->pPreviousSubscriptions != NULL )
{
if( _IotMqtt_ValidateSubscriptionList( IOT_MQTT_SUBSCRIBE,
pConnectInfo->awsIotMqttMode,
pConnectInfo->pPreviousSubscriptions,
pConnectInfo->previousSubscriptionCount ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* If will info is provided, check that it is valid. */
if( pConnectInfo->pWillInfo != NULL )
{
if( _IotMqtt_ValidateLwtPublish( pConnectInfo->awsIotMqttMode,
pConnectInfo->pWillInfo ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* The AWS IoT MQTT service enforces a client ID length limit. */
if( pConnectInfo->awsIotMqttMode == true )
{
maxClientIdLength = AWS_IOT_MQTT_SERVER_MAX_CLIENTID_LENGTH;
enforceMaxClientIdLength = true;
}
if( pConnectInfo->clientIdentifierLength > maxClientIdLength )
{
if( enforceMaxClientIdLength == false )
{
IotLogWarn( "A client identifier length of %hu is longer than %hu, "
"which is "
"the longest client identifier a server must accept.",
pConnectInfo->clientIdentifierLength,
maxClientIdLength );
}
else
{
IotLogError( "A client identifier length of %hu exceeds the "
"maximum supported length of %hu.",
pConnectInfo->clientIdentifierLength,
maxClientIdLength );
IOT_SET_AND_GOTO_CLEANUP( false );
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
static bool _validatePublish( bool awsIotMqttMode,
size_t maximumPayloadLength,
const char * pPublishTypeDescription,
const IotMqttPublishInfo_t * pPublishInfo )
{
IOT_FUNCTION_ENTRY( bool, true );
/* Check for NULL. */
if( pPublishInfo == NULL )
{
IotLogError( "Publish information cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check topic name for NULL or zero-length. */
if( pPublishInfo->pTopicName == NULL )
{
IotLogError( "Publish topic name must be set." );
}
else
{
EMPTY_ELSE_MARKER;
}
if( pPublishInfo->topicNameLength == 0 )
{
IotLogError( "Publish topic name length cannot be 0." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
if( pPublishInfo->payloadLength != 0 )
{
if( pPublishInfo->payloadLength > maximumPayloadLength )
{
IotLogError( "%s payload size of %zu exceeds maximum length of %zu.",
pPublishTypeDescription,
pPublishInfo->payloadLength,
maximumPayloadLength );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
if( pPublishInfo->pPayload == NULL )
{
IotLogError( "Nonzero payload length cannot have a NULL payload." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for a valid QoS. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
if( pPublishInfo->qos != IOT_MQTT_QOS_1 )
{
IotLogError( "Publish QoS must be either 0 or 1." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check the retry parameters. */
if( pPublishInfo->retryLimit > 0 )
{
if( pPublishInfo->retryMs == 0 )
{
IotLogError( "Publish retry time must be positive." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for compatibility with AWS IoT MQTT server. */
if( awsIotMqttMode == true )
{
/* Check for retained message. */
if( pPublishInfo->retain == true )
{
IotLogError( "AWS IoT does not support retained publish messages." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check topic name length. */
if( pPublishInfo->topicNameLength > AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH )
{
IotLogError( "AWS IoT does not support topic names longer than %d bytes.",
AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidatePublish( bool awsIotMqttMode,
const IotMqttPublishInfo_t * pPublishInfo )
{
size_t maximumPayloadLength = MQTT_SERVER_MAX_PUBLISH_PAYLOAD_LENGTH;
if( awsIotMqttMode == true )
{
maximumPayloadLength = AWS_IOT_MQTT_SERVER_MAX_PUBLISH_PAYLOAD_LENGTH;
}
else
{
EMPTY_ELSE_MARKER;
}
return _validatePublish( awsIotMqttMode,
maximumPayloadLength,
"Publish",
pPublishInfo );
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateLwtPublish( bool awsIotMqttMode,
const IotMqttPublishInfo_t * pLwtPublishInfo )
{
return _validatePublish( awsIotMqttMode,
MQTT_SERVER_MAX_LWT_PAYLOAD_LENGTH,
"LWT",
pLwtPublishInfo );
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateOperation( IotMqttOperation_t operation )
{
IOT_FUNCTION_ENTRY( bool, true );
/* Check for NULL. */
if( operation == NULL )
{
IotLogError( "Operation reference cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that reference is waitable. */
if( ( operation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE ) != IOT_MQTT_FLAG_WAITABLE )
{
IotLogError( "Operation is not waitable." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/
bool _IotMqtt_ValidateSubscriptionList( IotMqttOperationType_t operation,
bool awsIotMqttMode,
const IotMqttSubscription_t * pListStart,
size_t listSize )
{
IOT_FUNCTION_ENTRY( bool, true );
size_t i = 0;
uint16_t j = 0;
const IotMqttSubscription_t * pListElement = NULL;
/* Operation must be either subscribe or unsubscribe. */
IotMqtt_Assert( ( operation == IOT_MQTT_SUBSCRIBE ) ||
( operation == IOT_MQTT_UNSUBSCRIBE ) );
/* Check for empty list. */
if( pListStart == NULL )
{
IotLogError( "Subscription list pointer cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
if( listSize == 0 )
{
IotLogError( "Empty subscription list." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* AWS IoT supports at most 8 topic filters in a single SUBSCRIBE packet. */
if( awsIotMqttMode == true )
{
if( listSize > AWS_IOT_MQTT_SERVER_MAX_TOPIC_FILTERS_PER_SUBSCRIBE )
{
IotLogError( "AWS IoT does not support more than %d topic filters per "
"subscription request.",
AWS_IOT_MQTT_SERVER_MAX_TOPIC_FILTERS_PER_SUBSCRIBE );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
for( i = 0; i < listSize; i++ )
{
pListElement = &( pListStart[ i ] );
/* Check for a valid QoS and callback function when subscribing. */
if( operation == IOT_MQTT_SUBSCRIBE )
{
if( pListElement->qos != IOT_MQTT_QOS_0 )
{
if( pListElement->qos != IOT_MQTT_QOS_1 )
{
IotLogError( "Subscription QoS must be either 0 or 1." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
if( pListElement->callback.function == NULL )
{
IotLogError( "Callback function must be set." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check subscription topic filter. */
if( pListElement->pTopicFilter == NULL )
{
IotLogError( "Subscription topic filter cannot be NULL." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
if( pListElement->topicFilterLength == 0 )
{
IotLogError( "Subscription topic filter length cannot be 0." );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check for compatibility with AWS IoT MQTT server. */
if( awsIotMqttMode == true )
{
/* Check topic filter length. */
if( pListElement->topicFilterLength > AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH )
{
IotLogError( "AWS IoT does not support topic filters longer than %d bytes.",
AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that the wildcards '+' and '#' are being used correctly. */
for( j = 0; j < pListElement->topicFilterLength; j++ )
{
switch( pListElement->pTopicFilter[ j ] )
{
/* Check that the single level wildcard '+' is used correctly. */
case '+':
/* Unless '+' is the first character in the filter, it must be preceded by '/'. */
if( j > 0 )
{
if( pListElement->pTopicFilter[ j - 1 ] != '/' )
{
IotLogError( "Invalid topic filter %.*s -- '+' must be preceded by '/'.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Unless '+' is the last character in the filter, it must be succeeded by '/'. */
if( j < pListElement->topicFilterLength - 1 )
{
if( pListElement->pTopicFilter[ j + 1 ] != '/' )
{
IotLogError( "Invalid topic filter %.*s -- '+' must be succeeded by '/'.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
break;
/* Check that the multi-level wildcard '#' is used correctly. */
case '#':
/* '#' must be the last character in the filter. */
if( j != pListElement->topicFilterLength - 1 )
{
IotLogError( "Invalid topic filter %.*s -- '#' must be the last character.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Unless '#' is standalone, it must be preceded by '/'. */
if( pListElement->topicFilterLength > 1 )
{
if( pListElement->pTopicFilter[ j - 1 ] != '/' )
{
IotLogError( "Invalid topic filter %.*s -- '#' must be preceded by '/'.",
pListElement->topicFilterLength,
pListElement->pTopicFilter );
IOT_SET_AND_GOTO_CLEANUP( false );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
break;
default:
break;
}
}
}
IOT_FUNCTION_EXIT_NO_CLEANUP();
}
/*-----------------------------------------------------------*/