Module hub.core.ipc
Expand source code
import hub
from hub.util.threading import terminate_thread
import socketserver
from typing import Optional, Callable, Dict
import inspect
import threading
import queue
import multiprocessing.connection
import atexit
import time
import uuid
# Sentinel payload: a Client sends this as its last message in close(), and the
# Server's receive loop reacts to it by dropping and closing that connection.
_DISCONNECT_MESSAGE = "!@_dIsCoNNect"
def _get_free_port() -> int:
with socketserver.TCPServer(("localhost", 0), None) as s: # type: ignore
return s.server_address[1]
class Server(object):
    """Local IPC endpoint that forwards every received message to a callback.

    One background thread accepts connections on a localhost port. Each
    accepted connection gets its own receive thread, which calls ``callback``
    with every message until the peer sends the disconnect sentinel or the
    connection breaks.
    """

    def __init__(self, callback):
        """Start the server immediately and register ``stop`` to run at exit.

        Args:
            callback: Called with each message received from any connection.
                It runs on that connection's receive thread.
        """
        self.callback = callback
        self.start()
        atexit.register(self.stop)

    def start(self):
        """Open the listener and launch the accept thread; no-op if running."""
        if getattr(self, "_connect_thread", None):
            return
        # Bind to port 0 so the OS picks the port and the listener keeps it.
        # Probing for a free port first and binding afterwards leaves a window
        # in which another process can grab that port.
        self._listener = multiprocessing.connection.Listener(("localhost", 0))
        self.port = self._listener.address[1]
        # Maps a uuid string to (connection, receive thread).
        self._connections = {}
        self._connect_thread = threading.Thread(
            target=self._connect_loop, daemon=True
        )
        self._connect_thread.start()

    def _connect_loop(self):
        """Accept connections forever, starting one receive thread for each."""
        try:
            while True:
                try:
                    connection = self._listener.accept()
                    key = str(uuid.uuid4())
                    thread = threading.Thread(target=self._receive_loop, args=(key,))
                    # Register before starting so the receive thread can find
                    # its own entry.
                    self._connections[key] = (connection, thread)
                    thread.start()
                except Exception:
                    # Best effort: back off briefly and keep accepting.
                    time.sleep(0.1)
        except Exception:
            pass  # Thread termination

    def _receive_loop(self, key):
        """Deliver messages from one connection to ``callback`` until it ends.

        The loop ends when the peer sends the disconnect sentinel, the
        connection fails, the callback raises, or the entry is removed from
        ``self._connections``. In every case the connection is unregistered
        and closed, so ``stop`` never waits on a dead connection.

        Args:
            key: Key of this connection's entry in ``self._connections``.
        """
        entry = self._connections.get(key)
        if entry is None:
            return
        connection = entry[0]
        try:
            while key in self._connections:
                try:
                    msg = connection.recv()
                except (EOFError, OSError):
                    # Peer went away (EOFError) or the connection was closed
                    # or aborted (OSError covers ConnectionAbortedError).
                    # Returning quietly also avoids
                    # pytest.PytestUnhandledThreadExceptionWarning.
                    return
                if msg == _DISCONNECT_MESSAGE:
                    return
                self.callback(msg)
        except Exception:
            # Best effort: any other failure (including one raised by the
            # callback) ends this connection without propagating.
            pass
        finally:
            self._connections.pop(key, None)
            connection.close()

    def stop(self):
        """Stop accepting, wait for clients to disconnect, then close.

        Polls once per second; after five polls any remaining connections are
        closed forcibly.
        """
        if self._connect_thread:
            # Do not accept any more connections. terminate_thread is a
            # project helper; presumably it interrupts the target thread --
            # confirm against hub.util.threading.
            terminate_thread(self._connect_thread)
            self._connect_thread = None
        timer = 0
        while self._connections:  # wait for clients to disconnect
            if timer >= 5:
                # Clients are taking too long, force shutdown. Iterate over a
                # snapshot: receive threads may remove their own entry while
                # we loop, which would otherwise raise "dictionary changed
                # size during iteration".
                for connection, thread in list(self._connections.values()):
                    terminate_thread(thread)
                    connection.close()
                self._connections.clear()
            else:
                timer += 1
                time.sleep(1)
        self._listener.close()

    @property
    def url(self) -> str:
        """Return ``http://localhost:<port>/`` for the port being listened on."""
        return f"http://localhost:{self.port}/"
class Client(object):
    """Send messages to a Server on localhost, buffering until connected.

    The connection is established by a background thread, so ``send`` can be
    called right after construction; messages sent before the connection
    exists are queued and flushed once it is up.
    """

    def __init__(self, port):
        """Begin connecting in the background and register ``close`` at exit.

        Args:
            port: Localhost port the server is listening on.
        """
        self.port = port
        self._buffer = []  # messages queued while not yet connected
        self._client = None  # connection object once established
        self._closed = False
        threading.Thread(target=self._connect, daemon=True).start()
        atexit.register(self.close)

    def _connect(self):
        """Retry once per second until connected, then flush the buffer.

        A failure while flushing is treated like a failed connection attempt:
        the loop sleeps and tries again with a new connection.
        """
        while True:
            try:
                self._client = multiprocessing.connection.Client(
                    ("localhost", self.port)
                )
                for stuff in self._buffer:
                    self._client.send(stuff)
                self._buffer.clear()
                return
            except Exception:
                time.sleep(1)

    def send(self, stuff):
        """Send ``stuff`` to the server, or queue it until connected.

        Send failures are ignored (the server may have shut down). Messages
        sent after ``close`` are dropped, because nothing would ever flush
        them.

        Args:
            stuff: Object to transmit over the connection.
        """
        if self._closed:
            return
        client = self._client
        if client:
            try:
                client.send(stuff)
            except Exception:  # Server shutdown
                pass
        else:
            self._buffer.append(stuff)

    def close(self):
        """Flush queued messages, tell the server to disconnect, and close.

        Blocks until the background thread has established the connection.
        Safe to call more than once. The connection is released and the client
        marked closed even when the server is no longer reachable.
        """
        if self._closed:
            return
        while not self._client:
            time.sleep(0.5)
        client = self._client
        try:
            for stuff in self._buffer:
                client.send(stuff)
            client.send(_DISCONNECT_MESSAGE)
        except Exception:
            # Best effort: the server may already be gone.
            pass
        finally:
            self._buffer.clear()
            self._client = None
            self._closed = True
            try:
                client.close()
            except Exception:
                pass
Classes
class Client (port)
-
Expand source code
class Client(object):
    """Send messages to a Server on localhost, buffering until connected.

    The connection is established by a background thread, so ``send`` can be
    called right after construction; messages sent before the connection
    exists are queued and flushed once it is up.
    """

    def __init__(self, port):
        """Begin connecting in the background and register ``close`` at exit.

        Args:
            port: Localhost port the server is listening on.
        """
        self.port = port
        self._buffer = []  # messages queued while not yet connected
        self._client = None  # connection object once established
        self._closed = False
        threading.Thread(target=self._connect, daemon=True).start()
        atexit.register(self.close)

    def _connect(self):
        """Retry once per second until connected, then flush the buffer.

        A failure while flushing is treated like a failed connection attempt:
        the loop sleeps and tries again with a new connection.
        """
        while True:
            try:
                self._client = multiprocessing.connection.Client(
                    ("localhost", self.port)
                )
                for stuff in self._buffer:
                    self._client.send(stuff)
                self._buffer.clear()
                return
            except Exception:
                time.sleep(1)

    def send(self, stuff):
        """Send ``stuff`` to the server, or queue it until connected.

        Send failures are ignored (the server may have shut down). Messages
        sent after ``close`` are dropped, because nothing would ever flush
        them.

        Args:
            stuff: Object to transmit over the connection.
        """
        if self._closed:
            return
        client = self._client
        if client:
            try:
                client.send(stuff)
            except Exception:  # Server shutdown
                pass
        else:
            self._buffer.append(stuff)

    def close(self):
        """Flush queued messages, tell the server to disconnect, and close.

        Blocks until the background thread has established the connection.
        Safe to call more than once. The connection is released and the client
        marked closed even when the server is no longer reachable.
        """
        if self._closed:
            return
        while not self._client:
            time.sleep(0.5)
        client = self._client
        try:
            for stuff in self._buffer:
                client.send(stuff)
            client.send(_DISCONNECT_MESSAGE)
        except Exception:
            # Best effort: the server may already be gone.
            pass
        finally:
            self._buffer.clear()
            self._client = None
            self._closed = True
            try:
                client.close()
            except Exception:
                pass
Methods
def close(self)
-
Expand source code
def close(self): if self._closed: return try: while not self._client: time.sleep(0.5) for stuff in self._buffer: self._client.send(stuff) self._client.send(_DISCONNECT_MESSAGE) self._client.close() self._client = None self._closed = True except Exception as e: pass
def send(self, stuff)
-
Expand source code
def send(self, stuff): if self._client: try: self._client.send(stuff) except Exception: # Server shutdown pass else: self._buffer.append(stuff)
class Server (callback)
-
Expand source code
class Server(object):
    """Local IPC endpoint that forwards every received message to a callback.

    One background thread accepts connections on a localhost port. Each
    accepted connection gets its own receive thread, which calls ``callback``
    with every message until the peer sends the disconnect sentinel or the
    connection breaks.
    """

    def __init__(self, callback):
        """Start the server immediately and register ``stop`` to run at exit.

        Args:
            callback: Called with each message received from any connection.
                It runs on that connection's receive thread.
        """
        self.callback = callback
        self.start()
        atexit.register(self.stop)

    def start(self):
        """Open the listener and launch the accept thread; no-op if running."""
        if getattr(self, "_connect_thread", None):
            return
        # Bind to port 0 so the OS picks the port and the listener keeps it.
        # Probing for a free port first and binding afterwards leaves a window
        # in which another process can grab that port.
        self._listener = multiprocessing.connection.Listener(("localhost", 0))
        self.port = self._listener.address[1]
        # Maps a uuid string to (connection, receive thread).
        self._connections = {}
        self._connect_thread = threading.Thread(
            target=self._connect_loop, daemon=True
        )
        self._connect_thread.start()

    def _connect_loop(self):
        """Accept connections forever, starting one receive thread for each."""
        try:
            while True:
                try:
                    connection = self._listener.accept()
                    key = str(uuid.uuid4())
                    thread = threading.Thread(target=self._receive_loop, args=(key,))
                    # Register before starting so the receive thread can find
                    # its own entry.
                    self._connections[key] = (connection, thread)
                    thread.start()
                except Exception:
                    # Best effort: back off briefly and keep accepting.
                    time.sleep(0.1)
        except Exception:
            pass  # Thread termination

    def _receive_loop(self, key):
        """Deliver messages from one connection to ``callback`` until it ends.

        The loop ends when the peer sends the disconnect sentinel, the
        connection fails, the callback raises, or the entry is removed from
        ``self._connections``. In every case the connection is unregistered
        and closed, so ``stop`` never waits on a dead connection.

        Args:
            key: Key of this connection's entry in ``self._connections``.
        """
        entry = self._connections.get(key)
        if entry is None:
            return
        connection = entry[0]
        try:
            while key in self._connections:
                try:
                    msg = connection.recv()
                except (EOFError, OSError):
                    # Peer went away (EOFError) or the connection was closed
                    # or aborted (OSError covers ConnectionAbortedError).
                    # Returning quietly also avoids
                    # pytest.PytestUnhandledThreadExceptionWarning.
                    return
                if msg == _DISCONNECT_MESSAGE:
                    return
                self.callback(msg)
        except Exception:
            # Best effort: any other failure (including one raised by the
            # callback) ends this connection without propagating.
            pass
        finally:
            self._connections.pop(key, None)
            connection.close()

    def stop(self):
        """Stop accepting, wait for clients to disconnect, then close.

        Polls once per second; after five polls any remaining connections are
        closed forcibly.
        """
        if self._connect_thread:
            # Do not accept any more connections. terminate_thread is a
            # project helper; presumably it interrupts the target thread --
            # confirm against hub.util.threading.
            terminate_thread(self._connect_thread)
            self._connect_thread = None
        timer = 0
        while self._connections:  # wait for clients to disconnect
            if timer >= 5:
                # Clients are taking too long, force shutdown. Iterate over a
                # snapshot: receive threads may remove their own entry while
                # we loop, which would otherwise raise "dictionary changed
                # size during iteration".
                for connection, thread in list(self._connections.values()):
                    terminate_thread(thread)
                    connection.close()
                self._connections.clear()
            else:
                timer += 1
                time.sleep(1)
        self._listener.close()

    @property
    def url(self) -> str:
        """Return ``http://localhost:<port>/`` for the port being listened on."""
        return f"http://localhost:{self.port}/"
Instance variables
var url
-
Expand source code
@property def url(self) -> str: return f"http://localhost:{self.port}/"
Methods
def start(self)
-
Expand source code
def start(self): if getattr(self, "_connect_thread", None): return self.port = _get_free_port() self._listener = multiprocessing.connection.Listener(("localhost", self.port)) self._connections = {} self._connect_thread = threading.Thread(target=self._connect_loop, daemon=True) self._connect_thread.start()
def stop(self)
-
Expand source code
def stop(self): if self._connect_thread: terminate_thread(self._connect_thread) # Do not accept anymore connections self._connect_thread = None timer = 0 while self._connections: # wait for clients to disconnect if timer >= 5: # clients taking too long, force shutdown for connection, thread in self._connections.values(): terminate_thread(thread) connection.close() self._connections.clear() else: timer += 1 time.sleep(1) self._listener.close()