message.py
6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019-10-19 13:56
# @Author : Lemon
# @File : message.py
# @Software: PyCharm
import json
import math
import re
import time
from traceback import format_exc
from munch import Munch
import sched
from __global import var, isNil, get_regin_str
from __global.var import redis, upload_db_pool, sync_data_pool
from __sdk import wecall, database
from __sdk.api import call_to_server, randomPhone
def handler_upload(user: Munch):
    """Upload one batch of phone numbers as contacts for ``user``.

    Up to 200 numbers are fetched with ``database.get_phones``. For each one
    an entry is stored in ``var.md5_detail`` under the number's MD5, holding
    the phone, its ``detail`` flag (as ``int``) and its table ``prefix``.

    NOTE(review): this block was reconstructed from a paste that lost its
    indentation; confirm the placement of the two trailing ``return``
    statements against the real file.

    Args:
        user: Account record; ``username`` and ``client_name`` are read.

    Returns:
        ``True`` when the server answers the upload with ``code == 0``, or
        when the username does not start with ``'00'`` (nothing is uploaded).
        ``False`` when the client is unknown or offline, when no numbers were
        returned, or when the server reports a non-zero code.
    """
    only_imei = user.username
    if not only_imei.startswith('00'):
        print("未绑定手机号")
        return True

    client = var.pdd_array.get(user.client_name)
    # ``.get`` may yield None for an unregistered client; treat that the same
    # as "offline" instead of failing with AttributeError.
    if client is None or not client.get_online(only_imei):
        return False

    phones = []
    for entry in database.get_phones(size=200):
        phone = entry['phone']  # e.g. 0086123456
        md5 = entry['md5']  # md5 of the phone string, e.g. md5(0086123456)
        phones.append(phone)
        var.md5_detail[md5] = {'phone': phone, 'detail': int(entry['detail']), 'prefix': entry['prefix']}
    if not phones:
        return False

    res = call_to_server(user, wecall.uploadContact, False, user.username, phones)
    print('call_to_server.res', user.username, res)
    return json.loads(res)['code'] == 0
def control_user():
    """Run one upload pass over every initialized account, then reschedule.

    Each username in ``var.initialized`` is processed by ``doing``, which
    returns the number of seconds until that account should be looked at
    again. The function then re-registers itself with ``sched.add_task``
    using the smallest of those delays (60 seconds when there are no
    accounts).
    """

    def doing(username):
        """Try an upload for one account and return its retry delay in seconds."""
        exp = 60 * 65
        try:
            key = 'user:upload:%s' % username
            count = redis.get(key)
            if count:
                count = int(count)
                # PTTL is in milliseconds and is negative when the key has no
                # expiry or disappeared after the GET above; never hand the
                # scheduler a delay below one second.
                exp = max(1, math.ceil(int(redis.pttl(key)) / 1000))
            else:
                exp = 60 * 66
                count = 0
            upload = 200 - count
            if upload > 0:
                print(key, '可以上传数:%d' % upload)
                try:
                    user = var.user_array[username]
                    if handler_upload(user=user):
                        redis.set(key, upload, ex=60 * 65)
                    else:
                        # Upload did not succeed: retry in five minutes.
                        return 60 * 5
                except Exception:
                    # Best effort: keep the pass going, but leave a trace
                    # instead of discarding the error silently.
                    print(format_exc())
                time.sleep(3.5)
            else:
                print(key, '不可以上传了')
                time.sleep(.1)
        except Exception as e:
            print(e)
        return exp

    expr = []
    for username in var.initialized.copy():
        expr.append(doing(username))
    # NOTE(review): on_receive schedules this function with
    # job_id='control_user' while this reschedule passes no job_id; confirm
    # that the scheduler does not accumulate duplicate jobs.
    sched.add_task(control_user, min(expr, default=60), 'seconds')
def handler_func(V1Record):
    """Blacklist a contact and, if that succeeds, set a phone number on it.

    The per-account lock in ``var.locks`` is held for the whole operation and
    is always released, even when a server call raises.

    Args:
        V1Record: ``(onlyImei, v1)`` tuple: the account identifier and the
            contact's v1 identifier.
    """
    onlyImei, v1 = V1Record
    lock = var.locks[onlyImei]
    lock.acquire()  # serialise operations on this account
    try:
        user = var.user_array.get(onlyImei)
        res = call_to_server(user, wecall.setBlacklist, False, onlyImei, v1, True)
        try:
            res = json.loads(res)
            if res and res['code'] == 0:
                # Pause before the follow-up call.
                time.sleep(1.5)
                call_to_server(user, wecall.setPhoneNumList, False, onlyImei, v1, [randomPhone()])
        except Exception as e:
            print('handler_func', res, e, sep='|')
    finally:
        # Release in ``finally`` so a failing call cannot leave the account
        # locked forever.
        lock.release()
def handler_release(Params):
    """Release a contact from the blacklist.

    Args:
        Params: ``(onlyImei, wxid, flag)`` tuple. All three values are
            forwarded to ``wecall.setBlacklist`` through ``call_to_server``.
    """
    account_id, contact_id, blacklist_flag = Params
    account = var.user_array.get(account_id)
    call_to_server(
        account,
        wecall.setBlacklist,
        False,
        account_id,
        contact_id,
        blacklist_flag,
    )
def on_receive(client, msg):
    """Handle one raw JSON message and dispatch it by its ``type`` field.

    * ``openController``: log the connection and schedule ``control_user``
      to run in 30 seconds.
    * ``pong``: ignored.
    * ``notify``: ``data`` is a nested JSON document whose own ``type``
      selects ``phoneContact``, ``updateFriend`` or ``account``. Errors
      raised while handling a notification are printed, not propagated.

    Args:
        client: Connection object; passed on to ``account``.
        msg: JSON text of the message.
    """
    data = Munch.fromDict(json.loads(msg))
    msg_type = data.type
    if msg_type == 'openController':
        print('连接成功')
        sched.add_task(control_user, 30, 'seconds', job_id='control_user')
    elif msg_type == 'pong':
        return
    elif msg_type == "notify":
        try:
            T2 = Munch.fromDict(json.loads(data.data))
            if 'phoneContact' == T2.type:  # async callback of a contact upload
                phoneContact(T2)
            elif 'updateFriend' == T2.type:  # async callback of a friend-profile update
                updateFriend(T2)
            elif 'account' == T2.type:
                account(client, T2)
        except Exception:
            # One bad notification must not break the receive loop, but
            # KeyboardInterrupt/SystemExit are allowed to propagate.
            print(format_exc())
def phoneContact(T2):
    """Handle the asynchronous callback of a contact upload.

    The MD5 found in the raw payload is looked up in ``var.md5_detail``.
    Unknown MD5s are logged and dropped. Each ``(username, v1)`` pair is
    processed once (tracked in ``var.v1_set``):

    * ``detail == 1``: no further detail is needed, so the binding is
      submitted to ``upload_db_pool`` straight away.
    * otherwise: the v1 -> MD5 link is stored in ``var.v1MD5`` and
      ``handler_func`` is submitted to ``sync_data_pool``.

    NOTE(review): the nesting of the dispatch under the de-duplication check
    was reconstructed from a paste without indentation; confirm it.

    Args:
        T2: Notification object; ``body`` (JSON text) and ``username`` are read.

    Raises:
        ValueError: if the notification does not come from ``fmessage`` or
            its raw payload has no ``mobilelongidentify`` attribute.
    """
    T3 = Munch.fromDict(json.loads(T2.body))
    # Explicit check instead of ``assert`` so it still runs under ``-O``.
    if T3.get('from') != 'fmessage':
        raise ValueError('unexpected notification source', T3.get('from'))
    Request = Munch.fromDict(json.loads(T3.request))
    v1 = Request.fromusername
    found = re.search(r'mobilelongidentify=\"(.*?)\"', T3.raw)
    if found is None:
        raise ValueError('mobilelongidentify not found in raw payload')
    md5 = found.group(1)
    detail_and_phone = var.md5_detail.get(md5, {})
    nil_get_wxid = detail_and_phone.get('detail', None)
    phone = detail_and_phone.get('phone', None)
    if nil_get_wxid is None:
        print("通过MD5没取到信息")
        return
    V1Record = (T2.username, v1,)
    if V1Record in var.v1_set:
        return
    var.v1_set.add(V1Record)
    print('队列数目', len(var.v1_set))
    if nil_get_wxid == 1:  # no detail needed: upload to the database directly
        tableprefix = detail_and_phone.get('prefix', None)
        var.md5_detail.pop(md5, None)
        upload_db_pool.submit(database.upload_bind_data, phone, Request, tableprefix)
    else:
        var.v1MD5[v1] = md5  # remember which MD5 this v1 belongs to
        sync_data_pool.submit(handler_func, V1Record)
def updateFriend(T2):
    """Handle the asynchronous callback of a friend-profile update.

    The friend record is taken from ``friend`` (or ``old`` as a fallback) in
    the notification body. Its ``encryptUserName`` is mapped back to an MD5
    through ``var.v1MD5``; if no mapping exists the event is ignored.
    Otherwise the cached entry is removed from ``var.md5_detail`` and the
    friend's profile is submitted to ``upload_db_pool`` together with the
    phone number and table prefix from that entry.

    Args:
        T2: Notification object; ``body`` (JSON text) is read.

    Raises:
        ValueError: if the cached phone number or the friend's ``wxid`` is
            missing.
    """
    T3 = Munch.fromDict(json.loads(T2.body))
    if 'friend' in T3:
        friend = T3.friend
    elif 'old' in T3:
        friend = T3.old
    else:
        return
    v1 = friend.encryptUserName
    md5 = var.v1MD5.get(v1, None)
    if not md5:
        return
    # Read and drop the cached entry in one step.
    phone_detail = var.md5_detail.pop(md5, {})
    user = phone_detail.get('phone', None)
    prefix = phone_detail.get('prefix', None)
    wxid = friend.wxid
    nickname = friend.name
    alias = friend.alias
    sex = friend.sex
    # Explicit check instead of ``assert`` so it still runs under ``-O``.
    if not (user and wxid):
        raise ValueError('参数不全', T2)
    province, city = get_regin_str(user, friend.province, friend.city)
    # When no alias is set and the wxid is not of the ``wxid_`` form, the
    # wxid itself is used as the alias.
    if isNil(alias) and not wxid.startswith('wxid_'):
        alias = wxid
    Request = Munch.fromDict({
        'fromusername': wxid,
        'fromnickname': nickname,
        'alias': alias,
        'sex': sex,
        'province': province,
        'city': city,
    })
    upload_db_pool.submit(database.upload_bind_data, user, Request, prefix)
    # Un-blacklist task, to be submitted to the thread pool (not handled for now):
    # Params = (onlyImei, wxid, False)
    # sync_data_pool.submit(handler_release, Params)
def account(client, Data):
    """Track an account going online or offline.

    Args:
        client: Object exposing ``add_online`` and ``remove_online``.
        Data: Notification object; ``username`` and ``body`` (JSON text with
            an ``event`` key) are read.

    ``onLoginSuccess`` marks the account online, ``onLogout`` marks it
    offline; any other event is ignored.
    """
    only_imei = Data.username
    event = json.loads(Data.body)['event']
    if event == "onLoginSuccess":
        print("号码上线", only_imei)
        client.add_online(only_imei)
        return
    if event == 'onLogout':
        print("号码下线", only_imei)
        client.remove_online(only_imei)