...
 
Commits (119)
...@@ -110,23 +110,22 @@ endif( ) ...@@ -110,23 +110,22 @@ endif( )
if( ITA_DATA_SOURCES_WITH_NET_AUDIO ) if( ITA_DATA_SOURCES_WITH_NET_AUDIO )
list( APPEND ITADataSourcesHeader list( APPEND ITADataSourcesHeader
"include/ITANetAudioClient.h"
"include/ITANetAudioMessage.h"
"include/ITANetAudioProtocol.h"
"include/ITANetAudioServer.h"
"include/ITANetAudioStream.h" "include/ITANetAudioStream.h"
"include/ITANetAudioStreamingClient.h" "include/ITANetAudioSampleServer.h"
"include/ITANetAudioStreamingServer.h" "include/ITANetAudioStreamingServer.h"
"include/ITANetAudioStreamingClient.h"
) )
list( APPEND ITADataSourcesSources list( APPEND ITADataSourcesSources
"src/ITANetAudioClient.cpp" "src/ITANetAudioClient.cpp"
"src/ITANetAudioClient.h"
"src/ITANetAudioMessage.cpp" "src/ITANetAudioMessage.cpp"
"src/ITANetAudioProtocol.cpp" "src/ITANetAudioMessage.h"
"src/ITANetAudioProtocol.h"
"src/ITANetAudioServer.cpp"
"src/ITANetAudioServer.h"
"src/ITANetAudioStream.cpp" "src/ITANetAudioStream.cpp"
"src/ITANetAudioStreamingClient.cpp" "src/ITANetAudioStreamingClient.cpp"
"src/ITANetAudioStreamingClient.h"
"src/ITANetAudioStreamingServer.cpp" "src/ITANetAudioStreamingServer.cpp"
"src/ITANetAudioServer.cpp"
) )
endif( ) endif( )
......
/*
* ----------------------------------------------------------------
*
* ITA core libs
* (c) Copyright Institute of Technical Acoustics (ITA)
* RWTH Aachen University, Germany, 2015-2017
*
* ----------------------------------------------------------------
* ____ __________ _______
* // / //__ ___/ // _ |
* // / // / // /_| |
* // / // / // ___ |
* //__/ //__/ //__/ |__|
*
* ----------------------------------------------------------------
*
*/
#ifndef INCLUDE_WATCHER_ITA_NET_AUDIO_SAMPLE_SERVER
#define INCLUDE_WATCHER_ITA_NET_AUDIO_SAMPLE_SERVER
#include <ITADataSourcesDefinitions.h>
#include <ITANetAudioStreamingServer.h>
#include <ITADataSourceRealization.h>
//! Sample-generation class with abstract method for providing samples
/*
* This ready-to-use class helps to provide samples for a NetAudio streaming server with
* a single method for processing that has to be implemented ...
* ... just derive and implement Process() method. Have a look at Zero() method
* for exemplary usage of sample buffer.
*/
class CITASampleProcessor : public ITADatasourceRealization
{
public:
//! Create a sample processor with streaming parameters
/*
* @param[in] iNumChannels Channels provided
* @param[in] dSampleRate Audio processing sampling rate
* @param[in] iBlockLength Audio processing block length / buffer size
*/
inline CITASampleProcessor( const int iNumChannels, const double dSampleRate, const int iBlockLength )
: ITADatasourceRealization( ( unsigned int ) ( iNumChannels ), dSampleRate, ( unsigned int ) ( iBlockLength ) )
{
m_vvfSampleBuffer.resize( iNumChannels );
for( size_t c = 0; c < iNumChannels; c++ )
m_vvfSampleBuffer[ c ].resize( iBlockLength );
Zero();
};
inline ~CITASampleProcessor()
{
};
//! Sets all channels and samples to zero
inline void Zero()
{
/*
* Use this as an example how to work with the buffer structure.
*/
// Iterate over channels
for( size_t c = 0; c < m_vvfSampleBuffer.size(); c++ )
{
std::vector< float >& vfSingleChannelSampleBuffer( m_vvfSampleBuffer[ c ] ); // One channel
// Iterate over samples of channel
for( size_t n = 0; n < vfSingleChannelSampleBuffer.size(); n++ )
{
float& fSample( vfSingleChannelSampleBuffer[ n ] ); // One sample
fSample = 0.0f; // -> Manipulation
}
}
};
//! Process samples (overwrite this virtual method)
/**
* Method that is called in audio streaming context and requests
* to produce or copy audio samples into the internal buffer m_vvfSampleBuffer
*
* @param[in] pStreamInfo Information over streaming status, i.e. sample count and time stamp
*
*/
virtual void Process( const ITAStreamInfo* pStreamInfo ) =0;
protected:
std::vector< std::vector< float > > m_vvfSampleBuffer; //!< Multi-channel sample buffer to be filled
private:
//! Delegate internal buffer to audio stream (ITADatasource)
inline void ProcessStream( const ITAStreamInfo* pInfo )
{
Process( pInfo );
for( size_t c = 0; c < m_vvfSampleBuffer.size(); c++ )
{
float* pfData = GetWritePointer( ( unsigned int ) ( c ) );
for( size_t n = 0; n < m_vvfSampleBuffer[ c ].size(); n++ )
pfData[ n ] = m_vvfSampleBuffer[ c ][ n ];
}
IncrementWritePointer();
};
};
//! Network audio sample server (for providing samples via derived generator class)
/**
* Audio sample transmitter for a networked sample callback function that can connect via TCP/IP.
*
* @sa CITANetAudioStream CITANetAudioStreamingServer CITASampleProcessor
* @note not thread-safe
*/
class CITANetAudioSampleServer : public CITANetAudioStreamingServer
{
public:
inline CITANetAudioSampleServer( CITASampleProcessor* pProcessor )
: m_pSampleProcessor( pProcessor )
{
SetInputStream( m_pSampleProcessor );
};
inline ~CITANetAudioSampleServer()
{};
private:
//! Prohibit public access to streaming context and delegate
inline void SetInputStream( ITADatasource* pDataSource )
{
CITANetAudioStreamingServer::SetInputStream( pDataSource );
};
//! Prohibit public access to streaming context and delegate
inline ITADatasource* GetInputStream() const
{
return CITANetAudioStreamingServer::GetInputStream();
};
CITASampleProcessor* m_pSampleProcessor; //!< Callback / sample processor
};
#endif // INCLUDE_WATCHER_ITA_NET_AUDIO_SAMPLE_SERVER
...@@ -26,9 +26,6 @@ ...@@ -26,9 +26,6 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <iostream>
#include <fstream>
using namespace std;
class CITANetAudioStreamingClient; class CITANetAudioStreamingClient;
...@@ -38,7 +35,7 @@ class ITABufferedDataLoggerImplAudio; ...@@ -38,7 +35,7 @@ class ITABufferedDataLoggerImplAudio;
//! Network audio stream //! Network audio stream
/** /**
* Audio streaming for a signal source that is connected via TCP/IP. * Audio streaming for a signal source that is connected via TCP/IP or UDP.
* The network audio stream behaves like a client and receives samples * The network audio stream behaves like a client and receives samples
* from a network audio stream server, CITANetAudioStreamingSearver. * from a network audio stream server, CITANetAudioStreamingSearver.
* *
...@@ -46,11 +43,13 @@ class ITABufferedDataLoggerImplAudio; ...@@ -46,11 +43,13 @@ class ITABufferedDataLoggerImplAudio;
* block the streaming processing, because it is decoupled from the * block the streaming processing, because it is decoupled from the
* network connection and forwards samples from an internal ring buffer. * network connection and forwards samples from an internal ring buffer.
* If the buffer runs out of samples, zeros will be return. If the buffer * If the buffer runs out of samples, zeros will be return. If the buffer
* overruns, the sample server will be suspendeb by blocking the network * overruns, the sample server will be suspended by blocking the network
* data flow. * data flow.
* *
* Latency can be managed by either providing a small ring buffer or * Latency can be managed by either providing a small ring buffer and
* oversizing the ring buffer and requesting a target latency. * constantly filling it uo, or by oversizing the internal ring buffer
* only pushing samples to meet a target latency. This has to be
* implemented by the server.
* *
* \note not thread-safe * \note not thread-safe
*/ */
...@@ -58,7 +57,21 @@ class ITA_DATA_SOURCES_API CITANetAudioStream : public ITADatasource ...@@ -58,7 +57,21 @@ class ITA_DATA_SOURCES_API CITANetAudioStream : public ITADatasource
{ {
public: public:
//! Constructor of a network audio stream //! Constructor of a network audio stream
CITANetAudioStream( int iChannels, double dSamplingRate, int iBufferSize, int iRingBufferCapacity = 2048 ); /**
* @param[in] iChannels Number of channels
* @param[in] dSamplingRate Sampling rate
* @param[in] iBufferSize Size of audio streaming buffer
* @param[in] iRingBufferCapacity Internal ring buffer
*
* The ring buffer capacity should be roughly 6-10 buffer sizes long for short audio streaming buffers,
* and can go down to one block in case of higher audio buffer sizes.
*
* The streaming parameters have to match with the server settings (yes also buffer size, that of the audio streaming context)
*
* @note Accept for more memory usage, oversizing the buffer does not require more CPU.
*/
CITANetAudioStream( const int iChannels, const double dSamplingRate, const int iBufferSize, const int iRingBufferCapacity = 2048 );
virtual ~CITANetAudioStream(); virtual ~CITANetAudioStream();
//! Network streaming status of client //! Network streaming status of client
...@@ -68,16 +81,20 @@ public: ...@@ -68,16 +81,20 @@ public:
STOPPED, //!< Client not connected to a server and streaming stopped, i.e. not receiving samples by choice STOPPED, //!< Client not connected to a server and streaming stopped, i.e. not receiving samples by choice
CONNECTED, //!< Client is connected to a sample server (and potentially receives samples) CONNECTED, //!< Client is connected to a sample server (and potentially receives samples)
STREAMING, //!< STREAMING, //!<
BUFFER_UNDERRUN, BUFFER_UNDERRUN, //!< Client internal audio buffer ran out of samples
BUFFER_OVERRUN, BUFFER_OVERRUN, //!< Client internal audio ring buffer is full
}; };
//! Connect a streaming server //! Connect a streaming server
/** /**
* @sAddress[in] Server address IP, i.e. 127.0.0.1 * @sAddress[in] Server address IP (127.0.0.1, localhost, etc.)
* @iPort[in] Server socket port, defaults to 12480 * @iPort[in] Server socket port, defaults to 12480
* @return True, if connection could be established and streaming parameters match
*/ */
bool Connect( const std::string& sAddress, int iPort = 12480 ); bool Connect( const std::string& sAddress, const int iPort = 12480, const bool bUseUDP = false );
//! Disconnct safely from server
void Disconnect();
//! Returns the connection status //! Returns the connection status
/** /**
...@@ -85,43 +102,47 @@ public: ...@@ -85,43 +102,47 @@ public:
*/ */
bool GetIsConnected() const; bool GetIsConnected() const;
//! Set allowed latency (s) //! Returns the minimal latency possible (single block)
/** /**
* Sets the latency that will be used for reading and writing from ring buffer. * @return Minimum latency in seconds
* New samples will be requested and send if the latency / ring buffer samples
* is lower than the target latency.
*/
void SetAllowedLatencySeconds( float fLatencySeconds );
void SetAllowedLatencySamples( int iLatencySamples );
float GetAllowedLatencySeconds() const;
int GetAllowedLatencySamples() const;
//! Sets the minimal latency possible
/**
* Real-time network audio is considered to process at lowest latency possible.
* However, this implementation requires at least one block. Hence latency is
* depending on sampling rate and block length.
*
* @sa GetMinimumLatencySamples()
* @sa GetMinimumLatencySamples()
*/ */
float GetMinimumLatencySeconds() const; float GetMinimumLatencySeconds() const;
//! Returns the maximum latency possible (entire ring buffer used)
/**
* @return Maximum latency in seconds
*/
float GetMaximumLatencySeconds() const; float GetMaximumLatencySeconds() const;
//! Returns the minimum latency possible (single block)
/**
* @return Minimum latency in samples
*/
int GetMinimumLatencySamples() const; int GetMinimumLatencySamples() const;
//! Returns the maximum latency possible (entire ring buffer used)
/**
* @return Maximum latency in samples
*/
int GetMaximumLatencySamples() const; int GetMaximumLatencySamples() const;
//! Sets the latency for real-time processing //! Returns the NetAudio streaming logger base name
std::string GetNetAudioStreamLoggerBaseName() const;
//! Sets the NetAudio streaming logger base name
/** /**
* Real-time network audio is considered to process at lowest latency possible. * If debugging is enabled, all debugging files will be named
* However, this implementation requires at least one block. Hence latency is * with this suffix.
* depending on sampling rate and block length. This method basically * @param[in] sBaseName Base name string
* sets the minimum allowed latency to this value. *
*
* @sa GetMinimumLatencySeconds()
* @sa SetAllowedLatencySeconds()
*/ */
void SetLatencyForRealtime(); void SetNetAudioStreamingLoggerBaseName( const std::string& sBaseName );
//! Enabled/disables export of loggers
void SetDebuggingEnabled( bool bEnabled );
//! Logging export flag getter
bool GetIsDebuggingEnabled() const;
//! Returns (static) size of ring buffer //! Returns (static) size of ring buffer
/** /**
...@@ -167,6 +188,7 @@ public: ...@@ -167,6 +188,7 @@ public:
*/ */
void IncrementBlockPointer(); void IncrementBlockPointer();
protected: protected:
//! This method is called by the networkg client and pushes samples into the ring buffer //! This method is called by the networkg client and pushes samples into the ring buffer
/** /**
...@@ -180,7 +202,7 @@ protected: ...@@ -180,7 +202,7 @@ protected:
* *
* @note This method is not called out of the audio streaming context but out of the network context. * @note This method is not called out of the audio streaming context but out of the network context.
*/ */
int Transmit( const ITASampleFrame& sfNewSamples, int iNumSamples ); int Transmit( const ITASampleFrame& sfNewSamples, const int iNumSamples );
//! Returns samples that can be read from ring buffer //! Returns samples that can be read from ring buffer
/** /**
...@@ -194,6 +216,9 @@ protected: ...@@ -194,6 +216,9 @@ protected:
*/ */
int GetRingBufferFreeSamples() const; int GetRingBufferFreeSamples() const;
//! Returns a string for the streaming status identifier
static std::string GetStreamingStatusString( const int iStreamingStatus );
private: private:
CITANetAudioStreamingClient* m_pNetAudioStreamingClient; //!< Audio streaming network client CITANetAudioStreamingClient* m_pNetAudioStreamingClient; //!< Audio streaming network client
...@@ -204,16 +229,17 @@ private: ...@@ -204,16 +229,17 @@ private:
int m_iWriteCursor; //!< Cursor where samples will be fed into ring buffer from net audio producer (always ahead) int m_iWriteCursor; //!< Cursor where samples will be fed into ring buffer from net audio producer (always ahead)
bool m_bRingBufferFull; //!< Indicator if ring buffer is full (and read cursor equals write cursor) bool m_bRingBufferFull; //!< Indicator if ring buffer is full (and read cursor equals write cursor)
ITASampleFrame m_sfRingBuffer; //!< Ring buffer ITASampleFrame m_sfRingBuffer; //!< Ring buffer
int m_iTargetSampleLatency; //!< Maximum allowed samples / target sample latency
int m_iStreamingStatus; //!< Current streaming status int m_iStreamingStatus; //!< Current streaming status
double m_dLastStreamingTimeCode; double m_dLastStreamingTimeCode;
ITABufferedDataLoggerImplAudio* m_pAudioLogger; //!< Logging for the audio stream ITABufferedDataLoggerImplStream* m_pAudioStreamLogger; //!< Logging for the audio stream
ITABufferedDataLoggerImplStream* m_pStreamLogger; //!< Logging for the audio stream ITABufferedDataLoggerImplNet* m_pNetworkStreamLogger; //!< Logging for the network stream
ITABufferedDataLoggerImplNet* m_pNetLogger; //!< Logging for the network stream std::string m_sNetAudioStreamLoggerBaseName;
int iAudioStreamingBlockID; //!< Audio streaming block id bool m_bDebuggingEnabled;
int iNetStreamingBlockID; //!< Network streaming block id
int m_iAudioStreamingBlockID; //!< Audio streaming block id
int m_iNetStreamingBlockID; //!< Network streaming block id
friend class CITANetAudioStreamingClient; friend class CITANetAudioStreamingClient;
}; };
......
...@@ -21,60 +21,72 @@ ...@@ -21,60 +21,72 @@
#include <ITADataSourcesDefinitions.h> #include <ITADataSourcesDefinitions.h>
#include <ITANetAudioProtocol.h> #include <ITASampleFrame.h>
#include <ITAStopWatch.h>
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include <iostream> #include <iostream>
#include <fstream>
#include <ITANetAudioProtocol.h>
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <ITASampleFrame.h>
class ITADatasource; class ITADatasource;
class CITANetAudioMessage; class CITANetAudioMessage;
class CITANetAudioProtocol; class CITANetAudioProtocol;
class CITANetAudioServer; class CITANetAudioServer;
class CITANetAudioMessage; class CITANetAudioMessage;
class VistaTCPSocket; class CITABufferedDataLoggerImplServer;
class VistaConnectionIP;
//! Network audio sample server (for connecting a net audio stream)
//! Network audio streaming server (for connecting a net audio stream) with an ITADataSource connection
/** /**
* Audio sample transmitter for a networked signal source that can connect via TCP/IP. * Audio sample transmitter for a networked signal source that can connect via TCP/IP.
* *
* \sa CITANetAudioStream * @sa CITANetAudioStream
* \note not thread-safe * @note not thread-safe
*/ */
class ITA_DATA_SOURCES_API CITANetAudioStreamingServer : public VistaThreadLoop class ITA_DATA_SOURCES_API CITANetAudioStreamingServer : public VistaThreadLoop
{ {
public: public:
enum UpdateStrategy
{
AUTO = 1, //!< Automatic update rate based on sample rate and block length of client (default)
ADAPTIVE, //!< Adaptive update rate, adjusts for drifting clocks
CONSTANT, //!< Set a user-defined update rate (may cause forced pausing of sample feeding or dropouts on client side)
};
CITANetAudioStreamingServer(); CITANetAudioStreamingServer();
virtual ~CITANetAudioStreamingServer() {}; ~CITANetAudioStreamingServer();
bool Start( const std::string& sAddress, int iPort ); //! Start to listen on a socket (blocking)
bool Start( const std::string& sAddress, const int iPort, const double dTimeIntervalCientSendStatus, const bool bUseUDP = false );
bool IsClientConnected() const; bool IsClientConnected() const;
std::string GetNetworkAddress() const; std::string GetNetworkAddress() const;
int GetNetworkPort() const; int GetNetworkPort() const;
bool LoopBody();
void Stop(); void Stop();
void SetInputStream( ITADatasource* pInStream ); void SetInputStream( ITADatasource* pInStream );
int GetNetStreamBlocklength() const; int GetNetStreamBlocklength() const;
int GetNetStreamNumberOfChannels() const; int GetNetStreamNumberOfChannels( ) const;
double GetNetStreamSampleRate() const; double GetNetStreamSampleRate( ) const;
double GetEstimatedCorrFactor( ) const;
void SetEstimatedCorrFactor( double dcorrFactor );
//! Enabled/disables export of loggers
void SetDebuggingEnabled( bool bEnabled );
//! Logging export flag getter
bool GetIsDebuggingEnabled() const;
int GetSendingBlockLength() const;
void SetSendingBlockLength( const int iSendingBlockLength );
void SetAutomaticUpdateRate(); void SetTargetLatencySamples( const int iTargetLatency );
int GetTargetLatencySamples() const;
void SetServerLogBaseName( const std::string& sBaseName );
std::string GetServerLogBaseName() const;
bool LoopBody();
protected: protected:
ITADatasource* GetInputStream() const; ITADatasource* GetInputStream() const;
...@@ -85,12 +97,25 @@ private: ...@@ -85,12 +97,25 @@ private:
ITADatasource* m_pInputStream; ITADatasource* m_pInputStream;
VistaConnectionIP* m_pConnection; VistaConnectionIP* m_pConnection;
CITANetAudioProtocol::StreamingParameters m_oServerParams; CITANetAudioMessage* m_pMessage;
CITANetAudioMessage* m_pIncomingMessage;
CITANetAudioMessage* m_pOutgoingMessage; CITABufferedDataLoggerImplServer* m_pServerLogger;
std::string m_sServerLogBaseName;
ITAStopWatch m_swTryReadBlockStats, m_swTryReadAccessStats;
bool m_bDebuggingEnabled;
int m_iServerBlockId;
double m_dLastTimeStamp;
double m_dEstimatedCorrFactor;
int m_iTargetLatencySamples;
int m_iEstimatedClientRingBufferFreeSamples;
int m_iClientRingBufferSize;
int m_iSendingBlockLength;
int m_iMaxSendBlocks;
int m_iUpdateStrategy; double m_dStreamTimeStart; //!< Stream time start
int m_iClientRingBufferFreeSamples; long unsigned int m_nStreamSampleCounts; //!< Samples that has been streamed
friend class CITANetAudioServer; friend class CITANetAudioServer;
}; };
......
// $Id: $ /*
* ----------------------------------------------------------------
*
* ITA core libs
* (c) Copyright Institute of Technical Acoustics (ITA)
* RWTH Aachen University, Germany, 2015-2017
*
* ----------------------------------------------------------------
* ____ __________ _______
* // / //__ ___/ // _ |
* // / // / // /_| |
* // / // / // ___ |
* //__/ //__/ //__/ |__|
*
* ----------------------------------------------------------------
*
*/
#ifndef __ITA_STREAM_INFO_H__ #ifndef INCLUDE_WATCHER_ITA_STREAM_INFO
#define __ITA_STREAM_INFO_H__ #define INCLUDE_WATCHER_ITA_STREAM_INFO
#include <ITATypes.h> #include <ITATypes.h>
// Diese Datenklasse beschreibt den Zustand eines Audiostreams //! Time code information for audio streams
class ITAStreamInfo { class ITAStreamInfo
{
public: public:
// Anzahl der abgespielten Samples seit Beginn des Streamings uint64_t nSamples; //!< Number of samples processed
uint64_t nSamples; double dStreamTimeCode; //!< Stream time code (starts with zero)
double dSysTimeCode; //!< System time stamp code (begings with current time stamp of system)
// TODO: Beschreiben inline ITAStreamInfo()
double dTimecode; : nSamples( 0 )
, dStreamTimeCode( 0.0f )
, dSysTimeCode( 0.0f )
{};
//! Standard-Konstruktor (setzt alle Werte 0) inline virtual ~ITAStreamInfo() {};
ITAStreamInfo() : nSamples(0), dTimecode(0) {}
//! Destruktor
virtual ~ITAStreamInfo() {};
}; };
#endif // __ITA_STREAM_INFO_H__ #endif // INCLUDE_WATCHER_ITA_STREAM_INFO
...@@ -220,7 +220,7 @@ private: ...@@ -220,7 +220,7 @@ private:
ITADatasource* pDatasource; //!< Datasource assigned to the input ITADatasource* pDatasource; //!< Datasource assigned to the input
std::vector< const float* > vpfInputData; //!< Pointers to the next stream blocks std::vector< const float* > vpfInputData; //!< Pointers to the next stream blocks
inline InputDesc( const int iChannels, const int iBlockLength ) inline InputDesc( const int iChannels, const int )
: vpfInputData( iChannels, nullptr ) : vpfInputData( iChannels, nullptr )
, iChannels( iChannels ) , iChannels( iChannels )
, fCurrentGain( 1.0f ) , fCurrentGain( 1.0f )
......
This diff is collapsed.
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <ITAException.h> #include <ITAException.h>
#include <ITAFunctors.h> #include <ITAFunctors.h>
#include <ITANumericUtils.h> #include <ITANumericUtils.h>
#include <ITAClock.h>
#ifndef WIN32 #ifndef WIN32
#include <memory.h> #include <memory.h>
...@@ -98,7 +99,8 @@ void ITABufferDataSink::Transfer( unsigned int uiSamples ) ...@@ -98,7 +99,8 @@ void ITABufferDataSink::Transfer( unsigned int uiSamples )
m_uiWriteCursor += m; m_uiWriteCursor += m;
m_siState.nSamples += m_pdsSource->GetBlocklength(); m_siState.nSamples += m_pdsSource->GetBlocklength();
m_siState.dTimecode = ( double ) ( m_siState.nSamples ) / m_pdsSource->GetSampleRate(); m_siState.dStreamTimeCode = ( double ) ( m_siState.nSamples ) / m_pdsSource->GetSampleRate();
m_siState.dSysTimeCode = ITAClock::getDefaultClock()->getTime();
m_pdsSource->IncrementBlockPointer(); m_pdsSource->IncrementBlockPointer();
} }
......
#include "ITADataSourceRealization.h" #include "ITADataSourceRealization.h"
#include <cassert>
#include <ITAFastMath.h> #include <ITAFastMath.h>
#include <cassert>
/* ITADatasourceRealization::ITADatasourceRealization( unsigned int uiChannels, double dSamplerate, unsigned int uiBlocklength, unsigned int uiCapacity )
ITADatasourceRealization::ITADatasourceRealization(unsigned int uiChannels,
unsigned int uiBlocklength,
unsigned int uiCapacity)
{
Init(uiChannels, uiBlocklength, uiCapacity);
}
*/
ITADatasourceRealization::ITADatasourceRealization(unsigned int uiChannels,
double dSamplerate,
unsigned int uiBlocklength,
unsigned int uiCapacity)
{ {
assert( dSamplerate > 0 ); assert( dSamplerate > 0 );
m_dSampleRate = dSamplerate; m_dSampleRate = dSamplerate;
m_oStreamProps.dSamplerate = dSamplerate; m_oStreamProps.dSamplerate = dSamplerate;
Init(uiChannels, uiBlocklength, uiCapacity); Init( uiChannels, uiBlocklength, uiCapacity );
} }
void ITADatasourceRealization::Init(unsigned int uiChannels, void ITADatasourceRealization::Init( unsigned int uiChannels, unsigned int uiBlocklength, unsigned int uiCapacity )
unsigned int uiBlocklength,
unsigned int uiCapacity)
{ {
assert( uiChannels > 0 ); assert( uiChannels > 0 );
assert( uiBlocklength > 0 ); assert( uiBlocklength > 0 );
...@@ -49,7 +33,7 @@ void ITADatasourceRealization::Init(unsigned int uiChannels, ...@@ -49,7 +33,7 @@ void ITADatasourceRealization::Init(unsigned int uiChannels,
m_oStreamProps.uiChannels = m_uiChannels; m_oStreamProps.uiChannels = m_uiChannels;
m_oStreamProps.uiBlocklength = m_uiBlocklength; m_oStreamProps.uiBlocklength = m_uiBlocklength;
m_uiBufferSize = uiBlocklength * (uiCapacity+1); m_uiBufferSize = uiBlocklength * ( uiCapacity + 1 );
m_pEventHandler = NULL; m_pEventHandler = NULL;
...@@ -57,31 +41,33 @@ void ITADatasourceRealization::Init(unsigned int uiChannels, ...@@ -57,31 +41,33 @@ void ITADatasourceRealization::Init(unsigned int uiChannels,
Organisation des Puffers: Damit die Blcke der einzelnen Kanle Organisation des Puffers: Damit die Blcke der einzelnen Kanle
im Speicher ortlich nher liegen ist das Array wiefolgt indiziert: im Speicher ortlich nher liegen ist das Array wiefolgt indiziert:
[1. Block Kanal 1], ..., [1. Block Kanal k], [2. Block Kanal 1], ... [1. Block Kanal 1], ..., [1. Block Kanal k], [2. Block Kanal 1], ...
*/ */
// Puffer erzeugen und mit Nullen initialiseren // Puffer erzeugen und mit Nullen initialiseren
// TODO: Fehlerbehandlung beim Speicherallozieren // TODO: Fehlerbehandlung beim Speicherallozieren
/* Bugfix zu Bug #001: /* Bugfix zu Bug #001:
Hier wurde der Puffer einfach um 1024 Felder verlngert. Hier wurde der Puffer einfach um 1024 Felder verlngert.
Damit Funktioniert Wuschels ASIO4ALL jetzt. Ungeklrt aber Damit Funktioniert Wuschels ASIO4ALL jetzt. Ungeklrt aber
warum der Fehler auftrat? warum der Fehler auftrat?
2005-2-14 2005-2-14
*/ */
m_pfBuffer = fm_falloc(m_uiBufferSize * m_uiChannels + /* >>> */ 1024 /* <<< */, false); m_pfBuffer = fm_falloc( m_uiBufferSize * m_uiChannels + /* >>> */ 1024 /* <<< */, false );
Reset(); Reset();
} }
ITADatasourceRealization::~ITADatasourceRealization() { ITADatasourceRealization::~ITADatasourceRealization()
fm_free(m_pfBuffer); {
fm_free( m_pfBuffer );
} }
void ITADatasourceRealization::Reset() { void ITADatasourceRealization::Reset()
{
m_uiReadCursor = 0; m_uiReadCursor = 0;
m_uiWriteCursor = 0; m_uiWriteCursor = 0;
...@@ -93,22 +79,26 @@ void ITADatasourceRealization::Reset() { ...@@ -93,22 +79,26 @@ void ITADatasourceRealization::Reset() {
m_iGBPEntrances = 0; m_iGBPEntrances = 0;
m_bGBPFirst = true; m_bGBPFirst = true;
fm_zero(m_pfBuffer, m_uiBufferSize * m_uiChannels + /* >>> */ 1024 /* <<< */); fm_zero( m_pfBuffer, m_uiBufferSize * m_uiChannels + /* >>> */ 1024 /* <<< */ );
} }
bool ITADatasourceRealization::HasStreamErrors() const { bool ITADatasourceRealization::HasStreamErrors() const
return (m_iBufferUnderflows > 0) || (m_iBufferOverflows > 0) || (m_iGBPReentrances > 0); {
return ( m_iBufferUnderflows > 0 ) || ( m_iBufferOverflows > 0 ) || ( m_iGBPReentrances > 0 );
} }
ITADatasourceRealizationEventHandler* ITADatasourceRealization::GetStreamEventHandler() const { ITADatasourceRealizationEventHandler* ITADatasourceRealization::GetStreamEventHandler() const
{
return m_pEventHandler; return m_pEventHandler;
} }
void ITADatasourceRealization::SetStreamEventHandler(ITADatasourceRealizationEventHandler* pHandler) { void ITADatasourceRealization::SetStreamEventHandler( ITADatasourceRealizationEventHandler* pHandler )
{
m_pEventHandler = pHandler; m_pEventHandler = pHandler;
} }
const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, const ITAStreamInfo* pStreamInfo) { const float* ITADatasourceRealization::GetBlockPointer( unsigned int uiChannel, const ITAStreamInfo* pStreamInfo )
{
assert( uiChannel < m_uiChannels ); assert( uiChannel < m_uiChannels );
/* /*
...@@ -117,7 +107,8 @@ const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, c ...@@ -117,7 +107,8 @@ const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, c
* *
* WICHTIG: Dies sollte nicht passieren. Fehler beim anwendenden Programmierer! * WICHTIG: Dies sollte nicht passieren. Fehler beim anwendenden Programmierer!
*/ */
if (++m_iGBPEntrances > 1) { if( ++m_iGBPEntrances > 1 )
{
--m_iGBPEntrances; --m_iGBPEntrances;
++m_iGBPReentrances; ++m_iGBPReentrances;
return NULL; return NULL;
...@@ -125,12 +116,16 @@ const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, c ...@@ -125,12 +116,16 @@ const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, c
// Hook/Handler aufrufen // Hook/Handler aufrufen
PreGetBlockPointer(); PreGetBlockPointer();
if (m_pEventHandler) m_pEventHandler->HandlePreGetBlockPointer(this, uiChannel); if( m_pEventHandler )
m_pEventHandler->HandlePreGetBlockPointer( this, uiChannel );
if (m_bGBPFirst) { if( m_bGBPFirst )
{
// Erster Eintritt in GBP seit letztem IBP => Daten produzieren // Erster Eintritt in GBP seit letztem IBP => Daten produzieren
ProcessStream(pStreamInfo); ProcessStream( pStreamInfo );
if (m_pEventHandler) m_pEventHandler->HandleProcessStream(this, pStreamInfo);
if( m_pEventHandler )
m_pEventHandler->HandleProcessStream( this, pStreamInfo );
m_bGBPFirst = false; m_bGBPFirst = false;
} }
...@@ -145,45 +140,51 @@ const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, c ...@@ -145,45 +140,51 @@ const float* ITADatasourceRealization::GetBlockPointer(unsigned int uiChannel, c
*/ */
unsigned int uiLocalReadCursor = m_uiReadCursor; unsigned int uiLocalReadCursor = m_uiReadCursor;
if (uiLocalReadCursor == m_uiWriteCursor) { if( uiLocalReadCursor == m_uiWriteCursor )
{
++m_iBufferUnderflows; ++m_iBufferUnderflows;
--m_iGBPEntrances; --m_iGBPEntrances;
return NULL; return NULL;
} }
--m_iGBPEntrances; --m_iGBPEntrances;
return m_pfBuffer + (uiChannel * m_uiBufferSize) + uiLocalReadCursor; return m_pfBuffer + ( uiChannel * m_uiBufferSize ) + uiLocalReadCursor;
} }
void ITADatasourceRealization::IncrementBlockPointer() { void ITADatasourceRealization::IncrementBlockPointer()
{
unsigned int uiLocalReadCursor = m_uiReadCursor; unsigned int uiLocalReadCursor = m_uiReadCursor;
if (uiLocalReadCursor == m_uiWriteCursor) if( uiLocalReadCursor == m_uiWriteCursor )
// Keine Daten im Ausgabepuffer? Kein Inkrement mglich! (Fehlerfall) // Keine Daten im Ausgabepuffer? Kein Inkrement mglich! (Fehlerfall)
++m_iBufferUnderflows; ++m_iBufferUnderflows;
else else
// Lesezeiger inkrementieren // Lesezeiger inkrementieren
m_uiReadCursor = (uiLocalReadCursor + m_uiBlocklength) % m_uiBufferSize; m_uiReadCursor = ( uiLocalReadCursor + m_uiBlocklength ) % m_uiBufferSize;
m_bGBPFirst = true; m_bGBPFirst = true;
PostIncrementBlockPointer(); PostIncrementBlockPointer();
if (m_pEventHandler) m_pEventHandler->HandlePostIncrementBlockPointer(this);
if( m_pEventHandler )
m_pEventHandler->HandlePostIncrementBlockPointer( this );
} }
float* ITADatasourceRealization::GetWritePointer(unsigned int uiChannel) { float* ITADatasourceRealization::GetWritePointer( unsigned int uiChannel )
{
assert( uiChannel < m_uiChannels ); assert( uiChannel < m_uiChannels );
return m_pfBuffer + (uiChannel * m_uiBufferSize) + m_uiWriteCursor; return m_pfBuffer + ( uiChannel * m_uiBufferSize ) + m_uiWriteCursor;
} }
void ITADatasourceRealization::IncrementWritePointer() { void ITADatasourceRealization::IncrementWritePointer()
{
// Lokaler Schreibcursor // Lokaler Schreibcursor
unsigned int uiLocalWriteCursor = m_uiWriteCursor; unsigned int uiLocalWriteCursor = m_uiWriteCursor;
unsigned int uiNewWriteCursor = (uiLocalWriteCursor + m_uiBlocklength) % m_uiBufferSize; unsigned int uiNewWriteCursor = ( uiLocalWriteCursor + m_uiBlocklength ) % m_uiBufferSize;
// Pufferberlauf // Pufferberlauf
if (uiNewWriteCursor == m_uiReadCursor) { if( uiNewWriteCursor == m_uiReadCursor )
{
++m_iBufferOverflows; ++m_iBufferOverflows;
return; return;
} }
...@@ -192,6 +193,6 @@ void ITADatasourceRealization::IncrementWritePointer() { ...@@ -192,6 +193,6 @@ void ITADatasourceRealization::IncrementWritePointer() {
m_uiWriteCursor = uiNewWriteCursor; m_uiWriteCursor = uiNewWriteCursor;
} }
void ITADatasourceRealizationEventHandler::HandlePreGetBlockPointer(ITADatasourceRealization* pSender, unsigned int uiChannel) {} void ITADatasourceRealizationEventHandler::HandlePreGetBlockPointer( ITADatasourceRealization*, unsigned int ) {}
void ITADatasourceRealizationEventHandler::HandlePostIncrementBlockPointer(ITADatasourceRealization* pSender) {} void ITADatasourceRealizationEventHandler::HandlePostIncrementBlockPointer( ITADatasourceRealization* ) {}
void ITADatasourceRealizationEventHandler::HandleProcessStream(ITADatasourceRealization* pSender, const ITAStreamInfo* pStreamInfo) {} void ITADatasourceRealizationEventHandler::HandleProcessStream( ITADatasourceRealization*, const ITAStreamInfo* ) {}
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
#include <ITAStreamInfo.h> #include <ITAStreamInfo.h>
#include <ITAAudiofileWriter.h> #include <ITAAudiofileWriter.h>
#include <ITAException.h> #include <ITAException.h>
#include <ITAClock.h>
#include <cmath> #include <cmath>
#include <string> #include <string>
#include <vector> #include <vector>
...@@ -90,7 +92,8 @@ void WriteFromDatasourceToBuffer(ITADatasource* pSource, ...@@ -90,7 +92,8 @@ void WriteFromDatasourceToBuffer(ITADatasource* pSource,
n += uiBlocklength; n += uiBlocklength;
siState.nSamples += uiBlocklength; siState.nSamples += uiBlocklength;
siState.dTimecode = (double) (siState.nSamples) / dSamplerate; siState.dStreamTimeCode = (double) (siState.nSamples) / dSamplerate;
siState.dSysTimeCode = ITAClock::getDefaultClock()->getTime();
if (bDisplayProgress) if (bDisplayProgress)
{ {
...@@ -193,7 +196,8 @@ void WriteFromDatasourceToFile(ITADatasource* pSource, ...@@ -193,7 +196,8 @@ void WriteFromDatasourceToFile(ITADatasource* pSource,
pSource->IncrementBlockPointer(); pSource->IncrementBlockPointer();
siState.nSamples += uiBlocklength; siState.nSamples += uiBlocklength;
siState.dTimecode = (double) (siState.nSamples) / dSamplerate; siState.dStreamTimeCode = (double) (siState.nSamples) / dSamplerate;
siState.dSysTimeCode = ITAClock::getDefaultClock()->getTime();
// Daten schreiben // Daten schreiben
writer->write((std::min)(uiBlocklength, (uiNumberOfSamples - n)), vpfData); writer->write((std::min)(uiBlocklength, (uiNumberOfSamples - n)), vpfData);
......
...@@ -4,13 +4,16 @@ ...@@ -4,13 +4,16 @@
#include <ITADataSource.h> #include <ITADataSource.h>
#include <ITAAudiofileWriter.h> #include <ITAAudiofileWriter.h>
#include <ITANumericUtils.h> #include <ITANumericUtils.h>
#include <ITAClock.h>
ITAFileDatasink::ITAFileDatasink( std::string sFilename, ITADatasource* pdsSource, ITAQuantization eQuantization ) ITAFileDatasink::ITAFileDatasink( std::string sFilename, ITADatasource* pdsSource, ITAQuantization eQuantization )
: m_pfSilence( NULL ) { : m_pfSilence( NULL )
{
m_pdsSource = pdsSource; m_pdsSource = pdsSource;
m_pFileWriter = NULL; m_pFileWriter = NULL;
if( pdsSource ) { if( pdsSource )
{
m_vpfData.resize( pdsSource->GetNumberOfChannels() ); m_vpfData.resize( pdsSource->GetNumberOfChannels() );
ITAAudiofileProperties props; ITAAudiofileProperties props;
...@@ -24,19 +27,23 @@ ITAFileDatasink::ITAFileDatasink( std::string sFilename, ITADatasource* pdsSourc ...@@ -24,19 +27,23 @@ ITAFileDatasink::ITAFileDatasink( std::string sFilename, ITADatasource* pdsSourc
} }
} }
ITAFileDatasink::~ITAFileDatasink() { ITAFileDatasink::~ITAFileDatasink()
{
delete m_pFileWriter; delete m_pFileWriter;
fm_free( m_pfSilence ); fm_free( m_pfSilence );
} }
void ITAFileDatasink::Transfer( unsigned int uiSamples ) { void ITAFileDatasink::Transfer( unsigned int uiSamples )
if( m_pdsSource ) { {
if( m_pdsSource )
{
// Anzahl der zu transferrierenden Blcke bestimmen // Anzahl der zu transferrierenden Blcke bestimmen
unsigned int b = m_pdsSource->GetBlocklength(); unsigned int b = m_pdsSource->GetBlocklength();
unsigned int n = uprdivu( uiSamples, b ); unsigned int n = uprdivu( uiSamples, b );
for( unsigned int i = 0; i < n; i++ ) { for( unsigned int i = 0; i < n; i++ ) {
for( unsigned int j = 0; j < m_pdsSource->GetNumberOfChannels(); j++ ) { for( unsigned int j = 0; j < m_pdsSource->GetNumberOfChannels(); j++ )
{
const float* pfSrc = m_pdsSource->GetBlockPointer( j, &m_siState ); const float* pfSrc = m_pdsSource->GetBlockPointer( j, &m_siState );
if( pfSrc ) if( pfSrc )
m_vpfData[ j ] = ( float* ) pfSrc; m_vpfData[ j ] = ( float* ) pfSrc;
...@@ -45,7 +52,8 @@ void ITAFileDatasink::Transfer( unsigned int uiSamples ) { ...@@ -45,7 +52,8 @@ void ITAFileDatasink::Transfer( unsigned int uiSamples ) {
} }
m_pdsSource->IncrementBlockPointer(); m_pdsSource->IncrementBlockPointer();
m_siState.nSamples += b; m_siState.nSamples += b;
m_siState.dTimecode = ( double ) ( m_siState.nSamples ) / m_pdsSource->GetSampleRate(); m_siState.dStreamTimeCode = ( double ) ( m_siState.nSamples ) / m_pdsSource->GetSampleRate();
m_siState.dSysTimeCode = ITAClock::getDefaultClock()->getTime();
m_pFileWriter->write( b, m_vpfData ); m_pFileWriter->write( b, m_vpfData );
} }
......
#include <ITANetAudioClient.h> #include "ITANetAudioClient.h"
#include <ITANetAudioMessage.h>
#include <ITANetAudioProtocol.h>
#include <ITANetAudioStream.h> #include <ITANetAudioStream.h>
#include <ITAException.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h> #include <VistaInterProcComm/Connections/VistaConnectionIP.h>
...@@ -16,13 +15,15 @@ CITANetAudioClient::~CITANetAudioClient() ...@@ -16,13 +15,15 @@ CITANetAudioClient::~CITANetAudioClient()
delete m_pConnection; delete m_pConnection;
} }
bool CITANetAudioClient::Connect( const std::string& sAddress, int iPort ) bool CITANetAudioClient::Connect( const std::string& sAddress, const int iPort, const bool bUseUDP /* = false */ )
{ {
if( GetIsConnected() ) if( GetIsConnected() )
ITA_EXCEPT1( MODAL_EXCEPTION, "This net stream is already connected" ); ITA_EXCEPT1( MODAL_EXCEPTION, "This net stream is already connected" );
// Attempt to connect and check parameters // Attempt to connect and check parameters
m_pConnection = new VistaConnectionIP( VistaConnectionIP::CT_TCP, sAddress, iPort ); const VistaConnectionIP::VistaProtocol iCTProtocol = bUseUDP ? VistaConnectionIP::CT_UDP : VistaConnectionIP::CT_TCP;
m_pConnection = new VistaConnectionIP( iCTProtocol, sAddress, iPort );
if( !GetIsConnected() ) if( !GetIsConnected() )
{ {
delete m_pConnection; delete m_pConnection;
...@@ -46,5 +47,8 @@ void CITANetAudioClient::Disconnect() ...@@ -46,5 +47,8 @@ void CITANetAudioClient::Disconnect()
bool CITANetAudioClient::GetIsConnected() const bool CITANetAudioClient::GetIsConnected() const
{ {
return m_pConnection ? true : false; if( m_pConnection )
return m_pConnection->GetIsOpen();
else
return false;
} }
...@@ -28,7 +28,7 @@ class VistaConnectionIP; ...@@ -28,7 +28,7 @@ class VistaConnectionIP;
//! A network audio client that connects to a network audio server //! A network audio client that connects to a network audio server
/** /**
* Use CITANetAudioStreamingClient to start an audio stream with the connection of this client. * Use CITANetAudioStreamingClient to start an audio stream with the connection of this client.
* This class is basically a helper around Vista TCP/IP network functionality. * This class is basically a helper around Vista TCP/IP or UDP network functionality.
* *
*/ */
class CITANetAudioClient class CITANetAudioClient
...@@ -42,7 +42,7 @@ public: ...@@ -42,7 +42,7 @@ public:
CITANetAudioClient(); CITANetAudioClient();
~CITANetAudioClient(); ~CITANetAudioClient();
bool Connect( const std::string& sAddress, int iPort ); bool Connect( const std::string& sAddress, const int iPort, const bool bUseUDP );
void Disconnect(); void Disconnect();
bool GetIsConnected() const; bool GetIsConnected() const;
......
#include <ITANetAudioMessage.h> #include "ITANetAudioMessage.h"
#include <ITAClock.h>
#include <ITADataLog.h>
#include <ITAStringUtils.h> #include <ITAStringUtils.h>
#include <VistaInterProcComm/Connections/VistaConnectionIP.h> #include <VistaInterProcComm/Connections/VistaConnectionIP.h>
#include <VistaBase/VistaExceptionBase.h> #include <VistaBase/VistaExceptionBase.h>
#include <VistaBase/VistaStreamUtils.h> #include <VistaBase/VistaStreamUtils.h>
#include <cstring>
#include <algorithm> #include <algorithm>
#include <cstring>
#include <cassert> #include <cassert>
#include <iostream> #include <iostream>
#include <iomanip> #include <iomanip>
static int S_nMessageIds = 0; static int S_nMessageIds = 0;
CITANetAudioMessage::CITANetAudioMessage( VistaConnectionIP* pConnection ) struct ITANetAudioMessageLog : public ITALogDataBase
{
inline static std::ostream& outputDesc( std::ostream& os )
{
os << "BlockId";
os << "\t" << "WorldTimeStamp";
os << "\t" << "MessageType";
os << "\t" << "Action";
os << "\t" << "InternalProcessingTime";
os << "\t" << "PayloadSize";
os << std::endl;
return os;
};
inline std::ostream& outputData( std::ostream& os ) const
{
os << uiBlockId;
os << "\t" << std::setprecision( 12 ) << dWorldTimeStamp;
os << "\t" << sMessageType;
os << "\t" << sAction;
os << "\t" << std::setprecision( 12 ) << dInternalProcessingTime;
os << "\t" << nMessagePayloadSize;
os << std::endl;
return os;
};
unsigned int uiBlockId; //!< Block identifier (audio streaming)
double dWorldTimeStamp; //!< Time stamp at beginning of logged message process
std::string sMessageType; //!< Protocol message type
std::string sAction; //!< Triggered action
double dInternalProcessingTime; //!< Processing within message class
VistaType::sint32 nMessagePayloadSize; //!< Data
};
class ITABufferedDataLoggerImplProtocol : public ITABufferedDataLogger < ITANetAudioMessageLog > {};
CITANetAudioMessage::CITANetAudioMessage( VistaSerializingToolset::ByteOrderSwapBehavior bSwapBuffers )
: m_vecIncomingBuffer( 2048 ) : m_vecIncomingBuffer( 2048 )
, m_oOutgoing( 2048 ) , m_oOutgoing( 2048 )
, m_pConnection( pConnection ) , m_pConnection( NULL )
, m_nTimeoutMilliseconds( 1 ) , m_iBytesReceivedTotal( 0 )
, m_sMessageLoggerBaseName( "ITANetAudioMessage" )
, m_bDebuggingEnabled( false )
{ {
m_oOutgoing.SetByteorderSwapFlag( pConnection->GetByteorderSwapFlag() ); m_pMessageLogger = new ITABufferedDataLoggerImplProtocol();
m_oIncoming.SetByteorderSwapFlag( pConnection->GetByteorderSwapFlag() ); m_pMessageLogger->setOutputFile( m_sMessageLoggerBaseName + ".log" );
m_nMessageId = 0;
m_oOutgoing.SetByteorderSwapFlag( bSwapBuffers );
m_oIncoming.SetByteorderSwapFlag( bSwapBuffers );
ResetMessage(); ResetMessage();
} }
void CITANetAudioMessage::ResetMessage() void CITANetAudioMessage::ResetMessage()
{ {
const double dInTime = ITAClock::getDefaultClock()->getTime();
ITANetAudioMessageLog oLog;
oLog.uiBlockId = m_nMessageId;
oLog.sMessageType = "RESET_MESSAGE";
oLog.nMessagePayloadSize = 0;
oLog.dWorldTimeStamp = dInTime;
oLog.sAction = "reset_message";
if( m_oIncoming.GetTailSize() > 0 ) if( m_oIncoming.GetTailSize() > 0 )
{
vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl; vstr::err() << "CITANetAudioMessage::ResetMessage() called before message was fully processed!" << std::endl;
oLog.sAction = "reset_failed";
}
// wait till sending is complete -> this prevents us // wait till sending is complete -> this prevents us
// from deleting the buffer while it is still being read // from deleting the buffer while it is still being read
// by the connection // by the connection
if( m_pConnection ) if( m_pConnection != NULL )
m_pConnection->WaitForSendFinish(); m_pConnection->WaitForSendFinish(); // can be time-costly
m_nMessageId = S_nMessageIds++; m_nMessageId = S_nMessageIds++;
...@@ -44,20 +104,32 @@ void CITANetAudioMessage::ResetMessage() ...@@ -44,20 +104,32 @@ void CITANetAudioMessage::ResetMessage()
m_oIncoming.SetBuffer( NULL, 0 ); m_oIncoming.SetBuffer( NULL, 0 );
m_nMessageType = CITANetAudioProtocol::NP_INVALID; m_nMessageType = -1;
oLog.dInternalProcessingTime = ITAClock::getDefaultClock()->getTime() - dInTime;
m_pMessageLogger->log( oLog );
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl; vstr::out() << "CITANetAudioMessage [Preparing] (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif #endif
} }
void CITANetAudioMessage::SetConnection( VistaConnectionIP* pConn )
{
m_pConnection = pConn;
}
void CITANetAudioMessage::WriteMessage() void CITANetAudioMessage::WriteMessage()
{ {
const double dInTime = ITAClock::getDefaultClock()->getTime();
ITANetAudioMessageLog oLog;
oLog.dWorldTimeStamp = dInTime;
VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer(); VistaType::byte* pBuffer = ( VistaType::byte* ) m_oOutgoing.GetBuffer();
VistaType::sint32 iSwapDummy; VistaType::sint32 iSwapDummy;
// rewrite size dummy // rewrite size dummy
iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 ); iSwapDummy = m_oOutgoing.GetBufferSize() - sizeof( VistaType::sint32 );
oLog.nMessagePayloadSize = iSwapDummy;
if( m_oOutgoing.GetByteorderSwapFlag() ) if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy ); VistaSerializingToolset::Swap4( &iSwapDummy );
std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) ); std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
...@@ -66,6 +138,7 @@ void CITANetAudioMessage::WriteMessage() ...@@ -66,6 +138,7 @@ void CITANetAudioMessage::WriteMessage()
// rewrite type dummy // rewrite type dummy
iSwapDummy = m_nMessageType; iSwapDummy = m_nMessageType;
oLog.sMessageType = CITANetAudioProtocol::GetNPMessageID( m_nMessageType );
if( m_oOutgoing.GetByteorderSwapFlag() ) if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy ); VistaSerializingToolset::Swap4( &iSwapDummy );
std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) ); std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
...@@ -74,9 +147,11 @@ void CITANetAudioMessage::WriteMessage() ...@@ -74,9 +147,11 @@ void CITANetAudioMessage::WriteMessage()
// rewrite messageid dummy // rewrite messageid dummy
iSwapDummy = m_nMessageId; iSwapDummy = m_nMessageId;
oLog.uiBlockId = m_nMessageId;
if( m_oOutgoing.GetByteorderSwapFlag() ) if( m_oOutgoing.GetByteorderSwapFlag() )
VistaSerializingToolset::Swap4( &iSwapDummy ); VistaSerializingToolset::Swap4( &iSwapDummy );
std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) ); std::memcpy( pBuffer, &iSwapDummy, sizeof( VistaType::sint32 ) );
oLog.sAction = "write_message";
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl; vstr::out() << "CITANetAudioMessage [ Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
...@@ -86,75 +161,106 @@ void CITANetAudioMessage::WriteMessage() ...@@ -86,75 +161,106 @@ void CITANetAudioMessage::WriteMessage()
{ {
// It appears safe to send even very big data payload, so we will send at once // It appears safe to send even very big data payload, so we will send at once
int iRawBufferSize = m_oOutgoing.GetBufferSize(); int iRawBufferSize = m_oOutgoing.GetBufferSize();
assert( iRawBufferSize > 4 );
int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize ); int nRet = m_pConnection->Send( m_oOutgoing.GetBuffer(), iRawBufferSize );
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl; vstr::out() << "CITANetAudioMessage [ Writing] " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ") RAW BUFFER DONE" << std::endl;
#endif #endif
m_pConnection->WaitForSendFinish(); // Block processing until data is successfully transmitted
unsigned long nData = m_pConnection->WaitForSendFinish( 0 );
if( nRet != m_oOutgoing.GetBufferSize() ) if( nRet != m_oOutgoing.GetBufferSize() )
VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 ); VISTA_THROW( "ITANetAudioMessage: could not send all data from output buffer via network connection", 255 );
} }
catch( VistaExceptionBase& ex ) catch (VistaExceptionBase& ex)
{ {
ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() ); ITA_EXCEPT1( NETWORK_ERROR, ex.GetExceptionText() );
} }
oLog.dInternalProcessingTime = ITAClock::getDefaultClock()->getTime() - dInTime;
m_pMessageLogger->log( oLog );
return;
} }
bool CITANetAudioMessage::TryReadMessage() bool CITANetAudioMessage::ReadMessage( const int iTimeoutMilliseconds )
{ {
ITANetAudioMessageLog oLog;
const double dInTime = ITAClock::getDefaultClock()->getTime();
oLog.dWorldTimeStamp = dInTime;
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ TryRead ] Waiting for incoming data for " << m_nTimeoutMilliseconds << std::endl; vstr::out() << "CITANetAudioMessage [ Reading ] Waiting for incoming data" << std::endl;
#endif
long nIncomingBytes = m_pConnection->WaitForIncomingData( m_nTimeoutMilliseconds );
if( nIncomingBytes <= 0 )
{
#if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ TryRead ] nothing incoming" << std::endl;
#endif #endif
// WaitForIncomming Data int in ca ms
long nIncomingBytes = m_pConnection->WaitForIncomingData( iTimeoutMilliseconds );
if( nIncomingBytes == -1 )
return false; return false;
}
assert( nIncomingBytes >= 4 ); // we need at least the size of message
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl; vstr::out() << "CITANetAudioMessage [ Reading ] " << nIncomingBytes << " bytes incoming" << std::endl;
#endif #endif
VistaType::sint32 nMessagePayloadSize; VistaType::sint32 nMessagePayloadSize;
int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize ); int nBytesRead = m_pConnection->ReadInt32( nMessagePayloadSize );
assert( nBytesRead == sizeof( VistaType::sint32 ) ); assert( nBytesRead == sizeof( VistaType::sint32 ) );
oLog.nMessagePayloadSize = nMessagePayloadSize;
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl; vstr::out() << "CITANetAudioMessage [ Reading ] Expecting " << nMessagePayloadSize << " bytes message payload" << std::endl;
#endif #endif
// we need at least the two protocol ints
//assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) ); if( nMessagePayloadSize <= 0 )
return false; // Try-read failed, returning!
// we need at least the two protocol ints
assert( nMessagePayloadSize >= 2 * sizeof( VistaType::sint32 ) );
if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() ) if( nMessagePayloadSize > ( int ) m_vecIncomingBuffer.size() )
m_vecIncomingBuffer.resize( nMessagePayloadSize ); m_vecIncomingBuffer.resize( nMessagePayloadSize );
// Receive all incoming data (potentially splitted)
int iBytesReceivedTotal = 0; // Receive all incoming data (potentially splitted!!)
while( nMessagePayloadSize < iBytesReceivedTotal )
m_iBytesReceivedTotal = 0;
while( ( unsigned long ) nMessagePayloadSize != m_iBytesReceivedTotal )
{ {
// We force a blocking-wait for the rest (or the data to be fetched until message payload is copied)
int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 ); int iIncommingBytes = m_pConnection->WaitForIncomingData( 0 );
int iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ iBytesReceivedTotal ], iIncommingBytes ); assert( m_iBytesReceivedTotal < m_vecIncomingBuffer.size() );
iBytesReceivedTotal += iBytesReceived;
// Check if we are already receiving another message and only read until end-of-payload, then release this message-read
int iBytesReceived = -1;
const int nPendingPayloadBytes = nMessagePayloadSize - m_iBytesReceivedTotal;
if( iIncommingBytes > nPendingPayloadBytes )
iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ m_iBytesReceivedTotal ], nPendingPayloadBytes );
else
iBytesReceived = m_pConnection->Receive( &m_vecIncomingBuffer[ m_iBytesReceivedTotal ], iIncommingBytes );
m_iBytesReceivedTotal += iBytesReceived;
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl; vstr::out() << "[ CITANetAudioMessage ] " << std::setw( 3 ) << std::floor( m_iBytesReceivedTotal / float( nMessagePayloadSize ) * 100.0f ) << "% transmitted" << std::endl;
#endif #endif
} }
assert( m_iBytesReceivedTotal == nMessagePayloadSize );
// Transfer data into members // Transfer data into members
m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false ); m_oIncoming.SetBuffer( &m_vecIncomingBuffer[ 0 ], nMessagePayloadSize, false );
m_nMessageType = ReadInt(); m_nMessageType = ReadInt();
m_nMessageId = ReadInt(); m_nMessageId = ReadInt();
oLog.sAction = "read_message";
oLog.sMessageType = CITANetAudioProtocol::GetNPMessageID( m_nMessageType );
oLog.uiBlockId = m_nMessageId;
oLog.dWorldTimeStamp = ITAClock::getDefaultClock()->getTime() - dInTime;
m_pMessageLogger->log( oLog );
#if NET_AUDIO_SHOW_TRAFFIC #if NET_AUDIO_SHOW_TRAFFIC
vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl; vstr::out() << "CITANetAudioMessage [ Reading ] Finished receiving " << m_nMessageType << " (id=" << std::setw( 4 ) << m_nMessageId << ")" << std::endl;
#endif #endif
return true; return true;
} }
...@@ -165,7 +271,6 @@ int CITANetAudioMessage::GetMessageType() const ...@@ -165,7 +271,6 @@ int CITANetAudioMessage::GetMessageType() const
void CITANetAudioMessage::SetMessageType( int nType ) void CITANetAudioMessage::SetMessageType( int nType )
{ {
assert( m_nMessageType == CITANetAudioProtocol::NP_INVALID ); // should only be set once
m_nMessageType = nType; m_nMessageType = nType;
} }
...@@ -278,11 +383,15 @@ VistaConnectionIP* CITANetAudioMessage::GetConnection() const ...@@ -278,11 +383,15 @@ VistaConnectionIP* CITANetAudioMessage::GetConnection() const
return m_pConnection; return m_pConnection;
} }
void CITANetAudioMessage::ClearConnection() { void CITANetAudioMessage::ClearConnection()
{
m_pConnection = NULL; m_pConnection = NULL;
if( GetIsDebuggingEnabled() == false )
m_pMessageLogger->setOutputFile( "" ); // disable output
delete m_pMessageLogger;
} }
void CITANetAudioMessage::WriteIntVector( const std::vector<int> viData ) void CITANetAudioMessage::WriteIntVector( const std::vector< int > viData )
{ {
int iSize = ( int ) viData.size(); int iSize = ( int ) viData.size();
WriteInt( iSize ); WriteInt( iSize );
...@@ -307,6 +416,7 @@ CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingPara ...@@ -307,6 +416,7 @@ CITANetAudioProtocol::StreamingParameters CITANetAudioMessage::ReadStreamingPara
oParams.iChannels = ReadInt(); oParams.iChannels = ReadInt();