ITANetAudioStreamingServer.cpp 10.7 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
#include <ITANetAudioMessage.h>

// ITA includes
#include <ITADataSource.h>
#include <ITANetAudioMessage.h>
#include <ITAException.h>
#include <ITAStreamInfo.h>
#include <ITAClock.h>

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
#include <VistaInterProcComm/Concurrency/VistaPriority.h>
#include <VistaBase/VistaStreamUtils.h>
#include <ITADataLog.h>

// STL
#include <cassert>
#include <cmath>
#include <iomanip>
#include <ostream>
#include <string>

26 27 28 29 30 31 32
//! Log record for one iteration of the server streaming loop
struct ITAServerLog : public ITALogDataBase
{
	//! Writes the tab-separated column header line to the given stream
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId"
			<< "\t" << "WorldTimeStamp"
			<< "\t" << "ProtocolStatus"
			<< "\t" << "EstimatedFreeSamples"
			<< "\t" << "TransmittedSamples"
			<< std::endl;
		return os;
	}

	//! Writes this record as one tab-separated line, columns in the order given by outputDesc
	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId
			<< "\t" << std::setprecision( 12 ) << dWorldTimeStamp
			<< "\t" << sProtocolStatus
			<< "\t" << iEstimatedFreeSamples
			<< "\t" << iTransmittedSamples
			<< std::endl;
		return os;
	}

	unsigned int uiBlockId;      //!< Block identifier (audio streaming)
	double dWorldTimeStamp;      //!< Clock time taken at the beginning of the loop iteration
	std::string sProtocolStatus; //!< Message identifier of the handled client message, or "SERVER_ESTIMATION"
	int iEstimatedFreeSamples;   //!< Estimated free samples in the client ring buffer after the iteration
	int iTransmittedSamples;     //!< Samples sent to the client during the iteration
};

//! Concrete buffered logger type for ITAServerLog records
class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger< ITAServerLog >
{
};

59
//! Creates a server without input stream and without client connection
/**
  * The message object and the server logger are created later in Start();
  * until then their pointers are NULL so that destruction is safe even if
  * Start() was never called or returned early.
  */
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
	: m_pInputStream( NULL )
	, m_pConnection( NULL )
	, m_pNetAudioServer( new CITANetAudioServer() )
	, m_dLastTimeStamp( 0 )
	, m_iTargetLatencySamples( -1 )
	, m_sServerLogBaseName( "ITANetAudioStreamingServer" )
	, m_bExportLogs( false )
	, m_iMaxSendBlocks( 40 )
	, m_iServerBlockId( 0 )
	, m_iEstimatedClientRingBufferFreeSamples( 0 )
{
	// Assigned in the body (not in the initializer list) because the
	// declaration order of these members is not visible from this file.
	m_pMessage = NULL;
	m_pServerLogger = NULL;
	m_iSendingBlockLength = 0;

	// Careful with this:
	//SetPriority( VistaPriority::VISTA_MID_PRIORITY );
}

//! Releases message, network server and logger and prints the try-read statistics
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
	// The message is allocated in Start() and is released nowhere else in this file.
	// It is deleted before the network server because it was handed the server's connection.
	delete m_pMessage;
	m_pMessage = NULL;

	delete m_pNetAudioServer;

	// The logger only exists after Start() got far enough to create it
	if( m_pServerLogger )
	{
		if( m_bExportLogs == false )
			m_pServerLogger->setOutputFile( "" ); // disables export
		delete m_pServerLogger;
		m_pServerLogger = NULL;
	}

	vstr::out() << "[ ITANetAudioStreamingServer ] Processing statistics: " << m_swTryReadBlockStats.ToString() << std::endl;
	vstr::out() << "[ ITANetAudioStreamingServer ] Try-read access statistics: " << m_swTryReadAccessStats.ToString() << std::endl;
}

87
bool CITANetAudioStreamingServer::Start(const std::string& sAddress, int iPort, double dTimeIntervalCientSendStatus)
Anne's avatar
Anne committed
88
{
89
	if( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
90 91
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

92
	if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
93 94
		return false;

95
	m_pConnection = m_pNetAudioServer->GetConnection();
96

97
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag() );
Jonas Stienen's avatar
Jonas Stienen committed
98 99
	m_pMessage->SetMessageLoggerBaseName( GetServerLogBaseName() + "_Messages" );

100
	m_pMessage->ResetMessage();
101
	m_pMessage->SetConnection( m_pConnection );
102
	while( !m_pMessage->ReadMessage( 0 ) ); //blocking
103

104 105
	assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
106

Anne's avatar
Anne committed
107
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
Anne Heimes's avatar
Anne Heimes committed
108
	m_oServerParams.iBlockSize = oClientParams.iBlockSize;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
109

110
	m_iEstimatedClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
111
	m_iSendingBlockLength = m_oServerParams.iBlockSize;
112

113 114 115 116
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_oServerParams.iRingBufferSize, true );

	m_pServerLogger = new ITABufferedDataLoggerImplServer();
	m_pServerLogger->setOutputFile( m_sServerLogBaseName + "_Server.log" );
117
	m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
118

119
	if( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
120
	{
121 122 123 124
		m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
		m_pMessage->WriteDouble( dTimeIntervalCientSendStatus );
		m_pMessage->WriteMessage();

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
125
#ifdef NET_AUDIO_SHOW_TRAFFIC
126
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
127
#endif
128 129 130 131

		Run(); // Start thread loop

		return true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
132 133 134
	}
	else
	{
135 136 137
		m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_REFUSED_INVALID_PARAMETERS );
		m_pMessage->WriteMessage();

Anne Heimes's avatar
Anne Heimes committed
138
#ifdef NET_AUDIO_SHOW_TRAFFIC
139
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
140
#endif
141

142 143
		return false;
	}
Anne's avatar
Anne committed
144 145
}

146
bool CITANetAudioStreamingServer::LoopBody()
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
147
{
148 149
	const double dNow = ITAClock::getDefaultClock()->getTime();

Anne's avatar
Anne committed
150
	ITAServerLog oLog;
151
	oLog.dWorldTimeStamp = dNow;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
152
	oLog.uiBlockId = ++m_iServerBlockId;
153 154 155 156
	oLog.iTransmittedSamples = 0;
	
	// Sending Samples
	int iEstimatedClientRingBufferTargetLatencyFreeSamples = m_iEstimatedClientRingBufferFreeSamples - ( m_oServerParams.iRingBufferSize - m_iTargetLatencySamples );
157

158
	if (iEstimatedClientRingBufferTargetLatencyFreeSamples >= m_iSendingBlockLength)
Anne's avatar
Anne committed
159 160
	{
		// Send Samples
161
		int iSendBlocks = iEstimatedClientRingBufferTargetLatencyFreeSamples / m_iSendingBlockLength;
162

163
		// Besser wre vermutlich, gleich alle samples zu senden und nicht nur einen Block nach dem anderen
164 165
		if (m_sfTempTransmitBuffer.GetLength() != m_iSendingBlockLength)
			m_sfTempTransmitBuffer.init(m_pInputStream->GetNumberOfChannels(), m_iSendingBlockLength, false);
166 167

		for( int j = 0; j < iSendBlocks; j++ )
168
		{
169
			for( int i = 0; i < int( m_pInputStream->GetNumberOfChannels() ); i++ )
170 171
			{
				ITAStreamInfo oStreamInfo;
172
				oStreamInfo.nSamples = m_iSendingBlockLength;
173 174

				const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
175
				if( pfData != 0 )
176
					m_sfTempTransmitBuffer[i].write(pfData, m_iSendingBlockLength, 0);
177
			}
178

179
			m_pInputStream->IncrementBlockPointer();
180 181 182

			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES );
Anne Heimes's avatar
Anne Heimes committed
183
			m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
184
			m_pMessage->WriteMessage();
185
			m_iEstimatedClientRingBufferFreeSamples -= m_iSendingBlockLength;
Anne Heimes's avatar
Anne Heimes committed
186
		}
187

Anne's avatar
Anne committed
188
#ifdef NET_AUDIO_SHOW_TRAFFIC
189 190
		vstr::out() << "[ITANetAudioStreamingServer] Transmitted " << iSendSamples << " samples for "
			<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
Anne's avatar
Anne committed
191
#endif
192

193 194 195 196
		oLog.iTransmittedSamples = iSendBlocks * m_pInputStream->GetBlocklength();
	}
	
	// Try-read incoming messages from client (e.g. regular status information)
197
	m_pMessage->ResetMessage();
198 199
	m_swTryReadBlockStats.start();
	m_swTryReadAccessStats.start();
200
	if( m_pMessage->ReadMessage( 1 ) )
201
	{
202
		m_swTryReadAccessStats.stop();
203 204
		int iMsgType = m_pMessage->GetMessageType();
		switch( iMsgType )
205
		{
206 207
		case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
		{
208 209
			m_iEstimatedClientRingBufferFreeSamples = m_pMessage->ReadInt();
			m_dLastTimeStamp = dNow;
210 211 212 213
			break;
		}
		case CITANetAudioProtocol::NP_CLIENT_CLOSE:
		{
214 215 216
			// Stop here because answer on client close might be blocking, we don't want that in our statistics
			m_swTryReadBlockStats.stop();

217 218 219 220 221 222 223 224 225 226 227
			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
			m_pMessage->WriteMessage();

			StopGently( false );
			m_pConnection = NULL;
			Stop();
			break;
		}
		default:
		{
228
			vstr::out() << "[ ITANetAudioStreamingServer ] Unkown protocol type : " << iMsgType << std::endl;
229 230
			break;
		}
231
		}
232 233

		oLog.sProtocolStatus = CITANetAudioProtocol::GetNPMessageID( iMsgType );
234
	}
235 236
	else
	{
237 238 239
		// There is no status message, so we estimate the client-side ring buffer status
		const double dTimeDiff = dNow - m_dLastTimeStamp;
		m_dLastTimeStamp = dNow;
240
		double dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
241 242
		m_iEstimatedClientRingBufferFreeSamples += ( int ) dEstimatedSamples;
		oLog.sProtocolStatus = "SERVER_ESTIMATION";
243
	}
244 245 246 247 248
	if( m_swTryReadBlockStats.started() ) // only stop if still running
		m_swTryReadBlockStats.stop();
	
	oLog.iEstimatedFreeSamples = m_iEstimatedClientRingBufferFreeSamples;
	m_pServerLogger->log( oLog );
249

Anne's avatar
Anne committed
250
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
251 252
}

253 254
//! Sets the data source to be streamed and adopts its sample rate, block length and channel count
/**
  * @param[in] pInStream Data source, must not be NULL
  *
  * @note Throws MODAL_EXCEPTION if the streaming loop is already running
  * @note Throws INVALID_PARAMETER if pInStream is NULL
  */
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
	if( VistaThreadLoop::IsRunning() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	// Reject NULL before touching any state, the stream is dereferenced right below
	if( !pInStream )
		ITA_EXCEPT1( INVALID_PARAMETER, "Input stream must not be a null pointer" );

	m_pInputStream = pInStream;
	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate();
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength();
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels();
}

264
//! Returns the data source set via SetInputStream (NULL if none has been set)
ITADatasource* CITANetAudioStreamingServer::GetInputStream() const
{
	ITADatasource* pStream = m_pInputStream;
	return pStream;
}

269
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
270
{
271
	return m_sfTempTransmitBuffer.GetLength();
272 273
}

274 275 276 277 278 279 280 281 282 283
//! Returns the block length (in samples) currently used for sending sample messages
int CITANetAudioStreamingServer::GetSendingBlockLength() const
{
	return m_iSendingBlockLength;
}

void CITANetAudioStreamingServer::SetSendingBlockLength(const int iSendingBlockLength)
{
	m_iSendingBlockLength = iSendingBlockLength;
}

284
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels() const
285
{
286
	return m_sfTempTransmitBuffer.channels();
287 288
}

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
289 290 291 292 293 294
//! Enables or disables the export of the server log (evaluated when the server is destroyed)
void CITANetAudioStreamingServer::SetLoggingExportEnabled( bool bEnabled )
{
	m_bExportLogs = bEnabled;
}

bool CITANetAudioStreamingServer::GetLoggingExportEnabled() const
295
{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
296
	return m_bExportLogs;
297 298
}

299 300 301 302 303 304 305 306 307
//! Sets the target latency (in samples) the streaming loop uses when deciding how many blocks to send
/**
  * @param[in] iTargetLatency Target latency in samples
  *
  * @note Throws INVALID_PARAMETER if a client is connected and the requested latency
  *       is smaller than the negotiated block size
  */
void CITANetAudioStreamingServer::SetTargetLatencySamples( const int iTargetLatency )
{
	// Streaming already set up? Then validate the REQUESTED value (not the currently stored one)
	if( IsClientConnected() && iTargetLatency < m_oServerParams.iBlockSize )
		ITA_EXCEPT1( INVALID_PARAMETER, "Target latency has to be at least the block size of the audio streaming at client side." );

	m_iTargetLatencySamples = iTargetLatency;
}

308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
//! Sets the base name used for the server log and message log file names
/**
  * @param[in] sBaseName New base name, expected to be non-empty
  *
  * @note Throws MODAL_EXCEPTION if a client is already connected
  */
void CITANetAudioStreamingServer::SetServerLogBaseName( const std::string& sBaseName )
{
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming and logging already started." );

	// Check the name that is about to be stored, not the one being replaced
	assert( !sBaseName.empty() );
	m_sServerLogBaseName = sBaseName;
}

//! Returns the base name used for the server log and message log file names
std::string CITANetAudioStreamingServer::GetServerLogBaseName() const
{
	return m_sServerLogBaseName;
}

bool CITANetAudioStreamingServer::IsClientConnected() const
323
{
324
	return m_pNetAudioServer->IsConnected();
325 326
}

327
std::string CITANetAudioStreamingServer::GetNetworkAddress() const
328
{
329
	return m_pNetAudioServer->GetServerAddress();
330 331
}

332
int CITANetAudioStreamingServer::GetNetworkPort() const
333
{
334
	return m_pNetAudioServer->GetNetworkPort();
335 336
}

337
void CITANetAudioStreamingServer::Stop()
Anne's avatar
Anne committed
338
{
339
	m_pNetAudioServer->Stop();
Anne Heimes's avatar
bugfix  
Anne Heimes committed
340
	m_pMessage->ClearConnection();
Anne's avatar
Anne committed
341
}