# message.py 6.6 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019-10-19 13:56
# @Author  : Lemon
# @File    : message.py
# @Software: PyCharm
import json
import math
import re
import time
from traceback import format_exc

from munch import Munch

import sched
from __global import var, isNil, get_regin_str
from __global.var import redis, upload_db_pool, sync_data_pool
from __sdk import wecall, database
from __sdk.api import call_to_server, randomPhone


def handler_upload(user: Munch):
    """Upload one batch of phone numbers as contacts for ``user``.

    Accounts whose ``username`` does not start with ``'00'`` are skipped and
    reported as success (the printed message says no phone number is bound).
    For the others, up to 200 numbers are fetched with
    ``database.get_phones``, recorded in ``var.md5_detail`` keyed by their
    MD5, and sent with ``wecall.uploadContact``.

    Args:
        user: Account record; ``username`` and ``client_name`` are read.

    Returns:
        bool: ``True`` if the account was skipped or the server answered with
        ``code == 0``. ``False`` if the client is missing or offline, there
        were no numbers to upload, or the server answered with another code.
    """
    onlyImei = user.username
    if not onlyImei.startswith('00'):
        print("未绑定手机号")
        return True

    client = var.pdd_array.get(user.client_name)
    # ``.get`` may yield None for an unknown client name; treat that the same
    # as an offline account instead of failing with AttributeError.
    if client is None or not client.get_online(onlyImei):
        return False

    phones = []
    for p in database.get_phones(size=200):
        phone = p['phone']  # e.g. 0086123456
        md5 = p['md5']  # e.g. md5(0086123456)
        phones.append(phone)
        var.md5_detail[md5] = {'phone': phone, 'detail': int(p['detail']), 'prefix': p['prefix']}
    if not phones:
        return False

    res = call_to_server(user, wecall.uploadContact, False, user.username, phones)
    print('call_to_server.res', user.username, res)
    return json.loads(res)['code'] == 0


def control_user():
    """Run one upload pass over all initialized accounts, then reschedule.

    For every username in ``var.initialized`` the Redis key
    ``user:upload:<username>`` is consulted. When the stored count leaves room
    below 200, ``handler_upload`` is called for that account. The function
    finally re-registers itself with ``sched.add_task`` using the smallest
    delay (in seconds) reported by any account, or 60 when there are none.
    """

    def doing(username):
        """Process one account; return seconds until it should be re-checked."""
        exp = 60 * 65
        try:
            key = 'user:upload:%s' % username
            count = redis.get(key)
            if count:
                count = int(count)
                # PTTL is negative when the key expired between GET and PTTL
                # (-2) or carries no expiry (-1); clamp so the scheduler is
                # never handed a zero or negative delay.
                exp = max(1, math.ceil(int(redis.pttl(key)) / 1000))
            else:
                exp = 60 * 66
                count = 0
            upload = 200 - count
            if upload > 0:
                print(key, '可以上传数:%d' % upload)
                try:
                    user = var.user_array[username]
                    if handler_upload(user=user):
                        redis.set(key, upload, ex=60 * 65)
                    else:
                        # Upload was not performed; retry in five minutes.
                        return 60 * 5
                except Exception:
                    # Best effort: keep the pass going, but leave a trace
                    # instead of hiding the failure completely.
                    print(format_exc())
                time.sleep(3.5)
            else:
                print(key, '不可以上传了')
            time.sleep(.1)
        except Exception as e:
            print(e)
        return exp

    # Iterate over a copy so changes to var.initialized during the pass do
    # not disturb the loop.
    expr = [doing(username) for username in var.initialized.copy()]
    sched.add_task(control_user, min(expr, default=60), 'seconds')


def handler_func(V1Record):
    """Blacklist a contact and, on success, set a phone number list for it.

    The per-account lock in ``var.locks`` is held for the whole operation and
    is always released, even when a server call raises.

    Args:
        V1Record: ``(onlyImei, v1)`` tuple: the account identifier and the
            contact's ``v1`` identifier.
    """
    onlyImei, v1 = V1Record
    lock = var.locks[onlyImei]
    lock.acquire()  # serialize operations on this account
    try:
        user = var.user_array.get(onlyImei)
        res = call_to_server(user, wecall.setBlacklist, False, onlyImei, v1, True)
        try:
            res = json.loads(res)
            if res and res['code'] == 0:
                time.sleep(1.5)
                call_to_server(user, wecall.setPhoneNumList, False, onlyImei, v1, [randomPhone()])
        except Exception as e:
            print('handler_func', res, e, sep='|')
    finally:
        # Release in ``finally`` so a failing server call cannot leave the
        # account locked forever.
        lock.release()


def handler_release(Params):
    """Send a ``setBlacklist`` request for one contact (blacklist release).

    Args:
        Params: ``(onlyImei, wxid, flag)`` tuple; ``flag`` is forwarded
            unchanged as the last argument of the request.
    """
    imei, contact_id, state = Params
    account_user = var.user_array.get(imei)
    call_to_server(
        account_user,
        wecall.setBlacklist,
        False,
        imei,
        contact_id,
        state,
    )

def on_receive(client, msg):
    """Dispatch one incoming message by its ``type`` field.

    * ``openController``: print a notice and schedule ``control_user`` to run
      in 30 seconds under the job id ``control_user``.
    * ``notify``: ``data.data`` is itself a JSON string whose ``type`` selects
      ``phoneContact``, ``updateFriend`` or ``account``; other inner types are
      ignored. Errors raised while handling a notification are printed with
      their traceback and not propagated.
    * anything else (including ``pong``): ignored.

    Args:
        client: Connection object, forwarded to ``account``.
        msg: JSON-encoded message text.
    """
    data = Munch.fromDict(json.loads(msg))
    if data.type == 'openController':
        print('连接成功')
        sched.add_task(control_user, 30, 'seconds', job_id='control_user')
        return
    if data.type != "notify":
        return

    try:
        T2 = Munch.fromDict(json.loads(data.data))
        if 'phoneContact' == T2.type:  # async callback for the contact upload
            phoneContact(T2)
        elif 'updateFriend' == T2.type:  # async callback for a friend profile update
            updateFriend(T2)
        elif 'account' == T2.type:
            account(client, T2)
    except Exception:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate.
        print(format_exc())


def phoneContact(T2):
    """Handle the asynchronous callback of a contact upload.

    The MD5 found in the raw payload is looked up in ``var.md5_detail``.
    Each ``(username, v1)`` pair is processed once: when the stored ``detail``
    flag is 1 the record is written to the database straight away; otherwise
    the v1-to-md5 link is remembered in ``var.v1MD5`` and ``handler_func`` is
    queued on ``sync_data_pool``.

    Args:
        T2: Notification with ``username`` and a JSON ``body``.

    Raises:
        ValueError: If the body does not come from ``fmessage`` or carries no
            ``mobilelongidentify`` attribute.
    """
    T3 = Munch.fromDict(json.loads(T2.body))
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if T3.get('from') != 'fmessage':
        raise ValueError("unexpected 'from' in phoneContact body: %r" % (T3.get('from'),))
    Request = Munch.fromDict(json.loads(T3.request))
    v1 = Request.fromusername
    matched = re.search(r'mobilelongidentify=\"(.*?)\"', T3.raw)
    if matched is None:
        raise ValueError('mobilelongidentify not found in phoneContact payload')
    md5 = matched.group(1)

    detail_and_phone = var.md5_detail.get(md5, {})
    nil_get_wxid = detail_and_phone.get('detail', None)
    if nil_get_wxid is None:
        print("通过MD5没取到信息")
        return
    phone = detail_and_phone.get('phone', None)

    V1Record = (T2.username, v1,)
    if V1Record in var.v1_set:
        return  # this pair was already handled
    var.v1_set.add(V1Record)
    print('队列数目', len(var.v1_set))
    if nil_get_wxid == 1:  # no details needed: upload to the database directly
        tableprefix = detail_and_phone.get('prefix', None)
        var.md5_detail.pop(md5, None)
        upload_db_pool.submit(database.upload_bind_data, phone, Request, tableprefix)
    else:
        var.v1MD5[v1] = md5  # remember which md5 this v1 belongs to
        sync_data_pool.submit(handler_func, V1Record)


def updateFriend(T2):
    """Handle the asynchronous callback of a friend profile update.

    The friend's ``encryptUserName`` is mapped through ``var.v1MD5`` to an
    MD5, whose entry is removed from ``var.md5_detail``. The phone number and
    table prefix from that entry, plus the friend's profile fields, are then
    submitted to ``database.upload_bind_data`` on ``upload_db_pool``.
    Updates with neither a ``friend`` nor an ``old`` section, or for an
    unknown v1, are ignored.

    Args:
        T2: Notification whose ``body`` is a JSON string.

    Raises:
        ValueError: If the phone number or the friend's wxid is missing.
    """
    T3 = Munch.fromDict(json.loads(T2.body))
    if 'friend' in T3:
        friend = T3.friend
    elif 'old' in T3:
        friend = T3.old
    else:
        return
    v1 = friend.encryptUserName
    md5 = var.v1MD5.get(v1, None)
    if not md5:
        return
    # Read and remove the pending entry in one step.
    phone_detail = var.md5_detail.pop(md5, {})
    user = phone_detail.get('phone', None)
    prefix = phone_detail.get('prefix', None)

    wxid = friend.wxid
    nickname = friend.name
    alias = friend.alias
    sex = friend.sex
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if not (user and wxid):
        raise ValueError('参数不全', T2)

    province, city = get_regin_str(user, friend.province, friend.city)
    # Without an alias, a wxid that lacks the 'wxid_' prefix is used as alias.
    if isNil(alias) and not wxid.startswith('wxid_'):
        alias = friend.wxid

    Request = Munch.fromDict({
        'fromusername': wxid,
        'fromnickname': nickname,
        'alias': alias,
        'sex': sex,
        'province': province,
        'city': city,
    })
    upload_db_pool.submit(database.upload_bind_data, user, Request, prefix)

    # Releasing the blacklist entry (handler_release on sync_data_pool) is
    # intentionally not done here for now.


def account(client, Data):
    """Apply a login-state event from an ``account`` notification.

    ``onLoginSuccess`` registers ``Data.username`` with ``client.add_online``;
    ``onLogout`` removes it with ``client.remove_online``. Any other event is
    ignored.

    Args:
        client: Object providing ``add_online`` and ``remove_online``.
        Data: Notification with ``username`` and a JSON ``body`` that holds
            an ``event`` key.
    """
    imei = Data.username
    event = json.loads(Data.body)['event']
    if event == "onLoginSuccess":
        print("号码上线", imei)
        client.add_online(imei)
        return
    if event == 'onLogout':
        print("号码下线", imei)
        client.remove_online(imei)