ITANetAudioStreamingServer.cpp 9.99 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
3
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
4 5

// ITA includes
Anne's avatar
Anne committed
6
#include <ITADataSource.h>
7
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
9
#include <ITAStreamInfo.h>
Anne's avatar
Anne committed
10
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16 17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
18
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
19
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
/**
 * One entry (row) of the streaming server data log.
 *
 * outputDesc() writes the tab-separated column header, outputData() writes
 * the values of this entry in the same column order. Both terminate the
 * line with std::endl.
 */
struct ITAServerLog : public ITALogDataBase
{
	//! Writes the column header line to the stream and returns the stream
	static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId" << "\t" << "WorldTimeStamp" << "\t" << "ProtocolStatus" << "\t" << "FreeSamples" << std::endl;
		return os;
	}

	//! Writes the values of this entry (time stamp with precision 12) to the stream and returns the stream
	std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId
			<< "\t" << std::setprecision( 12 ) << dWorldTimeStamp
			<< "\t" << iProtocolStatus
			<< "\t" << iFreeSamples
			<< std::endl;
		return os;
	}

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp; //!< Time stamp of the entry
	int iProtocolStatus; //!< Protocol status / message type the entry belongs to
	int iFreeSamples; //!< Free samples of the client ring buffer as known to the server
};

//! Buffered data logger for ITAServerLog entries; adds nothing to the ITABufferedDataLogger instantiation
class ITABufferedDataLoggerImplServer
	: public ITABufferedDataLogger< ITAServerLog >
{
};

55
/**
 * Creates a streaming server without input stream and without connection.
 *
 * The underlying net audio server is allocated here. The message object and
 * the server logger are only allocated in Start(), so they are set to NULL
 * to keep the destructor well-defined if Start() is never called.
 */
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
	: m_pInputStream( NULL )
	, m_iUpdateStrategy( AUTO )
	, m_pConnection( NULL )
	, m_pNetAudioServer( new CITANetAudioServer() )
	, m_dLastTimeStamp( 0 )
	, m_iTargetLatencySamples( -1 )
	, m_sServerLogBaseName( "ITANetAudioStreamingServer" )
{
	iServerBlockId = 0;
	m_iMaxSendBlocks = 40;
	m_iClientRingBufferFreeSamples = 0;

	// Assigned in the body (not the initializer list) so the member declaration order does not matter
	m_pMessage = NULL;
	m_pServerLogger = NULL;
}

/**
 * Releases everything this class allocated with new.
 *
 * The message is deleted before the net audio server because it was handed
 * the server's connection in Start(). Deleting a NULL pointer is a no-op,
 * so this is safe when Start() was never called.
 */
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
	delete m_pMessage;
	delete m_pNetAudioServer;
	delete m_pServerLogger;
}

75
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort, double dTimeIntervalCientSendStatus )
Anne's avatar
Anne committed
76
{
77
	if( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
78 79
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

80
	if( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
81 82
		return false;

83
	m_pConnection = m_pNetAudioServer->GetConnection();
84

85
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag() );
Jonas Stienen's avatar
Jonas Stienen committed
86 87
	m_pMessage->SetMessageLoggerBaseName( GetServerLogBaseName() + "_Messages" );

88
	m_pMessage->ResetMessage();
89
	m_pMessage->SetConnection( m_pConnection );
90
	while( !m_pMessage->ReadMessage( 0 ) ); //blocking
91

92 93
	assert( m_pMessage->GetMessageType() == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters();
94

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
95
	bool bOK = false;
Anne's avatar
Anne committed
96
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
Anne Heimes's avatar
Anne Heimes committed
97
	m_oServerParams.iBlockSize = oClientParams.iBlockSize;
98
	m_iClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
99 100

	m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
101
	if( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
102
	{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
103
		bOK = true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
104
#ifdef NET_AUDIO_SHOW_TRAFFIC
105
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
106
#endif
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
107 108 109
	}
	else
	{
Anne Heimes's avatar
Anne Heimes committed
110
#ifdef NET_AUDIO_SHOW_TRAFFIC
111
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
112
#endif
113
}
114

115 116
	m_pServerLogger = new ITABufferedDataLoggerImplServer();
	m_pServerLogger->setOutputFile( m_sServerLogBaseName + ".log" );
Anne's avatar
Anne committed
117

118
	m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
Anne Heimes's avatar
Anne Heimes committed
119
	m_pMessage->WriteDouble( dTimeIntervalCientSendStatus );
120
	m_pMessage->WriteMessage();
121

122
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), m_oServerParams.iRingBufferSize, true );
Anne Heimes's avatar
Anne Heimes committed
123 124


125 126
	if( bOK )
		Run();
127

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
128
	return bOK;
Anne's avatar
Anne committed
129 130
}

131
bool CITANetAudioStreamingServer::LoopBody()
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
132
{
133
	bool bAskClient = false;
134
	m_pMessage->ResetMessage();
Anne's avatar
Anne committed
135 136 137
	ITAServerLog oLog;
	oLog.uiBlockId = ++iServerBlockId;
	int iMsgType;
Anne's avatar
Anne committed
138
	// Sending Samples 
139 140
	int iBlockLength = m_pInputStream->GetBlocklength();
	int iClientRingBufferTargetLatencyFreeSamples = m_iClientRingBufferFreeSamples - ( m_oServerParams.iRingBufferSize - m_iTargetLatencySamples );
141

142
	if( iClientRingBufferTargetLatencyFreeSamples >= iBlockLength )
Anne's avatar
Anne committed
143 144
	{
		// Send Samples
Anne Heimes's avatar
Anne Heimes committed
145
		int iSendBlocks = iClientRingBufferTargetLatencyFreeSamples / iBlockLength;
146
		bAskClient = true;
147 148 149 150 151

		if( m_sfTempTransmitBuffer.GetLength() != iBlockLength )
			m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels(), iBlockLength, false );

		for( int j = 0; j < iSendBlocks; j++ )
152
		{
153
			for( int i = 0; i < int( m_pInputStream->GetNumberOfChannels() ); i++ )
154 155
			{
				ITAStreamInfo oStreamInfo;
Anne Heimes's avatar
Anne Heimes committed
156
				oStreamInfo.nSamples = iBlockLength;
157 158

				const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
159
				if( pfData != 0 )
Anne Heimes's avatar
Anne Heimes committed
160
					m_sfTempTransmitBuffer[ i ].write( pfData, iBlockLength, 0 );
161
			}
162
			m_pInputStream->IncrementBlockPointer();
Anne Heimes's avatar
Anne Heimes committed
163 164 165
			iMsgType = CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES;
			m_pMessage->SetMessageType( iMsgType );
			m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
166 167 168
			m_pMessage->WriteMessage();
			m_iClientRingBufferFreeSamples -= iBlockLength;
			m_pMessage->ResetMessage();
Anne Heimes's avatar
Anne Heimes committed
169
		}
Anne's avatar
Anne committed
170
#ifdef NET_AUDIO_SHOW_TRAFFIC
171 172
		vstr::out() << "[ITANetAudioStreamingServer] Transmitted " << iSendSamples << " samples for "
			<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
Anne's avatar
Anne committed
173
#endif
174
		}
175 176 177
	else
		bAskClient = true;

178

Anne's avatar
Anne committed
179 180

	// Try to Empfange Daten
181
	m_pMessage->ResetMessage();
182

183
	if( m_pMessage->ReadMessage( 1 ) )
184
	{
Anne's avatar
Anne committed
185 186
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
187 188
		int iMsgType = m_pMessage->GetMessageType();
		switch( iMsgType )
189
		{
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
		case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
		{
			m_iClientRingBufferFreeSamples = m_pMessage->ReadInt();
			m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
			bAskClient = false;
			break;
		}
		case CITANetAudioProtocol::NP_CLIENT_CLOSE:
		{
			m_pMessage->ResetMessage();
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
			m_pMessage->WriteMessage();

			StopGently( false );
			m_pConnection = NULL;
			Stop();
			bAskClient = false;
			break;
		}
		default:
		{
			vstr::out() << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
			break;
		}
214 215 216
		}
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = iMsgType;
217
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
Anne's avatar
Anne committed
218
		m_pServerLogger->log( oLog );
219
	}
220 221
	else
	{
222 223
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
Anne Heimes's avatar
bugfix  
Anne Heimes committed
224
		// Neue Samples, bei ca 1ms warten
225 226 227 228
		const double dTimestamp = ITAClock::getDefaultClock()->getTime();
		const double dTimeDiff = dTimestamp - m_dLastTimeStamp;
		m_dLastTimeStamp = dTimestamp;
		oLog.dWorldTimeStamp = dTimestamp;
229
		double dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
230
		m_iClientRingBufferFreeSamples += ( int ) dEstimatedSamples;
231 232
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = 555;
233
		m_pServerLogger->log( oLog );
234
	}
235

236
	bAskClient = false;
237
	if( bAskClient )
238 239 240 241 242 243 244 245
	{
#ifdef NET_AUDIO_SHOW_TRAFFIC
		vstr::out() << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
#endif
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
		m_pMessage->ResetMessage();
		iMsgType = CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES;
246 247
		m_pMessage->SetMessageType( iMsgType );
		m_pMessage->WriteBool( true );
248 249 250 251
		m_pMessage->WriteMessage();
		oLog.iProtocolStatus = iMsgType;
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
252
		m_pServerLogger->log( oLog );
253 254 255
	}


Anne's avatar
Anne committed
256
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
257 258
}

259 260
/**
 * Sets the data source that is streamed to the client and takes over its
 * sample rate, block length and channel count as server parameters.
 *
 * @param pInStream Input stream. NULL detaches the current stream and
 *                  leaves the server parameters unchanged.
 *
 * Raises a MODAL_EXCEPTION via ITA_EXCEPT1 if the streaming loop is already running.
 */
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
	if( VistaThreadLoop::IsRunning() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	m_pInputStream = pInStream;

	// Nothing to query from a NULL stream; Start() refuses to run without a stream anyway
	if( m_pInputStream == NULL )
		return;

	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate();
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength();
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels();
}

270
//! Returns the currently set input stream pointer (NULL if none has been set)
ITADatasource* CITANetAudioStreamingServer::GetInputStream() const
{
	ITADatasource* const pCurrentStream = m_pInputStream;
	return pCurrentStream;
}

275
int CITANetAudioStreamingServer::GetNetStreamBlocklength() const
276
{
277
	return m_sfTempTransmitBuffer.GetLength();
278 279
}

280
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels() const
281
{
282
	return m_sfTempTransmitBuffer.channels();
283 284
}

285
void CITANetAudioStreamingServer::SetAutomaticUpdateRate()
286 287 288 289
{
	m_iUpdateStrategy = AUTO;
}

290 291 292 293 294 295 296 297 298
/**
 * Sets the target latency (in samples) used to decide how many blocks are sent to the client.
 *
 * @param iTargetLatency Requested target latency in samples
 *
 * Raises INVALID_PARAMETER via ITA_EXCEPT1 if a client is connected and the
 * requested latency is smaller than the streaming block size. The requested
 * value is validated, not the value currently stored.
 */
void CITANetAudioStreamingServer::SetTargetLatencySamples( const int iTargetLatency )
{
	// Streaming already set up? Then the block size is known and the new value can be checked against it
	if( IsClientConnected() && iTargetLatency < m_oServerParams.iBlockSize )
		ITA_EXCEPT1( INVALID_PARAMETER, "Target latency has to be at least the block size of the audio streaming at client side." );

	m_iTargetLatencySamples = iTargetLatency;
}

299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
/**
 * Sets the base name used for the server log file (".log" is appended in
 * Start()) and for the message logger base name.
 *
 * @param sBaseName New base name, must not be empty (checked by assert)
 *
 * Raises a MODAL_EXCEPTION via ITA_EXCEPT1 if a client is already connected.
 */
void CITANetAudioStreamingServer::SetServerLogBaseName( const std::string& sBaseName )
{
	if( IsClientConnected() )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming and logging already started." );

	// Validate the incoming name, not the one that is about to be replaced
	assert( !sBaseName.empty() );
	m_sServerLogBaseName = sBaseName;
}

std::string CITANetAudioStreamingServer::GetServerLogBaseName() const
{
	return m_sServerLogBaseName;
}

bool CITANetAudioStreamingServer::IsClientConnected() const
314
{
315
	return m_pNetAudioServer->IsConnected();
316 317
}

318
std::string CITANetAudioStreamingServer::GetNetworkAddress() const
319
{
320
	return m_pNetAudioServer->GetServerAddress();
321 322
}

323
int CITANetAudioStreamingServer::GetNetworkPort() const
324
{
325
	return m_pNetAudioServer->GetNetworkPort();
326 327
}

328
void CITANetAudioStreamingServer::Stop()
Anne's avatar
Anne committed
329
{
330
	m_pNetAudioServer->Stop();
Anne Heimes's avatar
bugfix  
Anne Heimes committed
331
	m_pMessage->ClearConnection();
Anne's avatar
Anne committed
332
}