monitor.py 6.8 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020-02-09 13:40
# @Author  : Lemon
# @File    : monitor.py
# @Software: PyCharm
import functools
import logging
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from threading import Timer

from munch import Munch

from conf import settings
from core import callback, command
from db import models
from lib import control_server, common
from lib.common import TaskStatus
from lib.facebook import FacebookClient
from utils import parameter

log = logging.getLogger(__name__)
tostr = lambda x: (x)
add_type = lambda x, y: {"type": x, 'data': y}
now = lambda: round(time.time() * 1000)


class Monitor(callback.CallBack):

    def __init__(self):
        super().__init__()
        self._socket = self._temp_socket = None
        self._listenlist = dict()
        self._imei = models.Config.get('imei', lambda: uuid.uuid1().hex)
        self._name = models.Config.get('name', control_server.get_init_name)
        self._host = models.Config.get('host', lambda: settings.get_server()['url'])
        self.version = None
        self.executor = ThreadPoolExecutor(50, 'task_thread')
        self.init_config = {}
        self.queue = None

    def bind(self, socket):
        self._socket = socket
        models.Config.set('host', self._socket.ws_url)

    def go_login(self):
        threading.Thread(target=self._auto_login, args=(), name='auto_login_thread').start()

    def _auto_login(self):
        user_list = models.UserList.query(status=common.Status.ONLINE)
        for user in user_list:
            print("自动登录->", user)
            self.login(user.email, user.password, user.format_cookie(), user.user_agent)
            time.sleep(0.3)

    def _replace_call(self, client):
        funcs = [x for x in dir(self) if x.startswith("on")]
        for fname in funcs:
            mfunc = getattr(self, fname)
            if not mfunc:
                continue
            cfunc = functools.partial(mfunc, client)
            setattr(client, fname, cfunc)

    def login(self, email, password, cookie=None, user_agent=None):
        client = self._get_member(email)
        if client:
            self.onLoggedIn(client)  # 已登录就直接触发登录成功
            return client.info()
        else:
            obj = self.onLoggingIn(email, password, cookie, user_agent)  # 返回db中的user对象
            try:
                client = FacebookClient(obj)
                client.user_obj = obj
                client.extend = command.Executor(client)
                self._replace_call(client)
                self._add_member(client)
                self.onLoggedIn(client)
                return client.info()
            except BaseException as err:
                self.onLoggingError(email, repr(err))
                raise err

    def logout(self, email):
        client = self._get_member(email)
        if not client:
            return False
        self._remove_member(email)

        if client.listening:
            client.stopListening()
        self.onLogout(client)
        return True

    def _add_member(self, client):
        self._listenlist[client.email] = client

    def _remove_member(self, email):
        self._listenlist.pop(email, None)

    def _get_member(self, email) -> FacebookClient:
        if not email: return None
        return self._listenlist.get(email, None)

    @property
    def size(self):
        return len(self._listenlist)

    def members(self):
        return list(self._listenlist.keys())

    def _heartbeat(self):
        if self.init_config.get("disableSync"):
            return None
        payload = {
            "data": tostr({
                'online': self.members(),
                'imei': self._imei,
                'name': self._name,
            }),
            "type": "ping"
        }
        return payload

    def initialize(self):
        payload = {
            "data": tostr({
                'online': self.members(),
                'imei': self._imei,
                'name': self._name,
            }),
            "type": "initialize",
        }
        self._socket.send(payload)
        self._socket.payload_data = self._heartbeat  # 替换心跳数据获取方式

    def _notify_(self, type_, client, body: dict):
        data = {
            "body": tostr(body),
            "time": now(),
            "type": type_,
            "object": client.email,
        }
        if hasattr(client, 'uid') and client.uid:
            data['fbid'] = client.uid
        payload = add_type("notify", data)

        if self.queue: self.queue.publish_msg(data)
        self._socket.send(payload)

    def _task_(self, type_, client, taskid: int, code, msg: dict = None):
        '''任务执行情况反馈(0 收到任务,1 执行成功,-1 执行失败,-2 执行异常)'''
        payload = {
            'code': code,
            'type': type_,
        }
        if taskid: payload['taskId'] = taskid

        if msg: payload['msg'] = tostr(msg)
        self._notify_('wm_task', client, body=payload)

    def thread(self, client, taskid, kwargs: dict):
        func = kwargs.pop('fn')
        self._task_(func.__name__, client, taskid, code=TaskStatus.EXECUTE)
        try:
            res = func(**kwargs)
        except BaseException as err:
            self._task_(func.__name__, client, taskid, code=TaskStatus.FAILED,
                        msg={'error': repr(err), 'full_error': traceback.format_exc()})
        else:
            self._task_(func.__name__, client, taskid, code=TaskStatus.SUCCESS, msg=res)

    def execute(self, cmd: dict):
        ts = cmd.get('timeInMillis')
        type_ = cmd.get('type')
        taskid = int(cmd.get('taskId'))
        target = cmd.get('object')
        payload = cmd.get('data', '{}')  # str or jsonstr or dict

        if type_ == 'login':
            kwargs = parameter.join(self.login, payload)
            client = Munch(email=kwargs.get('email'))
            self.executor.submit(self.thread, client, taskid, kwargs)
        elif type_ == 'logout':
            kwargs = parameter.join(self.logout, payload)
            client = self._get_member(target)
            self.executor.submit(self.thread, client, taskid, kwargs)
        else:
            try:
                client = self._get_member(target)
                assert hasattr(client, 'extend'), '帐号未登录'
                if hasattr(client.extend, type_):
                    func = getattr(client.extend, type_)
                else:
                    func = getattr(client, type_)
                assert func, '未找到该类型指令'
                kwargs = parameter.join(func, payload)
                self.executor.submit(self.thread, client, taskid, kwargs)
            except BaseException as err:
                self._task_(type_, Munch(email=target), taskid, code=TaskStatus.EXCEPTION, msg={'error': repr(err)})