티스토리 뷰

개발/C,C++

간단한 큐 만들기

-=HaeJuK=- 2024. 11. 20. 15:53
728x90

주요 기능 설명

  1. 환형 큐:
    • m_vQueue 벡터를 사용하여 환형 버퍼를 관리합니다.
    • m_nHead와 m_nTail을 사용하여 큐의 시작과 끝을 관리합니다.
  2. 멀티스레드 동작:
    • std::mutex와 std::condition_variable을 사용하여 스레드 안전성을 보장합니다.
    • Push와 Pop은 동기화되어 있으며, 상태에 따라 조건 변수를 사용하여 대기 상태를 처리합니다.
  3. 큐 크기 확장:
    • Push 메서드는 큐가 꽉 찬 경우 Resize를 호출하여 크기를 두 배로 확장합니다.
  4. 큐 크기 축소:
    • Pop 메서드와 MonitorQueue 메서드는 큐 크기가 일정 비율 이하로 줄어들면 크기를 절반으로 축소합니다.
    • 크기 축소는 최소 용량(m_nMinCapacity) 이하로는 진행되지 않습니다.
  5. 모니터링 쓰레드:
    • MonitorQueue 메서드는 주기적으로 큐 상태를 점검하고 크기를 축소합니다.
    • StartMonitoring과 StopMonitoring을 통해 모니터링을 시작하고 종료할 수 있습니다.
  6. 크로스 플랫폼:
    • POSIX 및 Windows에서 모두 사용 가능한 표준 C++ 라이브러리를 활용합니다.
/******************************************************************************
* File Name: circular_queue.h
* Description: Thread-safe resizable circular queue implementation.
* Author: HaeJuK
* Date: 2024-11-20
******************************************************************************/

#ifndef __CIRCULAR_QUEUE_H__
#define __CIRCULAR_QUEUE_H__

#include <vector>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <thread>
#include <atomic>

// Class declaration
template<typename T>
class cCircularQueue
{
public:
    explicit cCircularQueue( size_t _nCapacity )
        : m_vQueue( _nCapacity ), m_nCapacity( _nCapacity ), m_nHead( 0 ), m_nTail( 0 ), m_nSize( 0 )
    {}

    // Push an element into the queue (thread-safe)
    bool Push( const T& _tValue )
    {
        std::unique_lock<std::mutex> lock( m_mtxQueue );

        if( m_nSize >= m_nCapacity )
        {
            if( false == Resize( m_nCapacity * 2 ) )
            {
                return false; // Resize failed
            }
        }

        m_vQueue[m_nTail] = _tValue;
        m_nTail = ( m_nTail + 1 ) % m_nCapacity;
        ++m_nSize;

        m_cvNotEmpty.notify_one();
        return true;
    }

    // Pop an element from the queue (thread-safe)
    bool Pop( T& _tValue )
    {
        std::unique_lock<std::mutex> lock( m_mtxQueue );

        m_cvNotEmpty.wait( lock, [this]()
        {
            return m_nSize > 0;
        });

        if( m_nSize == 0 )
        {
            return false; // Should not occur due to condition variable
        }

        _tValue = m_vQueue[m_nHead];
        m_nHead = ( m_nHead + 1 ) % m_nCapacity;
        --m_nSize;

        if( m_nSize < m_nCapacity / 4 && m_nCapacity > m_nMinCapacity )
        {
            Resize( m_nCapacity / 2 );
        }

        return true;
    }

    // Periodically shrink the queue based on idle state
    void MonitorQueue()
    {
        while( m_bRunning )
        {
            std::this_thread::sleep_for( std::chrono::seconds( m_nMonitorInterval ) );

            std::unique_lock<std::mutex> lock( m_mtxQueue );
            if( m_nSize < m_nCapacity / 4 && m_nCapacity > m_nMinCapacity )
            {
                Resize( m_nCapacity / 2 );
            }
        }
    }

    // Start the queue monitoring thread
    void StartMonitoring( size_t _nInterval = 5 )
    {
        m_nMonitorInterval = _nInterval;
        m_bRunning = true;
        m_thMonitor = std::thread( &nxcCircularQueue::MonitorQueue, this );
    }

    // Stop the queue monitoring thread
    void StopMonitoring()
    {
        m_bRunning = false;
        if( m_thMonitor.joinable() )
        {
            m_thMonitor.join();
        }
    }

    ~cCircularQueue()
    {
        StopMonitoring();
    }

private:
    bool Resize( size_t _nNewCapacity )
    {
        if( _nNewCapacity < m_nSize )
        {
            return false; // Cannot shrink below current size
        }

        std::vector<T> vNewQueue( _nNewCapacity );

        for( size_t nIndex = 0; nIndex < m_nSize; ++nIndex )
        {
            vNewQueue[nIndex] = m_vQueue[( m_nHead + nIndex ) % m_nCapacity];
        }

        m_vQueue.swap( vNewQueue );
        m_nHead = 0;
        m_nTail = m_nSize;
        m_nCapacity = _nNewCapacity;

        return true;
    }

    std::vector<T> m_vQueue;
    size_t m_nCapacity;
    size_t m_nMinCapacity = 16; // Minimum capacity
    size_t m_nHead;
    size_t m_nTail;
    size_t m_nSize;

    std::mutex m_mtxQueue;
    std::condition_variable m_cvNotEmpty;

    std::atomic<bool> m_bRunning{ false };
    size_t m_nMonitorInterval{ 5 };
    std::thread m_thMonitor;
};

#endif // !__CIRCULAR_QUEUE_H__
#include "circular_queue.h"
#include <iostream>
#include <thread>

int main()
{
    cCircularQueue<int> queue( 16 );

    queue.StartMonitoring( 10 ); // 10초마다 큐 상태 확인

    std::thread producer([&]()
    {
        for( int i = 0; i < 100; ++i )
        {
            queue.Push( i );
            std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
        }
    });

    std::thread consumer([&]()
    {
        int nValue = 0;
        for( int i = 0; i < 100; ++i )
        {
            if( queue.Pop( nValue ) )
            {
                std::cout << "Popped: " << nValue << std::endl;
            }
            std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
        }
    });

    producer.join();
    consumer.join();

    queue.StopMonitoring();
    return 0;
}

 

728x90
반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함
반응형