socket_.py 6.6 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020-02-09 13:43
# @Author  : Lemon
# @File    : socket.py
# @Software: PyCharm
import datetime
import functools
import json
import random

from tornado import gen
from tornado import httpclient
from tornado import httputil
from tornado import ioloop
from tornado import websocket

APPLICATION_JSON = 'application/json'
DEFAULT_CONNECT_TIMEOUT = 30
DEFAULT_REQUEST_TIMEOUT = 30


class WebSocketClient(object):
    """Base for web __socket clients.
    """
    DISCONNECTED = 0
    CONNECTING = 1
    CONNECTED = 2

    def __init__(self, io_loop=None,
                 connect_timeout=DEFAULT_CONNECT_TIMEOUT,
                 request_timeout=DEFAULT_REQUEST_TIMEOUT):

        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self._io_loop = io_loop or ioloop.IOLoop.current()
        self._ws_connection = None
        self._connect_status = self.DISCONNECTED
        self._clientname = self._clientuuid = None

    def connect(self, url):
        self._connect_status = self.CONNECTING

        headers = httputil.HTTPHeaders({
            'Clientname': self._clientname,
            'ClientUUID': self._clientuuid,
            'Content-Type': APPLICATION_JSON,
            'Origin': '*',
        })
        request = httpclient.HTTPRequest(url=url,
                                         connect_timeout=self.connect_timeout,
                                         request_timeout=self.request_timeout,
                                         headers=headers)

        ws_conn = websocket.WebSocketClientConnection(self._io_loop, request)
        ws_conn.connect_future.add_done_callback(self._connect_callback)

    def send(self, data: dict):
        if self.is_connected():
            self._io_loop.add_callback(self._send_msg, data)
            return True
        return False

    def _send_msg(self, data: dict, ensure_ascii: bool = False):
        if self._ws_connection:
            self._ws_connection.write_message(json.dumps(data, ensure_ascii=ensure_ascii))
            return True
        return False

    def close(self, reason=''):
        """Close connection.
        """
        if self._connect_status != self.DISCONNECTED:
            self._connect_status = self.DISCONNECTED
            self._ws_connection and self._ws_connection.close()
            self._ws_connection = None
            self.on_connection_close(reason)

    def _connect_callback(self, future):
        if future.exception() is None:
            self._connect_status = self.CONNECTED
            self._ws_connection = future.result()
            self.on_connection_success()
            self._read_messages()
        else:
            self.close(future.exception())

    def is_connected(self):
        return self._ws_connection is not None

    @gen.coroutine
    def _read_messages(self):
        while True:
            msg = yield self._ws_connection.read_message()
            if msg is None:
                self.close("server close")
                break
            self.on_message(msg)

    def on_message(self, msg):
        """This is called when new message is available from the server.
        :param str msg: server message.
        """
        raise NotImplementedError("This is called when new message is available from the server.")

    def on_connection_success(self):
        """This is called on successful connection ot the server.
        """
        print("on_connection_success!")

    def on_connection_close(self, reason):
        """This is called when server closed the connection.
        """
        print("on_connection_close!,", reason)


class MessageSocketClient(WebSocketClient):

    def __init__(self, connect_success=None, message_receive=None, connect_timeout=DEFAULT_CONNECT_TIMEOUT,
                 request_timeout=DEFAULT_REQUEST_TIMEOUT):
        self._connect_success = connect_success
        self._message_receive = message_receive

        self._later_handler = dict()
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self._io_loop = ioloop.IOLoop.current()
        self.ws_url = None
        self.auto_net = False
        self.heartbeat_interval_in_secs = 30

        super(MessageSocketClient, self).__init__(self._io_loop,
                                                  self.connect_timeout,
                                                  self.request_timeout)

    def new_server_func(self):
        return self.ws_url, False

    def server_replace(self, ws_url):
        print("需要连接新服务器", ws_url)

    def auto_move_server(self):
        ws_url, move = self.new_server_func()
        if move:
            print('需要移动->', ws_url)
            self.server_replace(ws_url)

    def connect(self, url, name, uuid, reconnect=True, reconnect_interval=3):
        self.ws_url = url
        self.auto_reconnet = reconnect
        self.reconnect_interval = reconnect_interval
        self._clientname = name
        self._clientuuid = uuid

        super(MessageSocketClient, self).connect(self.ws_url)

    def on_connection_success(self):
        if self._connect_success:
            self._connect_success(self)

    def on_connection_close(self, reason):
        print('%s Connection closed reason=%s' % (self.ws_url, reason,))
        self.reconnect()

    def reconnect(self):
        if not self.auto_reconnet:
            self.quit()
        if not self.is_connected() and self.auto_reconnet:
            self._later_handler['reconnect'] = self._io_loop.call_later(self.reconnect_interval,
                                                                        super(MessageSocketClient, self).connect,
                                                                        self.ws_url)

    def payload_data(self):
        return {"type": "ping"}

    def sendheartbeat(self):
        print(datetime.datetime.now(), "发送❤️")
        msg = self.payload_data()
        self.send(msg)

    def on_message(self, msg):
        if not self._message_receive:
            raise BaseException("未设置接收消息函数->", self.receive)
        else:
            self._message_receive(msg)

    def quit(self):
        self.auto_reconnet = False
        self.close("quit")
        for k, handler in self._later_handler.items():
            self._io_loop.remove_timeout(handler)


if __name__ == '__main__':
    from apscheduler.schedulers.tornado import TornadoScheduler


    def run():
        print(1)


    sock = MessageSocketClient()
    sched = TornadoScheduler()
    sched.add_job(run, 'interval', seconds=3)
    sched.start()

    sock._io_loop.instance().start()