FreeRTOS에서 어떻게 공유자원에 대한 접근을 제한해 상호 배제를 획득하는지에 대한 시리즈이다.
혹시 상호 배제에 대한 개념, 그리고 Critical Section과 Scheduler Suspend에 대한 내용이 궁금하다면 이전 글을 확인해보자.
ㅤ
Mutex와 Binary Semaphore
앞서 확인했던 Critical Section과 Scheduler Suspend는 차단 방식으로 구성된다. Critical Section은 인터럽트를 차단해버렸고, Scheduler Suspend는 스케줄러를 차단해버렸다.
ㅤ
Mutex와 Semaphore는 차단이 아닌 “대기” 방식으로 상호 배제를 획득한다.
- Mutex : 상호 배제(MUTual EXclusion)을 위한 잠금 매커니즘
- Binary Semaphore : 0 또는 1의 값을 가지는 신호 매커니즘
ㅤ
// Mutex
SemaphoreHandle_t xMutex;
// 생성 (초기값: 사용 가능)
xMutex = xSemaphoreCreateMutex();
// 사용
xSemaphoreTake(xMutex, portMAX_DELAY); // 잠금
// 여기에서 공유 자원 접근
xSemaphoreGive(xMutex); // 잠금 해제
// Binary Semaphore
SemaphoreHandle_t xSemaphore;
// 생성 (초기값: 사용 가능)
xSemaphore = xSemaphoreCreateBinary();
xSemaphoreGive(xSemaphore); // 1로 설정
// 사용
xSemaphoreTake(xSemaphore, portMAX_DELAY); // 1 → 0
// 여기에서 공유 자원 접근
xSemaphoreGive(xSemaphore); // 0 → 1
ㅤ
얼핏 보면 Mutex와 Binary Semaphore는 비슷하게 사용하고 동작하는 것 같지만, 내부 구조는 매우 다르다! 그래서, 이 둘을 사용하는 목적도 다르다.
| 바이너리 세마포어 | 뮤텍스 | |
|---|---|---|
| 소유권의 개념 | 없음 | 있음! |
| 우선순위 상속 | 없음 | 있음!! |
| Recursive 획득 | 불가 | 불가 (Recursive Mutex 별도) |
| ISR에서 사용 | 가능 (FromISR) | 불가능 |
| Give는 누가? | 아무나 가능 | 소유자만 가능 |
| 목적 | Synchronization 신호 전달 | 상호 배제 |
ㅤ
이번 글에서는 두 방식 중 Mutex의 구조와 동작에 대해서 한 번 살펴보자.
ㅤ
Mutex에서 상호 배제를 얻는 방법
Mutex는 하나의 KEY를 두고, 이 KEY를 가지고 있는 Task (프로세스) 만이 특정한 코드 구역에 접근할 수 있도록 만드는 방법이다. 이건 하드웨어가 아닌 소프트웨어적(코드 베이스)으로 구현된 방식이기 때문에 특정한 매커니즘이 있는게 아니라 그냥 자료구조로 실행 흐름을 제어하는 것이다. (그런 의미에서 Critical Section과 다름)
ㅤ
양질의 FreeRTOS Document에 있는 그림이다! Task A와 Task B가 하나의 공유되는 자원을 점유하기 위해서 Mutex를 사용하는 과정에 대해 설명해둔 그림이다.
자원을 식당에 있는 화장실, Mutex를 화장실 열쇠라고 생각하면 아주 유쾌하고 명쾌하다. ㅋㅋㅋㅋ

어떤 Task 이든 Mutex에 의해 보호되는 자원에 접근하고 사용하기 위해서는 Mutex를 점유해야한다. (Hold) 위 그림에서 TaskA는 먼저 Mutex를 점유하고, 그 다음 자원에 접근해 사용하고 있다.
ㅤ

만약 TaskB가 이 자원에 접근하고자 할 때, Mutex를 이미 Task A가 점유중인 상태이므로 Task B는 자원에 접근할 수 없다. Task A가 모든 작업을 마치고 Mutex의 점유를 해제할 때까지 Task B는 잠시 Blocked 상태에 들어가 대기한다.
ㅤ

그 다음 TaskA가 자원의 사용을 마무리하고 Mutex를 반환했을 때, 비로소 Task B가 이 Mutex를 다시 점유하고, 자원을 사용할 수 있다. Task B 역시 자원 사용을 마치고 나면 이 Mutex를 반환해, 다른 Task에서 자원을 사용할 수 있도록 해준다.
ㅤ
요런 방식으로 공유자원에 한 번에 하나의 Task 만 접근하도록 논리를 구성하는 방법이 Mutex이다.
물론 이게 “논리적”으로 구현되어 있는 방식이기 때문에, 누가 약속을 어기고 그냥 냅다 무시하고 리소스에 접근해버린다면 막을 수 없다. 하드웨어적으로 보호할 수 있는 영역이 아니기 때문.
ㅤ
Mutex에서만 발생하는 우선순위 상속
우선순위 상속에 대해 논하기 전에, 우선순위 역전에 대해서 먼저 알아보자.
ㅤ
우선순위 역전
우선순위 역전이란 높은 우선순위의 작업이 낮은 우선순위의 작업에 의해 지연되는 현상을 말한다.

ㅤ
위 그림에서 3가지 우선순위의 Task가 실행되고 있다. 위 상황에서 우선순위의 역전이 발생하고 있다.
- 처음에 낮은 우선순위의 Task가 자원을 점유해서 사용하고 있었다.
- 높은 우선순위의 Task가 들어와서 자원을 사용하려고 했는데, 낮은 우선순위 작업이 이를 사용중이라 점유가 끝날 때까지 대기에 들어간다. 이건 우선순위 역전이 아니다. (자원 점유가 끝날 때까지 기다려주는건 매우 자연스러운 동작이다!)
- 낮은 우선순위의 Task가 열심히 작업을 하던 중, 중간 우선순위의 Task가 들어왔다. 우선순위 원칙에 따라서 낮은 우선순위 Task에서 중간 우선순위의 Task에게 자리를 양보해야한다. 이때, 중간 우선순위의 Task 의 작업을 기다리느라 높은 우선순위의 Task가 더 오래 기다려야 한다. 위 그림에서 (3)의 순간에 우선순위 역전이 발생한다!
ㅤ
우선순위 상속
위 문제를 해결하기 위해서 “우선순위 상속 (Priority Inheritance)”을 사용한다.
ㅤ
우선순위 상속이란, 높은 우선순위의 Task가 낮은 Task를 기다리는 경우, 해당 Task를 잠시동안 자신의 우선순위와 동등하게 끌어올리는 것을 말한다. 이를 통해서 중간 우선순위의 Task가 감히 끼어들지 못해 높은 우선순위 Task를 최대한 빠르게 처리하고자 한다.

ㅤ
우리의 FreeRTOS에서 우선순위가 진짜로 상속되는지 확인하기 위해, 위 우선순위 역전의 상황을 코드로 만들어주었다.



이 지점에서 LowTask가 HighTask와 같은 우선순위로 올라선다! 디버거에서도 LowTask의 Actual Priority가 HighTask와 같은 3으로 올라선 것을 볼 수 있다. HighTask가 LowTask에게 우선순위를 상속해주는 순간이다!
ㅤ

HighTask가 원하는 것을 얻고나면 LowTask는 다시 원래의 우선순위로 내려가게 된다.
ㅤ
실습에 사용한 코드는 다음과 같다.
void LowPriorityTask(void *pvParameters)
{
UBaseType_t uxPriority;
for(;;)
{
// 1. Mutex 획득 전 우선순위 확인
uxPriority = uxTaskPriorityGet(NULL);
// 2. Mutex 획득
xSemaphoreTake(xMutex, portMAX_DELAY);
// 3. Mutex 획득 후 우선순위 확인
uxPriority = uxTaskPriorityGet(NULL);
// 4. Critical Section - 긴 작업 시뮬레이션
for(uint32_t i = 0; i < 1000000; i++)
{
lowTaskCounter++;
// 이 구간에서 High Task가 깨어나면
// Priority Inheritance 발생
if(i == 500000)
{
uxPriority = uxTaskPriorityGet(NULL);
// High Task가 대기 중이면 우선순위 = 3 (상속됨)
}
}
// 5. Mutex 해제 전 우선순위 확인
uxPriority = uxTaskPriorityGet(NULL);
// 6. Mutex 해제
xSemaphoreGive(xMutex);
// 7. Mutex 해제 후 우선순위 확인
uxPriority = uxTaskPriorityGet(NULL);
// 여기에 브레이크포인트 5: Mutex 해제 후 우선순위 = 1 (원래대로)
vTaskDelay(pdMS_TO_TICKS(1000));
}
}
void MediumPriorityTask(void *pvParameters)
{
for(;;)
{
// Medium Task는 Mutex를 사용하지 않음
// 단순히 CPU를 소모하는 작업
for(uint32_t i = 0; i < 500000; i++)
{
mediumTaskCounter++;
}
vTaskDelay(pdMS_TO_TICKS(100));
}
}
void HighPriorityTask(void *pvParameters)
{
UBaseType_t uxLowTaskPriority;
// Low Task가 먼저 Mutex를 획득하도록 초기 딜레이
vTaskDelay(pdMS_TO_TICKS(500));
for(;;)
{
// Low Task의 현재 우선순위 확인
uxLowTaskPriority = uxTaskPriorityGet(xLowPriorityTaskHandle);
// Mutex 요청 전 Low Task 우선순위 확인
// Mutex 획득 시도 (Low Task가 보유 중이면 대기)
xSemaphoreTake(xMutex, portMAX_DELAY);
// 이 시점에서 Low Task의 우선순위가 다시 1로 복귀했을 것
uxLowTaskPriority = uxTaskPriorityGet(xLowPriorityTaskHandle);
// Mutex 획득 후 Low Task 우선순위 확인
// Critical Section
for(uint32_t i = 0; i < 100000; i++)
{
highTaskCounter++;
}
xSemaphoreGive(xMutex);
vTaskDelay(pdMS_TO_TICKS(1000));
}
}
이후 코드상으로 어떻게 우선순위 상속을 수행해주는지 체크가 가능하다! 좀 치네
ㅤ
Mutex의 내부 구현
FreeRTOS에서 Mutex를 사용하려면 configUSE_MUTEXES를 1로 Set 해주어야 한다!!

Mutex의 생성
Mutex는 xQueueGenericCreate 를 통해 만들어진다. 즉, Mutex는 Queue 자료구조의 일종이다! 이후에 나오겠지만 Binary Semaphore 역시 Queue를 기반으로 만들어진다. 그래서 유사한 형태를 띄기 때문에 Mutex 부분을 잘 봐두자!
Queue는 이 글 다음 아티클에서 집중적으로 다룰 예정!
ㅤ
#define xSemaphoreCreateMutex() xQueueCreateMutex( queueQUEUE_TYPE_MUTEX )
QueueHandle_t xQueueCreateMutex( const uint8_t ucQueueType )
{
QueueHandle_t xNewQueue;
const UBaseType_t uxMutexLength = ( UBaseType_t ) 1, uxMutexSize = ( UBaseType_t ) 0;
xNewQueue = xQueueGenericCreate( uxMutexLength, uxMutexSize, ucQueueType );
prvInitialiseMutex( ( Queue_t * ) xNewQueue );
return xNewQueue;
}
내부에서 Mutex의 생성은 2단계로 나뉜다.
- Queue 구조 생성
- Mutex로 초기화
ㅤ
우선 Queue 구조의 생성!
Mutex의 생성 코드를 보면 xQueueGenericCreate 함수를 통해 Queue를 생성하고 핸들을 반환하고 있다. 이때 넘겨주는 인자를 살펴보면 다음과 같다.
// 길이는 1
// 아이템의 크기는 0
// 타입은 MUTEX 인 큐 생성
xQueueGenericCreate( 1, 0, queueQUEUE_TYPE_MUTEX );
ㅤ
xQueueGenericCreate 함수에서는 다음의 역할을 수행한다.
- 주어진 Queue의 크기만큼 메모리 공간을 확보
- 새로 만들어진 Queue를 초기화
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, const uint8_t ucQueueType )
{
Queue_t *pxNewQueue;
size_t xQueueSizeInBytes;
uint8_t *pucQueueStorage;
// Queue가 사용할 메모리 크기
// 세마포어(뮤텍스)로 쓰는 경우 0이 될 수 있음
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
// 필요한 메모리 동적할당 받기
// Queue 크기 + Queue 관리를 위한 메타데이터 구조체 크기
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes )
if( pxNewQueue != NULL )
{
// 메모리 시작 | 큐 데이터 | 큐 메타데이터 | 메모리 끝
pucQueueStorage = ( uint8_t * ) pxNewQueue;
pucQueueStorage += sizeof( Queue_t );
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
return pxNewQueue;
}
ㅤ
여기에서 이제 prvInitialiseNewQueue 함수 내부에서 만들어줄 이 Queue를 관리하기 위한 메타데이터가 어떻게 구성되어있는지 한 번 살펴보고 넘어가자.
또 한 번 더 언급하지만, Queue 자체는 이후에 좀 더 자세하게 살펴볼 예정이기에, 이번에는 Mutex와 Semaphore로 사용하기 위한 Queue 특성을 위주로 살펴본다.
typedef struct QueueDefinition
{
// Queue에서 사용되는 프로퍼티. 세마/뮤텍스는 사용 안함
int8_t *pcHead;
int8_t *pcWriteTo;
union
{
QueuePointers_t xQueue; // Queue로 쓸 때에만
SemaphoreData_t xSemaphore; // 뮤텍스로 쓸 때에만
} u;
List_t xTasksWaitingToSend; // 세마/뮤텍스는 사용 안함
List_t xTasksWaitingToReceive; // 세마/뮤텍스를 대기중인 Task 목록
// 이게 세마/뮤텍스의 점유를 위한 플래그 역할을 한다.
// 가장 중요함!!
volatile UBaseType_t uxMessagesWaiting;
// 나머지는 사용하지 않음
UBaseType_t uxLength;
UBaseType_t uxItemSize;
volatile int8_t cRxLock;
volatile int8_t cTxLock;
} xQUEUE;
typedef struct SemaphoreData
{
TaskHandle_t xMutexHolder; 초기화 과정이다. // 뮤텍스의 소유권이 어떤 Task에게 있는가
UBaseType_t uxRecursiveCallCount; // Recursive Mutex를 위한 속성
} SemaphoreData_t;
아무래도 이 Queue 구조체 자체가 Queue, Semaphore, Mutex, Recursive Type으로 모두 사용될 수 있다보니, 세마/뮤텍스에서는 불필요한 항목들도 종종 보이는 것 같다. 그치만 이렇게 설계를 한 이유도 분명 있을 터. (코드 용량을 최대한 줄이기 위함이 아니였을까)
ㅤ
이제 이 Queue를 채우는 과정에서 세마/뮤텍스로 사용되는 경우에 대해서만 쏙쏙 뽑아서 확인해보자.
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength, const UBaseType_t uxItemSize, uint8_t *pucQueueStorage, const uint8_t ucQueueType, Queue_t *pxNewQueue )
{
if( uxItemSize == ( UBaseType_t ) 0 )
{
// Binary Semaphore 또는 Mutex로 쓰는 경우에는
// 데이터 공간이 없긴 한데 NULL을 가리키면 다른 의미가 되어버리니
// Item 의 크기가 0이므로 pcHead가 Queue 스스로를 가리키도록 한다.
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
// 기본 길이와 크기를 넣어주고 초기화 함수 호출
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
}
BaseType_t xQueueGenericReset( QueueHandle_t xQueue, BaseType_t xNewQueue )
{
Queue_t * const pxQueue = xQueue;
taskENTER_CRITICAL();
{
**// 현재 자원의 개수를 0개로 설정**
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
// 만약 새로 생성하는 Queue가 아니라면 (기존 Queue를 비우는 작업)
if( xNewQueue == pdFALSE )
{
// 대기중인 Task를 깨우는 작업을 수행
...
}
// 새로 생성하는 Queue가 맞다면
else
{
// 대기를 위한 List를 빈 리스트로 초기화
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
}
taskEXIT_CRITICAL();
return pdPASS;
}
ㅤ
이렇게 Queue의 기본적인 내용들을 채워넣었으니, 이제 Mutex로 사용하기 위한 정보를 초기화 해주어야한다.
ㅤ
생성된 Queue에 대해서 prvInitialiseMutex( ( Queue_t * ) xNewQueue ); 함수 호출을 통해 내용들을 채워주고 있다. 이 함수를 한 번 열어보자.
static void prvInitialiseMutex( Queue_t *pxNewQueue )
{
if( pxNewQueue != NULL )
{
// 이 Mutex의 소유자는 아직 NULL
pxNewQueue->u.xSemaphore.xMutexHolder = NULL;
// Queue의 타입을 뮤텍스로 지정
pxNewQueue->uxQueueType = queueQUEUE_IS_MUTEX;
/* In case this is a recursive mutex. */
pxNewQueue->u.xSemaphore.uxRecursiveCallCount = 0;
/* Start with the semaphore in the expected state. */
( void ) xQueueGenericSend( pxNewQueue, NULL, ( TickType_t ) 0U, queueSEND_TO_BACK );
}
}
ㅤ
현재 이 Mutex에는 카운트가 하나도 없는 상태라서 Task가 자원 점유를 시도할 수 없는 상태이다 (uxMessagesWaiting 값이 0). 그래서 Mutex에 카운트를 하나 올려주기 위해서 여기에 Give를 수행해주어야 한다. 이는 Queue를 기준으로 Send 행동에 속하므로, Queue에게 빈 데이터를 하나 Send 하며 Count + 1 을 처리해준다.
BaseType_t xQueueGenericSend( QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;
for( ;; )
{
taskENTER_CRITICAL();
{
// 현재 Queue에 공간이 있는가?
// usMessagesWaiting은 0 < uxLength는 1
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
// 카운트를 하나 증가시키고 Mutex 로직 처리하기
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
// 처음 생성할 때는 대기하는 Task가 없음
// 대기중인 Task에 대한 로직은 생략
// 초기화 완료하고 PASS 반환
taskEXIT_CRITICAL();
return pdPASS;
}
}
taskEXIT_CRITICAL();
// 나머지는 오류 발생한 경우의 핸들러
}
}
static BaseType_t prvCopyDataToQueue(
Queue_t * const pxQueue,
const void *pvItemToQueue, // NULL
const BaseType_t xPosition
)
{
BaseType_t xReturn = pdFALSE;
// 세마/뮤텍스인 경우
if (pxQueue->uxItemSize == 0)
{
// 만약 뮤텍스라면
if (pxQueue->uxQueueType == queueQUEUE_IS_MUTEX)
{
// 우선순위 복원 (초기화 시에는 xMutexHolder = NULL이므로 아무 동작 안 함)
xReturn = xTaskPriorityDisinherit(pxQueue->u.xSemaphore.xMutexHolder);
// 소유자 제거 (이미 NULL)
pxQueue->u.xSemaphore.xMutexHolder = NULL;
}
// 세마포어인 경우는 생략
}
// 카운트 증가 (0 → 1)
++(pxQueue->uxMessagesWaiting);
return xReturn;
}
ㅤ
Mutex를 생성하기 위해 정말 길게길게 왔다. 그치만 결국 위 과정에서 한 것은 다음의 내용들이다.
- Queue를 하나 생성한다. 이때 Item은 1개, Item 크기는 0으로 지정한다.
- Mutex로 사용하기 위한 기본 정보들을 초기화한다.
- Mutex로 사용하기 위해 카운트를 1 올려준다.
ㅤ
그러면 Queue의 자료구조에는 결과적으로 다음과 같이 데이터들이 초기화되게 된다.

ㅤ
Mutex의 자원 점유와 해제
그렇다면 이제 Mutex를 이용해 자원을 점유하고 해제하는 과정이 어떻게 구현되는지에 대해서 살펴보자. 우선 앞서 살펴봤던 것처럼, 뮤텍스에 대해서 xSemaphoreTake 함수로 Mutex를 Take 하여 다른 Task 에서의 자원에 대한 접근을 막는다. 그리고 모든 사용이 끝나면 xSemaphoreGive 함수로 Mutex를 Give 하여 기다리고 있던 다른 Task에게 사용권을 넘긴다.
// 사용
xSemaphoreTake(xMutex, portMAX_DELAY); // 잠금
// 여기에서 공유 자원 접근
xSemaphoreGive(xMutex); // 잠금 해제
ㅤ
이 내부에서는 어떤 일들이 일어나고 있기에 다른 Task의 접근을 막거나 허용할 수 있는 것인지 살펴보자. 우선 xSemaphoreTake 를 통한 자원 점유 과정이다.
ㅤ
함수는 매크로를 통해 xQueueSemaphoreTake 라는 함수를 호출해주고 있다.
#define xSemaphoreTake( xSemaphore, xBlockTime ) \
xQueueSemaphoreTake( ( xSemaphore ), ( xBlockTime ) )
ㅤ
xQueueSemaphoreTake 라는 함수의 길이가 매우 길어서, 여기에 들어가는 작은 함수들을 먼저 확인하고, 본문을 보려고 한다.
// 현재 실행 Task TCB의 Mutex 개수 += 1
TaskHandle_t pvTaskIncrementMutexHeldCount( void )
{
// 현재 실행중인 Task의 TCB에 가지고 있는 Mutex 개수를 +1 해주고
// TCB의 주소를 반환
if( pxCurrentTCB != NULL )
{
( pxCurrentTCB->uxMutexesHeld )++;
}
return pxCurrentTCB;
}
// 현재 Tick Count와 현재 누적 Tick 횟수를 기록
void vTaskInternalSetTimeOutState( TimeOut_t * const pxTimeOut )
{
/* For internal use only as it does not use a critical section. */
pxTimeOut->xOverflowCount = xNumOfOverflows;
pxTimeOut->xTimeOnEntering = xTickCount;
}
우선순위 상속에 대한 코드는 아래쪽에 따로 챕터를 빼어 작성해두었다.
ㅤ
xQueueSemaphoreTake 함수이다.
혹시 코드가 너무 길다면 숨참고 스크롤을 내리자. 시나리오별로 정리를 해두었다.
BaseType_t xQueueSemaphoreTake( QueueHandle_t xQueue, TickType_t xTicksToWait )
{
BaseType_t xEntryTimeSet = pdFALSE;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;
#if( configUSE_MUTEXES == 1 )
BaseType_t xInheritanceOccurred = pdFALSE;
#endif
for( ;; )
{
taskENTER_CRITICAL();
{
// 현재 Mutex의 Count를 가져옴
const UBaseType_t uxSemaphoreCount = pxQueue->uxMessagesWaiting;
// Mutex를 Take할 수 있는 상황인지 확인 (한 개 이상 남아있음)
if( uxSemaphoreCount > ( UBaseType_t ) 0 )
{
// Mutex의 Count를 하나 줄임
pxQueue->uxMessagesWaiting = uxSemaphoreCount - ( UBaseType_t ) 1;
#if ( configUSE_MUTEXES == 1 )
{
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
// Mutex의 소유자를 넣어주는 방법이 CurrentTCB를 보는 것이였다! 아하!!
pxQueue->u.xSemaphore.xMutexHolder = pvTaskIncrementMutexHeldCount();
}
}
#endif /* configUSE_MUTEXES */
// 이 부분은 Mutex의 코드가 아님! 간단히 보고 넘어가자.
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
}
taskEXIT_CRITICAL();
// 만약 자원을 혼자 사용한다면 이 경로로 들어와서 바로 자원을 할당받고 return 한다
return pdPASS;
}
// 만약 이미 점유가 되어있어서 Mutex를 Take 할 수 없는 상황이라면
else
{
// Take 할 때까지 기다리지 않는다면
if( xTicksToWait == ( TickType_t ) 0 )
{
taskEXIT_CRITICAL();
// 에러를 반환
return errQUEUE_EMPTY;
}
// Take 할 때까지 기다리겠다 + 진입시간을 기록하지 않았따면
else if( xEntryTimeSet == pdFALSE )
{
// 현재 진입 시간을 TimeOut 변수에 기록
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
}
}
taskEXIT_CRITICAL();
// 잠시 스케줄러를 중단 (Task 전환 방지)
// Queue를 잠그기 (ISR에서 Queue 조작하더라도 값 변경 방지)
vTaskSuspendAll();
prvLockQueue( pxQueue );
// 진입시간(xTimeOut)을 갱신하면서 타임아웃이 만료되었는지 체크
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
// 아직 시간이 남았다면 if 문 진입
// 현재 Queue가 비었다면 (Mutex의 Count가 0이면)
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
#if ( configUSE_MUTEXES == 1 )
{
// Mutex라면 **우선순위 상속을 수행**
if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX )
{
taskENTER_CRITICAL();
{
xInheritanceOccurred = xTaskPriorityInherit(
pxQueue->u.xSemaphore.xMutexHolder
);
}
taskEXIT_CRITICAL();
}
}
#endif
// Mutex의 대기 리스트에 Current Task를 추가
// 현재 Task는 DelayedList에 들어간다
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
// Queue 잠금 풀고 스케줄러 재개
// 다른 Ready Task를 실행하기
prvUnlockQueue( pxQueue );
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
// 현재 Queue가 사용 가능하다면 (Mutex Count가 1)
else
{
// 스케줄링을 재개하고 가장 위 for문으로 돌아가서
// Mutex 점유를 다시 시도하기
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
// Timeout이 발생했다면
else
{
// 일단 스케줄링을 재개하고
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
// Mutex의 Count가 0이라면
// TimeOut이 발생했는데도 아직도 Mutex 할당을 실패한 경우
if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
{
#if ( configUSE_MUTEXES == 1 )
{
// 만약 우선순위 상속이 발생했었다면
if( xInheritanceOccurred != pdFALSE )
{
taskENTER_CRITICAL();
{
// 현재 대기중인 가장 높은 Task의 우선순위를 확인하고
// Mutex 소유 Task의 우선순위를 복원한다
UBaseType_t uxHighestWaitingPriority;
uxHighestWaitingPriority = prvGetDisinheritPriorityAfterTimeout( pxQueue );
vTaskPriorityDisinheritAfterTimeout( pxQueue->u.xSemaphore.xMutexHolder, uxHighestWaitingPriority );
}
taskEXIT_CRITICAL();
}
}
#endif /* configUSE_MUTEXES */
return errQUEUE_EMPTY;
}
}
}
}
ㅤ
자원 점유를 위한 코드가 꽤나 긴데, 이건 여러가지 시나리오가 뭉쳐있기 때문이다. 크게 4가지 시나리오로 Mutex 점유 과정을 파악할 수 있다.
ㅤ
상황 1) 나 혼자서 자원을 점유하려고 시도 - 즉시 획득 성공
uxMessagesWaiting값이 0이 아닌지 확인 → TRUEuxMessagesWaiting값을 1 감소시키고xMutexHolder를 현재 Task로 지정- 점유 성공
ㅤ
상황 2) 누군가 이미 점유 + 대기 안함
uxMessagesWaiting값이 0이 아닌지 확인 → FALSE- 대기시간
xTicksToWait이 남았는지 확인 → FALSE - 점유 실패
ㅤ
상황 3) 누군가 이미 점유 + 대기 후 점유 성공
uxMessagesWaiting값이 0이 아닌지 확인 → FALSE- 대기시간
xTicksToWait이 남았는지 확인 → TRUE - 진입시간 기록 및 타임아웃 체크 → 만료 X
- 우선순위 상속 수행 (MutexHolder의 우선순위가 현재 Task 보다 낮으면 끌올)
- Mutex의 EventList에 등록하고 Blocked 상태로 전환 + Yield로 다른 Task 실행
- < 다른 Task에서 Mutex를 해제해서 Event가 발생하면 Task가 Ready → 실행>
- 다시
for(;;)진입 uxMessagesWaiting값이 0이 아닌지 확인 → TRUEuxMessagesWaiting값을 1 감소시키고xMutexHolder를 현재 Task로 지정- 점유 성공
ㅤ
상황 4) 누군가 이미 점유 + 대기 후 점유 실패
uxMessagesWaiting값이 0이 아닌지 확인 → FALSE- 대기시간
xTicksToWait이 남았는지 확인 → TRUE - 진입시간 기록 및 타임아웃 체크 → 만료 X
- 우선순위 상속 수행 (MutexHolder의 우선순위가 현재 Task 보다 낮으면 끌올)
- Mutex의 EventList에 등록하고 Blocked 상태로 전환 + Yield로 다른 Task 실행
- < 타이머에 의해 Event가 발생하면 Task가 Ready → 실행 >
- 다시
for(;;)진입 uxMessagesWaiting값이 0이 아닌지 확인 → FALSE- 타임아웃 체크 → 만료
- 여전히
prvIsQueueEmpty== 0 인지 확인 → TRUE - 우선순위 복원 (우선순위 상속 되돌리기)
- 점유 실패
ㅤ
우선순위 상속과 복원에 대한 구현
우선순위 상속을 어떻게 만들었는지가 궁금했는데, 여기 코드에서 생각보다 너무 상세하고 명쾌하게 그 방식이 제시되고 있었다. 우선 아래는 우선순위 상속과 복원에 사용되는 함수 2가지이다.
/*
* Raises the priority of the mutex holder to that of the calling task should
* the mutex holder have a priority less than the calling task.
*/
BaseType_t xTaskPriorityInherit( TaskHandle_t const pxMutexHolder ) PRIVILEGED_FUNCTION;
/*
* Set the priority of a task back to its proper priority in the case that it
* inherited a higher priority while it was holding a semaphore.
*/
BaseType_t xTaskPriorityDisinherit( TaskHandle_t const pxMutexHolder ) PRIVILEGED_FUNCTION;
ㅤ
우선순위의 상속부터 확인해보자.
BaseType_t xTaskPriorityInherit( TaskHandle_t const pxMutexHolder )
{
TCB_t * const pxMutexHolderTCB = pxMutexHolder;
BaseType_t xReturn = pdFALSE;
if( pxMutexHolder != NULL )
{
// 이 Mutex를 들고있는 Task의 우선순위보다 현재 실행중인 Task의 우선순위가 더 높다면
if( pxMutexHolderTCB->uxPriority < pxCurrentTCB->uxPriority )
{
// 혹시 EventList의 value 칸이 다른 목적으로 사용중이 아니라면
if( ( listGET_LIST_ITEM_VALUE( &( pxMutexHolderTCB->xEventListItem ) )
& taskEVENT_LIST_ITEM_VALUE_IN_USE ) == 0UL )
{
// 역순 우선순위를 ItemValue에 담기
// => 우선순위가 높을수록 앞에 오도록 설계
listSET_LIST_ITEM_VALUE( &( pxMutexHolderTCB->xEventListItem ),
( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) pxCurrentTCB->uxPriority );
}
// 만약 상속 대상 Task가 Ready 상태였다면
// Ready List 를 전환해줘야한다
if( listIS_CONTAINED_WITHIN( &( pxReadyTasksLists[ pxMutexHolderTCB->uxPriority ] ),
&( pxMutexHolderTCB->xStateListItem ) ) != pdFALSE )
{
// 기존 Ready 목록에서 대상 Task 제거
// -> 현재 Task의 우선순위로 대상 Task의 우선순위 변경
// -> 새 우선순위의 Ready 목록에 삽입
if( uxListRemove( &( pxMutexHolderTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
portRESET_READY_PRIORITY( pxMutexHolderTCB->uxPriority, uxTopReadyPriority );
}
pxMutexHolderTCB->uxPriority = pxCurrentTCB->uxPriority;
prvAddTaskToReadyList( pxMutexHolderTCB );
}
// 상속 대상 Task가 Ready가 아니면 그냥 우선순위 변경해주면 됨
// 그러면 스케줄러에 의해 새로운 우선순위 Ready 리스트로 잘 들어감
else
{
pxMutexHolderTCB->uxPriority = pxCurrentTCB->uxPriority;
}
/* Inheritance occurred. */
xReturn = pdTRUE;
}
else
{
// 만약 현재 Mutex를 점유중인 Task와 우선순위가 같거나 작은데
// Mutex 점유 Task의 원래 우선순위가 현재 Task보다 낮았다면
// 이것도 우선순위 상속이라고 치고 반환해준다.
if( pxMutexHolderTCB->uxBasePriority < pxCurrentTCB->uxPriority )
{
xReturn = pdTRUE;
}
}
}
return xReturn;
}
ㅤ
길긴 한데, 사실 우선순위를 비교해보고 나보다 낮으면 끌어올린다는 컨셉만 알고있으면 전혀 어려울 게 없는 함수이다!
ㅤ
이번에는 우선순위 복원으로 가보자. 우선순위 상속으로 잠시 높은 우선순위로 올라왔다가, 높은 Task가 원하던 일을 처리(Mutex Give)하고나면 다시 낮은 우선순위로 내려가게된다.
여기에서 내가 헷갈렸던 점은 “우선순위 복원”은 스스로 작업 완료 후 우선순위를 낮추는 과정과, 높은 우선순위 Task가 자신이 끌어올렸던 Task의 우선순위를 다시 낮추는 과정, 총 2가지라는 것이다.
전자는 높은 우선순위 Task가 원하는 것을 수행한 뒤 스스로 자세를 낮추는 것이고, 후자는 높은 우선순위 Task가 결국 원하는 자원을 얻지 못했을 때 직접 낮은 우선순위 Task의 상속을 해제해주는 것이다. 이떄 해당 Task의 우선순위를 낮춰줄 때 대상 Task는 그 사실을 모르고 있다가, 이후 Running 시점이 되어서야 알게된다.
// 스스로 우선순위를 낮추기
BaseType_t xTaskPriorityDisinherit( TaskHandle_t const pxMutexHolder )
{
TCB_t * const pxTCB = pxMutexHolder;
BaseType_t xReturn = pdFALSE;
if( pxMutexHolder != NULL )
{
// 현재 Task가 가지고 있던 Mutex 개수를 하나 감소시킨다
( pxTCB->uxMutexesHeld )--;
// 만약 우선순위 상속이 발생한 상황이라면
if( pxTCB->uxPriority != pxTCB->uxBasePriority )
{
// 이제 아무런 Mutex도 가지고 있지 않다면
if( pxTCB->uxMutexesHeld == ( UBaseType_t ) 0 )
{
// 현재 StateList에서 자신을 제거하고,
// 기존 우선순위로 변경한 뒤
// 다시 적절한 ReadyList로 들어간다.
if( uxListRemove( &( pxTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
taskRESET_READY_PRIORITY( pxTCB->uxPriority );
}
pxTCB->uxPriority = pxTCB->uxBasePriority;
listSET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ),
( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) pxTCB->uxPriority );
prvAddTaskToReadyList( pxTCB );
// 우선순위의 변경이 있었으니, Context-Switch가 필요함을 알리기 위해 TRUE를 반환한다.
xReturn = pdTRUE;
}
}
}
return xReturn;
}
ㅤ
이번에는 타임아웃으로 높은 Task가 자원을 점유하지 못했을 때, 낮은 Task의 우선순위를 끌어내려주는 함수이다.
// 타임아웃에 의한 다른 Task의 우선순위 낮추기
void vTaskPriorityDisinheritAfterTimeout( TaskHandle_t const pxMutexHolder, UBaseType_t uxHighestPriorityWaitingTask )
{
TCB_t * const pxTCB = pxMutexHolder;
UBaseType_t uxPriorityUsedOnEntry, uxPriorityToUse;
const UBaseType_t uxOnlyOneMutexHeld = ( UBaseType_t ) 1;
if( pxMutexHolder != NULL )
{
// 우선순위를 낮춰야하는데, 중간 우선순위의 Task가 여전히 이 Task를 기다리고 있는 경우
// BasePriority 대신에 중간 우선순위를 넣어준다.
if( pxTCB->uxBasePriority < uxHighestPriorityWaitingTask )
{
uxPriorityToUse = uxHighestPriorityWaitingTask;
}
// 우선순위를 높여줄 필요가 없는 상황이라면 BasePriority를 넣어주기
else
{
uxPriorityToUse = pxTCB->uxBasePriority;
}
// 우선순위의 변경이 필요한 상황이라면
if( pxTCB->uxPriority != uxPriorityToUse )
{
// 오직 하나의 Mutex만 들고있을 때 우선순위를 복원한다.
// 혹시 다른 Mutex에 의해서 우선순위가 높아진 상황일 수도 있으니깐
if( pxTCB->uxMutexesHeld == uxOnlyOneMutexHeld )
{
// 우선순위 변경을 처리해준다.
uxPriorityUsedOnEntry = pxTCB->uxPriority;
pxTCB->uxPriority = uxPriorityToUse;
if( ( listGET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ) ) & taskEVENT_LIST_ITEM_VALUE_IN_USE ) == 0UL )
{
listSET_LIST_ITEM_VALUE( &( pxTCB->xEventListItem ),
( TickType_t ) configMAX_PRIORITIES - ( TickType_t ) uxPriorityToUse );
}
if( listIS_CONTAINED_WITHIN( &( pxReadyTasksLists[ uxPriorityUsedOnEntry ] ),
&( pxTCB->xStateListItem ) ) != pdFALSE )
{
if( uxListRemove( &( pxTCB->xStateListItem ) ) == ( UBaseType_t ) 0 )
{
portRESET_READY_PRIORITY( pxTCB->uxPriority, uxTopReadyPriority );
}
prvAddTaskToReadyList( pxTCB );
}
}
}
}
}
위 함수에서 인자로 들어가는 uxHighestPriorityWaitingTask 는 내가 더 이상 Mutex를 점유하는 Task를 필요로 하지 않을 때, 그 다음으로 해당 Task의 Mutex를 기다리는 Task의 우선순위를 말한다. 즉, 낮은 우선순위의 Mutex를 중간 Task와 높은 Task가 모두 기다리고 있다고 할 때, 우선순위 상속에 의해 Mutex Task는 높은 우선순위를 가진다.
ㅤ
Mutex는 언제 사용하는가
이전 글에서 다뤘던 Critical Section, Scheduler Suspend와 비교해서 Mutex는 어떤 장점을 가질까?
ㅤ
잠깐 이 두 방식을 정리해보면 다음의 문제점이 있다.
Critical Section의 문제
- 인터럽트 차단 → 모든 인터럽트에 대한 ISR이 지연된다
- 시간 제약 → 길면 안된다. $10\mu s$ 정도. 그런데 UART 같은 통신은 ms 까지도 걸린다.
- 인터럽트 처리가 늦어져서 Hard Real-Time 을 지키기 어렵다.
- ㅤ
Scheduler Suspend의 문제
- 얘는 인터럽트는 허용하기에 ISR은 정상적으로 동작한다.
- 그러나, 스케줄링을 막아 다른 모든 Task가 수십 ms 동안 실행되지 않는다.
- 높은 우선순위를 가지는 Task도 깨어나지 못하고 기다려야 한다.
- 1ms 정도의 수행시간까지만 사용할 수 있다.
ㅤ
Mutex를 사용하면 어떤 점들을 해결할 수 있냐면!
- 인터럽트를 막지 않기 때문에, ISR이 정상적으로 동작한다.
- Mutex를 기다리는 동안 다른 Task가 CPU를 사용할 수 있다.
- 시간 제약이 없다.
- 우선순위 상속을 통해 우선순위 역전의 발생을 방지한다.
ㅤ
SemaphoreHandle_t xMutex = xSemaphoreCreateMutex();
void UART_SendString(const char *str)
{
// Mutex 획득 (대기 방식)
xSemaphoreTake(xMutex, portMAX_DELAY);
// 긴 작업 가능 (인터럽트 허용, 다른 태스크도 실행)
HAL_UART_Transmit(&huart1, (uint8_t*)str, strlen(str), HAL_MAX_DELAY);
xSemaphoreGive(xMutex);
}
ㅤ
그래서 Mutex는 다음의 상황에 적용하기 좋다.
- ms 단위의 긴 시간동안 자원을 점유하는 경우.
- 복잡한 자료구조에 대해 값 변경 시 보호해야하는 경우 (작업이 길어질 수 있는데, 이러면 다른 Task 먼저 처리 가능해서 유용)
- 공유 HW 자원에 대한 관리 (SPI, I2C, UART 같은 HW 접근을 제어할 때 유용)
- 재진입이 불가능한 라이브러리 함수를 호출하는 경우 (re-entrancy 가 나쁜 함수 … printf 같은거)
- 우선순위 역전의 방지가 필요한 경우
ㅤ
앞서 코드에서 봤다시피, 타입이 Mutex인 경우에만 우선순위 역전을 핸들링하는 코드가 호출되었다. 이는 “소유권”의 개념이 Mutex에만 있기 때문이다. 다음에 정리하겠지만, Binary Semaphore로는 이 우선순위 역전을 막아낼 수 없다.
ㅤ
그렇다면 Mutex를 사용하면 구린 상황은 어떤게 있을까!
- 작업이 $\mu s$단위로 너무 짧다면 오버헤드가 크다. 이런 경우에는 Critical Section을 쓰자.
- ISR에서는 Mutex를 사용하면 안된다.
ㅤ
| 특성 | Critical Section | Scheduler Suspend | Mutex |
|---|---|---|---|
| 작업 시간 | < 10μs | 10μs ~ 1ms | > 1ms (제한 없음) |
| 인터럽트 | 차단 ❌ | 허용 ✅ | 허용 ✅ |
| 태스크 전환 | 차단 ❌ | 차단 ❌ | 허용 ✅ |
| 다른 태스크 실행 | 불가 ❌ | 불가 ❌ | 가능 ✅ |
| ISR 사용 | 가능 ✅ | 불가 ❌ | 불가 ❌ |
| 우선순위 상속 | 없음 ❌ | 없음 ❌ | 있음 ✅ |
| 블로킹 (대기) | 불가 ❌ | 불가 ❌ | 가능 ✅ |
| 오버헤드 | 최소 (~20 사이클) | 낮음 (~100 사이클) | 높음 (~300 사이클) |
| 인터럽트 Latency | 증가 ❌ | 유지 ✅ | 유지 ✅ |
| 태스크 응답성 | 영향 적음 | 저하 ⚠️ | 유지 ✅ |
| 우선순위 역전 | 발생 가능 ⚠️ | 발생 가능 ⚠️ | 방지 ✅ |
| 권장 용도 | 짧은 변수 조작 | 중간 데이터 복사 | 긴 작업/공유 자원 |
생각보다 내용이 너무너무 길어져서 Mutex 만 다뤘다. 아마 다음에 다룰 Semaphore 등은 Mutex와 많은 부분 코드가 겹쳐서 코드량이 상당히 줄어들 듯!
'Embedded System > FreeRTOS' 카테고리의 다른 글
| [FreeRTOS] 자원 관리 (1) - Critical Section과 Scheduler Suspend (0) | 2025.11.21 |
|---|---|
| [FreeRTOS] 커널의 시작과 첫번째 Task의 실행 (0) | 2025.11.21 |
| [FreeRTOS] Task의 스케줄링 (1) | 2025.11.19 |
| [FreeRTOS] RTOS의 Context Switch 과정 파헤치기 (0) | 2025.11.15 |
| [FreeRTOS] RTOS의 System Exception (0) | 2025.11.15 |