ITANetAudioMessage.cpp 13.8 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
2
3
4
5
#include <ITAStringUtils.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
6
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
7
8

#include <cassert>
9
10
#include <iostream>
#include <iomanip>
Anne's avatar
Anne committed
11
12
13

// File-local running counter for message identifiers. ResetMessage() reads and
// post-increments it to give each (re)prepared message its id. It is a plain int;
// no synchronization is applied to it in this file.
static int S_nMessageIds = 0;

14
15
16
17
// Constructs a message without an attached connection. The incoming byte vector
// and the outgoing serializer are both created with an initial size argument of 2048.
//
// bSwapBuffers: byte order swap behavior, applied to both the outgoing serializer
//               and the incoming deserializer.
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
	: m_vecIncomingBuffer( 2048 )
	, m_oOutgoing( 2048 )
	, m_pConnection( NULL )
{
	// Both directions have to agree on the byte order handling
	m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );
	m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );

	// Bring all remaining members (ids, types, header placeholders) into a defined state
	ResetMessage();
}

void CITANetAudioMessage::ResetMessage()
{
26
	if( m_oIncoming.GetTailSize() > 0 )
27
		vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
Anne's avatar
Anne committed
28
29
30
31

	// wait till sending is complete -> this prevents us
	// from deleting the buffer while it is still being read
	// by the connection
32
33
	if( m_pConnection )
		m_pConnection->WaitForSendFinish();
Anne's avatar
Anne committed
34
35
36
37

	m_nMessageId = S_nMessageIds++;

	m_oOutgoing.ClearBuffer();
38
39
40
	m_oOutgoing.WriteInt32( 0 ); // Payload size
	m_oOutgoing.WriteInt32( 0 ); // Message Type
	m_oOutgoing.WriteInt32( 0 ); // Identifier
Anne's avatar
Anne committed
41

42
	m_oIncoming.SetBuffer( NULL, 0 );
Anne's avatar
Anne committed
43
44
45
46
47

	m_nMessageType = CITANetAudioProtocol::NP_INVALID;
	m_nAnswerType = CITANetAudioProtocol::NP_INVALID;

	m_pConnection = NULL;
48

Anne's avatar
Anne committed
49
#if NET_AUDIO_SHOW_TRAFFIC
50
	vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
51
#endif
Anne's avatar
Anne committed
52
53
}

54
// Attaches the connection that subsequent read/write calls operate on.
// The pointer is only stored; nothing in this function takes ownership of it.
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
	this->m_pConnection = pConn;
}

void CITANetAudioMessage::WriteMessage()
{
61
	VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
62
63
64
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
65
66
67
68
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
69

70
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
71
72
73

	// rewrite type dummy
	iSwapDummy = m_nMessageType;
74
75
76
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
77

78
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
79
80
81

	// rewrite messageid dummy
	iSwapDummy = m_nMessageId;
82
83
84
85
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );

Anne's avatar
Anne committed
86
#if NET_AUDIO_SHOW_TRAFFIC
87
	vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
88
#endif
Anne's avatar
Anne committed
89

90
	try
Anne's avatar
Anne committed
91
	{
92
		// It appears safe to send even very big data payload, so we will send at once
Anne's avatar
Anne committed
93
		int iRawBufferSize = m_oOutgoing.GetBufferSize();
94
95
		int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );

Anne's avatar
Anne committed
96
#if NET_AUDIO_SHOW_TRAFFIC
97
		vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
98
99
#endif

100
101
102
		m_pConnection->WaitForSendFinish();
		if( nRet != m_oOutgoing.GetBufferSize() )
			VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 );
Anne's avatar
Anne committed
103
	}
104
	catch (VistaExceptionBase& ex)
Anne's avatar
Anne committed
105
	{
106
		ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() );
Anne's avatar
Anne committed
107
108
109
110
111
112
	}
}


void CITANetAudioMessage::ReadMessage()
{
Anne's avatar
Anne committed
113
#if NET_AUDIO_SHOW_TRAFFIC
114
	vstr::out() << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
Anne's avatar
Anne committed
115
#endif
116
	long nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
117
	assert( nIncomingBytes >= 4 ); // we need at least the size of message
Anne's avatar
Anne committed
118
#if NET_AUDIO_SHOW_TRAFFIC
119
	vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
Anne's avatar
Anne committed
120
#endif
121

122
123
	VistaType::sint32 nMessagePayloadSize;
	int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
124
	assert( nBytesRead == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
125
#if NET_AUDIO_SHOW_TRAFFIC
126
	vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
Anne's avatar
Anne committed
127
#endif
128
129
	// we need at least the two protocol ints
	assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
130

131
132
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
133
134
	
	// Receive all incoming data (potentially splitted)
135
	int iBytesReceivedTotal = 0;
136
	while( nMessagePayloadSize != iBytesReceivedTotal )
Anne's avatar
Anne committed
137
	{
Anne's avatar
Anne committed
138
139
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
		int iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ iBytesReceivedTotal ], iIncommingBytes );
140
		iBytesReceivedTotal += iBytesReceived;
141
142
#if NET_AUDIO_SHOW_TRAFFIC
		vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl;
Anne's avatar
Anne committed
143
#endif
Anne's avatar
Anne committed
144
145
	}

146
147
	// Transfer data into members
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
148
149
150
	m_nMessageType = ReadInt();
	m_nMessageId = ReadInt();

Anne's avatar
Anne committed
151
#if NET_AUDIO_SHOW_TRAFFIC
152
	vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
153
154
#endif
}
Anne's avatar
Anne committed
155
156
157

void CITANetAudioMessage::WriteAnswer()
{
158

Anne's avatar
Anne committed
159
#if NET_AUDIO_SHOW_TRAFFIC
160
	vstr::out() << "CITANetAudioMessage [ Answering] to " << m_nMessageType << " with " << m_nAnswerType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
161
162
163
164
165
#endif

	assert( m_nAnswerType != CITANetAudioProtocol::NP_INVALID );

	VistaType::byte* pBuffer = ( VistaType::byte* )m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
166
167
168
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
169
170
171
172
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
173

174
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
175
176
177

	// rewrite type dummy
	iSwapDummy = m_nAnswerType;
178
179
180
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
181

182
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
183
184
185

	// rewrite message dummy
	iSwapDummy = m_nMessageId;
186
187
188
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
	memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
189

190
191
	int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), m_oOutgoing.GetBufferSize() );
	
192
193
194
	m_pConnection->WaitForSendFinish();
	if( nRet != m_oOutgoing.GetBufferSize() )
		ITA_EXCEPT1( UNKNOWN, "Could not write the expected number of bytes" );
Anne's avatar
Anne committed
195
196
197
198
}

void CITANetAudioMessage::ReadAnswer()
{
Anne's avatar
Anne committed
199
200

#if NET_AUDIO_SHOW_TRAFFIC
201
	vstr::out() << "CITANetAudioMessage [ Reading] yet unkown answer from initial message type " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
202
203
#endif

204
	VistaType::sint32 nMessagePayloadSize;
205
	int nReturn;
206
	nReturn = m_pConnection->ReadInt32( nMessagePayloadSize );
207
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
208
#if NET_AUDIO_SHOW_TRAFFIC
209
	vstr::out() << "CITANetAudioMessage [ Reading] Answer type " << nReturn << " (id=" << std::setw( 4 ) << m_nMessageId << ") OK" << std::endl;
Anne's avatar
Anne committed
210
#endif
Anne's avatar
Anne committed
211

212
	// We need at least the message type and message id in payload
213
	assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
214

215
216
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
217
218
219

	// @todo: read over while( received < total ) loop!!!

220
221
222
223
224
225
226
227
	int iBytesReceivedTotal = 0;
	while( nMessagePayloadSize != iBytesReceivedTotal )
	{
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
		int iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ iBytesReceivedTotal ], iIncommingBytes );
		iBytesReceivedTotal += iBytesReceived;
#if NET_AUDIO_SHOW_TRAFFIC
		vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% of answer transmitted" << std::endl;
Anne's avatar
Anne committed
228
#endif
229
230
231
232
233
234
	}

	if( iBytesReceivedTotal != nMessagePayloadSize )
		ITA_EXCEPT1( UNKNOWN, "Protokoll error, Received less bytes than expected when trying to receive answer" );

	// Swap data to deserialization buffer
235
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
236

237
	// Take out the two protocol variables type and message id from deserialization buffer
238
239
240
	m_nAnswerType = ReadInt();
	int nMessageID = ReadInt();
	assert( nMessageID == m_nMessageId );
Anne's avatar
Anne committed
241
242
243
244
245
246
247
}

// Returns the message type currently stored in this object
// (CITANetAudioProtocol::NP_INVALID after ResetMessage()).
int CITANetAudioMessage::GetMessageType() const
{
	const int iCurrentType = m_nMessageType;
	return iCurrentType;
}

248
void CITANetAudioMessage::SetMessageType( int nType )
Anne's avatar
Anne committed
249
{
250
	assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
251
252
253
	m_nMessageType = nType;
}

254
void CITANetAudioMessage::SetAnswerType( int nType )
Anne's avatar
Anne committed
255
{
256
	assert( m_nAnswerType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
	m_nAnswerType = nType;
}

// Returns the answer type currently stored in this object
// (CITANetAudioProtocol::NP_INVALID after ResetMessage()).
int CITANetAudioMessage::GetAnswerType() const
{
	const int iCurrentType = m_nAnswerType;
	return iCurrentType;
}

// Returns the tail size reported by the incoming deserializer, i.e. the
// value of m_oIncoming.GetTailSize().
int CITANetAudioMessage::GetIncomingMessageSize() const
{
	const int iTailSize = m_oIncoming.GetTailSize();
	return iTailSize;
}

// Returns the current size of the outgoing buffer as reported by the
// serializer; this includes the header fields written by ResetMessage().
int CITANetAudioMessage::GetOutgoingMessageSize() const
{
	const int iBufferSize = m_oOutgoing.GetBufferSize();
	return iBufferSize;
}

bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
277
	return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
278
279
}

280
void CITANetAudioMessage::WriteString( const std::string& sValue )
Anne's avatar
Anne committed
281
{
282
283
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
	if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
Anne's avatar
Anne committed
284
285
}

286
void CITANetAudioMessage::WriteInt( const int iValue )
Anne's avatar
Anne committed
287
{
288
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
Anne's avatar
Anne committed
289
290
}

291
void CITANetAudioMessage::WriteBool( const bool bValue )
Anne's avatar
Anne committed
292
{
293
	m_oOutgoing.WriteBool( bValue );
Anne's avatar
Anne committed
294
295
}

296
void CITANetAudioMessage::WriteFloat( const float fValue )
Anne's avatar
Anne committed
297
{
298
	m_oOutgoing.WriteFloat32( fValue );
Anne's avatar
Anne committed
299
300
}

301
void CITANetAudioMessage::WriteDouble( const double dValue )
Anne's avatar
Anne committed
302
{
303
	m_oOutgoing.WriteFloat64( dValue );
Anne's avatar
Anne committed
304
305
306
307
308
}

std::string CITANetAudioMessage::ReadString()
{
	VistaType::sint32 nSize;
309
310
	int nReturn = m_oIncoming.ReadInt32( nSize );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
311
312

	// Empty string?
313
	if( nSize == 0 ) return "";
Anne's avatar
Anne committed
314
315

	std::string sValue;
316
317
	nReturn = m_oIncoming.ReadString( sValue, nSize );
	assert( nReturn == nSize );
Anne's avatar
Anne committed
318
319
320
321
322
323
	return sValue;
}

int CITANetAudioMessage::ReadInt()
{
	VistaType::sint32 nValue;
324
325
326
327
	int nReturn = m_oIncoming.ReadInt32( nValue );
	if( nReturn == -1 )
		ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
328
329
330
331
332
333
	return nValue;
}

bool CITANetAudioMessage::ReadBool()
{
	bool bValue;
334
335
	int nReturn = m_oIncoming.ReadBool( bValue );
	assert( nReturn == sizeof( bool ) );
Anne's avatar
Anne committed
336
337
338
339
340
	return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
	float fValue;
341
342
	int nReturn = m_oIncoming.ReadFloat32( fValue );
	assert( nReturn == sizeof( float ) );
Anne's avatar
Anne committed
343
344
345
346
347
	return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
	double dValue;
348
349
	int nReturn = m_oIncoming.ReadFloat64( dValue );
	assert( nReturn == sizeof( double ) );
Anne's avatar
Anne committed
350
351
352
353
	return dValue;
}


354
void CITANetAudioMessage::WriteException( const ITAException& oException )
Anne's avatar
Anne committed
355
{
356
357
358
	WriteInt( oException.iErrorCode );
	WriteString( oException.sModule );
	WriteString( oException.sReason );
Anne's avatar
Anne committed
359
360
361
362
363
364
365
}

// Deserializes an exception from the incoming buffer.
// The fields are read in the order WriteException() wrote them: error code, module, reason.
ITAException CITANetAudioMessage::ReadException()
{
	// Separate statements keep the read order well-defined
	const int nCode = ReadInt();
	const std::string sOriginModule = ReadString();
	const std::string sDescription = ReadString();

	return ITAException( nCode, sOriginModule, sDescription );
}

// Returns the currently attached connection, or NULL if none is set
// (e.g. after ResetMessage() or ClearConnection()).
VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
	VistaConnectionIP* pCurrentConnection = m_pConnection;
	return pCurrentConnection;
}

void CITANetAudioMessage::ClearConnection() {
	m_pConnection = NULL;
}

378
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
Anne's avatar
Anne committed
379
{
380
381
382
383
	int iSize = ( int ) viData.size();
	WriteInt( iSize );
	for( int i = 0; i < iSize; i++ )
		WriteInt( viData[ i ] );
Anne's avatar
Anne committed
384
385
386
387
388
389
}

// Reads an integer vector in the layout produced by WriteIntVector():
// element count first, then the elements. A count of zero or less yields an empty vector.
std::vector<int> CITANetAudioMessage::ReadIntVector()
{
	std::vector<int> viResult;

	const int nCount = ReadInt();
	int nRead = 0;
	while( nRead < nCount )
	{
		viResult.push_back( ReadInt() );
		++nRead;
	}

	return viResult;
}

// Reads streaming parameters from the incoming buffer.
// Wire order: channel count (int), sample rate (double), block size (int).
CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingParameters()
{
	// Read into locals first so the wire order is explicit
	const int nChannels = ReadInt();
	const double dRate = ReadDouble();
	const int nBlockLength = ReadInt();

	CITANetAudioProtocol::StreamingParameters oResult;
	oResult.iChannels = nChannels;
	oResult.dSampleRate = dRate;
	oResult.iBlockSize = nBlockLength;
	return oResult;
}

407
// Appends streaming parameters to the outgoing buffer.
// Wire order: channel count (int), sample rate (double), block size (int).
void CITANetAudioMessage::WriteStreamingParameters( const CITANetAudioProtocol::StreamingParameters & oParams )
{
	this->WriteInt( oParams.iChannels );
	this->WriteDouble( oParams.dSampleRate );
	this->WriteInt( oParams.iBlockSize );
}
413
414
415
416

// Reads the ring buffer size from the incoming buffer; it is transported as a single integer.
int CITANetAudioMessage::ReadRingBufferSize()
{
	const int iRingBufferSize = ReadInt();
	return iRingBufferSize;
}

void CITANetAudioMessage::WriteRingBufferSize( const int iBufferSize )
420
{
421
422
423
	WriteInt( iBufferSize );
}

424
425
int CITANetAudioMessage::ReadRingBufferFree()
{
426
427
428
429
430
431
432
433
434
	return ReadInt();
}

void CITANetAudioMessage::WriteRingBufferFree( const int iBufferFree )
{
	WriteInt( iBufferFree );
}

// Reads a sample frame in the layout produced by WriteSampleFrame(): channel
// count, number of samples, then all samples channel by channel as floats.
// The target frame is re-initialized if its dimensions differ from the received ones.
//
// pSampleFrame: target frame, must not be NULL.
//
// Throws an ITAException (UNKNOWN) if the received dimensions are negative;
// the values come from the wire and were previously handed to init() unchecked.
void CITANetAudioMessage::ReadSampleFrame( ITASampleFrame* pSampleFrame )
{
	assert( pSampleFrame != NULL );

	const int iChannels = ReadInt();
	const int iSamples = ReadInt();

	if( iChannels < 0 || iSamples < 0 )
		ITA_EXCEPT1( UNKNOWN, "Received sample frame with negative number of channels or samples" );

	if( pSampleFrame->channels() != iChannels || pSampleFrame->GetLength() != iSamples )
		pSampleFrame->init( iChannels, iSamples, false );

	for( int i = 0; i < iChannels; i++ )
	{
		for( int j = 0; j < iSamples; j++ )
		{
			( *pSampleFrame )[ i ][ j ] = ReadFloat();
		}
	}
}

// Appends a sample frame to the outgoing buffer: channel count, number of
// samples, then all samples channel by channel as floats.
void CITANetAudioMessage::WriteSampleFrame( ITASampleFrame* pSamples )
{
	WriteInt( pSamples->channels() );
	WriteInt( pSamples->GetLength() );

	int iChannel = 0;
	while( iChannel < pSamples->channels() )
	{
		int iSample = 0;
		while( iSample < pSamples->GetLength() )
		{
			WriteFloat( ( *pSamples )[ iChannel ][ iSample ] );
			++iSample;
		}
		++iChannel;
	}
}