Commit f2afe5e6 authored by Dipl.-Ing. Jonas Stienen's avatar Dipl.-Ing. Jonas Stienen
Browse files

Splitting net audio code into further files (protocol, message, connection)

parent 433477b4
......@@ -48,6 +48,8 @@ set( ITADataSourcesHeader
"include/ITAFileDataSink.h"
"include/ITAFileDataSource.h"
"include/ITANetAudioStream.h"
"include/ITANetAudioConnection.h"
"include/ITANetAudioMessage.h"
"include/ITANetAudioProtocol.h"
"include/ITANetAudioSampleServer.h"
"include/ITAPeakDetector.h"
......@@ -70,6 +72,7 @@ set( ITADataSourcesSources
"src/ITAFileDataSink.cpp"
"src/ITAFileDataSource.cpp"
"src/ITANetAudioStream.cpp"
"src/ITANetAudioMessage.cpp"
"src/ITANetAudioProtocol.cpp"
"src/ITANetAudioSampleServer.cpp"
"src/ITAPeakDetector.cpp"
......
/*
* ----------------------------------------------------------------
*
* ITA core libs
* (c) Copyright Institute of Technical Acoustics (ITA)
* RWTH Aachen University, Germany, 2015-2016
*
* ----------------------------------------------------------------
* ____ __________ _______
* // / //__ ___/ // _ |
* // / // / // /_| |
* // / // / // ___ |
* //__/ //__/ //__/ |__|
*
* ----------------------------------------------------------------
*
*/
#ifndef INCLUDE_WATCHER_ITA_NET_AUDIO_CONNECTION
#define INCLUDE_WATCHER_ITA_NET_AUDIO_CONNECTION
#include <ITADataSourcesDefinitions.h>
#include <ITADataSource.h>
#include <ITASampleFrame.h>
#include <string>
#include <vector>
class CITANetAudioStreamConnection;
//! Network audio stream
/**
* Audio streaming for a signal source that is connected via TCP/IP.
*
* \note not thread-safe
*/
class ITA_DATA_SOURCES_API CITANetAudioStream : public ITADatasource
{
public:
CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity );
virtual ~CITANetAudioStream();
bool Connect( const std::string& sAddress, int iPort );
bool GetIsConnected() const;
int GetRingBufferSize() const;
unsigned int GetBlocklength() const;
unsigned int GetNumberOfChannels() const;
double GetSampleRate() const;
const float* GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* );
void IncrementBlockPointer();
protected:
int Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples );
private:
CITANetAudioStreamConnection* m_pNetAudioProducer;
double m_dSampleRate;
ITASampleFrame m_sfOutputStreamBuffer;
int m_iReadCursor; //!< Cursor where samples will be consumed from ring buffer on next block
int m_iWriteCursor; //!< Cursor where samples will feeded into ring buffer from net audio producer
ITASampleFrame m_sfRingBuffer;
friend class CITANetAudioStreamConnection;
};
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_CONNECTION
/*
* ----------------------------------------------------------------
*
* ITA core libs
* (c) Copyright Institute of Technical Acoustics (ITA)
* RWTH Aachen University, Germany, 2015-2016
*
* ----------------------------------------------------------------
* ____ __________ _______
* // / //__ ___/ // _ |
* // / // / // /_| |
* // / // / // ___ |
* //__/ //__/ //__/ |__|
*
* ----------------------------------------------------------------
*
*/
#ifndef INCLUDE_WATCHER_ITA_NET_AUDIO_MESSAGE
#define INCLUDE_WATCHER_ITA_NET_AUDIO_MESSAGE
#include <ITADataSourcesDefinitions.h>
// ITA includes
#include <ITAException.h>
#include <ITASampleBuffer.h>
#include <ITASampleFrame.h>
// Vista includes
#include <VistaInterProcComm/Connections/VistaByteBufferSerializer.h>
#include <VistaInterProcComm/Connections/VistaByteBufferDeSerializer.h>
// STL includes
#include <string>
#include <vector>
class VistaConnectionIP;
//! Network audio messages
/*
* Messages consist of a message part and an answer part, each read or written
* separately. Messages have a two-int-header (SIZE, MSGTYPE), and
* answers have a two-int header (SIZE; ANSWERTYPE)
*/
class ITA_DATA_SOURCES_API CITANetAudioMessage
{
public:
CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers );
void SetConnection( VistaConnectionIP* );
VistaConnectionIP* GetConnection() const;
void ClearConnection();
void WriteMessage();
void ReadMessage();
void WriteAnswer();
void ReadAnswer();
void ResetMessage();
int GetIncomingMessageSize() const;
int GetOutgoingMessageSize() const;
bool GetOutgoingMessageHasData() const;
void SetMessageType( int nType );
void SetAnswerType( int nType );
int GetMessageType() const;
int GetAnswerType() const;
void WriteInt( const int );
void WriteBool( const bool );
void WriteDouble( const double );
void WriteException( const ITAException& );
void WriteFloat( const float );
void WriteString( const std::string& );
void WriteIntVector( const std::vector< int > );
void WriteFloatVector( const std::vector< float > );
std::string ReadString();
int ReadInt();
bool ReadBool();
ITAException ReadException();
float ReadFloat();
double ReadDouble();
std::vector< int > ReadIntVector();
std::vector< float > ReadFloatVector();
private:
int m_nMessageType;
int m_nMessageId;
int m_nAnswerType;
VistaByteBufferSerializer m_oOutgoing;
VistaByteBufferDeSerializer m_oIncoming;
std::vector< VistaType::byte > m_vecIncomingBuffer;
VistaConnectionIP* m_pConnection;
};
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_MESSAGE
......@@ -37,6 +37,8 @@
// Forward declarations
class VistaConnectionIP;
class CITANetAudioSampleServer;
class CITANetAudioStream;
//! Network audio protocol
/**
......@@ -60,80 +62,6 @@ public:
CITANetAudioProtocol();
virtual ~CITANetAudioProtocol();
// Connection
bool InitializeAsServer( CRavenNetServerImpl* pServer );
bool InitializeAsClient( CRavenNetClientImpl* pServer, VistaConnectionIP* pCommandChannel, VistaConnectionIP* pHeadChannel, int iExceptionhandlingmode );
};
/** Network audio messages
*
* Messages consist of a message part and an answer part, each read or written
* separately. Messages have a two-int-header (SIZE, MSGTYPE), and
* answers have a two-int header (SIZE; ANSWERTYPE)
*/
class ITA_DATA_SOURCES_API CITANetAudioMessage
{
public:
CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers );
void ResetMessage();
void SetConnection( VistaConnectionIP* );
void WriteMessage();
void ReadMessage();
void WriteAnswer();
void ReadAnswer();
int GetIncomingMessageSize() const;
int GetOutgoingMessageSize() const;
bool GetOutgoingMessageHasData() const;
void SetMessageType( int nType );
void SetAnswerType( int nType );
int GetMessageType() const;
int GetAnswerType() const;
// --= Serializing functions =--
// Basics
void WriteInt( const int );
void WriteBool( const bool );
void WriteDouble( const double );
void WriteException( const ITAException& );
void WriteFloat( const float );
void WriteString( const std::string& );
void WriteIntVector( const std::vector< int > );
void WriteFloatVector( const std::vector< float > );
// --= Reader =--
std::string ReadString();
int ReadInt();
bool ReadBool();
ITAException ReadException();
float ReadFloat();
double ReadDouble();
std::vector< int > ReadIntVector();
std::vector< float > ReadFloatVector();
VistaConnectionIP* GetConnection() const;
void ClearConnection();
private:
int m_nMessageType;
int m_nMessageId;
int m_nAnswerType;
VistaByteBufferSerializer m_oOutgoing;
VistaByteBufferDeSerializer m_oIncoming;
std::vector< VistaType::byte > m_vecIncomingBuffer;
VistaConnectionIP* m_pConnection;
};
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_PROTOCOL
......@@ -42,9 +42,7 @@ public:
virtual ~CITANetAudioStream();
bool Connect( const std::string& sAddress, int iPort );
bool IsConnected() const;
std::string GetNetworkAddress() const;
int GetNetworkPort() const;
bool GetIsConnected() const;
int GetRingBufferSize() const;
......
#include <ITANetAudioMessage.h>
#include <ITANetAudioProtocol.h>
#include <ITAStringUtils.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
#include <cassert>
static int S_nMessageIds = 0;
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
: m_vecIncomingBuffer()
, m_oOutgoing()
, m_pConnection( NULL )
{
m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );
m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );
ResetMessage();
}
void CITANetAudioMessage::ResetMessage()
{
if( m_oIncoming.GetTailSize() > 0 )
{
std::cerr << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
}
// wait till sending is complete -> this prevents us
// from deleting the buffer while it is still being read
// by the connection
if( m_pConnection )
m_pConnection->WaitForSendFinish();
m_nMessageId = S_nMessageIds++;
m_oOutgoing.ClearBuffer();
m_oOutgoing.WriteInt32( 0 ); // size dummy
m_oOutgoing.WriteInt32( 0 ); // type dummy
m_oOutgoing.WriteInt32( 0 ); // exceptmode dummy
m_oOutgoing.WriteInt32( 0 ); // ID
m_oIncoming.SetBuffer( NULL, 0 );
m_nMessageType = CITANetAudioProtocol::NP_INVALID;
m_nAnswerType = CITANetAudioProtocol::NP_INVALID;
m_pConnection = NULL;
}
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
m_pConnection = pConn;
}
void CITANetAudioMessage::WriteMessage()
{
VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
VistaType::sint32 iSwapDummy;
// rewrite size dummy
iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite type dummy
iSwapDummy = m_nMessageType;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite messageid dummy
iSwapDummy = m_nMessageId;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
try
{
int iSize = m_oOutgoing.GetBufferSize();
int nRet = m_pConnection->WriteRawBuffer( m_oOutgoing.GetBuffer(), iSize );
if( nRet != m_oOutgoing.GetBufferSize() )
{
VistaExceptionBase ex( "ITANetAudioMessage::WriteMessage: Connection error", "CITANetAudioMessage", -1, -1 );
//throw( ex );
}
}
catch( VistaExceptionBase& ex )
{
std::string sExceptionText = ex.GetExceptionText();
std::cerr << sExceptionText << std::endl;
//ITA_EXCEPT1(UNKNOWN, sExceptionText.c_str());
}
}
void CITANetAudioMessage::ReadMessage()
{
try
{
VistaType::sint32 nMessageSize;
int nReturn = m_pConnection->ReadInt32( nMessageSize );
// we need at least the two protocol ints
assert( nMessageSize >= 3 * sizeof( VistaType::sint32 ) );
if( nMessageSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessageSize );
nReturn = m_pConnection->ReadRawBuffer( &m_vecIncomingBuffer[ 0 ], nMessageSize );
if( nReturn != nMessageSize )
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nReturn );
// DEBUG: std::cout << "Remainign Size after Mesage Read: " << m_pConnection->PendingDataSize() << std::endl;
}
catch( VistaExceptionBase& ex )
{
ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
}
catch( ITAException& ex )
{
ex;
}
m_nMessageType = ReadInt();
m_nMessageId = ReadInt();
}
void CITANetAudioMessage::WriteAnswer()
{
VistaType::byte* pBuffer = ( VistaType::byte* )m_oOutgoing.GetBuffer();
VistaType::sint32 iSwapDummy;
// rewrite size dummy
iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite type dummy
iSwapDummy = m_nAnswerType;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
pBuffer += sizeof( VistaType::sint32 );
// rewrite message dummy
iSwapDummy = m_nMessageId;
if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy );
memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
try {
int nRet = m_pConnection->WriteRawBuffer( m_oOutgoing.GetBuffer(), m_oOutgoing.GetBufferSize() );
if( nRet != m_oOutgoing.GetBufferSize() )
ITA_EXCEPT1( UNKNOWN, "Could not write the expected number of bytes" );
}
catch( VistaExceptionBase& ex ) {
ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
}
}
void CITANetAudioMessage::ReadAnswer()
{
try
{
VistaType::sint32 nMessageSize;
int nReturn;
try
{
nReturn = m_pConnection->ReadInt32( nMessageSize );
}
catch( ... )
{
nReturn = -1; // Network connection error
}
if( nReturn != sizeof( VistaType::sint32 ) ) {
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
}
// we need at least the two protocol types
assert( nMessageSize >= 2 * sizeof( VistaType::sint32 ) );
if( nMessageSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessageSize );
nReturn = m_pConnection->ReadRawBuffer( &m_vecIncomingBuffer[ 0 ], nMessageSize );
if( nReturn != nMessageSize )
ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nReturn );
}
catch( VistaExceptionBase& ex )
{
// Probable connection loss
return;
ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
}
catch( ITAException& ex )
{
std::string sErrorText = ex.ToString();
}
try
{
m_nAnswerType = ReadInt(); // TODO: assert weg, dafr Kontrolle falls Server crasht<
ReadInt(); // protocol overhead - just read and ignore
int nMessageID = ReadInt();
assert( nMessageID == m_nMessageId );
m_nMessageId = nMessageID;
}
catch( ITAException& ex )
{
std::cerr << "ITANetAudioMessage: Protocol error: " << ex << std::endl;
}
}
int CITANetAudioMessage::GetMessageType() const
{
return m_nMessageType;
}
void CITANetAudioMessage::SetMessageType( int nType )
{
assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nMessageType = nType;
}
void CITANetAudioMessage::SetAnswerType( int nType )
{
assert( m_nAnswerType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nAnswerType = nType;
}
int CITANetAudioMessage::GetIncomingMessageSize() const
{
return m_oIncoming.GetTailSize();
}
int CITANetAudioMessage::GetOutgoingMessageSize() const
{
return m_oOutgoing.GetBufferSize();
}
bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
}
void CITANetAudioMessage::WriteString( const std::string& sValue )
{
m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
}
void CITANetAudioMessage::WriteInt( const int iValue )
{
m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
}
void CITANetAudioMessage::WriteBool( const bool bValue )
{
m_oOutgoing.WriteBool( bValue );
}
void CITANetAudioMessage::WriteFloat( const float fValue )
{
m_oOutgoing.WriteFloat32( fValue );
}
void CITANetAudioMessage::WriteDouble( const double dValue )
{
m_oOutgoing.WriteFloat64( dValue );
}
std::string CITANetAudioMessage::ReadString()
{
VistaType::sint32 nSize;
int nReturn = m_oIncoming.ReadInt32( nSize );
assert( nReturn == sizeof( VistaType::sint32 ) );
// Empty string?
if( nSize == 0 ) return "";
std::string sValue;
nReturn = m_oIncoming.ReadString( sValue, nSize );
assert( nReturn == nSize );
return sValue;
}
int CITANetAudioMessage::ReadInt()
{
VistaType::sint32 nValue;
int nReturn = m_oIncoming.ReadInt32( nValue );
if( nReturn == -1 )
ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
assert( nReturn == sizeof( VistaType::sint32 ) );
return nValue;
}
bool CITANetAudioMessage::ReadBool()
{
bool bValue;
int nReturn = m_oIncoming.ReadBool( bValue );
assert( nReturn == sizeof( bool ) );
return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
float fValue;
int nReturn = m_oIncoming.ReadFloat32( fValue );
assert( nReturn == sizeof( float ) );
return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
double dValue;
int nReturn = m_oIncoming.ReadFloat64( dValue );
assert( nReturn == sizeof( double ) );