Commit d5ab4303 authored by Florian Bruggisser's avatar Florian Bruggisser
Browse files

implemented multiprocessing

parent af95acb7
proxies/__pycache__/
__pycache__/
venv
.idea
\ No newline at end of file
......@@ -6,12 +6,12 @@ udp_many2manybi: for N-to-N connections (OSC)
"""
import logging
import multiprocessing
import socket
import sys
import threading
import time
class Many2ManyBiProxy(threading.Thread):
class Many2ManyBiProxy(multiprocessing.Process):
"""
Relays UDP packets between many endpoints. Incoming packets are forwarded to all other
endpoints (not to itself). OSC messages '/hb' are not forwarded, but keep connection
......@@ -21,14 +21,14 @@ class Many2ManyBiProxy(threading.Thread):
def __init__(self, listen_port=None, listen_address='0.0.0.0', timeout=10, logger=None):
super(Many2ManyBiProxy, self).__init__()
if not isinstance(listen_port, int) or not 1024 <= listen_port <= 65535:
raise ValueError('Specified port "%s" is invalid.' % port)
raise ValueError('Specified port "%s" is invalid.' % listen_port)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(0.1)
self.sock.bind((listen_address, listen_port))
except socket.error as msg:
raise
self.kill_signal = False
self.kill_signal = multiprocessing.Value('i', False)
# key of dict is client's (address, port) tuple
self.active_endpoints = {}
self.timeout = timeout
......@@ -36,7 +36,7 @@ class Many2ManyBiProxy(threading.Thread):
self.heartbeat_sequence = bytes([47, 104, 98, 0, 44, 0, 0, 0])
def run(self):
while not self.kill_signal:
while not self.kill_signal.value:
try:
try:
my_data, my_addr = self.sock.recvfrom(65536)
......@@ -55,7 +55,7 @@ class Many2ManyBiProxy(threading.Thread):
self.logger.exception('Oops, something went wrong!', extra={'stack': True})
def stop(self):
self.kill_signal = True
self.kill_signal.value = True
self.join()
def main():
......
......@@ -6,12 +6,12 @@ udp_mirror: reflects udp packets to their origin
"""
import logging
import multiprocessing
import socket
import sys
import threading
import time
class MirrorProxy(threading.Thread):
class MirrorProxy(multiprocessing.Process):
"""
Relays any incoming UDP packets back to the sender. Mainly useful for testing
purposes.
......@@ -27,11 +27,11 @@ class MirrorProxy(threading.Thread):
self.sock.bind((listen_address, listen_port))
except socket.error as msg:
raise
self.kill_signal = False
self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger
def run(self):
while not self.kill_signal:
while not self.kill_signal.value:
try:
try:
data, addr = self.sock.recvfrom(65536)
......@@ -42,7 +42,7 @@ class MirrorProxy(threading.Thread):
self.logger.exception('Oops, something went wrong!', extra={'stack': True})
def stop(self):
self.kill_signal = True
self.kill_signal.value = True
self.join()
def main():
......@@ -52,6 +52,7 @@ def main():
try:
proxy = MirrorProxy(listen_port=int(sys.argv[1]), logger=logger)
proxy.start()
proxy.terminate()
proxy.join()
except KeyboardInterrupt:
proxy.stop()
......
......@@ -6,13 +6,13 @@ udp_one2manybi: for 1-to-N connections (both directions)
"""
import logging
import multiprocessing
import select
import socket
import sys
import threading
import time
class One2ManyBiProxy(threading.Thread):
class One2ManyBiProxy(multiprocessing.Process):
"""
Relays UDP packets from one source client to many sink clients. Sink clients
are expected to send dummy packets in regular intervals to signal their presence.
......@@ -34,7 +34,7 @@ class One2ManyBiProxy(threading.Thread):
self.sink.bind((listen_address, many_port))
except socket.error as msg:
raise
self.kill_signal = False
self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger
# key of dict is sink_client's (address, port) tuple
self.active_endpoints = {}
......@@ -45,7 +45,7 @@ class One2ManyBiProxy(threading.Thread):
def run(self):
listening_sockets = [self.source, self.sink]
while not self.kill_signal:
while not self.kill_signal.value:
try:
readables, _w, _x = select.select(listening_sockets, [], [], 0.1)
for sock in readables:
......@@ -83,7 +83,7 @@ class One2ManyBiProxy(threading.Thread):
self.logger.exception('Oops, something went wrong!', extra={'stack': True})
def stop(self):
self.kill_signal = True
self.kill_signal.value = True
self.join()
def main():
......
......@@ -6,12 +6,12 @@ udp_one2manymo: for 1-to-N connections
"""
import logging
import multiprocessing
import socket
import sys
import threading
import time
class One2ManyMoProxy(threading.Thread):
class One2ManyMoProxy(multiprocessing.Process):
"""
Relays UDP packets from one source client to many sink clients. Sink clients
are expected to send dummy packets in regular intervals to signal their presence.
......@@ -36,14 +36,14 @@ class One2ManyMoProxy(threading.Thread):
self.sink.bind((listen_address, send_port))
except socket.error as msg:
raise
self.kill_signal = False
self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger
# key of dict is sink_client's (address, port) tuple
self.sink_clients = {}
self.timeout = timeout
def run(self):
while not self.kill_signal:
while not self.kill_signal.value:
try:
# handle incoming packets from sink clients
while True:
......@@ -71,7 +71,7 @@ class One2ManyMoProxy(threading.Thread):
self.logger.exception('Oops, something went wrong!', extra={'stack': True})
def stop(self):
self.kill_signal = True
self.kill_signal.value = True
self.join()
def main():
......
......@@ -6,12 +6,12 @@ udp_one2onebi: for 1-to-1 connections
"""
import logging
import multiprocessing
import socket
import sys
import threading
import time
class One2OneBiProxy(threading.Thread):
class One2OneBiProxy(multiprocessing.Process):
"""
Relays UDP packets between two endpoints. This allows two end-points
behind NAT firewalls to communicate with each other by relaying their traffic
......@@ -28,13 +28,13 @@ class One2OneBiProxy(threading.Thread):
self.sock.bind((listen_address, listen_port))
except socket.error as msg:
raise
self.kill_signal = False
self.kill_signal = multiprocessing.Value('i', False)
self.logger = logger
def run(self):
client1 = None
client2 = None
while not self.kill_signal:
while not self.kill_signal.value:
try:
try:
data, addr = self.sock.recvfrom(65536)
......@@ -56,7 +56,7 @@ class One2OneBiProxy(threading.Thread):
self.logger.exception('Oops, something went wrong!', extra={'stack': True})
def stop(self):
self.kill_signal = True
self.kill_signal.value = True
self.join()
def main():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment