ITANetAudioStreamingServer.cpp 9.52 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
3
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
4 5

// ITA includes
Anne's avatar
Anne committed
6
#include <ITADataSource.h>
7
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
9
#include <ITAStreamInfo.h>
Anne's avatar
Anne committed
10
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16 17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
18
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
19
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
struct ITAServerLog : public ITALogDataBase
{
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
		os << "\t" << "FreeSamples";
		os << std::endl;
		return os;
	};

	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
		os << "\t" << iProtocolStatus;
		os << "\t" << iFreeSamples;
		os << std::endl;
		return os;
	};

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp;
	int iProtocolStatus; //!< ... usw
	int iFreeSamples;
Anne's avatar
Anne committed
51 52 53 54
};

class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger < ITAServerLog > {};

55 56 57 58 59 60
CITANetAudioStreamingServer::CITANetAudioStreamingServer()
	: m_pInputStream(NULL)
	, m_iUpdateStrategy(AUTO)
	, m_pConnection(NULL)
	, m_pNetAudioServer(new CITANetAudioServer())
	, m_dLastTimeStamp(0)
61
{
Anne's avatar
Anne committed
62
	iServerBlockId = 0;
63
	m_iMaxSendBlocks = 40;
Anne Heimes's avatar
Anne Heimes committed
64
	m_iClientRingBufferFreeSamples = 0;
Anne's avatar
Anne committed
65 66
}

67 68 69 70 71 72
CITANetAudioStreamingServer::~CITANetAudioStreamingServer()
{
	delete m_pNetAudioServer;
	delete m_pServerLogger;
}

73
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort , double dTimeIntervalCientSendStatus)
Anne's avatar
Anne committed
74
{
Anne's avatar
Anne committed
75
	if ( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
76 77
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

Anne's avatar
Anne committed
78
	if ( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
79 80
		return false;

Anne's avatar
Anne committed
81
	m_pConnection = m_pNetAudioServer->GetConnection( );
82

Anne's avatar
Anne committed
83 84
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag( ) );
	m_pMessage->ResetMessage( );
85
	m_pMessage->SetConnection( m_pConnection );
86
	while ( !m_pMessage->ReadMessage( 0 ) ); //blocking
87

Anne's avatar
Anne committed
88 89
	assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters( );
90

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
91
	bool bOK = false;
Anne's avatar
Anne committed
92
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
93
	m_oServerParams.iTargetSampleLatency = oClientParams.iTargetSampleLatency;
Anne Heimes's avatar
Anne Heimes committed
94
	m_oServerParams.iBlockSize = oClientParams.iBlockSize;
95
	m_oServerParams.dTimeIntervalSendInfos = dTimeIntervalCientSendStatus;
96 97 98
	m_iClientRingBufferFreeSamples = m_oServerParams.iTargetSampleLatency;

	m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
Anne's avatar
Anne committed
99
	if ( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
100
	{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
101
		bOK = true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
102
#ifdef NET_AUDIO_SHOW_TRAFFIC
103
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Anne Heimes's avatar
Anne Heimes committed
104
#endif
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
105 106 107
	}
	else
	{
Anne Heimes's avatar
Anne Heimes committed
108
#ifdef NET_AUDIO_SHOW_TRAFFIC
109
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
110
#endif
Anne's avatar
Anne committed
111
	}
112

Anne Heimes's avatar
Anne Heimes committed
113
	std::string paras = std::string("NetAudioLogServer") + std::string("_BS") + std::to_string(m_oServerParams.iBlockSize) + std::string("_Ch") + std::to_string(m_oServerParams.iChannels) + std::string("_tl") + std::to_string(m_oServerParams.iTargetSampleLatency) + std::string(".txt");
114
	m_pServerLogger = new ITABufferedDataLoggerImplServer( );
Anne's avatar
Anne committed
115
	m_pServerLogger->setOutputFile( paras );
Anne's avatar
Anne committed
116

117
	m_pMessage->SetMessageType(CITANetAudioProtocol::NP_SERVER_OPEN);
Anne Heimes's avatar
Anne Heimes committed
118
	m_pMessage->WriteDouble( dTimeIntervalCientSendStatus );
Anne's avatar
Anne committed
119
	m_pMessage->WriteMessage( );
120

Anne Heimes's avatar
Anne Heimes committed
121 122 123
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels( ), m_oServerParams.iRingBufferSize, true );


Anne's avatar
Anne committed
124 125
	if ( bOK )
		Run( );
126

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
127
	return bOK;
Anne's avatar
Anne committed
128 129
}

Anne's avatar
Anne committed
130
bool CITANetAudioStreamingServer::LoopBody( )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
131
{
132
	bool bAskClient = false;
133
	m_pMessage->ResetMessage( );
Anne's avatar
Anne committed
134 135 136
	ITAServerLog oLog;
	oLog.uiBlockId = ++iServerBlockId;
	int iMsgType;
Anne's avatar
Anne committed
137
	// Sending Samples 
Anne Heimes's avatar
Anne Heimes committed
138
	int iBlockLength = m_pInputStream->GetBlocklength( );
139 140
	int iClientRingBufferTargetLatencyFreeSamples = m_iClientRingBufferFreeSamples - (m_oServerParams.iRingBufferSize - m_oServerParams.iTargetSampleLatency);

Anne Heimes's avatar
Anne Heimes committed
141
	if (iClientRingBufferTargetLatencyFreeSamples >= iBlockLength)
Anne's avatar
Anne committed
142 143
	{
		// Send Samples
Anne Heimes's avatar
Anne Heimes committed
144
		int iSendBlocks = iClientRingBufferTargetLatencyFreeSamples / iBlockLength;
145 146
		bAskClient = true;
		
Anne Heimes's avatar
Anne Heimes committed
147 148
		if ( m_sfTempTransmitBuffer.GetLength( ) != iSendBlocks * iBlockLength )
			m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels( ), iSendBlocks * iBlockLength, false );
149 150 151 152 153 154
		
		for ( int j = 0; j < iSendBlocks; j++ )
		{
			for ( int i = 0; i < int( m_pInputStream->GetNumberOfChannels( ) ); i++ )
			{
				ITAStreamInfo oStreamInfo;
Anne Heimes's avatar
Anne Heimes committed
155
				oStreamInfo.nSamples = iBlockLength;
156 157 158

				const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
				if ( pfData != 0 )
Anne Heimes's avatar
Anne Heimes committed
159
					m_sfTempTransmitBuffer[ i ].write( pfData, iBlockLength, j * iBlockLength );
160 161
			}
			m_pInputStream->IncrementBlockPointer( );
Anne Heimes's avatar
Anne Heimes committed
162 163
		}
		iMsgType = CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES;
164 165 166
		m_pMessage->SetMessageType(iMsgType);
		m_pMessage->WriteSampleFrame(&m_sfTempTransmitBuffer);
		m_pMessage->WriteMessage();
Anne Heimes's avatar
Anne Heimes committed
167
		m_iClientRingBufferFreeSamples -= iSendBlocks * iBlockLength;
Anne's avatar
Anne committed
168
#ifdef NET_AUDIO_SHOW_TRAFFIC
169 170
		vstr::out() << "[ITANetAudioStreamingServer] Transmitted " << iSendSamples << " samples for "
			<< m_pInputStream->GetNumberOfChannels() << " channels" << std::endl;
Anne's avatar
Anne committed
171
#endif
172 173 174 175
	}
	else
		bAskClient = true;

176

Anne's avatar
Anne committed
177 178

	// Try to Empfange Daten
Anne's avatar
Anne committed
179
	m_pMessage->ResetMessage( );
180

181
	if ( m_pMessage->ReadMessage( 1 ) )
182
	{
Anne's avatar
Anne committed
183 184
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
Anne's avatar
Anne committed
185 186
		int iMsgType = m_pMessage->GetMessageType( );
		switch ( iMsgType )
187
		{
Anne's avatar
Anne committed
188
			case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
189
			{
190
				m_iClientRingBufferFreeSamples = m_pMessage->ReadInt( );
191
				m_dLastTimeStamp = ITAClock::getDefaultClock()->getTime();
192
				bAskClient = false;
Anne's avatar
Anne committed
193 194 195 196
				break;
			}
			case CITANetAudioProtocol::NP_CLIENT_CLOSE:
			{
197 198 199 200
				m_pMessage->ResetMessage();
				m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_CLOSE );
				m_pMessage->WriteMessage();

Anne's avatar
Anne committed
201
				StopGently( false );
Anne's avatar
Anne committed
202
				m_pConnection = NULL;
Anne's avatar
Anne committed
203
				Stop( );
204
				bAskClient = false;
Anne's avatar
Anne committed
205
				break;
Anne's avatar
Anne committed
206 207 208 209 210 211
			}
			default:
			{
				vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
				break;
			}
212 213 214
		}
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = iMsgType;
Anne's avatar
Anne committed
215 216
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
		m_pServerLogger->log( oLog );
217
	}
218 219
	else
	{
220 221
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
Anne Heimes's avatar
bugfix  
Anne Heimes committed
222
		// Neue Samples, bei ca 1ms warten
223 224 225 226 227 228
		const double dTimestamp = ITAClock::getDefaultClock()->getTime();
		const double dTimeDiff = dTimestamp - m_dLastTimeStamp;
		m_dLastTimeStamp = dTimestamp;
		oLog.dWorldTimeStamp = dTimestamp;
		float dEstimatedSamples = dTimeDiff * m_pInputStream->GetSampleRate();
		m_iClientRingBufferFreeSamples += (int)dEstimatedSamples;
229 230 231
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = 555;
		m_pServerLogger->log(oLog);
232
	}
233

234
	bAskClient = false;
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
	if (bAskClient)
	{
#ifdef NET_AUDIO_SHOW_TRAFFIC
		vstr::out() << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
#endif
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
		m_pMessage->ResetMessage();
		iMsgType = CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES;
		m_pMessage->SetMessageType(iMsgType);
		m_pMessage->WriteBool(true);
		m_pMessage->WriteMessage();
		oLog.iProtocolStatus = iMsgType;
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime();
		m_pServerLogger->log(oLog);
	}


Anne's avatar
Anne committed
254
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
255 256
}

257 258
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
Anne's avatar
Anne committed
259
	if ( VistaThreadLoop::IsRunning( ) )
260 261 262
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	m_pInputStream = pInStream;
Anne's avatar
Anne committed
263 264 265
	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate( );
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength( );
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels( );
266 267
}

Anne's avatar
Anne committed
268
ITADatasource* CITANetAudioStreamingServer::GetInputStream( ) const
Anne's avatar
Anne committed
269 270 271 272
{
	return m_pInputStream;
}

Anne's avatar
Anne committed
273
int CITANetAudioStreamingServer::GetNetStreamBlocklength( ) const
274
{
Anne's avatar
Anne committed
275
	return m_sfTempTransmitBuffer.GetLength( );
276 277
}

Anne's avatar
Anne committed
278
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels( ) const
279
{
Anne's avatar
Anne committed
280
	return m_sfTempTransmitBuffer.channels( );
281 282
}

Anne's avatar
Anne committed
283
void CITANetAudioStreamingServer::SetAutomaticUpdateRate( )
284 285 286 287
{
	m_iUpdateStrategy = AUTO;
}

Anne's avatar
Anne committed
288
bool CITANetAudioStreamingServer::IsClientConnected( ) const
289
{
Anne's avatar
Anne committed
290
	return m_pNetAudioServer->IsConnected( );
291 292
}

Anne's avatar
Anne committed
293
std::string CITANetAudioStreamingServer::GetNetworkAddress( ) const
294
{
Anne's avatar
Anne committed
295
	return m_pNetAudioServer->GetServerAddress( );
296 297
}

Anne's avatar
Anne committed
298
int CITANetAudioStreamingServer::GetNetworkPort( ) const
299
{
Anne's avatar
Anne committed
300
	return m_pNetAudioServer->GetNetworkPort( );
301 302
}

Anne's avatar
Anne committed
303
void CITANetAudioStreamingServer::Stop( )
Anne's avatar
Anne committed
304
{
Anne's avatar
Anne committed
305
	m_pNetAudioServer->Stop( );
Anne's avatar
Anne committed
306
}