diff options
author | lexicade <jasonnlelong@gmail.com> | 2023-01-27 21:06:30 +0000 |
---|---|---|
committer | lexicade <jasonnlelong@gmail.com> | 2023-01-27 21:06:30 +0000 |
commit | 52801b4de1d63cd01191acf7fcee137977140ec0 (patch) | |
tree | 08271a1f1e3e8060486b6651c67c9934867c648e /cogs/ExtensionLoader.py | |
parent | 8df873808c86805624851356f5dea76ec621de23 (diff) |
Diffstat (limited to 'cogs/ExtensionLoader.py')
-rw-r--r-- | cogs/ExtensionLoader.py | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/cogs/ExtensionLoader.py b/cogs/ExtensionLoader.py new file mode 100644 index 0000000..5de3d29 --- /dev/null +++ b/cogs/ExtensionLoader.py @@ -0,0 +1,304 @@ +from discord.ext import commands, tasks +import discord +import traceback, sys +import configparser +import os +import json +import importlib +import utils +import random +importlib.reload(utils) + + +class ExtensionLoader(commands.Cog): + def __init__(self, bot): + self.bot = bot + # global extension_name + # extension_name = "Extension Loader" + self.init_load_ext.start() + + @tasks.loop(count=1) + async def init_load_ext(self): + # Get list of Cogs from config + self.check_for_new_cogs(self.bot) + lst_cogs = self.load_config(f"./config/cogs_{self.bot.user.name}.json") + + # Load all loaded Cogs + for item in lst_cogs["loaded"]: + if item != "ExtensionLoader": + # await self.manage_ext("load", item) + try: + self.bot.load_extension(f"cogs.{item}") + except: + True + + def bot_admin_check(ctx): + # Read config + config = configparser.ConfigParser() + config.read('config.ini') + + results = list(map(int, config["synthy"]["bot_admins"].split(","))) + + if ctx.message.author.id in list(results): + return True + else: + # raise commands.MissingPermissions(['Synthy Administrator']) + True + + async def manage_ext(self, action: str, extension: str, ctx): + try: + if action == "load": + self.bot.load_extension(f"cogs.{extension}") + await ctx.message.add_reaction("✅") + + elif action == "reload": + self.bot.reload_extension(f"cogs.{extension}") + + if self.bot.user.id == 459056626377293824: + await self.bot.register_application_commands() + + elif self.bot.user.id == 900672193543942144: + guild = self.bot.get_guild(578293484843434061) + await self.bot.register_application_commands(guild=guild) + await ctx.message.add_reaction("⚔") + + await ctx.message.add_reaction("✅") + + elif action == "unload" and os.path.splitext(os.path.basename(__file__))[0] != extension: + self.bot.unload_extension(f"cogs.{extension}") + await ctx.message.add_reaction("✅") + + return + + except discord.ext.commands.ExtensionNotFound as e: + emb = await utils.embed(ctx, f"{extension}", f"The cog `{e.name}` wasn\'t found. Check this is spelt correctly.") + + except discord.ext.commands.errors.ExtensionNotLoaded as e: + emb = await utils.embed(ctx, f"{extension} The extension `{e.name}` is not loaded.", "") + + except discord.ext.commands.errors.ExtensionAlreadyLoaded as e: + emb = await utils.embed(ctx, f"{extension}", f"The extension `{e.name}` is already loaded.") + + except discord.ext.commands.errors.NoEntryPointError as e: + emb = await utils.embed(ctx, f"{extension}", f"There is not entry point in extension `{e.name}`.") + + except discord.ext.commands.errors.ExtensionFailed as e: + emb = await utils.embed(ctx, f"Failed to load {extension}", f"There is an execution error in `{e.name}`.\n"+ + f"{e.original}") + await ctx.send(embed=emb) + + @commands.check(bot_admin_check) + @commands.command(pass_context=True, hidden=True) + async def log(self, ctx, *, extension_name): + result = await self.config_log_toggle(extension_name, f"./config/cogs_{self.bot.user.name}.json") + if result: + emb = await utils.embed(ctx, "Debug Logging", result) + await ctx.send(embed=emb) + + # print(ctx.guild.me.guild_permissions) + if ctx.guild.me.permissions_in(ctx.channel).manage_messages: + await ctx.message.delete() + else: + await ctx.message.add_reaction("✅") + + @commands.check(bot_admin_check) + @commands.command(pass_context=True, hidden=True) + async def load(self, ctx, *, extension_name): + await self.load_to_config(extension_name, f"./config/cogs_{self.bot.user.name}.json") + await self.manage_ext("load", extension_name, ctx) + + @commands.check(bot_admin_check) + @commands.bot_has_permissions(embed_links=True, add_reactions=True) + @commands.command(pass_context=True, hidden=True) + async def reload(self, ctx, *, extension_name): + await self.manage_ext("reload", extension_name, ctx) + + @commands.check(bot_admin_check) + @commands.command(pass_context=True, hidden=True) + async def unload(self, ctx, *, extension_name): + await self.manage_ext("unload", extension_name, ctx) + + @commands.check(bot_admin_check) + @commands.command(pass_context=True, hidden=True) + async def slashunload(self, ctx): + await ctx.message.add_reaction("⌛") + if self.bot.user.id == 459056626377293824: + await self.bot.register_application_commands(None) + + elif self.bot.user.id == 900672193543942144: + guild = self.bot.get_guild(578293484843434061) + await self.bot.register_application_commands(None, guild=guild) + + await ctx.message.add_reaction("✅") + + @commands.check(bot_admin_check) + @commands.command(pass_context=True, hidden=True) + async def slashload(self, ctx): + await ctx.message.add_reaction("⌛") + if self.bot.user.id == 459056626377293824: + await self.bot.register_application_commands() + + elif self.bot.user.id == 900672193543942144: + guild = self.bot.get_guild(578293484843434061) + await self.bot.register_application_commands(guild=guild) + + await ctx.message.add_reaction("✅") + + @commands.check(bot_admin_check) + @commands.command(pass_context=True, hidden=True) + async def ohno(self, ctx): + newnum = 1 / 0 + + @commands.Cog.listener() + async def on_command_error(self, ctx: discord.ext.commands.Context, error): + if isinstance(error, commands.MissingPermissions): + print(error) + print(error.__dict__) + emb = await utils.embed(ctx, f'I can\'t let you do that, ~~Starfox~~ {ctx.author.display_name}!', f'This needs to be done by someone who has these permissions:\n{" ,".join(error.missing_perms)}') + await ctx.send(embed=emb) + + elif isinstance(error, commands.MemberNotFound): + emb = await utils.embed(ctx, f'Cannot find user', f'I couldn\'t find that user! Double check the capitalisation as this is case sensitive!') + await ctx.send(embed=emb) + return + + elif isinstance(error, commands.MissingRequiredArgument): + emb = await utils.embed(ctx, f'Unknown command', f'I couldn\'t figure out what you mean, double check the command your using isn\'t missing anything. Is this is a recurring error, you can let my [code monkey](https://discord.gg/bDAa7cu) know.') + await ctx.send(embed=emb) + return + + elif not (isinstance(error, commands.CommandNotFound)): + jokes = ["99 bugs in the code, 99 bugs in the code. You take one down, patch it around... 129 bugs in the code.", + "I asked my master why he writes bad code. He said \"No comment.\"", + "A code tester walks into the bar and orders 2764389237498 beers."] + + # Create user error + log = "".join(traceback.format_exception(type(error), error, error.__traceback__)) + emb = await utils.notice("That shouldn't happen...", + "I will let my code slave know so he can fix this."+ + "You can try this again later or visit [his home](https://discord.gg/bDAa7cu) if you want to speak to him.\n\n"+ + "") + await ctx.send(embed=emb) + + # Create developer error + err_chnl = self.bot.get_channel(740267554416885842) + + err_body_details = [f'Guild: {ctx.guild.name} ({ctx.guild.id})', + f'Channel: {ctx.channel.name} ({ctx.channel.id})', + f'Author: {ctx.author.mention} {ctx.author.name} ({ctx.author.id})', + f'Command: {ctx.prefix}{ctx.command} --- Success Status:{ctx.command_failed}'] + if hasattr(ctx, 'message') and hasattr(ctx.message, 'content'): + err_body_details.append(f'Message: {ctx.message.content}') + else: + slash_context = await self.bot.get_slash_context(ctx.interaction) + err_body_details.append(f'Values: {slash_context.given_values}') + err_body_errors = '\n'.join(err_body_details) + + err_content = f"<@&649208333626114048> {err_body_errors}" + + err_emb_title = f"{error}"[:256] + err_emb_body = log[:2048] + emb = await utils.notice(err_emb_title, err_emb_body) + + await err_chnl.send(content=err_content, embed=emb) + + @staticmethod + def load_config(config_path): + with open(config_path) as fp: + try: + config = json.load(fp) + except json.JSONDecodeError as e: + print(e) + return None + + return config + + @staticmethod + def save_config(config_path, config): + with open(config_path, 'w') as fp: + json.dump(config, fp, indent=4) + + async def load_to_config(self, cog, config_path): + config = self.load_config(config_path) + + if cog in config["unloaded"] and not cog in config["loaded"]: + config["unloaded"].remove(cog) + config["loaded"].append(cog) + self.save_config(f"./config/cogs_{self.bot.user.name}.json", config) + return True + return False + + async def unload_from_config(self, cog): + config = self.load_config(config_path) + + if not cog in config["unloaded"] and cog in config["loaded"]: + config["unloaded"].append(cog) + config["loaded"].remove(cog) + self.save_config(f"./config/cogs_{self.bot.user.name}.json", config) + return True + return False + + async def config_log_toggle(self, cog, config_path): + config = self.load_config(config_path) + + if cog in config['debug']['inactive'] and not cog in config['debug']['active']: + config['debug']['inactive'].remove(cog) + config['debug']['active'].append(cog) + self.save_config(f"./config/cogs_{self.bot.user.name}.json", config) + return f"Logging turned on for {cog}" + + elif cog in config['debug']['active'] and not cog in config['debug']['inactive']: + config['debug']['active'].remove(cog) + config['debug']['inactive'].append(cog) + self.save_config(f"./config/cogs_{self.bot.user.name}.json", config) + return f"Logging turned off for {cog}" + return False + + @staticmethod + def get_cogs(): + cogs = [] + for file in os.listdir("./cogs/"): + if file.endswith(".py") and not file.startswith("__"): + cogs.append(file.replace(".py", "")) + return cogs + + def check_for_new_cogs(self, bot): + # Get Config + config_path = f"./config/cogs_{bot.user.name}.json" + + if os.path.exists(config_path): + config = self.load_config(config_path) + if not config: + return "JSON could not be read." + else: + config = {"unloaded": [], "loaded": ["ExtensionLoader"], "debug": {"active": [], "inactive": []}} + with open(f"./config/cogs_{self.bot.user.name}.json", 'w') as config_file: + json.dump(config, config_file) + + # Get cogs + cogs = self.get_cogs() + + # Check all cogs + for cog in cogs: + if not cog in config["unloaded"] and not cog in config["loaded"]: + config["unloaded"].append(cog) + + if not cog in config['debug']['active'] and not cog in config['debug']['inactive']: + config['debug']['inactive'].append(cog) + + # Save config + self.save_config(config_path, config) + return "Successfully checked cogs" + + +def setup(bot): + # try: + print("INFO --- Loading Extensions --- ", end="\n") + bot.add_cog(ExtensionLoader(bot)) + print("INFO --- Extensions Loaded --- ", end="\n") + # except Exception as e: + # print(e) + + +def teardown(bot): + print("INFO: Unloading [ExtensionLoader]") |