Commit 9bfdfce6 authored by Roman Haefeli's avatar Roman Haefeli
Browse files

Merge branch 'switchboard' into 'master'

Switchboard

See merge request TPF/tpf-scripts!1
parents d02bdff2 5ca69918
proxies/__pycache__/
__pycache__/
This diff is collapsed.
# tpf-scripts
# tpf-switchboard
Collections of scripts and tools used in the TPF context.
*tpf-switchboard* is a service with a JSON API to manage different types
of UDP proxies. UDP proxies are way to establish UDP connections between
clients from behind NAT firewalls. The UDP proxies support several
connection topologies.
## UDP proxy scripts
It is written in [Python](https://www.python.org/) and uses the
[flask](https://flask.palletsprojects.com/) framework.
Those scripts are intended to be run on a server with a public IP address. They
relay incoming UDP packets between connected clients. We use them to establish
UDP connections between endpoints behind NAT firewalls.
## JSON API description
### udp_mirror
### Start a new proxy
**udp_mirror** mirrors incoming packets. This is useful for testing, for instance
A new proxy process is launched by sending a HTTP `POST` request with POST data
containing the description of the new proxy in JSON format to the path `<base>/proxies/`.
An example of POST data:
```json
{
"port": 11000,
"type", "one2oneBi",
"room": "rehearsal",
"description": "UltraGrid stage"
}
```
All four parameters `port`, `type`, `room`, and `description` are mandatory and must
be specified. Omitting any of them causes an error.
cURL example for starting a new proxy:
```bash
curl \
--header "Content-Type: application/json" \
--request POST \
--data '{"port": 11000, "type": "one2oneBi", "description": "UltraGrid stage", "room": "rehearsal"}' \
http://localhost:3591/proxies/
```
### Inspect running proxies
#### path `<base>/proxies/`
Information about running proxies is gathered with a HTTP `GET` request to `<base>/proxies/`. For
a specific proxy, use the proxy's specific path `<base>/proxies/<port>`.
cURL examples:
```bash
curl \
--requeset GET \
http://localhost:3591/proxies/
```
```bash
curl \
--requeset GET \
http://localhost:3591/proxies/11000
```
#### path `<base>/rooms/`
You can also request running proxies grouped by room through `<base>/rooms/`. Also, all
proxies running in a specific room can be listed.
cURL examples:
```bash
curl \
--requeset GET \
http://localhost:3591/rooms/
```
```bash
curl \
--requeset GET \
http://localhost:3591/rooms/rehearsal
```
### Stop a running proxy
A running proxy is stopped with a HTTP `DELETE` request to the proxy's path.
cURL example:
```bash
curl \
--request DELETE
http://localhost:3591/proxies/11000
```
**NOTE**:
Since `DELETE` requests should be treated in an idem-potent way, stopping
an already stopped proxy is considered not an error.
### Return values
For requests to semantically valid paths, the return value is a JSON object
of the format:
```json
{
"status": "<Either 'Error' or 'OK'>",
"msg": "<Some message describing the status>"
}
```
Depending on type of request and on status, different HTTP status codes are
returned. HTTP status may be one of `200`, `201`, `404`, `422`.
## UDP proxy types
**mirror** mirrors incoming packets. This is useful for testing, for instance
to test if the server port is reachable. Also, it can be used to test applications
like UltraGrid when no second peer is available.
### udp_proxy
**udp_proxy** establishes a connection between two endpoints. As soon as both endpoints
**one2oneBi** establishes a connection between two endpoints. As soon as both endpoints
have sent at least one packet, the script starts relaying incoming between clients. This
script handles exactly one connetion with two endpoints.
### udp_dyn_proxy
**one2manyMo** opens two listening socket, a source and a sink. It relays all incoming
traffic from the source to all clients connected to the sink. Sink clients are requested
to send at least one packet per second to signal their active connection. Packets from
sink clients are discarded.
**one2manyBi** establishes 1-to-N connections like **one2manyMo**, but additionally allows
sink clients send packets to the source. Packets from source are forwarded to all active
sink clients, packets from sink clients are forwarded to the source client. For keeping
connections alive without forwarding any data, **one2manyBi** discards OSC packets with an
address `/hb` and no payload.
**many2manyBi** relays incoming packets to all active clients but to to itself. Clients
are considered active as long as they send at least one packet per second. OSC packets
with an address `/hb` and no payload are discarded and may be used by clients to keep
their connection alive without sending data.
## Deployment
The recommended way of running *tpf-switchboard* is to execute it under
[gunicorn](https://gunicorn.org/). The included script `setup.sh` automates
the process of setting up *tpf-switchboard* as a system service. The script
is tested on *Debian* and *Ubuntu*. Run it as root:
```bash
./setup.sh
```
## About
*tpf-switchboard* is developed in the research project **Spatial Dis-/
Continuities in Telematic Performances**. It is one element of our tool set to
enable remote locations to create overlapping spaces on physical and virtual
stages.
## Authors
* Roman Haefeli <roman.haefeli@zhdk.ch>
**udp_dyn_proxy** handles many endpoint-to-endpoint connections simultaneously. For
that to work, each client sends a token message with a key word to the server. After
having received the same token message from two different clients, the sever starts
relaying packets between the two clients.
## License
### udp_multi_proxy
**GPL 3.0** (see [LICENSE.txt](LICENSE.txt))
**udp_multi_proxy** relays an incoming stream of UDP packets from a source client to
one or many receiving clients. The script opens two listening sockets, one for the
source client and one with an offset for the receiving clients. Receiving clients
need to send a dummy packet in regular intervals to keep their connection alive.
from proxies.udp_one2onebi import (One2OneBiProxy)
from proxies.udp_mirror import (MirrorProxy)
from proxies.udp_one2manymo import (One2ManyMoProxy)
from proxies.udp_one2manybi import (One2ManyBiProxy)
from proxies.udp_many2manybi import (Many2ManyBiProxy)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
udp_many2manybi: for N-to-N connections (OSC)
"""
import socket
import sys
import threading
import time
class Many2ManyBiProxy(threading.Thread):
"""
Relays UDP packets between many endpoints. Incoming packets are forwarded to all other
endpoints (not to itself). OSC messages '/hb' are not forwarded, but keep connection
alive.
"""
def __init__(self, listen_port=None, listen_address='0.0.0.0', timeout=10):
super(Many2ManyBiProxy, self).__init__()
if not isinstance(listen_port, int) or not 1024 <= listen_port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % port)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port))
except socket.error as msg:
raise
self.kill_signal = False
# key of dict is client's (address, port) tuple
self.active_endpoints = {}
self.timeout = timeout
self.heartbeat_sequence = bytes([47, 104, 98, 0, 44, 0, 0, 0])
def run(self):
while not self.kill_signal:
try:
my_data, my_addr = self.sock.recvfrom(65536)
except socket.timeout:
continue
self.active_endpoints[my_addr] = time.time()
if self.heartbeat_sequence != my_data[:len(self.heartbeat_sequence)]:
other_clients = list(self.active_endpoints.keys())
other_clients.remove(my_addr)
for addr in other_clients:
if (self.active_endpoints[addr] + self.timeout) < time.time():
del self.active_endpoints[addr]
else:
self.sock.sendto(my_data, addr)
def stop(self):
self.kill_signal = True
self.join()
def main():
try:
listen_port = int(sys.argv[1])
proxy = Many2ManyBiProxy(listen_port=listen_port)
proxy.start()
proxy.join()
except KeyboardInterrupt:
proxy.stop()
sys.exit(0)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
udp_mirror: reflects udp packets to their origin
"""
import socket
import sys
import threading
import time
class MirrorProxy(threading.Thread):
"""
Relays any incoming UDP packets back to the sender. Mainly useful for testing
purposes.
"""
def __init__(self, listen_port=None, listen_address='0.0.0.0'):
super(MirrorProxy, self).__init__()
if not isinstance(listen_port, int) or not 1024 <= listen_port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % listen_port)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port))
except socket.error as msg:
raise
self.kill_signal = False
def run(self):
while not self.kill_signal:
try:
data, addr = self.sock.recvfrom(65536)
except socket.timeout:
continue
self.sock.sendto(data, addr)
def stop(self):
self.kill_signal = True
self.join()
def main():
try:
proxy = MirrorProxy(listen_port=int(sys.argv[1]))
proxy.start()
proxy.join()
except KeyboardInterrupt:
proxy.stop()
sys.exit(0)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
udp_one2manybi: for 1-to-N connections (both directions)
"""
import select
import socket
import sys
import threading
import time
class One2ManyBiProxy(threading.Thread):
"""
Relays UDP packets from one source client to many sink clients. Sink clients
are expected to send dummy packets in regular intervals to signal their presence.
Different ports are used for source and sink clients.
"""
def __init__(self, one_port=None, many_port=None, listen_address='0.0.0.0', timeout=10):
super(One2ManyBiProxy, self).__init__()
for port in [one_port, many_port]:
if not isinstance(port, int) or not 1024 <= port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % port)
try:
self.source = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.source.settimeout(0.1)
self.source.bind((listen_address, one_port))
except socket.error as msg:
raise
try:
self.sink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Make socket non-blocking by setting timeout to 0
self.sink.settimeout(0)
self.sink.bind((listen_address, many_port))
except socket.error as msg:
raise
self.kill_signal = False
# key of dict is sink_client's (address, port) tuple
self.active_endpoints = {}
self.timeout = timeout
self.heartbeat_sequence = bytes([47, 104, 98, 0, 44, 0, 0, 0])
self.one_port = one_port
self.many_port = many_port
def run(self):
listening_sockets = [self.source, self.sink]
while not self.kill_signal:
readables, _w, _x = select.select(listening_sockets, [], [], 0.1)
for sock in readables:
if sock.getsockname()[1] == self.many_port:
# many sends back to one
many_data, many_addr = sock.recvfrom(65536)
self.active_endpoints[many_addr] = time.time()
if self.heartbeat_sequence != many_data[:len(self.heartbeat_sequence)]:
try:
self.active_endpoints[one_addr]
except KeyError:
# Do nothing after ithe 'one' endpoint has expired
continue
except UnboundLocalError:
continue
if (self.active_endpoints[one_addr] + self.timeout) < time.time():
del self.active_endpoints[one_addr]
else:
self.source.sendto(many_data, one_addr)
elif sock.getsockname()[1] == self.one_port:
# one sends to many
one_data, one_addr = sock.recvfrom(65536)
self.active_endpoints[one_addr] = time.time()
if self.heartbeat_sequence != one_data[:len(self.heartbeat_sequence)]:
many_list = list(self.active_endpoints.keys())
many_list.remove(one_addr)
for many_addr in many_list:
if (self.active_endpoints[many_addr] + self.timeout) < time.time():
del self.active_endpoints[many_addr]
else:
self.sink.sendto(one_data, many_addr)
else:
print('We should not ever reach that point')
def stop(self):
self.kill_signal = True
self.join()
def main():
try:
one_port = int(sys.argv[1])
many_port = one_port + 1
proxy = One2ManyBiProxy(one_port=one_port, many_port=many_port)
proxy.start()
proxy.join()
except KeyboardInterrupt:
proxy.stop()
sys.exit(0)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
udp_one2manymo: for 1-to-N connections
"""
import socket
import sys
import threading
import time
class One2ManyMoProxy(threading.Thread):
"""
Relays UDP packets from one source client to many sink clients. Sink clients
are expected to send dummy packets in regular intervals to signal their presence.
Different ports are used for source and sink clients.
"""
def __init__(self, listen_port=None, send_port=None, listen_address='0.0.0.0', timeout=10):
super(One2ManyMoProxy, self).__init__()
for port in [listen_port, send_port]:
if not isinstance(port, int) or not 1024 <= port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % port)
try:
self.source = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.source.settimeout(0.1)
self.source.bind((listen_address, listen_port))
except socket.error as msg:
raise
try:
self.sink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Make socket non-blocking by setting timeout to 0
self.sink.settimeout(0)
self.sink.bind((listen_address, send_port))
except socket.error as msg:
raise
self.kill_signal = False
# key of dict is sink_client's (address, port) tuple
self.sink_clients = {}
self.timeout = timeout
def run(self):
while not self.kill_signal:
# handle incoming packets from sink clients
while True:
try:
_trash, sink_addr = self.sink.recvfrom(65536)
except BlockingIOError:
break
self.sink_clients[sink_addr] = time.time()
# handle incoming packets from source client
try:
data, addr = self.source.recvfrom(65536)
except socket.timeout:
continue
# remove expired clients from sink_clients
for client, prev_ts in list(self.sink_clients.items()):
if (prev_ts + self.timeout) < time.time():
del self.sink_clients[client]
# send data to remaining sink_clients
for client in self.sink_clients.keys():
self.sink.sendto(data, client)
def stop(self):
self.kill_signal = True
self.join()
def main():
try:
source_port = int(sys.argv[1])
sink_port = source_port + 1
proxy = One2ManyMoProxy(listen_port=source_port, send_port=sink_port)
proxy.start()
proxy.join()
except KeyboardInterrupt:
proxy.stop()
sys.exit(0)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
udp_one2onebi: for 1-to-1 connections
"""
import socket
import sys
import threading
import time
class One2OneBiProxy(threading.Thread):
"""
Relays UDP packets between two endpoints. This allows two end-points
behind NAT firewalls to communicate with each other by relaying their traffic
through a server with a public IP running this script.
"""
def __init__(self, listen_port=None, listen_address='0.0.0.0'):
super(One2OneBiProxy, self).__init__()
if not isinstance(listen_port, int) or not 1024 <= listen_port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % listen_port)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port))
except socket.error as msg:
raise
self.kill_signal = False
def run(self):
client1 = None
client2 = None
while not self.kill_signal:
try:
data, addr = self.sock.recvfrom(65536)
except socket.timeout:
continue
# Assigning clients
if addr != client1 and addr != client2:
client1 = client2
client2 = addr
# transmit data
if client1 and client2:
if addr == client1:
self.sock.sendto(data, client2)
elif addr == client2:
self.sock.sendto(data, client1)
def stop(self):
self.kill_signal = True
self.join()
def main():
try:
proxy = One2OneBiProxy(listen_port=int(sys.argv[1]))
proxy.start()
proxy.join()
except KeyboardInterrupt:
proxy.stop()
sys.exit(0)
if __name__ == '__main__':
main()
#!/bin/bash
USER="tpf-switchboard"
APP_PATH="$( cd "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
APP_NAME="switchboard"
SERVICE_NAME="tpf-switchboard"
LOG_DIR="/var/log/tpf-switchboard"
LISTEN_PORT=3591
LISTEN_ADDRESS="0.0.0.0"
function hilite {
echo -ne "\033[32m"
echo "${@}"
echo -ne "\033[0m"
}
function errexit {
echo -ne "\033[31m"
echo "${@}"
echo "Exiting prematurely."
echo -ne "\033[0m"
exit 1
}
# are we root?
hilite "Check whether we are root."
[ "$(whoami)" == "root" ] || errexit "We are not running as root. Exiting now."
# Install flask
hilite "Make sure flask is installed"
if ! which flask > /dev/null
then
apt-get install python3-flask || errexit "Failed to install python3-flask"
fi
# Install gunicorn3
hilite "Make sure gunicorn3 is installed"
if ! which gunicorn3 > /dev/null
then
apt-get install gunicorn3 || errexit "Failed to install gunicorn3"
fi
# add user to the system