ITANetAudioMessage.cpp 12.9 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
2
3
4
5
#include <ITAStringUtils.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
6
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
7
#include <ITAClock.h>
Anne Heimes's avatar
Anne Heimes committed
8
#include <ITADataLog.h>
9
10
11

#include <cstring>
#include <algorithm>
Anne's avatar
Anne committed
12
#include <cassert>
13
14
#include <iostream>
#include <iomanip>
Anne's avatar
Anne committed
15
16
17

// File-local running counter; ResetMessage() hands out its current value as
// the next message id and then increments it.
static int S_nMessageIds = 0;

Anne Heimes's avatar
Anne Heimes committed
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! One record of the network audio protocol log (one tab-separated line per message event).
struct ITAProtocolLog : public ITALogDataBase
{
	//! Writes the tab-separated column header line to the given stream.
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId"
		   << '\t' << "WorldTimeStamp"
		   << '\t' << "MessageType"
		   << '\t' << "Status"
		   << '\t' << "Paketgroesse"
		   << std::endl;
		return os;
	}

	//! Writes this record as one tab-separated line (time stamp with 12 significant digits).
	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId
		   << '\t' << std::setprecision( 12 ) << dWorldTimeStamp
		   << '\t' << iMessageType
		   << '\t' << iStatus
		   << '\t' << nMessagePayloadSize
		   << std::endl;
		return os;
	}

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp; //!< Time stamp taken when the record was created
	int iMessageType; //!< Message type of the logged message
	int iStatus; //!< Status code of the logged event
	VistaType::sint32 nMessagePayloadSize; //!< Payload size of the logged message
};

//! Concrete buffered data logger type for ITAProtocolLog records.
class ITABufferedDataLoggerImplProtocol
	: public ITABufferedDataLogger< ITAProtocolLog >
{
};

52
53
54
55
//! Creates a message object with 2048-byte initial buffers and no connection.
/**
  * @param bSwapBuffers Byte order swap behaviour applied to both the outgoing
  *                     serializer and the incoming deserializer.
  */
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
	: m_vecIncomingBuffer( 2048 )
	, m_oOutgoing( 2048 )
	, m_pConnection( NULL )
	, m_iBytesReceivedTotal( 0 )
{
	m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );
	m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );

	// The logger has to exist before ResetMessage() runs, because
	// ResetMessage() writes a log record through m_pProtocolLogger when it
	// finds unread incoming data.
	m_pProtocolLogger = new ITABufferedDataLoggerImplProtocol();

	ResetMessage();
}

void CITANetAudioMessage::ResetMessage()
{
Anne Heimes's avatar
Anne Heimes committed
67
68
	if (m_oIncoming.GetTailSize() > 0)
	{
Anne Heimes's avatar
Anne Heimes committed
69
		vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
Anne's avatar
Anne committed
70

Anne Heimes's avatar
Anne Heimes committed
71
72
73
74
75
76
77
78
79
80
		ITAProtocolLog oLog;
		oLog.uiBlockId = m_nMessageId;
		oLog.iMessageType = m_nMessageType;
		oLog.iStatus = -1;
		oLog.nMessagePayloadSize = 0;
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();

		m_pProtocolLogger->log(oLog);
	}

Anne's avatar
Anne committed
81
82
83
	// wait till sending is complete -> this prevents us
	// from deleting the buffer while it is still being read
	// by the connection
84
	if (m_pConnection != NULL)
85
		m_pConnection->WaitForSendFinish();
Anne's avatar
Anne committed
86
87
88
89

	m_nMessageId = S_nMessageIds++;

	m_oOutgoing.ClearBuffer();
90
91
92
	m_oOutgoing.WriteInt32( 0 ); // Payload size
	m_oOutgoing.WriteInt32( 0 ); // Message Type
	m_oOutgoing.WriteInt32( 0 ); // Identifier
Anne's avatar
Anne committed
93

94
	m_oIncoming.SetBuffer( NULL, 0 );
Anne's avatar
Anne committed
95

Anne's avatar
Anne committed
96
	m_nMessageType = -1;
Anne's avatar
Anne committed
97

Anne's avatar
Anne committed
98
	//m_pConnection = NULL;
99

Anne's avatar
Anne committed
100
#if NET_AUDIO_SHOW_TRAFFIC
101
	vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
102
#endif
Anne's avatar
Anne committed
103
104
}

105
//! Stores the connection pointer used by WriteMessage() and ReadMessage().
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
	this->m_pConnection = pConn;
}

void CITANetAudioMessage::WriteMessage()
{
Anne Heimes's avatar
Anne Heimes committed
112
	ITAProtocolLog oLog;
113
	VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
114
115
116
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
117
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
Anne Heimes's avatar
Anne Heimes committed
118
	oLog.nMessagePayloadSize = iSwapDummy;
119
120
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
121
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
122

123
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
124
125
126

	// rewrite type dummy
	iSwapDummy = m_nMessageType;
Anne Heimes's avatar
Anne Heimes committed
127
	oLog.iMessageType = m_nMessageType;
128
129
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
130
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
131

132
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
133
134
135

	// rewrite messageid dummy
	iSwapDummy = m_nMessageId;
Anne Heimes's avatar
Anne Heimes committed
136
	oLog.uiBlockId = m_nMessageId;
137
138
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
139
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne Heimes's avatar
Anne Heimes committed
140
141
142
	oLog.iStatus = 0;
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
	m_pProtocolLogger->log( oLog );
Anne's avatar
Anne committed
143
#if NET_AUDIO_SHOW_TRAFFIC
144
	vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
145
#endif
Anne's avatar
Anne committed
146

147
	try
Anne's avatar
Anne committed
148
	{
149
		// It appears safe to send even very big data payload, so we will send at once
Anne's avatar
Anne committed
150
		int iRawBufferSize = m_oOutgoing.GetBufferSize();
151
152
		int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );

Anne's avatar
Anne committed
153
#if NET_AUDIO_SHOW_TRAFFIC
154
		vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
155
156
#endif

157
		m_pConnection->WaitForSendFinish();
158
159
		//if( nRet != m_oOutgoing.GetBufferSize() )
			//VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 );
Anne's avatar
Anne committed
160
	}
161
	catch (VistaExceptionBase& ex)
Anne's avatar
Anne committed
162
	{
163
		ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() );
Anne's avatar
Anne committed
164
165
166
167
	}
}


Anne's avatar
Anne committed
168
bool CITANetAudioMessage::ReadMessage( int timeout)
Anne's avatar
Anne committed
169
{
Anne Heimes's avatar
Anne Heimes committed
170
	ITAProtocolLog oLog;
Anne's avatar
Anne committed
171
#if NET_AUDIO_SHOW_TRAFFIC
172
	vstr::out() << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
Anne's avatar
Anne committed
173
#endif
Anne's avatar
Anne committed
174
	// WaitForIncomming Data int in ca ms
Anne's avatar
Anne committed
175
	long nIncomingBytes = m_pConnection->WaitForIncomingData( timeout );
Anne's avatar
Anne committed
176
	// TODO Timer entfernen
Anne Heimes's avatar
Anne Heimes committed
177
	if (nIncomingBytes == -1)
Anne's avatar
Anne committed
178
		return false;
Anne's avatar
Anne committed
179

180
181
	//if (timeout != 0)
		//nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
182
#if NET_AUDIO_SHOW_TRAFFIC
183
	vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
Anne's avatar
Anne committed
184
#endif
185

186
187
	VistaType::sint32 nMessagePayloadSize;
	int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
Anne Heimes's avatar
Anne Heimes committed
188
	oLog.nMessagePayloadSize = nMessagePayloadSize;
189
	assert( nBytesRead == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
190
#if NET_AUDIO_SHOW_TRAFFIC
191
	vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
Anne's avatar
Anne committed
192
#endif
Anne Heimes's avatar
Anne Heimes committed
193
194
	if (nMessagePayloadSize <= 0)
		return false;
195
	// we need at least the two protocol ints
196
	//assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
197

198
199
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
200
201
	
	// Receive all incoming data (potentially splitted)
202
	
Anne Heimes's avatar
Anne Heimes committed
203
	while (nMessagePayloadSize > m_iBytesReceivedTotal)
Anne's avatar
Anne committed
204
	{
Anne's avatar
Anne committed
205
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
206
207
		int iBytesReceived;
		if ( nMessagePayloadSize < iIncommingBytes )
Anne Heimes's avatar
Anne Heimes committed
208
			iBytesReceived = m_pConnection->Receive(&m_vecIncomingBuffer[m_iBytesReceivedTotal], nMessagePayloadSize - m_iBytesReceivedTotal);
Anne's avatar
Anne committed
209
		else
Anne Heimes's avatar
Anne Heimes committed
210
211
			iBytesReceived = m_pConnection->Receive(&m_vecIncomingBuffer[m_iBytesReceivedTotal], iIncommingBytes);
		m_iBytesReceivedTotal += iBytesReceived;
212
#if NET_AUDIO_SHOW_TRAFFIC
213
		vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl;
Anne's avatar
Anne committed
214
#endif
Anne's avatar
Anne committed
215
	}
216
	m_iBytesReceivedTotal = 0;
Anne's avatar
Anne committed
217

Anne Heimes's avatar
Anne Heimes committed
218
219
	oLog.iStatus = 1;
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
220
221
	// Transfer data into members
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
222
223
	m_nMessageType = ReadInt();
	m_nMessageId = ReadInt();
Anne Heimes's avatar
Anne Heimes committed
224
225
226
	oLog.iMessageType = m_nMessageType;
	oLog.uiBlockId = m_nMessageId;
	m_pProtocolLogger->log( oLog );
Anne's avatar
Anne committed
227

Anne's avatar
Anne committed
228
#if NET_AUDIO_SHOW_TRAFFIC
229
	vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
230
#endif
231
	return true;
232
}
Anne's avatar
Anne committed
233
234
235
236
237
238

//! Returns the currently stored message type.
int CITANetAudioMessage::GetMessageType() const
{
	const int iType = m_nMessageType;
	return iType;
}

239
void CITANetAudioMessage::SetMessageType( int nType )
Anne's avatar
Anne committed
240
{
Anne's avatar
Anne committed
241
	//assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
	m_nMessageType = nType;
}

//! Returns the tail size reported by the incoming deserializer.
int CITANetAudioMessage::GetIncomingMessageSize() const
{
	const int nTailSize = m_oIncoming.GetTailSize();
	return nTailSize;
}

//! Returns the current size of the outgoing serializer buffer.
int CITANetAudioMessage::GetOutgoingMessageSize() const
{
	const int nBufferSize = m_oOutgoing.GetBufferSize();
	return nBufferSize;
}

bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
257
	return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
258
259
}

260
void CITANetAudioMessage::WriteString( const std::string& sValue )
Anne's avatar
Anne committed
261
{
262
263
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
	if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
Anne's avatar
Anne committed
264
265
}

266
void CITANetAudioMessage::WriteInt( const int iValue )
Anne's avatar
Anne committed
267
{
268
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
Anne's avatar
Anne committed
269
270
}

271
void CITANetAudioMessage::WriteBool( const bool bValue )
Anne's avatar
Anne committed
272
{
273
	m_oOutgoing.WriteBool( bValue );
Anne's avatar
Anne committed
274
275
}

276
void CITANetAudioMessage::WriteFloat( const float fValue )
Anne's avatar
Anne committed
277
{
278
	m_oOutgoing.WriteFloat32( fValue );
Anne's avatar
Anne committed
279
280
}

281
void CITANetAudioMessage::WriteDouble( const double dValue )
Anne's avatar
Anne committed
282
{
283
	m_oOutgoing.WriteFloat64( dValue );
Anne's avatar
Anne committed
284
285
286
287
288
}

std::string CITANetAudioMessage::ReadString()
{
	VistaType::sint32 nSize;
289
290
	int nReturn = m_oIncoming.ReadInt32( nSize );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
291
292

	// Empty string?
293
	if( nSize == 0 ) return "";
Anne's avatar
Anne committed
294
295

	std::string sValue;
296
297
	nReturn = m_oIncoming.ReadString( sValue, nSize );
	assert( nReturn == nSize );
Anne's avatar
Anne committed
298
299
300
301
302
303
	return sValue;
}

int CITANetAudioMessage::ReadInt()
{
	VistaType::sint32 nValue;
304
305
306
307
	int nReturn = m_oIncoming.ReadInt32( nValue );
	if( nReturn == -1 )
		ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
308
309
310
311
312
313
	return nValue;
}

bool CITANetAudioMessage::ReadBool()
{
	bool bValue;
314
315
	int nReturn = m_oIncoming.ReadBool( bValue );
	assert( nReturn == sizeof( bool ) );
Anne's avatar
Anne committed
316
317
318
319
320
	return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
	float fValue;
321
322
	int nReturn = m_oIncoming.ReadFloat32( fValue );
	assert( nReturn == sizeof( float ) );
Anne's avatar
Anne committed
323
324
325
326
327
	return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
	double dValue;
328
329
	int nReturn = m_oIncoming.ReadFloat64( dValue );
	assert( nReturn == sizeof( double ) );
Anne's avatar
Anne committed
330
331
332
333
	return dValue;
}


334
void CITANetAudioMessage::WriteException( const ITAException& oException )
Anne's avatar
Anne committed
335
{
336
337
338
	WriteInt( oException.iErrorCode );
	WriteString( oException.sModule );
	WriteString( oException.sReason );
Anne's avatar
Anne committed
339
340
341
342
343
344
345
}

//! Reads an exception from the incoming message: error code, module, reason (in this order).
ITAException CITANetAudioMessage::ReadException()
{
	// The reads are kept as separate statements so their order on the wire is fixed
	const int iCode = ReadInt();
	const std::string sOriginModule = ReadString();
	const std::string sCause = ReadString();

	return ITAException( iCode, sOriginModule, sCause );
}

//! Returns the stored connection pointer (NULL if none is set).
VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
	VistaConnectionIP* pCurrent = m_pConnection;
	return pCurrent;
}

void CITANetAudioMessage::ClearConnection() {
	m_pConnection = NULL;
Anne Heimes's avatar
Anne Heimes committed
356
	delete m_pProtocolLogger;
Anne's avatar
Anne committed
357
358
}

359
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
Anne's avatar
Anne committed
360
{
361
362
363
364
	int iSize = ( int ) viData.size();
	WriteInt( iSize );
	for( int i = 0; i < iSize; i++ )
		WriteInt( viData[ i ] );
Anne's avatar
Anne committed
365
366
367
368
369
370
}

//! Reads an int vector from the incoming message: element count first, then each element.
std::vector<int> CITANetAudioMessage::ReadIntVector()
{
	std::vector<int> viResult;

	// A non-positive count yields an empty vector
	for( int nLeft = ReadInt(); nLeft > 0; --nLeft )
	{
		const int iElement = ReadInt();
		viResult.push_back( iElement );
	}

	return viResult;
}

//! Reads streaming parameters from the incoming message.
/**
  * Field order on the wire: channels, sample rate, block size, ring buffer
  * size, target sample latency, send-info time interval.
  */
CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingParameters()
{
	CITANetAudioProtocol::StreamingParameters oReceived;

	// The order of the reads below defines the wire format - do not reorder
	oReceived.iChannels              = ReadInt();
	oReceived.dSampleRate            = ReadDouble();
	oReceived.iBlockSize             = ReadInt();
	oReceived.iRingBufferSize        = ReadInt();
	oReceived.iTargetSampleLatency   = ReadInt();
	oReceived.dTimeIntervalSendInfos = ReadDouble();

	return oReceived;
}

391
//! Appends streaming parameters to the outgoing buffer and names the protocol log file after them.
/**
  * Field order on the wire: channels, sample rate, block size, ring buffer
  * size, target sample latency, send-info time interval.
  *
  * As a side effect the protocol logger's output file is set to
  * "NetAudioLogProtocol_BS<block size>_Ch<channels>_tl<target latency>.txt".
  */
void CITANetAudioMessage::WriteStreamingParameters( const CITANetAudioProtocol::StreamingParameters & oParams )
{
	// The order of the writes below defines the wire format - do not reorder
	WriteInt( oParams.iChannels );
	WriteDouble( oParams.dSampleRate );
	WriteInt( oParams.iBlockSize );
	WriteInt( oParams.iRingBufferSize );
	WriteInt( oParams.iTargetSampleLatency );
	WriteDouble( oParams.dTimeIntervalSendInfos );

	std::string sLogFileName = "NetAudioLogProtocol";
	sLogFileName += "_BS" + std::to_string( oParams.iBlockSize );
	sLogFileName += "_Ch" + std::to_string( oParams.iChannels );
	sLogFileName += "_tl" + std::to_string( oParams.iTargetSampleLatency );
	sLogFileName += ".txt";

	m_pProtocolLogger->setOutputFile( sLogFileName );
}
404
405
406
407

//! Reads the ring buffer size (a single integer) from the incoming message.
int CITANetAudioMessage::ReadRingBufferSize()
{
	const int iBufferSize = ReadInt();
	return iBufferSize;
}

void CITANetAudioMessage::WriteRingBufferSize( const int iBufferSize )
411
{
412
413
414
	WriteInt( iBufferSize );
}

415
416
int CITANetAudioMessage::ReadRingBufferFree()
{
417
418
419
420
421
422
423
424
425
	return ReadInt();
}

void CITANetAudioMessage::WriteRingBufferFree( const int iBufferFree )
{
	WriteInt( iBufferFree );
}

//! Reads a sample frame from the incoming message into the given frame.
/**
  * Wire format: channel count, sample count, then all samples channel by
  * channel as floats. The target frame is re-initialised if its channel
  * count or length differs from the transmitted values.
  *
  * @param pSampleFrame Target frame; must not be NULL (it is dereferenced unconditionally).
  */
void CITANetAudioMessage::ReadSampleFrame( ITASampleFrame* pSampleFrame )
{
	const int nChannels = ReadInt();
	const int nSamples = ReadInt();

	const bool bDimensionsMatch = ( pSampleFrame->channels() == nChannels ) && ( pSampleFrame->GetLength() == nSamples );
	if( !bDimensionsMatch )
	{
		pSampleFrame->init( nChannels, nSamples, false );
	}

	ITASampleFrame& rFrame = *pSampleFrame;
	for( int iChannel = 0; iChannel < nChannels; ++iChannel )
	{
		for( int iSample = 0; iSample < nSamples; ++iSample )
		{
			rFrame[ iChannel ][ iSample ] = ReadFloat();
		}
	}
}

//! Appends a sample frame to the outgoing buffer.
/**
  * Wire format: channel count, sample count, then all samples channel by
  * channel as floats.
  *
  * @param pSamples Source frame; must not be NULL (it is dereferenced unconditionally).
  */
void CITANetAudioMessage::WriteSampleFrame( ITASampleFrame* pSamples )
{
	const int nChannels = pSamples->channels();
	WriteInt( nChannels );

	const int nLength = pSamples->GetLength();
	WriteInt( nLength );

	ITASampleFrame& rFrame = *pSamples;
	for( int iChannel = 0; iChannel < nChannels; ++iChannel )
	{
		for( int iSample = 0; iSample < nLength; ++iSample )
		{
			WriteFloat( rFrame[ iChannel ][ iSample ] );
		}
	}
}