Commit 8abdf74e authored by Anne's avatar Anne

protocol 2.0 not running

parent 39a0c4ef
......@@ -55,9 +55,8 @@ public:
void ClearConnection();
void WriteMessage();
void ReadMessage();
void WriteAnswer();
void ReadAnswer();
// Returns false if no incomming data
bool ReadMessage();
void ResetMessage();
......@@ -66,9 +65,7 @@ public:
bool GetOutgoingMessageHasData() const;
void SetMessageType( int nType );
void SetAnswerType( int nType );
int GetMessageType() const;
int GetAnswerType() const;
void WriteInt( const int );
......@@ -100,7 +97,6 @@ public:
private:
int m_nMessageType;
int m_nMessageId;
int m_nAnswerType;
VistaByteBufferSerializer m_oOutgoing; //!< Serialization buffer for messages
VistaByteBufferDeSerializer m_oIncoming; //!< Deserialization buffer for messages
std::vector< VistaType::byte > m_vecIncomingBuffer; // Net IO buffer
......
......@@ -51,6 +51,7 @@ public:
static const int NP_INVALID = -1;
static const int NP_GET_VERSION_INFO = 1;
static const int NP_NO_MESSAGE = 2;
static const int NP_CLIENT_OPEN = 100;
static const int NP_CLIENT_CLOSE = 101;
......
......@@ -3,7 +3,8 @@
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
#include <VistaBase/VistaStreamUtils.h>
#include <VistaBase/VistaStreamUtils.h>
#include <ITAClock.h>
#include <cstring>
#include <algorithm>
......@@ -44,7 +45,6 @@ void CITANetAudioMessage::ResetMessage()
m_oIncoming.SetBuffer( NULL, 0 );
m_nMessageType = CITANetAudioProtocol::NP_INVALID;
m_nAnswerType = CITANetAudioProtocol::NP_INVALID;
m_pConnection = NULL;
......@@ -110,13 +110,22 @@ void CITANetAudioMessage::WriteMessage()
}
void CITANetAudioMessage::ReadMessage()
bool CITANetAudioMessage::ReadMessage()
{
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
#endif
long nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
assert( nIncomingBytes >= 4 ); // we need at least the size of message
double dTimeBefore = ITAClock::getDefaultClock( )->getTime( );
// WaitForIncomming Data int in ca ms
long nIncomingBytes = m_pConnection->WaitForIncomingData( 1 );
double dTimeAfter = ITAClock::getDefaultClock( )->getTime( );
double DTimeDiff = dTimeAfter - dTimeBefore;
// TODO Timer entfernen
if ( nIncomingBytes < 4 )
{
m_nMessageType = CITANetAudioProtocol::NP_NO_MESSAGE;
return false;
}
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
#endif
......@@ -155,93 +164,6 @@ void CITANetAudioMessage::ReadMessage()
#endif
}
void CITANetAudioMessage::WriteAnswer()
{
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Answering] to " << m_nMessageType << " with " << m_nAnswerType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif
assert( m_nAnswerType != CITANetAudioProtocol::NP_INVALID );
VistaType::byte* pBuffer = ( VistaType::byte* )m_oOutgoing.GetBuffer();
VistaType::sint32 iSwapDummy;
// rewrite size dummy
iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite type dummy
iSwapDummy = m_nAnswerType;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite message dummy
iSwapDummy = m_nMessageId;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), m_oOutgoing.GetBufferSize() );
m_pConnection->WaitForSendFinish();
if( nRet != m_oOutgoing.GetBufferSize() )
ITA_EXCEPT1( UNKNOWN, "Could not write the expected number of bytes" );
}
void CITANetAudioMessage::ReadAnswer()
{
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading] yet unkown answer from initial message type " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
#endif
VistaType::sint32 nMessagePayloadSize;
int nReturn;
nReturn = m_pConnection->ReadInt32( nMessagePayloadSize );
assert( nReturn == sizeof( VistaType::sint32 ) );
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading] Answer type " << nReturn << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
#endif
// We need at least the message type and message id in payload
assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessagePayloadSize );
// @todo: read over while( received < total ) loop!!!
int iBytesReceivedTotal = 0;
while( nMessagePayloadSize != iBytesReceivedTotal )
{
int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
int iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ iBytesReceivedTotal ], iIncommingBytes );
iBytesReceivedTotal += iBytesReceived;
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% of answer transmitted" << std::endl;
#endif
}
if( iBytesReceivedTotal != nMessagePayloadSize )
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected when trying to receive answer" );
// Swap data to deserialization buffer
m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
// Take out the two protocol variables type and message id from deserialization buffer
m_nAnswerType = ReadInt();
int nMessageID = ReadInt();
assert( nMessageID == m_nMessageId );
}
int CITANetAudioMessage::GetMessageType() const
{
return m_nMessageType;
......@@ -249,21 +171,10 @@ int CITANetAudioMessage::GetMessageType() const
void CITANetAudioMessage::SetMessageType( int nType )
{
assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
//assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nMessageType = nType;
}
void CITANetAudioMessage::SetAnswerType( int nType )
{
assert( m_nAnswerType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nAnswerType = nType;
}
int CITANetAudioMessage::GetAnswerType() const
{
return m_nAnswerType;
}
int CITANetAudioMessage::GetIncomingMessageSize() const
{
return m_oIncoming.GetTailSize();
......
......@@ -101,8 +101,9 @@ bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPor
m_pMessage->WriteStreamingParameters( m_oParams );
m_pMessage->WriteMessage();
m_pMessage->ReadAnswer();
assert( m_pMessage->GetAnswerType() == CITANetAudioProtocol::NP_SERVER_OPEN );
while ( !m_pMessage->ReadMessage( ) );
assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_SERVER_OPEN );
bool bOK = m_pMessage->ReadBool();
if( !bOK )
......@@ -132,43 +133,45 @@ bool CITANetAudioStreamingClient::LoopBody()
m_pMessage->WriteInt( iFreeSamplesUntilAllowedReached );
m_pMessage->WriteMessage();
// Wait for answer of server
m_pMessage->ReadAnswer();
int iAnswerType = m_pMessage->GetAnswerType();
switch( iAnswerType )
// Read answer
if ( m_pMessage->ReadMessage( ) )
{
int iAnswerType = m_pMessage->GetMessageType( );
switch ( iAnswerType )
{
case CITANetAudioProtocol::NP_INVALID:
// Something went wrong
vstr::err() << "Received invalid message type" << std::endl;
break;
case CITANetAudioProtocol::NP_INVALID:
// Something went wrong
vstr::err( ) << "Received invalid message type" << std::endl;
break;
case CITANetAudioProtocol::NP_SERVER_CLOSE:
Disconnect();
break;
case CITANetAudioProtocol::NP_SERVER_CLOSE:
Disconnect( );
break;
case CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER:
// Wait until block increment is triggered by audio context (more free samples in ring buffer)
m_oBlockIncrementEvent.WaitForEvent( true );
break;
case CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER:
// Wait until block increment is triggered by audio context (more free samples in ring buffer)
m_oBlockIncrementEvent.WaitForEvent( true );
break;
case CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES:
// Receive samples from net message and forward them to the stream ring buffer
case CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES:
// Receive samples from net message and forward them to the stream ring buffer
m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
if ( m_pStream->GetRingBufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
m_pStream->Transmit( m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength( ) );
//else
m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
if ( m_pStream->GetRingBufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
m_pStream->Transmit( m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength( ) );
//else
// Fehler
break;
case CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE :
break;
break;
case CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE:
break;
}
oLog.iChannel = m_pStream->GetNumberOfChannels( );
oLog.iProtocolStatus = iAnswerType;
oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
m_pClientLogger->log( oLog );
}
oLog.iChannel = m_pStream->GetNumberOfChannels();
oLog.iProtocolStatus = iAnswerType;
oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
m_pClientLogger->log( oLog );
return true;
}
......
......@@ -20,34 +20,34 @@
#include <cmath>
#include <cassert>
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
: m_pInputStream( NULL )
, m_iUpdateStrategy( AUTO )
, m_pConnection( NULL )
, m_pNetAudioServer( new CITANetAudioServer() )
CITANetAudioStreamingServer::CITANetAudioStreamingServer( )
: m_pInputStream( NULL )
, m_iUpdateStrategy( AUTO )
, m_pConnection( NULL )
, m_pNetAudioServer( new CITANetAudioServer( ) )
{
}
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort )
{
if( !m_pInputStream )
if ( !m_pInputStream )
ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );
if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
if ( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
return false;
m_pConnection = m_pNetAudioServer->GetConnection();
m_pConnection = m_pNetAudioServer->GetConnection( );
m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag() );
m_pMessage->ResetMessage();
m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag( ) );
m_pMessage->ResetMessage( );
m_pMessage->SetConnection( m_pConnection );
m_pMessage->ReadMessage(); // blocking
while ( !m_pMessage->ReadMessage( ) ); //blocking
assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_CLIENT_OPEN );
CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters( );
bool bOK = false;
if( m_oServerParams == oClientParams )
if ( m_oServerParams == oClientParams )
{
bOK = true;
#ifdef NET_AUDIO_SHOW_TRAFFIC
......@@ -59,81 +59,79 @@ bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort
#endif
}
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_OPEN );
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
m_pMessage->WriteBool( bOK );
m_pMessage->WriteAnswer();
m_pMessage->WriteMessage( );
if( bOK )
Run();
if ( bOK )
Run( );
return bOK;
}
bool CITANetAudioStreamingServer::LoopBody()
bool CITANetAudioStreamingServer::LoopBody( )
{
m_pMessage->ResetMessage();
m_pMessage->ResetMessage( );
m_pMessage->SetConnection( m_pConnection );
m_pMessage->ReadMessage(); // blocking
m_pMessage->ReadMessage( );
int iMsgType = m_pMessage->GetMessageType();
switch( iMsgType )
if ( m_pMessage->ReadMessage( ) )
{
case CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES:
{
int iFreeSamples = m_pMessage->ReadInt();
if( iFreeSamples >= int( m_pInputStream->GetBlocklength() ) )
int iMsgType = m_pMessage->GetMessageType( );
switch ( iMsgType )
{
// Send Samples
for( int i = 0; i < int( m_pInputStream->GetNumberOfChannels() ); i++ )
case CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES:
{
ITAStreamInfo oStreamInfo;
oStreamInfo.nSamples = m_sfTempTransmitBuffer.GetLength();
const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
if( pfData != 0 )
m_sfTempTransmitBuffer[ i ].write( pfData, m_sfTempTransmitBuffer.GetLength() );
}
m_pInputStream->IncrementBlockPointer();
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES );
m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
m_pMessage->WriteAnswer();
int iFreeSamples = m_pMessage->ReadInt( );
if ( iFreeSamples >= int( m_pInputStream->GetBlocklength( ) ) )
{
// Send Samples
for ( int i = 0; i < int( m_pInputStream->GetNumberOfChannels( ) ); i++ )
{
ITAStreamInfo oStreamInfo;
oStreamInfo.nSamples = m_sfTempTransmitBuffer.GetLength( );
const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
if ( pfData != 0 )
m_sfTempTransmitBuffer[ i ].write( pfData, m_sfTempTransmitBuffer.GetLength( ) );
}
m_pInputStream->IncrementBlockPointer( );
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES );
m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
m_pMessage->WriteMessage( );
#ifdef NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ITANetAudioStreamingServer] Transmitted "<< m_sfTempTransmitBuffer.GetLength() << " samples for "
<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
vstr::out() << "[ITANetAudioStreamingServer] Transmitted "<< m_sfTempTransmitBuffer.GetLength() << " samples for "
<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
#endif
}
else
{
// Waiting for Trigger
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER );
m_pMessage->WriteAnswer();
}
else
{
// Waiting for Trigger
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER );
m_pMessage->WriteMessage( );
#ifdef NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
vstr::out() << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
#endif
break;
}
break;
}
case CITANetAudioProtocol::NP_CLIENT_CLOSE:
{
//m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_CLOSE );
//m_pMessage->WriteAnswer();
StopGently( false );
//m_pConnection = NULL;
Stop( );
return false;
}
default:
{
vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
break;
}
}
break;
}
case CITANetAudioProtocol::NP_CLIENT_CLOSE:
{
//m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_CLOSE );
//m_pMessage->WriteAnswer();
StopGently(false);
//m_pConnection = NULL;
Stop();
return false;
}
default:
{
vstr::out() << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
break;
}
}
return true;
......@@ -141,52 +139,52 @@ bool CITANetAudioStreamingServer::LoopBody()
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
if( VistaThreadLoop::IsRunning() )
if ( VistaThreadLoop::IsRunning( ) )
ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );
m_pInputStream = pInStream;
m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_pInputStream->GetBlocklength(), true );
m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate();
m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength();
m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels();
m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels( ), m_pInputStream->GetBlocklength( ), true );
m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate( );
m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength( );
m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels( );
}
ITADatasource* CITANetAudioStreamingServer::GetInputStream() const
ITADatasource* CITANetAudioStreamingServer::GetInputStream( ) const
{
return m_pInputStream;
}
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
int CITANetAudioStreamingServer::GetNetStreamBlocklength( ) const
{
return m_sfTempTransmitBuffer.GetLength();
return m_sfTempTransmitBuffer.GetLength( );
}
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels() const
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels( ) const
{
return m_sfTempTransmitBuffer.channels();
return m_sfTempTransmitBuffer.channels( );
}
void CITANetAudioStreamingServer::SetAutomaticUpdateRate()
void CITANetAudioStreamingServer::SetAutomaticUpdateRate( )
{
m_iUpdateStrategy = AUTO;
}
bool CITANetAudioStreamingServer::IsClientConnected() const
bool CITANetAudioStreamingServer::IsClientConnected( ) const
{
return m_pNetAudioServer->IsConnected();
return m_pNetAudioServer->IsConnected( );
}
std::string CITANetAudioStreamingServer::GetNetworkAddress() const
std::string CITANetAudioStreamingServer::GetNetworkAddress( ) const
{
return m_pNetAudioServer->GetServerAddress();
return m_pNetAudioServer->GetServerAddress( );
}
int CITANetAudioStreamingServer::GetNetworkPort() const
int CITANetAudioStreamingServer::GetNetworkPort( ) const
{
return m_pNetAudioServer->GetNetworkPort();
return m_pNetAudioServer->GetNetworkPort( );
}
void CITANetAudioStreamingServer::Stop()
void CITANetAudioStreamingServer::Stop( )
{
m_pNetAudioServer->Stop();
m_pNetAudioServer->Stop( );
}
......@@ -22,7 +22,7 @@ const static string g_sInputFilePath = "gershwin-mono.wav";
const static int g_iServerPort = 12480;
const static double g_dSampleRate = 44100;
const static int g_iBlockLength = 1024;
const static int g_iChannels = 257;
const static int g_iChannels = 100;
class CServer : public VistaThread
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment