ITANetAudioStreamingServer.cpp 11.7 KB
Newer Older
Anne's avatar
Anne committed
1
#include <ITANetAudioStreamingServer.h>
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
2 3 4

#include "ITANetAudioServer.h"
#include "ITANetAudioMessage.h"
Anne's avatar
Anne committed
5 6

// ITA includes
Anne's avatar
Anne committed
7
#include <ITADataSource.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
9
#include <ITAStreamInfo.h>
Anne's avatar
Anne committed
10
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
17
#include <VistaInterProcComm/Concurrency/VistaPriority.h>
18
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
19
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

25
struct CITAServerLog : public ITALogDataBase
26 27 28 29 30 31
{
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
32 33
		os << "\t" << "EstimatedFreeSamples";
		os << "\t" << "TransmittedSamples";
34
		os << "\t" << "EstimatedCorrFactor";
35 36 37 38 39 40 41 42
		os << std::endl;
		return os;
	};

	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
43 44 45
		os << "\t" << sProtocolStatus;
		os << "\t" << iEstimatedFreeSamples;
		os << "\t" << iTransmittedSamples;
46
		os << "\t" << dEstimatedCorrFactor;
47 48 49 50 51 52
		os << std::endl;
		return os;
	};

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp;
53 54 55
	std::string sProtocolStatus;
	int iEstimatedFreeSamples;
	int iTransmittedSamples;
56
	double dEstimatedCorrFactor;
Anne's avatar
Anne committed
57 58
};

59
//! Named logger type for CITAServerLog entries; adds no members to the buffered data logger it derives from
class CITABufferedDataLoggerImplServer
	: public ITABufferedDataLogger< CITAServerLog >
{
};
Anne's avatar
Anne committed
60

61
//! Creates a streaming server without input stream, connection, message or logger
/**
  * The network server object is allocated here; message and logger objects
  * are only created in Start(). Their pointers are therefore reset to NULL so
  * that the destructor and Stop() can tell whether Start() has run.
  */
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
	: m_pInputStream( NULL )
	, m_pConnection( NULL )
	, m_pNetAudioServer( new CITANetAudioServer() )
	, m_dLastTimeStamp( 0 )
	, m_iTargetLatencySamples( -1 )
	, m_sServerLogBaseName( "ITANetAudioStreamingServer" )
	, m_bDebuggingEnabled( false )
	, m_iMaxSendBlocks( 40 )
	, m_iServerBlockId( 0 )
	, m_iEstimatedClientRingBufferFreeSamples( 0 )
	, m_iClientRingBufferSize( 0 )
	, m_dEstimatedCorrFactor( 1 )
	, m_dStreamTimeStart( 0.0f )
	, m_nStreamSampleCounts( 0 )
{
	// Assigned in the body (not in the initializer list) because the member
	// declaration order is not visible from this file.
	m_pMessage = NULL;
	m_pServerLogger = NULL;
	m_iSendingBlockLength = 0;

	// Careful with this:
	//SetPriority( VistaPriority::VISTA_MID_PRIORITY );
}

//! Releases the network server, the logger and the message object
/**
  * With debugging enabled the try-read stop watch statistics are printed;
  * otherwise the log export is disabled before the logger is destroyed.
  */
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
	delete m_pNetAudioServer;

	if( GetIsDebuggingEnabled() )
	{
		vstr::out() << "[ ITANetAudioStreamingServer ] Processing statistics: " << m_swTryReadBlockStats.ToString() << std::endl;
		vstr::out() << "[ ITANetAudioStreamingServer ] Try-read access statistics: " << m_swTryReadAccessStats.ToString() << std::endl;
	}
	else if( m_pServerLogger ) // logger only exists after Start() got past the handshake
	{
		m_pServerLogger->setOutputFile( "" ); // disables export
	}

	// Deleting a NULL pointer is a no-op, so no further checks are required
	delete m_pServerLogger;
	delete m_pMessage; // allocated in Start()
}

98
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, const int iPort, const double dTimeIntervalCientSendStatus, const bool bUseUDP /* = false */ )
Anne's avatar
Anne committed
99
{
100
	if( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
101 102
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

103
	if( !m_pNetAudioServer->Start( sAddress, iPort, bUseUDP ) ) // blocking
104 105
		return false;

106
	m_pConnection = m_pNetAudioServer->GetConnection();
107

108
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag() );
Jonas Stienen's avatar
Jonas Stienen committed
109
	m_pMessage->SetMessageLoggerBaseName( GetServerLogBaseName() + "_Messages" );
Anne Heimes's avatar
Anne Heimes committed
110
	m_pMessage->SetDebuggingEnabled(GetIsDebuggingEnabled());
111
	m_pMessage->ResetMessage();
112
	m_pMessage->SetConnection( m_pConnection );
113
	while( !m_pMessage->ReadMessage( 0 ) ); //blocking
114

115 116
	assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
117

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
118 119 120 121 122
	CITANetAudioProtocol::StreamingParameters oServerParams;
	oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
	oServerParams.iBlockSize = m_pInputStream->GetBlocklength();
	oServerParams.dSampleRate = m_pInputStream->GetSampleRate();
	oServerParams.iChannels = m_pInputStream->GetNumberOfChannels();
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
123

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
124 125 126
	m_iEstimatedClientRingBufferFreeSamples = oServerParams.iRingBufferSize;
	m_iClientRingBufferSize = oClientParams.iRingBufferSize;
	m_iSendingBlockLength = oServerParams.iBlockSize;
127

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
128
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), oServerParams.iRingBufferSize, true );
129

130
	m_pServerLogger = new CITABufferedDataLoggerImplServer();
131
	m_pServerLogger->setOutputFile( m_sServerLogBaseName + "_Server.log" );
132
	m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
133

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
134
	if( oServerParams == oClientParams )
Anne's avatar
Anne committed
135
	{
136 137 138 139
		m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
		m_pMessage->WriteDouble( dTimeIntervalCientSendStatus );
		m_pMessage->WriteMessage();

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
140
#ifdef NET_AUDIO_SHOW_TRAFFIC
141
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
142
#endif
143 144 145 146

		Run(); // Start thread loop

		return true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
147 148 149
	}
	else
	{
150 151 152
		m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_REFUSED_INVALID_PARAMETERS );
		m_pMessage->WriteMessage();

Anne Heimes's avatar
Anne Heimes committed
153
#ifdef NET_AUDIO_SHOW_TRAFFIC
154
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
155
#endif
156

157 158
		return false;
	}
Anne's avatar
Anne committed
159 160
}

161
bool CITANetAudioStreamingServer::LoopBody()
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
162
{
163 164
	const double dNow = ITAClock::getDefaultClock()->getTime();

165 166 167
	if( m_dStreamTimeStart == 0.0f )
		m_dStreamTimeStart = dNow;

168
	CITAServerLog oLog;
169
	oLog.dWorldTimeStamp = dNow;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
170
	oLog.uiBlockId = ++m_iServerBlockId;
171
	oLog.iTransmittedSamples = 0;
172

173
	// Sending Samples
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
174
	int iEstimatedClientRingBufferTargetLatencyFreeSamples = m_iEstimatedClientRingBufferFreeSamples - ( m_iClientRingBufferSize - m_iTargetLatencySamples );
175

176
	if( iEstimatedClientRingBufferTargetLatencyFreeSamples >= m_iSendingBlockLength )
Anne's avatar
Anne committed
177 178
	{
		// Send Samples
179
		int iSendBlocks = iEstimatedClientRingBufferTargetLatencyFreeSamples / m_iSendingBlockLength;
180

181
		// Besser wre vermutlich, gleich alle samples zu senden und nicht nur einen Block nach dem anderen
182 183
		if( m_sfTempTransmitBuffer.GetLength() != m_iSendingBlockLength )
			m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_iSendingBlockLength, false );
184 185

		for( int j = 0; j < iSendBlocks; j++ )
186
		{
187
			for( int i = 0; i < int( m_pInputStream->GetNumberOfChannels() ); i++ )
188 189
			{
				ITAStreamInfo oStreamInfo;
190 191
				oStreamInfo.nSamples = ( m_nStreamSampleCounts += m_iSendingBlockLength );
				oStreamInfo.dTimecode = dNow - m_dStreamTimeStart;
192 193

				const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
194
				if( pfData != nullptr )
195
					m_sfTempTransmitBuffer[ i ].write( pfData, m_iSendingBlockLength, 0 );
196
			}
197

198
			m_pInputStream->IncrementBlockPointer();
199 200 201

			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES );
Anne Heimes's avatar
Anne Heimes committed
202
			m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
203
			m_pMessage->WriteMessage();
204
			m_iEstimatedClientRingBufferFreeSamples -= m_iSendingBlockLength;
Anne Heimes's avatar
Anne Heimes committed
205
		}
206

Anne's avatar
Anne committed
207
#ifdef NET_AUDIO_SHOW_TRAFFIC
208
		vstr::out() << "[ITANetAudioStreamingServer] Transmitted " << m_iSendingBlockLength << " samples for "
209
			<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
Anne's avatar
Anne committed
210
#endif
211

212 213
		oLog.iTransmittedSamples = iSendBlocks * m_pInputStream->GetBlocklength();
	}
214

215
	// Try-read incoming messages from client (e.g. regular status information)
216
	m_pMessage->ResetMessage();
217 218
	m_swTryReadBlockStats.start();
	m_swTryReadAccessStats.start();
219
	if( m_pMessage->ReadMessage( 1 ) )
220
	{
221
		m_swTryReadAccessStats.stop();
222 223
		int iMsgType = m_pMessage->GetMessageType();
		switch( iMsgType )
224
		{
225 226
		case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
		{
227 228
			m_iEstimatedClientRingBufferFreeSamples = m_pMessage->ReadInt();
			m_dLastTimeStamp = dNow;
229 230 231 232
			break;
		}
		case CITANetAudioProtocol::NP_CLIENT_CLOSE:
		{
233 234 235
			// Stop here because answer on client close might be blocking, we don't want that in our statistics
			m_swTryReadBlockStats.stop();

236 237 238 239 240 241 242 243 244 245 246
			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
			m_pMessage->WriteMessage();

			StopGently( false );
			m_pConnection = NULL;
			Stop();
			break;
		}
		default:
		{
247
			vstr::out() << "[ ITANetAudioStreamingServer ] Unkown protocol type : " << iMsgType << std::endl;
248 249
			break;
		}
250
		}
251 252

		oLog.sProtocolStatus = CITANetAudioProtocol::GetNPMessageID( iMsgType );
253
	}
254 255
	else
	{
256 257 258
		// There is no status message, so we estimate the client-side ring buffer status
		const double dTimeDiff = dNow - m_dLastTimeStamp;
		m_dLastTimeStamp = dNow;
259
		double dEstimatedSamples = m_dEstimatedCorrFactor * dTimeDiff * m_pInputStream->GetSampleRate();
260 261
		m_iEstimatedClientRingBufferFreeSamples += ( int ) dEstimatedSamples;
		oLog.sProtocolStatus = "SERVER_ESTIMATION";
262
	}
263 264
	if( m_swTryReadBlockStats.started() ) // only stop if still running
		m_swTryReadBlockStats.stop();
265

266
	oLog.iEstimatedFreeSamples = m_iEstimatedClientRingBufferFreeSamples;
267
	oLog.dEstimatedCorrFactor = m_dEstimatedCorrFactor;
268
	m_pServerLogger->log( oLog );
269

Anne's avatar
Anne committed
270
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
271 272
}

273 274
//! Sets the data source that is streamed; throws a MODAL_EXCEPTION while the thread loop is running
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
	const bool bLoopActive = VistaThreadLoop::IsRunning();
	if( bLoopActive )
	{
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );
	}

	m_pInputStream = pInStream;
}

281
//! Returns the currently assigned input data source pointer (as stored by SetInputStream)
ITADatasource* CITANetAudioStreamingServer::GetInputStream() const
{
	ITADatasource* pCurrentStream = m_pInputStream;
	return pCurrentStream;
}

286
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
287
{
288
	return m_sfTempTransmitBuffer.GetLength();
289 290
}

291 292 293 294 295
int CITANetAudioStreamingServer::GetSendingBlockLength() const
{
	return m_iSendingBlockLength;
}

296
void CITANetAudioStreamingServer::SetSendingBlockLength( const int iSendingBlockLength )
297 298 299 300
{
	m_iSendingBlockLength = iSendingBlockLength;
}

301
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels() const
302
{
303
	return m_sfTempTransmitBuffer.channels();
304 305
}

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
306
void CITANetAudioStreamingServer::SetDebuggingEnabled( bool bEnabled )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
307
{
Anne Heimes's avatar
Anne Heimes committed
308
	
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
309
	m_bDebuggingEnabled = bEnabled;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
310 311
}

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
312
bool CITANetAudioStreamingServer::GetIsDebuggingEnabled() const
313
{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
314
	return m_bDebuggingEnabled;
315 316
}

317 318 319
void CITANetAudioStreamingServer::SetTargetLatencySamples( const int iTargetLatency )
{
	// Streaming already set up?
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
320 321 322 323
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Server not connected, client ring buffer unkown" );

	if( m_pInputStream )
Anne Heimes's avatar
Anne Heimes committed
324
		if ( iTargetLatency < int( m_pInputStream->GetBlocklength( ) ) )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
325
			ITA_EXCEPT1( INVALID_PARAMETER, "Target latency has to be at least the block size of the audio streaming at client side." );
326 327 328 329

	m_iTargetLatencySamples = iTargetLatency;
}

330 331 332 333 334 335 336
//! Sets the base name used for the server log file and the message logger
/**
  * @param[in] sBaseName New base name, expected to be non-empty
  *
  * @note Throws a MODAL_EXCEPTION if a client is already connected.
  */
void CITANetAudioStreamingServer::SetServerLogBaseName( const std::string& sBaseName )
{
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming and logging already started." );

	// Validate the incoming name; checking the previously stored name would never catch an empty argument
	assert( !sBaseName.empty() );
	m_sServerLogBaseName = sBaseName;
}

std::string CITANetAudioStreamingServer::GetServerLogBaseName() const
{
	return m_sServerLogBaseName;
}

bool CITANetAudioStreamingServer::IsClientConnected() const
346
{
347
	return m_pNetAudioServer->IsConnected();
348 349
}

350
std::string CITANetAudioStreamingServer::GetNetworkAddress() const
351
{
352
	return m_pNetAudioServer->GetServerAddress();
353 354
}

355
int CITANetAudioStreamingServer::GetNetworkPort() const
356
{
357
	return m_pNetAudioServer->GetNetworkPort();
358 359
}

360 361 362 363 364 365 366 367 368 369 370 371
double CITANetAudioStreamingServer::GetEstimatedCorrFactor( ) const
{
	return m_dEstimatedCorrFactor;
}

void CITANetAudioStreamingServer::SetEstimatedCorrFactor( double dCorrFactor )
{
	m_dEstimatedCorrFactor = dCorrFactor;
}



372
void CITANetAudioStreamingServer::Stop()
Anne's avatar
Anne committed
373
{
374
	m_pNetAudioServer->Stop();
Anne Heimes's avatar
bugfix  
Anne Heimes committed
375
	m_pMessage->ClearConnection();
Anne's avatar
Anne committed
376
}