Commit e6dabdff authored by Anton Runov's avatar Anton Runov
Browse files

Add optional IO statistics reporting

parent f899838a
......@@ -171,6 +171,15 @@ public:
virtual void setSocket(int &socket) = 0;
#endif
struct PktStat {
uint32_t tot;
uint32_t lost;
uint32_t outOfOrder;
uint32_t revived;
uint32_t statCount;
};
virtual bool getStats(PktStat*) {return false;}
signals:
void signalError(const char* error_message);
......
......@@ -52,6 +52,8 @@
#include <QHostInfo>
#include <QThread>
#include <QTcpSocket>
#include <QTimer>
#include <QDateTime>
using std::cout; using std::endl;
......@@ -109,7 +111,8 @@ JackTrip::JackTrip(jacktripModeT JacktripMode,
mReceivedConnection(false),
mTcpConnectionError(false),
mStopped(false),
mConnectDefaultAudioPorts(true)
mConnectDefaultAudioPorts(true),
mIOStatLogStream(std::cout.rdbuf())
{
createHeader(mPacketHeaderType);
}
......@@ -437,6 +440,54 @@ void JackTrip::startProcess(
if (mConnectDefaultAudioPorts) { mAudioInterface->connectDefaultPorts(); }
}
//*******************************************************************************
void JackTrip::startIOStatTimer(int timeout_sec, const std::ostream& log_stream)
{
mIOStatLogStream.rdbuf(log_stream.rdbuf());
QTimer *timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), this, SLOT(onStatTimer()));
timer->start(timeout_sec*1000);
}
//*******************************************************************************
void JackTrip::onStatTimer()
{
DataProtocol::PktStat pkt_stat;
if (!mDataProtocolReceiver->getStats(&pkt_stat)) {
return;
}
bool reset = (0 == pkt_stat.statCount);
RingBuffer::IOStat recv_io_stat;
if (!mReceiveRingBuffer->getStats(&recv_io_stat, reset)) {
return;
}
RingBuffer::IOStat send_io_stat;
if (!mSendRingBuffer->getStats(&send_io_stat, reset)) {
return;
}
QString now = QDateTime::currentDateTime().toString(Qt::ISODate);
int32_t skew = recv_io_stat.underruns - recv_io_stat.overflows
- pkt_stat.lost + pkt_stat.revived;
static QMutex mutex;
QMutexLocker locker(&mutex);
mIOStatLogStream << now.toLocal8Bit().constData()
<< " " << getPeerAddress().toLocal8Bit().constData()
<< " send: "
<< send_io_stat.underruns
<< "/" << send_io_stat.overflows
<< " recv: "
<< recv_io_stat.underruns
<< "/" << recv_io_stat.overflows
<< " prot: "
<< pkt_stat.lost
<< "/" << pkt_stat.outOfOrder
<< "/" << pkt_stat.revived
<< " tot: "
<< pkt_stat.tot
<< " skew: " << skew
<< endl;
}
//*******************************************************************************
void JackTrip::stop()
......
......@@ -392,6 +392,8 @@ public:
void printTextTest() {std::cout << "=== JackTrip PRINT ===" << std::endl;}
void printTextTest2() {std::cout << "=== JackTrip PRINT2 ===" << std::endl;}
void startIOStatTimer(int timeout_sec, const std::ostream& log_stream);
public slots:
/// \brief Slot to stop all the processes and threads
virtual void slotStopProcesses()
......@@ -418,6 +420,7 @@ public slots:
{ std::cout << "=== TESTING ===" << std::endl; }
void slotReceivedConnectionFromPeer()
{ mReceivedConnection = true; }
void onStatTimer();
signals:
......@@ -509,6 +512,7 @@ private:
volatile bool mStopped;
bool mConnectDefaultAudioPorts; ///< Connect or not default audio ports
std::ostream mIOStatLogStream;
};
#endif
......@@ -47,6 +47,7 @@
#include "UdpMasterListener.h"
#include "NetKS.h"
#include "LoopBack.h"
#include "Settings.h"
#ifdef WAIR // wair
#include "dcblock2gain.dsp.h"
#endif // endwhere
......@@ -127,6 +128,7 @@ void JackTripWorker::run()
// Create and setup JackTrip Object
//JackTrip jacktrip(JackTrip::SERVER, JackTrip::UDP, mNumChans, 2);
if (gVerboseFlag) cout << "---> JackTripWorker: Creating jacktrip objects..." << endl;
Settings* settings = mUdpMasterListener->getSettings();
#ifdef WAIR // WAIR
// forces BufferQueueLength to 2
......@@ -220,6 +222,9 @@ void JackTripWorker::run()
mID
#endif // endwhere
);
if (0 != settings->getIOStatTimeout()) {
jacktrip.startIOStatTimer(settings->getIOStatTimeout(), settings->getIOStatStream());
}
// if (gVerboseFlag) cout << "---> JackTripWorker: start..." << endl;
// jacktrip.start(); // ########### JamTest Only #################
......
......@@ -86,6 +86,8 @@ RingBuffer::RingBuffer(int SlotSize, int NumSlots) :
mWritePosition = ( (NumSlots/2) * SlotSize ) % mTotalSize;
// Udpate Full Slots accordingly
mFullSlots = (NumSlots/2);
mUnderruns = 0;
mOverflows = 0;
}
......@@ -226,6 +228,7 @@ void RingBuffer::underrunReset()
//mFullSlots += mNumSlots/2;
// There's nothing new to read, so we clear the whole buffer (Set the entire buffer to 0)
std::memset(mRingBuffer, 0, mTotalSize);
++mUnderruns;
}
......@@ -237,6 +240,7 @@ void RingBuffer::overflowReset()
//mReadPosition = ( mWritePosition + ( (mNumSlots/2) * mSlotSize ) ) % mTotalSize;
mReadPosition = ( mReadPosition + ( (mNumSlots/2) * mSlotSize ) ) % mTotalSize;
mFullSlots -= mNumSlots/2;
mOverflows += mNumSlots/2 + 1;
}
......@@ -248,3 +252,15 @@ void RingBuffer::debugDump() const
cout << "mWritePosition = " << mWritePosition << endl;
cout << "mFullSlots = " << mFullSlots << endl;
}
//*******************************************************************************
bool RingBuffer::getStats(RingBuffer::IOStat* stat, bool reset)
{
if (reset) {
mUnderruns = 0;
mOverflows = 0;
}
stat->underruns = mUnderruns;
stat->overflows = mOverflows;
return true;
}
......@@ -100,6 +100,11 @@ public:
*/
void readSlotNonBlocking(int8_t* ptrToReadSlot);
struct IOStat {
uint32_t underruns;
uint32_t overflows;
};
virtual bool getStats(IOStat* stat, bool reset);
protected:
......@@ -139,6 +144,8 @@ private:
QMutex mMutex; ///< Mutex to protect read and write operations
QWaitCondition mBufferIsNotFull; ///< Buffer not full condition to monitor threads
QWaitCondition mBufferIsNotEmpty; ///< Buffer not empty condition to monitor threads
std::atomic<uint32_t> mUnderruns;
std::atomic<uint32_t> mOverflows;
};
#endif
......@@ -85,7 +85,8 @@ Settings::Settings() :
mChanfeDefaultID(0),
mChanfeDefaultBS(false),
mHubConnectionMode(JackTrip::SERVERTOCLIENT),
mConnectDefaultAudioPorts(true)
mConnectDefaultAudioPorts(true),
mIOStatTimeout(0)
{}
//*******************************************************************************
......@@ -140,6 +141,8 @@ void Settings::parseInput(int argc, char** argv)
{ "version", no_argument, NULL, 'v' }, // Version Number
{ "verbose", no_argument, NULL, 'V' }, // Verbose mode
{ "hubpatch", required_argument, NULL, 'p' }, // Set hubConnectionMode for auto patch in Jack
{ "iostat", required_argument, NULL, 'I' }, // Set IO stat timeout
{ "iostatlog", required_argument, NULL, 'G' }, // Set IO stat log file
{ "help", no_argument, NULL, 'h' }, // Print Help
{ NULL, 0, NULL, 0 }
};
......@@ -316,6 +319,25 @@ void Settings::parseInput(int argc, char** argv)
printUsage();
std::exit(1); }
break;
case 'I': // IO Stat timeout
//-------------------------------------------------------
mIOStatTimeout = atoi(optarg);
if (0 > mIOStatTimeout) {
std::cerr << "--iostat ERROR: negative timeout." << endl;
printUsage();
std::exit(1);
}
break;
case 'G': // IO Stat log file
//-------------------------------------------------------
mIOStatStream.open(optarg);
if (!mIOStatStream.is_open()) {
std::cerr << "--iostatlog FAILED to open " << optarg
<< " for writing." << endl;
printUsage();
std::exit(1);
}
break;
case 'h':
//-------------------------------------------------------
printUsage();
......@@ -394,6 +416,10 @@ void Settings::printUsage()
cout << " --bufsize # Set the buffer size, works on --rtaudio mode only (default: 128)" << endl;
cout << " --deviceid # The rtaudio device id --rtaudio mode only (default: 0)" << endl;
cout << endl;
cout << "ARGUMENTS TO DISPLAY IO STATISTICS:" << endl;
cout << " --iostat <time_in_secs> Turn on IO stat reporting with specified interval (in seconds)" << endl;
cout << " --iostatlog <log_file> Save stat log into a file (default: print in stdout)" << endl;
cout << endl;
cout << "HELP ARGUMENTS: " << endl;
cout << " -v, --version Prints Version Number" << endl;
cout << " -V, --verbose Verbose mode, prints debug messages" << endl;
......@@ -409,6 +435,7 @@ void Settings::startJackTrip()
/// \todo Change this, just here to test
if ( mJackTripServer ) {
UdpMasterListener* udpmaster = new UdpMasterListener;
udpmaster->setSettings(this);
#ifdef WAIR // WAIR
udpmaster->setWAIR(mWAIR);
#endif // endwhere
......@@ -573,6 +600,9 @@ void Settings::startJackTrip()
0 // for WAIR compatibility, ID in jack client name
#endif // endwhere
);
if (0 < getIOStatTimeout()) {
mJackTrip->startIOStatTimer(getIOStatTimeout(), getIOStatStream());
}
// if (gVerboseFlag) std::cout << "Settings:startJackTrip before mJackTrip->start" << std::endl;
// this is a noop
// mJackTrip->start();
......
......@@ -40,6 +40,7 @@
#define __SETTINGS_H__
#include <cstdlib>
#include <fstream>
#include "DataProtocol.h"
......@@ -69,6 +70,11 @@ public:
void printUsage();
bool getLoopBack() { return mLoopBack; }
int getIOStatTimeout() const {return mIOStatTimeout;}
const std::ostream& getIOStatStream() const
{
return mIOStatStream.is_open() ? (std::ostream&)mIOStatStream : std::cout;
}
public slots:
......@@ -113,6 +119,8 @@ private:
unsigned int mAudioBufferSize;
unsigned int mHubConnectionMode;
bool mConnectDefaultAudioPorts; ///< Connect or not jack audio ports
int mIOStatTimeout;
std::ofstream mIOStatStream;
};
#endif
......@@ -535,6 +535,11 @@ void UdpDataProtocol::run()
uint16_t current_seq_num = 0; // Store current sequence number
uint16_t last_seq_num = 0; // Store last package sequence number
uint16_t newer_seq_num = 0; // Store newer sequence number
mTotCount = 0;
mLostCount = 0;
mOutOfOrderCount = 0;
mRevivedCount = 0;
mStatCount = 0;
if (gVerboseFlag) std::cout << "step 8" << std::endl;
while ( !mStopped )
......@@ -655,6 +660,18 @@ void UdpDataProtocol::receivePacketRedundancy(QUdpSocket& UdpSocket,
mJackTrip->getPeerSequenceNumber(full_redundant_packet);
current_seq_num = newer_seq_num;
if (0 != last_seq_num) {
int16_t lost = newer_seq_num - last_seq_num - 1;
if (0 > lost) {
// Out of order packet, should be ignored
++mOutOfOrderCount;
return;
}
else if (0 != lost) {
mLostCount += lost;
}
mTotCount += 1 + lost;
}
//cout << current_seq_num << " ";
int redun_last_index = 0;
......@@ -670,6 +687,7 @@ void UdpDataProtocol::receivePacketRedundancy(QUdpSocket& UdpSocket,
mJackTrip->getPeerSequenceNumber( full_redundant_packet + (i*full_packet_size) );
//cout << current_seq_num << " ";
}
mRevivedCount += redun_last_index;
//cout << endl;
last_seq_num = newer_seq_num; // Save last read packet
......@@ -684,6 +702,22 @@ void UdpDataProtocol::receivePacketRedundancy(QUdpSocket& UdpSocket,
}
}
//*******************************************************************************
bool UdpDataProtocol::getStats(DataProtocol::PktStat* stat)
{
if (0 == mStatCount) {
mLostCount = 0;
mOutOfOrderCount = 0;
mRevivedCount = 0;
}
stat->tot = mTotCount;
stat->lost = mLostCount;
stat->outOfOrder = mOutOfOrderCount;
stat->revived = mRevivedCount;
stat->statCount = mStatCount++;
return true;
}
//*******************************************************************************
void UdpDataProtocol::sendPacketRedundancy(int8_t* full_redundant_packet,
int full_redundant_packet_size,
......
......@@ -142,6 +142,7 @@ public:
*/
virtual void run();
virtual bool getStats(PktStat* stat);
private slots:
void printUdpWaitedTooLong(int wait_msec);
......@@ -214,6 +215,12 @@ private:
unsigned int mUdpRedundancyFactor; ///< Factor of redundancy
static QMutex sUdpMutex; ///< Mutex to make thread safe the binding process
std::atomic<uint32_t> mTotCount;
std::atomic<uint32_t> mLostCount;
std::atomic<uint32_t> mOutOfOrderCount;
std::atomic<uint32_t> mRevivedCount;
uint32_t mStatCount;
};
#endif // __UDPDATAPROTOCOL_H__
......@@ -53,6 +53,7 @@
#include "jacktrip_types.h"
#include "jacktrip_globals.h"
class JackTripWorker; // forward declaration
class Settings;
typedef struct {
QString address;
......@@ -83,6 +84,9 @@ public:
void setConnectDefaultAudioPorts(bool connectDefaultAudioPorts) { m_connectDefaultAudioPorts = connectDefaultAudioPorts; }
void setSettings(Settings* s) {m_settings = s;}
Settings* getSettings() const {return m_settings;}
private slots:
void testReceive()
{ std::cout << "========= TEST RECEIVE SLOT ===========" << std::endl; }
......@@ -141,6 +145,7 @@ private:
int mBufferQueueLength;
bool m_connectDefaultAudioPorts;
Settings* m_settings;
#ifdef WAIR // wair
bool mWAIR;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment