티스토리 뷰
728x90
주요 기능 설명
- 환형 큐:
- m_vQueue 벡터를 사용하여 환형 버퍼를 관리합니다.
- m_nHead와 m_nTail을 사용하여 큐의 시작과 끝을 관리합니다.
- 멀티스레드 동작:
- std::mutex와 std::condition_variable을 사용하여 스레드 안전성을 보장합니다.
- Push와 Pop은 동기화되어 있으며, 상태에 따라 조건 변수를 사용하여 대기 상태를 처리합니다.
- 큐 크기 확장:
- Push 메서드는 큐가 꽉 찬 경우 Resize를 호출하여 크기를 두 배로 확장합니다.
- 큐 크기 축소:
- Pop 메서드와 MonitorQueue 메서드는 큐 크기가 일정 비율 이하로 줄어들면 크기를 절반으로 축소합니다.
- 크기 축소는 최소 용량(m_nMinCapacity) 이하로는 진행되지 않습니다.
- 모니터링 쓰레드:
- MonitorQueue 메서드는 주기적으로 큐 상태를 점검하고 크기를 축소합니다.
- StartMonitoring과 StopMonitoring을 통해 모니터링을 시작하고 종료할 수 있습니다.
- 크로스 플랫폼:
- POSIX 및 Windows에서 모두 사용 가능한 표준 C++ 라이브러리를 활용합니다.
/******************************************************************************
* File Name: circular_queue.h
* Description: Thread-safe resizable circular queue implementation.
* Author: HaeJuK
* Date: 2024-11-20
******************************************************************************/
#ifndef __CIRCULAR_QUEUE_H__
#define __CIRCULAR_QUEUE_H__
#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <thread>
#include <atomic>
// Class declaration
template<typename T>
class cCircularQueue
{
public:
explicit cCircularQueue( size_t _nCapacity )
: m_vQueue( _nCapacity ), m_nCapacity( _nCapacity ), m_nHead( 0 ), m_nTail( 0 ), m_nSize( 0 )
{}
// Push an element into the queue (thread-safe)
bool Push( const T& _tValue )
{
std::unique_lock<std::mutex> lock( m_mtxQueue );
if( m_nSize >= m_nCapacity )
{
if( false == Resize( m_nCapacity * 2 ) )
{
return false; // Resize failed
}
}
m_vQueue[m_nTail] = _tValue;
m_nTail = ( m_nTail + 1 ) % m_nCapacity;
++m_nSize;
m_cvNotEmpty.notify_one();
return true;
}
// Pop an element from the queue (thread-safe)
bool Pop( T& _tValue )
{
std::unique_lock<std::mutex> lock( m_mtxQueue );
m_cvNotEmpty.wait( lock, [this]()
{
return m_nSize > 0;
});
if( m_nSize == 0 )
{
return false; // Should not occur due to condition variable
}
_tValue = m_vQueue[m_nHead];
m_nHead = ( m_nHead + 1 ) % m_nCapacity;
--m_nSize;
if( m_nSize < m_nCapacity / 4 && m_nCapacity > m_nMinCapacity )
{
Resize( m_nCapacity / 2 );
}
return true;
}
// Periodically shrink the queue based on idle state
void MonitorQueue()
{
while( m_bRunning )
{
std::this_thread::sleep_for( std::chrono::seconds( m_nMonitorInterval ) );
std::unique_lock<std::mutex> lock( m_mtxQueue );
if( m_nSize < m_nCapacity / 4 && m_nCapacity > m_nMinCapacity )
{
Resize( m_nCapacity / 2 );
}
}
}
// Start the queue monitoring thread
void StartMonitoring( size_t _nInterval = 5 )
{
m_nMonitorInterval = _nInterval;
m_bRunning = true;
m_thMonitor = std::thread( &nxcCircularQueue::MonitorQueue, this );
}
// Stop the queue monitoring thread
void StopMonitoring()
{
m_bRunning = false;
if( m_thMonitor.joinable() )
{
m_thMonitor.join();
}
}
~cCircularQueue()
{
StopMonitoring();
}
private:
bool Resize( size_t _nNewCapacity )
{
if( _nNewCapacity < m_nSize )
{
return false; // Cannot shrink below current size
}
std::vector<T> vNewQueue( _nNewCapacity );
for( size_t nIndex = 0; nIndex < m_nSize; ++nIndex )
{
vNewQueue[nIndex] = m_vQueue[( m_nHead + nIndex ) % m_nCapacity];
}
m_vQueue.swap( vNewQueue );
m_nHead = 0;
m_nTail = m_nSize;
m_nCapacity = _nNewCapacity;
return true;
}
std::vector<T> m_vQueue;
size_t m_nCapacity;
size_t m_nMinCapacity = 16; // Minimum capacity
size_t m_nHead;
size_t m_nTail;
size_t m_nSize;
std::mutex m_mtxQueue;
std::condition_variable m_cvNotEmpty;
std::atomic<bool> m_bRunning{ false };
size_t m_nMonitorInterval{ 5 };
std::thread m_thMonitor;
};
#endif // !__CIRCULAR_QUEUE_H__
#include "circular_queue.h"
#include <iostream>
#include <thread>
int main()
{
cCircularQueue<int> queue( 16 );
queue.StartMonitoring( 10 ); // 10초마다 큐 상태 확인
std::thread producer([&]()
{
for( int i = 0; i < 100; ++i )
{
queue.Push( i );
std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
}
});
std::thread consumer([&]()
{
int nValue = 0;
for( int i = 0; i < 100; ++i )
{
if( queue.Pop( nValue ) )
{
std::cout << "Popped: " << nValue << std::endl;
}
std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
}
});
producer.join();
consumer.join();
queue.StopMonitoring();
return 0;
}
728x90
반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- C# 고급 기술
- 블루버블다이브팀
- 블루버블다이빙팀
- 울릉도
- C
- 제주도
- 리눅스
- 암호화
- 외돌개
- 서귀포블루버블
- C#
- PowerShell
- ip
- C++
- CMake
- DLL
- 성산블루버블
- 현포다이브
- Build
- 스쿠버다이빙
- 패턴
- Linux
- Thread
- 스쿠버 다이빙
- 블루버블
- 서귀포
- Windows
- 윈도우
- C#.NET
- OpenSource
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
글 보관함
반응형