ITANetAudioMessage.cpp 13.9 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioMessage.h>

#include <ITAStringUtils.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>

#include <cassert>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <string>
Anne's avatar
Anne committed
10

Anne's avatar
Anne committed
11
12
// Set to 1 to print a trace of prepared, written and read messages to std::cout.
// The definition must not end with a semicolon: the macro is evaluated in
// '#if NET_AUDIO_SHOW_TRAFFIC' directives, where a stray ';' is not a valid token.
#define NET_AUDIO_SHOW_TRAFFIC 0

// Running counter from which ResetMessage() draws the id of the next message.
static int S_nMessageIds = 0;

15
16
17
18
// Creates a message without connection. The incoming byte vector and the
// outgoing serializer are both constructed with 2048, the given byte order
// swap behavior is applied to both serializers, and the message is reset.
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
	: m_vecIncomingBuffer( 2048 )
	, m_oOutgoing( 2048 )
	, m_pConnection( nullptr )
{
	// Both directions have to agree on the byte order handling
	m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );
	m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );

	// Assigns a message id and prepares the header placeholders
	ResetMessage();
}

void CITANetAudioMessage::ResetMessage()
{
27
	if( m_oIncoming.GetTailSize() > 0 )
Anne's avatar
Anne committed
28
29
30
31
32
33
34
	{
		std::cerr << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
	}

	// wait till sending is complete -> this prevents us
	// from deleting the buffer while it is still being read
	// by the connection
35
36
	//if( m_pConnection )
	//	m_pConnection->WaitForSendFinish();
Anne's avatar
Anne committed
37
38
39
40

	m_nMessageId = S_nMessageIds++;

	m_oOutgoing.ClearBuffer();
41
42
43
	m_oOutgoing.WriteInt32( 0 ); // size dummy
	m_oOutgoing.WriteInt32( 0 ); // type dummy
	m_oOutgoing.WriteInt32( 0 ); // ID
Anne's avatar
Anne committed
44

45
	m_oIncoming.SetBuffer( NULL, 0 );
Anne's avatar
Anne committed
46
47
48
49
50

	m_nMessageType = CITANetAudioProtocol::NP_INVALID;
	m_nAnswerType = CITANetAudioProtocol::NP_INVALID;

	m_pConnection = NULL;
51

Anne's avatar
Anne committed
52
#if NET_AUDIO_SHOW_TRAFFIC
53
54
	std::cout << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif
Anne's avatar
Anne committed
55
56
}

57
// Stores the connection pointer that the write and read methods of this
// message operate on. The pointer is only stored here, nothing else happens.
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
	this->m_pConnection = pConn;
}

void CITANetAudioMessage::WriteMessage()
{
64
	VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
65
66
67
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
68
69
70
71
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
72

73
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
74
75
76

	// rewrite type dummy
	iSwapDummy = m_nMessageType;
77
78
79
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
80

81
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
82
83
84

	// rewrite messageid dummy
	iSwapDummy = m_nMessageId;
85
86
87
88
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );

Anne's avatar
Anne committed
89
#if NET_AUDIO_SHOW_TRAFFIC
90
91
	std::cout << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif
Anne's avatar
Anne committed
92

93
	try
Anne's avatar
Anne committed
94
95
	{
		int iRawBufferSize = m_oOutgoing.GetBufferSize();
96
97
		int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );

Anne's avatar
Anne committed
98
#if NET_AUDIO_SHOW_TRAFFIC
99
100
101
102
103
104
		std::cout << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
#endif

		//m_pConnection->WaitForSendFinish();
		//if( nRet != m_oOutgoing.GetBufferSize() )
		//	ITA_EXCEPT1( NETWORK_ERROR, "Could not write the expected number of bytes" );
Anne's avatar
Anne committed
105
	}
106
	catch (VistaExceptionBase& ex)
Anne's avatar
Anne committed
107
	{
108
		ITA_EXCEPT1(NETWORK_ERROR, ex.GetExceptionText());
Anne's avatar
Anne committed
109
110
111
112
113
114
	}
}


void CITANetAudioMessage::ReadMessage()
{
Anne's avatar
Anne committed
115
#if NET_AUDIO_SHOW_TRAFFIC
116
	std::cout << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
Anne's avatar
Anne committed
117
#endif
118
	long nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
119
#if NET_AUDIO_SHOW_TRAFFIC
120
	std::cout << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
Anne's avatar
Anne committed
121
#endif
122

123
124
125
	VistaType::sint32 nMessagePayloadSize;
	int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
	assert( nBytesRead = sizeof( int ) );
Anne's avatar
Anne committed
126
#if NET_AUDIO_SHOW_TRAFFIC
127
	std::cout << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
Anne's avatar
Anne committed
128
#endif
129
130
	// we need at least the two protocol ints
	assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
131

132
133
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
Anne's avatar
Anne committed
134

Anne's avatar
Anne committed
135
136
137
138
139
140
141
142
143
144
145
	/*
	int iBytesReceivedTotal = 0;
			while( iPayloadDataSize != iBytesReceivedTotal )
			{
				long nIncomingBytes = pSocket->WaitForIncomingData( 0 );
				int iBytesReceived = pSocket->ReceiveRaw( &vdIncomingData[ iBytesReceivedTotal ], nIncomingBytes );
				iBytesReceivedTotal += iBytesReceived;
				vstr::out() << "[ Server ] " << setw( 3 ) << std::floor( iBytesReceivedTotal / float( iPayloadDataSize ) * 100.0f ) << "% transmitted" << endl;
			}
*/

146
	int iBytesReceivedTotal = m_pConnection->Receive( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize );
Anne's avatar
Anne committed
147
#if NET_AUDIO_SHOW_TRAFFIC
148
	std::cout << "CITANetAudioMessage [ Reading ] Received " << iBytesReceivedTotal << " bytes, so far." << std::endl;
Anne's avatar
Anne committed
149
#endif
150
	while( nMessagePayloadSize != iBytesReceivedTotal )
Anne's avatar
Anne committed
151
	{
Anne's avatar
Anne committed
152
153
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
		int iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ iBytesReceivedTotal ], iIncommingBytes );
154
		iBytesReceivedTotal += iBytesReceived;
Anne's avatar
Anne committed
155
156
157
#if NET_AUDIO_SHOW_TRAFFIC
		std::cout << "CITANetAudioMessage [ Reading ] Further " << std::setw( 3 ) << iBytesReceivedTotal << " transmitted" << std::endl;
#endif
Anne's avatar
Anne committed
158
159
	}

160
161
	// Transfer data into members
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
162
163
164
	m_nMessageType = ReadInt();
	m_nMessageId = ReadInt();

Anne's avatar
Anne committed
165
#if NET_AUDIO_SHOW_TRAFFIC
166
167
168
	std::cout << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif
}
Anne's avatar
Anne committed
169
170
171

void CITANetAudioMessage::WriteAnswer()
{
172

Anne's avatar
Anne committed
173
#if NET_AUDIO_SHOW_TRAFFIC
174
175
176
177
178
179
	std::cout << "CITANetAudioMessage [ Answering] to " << m_nMessageType << " with " << m_nAnswerType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif

	assert( m_nAnswerType != CITANetAudioProtocol::NP_INVALID );

	VistaType::byte* pBuffer = ( VistaType::byte* )m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
180
181
182
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
183
184
185
186
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
187

188
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
189
190
191

	// rewrite type dummy
	iSwapDummy = m_nAnswerType;
192
193
194
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
195

196
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
197
198
199

	// rewrite message dummy
	iSwapDummy = m_nMessageId;
200
201
202
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
203

204
205
206
207
208
	int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), m_oOutgoing.GetBufferSize() );
	//m_pConnection->WaitForSendFinish();
	//f( nRet != m_oOutgoing.GetBufferSize() )
	//	ITA_EXCEPT1( UNKNOWN, "Could not write the expected number of bytes" );
	
Anne's avatar
Anne committed
209
210
211
212
}

void CITANetAudioMessage::ReadAnswer()
{
Anne's avatar
Anne committed
213
214

#if NET_AUDIO_SHOW_TRAFFIC
215
216
217
218
	std::cout << "CITANetAudioMessage [ Reading] yet unkown answer from message " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
#endif

	//try
Anne's avatar
Anne committed
219
220
221
	{
		VistaType::sint32 nMessageSize;
		int nReturn;
222
		nReturn = m_pConnection->ReadInt32( nMessageSize );
Anne's avatar
Anne committed
223
224
225
#if NET_AUDIO_SHOW_TRAFFIC
		std::cout << "CITANetAudioMessage [ Reading] 1. return is " << nReturn << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
#endif
226
227
		if( nReturn != sizeof( VistaType::sint32 ) )
			ITA_EXCEPT1( UNKNOWN, "Protokoll error, was expecting 4 bytes to read message size, but received " + std::to_string( nReturn ) );
Anne's avatar
Anne committed
228
229

		// we need at least the two protocol types
230
		assert( nMessageSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
231

232
233
		if( nMessageSize > ( int ) m_vecIncomingBuffer.size() )
			m_vecIncomingBuffer.resize( nMessageSize );
Anne's avatar
Anne committed
234
235

		// jst: hier nicht while( nReturn < nMessageSize) ReadRawBuffer??
236
237
238
		nReturn = m_pConnection->ReadRawBuffer( &m_vecIncomingBuffer[ 0 ], nMessageSize );
		if( nReturn != nMessageSize )
			ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected" );
Anne's avatar
Anne committed
239
240
241
#if NET_AUDIO_SHOW_TRAFFIC
		std::cout << "CITANetAudioMessage [ Reading] 2. return is " << nReturn << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
#endif
242
		m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nReturn );
Anne's avatar
Anne committed
243
	}
244
	//catch (VistaExceptionBase& ex)
Anne's avatar
Anne committed
245
246
	{
		// Probable connection loss
247
		//ITA_EXCEPT1( UNKNOWN, ex.GetExceptionText() );
Anne's avatar
Anne committed
248
249
	}

250
251
252
253
	m_nAnswerType = ReadInt();
	int nMessageID = ReadInt();
	assert( nMessageID == m_nMessageId );
	m_nMessageId = nMessageID;
Anne's avatar
Anne committed
254
255
256
257
258
259
260
261
}


// Returns the message type currently stored in this message. It is
// NP_INVALID after a reset, until it is set or a message has been read.
int CITANetAudioMessage::GetMessageType() const
{
	const int nCurrentType = m_nMessageType;
	return nCurrentType;
}

262
void CITANetAudioMessage::SetMessageType( int nType )
Anne's avatar
Anne committed
263
{
264
	assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
265
266
267
	m_nMessageType = nType;
}

268
void CITANetAudioMessage::SetAnswerType( int nType )
Anne's avatar
Anne committed
269
{
270
	assert( m_nAnswerType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
	m_nAnswerType = nType;
}

// Returns the answer type currently stored in this message. It is
// NP_INVALID after a reset, until it is set or an answer has been read.
int CITANetAudioMessage::GetAnswerType() const
{
	const int nCurrentType = m_nAnswerType;
	return nCurrentType;
}

// Returns the tail size reported by the incoming deserializer
// (presumably the number of received bytes that have not been read yet).
int CITANetAudioMessage::GetIncomingMessageSize() const
{
	const int nTailSize = m_oIncoming.GetTailSize();
	return nTailSize;
}

// Returns the current size of the outgoing buffer as reported by the
// serializer. This includes the header fields written by ResetMessage().
int CITANetAudioMessage::GetOutgoingMessageSize() const
{
	const int nBufferSize = m_oOutgoing.GetBufferSize();
	return nBufferSize;
}

bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
291
	return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
292
293
}

294
void CITANetAudioMessage::WriteString( const std::string& sValue )
Anne's avatar
Anne committed
295
{
296
297
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
	if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
Anne's avatar
Anne committed
298
299
}

300
void CITANetAudioMessage::WriteInt( const int iValue )
Anne's avatar
Anne committed
301
{
302
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
Anne's avatar
Anne committed
303
304
}

305
void CITANetAudioMessage::WriteBool( const bool bValue )
Anne's avatar
Anne committed
306
{
307
	m_oOutgoing.WriteBool( bValue );
Anne's avatar
Anne committed
308
309
}

310
void CITANetAudioMessage::WriteFloat( const float fValue )
Anne's avatar
Anne committed
311
{
312
	m_oOutgoing.WriteFloat32( fValue );
Anne's avatar
Anne committed
313
314
}

315
void CITANetAudioMessage::WriteDouble( const double dValue )
Anne's avatar
Anne committed
316
{
317
	m_oOutgoing.WriteFloat64( dValue );
Anne's avatar
Anne committed
318
319
320
321
322
}

std::string CITANetAudioMessage::ReadString()
{
	VistaType::sint32 nSize;
323
324
	int nReturn = m_oIncoming.ReadInt32( nSize );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
325
326

	// Empty string?
327
	if( nSize == 0 ) return "";
Anne's avatar
Anne committed
328
329

	std::string sValue;
330
331
	nReturn = m_oIncoming.ReadString( sValue, nSize );
	assert( nReturn == nSize );
Anne's avatar
Anne committed
332
333
334
335
336
337
	return sValue;
}

int CITANetAudioMessage::ReadInt()
{
	VistaType::sint32 nValue;
338
339
340
341
	int nReturn = m_oIncoming.ReadInt32( nValue );
	if( nReturn == -1 )
		ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
342
343
344
345
346
347
	return nValue;
}

bool CITANetAudioMessage::ReadBool()
{
	bool bValue;
348
349
	int nReturn = m_oIncoming.ReadBool( bValue );
	assert( nReturn == sizeof( bool ) );
Anne's avatar
Anne committed
350
351
352
353
354
	return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
	float fValue;
355
356
	int nReturn = m_oIncoming.ReadFloat32( fValue );
	assert( nReturn == sizeof( float ) );
Anne's avatar
Anne committed
357
358
359
360
361
	return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
	double dValue;
362
363
	int nReturn = m_oIncoming.ReadFloat64( dValue );
	assert( nReturn == sizeof( double ) );
Anne's avatar
Anne committed
364
365
366
367
	return dValue;
}


368
void CITANetAudioMessage::WriteException( const ITAException& oException )
Anne's avatar
Anne committed
369
{
370
371
372
	WriteInt( oException.iErrorCode );
	WriteString( oException.sModule );
	WriteString( oException.sReason );
Anne's avatar
Anne committed
373
374
375
376
377
378
379
}

// Reads an exception from the incoming message. The fields are expected in
// the order used by WriteException(): error code, module, reason.
ITAException CITANetAudioMessage::ReadException()
{
	// The reads have to stay separate statements, their order matters
	const int nCode = ReadInt();
	const std::string sOrigin = ReadString();
	const std::string sCause = ReadString();

	return ITAException( nCode, sOrigin, sCause );
}

// Returns the connection pointer currently stored in this message.
// It is NULL after a reset or after ClearConnection().
VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
	VistaConnectionIP* pCurrentConnection = m_pConnection;
	return pCurrentConnection;
}

void CITANetAudioMessage::ClearConnection() {
	m_pConnection = NULL;
}

392
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
Anne's avatar
Anne committed
393
{
394
395
396
397
	int iSize = ( int ) viData.size();
	WriteInt( iSize );
	for( int i = 0; i < iSize; i++ )
		WriteInt( viData[ i ] );
Anne's avatar
Anne committed
398
399
400
401
402
403
}

// Reads a vector of integers from the incoming message: first the number of
// elements, then the elements. A count of zero or less yields an empty vector.
std::vector<int> CITANetAudioMessage::ReadIntVector()
{
	std::vector<int> viResult;

	// Count down the announced number of elements
	for( int nRemaining = ReadInt(); nRemaining > 0; --nRemaining )
	{
		const int iElement = ReadInt();
		viResult.push_back( iElement );
	}

	return viResult;
}

// Reads streaming parameters from the incoming message in the order used by
// WriteStreamingParameters(): channels, sample rate, block size.
CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingParameters()
{
	CITANetAudioProtocol::StreamingParameters oReceived;

	// The order of the reads defines the wire format, keep it
	const int nChannels = ReadInt();
	oReceived.iChannels = nChannels;

	const double dRate = ReadDouble();
	oReceived.dSampleRate = dRate;

	const int nBlockSize = ReadInt();
	oReceived.iBlockSize = nBlockSize;

	return oReceived;
}

421
// Appends streaming parameters to the outgoing buffer in the order channels,
// sample rate, block size. ReadStreamingParameters() expects the same order.
void CITANetAudioMessage::WriteStreamingParameters( const CITANetAudioProtocol::StreamingParameters & oParams )
{
	this->WriteInt( oParams.iChannels );
	this->WriteDouble( oParams.dSampleRate );
	this->WriteInt( oParams.iBlockSize );
}
427
428
429
430

// Reads the ring buffer size from the incoming message. It is transported
// as a single integer.
int CITANetAudioMessage::ReadRingBufferSize()
{
	const int iBufferSize = ReadInt();
	return iBufferSize;
}

void CITANetAudioMessage::WriteRingBufferSize( const int iBufferSize )
434
{
435
436
437
	WriteInt( iBufferSize );
}

438
439
int CITANetAudioMessage::ReadRingBufferFree()
{
440
441
442
443
444
445
446
447
448
	return ReadInt();
}

void CITANetAudioMessage::WriteRingBufferFree( const int iBufferFree )
{
	WriteInt( iBufferFree );
}

// Reads a sample frame from the incoming message into the given frame:
// number of channels, number of samples per channel, then the samples
// channel by channel. The target frame is re-initialized if its dimensions
// differ from the received ones.
//
// pSampleFrame must not be NULL (asserted).
// Throws an ITAException (UNKNOWN) if negative dimensions are received.
void CITANetAudioMessage::ReadSampleFrame( ITASampleFrame* pSampleFrame )
{
	assert( pSampleFrame != NULL );

	const int iChannels = ReadInt();
	const int iSamples = ReadInt();

	// The dimensions come from the wire, do not pass nonsense on to init()
	if( iChannels < 0 || iSamples < 0 )
		ITA_EXCEPT1( UNKNOWN, "Received sample frame with negative dimensions (" + std::to_string( iChannels ) + " channels, " + std::to_string( iSamples ) + " samples)" );

	if( pSampleFrame->channels() != iChannels || pSampleFrame->GetLength() != iSamples )
		pSampleFrame->init( iChannels, iSamples, false );

	for( int i = 0; i < iChannels; i++ )
	{
		for( int j = 0; j < iSamples; j++ )
		{
			( *pSampleFrame )[ i ][ j ] = ReadFloat();
		}
	}
}

// Appends a sample frame to the outgoing buffer: number of channels, number
// of samples per channel, then all samples channel by channel as floats.
// ReadSampleFrame() expects exactly this layout.
void CITANetAudioMessage::WriteSampleFrame( ITASampleFrame* pSamples )
{
	ITASampleFrame& oFrame = *pSamples;

	WriteInt( oFrame.channels() );
	WriteInt( oFrame.GetLength() );

	int iChannel = 0;
	while( iChannel < oFrame.channels() )
	{
		int iSample = 0;
		while( iSample < oFrame.GetLength() )
		{
			WriteFloat( oFrame[ iChannel ][ iSample ] );
			++iSample;
		}
		++iChannel;
	}
}