Commit b19eb31b authored by jcacerec's avatar jcacerec
Browse files

Server working stable, all known bugs removed, all debug comments still in

parent 1e68da00
......@@ -53,6 +53,7 @@
#include <QThread>
#include <QHostAddress>
#include <QMutex>
#include <QMutexLocker>
class JackTrip; // forward declaration
......@@ -134,8 +135,10 @@ public:
/// \brief Stops the execution of the Thread
virtual void stop() {
QMutexLocker lock(&mMutex);
std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!STOPPING DATA PROTOCOL" << std::endl;
mStopped = true;
std::cout << "mStopped in stop dataprotocol: " << mStopped << std::endl;
std::cout << "!!!!!!!!!!!!!!!!!!!!!!!!AFTER mStopped true" << std::endl;
}
......@@ -184,6 +187,7 @@ protected:
volatile bool mHasPeerAddress;
/// Boolean that indicates if a packet was received
volatile bool mHasPacketsToReceive;
QMutex mMutex;
private:
......
......@@ -227,6 +227,7 @@ int JackAudioInterface::stopProcess() const
{
cout << "---> BEFORE JackAudioInterface::stopProcess()" << endl;
QMutexLocker locker(&sJackMutex);
cout << "---> AFTER JackAudioInterface::stopProcess() MUTEX" << endl;
if ( int code = (jack_client_close(mClient)) )
{
std::cerr << "Cannot disconnect client" << std::endl;
......
......@@ -317,7 +317,9 @@ void JackTrip::startProcess() throw(std::invalid_argument)
clientPingToServerStart();
break;
case SERVERPINGSERVER :
serverStart(true);
if ( serverStart(true) == -1 ) {
return;
}
break;
default:
throw std::invalid_argument("Jacktrip Mode undefined");
......@@ -325,6 +327,7 @@ void JackTrip::startProcess() throw(std::invalid_argument)
}
// Start Threads
cout << "---> STARTING THREADS mAudioInterface->startProcess();" << endl;
mAudioInterface->startProcess();
for (int i = 0; i < mProcessPlugins.size(); ++i) {
......@@ -332,8 +335,10 @@ void JackTrip::startProcess() throw(std::invalid_argument)
}
mAudioInterface->connectDefaultPorts();
mDataProtocolReceiver->start();
cout << "---> AFTER THREADS mDataProtocolReceiver->start();" << endl;
QThread::msleep(1);
mDataProtocolSender->start();
cout << "---> AFTER THREADS mDataProtocolSender->start();" << endl;
}
......@@ -343,8 +348,9 @@ void JackTrip::stop()
cout << "INIT STOP-----------" << endl;
// Stop The Sender
mDataProtocolSender->stop();
cout << "Stop The Sender 1-----------" << endl;
mDataProtocolSender->wait();
cout << "Stop The Sender-----------" << endl;
cout << "Stop The Sender 2-----------" << endl;
// Stop The Receiver
mDataProtocolReceiver->stop();
......@@ -389,7 +395,7 @@ void JackTrip::clientStart() throw(std::invalid_argument)
//*******************************************************************************
void JackTrip::serverStart(bool timeout, int udpTimeout)
int JackTrip::serverStart(bool timeout, int udpTimeout)
throw(std::invalid_argument, std::runtime_error)
{
// Set the peer address
......@@ -411,6 +417,7 @@ void JackTrip::serverStart(bool timeout, int udpTimeout)
if ( !UdpSockTemp.bind(QHostAddress::Any, mReceiverBindPort,
QUdpSocket::DefaultForPlatform) )
{
std::cerr << "in JackTrip: Could not bind UDP socket. It may be already binded." << endl;
throw std::runtime_error("Could not bind UDP socket. It may be already binded.");
}
// Listen to client
......@@ -418,18 +425,18 @@ void JackTrip::serverStart(bool timeout, int udpTimeout)
int elapsedTime = 0;
if (timeout) {
while ( (!UdpSockTemp.hasPendingDatagrams()) && (elapsedTime <= udpTimeout) ) {
if (mStopped == true) { emit signalUdpTimeOut(); return; }
if (mStopped == true) { emit signalUdpTimeOut(); UdpSockTemp.close(); return -1; }
QThread::msleep(sleepTime);
elapsedTime += sleepTime;
}
if (!UdpSockTemp.hasPendingDatagrams()) {
emit signalUdpTimeOut();
cout << "---> JackTrip::serverStart TIMEOUT" << endl;
return;
return -1;
}
} else {
while ( !UdpSockTemp.hasPendingDatagrams() ) {
if (mStopped == true) { emit signalUdpTimeOut(); return; }
if (mStopped == true) { emit signalUdpTimeOut(); return -1; }
QThread::msleep(sleepTime);
}
}
......@@ -455,6 +462,7 @@ void JackTrip::serverStart(bool timeout, int udpTimeout)
mDataProtocolSender->setPeerPort(peer_port);
mDataProtocolReceiver->setPeerPort(peer_port);
setPeerPorts(peer_port);
return 0;
}
......
......@@ -402,7 +402,7 @@ public:
/// \brief Starts for the SERVER mode
/// \param timout Set the server to timeout after 2 seconds if no client connections are received.
/// Usefull for the multithreaded server
void serverStart(bool timeout = false, int udpTimeout = gTimeOutMultiThreadedServer)
int serverStart(bool timeout = false, int udpTimeout = gTimeOutMultiThreadedServer)
throw(std::invalid_argument, std::runtime_error);
/// \brief Stats for the Client to Ping Server
virtual void clientPingToServerStart() throw(std::invalid_argument);
......
......@@ -130,6 +130,22 @@ void JackTripWorker::run()
JamTest jacktrip(JackTrip::SERVERPINGSERVER); // ########### JamTest #################
//JackTrip jacktrip(JackTrip::SERVERPINGSERVER, JackTrip::UDP, mNumChans, 2);
#endif
// Connect signals and slots
// -------------------------
cout << "---> BEFORE SIGNAL SLOTS 1 " << endl;
// Connection to terminate JackTrip when packets haven't arrive for
// a certain amount of time
QObject::connect(&jacktrip, SIGNAL(signalNoUdpPacketsForSeconds()),
&jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
cout << "---> BEFORE SIGNAL SLOTS 2 " << endl;
// Connection to terminate the local eventloop when jacktrip is done
QObject::connect(&jacktrip, SIGNAL(signalProcessesStopped()),
&event_loop, SLOT(quit()), Qt::QueuedConnection);
cout << "---> BEFORE SIGNAL SLOTS 3 " << endl;
QObject::connect(this, SIGNAL(signalRemoveThread()),
&jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
cout << "---> BEFORE ClientAddress.setAddress(mClientAddress) " << endl;
ClientAddress.setAddress(mClientAddress);
// If I don't type this line, I get a bus error in the next line.
......@@ -147,20 +163,7 @@ void JackTripWorker::run()
return;
}
// Connect signals and slots
// -------------------------
cout << "---> BEFORE SIGNAL SLOTS 1 " << endl;
// Connection to terminate JackTrip when packets haven't arrive for
// a certain amount of time
QObject::connect(&jacktrip, SIGNAL(signalNoUdpPacketsForSeconds()),
&jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
cout << "---> BEFORE SIGNAL SLOTS 2 " << endl;
// Connection to terminate the local eventloop when jacktrip is done
QObject::connect(&jacktrip, SIGNAL(signalProcessesStopped()),
&event_loop, SLOT(quit()), Qt::QueuedConnection);
cout << "---> BEFORE SIGNAL SLOTS 3 " << endl;
QObject::connect(this, SIGNAL(signalRemoveThread()),
&jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
// Start Threads and event loop
cout << "---> BEFORE jacktrip.startProcess() " << endl;
......@@ -178,10 +181,7 @@ void JackTripWorker::run()
event_loop.exec(); // Excecution will block here until exit() the QEventLoop
//--------------------------------------------------------------------------
{ // Thread is already spawning, so release the lock
QMutexLocker locker(&mMutex);
mSpawning = true;
}
{ QMutexLocker locker(&mMutex); mSpawning = true; }
// wait for jacktrip to be done before exiting the Worker Thread
jacktrip.wait();
......@@ -192,9 +192,15 @@ void JackTripWorker::run()
std::cerr << "Couldn't send thread to the Pool" << endl;
std::cerr << e.what() << endl;
std::cerr << gPrintSeparator << endl;
mUdpMasterListener->releaseThread(mID);
{ QMutexLocker locker(&mMutex); mSpawning = false; }
return;
}
mUdpMasterListener->releaseThread(mID);
{
QMutexLocker locker(&mMutex);
mUdpMasterListener->releaseThread(mID);
}
cout << "JackTrip ID = " << mID << " released from the THREAD POOL" << endl;
cout << gPrintSeparator << endl;
......@@ -218,6 +224,7 @@ int JackTripWorker::setJackTripFromClientHeader(JackTrip& jacktrip)
if ( !UdpSockTemp.bind(QHostAddress::Any, mServerPort,
QUdpSocket::DefaultForPlatform) )
{
std::cerr << "in JackTripWorker: Could not bind UDP socket. It may be already binded." << endl;
throw std::runtime_error("Could not bind UDP socket. It may be already binded.");
}
......@@ -225,7 +232,7 @@ int JackTripWorker::setJackTripFromClientHeader(JackTrip& jacktrip)
QWaitCondition sleep; // time is in milliseconds
QMutex mutex;
int sleepTime = 100; // ms
int udpTimeout = gTimeOutMultiThreadedServer*1000; // gTimeOutMultiThreadedServer seconds
int udpTimeout = gTimeOutMultiThreadedServer; // gTimeOutMultiThreadedServer mseconds
int elapsedTime = 0;
{
QMutexLocker lock(&mutex);
......@@ -236,6 +243,7 @@ int JackTripWorker::setJackTripFromClientHeader(JackTrip& jacktrip)
}
}
if (!UdpSockTemp.hasPendingDatagrams()) {
UdpSockTemp.close();
cout << "---> UdpSockTemp.hasPendingDatagrams() returning error" << endl;
return -1;
}
......@@ -275,5 +283,7 @@ bool JackTripWorker::isSpawning()
//*******************************************************************************
void JackTripWorker::stopThread()
{
QMutexLocker locker(&mMutex);
cout << "---> EMIT SIGNAL STOP THREADS JackTripWorker::stopThread()" << endl;
emit signalRemoveThread();
}
......@@ -70,6 +70,7 @@ mRunMode(runmode),
mAudioPacket(NULL), mFullPacket(NULL),
mUdpRedundancyFactor(udp_redundancy_factor)
{
mStopped = false;
if (mRunMode == RECEIVER) {
QObject::connect(this, SIGNAL(signalWatingTooLong(int)),
jacktrip, SLOT(slotUdpWatingTooLong(int)), Qt::QueuedConnection);
......@@ -303,7 +304,13 @@ void UdpDataProtocol::getPeerAddressFromFirstPacket(QUdpSocket& UdpSocket,
//*******************************************************************************
void UdpDataProtocol::run()
{
mStopped = false;
cout << "===> STARTING UdpDataProtocol::run()" << endl;
/*
{
QMutexLocker lock(&mMutex);
mStopped = false;
}
*/
//QObject::connect(this, SIGNAL(signalError(const char*)),
// mJackTrip, SLOT(slotStopProcesses()),
......@@ -318,6 +325,7 @@ void UdpDataProtocol::run()
//this->terminate();
//this->wait();
cout << "AFTER EMITING" << endl;
return;
}
......@@ -364,7 +372,7 @@ void UdpDataProtocol::run()
// from that packet
std::cout << "Waiting for Peer..." << std::endl;
// This blocks waiting for the first packet
while ( !UdpSocket.hasPendingDatagrams() && !mStopped ) {
while ( !UdpSocket.hasPendingDatagrams() ) {
if (mStopped) { return; }
QThread::msleep(100);
}
......
......@@ -159,8 +159,10 @@ void UdpMasterListener::run()
// stop the thread
mJTWorkers->at(id_remove)->stopThread();
// block until the thread has been removed from the pool
while ( isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port) == -1 )
{ cout << "removing" << endl; QThread::msleep(10); }
while ( isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port) == -1 ) {
cout << "JackTrip MULTI-THREADED SERVER: Removing JackTripWorker from pool..." << endl;
QThread::msleep(10);
}
// Get a new ID for this client
//id = isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port);
id = getPoolID(PeerAddress.toIPv4Address(), peer_udp_port);
......@@ -168,8 +170,9 @@ void UdpMasterListener::run()
// Assign server port and send it to Client
server_udp_port = mBasePort+id;
if ( sendUdpPort(clientConnection, server_udp_port) == 0 ) {
releaseThread(id);
clientConnection->close();
delete clientConnection;
releaseThread(id);
break;
}
......@@ -407,7 +410,9 @@ int UdpMasterListener::getPoolID(uint32_t address, uint16_t port)
//*******************************************************************************
int UdpMasterListener::releaseThread(int id)
{
cout << "---> UdpMasterListener::releaseThread(int id)" << endl;
QMutexLocker lock(&mMutex);
cout << "---> AFTER UdpMasterListener::releaseThread(int id) MUTEX" << endl;
mActiveAddress[id][0] = 0;
mActiveAddress[id][1] = 0;
mTotalRunningThreads--;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment