//*****************************************************************
/*
  JackTrip: A System for High-Quality Audio Network Performance
  over the Internet

  Copyright (c) 2008 Juan-Pablo Caceres, Chris Chafe.
  SoundWIRE group at CCRMA, Stanford University.

  Permission is hereby granted, free of charge, to any person
  obtaining a copy of this software and associated documentation
  files (the "Software"), to deal in the Software without
  restriction, including without limitation the rights to use,
  copy, modify, merge, publish, distribute, sublicense, and/or sell
  copies of the Software, and to permit persons to whom the
  Software is furnished to do so, subject to the following
  conditions:

  The above copyright notice and this permission notice shall be
  included in all copies or substantial portions of the Software.

  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
  OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
  WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
  OTHER DEALINGS IN THE SOFTWARE.
*/
//*****************************************************************

/**
 * \file UdpDataProtocol.cpp
 * \author Juan-Pablo Caceres
 * \date June 2008
 */

jcaceres's avatar
jcaceres committed
38
#include "UdpDataProtocol.h"
#include "jacktrip_globals.h"
#include "JackTrip.h"

#include <QHostInfo>

#include <cstring>
#include <iostream>
#include <cstdlib>
#include <cerrno>
#include <stdexcept>
#ifdef __WIN_32__
//#include <winsock.h>
#include <winsock2.h> //cc need SD_SEND
#include <ws2tcpip.h> // for IPv6
#endif
#if defined (__LINUX__) || (__MAC__OSX__)
#include <sys/socket.h> // for POSIX Sockets
#endif
#ifndef __WIN_32__
#include <unistd.h> // for close()
#endif
jcacerec's avatar
jcacerec committed
57

58
59
using std::cout; using std::endl;

60
61
// NOTE: It's better not to use
// using namespace std;
// because some functions (like exit()) get confused with QT functions
jcaceres's avatar
jcaceres committed
63

jcacerec's avatar
jcacerec committed
64
65
// sUdpMutex definition
QMutex UdpDataProtocol::sUdpMutex;
jcaceres's avatar
jcaceres committed
66

67
//*******************************************************************************
68
/**
 * \brief Build a UDP data protocol endpoint
 * \param jacktrip Owning JackTrip instance, forwarded to DataProtocol and
 * used as the receiver of the "waiting too long" notification
 * \param runmode SENDER or RECEIVER
 * \param bind_port Local port, stored in mBindPort
 * \param peer_port Peer port, stored in mPeerPort and written (network byte
 * order) into both peer address structures
 * \param udp_redundancy_factor Stored in mUdpRedundancyFactor
 */
UdpDataProtocol::UdpDataProtocol(JackTrip* jacktrip, const runModeT runmode,
                                 int bind_port, int peer_port,
                                 unsigned int udp_redundancy_factor) :
    DataProtocol(jacktrip, runmode, bind_port, peer_port),
    mBindPort(bind_port), mPeerPort(peer_port),
    mRunMode(runmode),
    mAudioPacket(NULL), mFullPacket(NULL),
    mUdpRedundancyFactor(udp_redundancy_factor)
{
    // IPv4 is assumed until setPeerAddress() sees an IPv6 peer
    mIPv6 = false;
    mStopped = false;

    // Both peer address structures start zeroed, with only the port filled in
    std::memset(&mPeerAddr6, 0, sizeof(mPeerAddr6));
    mPeerAddr6.sin6_port = htons(mPeerPort);
    std::memset(&mPeerAddr, 0, sizeof(mPeerAddr));
    mPeerAddr.sin_port = htons(mPeerPort);

    // Only the receiving side reports long waits back to JackTrip
    if (RECEIVER != mRunMode) { return; }
    QObject::connect(this, SIGNAL(signalWaitingTooLong(int)),
                     jacktrip, SLOT(slotUdpWaitingTooLongClientGoneProbably(int)),
                     Qt::QueuedConnection);
}


91
92
93
//*******************************************************************************
/**
 * \brief Wait for the worker thread, then release the packet buffers
 *
 * run() allocates mAudioPacket and mFullPacket and keeps using them while it
 * is looping, so the buffers must outlive the thread. The join therefore
 * happens before the deletes; deleting first would let a still-running run()
 * touch freed memory.
 */
UdpDataProtocol::~UdpDataProtocol()
{
    // Join first (presumably QThread::wait(); blocks until run() has returned)
    wait();
    // Safe on NULL: both pointers are NULL until run() allocates them
    delete[] mAudioPacket;
    delete[] mFullPacket;
}
98
99


100
//*******************************************************************************
101
void UdpDataProtocol::setPeerAddress(const char* peerHostOrIP)
102
{
103
    // Get DNS Address
Aaron Wyatt's avatar
Aaron Wyatt committed
104
105
106
#if defined (__LINUX__) || (__MAC__OSX__)
    //Don't make the following code conditional on windows
    //(Addresses a weird timing bug when in hub client mode)
Aaron Wyatt's avatar
Aaron Wyatt committed
107
    if (!mPeerAddress.setAddress(peerHostOrIP)) {
Aaron Wyatt's avatar
Aaron Wyatt committed
108
#endif
Aaron Wyatt's avatar
Aaron Wyatt committed
109
110
111
112
        QHostInfo info = QHostInfo::fromName(peerHostOrIP);
        if (!info.addresses().isEmpty()) {
            // use the first IP address
            mPeerAddress = info.addresses().first();
Aaron Wyatt's avatar
Aaron Wyatt committed
113
        }
114
115
        //cout << "UdpDataProtocol::setPeerAddress IP Address Number: "
        //    << mPeerAddress.toString().toStdString() << endl;
Aaron Wyatt's avatar
Aaron Wyatt committed
116
#if defined (__LINUX__) || (__MAC__OSX__)
117
    }
Aaron Wyatt's avatar
Aaron Wyatt committed
118
#endif
jcacerec's avatar
jcacerec committed
119

120
    // check if the ip address is valid
Aaron Wyatt's avatar
Aaron Wyatt committed
121
122
    if ( mPeerAddress.protocol() == QAbstractSocket::IPv6Protocol ) {
        mIPv6 = true;
Aaron Wyatt's avatar
Aaron Wyatt committed
123
    } else  if ( mPeerAddress.protocol() != QAbstractSocket::IPv4Protocol ) {
124
125
126
127
128
129
130
131
132
        QString error_message = "Incorrect presentation format address\n '";
        error_message.append(peerHostOrIP);
        error_message.append("' is not a valid IP address or Host Name");
        //std::cerr << "ERROR: Incorrect presentation format address" << endl;
        //std::cerr << "'" << peerHostOrIP <<"' does not seem to be a valid IP address" << endl;
        //throw std::invalid_argument("Incorrect presentation format address");
        throw std::invalid_argument( error_message.toStdString());
    }
    /*
Aaron Wyatt's avatar
Aaron Wyatt committed
133
134
135
136
137
138
139
140
    else {
        std::cout << "Peer Address set to: "
            << mPeerAddress.toString().toStdString() << std::endl;
        cout << gPrintSeparator << endl;
        usleep(100);
    }
    */

Aaron Wyatt's avatar
Aaron Wyatt committed
141
    // Save our address as an appropriate address structure
Aaron Wyatt's avatar
Aaron Wyatt committed
142
    if (mIPv6) {
Aaron Wyatt's avatar
Aaron Wyatt committed
143
        mPeerAddr6.sin6_family = AF_INET6;
Aaron Wyatt's avatar
Aaron Wyatt committed
144
145
146
        ::inet_pton(AF_INET6, mPeerAddress.toString().toLatin1().constData(),
                    &mPeerAddr6.sin6_addr);
    } else {
Aaron Wyatt's avatar
Aaron Wyatt committed
147
        mPeerAddr.sin_family = AF_INET;
Aaron Wyatt's avatar
Aaron Wyatt committed
148
149
150
        ::inet_pton(AF_INET, mPeerAddress.toString().toLatin1().constData(),
                    &mPeerAddr.sin_addr);
    }
151
152
}

Aaron Wyatt's avatar
Aaron Wyatt committed
153
#if defined (__WIN_32__)
154
void UdpDataProtocol::setSocket(SOCKET &socket)
Aaron Wyatt's avatar
Aaron Wyatt committed
155
#else
156
void UdpDataProtocol::setSocket(int &socket)
Aaron Wyatt's avatar
Aaron Wyatt committed
157
#endif
158
{
Aaron Wyatt's avatar
Aaron Wyatt committed
159
160
    //If we haven't been passed a valid socket, then we should bind one.
#if defined (__WIN_32__)
Aaron Wyatt's avatar
Aaron Wyatt committed
161
    if (socket == INVALID_SOCKET) {
Aaron Wyatt's avatar
Aaron Wyatt committed
162
#else
Aaron Wyatt's avatar
Aaron Wyatt committed
163
    if (socket == -1) {
Aaron Wyatt's avatar
Aaron Wyatt committed
164
#endif
Aaron Wyatt's avatar
Aaron Wyatt committed
165
166
167
168
169
170
        try {
            if (gVerboseFlag) std::cout << "    UdpDataProtocol:run" << mRunMode << " before bindSocket(UdpSocket)" << std::endl;
            socket = bindSocket(); // Bind Socket
        } catch ( const std::exception & e ) {
            emit signalError( e.what() );
            return;
Aaron Wyatt's avatar
Aaron Wyatt committed
171
        }
Aaron Wyatt's avatar
Aaron Wyatt committed
172
    }
Aaron Wyatt's avatar
Aaron Wyatt committed
173
    mSocket = socket;
Aaron Wyatt's avatar
Aaron Wyatt committed
174
175
176
177
178
}


//*******************************************************************************
#if defined (__WIN_32__)
179
SOCKET UdpDataProtocol::bindSocket()
Aaron Wyatt's avatar
Aaron Wyatt committed
180
#else
181
int UdpDataProtocol::bindSocket()
Aaron Wyatt's avatar
Aaron Wyatt committed
182
183
#endif
{
Aaron Wyatt's avatar
Aaron Wyatt committed
184
185
    QMutexLocker locker(&sUdpMutex);

186
#if defined __WIN_32__
187
188
189
    WORD wVersionRequested;
    WSADATA wsaData;
    int err;
190

191
    wVersionRequested = MAKEWORD( 1, 1 );
192

193
194
195
196
    err = WSAStartup( wVersionRequested, &wsaData );
    if ( err != 0 ) {
        // Tell the user that we couldn't find a useable
        // winsock.dll.
197

Aaron Wyatt's avatar
Aaron Wyatt committed
198
        return INVALID_SOCKET;
199
    }
200

201
    // Confirm that the Windows Sockets DLL supports 1.1. or higher
202

203
204
205
206
207
    if ( LOBYTE( wsaData.wVersion ) != 1 ||
         HIBYTE( wsaData.wVersion ) != 1 ) {
        // Tell the user that we couldn't find a useable
        // winsock.dll.
        WSACleanup( );
Aaron Wyatt's avatar
Aaron Wyatt committed
208
        return INVALID_SOCKET;
209
    }
210

211
    SOCKET sock_fd;
212
#endif
213

214
#if defined ( __LINUX__ ) || (__MAC_OSX__)
215
    int sock_fd;
Aaron Wyatt's avatar
Aaron Wyatt committed
216
217
#endif

Aaron Wyatt's avatar
Aaron Wyatt committed
218
    //Set local IPv4 or IPv6 Address
219
    struct sockaddr_in local_addr;
Aaron Wyatt's avatar
Aaron Wyatt committed
220
    struct sockaddr_in6 local_addr6;
221

Aaron Wyatt's avatar
Aaron Wyatt committed
222
    // Create socket descriptor
Aaron Wyatt's avatar
Aaron Wyatt committed
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
    if (mIPv6) {
        sock_fd = socket(AF_INET6, SOCK_DGRAM, 0);
        std::memset(&local_addr6, 0, sizeof(local_addr6));
        local_addr6.sin6_family = AF_INET6;
        local_addr6.sin6_addr = in6addr_any;
        local_addr6.sin6_port = htons(mBindPort);
    } else {
        sock_fd = socket(AF_INET, SOCK_DGRAM, 0);

        //::bzero(&local_addr, sizeof(local_addr));
        std::memset(&local_addr, 0, sizeof(local_addr)); // set buffer to 0
        local_addr.sin_family = AF_INET; //AF_INET: IPv4 Protocol
        local_addr.sin_addr.s_addr = htonl(INADDR_ANY); //INADDR_ANY: let the kernel decide the active address
        local_addr.sin_port = htons(mBindPort); //set local port
    }
238

239
240
    // Set socket to be reusable, this is platform dependent
    int one = 1;
241
#if defined ( __LINUX__ )
242
    ::setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
243
244
#endif
#if defined ( __MAC_OSX__ )
245
246
247
    // This option is not avialable on Linux, and without it MAC OS X
    // has problems rebinding a socket
    ::setsockopt(sock_fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one));
248
#endif
249
#if defined (__WIN_32__)
250
251
    //make address/port reusable
    setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof(one));
252
#endif
253

254
    // Bind the Socket
Aaron Wyatt's avatar
Aaron Wyatt committed
255
256
257
258
259
260
261
    if (mIPv6) {
        if ( (::bind(sock_fd, (struct sockaddr *) &local_addr6, sizeof(local_addr6))) < 0 )
        { throw std::runtime_error("ERROR: UDP Socket Bind Error"); }
    } else {
        if ( (::bind(sock_fd, (struct sockaddr *) &local_addr, sizeof(local_addr))) < 0 )
        { throw std::runtime_error("ERROR: UDP Socket Bind Error"); }
    }
262

263
264
    // To be able to use the two UDP sockets bound to the same port number,
    // we connect the receiver and issue a SHUT_WR.
Aaron Wyatt's avatar
Aaron Wyatt committed
265

Aaron Wyatt's avatar
Aaron Wyatt committed
266
    // This didn't work for IPv6, so we'll instead share a full duplex socket.
Aaron Wyatt's avatar
Aaron Wyatt committed
267
    /*if (mRunMode == SENDER) {
268
269
270
        // We use the sender as an unconnected UDP socket
        UdpSocket.setSocketDescriptor(sock_fd, QUdpSocket::BoundState,
                                      QUdpSocket::WriteOnly);
Aaron Wyatt's avatar
Aaron Wyatt committed
271
272
    }*/
    if (!mIPv6) {
Aaron Wyatt's avatar
Aaron Wyatt committed
273
        // Connect only if we're using IPv4.
Aaron Wyatt's avatar
Aaron Wyatt committed
274
275
        // (Connecting presents an issue when a host has multiple IP addresses and the peer decides to send from
        // a different address. While this generally won't be a problem for IPv4, it will for IPv6.)
Aaron Wyatt's avatar
Aaron Wyatt committed
276
        if ( (::connect(sock_fd, (struct sockaddr *) &mPeerAddr, sizeof(mPeerAddr))) < 0)
Aaron Wyatt's avatar
Aaron Wyatt committed
277
        { throw std::runtime_error("ERROR: Could not connect UDP socket"); }
Aaron Wyatt's avatar
Aaron Wyatt committed
278
#if defined (__LINUX__) || (__MAC_OSX__)
Aaron Wyatt's avatar
Aaron Wyatt committed
279
        //if ( (::shutdown(sock_fd,SHUT_WR)) < 0)
Aaron Wyatt's avatar
Aaron Wyatt committed
280
        //{ throw std::runtime_error("ERROR: Could shutdown SHUT_WR UDP socket"); }
281
282
#endif
#if defined __WIN_32__
Aaron Wyatt's avatar
Aaron Wyatt committed
283
284
285
286
287
288
289
        /*int shut_sr = shutdown(sock_fd, SD_SEND);  //shut down sender's receive function
        if ( shut_sr< 0)
        {
            fprintf(stderr, "ERROR: Could not shutdown SD_SEND UDP socket");
            throw std::runtime_error("ERROR: Could not shutdown SD_SEND UDP socket");
        }*/
#endif
290
    }
jcacerec's avatar
jcacerec committed
291

Aaron Wyatt's avatar
Aaron Wyatt committed
292
293
    return sock_fd;

294
295
    // OLD CODE WITHOUT POSIX FIX--------------------------------------------------
    /*
296
  /// \todo if port is already used, try binding in a different port
jcacerec's avatar
jcacerec committed
297
298
299
  QUdpSocket::BindMode bind_mode;
  if (mRunMode == RECEIVER) {
    bind_mode = QUdpSocket::DontShareAddress; }
300
  else if (mRunMode == SENDER) { //Share sender socket
jcacerec's avatar
jcacerec committed
301
302
    bind_mode = QUdpSocket::ShareAddress; }

303
  // QHostAddress::Any : let the kernel decide the active address
304
  if ( !UdpSocket.bind(QHostAddress::Any, mBindPort, bind_mode) ) {
305
    throw std::runtime_error("Could not bind UDP socket. It may be already binded.");
306
307
  }
  else {
jcaceres's avatar
jcaceres committed
308
    if ( mRunMode == RECEIVER ) {
309
      cout << "UDP Socket Receiving in Port: " << mBindPort << endl;
jcaceres's avatar
jcaceres committed
310
311
      cout << gPrintSeparator << endl;
    }
312
  }
313
  */
314
    // ----------------------------------------------------------------------------
315
316
317
318
}


//*******************************************************************************
Aaron Wyatt's avatar
Aaron Wyatt committed
319
/**
 * \brief Read one datagram of up to n bytes into buf
 * \param UdpSocket Socket to read from
 * \param buf Destination buffer, at least n bytes
 * \param n Number of bytes to wait for and the maximum to read
 * \return Whatever QUdpSocket::readDatagram() returns, narrowed to int
 *
 * Polls every 100 microseconds while the pending datagram is smaller than
 * n and the protocol has not been stopped.
 */
int UdpDataProtocol::receivePacket(QUdpSocket& UdpSocket, char* buf, const size_t n)
{
    // NOTE(review): pendingDatagramSize() is signed and n is unsigned, so the
    // comparison depends on the platform's integer conversions -- confirm
    // whether a "no datagram" result is meant to keep this loop waiting.
    for (;;) {
        if ( !(UdpSocket.pendingDatagramSize() < n) ) { break; }
        if ( mStopped ) { break; }
        QThread::usleep(100);
    }
    return UdpSocket.readDatagram(buf, n);
}


//*******************************************************************************
Aaron Wyatt's avatar
Aaron Wyatt committed
329
int UdpDataProtocol::sendPacket(const char* buf, const size_t n)
330
{
Aaron Wyatt's avatar
Aaron Wyatt committed
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/*#if defined (__WIN_32__)
    //Alternative windows specific code that uses winsock equivalents of the bsd socket functions.
    DWORD n_bytes;
    WSABUF buffer;
    int error;
    buffer.len = n;
    buffer.buf = (char *)buf;

    if (mIPv6) {
        error = WSASendTo(mSocket, &buffer, 1, &n_bytes, 0, (struct sockaddr *) &mPeerAddr6, sizeof(mPeerAddr6), 0, 0);
    } else {
        error = WSASend(mSocket, &buffer, 1, &n_bytes, 0, 0, 0);
    }
    if (error == SOCKET_ERROR) {
        cout << "Socket Error: " << WSAGetLastError() << endl;
    }
    return (int)n_bytes;
#else*/
Aaron Wyatt's avatar
Aaron Wyatt committed
349
    int n_bytes;
Aaron Wyatt's avatar
Aaron Wyatt committed
350
351
    if (mIPv6) {
        n_bytes = ::sendto(mSocket, buf, n, 0, (struct sockaddr *) &mPeerAddr6, sizeof(mPeerAddr6));
Aaron Wyatt's avatar
Aaron Wyatt committed
352
    } else {
Aaron Wyatt's avatar
Aaron Wyatt committed
353
        n_bytes = ::send(mSocket, buf, n, 0);
Aaron Wyatt's avatar
Aaron Wyatt committed
354
    }
355
    return n_bytes;
Aaron Wyatt's avatar
Aaron Wyatt committed
356
//#endif
357
358
359
}


360
//*******************************************************************************
Aaron Wyatt's avatar
Aaron Wyatt committed
361
void UdpDataProtocol::getPeerAddressFromFirstPacket(QUdpSocket& UdpSocket,
jcacerec's avatar
jcacerec committed
362
363
                                                    QHostAddress& peerHostAddress,
                                                    uint16_t& port)
364
{
Aaron Wyatt's avatar
Aaron Wyatt committed
365
    while ( !UdpSocket.hasPendingDatagrams() ) {
366
367
368
        msleep(100);
    }
    char buf[1];
Aaron Wyatt's avatar
Aaron Wyatt committed
369
    UdpSocket.readDatagram(buf, 1, &peerHostAddress, &port);
370
371
}

jcacerec's avatar
jcacerec committed
372

373
374
375
//*******************************************************************************
void UdpDataProtocol::run()
{
Chris Chafe's avatar
src/  
Chris Chafe committed
376
377
378
379
380
381
382
383
384
385
386
    if (gVerboseFlag) switch ( mRunMode )
    {
    case RECEIVER : {
        std::cout << "step 3" << std::endl;
        break; }

    case SENDER : {
        std::cout << "step 4" << std::endl;
        break; }
    }

387
388
389
390
    //QObject::connect(this, SIGNAL(signalError(const char*)),
    //                 mJackTrip, SLOT(slotStopProcesses()),
    //                 Qt::QueuedConnection);

Aaron Wyatt's avatar
Aaron Wyatt committed
391
392
393
394
395
396
397
398
399
400
401
402
403
404
    //Wrap our socket in a QUdpSocket object if we're the receiver, for convenience.
    //If we're the sender, we'll just write directly to our socket.
    QUdpSocket UdpSocket;
    if (mRunMode == RECEIVER) {
        if (mIPv6) {
            UdpSocket.setSocketDescriptor(mSocket, QUdpSocket::BoundState,
                                          QUdpSocket::ReadOnly);
        } else {
            UdpSocket.setSocketDescriptor(mSocket, QUdpSocket::ConnectedState,
                                          QUdpSocket::ReadOnly);
        }
        cout << "UDP Socket Receiving in Port: " << mBindPort << endl;
        cout << gPrintSeparator << endl;
    }
405

406
    if (gVerboseFlag) std::cout << "    UdpDataProtocol:run" << mRunMode << " before Setup Audio Packet buffer, Full Packet buffer, Redundancy Variables" << std::endl;
407
408
409
410
411
412
413
414
415
416
417
418
    // Setup Audio Packet buffer
    size_t audio_packet_size = getAudioPacketSizeInBites();
    //cout << "audio_packet_size: " << audio_packet_size << endl;
    mAudioPacket = new int8_t[audio_packet_size];
    std::memset(mAudioPacket, 0, audio_packet_size); // set buffer to 0

    // Setup Full Packet buffer
    int full_packet_size = mJackTrip->getPacketSizeInBytes();
    //cout << "full_packet_size: " << full_packet_size << endl;
    mFullPacket = new int8_t[full_packet_size];
    std::memset(mFullPacket, 0, full_packet_size); // set buffer to 0

419
    //  bool timeout = false; // Time out flag for packets that arrive too late
420
421
422
423
424
425
426
427
428
429
430
431
432

    // Put header in first packet
    mJackTrip->putHeaderInPacket(mFullPacket, mAudioPacket);

    // Redundancy Variables
    // (Algorithm explained at the end of this file)
    // ---------------------------------------------
    int full_redundant_packet_size = full_packet_size * mUdpRedundancyFactor;
    int8_t* full_redundant_packet;
    full_redundant_packet = new int8_t[full_redundant_packet_size];
    std::memset(full_redundant_packet, 0, full_redundant_packet_size); // Initialize to 0

    // Set realtime priority (function in jacktrip_globals.h)
433
    if (gVerboseFlag) std::cout << "    UdpDataProtocol:run" << mRunMode << " before setRealtimeProcessPriority()" << std::endl;
Aaron Wyatt's avatar
Aaron Wyatt committed
434
    //std::cout << "Experimental version -- not using setRealtimeProcessPriority()" << std::endl;
435
    //setRealtimeProcessPriority();
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495

    /////////////////////
    // to see thread priorities
    // sudo ps -eLo pri,rtprio,cls,pid,nice,cmd | grep -E 'jackd|jacktrip|rtc|RTPRI' | sort -r

    // from David Runge

    //  It seems that it tries to apply the highest available SCHED_FIFO to
    //  jacktrip or half of it (?) [1] (although that's not what you would want,
    //  as this would mean assigning a higher priority to jacktrip than e.g. to
    //  the audio interface and e.g. IRQs that need to be taken care of).

    //  The version on github [2] (current 1.1) is actually worse off, as it
    //  just hardcodes RTPRIO 99 (which means jacktrip will compete with the
    //  Linux kernel watchdog, if the user trying to launch jacktrip is even
    //  allowed to use that high of a priority!).
    //  On most systems this will not work at all (aside from it being outright
    //  dangerous). On Arch (and also Ubuntu) the sane default is to allow
    //  rtprio 95 to a privileged user group (e.g. 'realtime' or 'audio', etc.)

    //  It would be very awesome, if setting the priority would be dealt with by
    //  a command line flag to jacktrip (e.g. `jacktrip --priority=50`) and
    //  otherwise defaulting to a much much lower number (e.g. 10), so the
    //  application can be run out-of-the-box (even without being in a
    //  privileged group).

    // from Nando

    //  You should actually be using the priority that jack gives you when you
    //  create the realtime thread, that puts your process "behind" - so to
    //  speak - the processing that jack does on behalf of all its clients, and
    //  behind (in a properly configured system) the audio interface processing
    //  interrupt. No need to select a priority yourself.

    //  In a Fedora system I run jack with a priority of 65 (the Fedora packages
    //  changed the default to a much lower one which is a big no-no). The
    //  clients inherit 60, I think. Some clients that have their own internal
    //  structure of processes (jconvolver) run multiple threads and use
    //  priorities below 60 for them (ie: they start with what jack gave them).

    //  If you need to run a thread (not the audio thread) with higher priority
    //  you could retrieve the priority that jack gave you and add some magic
    //  number to get it to be above jack itself (10 would be fine in my
    //  experience).

    //without setting it
    //        PRI RTPRIO CLS   PID  NI CMD
    //         60     20  FF  4348   - /usr/bin/jackd -dalsa -dhw:CODEC -r48000 -p128 -n2 -Xseq
    //         55     15  FF  9835   - ./jacktrip -s
    //         19      -  TS  9835   0 ./jacktrip -s
    //         19      -  TS  9835   0 ./jacktrip -s
    //         19      -  TS  9835   0 ./jacktrip -s
    //         19      -  TS  9835   0 ./jacktrip -s
    //         19      -  TS  9835   0 ./jacktrip -s
    //         19      -  TS  4348   0 /usr/bin/jackd -dalsa -dhw:CODEC -r48000 -p128 -n2 -Xseq
    //         19      -  TS  4348   0 /usr/bin/jackd -dalsa -dhw:CODEC -r48000 -p128 -n2 -Xseq
    //         19      -  TS  4348   0 /usr/bin/jackd -dalsa -dhw:CODEC -r48000 -p128 -n2 -Xseq
    //         19      -  TS  4348   0 /usr/bin/jackd -dalsa -dhw:CODEC -r48000 -p128 -n2 -Xseq

    // jack puts its clients in FF at 5 points below itself
496
497
498
499
500

    switch ( mRunMode )
    {
    case RECEIVER : {
        // Connect signals and slots for packets arriving too late notifications
501
        QObject::connect(this, SIGNAL(signalWaitingTooLong(int)),
502
                         this, SLOT(printUdpWaitedTooLong(int)),
503
504
505
506
                         Qt::QueuedConnection);
        //-----------------------------------------------------------------------------------
        // Wait for the first packet to be ready and obtain address
        // from that packet
507
        if (gVerboseFlag) std::cout << "    UdpDataProtocol:run" << mRunMode << " before !UdpSocket.hasPendingDatagrams()" << std::endl;
508
509
        std::cout << "Waiting for Peer..." << std::endl;
        // This blocks waiting for the first packet
Aaron Wyatt's avatar
Aaron Wyatt committed
510
        while ( !UdpSocket.hasPendingDatagrams() ) {
511
512
            if (mStopped) { return; }
            QThread::msleep(100);
513
            if (gVerboseFlag) std::cout << "100ms  " << std::flush;
514
        }
Aaron Wyatt's avatar
Aaron Wyatt committed
515
        int first_packet_size = UdpSocket.pendingDatagramSize();
516
517
518
519
520
        // The following line is the same as
        int8_t* first_packet = new int8_t[first_packet_size];
        /// \todo fix this to avoid memory leaks
        // but avoids memory leaks
        //std::tr1::shared_ptr<int8_t> first_packet(new int8_t[first_packet_size]);
Aaron Wyatt's avatar
Aaron Wyatt committed
521
        receivePacket( UdpSocket, reinterpret_cast<char*>(first_packet), first_packet_size);
522
        // Check that peer has the same audio settings
523
        if (gVerboseFlag) std::cout << std::endl << "    UdpDataProtocol:run" << mRunMode << " before mJackTrip->checkPeerSettings()" << std::endl;
524
        mJackTrip->checkPeerSettings(first_packet);
525
526
        if (gVerboseFlag) std::cout << "step 7" << std::endl;
        if (gVerboseFlag) std::cout << "    UdpDataProtocol:run" << mRunMode << " before mJackTrip->parseAudioPacket()" << std::endl;
527
        mJackTrip->parseAudioPacket(mFullPacket, mAudioPacket);
528
        std::cout << "Received Connection from Peer!" << std::endl;
529
530
531
532
533
534
535
536
537
538
        emit signalReceivedConnectionFromPeer();

        // Redundancy Variables
        // --------------------
        // NOTE: These types need to be the same unsigned integer as the sequence
        // number in the header. That way, they wrap around in the "same place"
        uint16_t current_seq_num = 0; // Store current sequence number
        uint16_t last_seq_num = 0;    // Store last package sequence number
        uint16_t newer_seq_num = 0;   // Store newer sequence number

539
        if (gVerboseFlag) std::cout << "step 8" << std::endl;
540
541
542
543
544
545
546
        while ( !mStopped )
        {
            // Timer to report packets arriving too late
            // This QT method gave me a lot of trouble, so I replaced it with my own 'waitForReady'
            // that uses signals and slots and can also report with packets have not
            // arrive for a longer time
            //timeout = UdpSocket.waitForReadyRead(30);
547
            //        timeout = cc unused!
Aaron Wyatt's avatar
Aaron Wyatt committed
548
            waitForReady(UdpSocket, 60000); //60 seconds
549
550
551

            // OLD CODE WITHOUT REDUNDANCY----------------------------------------------------
            /*
jcacerec's avatar
jcacerec committed
552
553
        // This is blocking until we get a packet...
        receivePacket( UdpSocket, reinterpret_cast<char*>(mFullPacket), full_packet_size);
jcacerec's avatar
jcacerec committed
554

jcacerec's avatar
jcacerec committed
555
        mJackTrip->parseAudioPacket(mFullPacket, mAudioPacket);
jcacerec's avatar
jcacerec committed
556

jcacerec's avatar
jcacerec committed
557
558
559
560
561
        // ...so we want to send the packet to the buffer as soon as we get in from
        // the socket, i.e., non-blocking
        //mRingBuffer->insertSlotNonBlocking(mAudioPacket);
        mJackTrip->writeAudioBuffer(mAudioPacket);
        */
562
            //----------------------------------------------------------------------------------
Aaron Wyatt's avatar
Aaron Wyatt committed
563
            receivePacketRedundancy(UdpSocket,
564
565
566
567
568
569
570
571
572
573
                                    full_redundant_packet,
                                    full_redundant_packet_size,
                                    full_packet_size,
                                    current_seq_num,
                                    last_seq_num,
                                    newer_seq_num);
        }
        break; }

    case SENDER : {
Aaron Wyatt's avatar
Aaron Wyatt committed
574
575
        //Make sure we don't start sending packets too soon.
        QThread::msleep(100);
576
577
578
579
580
        //-----------------------------------------------------------------------------------
        while ( !mStopped )
        {
            // OLD CODE WITHOUT REDUNDANCY -----------------------------------------------------
            /*
jcacerec's avatar
jcacerec committed
581
582
583
584
585
586
587
        // We block until there's stuff available to read
        mJackTrip->readAudioBuffer( mAudioPacket );
        mJackTrip->putHeaderInPacket(mFullPacket, mAudioPacket);
        // This will send the packet immediately
        //int bytes_sent = sendPacket( reinterpret_cast<char*>(mFullPacket), full_packet_size);
        sendPacket( UdpSocket, PeerAddress, reinterpret_cast<char*>(mFullPacket), full_packet_size);
        */
588
            //----------------------------------------------------------------------------------
Aaron Wyatt's avatar
Aaron Wyatt committed
589
            sendPacketRedundancy(full_redundant_packet,
590
591
592
593
594
                                 full_redundant_packet_size,
                                 full_packet_size);
        }
        break; }
    }
595
}
596
597
598


//*******************************************************************************
Chris Chafe's avatar
src/  
Chris Chafe committed
599
//bool
Aaron Wyatt's avatar
Aaron Wyatt committed
600
// Polls UdpSocket until a non-empty datagram is pending, until roughly
// timeout_msec milliseconds of polling have accumulated, or until mStopped is
// set -- whichever happens first. While waiting, signalWaitingTooLong is
// emitted once per 10 ms of accumulated wait, carrying the elapsed time in
// milliseconds.
//
// UdpSocket:    socket to poll for incoming datagrams
// timeout_msec: maximum time to wait, in milliseconds
//
// The function returns nothing, so the caller cannot tell from here which of
// the three conditions ended the wait.
void UdpDataProtocol::waitForReady(QUdpSocket& UdpSocket, int timeout_msec)
{
    const int loop_resolution_usec = 100; // usecs to wait on each loop
    const int emit_resolution_usec = 10000; // 10 milliseconds
    // Microsecond bookkeeping is done in 64 bits: timeout_msec * 1000 computed
    // in int overflows (undefined behaviour) for timeouts above ~35.8 minutes.
    const long long timeout_usec = static_cast<long long>(timeout_msec) * 1000;
    // Elapsed time in microseconds. This is the sum of the requested sleep
    // intervals, not a clock measurement.
    long long elapsed_time_usec = 0;

    while ( ( !(
                  UdpSocket.hasPendingDatagrams() &&
                  (UdpSocket.pendingDatagramSize() > 0)
                  ) && (elapsed_time_usec <= timeout_usec) )
            && !mStopped ){
        //    if (mStopped) { return false; }
        QThread::usleep(loop_resolution_usec);
        elapsed_time_usec += loop_resolution_usec;

        // Notify listeners every emit_resolution_usec of accumulated wait
        if ( !(elapsed_time_usec % emit_resolution_usec) ) {
            emit signalWaitingTooLong(static_cast<int>(elapsed_time_usec/1000));
        }
    }
    // cc under what condition?
    //  if ( elapsed_time_usec >= timeout_usec )
    //  {
    //    emit signalWaitingTooLong(elapsed_time_usec/1000);
    //    return false;
    //  }
    //  return true;
}


//*******************************************************************************
631
void UdpDataProtocol::printUdpWaitedTooLong(int wait_msec)
632
{
633
634
635
636
    int wait_time = 30; // msec
    if ( !(wait_msec%wait_time) ) {
        std::cerr << "UDP waiting too long (more than " << wait_time << "ms)..." << endl;
    }
637
}
jcacerec's avatar
jcacerec committed
638
639


640
//*******************************************************************************
Aaron Wyatt's avatar
Aaron Wyatt committed
641
void UdpDataProtocol::receivePacketRedundancy(QUdpSocket& UdpSocket,
642
643
644
645
646
647
648
                                              int8_t* full_redundant_packet,
                                              int full_redundant_packet_size,
                                              int full_packet_size,
                                              uint16_t& current_seq_num,
                                              uint16_t& last_seq_num,
                                              uint16_t& newer_seq_num)
{
649
650
651
652
653
654
655
656
657
658
    // This is blocking until we get a packet...
    receivePacket( UdpSocket, reinterpret_cast<char*>(full_redundant_packet),
                   full_redundant_packet_size);

    // Get Packet Sequence Number
    newer_seq_num =
            mJackTrip->getPeerSequenceNumber(full_redundant_packet);
    current_seq_num = newer_seq_num;


659
    //cout << current_seq_num << " ";
660
661
662
663
664
665
666
667
668
669
670
671
672
673
    int redun_last_index = 0;
    for (unsigned int i = 1; i<mUdpRedundancyFactor; i++) {
        // Check if the package we receive is the next one expected, i.e.,
        // current_seq_num == (last_seq_num+1)
        if ( current_seq_num == (last_seq_num+1) ) { break; }

        // if it's not, check the next one until it is the corresponding packet
        // or there aren't more available packets
        redun_last_index = i; // index of packet to use in the redundant packet
        current_seq_num =
                mJackTrip->getPeerSequenceNumber( full_redundant_packet + (i*full_packet_size) );
        //cout << current_seq_num << " ";
    }
    //cout << endl;
674

675
    last_seq_num = newer_seq_num; // Save last read packet
676

677
678
679
680
681
682
683
684
    // Send to audio all available audio packets, in order
    for (int i = redun_last_index; i>=0; i--) {
        memcpy(mFullPacket,
               full_redundant_packet + (i*full_packet_size),
               full_packet_size);
        mJackTrip->parseAudioPacket(mFullPacket, mAudioPacket);
        mJackTrip->writeAudioBuffer(mAudioPacket);
    }
685
}
686

687
//*******************************************************************************
Aaron Wyatt's avatar
Aaron Wyatt committed
688
void UdpDataProtocol::sendPacketRedundancy(int8_t* full_redundant_packet,
689
690
691
                                           int full_redundant_packet_size,
                                           int full_packet_size)
{
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
    mJackTrip->readAudioBuffer( mAudioPacket );
    mJackTrip->putHeaderInPacket(mFullPacket, mAudioPacket);

    // Move older packets to end of array of redundant packets
    std::memmove(full_redundant_packet+full_packet_size,
                 full_redundant_packet,
                 full_packet_size*(mUdpRedundancyFactor-1));
    // Copy new packet to the begining of array
    std::memcpy(full_redundant_packet,
                mFullPacket, full_packet_size);

    // 10% (or other number) packet lost simulation.
    // Uncomment the if to activate
    //---------------------------------------------------------------------------------
    //int random_integer = rand();
    //if ( random_integer > (RAND_MAX/10) )
    //{
Aaron Wyatt's avatar
Aaron Wyatt committed
709
    sendPacket( reinterpret_cast<char*>(full_redundant_packet),
710
711
712
713
714
                full_redundant_packet_size);
    //}
    //---------------------------------------------------------------------------------

    mJackTrip->increaseSequenceNumber();
715
}
jcacerec's avatar
jcacerec committed
716

717

jcacerec's avatar
jcacerec committed
718
719
/*
  The Redundancy Algorithm works as follows. We send a packet that contains
720
  a mUdpRedundancyFactor number of packets (header+audio). This big packet looks
jcacerec's avatar
jcacerec committed
721
  as follows
722

jcacerec's avatar
jcacerec committed
723
  ----------  ------------       -----------------------------------
724
  | UDP[n] |  | UDP[n-1] |  ...  | UDP[n-(mUdpRedundancyFactor-1)] |
jcacerec's avatar
jcacerec committed
725
726
727
  ----------  ------------       -----------------------------------

  Then, for the new audio buffer, we shift everything to the right and send:
728

jcacerec's avatar
jcacerec committed
729
  ----------  ------------       -------------------------------------
730
  | UDP[n+1] |  | UDP[n] |  ...  | UDP[n-(mUdpRedundancyFactor-1)+1] |
jcacerec's avatar
jcacerec committed
731
732
733
734
735
736
737
738
  ----------  ------------       -------------------------------------

  etc...

  For a redundancy factor of 4, this will look as follows:
  ----------  ----------  ----------  ----------
  | UDP[4] |  | UDP[3] |  | UDP[2] |  | UDP[1] |
  ----------  ----------  ----------  ----------
739

jcacerec's avatar
jcacerec committed
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
  ----------  ----------  ----------  ----------
  | UDP[5] |  | UDP[4] |  | UDP[3] |  | UDP[2] |
  ----------  ----------  ----------  ----------

  ----------  ----------  ----------  ----------
  | UDP[6] |  | UDP[5] |  | UDP[4] |  | UDP[3] |
  ----------  ----------  ----------  ----------

  etc...

  Then, the receiving end checks if the first packet in the list is the one it should use,
  otherwise it continues reading the mUdpRedundancyFactor packets until it finds the one that
  should come next (this could be improved by jumping directly to the correct packet).
  If it has more than one packet that it hasn't yet received, it sends them to the soundcard
  one by one.
*/