-
Notifications
You must be signed in to change notification settings - Fork 0
/
block.py
120 lines (101 loc) · 5.11 KB
/
block.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from argparse import ArgumentParser
import sys
import json
from authentication import *
# NOTE(review): disabled draft kept as a bare module-level string literal. The
# interpreter evaluates the string and discards it, so nothing inside it runs.
# The draft walks the mentions timeline, asks Botometer for each author's score
# and replies to the mention with that percentage.
'''
def replyProbBotOfMentions(api, rapidapi_key, twitter_app_auth):
bom = botometer.Botometer(wait_on_ratelimit=True, rapidapi_key=rapidapi_key, **twitter_app_auth)
tweets = api.mentions_timeline()
for tweet in tweets:
#print(tweet.id)
result = bom.check_account('@' + tweet.user.screen_name)
percent = result["cap"]["universal"] * 100.0
print(tweet.user.screen_name, percent)
msg = "Oi @" + tweet.user.screen_name + ", você tem " + str(percent) + "% de chance de ser bot. Bjomeliga!"
api.update_status(
msg,
in_reply_to_status_id=tweet.id,
auto_populate_reply_metadata=True
)
print(msg, tweet.user.screen_name)
'''
def searchAndBlock(twitterApi, botometerObject, userSearched, filename, bot_threshold,
                   maxTweets=10000000):
    """Search tweets for a query, score every author and block the likely bots.

    Search results are fetched page by page, walking backwards in time by
    passing the id of the oldest tweet seen so far (minus one) as ``max_id``.
    Each tweet's author is scored through ``botometerObject``; authors whose
    score is strictly greater than ``bot_threshold`` are blocked. One JSON
    record per tweet, followed by a blank line, is appended to ``filename``.

    Args:
        twitterApi: Client exposing ``search(q=..., count=..., max_id=...)``
            and ``create_block(screen_name)``.
        botometerObject: Object exposing ``check_account(handle)`` whose
            result contains ``["display_scores"]["universal"]``.
        userSearched: Query string handed to the search endpoint.
        filename: Path of the report file; it is opened in append mode.
        bot_threshold: Score above which an account gets blocked.
        maxTweets: Stop requesting new pages once this many tweets were
            processed. The limit is checked between pages, so the last page
            may overshoot it.

    Returns:
        None. Progress is printed to stdout.
    """
    tweetsPerQuery = 100
    tweetCount = 0
    max_id = -1  # non-positive means "no page fetched yet"

    print("Searching max {0} tweets".format(maxTweets))
    while tweetCount < maxTweets:
        query = {"q": userSearched, "count": tweetsPerQuery}
        if max_id > 0:
            # Ask only for tweets older than the oldest one already seen.
            query["max_id"] = str(max_id - 1)
        new_tweets = twitterApi.search(**query)
        if not new_tweets:
            print("No more tweets found")
            break

        for tweet in new_tweets:
            screenName = tweet.user.screen_name
            response = botometerObject.check_account('@' + screenName)
            # Score from 0 to 5 (5 is a bot, 0 is a human).
            probOfBeingBot = response["display_scores"]["universal"]
            print(screenName, probOfBeingBot, tweet.text)

            wasBlocked = probOfBeingBot > bot_threshold
            if wasBlocked:
                twitterApi.create_block(screenName)

            record = {
                "username": screenName,
                "score": probOfBeingBot,
                "tweet": tweet._json,
                "wasBlocked": wasBlocked,
            }
            # Serialize before opening so a failure never leaves a partial
            # record behind; reopen per record so progress survives a crash.
            serialized = json.dumps(record) + "\n\n"
            with open(filename, 'a') as report:
                report.write(serialized)

        tweetCount += len(new_tweets)
        print("Searched {0} tweets".format(tweetCount))
        max_id = new_tweets[-1].id

    print("Searched {0} tweets, Saved to {1}".format(tweetCount, filename))
def init(args):
    """Authenticate on Twitter and Botometer, then run the search-and-block pass.

    When neither a token key nor a token secret was supplied, the token-less
    authentication helper is used and the tokens it returns are reused for the
    Botometer credentials. Otherwise the tokens from ``args`` are used as given.

    Args:
        args: Parsed command-line namespace providing ``consumer_key``,
            ``secret_key``, ``rapid_key``, ``username``, ``token_key``,
            ``token_secret``, ``block_file`` and ``bot_threshold``.
    """
    # NOTE(review): if only one of the two token options is given, the other is
    # forwarded as None to the token-based helper — confirm that is intended.
    if args.token_secret is None and args.token_key is None:
        api, token_key, token_secret = authenticateOnTwitterWithoutToken(
            args.consumer_key, args.secret_key)
    else:
        token_key = args.token_key
        token_secret = args.token_secret
        api = authenticateOnTwitterWithToken(
            args.consumer_key, args.secret_key, token_key, token_secret)

    twitter_app_auth = {
        'consumer_key': args.consumer_key,
        'consumer_secret': args.secret_key,
        'access_token': token_key,
        'access_token_secret': token_secret,
    }
    botometerObj = authenticateOnBotometer(args.rapid_key, twitter_app_auth)

    # The authenticated client is bound to `api`; the previous code passed the
    # name `twitterApi`, which is never assigned in this function.
    searchAndBlock(api, botometerObj, args.username, args.block_file, args.bot_threshold)
if __name__ == "__main__":
    # Command-line entry point: parse credentials and options, then hand over to init().
    parser = ArgumentParser(description = 'Realiza uma busca pelas menções recebidas de um nome de um usuário e realiza bloqueios automáticos quando acha uma conta bot / Performs a search for mentions received by a twitter account and performs automatic blocks when finding a bot account')
    parser.add_argument('consumer_key', help = 'Chave consumidora / Consumer key')
    parser.add_argument('secret_key', help = 'Chave secreta / Secret key')
    parser.add_argument('rapid_key', help = 'Chave Rapid API / Key rapid API')
    parser.add_argument('username', help = 'Usuário buscado / Searched usename')
    parser.add_argument('-tk', dest='token_key', default = None, help = 'Chave de token / Token key')
    parser.add_argument('-ts', dest='token_secret', default = None, help = 'Token secreto / Token secret')
    parser.add_argument('-f', dest='block_file', default = 'block.json', help="Nome do arquivo de saída contendo o relatório em formato json / Name of the output file containing the report in json format")
    parser.add_argument('-b', action = 'store', dest = 'bot_threshold', type = float, default = 2.5, required = False,
                        help = 'Limiar que define quando um usuário será bloqueado / Threshold that defines when a user will be blocked')

    # Four positionals are mandatory (consumer_key, secret_key, rapid_key,
    # username). sys.argv also holds the script name, so compare the number of
    # actual arguments; the old `len(sys.argv) < 4` let three arguments through.
    required_positionals = 4
    if len(sys.argv) - 1 < required_positionals:
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args()
    init(args)