Fixing problems with ring buffer behavior, still output is not good.

parent d396a444
......@@ -41,10 +41,26 @@ public:
CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity = 2048 );
virtual ~CITANetAudioStream();
enum StreamingStatus
{
INVALID = -1,
STOPPED,
CONNECTED,
STREAMING,
BUFFER_UNDERRUN,
};
bool Connect( const std::string& sAddress, int iPort );
bool GetIsConnected() const;
//! Returns (static) size of ring buffer
int GetRingBufferSize() const;
//! Returns true if ring buffer is full
bool GetIsRingBufferFull() const;
//! Returns true if ring buffer is empty
bool GetIsRingBufferEmpty() const;
unsigned int GetBlocklength() const;
unsigned int GetNumberOfChannels() const;
......@@ -62,7 +78,8 @@ protected:
*/
int Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples );
int GetRingbufferFreeSamples();
//! Returns free samples between write and read cursor
int GetRingBufferFreeSamples() const;
private:
CITANetAudioStreamingClient* m_pNetAudioStreamingClient;
......@@ -71,9 +88,12 @@ private:
ITASampleFrame m_sfOutputStreamBuffer;
int m_iReadCursor; //!< Cursor where samples will be consumed from ring buffer on next block
int m_iWriteCursor; //!< Cursor where samples will feeded into ring buffer from net audio producer
int m_iWriteCursor; //!< Cursor where samples will be fed into ring buffer from net audio producer (always ahead)
bool m_bRingBufferFull; //!< Indicator if ring buffer is full (and read cursor equals write cursor)
ITASampleFrame m_sfRingBuffer; //!< Buffer incoming data
int m_iStreamingStatus; //!< Current streaming status
friend class CITANetAudioStreamingClient;
};
......
......@@ -7,11 +7,14 @@
// STL
#include <cmath>
#include <iostream>
CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity )
: m_sfOutputStreamBuffer( iChannels, iBufferSize, true )
, m_dSampleRate( dSamplingRate )
, m_sfRingBuffer( iChannels, iRingBufferCapacity, true )
, m_bRingBufferFull( false )
, m_iStreamingStatus( INVALID )
{
if( iBufferSize > iRingBufferCapacity )
......@@ -19,7 +22,9 @@ CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int
m_pNetAudioStreamingClient = new CITANetAudioStreamingClient( this );
m_iReadCursor = 0;
m_iWriteCursor = 0;
m_iWriteCursor = 0; // always ahead, i.e. iWriteCursor >= iReadCursor if unwrapped
m_iStreamingStatus = STOPPED;
}
CITANetAudioStream::~CITANetAudioStream()
......@@ -29,7 +34,10 @@ CITANetAudioStream::~CITANetAudioStream()
bool CITANetAudioStream::Connect( const std::string& sAddress, int iPort )
{
return m_pNetAudioStreamingClient->Connect( sAddress, iPort );
bool bConnected = m_pNetAudioStreamingClient->Connect( sAddress, iPort );
if( bConnected )
m_iStreamingStatus = CONNECTED;
return bConnected;
}
bool CITANetAudioStream::GetIsConnected() const
......@@ -39,47 +47,85 @@ bool CITANetAudioStream::GetIsConnected() const
const float* CITANetAudioStream::GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* )
{
m_sfOutputStreamBuffer[uiChannel].Zero();
if (this->GetIsConnected())
ITASampleBuffer& sbOutputStreamBuffer( m_sfOutputStreamBuffer[ uiChannel ] );
sbOutputStreamBuffer.Zero();
const float* pfBlockPointer = sbOutputStreamBuffer.GetData();
if( !GetIsConnected() )
return pfBlockPointer;
m_iStreamingStatus = STREAMING;
int iCurrentWriteCursor = m_iWriteCursor; // local copy
if( iCurrentWriteCursor <= m_iReadCursor && GetRingBufferFreeSamples() > 0 ) // Wrap around?
iCurrentWriteCursor += GetRingBufferSize(); // Write pointer always ahead, so unwrap first
int iReadableSamples = iCurrentWriteCursor - m_iReadCursor;
if( iReadableSamples > int( GetBlocklength() ) ) // samples can be cyclic-copied safely from ring buffer
m_sfRingBuffer[ uiChannel ].cyclic_read( sbOutputStreamBuffer.GetData(), sbOutputStreamBuffer.GetLength(), m_iReadCursor );
if( iReadableSamples > 0 && iReadableSamples < int( GetBlocklength() ) )
{
// todo restlichen kopieren und dann rein und raus faden
int iCurrentWritePointer = m_iWriteCursor;
m_sfRingBuffer[uiChannel].cyclic_read(m_sfOutputStreamBuffer[uiChannel].GetData(),
m_sfOutputStreamBuffer.GetLength(), m_iReadCursor);
// @todo: fade with ITAFade
std::cerr << "Should fade right now, but skipping samples." << std::endl;
}
return m_sfOutputStreamBuffer[ uiChannel ].GetData();
return pfBlockPointer;
}
void CITANetAudioStream::IncrementBlockPointer()
{
// Increment read cursor by one audio block and wrap around if exceeding ring buffer
m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength();
m_pNetAudioStreamingClient->TriggerBlockIncrement( );
if( GetRingBufferFreeSamples() >= int( GetBlocklength() ) )
{
m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength();
m_iStreamingStatus = STREAMING;
}
else
{
m_iStreamingStatus = BUFFER_UNDERRUN;
m_iReadCursor = m_iWriteCursor;
}
m_pNetAudioStreamingClient->TriggerBlockIncrement();
}
int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples )
{
// Take local copies (concurrent access)
int iCurrentReadCursor = m_iReadCursor;
int iCurrentWriteCursor = m_iWriteCursor;
// write samples in the buffer
m_sfRingBuffer.cyclic_write(sfNewSamples, iNumSamples,
iCurrentReadCursor, m_iWriteCursor);
if( iCurrentWriteCursor < iCurrentReadCursor )
iCurrentWriteCursor += GetRingBufferSize(); // Unwrap, because write cursor always ahead
// set write curser
m_iWriteCursor = ( m_iWriteCursor + iNumSamples ) % m_sfRingBuffer.GetLength( );
if( GetRingBufferFreeSamples() < iNumSamples )
{
// @todo: only partly write
std::cerr << "BUFFER_OVERRUN! Would partly write samples because ring buffer will be full then." << std::endl;
m_iWriteCursor = m_iReadCursor;
}
else
{
// write samples into ring buffer
m_sfRingBuffer.cyclic_write( sfNewSamples, iNumSamples, 0, iCurrentWriteCursor );
// return free BufferSize
if (iCurrentReadCursor > m_iWriteCursor) {
return m_iWriteCursor - iCurrentReadCursor;
// set write curser
m_iWriteCursor += ( m_iWriteCursor + iNumSamples ) % GetRingBufferSize();
}
else {
return m_sfRingBuffer.GetLength() - m_iWriteCursor + iCurrentReadCursor;
}
return GetRingBufferFreeSamples();
}
int CITANetAudioStream::GetRingbufferFreeSamples()
int CITANetAudioStream::GetRingBufferFreeSamples() const
{
int iFreeSamples = ( m_iWriteCursor - m_iReadCursor + GetRingBufferSize() ) % GetRingBufferSize();
if( m_bRingBufferFull )
return 0;
int iFreeSamples = GetRingBufferSize() - ( ( m_iWriteCursor - m_iReadCursor + GetRingBufferSize() ) % GetRingBufferSize() );
assert( iFreeSamples > 0 );
return iFreeSamples;
}
......@@ -88,6 +134,16 @@ int CITANetAudioStream::GetRingBufferSize() const
return m_sfRingBuffer.GetLength();
}
bool CITANetAudioStream::GetIsRingBufferFull() const
{
return m_bRingBufferFull;
}
bool CITANetAudioStream::GetIsRingBufferEmpty() const
{
return ( !m_bRingBufferFull && m_iReadCursor == m_iWriteCursor );
}
unsigned int CITANetAudioStream::GetBlocklength() const
{
return ( unsigned int ) m_sfOutputStreamBuffer.GetLength();
......
......@@ -80,7 +80,7 @@ bool CITANetAudioStreamingClient::LoopBody()
m_pMessage->ResetMessage();
m_pMessage->SetConnection( m_pConnection );
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingbufferFreeSamples() );
m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples() );
m_pMessage->WriteMessage();
// Wait for answer of server
......@@ -91,23 +91,22 @@ bool CITANetAudioStreamingClient::LoopBody()
case CITANetAudioProtocol::NP_INVALID:
// Something went wrong
std::cerr << "Received invalid message type" << std::endl;
break;
case CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER:
// Wait until block increment is triggered by audio context (more free samples in ring buffer)
std::cout << "Will wait for block increment" << std::endl;
m_oBlockIncrementEvent.WaitForEvent( true );
break;
case CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES:
// Receive samples from net message and forward them to the stream ring buffer
m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
if ( m_pStream->GetRingbufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
{
std::cout << "Receiving " << m_sfReceivingBuffer.GetLength() << " samples from streaming server" << std::endl;
if ( m_pStream->GetRingBufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
m_pStream->Transmit( m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength( ) );
}
//else
// Fehler
......
......@@ -122,13 +122,12 @@ bool CITANetAudioStreamingServer::LoopBody()
m_pMessage->SetConnection( m_pConnection );
m_pMessage->ReadMessage(); // blocking
switch( m_pMessage->GetMessageType() )
int iMsgType = m_pMessage->GetMessageType();
switch( iMsgType )
{
case CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES:
{
int iFreeSamples = m_pMessage->ReadInt();
// std::cout << "Freie Samples: " << iFreeSamples << std::endl;
// std::cout << "Laenge InputStram: " << m_pInputStream->GetBlocklength() << std::endl;
if( iFreeSamples >= m_pInputStream->GetBlocklength() )
{
// Send Samples
......@@ -137,30 +136,32 @@ bool CITANetAudioStreamingServer::LoopBody()
ITAStreamInfo oStreamInfo;
oStreamInfo.nSamples = m_sfTempTransmitBuffer.GetLength();
const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
//std::cout << *pfData << std::endl;
if (pfData != 0)
m_sfTempTransmitBuffer[i].write(pfData, m_sfTempTransmitBuffer.GetLength());
if( pfData != 0 )
m_sfTempTransmitBuffer[ i ].write( pfData, m_sfTempTransmitBuffer.GetLength() );
}
m_pInputStream->IncrementBlockPointer();
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES );
m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
m_pMessage->WriteAnswer();
m_pInputStream->IncrementBlockPointer();
std::cout << "Transmitted " << m_pInputStream->GetBlocklength() << " samples, because there where " << iFreeSamples << " free samples on client side" << std::endl;
}
else
{
std::cout << "Could not transmitt, because there where only " << iFreeSamples << " free samples on client side" << std::endl;
// Waiting for Trigger
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER );
m_pMessage->WriteAnswer();
break;
}
float fTimeOut = m_pInputStream->GetBlocklength() / m_pInputStream->GetSampleRate();
//VistaTimeUtils::Sleep( (int) ( 1 * 100 ) );
break;
}
case CITANetAudioProtocol::NP_CLIENT_CLOSE:
{
m_pMessage->WriteAnswer();
m_pConnection = NULL;
StopGently( true );
......@@ -168,6 +169,12 @@ bool CITANetAudioStreamingServer::LoopBody()
return false;
}
default:
{
std::cout << "Unkown protocol type: " << iMsgType << std::endl;
break;
}
}
return true;
}
......
......@@ -17,7 +17,7 @@ static int g_iBufferSize = 256;
int main( int , char** )
{
CITANetAudioStream oNetAudioStream( 1, g_dSampleRate, g_iBufferSize, 100 * g_iBufferSize );
CITANetAudioStream oNetAudioStream( 1, g_dSampleRate, g_iBufferSize, 3 * g_iBufferSize );
ITAStreamProbe oProbe( &oNetAudioStream, "output.wav" );
ITAStreamMultiplier1N oMultiplier( &oProbe, 2 );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment