From 5df074422418c31099a9f496b588163145505b96 Mon Sep 17 00:00:00 2001 From: Miles Frain Date: Thu, 18 Feb 2021 15:08:56 -0800 Subject: [PATCH] Perform atomic message read in prvReadBytesFromBuffer --- stream_buffer.c | 76 ++++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/stream_buffer.c b/stream_buffer.c index f6065ca15..6673ef1ea 100644 --- a/stream_buffer.c +++ b/stream_buffer.c @@ -200,13 +200,21 @@ static size_t prvWriteMessageToBuffer( StreamBuffer_t * const pxStreamBuffer, size_t xRequiredSpace ) PRIVILEGED_FUNCTION; /* - * Read xMaxCount bytes from the pxStreamBuffer message buffer and write them - * to pucData. + * Copies xCount bytes from the pxStreamBuffer's data storage area to pucData. + * This function does not update the buffer's xTail pointer, so multiple reads + * may be chained together "atomically". This is useful for Message Buffers where + * the length and data bytes are read in two separate chunks, and we don't want + * the writer to see the buffer as having more free space until after all data is + * copied over, especially if we have to abort the read due to insufficient receiving space. + * This function takes a custom xTail value to indicate where to read from (necessary + * for chaining) and returns the the resulting xTail position. + * To mark the read as complete, manually set the buffer's xTail field with the + * returned xTail from this function. */ static size_t prvReadBytesFromBuffer( StreamBuffer_t * pxStreamBuffer, uint8_t * pucData, - size_t xMaxCount, - size_t xBytesAvailable ) PRIVILEGED_FUNCTION; + size_t xCount, + size_t xTail ) PRIVILEGED_FUNCTION; /* * Called by both pxStreamBufferCreate() and pxStreamBufferCreateStatic() to @@ -864,7 +872,7 @@ size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer, size_t xStreamBufferNextMessageLengthBytes( StreamBufferHandle_t xStreamBuffer ) { StreamBuffer_t * const pxStreamBuffer = xStreamBuffer; - size_t xReturn, xBytesAvailable, xOriginalTail; + size_t xReturn, xBytesAvailable; configMESSAGE_BUFFER_LENGTH_TYPE xTempReturn; configASSERT( pxStreamBuffer ); @@ -878,14 +886,9 @@ size_t xStreamBufferNextMessageLengthBytes( StreamBufferHandle_t xStreamBuffer ) { /* The number of bytes available is greater than the number of bytes * required to hold the length of the next message, so another message - * is available. Return its length without removing the length bytes - * from the buffer. A copy of the tail is stored so the buffer can be - * returned to its prior state as the message is not actually being - * removed from the buffer. */ - xOriginalTail = pxStreamBuffer->xTail; - ( void ) prvReadBytesFromBuffer( pxStreamBuffer, ( uint8_t * ) &xTempReturn, sbBYTES_TO_STORE_MESSAGE_LENGTH, xBytesAvailable ); - xReturn = ( size_t ) xTempReturn; - pxStreamBuffer->xTail = xOriginalTail; + * is available. */ + ( void ) prvReadBytesFromBuffer( pxStreamBuffer, ( uint8_t * ) &xTempReturn, sbBYTES_TO_STORE_MESSAGE_LENGTH, pxStreamBuffer->xTail); + xReturn = ( size_t ) xTempReturn; } else { @@ -968,17 +971,16 @@ static size_t prvReadMessageFromBuffer( StreamBuffer_t * pxStreamBuffer, size_t xBufferLengthBytes, size_t xBytesAvailable ) { - size_t xOriginalTail, xReceivedLength, xNextMessageLength; + size_t xTail, xCount, xNextMessageLength; configMESSAGE_BUFFER_LENGTH_TYPE xTempNextMessageLength; + xTail = pxStreamBuffer->xTail; + if( ( pxStreamBuffer->ucFlags & sbFLAGS_IS_MESSAGE_BUFFER ) != ( uint8_t ) 0 ) { /* A discrete message is being received. First receive the length - * of the message. A copy of the tail is stored so the buffer can be - * returned to its prior state if the length of the message is too - * large for the provided buffer. */ - xOriginalTail = pxStreamBuffer->xTail; - ( void ) prvReadBytesFromBuffer( pxStreamBuffer, ( uint8_t * ) &xTempNextMessageLength, sbBYTES_TO_STORE_MESSAGE_LENGTH, xBytesAvailable ); + * of the message. */ + xTail = prvReadBytesFromBuffer( pxStreamBuffer, ( uint8_t * ) &xTempNextMessageLength, sbBYTES_TO_STORE_MESSAGE_LENGTH, xTail ); xNextMessageLength = ( size_t ) xTempNextMessageLength; /* Reduce the number of bytes available by the number of bytes just @@ -989,10 +991,7 @@ static size_t prvReadMessageFromBuffer( StreamBuffer_t * pxStreamBuffer, * user. */ if( xNextMessageLength > xBufferLengthBytes ) { - /* The user has provided insufficient space to read the message - * so return the buffer to its previous state (so the length of - * the message is in the buffer again). */ - pxStreamBuffer->xTail = xOriginalTail; + /* The user has provided insufficient space to read the message. */ xNextMessageLength = 0; } else @@ -1007,10 +1006,16 @@ static size_t prvReadMessageFromBuffer( StreamBuffer_t * pxStreamBuffer, xNextMessageLength = xBufferLengthBytes; } - /* Read the actual data. */ - xReceivedLength = prvReadBytesFromBuffer( pxStreamBuffer, ( uint8_t * ) pvRxData, xNextMessageLength, xBytesAvailable ); /*lint !e9079 Data storage area is implemented as uint8_t array for ease of sizing, indexing and alignment. */ + /* Use the minimum of the wanted bytes and the available bytes. */ + xCount = configMIN( xNextMessageLength, xBytesAvailable ); - return xReceivedLength; + /* Read the actual data. */ + xTail = prvReadBytesFromBuffer( pxStreamBuffer, ( uint8_t * ) pvRxData, xCount, xTail); /*lint !e9079 Data storage area is implemented as uint8_t array for ease of sizing, indexing and alignment. */ + + /* Update the tail to mark the data as officialy consumed. */ + pxStreamBuffer->xTail = xTail; + + return xCount; } /*-----------------------------------------------------------*/ @@ -1185,17 +1190,14 @@ static size_t prvWriteBytesToBuffer( StreamBuffer_t * const pxStreamBuffer, static size_t prvReadBytesFromBuffer( StreamBuffer_t * pxStreamBuffer, uint8_t * pucData, - size_t xMaxCount, - size_t xBytesAvailable ) + size_t xCount, + size_t xTail ) { - size_t xCount, xFirstLength, xNextTail; - - /* Use the minimum of the wanted bytes and the available bytes. */ - xCount = configMIN( xBytesAvailable, xMaxCount ); + size_t xFirstLength, xNextTail; if( xCount > ( size_t ) 0 ) { - xNextTail = pxStreamBuffer->xTail; + xNextTail = xTail; /* Calculate the number of bytes that can be read - which may be * less than the number wanted if the data wraps around to the start of @@ -1204,7 +1206,7 @@ static size_t prvReadBytesFromBuffer( StreamBuffer_t * pxStreamBuffer, /* Obtain the number of bytes it is possible to obtain in the first * read. Asserts check bounds of read and write. */ - configASSERT( xFirstLength <= xMaxCount ); + configASSERT( xFirstLength <= xCount ); configASSERT( ( xNextTail + xFirstLength ) <= pxStreamBuffer->xLength ); ( void ) memcpy( ( void * ) pucData, ( const void * ) &( pxStreamBuffer->pucBuffer[ xNextTail ] ), xFirstLength ); /*lint !e9087 memcpy() requires void *. */ @@ -1213,7 +1215,7 @@ static size_t prvReadBytesFromBuffer( StreamBuffer_t * pxStreamBuffer, if( xCount > xFirstLength ) { /*...then read the remaining bytes from the start of the buffer. */ - configASSERT( xCount <= xMaxCount ); + configASSERT( xCount <= xCount ); ( void ) memcpy( ( void * ) &( pucData[ xFirstLength ] ), ( void * ) ( pxStreamBuffer->pucBuffer ), xCount - xFirstLength ); /*lint !e9087 memcpy() requires void *. */ } else @@ -1229,15 +1231,13 @@ static size_t prvReadBytesFromBuffer( StreamBuffer_t * pxStreamBuffer, { xNextTail -= pxStreamBuffer->xLength; } - - pxStreamBuffer->xTail = xNextTail; } else { mtCOVERAGE_TEST_MARKER(); } - return xCount; + return xNextTail; } /*-----------------------------------------------------------*/