# monitor.py 13.0 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020-02-09 13:40
# @Author  : Lemon
# @File    : monitor.py
# @Software: PyCharm
import functools
import json
import logging
import multiprocessing
import os
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor

import psutil
from fbchat import FBchatUserError
from fbchat._exception import FBchatNotLoggedIn
from munch import Munch

from conf import settings
from conf.settings import get_mark
from core import callback, command
from db import models
from lib import control_server, common
from lib.common import TaskStatus
from lib.facebook import FacebookClient
from lib.facebook_async import FaceBookAsync
from lib.ttl_cache import lru_cache
from lib.websock import Client
from utils import parameter, system_info
from utils.cache import cache

log = logging.getLogger(__name__)


def tostr(x):
    """Return *x* unchanged (identity hook applied to outgoing payload bodies)."""
    return x


def add_type(x, y):
    """Wrap *y* in an envelope dict: ``{"type": x, "data": y}``."""
    return {"type": x, 'data': y}


def now():
    """Return the current Unix time in whole milliseconds."""
    return round(time.time() * 1000)


class Monitor(callback.CallBack):

    def __init__(self):
        """Set up the client registry, persisted identity values and the task pool."""
        super().__init__()

        def _new_imei():
            return uuid.uuid1().hex

        def _default_host():
            return settings.get_server()['url']

        # Logged-in clients, keyed by account email.
        self._listenlist = {}
        # Identity/config values read through models.Config; the second argument
        # is a callable (presumably the producer of a default when the key is
        # not stored yet -- confirm against models.Config.get).
        self._imei = models.Config.get('imei', _new_imei)
        self._name = models.Config.get('name', control_server.get_init_name)
        self._host = models.Config.get('host', _default_host)
        # Version fields start unset here and are assigned elsewhere.
        self.version = None
        self.latest_version = None
        # Pool on which command handlers are run (see ``execute``).
        self.executor = ThreadPoolExecutor(max_workers=100, thread_name_prefix='task_thread')
        self.init_config = {}
        # Optional message queue; when set, notifications go through it.
        self.queue = None

    def bind(self):
        """Store the websocket client's current target server under the ``host`` config key."""
        current_server = Client.target_server
        models.Config.set('host', current_server)

    def go_login(self):
        """Start ``_auto_login`` on a background thread named ``auto_login_thread``."""
        worker = threading.Thread(name='auto_login_thread', target=self._auto_login, args=())
        worker.start()

    def _auto_login(self):
        """Log in every stored account whose status is ONLINE, one after another.

        A failure for one account is printed and does not stop the loop.
        """
        for account in models.UserList.query(status=common.Status.ONLINE):
            try:
                stored_cookie = account.format_cookie()
                self.login(account.email, account.password, stored_cookie, account.user_agent)
            except Exception as exc:
                print('登录', account.email, exc)
            # Short pause between consecutive login attempts.
            time.sleep(0.1)

    def _replace_call(self, client):
        funcs = [x for x in dir(self) if x.startswith("on")]
        for fname in funcs:
            mfunc = getattr(self, fname)
            if not mfunc:
                continue
            cfunc = functools.partial(mfunc, client)
            setattr(client, fname, cfunc)

    def login(self, email, password, cookie=None, user_agent=None, proxy=None, approvals_code: list = None,
              clear: bool = False, is_second: bool = False, adId=None, deviceId=None,
              familyDeviceId=None, machineId=None, is_collect: bool = False, user_mark: str = None):
        """
        Log a Facebook account in on this terminal.

        If the account already has a registered client, ``onLoggedIn`` is fired
        again for it and its ``info()`` is returned. Otherwise a new
        ``FaceBookAsync`` client is created, wired to this monitor's callbacks,
        registered, and its ``info()`` is returned.

        :param email: account email; surrounding whitespace is stripped.
        :param password: account password; surrounding whitespace is stripped.
        :param user_agent: optional user agent; a built-in Chrome UA is used
            when it is ``None`` or empty.
        :return: whatever ``client.info()`` returns.
        :raises Exception: when the per-terminal login cap is reached, or when
            creating the client fails (the error is first reported through
            ``onLoggingError`` and then re-raised).
        """
        # Strip tabs / line breaks that come along with the credentials.
        email = email.strip().strip('\r\n')
        password = password.strip().strip('\r\n')
        # user_agent defaults to None, so only clean it when a value was given;
        # the None / empty case falls back to the default UA further down.
        if user_agent is not None:
            user_agent = user_agent.strip().strip('\r\n')
        # Maximum number of logins derived from the machine's total memory:
        # (total MiB - 183) / 10.6.
        limit = int((int(psutil.virtual_memory().total / 1024 / 1024) - 183) / 10.6)
        if self.size >= limit:
            raise Exception("单台终端登录到达上限,%d个号" % limit)
        client = self._get_member(email)
        if client:
            self.onLoggedIn(client)
            return client.info()

        if not user_agent:
            user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36'
        obj = self.onLoggingIn(email, password, cookie, user_agent, proxy, approvals_code, clear, is_second,
                               adId, deviceId, familyDeviceId, machineId, is_collect, user_mark)
        try:
            client = FaceBookAsync(obj, logout_call=functools.partial(self.logout, obj.email),
                                   logging_level=logging.INFO, client=self)
            client.user_obj = obj
            client.extend = command.Executor(client)
            self._replace_call(client)
            self._add_member(client)
            self.onLoggedIn(client)
            return client.info()
        except Exception as err:
            # Hand any cached verification methods to the error callback, then
            # clear them so they are not reused by a later attempt.
            verification_method = cache.get(f'{email}:verification_method', [])
            cache.delete(f'{email}:verification_method')
            self.onLoggingError(email, str(err), verification_method)
            raise

    def logout(self, email):
        try:
            client = self._get_member(email)
            if not client:
                return False
            try:
                client._mqtt and client._mqtt.stop()
            except Exception as e:
                print('logout异常', e)
            self._remove_member(email)
            self.onLogout(client)
        except:
            pass
        return True

    def _add_member(self, client):
        email = client.email

        def long_link_thread():
            client.listen(True)
            self._remove_member(email)
            print(email, '注销了')
            self.onLogout(client, '帐号退出')

        def long_link_process():
            client.listen(True)
            self._remove_member(email)
            print(email, '注销了')
            self.onLogout(client, '帐号退出')

        # 通过线程的方式启动异步链接
        t = threading.Thread(target=long_link_thread, name='[email protected]{}'.format(client.email))
        t.setDaemon(True)
        t.start()
        # # 通过进程的方式启动异步链接
        # p = multiprocessing.Process(target=long_link_process, name='[email protected]{}'.format(client.email))
        # p.daemon = True
        # p.start()

        self._listenlist[email] = client

    def _remove_member(self, email):
        self._listenlist.pop(email, None)

    def _get_member(self, email) -> FacebookClient:
        if not email: return None
        return self._listenlist.get(email, None)

    @property
    def size(self):
        return len(self._listenlist)

    def members(self):
        return list(self._listenlist.keys())

    def _heartbeat(self):
        """Build the ``ping`` payload, or return ``None`` when ``disableSync`` is set in ``init_config``."""
        if not self.init_config.get("disableSync"):
            status = {
                'online': self.members(),
                'imei': self._imei,
                'name': self._name,
            }
            return {"data": tostr(status), "type": "ping"}
        return None

    def initialize(self):
        """Send the ``initialize`` payload to all servers and install the heartbeat builder.

        After sending, ``Client.payload_data`` is pointed at ``_heartbeat``.
        """
        status = {
            'online': self.members(),
            'imei': self._imei,
            'name': self._name,
        }
        Client.send_all({"data": tostr(status), "type": "initialize"})
        Client.payload_data = self._heartbeat

    def _notify_(self, type_, client, body: dict):
        """Publish a notification about *client*.

        When a queue is configured the event goes to ``queue.publish_msg``.
        Otherwise it is wrapped in a ``notify`` envelope and sent through the
        websocket client, with ``fbid`` added when the client has a truthy
        ``uid``.
        """
        event = dict(body=tostr(body), time=now(), type=type_, object=client.email)
        if self.queue:
            self.queue.publish_msg(event)
            return
        fbid = getattr(client, 'uid', None)
        if fbid:
            event['fbid'] = fbid
        Client.send_all(add_type("notify", event))

    def _task_(self, type_, client, taskid: int, code, msg: dict = None):
        """Report task execution feedback as a ``wm_task`` notification.

        Status codes: 0 task received, 1 succeeded, -1 failed, -2 exception.
        ``taskId`` and ``msg`` are only included when they are truthy.
        """
        report = {'code': code, 'type': type_}
        if taskid:
            report.update(taskId=taskid)
        if msg:
            report.update(msg=tostr(msg))
        self._notify_('wm_task', client, body=report)

    def _online_(self):
        """Send the ``online`` payload (registered emails, imei, name) to all servers."""
        status = {
            'online': self.members(),
            'imei': self._imei,
            'name': self._name,
        }
        Client.send_all({"data": tostr(status), "type": "online"})

    def thread(self, client, taskid, kwargs: dict):
        """Run one task and report its outcome through ``_task_``.

        The callable is taken from ``kwargs['fn']`` (removed from *kwargs*) and
        invoked with the remaining keyword arguments. An EXECUTE status is
        reported first when no queue is configured; afterwards SUCCESS (with the
        result) or FAILED (with error details) is reported. A not-logged-in
        error additionally logs the account out.
        """
        func = kwargs.pop('fn')

        def report(code, msg=None):
            self._task_(func.__name__, client, taskid, code=code, msg=msg)

        if not self.queue:
            report(TaskStatus.EXECUTE)
        try:
            res = func(**kwargs)
        except FBchatNotLoggedIn as err:
            report(TaskStatus.FAILED, {'error': str(err)})
            self.logout(client.email)
        except FBchatUserError as err:
            report(TaskStatus.FAILED, {'error': str(err)})
        except Exception as err:
            # Unexpected failure: include the call arguments, the terminal name
            # and a truncated traceback in the report.
            kwargs['terminal_name'] = self._name
            details = {
                'error': str(err)[:500],
                'kwargs': kwargs,
                'traceback': traceback.format_exc()[:5000],
            }
            report(TaskStatus.FAILED, details)
        else:
            report(TaskStatus.SUCCESS, res)

    def decode_payload(self, payload):
        if isinstance(payload, str):
            payload = payload.replace('%u', '\\u')
        elif isinstance(payload, dict):
            payload = json.dumps(payload).replace('%u', '\\u')
            payload = json.loads(payload)
        return payload

    def execute(self, cmd: dict):
        """Dispatch one command to its handler on the task thread pool.

        Keys read from *cmd*: ``type`` (command name), ``taskId`` (converted
        with ``int``), ``object`` (target account email), ``data`` (payload:
        str, JSON string or dict), ``unicode`` (when truthy the payload goes
        through ``decode_payload``) and ``topic`` (when present it is cached
        under the task id for 180 seconds).

        ``login``, ``logout`` and ``update`` map to the monitor's own methods.
        ``change`` is ignored. Any other type is resolved on the target
        client's ``extend`` object first and on the client itself otherwise.
        Resolution failures are reported as an EXCEPTION task status.
        """
        type_ = cmd.get('type')
        taskid = int(cmd.get('taskId'))
        target = cmd.get('object')
        payload = cmd.get('data', '{}')  # str or jsonstr or dict
        if cmd.get('unicode', False):
            payload = self.decode_payload(payload)
        if 'topic' in cmd:
            lru_cache.set(taskid, cmd.get('topic'), ttl=180)
        print('task_type=>', type_)
        if type_ == 'login':
            kwargs = parameter.join(self.login, payload)
            client = Munch(email=kwargs.get('email'))
            self.executor.submit(self.thread, client, taskid, kwargs)
        elif type_ == 'logout':
            try:
                client = self._get_member(target)
                # Raised explicitly (not via ``assert``) so the check also runs
                # under ``python -O``; the exception type is kept so the
                # reported repr stays the same.
                if not hasattr(client, 'extend'):
                    raise AssertionError('帐号未登录')
                kwargs = parameter.join(self.logout, payload)
                self.executor.submit(self.thread, client, taskid, kwargs)
            except Exception as err:
                self._task_(type_, Munch(email=target), taskid, code=TaskStatus.EXCEPTION, msg={'error': repr(err)})
        elif type_ == 'update':
            kwargs = parameter.join(self.update, {})
            client = Munch(email=target)
            self.executor.submit(self.thread, client, taskid, kwargs)
        elif type_ != 'change':
            try:
                client = self._get_member(target)
                if not hasattr(client, 'extend'):
                    raise AssertionError('帐号未登录')
                # Prefer the command executor attached at login; fall back to
                # the client object itself.
                if hasattr(client.extend, type_):
                    func = getattr(client.extend, type_)
                else:
                    func = getattr(client, type_)
                if not func:
                    raise AssertionError('未找到该类型指令')
                kwargs = parameter.join(func, payload)
                self.executor.submit(self.thread, client, taskid, kwargs)
            except Exception as err:
                self._task_(type_, Munch(email=target), taskid, code=TaskStatus.EXCEPTION, msg={'error': repr(err)})

    def update(self):
        """Run ``sh ~/fbchat_install.sh`` and return everything it wrote to stdout."""
        cmd = 'sh ~/fbchat_install.sh'
        # The context manager closes the pipe once the output has been read.
        with os.popen(cmd) as pipe:
            return pipe.read()

    def check_update(self):
        """Announce and start a self-update when the version comparison asks for one.

        Nothing happens unless ``latest_version`` is set and
        ``common.version_compare(current, latest)`` returns 1. In that case an
        ``update`` notification is sent for the ``system`` pseudo-client and
        ``update`` is queued on the task pool with task id 0.
        """
        if not self.latest_version:
            return
        if common.version_compare(self.version.name, self.latest_version) != 1:
            return

        versions = {
            'version': self.version.name,
            'new_version': self.latest_version,
        }
        self._notify_(type_='update', client=Munch(email='system'), body=versions)

        task_kwargs = parameter.join(self.update, {})
        system_client = Munch(email='system')
        self.executor.submit(self.thread, system_client, 0, task_kwargs)

    def auto_move_server(self):
        """Post this terminal's info to the control server and connect to the URL it returns.

        The control server call receives getters for the terminal's name,
        target server, online count, total account count, version name and
        mark, plus a setter that stores the latest version on this monitor.
        Only the first element of the returned pair (the websocket URL) is used.
        """
        def remember_latest_version(ver):
            self.latest_version = ver

        ws_url, _ = control_server.post_terminal_info(
            lambda: self._name,
            lambda: Client.target_server,
            lambda: self.size,
            lambda: models.UserList.all_count(),
            lambda: self.version.name,
            lambda: get_mark(),
            remember_latest_version,
        )
        Client.connect_server(ws_url, try_reconnect=False)

    def report_system_info(self):
        """
        Report terminal information.

        Does nothing when no queue is configured. Otherwise the system info is
        extended with the account counts and the mark, printed, and published
        as JSON on the ``<name>/sysinfo`` topic.
        """
        if self.queue:
            sysinfo = system_info.get2()
            sysinfo.update(
                all_count=models.UserList.all_count(),
                online_count=self.size,
                mark=get_mark(),
            )
            print("[report_system_info]>>>>>>", sysinfo)
            topic = "%s/sysinfo" % self._name
            self.queue.publish(topic, json.dumps(sysinfo))


if __name__ == '__main__':
    # Manual smoke check: print how many accounts are stored.
    total_accounts = models.UserList.all_count()
    print(total_accounts)