Commit 0d840db2 authored by Dipl.-Ing. Jonas Stienen's avatar Dipl.-Ing. Jonas Stienen
Browse files

Merge branch 'ba_2016_heimes' into develop

parents 44235669 437cbfd9
......@@ -26,6 +26,10 @@
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
using namespace std;
class CITANetAudioStreamingClient;
......@@ -41,10 +45,26 @@ public:
CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity = 2048 );
virtual ~CITANetAudioStream();
enum StreamingStatus
{
INVALID = -1,
STOPPED,
CONNECTED,
STREAMING,
BUFFER_UNDERRUN,
};
bool Connect( const std::string& sAddress, int iPort );
bool GetIsConnected() const;
//! Returns (static) size of ring buffer
int GetRingBufferSize() const;
//! Returns true if ring buffer is full
bool GetIsRingBufferFull() const;
//! Returns true if ring buffer is empty
bool GetIsRingBufferEmpty() const;
unsigned int GetBlocklength() const;
unsigned int GetNumberOfChannels() const;
......@@ -62,7 +82,8 @@ protected:
*/
int Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples );
int GetRingbufferFreeSamples();
//! Returns free samples between write and read cursor
int GetRingBufferFreeSamples() const;
private:
CITANetAudioStreamingClient* m_pNetAudioStreamingClient;
......@@ -71,10 +92,14 @@ private:
ITASampleFrame m_sfOutputStreamBuffer;
int m_iReadCursor; //!< Cursor where samples will be consumed from ring buffer on next block
int m_iWriteCursor; //!< Cursor where samples will feeded into ring buffer from net audio producer
int m_iWriteCursor; //!< Cursor where samples will be fed into ring buffer from net audio producer (always ahead)
bool m_bRingBufferFull; //!< Indicator if ring buffer is full (and read cursor equals write cursor)
ITASampleFrame m_sfRingBuffer; //!< Buffer incoming data
friend class CITANetAudioStreamingClient;
int m_iStreamingStatus; //!< Current streaming status
friend class CITANetAudioStreamingClient;
ofstream outputFile;
};
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_STREAM
......@@ -24,6 +24,7 @@
#include <ITANetAudioProtocol.h>
#include <ITASampleFrame.h>
#include <ITAStreamProbe.h>
#include <VistaInterProcComm/Concurrency/VistaThreadEvent.h>
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
......@@ -61,6 +62,7 @@ protected:
private:
CITANetAudioClient* m_pClient;
CITANetAudioStream* m_pStream;
ITAStreamProbe* m_pStreamProbe;
CITANetAudioProtocol* m_pProtocol;
CITANetAudioMessage* m_pMessage;
......
......@@ -7,11 +7,14 @@
// STL
#include <cmath>
#include <iostream>
CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity )
: m_sfOutputStreamBuffer( iChannels, iBufferSize, true )
, m_dSampleRate( dSamplingRate )
, m_sfRingBuffer( iChannels, iRingBufferCapacity, true )
, m_bRingBufferFull( false )
, m_iStreamingStatus( INVALID )
{
if( iBufferSize > iRingBufferCapacity )
......@@ -19,17 +22,24 @@ CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int
m_pNetAudioStreamingClient = new CITANetAudioStreamingClient( this );
m_iReadCursor = 0;
m_iWriteCursor = 0;
m_iWriteCursor = 0; // always ahead, i.e. iWriteCursor >= iReadCursor if unwrapped
m_iStreamingStatus = STOPPED;
outputFile.open( "program3data.txt" );
}
CITANetAudioStream::~CITANetAudioStream()
{
delete m_pNetAudioStreamingClient;
delete m_pNetAudioStreamingClient;
outputFile.close( );
}
bool CITANetAudioStream::Connect( const std::string& sAddress, int iPort )
{
return m_pNetAudioStreamingClient->Connect( sAddress, iPort );
bool bConnected = m_pNetAudioStreamingClient->Connect( sAddress, iPort );
if( bConnected )
m_iStreamingStatus = CONNECTED;
return bConnected;
}
bool CITANetAudioStream::GetIsConnected() const
......@@ -39,56 +49,102 @@ bool CITANetAudioStream::GetIsConnected() const
const float* CITANetAudioStream::GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* )
{
// @todo: is connected?
m_sfOutputStreamBuffer[uiChannel].Zero();
if (this->GetIsConnected())
{
ITASampleBuffer& sbOutputStreamBuffer( m_sfOutputStreamBuffer[ uiChannel ] );
sbOutputStreamBuffer.Zero();
const float* pfBlockPointer = sbOutputStreamBuffer.GetData();
if( !GetIsConnected() )
return pfBlockPointer;
// todo restlichen kopieren und dann rein und raus faden
int iCurrentWritePointer = m_iWriteCursor;
if (iCurrentWritePointer > m_iReadCursor) {
m_sfRingBuffer[uiChannel].cyclic_read(m_sfOutputStreamBuffer[uiChannel].GetData(),
m_sfOutputStreamBuffer.GetLength(), m_iReadCursor);
}
else
{
// Hallo
int a = 0;
}
m_iStreamingStatus = STREAMING;
int iCurrentWriteCursor = m_iWriteCursor; // local copy
if( iCurrentWriteCursor <= m_iReadCursor && GetRingBufferFreeSamples() > 0 ) // Wrap around?
iCurrentWriteCursor += GetRingBufferSize(); // Write pointer always ahead, so unwrap first
int iReadableSamples = iCurrentWriteCursor - m_iReadCursor;
if ( iReadableSamples > int( GetBlocklength( ) ) ) // samples can be cyclic-copied safely from ring buffer
{
m_sfRingBuffer[ uiChannel ].cyclic_read( sbOutputStreamBuffer.GetData( ), sbOutputStreamBuffer.GetLength( ), m_iReadCursor );
pfBlockPointer = sbOutputStreamBuffer.GetData( );
}
return m_sfOutputStreamBuffer[ uiChannel ].GetData();
else if( iReadableSamples > 0)
{
// @todo: fade with ITAFade
//std::cerr << "Should fade right now, but skipping samples." << std::endl;
}
return pfBlockPointer;
}
void CITANetAudioStream::IncrementBlockPointer()
{
// Increment read cursor by one audio block and wrap around if exceeding ring buffer
m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength();
if ( ( GetRingBufferSize() - GetRingBufferFreeSamples( )) >= int( GetBlocklength( ) ) )
{
m_iReadCursor = ( m_iReadCursor + m_sfOutputStreamBuffer.GetLength() ) % m_sfRingBuffer.GetLength();
m_iStreamingStatus = STREAMING;
//outputFile << "incRead ";
}
else if ( GetIsRingBufferEmpty( ) )
{
//outputFile << "buffer empty ";
}
else
{
m_iStreamingStatus = BUFFER_UNDERRUN;
m_iReadCursor = m_iWriteCursor;
//outputFile << "BufferOverrun ";
}
m_bRingBufferFull = false;
//outputFile << "\tRead: " << m_iReadCursor;
//outputFile << "\tWrite : " << m_iWriteCursor;
//outputFile << "\tFreeSamples: " << GetRingBufferFreeSamples ()<< endl;
m_pNetAudioStreamingClient->TriggerBlockIncrement();
}
int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples )
{
// Take local copies (concurrent access)
int iCurrentReadCursor = m_iReadCursor;
int iCurrentWriteCursor = m_iWriteCursor;
// write samples in the buffer
m_sfRingBuffer.cyclic_write(sfNewSamples, iNumSamples,
iCurrentReadCursor, m_iWriteCursor);
if( iCurrentWriteCursor < iCurrentReadCursor )
iCurrentWriteCursor += GetRingBufferSize(); // Unwrap, because write cursor always ahead
// set write curser
m_iWriteCursor = ( m_iWriteCursor + iNumSamples ) % m_sfRingBuffer.GetLength();
if( GetRingBufferFreeSamples() < iNumSamples )
{
// @todo: only partly write
//std::cerr << "BUFFER_OVERRUN! Would partly write samples because ring buffer will be full then." << std::endl;
m_iWriteCursor = m_iReadCursor;
m_bRingBufferFull = false;
//outputFile << " incSomeWrite: ";
}
else
{
// write samples into ring buffer
m_sfRingBuffer.cyclic_write( sfNewSamples, iNumSamples, 0, iCurrentWriteCursor );
// return free BufferSize
if (iCurrentReadCursor > m_iWriteCursor) {
return m_iWriteCursor - iCurrentReadCursor;
// set write curser
m_iWriteCursor = ( m_iWriteCursor + iNumSamples ) % GetRingBufferSize( );
m_bRingBufferFull = true;
//outputFile << " IncWrite: ";
}
else {
return m_sfRingBuffer.GetLength() - m_iWriteCursor + iCurrentReadCursor;
}
//outputFile << "\tRead: " << m_iReadCursor;
//outputFile << "\tWrite : " << m_iWriteCursor;
//outputFile << "\tFreeSamples: " << GetRingBufferFreeSamples( ) << endl;
return GetRingBufferFreeSamples();
}
int CITANetAudioStream::GetRingbufferFreeSamples()
int CITANetAudioStream::GetRingBufferFreeSamples() const
{
int iFreeSamples = ( m_iWriteCursor - m_iReadCursor + GetRingBufferSize() ) % GetRingBufferSize();
if( m_bRingBufferFull )
return 0;
int iFreeSamples = GetRingBufferSize() - ( ( m_iWriteCursor - m_iReadCursor + GetRingBufferSize() ) % GetRingBufferSize() );
assert( iFreeSamples > 0 );
return iFreeSamples;
}
......@@ -97,6 +153,16 @@ int CITANetAudioStream::GetRingBufferSize() const
return m_sfRingBuffer.GetLength();
}
bool CITANetAudioStream::GetIsRingBufferFull() const
{
return m_bRingBufferFull;
}
bool CITANetAudioStream::GetIsRingBufferEmpty() const
{
return ( !m_bRingBufferFull && m_iReadCursor == m_iWriteCursor );
}
unsigned int CITANetAudioStream::GetBlocklength() const
{
return ( unsigned int ) m_sfOutputStreamBuffer.GetLength();
......
......@@ -12,6 +12,8 @@ CITANetAudioStreamingClient::CITANetAudioStreamingClient( CITANetAudioStream* pP
, m_pConnection( NULL )
, m_bStopIndicated( false )
{
m_pStreamProbe = new ITAStreamProbe( pParent, "output.wav" );
m_pClient = new CITANetAudioClient();
m_oParams.iChannels = pParent->GetNumberOfChannels();
......@@ -78,7 +80,7 @@ bool CITANetAudioStreamingClient::LoopBody()
m_pMessage->ResetMessage();
m_pMessage->SetConnection( m_pConnection );
m_pMessage->SetMessageType( CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES );
m_pMessage->WriteInt( m_pStream->GetRingbufferFreeSamples() );
m_pMessage->WriteInt( m_pStream->GetRingBufferFreeSamples() );
m_pMessage->WriteMessage();
// Wait for answer of server
......@@ -89,19 +91,22 @@ bool CITANetAudioStreamingClient::LoopBody()
case CITANetAudioProtocol::NP_INVALID:
// Something went wrong
std::cerr << "Received invalid message type" << std::endl;
break;
case CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER:
// Wait until block increment is triggered by audio context (more free samples in ring buffer)
//std::cout << "Will wait for block increment" << std::endl;
m_oBlockIncrementEvent.WaitForEvent( true );
break;
case CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES:
// Receive samples from net message and forward them to the stream ring buffer
m_pMessage->ReadSampleFrame( &m_sfReceivingBuffer );
if (m_pStream->GetRingbufferFreeSamples() >= m_sfReceivingBuffer.GetLength())
m_pStream->Transmit(m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength());
//std::cout << "Receiving " << m_sfReceivingBuffer.GetLength() << " samples from streaming server" << std::endl;
if ( m_pStream->GetRingBufferFreeSamples( ) >= m_sfReceivingBuffer.GetLength( ) )
m_pStream->Transmit( m_sfReceivingBuffer, m_sfReceivingBuffer.GetLength( ) );
//else
// Fehler
......
......@@ -52,11 +52,16 @@ bool CITANetAudioStreamingServer::Start( const std::string& sAddress, int iPort
m_pInputStream->GetBlocklength() == oClientParams.iBlockSize)
{
bOK = true;
std::cout << " Alle Daten Stimmen: \nAnzahl Channel: " << oClientParams.iChannels << std::endl;
std::cout << "SampleRate: " << oClientParams.dSampleRate << std::endl;
std::cout << "Blockgroesse: " << oClientParams.iBlockSize << std::endl;
}
std::cout << " Client Data: \nAnzahl Channel: " << oClientParams.iChannels << std::endl;
std::cout << "SampleRate: " << oClientParams.dSampleRate << std::endl;
std::cout << "Blockgroesse: " << oClientParams.iBlockSize << std::endl;
std::cout << " Server Data: \nAnzahl Channel: " << m_pInputStream->GetNumberOfChannels() << std::endl;
std::cout << "SampleRate: " << m_pInputStream->GetSampleRate() << std::endl;
std::cout << "Blockgroesse: " << m_pInputStream->GetBlocklength() << std::endl;
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_OPEN );
m_pMessage->WriteBool( bOK );
m_pMessage->WriteAnswer();
......@@ -117,41 +122,46 @@ bool CITANetAudioStreamingServer::LoopBody()
m_pMessage->SetConnection( m_pConnection );
m_pMessage->ReadMessage(); // blocking
switch( m_pMessage->GetMessageType() )
int iMsgType = m_pMessage->GetMessageType();
switch( iMsgType )
{
case CITANetAudioProtocol::NP_CLIENT_WAITING_FOR_SAMPLES:
{
int iFreeSamples = m_pMessage->ReadInt();
// std::cout << "Freie Samples: " << iFreeSamples << std::endl;
// std::cout << "Laenge InputStram: " << m_pInputStream->GetBlocklength() << std::endl;
if( iFreeSamples >= m_pInputStream->GetBlocklength() )
{
// Send Samples
for( int i = 0; i < m_pInputStream->GetNumberOfChannels(); i++ )
{
ITAStreamInfo oStreamInfo;
oStreamInfo.nSamples = m_sfTempTransmitBuffer.GetLength();
const float* pfData = m_pInputStream->GetBlockPointer( i, &oStreamInfo );
m_sfTempTransmitBuffer[ i ].write( pfData, m_pInputStream->GetBlocklength() );
if( pfData != 0 )
m_sfTempTransmitBuffer[ i ].write( pfData, m_sfTempTransmitBuffer.GetLength() );
}
m_pInputStream->IncrementBlockPointer();
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_SEND_SAMPLES );
m_pMessage->WriteSampleFrame( &m_sfTempTransmitBuffer );
m_pMessage->WriteAnswer();
//std::cout << "Transmitted " << m_pInputStream->GetBlocklength() << " samples, because there where " << iFreeSamples << " free samples on client side" << std::endl;
}
else
{
//std::cout << "Could not transmitt, because there where only " << iFreeSamples << " free samples on client side" << std::endl;
// Waiting for Trigger
m_pMessage->SetAnswerType( CITANetAudioProtocol::NP_SERVER_WAITING_FOR_TRIGGER );
m_pMessage->WriteAnswer();
break;
}
m_pMessage->WriteAnswer();
m_pInputStream->IncrementBlockPointer();
float fTimeOut = m_pInputStream->GetBlocklength() / m_pInputStream->GetSampleRate();
//VistaTimeUtils::Sleep( (int) ( 1 * 100 ) );
break;
}
case CITANetAudioProtocol::NP_CLIENT_CLOSE:
{
m_pMessage->WriteAnswer();
m_pConnection = NULL;
StopGently( true );
......@@ -159,6 +169,12 @@ bool CITANetAudioStreamingServer::LoopBody()
return false;
}
default:
{
std::cout << "Unkown protocol type: " << iMsgType << std::endl;
break;
}
}
return true;
}
......
......@@ -5,22 +5,25 @@
#include <ITAPortaudioInterface.h>
#include <ITAStreamMultiplier1N.h>
#include <ITAException.h>
#include <ITAFileDatasource.h>
#include <ITAStreamProbe.h>
using namespace std;
static string g_sServerName = "localhost";
static int g_iServerPort = 12480;
static double g_dSampleRate = 44.1e3;
static double g_dSampleRate = 44100;
static int g_iBufferSize = 256;
int main( int , char** )
{
CITANetAudioStream oNetAudioStream( 1, g_dSampleRate, g_iBufferSize, 4 * g_iBufferSize );
ITAStreamMultiplier1N oMultiplier( &oNetAudioStream, 2 );
CITANetAudioStream oNetAudioStream( 2, g_dSampleRate, g_iBufferSize, 100 * g_iBufferSize );
ITAStreamProbe oProbe( &oNetAudioStream, "out_gutentag.wav" );
//ITAStreamMultiplier1N oMultiplier( &oProbe, 2 );
ITAPortaudioInterface ITAPA( g_dSampleRate, g_iBufferSize );
ITAPA.Initialize();
ITAPA.SetPlaybackDatasource( &oMultiplier );
ITAPA.SetPlaybackDatasource( &oProbe );
ITAPA.Open();
ITAPA.Start();
......@@ -55,6 +58,8 @@ int main( int , char** )
ITAPA.Stop();
ITAPA.Close();
ITAPA.Finalize();
return 0;
}
......@@ -4,20 +4,21 @@
#include <ITANetAudioStreamingServer.h>
#include <ITANetAudioServer.h>
#include <ITAStreamFunctionGenerator.h>
#include <ITAFileDatasource.h>
using namespace std;
static string g_sServerName = "localhost";
static int g_iServerPort = 12480;
static double g_dSampleRate = 44.1e3;
static double g_dSampleRate = 44100;
static int g_iBlockLength = 256;
int main( int , char** )
{
ITAStreamFunctionGenerator oGenerator( 1, g_dSampleRate, g_iBlockLength, ITAStreamFunctionGenerator::SINE, 456.78f, 0.81f, true );
ITAStreamFunctionGenerator oGenerator( 2, g_dSampleRate, g_iBlockLength, ITAStreamFunctionGenerator::SINE, 456.78f, 0.81f, true );
ITAFileDatasource oDatei("01_Empfang_Guten_Tag.wav", g_iBlockLength);
CITANetAudioStreamingServer oStreamingServer;
oStreamingServer.SetInputStream( &oGenerator );
oStreamingServer.SetInputStream(&oDatei);
cout << "Starting server and waiting for connections on '" << g_sServerName << "' on port " << g_iServerPort << endl;
oStreamingServer.Start( g_sServerName, g_iServerPort );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment