udp_input.cpp 11.6 KB
Newer Older
jcaceres's avatar
jcaceres committed
1
2
3
4
5
6
7
#include "udp_input.h"
#include "networkInfo.h"
#include "stream.h"
#include "unistd.h"
#include <stdlib.h>
#include <stdio.h>
#include <iostream.h>
8
#include <QHostInfo>//***JPC Port to qt4*****************
jcaceres's avatar
jcaceres committed
9
#include "jamlink.h"
10
11
#include <netinet/in.h>

jcaceres's avatar
jcaceres committed
12
13
14
extern QString *IPv4Addr (char *namebuf);
extern int set_fifo_priority (bool half);

15
16
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
17
/* Construct the UDP input plugin.
 * Sizes and zero-fills the receive buffer for one whole packet (an nsHeader
 * followed by getChunksPerPacket() chunks of bpp bytes) and resets the
 * bookkeeping members.  The socket itself is created later, in run().
 * @param netInfo supplies the packet geometry and sequence range.
 * @param audInfo is stored for later use.
 */
UDPInput::UDPInput (NetworkInfoT netInfo, AudioInfoT audInfo):
  InputPlugin ("UDP Input"), netInfo (netInfo), audInfo (audInfo)
{
  // sock and peerAddress are only allocated in run(); start them as NULL so
  // they never hold indeterminate values before the thread has started.
  sock = NULL;
  peerAddress = NULL;
  _rcvr = NULL;
  has_peer = false;
  packetIndex = 0;

  bpp = netInfo->getDataBytesPerPacket ();
  cout << "bpp = " << bpp << endl;

  // JLink packets: header plus payload, no extra trailing byte.
  wholeSize = sizeof (nsHeader) + (netInfo->getChunksPerPacket () * bpp);
  cout << "wholeSize=================== " <<  wholeSize << endl;

  packetData = new char[wholeSize];
  memset (packetData, 0, wholeSize);

  // Redundancy is disabled for JLink (previously getChunksPerPacket() - 1).
  numRedundantBuffers = 0;
  maxPacketIndex = netInfo->getMaxSeq();
}


39
40
41
42
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
/* Release the packet buffer allocated by the constructor.
 * NOTE(review): this assumes the receive thread is no longer running when the
 * object is destroyed -- confirm against the owner of this object.
 */
UDPInput::~UDPInput()
{
  // packetData comes from new char[] in the constructor.
  delete[] packetData;
  packetData = NULL;

  // sock and peerAddress are created inside run().  Deleting them here was
  // reported not to work with multiple threads, so they are deliberately
  // left alone:
  //delete sock;
  //delete peerAddress;
}


//-------------------------------------------------------------------------------
/*! Print a message to cout announcing that the UDPInput thread has started
 *
 */
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
55
56
57
void
UDPInput::Initial ()
{
58
  cout << "Started UDPInput thread" << endl;
jcaceres's avatar
jcaceres committed
59
60
}

61
62
63

//-------------------------------------------------------------------------------
/* Receive a buffer from the UDPSocket.
jcaceres's avatar
jcaceres committed
64
65
66
 * @param buf is the location at which to store the buffer.
 * @param le is the size of the buffer to be received.
 */
67
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
68
69
70
int
UDPInput::rcv (char *buf)
{
71
72
73
  //int	rv = sock->readBlock (packetData, wholeSize);//***JPC Port to qt4*****************
  int	rv = sock->readDatagram (packetData, wholeSize, peerAddress);//***JPC Port to qt4*****************
  //cout << "***Packet Size***: " << rv << endl;//***JPC Port to qt4*****************
74
  byteSwap(packetData, wholeSize);//JPC JLink***********************************
75
76

  char *datapart;
jcaceres's avatar
jcaceres committed
77
78
79
80
  //packetIndex = ((nsHeader *) packetData)->i_seq;//JPC JLink***********************************
  packetHeader = ((nsHeader *) packetData)->i_head;//JPC JLink***********************************
  //datapart = packetData + sizeof (nsHeader) + //JPC JLink***********************************
  //  ((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);//JPC JLink***********************************
81

jcaceres's avatar
jcaceres committed
82
  datapart = packetData + sizeof (nsHeader);//JPC JLink***********************************
83
  //byteSwap(datapart, wholeSize);
84
  memcpy (buf, datapart, bpp);
85
  
jcaceres's avatar
jcaceres committed
86
87
88
89

  //###############JPC JLink#######################
  // Binary print function
  //unsigned short caca = 0xFFFF;
90
91
92
93
94
  //PR("header in binary 24 ==============: ", ETX_RATE_MASK(ETX_24KHZ | ETX_XTND));
  //PR("header in binary 22 ==============: ", ETX_RATE_MASK( ETX_22KHZ | ETX_XTND));
  //PR("header in binary 8 ==============: ",  ETX_RATE_MASK(ETX_8KHZ | ETX_XTND));
  //PR("tess 1 ======", 2 );
  //PR("header in binary INPUT:", packetHeader);
jcaceres's avatar
JL    
jcaceres committed
95
  
96
97
98
99
100
101
102
103
104
105
  // Byteswaping Test Function
  /*
  char datapartSWAP[wholeSize - 2];
  //packetData2 = new char[wholeSize - 2];
  for (int i= 0; i < (wholeSize-2)/2; i++) {
    datapartSWAP[2*i]   = datapart[2*i+1];
    datapartSWAP[2*i+1] = datapart[2*i];
  }
  memcpy (buf, datapartSWAP, bpp);
  */
jcaceres's avatar
JL    
jcaceres committed
106

107
108
109
110
111
112
113
  //uint16_t caca;
  //caca = 60;
  //PR("CACACACACACACAACA:", caca);
  
  //uint16_t caca2;
  //caca2 = htons(caca);
  //PR("CACACACACACACAACA22222222222:", caca2);
114

jcaceres's avatar
jcaceres committed
115
116
  //###############################################
  
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
  /*
    ((nsHeader *) packetData)->i_type = 0;
    ((nsHeader *) packetData)->i_nframes = 1;
    ((nsHeader *) packetData)->i_nchans = 2;
    ((nsHeader *) packetData)->i_copies = n;
    ((nsHeader *) packetData)->i_cksum = 4;
    ((nsHeader *) packetData)->i_seq = packetIndex;
    ((nsHeader *) packetData)->i_rtnseq = 6;
    ((nsHeader *) packetData)->i_rtt = 7;
  */
  if (rv < 0)
    {
      cerr << "bad read..." << endl;
    }
  return packetIndex;
jcaceres's avatar
jcaceres committed
132
133
}

134
135
136

//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
137
138
139
int
UDPInput::rcvz1 (char *bufz1, int z)
{
140
  char *datapart;
jcaceres's avatar
jcaceres committed
141
142
  //packetIndex = ((nsHeader *) packetData)->i_seq-z;//JPC JLink***********************************
  packetIndex = ((nsHeader *) packetData)->i_head-z;//JPC JLink***********************************
143
144
145
146
147
  if (packetIndex < 0) 
    {
      packetIndex += maxPacketIndex;
      //		cout << "backed below 0 (a good thing)" << endl;
    }
jcaceres's avatar
jcaceres committed
148
149
  //datapart = packetData + sizeof (nsHeader) + //JPC JLink***********************************
  //  ((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);//JPC JLink***********************************
150
151
  memcpy (bufz1, datapart, bpp);
  return packetIndex;
jcaceres's avatar
jcaceres committed
152
153
}

154
155
156

//-------------------------------------------------------------------------------
/* The main run loop (see run()).  Binds the socket and waits for a first
 * packet from the peer.  Once connected,
 * while running is true, it checks if the socket has a new buffer.
 * If it does, it writes the buffer to the stream.
 */
160
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
161
162
163
void
UDPInput::stop ()
{
164
  _running = false;
jcaceres's avatar
jcaceres committed
165
166
}

167
168
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
169
170
171
void
UDPInput::run ()
{
jcaceres's avatar
jcaceres committed
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
  char localhostbuf[100];
  sock = new QUdpSocket; //***JPC Port to qt4*****************
  peerAddress = new QHostAddress;
  if (gethostname (localhostbuf, 99))
    {
      perror ("gethostname");
      exit ();
    }

  cout <<"Local Host Name: " << QString(QHostInfo::localHostName ()).latin1() << endl;//***JPC Port to qt4*****************
  //cout << "Rx buff = " << sock->receiveBufferSize () << endl;//***JPC Port to qt4*****************
  QHostAddress *ha = new QHostAddress ();//***JPC Port to qt4*****************
  QString *s = IPv4Addr (localhostbuf);	// dotted integer from name//***JPC Port to qt4*****************
  ha->setAddress (*s);//***JPC Port to qt4*****************
  cout << "INPUT PORT: " << netInfo->getInPort () << endl;

  //if (!(sock->bind (*ha, netInfo->getInPort ())))//***JPC Port to qt4*****************
  if (!(sock->bind (*ha, netInfo->getInPort (), QUdpSocket::ShareAddress ) ) )//***JPC Port to qt4*****************
    {
      perror ("bind\n");
      exit ();
    }
  if (!sock->isValid ())
    {
      cout << "socket creation error " << endl;
    }
  cout << endl << "UDPInput binding to " << localhostbuf
       << " port " << netInfo->getInPort () << endl;


202
203
204
205
  _running = true;
  int seq;
  char *buf = (char *) new char[bpp];
  char *bufz1 = (char *) new char[bpp];
jcaceres's avatar
jcaceres committed
206

207
208
209
210
211
212
213
214
215
  cout << "UDP Input waiting for peer." << endl;
  while (has_peer == false)
    {
      //cout << "pendingDatagramSize!!!!!!" << sock->pendingDatagramSize () << endl;//***JPC Port to qt4*****************
      //cout << "hasPendingDatagrams!!!!!!" << sock->hasPendingDatagrams () << endl;//***JPC Port to qt4*****************
      //}
	  
      //if (sock->bytesAvailable () >= wholeSize)	// not an error//***JPC Port to qt4*****************
      if (sock->pendingDatagramSize () >= wholeSize)	// not an error//***JPC Port to qt4*****************
jcaceres's avatar
jcaceres committed
216
	{
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
	  //cout <<"wholeSize = " << wholeSize << " " <<sock->bytesAvailable ()<< endl;// not an error//***JPC Port to qt4*****************
	  cout <<"wholeSize = " << wholeSize << " " <<sock->pendingDatagramSize ()<< endl;// not an error//***JPC Port to qt4*****************
	  //sock->readBlock (buf, wholeSize);
	  this->rcv (buf);
	  has_peer = true;	// really rcvd something
	  cout << "UDP Input Connected!" << endl;			
	} else
	msleep (100);
    }
  msleep (10);
  cout << "Started UDP Input Run" << endl;
  set_fifo_priority (false);
  bool timeout;
  int ret;
  unsigned long now = 0;
  unsigned long lastTickTime = usecTime ();
  int ctr = 0;
jcaceres's avatar
jcaceres committed
234
  //double max = 0.0;//JPC JLink***********************************
235
  int gap;
jcaceres's avatar
jcaceres committed
236
  //double gapAvg = 0.0; //JPC JLink***********************************
237
238
239
240
241
242
243
244
245
  while (_running)
    {
      // If timeout is non-null and no error occurred 
      // (i.e. it does not return -1): this function sets *timeout to TRUE, 
      // if the reason for returning was that the timeout was reached; 
      // otherwise it sets *timeout to FALSE. This is useful to find out 
      // if the peer closed the connection.
      //ret = (sock->waitForMore (30, &timeout));//***JPC Port to qt4*****************
      timeout = sock->waitForReadyRead(30);//***JPC Port to qt4*****************
246

247
248
249
      //if (ret == -1)//***JPC Port to qt4*****************
      //cerr << "udp in sock problems..." << endl;//***JPC Port to qt4*****************
      //else if (timeout)//***JPC Port to qt4*****************
jcaceres's avatar
jcaceres committed
250
      if (!timeout) {//***JPC Port to qt4*****************
251
	cerr << "udp in waited too long (more than 30ms)..." << endl;
jcaceres's avatar
jcaceres committed
252
	}
253
      else 
jcaceres's avatar
jcaceres committed
254
	{
255
256
257
258
259
	  seq = this->rcv (buf);
	  if (stream == NULL)
	    {
	      cerr << "ERROR: UDPInput has no sream to write to!" << endl;
	    }
jcaceres's avatar
jcaceres committed
260
			
261
262
263
264
265
266
267
	  int z = numRedundantBuffers;
	  while (z)
	    {
	      int zseq = this->rcvz1 (bufz1, z);
	      stream->writeRedundant (bufz1, key, z, zseq);
	      z--;
	    }
jcaceres's avatar
jcaceres committed
268
269
	  //gap = stream->writeRedundant (buf, key, 0, seq);//JPC JLink***********************************
	  stream->writeRedundant (buf, key, 0, seq);//JPC JLink***********************************
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
	  //		cout << "writePosition " << gap <<"\t\t";
	  /*
	    now = usecTime ();
	    ctr++;
	    if (ctr == 40)
	    {
	    ctr = 0;
	    gapAvg = gapAvg / 40.0;
	    //		plotVal (gapAvg);
	    //		plotVal (max);
	    gapAvg = 0.0;
	    max = 0.0;
	    } else {
	    double xxx = (((double)now - (double)lastTickTime)/1000000.0);
	    //			if (xxx>max) max = xxx;
	    max += xxx;
	    gapAvg += ((double)gap);
	    }
	    lastTickTime = now;
	  */
	  //			stream->write (buf, key);
jcaceres's avatar
jcaceres committed
291
	}
292
293
    }
  cout << "UDP Input stop" << endl;
jcaceres's avatar
jcaceres committed
294
295
}

296
297
298

//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
299
300
void UDPInput::plotVal (double v)
{
301
302
303
304
305
    if(_rcvr!=NULL)
    {
    ThreadCommEvent *e = new ThreadCommEvent (-1.0, v, 0.0);
    QApplication::postEvent (_rcvr, e);	// to app event loop
    }
jcaceres's avatar
jcaceres committed
306
}
307
308
309
310


//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
311
312
bool UDPInput::hasPeer ()
{
313
  return has_peer;
jcaceres's avatar
jcaceres committed
314
315
316
}


317
318
319
320
321
//-------------------------------------------------------------------------------
/* Returns the address of the peer to which the socket is connected
 *
 */
//-------------------------------------------------------------------------------
jcaceres's avatar
jcaceres committed
322
323
// Read a datagram into packetData to refresh peerAddress, print the address
// and return a copy of it.
// NOTE(review): this consumes a pending datagram and writes packetData from
// whichever thread calls it -- confirm that callers expect this.
QHostAddress
UDPInput::peer ()
{
  sock->readDatagram (packetData, wholeSize, peerAddress);

  const QHostAddress &sender = *peerAddress;
  cout << sender.toString().latin1() << endl;
  return sender;
}