ITANetAudioStreamingServer.cpp 10.9 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioStreamingServer.h>
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
2
3
4

#include "ITANetAudioServer.h"
#include "ITANetAudioMessage.h"
Anne's avatar
Anne committed
5
6

// ITA includes
Anne's avatar
Anne committed
7
#include <ITADataSource.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
9
#include <ITAStreamInfo.h>
Anne's avatar
Anne committed
10
#include <ITAClock.h>
Anne's avatar
Anne committed
11
12
13
14
15
16
17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
18
#include <VistaInterProcComm/Concurrency/VistaPriority.h>
19
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
20
#include <ITADataLog.h>
Anne's avatar
Anne committed
21
22
23
24
25

// STL
#include <cmath>
#include <cassert>

26
27
28
29
30
31
32
//! One record of the server-side streaming log, written as a tab-separated line
struct ITAServerLog : public ITALogDataBase
{
	//! Writes the tab-separated column header line and returns the stream
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId"
			<< "\t" << "WorldTimeStamp"
			<< "\t" << "ProtocolStatus"
			<< "\t" << "EstimatedFreeSamples"
			<< "\t" << "TransmittedSamples"
			<< std::endl;
		return os;
	}

	//! Writes this record in the same column order as outputDesc() and returns the stream
	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId
			<< "\t" << std::setprecision( 12 ) << dWorldTimeStamp
			<< "\t" << sProtocolStatus
			<< "\t" << iEstimatedFreeSamples
			<< "\t" << iTransmittedSamples
			<< std::endl;
		return os;
	}

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp; //!< Clock time at which the record was created
	std::string sProtocolStatus; //!< Protocol message identifier, or a marker string when no message was received
	int iEstimatedFreeSamples; //!< Server-side estimate of free samples in the client ring buffer
	int iTransmittedSamples; //!< Number of samples sent during the logged loop iteration
};

//! Concrete logger type: ITABufferedDataLogger instantiated for ITAServerLog records
class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger< ITAServerLog >
{
};

59
//! Creates a server without input stream, connection, message or logger; the latter two are created in Start()
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
	: m_pInputStream( NULL )
	, m_pConnection( NULL )
	, m_pNetAudioServer( new CITANetAudioServer() )
	, m_dLastTimeStamp( 0 )
	, m_iTargetLatencySamples( -1 )
	, m_sServerLogBaseName( "ITANetAudioStreamingServer" )
	, m_bDebuggingEnabled( false )
	, m_iMaxSendBlocks( 40 )
	, m_iServerBlockId( 0 )
	, m_iEstimatedClientRingBufferFreeSamples( 0 )
	, m_iClientRingBufferSize( 0 )
{
	// These members are otherwise only assigned in Start(). Give them defined
	// values so that the destructor is safe when Start() was never called or
	// returned early. Assigned in the body (not the initializer list) to stay
	// independent of the member declaration order in the header.
	m_pMessage = NULL;
	m_pServerLogger = NULL;
	m_iSendingBlockLength = 0;

	// Careful with this:
	//SetPriority( VistaPriority::VISTA_MID_PRIORITY );
}

//! Releases message, network server and logger; prints stopwatch statistics if debugging is enabled
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
	// The message is allocated in Start() and was never released before.
	// It is deleted ahead of the network server because it may still refer to
	// the server's connection.
	delete m_pMessage;
	m_pMessage = NULL;

	delete m_pNetAudioServer;

	if( GetIsDebuggingEnabled() )
	{
		vstr::out() << "[ ITANetAudioStreamingServer ] Processing statistics: " << m_swTryReadBlockStats.ToString() << std::endl;
		vstr::out() << "[ ITANetAudioStreamingServer ] Try-read access statistics: " << m_swTryReadAccessStats.ToString() << std::endl;
	}
	else if( m_pServerLogger )
	{
		// The logger only exists after Start() created it
		m_pServerLogger->setOutputFile( "" ); // disables export
	}

	delete m_pServerLogger; // deleting NULL is a no-op
}

93
bool CITANetAudioStreamingServer::Start(const std::string& sAddress, int iPort, double dTimeIntervalCientSendStatus)
Anne's avatar
Anne committed
94
{
95
	if( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
96
97
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

98
	if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
99
100
		return false;

101
	m_pConnection = m_pNetAudioServer->GetConnection();
102

103
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag() );
Jonas Stienen's avatar
Jonas Stienen committed
104
105
	m_pMessage->SetMessageLoggerBaseName( GetServerLogBaseName() + "_Messages" );

106
	m_pMessage->ResetMessage();
107
	m_pMessage->SetConnection( m_pConnection );
108
	while( !m_pMessage->ReadMessage( 0 ) ); //blocking
109

110
111
	assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
112

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
113
114
115
116
117
	CITANetAudioProtocol::StreamingParameters oServerParams;
	oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
	oServerParams.iBlockSize = m_pInputStream->GetBlocklength();
	oServerParams.dSampleRate = m_pInputStream->GetSampleRate();
	oServerParams.iChannels = m_pInputStream->GetNumberOfChannels();
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
118

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
119
120
121
	m_iEstimatedClientRingBufferFreeSamples = oServerParams.iRingBufferSize;
	m_iClientRingBufferSize = oClientParams.iRingBufferSize;
	m_iSendingBlockLength = oServerParams.iBlockSize;
122

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
123
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), oServerParams.iRingBufferSize, true );
124
125
126

	m_pServerLogger = new ITABufferedDataLoggerImplServer();
	m_pServerLogger->setOutputFile( m_sServerLogBaseName + "_Server.log" );
127
	m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
128

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
129
	if( oServerParams == oClientParams )
Anne's avatar
Anne committed
130
	{
131
132
133
134
		m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
		m_pMessage->WriteDouble( dTimeIntervalCientSendStatus );
		m_pMessage->WriteMessage();

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
135
#ifdef NET_AUDIO_SHOW_TRAFFIC
136
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
137
#endif
138
139
140
141

		Run(); // Start thread loop

		return true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
142
143
144
	}
	else
	{
145
146
147
		m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_REFUSED_INVALID_PARAMETERS );
		m_pMessage->WriteMessage();

Anne Heimes's avatar
Anne Heimes committed
148
#ifdef NET_AUDIO_SHOW_TRAFFIC
149
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
150
#endif
151

152
153
		return false;
	}
Anne's avatar
Anne committed
154
155
}

156
bool CITANetAudioStreamingServer::LoopBody()
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
157
{
158
159
	const double dNow = ITAClock::getDefaultClock()->getTime();

Anne's avatar
Anne committed
160
	ITAServerLog oLog;
161
	oLog.dWorldTimeStamp = dNow;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
162
	oLog.uiBlockId = ++m_iServerBlockId;
163
164
165
	oLog.iTransmittedSamples = 0;
	
	// Sending Samples
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
166
	int iEstimatedClientRingBufferTargetLatencyFreeSamples = m_iEstimatedClientRingBufferFreeSamples - ( m_iClientRingBufferSize - m_iTargetLatencySamples );
167

168
	if (iEstimatedClientRingBufferTargetLatencyFreeSamples >= m_iSendingBlockLength)
Anne's avatar
Anne committed
169
170
	{
		// Send Samples
171
		int iSendBlocks = iEstimatedClientRingBufferTargetLatencyFreeSamples / m_iSendingBlockLength;
172

173
		// Besser wre vermutlich, gleich alle samples zu senden und nicht nur einen Block nach dem anderen
174
175
		if (m_sfTempTransmitBuffer.GetLength() != m_iSendingBlockLength)
			m_sfTempTransmitBuffer.init(m_pInputStream->GetNumberOfChannels(), m_iSendingBlockLength, false);
176
177

		for( int j = 0; j < iSendBlocks; j++ )
178
		{
179
			for( int i = 0; i < int( m_pInputStream->GetNumberOfChannels() ); i++ )
180
181
			{
				ITAStreamInfo oStreamInfo;
182
				oStreamInfo.nSamples = m_iSendingBlockLength;
183
184

				const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
185
				if( pfData != 0 )
186
					m_sfTempTransmitBuffer[i].write(pfData, m_iSendingBlockLength, 0);
187
			}
188

189
			m_pInputStream->IncrementBlockPointer();
190
191
192

			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES );
Anne Heimes's avatar
Anne Heimes committed
193
			m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
194
			m_pMessage->WriteMessage();
195
			m_iEstimatedClientRingBufferFreeSamples -= m_iSendingBlockLength;
Anne Heimes's avatar
Anne Heimes committed
196
		}
197

Anne's avatar
Anne committed
198
#ifdef NET_AUDIO_SHOW_TRAFFIC
199
200
		vstr::out() << "[ITANetAudioStreamingServer] Transmitted " << iSendSamples << " samples for "
			<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
Anne's avatar
Anne committed
201
#endif
202

203
204
205
206
		oLog.iTransmittedSamples = iSendBlocks * m_pInputStream->GetBlocklength();
	}
	
	// Try-read incoming messages from client (e.g. regular status information)
207
	m_pMessage->ResetMessage();
208
209
	m_swTryReadBlockStats.start();
	m_swTryReadAccessStats.start();
210
	if( m_pMessage->ReadMessage( 1 ) )
211
	{
212
		m_swTryReadAccessStats.stop();
213
214
		int iMsgType = m_pMessage->GetMessageType();
		switch( iMsgType )
215
		{
216
217
		case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
		{
218
219
			m_iEstimatedClientRingBufferFreeSamples = m_pMessage->ReadInt();
			m_dLastTimeStamp = dNow;
220
221
222
223
			break;
		}
		case CITANetAudioProtocol::NP_CLIENT_CLOSE:
		{
224
225
226
			// Stop here because answer on client close might be blocking, we don't want that in our statistics
			m_swTryReadBlockStats.stop();

227
228
229
230
231
232
233
234
235
236
237
			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
			m_pMessage->WriteMessage();

			StopGently( false );
			m_pConnection = NULL;
			Stop();
			break;
		}
		default:
		{
238
			vstr::out() << "[ ITANetAudioStreamingServer ] Unkown protocol type : " << iMsgType << std::endl;
239
240
			break;
		}
241
		}
242
243

		oLog.sProtocolStatus = CITANetAudioProtocol::GetNPMessageID( iMsgType );
244
	}
245
246
	else
	{
247
248
249
		// There is no status message, so we estimate the client-side ring buffer status
		const double dTimeDiff = dNow - m_dLastTimeStamp;
		m_dLastTimeStamp = dNow;
250
		double dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
251
252
		m_iEstimatedClientRingBufferFreeSamples += ( int ) dEstimatedSamples;
		oLog.sProtocolStatus = "SERVER_ESTIMATION";
253
	}
254
255
256
257
258
	if( m_swTryReadBlockStats.started() ) // only stop if still running
		m_swTryReadBlockStats.stop();
	
	oLog.iEstimatedFreeSamples = m_iEstimatedClientRingBufferFreeSamples;
	m_pServerLogger->log( oLog );
259

Anne's avatar
Anne committed
260
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
261
262
}

263
264
//! Sets the data source to stream from; raises a MODAL_EXCEPTION while the thread loop is running
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
	const bool bLoopActive = VistaThreadLoop::IsRunning();
	if( bLoopActive )
	{
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );
	}

	m_pInputStream = pInStream;
}

271
//! Returns the currently assigned input data source (may be NULL if none was set)
ITADatasource* CITANetAudioStreamingServer::GetInputStream() const
{
	ITADatasource* const pCurrentStream = m_pInputStream;
	return pCurrentStream;
}

276
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
277
{
278
	return m_sfTempTransmitBuffer.GetLength();
279
280
}

281
282
283
284
285
286
287
288
289
290
//! Returns the number of samples sent per block
int CITANetAudioStreamingServer::GetSendingBlockLength() const
{
	const int iBlockLength = m_iSendingBlockLength;
	return iBlockLength;
}

void CITANetAudioStreamingServer::SetSendingBlockLength(const int iSendingBlockLength)
{
	m_iSendingBlockLength = iSendingBlockLength;
}

291
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels() const
292
{
293
	return m_sfTempTransmitBuffer.channels();
294
295
}

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
296
void CITANetAudioStreamingServer::SetDebuggingEnabled( bool bEnabled )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
297
{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
298
	m_bDebuggingEnabled = bEnabled;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
299
300
}

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
301
bool CITANetAudioStreamingServer::GetIsDebuggingEnabled() const
302
{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
303
	return m_bDebuggingEnabled;
304
305
}

306
307
308
void CITANetAudioStreamingServer::SetTargetLatencySamples( const int iTargetLatency )
{
	// Streaming already set up?
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
309
310
311
312
313
314
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Server not connected, client ring buffer unkown" );

	if( m_pInputStream )
		if( m_iTargetLatencySamples < m_pInputStream->GetBlocklength() )
			ITA_EXCEPT1( INVALID_PARAMETER, "Target latency has to be at least the block size of the audio streaming at client side." );
315
316
317
318

	m_iTargetLatencySamples = iTargetLatency;
}

319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
//! Sets the base name used for the server and message log files
/**
 * @param[in] sBaseName New base name, expected to be non-empty (checked by assert in debug builds)
 *
 * Raises a MODAL_EXCEPTION via ITA_EXCEPT1 if a client is already connected.
 */
void CITANetAudioStreamingServer::SetServerLogBaseName( const std::string& sBaseName )
{
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming and logging already started." );

	// Check the incoming name; the stored name says nothing about the new value
	assert( !sBaseName.empty() );
	m_sServerLogBaseName = sBaseName;
}

//! Returns a copy of the base name used for the log files
std::string CITANetAudioStreamingServer::GetServerLogBaseName() const
{
	std::string sBaseNameCopy( m_sServerLogBaseName );
	return sBaseNameCopy;
}

bool CITANetAudioStreamingServer::IsClientConnected() const
334
{
335
	return m_pNetAudioServer->IsConnected();
336
337
}

338
std::string CITANetAudioStreamingServer::GetNetworkAddress() const
339
{
340
	return m_pNetAudioServer->GetServerAddress();
341
342
}

343
int CITANetAudioStreamingServer::GetNetworkPort() const
344
{
345
	return m_pNetAudioServer->GetNetworkPort();
346
347
}

348
void CITANetAudioStreamingServer::Stop()
Anne's avatar
Anne committed
349
{
350
	m_pNetAudioServer->Stop();
Anne Heimes's avatar
bugfix    
Anne Heimes committed
351
	m_pMessage->ClearConnection();
Anne's avatar
Anne committed
352
}