JackTripWorker.cpp 12.3 KB
Newer Older
1
2
3
4
5
6
7
//*****************************************************************
/*
  JackTrip: A System for High-Quality Audio Network Performance
  over the Internet

  Copyright (c) 2008 Juan-Pablo Caceres, Chris Chafe.
  SoundWIRE group at CCRMA, Stanford University.
8

9
10
11
12
13
14
15
16
  Permission is hereby granted, free of charge, to any person
  obtaining a copy of this software and associated documentation
  files (the "Software"), to deal in the Software without
  restriction, including without limitation the rights to use,
  copy, modify, merge, publish, distribute, sublicense, and/or sell
  copies of the Software, and to permit persons to whom the
  Software is furnished to do so, subject to the following
  conditions:
17

18
19
  The above copyright notice and this permission notice shall be
  included in all copies or substantial portions of the Software.
20

21
22
23
24
25
26
27
28
29
30
31
  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  OTHER DEALINGS IN THE SOFTWARE.
*/
//*****************************************************************

32
33
34
35
36
37
38
39
40
41
/**
 * \file JackTripWorker.cpp
 * \author Juan-Pablo Caceres
 * \date September 2008
 */

#include <iostream>
#include <unistd.h>

#include <QTimer>
42
#include <QMutexLocker>
43
#include <QWaitCondition>
44
45
46
47
48
49

#include "JackTripWorker.h"
#include "JackTrip.h"
#include "UdpMasterListener.h"
#include "NetKS.h"
#include "LoopBack.h"
Chris Chafe's avatar
src/  
Chris Chafe committed
50
51
52
#ifdef WAIR // wair
#include "dcblock2gain.dsp.h"
#endif // endwhere
53
#ifdef __JAMTEST__
54
#include "JamTest.h"
55
#endif
56
57
58
59

using std::cout; using std::endl;

//*******************************************************************************
Aaron Wyatt's avatar
Aaron Wyatt committed
60
JackTripWorker::JackTripWorker(UdpMasterListener* udpmasterlistener, int BufferQueueLength, JackTrip::underrunModeT UnderRunMode) :
Chris Chafe's avatar
src/  
Chris Chafe committed
61
    mUdpMasterListener(udpmasterlistener),
62
    m_connectDefaultAudioPorts(false),
Aaron Wyatt's avatar
Aaron Wyatt committed
63
    mBufferQueueLength(BufferQueueLength),
Aaron Wyatt's avatar
Aaron Wyatt committed
64
    mUnderRunMode(UnderRunMode),
65
66
67
    mSpawning(false),
    mID(0),
    mNumChans(1)
Chris Chafe's avatar
src/  
Chris Chafe committed
68
69
70
71
  #ifdef WAIR // wair
  ,mNumNetRevChans(0),
    mWAIR(false)
  #endif // endwhere
72
{
73
74
75
    setAutoDelete(false); // stick around after calling run()
    //mNetks = new NetKS;
    //mNetks->play();
76
77
78
79
80
81
}


//*******************************************************************************
JackTripWorker::~JackTripWorker()
{
82
    //delete mUdpMasterListener;
83
84
85
86
}


//*******************************************************************************
87
void JackTripWorker::setJackTrip(int id,
Aaron Wyatt's avatar
Aaron Wyatt committed
88
                                 QString client_address,
89
90
91
92
                                 uint16_t server_port,
                                 uint16_t client_port,
                                 int num_channels,
                                 bool connectDefaultAudioPorts)
93
{
94
95
96
97
98
99
100
101
102
103
104
    { //Start Spawning, so lock mSpawning
        QMutexLocker locker(&mMutex);
        mSpawning = true;
    }
    mID = id;
    // Set the jacktrip address and ports
    //mClientAddress.setAddress(client_address);
    mClientAddress = client_address;
    mServerPort = server_port;
    mClientPort = client_port;
    mNumChans = num_channels;
105
    m_connectDefaultAudioPorts = connectDefaultAudioPorts;
106
107
108
109
110
111
}


//*******************************************************************************
void JackTripWorker::run()
{
112
    /* NOTE: This is the message that qt prints when an exception is thrown:
113
114
    'Qt Concurrent has caught an exception thrown from a worker thread.
    This is not supported, exceptions thrown in worker threads must be
jcacerec's avatar
jcacerec committed
115
    caught before control returns to Qt Concurrent.'*/
116

117
    { QMutexLocker locker(&mMutex); mSpawning = true; }
118

Aaron Wyatt's avatar
Aaron Wyatt committed
119
    //QHostAddress ClientAddress;
120

121
122
123
124
125
    // Try catching any exceptions that come from JackTrip
    try
    {
        // Local event loop. this is necesary because QRunnables don't have their own as QThreads
        QEventLoop event_loop;
jcacerec's avatar
jcacerec committed
126

127
128
        // Create and setup JackTrip Object
        //JackTrip jacktrip(JackTrip::SERVER, JackTrip::UDP, mNumChans, 2);
Chris Chafe's avatar
src/  
Chris Chafe committed
129
130
131
132
133
134
135
136
137
138
139
140
141
        if (gVerboseFlag) cout << "---> JackTripWorker: Creating jacktrip objects..." << endl;

#ifdef WAIR // WAIR
        // forces    BufferQueueLength to 2
        // need to parse numNetChans from incoming header
        // but force to 16 for now
#define FORCEBUFFERQ 2
        if (mUdpMasterListener->isWAIR()) { // invoked with -Sw
            mWAIR = true;
            mNumNetRevChans = NUMNETREVCHANSbecauseNOTINRECEIVEDheader;
        } else {};
#endif // endwhere

142
#ifndef __JAMTEST__
Chris Chafe's avatar
src/  
Chris Chafe committed
143
144
145
146
147
148
149
150
151
#ifdef WAIR // WAIR
        //        bool tmp = mJTWorkers->at(id)->isWAIR();
        //        qDebug() << "is WAIR?" <<  tmp ;
        qDebug() << "mNumNetRevChans" <<  mNumNetRevChans ;

        JackTrip jacktrip(JackTrip::SERVERPINGSERVER, JackTrip::UDP, mNumChans,
                          mNumNetRevChans, FORCEBUFFERQ);
        JackTrip * mJackTrip = &jacktrip;
#else // endwhere
Aaron Wyatt's avatar
Aaron Wyatt committed
152
        JackTrip jacktrip(JackTrip::SERVERPINGSERVER, JackTrip::UDP, mNumChans, mBufferQueueLength);
Chris Chafe's avatar
src/  
Chris Chafe committed
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#endif // not wair

#ifdef WAIR // WAIR
        // Add Plugins
        if ( mWAIR ) {
            cout << "Running in WAIR Mode..." << endl;
            cout << gPrintSeparator << std::endl;
            switch ( mNumNetRevChans )
            {
            case 16 : // freeverb
                mJackTrip->appendProcessPlugin(new dcblock2gain(mNumChans)); // plugin slot 0
                ///////////////
                //            mJackTrip->appendProcessPlugin(new comb16server(mNumNetChans));
                // -S LAIR no AP  mJackTrip->appendProcessPlugin(new AP8(mNumChans));
                break;
            default:
                throw std::invalid_argument("Settings: mNumNetChans doesn't correspond to Faust plugin");
                break;
            }
        }
#endif // endwhere
#endif // ifndef __JAMTEST__

176
#ifdef __JAMTEST__
177
178
        JamTest jacktrip(JackTrip::SERVERPINGSERVER); // ########### JamTest #################
        //JackTrip jacktrip(JackTrip::SERVERPINGSERVER, JackTrip::UDP, mNumChans, 2);
179
#endif
180

181
182
        jacktrip.setConnectDefaultAudioPorts(m_connectDefaultAudioPorts);

Aaron Wyatt's avatar
Aaron Wyatt committed
183
184
185
        // Set our underrun mode
        jacktrip.setUnderRunMode(mUnderRunMode);

186
187
        // Connect signals and slots
        // -------------------------
Chris Chafe's avatar
src/  
Chris Chafe committed
188
        if (gVerboseFlag) cout << "---> JackTripWorker: Connecting signals and slots..." << endl;
189
190
191
192
193
194
195
196
197
198
        // Connection to terminate JackTrip when packets haven't arrive for
        // a certain amount of time
        QObject::connect(&jacktrip, SIGNAL(signalNoUdpPacketsForSeconds()),
                         &jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);
        // Connection to terminate the local eventloop when jacktrip is done
        QObject::connect(&jacktrip, SIGNAL(signalProcessesStopped()),
                         &event_loop, SLOT(quit()), Qt::QueuedConnection);
        QObject::connect(this, SIGNAL(signalRemoveThread()),
                         &jacktrip, SLOT(slotStopProcesses()), Qt::QueuedConnection);

Aaron Wyatt's avatar
Aaron Wyatt committed
199
        //ClientAddress.setAddress(mClientAddress);
200
201
        // If I don't type this line, I get a bus error in the next line.
        // I still haven't figure out why
Aaron Wyatt's avatar
Aaron Wyatt committed
202
203
204
        //ClientAddress.toString().toLatin1().constData();
        //jacktrip.setPeerAddress(ClientAddress.toString().toLatin1().constData());
        jacktrip.setPeerAddress(mClientAddress.toLatin1().constData());
205
206
207
        jacktrip.setBindPorts(mServerPort);
        //jacktrip.setPeerPorts(mClientPort);

Chris Chafe's avatar
src/  
Chris Chafe committed
208
        if (gVerboseFlag) cout << "---> JackTripWorker: setJackTripFromClientHeader..." << endl;
209
210
211
212
213
214
215
216
        int PeerConnectionMode = setJackTripFromClientHeader(jacktrip);
        if ( PeerConnectionMode == -1 ) {
            mUdpMasterListener->releaseThread(mID);
            { QMutexLocker locker(&mMutex); mSpawning = false; }
            return;
        }

        // Start Threads and event loop
Chris Chafe's avatar
src/  
Chris Chafe committed
217
218
        if (gVerboseFlag) cout << "---> JackTripWorker: startProcess..." << endl;
        jacktrip.startProcess(
Chris Chafe's avatar
Chris Chafe committed
219
            #ifdef WAIRTOMASTER // wair
Chris Chafe's avatar
src/  
Chris Chafe committed
220
221
222
223
224
                    mID
            #endif // endwhere
                    );
        // if (gVerboseFlag) cout << "---> JackTripWorker: start..." << endl;
        // jacktrip.start(); // ########### JamTest Only #################
225
226
227
228
229
230
231
232
233
234
235

        // Thread is already spawning, so release the lock
        { QMutexLocker locker(&mMutex); mSpawning = false; }

        event_loop.exec(); // Excecution will block here until exit() the QEventLoop
        //--------------------------------------------------------------------------

        { QMutexLocker locker(&mMutex); mSpawning = true; }

        // wait for jacktrip to be done before exiting the Worker Thread
        jacktrip.wait();
jcacerec's avatar
jcacerec committed
236

237
238
239
240
241
242
243
244
245
246
    }
    catch ( const std::exception & e )
    {
        std::cerr << "Couldn't send thread to the Pool" << endl;
        std::cerr << e.what() << endl;
        std::cerr << gPrintSeparator << endl;
        mUdpMasterListener->releaseThread(mID);
        { QMutexLocker locker(&mMutex); mSpawning = false; }
        return;
    }
247

248
249
250
251
    {
        QMutexLocker locker(&mMutex);
        mUdpMasterListener->releaseThread(mID);
    }
252

253
254
255
256
257
258
259
    cout << "JackTrip ID = " << mID << " released from the THREAD POOL" << endl;
    cout << gPrintSeparator << endl;
    {
        // Thread is already spawning, so release the lock
        QMutexLocker locker(&mMutex);
        mSpawning = false;
    }
260
261
262
}


263
//*******************************************************************************
264
// returns -1 on error
265
int JackTripWorker::setJackTripFromClientHeader(JackTrip& jacktrip)
266
{
267
268
269
270
271
272
273
274
275
276
    //QHostAddress peerHostAddress;
    //uint16_t peer_port;
    QUdpSocket UdpSockTemp;// Create socket to wait for client

    // Bind the socket
    if ( !UdpSockTemp.bind(QHostAddress::Any, mServerPort,
                           QUdpSocket::DefaultForPlatform) )
    {
        std::cerr << "in JackTripWorker: Could not bind UDP socket. It may be already binded." << endl;
        throw std::runtime_error("Could not bind UDP socket. It may be already binded.");
277
    }
278
279
280
281
282
283
284
285
286
287
288
289

    // Listen to client
    QWaitCondition sleep; // time is in milliseconds
    QMutex mutex;
    int sleepTime = 100; // ms
    int udpTimeout = gTimeOutMultiThreadedServer; // gTimeOutMultiThreadedServer mseconds
    int elapsedTime = 0;
    {
        QMutexLocker lock(&mutex);
        while ( (!UdpSockTemp.hasPendingDatagrams()) && (elapsedTime <= udpTimeout) ) {
            sleep.wait(&mutex,sleepTime);
            elapsedTime += sleepTime;
Chris Chafe's avatar
src/  
Chris Chafe committed
290
            if (gVerboseFlag) cout << "---------> ELAPSED TIME: " << elapsedTime << endl;
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
        }
    }
    // Check if we time out or not
    if (!UdpSockTemp.hasPendingDatagrams()) {
        std::cerr << "--->JackTripWorker: is not receiving Datagrams (timeout)" << endl;
        UdpSockTemp.close();
        return -1;
    }
    int packet_size = UdpSockTemp.pendingDatagramSize();
    char packet[packet_size];
    UdpSockTemp.readDatagram(packet, packet_size);
    UdpSockTemp.close(); // close the socket
    int8_t* full_packet = reinterpret_cast<int8_t*>(packet);

    int PeerBufferSize = jacktrip.getPeerBufferSize(full_packet);
    int PeerSamplingRate = jacktrip.getPeerSamplingRate(full_packet);
    int PeerBitResolution = jacktrip.getPeerBitResolution(full_packet);
    int PeerNumChannels = jacktrip.getPeerNumChannels(full_packet);
    int PeerConnectionMode = jacktrip.getPeerConnectionMode(full_packet);

Chris Chafe's avatar
src/  
Chris Chafe committed
311
312
313
314
315
    if (gVerboseFlag) cout << "--->JackTripWorker: getPeerBufferSize = " << PeerBufferSize << endl;
    if (gVerboseFlag) cout << "--->JackTripWorker: getPeerSamplingRate = " << PeerSamplingRate << endl;
    if (gVerboseFlag) cout << "--->JackTripWorker: getPeerBitResolution = " << PeerBitResolution << endl;
    cout << "--->JackTripWorker: PeerNumChannels = " << PeerNumChannels << endl;
    if (gVerboseFlag) cout << "--->JackTripWorker: getPeerConnectionMode = " << PeerConnectionMode << endl;
316
317
318

    jacktrip.setNumChannels(PeerNumChannels);
    return PeerConnectionMode;
319
320
321
}


322
//*******************************************************************************
323
bool JackTripWorker::isSpawning()
324
{
325
326
    QMutexLocker locker(&mMutex);
    return mSpawning;
327
}
328
329


330
331
332
//*******************************************************************************
void JackTripWorker::stopThread()
{
333
334
    QMutexLocker locker(&mMutex);
    emit signalRemoveThread();
335
}