Commit 8a5575f0 authored by jcaceres's avatar jcaceres
Browse files

Kind of working, I still get a weird error

parent 2ef23480
......@@ -205,9 +205,9 @@ StreamBD::cmd (MainDialog *eventThread)
audioDevice->setThreads(t);
t.netin = new UDPInput (netInfo, audioInfo);
t.netin->setGUI((QObject *)eventThread);
t.netin->setGUI((QObject *)eventThread);
t.netout = new UDPOutput (netInfo, audioInfo);
t.netout->setGUI((QObject *)eventThread);
t.netout->setGUI((QObject *)eventThread);
ConnectPlugins (t.audioin, t.netout, t.streamout);
ConnectPlugins (t.netin, t.audioout, t.streamin);
......@@ -579,9 +579,11 @@ StreamBD::EstablishConnection (runModeT runMode, char *hostname, UDPOutput * net
usleep (10000);
// cout << ".";
}
/////FOLOW THIS TO FIND THE PROBLEM
//**************JPC COMENTED OUT*******************
//cout << endl << "Connection received from: " <<
// netin->peer ().toString () << endl;
cout << endl << "Connection received from: " <<
netin->peer().toString().latin1() << endl;
//*************************************************
cout << "Requesting return connection....";
netout->connect (netin->peer ());
......
......@@ -41,7 +41,7 @@ maxSeq (maxSeq)
prepareReaderWriter ();
dataLock = new QSemaphore (1);
cout << (*dataLock).available() << " FDF SDFJSDIF JSDFIJ DSIFJSDOI JOSI" << endl;
//cout << (*dataLock).available() << endl;
secondTry = false;
}
......
......@@ -3,7 +3,7 @@
#define INCLUDED_CIRCULARBUFFER
#include <qthread.h>
//#include <qsemaphore.h>
//#include <qsemaphore.h>//***JPC Port to qt4*****************
#include <QSemaphore>
/**
* @brief Provides a circular buffer that can be written to and read
......
......@@ -26,7 +26,7 @@ echo "qmake .pro built"
#Create Makefile
qmake-qt4 -makefile -unix -o Makefile \
"CONFIG+=qt thread warn_on debug" \
"QMAKE_CXXFLAGS+=-Wno-deprecated -pg -g" \
"QMAKE_CXXFLAGS+=-Wno-deprecated -g -O2" \
"QT += network qt3support" \
"INCLUDEPATH+=/usr/include/stk ../include" \
"DEFINES+=__LINUX_ALSA__ APP_NAME=$APP_NAME_QUOTES" \
......
......@@ -5,223 +5,281 @@
#include <stdlib.h>
#include <stdio.h>
#include <iostream.h>
#include <QHostInfo>//***JPC Port to qt4*****************
extern QString *IPv4Addr (char *namebuf);
extern int set_fifo_priority (bool half);
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
UDPInput::UDPInput (NetworkInfoT netInfo, AudioInfoT audInfo):
InputPlugin ("UDP Input"), netInfo (netInfo), audInfo (audInfo)
InputPlugin ("UDP Input"), netInfo (netInfo), audInfo (audInfo)
{
bpp = netInfo->getDataBytesPerPacket ();
cout << "bpp = " << bpp << endl;
char localhostbuf[100];
has_peer = false;
_rcvr = NULL;
sock = new Q3SocketDevice (Q3SocketDevice::Datagram); // for an unreliable UDP socket
sock->setAddressReusable(true);
if (gethostname (localhostbuf, 99))
{
perror ("gethostname");
exit ();
}
cout << "Rx buff = " << sock->receiveBufferSize () << endl;
QHostAddress *ha = new QHostAddress ();
QString *s = IPv4Addr (localhostbuf); // dotted integer from name
ha->setAddress (*s);
if (!(sock->bind (*ha, netInfo->getInPort ())))
{
perror ("bind\n");
exit ();
}
if (!sock->isValid ())
{
cout << "socket creation error " << endl;
}
bpp = netInfo->getDataBytesPerPacket ();
cout << "bpp = " << bpp << endl;
char localhostbuf[100];
has_peer = false;
_rcvr = NULL;
//sock = new Q3SocketDevice (Q3SocketDevice::Datagram); // for an unreliable UDP socket//***JPC Port to qt4*****************
//sock->setAddressReusable(true);//***JPC Port to qt4*****************
sock = new QUdpSocket;//***JPC Port to qt4*****************
peerAddress = new QHostAddress;
if (gethostname (localhostbuf, 99))
{
perror ("gethostname");
exit ();
}
cout <<"Local Host Name: " << QString(QHostInfo::localHostName ()).latin1() << endl;//***JPC Port to qt4*****************
//cout << "Rx buff = " << sock->receiveBufferSize () << endl;//***JPC Port to qt4*****************
QHostAddress *ha = new QHostAddress ();//***JPC Port to qt4*****************
QString *s = IPv4Addr (localhostbuf); // dotted integer from name//***JPC Port to qt4*****************
ha->setAddress (*s);//***JPC Port to qt4*****************
cout << "INPUT PORT: " << netInfo->getInPort () << endl;
//if (!(sock->bind (*ha, netInfo->getInPort ())))//***JPC Port to qt4*****************
if (!(sock->bind (*ha, netInfo->getInPort (), QUdpSocket::ShareAddress ) ) )//***JPC Port to qt4*****************
{
perror ("bind\n");
exit ();
}
if (!sock->isValid ())
{
cout << "socket creation error " << endl;
}
packetIndex = 0;
wholeSize = sizeof (nsHeader) + (netInfo->getChunksPerPacket () * bpp) + 1;
packetData = new char[wholeSize];
memset (packetData, 0, wholeSize);
numRedundantBuffers = netInfo->getChunksPerPacket() - 1;
maxPacketIndex = netInfo->getMaxSeq();
cout << endl << "UDPInput binding to " << localhostbuf
<< " port " << netInfo->getInPort () << endl;
packetIndex = 0;
wholeSize = sizeof (nsHeader) + (netInfo->getChunksPerPacket () * bpp) + 1;
packetData = new char[wholeSize];
memset (packetData, 0, wholeSize);
numRedundantBuffers = netInfo->getChunksPerPacket() - 1;
maxPacketIndex = netInfo->getMaxSeq();
cout << endl << "UDPInput binding to " << localhostbuf
<< " port " << netInfo->getInPort () << endl;
}
/** cout that the UDPInput thread has started */
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
UDPInput::~UDPInput()
{
delete sock;
delete peerAddress;
}
//-------------------------------------------------------------------------------
/*! cout that the UDPInput thread has started
*
*/
//-------------------------------------------------------------------------------
void
UDPInput::Initial ()
{
cout << "Started UDPInput thread" << endl;
cout << "Started UDPInput thread" << endl;
}
/**
* Receive a buffer from the UDPSocket.
//-------------------------------------------------------------------------------
/* Receive a buffer from the UDPSocket.
* @param buf is the location at which to store the buffer.
* @param le is the size of the buffer to be received.
*/
//-------------------------------------------------------------------------------
int
UDPInput::rcv (char *buf)
{
int rv = sock->readBlock (packetData, wholeSize);
char *datapart;
packetIndex = ((nsHeader *) packetData)->i_seq;
datapart = packetData + sizeof (nsHeader) +
((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);
memcpy (buf, datapart, bpp);
/*
((nsHeader *) packetData)->i_type = 0;
((nsHeader *) packetData)->i_nframes = 1;
((nsHeader *) packetData)->i_nchans = 2;
((nsHeader *) packetData)->i_copies = n;
((nsHeader *) packetData)->i_cksum = 4;
((nsHeader *) packetData)->i_seq = packetIndex;
((nsHeader *) packetData)->i_rtnseq = 6;
((nsHeader *) packetData)->i_rtt = 7;
*/
if (rv < 0)
{
cerr << "bad read..." << endl;
}
return packetIndex;
//int rv = sock->readBlock (packetData, wholeSize);//***JPC Port to qt4*****************
//cout << "###############################################" << endl;
int rv = sock->readDatagram (packetData, wholeSize, peerAddress);//***JPC Port to qt4*****************
//cout << "###############################################" << rv << endl;
//cout << "***Packet Size***: " << rv << endl;//***JPC Port to qt4*****************
char *datapart;
packetIndex = ((nsHeader *) packetData)->i_seq;
datapart = packetData + sizeof (nsHeader) +
((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);
memcpy (buf, datapart, bpp);
/*
((nsHeader *) packetData)->i_type = 0;
((nsHeader *) packetData)->i_nframes = 1;
((nsHeader *) packetData)->i_nchans = 2;
((nsHeader *) packetData)->i_copies = n;
((nsHeader *) packetData)->i_cksum = 4;
((nsHeader *) packetData)->i_seq = packetIndex;
((nsHeader *) packetData)->i_rtnseq = 6;
((nsHeader *) packetData)->i_rtt = 7;
*/
if (rv < 0)
{
cerr << "bad read..." << endl;
}
return packetIndex;
}
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
int
UDPInput::rcvz1 (char *bufz1, int z)
{
char *datapart;
packetIndex = ((nsHeader *) packetData)->i_seq-z;
if (packetIndex < 0)
{
packetIndex += maxPacketIndex;
// cout << "backed below 0 (a good thing)" << endl;
}
datapart = packetData + sizeof (nsHeader) +
((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);
memcpy (bufz1, datapart, bpp);
return packetIndex;
char *datapart;
packetIndex = ((nsHeader *) packetData)->i_seq-z;
if (packetIndex < 0)
{
packetIndex += maxPacketIndex;
// cout << "backed below 0 (a good thing)" << endl;
}
datapart = packetData + sizeof (nsHeader) +
((packetIndex % ((nsHeader *) packetData)->i_copies) * bpp);
memcpy (bufz1, datapart, bpp);
return packetIndex;
}
/**
* The main run loop. Calls waitForConnection(). Once connected,
//-------------------------------------------------------------------------------
/* The main run loop. Calls waitForConnection(). Once connected,
* while running is true, it checks if the socket has a new buffer.
* If it does, it writes the buffer to the stream.
*/
//-------------------------------------------------------------------------------
void
UDPInput::stop ()
{
_running = false;
_running = false;
}
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
void
UDPInput::run ()
{
_running = true;
int seq;
char *buf = (char *) new char[bpp];
char *bufz1 = (char *) new char[bpp];
_running = true;
int seq;
char *buf = (char *) new char[bpp];
char *bufz1 = (char *) new char[bpp];
cout << "UDP Input waiting for peer." << endl;
while (has_peer == false)
cout << "UDP Input waiting for peer." << endl;
while (has_peer == false)
{
//cout << "pendingDatagramSize!!!!!!" << sock->pendingDatagramSize () << endl;//***JPC Port to qt4*****************
//cout << "hasPendingDatagrams!!!!!!" << sock->hasPendingDatagrams () << endl;//***JPC Port to qt4*****************
//}
//if (sock->bytesAvailable () >= wholeSize) // not an error//***JPC Port to qt4*****************
if (sock->pendingDatagramSize () >= wholeSize) // not an error//***JPC Port to qt4*****************
{
if (sock->bytesAvailable () >= wholeSize) // not an error
{
cout <<"wholeSize = " << wholeSize << " " <<sock->bytesAvailable ()<< endl;
//sock->readBlock (buf, wholeSize);
this->rcv (buf);
has_peer = true; // really rcvd something
cout << "UDP Input Connected!" << endl;
} else
msleep (100);
}
msleep (10);
cout << "Started UDP Input Run" << endl;
set_fifo_priority (false);
bool timeout;
int ret;
unsigned long now = 0;
unsigned long lastTickTime = usecTime ();
int ctr = 0;
double max = 0.0;
int gap;
double gapAvg = 0.0;
while (_running)
//cout <<"wholeSize = " << wholeSize << " " <<sock->bytesAvailable ()<< endl;// not an error//***JPC Port to qt4*****************
cout <<"wholeSize = " << wholeSize << " " <<sock->pendingDatagramSize ()<< endl;// not an error//***JPC Port to qt4*****************
//sock->readBlock (buf, wholeSize);
this->rcv (buf);
has_peer = true; // really rcvd something
cout << "UDP Input Connected!" << endl;
} else
msleep (100);
}
msleep (10);
cout << "Started UDP Input Run" << endl;
set_fifo_priority (false);
bool timeout;
int ret;
unsigned long now = 0;
unsigned long lastTickTime = usecTime ();
int ctr = 0;
double max = 0.0;
int gap;
double gapAvg = 0.0;
while (_running)
{
// If timeout is non-null and no error occurred
// (i.e. it does not return -1): this function sets *timeout to TRUE,
// if the reason for returning was that the timeout was reached;
// otherwise it sets *timeout to FALSE. This is useful to find out
// if the peer closed the connection.
//ret = (sock->waitForMore (30, &timeout));//***JPC Port to qt4*****************
timeout = sock->waitForReadyRead(30);//***JPC Port to qt4*****************
//if (ret == -1)//***JPC Port to qt4*****************
//cerr << "udp in sock problems..." << endl;//***JPC Port to qt4*****************
//else if (timeout)//***JPC Port to qt4*****************
if (!timeout)//***JPC Port to qt4*****************
cerr << "udp in waited too long (more than 30ms)..." << endl;
else
{
// If timeout is non-null and no error occurred
// (i.e. it does not return -1): this function sets *timeout to TRUE,
// if the reason for returning was that the timeout was reached;
// otherwise it sets *timeout to FALSE. This is useful to find out
// if the peer closed the connection.
ret = (sock->waitForMore (30, &timeout));
if (ret == -1)
cerr << "udp in sock problems..." << endl;
else if (timeout)
cerr << "udp in waited too long..." << endl;
else
{
seq = this->rcv (buf);
if (stream == NULL)
{
cerr << "ERROR: UDPInput has no sream to write to!" << endl;
}
seq = this->rcv (buf);
if (stream == NULL)
{
cerr << "ERROR: UDPInput has no sream to write to!" << endl;
}
int z = numRedundantBuffers;
while (z)
{
int zseq = this->rcvz1 (bufz1, z);
stream->writeRedundant (bufz1, key, z, zseq);
z--;
}
gap = stream->writeRedundant (buf, key, 0, seq);
// cout << "writePosition " << gap <<"\t\t";
/*
now = usecTime ();
ctr++;
if (ctr == 40)
{
ctr = 0;
gapAvg = gapAvg / 40.0;
// plotVal (gapAvg);
// plotVal (max);
gapAvg = 0.0;
max = 0.0;
} else {
double xxx = (((double)now - (double)lastTickTime)/1000000.0);
// if (xxx>max) max = xxx;
max += xxx;
gapAvg += ((double)gap);
}
lastTickTime = now;
*/
// stream->write (buf, key);
}
int z = numRedundantBuffers;
while (z)
{
int zseq = this->rcvz1 (bufz1, z);
stream->writeRedundant (bufz1, key, z, zseq);
z--;
}
gap = stream->writeRedundant (buf, key, 0, seq);
// cout << "writePosition " << gap <<"\t\t";
/*
now = usecTime ();
ctr++;
if (ctr == 40)
{
ctr = 0;
gapAvg = gapAvg / 40.0;
// plotVal (gapAvg);
// plotVal (max);
gapAvg = 0.0;
max = 0.0;
} else {
double xxx = (((double)now - (double)lastTickTime)/1000000.0);
// if (xxx>max) max = xxx;
max += xxx;
gapAvg += ((double)gap);
}
lastTickTime = now;
*/
// stream->write (buf, key);
}
cout << "UDP Input stop" << endl;
}
cout << "UDP Input stop" << endl;
}
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
void UDPInput::plotVal (double v)
{
if(_rcvr!=NULL)
{
ThreadCommEvent *e = new ThreadCommEvent (-1.0,
v,
0.0);
QApplication::postEvent (_rcvr, e); // to app event loop
}
/*
if(_rcvr!=NULL)
{
ThreadCommEvent *e = new ThreadCommEvent (-1.0, v, 0.0);
QApplication::postEvent (_rcvr, e); // to app event loop
}
*/
}
//-------------------------------------------------------------------------------
//-------------------------------------------------------------------------------
bool UDPInput::hasPeer ()
{
return has_peer;
return has_peer;
}
/** Returns the address of the peer to which the socket is connected. */
//-------------------------------------------------------------------------------
/* Returns the address of the peer to which the socket is connected
*
*/
//-------------------------------------------------------------------------------
QHostAddress UDPInput::peer ()
{
return sock->peerAddress ();
cout << "#################RETURNING PEEEEERRRRRRRRR#############################" << endl;
sock->readDatagram (packetData, wholeSize, peerAddress);//***JPC Port to qt4*****************
cout << (*peerAddress).toString().latin1() << endl;
//cout << sock->state() << endl;
//cout << sock->peerName().latin1() << endl;
//cout << "RETURNING 23232323**************** *********" << endl;
//return sock->peerAddress ();
return (*peerAddress);
}
......@@ -5,7 +5,8 @@
#include "udp.h"
#include "networkInfo.h"
#include "audioInfo.h"
#include <q3socketdevice.h>
//#include <q3socketdevice.h> //***JPC Port to qt4*****************
#include <QUdpSocket> //***JPC Port to qt4*****************
#include <qobject.h>
/**
......@@ -15,34 +16,40 @@
class UDPInput:public InputPlugin
{
private:
NetworkInfoT netInfo;
AudioInfoT audInfo;
Q3SocketDevice *sock;
bool _running;
bool has_peer;
int packetIndex; //used for netdebug, checking order of incoming packets
int maxPacketIndex;
char *packetData;
void setPacketSize (int size);
public:
UDPInput (NetworkInfoT netInfo, AudioInfoT audInfo);
int rcvz1 (char *bufz1, int z);
int rcv (char *buf);
bool hasPeer ();
QHostAddress peer ();
void Initial ();
void run ();
void stop ();
int bpp;
int wholeSize;
int numRedundantBuffers;
void plotVal (double v);
};
private:
NetworkInfoT netInfo;
AudioInfoT audInfo;
//Q3SocketDevice *sock;//***JPC Port to qt4*****************
QUdpSocket *sock;//***JPC Port to qt4*****************
bool _running;
bool has_peer;
int packetIndex; //used for netdebug, checking order of incoming packets
int maxPacketIndex;
char *packetData;
void setPacketSize (int size);
QHostAddress *peerAddress;//***JPC Port to qt4*****************
public:
UDPInput (NetworkInfoT netInfo, AudioInfoT audInfo);
virtual ~UDPInput();
int rcvz1 (char *bufz1, int z);
int rcv (char *buf);
bool hasPeer ();
QHostAddress peer ();
void Initial ();
void run ();
void stop ();
int bpp;
int wholeSize;
int numRedundantBuffers;
void plotVal (double v);
};
#endif
......@@ -14,17 +14,23 @@ audInfo (audInfo)
{
bpp = netInfo->getDataBytesPerPacket ();
char localhostbuf[100];
sock = new Q3SocketDevice (Q3SocketDevice::Datagram);
sock->setAddressReusable(true);
//sock = new Q3SocketDevice (Q3SocketDevice::Datagram);//***JPC Port to qt4*****************
//sock->setAddressReusable(true);//***JPC Port to qt4*****************
sock = new QUdpSocket;
if (gethostname (localhostbuf, 99))
{
perror ("gethostname");
exit ();
}
QHostAddress *ha = new QHostAddress ();
QString *s = IPv4Addr (localhostbuf); // dotted integer from name
QString *s = IPv4Addr (localhostbuf);
//cout << "AAAAAAAAA" << (*s).latin1() << endl;
ha->setAddress (*s);
if (!(sock->bind (*ha, netInfo->getOutPort ())))
cout << "AAAAAAAAA" << (*ha).toString ().latin1() << endl;
//if (!(sock->bind (*ha, netInfo->getOutPort ())))//***JPC Port to qt4*****************
if (!(sock->bind (*ha, netInfo->getOutPort (), QUdpSocket::ShareAddress ) ) )//***JPC Port to qt4*****************
{
perror ("bind\n");
exit ();
......@@ -53,6 +59,13 @@ audInfo (audInfo)
}
UDPOutput::~UDPOutput()
{
delete sock;
}