Commit 4e81b87c authored by Dipl.-Ing. Jonas Stienen's avatar Dipl.-Ing. Jonas Stienen

Merge branch 'ba_2016_heimes' of https://git.rwth-aachen.de/ita/ITADataSources into ba_2016_heimes

parents 44bb65cd f0ac824b
......@@ -65,6 +65,7 @@ public:
double dSampleRate;
int iBlockSize;
int iRingBufferSize;
int iTargetSampleLatency;
inline StreamingParameters()
{
......@@ -72,11 +73,16 @@ public:
dSampleRate = 0.0f;
iBlockSize = 0;
iRingBufferSize = 0;
iTargetSampleLatency = 0;
};
inline bool operator==( const StreamingParameters& rhs )
{
if ( ( iChannels == rhs.iChannels ) && ( dSampleRate == rhs.dSampleRate ) && ( iBlockSize == rhs.iBlockSize ) && ( iRingBufferSize == rhs.iRingBufferSize ) )
if ( ( iChannels == rhs.iChannels )
&& ( dSampleRate == rhs.dSampleRate )
&& (iBlockSize == rhs.iBlockSize)
&& (iRingBufferSize == rhs.iRingBufferSize)
&& (iTargetSampleLatency == rhs.iTargetSampleLatency))
return true;
else
return false;
......
......@@ -79,6 +79,8 @@ public:
*/
bool Connect( const std::string& sAddress, int iPort = 12480 );
void Disconnect();
//! Returns the connection status
/**
* @return True, if connected
......@@ -204,7 +206,7 @@ private:
int m_iWriteCursor; //!< Cursor where samples will be fed into ring buffer from net audio producer (always ahead)
bool m_bRingBufferFull; //!< Indicator if ring buffer is full (and read cursor equals write cursor)
ITASampleFrame m_sfRingBuffer; //!< Ring buffer
int m_iTargetSampleLatency; //!< Maximum allowed samples / target sample latency
int m_iTargetSampleLatencyServer; //!< Maximum allowed samples / target sample latency
int m_iStreamingStatus; //!< Current streaming status
double m_dLastStreamingTimeCode;
......
......@@ -70,13 +70,14 @@ private:
CITANetAudioMessage* m_pMessage;
VistaConnectionIP* m_pConnection;
VistaThreadEvent m_oBlockIncrementEvent;
//VistaThreadEvent m_oBlockIncrementEvent;
ITASampleFrame m_sfReceivingBuffer; //!< Buffer incoming data
CITANetAudioProtocol::StreamingParameters m_oParams;
bool m_bStopIndicated;
bool m_bStopped;
int iStreamingBlockId;
ITABufferedDataLoggerImplClient* m_pClientLogger;
......
......@@ -61,7 +61,7 @@ public:
};
CITANetAudioStreamingServer();
virtual ~CITANetAudioStreamingServer() {};
~CITANetAudioStreamingServer();
bool Start( const std::string& sAddress, int iPort );
bool IsClientConnected() const;
......
......@@ -46,5 +46,5 @@ void CITANetAudioClient::Disconnect()
bool CITANetAudioClient::GetIsConnected() const
{
return m_pConnection ? true : false;
return ( m_pConnection != NULL ) ? true : false;
}
......@@ -121,11 +121,9 @@ bool CITANetAudioMessage::ReadMessage( int timeout)
// TODO Timer entfernen
if (nIncomingBytes == -1)
return false;
else
int a = 5;
if (timeout != 0)
nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
//if (timeout != 0)
//nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
#endif
......
......@@ -124,7 +124,7 @@ CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int
, m_bRingBufferFull( false )
, m_iStreamingStatus( INVALID )
, m_dLastStreamingTimeCode( 0.0f )
, m_iTargetSampleLatency( iRingBufferCapacity )
, m_iTargetSampleLatencyServer( iRingBufferCapacity )
{
m_bRingBufferFull = false;
if( iBufferSize > iRingBufferCapacity )
......@@ -157,7 +157,7 @@ CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int
oLog.dSampleRate = m_dSampleRate;
oLog.iBufferSize = GetBlocklength();
oLog.iRingBufferSize = GetRingBufferSize();
oLog.iTargetSampleLatency = m_iTargetSampleLatency;
oLog.iTargetSampleLatency = m_iTargetSampleLatencyServer;
m_pAudioLogger->log( oLog );
}
......@@ -174,6 +174,11 @@ bool CITANetAudioStream::Connect( const std::string& sAddress, int iPort )
return m_pNetAudioStreamingClient->Connect( sAddress, iPort );
}
void CITANetAudioStream::Disconnect()
{
m_pNetAudioStreamingClient->Disconnect();
}
bool CITANetAudioStream::GetIsConnected() const
{
return m_pNetAudioStreamingClient->GetIsConnected();
......@@ -202,17 +207,17 @@ void CITANetAudioStream::SetAllowedLatencySamples( int iLatencySamples )
if( iLatencySamples > GetMaximumLatencySamples() )
ITA_EXCEPT1( INVALID_PARAMETER, "Can not set latency greater than the maximum possible" );
m_iTargetSampleLatency = iLatencySamples;
m_iTargetSampleLatencyServer = iLatencySamples;
}
float CITANetAudioStream::GetAllowedLatencySeconds() const
{
return float( m_iTargetSampleLatency / GetSampleRate() );
return float(m_iTargetSampleLatencyServer / GetSampleRate());
}
int CITANetAudioStream::GetAllowedLatencySamples() const
{
return m_iTargetSampleLatency;
return m_iTargetSampleLatencyServer;
}
int CITANetAudioStream::GetMinimumLatencySamples() const
......@@ -321,7 +326,7 @@ void CITANetAudioStream::IncrementBlockPointer()
oLog.dStreamingTimeCode = m_dLastStreamingTimeCode;
oLog.uiBlockId = ++iAudioStreamingBlockID;
oLog.iFreeSamples = GetRingBufferFreeSamples( );
//m_pStreamLogger->log( oLog );
m_pStreamLogger->log( oLog );
//m_pNetAudioStreamingClient->TriggerBlockIncrement();
}
......
#include <ITANetAudioStreamingClient.h>
#include <ITANetAudioClient.h>
#include <ITANetAudioMessage.h>
#include <ITANetAudioStream.h>
#include <ITADataLog.h>
#include <ITAClock.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaStreamUtils.h>
//! Audio streaming log item
struct ITAClientLog : public ITALogDataBase
{
inline static std::ostream& outputDesc( std::ostream& os )
{
os << "BlockId";
os << "\t" << "WorldTimeStamp";
os << "\t" << "ProtocolStatus";
os << "\t" << "FreeSamples";
os << "\t" << "Channel";
os << std::endl;
return os;
};
inline std::ostream& outputData( std::ostream& os ) const
{
os << uiBlockId;
os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
os << "\t" << iProtocolStatus;
os << "\t" << iFreeSamples;
os << "\t" << iChannel;
os << std::endl;
return os;
};
unsigned int uiBlockId; //!< Block identifier (audio streaming)
double dWorldTimeStamp;
int iProtocolStatus; //!< ... usw
int iFreeSamples;
int iChannel;
};
class ITABufferedDataLoggerImplClient : public ITABufferedDataLogger < ITAClientLog > {};
CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pParent )
: m_oBlockIncrementEvent( VistaThreadEvent::WAITABLE_EVENT )
, m_pStream( pParent )
, m_pConnection( NULL )
, m_bStopIndicated( false )
{
m_pClient = new CITANetAudioClient();
m_oParams.iChannels = pParent->GetNumberOfChannels();
m_oParams.dSampleRate = pParent->GetSampleRate( );
m_oParams.iBlockSize = pParent->GetBlocklength( );
m_oParams.iRingBufferSize = pParent->GetRingBufferSize( );
std::string paras = std::string("NetAudioLogClient") + std::string("_BS") + std::to_string(pParent->GetBlocklength()) + std::string("_Ch") + std::to_string(pParent->GetNumberOfChannels()) + std::string(".txt");
m_pClientLogger = new ITABufferedDataLoggerImplClient( );
m_pClientLogger->setOutputFile(paras);
iStreamingBlockId = 0;
m_pMessage = new CITANetAudioMessage( VistaSerializingToolset::SWAPS_MULTIBYTE_VALUES );
m_sfReceivingBuffer.init(m_oParams.iChannels, m_oParams.iRingBufferSize, false);
}
CITANetAudioStreamingClient::~CITANetAudioStreamingClient()
{
//try{
if (m_pConnection != NULL && m_pConnection->GetIsConnected())
{
m_pMessage->ResetMessage();
m_pMessage->SetConnection(m_pConnection);
m_pMessage->SetMessageType(CITANetAudioProtocol::NP_CLIENT_CLOSE);
m_pMessage->WriteBool(true);
m_pMessage->WriteMessage();
//m_pClient->Disconnect();
}
delete m_pClientLogger;
//}
//catch (ITAException e){
// std::cout << e << std::endl;
//}
}
bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPort )
{
if( GetIsConnected() )
return false;
if( !m_pClient->Connect( sAddress, iPort ) )
ITA_EXCEPT1( INVALID_PARAMETER, "Could not connect to " + sAddress );
m_pConnection = m_pClient->GetConnection();
m_pMessage->ResetMessage();
m_pMessage->SetConnection( m_pConnection );
// Validate streaming parameters of server and client
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_OPEN );
m_pMessage->WriteStreamingParameters( m_oParams );
m_pMessage->WriteMessage( );
m_pMessage->ResetMessage( );
while ( !m_pMessage->ReadMessage( 0 ) );
assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_SERVER_OPEN );
bool bOK = m_pMessage->ReadBool();
if( !bOK )
ITA_EXCEPT1( INVALID_PARAMETER, "Streaming server declined connection, detected streaming parameter mismatch." );
Run();
return true;
}
bool CITANetAudioStreamingClient::LoopBody()
{
ITAClientLog oLog;
oLog.uiBlockId = ++iStreamingBlockId;
if( m_bStopIndicated )
return true;
// Send Puffer informationenen
if (iStreamingBlockId % 2 == 1)
{
m_pMessage->ResetMessage();
m_pMessage->SetMessageType(CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES);
m_pMessage->WriteInt(m_pStream->GetRingBufferFreeSamples());
m_pMessage->WriteMessage();
}
// Send message to server that samples can be received
// Read answer
m_pMessage->ResetMessage( );
if ( m_pMessage->ReadMessage( 1 ) )
{
int iMsgType = m_pMessage->GetMessageType( );
switch ( iMsgType )
{
case CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES:
// Receive samples from net message and forward them to the stream ring buffer
m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
if ( m_pStream->GetRingBufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
m_pStream->Transmit(m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength());
#ifdef NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ITANetAudioStreamingClient] Recived " << m_sfReceivingBuffer.GetLength() << " samples" << std::endl;
#endif
break;
case CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES:
m_pMessage->ReadBool();
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples( ) );
m_pMessage->WriteMessage();
break;
case CITANetAudioProtocol::NP_SERVER_CLOSE:
Disconnect( );
break;
default:
vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
break;
}
oLog.iChannel = m_pStream->GetNumberOfChannels();
oLog.iProtocolStatus = iMsgType;
oLog.iFreeSamples = m_pStream->GetRingBufferFreeSamples();
oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
m_pClientLogger->log( oLog );
}
return false;
}
void CITANetAudioStreamingClient::TriggerBlockIncrement()
{
m_oBlockIncrementEvent.SignalEvent();
}
bool CITANetAudioStreamingClient::GetIsConnected() const
{
return m_pClient->GetIsConnected();
}
void CITANetAudioStreamingClient::Disconnect()
{
m_bStopIndicated = true;
StopGently( true );
//delete m_pConnection;
m_pConnection = NULL;
m_bStopIndicated = false;
}
#include <ITANetAudioStreamingClient.h>
#include <ITANetAudioClient.h>
#include <ITANetAudioMessage.h>
#include <ITANetAudioStream.h>
#include <ITADataLog.h>
#include <ITAClock.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaStreamUtils.h>
#include <VistaBase/VistaTimeUtils.h>
//! Audio streaming log item
struct ITAClientLog : public ITALogDataBase
{
inline static std::ostream& outputDesc( std::ostream& os )
{
os << "BlockId";
os << "\t" << "WorldTimeStamp";
os << "\t" << "ProtocolStatus";
os << "\t" << "FreeSamples";
os << "\t" << "Channel";
os << std::endl;
return os;
};
inline std::ostream& outputData( std::ostream& os ) const
{
os << uiBlockId;
os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
os << "\t" << iProtocolStatus;
os << "\t" << iFreeSamples;
os << "\t" << iChannel;
os << std::endl;
return os;
};
unsigned int uiBlockId; //!< Block identifier (audio streaming)
double dWorldTimeStamp;
int iProtocolStatus; //!< ... usw
int iFreeSamples;
int iChannel;
};
class ITABufferedDataLoggerImplClient : public ITABufferedDataLogger < ITAClientLog > {};
CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pParent )
: m_pStream( pParent )
, m_pConnection( NULL )
, m_bStopIndicated( false )
, m_bStopped( false )
{
m_pClient = new CITANetAudioClient();
m_oParams.iChannels = pParent->GetNumberOfChannels();
m_oParams.dSampleRate = pParent->GetSampleRate( );
m_oParams.iBlockSize = pParent->GetBlocklength( );
m_oParams.iRingBufferSize = pParent->GetRingBufferSize( );
std::string paras = std::string("NetAudioLogClient") + std::string("_BS") + std::to_string(pParent->GetBlocklength()) + std::string("_Ch") + std::to_string(pParent->GetNumberOfChannels()) + std::string(".txt");
m_pClientLogger = new ITABufferedDataLoggerImplClient( );
m_pClientLogger->setOutputFile(paras);
iStreamingBlockId = 0;
m_pMessage = new CITANetAudioMessage( VistaSerializingToolset::SWAPS_MULTIBYTE_VALUES );
m_sfReceivingBuffer.init(m_oParams.iChannels, m_oParams.iRingBufferSize, false);
}
CITANetAudioStreamingClient::~CITANetAudioStreamingClient()
{
if( GetIsConnected() )
Disconnect();
StopGently( true );
delete m_pClientLogger;
delete m_pClient;
delete m_pMessage;
}
bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPort )
{
if( GetIsConnected() )
return false;
if( !m_pClient->Connect( sAddress, iPort ) )
ITA_EXCEPT1( INVALID_PARAMETER, "Could not connect to " + sAddress );
m_pConnection = m_pClient->GetConnection();
m_pMessage->ResetMessage();
m_pMessage->SetConnection( m_pConnection );
// Validate streaming parameters of server and client
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_OPEN );
m_pMessage->WriteStreamingParameters( m_oParams );
m_pMessage->WriteMessage( );
m_pMessage->ResetMessage( );
while ( !m_pMessage->ReadMessage( 0 ) );
assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_SERVER_OPEN );
bool bOK = m_pMessage->ReadBool();
if( !bOK )
ITA_EXCEPT1( INVALID_PARAMETER, "Streaming server declined connection, detected streaming parameter mismatch." );
Run();
return true;
}
bool CITANetAudioStreamingClient::LoopBody()
{
if( !GetIsConnected() )
return true;
ITAClientLog oLog;
oLog.uiBlockId = ++iStreamingBlockId;
if( m_bStopIndicated && !m_bStopped )
{
m_pMessage->ResetMessage();
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_CLOSE );
m_pMessage->WriteMessage();
while( true )
{
m_pMessage->ResetMessage();
m_pMessage->ReadMessage( 0 );
int iMsgType = m_pMessage->GetMessageType();
if( iMsgType == CITANetAudioProtocol::NP_SERVER_CLOSE )
break;
}
m_bStopped = true;
m_pMessage->SetConnection( NULL );
while( GetIsConnected() )
VistaTimeUtils::Sleep( 100 );
return true;
}
// Read answer (blocking)
m_pMessage->ResetMessage( );
if( m_pMessage->ReadMessage( 0 ) )
{
int iMsgType = m_pMessage->GetMessageType();
switch( iMsgType )
{
case CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES:
m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
if( m_pStream->GetRingBufferFreeSamples() >= m_sfReceivingBuffer.GetLength() )
m_pStream->Transmit( m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength() );
#ifdef NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ITANetAudioStreamingClient] Recived " << m_sfReceivingBuffer.GetLength() << " samples" << std::endl;
#endif
break;
case CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES:
m_pMessage->ReadBool();
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples() );
m_pMessage->WriteMessage();
break;
case CITANetAudioProtocol::NP_SERVER_CLOSE:
Disconnect();
break;
default:
vstr::out() << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
break;
}
oLog.iChannel = m_pStream->GetNumberOfChannels();
oLog.iProtocolStatus = iMsgType;
oLog.iFreeSamples = m_pStream->GetRingBufferFreeSamples();
oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
m_pClientLogger->log( oLog );
}
return false;
}
bool CITANetAudioStreamingClient::GetIsConnected() const
{
return m_pClient->GetIsConnected();
}
void CITANetAudioStreamingClient::Disconnect()
{
m_bStopIndicated = true;
while( !m_bStopped )
VistaTimeUtils::Sleep( 100 );
m_pConnection = NULL;
m_pClient->Disconnect();
m_bStopIndicated = false;
m_bStopped = false;
}
......@@ -63,6 +63,12 @@ CITANetAudioStreamingServer::CITANetAudioStreamingServer( )
m_iClientRingBufferFreeSamples = 0;
}
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
delete m_pNetAudioServer;
delete m_pServerLogger;
}
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort )
{
if ( !m_pInputStream )
......@@ -196,11 +202,15 @@ bool CITANetAudioStreamingServer::LoopBody( )
{
case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
{
m_iClientRingBufferFreeSamples = m_pMessage->ReadInt( );
m_iClientRingBufferFreeSamples = m_pMessage->ReadInt( );
break;
}
case CITANetAudioProtocol::NP_CLIENT_CLOSE:
{
m_pMessage->ResetMessage();
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
m_pMessage->WriteMessage();
StopGently( false );
m_pConnection = NULL;
Stop( );
......@@ -273,8 +283,6 @@ std::string CITANetAudioStreamingServer::GetNetworkAddress( ) const
return m_pNetAudioServer->GetServerAddress( );
}
int CITANetAudioStreamingServer::GetNetworkPort( ) const
{
return m_pNetAudioServer->GetNetworkPort( );
......@@ -282,6 +290,5 @@ int CITANetAudioStreamingServer::GetNetworkPort( ) const
void CITANetAudioStreamingServer::Stop( )
{
delete m_pServerLogger;
m_pNetAudioServer->Stop( );
}
......@@ -61,7 +61,7 @@ private:
int main( int, char** )
{
// Sample server (forked away into a thread)
CServer oServer( g_sInputFilePath );
CServer* pServer = new CServer( g_sInputFilePath );
// Client dumping received stream and mixing down to two channels
CITANetAudioStream oNetAudioStream( g_iChannels, g_dSampleRate, g_iBlockLength, 100 * g_iBlockLength );
......@@ -100,11 +100,13 @@ int main( int, char** )
vstr::out() << "[ NetAudioTestClient ] Connected." << endl;
// Playback
float fSeconds = 5.0f;
float fSeconds = 10.0f;
vstr::out() << "[ NetAudioTestClient ] Playback started, waiting " << fSeconds << " seconds" << endl;
ITAPA.Sleep( fSeconds ); // blocking
vstr::out() << "[ NetAudioTestClient ] Done." << endl;
oNetAudioStream.Disconnect();
vstr::out() << "[ NetAudioTestClient ] Will now disconnect from net audio server '" << g_sServerName << "' and port " << g_iServerPort << endl;
vstr::out() << "[ NetAudioTestClient ] Closing in 1 second (net audio stream not connected and playing back zeros)" << endl;
ITAPA.Sleep( 1.0f );
......@@ -113,5 +115,7 @@ int main( int, char** )
ITAPA.Close( );
ITAPA.Finalize( );
delete pServer;
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment