作者 lemon

新增socks5代理支持,新增获取安全设备记录接口

#!/usr/bin/env python
import sys, os
import fbchat._mqtt
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
... ... @@ -7,6 +8,17 @@ sys.path.append(BASE_DIR)
from core import core
from conf import log_settings
def reload_mqtt():
'''替换_mqtt模块,增加Proxy支持'''
import importlib.machinery, importlib.util
loader = importlib.machinery.SourceFileLoader('fbchat._mqtt', 'lib/_mqtt.py')
spec = importlib.util.spec_from_loader(loader.name, loader)
fbchat._mqtt = importlib.util.module_from_spec(spec)
loader.exec_module(fbchat._mqtt)
if __name__ == '__main__':
# log_settings.load_logging_cfg().info("Running")
reload_mqtt()
core.run()
... ...
... ... @@ -16,6 +16,11 @@ from core.monitor import Monitor
from lib import control_server
from lib.socket_ import MessageSocketClient
try:
from lib.message_queue import MessageQueue as Queue
except:
Queue = None
monitor = Monitor()
monitor.version = settings.get_version()
... ... @@ -94,4 +99,9 @@ def run():
sched.start()
# init schedule end
# message queue start
if Queue:
monitor.queue = Queue(monitor._name, monitor.execute).start()
# message queue end
ioloop.IOLoop.instance().start()
... ...
... ... @@ -41,6 +41,7 @@ class Monitor(callback.CallBack):
self.version = None
self.executor = ThreadPoolExecutor(50, 'task_thread')
self.init_config = {}
self.queue = None
def bind(self, socket):
self._socket = socket
... ... @@ -147,6 +148,8 @@ class Monitor(callback.CallBack):
if hasattr(client, 'uid') and client.uid:
data['fbid'] = client.uid
payload = add_type("notify", data)
if self.queue: self.queue.publish_msg(data)
self._socket.send(payload)
def _task_(self, type_, client, taskid: int, code, msg: dict = None):
... ...
import re
import attr
import random
import paho.mqtt.client
import socks
from fbchat._core import log
from fbchat import _util, _exception, _graphql
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
def generate_session_id():
"""Generate a random session ID between 1 and 9007199254740991."""
return random.randint(1, 2 ** 53)
@attr.s(slots=True)
class Mqtt(object):
_state = attr.ib()
_mqtt = attr.ib()
_on_message = attr.ib()
_chat_on = attr.ib()
_foreground = attr.ib()
_sequence_id = attr.ib()
_sync_token = attr.ib(None)
_HOST = "edge-chat.facebook.com"
@classmethod
def connect(cls, state, on_message, chat_on, foreground):
mqtt = paho.mqtt.client.Client(
client_id="mqttwsclient",
clean_session=True,
protocol=paho.mqtt.client.MQTTv31,
transport="websockets",
)
mqtt.enable_logger()
if state._session.proxies:
proxy = state._session.params.get('sock')
mqtt.proxy_set(**proxy)
# mqtt.max_inflight_messages_set(20) # The rest will get queued
# mqtt.max_queued_messages_set(0) # Unlimited messages can be queued
# mqtt.message_retry_set(20) # Retry sending for at least 20 seconds
# mqtt.reconnect_delay_set(min_delay=1, max_delay=120)
# TODO: Is region (lla | atn | odn | others?) important?
mqtt.tls_set()
self = cls(
state=state,
mqtt=mqtt,
on_message=on_message,
chat_on=chat_on,
foreground=foreground,
sequence_id=cls._fetch_sequence_id(state),
)
# Configure callbacks
mqtt.on_message = self._on_message_handler
mqtt.on_connect = self._on_connect_handler
self._configure_connect_options()
# Attempt to connect
try:
rc = mqtt.connect(self._HOST, 443, keepalive=10)
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
raise _exception.FBchatException("MQTT connection failed")
# Raise error if connecting failed
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
err = paho.mqtt.client.error_string(rc)
raise _exception.FBchatException("MQTT connection failed: {}".format(err))
return self
def _on_message_handler(self, client, userdata, message):
# Parse payload JSON
try:
j = _util.parse_json(message.payload.decode("utf-8"))
except (_exception.FBchatFacebookError, UnicodeDecodeError):
log.exception("Failed parsing MQTT data on %s as JSON", message.topic)
return
if message.topic == "/t_ms":
# Update sync_token when received
# This is received in the first message after we've created a messenger
# sync queue.
if "syncToken" in j and "firstDeltaSeqId" in j:
self._sync_token = j["syncToken"]
self._sequence_id = j["firstDeltaSeqId"]
# Update last sequence id when received
if "lastIssuedSeqId" in j:
self._sequence_id = j["lastIssuedSeqId"]
if "errorCode" in j:
# Known types: ERROR_QUEUE_OVERFLOW | ERROR_QUEUE_NOT_FOUND
# 'F\xfa\x84\x8c\x85\xf8\xbc-\x88 FB_PAGES_INSUFFICIENT_PERMISSION\x00'
log.error("MQTT error code %s received", j["errorCode"])
# TODO: Consider resetting the sync_token and sequence ID here?
log.debug("MQTT payload: %s, %s", message.topic, j)
# Call the external callback
self._on_message(message.topic, j)
@staticmethod
def _fetch_sequence_id(state):
"""Fetch sequence ID."""
params = {
"limit": 1,
"tags": ["INBOX"],
"before": None,
"includeDeliveryReceipts": False,
"includeSeqID": True,
}
log.debug("Fetching MQTT sequence ID")
# Same request as in `Client.fetchThreadList`
(j,) = state._graphql_requests(_graphql.from_doc_id("1349387578499440", params))
try:
return int(j["viewer"]["message_threads"]["sync_sequence_id"])
except (KeyError, ValueError):
# TODO: Proper exceptions
raise
def _on_connect_handler(self, client, userdata, flags, rc):
if rc == 21:
raise _exception.FBchatException(
"Failed connecting. Maybe your cookies are wrong?"
)
if rc != 0:
return # Don't try to send publish if the connection failed
# configure receiving messages.
payload = {
"sync_api_version": 10,
"max_deltas_able_to_process": 1000,
"delta_batch_size": 500,
"encoding": "JSON",
"entity_fbid": self._state.user_id,
}
# If we don't have a sync_token, create a new messenger queue
# This is done so that across reconnects, if we've received a sync token, we
# SHOULD receive a piece of data in /t_ms exactly once!
if self._sync_token is None:
topic = "/messenger_sync_create_queue"
payload["initial_titan_sequence_id"] = str(self._sequence_id)
payload["device_params"] = None
else:
topic = "/messenger_sync_get_diffs"
payload["last_seq_id"] = str(self._sequence_id)
payload["sync_token"] = self._sync_token
self._mqtt.publish(topic, _util.json_minimal(payload), qos=1)
def _configure_connect_options(self):
# Generate a new session ID on each reconnect
session_id = generate_session_id()
topics = [
# Things that happen in chats (e.g. messages)
"/t_ms",
# Group typing notifications
"/thread_typing",
# Private chat typing notifications
"/orca_typing_notifications",
# Active notifications
"/orca_presence",
# Other notifications not related to chats (e.g. friend requests)
"/legacy_web",
# Facebook's continuous error reporting/logging?
"/br_sr",
# Response to /br_sr
"/sr_res",
# TODO: Investigate the response from this! (A bunch of binary data)
# "/t_p",
# TODO: Find out what this does!
"/webrtc",
# TODO: Find out what this does!
"/onevc",
# TODO: Find out what this does!
"/notify_disconnect",
# Old, no longer active topics
# These are here just in case something interesting pops up
"/inbox",
"/mercury",
"/messaging_events",
"/orca_message_notifications",
"/pp",
"/t_rtc",
"/webrtc_response",
]
username = {
# The user ID
"u": self._state.user_id,
# Session ID
"s": session_id,
# Active status setting
"chat_on": self._chat_on,
# foreground_state - Whether the window is focused
"fg": self._foreground,
# Can be any random ID
"d": self._state._client_id,
# Application ID, taken from facebook.com
"aid": 219994525426954,
# MQTT extension by FB, allows making a SUBSCRIBE while CONNECTing
"st": topics,
# MQTT extension by FB, allows making a PUBLISH while CONNECTing
# Using this is more efficient, but the same can be acheived with:
# def on_connect(*args):
# mqtt.publish(topic, payload, qos=1)
# mqtt.on_connect = on_connect
# TODO: For some reason this doesn't work!
"pm": [
# {
# "topic": topic,
# "payload": payload,
# "qos": 1,
# "messageId": 65536,
# }
],
# Unknown parameters
"cp": 3,
"ecp": 10,
"ct": "websocket",
"mqtt_sid": "",
"dc": "",
"no_auto_fg": True,
"gas": None,
"pack": [],
}
# TODO: Make this thread safe
self._mqtt.username_pw_set(_util.json_minimal(username))
headers = {
# TODO: Make this access thread safe
"Cookie": _util.get_cookie_header(
self._state._session, "https://edge-chat.facebook.com/chat"
),
"User-Agent": self._state._session.headers["User-Agent"],
"Origin": "https://www.facebook.com",
"Host": self._HOST,
}
self._mqtt.ws_set_options(
path="/chat?sid={}".format(session_id), headers=headers
)
def loop_once(self, on_error=None):
"""Run the listening loop once.
Returns whether to keep listening or not.
"""
rc = self._mqtt.loop(timeout=1.0)
# If disconnect() has been called
if self._mqtt._state == paho.mqtt.client.mqtt_cs_disconnecting:
return False # Stop listening
if rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
# If known/expected error
if rc == paho.mqtt.client.MQTT_ERR_CONN_LOST:
log.warning("Connection lost, retrying")
elif rc == paho.mqtt.client.MQTT_ERR_NOMEM:
# This error is wrongly classified
# See https://github.com/eclipse/paho.mqtt.python/issues/340
log.warning("Connection error, retrying")
else:
err = paho.mqtt.client.error_string(rc)
log.error("MQTT Error: %s", err)
# For backwards compatibility
if on_error:
on_error(_exception.FBchatException("MQTT Error {}".format(err)))
# Wait before reconnecting
self._mqtt._reconnect_wait()
# Try reconnecting
self._configure_connect_options()
try:
self._mqtt.reconnect()
except (
# Taken from .loop_forever
paho.mqtt.client.socket.error,
OSError,
paho.mqtt.client.WebsocketConnectionError,
) as e:
log.debug("MQTT reconnection failed: %s", e)
return True # Keep listening
def disconnect(self):
self._mqtt.disconnect()
def set_foreground(self, value):
payload = _util.json_minimal({"foreground": value})
info = self._mqtt.publish("/foreground_state", payload=payload, qos=1)
self._foreground = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# info.wait_for_publish()
def set_chat_on(self, value):
# TODO: Is this the right request to make?
data = {"make_user_available_when_in_foreground": value}
payload = _util.json_minimal(data)
info = self._mqtt.publish("/set_client_settings", payload=payload, qos=1)
self._chat_on = value
# TODO: We can't wait for this, since the loop is running with .loop_forever()
# info.wait_for_publish()
# def send_additional_contacts(self, additional_contacts):
# payload = _util.json_minimal({"additional_contacts": additional_contacts})
# info = self._mqtt.publish("/send_additional_contacts", payload=payload, qos=1)
#
# def browser_close(self):
# info = self._mqtt.publish("/browser_close", payload=b"{}", qos=1)
... ...
... ... @@ -104,7 +104,6 @@ class College():
data["concentration_ids[%d]" % i] = sub[1]
return data
@classmethod
def from_dict(cls, data: dict):
self = cls()
... ... @@ -176,7 +175,7 @@ def todict(obj, include: list = None):
return res
def tobase64(obj):
def tobase64(obj) -> str:
if isinstance(obj, (dict, list)):
obj = demjson.encode(obj)
if isinstance(obj, str):
... ... @@ -187,6 +186,66 @@ def tobase64(obj):
raise BaseException("must str,list,dict,bytes")
def frombase64(base):
def frombase64(base) -> dict:
string = base64.b64decode(base).decode()
return demjson.decode(string)
def format_proxies(conf: dict, format='requests'):
'''
e.g.
{
'type': 'socks5',
'host': '47.56.152.111',
'pass': 'nantian888',
'port': '1080',
'user': 'ntkj'
}
'''
type_ = conf.get('type', None)
host = conf.get('host', None)
port = conf.get('port', None)
user = conf.get('user', None)
pass_ = conf.get('pass', None)
if format == 'requests':
info = []
info.append(type_)
info.append("://")
p_auth = []
if user:
p_auth.append(str(user))
p_auth.append(":")
if pass_:
p_auth.append(str(pass_))
if p_auth:
info.append(''.join(p_auth))
info.append('@')
info.append(host)
if port:
info.append(':')
info.append(str(port))
text = ''.join(info)
return {'http': text, 'https': text}
else:
_type_ = getattr(__import__('socks'), 'PROXY_TYPES').get(type_.upper())
assert _type_, '代理协议错误,可选socks5,socks4,http'
proxy = {
"proxy_type": _type_,
"proxy_addr": host,
"proxy_port": int(port),
}
if user:
proxy['proxy_username'] = user
if pass_:
proxy['proxy_password'] = pass_
return proxy
if __name__ == '__main__':
conf = {'type': 'http',
'host': '47.56.152.111',
'pass': 'nantian888',
'port': '1080',
'user': 'ntkj'}
print(format_proxies(conf, '1'))
... ...
... ... @@ -14,10 +14,12 @@ from enum import Enum
import demjson
import furl
import requests
from fbchat import Client, ThreadType, Message, Sticker, FBchatUserError, _exception, log, _util
from fbchat._state import State, session_factory, is_home
from lib import google_map, common
from lib._mqtt import Mqtt
from lib.common import WorkPlace, College
from utils import parse_html, _attachment
... ... @@ -25,10 +27,16 @@ from utils import parse_html, _attachment
class PCState(State):
@classmethod
def login(cls, email, password, on_2fa_callback, user_agent=None):
def login(cls, email, password, on_2fa_callback, user_agent=None, proxy=None):
'''换成了PC的登录方式'''
session = session_factory(user_agent=user_agent)
if proxy:
short_proxy = common.format_proxies(proxy, 'requests')
long_proxy = common.format_proxies(proxy, 'sock')
session.params.update({'sock': long_proxy})
session.proxies.update(short_proxy)
soup = parse_html.find_input_fields_with_pc(session.get("https://www.facebook.com/").text)
data = dict((elem["name"], elem["value"])
for elem in soup
... ... @@ -71,6 +79,19 @@ class PCState(State):
return "Location" in r.headers and is_home(r.headers["Location"])
@classmethod
def from_cookies(cls, cookies, user_agent=None, proxy=None):
session = session_factory(user_agent=user_agent)
session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
if proxy:
short_proxy = common.format_proxies(proxy, 'requests')
long_proxy = common.format_proxies(proxy, 'sock')
session.params.update({'sock': long_proxy})
session.proxies.update(short_proxy)
return cls.from_session(session=session)
@classmethod
def from_session(cls, session):
def get_user_id(session):
... ... @@ -81,7 +102,7 @@ class PCState(State):
user_id = get_user_id(session)
r = session.get(_util.prefix_url("/"), timeout=10)
r = session.get(_util.prefix_url("/"), timeout=30)
b = parse_html.show_home_page(r.text)
logout_menu = b.find('div', id='logoutMenu')
... ... @@ -171,6 +192,7 @@ class FacebookClient(Client):
self.email = user_obj.email
self.user_obj = user_obj
self.extend = None
self.proxy = user_obj.proxy if hasattr(user_obj, 'proxy') else None
super().__init__(user_obj.email, user_obj.password, user_obj.user_agent, max_tries, user_obj.format_cookie())
... ... @@ -179,7 +201,7 @@ class FacebookClient(Client):
def setSession(self, session_cookies, user_agent=None):
try:
self._state = PCState.from_cookies(session_cookies, user_agent=user_agent)
self._state = PCState.from_cookies(session_cookies, user_agent=user_agent, proxy=self.proxy)
self._uid = self._state.user_id
except Exception as e:
log.exception("Failed loading session")
... ... @@ -203,6 +225,7 @@ class FacebookClient(Client):
password,
on_2fa_callback=self.on2FACode,
user_agent=user_agent,
proxy=self.proxy
)
self._uid = self._state.user_id
except Exception as err:
... ... @@ -1010,3 +1033,21 @@ class FacebookClient(Client):
'ext_data': next_data
}
return response
def startListening(self):
if not self._mqtt:
self._mqtt = Mqtt.connect(
state=self._state,
on_message=self._parse_message,
chat_on=self._markAlive,
foreground=True,
)
self.onQprimer(ts=int(time.time() * 1000), msg=None)
self.listening = True
def securityDevice(self):
data = {'av': self.uid, 'fb_api_caller_class': 'RelayModern',
'fb_api_req_friendly_name': 'SecuritySettingsSessionGroupRefetchQuery',
'variables': '{"session_count":20}', 'doc_id': '2549907381730805'}
res = self._post('https://www.facebook.com/api/graphql/', data)
return res['data']['security_settings']['sessions']
... ...
... ... @@ -73,6 +73,8 @@ class UserList(Base):
fbid = Column(String(20), index=True)
status = Column(Integer, default=0, nullable=False, index=True)
# proxy = Column(String(256))
def __repr__(self):
return "User(id={}, email={}, password={}, cookie={}, fbid={}, status={})" \
.format(self.id, self.email, self.password, len(self.cookie) if self.cookie else None, self.fbid,
... ...
... ... @@ -8,4 +8,5 @@ furl==2.1.0
sqlalchemy==1.3.12
psutil==5.6.7
demjson==2.2.4
apscheduler==3.6.3
\ No newline at end of file
apscheduler==3.6.3
pysocks==1.7.1
\ No newline at end of file
... ...