ITANetAudioStreamingServer.cpp 7.57 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
3
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
4 5

// ITA includes
Anne's avatar
Anne committed
6
#include <ITADataSource.h>
7
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
Anne's avatar
Anne committed
9 10
#include <ITAStreamInfo.h>
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16 17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
Anne's avatar
Anne committed
18 19
#include <VistaBase/VistaStreamUtils.h>
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

Anne's avatar
Anne committed
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
struct ITAServerLog : public ITALogDataBase
{
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
		os << "\t" << "FreeSamples";
		os << std::endl;
		return os;
	};

	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
		os << "\t" << iProtocolStatus;
		os << "\t" << iFreeSamples;
		os << std::endl;
		return os;
	};

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp;
	int iProtocolStatus; //!< ... usw
	int iFreeSamples;
};

class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger < ITAServerLog > {};

Anne's avatar
Anne committed
55 56 57 58 59
CITANetAudioStreamingServer::CITANetAudioStreamingServer( )
: m_pInputStream( NULL )
, m_iUpdateStrategy( AUTO )
, m_pConnection( NULL )
, m_pNetAudioServer( new CITANetAudioServer( ) )
Anne's avatar
Anne committed
60 61
{
	iServerBlockId = 0;
Anne's avatar
Anne committed
62 63
}

64
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort )
Anne's avatar
Anne committed
65
{
Anne's avatar
Anne committed
66
	if ( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
67 68
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

Anne's avatar
Anne committed
69
	if ( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
70 71
		return false;

Anne's avatar
Anne committed
72
	m_pConnection = m_pNetAudioServer->GetConnection( );
73

Anne's avatar
Anne committed
74 75
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag( ) );
	m_pMessage->ResetMessage( );
76
	m_pMessage->SetConnection( m_pConnection );
Anne's avatar
Anne committed
77
	while ( !m_pMessage->ReadMessage( 0 ) ); //blocking
78

Anne's avatar
Anne committed
79 80
	assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters( );
81

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
82
	bool bOK = false;
Anne's avatar
Anne committed
83
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
Anne's avatar
Anne committed
84
	m_iClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
Anne's avatar
Anne committed
85
	if ( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
86
	{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
87
		bOK = true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
88
#ifdef NET_AUDIO_SHOW_TRAFFIC
89
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
90 91 92
	}
	else
	{
93
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
94
#endif
Anne's avatar
Anne committed
95
	}
Anne's avatar
Anne committed
96 97 98 99

	std::string paras = std::string( "NetAudioLogServer" ) + std::string( "_BS" ) + std::to_string( m_oServerParams.iBlockSize ) + std::string( "_Ch" ) + std::to_string( m_oServerParams.iChannels ) + std::string( ".txt" );
	m_pServerLogger = new ITABufferedDataLoggerImplServer( );
	m_pServerLogger->setOutputFile( paras );
Anne's avatar
Anne committed
100

Anne's avatar
Anne committed
101
	m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
102
	m_pMessage->WriteBool( bOK );
Anne's avatar
Anne committed
103
	m_pMessage->WriteMessage( );
104

Anne's avatar
Anne committed
105 106
	if ( bOK )
		Run( );
107

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
108
	return bOK;
Anne's avatar
Anne committed
109 110
}

Anne's avatar
Anne committed
111
bool CITANetAudioStreamingServer::LoopBody( )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
112
{
113
	m_pMessage->ResetMessage( );
Anne's avatar
Anne committed
114 115 116
	ITAServerLog oLog;
	oLog.uiBlockId = ++iServerBlockId;
	int iMsgType;
Anne's avatar
Anne committed
117 118 119 120 121 122 123 124 125 126 127 128 129
	// Sending Samples 
	if ( m_iClientRingBufferFreeSamples >= int( m_pInputStream->GetBlocklength( ) ) )
	{
		// Send Samples
		for ( int i = 0; i < int( m_pInputStream->GetNumberOfChannels( ) ); i++ )
		{
			ITAStreamInfo oStreamInfo;
			oStreamInfo.nSamples = m_sfTempTransmitBuffer.GetLength( );
			const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
			if ( pfData != 0 )
				m_sfTempTransmitBuffer[ i ].write( pfData, m_sfTempTransmitBuffer.GetLength( ) );
		}
		m_pInputStream->IncrementBlockPointer( );
Anne's avatar
Anne committed
130 131
		iMsgType = CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES;
		m_pMessage->SetMessageType( iMsgType );
Anne's avatar
Anne committed
132 133 134 135 136 137 138 139 140 141 142
		m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
		m_pMessage->WriteMessage( ); 
		m_iClientRingBufferFreeSamples -= m_sfTempTransmitBuffer.GetLength( );
#ifdef NET_AUDIO_SHOW_TRAFFIC
			vstr::out( ) << "[ITANetAudioStreamingServer] Transmitted " << m_sfTempTransmitBuffer.GetLength( ) << " samples for "
			<< m_pInputStream->GetNumberOfChannels( ) << " channels" << std::endl;
#endif
	}
	else
	{
		// Waiting for Trigger
Anne's avatar
Anne committed
143 144
		iMsgType = CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES;
		m_pMessage->SetMessageType( iMsgType );
Anne's avatar
Anne committed
145 146 147 148 149 150
		m_pMessage->WriteMessage( );

#ifdef NET_AUDIO_SHOW_TRAFFIC
		vstr::out( ) << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
#endif
	}
Anne's avatar
Anne committed
151 152 153 154 155

	oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
	oLog.iProtocolStatus = iMsgType;
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
	m_pServerLogger->log( oLog );
Anne's avatar
Anne committed
156 157

	// Try to Empfange Daten
Anne's avatar
Anne committed
158
	//m_pMessage->SetConnection( m_pConnection );
Anne's avatar
Anne committed
159
	m_pMessage->ResetMessage( );
160

Anne's avatar
Anne committed
161
	if ( m_pMessage->ReadMessage( 1 ) )
162
	{
Anne's avatar
Anne committed
163 164
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
Anne's avatar
Anne committed
165 166
		int iMsgType = m_pMessage->GetMessageType( );
		switch ( iMsgType )
167
		{
Anne's avatar
Anne committed
168
			case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
169
			{
Anne's avatar
Anne committed
170
				m_iClientRingBufferFreeSamples = m_pMessage->ReadInt( );	
Anne's avatar
Anne committed
171 172 173 174 175 176 177
				break;
			}
			case CITANetAudioProtocol::NP_CLIENT_CLOSE:
			{
				//m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_CLOSE );
				//m_pMessage->WriteAnswer();
				StopGently( false );
Anne's avatar
Anne committed
178
				m_pConnection = NULL;
Anne's avatar
Anne committed
179
				Stop( );
Anne's avatar
Anne committed
180
				break;
Anne's avatar
Anne committed
181 182 183 184 185 186
			}
			default:
			{
				vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
				break;
			}
Anne's avatar
Anne committed
187 188 189 190 191 192
		}
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = iMsgType;
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
		m_pServerLogger->log( oLog );
	}
193 194


Anne's avatar
Anne committed
195
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
196 197
}

198 199
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
Anne's avatar
Anne committed
200
	if ( VistaThreadLoop::IsRunning( ) )
201 202 203
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	m_pInputStream = pInStream;
Anne's avatar
Anne committed
204 205 206 207
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels( ), m_pInputStream->GetBlocklength( ), true );
	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate( );
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength( );
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels( );
208 209
}

Anne's avatar
Anne committed
210
ITADatasource* CITANetAudioStreamingServer::GetInputStream( ) const
Anne's avatar
Anne committed
211 212 213 214
{
	return m_pInputStream;
}

Anne's avatar
Anne committed
215
int CITANetAudioStreamingServer::GetNetStreamBlocklength( ) const
216
{
Anne's avatar
Anne committed
217
	return m_sfTempTransmitBuffer.GetLength( );
218 219
}

Anne's avatar
Anne committed
220
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels( ) const
221
{
Anne's avatar
Anne committed
222
	return m_sfTempTransmitBuffer.channels( );
223 224
}

Anne's avatar
Anne committed
225
void CITANetAudioStreamingServer::SetAutomaticUpdateRate( )
226 227 228 229
{
	m_iUpdateStrategy = AUTO;
}

Anne's avatar
Anne committed
230
bool CITANetAudioStreamingServer::IsClientConnected( ) const
231
{
Anne's avatar
Anne committed
232
	return m_pNetAudioServer->IsConnected( );
233 234
}

Anne's avatar
Anne committed
235
std::string CITANetAudioStreamingServer::GetNetworkAddress( ) const
236
{
Anne's avatar
Anne committed
237
	return m_pNetAudioServer->GetServerAddress( );
238 239
}

Anne's avatar
Anne committed
240
int CITANetAudioStreamingServer::GetNetworkPort( ) const
241
{
Anne's avatar
Anne committed
242
	return m_pNetAudioServer->GetNetworkPort( );
243 244
}

Anne's avatar
Anne committed
245
void CITANetAudioStreamingServer::Stop( )
Anne's avatar
Anne committed
246
{
Anne's avatar
Anne committed
247
	delete m_pServerLogger;
Anne's avatar
Anne committed
248
	m_pNetAudioServer->Stop( );
Anne's avatar
Anne committed
249
}