ITANetAudioMessage.cpp 10.4 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
2 3 4 5
#include <ITAStringUtils.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
6
#include <VistaBase/VistaStreamUtils.h>
7 8 9

#include <cstring>
#include <algorithm>
Anne's avatar
Anne committed
10
#include <cassert>
11 12
#include <iostream>
#include <iomanip>
Anne's avatar
Anne committed
13 14 15

static int S_nMessageIds = 0;

16
CITANetAudioMessage::CITANetAudioMessage( VistaConnectionIP* pConnection )
17 18
	: m_vecIncomingBuffer( 2048 )
	, m_oOutgoing( 2048 )
19 20
	, m_pConnection( pConnection )
	, m_nTimeoutMilliseconds( 1 )
Anne's avatar
Anne committed
21
{
22 23
	m_oOutgoing.SetByteorderSwapFlag( pConnection->GetByteorderSwapFlag() );
	m_oIncoming.SetByteorderSwapFlag( pConnection->GetByteorderSwapFlag() );
Anne's avatar
Anne committed
24 25 26 27 28
	ResetMessage();
}

void CITANetAudioMessage::ResetMessage()
{
29
	if( m_oIncoming.GetTailSize() > 0 )
30
		vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
Anne's avatar
Anne committed
31 32 33 34

	// wait till sending is complete -> this prevents us
	// from deleting the buffer while it is still being read
	// by the connection
35 36
	if( m_pConnection )
		m_pConnection->WaitForSendFinish();
Anne's avatar
Anne committed
37 38 39 40

	m_nMessageId = S_nMessageIds++;

	m_oOutgoing.ClearBuffer();
41 42 43
	m_oOutgoing.WriteInt32( 0 ); // Payload size
	m_oOutgoing.WriteInt32( 0 ); // Message Type
	m_oOutgoing.WriteInt32( 0 ); // Identifier
Anne's avatar
Anne committed
44

45
	m_oIncoming.SetBuffer( NULL, 0 );
Anne's avatar
Anne committed
46 47

	m_nMessageType = CITANetAudioProtocol::NP_INVALID;
48

Anne's avatar
Anne committed
49
#if NET_AUDIO_SHOW_TRAFFIC
50
	vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
51
#endif
Anne's avatar
Anne committed
52 53 54 55
}

void CITANetAudioMessage::WriteMessage()
{
56
	VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
57 58 59
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
60 61 62
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
63
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
64

65
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
66 67 68

	// rewrite type dummy
	iSwapDummy = m_nMessageType;
69 70
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
71
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
72

73
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
74 75 76

	// rewrite messageid dummy
	iSwapDummy = m_nMessageId;
77 78
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
79
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
80

Anne's avatar
Anne committed
81
#if NET_AUDIO_SHOW_TRAFFIC
82
	vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
83
#endif
Anne's avatar
Anne committed
84

85
	try
Anne's avatar
Anne committed
86
	{
87
		// It appears safe to send even very big data payload, so we will send at once
Anne's avatar
Anne committed
88
		int iRawBufferSize = m_oOutgoing.GetBufferSize();
89
		assert( iRawBufferSize > 4 );
90 91
		int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );

Anne's avatar
Anne committed
92
#if NET_AUDIO_SHOW_TRAFFIC
93
		vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
94 95
#endif

96 97 98
		m_pConnection->WaitForSendFinish();
		if( nRet != m_oOutgoing.GetBufferSize() )
			VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 );
Anne's avatar
Anne committed
99
	}
100
	catch( VistaExceptionBase& ex )
Anne's avatar
Anne committed
101
	{
102
		ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() );
Anne's avatar
Anne committed
103 104 105
	}
}

106
bool CITANetAudioMessage::TryReadMessage()
Anne's avatar
Anne committed
107
{
Anne's avatar
Anne committed
108
#if NET_AUDIO_SHOW_TRAFFIC
109
	vstr::out() << "CITANetAudioMessage [ TryRead ] Waiting for incoming data for " << m_nTimeoutMilliseconds << std::endl;
Anne's avatar
Anne committed
110
#endif
111 112 113 114 115 116 117 118 119
	long nIncomingBytes = m_pConnection->WaitForIncomingData( m_nTimeoutMilliseconds );
	if( nIncomingBytes <= 0 )
	{
#if NET_AUDIO_SHOW_TRAFFIC
			vstr::out() << "CITANetAudioMessage [ TryRead ] nothing incoming" << std::endl;
#endif
		return false;
	}
		
120
	assert( nIncomingBytes >= 4 ); // we need at least the size of message
Anne's avatar
Anne committed
121
#if NET_AUDIO_SHOW_TRAFFIC
122
	vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
Anne's avatar
Anne committed
123
#endif
124

125 126
	VistaType::sint32 nMessagePayloadSize;
	int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
127
	assert( nBytesRead == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
128
#if NET_AUDIO_SHOW_TRAFFIC
129
	vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
Anne's avatar
Anne committed
130
#endif
131
	// we need at least the two protocol ints
132
	//assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
133

134 135
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
136

137
	// Receive all incoming data (potentially splitted)
138
	int iBytesReceivedTotal = 0;
139
	while( nMessagePayloadSize < iBytesReceivedTotal )
Anne's avatar
Anne committed
140
	{
Anne's avatar
Anne committed
141 142
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
		int iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ iBytesReceivedTotal ], iIncommingBytes );
143
		iBytesReceivedTotal += iBytesReceived;
144
#if NET_AUDIO_SHOW_TRAFFIC
145
		vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl;
Anne's avatar
Anne committed
146
#endif
Anne's avatar
Anne committed
147 148
	}

149 150
	// Transfer data into members
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
151 152 153
	m_nMessageType = ReadInt();
	m_nMessageId = ReadInt();

Anne's avatar
Anne committed
154
#if NET_AUDIO_SHOW_TRAFFIC
155
	vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
156
#endif
Anne's avatar
Anne committed
157

158
	return true;
Anne's avatar
Anne committed
159 160 161 162 163 164 165
}

int CITANetAudioMessage::GetMessageType() const
{
	return m_nMessageType;
}

166
void CITANetAudioMessage::SetMessageType( int nType )
Anne's avatar
Anne committed
167
{
168
	assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
	m_nMessageType = nType;
}

int CITANetAudioMessage::GetIncomingMessageSize() const
{
	return m_oIncoming.GetTailSize();
}

int CITANetAudioMessage::GetOutgoingMessageSize() const
{
	return m_oOutgoing.GetBufferSize();
}

bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
184
	return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
185 186
}

187
void CITANetAudioMessage::WriteString( const std::string& sValue )
Anne's avatar
Anne committed
188
{
189 190
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
	if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
Anne's avatar
Anne committed
191 192
}

193
void CITANetAudioMessage::WriteInt( const int iValue )
Anne's avatar
Anne committed
194
{
195
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
Anne's avatar
Anne committed
196 197
}

198
void CITANetAudioMessage::WriteBool( const bool bValue )
Anne's avatar
Anne committed
199
{
200
	m_oOutgoing.WriteBool( bValue );
Anne's avatar
Anne committed
201 202
}

203
void CITANetAudioMessage::WriteFloat( const float fValue )
Anne's avatar
Anne committed
204
{
205
	m_oOutgoing.WriteFloat32( fValue );
Anne's avatar
Anne committed
206 207
}

208
void CITANetAudioMessage::WriteDouble( const double dValue )
Anne's avatar
Anne committed
209
{
210
	m_oOutgoing.WriteFloat64( dValue );
Anne's avatar
Anne committed
211 212 213 214 215
}

std::string CITANetAudioMessage::ReadString()
{
	VistaType::sint32 nSize;
216 217
	int nReturn = m_oIncoming.ReadInt32( nSize );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
218 219

	// Empty string?
220
	if( nSize == 0 ) return "";
Anne's avatar
Anne committed
221 222

	std::string sValue;
223 224
	nReturn = m_oIncoming.ReadString( sValue, nSize );
	assert( nReturn == nSize );
Anne's avatar
Anne committed
225 226 227 228 229 230
	return sValue;
}

int CITANetAudioMessage::ReadInt()
{
	VistaType::sint32 nValue;
231 232 233 234
	int nReturn = m_oIncoming.ReadInt32( nValue );
	if( nReturn == -1 )
		ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
235 236 237 238 239 240
	return nValue;
}

bool CITANetAudioMessage::ReadBool()
{
	bool bValue;
241 242
	int nReturn = m_oIncoming.ReadBool( bValue );
	assert( nReturn == sizeof( bool ) );
Anne's avatar
Anne committed
243 244 245 246 247
	return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
	float fValue;
248 249
	int nReturn = m_oIncoming.ReadFloat32( fValue );
	assert( nReturn == sizeof( float ) );
Anne's avatar
Anne committed
250 251 252 253 254
	return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
	double dValue;
255 256
	int nReturn = m_oIncoming.ReadFloat64( dValue );
	assert( nReturn == sizeof( double ) );
Anne's avatar
Anne committed
257 258 259 260
	return dValue;
}


261
void CITANetAudioMessage::WriteException( const ITAException& oException )
Anne's avatar
Anne committed
262
{
263 264 265
	WriteInt( oException.iErrorCode );
	WriteString( oException.sModule );
	WriteString( oException.sReason );
Anne's avatar
Anne committed
266 267 268 269 270 271 272
}

ITAException CITANetAudioMessage::ReadException()
{
	int iErrorCode = ReadInt();
	std::string sModule = ReadString();
	std::string sReason = ReadString();
273
	return ITAException( iErrorCode, sModule, sReason );
Anne's avatar
Anne committed
274 275 276 277 278 279 280 281 282 283 284
}

VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
	return m_pConnection;
}

void CITANetAudioMessage::ClearConnection() {
	m_pConnection = NULL;
}

285
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
Anne's avatar
Anne committed
286
{
287 288 289 290
	int iSize = ( int ) viData.size();
	WriteInt( iSize );
	for( int i = 0; i < iSize; i++ )
		WriteInt( viData[ i ] );
Anne's avatar
Anne committed
291 292 293 294 295 296
}

std::vector<int> CITANetAudioMessage::ReadIntVector()
{
	std::vector<int> viData;
	int iSize = ReadInt();
297 298
	for( int i = 0; i < iSize; i++ )
		viData.push_back( ReadInt() );
Anne's avatar
Anne committed
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313

	return viData;
}

CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingParameters()
{
	CITANetAudioProtocol::StreamingParameters oParams;

	oParams.iChannels = ReadInt();
	oParams.dSampleRate = ReadDouble();
	oParams.iBlockSize = ReadInt();

	return oParams;
}

314
void CITANetAudioMessage::WriteStreamingParameters( const CITANetAudioProtocol::StreamingParameters & oParams )
Anne's avatar
Anne committed
315
{
316 317 318
	WriteInt( oParams.iChannels );
	WriteDouble( oParams.dSampleRate );
	WriteInt( oParams.iBlockSize );
Anne's avatar
Anne committed
319
}
320 321 322 323

int CITANetAudioMessage::ReadRingBufferSize()
{
	return ReadInt();
324 325 326
}

void CITANetAudioMessage::WriteRingBufferSize( const int iBufferSize )
327
{
328 329 330
	WriteInt( iBufferSize );
}

331 332
int CITANetAudioMessage::ReadRingBufferFree()
{
333 334 335 336 337 338 339 340 341
	return ReadInt();
}

void CITANetAudioMessage::WriteRingBufferFree( const int iBufferFree )
{
	WriteInt( iBufferFree );
}

void CITANetAudioMessage::ReadSampleFrame( ITASampleFrame* pSampleFrame )
342 343
{
	int iChannels = ReadInt();
344 345 346 347 348 349
	int iSamples = ReadInt();

	if( pSampleFrame->channels() != iChannels || pSampleFrame->GetLength() != iSamples )
		pSampleFrame->init( iChannels, iSamples, false );

	for( int i = 0; i < iChannels; i++ )
350
	{
351
		for( int j = 0; j < iSamples; j++ )
352
		{
353
			( *pSampleFrame )[ i ][ j ] = ReadFloat();
354 355
		}
	}
356 357 358
}

void CITANetAudioMessage::WriteSampleFrame( ITASampleFrame* pSamples )
359
{
360 361 362 363
	WriteInt( pSamples->channels() );
	WriteInt( pSamples->GetLength() );

	for( int i = 0; i < pSamples->channels(); i++ )
364
	{
365
		for( int j = 0; j < pSamples->GetLength(); j++ )
366
		{
367
			WriteFloat( ( *pSamples )[ i ][ j ] );
368 369
		}
	}
370
}