ITANetAudioMessage.cpp 10.9 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
2
3
4
5
#include <ITAStringUtils.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h>
6
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
7
#include <ITAClock.h>
8
9
10

#include <cstring>
#include <algorithm>
Anne's avatar
Anne committed
11
#include <cassert>
12
13
#include <iostream>
#include <iomanip>
Anne's avatar
Anne committed
14
15
16

static int S_nMessageIds = 0;

17
18
19
20
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
	: m_vecIncomingBuffer( 2048 )
	, m_oOutgoing( 2048 )
	, m_pConnection( NULL )
21
	, m_iBytesReceivedTotal(0)
Anne's avatar
Anne committed
22
{
23
24
	m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );
	m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );
Anne's avatar
Anne committed
25
26
27
28
29
	ResetMessage();
}

void CITANetAudioMessage::ResetMessage()
{
30
	if( m_oIncoming.GetTailSize() > 0 )
Anne Heimes's avatar
Anne Heimes committed
31
		vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
Anne's avatar
Anne committed
32
33
34
35

	// wait till sending is complete -> this prevents us
	// from deleting the buffer while it is still being read
	// by the connection
36
	if (m_pConnection != NULL)
37
		m_pConnection->WaitForSendFinish();
Anne's avatar
Anne committed
38
39
40
41

	m_nMessageId = S_nMessageIds++;

	m_oOutgoing.ClearBuffer();
42
43
44
	m_oOutgoing.WriteInt32( 0 ); // Payload size
	m_oOutgoing.WriteInt32( 0 ); // Message Type
	m_oOutgoing.WriteInt32( 0 ); // Identifier
Anne's avatar
Anne committed
45

46
	m_oIncoming.SetBuffer( NULL, 0 );
Anne's avatar
Anne committed
47

Anne's avatar
Anne committed
48
	m_nMessageType = -1;
Anne's avatar
Anne committed
49

Anne's avatar
Anne committed
50
	//m_pConnection = NULL;
51

Anne's avatar
Anne committed
52
#if NET_AUDIO_SHOW_TRAFFIC
53
	vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
54
#endif
Anne's avatar
Anne committed
55
56
}

57
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
Anne's avatar
Anne committed
58
59
60
61
62
63
{
	m_pConnection = pConn;
}

void CITANetAudioMessage::WriteMessage()
{
64
	VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
Anne's avatar
Anne committed
65
66
67
	VistaType::sint32 iSwapDummy;

	// rewrite size dummy
68
69
70
	iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
71
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
72

73
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
74
75
76

	// rewrite type dummy
	iSwapDummy = m_nMessageType;
77
78
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
79
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
80

81
	pBuffer += sizeof( VistaType::sint32 );
Anne's avatar
Anne committed
82
83
84

	// rewrite messageid dummy
	iSwapDummy = m_nMessageId;
85
86
	if( m_oOutgoing.GetByteorderSwapFlag() )
		VistaSerializingToolset::Swap4( &iSwapDummy );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
87
	std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
88

Anne's avatar
Anne committed
89
#if NET_AUDIO_SHOW_TRAFFIC
90
	vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
91
#endif
Anne's avatar
Anne committed
92

93
	try
Anne's avatar
Anne committed
94
	{
95
		// It appears safe to send even very big data payload, so we will send at once
Anne's avatar
Anne committed
96
		int iRawBufferSize = m_oOutgoing.GetBufferSize();
97
98
		int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );

Anne's avatar
Anne committed
99
#if NET_AUDIO_SHOW_TRAFFIC
100
		vstr::out() << "CITANetAudioMessage [  Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
101
102
#endif

103
		m_pConnection->WaitForSendFinish();
104
105
		//if( nRet != m_oOutgoing.GetBufferSize() )
			//VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 );
Anne's avatar
Anne committed
106
	}
107
	catch (VistaExceptionBase& ex)
Anne's avatar
Anne committed
108
	{
109
		ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() );
Anne's avatar
Anne committed
110
111
112
113
	}
}


Anne's avatar
Anne committed
114
bool CITANetAudioMessage::ReadMessage( int timeout)
Anne's avatar
Anne committed
115
{
Anne's avatar
Anne committed
116
#if NET_AUDIO_SHOW_TRAFFIC
117
	vstr::out() << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
Anne's avatar
Anne committed
118
#endif
Anne's avatar
Anne committed
119
	// WaitForIncomming Data int in ca ms
Anne's avatar
Anne committed
120
	long nIncomingBytes = m_pConnection->WaitForIncomingData( timeout );
Anne's avatar
Anne committed
121
	// TODO Timer entfernen
Anne Heimes's avatar
Anne Heimes committed
122
	if (nIncomingBytes == -1)
Anne's avatar
Anne committed
123
		return false;
Anne's avatar
Anne committed
124

125
126
	//if (timeout != 0)
		//nIncomingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
127
#if NET_AUDIO_SHOW_TRAFFIC
128
	vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
Anne's avatar
Anne committed
129
#endif
130

131
132
	VistaType::sint32 nMessagePayloadSize;
	int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
133
	assert( nBytesRead == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
134
#if NET_AUDIO_SHOW_TRAFFIC
135
	vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
Anne's avatar
Anne committed
136
#endif
Anne Heimes's avatar
Anne Heimes committed
137
138
	if (nMessagePayloadSize <= 0)
		return false;
139
	// we need at least the two protocol ints
140
	//assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
141

142
143
	if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
		m_vecIncomingBuffer.resize( nMessagePayloadSize );
144
145
	
	// Receive all incoming data (potentially splitted)
146
	
Anne Heimes's avatar
Anne Heimes committed
147
	while (nMessagePayloadSize > m_iBytesReceivedTotal)
Anne's avatar
Anne committed
148
	{
Anne's avatar
Anne committed
149
		int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
Anne's avatar
Anne committed
150
151
		int iBytesReceived;
		if ( nMessagePayloadSize < iIncommingBytes )
Anne Heimes's avatar
Anne Heimes committed
152
			iBytesReceived = m_pConnection->Receive(&m_vecIncomingBuffer[m_iBytesReceivedTotal], nMessagePayloadSize - m_iBytesReceivedTotal);
Anne's avatar
Anne committed
153
		else
Anne Heimes's avatar
Anne Heimes committed
154
155
			iBytesReceived = m_pConnection->Receive(&m_vecIncomingBuffer[m_iBytesReceivedTotal], iIncommingBytes);
		m_iBytesReceivedTotal += iBytesReceived;
156
#if NET_AUDIO_SHOW_TRAFFIC
157
		vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl;
Anne's avatar
Anne committed
158
#endif
Anne's avatar
Anne committed
159
	}
160
	m_iBytesReceivedTotal = 0;
Anne's avatar
Anne committed
161

162
163
	// Transfer data into members
	m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
Anne's avatar
Anne committed
164
165
166
	m_nMessageType = ReadInt();
	m_nMessageId = ReadInt();

Anne's avatar
Anne committed
167
#if NET_AUDIO_SHOW_TRAFFIC
168
	vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
169
#endif
170
	return true;
171
}
Anne's avatar
Anne committed
172
173
174
175
176
177

int CITANetAudioMessage::GetMessageType() const
{
	return m_nMessageType;
}

178
void CITANetAudioMessage::SetMessageType( int nType )
Anne's avatar
Anne committed
179
{
Anne's avatar
Anne committed
180
	//assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
Anne's avatar
Anne committed
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
	m_nMessageType = nType;
}

int CITANetAudioMessage::GetIncomingMessageSize() const
{
	return m_oIncoming.GetTailSize();
}

int CITANetAudioMessage::GetOutgoingMessageSize() const
{
	return m_oOutgoing.GetBufferSize();
}

bool CITANetAudioMessage::GetOutgoingMessageHasData() const
{
196
	return ( m_oOutgoing.GetBufferSize() > 4 * sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
197
198
}

199
void CITANetAudioMessage::WriteString( const std::string& sValue )
Anne's avatar
Anne committed
200
{
201
202
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )sValue.size() );
	if( !sValue.empty() ) m_oOutgoing.WriteString( sValue );
Anne's avatar
Anne committed
203
204
}

205
void CITANetAudioMessage::WriteInt( const int iValue )
Anne's avatar
Anne committed
206
{
207
	m_oOutgoing.WriteInt32( ( VistaType::sint32 )iValue );
Anne's avatar
Anne committed
208
209
}

210
void CITANetAudioMessage::WriteBool( const bool bValue )
Anne's avatar
Anne committed
211
{
212
	m_oOutgoing.WriteBool( bValue );
Anne's avatar
Anne committed
213
214
}

215
void CITANetAudioMessage::WriteFloat( const float fValue )
Anne's avatar
Anne committed
216
{
217
	m_oOutgoing.WriteFloat32( fValue );
Anne's avatar
Anne committed
218
219
}

220
void CITANetAudioMessage::WriteDouble( const double dValue )
Anne's avatar
Anne committed
221
{
222
	m_oOutgoing.WriteFloat64( dValue );
Anne's avatar
Anne committed
223
224
225
226
227
}

std::string CITANetAudioMessage::ReadString()
{
	VistaType::sint32 nSize;
228
229
	int nReturn = m_oIncoming.ReadInt32( nSize );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
230
231

	// Empty string?
232
	if( nSize == 0 ) return "";
Anne's avatar
Anne committed
233
234

	std::string sValue;
235
236
	nReturn = m_oIncoming.ReadString( sValue, nSize );
	assert( nReturn == nSize );
Anne's avatar
Anne committed
237
238
239
240
241
242
	return sValue;
}

int CITANetAudioMessage::ReadInt()
{
	VistaType::sint32 nValue;
243
244
245
246
	int nReturn = m_oIncoming.ReadInt32( nValue );
	if( nReturn == -1 )
		ITA_EXCEPT1( UNKNOWN, "Could not read integer value from incoming message" );
	assert( nReturn == sizeof( VistaType::sint32 ) );
Anne's avatar
Anne committed
247
248
249
250
251
252
	return nValue;
}

bool CITANetAudioMessage::ReadBool()
{
	bool bValue;
253
254
	int nReturn = m_oIncoming.ReadBool( bValue );
	assert( nReturn == sizeof( bool ) );
Anne's avatar
Anne committed
255
256
257
258
259
	return bValue;
}
float CITANetAudioMessage::ReadFloat()
{
	float fValue;
260
261
	int nReturn = m_oIncoming.ReadFloat32( fValue );
	assert( nReturn == sizeof( float ) );
Anne's avatar
Anne committed
262
263
264
265
266
	return fValue;
}
double CITANetAudioMessage::ReadDouble()
{
	double dValue;
267
268
	int nReturn = m_oIncoming.ReadFloat64( dValue );
	assert( nReturn == sizeof( double ) );
Anne's avatar
Anne committed
269
270
271
272
	return dValue;
}


273
void CITANetAudioMessage::WriteException( const ITAException& oException )
Anne's avatar
Anne committed
274
{
275
276
277
	WriteInt( oException.iErrorCode );
	WriteString( oException.sModule );
	WriteString( oException.sReason );
Anne's avatar
Anne committed
278
279
280
281
282
283
284
}

ITAException CITANetAudioMessage::ReadException()
{
	int iErrorCode = ReadInt();
	std::string sModule = ReadString();
	std::string sReason = ReadString();
285
	return ITAException( iErrorCode, sModule, sReason );
Anne's avatar
Anne committed
286
287
288
289
290
291
292
293
294
295
296
}

VistaConnectionIP* CITANetAudioMessage::GetConnection() const
{
	return m_pConnection;
}

void CITANetAudioMessage::ClearConnection() {
	m_pConnection = NULL;
}

297
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData )
Anne's avatar
Anne committed
298
{
299
300
301
302
	int iSize = ( int ) viData.size();
	WriteInt( iSize );
	for( int i = 0; i < iSize; i++ )
		WriteInt( viData[ i ] );
Anne's avatar
Anne committed
303
304
305
306
307
308
}

std::vector<int> CITANetAudioMessage::ReadIntVector()
{
	std::vector<int> viData;
	int iSize = ReadInt();
309
310
	for( int i = 0; i < iSize; i++ )
		viData.push_back( ReadInt() );
Anne's avatar
Anne committed
311
312
313
314
315
316
317
318
319

	return viData;
}

CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingParameters()
{
	CITANetAudioProtocol::StreamingParameters oParams;

	oParams.iChannels = ReadInt();
320
	oParams.dSampleRate = ReadDouble( );
321
322
323
	oParams.iBlockSize = ReadInt();
	oParams.iRingBufferSize = ReadInt();
	oParams.iTargetSampleLatency = ReadInt();
324
	oParams.dTimeIntervalSendInfos = ReadDouble();
Anne's avatar
Anne committed
325
326
327
328

	return oParams;
}

329
void CITANetAudioMessage::WriteStreamingParameters( const CITANetAudioProtocol::StreamingParameters & oParams )
Anne's avatar
Anne committed
330
{
331
332
	WriteInt( oParams.iChannels );
	WriteDouble( oParams.dSampleRate );
333
334
335
	WriteInt(oParams.iBlockSize);
	WriteInt(oParams.iRingBufferSize);
	WriteInt(oParams.iTargetSampleLatency);
336
	WriteDouble(oParams.dTimeIntervalSendInfos);
Anne's avatar
Anne committed
337
}
338
339
340
341

int CITANetAudioMessage::ReadRingBufferSize()
{
	return ReadInt();
342
343
344
}

void CITANetAudioMessage::WriteRingBufferSize( const int iBufferSize )
345
{
346
347
348
	WriteInt( iBufferSize );
}

349
350
int CITANetAudioMessage::ReadRingBufferFree()
{
351
352
353
354
355
356
357
358
359
	return ReadInt();
}

void CITANetAudioMessage::WriteRingBufferFree( const int iBufferFree )
{
	WriteInt( iBufferFree );
}

void CITANetAudioMessage::ReadSampleFrame( ITASampleFrame* pSampleFrame )
360
361
{
	int iChannels = ReadInt();
362
363
364
365
366
	int iSamples = ReadInt();
	if( pSampleFrame->channels() != iChannels || pSampleFrame->GetLength() != iSamples )
		pSampleFrame->init( iChannels, iSamples, false );

	for( int i = 0; i < iChannels; i++ )
367
	{
368
		for( int j = 0; j < iSamples; j++ )
369
		{
370
			( *pSampleFrame )[ i ][ j ] = ReadFloat();
371
372
		}
	}
373
374
375
}

void CITANetAudioMessage::WriteSampleFrame( ITASampleFrame* pSamples )
376
{
377
378
379
380
	WriteInt( pSamples->channels() );
	WriteInt( pSamples->GetLength() );

	for( int i = 0; i < pSamples->channels(); i++ )
381
	{
382
		for( int j = 0; j < pSamples->GetLength(); j++ )
383
		{
384
			WriteFloat( ( *pSamples )[ i ][ j ] );
385
386
		}
	}
387
388
}