# socket_.py 7.5 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020-02-09 13:43
# @Author  : Lemon
# @File    : socket.py
# @Software: PyCharm
import datetime
import functools
import json
import random
import time
from threading import Timer

from tornado import gen
from tornado import httpclient
from tornado import httputil
from tornado import ioloop
from tornado import websocket

# MIME type sent as the Content-Type header of the WebSocket handshake request.
APPLICATION_JSON = 'application/json'
# Defaults for the clients' ``connect_timeout`` / ``request_timeout`` constructor
# arguments, which are forwarded unchanged to tornado's ``HTTPRequest``
# (Tornado documents both values as seconds).
DEFAULT_CONNECT_TIMEOUT = 30
DEFAULT_REQUEST_TIMEOUT = 30


class WebSocketClient(object):
    """Base class for Tornado WebSocket clients.

    Handles the connection handshake, JSON message sending and the receive
    loop. Subclasses customise behaviour through the ``on_message``,
    ``on_connection_success`` and ``on_connection_close`` hooks.
    """
    DISCONNECTED = 0
    CONNECTING = 1
    CONNECTED = 2

    # Optional identification values sent as the ``Clientname`` / ``ClientUUID``
    # handshake headers. Subclasses assign them before calling ``connect``;
    # the class-level defaults keep ``connect`` usable when they are not set.
    _clientname = None
    _clientuuid = None

    def __init__(self, io_loop=None,
                 connect_timeout=DEFAULT_CONNECT_TIMEOUT,
                 request_timeout=DEFAULT_REQUEST_TIMEOUT):
        """Create a disconnected client.

        :param io_loop: IOLoop to run on; defaults to ``IOLoop.current()``.
        :param connect_timeout: forwarded to ``HTTPRequest(connect_timeout=...)``.
        :param request_timeout: forwarded to ``HTTPRequest(request_timeout=...)``.
        """
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self._io_loop = io_loop or ioloop.IOLoop.current()
        self._ws_connection = None
        self._connect_status = self.DISCONNECTED

    def connect(self, url):
        """Start connecting to ``url``; the result is delivered to ``_connect_callback``.

        :param str url: WebSocket URL to connect to.
        """
        self._connect_status = self.CONNECTING

        # Identification headers are only sent when they have been set, so the
        # base class no longer fails with AttributeError when used directly.
        header_fields = {}
        if self._clientname is not None:
            header_fields['Clientname'] = self._clientname
        if self._clientuuid is not None:
            header_fields['ClientUUID'] = self._clientuuid
        header_fields['Content-Type'] = APPLICATION_JSON
        header_fields['Origin'] = '*'

        request = httpclient.HTTPRequest(url=url,
                                         connect_timeout=self.connect_timeout,
                                         request_timeout=self.request_timeout,
                                         headers=httputil.HTTPHeaders(header_fields))

        # NOTE(review): the positional (io_loop, request) form matches the
        # Tornado 4.x constructor; Tornado 5.0 removed ``io_loop`` arguments.
        # Confirm the pinned Tornado version before upgrading.
        ws_conn = websocket.WebSocketClientConnection(self._io_loop, request)
        ws_conn.connect_future.add_done_callback(self._connect_callback)

    def send(self, data: dict):
        """Queue ``data`` to be sent as JSON on the IOLoop.

        :param dict data: JSON-serialisable payload.
        :return: ``True`` if the message was queued, ``False`` when not connected.
        """
        if self.is_connected():
            # add_callback defers the actual write to the IOLoop.
            self._io_loop.add_callback(self._send_msg, data)
            return True
        return False

    def _send_msg(self, data: dict, ensure_ascii: bool = False):
        """Serialise ``data`` to JSON and write it to the current connection.

        :return: ``True`` if a connection existed and the write was issued.
        """
        if self._ws_connection:
            self._ws_connection.write_message(json.dumps(data, ensure_ascii=ensure_ascii))
            return True
        return False

    def close(self, reason=''):
        """Close the connection (if any) and fire ``on_connection_close``.

        Does nothing when the client is already ``DISCONNECTED``, so the close
        hook runs at most once per connection attempt.

        :param reason: passed through to ``on_connection_close``.
        """
        if self._connect_status == self.DISCONNECTED:
            return
        self._connect_status = self.DISCONNECTED
        # Detach the connection before closing it so re-entrant calls see the
        # client as disconnected.
        conn, self._ws_connection = self._ws_connection, None
        if conn is not None:
            conn.close()
        self.on_connection_close(reason)

    def _connect_callback(self, future):
        """Handle completion of the connect future started by ``connect``."""
        error = future.exception()
        if error is None:
            self._connect_status = self.CONNECTED
            self._ws_connection = future.result()
            self.on_connection_success()
            self._read_messages()
        else:
            self.close(error)

    def is_connected(self):
        """Return ``True`` while a connection object is held."""
        return self._ws_connection is not None

    @gen.coroutine
    def _read_messages(self):
        """Receive loop: dispatch each message to ``on_message`` until closed.

        The loop is bound to the connection that was current when it started.
        If that connection is closed or replaced (for example ``on_message``
        calls ``close``, or a reconnect happens) the loop stops without
        touching the newer state.
        """
        conn = self._ws_connection
        while conn is not None and self._ws_connection is conn:
            msg = yield conn.read_message()
            if self._ws_connection is not conn:
                # Closed or replaced locally while waiting; nothing to clean up.
                break
            if msg is None:
                # read_message() yields None once the connection is closed.
                self.close("server close")
                break
            self.on_message(msg)

    def on_message(self, msg):
        """This is called when new message is available from the server.
        :param str msg: server message.
        """
        raise NotImplementedError("This is called when new message is available from the server.")

    def on_connection_success(self):
        """This is called on successful connection to the server.
        """
        print("on_connection_success!")

    def on_connection_close(self, reason):
        """This is called when the connection is closed or a connect attempt fails.
        :param reason: value passed to ``close`` (a string or the connect exception).
        """
        print("on_connection_close!,", reason)


class MessageSocketClient(WebSocketClient):
    """WebSocket client with callbacks, heartbeats and optional auto-reconnect.

    Timers created through the IOLoop are tracked in ``_later_handler`` (keyed
    by purpose) so that ``quit`` can cancel them.
    """

    # Bounds passed to ``random.randint`` for the delay between
    # ``auto_move_server`` runs; used for both the first and later schedules.
    MOVE_SERVER_MIN_DELAY = 30
    MOVE_SERVER_MAX_DELAY = 60

    def __init__(self, connect_success=None, message_receive=None, connect_timeout=DEFAULT_CONNECT_TIMEOUT,
                 request_timeout=DEFAULT_REQUEST_TIMEOUT):
        """Create the client and schedule the first ``auto_move_server`` run.

        :param connect_success: optional callable invoked with this client
            after each successful connection.
        :param message_receive: callable invoked with every received message.
        :param connect_timeout: forwarded to the base class.
        :param request_timeout: forwarded to the base class.
        """
        self._connect_success = connect_success
        self._message_receive = message_receive

        self._later_handler = dict()
        self.ws_url = None
        self.auto_net = False  # unused in this class; kept for backward compatibility
        # Defined up front so reconnect()/quit() are safe before connect().
        self.auto_reconnet = False
        self.reconnect_interval = 3
        self._clientname = None
        self._clientuuid = None
        self.heartbeat_interval_in_secs = 3

        super().__init__(ioloop.IOLoop.current(), connect_timeout, request_timeout)

        self._schedule_move_server()

    def _schedule(self, key, delay, callback, *args):
        """Schedule ``callback(*args)`` after ``delay``, replacing any timer stored under ``key``."""
        pending = self._later_handler.pop(key, None)
        if pending is not None:
            self._io_loop.remove_timeout(pending)
        self._later_handler[key] = self._io_loop.call_later(delay, callback, *args)

    def _schedule_move_server(self):
        """Schedule the next ``auto_move_server`` run after a randomized delay."""
        delay = random.randint(self.MOVE_SERVER_MIN_DELAY, self.MOVE_SERVER_MAX_DELAY)
        self._schedule('move_server', delay, self.auto_move_server)

    def _schedule_heartbeat(self):
        """Schedule the next heartbeat for the current URL, cancelling a pending one."""
        self._schedule('heartbeat', self.heartbeat_interval_in_secs, self.sendheartbeat, self.ws_url)

    def new_server_func(self):
        """Return ``(ws_url, move)``; ``move`` true means ``server_replace`` should run.

        This default never requests a move.
        """
        return self.ws_url, False

    def server_replace(self, ws_url):
        """Hook called by ``auto_move_server`` when a new server is requested."""
        print("需要连接新服务器", ws_url)

    def auto_move_server(self):
        """Periodic check: call ``server_replace`` if requested, then reschedule itself."""
        ws_url, move = self.new_server_func()
        if move:
            self.server_replace(ws_url)
        self._schedule_move_server()

    def connect(self, url, name, uuid, reconnect=True, reconnect_interval=3):
        """Remember the connection settings and start connecting.

        :param url: WebSocket URL.
        :param name: value for the ``Clientname`` handshake header.
        :param uuid: value for the ``ClientUUID`` handshake header.
        :param reconnect: whether to reconnect after the connection closes.
        :param reconnect_interval: delay passed to ``call_later`` before reconnecting.
        """
        self.ws_url = url
        self.auto_reconnet = reconnect
        self.reconnect_interval = reconnect_interval
        self._clientname = name
        self._clientuuid = uuid

        super().connect(self.ws_url)

    def on_connection_success(self):
        """Notify the ``connect_success`` callback and start the heartbeat timer."""
        if self._connect_success:
            self._connect_success(self)
        # Replaces any heartbeat still pending from an earlier connection so
        # only one heartbeat chain is ever active.
        self._schedule_heartbeat()

    def on_connection_close(self, reason):
        """Log the close reason and hand over to ``reconnect``."""
        print('%s Connection closed reason=%s' % (self.ws_url, reason,))
        self.reconnect()

    def reconnect(self):
        """Schedule a reconnect, or shut down when auto-reconnect is disabled."""
        if not self.auto_reconnet:
            self.quit()
            return
        if not self.is_connected():
            # Uses the base-class connect so the stored name/uuid/url are reused.
            self._schedule('reconnect', self.reconnect_interval, super().connect, self.ws_url)

    def payload_data(self):
        """Return the heartbeat payload; ``None`` keeps the timer running without sending."""
        return {"type": "ping"}

    def sendheartbeat(self, _url):
        """Send one heartbeat and schedule the next.

        The chain stops when ``_url`` no longer matches ``ws_url`` or when the
        payload could not be queued (client not connected).

        :param _url: the ``ws_url`` value captured when this heartbeat was scheduled.
        """
        print(datetime.datetime.now(), "发送❤️")
        if self.ws_url != _url:
            return
        msg = self.payload_data()
        sent = self.send(msg) if msg else False

        if msg is None or sent:
            self._schedule_heartbeat()

    def on_message(self, msg):
        """Forward ``msg`` to the ``message_receive`` callback.

        :raises NotImplementedError: if no ``message_receive`` callback was given.
        """
        if not self._message_receive:
            raise NotImplementedError("未设置接收消息函数->", self._message_receive)
        self._message_receive(msg)

    def quit(self):
        """Disable auto-reconnect, close the connection and cancel all tracked timers."""
        self.auto_reconnet = False
        self.close("quit")
        for handler in self._later_handler.values():
            self._io_loop.remove_timeout(handler)
        # Drop the cancelled handles so they are not cancelled again later.
        self._later_handler.clear()


if __name__ == '__main__':
    # Manual demo: connect to a local server, send a timestamp every few
    # seconds and quit when the server sends the text 'quit'.

    # Set by ``success`` once connected; initialised here so ``interval`` and
    # ``rec`` can test it before the first connection succeeds.
    sockccc = None


    def success(sock):
        """connect_success callback: remember the connected client."""
        global sockccc
        sockccc = sock


    def rec(msg):
        """message_receive callback: shut the client down on 'quit'."""
        if msg == 'quit' and sockccc:
            sockccc.quit()


    def interval():
        """Send a test payload if connected, then re-arm the 3 second timer."""
        if sockccc:
            sockccc.send({'test': time.time()})
        timer = Timer(3, interval)
        # Daemon timers do not keep the process alive once the main thread exits.
        timer.daemon = True
        timer.start()


    # The callbacks must be passed in, otherwise ``sockccc`` is never set and
    # every incoming message raises in on_message.
    s1 = MessageSocketClient(connect_success=success, message_receive=rec)
    # ``name`` and ``uuid`` are required positional arguments of connect().
    s1.connect('ws://localhost:10000', 'demo-client', 'demo-client-uuid', reconnect=False)

    first_timer = Timer(2, interval)
    first_timer.daemon = True
    first_timer.start()
    ioloop.IOLoop.current().start()