forked from imbstt/impf-progress-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot.py
More file actions
239 lines (197 loc) · 9.5 KB
/
bot.py
File metadata and controls
239 lines (197 loc) · 9.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import asyncio
import csv
import urllib.request
from datetime import *
import configparser
import discord
import psycopg2
from discord.ext import tasks, commands
from discord.ext.commands.errors import *
discord_bot = configparser.ConfigParser()
discord_bot.read('config.ini')
# Bot Config
bot_config = discord_bot["Discord-Bot"]
token = bot_config["Token"]
prefix = bot_config["Prefix"]
general_config = discord_bot["General"]
hours = general_config["how-often-check"]
# Database
db_config = discord_bot["Database"]
db_port = db_config["port"]
db_host = db_config["host"]
db_user = db_config["user"]
db_database = db_config["database"]
db_password = db_config["password"]
db_tablename = db_config["tablename"]
database_connection = psycopg2.connect(
host=db_host,
user=db_user,
password=db_password,
database=db_database,
port=db_port)
cur = database_connection.cursor()
TSV_URL = "https://impfdashboard.de/static/data/germany_vaccinations_timeseries_v2.tsv"
CONFIG_FILENAME = 'state.cfg'
config = configparser.ConfigParser()
config.read(CONFIG_FILENAME)
def generateProgressbar(percentage):
num_chars = 25
num_filled = round(percentage * num_chars)
num_empty = num_chars - num_filled
display_percentage = str(round(percentage * 100, 1)).replace('.', ',')
msg = '{}{} {}%'.format('▓' * num_filled, '░' * num_empty, display_percentage)
return msg
def getCurrentdata(url):
tsvstream = urllib.request.urlopen(url)
tsv_file_lines = tsvstream.read().decode('utf-8').splitlines()
tsv_data_lines = csv.DictReader(tsv_file_lines, delimiter='\t')
# skip to last line
for line_dict in tsv_data_lines:
pass
return line_dict
def checkIfShouldTweet(data):
last_date = datetime.strptime(config.get('LAST_TWEET', 'date'), '%Y-%m-%d')
curr_date = datetime.strptime(data.get('date'), '%Y-%m-%d')
print("date: {} / {}".format(config.get('LAST_TWEET', 'date'), data.get('date')))
if last_date < curr_date:
impf_quote_erst_old = float(config.get('LAST_TWEET', 'impf_quote_erst'))
impf_quote_voll_old = float(config.get('LAST_TWEET', 'impf_quote_voll'))
impf_quote_erst_new = float(data.get('impf_quote_erst'))
impf_quote_voll_new = float(data.get('impf_quote_voll'))
print("erst: {} / {}".format(round(impf_quote_erst_old * 100, 1), round(impf_quote_erst_new * 100, 1)))
print("voll: {} / {}".format(round(impf_quote_voll_old * 100, 1), round(impf_quote_voll_new * 100, 1)))
if round(impf_quote_erst_old * 100, 1) < round(impf_quote_erst_new * 100, 1):
return True
if round(impf_quote_voll_old * 100, 1) < round(impf_quote_voll_new * 100, 1):
return True
return False
else:
print("date is same or older: do not send")
return False
def saveState(data):
config.set('LAST_TWEET', 'date', data.get('date'))
config.set('LAST_TWEET', 'impf_quote_erst', data.get('impf_quote_erst'))
config.set('LAST_TWEET', 'impf_quote_voll', data.get('impf_quote_voll'))
with open(CONFIG_FILENAME, 'w') as configfile:
config.write(configfile)
# Diese Funktion generiert den Inhalt der Nachricht welche anschließen per Embed versendet wird
def generateMessage(data):
bar_erst = generateProgressbar(float(data.get('impf_quote_erst')))
bar_voll = generateProgressbar(float(data.get('impf_quote_voll')))
msg = '{} mind. eine Impfdosis\n{} vollständig Geimpfte\n [Impfen lassen](https://pyshort.de/impfen)'.format(
bar_erst, bar_voll)
return msg
class MyClient(commands.AutoShardedBot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.runAll.start()
async def on_ready(self):
print("Du bist eingeloggt als {0.user} über discord.py Version {1}".format(client, discord.__version__))
print(f"Der bot läuft zurzeit auf {len(list(client.shards))} Shards")
print("Der Bot ist auf den folgenden " + str(len(client.guilds)) + " Servern:")
for guild in client.guilds:
print("- " + str(guild.name))
client.loop.create_task(self.status_task())
# Das hier ist der Status welchen der Bot auf discord hat :)
async def status_task(self):
while True:
await client.change_presence(
activity=discord.Game(f"mit Impfstatistiken auf {str(len(client.guilds))} Servern"))
await asyncio.sleep(15)
await client.change_presence(activity=discord.Game("https://impfdashboard.de"))
await asyncio.sleep(15)
await client.change_presence(activity=discord.Game("https://pyshort.de/impfen"))
await asyncio.sleep(15)
# Hier ist die Nachricht die an den Owner gesendet wird sobald der Bot auf einen Server hinzugefügt wird
async def on_guild_join(self, guild):
info = await client.application_info()
await guild.owner.send(f"Hey {guild.owner.mention}\n"
f"Du hast scheinbar gerade den Bot auf deinen Server hinzugefügt :)\n"
f"Um den Bot zu nutzen musst du {prefix}start in den Kanal schreiben in welchen der Bot die Nachrichten senden soll\n"
f"Der Bot checkt alle {hours} Stunden die Impfzahlen vom RKI und BMG und schickt diese dann in einen Channel :)"
f"Mit {prefix}stop kannst du das ganze wieder deaktivieren\n"
f"Lg {info.owner.name}#{info.owner.discriminator}")
# Hiermit wird eine Nachricht geschrieben falls der Bot erwähnt wird
async def on_message(self, message):
try:
if message.mentions[0] == client.user:
await message.channel.send(f"Hey {message.author.mention} :)")
await message.channel.send(f'Mein Prefix ist "{prefix}"')
except IndexError:
pass
await client.process_commands(message)
@tasks.loop(count=1)
async def send_task(self):
data = getCurrentdata(TSV_URL)
cur = database_connection.cursor()
for guild in client.guilds:
try:
cur.execute(f"SELECT channelid from {db_tablename} where serverid = {guild.id}")
channelid = cur.fetchone()[0]
database_connection.commit()
channel = client.get_channel(channelid)
embed = discord.Embed(title="Impfstatistiken für Deutschland :)", url="https://impfdashboard.de",
description=generateMessage(data), timestamp=datetime.utcnow(),
colour=discord.Color.random())
await channel.send(embed=embed)
except Exception:
pass
@tasks.loop(hours=int(hours))
async def runAll(self):
data = getCurrentdata(TSV_URL)
should_send = checkIfShouldTweet(data)
if should_send:
self.send_task.start()
print("Messages were send!")
else:
pass
saveState(data)
@runAll.before_loop
async def before_my_task(self):
await self.wait_until_ready()
client = MyClient(command_prefix=prefix, intents=discord.Intents.all())
# Hier eine kleine Funktion um einen Einladungslink zu generieren
@client.command(name="einladen", aliases=["invite"])
async def einladen(ctx):
embed = discord.Embed()
embed.set_author(name="Hier klicken um den Bot einzuladen :)",
url=discord.utils.oauth_url(client.user.id, permissions=discord.Permissions(117760),
guild=ctx.guild))
await ctx.channel.send(embed=embed)
# Damit startest du das senden auf einem Discord
@commands.cooldown(1, 3, commands.BucketType.user)
@client.command(name="start", aliases=["setup", "starten", "on", "activate"])
async def activate(ctx):
if ctx.message.author.guild_permissions.administrator:
cur.execute(f"DELETE from {db_tablename} where serverid = {ctx.guild.id}")
cur.execute(f"INSERT INTO {db_tablename} (serverid, channelid) VALUES ({ctx.guild.id},{ctx.channel.id})")
database_connection.commit()
await ctx.send("Die Impfzahlen werden ab jetzt in diesen Channel geschickt :)")
else:
raise PermissionError
# Hiermit stoppst du den Bot vom senden
# Der cooldown ist dafür da damit man den Bot nicht zum crashen bringen kann indem man die ganze Zeit an und aus schaltet
@commands.cooldown(1, 3, commands.BucketType.user)
@client.command(name="stop", aliases=["off", "deactivate"])
async def deactivate(ctx):
if ctx.message.author.guild_permissions.administrator:
cur.execute(f"DELETE from {db_tablename} where serverid = {ctx.guild.id}")
database_connection.commit()
await ctx.send("Der Bot schickt jetzt also keine Impfzahlen mehr rein ://")
await ctx.send(f"Zum reaktivieren einfach {prefix}start schreiben :)")
else:
raise PermissionError
# Das hier ist das "Error Handling" welches bestimmt was passieren soll falls Fehler auftreten
@client.event
async def on_command_error(ctx, error):
if isinstance(error, MissingPermissions):
await ctx.send(f"{ctx.author.mention} Du hast keine Berechtigung dazu! ")
elif isinstance(error, CommandOnCooldown):
await ctx.send(
"**Dieser Command hat einen Cooldown!**, versuche es in {:.2f} Sekunden erneut".format(error.retry_after))
else:
await ctx.send("Fehler")
raise error
# Hier remove ich noch den unschönen help command welcher bei discord.py "vorinstalliert" ist
client.remove_command("help")
client.run(token)