JackNetInterface.cpp 36.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
Copyright (C) 2001 Paul Davis
Copyright (C) 2008 Romain Moret at Grame

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#include "JackNetInterface.h"
#include "JackException.h"
22
#include "JackPlatformPlug.h"
23
#include <assert.h>
24
25
26

using namespace std;

27
28
29
30
31
32
/*
 TODO : since MIDI buffers now use up to BUFFER_SIZE_MAX frames,
 we should probably also use BUFFER_SIZE_MAX in everything related to MIDI event
 handling (see MidiBufferInit in JackMidiPort.cpp)
*/

33
34
35
36
namespace Jack
{
    // JackNetInterface*******************************************

37
38
39
40
41
42
43
44
45
46
    /**
     * Default constructor : the socket is default-constructed and every
     * buffer pointer starts out null, so the destructor can safely run
     * even when no buffer was ever allocated.
     */
    JackNetInterface::JackNetInterface() : fSocket()
    {
        //raw network packets
        fTxBuffer = fRxBuffer = NULL;
        //audio <-> network converters
        fNetAudioCaptureBuffer = fNetAudioPlaybackBuffer = NULL;
        //midi <-> network converters
        fNetMidiCaptureBuffer = fNetMidiPlaybackBuffer = NULL;
    }

moret's avatar
moret committed
47
    /**
     * Builds an interface whose socket is constructed from the given multicast
     * address and port. The address is also remembered in fMulticastIP.
     *
     * @param multicast_ip NUL-terminated multicast address string.
     * @param port         port handed to the socket constructor.
     */
    JackNetInterface::JackNetInterface ( const char* multicast_ip, int port ) : fSocket ( multicast_ip, port )
    {
        //bounded copy : an over-long address is truncated instead of overflowing the member
        //NOTE(review): assumes fMulticastIP is a fixed-size char array declared in the header - confirm
        strncpy ( fMulticastIP, multicast_ip, sizeof ( fMulticastIP ) - 1 );
        fMulticastIP[sizeof ( fMulticastIP ) - 1] = '\0';

        //no buffer is allocated yet
        fTxBuffer = NULL;
        fRxBuffer = NULL;
        fNetAudioCaptureBuffer = NULL;
        fNetAudioPlaybackBuffer = NULL;
        fNetMidiCaptureBuffer = NULL;
        fNetMidiPlaybackBuffer = NULL;
    }

moret's avatar
moret committed
58
    /**
     * Builds an interface from already negotiated session parameters and an
     * existing socket (copied into fSocket). The multicast address is also
     * remembered in fMulticastIP.
     *
     * @param params       session parameters, copied into fParams.
     * @param socket       socket used to copy-construct fSocket.
     * @param multicast_ip NUL-terminated multicast address string.
     */
    JackNetInterface::JackNetInterface ( session_params_t& params, JackNetSocket& socket, const char* multicast_ip ) : fSocket ( socket )
    {
        fParams = params;

        //bounded copy : an over-long address is truncated instead of overflowing the member
        //NOTE(review): assumes fMulticastIP is a fixed-size char array declared in the header - confirm
        strncpy ( fMulticastIP, multicast_ip, sizeof ( fMulticastIP ) - 1 );
        fMulticastIP[sizeof ( fMulticastIP ) - 1] = '\0';

        //no buffer is allocated yet
        fTxBuffer = NULL;
        fRxBuffer = NULL;
        fNetAudioCaptureBuffer = NULL;
        fNetAudioPlaybackBuffer = NULL;
        fNetMidiCaptureBuffer = NULL;
        fNetMidiPlaybackBuffer = NULL;
    }

    /**
     * Closes the socket, then releases the raw packet buffers and the
     * audio/midi network buffers (deleting a null pointer is a no-op).
     */
    JackNetInterface::~JackNetInterface()
    {
        jack_log ( "JackNetInterface::~JackNetInterface" );

        fSocket.Close();

        //raw packets were allocated with new[]
        delete[] fTxBuffer;
        delete[] fRxBuffer;

        //converter objects were allocated with new
        delete fNetAudioCaptureBuffer;
        delete fNetAudioPlaybackBuffer;
        delete fNetMidiCaptureBuffer;
        delete fNetMidiPlaybackBuffer;
    }

83
    void JackNetInterface::SetFramesPerPacket()
moret's avatar
moret committed
84
    {
moret's avatar
moret committed
85
        jack_log ( "JackNetInterface::SetFramesPerPacket" );
moret's avatar
moret committed
86

87
88
89
90
91
92
93
        if (fParams.fSendAudioChannels == 0 && fParams.fReturnAudioChannels == 0) {
            fParams.fFramesPerPacket = fParams.fPeriodSize;
        } else {
            jack_nframes_t period = ( int ) powf ( 2.f, ( int ) ( log (float ( fParams.fMtu - sizeof ( packet_header_t ) )
                                               / ( max ( fParams.fReturnAudioChannels, fParams.fSendAudioChannels ) * sizeof ( sample_t ) ) ) / log ( 2. ) ) );
            fParams.fFramesPerPacket = ( period > fParams.fPeriodSize ) ? fParams.fPeriodSize : period;
        }
moret's avatar
moret committed
94
95
96
97
    }

    int JackNetInterface::SetNetBufferSize()
    {
moret's avatar
moret committed
98
        jack_log ( "JackNetInterface::SetNetBufferSize" );
moret's avatar
moret committed
99

moret's avatar
moret committed
100
        float audio_size, midi_size;
101
        int bufsize;
moret's avatar
moret committed
102
103
104
105
        //audio
        audio_size = fParams.fMtu * ( fParams.fPeriodSize / fParams.fFramesPerPacket );
        //midi
        midi_size = fParams.fMtu * ( max ( fParams.fSendMidiChannels, fParams.fReturnMidiChannels ) *
moret's avatar
moret committed
106
                                     fParams.fPeriodSize * sizeof ( sample_t ) / ( fParams.fMtu - sizeof ( packet_header_t ) ) );
moret's avatar
moret committed
107
        //bufsize = sync + audio + midi
108
        bufsize = MAX_LATENCY * (fParams.fMtu + ( int ) audio_size + ( int ) midi_size);
moret's avatar
moret committed
109
110
111

        //tx buffer
        if ( fSocket.SetOption ( SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof ( bufsize ) ) == SOCKET_ERROR )
112
            return SOCKET_ERROR;
moret's avatar
moret committed
113
114
115

        //rx buffer
        if ( fSocket.SetOption ( SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof ( bufsize ) ) == SOCKET_ERROR )
116
            return SOCKET_ERROR;
moret's avatar
moret committed
117

118
        return 0;
moret's avatar
moret committed
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
    }

    /**
     * Number of packets needed to carry fTxHeader.fMidiDataSize bytes of midi data.
     *
     * @return always at least 1 : even without midi data an empty buffer is
     *         sent, so that jack knows there is no event to read.
     */
    int JackNetInterface::GetNMidiPckt()
    {
        //common case : everything fits in a single payload
        if ( fTxHeader.fMidiDataSize <= ( fParams.fMtu - sizeof ( packet_header_t ) ) ) {
            return 1;
        }

        //otherwise slice the big buffer : full payloads, plus one packet for the remainder if any
        int count = fTxHeader.fMidiDataSize / ( fParams.fMtu - sizeof ( packet_header_t ) );
        if ( ( fTxHeader.fMidiDataSize % ( fParams.fMtu - sizeof ( packet_header_t ) ) ) != 0 ) {
            count++;
        }
        return count;
    }

    bool JackNetInterface::IsNextPacket()
    {
        packet_header_t* rx_head = reinterpret_cast<packet_header_t*> ( fRxBuffer );
        //ignore first cycle
138
        if ( fRxHeader.fCycle <= 1 ) {
moret's avatar
moret committed
139
            return true;
140
        }
moret's avatar
moret committed
141
        //same PcktID (cycle), next SubPcktID (subcycle)
142
143
144
        if ( ( fRxHeader.fSubCycle < ( fNSubProcess - 1 ) ) && ( rx_head->fCycle == fRxHeader.fCycle ) && ( rx_head->fSubCycle == ( fRxHeader.fSubCycle + 1 ) ) ) {
             return true;
        }
moret's avatar
moret committed
145
        //next PcktID (cycle), SubPcktID reset to 0 (first subcyle)
146
        if ( ( rx_head->fCycle == ( fRxHeader.fCycle + 1 ) ) && ( fRxHeader.fSubCycle == ( fNSubProcess - 1 ) ) && ( rx_head->fSubCycle == 0 ) ) {
moret's avatar
moret committed
147
            return true;
148
        }
moret's avatar
moret committed
149
        //else, packet(s) missing, return false
moret's avatar
moret committed
150
151
152
        return false;
    }

153
154
    void JackNetInterface::SetParams()
    {
moret's avatar
moret committed
155
        //number of audio subcycles (packets)
156
157
        fNSubProcess = fParams.fPeriodSize / fParams.fFramesPerPacket;

moret's avatar
moret committed
158
159
160
        //payload size
        fPayloadSize = fParams.fMtu - sizeof ( packet_header_t );

161
162
163
164
165
166
167
        //TX header init
        strcpy ( fTxHeader.fPacketType, "header" );
        fTxHeader.fID = fParams.fID;
        fTxHeader.fCycle = 0;
        fTxHeader.fSubCycle = 0;
        fTxHeader.fMidiDataSize = 0;
        fTxHeader.fBitdepth = fParams.fBitdepth;
moret's avatar
moret committed
168
        fTxHeader.fIsLastPckt = 0;
169
170
171
172
173
174
175
176

        //RX header init
        strcpy ( fRxHeader.fPacketType, "header" );
        fRxHeader.fID = fParams.fID;
        fRxHeader.fCycle = 0;
        fRxHeader.fSubCycle = 0;
        fRxHeader.fMidiDataSize = 0;
        fRxHeader.fBitdepth = fParams.fBitdepth;
moret's avatar
moret committed
177
        fRxHeader.fIsLastPckt = 0;
178
179
180
181

        //network buffers
        fTxBuffer = new char[fParams.fMtu];
        fRxBuffer = new char[fParams.fMtu];
182
183
        assert ( fTxBuffer );
        assert ( fRxBuffer );
184

moret's avatar
moret committed
185
        //net audio/midi buffers'addresses
186
187
188
189
        fTxData = fTxBuffer + sizeof ( packet_header_t );
        fRxData = fRxBuffer + sizeof ( packet_header_t );
    }

190
    // JackNetMasterInterface ************************************************************************************
191

192
    bool JackNetMasterInterface::Init()
193
    {
194
        jack_log ( "JackNetMasterInterface::Init, ID %u.", fParams.fID );
195

196
        session_params_t host_params;
197
        uint attempt = 0;
198
199
200
201
202
        int rx_bytes = 0;

        //socket
        if ( fSocket.NewSocket() == SOCKET_ERROR )
        {
203
204
            jack_error ( "Can't create socket : %s", StrError ( NET_ERROR_CODE ) );
            return false;
205
206
        }

207
        //timeout on receive (for init)
208
        if ( fSocket.SetTimeOut ( MASTER_INIT_TIMEOUT ) < 0 )
209
            jack_error ( "Can't set timeout : %s", StrError ( NET_ERROR_CODE ) );
210
            
211
212
213
214
215
216
        //connect
        if ( fSocket.Connect() == SOCKET_ERROR )
        {
            jack_error ( "Can't connect : %s", StrError ( NET_ERROR_CODE ) );
            return false;
        }
217

moret's avatar
moret committed
218
        //set the number of complete audio frames we can put in a packet
moret's avatar
moret committed
219
220
        SetFramesPerPacket();

221
222
        //send 'SLAVE_SETUP' until 'START_MASTER' received
        jack_info ( "Sending parameters to %s ...", fParams.fSlaveNetName );
223
224
        do
        {
225
            session_params_t net_params;
226
            SetPacketType ( &fParams, SLAVE_SETUP );
227
228
229
            SessionParamsHToN(&fParams, &net_params);
            
            if ( fSocket.Send ( &net_params, sizeof ( session_params_t ), 0 ) == SOCKET_ERROR )
230
                jack_error ( "Error in send : ", StrError ( NET_ERROR_CODE ) );
231
232
233
                
            memset(&net_params, 0, sizeof ( session_params_t ));
            if ( ( ( rx_bytes = fSocket.Recv ( &net_params, sizeof ( session_params_t ), 0 ) ) == SOCKET_ERROR ) && ( fSocket.GetError() != NET_NO_DATA ) )
234
            {
235
236
                jack_error ( "Problem with network." );
                return false;
237
            }
238
239
            
            SessionParamsNToH(&net_params, &host_params);
240
        }
241
        while ( ( GetPacketType ( &host_params ) != START_MASTER ) && ( ++attempt < SLAVE_SETUP_RETRY ) );
242
        if ( attempt == SLAVE_SETUP_RETRY )
243
        {
244
245
            jack_error ( "Slave doesn't respond, exiting." );
            return false;
246
247
        }

248
        //set the new timeout for the socket
moret's avatar
moret committed
249
        if ( SetRxTimeout() == SOCKET_ERROR )
250
251
252
253
        {
            jack_error ( "Can't set rx timeout : %s", StrError ( NET_ERROR_CODE ) );
            return false;
        }
254

255
        //set the new rx buffer size
moret's avatar
moret committed
256
        if ( SetNetBufferSize() == SOCKET_ERROR )
257
        {
moret's avatar
moret committed
258
            jack_error ( "Can't set net buffer sizes : %s", StrError ( NET_ERROR_CODE ) );
259
            return false;
260
        }
261
262

        return true;
263
264
    }

moret's avatar
moret committed
265
266
    int JackNetMasterInterface::SetRxTimeout()
    {
moret's avatar
moret committed
267
268
        jack_log ( "JackNetMasterInterface::SetRxTimeout" );

moret's avatar
moret committed
269
        float time = 0;
moret's avatar
moret committed
270
271
        //slow or normal mode, short timeout on recv (2 audio subcycles)
        if ( ( fParams.fNetworkMode == 's' ) || ( fParams.fNetworkMode == 'n' ) )
moret's avatar
moret committed
272
            time = 2000000.f * ( static_cast<float> ( fParams.fFramesPerPacket ) / static_cast<float> ( fParams.fSampleRate ) );
moret's avatar
moret committed
273
        //fast mode, wait for 75% of the entire cycle duration
moret's avatar
moret committed
274
275
276
277
278
        else if ( fParams.fNetworkMode == 'f' )
            time = 750000.f * ( static_cast<float> ( fParams.fPeriodSize ) / static_cast<float> ( fParams.fSampleRate ) );
        return fSocket.SetTimeOut ( static_cast<int> ( time ) );
    }

279
    void JackNetMasterInterface::SetParams()
moret's avatar
moret committed
280
    {
281
        jack_log ( "JackNetMasterInterface::SetParams" );
moret's avatar
moret committed
282
283
284

        JackNetInterface::SetParams();

285
286
        fTxHeader.fDataStream = 's';
        fRxHeader.fDataStream = 'r';
moret's avatar
moret committed
287
288

        //midi net buffers
289
290
        fNetMidiCaptureBuffer = new NetMidiBuffer ( &fParams, fParams.fSendMidiChannels, fTxData );
        fNetMidiPlaybackBuffer = new NetMidiBuffer ( &fParams, fParams.fReturnMidiChannels, fRxData );
291
292
        assert ( fNetMidiCaptureBuffer );
        assert ( fNetMidiPlaybackBuffer );
moret's avatar
moret committed
293
294

        //audio net buffers
295
296
        fNetAudioCaptureBuffer = new NetAudioBuffer ( &fParams, fParams.fSendAudioChannels, fTxData );
        fNetAudioPlaybackBuffer = new NetAudioBuffer ( &fParams, fParams.fReturnAudioChannels, fRxData );
297
298
        assert ( fNetAudioCaptureBuffer );
        assert ( fNetAudioPlaybackBuffer );
moret's avatar
moret committed
299
300

        //audio netbuffer length
301
302
        fAudioTxLen = sizeof ( packet_header_t ) + fNetAudioCaptureBuffer->GetSize();
        fAudioRxLen = sizeof ( packet_header_t ) + fNetAudioPlaybackBuffer->GetSize();
moret's avatar
moret committed
303
304
    }

305
    void JackNetMasterInterface::Exit()
306
    {
307
308
309
310
        jack_log ( "JackNetMasterInterface::Exit, ID %u", fParams.fID );

        //stop process
        fRunning = false;
311

312
313
314
315
        //send a 'multicast euthanasia request' - new socket is required on macosx
        jack_info ( "Exiting '%s'", fParams.fName );
        SetPacketType ( &fParams, KILL_MASTER );
        JackNetSocket mcast_socket ( fMulticastIP, fSocket.GetPort() );
316
317
318
319
        
        session_params_t net_params;
        SessionParamsHToN(&fParams, &net_params);

320
321
        if ( mcast_socket.NewSocket() == SOCKET_ERROR )
            jack_error ( "Can't create socket : %s", StrError ( NET_ERROR_CODE ) );
322
        if ( mcast_socket.SendTo ( &net_params, sizeof ( session_params_t ), 0, fMulticastIP ) == SOCKET_ERROR )
323
            jack_error ( "Can't send suicide request : %s", StrError ( NET_ERROR_CODE ) );
324
            
325
        mcast_socket.Close();
326

327
328
        // UGLY temporary way to be sure the thread does not call code possibly causing a deadlock in JackEngine.
        ThreadExit();
329
330
    }

331
    int JackNetMasterInterface::Recv ( size_t size, int flags )
332
    {
333
334
        int rx_bytes;
        if ( ( ( rx_bytes = fSocket.Recv ( fRxBuffer, size, flags ) ) == SOCKET_ERROR ) && fRunning )
335
336
        {
            net_error_t error = fSocket.GetError();
337
338
339
340
            //no data isn't really a network error, so just return 0 avalaible read bytes
            if ( error == NET_NO_DATA )
                return 0;
            else if ( error == NET_CONN_ERROR )
341
342
343
            {
                //fatal connection issue, exit
                jack_error ( "'%s' : %s, exiting.", fParams.fName, StrError ( NET_ERROR_CODE ) );
344
                //ask to the manager to properly remove the master
345
346
                Exit();
            }
moret's avatar
moret committed
347
            else
348
                jack_error ( "Error in master receive : %s", StrError ( NET_ERROR_CODE ) );
349
        }
350
351
352
353
        
        packet_header_t* header = reinterpret_cast<packet_header_t*>(fRxBuffer);
        PacketHeaderNToH(header, header);
        return rx_bytes;
354
    }
355
356
    
    int JackNetMasterInterface::Send ( size_t size, int flags )
357
    {
358
359
360
361
362
        int tx_bytes;
        packet_header_t* header = reinterpret_cast<packet_header_t*>(fTxBuffer);
        PacketHeaderHToN(header, header);
        
        if ( ( ( tx_bytes = fSocket.Send ( fTxBuffer, size, flags ) ) == SOCKET_ERROR ) && fRunning )
363
364
        {
            net_error_t error = fSocket.GetError();
365
            if ( error == NET_CONN_ERROR )
366
            {
367
368
369
                //fatal connection issue, exit
                jack_error ( "'%s' : %s, exiting.", fParams.fName, StrError ( NET_ERROR_CODE ) );
                Exit();
370
            }
moret's avatar
moret committed
371
            else
372
                jack_error ( "Error in master send : %s", StrError ( NET_ERROR_CODE ) );
373
        }
374
        return tx_bytes;
375
    }
376
377
378
    
    bool JackNetMasterInterface::IsSynched()
    {
sletz's avatar
sletz committed
379
        if (fParams.fNetworkMode == 's') {
380
            return (fCycleOffset < 3);
sletz's avatar
sletz committed
381
        } else {
382
            return true;
sletz's avatar
sletz committed
383
        }
384
385
    }
    
386
    int JackNetMasterInterface::SyncSend()
387
    {
388
389
390
        fTxHeader.fCycle++;
        fTxHeader.fSubCycle = 0;
        fTxHeader.fDataType = 's';
391
        fTxHeader.fIsLastPckt = ( fParams.fSendMidiChannels == 0 && fParams.fSendAudioChannels == 0) ?  1 : 0;
392
393
394
395
396
397
398
399
400
        fTxHeader.fPacketSize = fParams.fMtu;
        memcpy ( fTxBuffer, &fTxHeader, sizeof ( packet_header_t ) );
        return Send ( fTxHeader.fPacketSize, 0 );
    }

    int JackNetMasterInterface::DataSend()
    {
        uint subproc;
        //midi
401
        if ( fParams.fSendMidiChannels > 0)
402
        {
403
404
405
            //set global header fields and get the number of midi packets
            fTxHeader.fDataType = 'm';
            fTxHeader.fMidiDataSize = fNetMidiCaptureBuffer->RenderFromJackPorts();
moret's avatar
moret committed
406
            fTxHeader.fNMidiPckt = GetNMidiPckt();
407
            for ( subproc = 0; subproc < fTxHeader.fNMidiPckt; subproc++ )
408
            {
409
                fTxHeader.fSubCycle = subproc;
410
                fTxHeader.fIsLastPckt = ( ( subproc == ( fTxHeader.fNMidiPckt - 1 ) ) && (fParams.fSendAudioChannels == 0)) ? 1 : 0;
411
                fTxHeader.fPacketSize = sizeof ( packet_header_t ) + fNetMidiCaptureBuffer->RenderToNetwork ( subproc, fTxHeader.fMidiDataSize );
412
413
414
                memcpy ( fTxBuffer, &fTxHeader, sizeof ( packet_header_t ) );
                if ( Send ( fTxHeader.fPacketSize, 0 ) == SOCKET_ERROR )
                    return SOCKET_ERROR;
415
416
            }
        }
417
418

        //audio
419
        if ( fParams.fSendAudioChannels > 0)
420
421
        {
            fTxHeader.fDataType = 'a';
422
423
            fTxHeader.fMidiDataSize = 0;
            fTxHeader.fNMidiPckt = 0;
424
425
426
            for ( subproc = 0; subproc < fNSubProcess; subproc++ )
            {
                fTxHeader.fSubCycle = subproc;
moret's avatar
moret committed
427
                fTxHeader.fIsLastPckt = ( subproc == ( fNSubProcess - 1 ) ) ? 1 : 0;
428
429
430
431
432
433
434
435
436
                fTxHeader.fPacketSize = fAudioTxLen;
                memcpy ( fTxBuffer, &fTxHeader, sizeof ( packet_header_t ) );
                fNetAudioCaptureBuffer->RenderFromJackPorts ( subproc );
                if ( Send ( fTxHeader.fPacketSize, 0 ) == SOCKET_ERROR )
                    return SOCKET_ERROR;
            }
        }

        return 0;
437
438
    }

439
    int JackNetMasterInterface::SyncRecv()
440
441
    {
        packet_header_t* rx_head = reinterpret_cast<packet_header_t*> ( fRxBuffer );
442
443
        int rx_bytes = Recv ( fParams.fMtu, MSG_PEEK );
        
444
445
446
        if ( ( rx_bytes == 0 ) || ( rx_bytes == SOCKET_ERROR ) )
            return rx_bytes;

447
        fCycleOffset = fTxHeader.fCycle - rx_head->fCycle;
448
      
449
        switch ( fParams.fNetworkMode )
450
        {
451
            case 's' :
moret's avatar
moret committed
452
453
454
455
456
                //slow mode : allow to use full bandwidth and heavy process on the slave
                //  - extra latency is set to two cycles, one cycle for send/receive operations + one cycle for heavy process on the slave
                //  - if the network is two fast, just wait the next cycle, this mode allows a shorter cycle duration for the master
                //  - this mode will skip the two first cycles, thus it lets time for data to be processed and queued on the socket rx buffer
                //the slow mode is the safest mode because it wait twice the bandwidth relative time (send/return + process)
457
                if (fCycleOffset < 2)
458
                     return 0;
459
460
                else
                    rx_bytes = Recv ( rx_head->fPacketSize, 0 );
461
462
463
464
                    
                if (fCycleOffset > 2) {
                    jack_info("Warning : '%s' runs in slow network mode, but data received too late (%d cycle(s) offset)", fParams.fName, fCycleOffset);
                }
465
                break;
466

467
468
469
470
471
            case 'n' :
                //normal use of the network :
                //  - extra latency is set to one cycle, what is the time needed to receive streams using full network bandwidth
                //  - if the network is too fast, just wait the next cycle, the benefit here is the master's cycle is shorter
                //  - indeed, data is supposed to be on the network rx buffer, so we don't have to wait for it
472
                if (fCycleOffset < 1)
473
474
475
                    return 0;
                else
                    rx_bytes = Recv ( rx_head->fPacketSize, 0 );
476
477
478
                    
                if (fCycleOffset != 1) 
                    jack_info("'%s' can't run in normal network mode, data received too late (%d cycle(s) offset)", fParams.fName, fCycleOffset);
479
                break;
480

481
482
483
484
485
486
            case 'f' :
                //fast mode suppose the network bandwith is larger than required for the transmission (only a few channels for example)
                //    - packets can be quickly received, quickly is here relative to the cycle duration
                //    - here, receive data, we can't keep it queued on the rx buffer,
                //    - but if there is a cycle offset, tell the user, that means we're not in fast mode anymore, network is too slow
                rx_bytes = Recv ( rx_head->fPacketSize, 0 );
487
488
489
                
                if (fCycleOffset != 0)
                    jack_info("'%s' can't run in fast network mode, data received too late (%d cycle(s) offset)", fParams.fName, fCycleOffset);
490
                break;
491
        }
492

moret's avatar
moret committed
493
        fRxHeader.fIsLastPckt = rx_head->fIsLastPckt;
494
495
        return rx_bytes;
    }
496
    
497
    int JackNetMasterInterface::DataRecv()
498
499
    {
        int rx_bytes = 0;
500
        uint jumpcnt = 0;
501
        uint recvd_midi_pckt = 0;
502
        uint recvd_audio_pckt = 0;
503
        packet_header_t* rx_head = reinterpret_cast<packet_header_t*> ( fRxBuffer );
504
        
moret's avatar
moret committed
505
        while ( !fRxHeader.fIsLastPckt )
506
        {
moret's avatar
moret committed
507
508
            //how much data is queued on the rx buffer ?
            rx_bytes = Recv ( fParams.fMtu, MSG_PEEK );
509
              
moret's avatar
moret committed
510
511
            if ( rx_bytes == SOCKET_ERROR )
                return rx_bytes;
moret's avatar
moret committed
512
            //if no data
moret's avatar
moret committed
513
            if ( ( rx_bytes == 0 ) && ( ++jumpcnt == fNSubProcess ) )
514
            {
moret's avatar
moret committed
515
516
517
518
519
520
521
522
                jack_error ( "No data from %s...", fParams.fName );
                jumpcnt = 0;
            }
            //else if data is valid,
            if ( rx_bytes && ( rx_head->fDataStream == 'r' ) && ( rx_head->fID == fParams.fID ) )
            {
                //read data
                switch ( rx_head->fDataType )
523
                {
moret's avatar
moret committed
524
                    case 'm':   //midi
525
                        rx_bytes = Recv ( rx_head->fPacketSize, 0 );
moret's avatar
moret committed
526
                        fRxHeader.fCycle = rx_head->fCycle;
moret's avatar
moret committed
527
                        fRxHeader.fIsLastPckt = rx_head->fIsLastPckt;
moret's avatar
moret committed
528
                        fNetMidiPlaybackBuffer->RenderFromNetwork ( rx_head->fSubCycle, rx_bytes - sizeof ( packet_header_t ) );
529
                        if ( ++recvd_midi_pckt == rx_head->fNMidiPckt )
moret's avatar
moret committed
530
531
532
                            fNetMidiPlaybackBuffer->RenderToJackPorts();
                        jumpcnt = 0;
                        break;
533

moret's avatar
moret committed
534
                    case 'a':   //audio
535
                        rx_bytes = Recv ( rx_head->fPacketSize, 0 );
536
537
538
                        // SL: 25/01/09
                        // if ( !IsNextPacket() )
                        //    jack_error ( "Packet(s) missing from '%s'...", fParams.fName );
539
540
541
                        if (recvd_audio_pckt++ != rx_head->fSubCycle) {
                            jack_error("Packet(s) missing from '%s'...", fParams.fSlaveNetName);
                        }
moret's avatar
moret committed
542
543
                        fRxHeader.fCycle = rx_head->fCycle;
                        fRxHeader.fSubCycle = rx_head->fSubCycle;
moret's avatar
moret committed
544
                        fRxHeader.fIsLastPckt = rx_head->fIsLastPckt;
moret's avatar
moret committed
545
546
547
                        fNetAudioPlaybackBuffer->RenderToJackPorts ( rx_head->fSubCycle );
                        jumpcnt = 0;
                        break;
548

moret's avatar
moret committed
549
                    case 's':   //sync
550
                        /* SL: 25/01/09
moret's avatar
moret committed
551
552
                        if ( rx_head->fCycle == fTxHeader.fCycle )
                            return 0;
553
                        */
554
                        jack_info("NetMaster : overloaded, skipping receive from '%s'", fParams.fName);
555
                        return 0;
556
557
558
                }
            }
        }
559
        return rx_bytes;
560
561
    }

562
563
// JackNetSlaveInterface ************************************************************************************************

moret's avatar
moret committed
564
565
    //out-of-class definition of the static member fSlaveCounter, initialised to zero
    uint JackNetSlaveInterface::fSlaveCounter = 0;

566
    bool JackNetSlaveInterface::Init()
567
    {
568
        jack_log ( "JackNetSlaveInterface::Init()" );
569

570
571
        //set the parameters to send
        strcpy ( fParams.fPacketType, "params" );
572
        fParams.fProtocolVersion = SLAVE_PROTOCOL;
573
        SetPacketType ( &fParams, SLAVE_AVAILABLE );
574

575
576
577
        //init loop : get a master and start, do it until connection is ok
        net_status_t status;
        do
578
        {
579
580
            //first, get a master, do it until a valid connection is running
            do
581
            {
582
583
584
                status = GetNetMaster();
                if ( status == NET_SOCKET_ERROR )
                    return false;
585
            }
586
            while ( status != NET_CONNECTED );
587

588
589
            //then tell the master we are ready
            jack_info ( "Initializing connection with %s...", fParams.fMasterNetName );
moret's avatar
moret committed
590
            status = SendStartToMaster();
591
592
            if ( status == NET_ERROR )
                return false;
593
        }
594
        while ( status != NET_ROLLING );
595

596
597
        return true;
    }
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
    
    // The connection protocol is split into two separate steps
    
    bool JackNetSlaveInterface::InitConnection()
    {
        jack_log ( "JackNetSlaveInterface::InitConnection()" );

        //set the parameters to send
        strcpy (fParams.fPacketType, "params");
        fParams.fProtocolVersion = SLAVE_PROTOCOL;
        SetPacketType (&fParams, SLAVE_AVAILABLE);

        net_status_t status;
        do
        {
            //get a master
            status = GetNetMaster();
            if (status == NET_SOCKET_ERROR)
                return false;
        }
        while (status != NET_CONNECTED);
  
        return true;
    }
    
    bool JackNetSlaveInterface::InitRendering()
    {
        jack_log("JackNetSlaveInterface::InitRendering()");

        net_status_t status;
        do
        {
            //then tell the master we are ready
            jack_info("Initializing connection with %s...", fParams.fMasterNetName);
            status = SendStartToMaster();
            if (status == NET_ERROR)
                return false;
        }
        while (status != NET_ROLLING);   
        
        return true;
    }
640

641
    //Looks for a master : creates and configures the socket, then repeatedly
    //sends the current session parameters to fMulticastIP and waits for an answer.
    //The answer is accepted only when it carries the same fPacketType as our own
    //parameters AND its packet type is SLAVE_SETUP; it then replaces fParams,
    //the net buffer sizes are updated and the socket is connected.
    //
    //Returns :
    //  NET_SOCKET_ERROR  : the socket could not be created
    //  NET_RECV_ERROR    : receive failed with an error other than NET_NO_DATA
    //  NET_CONNECT_ERROR : fSocket.Connect() failed
    //  NET_CONNECTED     : parameters received and socket connected
    //Failures of Bind, SetTimeOut, SetLocalLoop, SendTo and SetNetBufferSize are
    //only logged; the search goes on.
    net_status_t JackNetSlaveInterface::GetNetMaster()
    {
        jack_log ( "JackNetSlaveInterface::GetNetMaster()" );

        //parameters received from the network, converted by SessionParamsNToH
        session_params_t host_params;
        int rx_bytes = 0;

        //socket
        if ( fSocket.NewSocket() == SOCKET_ERROR )
        {
            jack_error ( "Fatal error : network unreachable - %s", StrError ( NET_ERROR_CODE ) );
            return NET_SOCKET_ERROR;
        }

        //bind the socket
        if ( fSocket.Bind() == SOCKET_ERROR )
            jack_error ( "Can't bind the socket : %s", StrError ( NET_ERROR_CODE ) );

        //timeout on receive
        if ( fSocket.SetTimeOut ( SLAVE_INIT_TIMEOUT ) == SOCKET_ERROR )
            jack_error ( "Can't set timeout : %s", StrError ( NET_ERROR_CODE ) );

        //disable local loop
        if ( fSocket.SetLocalLoop() == SOCKET_ERROR )
            jack_error ( "Can't disable multicast loop : %s", StrError ( NET_ERROR_CODE ) );

        //send 'AVAILABLE' until 'SLAVE_SETUP' received
        jack_info ( "Waiting for a master..." );
        do
        {
            //send 'available'
            session_params_t net_params;
            SessionParamsHToN ( &fParams, &net_params );
            if ( fSocket.SendTo ( &net_params, sizeof ( session_params_t ), 0, fMulticastIP ) == SOCKET_ERROR )
                jack_error ( "Error in data send : %s", StrError ( NET_ERROR_CODE ) );

            //filter incoming packets : the receive buffer is cleared first, so a
            //receive that brings nothing does not leave the bytes we just sent in it
            memset ( &net_params, 0, sizeof ( session_params_t ) );
            rx_bytes = fSocket.CatchHost ( &net_params, sizeof ( session_params_t ), 0 );
            SessionParamsNToH ( &net_params, &host_params );
            if ( ( rx_bytes == SOCKET_ERROR ) && ( fSocket.GetError() != NET_NO_DATA ) )
            {
                jack_error ( "Can't receive : %s", StrError ( NET_ERROR_CODE ) );
                return NET_RECV_ERROR;
            }
        }
        //keep waiting while EITHER test fails : leaving the loop requires a packet of
        //our own family ('params') that is also a SLAVE_SETUP answer. Testing with
        //'&&' would accept any 'params' packet, whatever its type, as the setup.
        while ( ( strcmp ( host_params.fPacketType, fParams.fPacketType ) != 0 )
                || ( GetPacketType ( &host_params ) != SLAVE_SETUP ) );

        //everything is OK, copy parameters
        fParams = host_params;

        //set the new buffer sizes
        if ( SetNetBufferSize() == SOCKET_ERROR )
            jack_error ( "Can't set net buffer sizes : %s", StrError ( NET_ERROR_CODE ) );

        //connect the socket
        if ( fSocket.Connect() == SOCKET_ERROR )
        {
            jack_error ( "Error in connect : %s", StrError ( NET_ERROR_CODE ) );
            return NET_CONNECT_ERROR;
        }

        return NET_CONNECTED;
    }

moret's avatar
moret committed
706
    //Marks fParams as a START_MASTER packet and sends it on the socket.
    //Returns NET_ROLLING when the send succeeds. On failure returns NET_ERROR
    //if the socket reports NET_CONN_ERROR, NET_SEND_ERROR otherwise.
    net_status_t JackNetSlaveInterface::SendStartToMaster()
    {
        jack_log ( "JackNetSlaveInterface::SendStartToMaster" );

        //the packet type is changed in fParams itself, then a converted copy is sent
        SetPacketType ( &fParams, START_MASTER );
        session_params_t start_request;
        SessionParamsHToN ( &fParams, &start_request );

        if ( fSocket.Send ( &start_request, sizeof ( session_params_t ), 0 ) != SOCKET_ERROR )
            return NET_ROLLING;

        jack_error ( "Error in send : %s", StrError ( NET_ERROR_CODE ) );
        if ( fSocket.GetError() == NET_CONN_ERROR )
            return NET_ERROR;
        return NET_SEND_ERROR;
    }

722
    void JackNetSlaveInterface::SetParams()
moret's avatar
moret committed
723
    {
724
        jack_log ( "JackNetSlaveInterface::SetParams" );
moret's avatar
moret committed
725
726
727

        JackNetInterface::SetParams();

728
729
        fTxHeader.fDataStream = 'r';
        fRxHeader.fDataStream = 's';
moret's avatar
moret committed
730
731

        //midi net buffers
732
733
        fNetMidiCaptureBuffer = new NetMidiBuffer ( &fParams, fParams.fSendMidiChannels, fRxData );
        fNetMidiPlaybackBuffer = new NetMidiBuffer ( &fParams, fParams.fReturnMidiChannels, fTxData );
moret's avatar
moret committed
734
735

        //audio net buffers
736
737
        fNetAudioCaptureBuffer = new NetAudioBuffer ( &fParams, fParams.fSendAudioChannels, fRxData );
        fNetAudioPlaybackBuffer = new NetAudioBuffer ( &fParams, fParams.fReturnAudioChannels, fTxData );
moret's avatar
moret committed
738
739
740
741
742
743

        //audio netbuffer length
        fAudioTxLen = sizeof ( packet_header_t ) + fNetAudioPlaybackBuffer->GetSize();
        fAudioRxLen = sizeof ( packet_header_t ) + fNetAudioCaptureBuffer->GetSize();
    }

744
    int JackNetSlaveInterface::Recv ( size_t size, int flags )
745
    {
746
747
748
749
750
751
752
753
754
755
756
757
        int rx_bytes = fSocket.Recv ( fRxBuffer, size, flags );
        //handle errors
        if ( rx_bytes == SOCKET_ERROR )
        {
            net_error_t error = fSocket.GetError();
            //no data isn't really an error in realtime processing, so just return 0
            if ( error == NET_NO_DATA )
                jack_error ( "No data, is the master still running ?" );
            //if a network error occurs, this exception will restart the driver
            else if ( error == NET_CONN_ERROR )
            {
                jack_error ( "Connection lost." );
moret's avatar
moret committed
758
                throw JackNetException();
759
760
            }
            else
sletz's avatar
sletz committed
761
                jack_error ( "Fatal error in slave receive : %s", StrError ( NET_ERROR_CODE ) );
762
        }
763
764
765
        
        packet_header_t* header = reinterpret_cast<packet_header_t*>(fRxBuffer);
        PacketHeaderNToH(header, header);
766
        return rx_bytes;
767
768
    }

769
    int JackNetSlaveInterface::Send ( size_t size, int flags )
770
    {
771
772
        packet_header_t* header = reinterpret_cast<packet_header_t*>(fTxBuffer);
        PacketHeaderHToN(header, header);
773
        int tx_bytes = fSocket.Send ( fTxBuffer, size, flags );
774
         
775
776
        //handle errors
        if ( tx_bytes == SOCKET_ERROR )
777
778
        {
            net_error_t error = fSocket.GetError();
779
780
            //if a network error occurs, this exception will restart the driver
            if ( error == NET_CONN_ERROR )
781
            {
782
                jack_error ( "Connection lost." );
moret's avatar
moret committed
783
                throw JackNetException();
784
            }
785
            else
sletz's avatar
sletz committed
786
                jack_error ( "Fatal error in slave send : %s", StrError ( NET_ERROR_CODE ) );
787
788
789
790
        }
        return tx_bytes;
    }

791
    int JackNetSlaveInterface::SyncRecv()
792
    {
793
794
795
796
        int rx_bytes = 0;
        packet_header_t* rx_head = reinterpret_cast<packet_header_t*> ( fRxBuffer );
        //receive sync (launch the cycle)
        do
797
        {
798
799
800
801
802
            rx_bytes = Recv ( fParams.fMtu, 0 );
            //connection issue, send will detect it, so don't skip the cycle (return 0)
            if ( rx_bytes == SOCKET_ERROR )
                return rx_bytes;
        }
803
804
        while ((strcmp(rx_head->fPacketType, "header") != 0) && (rx_head->fDataType != 's'));
        
moret's avatar
moret committed
805
        fRxHeader.fIsLastPckt = rx_head->fIsLastPckt;
806
807
808
809
810
811
        return rx_bytes;
    }

    int JackNetSlaveInterface::DataRecv()
    {
        uint recvd_midi_pckt = 0;
812
        uint recvd_audio_pckt = 0;
813
814
815
        int rx_bytes = 0;
        packet_header_t* rx_head = reinterpret_cast<packet_header_t*> ( fRxBuffer );

moret's avatar
moret committed
816
        while ( !fRxHeader.fIsLastPckt )
817
        {
moret's avatar
moret committed
818
819
            rx_bytes = Recv ( fParams.fMtu, MSG_PEEK );
            //error here, problem with recv, just skip the cycle (return -1)
moret's avatar
moret committed
820

moret's avatar
moret committed
821
822
823
            if ( rx_bytes == SOCKET_ERROR )
                return rx_bytes;
            if ( rx_bytes && ( rx_head->fDataStream == 's' ) && ( rx_head->fID == fParams.fID ) )
824
            {
moret's avatar
moret committed
825
                switch ( rx_head->fDataType )
826
                {
moret's avatar
moret committed
827
828
829
                    case 'm':   //midi
                        rx_bytes = Recv ( rx_head->fPacketSize, 0 );
                        fRxHeader.fCycle = rx_head->fCycle;
moret's avatar
moret committed
830
                        fRxHeader.fIsLastPckt = rx_head->fIsLastPckt;
moret's avatar
moret committed
831
832
833
834
                        fNetMidiCaptureBuffer->RenderFromNetwork ( rx_head->fSubCycle, rx_bytes - sizeof ( packet_header_t ) );
                        if ( ++recvd_midi_pckt == rx_head->fNMidiPckt )
                            fNetMidiCaptureBuffer->RenderToJackPorts();
                        break;
835

moret's avatar
moret committed
836
837
                    case 'a':   //audio
                        rx_bytes = Recv ( rx_head->fPacketSize, 0 );
838
839
840
                        //SL: 25/01/09
                        // if ( !IsNextPacket() )
                        //    jack_error ( "Packet(s) missing..." );
841
842
843
                        if (recvd_audio_pckt++ != rx_head->fSubCycle) {
                            jack_error("Packet(s) missing from '%s'...", fParams.fMasterNetName);
                        }
moret's avatar
moret committed
844
845
                        fRxHeader.fCycle = rx_head->fCycle;
                        fRxHeader.fSubCycle = rx_head->fSubCycle;
moret's avatar
moret committed
846
                        fRxHeader.fIsLastPckt = rx_head->fIsLastPckt;
moret's avatar
moret committed
847
848
                        fNetAudioCaptureBuffer->RenderToJackPorts ( rx_head->fSubCycle );
                        break;
849

moret's avatar
moret committed
850
851
852
                    case 's':   //sync
                        jack_info ( "NetSlave : overloaded, skipping receive." );
                        return 0;
853
                }
854
855
            }
        }
856
857
        fRxHeader.fCycle = rx_head->fCycle;
        return 0;
858
859
    }

860
    int JackNetSlaveInterface::SyncSend()
861
    {
862
863
864
865
866
        //tx header
        if ( fParams.fSlaveSyncMode )
            fTxHeader.fCycle = fRxHeader.fCycle;
        else
            fTxHeader.fCycle++;
867
868
        fTxHeader.fSubCycle = 0;
        fTxHeader.fDataType = 's';
869
        fTxHeader.fIsLastPckt = ( fParams.fReturnMidiChannels == 0 && fParams.fReturnAudioChannels == 0) ?  1 : 0;
870
871
872
873
874
        fTxHeader.fPacketSize = fParams.fMtu;
        memcpy ( fTxBuffer, &fTxHeader, sizeof ( packet_header_t ) );
        return Send ( fTxHeader.fPacketSize, 0 );
    }

875
    int JackNetSlaveInterface::DataSend()
876
877
    {
        uint subproc;
878

879
        //midi
880
        if ( fParams.fReturnMidiChannels > 0)
881
882
        {
            fTxHeader.fDataType = 'm';
883
            fTxHeader.fMidiDataSize = fNetMidiPlaybackBuffer->RenderFromJackPorts();
moret's avatar
moret committed
884
            fTxHeader.fNMidiPckt = GetNMidiPckt();
885
886
887
            for ( subproc = 0; subproc < fTxHeader.fNMidiPckt; subproc++ )
            {
                fTxHeader.fSubCycle = subproc;
moret's avatar
moret committed
888
                fTxHeader.fIsLastPckt = ( ( subproc == ( fTxHeader.fNMidiPckt - 1 ) ) && !fParams.fReturnAudioChannels ) ? 1 : 0;
889
                fTxHeader.fPacketSize = sizeof ( packet_header_t ) + fNetMidiPlaybackBuffer->RenderToNetwork ( subproc, fTxHeader.fMidiDataSize );
890
891
                memcpy ( fTxBuffer, &fTxHeader, sizeof ( packet_header_t ) );
                if ( Send ( fTxHeader.fPacketSize, 0 ) == SOCKET_ERROR )
moret's avatar
moret committed
892
                    return SOCKET_ERROR;
893
894
895
896
            }
        }

        //audio
897
        if ( fParams.fReturnAudioChannels > 0)
898
899
        {
            fTxHeader.fDataType = 'a';
900
901
            fTxHeader.fMidiDataSize = 0;
            fTxHeader.fNMidiPckt = 0;
902
903
904
            for ( subproc = 0; subproc < fNSubProcess; subproc++ )
            {
                fTxHeader.fSubCycle = subproc;
moret's avatar
moret committed
905
                fTxHeader.fIsLastPckt = ( subproc == ( fNSubProcess - 1 ) ) ? 1 : 0;
906
907
                fTxHeader.fPacketSize = fAudioTxLen;
                memcpy ( fTxBuffer, &fTxHeader, sizeof ( packet_header_t ) );