Aufgrund einer Wartung wird GitLab am 26.10. zwischen 8:00 und 9:00 Uhr kurzzeitig nicht zur Verfügung stehen. / Due to maintenance, GitLab will be temporarily unavailable on 26.10. between 8:00 and 9:00 am.

ITANetAudioMessage.cpp 13 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
2
3
4
5
#include <ITAStringUtils.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
6
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
7
#include <ITAClock.h>
Anne Heimes's avatar
Anne Heimes committed
8
#include <ITADataLog.h>
9
10
11

#include <cstring>
#include <algorithm>
Anne's avatar
Anne committed
12
#include <cassert>
13
14
#include <iostream>
#include <iomanip>
Anne's avatar
Anne committed
15
16
17

//! File-local counter: ResetMessage() takes its current value as the new message id and then increments it
static int S_nMessageIds = 0;

Anne Heimes's avatar
Anne Heimes committed
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! One record of the protocol log, printed as a single line of tab-separated values
struct ITAProtocolLog : public ITALogDataBase
{
	//! Writes the column header line (tab-separated) to the given stream
	static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId" << "\t" << "WorldTimeStamp" << "\t" << "MessageType" << "\t" << "Status" << "\t" << "Paketgroesse" << std::endl;
		return os;
	}

	//! Writes this record as one line, using the same column order as outputDesc()
	std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId << "\t";
		os << std::setprecision( 12 ) << dWorldTimeStamp << "\t"; // time stamp is printed with 12 significant digits
		os << iMessageType << "\t" << iStatus << "\t" << nMessagePayloadSize << std::endl;
		return os;
	}

	unsigned int uiBlockId; //!< Block identifier (audio streaming); filled with the message id in this file
	double dWorldTimeStamp; //!< Time stamp taken from ITAClock::getDefaultClock()->getTime()
	int iMessageType; //!< Message type identifier (0 when logged from ResetMessage())
	int iStatus; //!< Set in this file to 0 (write), 1 (read), 2 (reset) or -1 (reset with unread incoming data)
	VistaType::sint32 nMessagePayloadSize; //!< Payload size as written to / read from the message header

};

//! Buffered data logger instantiated for ITAProtocolLog records; adds no members of its own
class ITABufferedDataLoggerImplProtocol : public ITABufferedDataLogger < ITAProtocolLog > {};

52
53
54
55
//! Creates a message without connection and brings it into the "reset" state
/**
  * The incoming byte vector and the outgoing serializer are both constructed with 2048.
  *
  * @param[in] bSwapBuffers Byte order swap behaviour, applied to the outgoing and the incoming serializer alike
  */
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
	: m_vecIncomingBuffer( 2048 )
	, m_oOutgoing( 2048 )
	, m_pConnection( NULL )
	, m_iBytesReceivedTotal( 0 )
{
	// The logger has to exist before ResetMessage() is called, because ResetMessage() writes a log record
	m_pProtocolLogger = new ITABufferedDataLoggerImplProtocol();
	m_nMessageId = 0;

	m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );
	m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );

	ResetMessage();
}

void CITANetAudioMessage::ResetMessage()
{
Anne Heimes's avatar
Anne Heimes committed
67
68
69
70
71
72
73
74

	ITAProtocolLog oLog;
	oLog.uiBlockId = m_nMessageId;
	oLog.iMessageType = 0;
	oLog.nMessagePayloadSize = 0;

	oLog.iStatus = 2;
	
Anne Heimes's avatar
Anne Heimes committed
75
76
	if (m_oIncoming.GetTailSize() > 0)
	{
Anne Heimes's avatar
Anne Heimes committed
77
		vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
78
		oLog.iStatus = -1;		
Anne Heimes's avatar
Anne Heimes committed
79
80
	}

Anne's avatar
Anne committed
81
82
83
	// wait till sending is complete -> this prevents us
	// from deleting the buffer while it is still being read
	// by the connection
84
	if (m_pConnection != NULL)
85
		m_pConnection->WaitForSendFinish();
Anne's avatar
Anne committed
86
87
88
89

	m_nMessageId = S_nMessageIds++;

	m_oOutgoing.ClearBuffer();
90
91
92
	m_oOutgoing.WriteInt32( 0 ); // Payload size
	m_oOutgoing.WriteInt32( 0 ); // Message Type
	m_oOutgoing.WriteInt32( 0 ); // Identifier
Anne's avatar
Anne committed
93

94
	m_oIncoming.SetBuffer( NULL, 0 );
Anne's avatar
Anne committed
95

Anne's avatar
Anne committed
96
	m_nMessageType = -1;
Anne Heimes's avatar
Anne Heimes committed
97
98
99
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();

	m_pProtocolLogger->log(oLog);
Anne's avatar
Anne committed
100

Anne's avatar
Anne committed
101
	//m_pConnection = NULL;
102

Anne's avatar
Anne committed
103
#if NET_AUDIO_SHOW_TRAFFIC
104
	vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
105
#endif
Anne's avatar
Anne committed
106
107
}

108
//! Sets the connection that ResetMessage(), WriteMessage() and ReadMessage() operate on
/**
  * @param[in] pConn Connection pointer; it is stored as passed
  */
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
	m_pConnection = pConn;
}

void CITANetAudioMessage::WriteMessage()
{
Anne Heimes's avatar
Anne Heimes committed
115
	ITAProtocolLog oLog;
116
	VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
117
118
119
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
120
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
Anne Heimes's avatar
Anne Heimes committed
121
	oLog.nMessagePayloadSize = iSwapDummy;
122
123
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
124
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
125

126
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
127
128
129

	// rewrite type dummy
	iSwapDummy = m_nMessageType;
Anne Heimes's avatar
Anne Heimes committed
130
	oLog.iMessageType = m_nMessageType;
131
132
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
133
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
134

135
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
136
137
138

	// rewrite messageid dummy
	iSwapDummy = m_nMessageId;
Anne Heimes's avatar
Anne Heimes committed
139
	oLog.uiBlockId = m_nMessageId;
140
141
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
142
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne Heimes's avatar
Anne Heimes committed
143
144
145
	oLog.iStatus = 0;
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
	m_pProtocolLogger->log( oLog );
Anne's avatar
Anne committed
146
#if NET_AUDIO_SHOW_TRAFFIC
147
	vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
148
#endif
Anne's avatar
Anne committed
149

150
	try
Anne's avatar
Anne committed
151
	{
152
		// It appears safe to send even very big data payload, so we will send at once
Anne's avatar
Anne committed
153
		int iRawBufferSize = m_oOutgoing.GetBufferSize();
154
155
		int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );

Anne's avatar
Anne committed
156
#if NET_AUDIO_SHOW_TRAFFIC
157
		vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
158
159
#endif

160
		m_pConnection->WaitForSendFinish();
161
162
		//if( nRet != m_oOutgoing.GetBufferSize() )
			//VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 );
Anne's avatar
Anne committed
163
	}
164
	catch (VistaExceptionBase& ex)
Anne's avatar
Anne committed
165
	{
166
		ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() );
Anne's avatar
Anne committed
167
168
169
170
	}
}


Anne's avatar
Anne committed
171
bool CITANetAudioMessage::ReadMessage( int timeout)
Anne's avatar
Anne committed
172
{
Anne Heimes's avatar
Anne Heimes committed
173
	ITAProtocolLog oLog;
Anne's avatar
Anne committed
174
#if NET_AUDIO_SHOW_TRAFFIC
175
	vstr::out() << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
Anne's avatar
Anne committed
176
#endif
Anne's avatar
Anne committed
177
	// WaitForIncomming Data int in ca ms
Anne's avatar
Anne committed
178
	long nIncomingBytes = m_pConnection->WaitForIncomingData( timeout );
Anne's avatar
Anne committed
179
	// TODO Timer entfernen
Anne Heimes's avatar
Anne Heimes committed
180
	if (nIncomingBytes == -1)
Anne's avatar
Anne committed
181
		return false;
Anne's avatar
Anne committed
182

183
184
	//if (timeout != 0)
		//nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
185
#if NET_AUDIO_SHOW_TRAFFIC
186
	vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
Anne's avatar
Anne committed
187
#endif
188

189
190
	VistaType::sint32 nMessagePayloadSize;
	int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
Anne Heimes's avatar
Anne Heimes committed
191
	oLog.nMessagePayloadSize = nMessagePayloadSize;
192
	assert( nBytesRead == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
193
#if NET_AUDIO_SHOW_TRAFFIC
194
	vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
Anne's avatar
Anne committed
195
#endif
Anne Heimes's avatar
Anne Heimes committed
196
197
	if (nMessagePayloadSize <= 0)
		return false;
198
	// we need at least the two protocol ints
199
	//assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
200

201
202
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
203
204
	
	// Receive all incoming data (potentially splitted)
205
	
Anne Heimes's avatar
Anne Heimes committed
206
	while (nMessagePayloadSize > m_iBytesReceivedTotal)
Anne's avatar
Anne committed
207
	{
Anne's avatar
Anne committed
208
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
209
210
		int iBytesReceived;
		if ( nMessagePayloadSize < iIncommingBytes )
Anne Heimes's avatar
Anne Heimes committed
211
			iBytesReceived = m_pConnection->Receive(&m_vecIncomingBuffer[m_iBytesReceivedTotal], nMessagePayloadSize - m_iBytesReceivedTotal);
Anne's avatar
Anne committed
212
		else
Anne Heimes's avatar
Anne Heimes committed
213
214
			iBytesReceived = m_pConnection->Receive(&m_vecIncomingBuffer[m_iBytesReceivedTotal], iIncommingBytes);
		m_iBytesReceivedTotal += iBytesReceived;
215
#if NET_AUDIO_SHOW_TRAFFIC
216
		vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl;
Anne's avatar
Anne committed
217
#endif
Anne's avatar
Anne committed
218
	}
219
	m_iBytesReceivedTotal = 0;
Anne's avatar
Anne committed
220

Anne Heimes's avatar
Anne Heimes committed
221
222
	oLog.iStatus = 1;
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
223
224
	// Transfer data into members
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
225
226
	m_nMessageType = ReadInt();
	m_nMessageId = ReadInt();
Anne Heimes's avatar
Anne Heimes committed
227
228
229
	oLog.iMessageType = m_nMessageType;
	oLog.uiBlockId = m_nMessageId;
	m_pProtocolLogger->log( oLog );
Anne's avatar
Anne committed
230

Anne's avatar
Anne committed
231
#if NET_AUDIO_SHOW_TRAFFIC
232
	vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
233
#endif
234
	return true;
235
}
Anne's avatar
Anne committed
236
237
238
239
240
241

//! Returns the current message type: set by SetMessageType() or ReadMessage(), -1 after ResetMessage()
int CITANetAudioMessage::GetMessageType() const
{
	return m_nMessageType;
}

242
void CITANetAudioMessage::SetMessageType( int nType )
Anne's avatar
Anne committed
243
{
Anne's avatar
Anne committed
244
	//assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
	m_nMessageType = nType;
}

//! Returns the tail size of the incoming deserializer (presumably the bytes not read yet - TODO confirm)
int CITANetAudioMessage::GetIncomingMessageSize() const
{
	return m_oIncoming.GetTailSize();
}

//! Returns the buffer size of the outgoing serializer; this includes the header placeholders written by ResetMessage()
int CITANetAudioMessage::GetOutgoingMessageSize() const
{
	return m_oOutgoing.GetBufferSize();
}

bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
260
	return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
261
262
}

263
void CITANetAudioMessage::WriteString( const std::string& sValue )
Anne's avatar
Anne committed
264
{
265
266
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
	if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
Anne's avatar
Anne committed
267
268
}

269
void CITANetAudioMessage::WriteInt( const int iValue )
Anne's avatar
Anne committed
270
{
271
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
Anne's avatar
Anne committed
272
273
}

274
void CITANetAudioMessage::WriteBool( const bool bValue )
Anne's avatar
Anne committed
275
{
276
	m_oOutgoing.WriteBool( bValue );
Anne's avatar
Anne committed
277
278
}

279
void CITANetAudioMessage::WriteFloat( const float fValue )
Anne's avatar
Anne committed
280
{
281
	m_oOutgoing.WriteFloat32( fValue );
Anne's avatar
Anne committed
282
283
}

284
void CITANetAudioMessage::WriteDouble( const double dValue )
Anne's avatar
Anne committed
285
{
286
	m_oOutgoing.WriteFloat64( dValue );
Anne's avatar
Anne committed
287
288
289
290
291
}

std::string CITANetAudioMessage::ReadString()
{
	VistaType::sint32 nSize;
292
293
	int nReturn = m_oIncoming.ReadInt32( nSize );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
294
295

	// Empty string?
296
	if( nSize == 0 ) return "";
Anne's avatar
Anne committed
297
298

	std::string sValue;
299
300
	nReturn = m_oIncoming.ReadString( sValue, nSize );
	assert( nReturn == nSize );
Anne's avatar
Anne committed
301
302
303
304
305
306
	return sValue;
}

int CITANetAudioMessage::ReadInt()
{
	VistaType::sint32 nValue;
307
308
309
310
	int nReturn = m_oIncoming.ReadInt32( nValue );
	if( nReturn == -1 )
		ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
311
312
313
314
315
316
	return nValue;
}

bool CITANetAudioMessage::ReadBool()
{
	bool bValue;
317
318
	int nReturn = m_oIncoming.ReadBool( bValue );
	assert( nReturn == sizeof( bool ) );
Anne's avatar
Anne committed
319
320
321
322
323
	return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
	float fValue;
324
325
	int nReturn = m_oIncoming.ReadFloat32( fValue );
	assert( nReturn == sizeof( float ) );
Anne's avatar
Anne committed
326
327
328
329
330
	return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
	double dValue;
331
332
	int nReturn = m_oIncoming.ReadFloat64( dValue );
	assert( nReturn == sizeof( double ) );
Anne's avatar
Anne committed
333
334
335
336
	return dValue;
}


337
void CITANetAudioMessage::WriteException( const ITAException& oException )
Anne's avatar
Anne committed
338
{
339
340
341
	WriteInt( oException.iErrorCode );
	WriteString( oException.sModule );
	WriteString( oException.sReason );
Anne's avatar
Anne committed
342
343
344
345
346
347
348
}

//! Reads an exception from the incoming message (counterpart of WriteException())
/**
  * @return Exception built from error code, module and reason, read in this order
  */
ITAException CITANetAudioMessage::ReadException()
{
	// Separate statements keep the read order well defined
	const int iErrorCode = ReadInt();
	std::string sModule = ReadString();
	std::string sReason = ReadString();

	return ITAException( iErrorCode, sModule, sReason );
}

//! Returns the connection pointer currently stored (NULL after construction and after ClearConnection())
VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
	return m_pConnection;
}

void CITANetAudioMessage::ClearConnection() {
	m_pConnection = NULL;
Anne Heimes's avatar
Anne Heimes committed
359
	delete m_pProtocolLogger;
Anne's avatar
Anne committed
360
361
}

362
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
Anne's avatar
Anne committed
363
{
364
365
366
367
	int iSize = ( int ) viData.size();
	WriteInt( iSize );
	for( int i = 0; i < iSize; i++ )
		WriteInt( viData[ i ] );
Anne's avatar
Anne committed
368
369
370
371
372
373
}

//! Reads an integer vector from the incoming message (counterpart of WriteIntVector())
/**
  * @return The elements read; empty if the transmitted element count is zero or negative
  */
std::vector<int> CITANetAudioMessage::ReadIntVector()
{
	std::vector<int> viData;

	const int iSize = ReadInt();
	for( int i = 0; i < iSize; i++ )
		viData.push_back( ReadInt() );

	return viData;
}

//! Reads a set of streaming parameters from the incoming message
/**
  * The field order has to match WriteStreamingParameters(): channels, sample rate,
  * block size, ring buffer size, target sample latency, time interval for sending infos.
  *
  * @return The parameters read
  */
CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingParameters()
{
	CITANetAudioProtocol::StreamingParameters oParams;

	oParams.iChannels = ReadInt();
	oParams.dSampleRate = ReadDouble();
	oParams.iBlockSize = ReadInt();
	oParams.iRingBufferSize = ReadInt();
	oParams.iTargetSampleLatency = ReadInt();
	oParams.dTimeIntervalSendInfos = ReadDouble();

	return oParams;
}

394
//! Appends a set of streaming parameters to the outgoing message
/**
  * The field order has to match ReadStreamingParameters(): channels, sample rate,
  * block size, ring buffer size, target sample latency, time interval for sending infos.
  *
  * Side effect: the output file of the protocol logger is set to
  * "NetAudioLogProtocol_BS<block size>_Ch<channels>_tl<target latency>.txt".
  *
  * @param[in] oParams Parameters to serialize
  */
void CITANetAudioMessage::WriteStreamingParameters( const CITANetAudioProtocol::StreamingParameters & oParams )
{
	WriteInt( oParams.iChannels );
	WriteDouble( oParams.dSampleRate );
	WriteInt( oParams.iBlockSize );
	WriteInt( oParams.iRingBufferSize );
	WriteInt( oParams.iTargetSampleLatency );
	WriteDouble( oParams.dTimeIntervalSendInfos );

	// Log file name encodes block size, channel count and target latency of the stream
	const std::string sLogFileName = "NetAudioLogProtocol_BS" + std::to_string( oParams.iBlockSize )
		+ "_Ch" + std::to_string( oParams.iChannels )
		+ "_tl" + std::to_string( oParams.iTargetSampleLatency )
		+ ".txt";
	m_pProtocolLogger->setOutputFile( sLogFileName );
}
407
408
409
410

//! Reads the ring buffer size from the incoming message (a single integer)
int CITANetAudioMessage::ReadRingBufferSize()
{
	return ReadInt();
}

void CITANetAudioMessage::WriteRingBufferSize( const int iBufferSize )
414
{
415
416
417
	WriteInt( iBufferSize );
}

418
419
int CITANetAudioMessage::ReadRingBufferFree()
{
420
421
422
423
424
425
426
427
428
	return ReadInt();
}

//! Appends the free ring buffer space to the outgoing message (a single integer)
void CITANetAudioMessage::WriteRingBufferFree( const int iBufferFree )
{
	WriteInt( iBufferFree );
}

//! Reads a sample frame from the incoming message (counterpart of WriteSampleFrame())
/**
  * Expects channel count and sample count followed by the samples, channel by channel.
  * The target frame is re-initialised if its dimensions differ from the transmitted ones.
  *
  * @param[out] pSampleFrame Target frame, must not be NULL
  *
  * @throws ITAException (UNKNOWN) if the transmitted dimensions are negative or a value cannot be read
  */
void CITANetAudioMessage::ReadSampleFrame( ITASampleFrame* pSampleFrame )
{
	assert( pSampleFrame != NULL );

	const int iChannels = ReadInt();
	const int iSamples = ReadInt();

	// Dimensions come from the network; WriteSampleFrame() never produces negative ones
	if( iChannels < 0 || iSamples < 0 )
		ITA_EXCEPT1( UNKNOWN, "Invalid (negative) sample frame dimensions in incoming message" );

	if( pSampleFrame->channels() != iChannels || pSampleFrame->GetLength() != iSamples )
		pSampleFrame->init( iChannels, iSamples, false );

	for( int i = 0; i < iChannels; i++ )
	{
		for( int j = 0; j < iSamples; j++ )
			( *pSampleFrame )[ i ][ j ] = ReadFloat();
	}
}

//! Appends a sample frame to the outgoing message
/**
  * Writes channel count and sample count followed by all samples, channel by channel,
  * each sample as a single precision float.
  *
  * @param[in] pSamples Frame to serialize, must not be NULL
  */
void CITANetAudioMessage::WriteSampleFrame( ITASampleFrame* pSamples )
{
	assert( pSamples != NULL );

	// Dimensions do not change while writing, so they are queried only once
	const int iChannels = pSamples->channels();
	const int iLength = pSamples->GetLength();

	WriteInt( iChannels );
	WriteInt( iLength );

	for( int i = 0; i < iChannels; i++ )
	{
		for( int j = 0; j < iLength; j++ )
			WriteFloat( ( *pSamples )[ i ][ j ] );
	}
}