// ITANetAudioStreamingClient.cpp
#include <ITANetAudioStreamingClient.h>

#include <ITANetAudioClient.h>
#include <ITANetAudioMessage.h>
#include <ITANetAudioStream.h>
#include <ITADataLog.h>
#include <ITAClock.h>

#include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaStreamUtils.h>

//! Audio streaming log item
/**
  * One record of the client-side streaming log. Each record is written as a
  * single tab-separated line by outputData(); outputDesc() writes the matching
  * column header line.
  */
struct ITAClientLog : public ITALogDataBase
{
	//! Writes the tab-separated column header line
	/**
	  * @param[in,out] os Stream receiving the header
	  * @return The stream that was passed in
	  */
	inline static std::ostream& outputDesc( std::ostream& os )
	{
		os << "BlockId";
		os << "\t" << "WorldTimeStamp";
		os << "\t" << "ProtocolStatus";
		os << "\t" << "FreeSamples";
		os << "\t" << "Channel";
		os << std::endl;
		return os;
	}

	//! Writes this record as one tab-separated line
	/**
	  * The column order matches outputDesc(). The time stamp is written with
	  * a precision of 12 digits; note that the precision setting stays active
	  * on the stream afterwards.
	  *
	  * @param[in,out] os Stream receiving the record
	  * @return The stream that was passed in
	  */
	inline std::ostream& outputData( std::ostream& os ) const
	{
		os << uiBlockId;
		os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
		os << "\t" << iProtocolStatus;
		os << "\t" << iFreeSamples;
		os << "\t" << iChannel;
		os << std::endl;
		return os;
	}

	// All fields are value-initialised so that a record that has only been
	// filled in partially never writes indeterminate values to the log.
	unsigned int uiBlockId = 0; //!< Block identifier (audio streaming)
	double dWorldTimeStamp = 0.0; //!< Time stamp at which the record was filled
	int iProtocolStatus = 0; //!< Protocol message type that was handled for this block
	int iFreeSamples = 0; //!< Free samples in the stream ring buffer
	int iChannel = 0; //!< Number of channels of the stream

};

//! Buffered data logger for the client-side streaming log items (ITAClientLog)
class ITABufferedDataLoggerImplClient
	: public ITABufferedDataLogger< ITAClientLog >
{
};


//! Creates a streaming client for the given network audio stream
/**
  * Takes over the streaming parameters (channels, sample rate, block size and
  * ring buffer size) from the stream, creates the network client, the protocol
  * message and the data logger, and sizes the receiving buffer. No connection
  * is established here, see Connect().
  *
  * @param[in] pParent Stream this client belongs to, must not be NULL
  */
CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pParent )
	: m_oBlockIncrementEvent( VistaThreadEvent::WAITABLE_EVENT )
	, m_pStream( pParent )
	, m_pConnection( nullptr )
	, m_bStopIndicated( false )
{
	// The parent stream is dereferenced below, so refuse a missing stream
	// before anything is allocated.
	if( pParent == nullptr )
		ITA_EXCEPT1( INVALID_PARAMETER, "Streaming client requires a valid parent stream" );

	m_pClient = new CITANetAudioClient();

	m_oParams.iChannels = pParent->GetNumberOfChannels();
	m_oParams.dSampleRate = pParent->GetSampleRate();
	m_oParams.iBlockSize = pParent->GetBlocklength();
	m_oParams.iRingBufferSize = pParent->GetRingBufferSize();

	// Log file name carries block length and channel count:
	// NetAudioLogClient_BS<blocklength>_Ch<channels>.txt
	const std::string sLogFileName = std::string( "NetAudioLogClient_BS" ) + std::to_string( pParent->GetBlocklength() )
		+ std::string( "_Ch" ) + std::to_string( pParent->GetNumberOfChannels() ) + std::string( ".txt" );
	m_pClientLogger = new ITABufferedDataLoggerImplClient();
	m_pClientLogger->setOutputFile( sLogFileName );

	iStreamingBlockId = 0;

	m_pMessage = new CITANetAudioMessage( VistaSerializingToolset::SWAPS_MULTIBYTE_VALUES );

	// Receiving buffer holds one ring buffer's worth of samples per channel
	m_sfReceivingBuffer.init( m_oParams.iChannels, m_oParams.iRingBufferSize, false );
}

//! Notifies the server (if still connected) and releases all owned objects
/**
  * If a connection is still established, a NP_CLIENT_CLOSE message is sent to
  * the server on a best-effort basis. Afterwards the logger, the protocol
  * message and the network client created in the constructor are deleted.
  */
CITANetAudioStreamingClient::~CITANetAudioStreamingClient()
{
	// NOTE(review): the streaming loop is not stopped here; presumably the
	// owner calls Disconnect() or the thread base class stops the loop before
	// the members below are released - confirm.

	if( m_pConnection != nullptr && m_pConnection->GetIsConnected() )
	{
		// A destructor must not let an exception escape, so a failing send of
		// the close notification is reported and otherwise ignored.
		try
		{
			m_pMessage->ResetMessage();
			m_pMessage->SetConnection( m_pConnection );
			m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_CLOSE );
			m_pMessage->WriteBool( true );
			m_pMessage->WriteMessage();
		}
		catch( ... )
		{
			vstr::out() << "[ITANetAudioStreamingClient] Could not send close message to server" << std::endl;
		}
	}

	delete m_pClientLogger;

	// Both objects are created with new in the constructor and are owned by
	// this instance. The message refers to the client's connection, so it is
	// released first.
	delete m_pMessage;
	delete m_pClient;
}

//! Connects to a streaming server and starts the streaming loop
/**
  * Establishes the network connection, sends the streaming parameters in a
  * NP_CLIENT_OPEN message and waits for the server's answer. If the server
  * accepts, the loop is started via Run().
  *
  * Failures are raised through ITA_EXCEPT1 (INVALID_PARAMETER) if the
  * connection cannot be established, if the server answers with an unexpected
  * message type, or if the server declines the streaming parameters.
  *
  * @param[in] sAddress Server address
  * @param[in] iPort Server port
  * @return False if the client is already connected, true once streaming has been started
  */
bool CITANetAudioStreamingClient::Connect( const std::string& sAddress, int iPort )
{
	if( GetIsConnected() )
		return false;

	if( !m_pClient->Connect( sAddress, iPort ) )
		ITA_EXCEPT1( INVALID_PARAMETER, "Could not connect to " + sAddress );

	m_pConnection = m_pClient->GetConnection();

	m_pMessage->ResetMessage();
	m_pMessage->SetConnection( m_pConnection );

	// Validate streaming parameters of server and client
	m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_OPEN );
	m_pMessage->WriteStreamingParameters( m_oParams );
	m_pMessage->WriteMessage();
	m_pMessage->ResetMessage();

	// Keep reading until the server's answer has arrived
	while( !m_pMessage->ReadMessage( 0 ) )
	{
	}

	// Checked at run time (not only by assert), so that builds with NDEBUG do
	// not interpret the payload of an unrelated message as the handshake result.
	if( m_pMessage->GetMessageType() != CITANetAudioProtocol::NP_SERVER_OPEN )
		ITA_EXCEPT1( INVALID_PARAMETER, "Streaming server answered the open request with an unexpected message type." );

	const bool bOK = m_pMessage->ReadBool();

	// NOTE(review): when the handshake fails, the network connection stays
	// established - confirm whether the client should be disconnected here.
	if( !bOK )
		ITA_EXCEPT1( INVALID_PARAMETER, "Streaming server declined connection, detected streaming parameter mismatch." );

	Run();

	return true;
}

bool CITANetAudioStreamingClient::LoopBody()
{
	ITAClientLog oLog;
	oLog.uiBlockId = ++iStreamingBlockId;

	if( m_bStopIndicated )
		return true;

128 129 130 131 132 133 134 135
	// Send Puffer informationenen
	if (iStreamingBlockId % 2 == 1)
	{
		m_pMessage->ResetMessage();
		m_pMessage->SetMessageType(CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES);
		m_pMessage->WriteInt(m_pStream->GetRingBufferFreeSamples());
		m_pMessage->WriteMessage();
	}
136 137
	// Send message to server that samples can be received

138

139
	// Read answer 
140 141
	m_pMessage->ResetMessage( );
	if ( m_pMessage->ReadMessage( 1 ) )
142 143 144 145 146 147 148 149 150
	{
		int iMsgType = m_pMessage->GetMessageType( );
		switch ( iMsgType )
		{
			case CITANetAudioProtocol::NP_SERVER_SENDING_SAMPLES:
				// Receive samples from net message and forward them to the stream ring buffer

				m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
				if ( m_pStream->GetRingBufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
Anne Heimes's avatar
Anne Heimes committed
151 152 153 154
					m_pStream->Transmit(m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength());
#ifdef NET_AUDIO_SHOW_TRAFFIC
				vstr::out() << "[ITANetAudioStreamingClient] Recived " << m_sfReceivingBuffer.GetLength() << " samples" << std::endl;
#endif
155 156
				break;
			case CITANetAudioProtocol::NP_SERVER_GET_RINGBUFFER_FREE_SAMPLES:
Anne Heimes's avatar
Anne Heimes committed
157
				m_pMessage->ReadBool();
158
				m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_SENDING_RINGBUFFER_FREE_SAMPLES );
Anne's avatar
Anne committed
159
				m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples( ) );
Anne Heimes's avatar
Anne Heimes committed
160
				m_pMessage->WriteMessage();
161 162 163
				break;
			case CITANetAudioProtocol::NP_SERVER_CLOSE:
				Disconnect( );
Anne's avatar
Anne committed
164 165 166 167
				break;
			default:
				vstr::out( ) << "[ITANetAudioStreamingServer] Unkown protocol type : " << iMsgType << std::endl;
				break;
168
		}
Anne Heimes's avatar
Anne Heimes committed
169
		oLog.iChannel = m_pStream->GetNumberOfChannels();
170
		oLog.iProtocolStatus = iMsgType;
Anne Heimes's avatar
Anne Heimes committed
171
		oLog.iFreeSamples = m_pStream->GetRingBufferFreeSamples();
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
		oLog.dWorldTimeStamp = ITAClock::getDefaultClock( )->getTime( );
		m_pClientLogger->log( oLog );
	}
	return false;
}

void CITANetAudioStreamingClient::TriggerBlockIncrement()
{
	m_oBlockIncrementEvent.SignalEvent();
}

//! Connection state as reported by the underlying network client
/**
  * @return True if the network client reports an established connection
  */
bool CITANetAudioStreamingClient::GetIsConnected() const
{
	const bool bClientConnected = m_pClient->GetIsConnected();
	return bClientConnected;
}

void CITANetAudioStreamingClient::Disconnect()
{
	m_bStopIndicated = true;
	StopGently( true );

	//delete m_pConnection;
	m_pConnection = NULL;

	m_bStopIndicated = false;
}