Commit bfef6e2c authored by Dipl.-Ing. Jonas Stienen's avatar Dipl.-Ing. Jonas Stienen
Browse files

streaming class cleanup, API changes and more documentation

parent ae4fe6f9
......@@ -37,73 +37,143 @@ class ITABufferedDataLoggerImplNet;
//! Network audio stream
/**
* Audio streaming for a signal source that is connected via TCP/IP.
*
* \note not thread-safe
*/
* Audio streaming for a signal source that is connected via TCP/IP.
* The network audio stream behaves like a client and receives samples
* from a network audio stream server, CITANetAudioStreamingSearver.
*
* The stream will always work within a streaming context and will never
* block the streaming processing, because it is decoupled from the
* network connection and forwards samples from an internal ring buffer.
* If the buffer runs out of samples, zeros will be return. If the buffer
* overruns, the sample server will be suspendeb by blocking the network
* data flow.
*
* Latency can be managed by either providing a small ring buffer or
* oversizing the ring buffer and requesting a target latency.
*
* \note not thread-safe
*/
class ITA_DATA_SOURCES_API CITANetAudioStream : public ITADatasource
{
public:
//! Constructor of a network audio stream
CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity = 2048 );
virtual ~CITANetAudioStream();
//! Network streaming status of client
enum StreamingStatus
{
INVALID = -1,
STOPPED,
CONNECTED,
STREAMING,
INVALID = -1, //!< Invalid status, for exception detection
STOPPED, //!< Client not connected to a server and streaming stopped, i.e. not receiving samples by choice
CONNECTED, //!< Client is connected to a sample server (and potentially receives samples)
STREAMING, //!<
BUFFER_UNDERRUN,
BUFFER_OVERRUN,
};
bool Connect( const std::string& sAddress, int iPort );
//! Connect a streaming server
/**
* @sAddress[in] Server address IP, i.e. 127.0.0.1
* @iPort[in] Server socket port, defaults to 12480
*/
bool Connect( const std::string& sAddress, int iPort = 12480 );
//! Returns the connection status
/**
* @return True, if connected
*/
bool GetIsConnected() const;
//! Returns (static) size of ring buffer
/**
* @return Number of maximum samples that can be hold by internal ring buffer
*/
int GetRingBufferSize() const;
//! Returns true if ring buffer is full
/**
* @return True, if ring buffer full (down to the last sample, not smaller that block size)
*/
bool GetIsRingBufferFull() const;
//! Returns true if ring buffer is empty
/**
* @return True, if ring buffer empty (down to the last sample, not smaller that block size)
*/
bool GetIsRingBufferEmpty() const;
//! Returns block size
unsigned int GetBlocklength() const;
//! Returns number of channels
unsigned int GetNumberOfChannels() const;
//! Returns sampling rate
double GetSampleRate() const;
//! Audio streaming block read for a given channel
/**
* This method is called by the streaming context for each channel, also providing stream infos.
* It copies samples out of the ring buffer, but does not touch the read/write cursors.
*/
const float* GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* );
//! Audio streaming block increment
/**
* This method updates the read curser and forwards it by one block. This operations
* frees samples out of the ring buffer. It also triggersan event
* to indicate that the ring buffer readable sample number has been decreased.
* Depending on the network streaming settings, the trigger will be forwarded
* to the server to inform about the number of free samples, that can be re-filled.
*/
void IncrementBlockPointer();
protected:
//! This method is called by the streaming client and pushes sampes into the ring buffer
//! This method is called by the networkg client and pushes samples into the ring buffer
/**
* \param sfNewSamples Sample buffer (multi channel) with sample data
* If samples fit ring buffer, the rite curser will be increased by iNumSamples, hence
* the number of free (writable) samples decreases.
*
* @param[in] sfNewSamples Sample frame with new samples to be appended to the ring buffer
* \param iNumSamples samples to be read from the sample frame (must be smaller or equal length)
*
* \return Number of free samples in ring buffer
* @return Number of free samples in ring buffer
*
* @note This method is not called out of the audio streaming context but out of the network context.
*/
int Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples );
//! Returns samples that can be read from ring buffer
/**
* @return Readable samples between read and write cursor.
*/
int GetRingBufferAvailableSamples() const;
//! Returns free samples between write and read cursor
/**
* @return Free samples between write and read cursor.
*/
int GetRingBufferFreeSamples() const;
private:
CITANetAudioStreamingClient* m_pNetAudioStreamingClient;
CITANetAudioStreamingClient* m_pNetAudioStreamingClient; //!< Audio streaming network client
double m_dSampleRate;
ITASampleFrame m_sfOutputStreamBuffer;
double m_dSampleRate; //!< Sampling rate
ITASampleFrame m_sfOutputStreamBuffer; //!< Output samples temp buffer (audio context)
int m_iReadCursor; //!< Cursor where samples will be consumed from ring buffer on next block
int m_iWriteCursor; //!< Cursor where samples will be fed into ring buffer from net audio producer (always ahead)
bool m_bRingBufferFull; //!< Indicator if ring buffer is full (and read cursor equals write cursor)
ITASampleFrame m_sfRingBuffer; //!< Buffer incoming data
ITASampleFrame m_sfRingBuffer; //!< Ring buffer
int m_iStreamingStatus; //!< Current streaming status
friend class CITANetAudioStreamingClient;
ITABufferedDataLoggerImplStream* m_pStreamLogger;
ITABufferedDataLoggerImplNet* m_pNetLogger;
int iID;
ITABufferedDataLoggerImplStream* m_pStreamLogger; //!< Logging for the audio stream
ITABufferedDataLoggerImplNet* m_pNetLogger; //!< Logging for the network stream
int iAudioStreamingBlockID; //!< Audio streaming block id
int iNetStreamingBlockID; //!< Network streaming block id
friend class CITANetAudioStreamingClient;
};
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_STREAM
......@@ -12,30 +12,49 @@
// STL
#include <cmath>
#include <iostream>
struct ITAStreamLog : public ITALogDataBase {
static std::ostream& outputDesc( std::ostream& os ) {
os << "BlockId " << "\tTimeStamp" << "\tStreamingStatus" << std::endl;
//! Audio streaming log item
struct ITAStreamLog : public ITALogDataBase
{
inline static std::ostream& outputDesc( std::ostream& os )
{
os << "BlockId";
os << "\t" << "TimeCode";
os << "\t" << "StreamingStatus";
os << "\t" << "FreeSamples";
os << std::endl;
return os;
};
virtual std::ostream& outputData( std::ostream& os ) const {
os << "StreamLog\t" << uiBlockId << "\t" << std::setprecision( 12 ) << dTimecode << "\t" << iStreamingStatus << "\t" << iFreeSamples << std::endl;
inline std::ostream& outputData( std::ostream& os ) const
{
os << uiBlockId;
os << "\t" << std::setprecision( 12 ) << dTimecode;
os << "\t" << iStreamingStatus;
os << "\t" << iFreeSamples;
os << std::endl;
return os;
};
unsigned int uiBlockId;
int iStreamingStatus;
unsigned int uiBlockId; //!< Block identifier (audio streaming)
int iStreamingStatus; //!< ... usw
double dTimecode;
int iFreeSamples;
};
struct ITANetLog : public ITALogDataBase {
static std::ostream& outputDesc( std::ostream& os ) {
//! Network streaming log item
struct ITANetLog : public ITALogDataBase
{
inline static std::ostream& outputDesc( std::ostream& os )
{
os << "BlockId " << "\tTimeStamp" << "\tBufferstatus" << std::endl;
return os;
};
virtual std::ostream& outputData( std::ostream& os ) const {
inline std::ostream& outputData( std::ostream& os ) const
{
os << "NetLog\t"<< uiBlockId << "\t" << std::setprecision( 12 ) << dTimecode << "\t" << iBufferStatus << "\t" << iFreeSamples << std::endl;
return os;
};
......@@ -46,12 +65,10 @@ struct ITANetLog : public ITALogDataBase {
int iFreeSamples;
};
class ITABufferedDataLoggerImplStream : public ITABufferedDataLogger<ITAStreamLog> {
public:
};
class ITABufferedDataLoggerImplNet : public ITABufferedDataLogger<ITANetLog> {
public:
};
class ITABufferedDataLoggerImplStream : public ITABufferedDataLogger < ITAStreamLog > {};
class ITABufferedDataLoggerImplNet : public ITABufferedDataLogger < ITANetLog > {};
CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity )
: m_sfOutputStreamBuffer( iChannels, iBufferSize, true )
......@@ -70,12 +87,15 @@ CITANetAudioStream::CITANetAudioStream( int iChannels, double dSamplingRate, int
m_iWriteCursor = 0; // always ahead, i.e. iWriteCursor >= iReadCursor if unwrapped
m_iStreamingStatus = STOPPED;
// Logging
m_pStreamLogger = new ITABufferedDataLoggerImplStream( );
m_pStreamLogger->setOutputFile( "NetAudioStream.log" );
m_pNetLogger = new ITABufferedDataLoggerImplNet( );
m_pNetLogger->setOutputFile( "NetAudioNet.log" );
iID = 0;
m_pStreamLogger = new ITABufferedDataLoggerImplStream();
m_pStreamLogger->setOutputFile( "NetAudioLogStream.txt" );
iAudioStreamingBlockID = 0;
m_pNetLogger = new ITABufferedDataLoggerImplNet();
m_pNetLogger->setOutputFile( "NetAudioLogNet.txt" );
iNetStreamingBlockID = 0;
}
CITANetAudioStream::~CITANetAudioStream()
......@@ -87,10 +107,7 @@ CITANetAudioStream::~CITANetAudioStream()
bool CITANetAudioStream::Connect( const std::string& sAddress, int iPort )
{
bool bConnected = m_pNetAudioStreamingClient->Connect( sAddress, iPort );
if( bConnected )
m_iStreamingStatus = CONNECTED;
return bConnected;
return m_pNetAudioStreamingClient->Connect( sAddress, iPort );
}
bool CITANetAudioStream::GetIsConnected() const
......@@ -100,35 +117,44 @@ bool CITANetAudioStream::GetIsConnected() const
const float* CITANetAudioStream::GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* pInfo )
{
if ( !GetIsConnected( ) )
if( !GetIsConnected() )
{
m_sfOutputStreamBuffer[ uiChannel ].Zero( );
return m_sfOutputStreamBuffer[ uiChannel ].GetData( );
}
if ( GetIsRingBufferEmpty( ) )
{
m_sfOutputStreamBuffer[ uiChannel ].Zero( );
}
// Es ist mindestens ein Block da
else
{
// Es ist mindestens ein Block da
m_sfRingBuffer[ uiChannel ].cyclic_read( m_sfOutputStreamBuffer[ uiChannel ].GetData( ), m_sfOutputStreamBuffer.GetLength( ), m_iReadCursor );
// weniger als ein Block
// todo fading
if( GetIsRingBufferEmpty() )
{
m_sfOutputStreamBuffer[ uiChannel ].Zero();
m_iStreamingStatus = BUFFER_UNDERRUN;
}
else
{
if( GetRingBufferAvailableSamples() < GetBlocklength() )
{
// @todo: fade out
m_sfRingBuffer[ uiChannel ].Zero();
m_iStreamingStatus = BUFFER_UNDERRUN;
}
else
{
// Normal behaviour (if everything is OK with ring buffer status)
m_sfRingBuffer[ uiChannel ].cyclic_read( m_sfOutputStreamBuffer[ uiChannel ].GetData(), GetBlocklength(), m_iReadCursor );
m_iStreamingStatus = STREAMING;
}
}
}
if ( uiChannel == 0 )
{
ITAStreamLog oLog;
oLog.iStreamingStatus = m_iStreamingStatus;
oLog.dTimecode = ITAClock::getDefaultClock( )->getTime( );
iID++;
oLog.uiBlockId = iID;
oLog.uiBlockId = ++iAudioStreamingBlockID;
oLog.iFreeSamples = GetRingBufferFreeSamples( );
m_pStreamLogger->log( oLog );
}
return m_sfOutputStreamBuffer[uiChannel].GetData();
}
......@@ -145,11 +171,11 @@ void CITANetAudioStream::IncrementBlockPointer()
}
else if ( GetIsRingBufferEmpty( ) )
{
m_iStreamingStatus = STOPPED;
m_iStreamingStatus = BUFFER_UNDERRUN;
}
else
{
m_iStreamingStatus = BUFFER_UNDERRUN;
m_iStreamingStatus = BUFFER_OVERRUN;
m_iReadCursor = m_iWriteCursor;
}
m_bRingBufferFull = false;
......@@ -197,14 +223,20 @@ int CITANetAudioStream::Transmit( const ITASampleFrame& sfNewSamples, int iNumSa
}
oLog.dTimecode = ITAClock::getDefaultClock( )->getTime( );
iID++;
oLog.uiBlockId = iID;
iAudioStreamingBlockID++;
oLog.uiBlockId = iAudioStreamingBlockID;
oLog.iFreeSamples = GetRingBufferFreeSamples( );
m_pNetLogger->log( oLog );
return GetRingBufferFreeSamples();
}
int CITANetAudioStream::GetRingBufferAvailableSamples() const
{
return GetRingBufferSize() - GetRingBufferFreeSamples();
}
int CITANetAudioStream::GetRingBufferFreeSamples() const
{
if( m_bRingBufferFull )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment