Commit 1e68da00 authored by jcacerec's avatar jcacerec
Browse files

a bunch of the server re-connecting client bugs fixed, all debug messages still there

parent ff160529
......@@ -225,12 +225,14 @@ int JackAudioInterface::startProcess() const
//*******************************************************************************
int JackAudioInterface::stopProcess() const
{
cout << "---> BEFORE JackAudioInterface::stopProcess()" << endl;
QMutexLocker locker(&sJackMutex);
if ( int code = (jack_client_close(mClient)) )
{
std::cerr << "Cannot disconnect client" << std::endl;
return(code);
}
cout << "---> AFTER JackAudioInterface::stopProcess()" << endl;
return(0);
}
......
......@@ -295,6 +295,9 @@ void JackTrip::startProcess() throw(std::invalid_argument)
QObject::connect(mDataProtocolReceiver, SIGNAL(signalReceivedConnectionFromPeer()),
this, SLOT(slotReceivedConnectionFromPeer()),
Qt::QueuedConnection);
QObject::connect(this, SIGNAL(signalUdpTimeOut()),
this, SLOT(slotStopProcesses()), Qt::QueuedConnection);
//QObject::connect(mDataProtocolSender, SIGNAL(signalError(const char*)),
// this, SLOT(slotStopProcesses()), Qt::QueuedConnection);
//QObject::connect(mDataProtocolReceiver, SIGNAL(signalError(const char*)),
......@@ -313,6 +316,9 @@ void JackTrip::startProcess() throw(std::invalid_argument)
case CLIENTTOPINGSERVER :
clientPingToServerStart();
break;
case SERVERPINGSERVER :
serverStart(true);
break;
default:
throw std::invalid_argument("Jacktrip Mode undefined");
break;
......@@ -383,7 +389,8 @@ void JackTrip::clientStart() throw(std::invalid_argument)
//*******************************************************************************
void JackTrip::serverStart() throw(std::invalid_argument, std::runtime_error)
void JackTrip::serverStart(bool timeout, int udpTimeout)
throw(std::invalid_argument, std::runtime_error)
{
// Set the peer address
if ( !mPeerAddress.isEmpty() ) {
......@@ -394,7 +401,8 @@ void JackTrip::serverStart() throw(std::invalid_argument, std::runtime_error)
}
// Get the client address when it connects
cout << "Waiting for Connection From Client..." << endl;//stop();
cout << "Waiting for Connection From Client..." << endl;
cout << "TIME OUT ----> " << timeout << endl;
QHostAddress peerHostAddress;
uint16_t peer_port;
QUdpSocket UdpSockTemp;// Create socket to wait for client
......@@ -406,9 +414,24 @@ void JackTrip::serverStart() throw(std::invalid_argument, std::runtime_error)
throw std::runtime_error("Could not bind UDP socket. It may be already binded.");
}
// Listen to client
while ( !UdpSockTemp.hasPendingDatagrams() ) {
if (mStopped == true) { return; }
QThread::usleep(100000);
int sleepTime = 100; // ms
int elapsedTime = 0;
if (timeout) {
while ( (!UdpSockTemp.hasPendingDatagrams()) && (elapsedTime <= udpTimeout) ) {
if (mStopped == true) { emit signalUdpTimeOut(); return; }
QThread::msleep(sleepTime);
elapsedTime += sleepTime;
}
if (!UdpSockTemp.hasPendingDatagrams()) {
emit signalUdpTimeOut();
cout << "---> JackTrip::serverStart TIMEOUT" << endl;
return;
}
} else {
while ( !UdpSockTemp.hasPendingDatagrams() ) {
if (mStopped == true) { emit signalUdpTimeOut(); return; }
QThread::msleep(sleepTime);
}
}
char buf[1];
// set client address
......
......@@ -82,7 +82,8 @@ public:
enum jacktripModeT {
SERVER, ///< Run in Server Mode
CLIENT, ///< Run in Client Mode
CLIENTTOPINGSERVER ///< Client of the Ping Server Mode
CLIENTTOPINGSERVER, ///< Client of the Ping Server Mode
SERVERPINGSERVER ///< Server of the MultiThreaded JackTrip
};
/// \brief Enum for the JackTrip Underrun Mode, when packets
......@@ -377,6 +378,8 @@ public slots:
signals:
void signalUdpTimeOut();
/// \brief Signal emitted when all the processes and threads are stopped
void signalProcessesStopped();
/// \brief Signal emitted when no UDP Packets have been received for a while
......@@ -397,7 +400,10 @@ public:
/// \brief Starts for the CLIENT mode
void clientStart() throw(std::invalid_argument);
/// \brief Starts for the SERVER mode
void serverStart() throw(std::invalid_argument, std::runtime_error);
/// \param timout Set the server to timeout after 2 seconds if no client connections are received.
/// Usefull for the multithreaded server
void serverStart(bool timeout = false, int udpTimeout = gTimeOutMultiThreadedServer)
throw(std::invalid_argument, std::runtime_error);
/// \brief Stats for the Client to Ping Server
virtual void clientPingToServerStart() throw(std::invalid_argument);
......
......@@ -108,11 +108,10 @@ void JackTripWorker::run()
This is not supported, exceptions thrown in worker threads must be
caught before control returns to Qt Concurrent.'*/
{
// Thread is already spawning, so release the lock
QMutexLocker locker(&mMutex);
mSpawning = true;
}
cout << "---> BEFORE QMutexLocker locker(&mMutex); mSpawning = true; " << endl;
{ QMutexLocker locker(&mMutex); mSpawning = true; }
cout << "---> AFTER QMutexLocker locker(&mMutex); mSpawning = true; " << endl;
QHostAddress ClientAddress;
......@@ -125,13 +124,13 @@ void JackTripWorker::run()
// Create and setup JackTrip Object
//JackTrip jacktrip(JackTrip::SERVER, JackTrip::UDP, mNumChans, 2);
#ifndef __JAMTEST__
JackTrip jacktrip(JackTrip::SERVER, JackTrip::UDP, mNumChans, 2);
JackTrip jacktrip(JackTrip::SERVERPINGSERVER, JackTrip::UDP, mNumChans, 2);
#endif
#ifdef __JAMTEST__
JamTest jacktrip(JackTrip::SERVER); // ########### JamTest #################
//JackTrip jacktrip(JackTrip::SERVER, JackTrip::UDP, mNumChans, 2);
JamTest jacktrip(JackTrip::SERVERPINGSERVER); // ########### JamTest #################
//JackTrip jacktrip(JackTrip::SERVERPINGSERVER, JackTrip::UDP, mNumChans, 2);
#endif
cout << "---> BEFORE ClientAddress.setAddress(mClientAddress) " << endl;
ClientAddress.setAddress(mClientAddress);
// If I don't type this line, I get a bus error in the next line.
// I still haven't figure out why
......@@ -140,25 +139,37 @@ void JackTripWorker::run()
jacktrip.setBindPorts(mServerPort);
//jacktrip.setPeerPorts(mClientPort);
cout << "---> BEFORE PeerConnectionMode " << endl;
int PeerConnectionMode = setJackTripFromClientHeader(jacktrip);
if ( PeerConnectionMode == -1 ) {
mUdpMasterListener->releaseThread(mID);
{ QMutexLocker locker(&mMutex); mSpawning = false; }
return;
}
// Connect signals and slots
// -------------------------
cout << "---> BEFORE SIGNAL SLOTS 1 " << endl;
// Connection to terminate JackTrip when packets haven't arrive for
// a certain amount of time
QObject::connect(&jacktrip, SIGNAL(signalNoUdpPacketsForSeconds()),
&jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
cout << "---> BEFORE SIGNAL SLOTS 2 " << endl;
// Connection to terminate the local eventloop when jacktrip is done
QObject::connect(&jacktrip, SIGNAL(signalProcessesStopped()),
&event_loop, SLOT(quit()), Qt::QueuedConnection);
cout << "---> BEFORE SIGNAL SLOTS 3 " << endl;
QObject::connect(this, SIGNAL(signalRemoveThread()),
&jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
// Start Threads and event loop
cout << "---> BEFORE jacktrip.startProcess() " << endl;
jacktrip.startProcess();
cout << "---> BEFORE jacktrip.start()" << endl;
jacktrip.start(); // ########### JamTest Only #################
cout << "@@@@@@@@@@@@@@@@@> AFTER JACKTRIPWORKER jacktrip.start()" << endl;
cout << "---> BEFORE QMutexLocker locker(&mMutex);" << endl;
{ // Thread is already spawning, so release the lock
QMutexLocker locker(&mMutex);
mSpawning = false;
......@@ -196,6 +207,7 @@ void JackTripWorker::run()
//*******************************************************************************
// returns -1 on error
int JackTripWorker::setJackTripFromClientHeader(JackTrip& jacktrip)
{
//QHostAddress peerHostAddress;
......@@ -210,10 +222,23 @@ int JackTripWorker::setJackTripFromClientHeader(JackTrip& jacktrip)
}
// Listen to client
QWaitCondition sleep;
QMutex mutex; mutex.lock();
while ( !UdpSockTemp.hasPendingDatagrams() ) { sleep.wait(&mutex,100); }
mutex.unlock();
QWaitCondition sleep; // time is in milliseconds
QMutex mutex;
int sleepTime = 100; // ms
int udpTimeout = gTimeOutMultiThreadedServer*1000; // gTimeOutMultiThreadedServer seconds
int elapsedTime = 0;
{
QMutexLocker lock(&mutex);
while ( (!UdpSockTemp.hasPendingDatagrams()) && (elapsedTime <= udpTimeout) ) {
sleep.wait(&mutex,sleepTime);
elapsedTime += sleepTime;
cout << "---------> ELAPSED TIME: " << elapsedTime << endl;
}
}
if (!UdpSockTemp.hasPendingDatagrams()) {
cout << "---> UdpSockTemp.hasPendingDatagrams() returning error" << endl;
return -1;
}
int packet_size = UdpSockTemp.pendingDatagramSize();
char packet[packet_size];
UdpSockTemp.readDatagram(packet, packet_size);
......
......@@ -117,81 +117,99 @@ void UdpMasterListener::run()
std::exit(1);
}
cout << "TCP Server Listening in Port = " << TcpServer.serverPort() << endl;
const int tcpTimeout = 5*1000;
cout << "JackTrip MULTI-THREADED SERVER: TCP Server Listening in Port = " << TcpServer.serverPort() << endl;
while ( !mStopped )
{
cout << "Waiting for client connections..." << endl;
cout << "JackTrip MULTI-THREADED SERVER: Waiting for client connections..." << endl;
cout << "=======================================================" << endl;
while ( !TcpServer.waitForNewConnection(1000) ) {} // block until a new connection is received
cout << "Client Connection Received!" << endl;
QTcpSocket *clientConnection = TcpServer.nextPendingConnection();
PeerAddress = clientConnection->peerAddress();
cout << "Client Connect Received from Address : "
<< PeerAddress.toString().toStdString() << endl;
// Get UDP port from client
// ------------------------
peer_udp_port = readClientUdpPort(clientConnection);
cout << "Client UDP Port is = " << peer_udp_port << endl;
// Check is client is new or not
// -----------------------------
// Check if Address is not already in the thread pool
// check by comparing 32-bit addresses
int id = isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port);
// If the address is not new, we need to remove the client from the pool
// before re-starting the connection
if (id == -1) {
int id_remove;
id_remove = getPoolID(PeerAddress.toIPv4Address(), peer_udp_port);
// stop the thread
//if ( mJTWorkers->at(id_remove) != NULL) {
while ( !TcpServer.waitForNewConnection(1000) )
{ if (mStopped) { return; } } // block until a new connection is received
cout << "JackTrip MULTI-THREADED SERVER: Client Connection Received!" << endl;
// Control loop to be able to exit if UDPs or TCPs error ocurr
for (int dum = 0; dum<1; dum++) {
QTcpSocket *clientConnection = TcpServer.nextPendingConnection();
if ( !clientConnection->waitForConnected(tcpTimeout) ) {
std::cerr << clientConnection->errorString().toStdString() << endl;
break;
}
PeerAddress = clientConnection->peerAddress();
cout << "JackTrip MULTI-THREADED SERVER: Client Connect Received from Address : "
<< PeerAddress.toString().toStdString() << endl;
// Get UDP port from client
// ------------------------
peer_udp_port = readClientUdpPort(clientConnection);
if ( peer_udp_port == 0 ) { break; }
cout << "JackTrip MULTI-THREADED SERVER: Client UDP Port is = " << peer_udp_port << endl;
// Check is client is new or not
// -----------------------------
// Check if Address is not already in the thread pool
// check by comparing 32-bit addresses
int id = isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port);
// If the address is not new, we need to remove the client from the pool
// before re-starting the connection
if (id == -1) {
int id_remove;
id_remove = getPoolID(PeerAddress.toIPv4Address(), peer_udp_port);
// stop the thread
mJTWorkers->at(id_remove)->stopThread();
//}
// block until the thread has been removed from the pool
while ( isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port) == -1 )
{ cout << "removing" << endl; QThread::msleep(10); }
cout << "---> AFTER REMOVING" << endl;
// Get a new ID for this client
//id = isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port);
id = getPoolID(PeerAddress.toIPv4Address(), peer_udp_port);
}
cout << "AFTER FIRST LOOP" << endl;
// Assign server port and send it to Client
server_udp_port = mBasePort+id;
sendUdpPort(clientConnection, server_udp_port);
// Close and Delete the socket
// ---------------------------
clientConnection->close();
cout << "---> BEFORE DELETE clientConnection" << endl;
delete clientConnection;
cout << "Client TCP Socket Closed!" << endl;
// Spawn Thread to Pool
// --------------------
// Register JackTripWorker with the master listener
cout << "BEFORE DELETE mJTWorkers" << endl;
delete mJTWorkers->at(id); // just in case the Worker was previously created
mJTWorkers->insert(id, NULL);
mJTWorkers->replace(id, new JackTripWorker(this));
cout << "---> AFTER new" << endl;
// redirect port and spawn listener
{
QMutexLocker lock(&mMutex);
cout << "---> AFTER QMutexLocker" << endl;
mJTWorkers->at(id)->setJackTrip(id, mActiveAddress[id][0],
server_udp_port, mActiveAddress[id][1],
1); /// \todo temp default to 1 channel
// block until the thread has been removed from the pool
while ( isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port) == -1 )
{ cout << "removing" << endl; QThread::msleep(10); }
// Get a new ID for this client
//id = isNewAddress(PeerAddress.toIPv4Address(), peer_udp_port);
id = getPoolID(PeerAddress.toIPv4Address(), peer_udp_port);
}
// Assign server port and send it to Client
server_udp_port = mBasePort+id;
if ( sendUdpPort(clientConnection, server_udp_port) == 0 ) {
releaseThread(id);
delete clientConnection;
break;
}
// Close and Delete the socket
// ---------------------------
clientConnection->close();
cout << "---> BEFORE DELETE clientConnection" << endl;
delete clientConnection;
cout << "JackTrip MULTI-THREADED SERVER: Client TCP Socket Closed!" << endl;
// Spawn Thread to Pool
// --------------------
// Register JackTripWorker with the master listener
cout << "BEFORE DELETE mJTWorkers" << endl;
delete mJTWorkers->at(id); // just in case the Worker was previously created
mJTWorkers->insert(id, NULL);
mJTWorkers->replace(id, new JackTripWorker(this));
cout << "---> AFTER new" << endl;
// redirect port and spawn listener
{
QMutexLocker lock(&mMutex);
cout << "---> AFTER QMutexLocker" << endl;
mJTWorkers->at(id)->setJackTrip(id, mActiveAddress[id][0],
server_udp_port, mActiveAddress[id][1],
1); /// \todo temp default to 1 channel
}
cout << "---> AFTER setJackTrip" << endl;
//send one thread to the pool
mThreadPool.start(mJTWorkers->at(id), QThread::TimeCriticalPriority);
cout << "---> AFTER mThreadPool.start" << endl;
// wait until one is complete before another spawns
while (mJTWorkers->at(id)->isSpawning()) { QThread::msleep(10); }
cout << "---> AFTER while (mJTWorkers->at(id)->isSpawning())" << endl;
//mTotalRunningThreads++;
cout << "JackTrip MULTI-THREADED SERVER: Total Running Threads: " << mTotalRunningThreads << endl;
cout << "===============================================================" << endl;
QThread::msleep(100);
}
//send one thread to the pool
mThreadPool.start(mJTWorkers->at(id), QThread::TimeCriticalPriority);
// wait until one is complete before another spawns
while (mJTWorkers->at(id)->isSpawning()) { QThread::msleep(10); }
mTotalRunningThreads++;
cout << "Total Running Threads: " << mTotalRunningThreads << endl;
cout << "=======================================================" << endl;
QThread::msleep(100);
}
/*
// Create objects on the stack
QUdpSocket MasterUdpSocket;
......@@ -245,6 +263,7 @@ void UdpMasterListener::run()
//*******************************************************************************
// Returns 0 on error
int UdpMasterListener::readClientUdpPort(QTcpSocket* clientConnection)
{
// Read the size of the package
......@@ -271,7 +290,7 @@ int UdpMasterListener::readClientUdpPort(QTcpSocket* clientConnection)
//*******************************************************************************
void UdpMasterListener::sendUdpPort(QTcpSocket* clientConnection, int udp_port)
int UdpMasterListener::sendUdpPort(QTcpSocket* clientConnection, int udp_port)
{
// Send Port Number to Client
// --------------------------
......@@ -279,8 +298,16 @@ void UdpMasterListener::sendUdpPort(QTcpSocket* clientConnection, int udp_port)
std::memcpy(port_buf, &udp_port, sizeof(udp_port));
clientConnection->write(port_buf, sizeof(udp_port));
while ( clientConnection->bytesToWrite() > 0 ) {
clientConnection->waitForBytesWritten(-1);
cout << "----> clientConnection->isValid(): " << clientConnection->isValid() << "STATE: " << clientConnection->state() << endl;
//if ( clientConnection->isValid() ) {
if ( clientConnection->state() == QAbstractSocket::ConnectedState ) {
clientConnection->waitForBytesWritten(-1);
}
else {
return 0;
}
}
return 1;
cout << "Port sent to Client" << endl;
}
......@@ -356,6 +383,9 @@ int UdpMasterListener::isNewAddress(uint32_t address, uint16_t port)
}
}
//cout << "ID -------------------------------> " << id << "BUSYADDRESS " << busyAddress << endl;
if (!busyAddress) {
mTotalRunningThreads++;
}
return ((busyAddress) ? -1 : id);
}
......
......@@ -94,7 +94,7 @@ private:
static void bindUdpSocket(QUdpSocket& udpsocket, int port) throw(std::runtime_error);
int readClientUdpPort(QTcpSocket* clientConnection);
void sendUdpPort(QTcpSocket* clientConnection, int udp_port);
int sendUdpPort(QTcpSocket* clientConnection, int udp_port);
/** \brief Send the JackTripWorker to the thread pool. This will run
......
......@@ -60,6 +60,7 @@ const int gDefaultOutputQueueLength = 4;
const uint32_t gDefaultSampleRate = 48000;
const uint32_t gDefaultBufferSizeInSamples = 128;
const int gDefaultRedundancy = 1;
const int gTimeOutMultiThreadedServer = 2000; // seconds
//@}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment