monitor.py 6.0 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020-02-09 13:40
# @Author  : Lemon
# @File    : monitor.py
# @Software: PyCharm
import random
import time
import uuid
import functools
import logging
from concurrent.futures import ThreadPoolExecutor

from munch import Munch

from core import callback, command
from lib import control_server
from lib.common import TaskStatus
from lib.facebook import FacebookClient
from lib.sqlhelper import UserList, Status, Config
from utils import parameter

log = logging.getLogger(__name__)
tostr = lambda x: (x)
add_type = lambda x, y: {"type": x, 'data': y}
now = lambda: round(time.time() * 1000)


class Monitor(callback.CallBack):

    def __init__(self):
        super().__init__()
        self._socket = self._temp_socket = None
        self._listenlist = dict()
        self._imei = Config.get('imei', lambda: uuid.uuid1().hex)
        self._name = Config.get('name', control_server.get_init_name)
        self.executor = ThreadPoolExecutor(50, 'task_thread')
        self.init_config = {}

    def bind(self, socket):
        self._socket = socket

    def _auto_login(self):
        user_list = UserList.query(status=Status.ONLINE)
        for user in user_list:
            print("执行登录", user)
            self.login(user.email, user.password, user.format_cookie())
            time.sleep(0.2)

    def _replace_call(self, client):
        funcs = [x for x in dir(self) if x.startswith("on")]
        for fname in funcs:
            mfunc = getattr(self, fname)
            if not mfunc:
                continue
            cfunc = functools.partial(mfunc, client)
            setattr(client, fname, cfunc)

    def login(self, email, password, cookie=None, user_agent=None):
        client = self._get_member(email)
        if client:
            self.onLoggedIn(client)  # 已登录就直接触发登录成功
            return client.info()
        else:
            obj = self.onLoggingIn(email, password, cookie, user_agent)  # 返回db中的user对象
            try:
                client = FacebookClient(obj)
                client.user_obj = obj
                client.extend = command.Executor(client)
                self._replace_call(client)
                self._add_member(client)
                self.onLoggedIn(client)
                return client.info()
            except BaseException as err:
                self.onLoggingError(email, repr(err))
                raise err

    def logout(self, email):
        client = self._get_member(email)
        if not client:
            return False
        self._remove_member(email)

        if not client.listening:
            return True
        client.stopListening()
        self.onLogout(client)
        return True

    def _add_member(self, client):
        self._listenlist[client.email] = client

    def _remove_member(self, email):
        self._listenlist.pop(email, None)

    def _get_member(self, email) -> FacebookClient:
        if not email: return None
        return self._listenlist.get(email, None)

    def size(self):
        return len(self._listenlist)

    def members(self):
        return list(self._listenlist.keys())

    def _heartbeat(self):
        if self.init_config.get("disableSync"):
            return None
        payload = {
            "data": tostr({
                'online': self.members(),
                'imei': self._imei,
                'name': self._name,
            }),
            "type": "ping"
        }
        return payload

    def _init_server_(self):
        payload = {
            "data": tostr({
                'online': self.members(),
                'imei': self._imei,
                'name': self._name,
            }),
            "type": "initialize",
        }
        self._socket.send(payload)
        self._socket.payload_data = self._heartbeat  # 替换心跳数据获取方式

    def _notify_(self, type_, client, body: dict):
        data = {
            "body": tostr(body),
            "time": now(),
            "type": type_,
            "object": client.email,
        }
        if hasattr(client, 'uid') and client.uid:
            data['fbid'] = client.uid
        payload = add_type("notify", data)
        self._socket.send(payload)

    def _task_(self, type_, client, taskid: int, code, msg: dict = None):
        '''任务执行情况反馈(0 收到任务,1 执行成功,-1 执行失败,-2 执行异常)'''
        payload = {
            'code': code,
            'type': type_,
        }
        if taskid: payload['taskId'] = taskid

        if msg: payload['msg'] = tostr(msg)
        self._notify_('wm_task', client, body=payload)

    def thread(self, client, taskid, kwargs: dict):
        func = kwargs.pop('fn')
        self._task_(func.__name__, client, taskid, code=TaskStatus.EXECUTE)
        try:
            res = func(**kwargs)
        except BaseException as err:
            self._task_(func.__name__, client, taskid, code=TaskStatus.EXCEPTION, msg={'error': repr(err)})
        else:
            self._task_(func.__name__, client, taskid, code=TaskStatus.SUCCESS, msg=res)

    def execute(self, cmd: dict):
        ts = cmd.get('timeInMillis')
        type_ = cmd.get('type')
        taskid = int(cmd.get('taskId'))
        target = cmd.get('object')
        payload = cmd.get('data', '{}')  # str or jsonstr or dict

        if type_ == 'login':
            kwargs = parameter.join(self.login, payload)
            client = Munch(email=kwargs.get('email'))
            self.executor.submit(self.thread, client, taskid, kwargs)
        else:
            try:
                client = self._get_member(target)
                if hasattr(client.extend, type_):
                    func = getattr(client.extend, type_)
                else:
                    func = getattr(client, type_)
                assert func, '未找到该类型指令'
                kwargs = parameter.join(func, payload)
                self.executor.submit(self.thread, client, taskid, kwargs)
            except BaseException as err:
                self._socket.send({'error': repr(err)})