Commit 39e6401f authored by Roman Haefeli's avatar Roman Haefeli
Browse files

Merge branch 'multiprocessing' into 'master'

multiprocessing and error handling

See merge request TPF/tpf-scripts!2
parents af95acb7 89037a10
proxies/__pycache__/ proxies/__pycache__/
__pycache__/ __pycache__/
venv
.idea
\ No newline at end of file
...@@ -6,12 +6,12 @@ udp_many2manybi: for N-to-N connections (OSC) ...@@ -6,12 +6,12 @@ udp_many2manybi: for N-to-N connections (OSC)
""" """
import logging import logging
import multiprocessing
import socket import socket
import sys import sys
import threading
import time import time
class Many2ManyBiProxy(threading.Thread): class Many2ManyBiProxy(multiprocessing.Process):
""" """
Relays UDP packets between many endpoints. Incoming packets are forwarded to all other Relays UDP packets between many endpoints. Incoming packets are forwarded to all other
endpoints (not to itself). OSC messages '/hb' are not forwarded, but keep connection endpoints (not to itself). OSC messages '/hb' are not forwarded, but keep connection
...@@ -21,14 +21,15 @@ class Many2ManyBiProxy(threading.Thread): ...@@ -21,14 +21,15 @@ class Many2ManyBiProxy(threading.Thread):
def __init__(self, listen_port=None, listen_address='0.0.0.0', timeout=10, logger=None): def __init__(self, listen_port=None, listen_address='0.0.0.0', timeout=10, logger=None):
super(Many2ManyBiProxy, self).__init__() super(Many2ManyBiProxy, self).__init__()
if not isinstance(listen_port, int) or not 1024 <= listen_port <= 65535: if not isinstance(listen_port, int) or not 1024 <= listen_port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % port) raise ValueError('Specified port "%s" is invalid.' % listen_port)
try: try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.settimeout(0.1) self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port)) self.sock.bind((listen_address, listen_port))
except socket.error as msg: except socket.error as msg:
raise raise
self.kill_signal = False self.kill_signal = multiprocessing.Value('i', False)
# key of dict is client's (address, port) tuple # key of dict is client's (address, port) tuple
self.active_endpoints = {} self.active_endpoints = {}
self.timeout = timeout self.timeout = timeout
...@@ -36,7 +37,7 @@ class Many2ManyBiProxy(threading.Thread): ...@@ -36,7 +37,7 @@ class Many2ManyBiProxy(threading.Thread):
self.heartbeat_sequence = bytes([47, 104, 98, 0, 44, 0, 0, 0]) self.heartbeat_sequence = bytes([47, 104, 98, 0, 44, 0, 0, 0])
def run(self): def run(self):
while not self.kill_signal: while not self.kill_signal.value:
try: try:
try: try:
my_data, my_addr = self.sock.recvfrom(65536) my_data, my_addr = self.sock.recvfrom(65536)
...@@ -50,12 +51,19 @@ class Many2ManyBiProxy(threading.Thread): ...@@ -50,12 +51,19 @@ class Many2ManyBiProxy(threading.Thread):
if (self.active_endpoints[addr] + self.timeout) < time.time(): if (self.active_endpoints[addr] + self.timeout) < time.time():
del self.active_endpoints[addr] del self.active_endpoints[addr]
else: else:
self.sock.sendto(my_data, addr) try:
self.sock.sendto(my_data, addr)
except BlockingIOError:
continue
except: except:
self.logger.exception('Oops, something went wrong!', extra={'stack': True}) self.logger.exception('Oops, something went wrong!', extra={'stack': True})
self.source.close()
self.sink.close()
def stop(self): def stop(self):
self.kill_signal = True self.kill_signal.value = True
self.join() self.join()
def main(): def main():
......
...@@ -6,12 +6,12 @@ udp_mirror: reflects udp packets to their origin ...@@ -6,12 +6,12 @@ udp_mirror: reflects udp packets to their origin
""" """
import logging import logging
import multiprocessing
import socket import socket
import sys import sys
import threading
import time
class MirrorProxy(threading.Thread):
class MirrorProxy(multiprocessing.Process):
""" """
Relays any incoming UDP packets back to the sender. Mainly useful for testing Relays any incoming UDP packets back to the sender. Mainly useful for testing
purposes. purposes.
...@@ -23,15 +23,16 @@ class MirrorProxy(threading.Thread): ...@@ -23,15 +23,16 @@ class MirrorProxy(threading.Thread):
raise ValueError('Specified port "%s" is invalid.' % listen_port) raise ValueError('Specified port "%s" is invalid.' % listen_port)
try: try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.settimeout(0.1) self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port)) self.sock.bind((listen_address, listen_port))
except socket.error as msg: except socket.error as msg:
raise raise
self.kill_signal = False self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger self.logger = logger
def run(self): def run(self):
while not self.kill_signal: while not self.kill_signal.value:
try: try:
try: try:
data, addr = self.sock.recvfrom(65536) data, addr = self.sock.recvfrom(65536)
...@@ -41,8 +42,10 @@ class MirrorProxy(threading.Thread): ...@@ -41,8 +42,10 @@ class MirrorProxy(threading.Thread):
except: except:
self.logger.exception('Oops, something went wrong!', extra={'stack': True}) self.logger.exception('Oops, something went wrong!', extra={'stack': True})
self.sock.close()
def stop(self): def stop(self):
self.kill_signal = True self.kill_signal.value = True
self.join() self.join()
def main(): def main():
...@@ -52,6 +55,7 @@ def main(): ...@@ -52,6 +55,7 @@ def main():
try: try:
proxy = MirrorProxy(listen_port=int(sys.argv[1]), logger=logger) proxy = MirrorProxy(listen_port=int(sys.argv[1]), logger=logger)
proxy.start() proxy.start()
proxy.terminate()
proxy.join() proxy.join()
except KeyboardInterrupt: except KeyboardInterrupt:
proxy.stop() proxy.stop()
......
...@@ -6,13 +6,13 @@ udp_one2manybi: for 1-to-N connections (both directions) ...@@ -6,13 +6,13 @@ udp_one2manybi: for 1-to-N connections (both directions)
""" """
import logging import logging
import multiprocessing
import select import select
import socket import socket
import sys import sys
import threading
import time import time
class One2ManyBiProxy(threading.Thread): class One2ManyBiProxy(multiprocessing.Process):
""" """
Relays UDP packets from one source client to many sink clients. Sink clients Relays UDP packets from one source client to many sink clients. Sink clients
are expected to send dummy packets in regular intervals to signal their presence. are expected to send dummy packets in regular intervals to signal their presence.
...@@ -26,15 +26,17 @@ class One2ManyBiProxy(threading.Thread): ...@@ -26,15 +26,17 @@ class One2ManyBiProxy(threading.Thread):
raise ValueError('Specified port "%s" is invalid.' % port) raise ValueError('Specified port "%s" is invalid.' % port)
try: try:
self.source = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.source = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.source.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.source.bind((listen_address, one_port)) self.source.bind((listen_address, one_port))
except socket.error as msg: except socket.error as msg:
raise raise
try: try:
self.sink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sink.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sink.bind((listen_address, many_port)) self.sink.bind((listen_address, many_port))
except socket.error as msg: except socket.error as msg:
raise raise
self.kill_signal = False self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger self.logger = logger
# key of dict is sink_client's (address, port) tuple # key of dict is sink_client's (address, port) tuple
self.active_endpoints = {} self.active_endpoints = {}
...@@ -45,7 +47,7 @@ class One2ManyBiProxy(threading.Thread): ...@@ -45,7 +47,7 @@ class One2ManyBiProxy(threading.Thread):
def run(self): def run(self):
listening_sockets = [self.source, self.sink] listening_sockets = [self.source, self.sink]
while not self.kill_signal: while not self.kill_signal.value:
try: try:
readables, _w, _x = select.select(listening_sockets, [], [], 0.1) readables, _w, _x = select.select(listening_sockets, [], [], 0.1)
for sock in readables: for sock in readables:
...@@ -64,7 +66,10 @@ class One2ManyBiProxy(threading.Thread): ...@@ -64,7 +66,10 @@ class One2ManyBiProxy(threading.Thread):
if (self.active_endpoints[one_addr] + self.timeout) < time.time(): if (self.active_endpoints[one_addr] + self.timeout) < time.time():
del self.active_endpoints[one_addr] del self.active_endpoints[one_addr]
else: else:
self.source.sendto(many_data, one_addr) try:
self.sink.sendto(many_data, one_addr)
except BlockingIOError:
continue
elif sock.getsockname()[1] == self.one_port: elif sock.getsockname()[1] == self.one_port:
# one sends to many # one sends to many
one_data, one_addr = sock.recvfrom(65536) one_data, one_addr = sock.recvfrom(65536)
...@@ -76,14 +81,21 @@ class One2ManyBiProxy(threading.Thread): ...@@ -76,14 +81,21 @@ class One2ManyBiProxy(threading.Thread):
if (self.active_endpoints[many_addr] + self.timeout) < time.time(): if (self.active_endpoints[many_addr] + self.timeout) < time.time():
del self.active_endpoints[many_addr] del self.active_endpoints[many_addr]
else: else:
self.sink.sendto(one_data, many_addr) try:
self.sink.sendto(one_data, many_addr)
except BlockingIOError:
continue
else: else:
print('We should not ever reach that point') print('We should not ever reach that point')
except: except:
self.logger.exception('Oops, something went wrong!', extra={'stack': True}) self.logger.exception('Oops, something went wrong!', extra={'stack': True})
self.source.close()
self.sink.close()
def stop(self): def stop(self):
self.kill_signal = True self.kill_signal.value = True
self.join() self.join()
def main(): def main():
......
...@@ -6,12 +6,12 @@ udp_one2manymo: for 1-to-N connections ...@@ -6,12 +6,12 @@ udp_one2manymo: for 1-to-N connections
""" """
import logging import logging
import multiprocessing
import socket import socket
import sys import sys
import threading
import time import time
class One2ManyMoProxy(threading.Thread): class One2ManyMoProxy(multiprocessing.Process):
""" """
Relays UDP packets from one source client to many sink clients. Sink clients Relays UDP packets from one source client to many sink clients. Sink clients
are expected to send dummy packets in regular intervals to signal their presence. are expected to send dummy packets in regular intervals to signal their presence.
...@@ -25,25 +25,27 @@ class One2ManyMoProxy(threading.Thread): ...@@ -25,25 +25,27 @@ class One2ManyMoProxy(threading.Thread):
raise ValueError('Specified port "%s" is invalid.' % port) raise ValueError('Specified port "%s" is invalid.' % port)
try: try:
self.source = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.source = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.source.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.source.settimeout(0.1) self.source.settimeout(0.1)
self.source.bind((listen_address, listen_port)) self.source.bind((listen_address, listen_port))
except socket.error as msg: except socket.error as msg:
raise raise
try: try:
self.sink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sink = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sink.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Make socket non-blocking by setting timeout to 0 # Make socket non-blocking by setting timeout to 0
self.sink.settimeout(0) self.sink.settimeout(0)
self.sink.bind((listen_address, send_port)) self.sink.bind((listen_address, send_port))
except socket.error as msg: except socket.error as msg:
raise raise
self.kill_signal = False self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger self.logger = logger
# key of dict is sink_client's (address, port) tuple # key of dict is sink_client's (address, port) tuple
self.sink_clients = {} self.sink_clients = {}
self.timeout = timeout self.timeout = timeout
def run(self): def run(self):
while not self.kill_signal: while not self.kill_signal.value:
try: try:
# handle incoming packets from sink clients # handle incoming packets from sink clients
while True: while True:
...@@ -66,12 +68,18 @@ class One2ManyMoProxy(threading.Thread): ...@@ -66,12 +68,18 @@ class One2ManyMoProxy(threading.Thread):
# send data to remaining sink_clients # send data to remaining sink_clients
for client in self.sink_clients.keys(): for client in self.sink_clients.keys():
self.sink.sendto(data, client) try:
self.sink.sendto(data, client)
except BlockingIOError:
continue
except: except:
self.logger.exception('Oops, something went wrong!', extra={'stack': True}) self.logger.exception('Oops, something went wrong!', extra={'stack': True})
self.source.close()
self.sink.close()
def stop(self): def stop(self):
self.kill_signal = True self.kill_signal.value = True
self.join() self.join()
def main(): def main():
......
...@@ -6,12 +6,12 @@ udp_one2onebi: for 1-to-1 connections ...@@ -6,12 +6,12 @@ udp_one2onebi: for 1-to-1 connections
""" """
import logging import logging
import multiprocessing
import socket import socket
import sys import sys
import threading
import time
class One2OneBiProxy(threading.Thread):
class One2OneBiProxy(multiprocessing.Process):
""" """
Relays UDP packets between two endpoints. This allows two end-points Relays UDP packets between two endpoints. This allows two end-points
behind NAT firewalls to communicate with each other by relaying their traffic behind NAT firewalls to communicate with each other by relaying their traffic
...@@ -24,17 +24,18 @@ class One2OneBiProxy(threading.Thread): ...@@ -24,17 +24,18 @@ class One2OneBiProxy(threading.Thread):
raise ValueError('Specified port "%s" is invalid.' % listen_port) raise ValueError('Specified port "%s" is invalid.' % listen_port)
try: try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.settimeout(0.1) self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port)) self.sock.bind((listen_address, listen_port))
except socket.error as msg: except socket.error as msg:
raise raise
self.kill_signal = False self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger self.logger = logger
def run(self): def run(self):
client1 = None client1 = None
client2 = None client2 = None
while not self.kill_signal: while not self.kill_signal.value:
try: try:
try: try:
data, addr = self.sock.recvfrom(65536) data, addr = self.sock.recvfrom(65536)
...@@ -48,15 +49,20 @@ class One2OneBiProxy(threading.Thread): ...@@ -48,15 +49,20 @@ class One2OneBiProxy(threading.Thread):
# transmit data # transmit data
if client1 and client2: if client1 and client2:
if addr == client1: try:
self.sock.sendto(data, client2) if addr == client1:
elif addr == client2: self.sock.sendto(data, client2)
self.sock.sendto(data, client1) elif addr == client2:
self.sock.sendto(data, client1)
except BlockingIOError:
continue
except: except:
self.logger.exception('Oops, something went wrong!', extra={'stack': True}) self.logger.exception('Oops, something went wrong!', extra={'stack': True})
self.sock.close()
def stop(self): def stop(self):
self.kill_signal = True self.kill_signal.value = True
self.join() self.join()
def main(): def main():
......
...@@ -104,13 +104,13 @@ def start_proxy(): ...@@ -104,13 +104,13 @@ def start_proxy():
if proxydef['type'] == 'one2oneBi': if proxydef['type'] == 'one2oneBi':
obj = proxies.One2OneBiProxy(listen_port=proxydef['port'], logger=app.logger) obj = proxies.One2OneBiProxy(listen_port=proxydef['port'], logger=app.logger)
elif proxydef['type'] == 'one2manyMo': elif proxydef['type'] == 'one2manyMo':
obj = proxies.One2ManyMoProxy(listen_port=proxydef['port'], send_port=proxydef['port']+1) obj = proxies.One2ManyMoProxy(listen_port=proxydef['port'], send_port=proxydef['port']+1, logger=app.logger)
elif proxydef['type'] == 'mirror': elif proxydef['type'] == 'mirror':
obj = proxies.MirrorProxy(listen_port=proxydef['port']) obj = proxies.MirrorProxy(listen_port=proxydef['port'], logger=app.logger)
elif proxydef['type'] == 'one2manyBi': elif proxydef['type'] == 'one2manyBi':
obj = proxies.One2ManyBiProxy(one_port=proxydef['port'], many_port=proxydef['port']+1) obj = proxies.One2ManyBiProxy(one_port=proxydef['port'], many_port=proxydef['port']+1, logger=app.logger)
elif proxydef['type'] == 'many2manyBi': elif proxydef['type'] == 'many2manyBi':
obj = proxies.Many2ManyBiProxy(listen_port=proxydef['port']) obj = proxies.Many2ManyBiProxy(listen_port=proxydef['port'], logger=app.logger)
else: else:
response = {'status': 'Error', 'msg': 'An unknown error occurred'} response = {'status': 'Error', 'msg': 'An unknown error occurred'}
return r(json.dumps(response), 422) return r(json.dumps(response), 422)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment