作者 lemon

新增转移终端优化方案

@@ -5,17 +5,15 @@ @@ -5,17 +5,15 @@
5 # @File : core.py 5 # @File : core.py
6 # @Software: PyCharm 6 # @Software: PyCharm
7 import json 7 import json
8 -import threading  
9 8
10 from tornado import ioloop 9 from tornado import ioloop
11 10
12 from conf import settings 11 from conf import settings
13 from core.monitor import Monitor 12 from core.monitor import Monitor
14 -from lib.common import Exchange 13 +from lib import control_server
15 from lib.socket_ import MessageSocketClient 14 from lib.socket_ import MessageSocketClient
16 15
17 -socket = MessageSocketClient()  
18 -monitor = Monitor.bind(socket) 16 +monitor = Monitor()
19 17
20 18
21 def message_receive(message): 19 def message_receive(message):
@@ -24,8 +22,6 @@ def message_receive(message): @@ -24,8 +22,6 @@ def message_receive(message):
24 monitor.login("[email protected].ru", "nantian888") 22 monitor.login("[email protected]", "nantian888")
25 elif message == 'exit1': 23 elif message == 'exit1':
26 monitor.logout('100047151842270') 24 monitor.logout('100047151842270')
27 - elif message == 'uid':  
28 - socket.send({"keys": monitor.members()})  
29 else: 25 else:
30 try: 26 try:
31 req = json.loads(message) 27 req = json.loads(message)
@@ -43,12 +39,24 @@ def message_receive(message): @@ -43,12 +39,24 @@ def message_receive(message):
43 print(traceback.format_exc()) 39 print(traceback.format_exc())
44 40
45 41
46 -def connect_success():  
47 - # Timer(2, monitor._auto_login).start()#连接后发送下 42 +def server_replace(ws_url):
  43 + monitor._temp_socket = MessageSocketClient(connect_success, message_receive)
  44 + monitor._temp_socket.connect(ws_url, reconnect=False)
  45 +
  46 +
  47 +def connect_success(sock):
  48 + if monitor._temp_socket == sock:
  49 + sock.auto_reconnet = True
  50 + monitor._socket.quit() # 旧的断开
  51 + monitor._temp_socket = None
  52 +
  53 + monitor.bind(sock)
48 monitor._init_server_() 54 monitor._init_server_()
  55 + monitor._socket.new_server_func = lambda: control_server.get_ws_address(monitor._name, sock.ws_url)
  56 + monitor._socket.server_replace = server_replace
49 57
50 58
51 def run(): 59 def run():
  60 + socket = MessageSocketClient(connect_success, message_receive)
52 socket.connect(**settings.get_server()) 61 socket.connect(**settings.get_server())
53 - socket.receive(message_receive).success(connect_success)  
54 ioloop.IOLoop.instance().start() 62 ioloop.IOLoop.instance().start()
@@ -4,6 +4,7 @@ @@ -4,6 +4,7 @@
4 # @Author : Lemon 4 # @Author : Lemon
5 # @File : monitor.py 5 # @File : monitor.py
6 # @Software: PyCharm 6 # @Software: PyCharm
  7 +import random
7 import time 8 import time
8 import uuid 9 import uuid
9 import functools 10 import functools
@@ -29,18 +30,15 @@ class Monitor(callback.CallBack): @@ -29,18 +30,15 @@ class Monitor(callback.CallBack):
29 30
30 def __init__(self): 31 def __init__(self):
31 super().__init__() 32 super().__init__()
32 - self._socket = None 33 + self._socket = self._temp_socket = None
33 self._listenlist = dict() 34 self._listenlist = dict()
34 self._imei = Config.get('imei', lambda: uuid.uuid1().hex) 35 self._imei = Config.get('imei', lambda: uuid.uuid1().hex)
35 self._name = Config.get('name', control_server.get_init_name) 36 self._name = Config.get('name', control_server.get_init_name)
36 self.executor = ThreadPoolExecutor(50, 'task_thread') 37 self.executor = ThreadPoolExecutor(50, 'task_thread')
37 self.init_config = {} 38 self.init_config = {}
38 39
39 - @classmethod  
40 - def bind(cls, socket):  
41 - self = cls() 40 + def bind(self, socket):
42 self._socket = socket 41 self._socket = socket
43 - return self  
44 42
45 def _auto_login(self): 43 def _auto_login(self):
46 user_list = UserList.query(status=Status.ONLINE) 44 user_list = UserList.query(status=Status.ONLINE)
@@ -125,7 +123,7 @@ class Monitor(callback.CallBack): @@ -125,7 +123,7 @@ class Monitor(callback.CallBack):
125 'imei': self._imei, 123 'imei': self._imei,
126 'name': self._name, 124 'name': self._name,
127 }), 125 }),
128 - "type": "initialize" 126 + "type": "initialize",
129 } 127 }
130 self._socket.send(payload) 128 self._socket.send(payload)
131 self._socket.payload_data = self._heartbeat # 替换心跳数据获取方式 129 self._socket.payload_data = self._heartbeat # 替换心跳数据获取方式
@@ -3,9 +3,22 @@ @@ -3,9 +3,22 @@
3 3
4 import requests 4 import requests
5 5
  6 +SERVER = 'http://47.115.27.201:39001'
  7 +
6 8
7 def get_init_name(): 9 def get_init_name():
8 - r = requests.get('http://47.115.27.201:39001/get_only_int') 10 + r = requests.get(SERVER + '/get_only_int')
9 num = r.json().get('data', {}).get('only_int', 0) 11 num = r.json().get('data', {}).get('only_int', 0)
10 assert num, '初始化接口,无法获取终端索引ID' 12 assert num, '初始化接口,无法获取终端索引ID'
11 return "FBCHAT-{}".format(num) 13 return "FBCHAT-{}".format(num)
  14 +
  15 +
  16 +def get_ws_address(name, source_ws):
  17 + data = {"name": name, "ws": source_ws}
  18 +
  19 + try:
  20 + r = requests.post(SERVER + '/post_terminal_info', data, timeout=1)
  21 + new_ws = r.json().get('data', {}).get('ws', source_ws)
  22 + except:
  23 + new_ws = source_ws
  24 + return new_ws, not new_ws == source_ws
@@ -6,6 +6,9 @@ @@ -6,6 +6,9 @@
6 # @Software: PyCharm 6 # @Software: PyCharm
7 import functools 7 import functools
8 import json 8 import json
  9 +import random
  10 +import time
  11 +from threading import Timer
9 12
10 from tornado import gen 13 from tornado import gen
11 from tornado import httpclient 14 from tornado import httpclient
@@ -111,20 +114,36 @@ class WebSocketClient(object): @@ -111,20 +114,36 @@ class WebSocketClient(object):
111 114
112 class MessageSocketClient(WebSocketClient): 115 class MessageSocketClient(WebSocketClient):
113 116
114 - def __init__(self, connect_timeout=DEFAULT_CONNECT_TIMEOUT, 117 + def __init__(self, connect_success=None, message_receive=None, connect_timeout=DEFAULT_CONNECT_TIMEOUT,
115 request_timeout=DEFAULT_REQUEST_TIMEOUT): 118 request_timeout=DEFAULT_REQUEST_TIMEOUT):
  119 + self._connect_success = connect_success
  120 + self._message_receive = message_receive
  121 +
  122 + self._later_handler = dict()
116 self.connect_timeout = connect_timeout 123 self.connect_timeout = connect_timeout
117 self.request_timeout = request_timeout 124 self.request_timeout = request_timeout
118 self._io_loop = ioloop.IOLoop.current() 125 self._io_loop = ioloop.IOLoop.current()
119 self.ws_url = None 126 self.ws_url = None
120 - self.auto_reconnet = False  
121 - self._message_receive = self._connect_success = None 127 + self.auto_net = False
  128 + self.heartbeat_interval_in_secs = 30
122 129
123 super(MessageSocketClient, self).__init__(self._io_loop, 130 super(MessageSocketClient, self).__init__(self._io_loop,
124 self.connect_timeout, 131 self.connect_timeout,
125 self.request_timeout) 132 self.request_timeout)
126 133
127 - self.heartbeat_interval_in_secs = 30 134 + self._later_handler['move_server'] = self._io_loop.call_later(random.randint(30, 60), self.auto_move_server)
  135 +
  136 + def new_server_func(self):
  137 + return self.ws_url, False
  138 +
  139 + def server_replace(self, ws_url):
  140 + print("需要连接新服务器", ws_url)
  141 +
  142 + def auto_move_server(self):
  143 + ws_url, move = self.new_server_func()
  144 + if move:
  145 + self.server_replace(ws_url)
  146 + self._later_handler['move_server'] = self._io_loop.call_later(random.randint(30, 30), self.auto_move_server)
128 147
129 def connect(self, url, reconnect=True, reconnect_interval=3): 148 def connect(self, url, reconnect=True, reconnect_interval=3):
130 self.ws_url = url 149 self.ws_url = url
@@ -134,8 +153,9 @@ class MessageSocketClient(WebSocketClient): @@ -134,8 +153,9 @@ class MessageSocketClient(WebSocketClient):
134 153
135 def on_connection_success(self): 154 def on_connection_success(self):
136 if self._connect_success: 155 if self._connect_success:
137 - self._connect_success()  
138 - self._io_loop.call_later(self.heartbeat_interval_in_secs, functools.partial(self.sendheartbeat)) 156 + self._connect_success(self)
  157 + self._later_handler['heartbeat'] = self._io_loop.call_later(self.heartbeat_interval_in_secs,
  158 + functools.partial(self.sendheartbeat))
139 159
140 def on_connection_close(self, reason): 160 def on_connection_close(self, reason):
141 print('%s Connection closed reason=%s' % (self.ws_url, reason,)) 161 print('%s Connection closed reason=%s' % (self.ws_url, reason,))
@@ -143,8 +163,9 @@ class MessageSocketClient(WebSocketClient): @@ -143,8 +163,9 @@ class MessageSocketClient(WebSocketClient):
143 163
144 def reconnect(self): 164 def reconnect(self):
145 if not self.is_connected() and self.auto_reconnet: 165 if not self.is_connected() and self.auto_reconnet:
146 - self._io_loop.call_later(self.reconnect_interval,  
147 - super(MessageSocketClient, self).connect, self.ws_url) 166 + self._later_handler['reconnect'] = self._io_loop.call_later(self.reconnect_interval,
  167 + super(MessageSocketClient, self).connect,
  168 + self.ws_url)
148 169
149 def payload_data(self): 170 def payload_data(self):
150 return {"type": "ping"} 171 return {"type": "ping"}
@@ -154,19 +175,42 @@ class MessageSocketClient(WebSocketClient): @@ -154,19 +175,42 @@ class MessageSocketClient(WebSocketClient):
154 bool = self.send(msg) if msg else False 175 bool = self.send(msg) if msg else False
155 176
156 if msg is None or bool: 177 if msg is None or bool:
157 - self._io_loop.call_later(self.heartbeat_interval_in_secs, 178 + self._later_handler['heartbeat'] = self._io_loop.call_later(self.heartbeat_interval_in_secs,
158 functools.partial(self.sendheartbeat)) 179 functools.partial(self.sendheartbeat))
159 180
160 - def receive(self, message_receive):  
161 - self._message_receive = message_receive  
162 - return self  
163 -  
164 - def success(self, connect_success):  
165 - self._connect_success = connect_success  
166 - return self  
167 -  
168 def on_message(self, msg): 181 def on_message(self, msg):
169 if not self._message_receive: 182 if not self._message_receive:
170 raise BaseException("未设置接收消息函数->", self.receive) 183 raise BaseException("未设置接收消息函数->", self.receive)
171 else: 184 else:
172 self._message_receive(msg) 185 self._message_receive(msg)
  186 +
  187 + def quit(self):
  188 + self.auto_reconnet = False
  189 + self.close("quit")
  190 + for k, handler in self._later_handler.items():
  191 + self._io_loop.remove_timeout(handler)
  192 +
  193 +
  194 +if __name__ == '__main__':
  195 +
  196 + def success(sock):
  197 + global sockccc
  198 + sockccc = sock
  199 +
  200 +
  201 + def rec(msg):
  202 + if msg == 'quit':
  203 + sockccc.quit()
  204 +
  205 +
  206 + def interval():
  207 + if sockccc:
  208 + sockccc.send({'test': time.time()})
  209 + Timer(3, interval).start()
  210 +
  211 +
  212 + s1 = MessageSocketClient()
  213 + s1.connect('ws://localhost:10000', reconnect=False)
  214 +
  215 + Timer(2, interval).start()
  216 + ioloop.IOLoop.current().start()