Fixing comm

parent 7acee70d
...@@ -60,6 +60,7 @@ public: ...@@ -60,6 +60,7 @@ public:
static const int NP_SERVER_OPEN = 201; static const int NP_SERVER_OPEN = 201;
static const int NP_SERVER_GET_RINGBUFFER_SIZE = 210; static const int NP_SERVER_GET_RINGBUFFER_SIZE = 210;
static const int NP_SERVER_GET_RINGBUFFER_FREE = 211; static const int NP_SERVER_GET_RINGBUFFER_FREE = 211;
static const int NP_SERVER_WAITING_FOR_TRIGGER = 221;
static const int NP_SERVER_SEND_SAMPLES = 222; static const int NP_SERVER_SEND_SAMPLES = 222;
CITANetAudioProtocol(); CITANetAudioProtocol();
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <ITASampleFrame.h> #include <ITASampleFrame.h>
#include <VistaInterProcComm/Concurrency/VistaThreadEvent.h>
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h> #include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <string> #include <string>
...@@ -54,6 +55,9 @@ public: ...@@ -54,6 +55,9 @@ public:
bool LoopBody(); bool LoopBody();
protected:
void TriggerBlockIncrement();
private: private:
CITANetAudioClient* m_pClient; CITANetAudioClient* m_pClient;
CITANetAudioStream* m_pStream; CITANetAudioStream* m_pStream;
...@@ -62,6 +66,8 @@ private: ...@@ -62,6 +66,8 @@ private:
CITANetAudioMessage* m_pMessage; CITANetAudioMessage* m_pMessage;
VistaConnectionIP* m_pConnection; VistaConnectionIP* m_pConnection;
VistaThreadEvent m_oBlockIncrementEvent;
ITASampleFrame m_sfReceivingBuffer; //!< Buffer incoming data ITASampleFrame m_sfReceivingBuffer; //!< Buffer incoming data
CITANetAudioProtocol::StreamingParameters m_oClientParams; CITANetAudioProtocol::StreamingParameters m_oClientParams;
...@@ -69,7 +75,7 @@ private: ...@@ -69,7 +75,7 @@ private:
bool m_bStopIndicated; bool m_bStopIndicated;
friend class CITANetAudioClient; friend class CITANetAudioStream;
}; };
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_STREAMING_CLIENT #endif // INCLUDE_WATCHER_ITA_NET_AUDIO_STREAMING_CLIENT
...@@ -102,7 +102,7 @@ void CITANetAudioMessage::ReadMessage() ...@@ -102,7 +102,7 @@ void CITANetAudioMessage::ReadMessage()
int nReturn = m_pConnection->ReadInt32( nMessageSize ); int nReturn = m_pConnection->ReadInt32( nMessageSize );
// we need at least the two protocol ints // we need at least the two protocol ints
assert( nMessageSize >= 3 * sizeof( VistaType::sint32 ) ); assert( nMessageSize >= 2 * sizeof( VistaType::sint32 ) );
if( nMessageSize > ( int ) m_vecIncomingBuffer.size() ) if( nMessageSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessageSize ); m_vecIncomingBuffer.resize( nMessageSize );
......
...@@ -56,6 +56,7 @@ void CITANetAudioStream::IncrementBlockPointer() ...@@ -56,6 +56,7 @@ void CITANetAudioStream::IncrementBlockPointer()
{ {
// Increment read cursor by one audio block and wrap around if exceeding ring buffer // Increment read cursor by one audio block and wrap around if exceeding ring buffer
m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength(); m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength();
m_pNetAudioStreamingClient->TriggerBlockIncrement();
} }
int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples ) int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples )
...@@ -80,7 +81,8 @@ int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSa ...@@ -80,7 +81,8 @@ int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSa
int CITANetAudioStream::GetRingbufferFreeSamples() int CITANetAudioStream::GetRingbufferFreeSamples()
{ {
ITA_EXCEPT0( NOT_IMPLEMENTED ); int iFreeSamples = ( m_iWriteCursor - m_iReadCursor + GetRingBufferSize() ) % GetRingBufferSize();
return iFreeSamples;
} }
int CITANetAudioStream::GetRingBufferSize() const int CITANetAudioStream::GetRingBufferSize() const
......
...@@ -7,7 +7,8 @@ ...@@ -7,7 +7,8 @@
#include <VistaInterProcComm/Connections/VistaConnectionIP.h> #include <VistaInterProcComm/Connections/VistaConnectionIP.h>
CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pParent ) CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pParent )
: m_pStream( pParent ) : m_oBlockIncrementEvent( VistaThreadEvent::WAITABLE_EVENT )
, m_pStream( pParent )
, m_pConnection( NULL ) , m_pConnection( NULL )
, m_bStopIndicated( false ) , m_bStopIndicated( false )
{ {
...@@ -24,8 +25,12 @@ CITANetAudioStreamingClient::~CITANetAudioStreamingClient() ...@@ -24,8 +25,12 @@ CITANetAudioStreamingClient::~CITANetAudioStreamingClient()
{ {
if( m_pConnection ) if( m_pConnection )
{ {
m_pMessage->ResetMessage();
m_pMessage->SetConnection( m_pConnection );
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_CLOSE ); m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_CLOSE );
m_pMessage->WriteAnswer(); m_pMessage->WriteMessage();
m_pMessage->ReadAnswer();
m_pClient->Disconnect();
} }
} }
...@@ -44,21 +49,15 @@ bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPor ...@@ -44,21 +49,15 @@ bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPor
// Validate streaming parameters of server and client // Validate streaming parameters of server and client
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_OPEN ); m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_OPEN );
//m_pMessage->WriteStreamingParameters( m_oClientParams ); // Not yet m_pMessage->WriteStreamingParameters( m_oClientParams );
m_pMessage->WriteInt( 42 );
m_pMessage->WriteMessage(); m_pMessage->WriteMessage();
m_pMessage->ReadAnswer(); m_pMessage->ReadAnswer();
assert( m_pMessage->GetAnswerType() == CITANetAudioProtocol::NP_SERVER_OPEN ); assert( m_pMessage->GetAnswerType() == CITANetAudioProtocol::NP_SERVER_OPEN );
int i42 = m_pMessage->ReadInt(); bool bOK = m_pMessage->ReadBool();
/* Not yet if( !bOK )
CITANetAudioProtocol::StreamingParameters oServerParams = m_pMessage->ReadStreamingParameters(); ITA_EXCEPT1( INVALID_PARAMETER, "Streaming server declined connection, detected streaming parameter mismatch." );
if( oServerParams == m_oClientParams )
m_oServerParams = oServerParams;
else
ITA_EXCEPT1( INVALID_PARAMETER, "Streaming parameters of network audio server and client do not match." );
*/
Run(); Run();
...@@ -81,22 +80,43 @@ bool CITANetAudioStreamingClient::LoopBody() ...@@ -81,22 +80,43 @@ bool CITANetAudioStreamingClient::LoopBody()
if( m_bStopIndicated ) if( m_bStopIndicated )
return true; return true;
// Receive message // Send message to server that samples can be received
m_pMessage->ReadMessage(); m_pMessage->ResetMessage();
switch( m_pMessage->GetMessageType() ) m_pMessage->SetConnection( m_pConnection );
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingbufferFreeSamples() );
m_pMessage->WriteMessage();
// Wait for answer of server
m_pMessage->ReadAnswer();
switch( m_pMessage->GetAnswerType() )
{ {
case CITANetAudioProtocol::NP_INVALID: case CITANetAudioProtocol::NP_INVALID:
// Something went wrong
break;
case CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER:
// Wait until block increment is triggered by audio context (more free samples in ring buffer)
m_oBlockIncrementEvent.WaitForEvent( true );
break; break;
case CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES: case CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES:
// Receive samples and forward them to the stream ring buffer
/* /*
int iNumSamples = m_pMessage->ReadSamples( m_sfReceivingBuffer ); int iNumSamples = m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
m_pParent->Transmit( m_sfReceivingBuffer, iNumSamples ); if( m_pStream->GetRingbufferFreeSamples() >= iNumSamples )
int iFreeSamples = m_pParent->GetRingBufferFreeSamples(); m_pStream->Transmit( m_sfReceivingBuffer, iNumSamples );
m_pMessage->WriteFreeRingBufferSamples( iFreeSamples );
m_pMessage->WriteAnswer();
*/ */
break; break;
} }
return true;
}
void CITANetAudioStreamingClient::TriggerBlockIncrement()
{
m_oBlockIncrementEvent.SignalEvent();
} }
bool CITANetAudioStreamingClient::GetIsConnected() const bool CITANetAudioStreamingClient::GetIsConnected() const
......
...@@ -28,6 +28,9 @@ CITANetAudioStreamingServer::CITANetAudioStreamingServer() ...@@ -28,6 +28,9 @@ CITANetAudioStreamingServer::CITANetAudioStreamingServer()
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort ) bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort )
{ {
if( !m_pInputStream )
ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );
// TODO: vorrckgabe noch anfangen zu senden (Samples) // TODO: vorrckgabe noch anfangen zu senden (Samples)
if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
return false; return false;
...@@ -39,18 +42,20 @@ bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort ...@@ -39,18 +42,20 @@ bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort
m_pMessage->SetConnection( m_pConnection ); m_pMessage->SetConnection( m_pConnection );
m_pMessage->ReadMessage(); // blocking m_pMessage->ReadMessage(); // blocking
int nMT = m_pMessage->GetMessageType(); assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
assert( nMT == CITANetAudioProtocol::NP_CLIENT_OPEN ); CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
int i42 = m_pMessage->ReadInt();
//CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters(); bool bOK = false;
if( m_pInputStream->GetNumberOfChannels() == oClientParams.iChannels &&
m_pInputStream->GetSampleRate() == oClientParams.dSampleRate &&
m_pInputStream->GetBlocklength() == oClientParams.iBlockSize )
bOK = true;
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_OPEN ); m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_OPEN );
m_pMessage->WriteInt( 2 * 42 ); m_pMessage->WriteBool( bOK );
m_pMessage->WriteAnswer(); m_pMessage->WriteAnswer();
if( m_pInputStream ) Run();
Run();
return true; return true;
} }
...@@ -83,8 +88,6 @@ void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream ) ...@@ -83,8 +88,6 @@ void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
m_pInputStream = pInStream; m_pInputStream = pInStream;
m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_pInputStream->GetBlocklength(), true ); m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_pInputStream->GetBlocklength(), true );
if( m_pConnection )
Run();
} }
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
...@@ -111,19 +114,24 @@ bool CITANetAudioStreamingServer::LoopBody() ...@@ -111,19 +114,24 @@ bool CITANetAudioStreamingServer::LoopBody()
switch( m_pMessage->GetMessageType() ) switch( m_pMessage->GetMessageType() )
{ {
case CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES: case CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES:
if( m_pInputStream ) for( int i = 0; i < m_pInputStream->GetNumberOfChannels(); i++ )
{ {
for( int i = 0; i < m_pInputStream->GetNumberOfChannels(); i++ ) ITAStreamInfo oStreamInfo;
{ const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
ITAStreamInfo oStreamInfo; m_sfTempTransmitBuffer[ i ].write( pfData, m_pInputStream->GetBlocklength() );
const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
m_sfTempTransmitBuffer[ i ].write( pfData, m_pInputStream->GetBlocklength() );
}
} }
//m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer ); //m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
m_pMessage->WriteAnswer(); m_pMessage->WriteAnswer();
break; break;
case CITANetAudioProtocol::NP_CLIENT_CLOSE:
m_pMessage->WriteAnswer();
m_pConnection = NULL;
StopGently( true );
Stop();
return false;
} }
return true; return true;
......
...@@ -10,7 +10,7 @@ using namespace std; ...@@ -10,7 +10,7 @@ using namespace std;
static string g_sServerName = "localhost"; static string g_sServerName = "localhost";
static int g_iServerPort = 12480; static int g_iServerPort = 12480;
static double g_dSampleRate = 44.1e3; static double g_dSampleRate = 44.1e3;
static int g_iBlockLength = 265; static int g_iBlockLength = 256;
int main( int , char** ) int main( int , char** )
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment