diff options
Diffstat (limited to 'cogs/Steam.py')
-rw-r--r-- | cogs/Steam.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/cogs/Steam.py b/cogs/Steam.py new file mode 100644 index 0000000..089e4f8 --- /dev/null +++ b/cogs/Steam.py @@ -0,0 +1,278 @@ +from discord.ext import commands +from collections import OrderedDict +import os +import discord +from fuzzywuzzy import fuzz +import requests +import json +import locale +import pickle +import importlib +import utils +importlib.reload(utils) +import time +colours = {"default": 0, + "teal": 0x1abc9c, + "dark teal": 0x11806a, + "green": 0x2ecc71, + "dark green": 0x1f8b4c, + "blue": 0x3498db, + "dark blue": 0x206694, + "purple": 0x9b59b6, + "dark purple": 0x71368a, + "magenta": 0xe91e63, + "dark magenta": 0xad1457, + "gold": 0xf1c40f, + "dark gold": 0xc27c0e, + "orange": 0xe67e22, + "dark orange": 0xa84300, + "red": 0xe74c3c, + "dark red": 0x992d22, + "lighter grey": 0x95a5a6, + "dark grey": 0x607d8b, + "light grey": 0x979c9f, + "darker grey": 0x546e7a, + "blurple": 0x7289da, + "greyple": 0x99aab5} + + +class Steam(commands.Cog): + def __init__(self, bot): + self.bot = bot + locale.setlocale(locale.LC_ALL, '') + + # @commands.group(invoke_without_command=True) + # async def steam(self, ctx): + # """Search steam for games.""" + # if search == (): + # # Create message + # prefix = await self.bot.get_prefix(ctx.message) + # + # emb = await utils.embed(ctx, f"Commands for `{prefix[2]}steam`", "", "") + # emb = await utils.field(emb, f"{prefix[2]}steam `[text]`", "Return an SCPs info from the scp-wiki") + # emb = await utils.field(emb, f"{prefix[2]}steam `[game name]`", "Search for a game on steam") + # emb = await utils.field(emb, f"{prefix[2]}steam list `[game name]`", "Search for the closest top 10 games on Steam") + # emb = await utils.field(emb, f"{prefix[2]}steam list select `[id]`", "Show a games information from the search list.") + # await ctx.send(embed=emb) + # if ctx.channel.permissions_for(ctx.me).manage_messages: + # await ctx.message.delete() + + @commands.command( + aliases=[], + application_command_meta=commands.ApplicationCommandMeta( + options=[ + discord.ApplicationCommandOption( + name="game_name", + description="Search for a game on the Steam store.", + type=discord.ApplicationCommandOptionType.string, + required=True, + ) + ], + ) + ) + @commands.defer(ephemeral=False) + async def steam(self, ctx, *, game_name): + """Search for a game on the Steam store.""" + # await ctx.channel.trigger_typing() + # await ctx.interaction.edit_original_message(content="Searching for game...") + search = " ".join(game_name) + start_time = time.time() + # Get live or cached JSON + json_data = await self.get_json_data() + + # Get 1 match form the given search term + new_list = await self.get_best_matches(search, 1, json_data) + + if new_list != {}: + # Collect data for the given AppID + message_output = await self.steam_by_app_id(new_list["1"]["app_id"]) + + # Generate an embed message for the AppID + if message_output is None: + emb = await utils.embed(ctx, f"Unable to access information for the game `{new_list['1']['app_name']}`", + "This game/app is likely hidden and will not work.") + await ctx.send(embed=emb) + + elif type(message_output) == dict: + emb = await self.create_embed(message_output, ctx, f"\nDone in {round(time.time() - start_time, 2)} seconds.") + await ctx.interaction.edit_original_message(content=None, embed=emb) + else: + await ctx.send(content=f"I couldn't find anything similar to that") + + # @commands.defer(ephemeral=False) + # @steam.command(invoke_without_command=True) + # async def list(self, ctx, *search_term): + # await ctx.message.channel.trigger_typing() + # search_term = " ".join(search_term) + # + # # Load steam search config + # try: + # loaded_obj = await self.load_obj("steam") + # except EOFError: + # loaded_obj = {} + # + # # Get live or cached JSON + # json_data = await self.get_json_data() + # top_matches = await self.get_best_matches(search_term, 10, json_data) + # best_ten = "" + # for item in top_matches: + # best_ten = best_ten + f"**{item})** {top_matches[item]['app_name']} `({top_matches[item]['app_match']}%)`\n" + # best_ten = best_ten + "\nThese are the closest 10 matches.\nUse the list select `[id]` command to pick an item from this list." + # + # search_json = {f"{ctx.guild.id}#{ctx.author.id}": top_matches} + # await self.save_obj(search_json, "steam") + # + # # Print output + # message_embed = discord.Embed(title="", description=best_ten, colour=colours["blue"]) + # message_embed.set_footer(text=f"Requested by {ctx.message.author}.") + # message_embed.set_author(name=ctx.author.display_name, url="", icon_url=ctx.author.avatar_url) + # + # await ctx.send(embed=message_embed) + # + # @list.command() + # async def select(self, ctx, *, search_int): + # ret = await self.load_obj("steam") + # + # if f"{ctx.guild.id}#{ctx.author.id}" in str(ret): + # for i, item in enumerate(ret[f"{ctx.guild.id}#{ctx.author.id}"].values()): + # if i+1 == int(search_int): + # # Collect data for the given AppID + # message_output = await self.steam_by_app_id(item["app_id"]) + # + # # Generate an embed message for the AppID + # if message_output is None: + # message_embed = discord.Embed(title=f"Unable to access information for the game `{item['app_name']}`", + # description="This game/app is likely hidden and will not work.", + # colour=colours["blue"]) + # await ctx.send(embed=message_embed) + # + # elif type(message_output) == dict: + # app_embed = await self.create_embed(message_output, ctx) + # await ctx.send(embed=app_embed) + # + # ret.pop(f"{ctx.guild.id}#{ctx.author.id}", None) + # await self.save_obj(ret, "steam") + # + # else: + # await ctx.send(content="No search results to use.") + + async def create_embed(self, message_output, ctx, footer=""): + message_output["description"] = message_output["description"].replace("&", "&").replace(""", "\"") + + emb = await utils.embed(ctx, message_output["name"], message_output["description"], url=message_output["url"], image=message_output["image"], footer=footer) + emb = await utils.field(emb, "Price", message_output["cost"] + " " + message_output["discount"]) + emb = await utils.field(emb, "Platforms", value=message_output["support"], inline=True) + emb = await utils.field(emb, "Reviews", value=message_output["reviews"], inline=True) + + val_release = "N/A" if message_output["releasedate"] == "" else message_output["releasedate"] + emb = await utils.field(emb, "Release Date", value=val_release, inline=True) + return emb + + async def get_json_data(self): + response = requests.get("http://api.steampowered.com/ISteamApps/GetAppList/v2") + json_data = json.loads(response.text) + + if json_data == {'applist': {'apps': []}}: + with open('./cogs/steam_app_cache.json') as json_file: + json_data = json.load(json_file) + return json_data + + async def get_best_matches(self, search_term, return_total, json_data): + lst_all_results = [] + for i, item in enumerate(json_data['applist']['apps']): + # print(f"var: {search_term} - type:{type(search_term)}") + # print(f"var: {str(item['name']).lower()} - type:{type(str(item['name']).lower())}") + + app_match = fuzz.ratio(str(item['name']).lower(), search_term.lower()) + lst_all_results.append({"app_match": app_match, "app_id": str(item['appid']), "app_name": str(item['name'])}) + lst_all_results_sorted = sorted(lst_all_results, key=lambda k: k["app_match"], reverse=True) + + return_dict = {} + for i, item in enumerate(lst_all_results_sorted[:return_total]): + app_key = i+1 + + return_dict[str(app_key)] = {"app_match": item["app_match"], + "app_id": item["app_id"], + "app_name": item["app_name"]} + return return_dict + + @staticmethod + async def steam_by_app_id(str_app_id): + # Obtain and load the JSON + # https://store.steampowered.com/api/appdetails?appids=204360&cc=gb&filters=price_overview + + game_req = requests.get(f"http://store.steampowered.com/api/appdetails?appids={str_app_id}&cc=gb") + game_req_data = json.loads(game_req.text) + + if game_req_data[str_app_id]["success"]: + # Platforms + game_support_concat = [] + if game_req_data[str_app_id]['data']['platforms']['windows']: + game_support_concat.append("Windows") + if game_req_data[str_app_id]['data']['platforms']['linux']: + game_support_concat.append("Linux") + if game_req_data[str_app_id]['data']['platforms']['mac']: + game_support_concat.append("Mac") + game_support_concat = ", ".join(game_support_concat) + + # Determine price + game_name = str(game_req_data[str_app_id]['data']['name']) + game_free = str(game_req_data[str_app_id]['data']['is_free']) + game_url = f"http://store.steampowered.com/app/{str_app_id}/{game_name.replace(' ', '_')}/" + if game_free == "True": + game_cost = "Free" + game_discount = "" + else: + try: + game_cost = game_req_data[str_app_id]['data']['price_overview']['final_formatted'] + game_discount = str(game_req_data[str_app_id]['data']['price_overview']['discount_percent']) + if game_discount == "0": + game_discount = "" + else: + game_discount = "(" + game_discount + "% off!)" + except Exception as e: + game_cost = "No Price Listed!" + game_discount = "" + + # Get reviews data + review_request = requests.get(f"https://store.steampowered.com/appreviews/{str_app_id}?json=1&language=all&purchase_type=all") + review_request_data = json.loads(review_request.text) + + game_details = {"name": game_name, + "cost": game_cost, + "discount": game_discount, + "support": game_support_concat, + "description": str(game_req_data[str_app_id]['data']['short_description']), + "url": game_url, + "image": game_req_data[str_app_id]['data']["header_image"], + "reviews": review_request_data["query_summary"]["review_score_desc"], + "releasedate": game_req_data[str_app_id]['data']["release_date"]["date"] + } + else: + game_details = None + + return game_details + + @staticmethod + async def save_obj(obj, filename: str): + with open(f"{filename}.pkl", 'wb') as f: + pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL) + + @staticmethod + async def load_obj(filename): + if not os.path.isfile("./steam.pkl"): + os.mknod("./steam.pkl") + + with open(f"{filename}.pkl", "rb") as f: + return pickle.load(f) + + +def setup(bot): + print("INFO: Loading [Steam]... ", end="") + bot.add_cog(Steam(bot)) + print("Done!") + + +def teardown(bot): + print("INFO: Unloading [Steam]") + |