Commit 836ee756 authored by Jonas Stienen's avatar Jonas Stienen

Refactoring a lot for logging netaudio messages

parent db336c73
......@@ -47,6 +47,7 @@ class CITANetAudioStream;
class ITA_DATA_SOURCES_API CITANetAudioProtocol
{
public:
static const int NP_CLIENT_IDLE = 0;
static const int NP_CLIENT_OPEN = 100;
static const int NP_CLIENT_CLOSE = 101;
static const int NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES = 111;
......@@ -65,8 +66,6 @@ public:
double dSampleRate;
int iBlockSize;
int iRingBufferSize;
int iTargetSampleLatency;
double dTimeIntervalSendInfos;
inline StreamingParameters()
{
......@@ -74,8 +73,6 @@ public:
dSampleRate = 0.0f;
iBlockSize = 0;
iRingBufferSize = 0;
iTargetSampleLatency = 0;
dTimeIntervalSendInfos = 0;
};
inline bool operator==( const StreamingParameters& rhs )
......@@ -83,9 +80,7 @@ public:
if ( ( iChannels == rhs.iChannels )
&& ( dSampleRate == rhs.dSampleRate )
&& (iBlockSize == rhs.iBlockSize)
&& (iRingBufferSize == rhs.iRingBufferSize)
&& (iTargetSampleLatency == rhs.iTargetSampleLatency)
&& (dTimeIntervalSendInfos == rhs.dTimeIntervalSendInfos))
&& (iRingBufferSize == rhs.iRingBufferSize))
return true;
else
return false;
......
......@@ -25,6 +25,7 @@
#include <ITASampleFrame.h>
#include <ITAStreamProbe.h>
#include <ITAStopWatch.h>
#include <VistaInterProcComm/Concurrency/VistaThreadEvent.h>
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
......@@ -69,19 +70,21 @@ private:
CITANetAudioProtocol* m_pProtocol;
CITANetAudioMessage* m_pMessage;
VistaConnectionIP* m_pConnection;
//VistaThreadEvent m_oBlockIncrementEvent;
ITASampleFrame m_sfReceivingBuffer; //!< Buffer incoming data
CITANetAudioProtocol::StreamingParameters m_oParams;
bool m_bStopIndicated;
bool m_bStopped;
double m_dLastAckknowlengementTimeStamp;
int n;
int iStreamingBlockId;
int m_iStreamingBlockId;
double m_dServerClockSyncRequestTimeInterval;
double m_dServerClockSyncLastSyncTime;
ITABufferedDataLoggerImplClient* m_pClientLogger;
ITAStopWatch m_swTryReadStats;
friend class CITANetAudioStream;
};
......
......@@ -63,7 +63,7 @@ public:
CITANetAudioStreamingServer();
~CITANetAudioStreamingServer();
bool Start(const std::string& sAddress, int iPort, double dTimeIntervalCientSendStatus);
bool Start( const std::string& sAddress, const int iPort, const double dTimeIntervalCientSendStatus );
bool IsClientConnected() const;
std::string GetNetworkAddress() const;
int GetNetworkPort() const;
......@@ -71,13 +71,16 @@ public:
void Stop();
void SetInputStream( ITADatasource* pInStream );
int GetNetStreamBlocklength() const;
int GetNetStreamNumberOfChannels() const;
double GetNetStreamSampleRate() const;
void SetAutomaticUpdateRate();
void SetTargetLatencySamples( const int iTargetLatency );
void GetTargetLatencySamples() const;
protected:
ITADatasource* GetInputStream() const;
......@@ -95,6 +98,7 @@ private:
int m_iUpdateStrategy;
int m_iClientRingBufferFreeSamples;
int m_iTargetLatencySamples;
int m_iMaxSendBlocks;
double m_dLastTimeStamp;
......
This diff is collapsed.
......@@ -51,21 +51,22 @@ CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pP
, m_pConnection( NULL )
, m_bStopIndicated( false )
, m_bStopped( false )
, m_iStreamingBlockId( 0 )
, m_dServerClockSyncRequestTimeInterval( 0.1f )
, m_dServerClockSyncLastSyncTime( 0.0f )
{
m_pClient = new CITANetAudioClient();
n = 0;
m_oParams.iChannels = pParent->GetNumberOfChannels();
m_oParams.dSampleRate = pParent->GetSampleRate( );
m_oParams.dSampleRate = pParent->GetSampleRate();
m_oParams.iBlockSize = pParent->GetBlocklength();
m_oParams.iRingBufferSize = pParent->GetRingBufferSize();
m_oParams.iTargetSampleLatency = pParent->GetAllowedLatencySamples();
std::string paras = std::string("NetAudioLogClient") + std::string("_BS") + std::to_string(pParent->GetBlocklength()) + std::string("_Ch") + std::to_string(pParent->GetNumberOfChannels()) + std::string("_tl") + std::to_string(pParent->GetAllowedLatencySamples()) + std::string(".txt");
m_pClientLogger = new ITABufferedDataLoggerImplClient( );
m_pClientLogger->setOutputFile(paras);
iStreamingBlockId = 0;
std::string paras = std::string( "NetAudioLogClient" ) + std::string( "_BS" ) + std::to_string( pParent->GetBlocklength() ) + std::string( "_Ch" ) + std::to_string( pParent->GetNumberOfChannels() ) + std::string( "_tl" ) + std::to_string( pParent->GetAllowedLatencySamples() ) + std::string( ".txt" );
m_pClientLogger = new ITABufferedDataLoggerImplClient();
m_pClientLogger->setOutputFile( paras );
m_pMessage = new CITANetAudioMessage( VistaSerializingToolset::SWAPS_MULTIBYTE_VALUES );
m_sfReceivingBuffer.init(m_oParams.iChannels, m_oParams.iRingBufferSize, false);
m_sfReceivingBuffer.init( m_oParams.iChannels, m_oParams.iRingBufferSize, false );
}
CITANetAudioStreamingClient::~CITANetAudioStreamingClient()
......@@ -84,33 +85,32 @@ bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPor
{
if( GetIsConnected() )
return false;
if( !m_pClient->Connect( sAddress, iPort ) )
ITA_EXCEPT1( INVALID_PARAMETER, "Could not connect to " + sAddress );
if( !m_pClient->GetIsConnected() )
return false;
m_pConnection = m_pClient->GetConnection();
m_pMessage->ResetMessage();
m_pMessage->SetConnection( m_pConnection );
// Validate streaming parameters of server and client
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_OPEN );
m_pMessage->WriteStreamingParameters( m_oParams );
m_pMessage->WriteMessage( );
m_pMessage->ResetMessage( );
m_pMessage->WriteMessage();
m_pMessage->ResetMessage();
while ( !m_pMessage->ReadMessage( 0 ) );
assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_SERVER_OPEN );
m_oParams.dTimeIntervalSendInfos = m_pMessage->ReadDouble();
if (m_oParams.dTimeIntervalSendInfos <= 0)
ITA_EXCEPT1( INVALID_PARAMETER, "Streaming server declined connection, detected streaming parameter mismatch." );
while( !m_pMessage->ReadMessage( 0 ) );
m_dLastAckknowlengementTimeStamp = 0;
assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_SERVER_OPEN );
// Clock sync vars
m_dServerClockSyncRequestTimeInterval = m_pMessage->ReadDouble();
m_dServerClockSyncLastSyncTime = 0;
Run();
return true;
......@@ -122,7 +122,12 @@ bool CITANetAudioStreamingClient::LoopBody()
return true;
ITAClientLog oLog;
oLog.uiBlockId = ++iStreamingBlockId;
oLog.uiBlockId = ++m_iStreamingBlockId;
oLog.iChannel = m_pStream->GetNumberOfChannels();
oLog.iFreeSamples = m_pStream->GetRingBufferFreeSamples();
oLog.iProtocolStatus = CITANetAudioProtocol::NP_CLIENT_IDLE;
oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
if( m_bStopIndicated && !m_bStopped )
{
......@@ -147,12 +152,13 @@ bool CITANetAudioStreamingClient::LoopBody()
return true;
}
// Read answer (blocking)
m_pMessage->ResetMessage( );
// Try-read message (blocking for a timeout of 1ms)
m_pMessage->ResetMessage();
m_swTryReadStats.start();
if( m_pMessage->ReadMessage( 1 ) )
{
m_swTryReadStats.stop();
int iMsgType = m_pMessage->GetMessageType();
switch( iMsgType )
{
......@@ -163,39 +169,43 @@ bool CITANetAudioStreamingClient::LoopBody()
#ifdef NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ITANetAudioStreamingClient] Recived " << m_sfReceivingBuffer.GetLength() << " samples" << std::endl;
#endif
n++;
break;
case CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES:
m_pMessage->ReadBool();
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples() );
m_pMessage->WriteMessage();
break;
case CITANetAudioProtocol::NP_SERVER_CLOSE:
Disconnect();
break;
default:
vstr::out() << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
break;
}
oLog.iChannel = m_pStream->GetNumberOfChannels();
// Also log message type on incoming message
oLog.iProtocolStatus = iMsgType;
oLog.iFreeSamples = m_pStream->GetRingBufferFreeSamples();
oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
m_pClientLogger->log( oLog );
}
else
m_swTryReadStats.stop();
m_pClientLogger->log( oLog );
// Send ring buffer free samples occasionally (as requested by server)
const double dNow = ITAClock::getDefaultClock()->getTime();
if( ( m_dServerClockSyncLastSyncTime + m_dServerClockSyncRequestTimeInterval ) < dNow )
{
if ((m_dLastAckknowlengementTimeStamp + m_oParams.dTimeIntervalSendInfos) < ITAClock::getDefaultClock()->getTime() )
{
// sende mal freie samples
m_pMessage->SetMessageType(CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES);
m_pMessage->WriteInt(m_pStream->GetRingBufferFreeSamples());
m_pMessage->WriteMessage();
m_dLastAckknowlengementTimeStamp = ITAClock::getDefaultClock()->getTime();
}
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples() );
m_pMessage->WriteMessage();
m_dServerClockSyncLastSyncTime = dNow;
}
return false;
}
......@@ -210,7 +220,7 @@ void CITANetAudioStreamingClient::Disconnect()
while( !m_bStopped )
VistaTimeUtils::Sleep( 100 );
m_pConnection = NULL;
m_pMessage->ClearConnection();
m_pClient->Disconnect();
......
......@@ -58,6 +58,7 @@ CITANetAudioStreamingServer::CITANetAudioStreamingServer()
, m_pConnection(NULL)
, m_pNetAudioServer(new CITANetAudioServer())
, m_dLastTimeStamp(0)
, m_iTargetLatencySamples( -1 )
{
iServerBlockId = 0;
m_iMaxSendBlocks = 40;
......@@ -90,9 +91,8 @@ bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort
bool bOK = false;
m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
m_oServerParams.iTargetSampleLatency = oClientParams.iTargetSampleLatency;
m_oServerParams.iBlockSize = oClientParams.iBlockSize;
m_iClientRingBufferFreeSamples = m_oServerParams.iTargetSampleLatency;
m_iClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
if ( m_oServerParams == oClientParams )
......@@ -109,7 +109,7 @@ bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort
#endif
}
std::string paras = std::string("NetAudioLogServer") + std::string("_BS") + std::to_string(m_oServerParams.iBlockSize) + std::string("_Ch") + std::to_string(m_oServerParams.iChannels) + std::string("_tl") + std::to_string(m_oServerParams.iTargetSampleLatency) + std::string(".txt");
std::string paras = std::string("NetAudioLogServer") + std::string("_BS") + std::to_string(m_oServerParams.iBlockSize) + std::string("_Ch") + std::to_string(m_oServerParams.iChannels) + std::string("_tl") + std::to_string(m_iTargetLatencySamples) + std::string(".txt");
m_pServerLogger = new ITABufferedDataLoggerImplServer( );
m_pServerLogger->setOutputFile( paras );
......@@ -135,7 +135,7 @@ bool CITANetAudioStreamingServer::LoopBody( )
int iMsgType;
// Sending Samples
int iBlockLength = m_pInputStream->GetBlocklength( );
int iClientRingBufferTargetLatencyFreeSamples = m_iClientRingBufferFreeSamples - (m_oServerParams.iRingBufferSize - m_oServerParams.iTargetSampleLatency);
int iClientRingBufferTargetLatencyFreeSamples = m_iClientRingBufferFreeSamples - (m_oServerParams.iRingBufferSize - m_iTargetLatencySamples);
if (iClientRingBufferTargetLatencyFreeSamples >= iBlockLength)
{
......@@ -224,7 +224,7 @@ bool CITANetAudioStreamingServer::LoopBody( )
const double dTimeDiff = dTimestamp - m_dLastTimeStamp;
m_dLastTimeStamp = dTimestamp;
oLog.dWorldTimeStamp = dTimestamp;
float dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
double dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
m_iClientRingBufferFreeSamples += (int)dEstimatedSamples;
oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
oLog.iProtocolStatus = 555;
......@@ -285,6 +285,15 @@ void CITANetAudioStreamingServer::SetAutomaticUpdateRate( )
m_iUpdateStrategy = AUTO;
}
void CITANetAudioStreamingServer::SetTargetLatencySamples( const int iTargetLatency )
{
// Streaming already set up?
if( IsClientConnected() && m_iTargetLatencySamples < m_oServerParams.iBlockSize )
ITA_EXCEPT1( INVALID_PARAMETER, "Target latency has to be at least the block size of the audio streaming at client side." );
m_iTargetLatencySamples = iTargetLatency;
}
bool CITANetAudioStreamingServer::IsClientConnected( ) const
{
return m_pNetAudioServer->IsConnected( );
......
......@@ -18,7 +18,7 @@ int g_iServerPort = 12480;
double g_dSampleRate = 44100.0;
int g_iBlockLength = 64;
int g_iChannels = 2;
int g_iRingBufferSize = 2048; // 22ms max
int g_iRingBufferSize = 88200;
double g_dPlaybackDuration = 10; // seconds
int main( int argc, char* argv[] )
......
......@@ -15,7 +15,7 @@ int g_iServerPort = 12480;
double g_dSampleRate = 44100.0;
int g_iBlockLength = 64;
int g_iChannels = 2;
int g_iTargetLatencySamples = 128; // 1.4512ms
int g_iTargetLatencySamples = 44100; // 1.4512ms
double g_dClientStatusMessageTimeout = 0.1; // seconds
string g_sFileName = "gershwin-mono.wav";
......@@ -47,6 +47,7 @@ int main( int argc, char** argv )
ITAStreamMultiplier1N oMuliplier( pSource, g_iChannels );
CITANetAudioStreamingServer oStreamingServer;
oStreamingServer.SetInputStream( &oMuliplier );
oStreamingServer.SetTargetLatencySamples( g_iTargetLatencySamples );
cout << "Starting net audio server and waiting for connections on '" << g_sServerName << "' on port " << g_iServerPort << endl;
oStreamingServer.Start( g_sServerName, g_iServerPort, g_dClientStatusMessageTimeout );
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment