From 1f2a0ca302105b3338c8e2e754e4df7283900688 Mon Sep 17 00:00:00 2001 From: lemon <961222258@qq.com> Date: Sat, 15 Feb 2020 22:19:01 +0800 Subject: [PATCH] 新增转移终端优化方案 --- core/core.py | 26 +++++++++++++++++--------- core/monitor.py | 10 ++++------ lib/control_server.py | 15 ++++++++++++++- lib/socket_.py | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------ 4 files changed, 97 insertions(+), 34 deletions(-) diff --git a/core/core.py b/core/core.py index c2136e6..4ba54f8 100644 --- a/core/core.py +++ b/core/core.py @@ -5,17 +5,15 @@ # @File : core.py # @Software: PyCharm import json -import threading from tornado import ioloop from conf import settings from core.monitor import Monitor -from lib.common import Exchange +from lib import control_server from lib.socket_ import MessageSocketClient -socket = MessageSocketClient() -monitor = Monitor.bind(socket) +monitor = Monitor() def message_receive(message): @@ -24,8 +22,6 @@ def message_receive(message): monitor.login("HantzKlair97@mail.ru", "nantian888") elif message == 'exit1': monitor.logout('100047151842270') - elif message == 'uid': - socket.send({"keys": monitor.members()}) else: try: req = json.loads(message) @@ -43,12 +39,24 @@ def message_receive(message): print(traceback.format_exc()) -def connect_success(): - # Timer(2, monitor._auto_login).start()#连接后发送下 +def server_replace(ws_url): + monitor._temp_socket = MessageSocketClient(connect_success, message_receive) + monitor._temp_socket.connect(ws_url, reconnect=False) + + +def connect_success(sock): + if monitor._temp_socket == sock: + sock.auto_reconnet = True + monitor._socket.quit() # 旧的断开 + monitor._temp_socket = None + + monitor.bind(sock) monitor._init_server_() + monitor._socket.new_server_func = lambda: control_server.get_ws_address(monitor._name, sock.ws_url) + monitor._socket.server_replace = server_replace def run(): + socket = MessageSocketClient(connect_success, message_receive) socket.connect(**settings.get_server()) - socket.receive(message_receive).success(connect_success) ioloop.IOLoop.instance().start() diff --git a/core/monitor.py b/core/monitor.py index 0b87f01..4bc73b9 100644 --- a/core/monitor.py +++ b/core/monitor.py @@ -4,6 +4,7 @@ # @Author : Lemon # @File : monitor.py # @Software: PyCharm +import random import time import uuid import functools @@ -29,18 +30,15 @@ class Monitor(callback.CallBack): def __init__(self): super().__init__() - self._socket = None + self._socket = self._temp_socket = None self._listenlist = dict() self._imei = Config.get('imei', lambda: uuid.uuid1().hex) self._name = Config.get('name', control_server.get_init_name) self.executor = ThreadPoolExecutor(50, 'task_thread') self.init_config = {} - @classmethod - def bind(cls, socket): - self = cls() + def bind(self, socket): self._socket = socket - return self def _auto_login(self): user_list = UserList.query(status=Status.ONLINE) @@ -125,7 +123,7 @@ class Monitor(callback.CallBack): 'imei': self._imei, 'name': self._name, }), - "type": "initialize" + "type": "initialize", } self._socket.send(payload) self._socket.payload_data = self._heartbeat # 替换心跳数据获取方式 diff --git a/lib/control_server.py b/lib/control_server.py index f2f516f..d90a254 100644 --- a/lib/control_server.py +++ b/lib/control_server.py @@ -3,9 +3,22 @@ import requests +SERVER = 'http://47.115.27.201:39001' + def get_init_name(): - r = requests.get('http://47.115.27.201:39001/get_only_int') + r = requests.get(SERVER + '/get_only_int') num = r.json().get('data', {}).get('only_int', 0) assert num, '初始化接口,无法获取终端索引ID' return "FBCHAT-{}".format(num) + + +def get_ws_address(name, source_ws): + data = {"name": name, "ws": source_ws} + + try: + r = requests.post(SERVER + '/post_terminal_info', data, timeout=1) + new_ws = r.json().get('data', {}).get('ws', source_ws) + except: + new_ws = source_ws + return new_ws, not new_ws == source_ws diff --git a/lib/socket_.py b/lib/socket_.py index 7cc1ed1..793e71d 100644 --- a/lib/socket_.py +++ b/lib/socket_.py @@ -6,6 +6,9 @@ # @Software: PyCharm import functools import json +import random +import time +from threading import Timer from tornado import gen from tornado import httpclient @@ -111,20 +114,36 @@ class WebSocketClient(object): class MessageSocketClient(WebSocketClient): - def __init__(self, connect_timeout=DEFAULT_CONNECT_TIMEOUT, + def __init__(self, connect_success=None, message_receive=None, connect_timeout=DEFAULT_CONNECT_TIMEOUT, request_timeout=DEFAULT_REQUEST_TIMEOUT): + self._connect_success = connect_success + self._message_receive = message_receive + + self._later_handler = dict() self.connect_timeout = connect_timeout self.request_timeout = request_timeout self._io_loop = ioloop.IOLoop.current() self.ws_url = None - self.auto_reconnet = False - self._message_receive = self._connect_success = None + self.auto_net = False + self.heartbeat_interval_in_secs = 30 super(MessageSocketClient, self).__init__(self._io_loop, self.connect_timeout, self.request_timeout) - self.heartbeat_interval_in_secs = 30 + self._later_handler['move_server'] = self._io_loop.call_later(random.randint(30, 60), self.auto_move_server) + + def new_server_func(self): + return self.ws_url, False + + def server_replace(self, ws_url): + print("需要连接新服务器", ws_url) + + def auto_move_server(self): + ws_url, move = self.new_server_func() + if move: + self.server_replace(ws_url) + self._later_handler['move_server'] = self._io_loop.call_later(random.randint(30, 30), self.auto_move_server) def connect(self, url, reconnect=True, reconnect_interval=3): self.ws_url = url @@ -134,8 +153,9 @@ class MessageSocketClient(WebSocketClient): def on_connection_success(self): if self._connect_success: - self._connect_success() - self._io_loop.call_later(self.heartbeat_interval_in_secs, functools.partial(self.sendheartbeat)) + self._connect_success(self) + self._later_handler['heartbeat'] = self._io_loop.call_later(self.heartbeat_interval_in_secs, + functools.partial(self.sendheartbeat)) def on_connection_close(self, reason): print('%s Connection closed reason=%s' % (self.ws_url, reason,)) @@ -143,8 +163,9 @@ class MessageSocketClient(WebSocketClient): def reconnect(self): if not self.is_connected() and self.auto_reconnet: - self._io_loop.call_later(self.reconnect_interval, - super(MessageSocketClient, self).connect, self.ws_url) + self._later_handler['reconnect'] = self._io_loop.call_later(self.reconnect_interval, + super(MessageSocketClient, self).connect, + self.ws_url) def payload_data(self): return {"type": "ping"} @@ -154,19 +175,42 @@ class MessageSocketClient(WebSocketClient): bool = self.send(msg) if msg else False if msg is None or bool: - self._io_loop.call_later(self.heartbeat_interval_in_secs, - functools.partial(self.sendheartbeat)) - - def receive(self, message_receive): - self._message_receive = message_receive - return self - - def success(self, connect_success): - self._connect_success = connect_success - return self + self._later_handler['heartbeat'] = self._io_loop.call_later(self.heartbeat_interval_in_secs, + functools.partial(self.sendheartbeat)) def on_message(self, msg): if not self._message_receive: raise BaseException("未设置接收消息函数->", self.receive) else: self._message_receive(msg) + + def quit(self): + self.auto_reconnet = False + self.close("quit") + for k, handler in self._later_handler.items(): + self._io_loop.remove_timeout(handler) + + +if __name__ == '__main__': + + def success(sock): + global sockccc + sockccc = sock + + + def rec(msg): + if msg == 'quit': + sockccc.quit() + + + def interval(): + if sockccc: + sockccc.send({'test': time.time()}) + Timer(3, interval).start() + + + s1 = MessageSocketClient() + s1.connect('ws://localhost:10000', reconnect=False) + + Timer(2, interval).start() + ioloop.IOLoop.current().start() -- libgit2 0.24.0