moment/harmonyqml/backend/network_manager.py

122 lines
3.3 KiB
Python
Raw Normal View History

# Copyright 2019 miruka
# This file is part of harmonyqml, licensed under GPLv3.
import logging
import socket
import ssl
import time
from threading import Lock
from typing import Callable, Optional, Tuple
from uuid import UUID
import nio
import nio.responses as nr
OptSock = Optional[ssl.SSLSocket]
NioRequestFunc = Callable[..., Tuple[UUID, bytes]]
class NioErrorResponse(Exception):
def __init__(self, response: nr.ErrorResponse) -> None:
self.response = response
super().__init__(str(response))
class NetworkManager:
http_retry_codes = {408, 429, 500, 502, 503, 504, 507}
def __init__(self, host: str, port: int, nio_client: nio.client.HttpClient
) -> None:
self.host = host
self.port = port
self.nio = nio_client
self._ssl_context: ssl.SSLContext = ssl.create_default_context()
self._ssl_session: Optional[ssl.SSLSession] = None
self._lock: Lock = Lock()
def _get_socket(self) -> ssl.SSLSocket:
sock = self._ssl_context.wrap_socket( # type: ignore
socket.create_connection((self.host, self.port)),
server_hostname = self.host,
session = self._ssl_session,
)
self._ssl_session = self._ssl_session or sock.session
return sock
@staticmethod
def _close_socket(sock: socket.socket) -> None:
try:
sock.shutdown(how=socket.SHUT_RDWR)
except OSError: # Already closer by server
pass
sock.close()
def read(self, with_sock: OptSock = None) -> nr.Response:
sock = with_sock or self._get_socket()
response = None
while not response:
left_to_send = self.nio.data_to_send()
if left_to_send:
self.write(left_to_send, sock)
self.nio.receive(sock.recv(4096))
response = self.nio.next_response()
if isinstance(response, nr.ErrorResponse):
raise NioErrorResponse(response)
if not with_sock:
self._close_socket(sock)
return response
def write(self, data: bytes, with_sock: OptSock = None) -> None:
if not data:
return
sock = with_sock or self._get_socket()
sock.sendall(data)
if not with_sock:
self._close_socket(sock)
def talk(self, nio_func: NioRequestFunc, *args, **kwargs) -> nr.Response:
with self._lock:
while True:
to_send = nio_func(*args, **kwargs)[1]
sock = self._get_socket()
try:
self.write(to_send, sock)
response = self.read(sock)
except NioErrorResponse as err:
logging.error("bad read for %s: %s", nio_func, err)
self._close_socket(sock)
if self._should_abort_talk(err):
logging.error("aborting talk")
break
time.sleep(10)
else:
break
self._close_socket(sock)
return response
def _should_abort_talk(self, err: NioErrorResponse) -> bool:
if err.response.status_code in self.http_retry_codes:
return False
return True