summaryrefslogtreecommitdiff
path: root/cogs/IRC.py
diff options
context:
space:
mode:
authorlexicade <jasonnlelong@gmail.com>2023-01-27 21:06:30 +0000
committerlexicade <jasonnlelong@gmail.com>2023-01-27 21:06:30 +0000
commit52801b4de1d63cd01191acf7fcee137977140ec0 (patch)
tree08271a1f1e3e8060486b6651c67c9934867c648e /cogs/IRC.py
parent8df873808c86805624851356f5dea76ec621de23 (diff)
Project initHEADmain
Diffstat (limited to 'cogs/IRC.py')
-rw-r--r--cogs/IRC.py444
1 files changed, 444 insertions, 0 deletions
diff --git a/cogs/IRC.py b/cogs/IRC.py
new file mode 100644
index 0000000..93ce167
--- /dev/null
+++ b/cogs/IRC.py
@@ -0,0 +1,444 @@
+from discord.ext import commands
+import discord
+import aiojobs
+import socket
+import asyncio
+import threading
+import _thread
+import re
+import json
+import itertools
+import requests
+import discord
+from discord import Webhook, RequestsWebhookAdapter
+from datetime import datetime
+
+
+class IRCClient:
+ def __init__(self, chan_pairs, config, discord_client):
+ self.connected = False
+ self.chan_pairs = chan_pairs
+ self.config = config
+ self.discord_client = discord_client
+
+ def irc_connect(self, server, port, nickname, password):
+ print("Connecting to {}:{}".format(server, port))
+
+ s = socket.socket()
+
+ s.connect((server, port))
+ s.send(f"NICK {nickname}\r\n".encode())
+ s.send(f"USER {nickname} * * {nickname}\r\n".encode())
+ s.send(f"PASS {nickname}:{password}\r\n".encode())
+
+ self.connected = True
+ print("Connected.")
+ self.send_to_webhook("[IRC Bridge]", "Connected!")
+
+ return s, server, nickname
+
+ def join_channels(self):
+ for channel in [pair[0] for pair in self.chan_pairs]:
+ print(f"Joining {channel}")
+ self.s.send(f"JOIN {channel}\r\n".encode())
+ return
+
+ def msg_process(self, rawmsg): # figure out what we want to do with our irc message
+ prefix, command, args, msg = self.split_msg(rawmsg)
+ print(f"[IRC] {rawmsg}")
+
+ # msg format is "nick PRIVMSG #channel :message"
+ if command in ["376", "422"]:
+ print("END OF MOTD")
+ self.join_channels()
+
+ elif command == "PING":
+ self.s.send("PONG {}\r\n".format(msg).encode())
+ print("PONG'd")
+
+ elif command == "PRIVMSG":
+ author = prefix.split("!")[0]
+
+ # send message to run comm coroutine
+ for pair in self.chan_pairs:
+ if args[0] == pair[0]:
+ if msg.startswith("?status") and len(msg.split()) > 1:
+ name = msg.split(" ", 1)[1].lower()
+ status_msg = ""
+ for member in self.discord_client.get_channel(pair[1]).guild.members:
+ if member.name.lower() == name or (member.nick and member.nick.lower() == name):
+ status_msg += "{} is currently {}".format(member.name, str(member.status))
+
+ self.send_message(args[0], status_msg)
+ continue
+
+ elif msg.startswith("?optin"):
+ cfg = GlobalMethods.get_config()
+ if author not in cfg["users"]:
+ print("a")
+ cfg["users"].append(author)
+ with open("./cogs/config.json", 'w') as fp:
+ json.dump(cfg, fp, indent=4)
+ response = "Added to THE BRIDGE!"
+ else:
+ response = "You're already on THE BRIDGE!"
+ print("b")
+ print("c")
+ self.send_message(args[0], response)
+ continue
+
+ elif msg.startswith("?optout"):
+ cfg = GlobalMethods.get_config()
+ if author in cfg["users"]:
+ cfg["users"].remove(author)
+ with open("./cogs/config.json", 'w') as fp:
+ json.dump(cfg, fp, indent=4)
+ response = "Removed from THE BRIDGE!"
+ else:
+ response = "You've not opted into THE BRIDGE!"
+ self.send_message(args[0], response)
+ continue
+
+ # Check allowed user
+ config = GlobalMethods.get_config()
+
+ # Clean message
+ clean_msg = GlobalMethods.irc_to_discord(msg)
+
+ # clean_msg = GlobalMethods.irc_to_discord(msg, pair[1], self.discord_client)
+ action_regex = re.match(r"\u0001ACTION (.+)\u0001", clean_msg) # format /me
+ # if action_regex:
+ # formatted_msg = "**\* {}** {}".format(author, action_regex.group(1))
+ # else:
+ # formatted_msg = "**<{}>** {}".format(author, clean_msg)
+
+ discord_channel = self.discord_client.get_channel(pair[1])
+ # Create webhook
+ self.send_to_webhook(author, clean_msg)
+
+ return
+
+ def split_msg(self, rawmsg): # interpret irc message
+ msgpre, sep, msg = rawmsg.partition(" :")
+
+ if not sep: # if sep is empty
+ msg = None
+
+ msgpre_list = msgpre.split()
+
+ if msgpre_list[0].startswith(":"):
+ prefix = msgpre_list.pop(0).lstrip(":")
+ else:
+ prefix = None
+
+ command = msgpre_list.pop(0)
+
+ args = msgpre_list
+
+ return prefix, command, args, msg
+
+ def irc_run(self): # start main irc loop
+ if not self.connected:
+ self.s, self.server, self.nick = self.irc_connect(**self.config)
+
+ line_buffer = ""
+
+ while True:
+ responce = self.s.recv(2048)
+
+ line_buffer += responce.decode()
+ lines = line_buffer.split("\n")
+
+ line_buffer = lines.pop()
+
+ for line in lines:
+ line = line.rstrip()
+
+ if line:
+ self.msg_process(line)
+ else:
+ pass
+
+ def stop_irc(self):
+ self._Thread_stop()
+
+ def send_to_webhook(self, author, msg):
+ config = GlobalMethods.get_config()
+
+ opt_ins = GlobalMethods.get_opts()
+ if author not in opt_ins:
+ return
+
+ webhook = Webhook.partial(config["webhook"][0]["id"],
+ config["webhook"][0]["key"],
+ adapter=RequestsWebhookAdapter())
+ webhook.send(content=msg,
+ wait=False,
+ username=author,
+ avatar_url=None,
+ tts=False,
+ file=None,
+ files=None,
+ embed=None,
+ embeds=None)
+ webhook = None
+
+ # profile image: f"https://eu.ui-avatars.com/api/name={author}?length=1?&background=0088ff&color=FFF"
+
+ def send_message(self, channel, msg): # send irc message
+ try:
+ # self.send("PRIVMSG {} :{}\r\n".format(channel, msg).encode())
+ self.s.send("PRIVMSG {} :{}\r\n".format(channel, msg).encode())
+
+ except BrokenPipeError as e:
+ _thread.interrupt_main()
+ # exit("Error in message size too large. Exiting...")
+ return
+
+
+class GlobalMethods:
+ def discordToIrc_deprec(msg):
+ def replaceFormatting(form, replacement, string):
+ start_form = re.escape(form)
+ end_form = re.escape(form[::-1]) # reverse it
+
+ pattern = r"{}((?:(?!{}).)*?){}".format(start_form, start_form, end_form)
+ str_split = re.split(pattern, string)
+
+ if len(str_split) == 1: # no formatting required
+ return str_split[0]
+
+ new_str = ""
+ for idx, part in enumerate(str_split):
+ if idx % 2 == 1:
+ if re.search(r"https?:\/\/[^ \n]*$", new_str): # make sure this formatting is not part of a url
+ new_str += "{}{}{}".format(form, part, form[::-1])
+ else:
+ new_str += "{}{}\x0F".format(replacement, part)
+ else:
+ new_str += part
+
+ return new_str
+
+ def createHaste(code):
+ response = requests.post("https://hastebin.com/documents", data=code)
+ key = response.json()["key"]
+ url = "https://hastebin.com/" + key
+ return url
+
+ formatting_table = [ # comment lines of this table to disable certain types of formatting relay
+ ("***__", "\x02\x1D\x1F"), # ***__UNDERLINE BOLD ITALICS__***
+ ("__***", "\x02\x1D\x1F"), # __***UNDERLINE BOLD ITALICS***__
+ ("**__", "\x02\x1F"), # **__UNDERLINE BOLD__**
+ ("__**", "\x02\x1F"), # __**UNDERLINE BOLD**__
+ ("*__", "\x1D\x1F"), # *__UNDERLINE ITALICS__*
+ ("__*", "\x1D\x1F"), # __*UNDERLINE ITALICS*__
+ ("***", "\x02\x1D"), # ***BOLD ITALICS***
+ ("**_", "\x02\x1D"), # **_BOLD ITALICS_**
+ ("_**", "\x02\x1D"), # _**BOLD ITALICS**_
+ ("__", "\x1F"), # __UNDERLINE__
+ ("**", "\x02"), # **BOLD**
+ ("*", "\x1D"), # *ITALICS*
+ ("_", "\x1D"), # _ITALICS_
+ ("`", "\x11"), # `MONOSPACE`
+ ("~~", "\x1e") # ~~STRIKETHROUGH~~
+ ]
+
+ # replace codeblocks
+ msg = re.sub(r"```(?:\w+\n|\n)?(.+?)```", lambda m: createHaste(m.group(1)), msg, flags=re.S)
+
+ # replace newlines
+ if "\n" in msg:
+ msg = msg.replace("\n", " ")
+
+ # replace formatting
+ for form in formatting_table:
+ msg = replaceFormatting(form[0], form[1], msg)
+
+ # clean up emotes
+ msg = re.sub(r"<(:\w+:)\d+>", lambda m: m.group(1), msg)
+
+ return msg
+
+ def ircToDiscord_deprec(msg, channel, discord_client):
+ # print(f"[IRC] {msg}")
+ msg = re.sub(r"\x03\d{0,2}(?:,\d{0,2})?", "", msg)
+
+ formatting_table = [
+ (["\x02", "\x1D", "\x1F"], "***__"), # bold italics underline
+ (["\x1D", "\x1F"], "*__"), # italics underline
+ (["\x02", "\x1F"], "**_"), # bold underline
+ (["\x02", "\x1D"], "***"), # bold italics
+ (["\x02"], "**"), # bold
+ (["\x1D"], "*"), # italics
+ (["\x1F"], "__"), # underline
+ (["\x11"], "`"), # code
+ (["\x1e"], "~~") # strikethrough
+ ]
+
+ for form in formatting_table:
+ # check for matches for all permutation of the list
+ perms = itertools.permutations(form[0])
+ for perm in perms:
+ if "\x0F" not in msg:
+ msg += "\x0F"
+ msg = re.sub(r"{}(.*?)\x0F".format("".join(perm)),
+ lambda m: "{}{}{}".format(form[1], m.group(1), form[1][::-1]), msg)
+
+ for char in ["\x02", "\x1D", "\x1F", "\x0F"]:
+ msg = msg.replace(char, "")
+
+ mentions = re.findall(r"@(\S+)", msg)
+ if mentions:
+ def mentionGetter(name_match):
+ name = name_match.group(1)
+ for member in discord_client.get_channel(channel).guild.members: # dota2mods serverid
+ if member.name.lower() == name.lower() or (member.nick and member.nick.lower() == name.lower()):
+ return member.mention
+ # user was not found, just return original text
+ return "@" + name
+
+ msg = re.sub(r"@(\S+)", mentionGetter, msg)
+
+ return msg
+
+ def discord_to_irc(self, msg):
+ # Clean up emoji IDs
+ for emoji_match in re.findall(r"<a?:\D+:\d+>", msg.content):
+ clean_match = re.search(r"a?:\D+:", emoji_match)
+ msg.content = msg.content.replace(emoji_match, clean_match.group()).strip()
+
+ # Clean up channel IDs
+ for channel_match in re.findall(r"<#\d+>", msg.content):
+ channel_id = re.search(r"\d+", channel_match)
+ channel = self.bot.get_channel(int(channel_id.group()))
+ msg.content = msg.content.replace(channel_match, f"#{channel.name}/Discord").strip()
+
+ # Clean up user IDs
+ for user_match in re.findall(r"<@!?\d+>", msg.content):
+ user_id = re.search(r"\d+", user_match)
+ user = self.bot.get_user(int(user_id.group()))
+ msg.content = msg.content.replace(user_match, f"@{user.name}").strip()
+
+ # Check up on the URLs
+ # for url_match in re.findall(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+", msg):
+ for attachment in msg.attachments:
+ msg.content = f"{msg.content} {attachment.url}".strip()
+
+ return msg
+
+ @staticmethod
+ def irc_to_discord(msg):
+ if msg.startswith("!") or msg.startswith("sb!"):
+ msg = f".{msg}"
+ return msg
+
+ @staticmethod
+ def get_config():
+ with open("./cogs/config.json") as fp:
+ config = json.load(fp)
+
+ return config
+
+ @staticmethod
+ def get_opts():
+ cfg = GlobalMethods.get_config()
+ response = ", ".join(cfg["users"])
+ return response
+
+class IRCBridge(commands.Cog):
+ def __init__(self, bot):
+ self.bot = bot
+ global extension_name
+ global irc_client
+ irc_client = None
+ extension_name = "[IRC Bridge] "
+
+ # sch_irc = await aiojobs.create_scheduler()
+ # await sch_irc.spawn(irc_run(self, ctx, con, sch_irc))
+
+ @commands.command(hidden=True)
+ async def irc(self, ctx):
+ """Create a link to an IRC channel"""
+ config = GlobalMethods.get_config()
+ chan_pairs = [(pair["irc_channel"], pair["discord_channel"]) for pair in config["pairs"]]
+ client = self.bot
+
+ global irc_thread
+ global irc_client
+ irc_client = IRCClient(chan_pairs=chan_pairs, config=config["irc"], discord_client=client)
+ irc_thread = None
+
+ irc_thread = threading.Thread(target=irc_client.irc_run, daemon=True)
+ irc_thread.start()
+
+ @commands.Cog.listener('on_message')
+ async def irc_on_message(self, message):
+ global irc_client
+ # if message.channel.id == 547581821957701632 and len(message.content) > 0:
+ # # Create webhook
+ # webhook = Webhook.partial(676475492202709033, "MDwKMQhB6zB6qol-U7k9pdKbEmp_B1s-LjoXNslyFJ_CZrO7CST2GdFpDFmz8nSNuBCZ", adapter=RequestsWebhookAdapter())
+ #
+ # # Send temperature as text
+ # webhook.send(content=u"" + message.content + "",
+ # wait=False,
+ # username=message.author.name,
+ # avatar_url=f"https://eu.ui-avatars.com/api/name={message.author.name}?length=1?&background=0088ff&color=FFF",
+ # tts=False,
+ # file=None,
+ # files=None,
+ # embed=None,
+ # embeds=None)
+ #
+ #
+ # #https://discordapp.com/api/webhooks/676475492202709033/MDwKMQhB6zB6qol-U7k9pdKbEmp_B1s-LjoXNslyFJ_CZrO7CST2GdFpDFmz8nSNuBCZ
+ #
+ # elif message.channel.id == 654002737167597588:
+ # print(f"{message.author.name}:{message.content}")
+ # print(f"[IRC] {message}")
+
+ if message.webhook_id is not None:
+ return
+
+ if len(message.embeds) == 1 and message.content == "":
+ # message.content = message.embeds[0].description
+ message.content = "Embedded. Cannot show this message."
+
+ if irc_client is not None:
+ # print(f"Message from bot: {message.author != self.bot.user}")
+ # print(f"Message is webhook: {message.webhook_id is None}")
+ # if message.author != self.bot.user:
+ config = GlobalMethods.get_config()
+
+ chan_pairs = [(pair["irc_channel"], pair["discord_channel"]) for pair in config["pairs"]]
+ if chan_pairs[0][1] == message.channel.id:
+ # Check for username shenannigans
+ # ?
+
+ # Ensure message content gets cleaned up
+ message = GlobalMethods.discord_to_irc(self, message)
+
+ msg_string = f"{message.author.name}: {message.content}"
+
+ irc_client.send_message(chan_pairs[0][0], msg_string)
+
+ if message.content == "sb!shutdown":
+ global irc_thread
+ irc_thread.stop_irc()
+
+
+def setup(bot):
+ print("INFO: Loading [IRC]... ", end="")
+ bot.add_cog(IRCBridge(bot))
+ print("Done!")
+
+
+def teardown(bot):
+ # global irc_thread
+ # global irc_client
+ # irc_client.shutdown()
+ # irc_thread.stop()
+ # irc_client = None
+ # irc_thread = None
+
+ print("INFO: Unloading [IRC]")