Add Stream Batching Buffer (#916)

The difference between a stream buffer and a stream batching buffer is when
a task performs read on a non-empty buffer:
- The task reading from a non-empty stream buffer returns immediately
   regardless of the amount of data in the buffer.
- The task reading from a non-empty steam batching buffer blocks until the
   amount of data in the buffer exceeds the trigger level or the block time
   expires.
This commit is contained in:
Caleb Perkinson 2024-04-17 10:54:00 -04:00 committed by GitHub
parent 5a72344c9a
commit f69b1db45c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 260 additions and 44 deletions

View file

@ -224,6 +224,7 @@
/* Bits stored in the ucFlags field of the stream buffer. */
#define sbFLAGS_IS_MESSAGE_BUFFER ( ( uint8_t ) 1 ) /* Set if the stream buffer was created as a message buffer, in which case it holds discrete messages rather than a stream. */
#define sbFLAGS_IS_STATICALLY_ALLOCATED ( ( uint8_t ) 2 ) /* Set if the stream buffer was created using statically allocated memory. */
#define sbFLAGS_IS_BATCHING_BUFFER ( ( uint8_t ) 4 ) /* Set if the stream buffer was created as a batching buffer, meaning the receiver task will only unblock when the trigger level exceededs. */
/*-----------------------------------------------------------*/
@ -329,25 +330,31 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
StreamBufferHandle_t xStreamBufferGenericCreate( size_t xBufferSizeBytes,
size_t xTriggerLevelBytes,
BaseType_t xIsMessageBuffer,
BaseType_t xStreamBufferType,
StreamBufferCallbackFunction_t pxSendCompletedCallback,
StreamBufferCallbackFunction_t pxReceiveCompletedCallback )
{
void * pvAllocatedMemory;
uint8_t ucFlags;
traceENTER_xStreamBufferGenericCreate( xBufferSizeBytes, xTriggerLevelBytes, xIsMessageBuffer, pxSendCompletedCallback, pxReceiveCompletedCallback );
traceENTER_xStreamBufferGenericCreate( xBufferSizeBytes, xTriggerLevelBytes, xStreamBufferType, pxSendCompletedCallback, pxReceiveCompletedCallback );
/* In case the stream buffer is going to be used as a message buffer
* (that is, it will hold discrete messages with a little meta data that
* says how big the next message is) check the buffer will be large enough
* to hold at least one message. */
if( xIsMessageBuffer == pdTRUE )
if( xStreamBufferType == sbTYPE_MESSAGE_BUFFER )
{
/* Is a message buffer but not statically allocated. */
ucFlags = sbFLAGS_IS_MESSAGE_BUFFER;
configASSERT( xBufferSizeBytes > sbBYTES_TO_STORE_MESSAGE_LENGTH );
}
else if( xStreamBufferType == sbTYPE_STREAM_BATCHING_BUFFER )
{
/* Is a batching buffer but not statically allocated. */
ucFlags = sbFLAGS_IS_BATCHING_BUFFER;
configASSERT( xBufferSizeBytes > 0 );
}
else
{
/* Not a message buffer and not statically allocated. */
@ -398,11 +405,11 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
pxSendCompletedCallback,
pxReceiveCompletedCallback );
traceSTREAM_BUFFER_CREATE( ( ( StreamBuffer_t * ) pvAllocatedMemory ), xIsMessageBuffer );
traceSTREAM_BUFFER_CREATE( ( ( StreamBuffer_t * ) pvAllocatedMemory ), xStreamBufferType );
}
else
{
traceSTREAM_BUFFER_CREATE_FAILED( xIsMessageBuffer );
traceSTREAM_BUFFER_CREATE_FAILED( xStreamBufferType );
}
traceRETURN_xStreamBufferGenericCreate( pvAllocatedMemory );
@ -419,7 +426,7 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
StreamBufferHandle_t xStreamBufferGenericCreateStatic( size_t xBufferSizeBytes,
size_t xTriggerLevelBytes,
BaseType_t xIsMessageBuffer,
BaseType_t xStreamBufferType,
uint8_t * const pucStreamBufferStorageArea,
StaticStreamBuffer_t * const pxStaticStreamBuffer,
StreamBufferCallbackFunction_t pxSendCompletedCallback,
@ -432,7 +439,7 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
StreamBufferHandle_t xReturn;
uint8_t ucFlags;
traceENTER_xStreamBufferGenericCreateStatic( xBufferSizeBytes, xTriggerLevelBytes, xIsMessageBuffer, pucStreamBufferStorageArea, pxStaticStreamBuffer, pxSendCompletedCallback, pxReceiveCompletedCallback );
traceENTER_xStreamBufferGenericCreateStatic( xBufferSizeBytes, xTriggerLevelBytes, xStreamBufferType, pucStreamBufferStorageArea, pxStaticStreamBuffer, pxSendCompletedCallback, pxReceiveCompletedCallback );
configASSERT( pucStreamBufferStorageArea );
configASSERT( pxStaticStreamBuffer );
@ -450,12 +457,18 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
* says how big the next message is) check the buffer will be large enough
* to hold at least one message. */
if( xIsMessageBuffer != pdFALSE )
if( xStreamBufferType == sbTYPE_MESSAGE_BUFFER )
{
/* Statically allocated message buffer. */
ucFlags = sbFLAGS_IS_MESSAGE_BUFFER | sbFLAGS_IS_STATICALLY_ALLOCATED;
configASSERT( xBufferSizeBytes > sbBYTES_TO_STORE_MESSAGE_LENGTH );
}
else if( xStreamBufferType == sbTYPE_STREAM_BATCHING_BUFFER )
{
/* Statically allocated batching buffer. */
ucFlags = sbFLAGS_IS_BATCHING_BUFFER | sbFLAGS_IS_STATICALLY_ALLOCATED;
configASSERT( xBufferSizeBytes > 0 );
}
else
{
/* Statically allocated stream buffer. */
@ -486,7 +499,7 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
* again. */
pxStreamBuffer->ucFlags |= sbFLAGS_IS_STATICALLY_ALLOCATED;
traceSTREAM_BUFFER_CREATE( pxStreamBuffer, xIsMessageBuffer );
traceSTREAM_BUFFER_CREATE( pxStreamBuffer, xStreamBufferType );
/* MISRA Ref 11.3.1 [Misaligned access] */
/* More details at: https://github.com/FreeRTOS/FreeRTOS-Kernel/blob/main/MISRA.md#rule-113 */
@ -496,7 +509,7 @@ static void prvInitialiseNewStreamBuffer( StreamBuffer_t * const pxStreamBuffer,
else
{
xReturn = NULL;
traceSTREAM_BUFFER_CREATE_STATIC_FAILED( xReturn, xIsMessageBuffer );
traceSTREAM_BUFFER_CREATE_STATIC_FAILED( xReturn, xStreamBufferType );
}
traceRETURN_xStreamBufferGenericCreateStatic( xReturn );
@ -1053,6 +1066,12 @@ size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,
{
xBytesToStoreMessageLength = sbBYTES_TO_STORE_MESSAGE_LENGTH;
}
else if( ( pxStreamBuffer->ucFlags & sbFLAGS_IS_BATCHING_BUFFER ) != ( uint8_t ) 0 )
{
/* Force task to block if the batching buffer contains less bytes than
* the trigger level. */
xBytesToStoreMessageLength = pxStreamBuffer->xTriggerLevelBytes;
}
else
{
xBytesToStoreMessageLength = 0;
@ -1070,7 +1089,9 @@ size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,
* xBytesToStoreMessageLength holds the number of bytes used to hold
* the length of the next discrete message. If this function was
* invoked by a stream buffer read then xBytesToStoreMessageLength will
* be 0. */
* be 0. If this function was invoked by a stream batch buffer read
* then xBytesToStoreMessageLength will be xTriggerLevelBytes value
* for the buffer.*/
if( xBytesAvailable <= xBytesToStoreMessageLength )
{
/* Clear notification state as going to wait for data. */