ITANetAudioStreamingServer.cpp 7.54 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
3
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
4 5

// ITA includes
Anne's avatar
Anne committed
6
#include <ITADataSource.h>
7
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
Anne's avatar
Anne committed
9 10
#include <ITAStreamInfo.h>
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16 17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
Anne's avatar
Anne committed
18 19
#include <VistaBase/VistaStreamUtils.h>
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

Anne's avatar
Anne committed
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
struct ITAServerLog : public ITALogDataBase
{
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
		os << "\t" << "FreeSamples";
		os << std::endl;
		return os;
	};

	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
		os << "\t" << iProtocolStatus;
		os << "\t" << iFreeSamples;
		os << std::endl;
		return os;
	};

	unsigned int uiBlockId; //!< Block identifier (audio streaming)
	double dWorldTimeStamp;
	int iProtocolStatus; //!< ... usw
	int iFreeSamples;
};

class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger < ITAServerLog > {};

Anne's avatar
Anne committed
55 56 57 58 59
CITANetAudioStreamingServer::CITANetAudioStreamingServer( )
: m_pInputStream( NULL )
, m_iUpdateStrategy( AUTO )
, m_pConnection( NULL )
, m_pNetAudioServer( new CITANetAudioServer( ) )
Anne's avatar
Anne committed
60 61
{
	iServerBlockId = 0;
Anne's avatar
Anne committed
62 63
}

64
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort )
Anne's avatar
Anne committed
65
{
Anne's avatar
Anne committed
66
	if ( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
67 68
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

Anne's avatar
Anne committed
69
	if ( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
70 71
		return false;

Anne's avatar
Anne committed
72
	m_pConnection = m_pNetAudioServer->GetConnection( );
73

Anne's avatar
Anne committed
74 75
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag( ) );
	m_pMessage->ResetMessage( );
76
	m_pMessage->SetConnection( m_pConnection );
Anne's avatar
Anne committed
77
	while ( !m_pMessage->ReadMessage( 0 ) ); //blocking
78

Anne's avatar
Anne committed
79 80
	assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters( );
81

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
82
	bool bOK = false;
Anne's avatar
Anne committed
83
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
Anne's avatar
Anne committed
84
	m_iClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
Anne's avatar
Anne committed
85
	if ( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
86
	{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
87
		bOK = true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
88
#ifdef NET_AUDIO_SHOW_TRAFFIC
89
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
90 91 92
	}
	else
	{
93
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
94
#endif
Anne's avatar
Anne committed
95
	}
Anne's avatar
Anne committed
96 97 98 99

	std::string paras = std::string( "NetAudioLogServer" ) + std::string( "_BS" ) + std::to_string( m_oServerParams.iBlockSize ) + std::string( "_Ch" ) + std::to_string( m_oServerParams.iChannels ) + std::string( ".txt" );
	m_pServerLogger = new ITABufferedDataLoggerImplServer( );
	m_pServerLogger->setOutputFile( paras );
Anne's avatar
Anne committed
100

Anne's avatar
Anne committed
101
	m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
102
	m_pMessage->WriteBool( bOK );
Anne's avatar
Anne committed
103
	m_pMessage->WriteMessage( );
104

Anne's avatar
Anne committed
105 106
	if ( bOK )
		Run( );
107

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
108
	return bOK;
Anne's avatar
Anne committed
109 110
}

Anne's avatar
Anne committed
111
bool CITANetAudioStreamingServer::LoopBody( )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
112
{
Anne's avatar
Anne committed
113 114 115
	ITAServerLog oLog;
	oLog.uiBlockId = ++iServerBlockId;
	int iMsgType;
Anne's avatar
Anne committed
116 117 118 119 120 121 122 123 124 125 126 127 128
	// Sending Samples 
	if ( m_iClientRingBufferFreeSamples >= int( m_pInputStream->GetBlocklength( ) ) )
	{
		// Send Samples
		for ( int i = 0; i < int( m_pInputStream->GetNumberOfChannels( ) ); i++ )
		{
			ITAStreamInfo oStreamInfo;
			oStreamInfo.nSamples = m_sfTempTransmitBuffer.GetLength( );
			const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
			if ( pfData != 0 )
				m_sfTempTransmitBuffer[ i ].write( pfData, m_sfTempTransmitBuffer.GetLength( ) );
		}
		m_pInputStream->IncrementBlockPointer( );
Anne's avatar
Anne committed
129 130
		iMsgType = CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES;
		m_pMessage->SetMessageType( iMsgType );
Anne's avatar
Anne committed
131 132 133 134 135 136 137 138 139 140 141
		m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
		m_pMessage->WriteMessage( ); 
		m_iClientRingBufferFreeSamples -= m_sfTempTransmitBuffer.GetLength( );
#ifdef NET_AUDIO_SHOW_TRAFFIC
			vstr::out( ) << "[ITANetAudioStreamingServer] Transmitted " << m_sfTempTransmitBuffer.GetLength( ) << " samples for "
			<< m_pInputStream->GetNumberOfChannels( ) << " channels" << std::endl;
#endif
	}
	else
	{
		// Waiting for Trigger
Anne's avatar
Anne committed
142 143
		iMsgType = CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES;
		m_pMessage->SetMessageType( iMsgType );
Anne's avatar
Anne committed
144 145 146 147 148 149
		m_pMessage->WriteMessage( );

#ifdef NET_AUDIO_SHOW_TRAFFIC
		vstr::out( ) << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
#endif
	}
Anne's avatar
Anne committed
150 151 152 153 154

	oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
	oLog.iProtocolStatus = iMsgType;
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
	m_pServerLogger->log( oLog );
Anne's avatar
Anne committed
155 156

	// Try to Empfange Daten
Anne's avatar
Anne committed
157
	//m_pMessage->SetConnection( m_pConnection );
Anne's avatar
Anne committed
158
	m_pMessage->ResetMessage( );
159

Anne's avatar
Anne committed
160
	if ( m_pMessage->ReadMessage( 1 ) )
161
	{
Anne's avatar
Anne committed
162 163
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
Anne's avatar
Anne committed
164 165
		int iMsgType = m_pMessage->GetMessageType( );
		switch ( iMsgType )
166
		{
Anne's avatar
Anne committed
167
			case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
168
			{
Anne's avatar
Anne committed
169
				m_iClientRingBufferFreeSamples = m_pMessage->ReadInt( );	
Anne's avatar
Anne committed
170 171 172 173 174 175 176
				break;
			}
			case CITANetAudioProtocol::NP_CLIENT_CLOSE:
			{
				//m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_CLOSE );
				//m_pMessage->WriteAnswer();
				StopGently( false );
Anne's avatar
Anne committed
177
				m_pConnection = NULL;
Anne's avatar
Anne committed
178
				Stop( );
Anne's avatar
Anne committed
179
				break;
Anne's avatar
Anne committed
180 181 182 183 184 185
			}
			default:
			{
				vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
				break;
			}
Anne's avatar
Anne committed
186 187 188 189 190 191
		}
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = iMsgType;
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
		m_pServerLogger->log( oLog );
	}
192 193


Anne's avatar
Anne committed
194
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
195 196
}

197 198
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
Anne's avatar
Anne committed
199
	if ( VistaThreadLoop::IsRunning( ) )
200 201 202
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	m_pInputStream = pInStream;
Anne's avatar
Anne committed
203 204 205 206
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels( ), m_pInputStream->GetBlocklength( ), true );
	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate( );
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength( );
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels( );
207 208
}

Anne's avatar
Anne committed
209
ITADatasource* CITANetAudioStreamingServer::GetInputStream( ) const
Anne's avatar
Anne committed
210 211 212 213
{
	return m_pInputStream;
}

Anne's avatar
Anne committed
214
int CITANetAudioStreamingServer::GetNetStreamBlocklength( ) const
215
{
Anne's avatar
Anne committed
216
	return m_sfTempTransmitBuffer.GetLength( );
217 218
}

Anne's avatar
Anne committed
219
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels( ) const
220
{
Anne's avatar
Anne committed
221
	return m_sfTempTransmitBuffer.channels( );
222 223
}

Anne's avatar
Anne committed
224
void CITANetAudioStreamingServer::SetAutomaticUpdateRate( )
225 226 227 228
{
	m_iUpdateStrategy = AUTO;
}

Anne's avatar
Anne committed
229
bool CITANetAudioStreamingServer::IsClientConnected( ) const
230
{
Anne's avatar
Anne committed
231
	return m_pNetAudioServer->IsConnected( );
232 233
}

Anne's avatar
Anne committed
234
std::string CITANetAudioStreamingServer::GetNetworkAddress( ) const
235
{
Anne's avatar
Anne committed
236
	return m_pNetAudioServer->GetServerAddress( );
237 238
}

Anne's avatar
Anne committed
239
int CITANetAudioStreamingServer::GetNetworkPort( ) const
240
{
Anne's avatar
Anne committed
241
	return m_pNetAudioServer->GetNetworkPort( );
242 243
}

Anne's avatar
Anne committed
244
void CITANetAudioStreamingServer::Stop( )
Anne's avatar
Anne committed
245
{
Anne's avatar
Anne committed
246
	delete m_pServerLogger;
Anne's avatar
Anne committed
247
	m_pNetAudioServer->Stop( );
Anne's avatar
Anne committed
248
}