# monitor.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2020-02-09 13:40
# @Author  : Lemon
# @File    : monitor.py
# @Software: PyCharm
import functools
import json
import logging
import os
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor

from fbchat import FBchatUserError
from fbchat._exception import FBchatNotLoggedIn
from munch import Munch
from selenium import webdriver
from selenium.webdriver import DesiredCapabilities

from conf import settings
from core import callback, command
from db import models
from lib import control_server, common
from lib.common import TaskStatus
from lib.facebook import FacebookClient
from lib.ttl_cache import lru_cache
from lib.websock import Client
from utils import parameter, system_info

log = logging.getLogger(__name__)


def tostr(x):
    """Return ``x`` unchanged; payload bodies are passed through this hook before sending."""
    return x


def add_type(x, y):
    """Wrap ``y`` in an envelope dict whose ``type`` is ``x``."""
    return {"type": x, 'data': y}


def now():
    """Return the current time as whole milliseconds since the epoch."""
    return round(time.time() * 1000)


class Monitor(callback.CallBack):

    def __init__(self):
        """Read the stored terminal identity and set up the runtime state."""
        super().__init__()
        # email -> client for every account currently logged in on this terminal
        self._listenlist = {}
        # Identity values come from models.Config; the second argument is a
        # callable (presumably used to build the value when none is stored yet).
        self._imei = models.Config.get('imei', lambda: uuid.uuid1().hex)
        self._name = models.Config.get('name', control_server.get_init_name)
        self._host = models.Config.get('host', lambda: settings.get_server()['url'])
        self.version = None
        self.latest_version = None
        # Worker pool on which task callables are executed
        self.executor = ThreadPoolExecutor(max_workers=100, thread_name_prefix='task_thread')
        self.init_config = {}
        self.queue = None
        # Serialises the browser flow in change(); change_list holds queued emails
        self.changeLock = threading.Lock()
        self.change_list = []

    def bind(self):
        """Store the server currently targeted by ``Client`` under the ``host`` config key."""
        current_server = Client.target_server
        models.Config.set('host', current_server)

    def go_login(self):
        threading.Thread(target=self._auto_login, args=(), name='auto_login_thread').start()

    def _auto_login(self):
        """
        Log in every stored account whose status is ONLINE, one after another.

        A failure for one account is printed and does not stop the remaining
        accounts; there is a 0.3 second pause after each attempt.
        """
        online_accounts = models.UserList.query(status=common.Status.ONLINE)
        for account in online_accounts:
            try:
                stored_cookie = account.format_cookie()
                self.login(account.email, account.password, stored_cookie, account.user_agent)
            except Exception as exc:
                print('登录', account.email, exc)
            time.sleep(0.3)

    def _replace_call(self, client):
        funcs = [x for x in dir(self) if x.startswith("on")]
        for fname in funcs:
            mfunc = getattr(self, fname)
            if not mfunc:
                continue
            cfunc = functools.partial(mfunc, client)
            setattr(client, fname, cfunc)

    def change(self, email, password, cookie=None):
        limit = 100
        if self.size >= limit:
            raise Exception("单台终端登录到达上限,%d个号" % limit)
        client = self._get_member(email)
        if client:
            self.onLoggedIn(client)
            return client.info()
        else:
            try:
                if email in self.change_list:
                    self.onChangEnv(email, 'changeError', f'{email}已经在登录队列中')
                    return
                else:
                    self.change_list.append(email)
                    self.onChangEnv(email, 'waiting', '队列中...')
                self.changeLock.acquire()
                self.changeingIn(email)  # 转环境的话清理缓存
                result, email, password, cookie, user_agent = self.selenium_login(email, password, cookie)
                if result:
                    self.onChangSucceed(email, '匹配设备指纹成功',
                                        data={'email': email, 'password': password, 'cookie': cookie})
            except Exception as err:
                self.onChangEnv(email, 'changeError', str(err))
                raise err
            finally:
                self.change_list.remove(email)
                self.changeLock.release()

    def login(self, email, password, cookie=None, user_agent=None, proxy=None):
        limit = 100
        if self.size >= limit:
            raise Exception("单台终端登录到达上限,%d个号" % limit)
        client = self._get_member(email)
        if client:
            self.onLoggedIn(client)
            return client.info()
        else:
            if user_agent is None or user_agent == '':
                user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) " \
                             "Chrome/84.0.4147.105 Safari/537.36"
            obj = self.onLoggingIn(email, password, cookie, user_agent, proxy)
            try:
                client = FacebookClient(obj, logout_call=functools.partial(self.logout, obj.email),
                                        logging_level=logging.INFO)
                client.user_obj = obj
                client.extend = command.Executor(client)
                self._replace_call(client)
                self._add_member(client)
                self.onLoggedIn(client)
                return client.info()
            except Exception as err:
                self.onLoggingError(email, str(err))
                raise err

    def selenium_login(self, email, password, cookie=None, user_agent=None):
        """
        Log in through a headless, mobile-emulating Chrome and collect cookies.

        When ``cookie`` is given it is imported into the browser; otherwise the
        email/password form on m.facebook.com is filled in. After a successful
        login the navigation entries are clicked once each in random order.
        Progress and failures are reported through ``self.onChangEnv``. The
        driver is always shut down before returning.

        :param email: account login; surrounding whitespace is stripped
        :param password: account password; surrounding whitespace is stripped
        :param cookie: optional cookie value handed to ``utils.encpass.back_cookies``
        :param user_agent: optional user agent for the browser; a mobile Safari
            string is used when it is ``None`` or empty
        :return: ``(ok, email, password, cookie, user_agent)``. On the regular
            paths ``cookie`` is the value extracted from the browser and
            ``user_agent`` is a fixed desktop Chrome string; when an exception
            is caught, the caller's ``cookie`` and the browser user agent are
            returned instead.
        """
        import random
        from utils.encpass import back_cookies, deal_cookies, get_random_device

        email = email.strip()
        password = password.strip()
        capabilities = DesiredCapabilities.CHROME
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--no-sandbox')  # needed when running as root
        chrome_options.add_argument('--headless')  # no visible browser window
        # NOTE(review): the proxy address is hard-coded here.
        chrome_options.add_argument('--proxy-server=http://8.210.170.163:24000')
        device_name = get_random_device()
        print(email, '终端设备', device_name)
        self.onChangEnv(email, 'changeCreatDevice', '创建登录环境')
        chrome_options.add_experimental_option("mobileEmulation", {"deviceName": device_name})
        main_url = "https://m.facebook.com/"
        if user_agent is None or user_agent == '':
            user_agent = 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1'
        chrome_options.add_argument(f'user-agent="{user_agent}"')
        # Create the browser
        browser = webdriver.Chrome(desired_capabilities=capabilities, options=chrome_options)
        try:
            self.onChangEnv(email, 'changeInitFinger', '初始设备指纹')
            if cookie is not None and cookie != '':
                self.onChangEnv(email, 'changeInitCookies', '初始化cookies')
                print(email, '导入cookie')
                # Open the site, add the cookies, then reload with them in place.
                browser.get(main_url)
                # Use a separate loop name so the ``cookie`` argument is not
                # overwritten; the error path below returns it to the caller.
                for item in back_cookies(cookie):
                    print(item['name'], item['value'])
                    browser.add_cookie(item)
                browser.get(main_url)
            else:
                browser.get(main_url)
                print(email, '点击帐号input')
                browser.find_element_by_xpath('//*[@id="m_login_email"]').click()
                print(email, '输入帐号')
                browser.find_element_by_xpath('//*[@id="m_login_email"]').send_keys(email)
                print(email, '点击密码input')
                browser.find_element_by_xpath('//*[@id="m_login_password"]').click()
                print(email, '输入密码')
                browser.find_element_by_xpath('//*[@id="m_login_password"]').send_keys(password)
                browser.find_element_by_xpath('//*[@id="u_0_4"]/button').click()
                # Give the page up to three 2-second waits to leave the login URL.
                waits = 0
                while browser.current_url == main_url:
                    time.sleep(2)
                    waits += 1
                    if waits >= 3:
                        break
                    print(email, '等待页面', browser.current_url)
                if 'save-device' in browser.current_url and 'checkpoint' not in browser.current_url:
                    self.onChangEnv(email, 'changeSaveFinger', '保存设备指纹')
                    print(email, '保存设备点击ok')
                    browser.find_element_by_xpath(
                        '//*[@id="root"]/div[1]/div/div/div[3]/div[2]/form/div/button').click()
            # A desktop UA is returned instead of the browser's own one because
            # the protocol client has to be initialised with a PC user agent.
            user_agent_ = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.105 Safari/537.36'
            print(email, '提取user_agent_', user_agent_)
            cookie_ = deal_cookies(browser.get_cookies())
            print(email, '提取cookies', cookie_)
            print("判断连接", browser.current_url)
            time.sleep(2)
            if 'checkpoint' in browser.current_url:
                self.onChangEnv(email, 'changeError', '登录失败:帐号封锁')
                return False, email, password, cookie_, user_agent_
            if 'login_attempt' in browser.current_url:
                self.onChangEnv(email, 'changeError', '登录失败:帐号密码错误')
                return False, email, password, cookie_, user_agent_
            if 'c_user' not in cookie_:
                self.onChangEnv(email, 'changeError', '登录失败,请检查帐号密码,或者重试')
                return False, email, password, cookie_, user_agent_
            browser.get(main_url)
            page_source = browser.page_source
            # Pick the navigation labels that match the page language.
            if 'Friend requests' in page_source:
                click_list = ['News Feed', 'Friend requests', 'Notifications', 'Search', 'More']
            elif 'News Feed' in page_source:
                click_list = ['News Feed', 'Friend Requests', 'Notifications', 'Search', 'More']
            elif '动态消息' in page_source:
                click_list = ['动态消息', '加好友请求', '通知', '搜索', '更多']
            elif 'ニュースフィード' in page_source:
                click_list = ['ニュースフィード', '友達リクエスト', 'お知らせ', '検索', 'その他']
            else:
                self.onChangEnv(email, 'changeError', '未知语言类型,请联系小陈添加')
                return False, email, password, cookie_, user_agent_
            # Click every entry once, in random order.
            action_list = random.sample(click_list, 5)
            print('随机动作', action_list)
            for step, action in enumerate(action_list):
                print(email, action)
                self.onChangEnv(email, 'changeMatching', f'匹配设备指纹{step + 1}...')
                browser.find_element_by_name(action).click()
                browser.get('https://m.facebook.com/home.php')
            return True, email, password, cookie_, user_agent_
        except Exception as err:
            self.onChangEnv(email, 'changeError', str(err))
            return False, email, password, cookie, user_agent
        finally:
            # quit() ends the whole driver session on every exit path.
            browser.quit()

    def logout(self, email):
        try:
            client = self._get_member(email)
            if not client:
                return False
            try:
                client._mqtt and client._mqtt.stop()
            except Exception as e:
                print('logout异常', e)
            self._remove_member(email)
            self.onLogout(client)
        except:
            pass
        return True

    def _add_member(self, client):
        email = client.email

        def long_link_thread():
            client.listen(True)
            self._remove_member(email)
            print(email, '注销了')

        t = threading.Thread(target=long_link_thread, name='[email protected]{}'.format(client.email))
        t.setDaemon(True)
        t.start()

        self._listenlist[email] = client

    def _remove_member(self, email):
        self._listenlist.pop(email, None)

    def _get_member(self, email) -> FacebookClient:
        if not email: return None
        return self._listenlist.get(email, None)

    @property
    def size(self):
        return len(self._listenlist)

    def members(self):
        return list(self._listenlist.keys())

    def _heartbeat(self):
        if self.init_config.get("disableSync"):
            return None
        payload = {
            "data": tostr({
                'online': self.members(),
                'imei': self._imei,
                'name': self._name,
            }),
            "type": "ping"
        }
        return payload

    def initialize(self):
        """Send the ``initialize`` payload through ``Client`` and install ``_heartbeat`` as ``Client.payload_data``."""
        status = {
            'online': self.members(),
            'imei': self._imei,
            'name': self._name,
        }
        Client.send_all({"data": tostr(status), "type": "initialize"})
        Client.payload_data = self._heartbeat

    def _notify_(self, type_, client, body: dict):
        """
        Publish a notification about ``client``.

        The message goes to ``self.queue`` when one is set; otherwise it is
        wrapped in a ``notify`` envelope and sent through ``Client``, with the
        client's ``uid`` added as ``fbid`` when it has a truthy one.

        :param type_: notification type
        :param client: object exposing ``email`` (and optionally ``uid``)
        :param body: notification content
        """
        data = dict(body=tostr(body), time=now(), type=type_, object=client.email)
        if self.queue:
            self.queue.publish_msg(data)
            return
        uid = getattr(client, 'uid', None)
        if uid:
            data['fbid'] = uid
        Client.send_all(add_type("notify", data))

    def _task_(self, type_, client, taskid: int, code, msg: dict = None):
        '''任务执行情况反馈(0 收到任务,1 执行成功,-1 执行失败,-2 执行异常)'''
        payload = {
            'code': code,
            'type': type_,
        }
        if taskid:
            payload['taskId'] = taskid

        if msg:
            payload['msg'] = tostr(msg)
        self._notify_('wm_task', client, body=payload)

    def _online_(self):
        """Send the ``online`` payload (online emails, imei, name) through ``Client``."""
        status = {
            'online': self.members(),
            'imei': self._imei,
            'name': self._name,
        }
        Client.send_all({"data": tostr(status), "type": "online"})

    def thread(self, client, taskid, kwargs: dict):
        """
        Run one task callable and report its outcome through ``_task_``.

        :param client: object the task is reported against (needs ``email``)
        :param taskid: id echoed back in the reports
        :param kwargs: call arguments; the callable itself is stored under
            ``fn`` and is removed from the dict before the call
        """
        func = kwargs.pop('fn')
        report = functools.partial(self._task_, func.__name__, client, taskid)
        # The "executing" notice is only sent when no queue is configured.
        if not self.queue:
            report(code=TaskStatus.EXECUTE)
        try:
            outcome = func(**kwargs)
        except FBchatNotLoggedIn as exc:
            report(code=TaskStatus.FAILED, msg={'error': str(exc)})
            # The session is no longer valid, so drop the account as well.
            self.logout(client.email)
        except FBchatUserError as exc:
            report(code=TaskStatus.FAILED, msg={'error': str(exc)})
        except Exception as exc:
            # Unexpected failure: include the arguments and the traceback.
            kwargs['terminal_name'] = self._name
            report(code=TaskStatus.FAILED,
                   msg={'error': str(exc), 'kwargs': kwargs, 'traceback': traceback.format_exc()})
        else:
            report(code=TaskStatus.SUCCESS, msg=outcome)

    def decode_payload(self, payload):
        if isinstance(payload, str):
            payload = payload.replace('%u', '\\u')
        elif isinstance(payload, dict):
            payload = json.dumps(payload).replace('%u', '\\u')
            payload = json.loads(payload)
        return payload

    def execute(self, cmd: dict):
        """
        Dispatch one command to the worker pool.

        ``change``, ``login``, ``logout`` and ``update`` are handled by this
        object; any other type is looked up first on the target client's
        ``extend`` object and then on the client itself. Failures while
        preparing a client-bound command are reported through ``_task_`` with
        ``TaskStatus.EXCEPTION``.

        :param cmd: command dict; the keys read are ``type``, ``taskId``,
            ``object`` (target email), ``data`` (str, JSON string or dict),
            ``unicode`` and ``topic``
        """
        type_ = cmd.get('type')
        # A missing taskId becomes 0, which _task_ leaves out of its report.
        taskid = int(cmd.get('taskId') or 0)
        target = cmd.get('object')
        payload = cmd.get('data', '{}')  # str or jsonstr or dict
        if cmd.get('unicode', False):
            payload = self.decode_payload(payload)
        if 'topic' in cmd:
            lru_cache.set(taskid, cmd.get('topic'), ttl=180)
        print('任务类型', type_)
        if type_ == 'change':
            kwargs = parameter.join(self.change, payload)
            client = Munch(email=kwargs.get('email'))
            self.executor.submit(self.thread, client, taskid, kwargs)
        elif type_ == 'login':
            kwargs = parameter.join(self.login, payload)
            client = Munch(email=kwargs.get('email'))
            self.executor.submit(self.thread, client, taskid, kwargs)
        elif type_ == 'logout':
            try:
                client = self._get_member(target)
                # Raised explicitly so the check also runs under ``python -O``;
                # the exception type keeps the reported repr unchanged.
                if not hasattr(client, 'extend'):
                    raise AssertionError('帐号未登录')
                kwargs = parameter.join(self.logout, payload)
                self.executor.submit(self.thread, client, taskid, kwargs)
            except Exception as err:
                self._task_(type_, Munch(email=target), taskid, code=TaskStatus.EXCEPTION, msg={'error': repr(err)})
        elif type_ == 'update':
            kwargs = parameter.join(self.update, {})
            client = Munch(email=target)
            self.executor.submit(self.thread, client, taskid, kwargs)
        else:
            try:
                client = self._get_member(target)
                if not hasattr(client, 'extend'):
                    raise AssertionError('帐号未登录')
                # Prefer the extended command set, fall back to the client.
                if hasattr(client.extend, type_):
                    func = getattr(client.extend, type_)
                else:
                    func = getattr(client, type_)
                if not func:
                    raise AssertionError('未找到该类型指令')
                kwargs = parameter.join(func, payload)
                self.executor.submit(self.thread, client, taskid, kwargs)
            except Exception as err:
                self._task_(type_, Munch(email=target), taskid, code=TaskStatus.EXCEPTION, msg={'error': repr(err)})

    def update(self):
        """
        Run ``~/fbchat_install.sh`` with ``sh`` and return what it printed.

        :return: everything the script wrote to standard output
        """
        cmd = 'sh ~/fbchat_install.sh'
        # The context manager closes the pipe once the output has been read.
        with os.popen(cmd) as pipe:
            return pipe.read()

    def check_update(self):
        if self.latest_version and common.version_compare(self.version.name, self.latest_version) == 1:
            self._notify_(type_='update', client=Munch(email='system'), body={
                'version': self.version.name,
                'new_version': self.latest_version,
            })

            kwargs = parameter.join(self.update, {})
            client = Munch(email='system')
            self.executor.submit(self.thread, client, 0, kwargs)

    def auto_move_server(self):
        """
        Report this terminal to the control server and connect to the server it returns.

        ``control_server.post_terminal_info`` is given callables for the
        terminal name, the current target server, the account count and the
        version name, plus a setter that stores the value it is called with
        in ``latest_version``. The first element of its result is used as the
        websocket URL; the second is ignored.
        """
        def remember_latest(ver):
            self.latest_version = ver

        ws_url, _ = control_server.post_terminal_info(
            lambda: self._name,
            lambda: Client.target_server,
            lambda: self.size,
            lambda: self.version.name,
            remember_latest,
        )
        Client.connect_server(ws_url, try_reconnect=False)

    def report_system_info(self):
        if not self.queue:
            return
        sysinfo = system_info.get2()
        sysinfo.update({
            'all_count': models.UserList.all_count(),
            'online_count': self.size,
        })
        self.queue.publish("%s/sysinfo" % self._name, json.dumps(sysinfo))