ITANetAudioStreamingServer.cpp 7.46 KB
Newer Older
Anne's avatar
Anne committed
1 2
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
3
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
4 5

// ITA includes
Anne's avatar
Anne committed
6
#include <ITADataSource.h>
7
#include <ITANetAudioMessage.h>
Anne's avatar
Anne committed
8
#include <ITAException.h>
9
#include <ITAStreamInfo.h>
Anne's avatar
Anne committed
10
#include <ITAClock.h>
Anne's avatar
Anne committed
11 12 13 14 15 16 17

// Vista includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaInterProcComm/IPNet/VistaTCPSocket.h>
#include <VistaBase/VistaTimeUtils.h>
#include <VistaInterProcComm/IPNet/VistaIPAddress.h>
18
#include <VistaBase/VistaStreamUtils.h>
Anne's avatar
Anne committed
19
#include <ITADataLog.h>
Anne's avatar
Anne committed
20 21 22 23 24

// STL
#include <cmath>
#include <cassert>

25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
//! Single row of the server-side streaming log
/**
  * Rows are written as tab-separated columns: outputDesc() emits the
  * column header line, outputData() emits the values of one entry.
  * All fields carry defaults so that an entry which is logged before
  * every field was assigned never contains indeterminate values.
  */
struct ITAServerLog : public ITALogDataBase
{
	//! Writes the column header line (tab-separated, terminated by a line break)
	/**
	  * @param[in,out] os Stream receiving the header
	  * @return The same stream, to allow chaining
	  */
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
		os << "\t" << "FreeSamples";
		os << std::endl;
		return os;
	}

	//! Writes the values of this entry in the column order of outputDesc()
	/**
	  * @param[in,out] os Stream receiving the row
	  * @return The same stream, to allow chaining
	  * @note The precision of 12 digits set for the time stamp stays active on the stream afterwards
	  */
	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
		os << "\t" << iProtocolStatus;
		os << "\t" << iFreeSamples;
		os << std::endl;
		return os;
	}

	unsigned int uiBlockId = 0; //!< Block identifier (audio streaming)
	double dWorldTimeStamp = 0.0; //!< Time stamp taken when the entry was created
	int iProtocolStatus = -1; //!< Protocol message type of the entry; -1 while no message type has been assigned
	int iFreeSamples = 0; //!< Free samples in the client ring buffer as known to the server
};

//! Names the ITABufferedDataLogger instantiation for ITAServerLog entries; adds no members of its own
class ITABufferedDataLoggerImplServer : public ITABufferedDataLogger< ITAServerLog >
{
};

Anne's avatar
Anne committed
55 56 57 58 59
//! Creates a streaming server without input stream and without client connection
/**
  * The underlying CITANetAudioServer is allocated here. The message and
  * logger pointers are only allocated by Start(); they are cleared here so
  * that Stop() (which deletes the logger) is well-defined even if Start()
  * was never called.
  */
CITANetAudioStreamingServer::CITANetAudioStreamingServer( )
: m_pInputStream( NULL )
, m_iUpdateStrategy( AUTO )
, m_pConnection( NULL )
, m_pNetAudioServer( new CITANetAudioServer( ) )
{
	iServerBlockId = 0;
	m_iClientRingBufferFreeSamples = 0;

	// Assigned in the body (not the initializer list) to stay independent
	// of the member declaration order in the header.
	m_pMessage = NULL;
	m_pServerLogger = NULL;
}

65
bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort )
Anne's avatar
Anne committed
66
{
Anne's avatar
Anne committed
67
	if ( !m_pInputStream )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
68 69
		ITA_EXCEPT1( MODAL_EXCEPTION, "Can not start server without a valid input stream" );

Anne's avatar
Anne committed
70
	if ( !m_pNetAudioServer->Start( sAddress, iPort ) ) // blocking
71 72
		return false;

Anne's avatar
Anne committed
73
	m_pConnection = m_pNetAudioServer->GetConnection( );
74

Anne's avatar
Anne committed
75 76
	m_pMessage = new CITANetAudioMessage( m_pConnection->GetByteorderSwapFlag( ) );
	m_pMessage->ResetMessage( );
77
	m_pMessage->SetConnection( m_pConnection );
78
	while ( !m_pMessage->ReadMessage( 0 ) ); //blocking
79

Anne's avatar
Anne committed
80 81
	assert( m_pMessage->GetMessageType( ) == CITANetAudioProtocol::NP_CLIENT_OPEN );
	CITANetAudioProtocol::StreamingParameters oClientParams = m_pMessage->ReadStreamingParameters( );
82

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
83
	bool bOK = false;
Anne's avatar
Anne committed
84
	m_oServerParams.iRingBufferSize = oClientParams.iRingBufferSize;
Anne's avatar
Anne committed
85
	m_iClientRingBufferFreeSamples = m_oServerParams.iRingBufferSize;
Anne's avatar
Anne committed
86
	if ( m_oServerParams == oClientParams )
Anne's avatar
Anne committed
87
	{
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
88
		bOK = true;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
89
#ifdef NET_AUDIO_SHOW_TRAFFIC
90
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters matched. Will resume with streaming" << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
91 92 93
	}
	else
	{
94
		vstr::out() << "[ITANetAudioStreamingServer] Server and client parameters mismatch detected. Will notify client and stop." << std::endl;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
95
#endif
Anne's avatar
Anne committed
96
	}
97 98 99

	std::string paras = std::string( "NetAudioLogServer" ) + std::string( "_BS" ) + std::to_string( m_oServerParams.iBlockSize ) + std::string( "_Ch" ) + std::to_string( m_oServerParams.iChannels ) + std::string( ".txt" );
	m_pServerLogger = new ITABufferedDataLoggerImplServer( );
Anne's avatar
Anne committed
100
	m_pServerLogger->setOutputFile( paras );
Anne's avatar
Anne committed
101

Anne's avatar
Anne committed
102
	m_pMessage->SetMessageType( CITANetAudioProtocol::NP_SERVER_OPEN );
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
103
	m_pMessage->WriteBool( bOK );
Anne's avatar
Anne committed
104
	m_pMessage->WriteMessage( );
105

Anne Heimes's avatar
Anne Heimes committed
106 107 108
	m_sfTempTransmitBuffer.init( m_pInputStream->GetNumberOfChannels( ), m_oServerParams.iRingBufferSize, true );


Anne's avatar
Anne committed
109 110
	if ( bOK )
		Run( );
111

Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
112
	return bOK;
Anne's avatar
Anne committed
113 114
}

Anne's avatar
Anne committed
115
bool CITANetAudioStreamingServer::LoopBody( )
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
116
{
117
	m_pMessage->ResetMessage( );
Anne's avatar
Anne committed
118 119 120
	ITAServerLog oLog;
	oLog.uiBlockId = ++iServerBlockId;
	int iMsgType;
Anne's avatar
Anne committed
121
	// Sending Samples 
Anne Heimes's avatar
Anne Heimes committed
122
	if ( m_iClientRingBufferFreeSamples >= int( m_pInputStream->GetBlocklength() ) )
Anne's avatar
Anne committed
123 124
	{
		// Send Samples
Anne Heimes's avatar
Anne Heimes committed
125
		for ( int i = 0; i < int( m_pInputStream->GetNumberOfChannels( ) ); i++ )
Anne's avatar
Anne committed
126
		{
Anne Heimes's avatar
Anne Heimes committed
127
			ITAStreamInfo oStreamInfo;
Anne Heimes's avatar
Anne Heimes committed
128
			oStreamInfo.nSamples = m_iClientRingBufferFreeSamples;
Anne Heimes's avatar
Anne Heimes committed
129 130
			const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
			if ( pfData != 0 )
Anne Heimes's avatar
Anne Heimes committed
131
				m_sfTempTransmitBuffer[ i ].write( pfData, m_iClientRingBufferFreeSamples );
Anne Heimes's avatar
Anne Heimes committed
132 133 134 135 136 137
		}
		m_pInputStream->IncrementBlockPointer( );
		iMsgType = CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES;
		m_pMessage->SetMessageType( iMsgType );
		m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
		m_pMessage->WriteMessage( );
Anne Heimes's avatar
Anne Heimes committed
138
		m_iClientRingBufferFreeSamples -= m_iClientRingBufferFreeSamples;
Anne's avatar
Anne committed
139
#ifdef NET_AUDIO_SHOW_TRAFFIC
Anne Heimes's avatar
Anne Heimes committed
140 141
		vstr::out( ) << "[ITANetAudioStreamingServer] Transmitted " << m_sfTempTransmitBuffer.GetLength( ) << " samples for "
			<< m_pInputStream->GetNumberOfChannels( ) << " channels" << std::endl;
Anne's avatar
Anne committed
142 143
#endif
		// Waiting for Trigger
Anne's avatar
Anne committed
144 145
		iMsgType = CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES;
		m_pMessage->SetMessageType( iMsgType );
Anne's avatar
Anne committed
146 147
		m_pMessage->WriteMessage( );

Anne Heimes's avatar
Anne Heimes committed
148 149
		oLog.iProtocolStatus = iMsgType;

Anne's avatar
Anne committed
150 151 152 153
#ifdef NET_AUDIO_SHOW_TRAFFIC
		vstr::out( ) << "[ITANetAudioStreamingServer] Not enough free samples in client buffer, requesting a trigger when more free samples available" << std::endl;
#endif
	}
154 155

	oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
Anne's avatar
Anne committed
156 157
	oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
	m_pServerLogger->log( oLog );
Anne's avatar
Anne committed
158 159

	// Try to Empfange Daten
Anne's avatar
Anne committed
160
	m_pMessage->ResetMessage( );
161

162
	if ( m_pMessage->ReadMessage( 1 ) )
163
	{
Anne's avatar
Anne committed
164 165
		ITAServerLog oLog;
		oLog.uiBlockId = ++iServerBlockId;
Anne's avatar
Anne committed
166 167
		int iMsgType = m_pMessage->GetMessageType( );
		switch ( iMsgType )
168
		{
Anne's avatar
Anne committed
169
			case CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES:
170
			{
Anne's avatar
Anne committed
171
				m_iClientRingBufferFreeSamples = m_pMessage->ReadInt( );	
Anne's avatar
Anne committed
172 173 174 175 176
				break;
			}
			case CITANetAudioProtocol::NP_CLIENT_CLOSE:
			{
				StopGently( false );
Anne's avatar
Anne committed
177
				m_pConnection = NULL;
Anne's avatar
Anne committed
178
				Stop( );
Anne's avatar
Anne committed
179
				break;
Anne's avatar
Anne committed
180 181 182 183 184 185
			}
			default:
			{
				vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
				break;
			}
186 187 188
		}
		oLog.iFreeSamples = m_iClientRingBufferFreeSamples;
		oLog.iProtocolStatus = iMsgType;
Anne's avatar
Anne committed
189 190
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
		m_pServerLogger->log( oLog );
191
	}
192 193


Anne's avatar
Anne committed
194
	return false;
Dipl.-Ing. Jonas Stienen's avatar
Dipl.-Ing. Jonas Stienen committed
195 196
}

197 198
//! Assigns the data source that will be streamed and adopts its format as server parameters
/**
  * Sample rate, block size and channel count of the server parameters are
  * taken from the given stream.
  *
  * @param[in] pInStream Data source to stream. Passing NULL detaches the
  *            current stream and leaves the server parameters untouched;
  *            Start() refuses to run without a stream.
  * @note Invokes ITA_EXCEPT1( MODAL_EXCEPTION, ... ) if the streaming loop is running
  */
void CITANetAudioStreamingServer::SetInputStream( ITADatasource* pInStream )
{
	if ( VistaThreadLoop::IsRunning( ) )
		ITA_EXCEPT1( MODAL_EXCEPTION, "Streaming loop already running, can not change input stream" );

	m_pInputStream = pInStream;

	// Nothing to query from a NULL stream (would be a null pointer dereference)
	if ( m_pInputStream == NULL )
		return;

	m_oServerParams.dSampleRate = m_pInputStream->GetSampleRate( );
	m_oServerParams.iBlockSize = m_pInputStream->GetBlocklength( );
	m_oServerParams.iChannels = m_pInputStream->GetNumberOfChannels( );
}

Anne's avatar
Anne committed
208
//! Returns the data source currently assigned to the server
/**
  * @return Pointer stored by SetInputStream(); NULL if none has been set
  */
ITADatasource* CITANetAudioStreamingServer::GetInputStream( ) const
{
	ITADatasource* const pAssignedStream = m_pInputStream;
	return pAssignedStream;
}

Anne's avatar
Anne committed
213
int CITANetAudioStreamingServer::GetNetStreamBlocklength( ) const
214
{
Anne's avatar
Anne committed
215
	return m_sfTempTransmitBuffer.GetLength( );
216 217
}

Anne's avatar
Anne committed
218
int CITANetAudioStreamingServer::GetNetStreamNumberOfChannels( ) const
219
{
Anne's avatar
Anne committed
220
	return m_sfTempTransmitBuffer.channels( );
221 222
}

Anne's avatar
Anne committed
223
void CITANetAudioStreamingServer::SetAutomaticUpdateRate( )
224 225 226 227
{
	m_iUpdateStrategy = AUTO;
}

Anne's avatar
Anne committed
228
bool CITANetAudioStreamingServer::IsClientConnected( ) const
229
{
Anne's avatar
Anne committed
230
	return m_pNetAudioServer->IsConnected( );
231 232
}

Anne's avatar
Anne committed
233
std::string CITANetAudioStreamingServer::GetNetworkAddress( ) const
234
{
Anne's avatar
Anne committed
235
	return m_pNetAudioServer->GetServerAddress( );
236 237
}

Anne's avatar
Anne committed
238
int CITANetAudioStreamingServer::GetNetworkPort( ) const
239
{
Anne's avatar
Anne committed
240
	return m_pNetAudioServer->GetNetworkPort( );
241 242
}

Anne's avatar
Anne committed
243
void CITANetAudioStreamingServer::Stop( )
Anne's avatar
Anne committed
244
{
Anne's avatar
Anne committed
245
	delete m_pServerLogger;
Anne's avatar
Anne committed
246
	m_pNetAudioServer->Stop( );
Anne's avatar
Anne committed
247
}