ITANetAudioStreamingServer.cpp 9.95 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
3
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
4 5

// ITA includes
Anne's avatar
Anne committed
6
#include <ITADataSource.h>
7
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
9
#include <ITAStreamInfo.h>
Anne's avatar
Anne committed
10
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16 17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
18
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
19
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

25 26 27 28 29 30 31
struct ITAServerLog : public ITALogDataBase
{
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
32 33
		os << "\t" << "EstimatedFreeSamples";
		os << "\t" << "TransmittedSamples";
34 35 36 37 38 39 40 41
		os << std::endl;
		return os;
	};

	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
42 43 44
		os << "\t" << sProtocolStatus;
		os << "\t" << iEstimatedFreeSamples;
		os << "\t" << iTransmittedSamples;
45 46 47 48 49 50
		os << std::endl;
		return os;
	};

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp;
51 52 53
	std::string sProtocolStatus;
	int iEstimatedFreeSamples;
	int iTransmittedSamples;
Anne's avatar
Anne committed
54 55 56 57
};

class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger < ITAServerLog > {};

58
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
59 60 61 62 63
	: m_pInputStream( NULL )
	, m_iUpdateStrategy( AUTO )
	, m_pConnection( NULL )
	, m_pNetAudioServer( new CITANetAudioServer() )
	, m_dLastTimeStamp( 0 )
64
	, m_iTargetLatencySamples( -1 )
65
	, m_sServerLogBaseName( "ITANetAudioStreamingServer" )
66
{
Anne's avatar
Anne committed
67
	iServerBlockId = 0;
68
	m_iMaxSendBlocks = 40;
69
	m_iEstimatedClientRingBufferFreeSamples = 0;
Anne's avatar
Anne committed
70 71
}

72 73 74 75
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
	delete m_pNetAudioServer;
	delete m_pServerLogger;
76 77 78

	vstr::out() << "[ ITANetAudioStreamingServer ] Processing statistics: " << m_swTryReadBlockStats.ToString() << std::endl;
	vstr::out() << "[ ITANetAudioStreamingServer ] Try-read access statistics: " << m_swTryReadAccessStats.ToString() << std::endl;
79 80
}

81
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort, double dTimeIntervalCientSendStatus )
Anne's avatar
Anne committed
82
{
83
	if( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
84 85
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

86
	if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
87 88
		return false;

89
	m_pConnection = m_pNetAudioServer->GetConnection();
90

91
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag() );
Jonas Stienen's avatar
Jonas Stienen committed
92 93
	m_pMessage->SetMessageLoggerBaseName( GetServerLogBaseName() + "_Messages" );

94
	m_pMessage->ResetMessage();
95
	m_pMessage->SetConnection( m_pConnection );
96
	while( !m_pMessage->ReadMessage( 0 ) ); //blocking
97

98 99
	assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
100

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
101
	bool bOK = false;
Anne's avatar
Anne committed
102
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
Anne Heimes's avatar
Anne Heimes committed
103
	m_oServerParams.iBlockSize = oClientParams.iBlockSize;
104
	m_iEstimatedClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
105 106

	m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
107
	if( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
108
	{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
109
		bOK = true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
110
#ifdef NET_AUDIO_SHOW_TRAFFIC
111
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
112
#endif
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
113 114 115
	}
	else
	{
Anne Heimes's avatar
Anne Heimes committed
116
#ifdef NET_AUDIO_SHOW_TRAFFIC
117
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
118
#endif
119
}
120

121 122
	m_pServerLogger = new ITABufferedDataLoggerImplServer();
	m_pServerLogger->setOutputFile( m_sServerLogBaseName + ".log" );
Anne's avatar
Anne committed
123

124
	m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
Anne Heimes's avatar
Anne Heimes committed
125
	m_pMessage->WriteDouble( dTimeIntervalCientSendStatus );
126
	m_pMessage->WriteMessage();
127

128
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_oServerParams.iRingBufferSize, true );
Anne Heimes's avatar
Anne Heimes committed
129 130


131 132
	if( bOK )
		Run();
133

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
134
	return bOK;
Anne's avatar
Anne committed
135 136
}

137
bool CITANetAudioStreamingServer::LoopBody()
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
138
{
139 140
	const double dNow = ITAClock::getDefaultClock()->getTime();

Anne's avatar
Anne committed
141
	ITAServerLog oLog;
142
	oLog.dWorldTimeStamp = dNow;
Anne's avatar
Anne committed
143
	oLog.uiBlockId = ++iServerBlockId;
144 145 146
	oLog.iTransmittedSamples = 0;
	
	// Sending Samples
147
	int iBlockLength = m_pInputStream->GetBlocklength();
148
	int iEstimatedClientRingBufferTargetLatencyFreeSamples = m_iEstimatedClientRingBufferFreeSamples - ( m_oServerParams.iRingBufferSize - m_iTargetLatencySamples );
149

150
	if( iEstimatedClientRingBufferTargetLatencyFreeSamples >= iBlockLength )
Anne's avatar
Anne committed
151 152
	{
		// Send Samples
153
		int iSendBlocks = iEstimatedClientRingBufferTargetLatencyFreeSamples / iBlockLength;
154

155
		// Besser wre vermutlich, gleich alle samples zu senden und nicht nur einen Block nach dem anderen
156 157 158 159
		if( m_sfTempTransmitBuffer.GetLength() != iBlockLength )
			m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), iBlockLength, false );

		for( int j = 0; j < iSendBlocks; j++ )
160
		{
161
			for( int i = 0; i < int( m_pInputStream->GetNumberOfChannels() ); i++ )
162 163
			{
				ITAStreamInfo oStreamInfo;
Anne Heimes's avatar
Anne Heimes committed
164
				oStreamInfo.nSamples = iBlockLength;
165 166

				const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
167
				if( pfData != 0 )
Anne Heimes's avatar
Anne Heimes committed
168
					m_sfTempTransmitBuffer[ i ].write( pfData, iBlockLength, 0 );
169
			}
170

171
			m_pInputStream->IncrementBlockPointer();
172 173 174

			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES );
Anne Heimes's avatar
Anne Heimes committed
175
			m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
176
			m_pMessage->WriteMessage();
177
			m_iEstimatedClientRingBufferFreeSamples -= iBlockLength;
Anne Heimes's avatar
Anne Heimes committed
178
		}
179

Anne's avatar
Anne committed
180
#ifdef NET_AUDIO_SHOW_TRAFFIC
181 182
		vstr::out() << "[ITANetAudioStreamingServer] Transmitted " << iSendSamples << " samples for "
			<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
Anne's avatar
Anne committed
183
#endif
184

185 186 187 188
		oLog.iTransmittedSamples = iSendBlocks * m_pInputStream->GetBlocklength();
	}
	
	// Try-read incoming messages from client (e.g. regular status information)
189
	m_pMessage->ResetMessage();
190 191
	m_swTryReadBlockStats.start();
	m_swTryReadAccessStats.start();
192
	if( m_pMessage->ReadMessage( 1 ) )
193
	{
194
		m_swTryReadAccessStats.stop();
195 196
		int iMsgType = m_pMessage->GetMessageType();
		switch( iMsgType )
197
		{
198 199
		case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
		{
200 201
			m_iEstimatedClientRingBufferFreeSamples = m_pMessage->ReadInt();
			m_dLastTimeStamp = dNow;
202 203 204 205
			break;
		}
		case CITANetAudioProtocol::NP_CLIENT_CLOSE:
		{
206 207 208
			// Stop here because answer on client close might be blocking, we don't want that in our statistics
			m_swTryReadBlockStats.stop();

209 210 211 212 213 214 215 216 217 218 219
			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
			m_pMessage->WriteMessage();

			StopGently( false );
			m_pConnection = NULL;
			Stop();
			break;
		}
		default:
		{
220
			vstr::out() << "[ ITANetAudioStreamingServer ] Unkown protocol type : " << iMsgType << std::endl;
221 222
			break;
		}
223
		}
224 225

		oLog.sProtocolStatus = CITANetAudioProtocol::GetNPMessageID( iMsgType );
226
	}
227 228
	else
	{
229 230 231
		// There is no status message, so we estimate the client-side ring buffer status
		const double dTimeDiff = dNow - m_dLastTimeStamp;
		m_dLastTimeStamp = dNow;
232
		double dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
233 234
		m_iEstimatedClientRingBufferFreeSamples += ( int ) dEstimatedSamples;
		oLog.sProtocolStatus = "SERVER_ESTIMATION";
235
	}
236 237 238 239 240
	if( m_swTryReadBlockStats.started() ) // only stop if still running
		m_swTryReadBlockStats.stop();
	
	oLog.iEstimatedFreeSamples = m_iEstimatedClientRingBufferFreeSamples;
	m_pServerLogger->log( oLog );
241

Anne's avatar
Anne committed
242
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
243 244
}

245 246
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
247
	if( VistaThreadLoop::IsRunning() )
248 249 250
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	m_pInputStream = pInStream;
251 252 253
	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate();
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength();
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels();
254 255
}

256
ITADatasource* CITANetAudioStreamingServer::GetInputStream() const
Anne's avatar
Anne committed
257 258 259 260
{
	return m_pInputStream;
}

261
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
262
{
263
	return m_sfTempTransmitBuffer.GetLength();
264 265
}

266
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels() const
267
{
268
	return m_sfTempTransmitBuffer.channels();
269 270
}

271
void CITANetAudioStreamingServer::SetAutomaticUpdateRate()
272 273 274 275
{
	m_iUpdateStrategy = AUTO;
}

276 277 278 279 280 281 282 283 284
void CITANetAudioStreamingServer::SetTargetLatencySamples( const int iTargetLatency )
{
	// Streaming already set up?
	if( IsClientConnected() && m_iTargetLatencySamples < m_oServerParams.iBlockSize )
		ITA_EXCEPT1( INVALID_PARAMETER, "Target latency has to be at least the block size of the audio streaming at client side." );

	m_iTargetLatencySamples = iTargetLatency;
}

285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
void CITANetAudioStreamingServer::SetServerLogBaseName( const std::string& sBaseName )
{
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming and logging already started." );

	assert( !m_sServerLogBaseName.empty() );
	m_sServerLogBaseName = sBaseName;
}

std::string CITANetAudioStreamingServer::GetServerLogBaseName() const
{
	return m_sServerLogBaseName;
}

bool CITANetAudioStreamingServer::IsClientConnected() const
300
{
301
	return m_pNetAudioServer->IsConnected();
302 303
}

304
std::string CITANetAudioStreamingServer::GetNetworkAddress() const
305
{
306
	return m_pNetAudioServer->GetServerAddress();
307 308
}

309
int CITANetAudioStreamingServer::GetNetworkPort() const
310
{
311
	return m_pNetAudioServer->GetNetworkPort();
312 313
}

314
void CITANetAudioStreamingServer::Stop()
Anne's avatar
Anne committed
315
{
316
	m_pNetAudioServer->Stop();
Anne Heimes's avatar
bugfix  
Anne Heimes committed
317
	m_pMessage->ClearConnection();
Anne's avatar
Anne committed
318
}