diff --git a/bin/start.py b/bin/start.py index cb213d0..4f34b43 100644 --- a/bin/start.py +++ b/bin/start.py @@ -1,5 +1,6 @@ #!/usr/bin/env python import sys, os +import fbchat._mqtt BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) @@ -7,6 +8,17 @@ sys.path.append(BASE_DIR) from core import core from conf import log_settings + +def reload_mqtt(): + '''替换_mqtt模块,增加Proxy支持''' + import importlib.machinery, importlib.util + loader = importlib.machinery.SourceFileLoader('fbchat._mqtt', 'lib/_mqtt.py') + spec = importlib.util.spec_from_loader(loader.name, loader) + fbchat._mqtt = importlib.util.module_from_spec(spec) + loader.exec_module(fbchat._mqtt) + + if __name__ == '__main__': # log_settings.load_logging_cfg().info("Running") + reload_mqtt() core.run() diff --git a/core/core.py b/core/core.py index 714a44c..4810e17 100644 --- a/core/core.py +++ b/core/core.py @@ -16,6 +16,11 @@ from core.monitor import Monitor from lib import control_server from lib.socket_ import MessageSocketClient +try: + from lib.message_queue import MessageQueue as Queue +except: + Queue = None + monitor = Monitor() monitor.version = settings.get_version() @@ -94,4 +99,9 @@ def run(): sched.start() # init schedule end + # message queue start + if Queue: + monitor.queue = Queue(monitor._name, monitor.execute).start() + # message queue end + ioloop.IOLoop.instance().start() diff --git a/core/monitor.py b/core/monitor.py index 9343b2e..452de1f 100644 --- a/core/monitor.py +++ b/core/monitor.py @@ -41,6 +41,7 @@ class Monitor(callback.CallBack): self.version = None self.executor = ThreadPoolExecutor(50, 'task_thread') self.init_config = {} + self.queue = None def bind(self, socket): self._socket = socket @@ -147,6 +148,8 @@ class Monitor(callback.CallBack): if hasattr(client, 'uid') and client.uid: data['fbid'] = client.uid payload = add_type("notify", data) + + if self.queue: self.queue.publish_msg(data) self._socket.send(payload) def _task_(self, type_, client, taskid: int, code, msg: dict = None): diff --git a/lib/_mqtt.py b/lib/_mqtt.py new file mode 100644 index 0000000..31f9825 --- /dev/null +++ b/lib/_mqtt.py @@ -0,0 +1,328 @@ +import re + +import attr +import random +import paho.mqtt.client +import socks +from fbchat._core import log +from fbchat import _util, _exception, _graphql + +import ssl + +ssl._create_default_https_context = ssl._create_unverified_context + + +def generate_session_id(): + """Generate a random session ID between 1 and 9007199254740991.""" + return random.randint(1, 2 ** 53) + + +@attr.s(slots=True) +class Mqtt(object): + _state = attr.ib() + _mqtt = attr.ib() + _on_message = attr.ib() + _chat_on = attr.ib() + _foreground = attr.ib() + _sequence_id = attr.ib() + _sync_token = attr.ib(None) + + _HOST = "edge-chat.facebook.com" + + @classmethod + def connect(cls, state, on_message, chat_on, foreground): + mqtt = paho.mqtt.client.Client( + client_id="mqttwsclient", + clean_session=True, + protocol=paho.mqtt.client.MQTTv31, + transport="websockets", + ) + mqtt.enable_logger() + + if state._session.proxies: + proxy = state._session.params.get('sock') + mqtt.proxy_set(**proxy) + # mqtt.max_inflight_messages_set(20) # The rest will get queued + # mqtt.max_queued_messages_set(0) # Unlimited messages can be queued + # mqtt.message_retry_set(20) # Retry sending for at least 20 seconds + # mqtt.reconnect_delay_set(min_delay=1, max_delay=120) + # TODO: Is region (lla | atn | odn | others?) important? + mqtt.tls_set() + + self = cls( + state=state, + mqtt=mqtt, + on_message=on_message, + chat_on=chat_on, + foreground=foreground, + sequence_id=cls._fetch_sequence_id(state), + ) + + # Configure callbacks + mqtt.on_message = self._on_message_handler + mqtt.on_connect = self._on_connect_handler + + self._configure_connect_options() + + # Attempt to connect + try: + rc = mqtt.connect(self._HOST, 443, keepalive=10) + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + raise _exception.FBchatException("MQTT connection failed") + + # Raise error if connecting failed + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + err = paho.mqtt.client.error_string(rc) + raise _exception.FBchatException("MQTT connection failed: {}".format(err)) + + return self + + def _on_message_handler(self, client, userdata, message): + # Parse payload JSON + try: + j = _util.parse_json(message.payload.decode("utf-8")) + except (_exception.FBchatFacebookError, UnicodeDecodeError): + log.exception("Failed parsing MQTT data on %s as JSON", message.topic) + return + + if message.topic == "/t_ms": + # Update sync_token when received + # This is received in the first message after we've created a messenger + # sync queue. + if "syncToken" in j and "firstDeltaSeqId" in j: + self._sync_token = j["syncToken"] + self._sequence_id = j["firstDeltaSeqId"] + + # Update last sequence id when received + if "lastIssuedSeqId" in j: + self._sequence_id = j["lastIssuedSeqId"] + + if "errorCode" in j: + # Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND + # 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00' + log.error("MQTT error code %s received", j["errorCode"]) + # TODO: Consider resetting the sync_token and sequence ID here? + + log.debug("MQTT payload: %s, %s", message.topic, j) + + # Call the external callback + self._on_message(message.topic, j) + + @staticmethod + def _fetch_sequence_id(state): + """Fetch sequence ID.""" + params = { + "limit": 1, + "tags": ["INBOX"], + "before": None, + "includeDeliveryReceipts": False, + "includeSeqID": True, + } + log.debug("Fetching MQTT sequence ID") + # Same request as in `Client.fetchThreadList` + (j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params)) + try: + return int(j["viewer"]["message_threads"]["sync_sequence_id"]) + except (KeyError, ValueError): + # TODO: Proper exceptions + raise + + def _on_connect_handler(self, client, userdata, flags, rc): + if rc == 21: + raise _exception.FBchatException( + "Failed connecting. Maybe your cookies are wrong?" + ) + if rc != 0: + return # Don't try to send publish if the connection failed + + # configure receiving messages. + payload = { + "sync_api_version": 10, + "max_deltas_able_to_process": 1000, + "delta_batch_size": 500, + "encoding": "JSON", + "entity_fbid": self._state.user_id, + } + + # If we don't have a sync_token, create a new messenger queue + # This is done so that across reconnects, if we've received a sync token, we + # SHOULD receive a piece of data in /t_ms exactly once! + if self._sync_token is None: + topic = "/messenger_sync_create_queue" + payload["initial_titan_sequence_id"] = str(self._sequence_id) + payload["device_params"] = None + else: + topic = "/messenger_sync_get_diffs" + payload["last_seq_id"] = str(self._sequence_id) + payload["sync_token"] = self._sync_token + + self._mqtt.publish(topic, _util.json_minimal(payload), qos=1) + + def _configure_connect_options(self): + # Generate a new session ID on each reconnect + session_id = generate_session_id() + + topics = [ + # Things that happen in chats (e.g. messages) + "/t_ms", + # Group typing notifications + "/thread_typing", + # Private chat typing notifications + "/orca_typing_notifications", + # Active notifications + "/orca_presence", + # Other notifications not related to chats (e.g. friend requests) + "/legacy_web", + # Facebook's continuous error reporting/logging? + "/br_sr", + # Response to /br_sr + "/sr_res", + # TODO: Investigate the response from this! (A bunch of binary data) + # "/t_p", + # TODO: Find out what this does! + "/webrtc", + # TODO: Find out what this does! + "/onevc", + # TODO: Find out what this does! + "/notify_disconnect", + # Old, no longer active topics + # These are here just in case something interesting pops up + "/inbox", + "/mercury", + "/messaging_events", + "/orca_message_notifications", + "/pp", + "/t_rtc", + "/webrtc_response", + ] + + username = { + # The user ID + "u": self._state.user_id, + # Session ID + "s": session_id, + # Active status setting + "chat_on": self._chat_on, + # foreground_state - Whether the window is focused + "fg": self._foreground, + # Can be any random ID + "d": self._state._client_id, + # Application ID, taken from facebook.com + "aid": 219994525426954, + # MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing + "st": topics, + # MQTT extension by FB, allows making a PUBLISH while CONNECTing + # Using this is more efficient, but the same can be acheived with: + # def on_connect(*args): + # mqtt.publish(topic, payload, qos=1) + # mqtt.on_connect = on_connect + # TODO: For some reason this doesn't work! + "pm": [ + # { + # "topic": topic, + # "payload": payload, + # "qos": 1, + # "messageId": 65536, + # } + ], + # Unknown parameters + "cp": 3, + "ecp": 10, + "ct": "websocket", + "mqtt_sid": "", + "dc": "", + "no_auto_fg": True, + "gas": None, + "pack": [], + } + + # TODO: Make this thread safe + self._mqtt.username_pw_set(_util.json_minimal(username)) + + headers = { + # TODO: Make this access thread safe + "Cookie": _util.get_cookie_header( + self._state._session, "https://edge-chat.facebook.com/chat" + ), + "User-Agent": self._state._session.headers["User-Agent"], + "Origin": "https://www.facebook.com", + "Host": self._HOST, + } + + self._mqtt.ws_set_options( + path="/chat?sid={}".format(session_id), headers=headers + ) + + def loop_once(self, on_error=None): + """Run the listening loop once. + + Returns whether to keep listening or not. + """ + rc = self._mqtt.loop(timeout=1.0) + + # If disconnect() has been called + if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting: + return False # Stop listening + + if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: + # If known/expected error + if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST: + log.warning("Connection lost, retrying") + elif rc == paho.mqtt.client.MQTT_ERR_NOMEM: + # This error is wrongly classified + # See https://github.com/eclipse/paho.mqtt.python/issues/340 + log.warning("Connection error, retrying") + else: + err = paho.mqtt.client.error_string(rc) + log.error("MQTT Error: %s", err) + # For backwards compatibility + if on_error: + on_error(_exception.FBchatException("MQTT Error {}".format(err))) + + # Wait before reconnecting + self._mqtt._reconnect_wait() + + # Try reconnecting + self._configure_connect_options() + try: + self._mqtt.reconnect() + except ( + # Taken from .loop_forever + paho.mqtt.client.socket.error, + OSError, + paho.mqtt.client.WebsocketConnectionError, + ) as e: + log.debug("MQTT reconnection failed: %s", e) + + return True # Keep listening + + def disconnect(self): + self._mqtt.disconnect() + + def set_foreground(self, value): + payload = _util.json_minimal({"foreground": value}) + info = self._mqtt.publish("/foreground_state", payload=payload, qos=1) + self._foreground = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + + def set_chat_on(self, value): + # TODO: Is this the right request to make? + data = {"make_user_available_when_in_foreground": value} + payload = _util.json_minimal(data) + info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1) + self._chat_on = value + # TODO: We can't wait for this, since the loop is running with .loop_forever() + # info.wait_for_publish() + + # def send_additional_contacts(self, additional_contacts): + # payload = _util.json_minimal({"additional_contacts": additional_contacts}) + # info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1) + # + # def browser_close(self): + # info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1) diff --git a/lib/common.py b/lib/common.py index 5492629..60df5e0 100644 --- a/lib/common.py +++ b/lib/common.py @@ -104,7 +104,6 @@ class College(): data["concentration_ids[%d]" % i] = sub[1] return data - @classmethod def from_dict(cls, data: dict): self = cls() @@ -176,7 +175,7 @@ def todict(obj, include: list = None): return res -def tobase64(obj): +def tobase64(obj) -> str: if isinstance(obj, (dict, list)): obj = demjson.encode(obj) if isinstance(obj, str): @@ -187,6 +186,66 @@ def tobase64(obj): raise BaseException("must str,list,dict,bytes") -def frombase64(base): +def frombase64(base) -> dict: string = base64.b64decode(base).decode() return demjson.decode(string) + + +def format_proxies(conf: dict, format='requests'): + ''' + e.g. + { + 'type': 'socks5', + 'host': '47.56.152.111', + 'pass': 'nantian888', + 'port': '1080', + 'user': 'ntkj' + } + ''' + type_ = conf.get('type', None) + host = conf.get('host', None) + port = conf.get('port', None) + user = conf.get('user', None) + pass_ = conf.get('pass', None) + + if format == 'requests': + info = [] + info.append(type_) + info.append("://") + p_auth = [] + if user: + p_auth.append(str(user)) + p_auth.append(":") + if pass_: + p_auth.append(str(pass_)) + if p_auth: + info.append(''.join(p_auth)) + info.append('@') + info.append(host) + if port: + info.append(':') + info.append(str(port)) + text = ''.join(info) + return {'http': text, 'https': text} + else: + _type_ = getattr(__import__('socks'), 'PROXY_TYPES').get(type_.upper()) + assert _type_, '代理协议错误,可选socks5,socks4,http' + proxy = { + "proxy_type": _type_, + "proxy_addr": host, + "proxy_port": int(port), + } + if user: + proxy['proxy_username'] = user + if pass_: + proxy['proxy_password'] = pass_ + return proxy + + +if __name__ == '__main__': + conf = {'type': 'http', + 'host': '47.56.152.111', + 'pass': 'nantian888', + 'port': '1080', + 'user': 'ntkj'} + print(format_proxies(conf, '1')) diff --git a/lib/facebook.py b/lib/facebook.py index a9dddf0..1548a61 100644 --- a/lib/facebook.py +++ b/lib/facebook.py @@ -14,10 +14,12 @@ from enum import Enum import demjson import furl +import requests from fbchat import Client, ThreadType, Message, Sticker, FBchatUserError, _exception, log, _util from fbchat._state import State, session_factory, is_home from lib import google_map, common +from lib._mqtt import Mqtt from lib.common import WorkPlace, College from utils import parse_html, _attachment @@ -25,10 +27,16 @@ from utils import parse_html, _attachment class PCState(State): @classmethod - def login(cls, email, password, on_2fa_callback, user_agent=None): + def login(cls, email, password, on_2fa_callback, user_agent=None, proxy=None): '''换成了PC的登录方式''' session = session_factory(user_agent=user_agent) + if proxy: + short_proxy = common.format_proxies(proxy, 'requests') + long_proxy = common.format_proxies(proxy, 'sock') + session.params.update({'sock': long_proxy}) + session.proxies.update(short_proxy) + soup = parse_html.find_input_fields_with_pc(session.get("https://www.facebook.com/").text) data = dict((elem["name"], elem["value"]) for elem in soup @@ -71,6 +79,19 @@ class PCState(State): return "Location" in r.headers and is_home(r.headers["Location"]) @classmethod + def from_cookies(cls, cookies, user_agent=None, proxy=None): + session = session_factory(user_agent=user_agent) + session.cookies = requests.cookies.merge_cookies(session.cookies, cookies) + + if proxy: + short_proxy = common.format_proxies(proxy, 'requests') + long_proxy = common.format_proxies(proxy, 'sock') + + session.params.update({'sock': long_proxy}) + session.proxies.update(short_proxy) + return cls.from_session(session=session) + + @classmethod def from_session(cls, session): def get_user_id(session): @@ -81,7 +102,7 @@ class PCState(State): user_id = get_user_id(session) - r = session.get(_util.prefix_url("/"), timeout=10) + r = session.get(_util.prefix_url("/"), timeout=30) b = parse_html.show_home_page(r.text) logout_menu = b.find('div', id='logoutMenu') @@ -171,6 +192,7 @@ class FacebookClient(Client): self.email = user_obj.email self.user_obj = user_obj self.extend = None + self.proxy = user_obj.proxy if hasattr(user_obj, 'proxy') else None super().__init__(user_obj.email, user_obj.password, user_obj.user_agent, max_tries, user_obj.format_cookie()) @@ -179,7 +201,7 @@ class FacebookClient(Client): def setSession(self, session_cookies, user_agent=None): try: - self._state = PCState.from_cookies(session_cookies, user_agent=user_agent) + self._state = PCState.from_cookies(session_cookies, user_agent=user_agent, proxy=self.proxy) self._uid = self._state.user_id except Exception as e: log.exception("Failed loading session") @@ -203,6 +225,7 @@ class FacebookClient(Client): password, on_2fa_callback=self.on2FACode, user_agent=user_agent, + proxy=self.proxy ) self._uid = self._state.user_id except Exception as err: @@ -1010,3 +1033,21 @@ class FacebookClient(Client): 'ext_data': next_data } return response + + def startListening(self): + if not self._mqtt: + self._mqtt = Mqtt.connect( + state=self._state, + on_message=self._parse_message, + chat_on=self._markAlive, + foreground=True, + ) + self.onQprimer(ts=int(time.time() * 1000), msg=None) + self.listening = True + + def securityDevice(self): + data = {'av': self.uid, 'fb_api_caller_class': 'RelayModern', + 'fb_api_req_friendly_name': 'SecuritySettingsSessionGroupRefetchQuery', + 'variables': '{"session_count":20}', 'doc_id': '2549907381730805'} + res = self._post('https://www.facebook.com/api/graphql/', data) + return res['data']['security_settings']['sessions'] diff --git a/lib/sqlhelper.py b/lib/sqlhelper.py index 5a908b7..4f9a5d0 100644 --- a/lib/sqlhelper.py +++ b/lib/sqlhelper.py @@ -73,6 +73,8 @@ class UserList(Base): fbid = Column(String(20), index=True) status = Column(Integer, default=0, nullable=False, index=True) + # proxy = Column(String(256)) + def __repr__(self): return "User(id={}, email={}, password={}, cookie={}, fbid={}, status={})" \ .format(self.id, self.email, self.password, len(self.cookie) if self.cookie else None, self.fbid, diff --git a/requirements.txt b/requirements.txt index 0b8280c..5a089d0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,5 @@ furl==2.1.0 sqlalchemy==1.3.12 psutil==5.6.7 demjson==2.2.4 -apscheduler==3.6.3 \ No newline at end of file +apscheduler==3.6.3 +pysocks==1.7.1 \ No newline at end of file