Commit bd3a1c22 authored by Dipl.-Ing. Jonas Stienen's avatar Dipl.-Ing. Jonas Stienen
Browse files

Compiling net message

parent 861fde06
......@@ -70,6 +70,7 @@ set( ITADataSourcesSources
"src/ITAFileDataSink.cpp"
"src/ITAFileDataSource.cpp"
"src/ITANetAudioStream.cpp"
"src/ITANetAudioProtocol.cpp"
"src/ITANetAudioSampleServer.cpp"
"src/ITAPeakDetector.cpp"
"src/ITARMSDetector.cpp"
......
......@@ -83,6 +83,10 @@ public:
int GetOutgoingMessageSize() const;
bool GetOutgoingMessageHasData() const;
void SetMessageType( int nType );
void SetAnswerType( int nType );
int GetMessageType() const;
int GetAnswerType() const;
// --= Serializing functions =--
......@@ -116,7 +120,7 @@ private:
int m_nMessageId;
int m_nAnswerType;
VistaByteBufferSerializer m_oOutgoing;
VistaByteBufferSerializer m_oIncoming;
VistaByteBufferDeSerializer m_oIncoming;
std::vector< VistaType::byte > m_vecIncomingBuffer;
VistaConnectionIP* m_pConnection;
......
#include <ITANetAudioStream.h>
#include <ITANetAudioProtocol.h>
// ITA includes
#include <ITAException.h>
#include <ITASampleFrame.h>
#include <ITAStringUtils.h>
// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPServer.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
//#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
#include <VistaBase/VistaExceptionBase.h>
// STL
#include <cmath>
#include <cassert>
class CITANetAudioStreamConnection : public VistaThreadLoop
static int S_nMessageIds = 0;
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
: m_vecIncomingBuffer()
, m_oOutgoing()
, m_pConnection( NULL )
{
public:
enum MessageType
{
NET_MESSAGE_NONE = 0,
NET_MESSAGE_OPEN,
NET_MESSAGE_CLOSE,
NET_MESSAGE_SAMPLES,
};
inline CITANetAudioStreamConnection( CITANetAudioStream* pParent )
: m_pParent( pParent )
, m_pConnection( NULL )
, m_bStopIndicated( false )
{
};
m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );
m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );
ResetMessage();
}
inline bool Connect( const std::string& sAddress, int iPort )
void CITANetAudioMessage::ResetMessage()
{
if( m_oIncoming.GetTailSize() > 0 )
{
if( m_pConnection )
ITA_EXCEPT1( MODAL_EXCEPTION, "This net stream is already connected" );
std::cerr << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
}
// wait till sending is complete -> this prevents us
// from deleting the buffer while it is still being read
// by the connection
if( m_pConnection )
m_pConnection->WaitForSendFinish();
m_nMessageId = S_nMessageIds++;
m_oOutgoing.ClearBuffer();
m_oOutgoing.WriteInt32( 0 ); // size dummy
m_oOutgoing.WriteInt32( 0 ); // type dummy
m_oOutgoing.WriteInt32( 0 ); // exceptmode dummy
m_oOutgoing.WriteInt32( 0 ); // ID
m_oIncoming.SetBuffer( NULL, 0 );
m_nMessageType = CITANetAudioProtocol::NP_INVALID;
m_nAnswerType = CITANetAudioProtocol::NP_INVALID;
m_pConnection = NULL;
}
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
m_pConnection = pConn;
}
void CITANetAudioMessage::WriteMessage()
{
VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
VistaType::sint32 iSwapDummy;
// Attempt to connect and check parameters
m_pConnection = new VistaConnectionIP( VistaConnectionIP::CT_TCP, sAddress, iPort );
if( !m_pConnection->GetIsConnected() )
// rewrite size dummy
iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite type dummy
iSwapDummy = m_nMessageType;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite messageid dummy
iSwapDummy = m_nMessageId;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
try
{
int iSize = m_oOutgoing.GetBufferSize();
int nRet = m_pConnection->WriteRawBuffer( m_oOutgoing.GetBuffer(), iSize );
if( nRet != m_oOutgoing.GetBufferSize() )
{
delete m_pConnection;
m_pConnection = NULL;
return false;
VistaExceptionBase ex( "ITANetAudioMessage::WriteMessage: Connection error", "CITANetAudioMessage", -1, -1 );
//throw( ex );
}
}
catch( VistaExceptionBase& ex )
{
std::string sExceptionText = ex.GetExceptionText();
std::cerr << sExceptionText << std::endl;
//ITA_EXCEPT1(UNKNOWN, sExceptionText.c_str());
}
}
int iMessageType = NET_MESSAGE_OPEN;
m_pConnection->Send( &iMessageType, sizeof( int ) );
int iNumChannels = ( int ) m_pParent->GetNumberOfChannels();
m_pConnection->Send( &iNumChannels, sizeof( int ) );
double dSampleRate = m_pParent->GetSampleRate();
m_pConnection->Send( &dSampleRate, sizeof( double ) );
int iBlockLength = ( int ) m_pParent->GetBlocklength();
m_pConnection->Send( &iBlockLength, sizeof( int ) );
int iRingBufferSize = ( int ) m_pParent->GetRingBufferSize();
m_pConnection->Send( &iRingBufferSize, sizeof( int ) );
m_pConnection->WaitForSendFinish();
int iServerMessageType;
m_pConnection->Receive( &iServerMessageType, sizeof( int ) );
void CITANetAudioMessage::ReadMessage()
{
try
{
VistaType::sint32 nMessageSize;
int nReturn = m_pConnection->ReadInt32( nMessageSize );
// we need at least the two protocol ints
assert( nMessageSize >= 3 * sizeof( VistaType::sint32 ) );
if( nMessageSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessageSize );
Run();
};
nReturn = m_pConnection->ReadRawBuffer( &m_vecIncomingBuffer[ 0 ], nMessageSize );
if( nReturn != nMessageSize )
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
inline void Disconnect()
m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nReturn );
// DEBUG: std::cout << "Remainign Size after Mesage Read: " << m_pConnection->PendingDataSize() << std::endl;
}
catch( VistaExceptionBase& ex )
{
ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
}
catch( ITAException& ex )
{
m_bStopIndicated = true;
StopGently( true );
ex;
}
m_nMessageType = ReadInt();
m_nMessageId = ReadInt();
}
void CITANetAudioMessage::WriteAnswer()
{
VistaType::byte* pBuffer = ( VistaType::byte* )m_oOutgoing.GetBuffer();
VistaType::sint32 iSwapDummy;
// rewrite size dummy
iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
delete m_pConnection;
m_pConnection = NULL;
// rewrite type dummy
iSwapDummy = m_nAnswerType;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
m_bStopIndicated = false;
};
pBuffer += sizeof( VistaType::sint32 );
// rewrite message dummy
iSwapDummy = m_nMessageId;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
try {
int nRet = m_pConnection->WriteRawBuffer( m_oOutgoing.GetBuffer(), m_oOutgoing.GetBufferSize() );
if( nRet != m_oOutgoing.GetBufferSize() )
ITA_EXCEPT1( UNKNOWN, "Could not write the expected number of bytes" );
}
catch( VistaExceptionBase& ex ) {
ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
}
}
inline ~CITANetAudioStreamConnection()
void CITANetAudioMessage::ReadAnswer()
{
try
{
if( m_pConnection )
VistaType::sint32 nMessageSize;
int nReturn;
try
{
nReturn = m_pConnection->ReadInt32( nMessageSize );
}
catch( ... )
{
int iMessageType = NET_MESSAGE_CLOSE;
m_pConnection->Send( &iMessageType, sizeof( int ) );
nReturn = -1; // Network connection error
}
};
inline bool LoopBody()
if( nReturn != sizeof( VistaType::sint32 ) ) {
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
}
// we need at least the two protocol types
assert( nMessageSize >= 2 * sizeof( VistaType::sint32 ) );
if( nMessageSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessageSize );
nReturn = m_pConnection->ReadRawBuffer( &m_vecIncomingBuffer[ 0 ], nMessageSize );
if( nReturn != nMessageSize )
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nReturn );
}
catch( VistaExceptionBase& ex )
{
// Probable connection loss
return;
ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
}
catch( ITAException& ex )
{
if( m_bStopIndicated )
return true;
std::string sErrorText = ex.ToString();
}
// Receive messages
while( true )
{
m_pConnection->Receive( NULL, 0 ); // @todo: receive messages and react
try
{
m_nAnswerType = ReadInt(); // TODO: assert weg, dafr Kontrolle falls Server crasht<
ReadInt(); // protocol overhead - just read and ignore
int nMessageID = ReadInt();
assert( nMessageID == m_nMessageId );
m_nMessageId = nMessageID;
}
catch( ITAException& ex )
{
std::cerr << "ITANetAudioMessage: Protocol error: " << ex << std::endl;
}
}
int iNumSamples = 12;
if( true )
m_pParent->Transmit( m_sfReceivingBuffer, iNumSamples );
}
};
private:
VistaConnectionIP* m_pConnection;
CITANetAudioStream* m_pParent;
ITASampleFrame m_sfReceivingBuffer;
bool m_bStopIndicated;
};
CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity )
: m_sfOutputStreamBuffer( iChannels, iBufferSize, true )
, m_dSampleRate( dSamplingRate )
, m_sfRingBuffer( iChannels, iRingBufferCapacity, true )
int CITANetAudioMessage::GetMessageType() const
{
m_pNetAudioProducer = new CITANetAudioStreamConnection( this );
return m_nMessageType;
}
bool CITANetAudioStream::Connect( const std::string& sAddress, int iPort )
void CITANetAudioMessage::SetMessageType( int nType )
{
return m_pNetAudioProducer->Connect( sAddress, iPort );
assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nMessageType = nType;
}
CITANetAudioStream::~CITANetAudioStream()
void CITANetAudioMessage::SetAnswerType( int nType )
{
delete m_pNetAudioProducer;
assert( m_nAnswerType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nAnswerType = nType;
}
const float* CITANetAudioStream::GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* )
int CITANetAudioMessage::GetIncomingMessageSize() const
{
// @todo: implement cyclic read from ring buffer
return m_sfOutputStreamBuffer[ uiChannel ].GetData();
return m_oIncoming.GetTailSize();
}
int CITANetAudioMessage::GetOutgoingMessageSize() const
{
return m_oOutgoing.GetBufferSize();
}
bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
}
void CITANetAudioMessage::WriteString( const std::string& sValue )
{
m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
}
void CITANetAudioMessage::WriteInt( const int iValue )
{
m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
}
void CITANetAudioMessage::WriteBool( const bool bValue )
{
m_oOutgoing.WriteBool( bValue );
}
void CITANetAudioMessage::WriteFloat( const float fValue )
{
m_oOutgoing.WriteFloat32( fValue );
}
void CITANetAudioMessage::WriteDouble( const double dValue )
{
m_oOutgoing.WriteFloat64( dValue );
}
std::string CITANetAudioMessage::ReadString()
{
VistaType::sint32 nSize;
int nReturn = m_oIncoming.ReadInt32( nSize );
assert( nReturn == sizeof( VistaType::sint32 ) );
// Empty string?
if( nSize == 0 ) return "";
std::string sValue;
nReturn = m_oIncoming.ReadString( sValue, nSize );
assert( nReturn == nSize );
return sValue;
}
int CITANetAudioMessage::ReadInt()
{
VistaType::sint32 nValue;
int nReturn = m_oIncoming.ReadInt32( nValue );
if( nReturn == -1 )
ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
assert( nReturn == sizeof( VistaType::sint32 ) );
return nValue;
}
void CITANetAudioStream::IncrementBlockPointer()
bool CITANetAudioMessage::ReadBool()
{
bool bValue;
int nReturn = m_oIncoming.ReadBool( bValue );
assert( nReturn == sizeof( bool ) );
return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
// Increment read cursor by one audio block and wrap around if exceeding ring buffer
m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength();
float fValue;
int nReturn = m_oIncoming.ReadFloat32( fValue );
assert( nReturn == sizeof( float ) );
return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
double dValue;
int nReturn = m_oIncoming.ReadFloat64( dValue );
assert( nReturn == sizeof( double ) );
return dValue;
}
int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples )
void CITANetAudioMessage::WriteException( const ITAException& oException )
{
ITA_EXCEPT0( NOT_IMPLEMENTED );
WriteInt( oException.iErrorCode );
WriteString( oException.sModule );
WriteString( oException.sReason );
}
int CITANetAudioStream::GetRingBufferSize() const
ITAException CITANetAudioMessage::ReadException()
{
return m_sfRingBuffer.GetLength();
int iErrorCode = ReadInt();
std::string sModule = ReadString();
std::string sReason = ReadString();
return ITAException( iErrorCode, sModule, sReason );
}
unsigned int CITANetAudioStream::GetBlocklength() const
VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
return ( unsigned int ) m_sfOutputStreamBuffer.GetLength();
return m_pConnection;
}
unsigned int CITANetAudioStream::GetNumberOfChannels() const
void CITANetAudioMessage::ClearConnection() {
m_pConnection = NULL;
}
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
{
return ( unsigned int ) m_sfOutputStreamBuffer.channels();
int iSize = ( int ) viData.size();
WriteInt( iSize );
for( int i = 0; i<iSize; i++ )
WriteInt( viData[ i ] );
}
double CITANetAudioStream::GetSampleRate() const
std::vector<int> CITANetAudioMessage::ReadIntVector()
{
return m_dSampleRate;
std::vector<int> viData;
int iSize = ReadInt();
for( int i = 0; i<iSize; i++ )
viData.push_back( ReadInt() );
return viData;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment