Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GSoc 2024: Integration of LLM into command prompt and exception handling #2306

Open
egede opened this issue Mar 1, 2024 · 3 comments
Open

Comments

@egede
Copy link
Member

egede commented Mar 1, 2024

This is just a placeholder for the GSoC 2024 proposal.

@egede
Copy link
Member Author

egede commented Mar 1, 2024

We might want to extract information from MatterMost for assisting the LLM. The script pasted in below will be useful for that. Script is a bit old so might require a bit of adaptation.

image

from datetime import datetime
from mattermostdriver import Driver
import pathlib
import json


def connect(host, username, login_token):
    # Connect to server
    d = Driver({
        "url": host,
        "login_id": username,
        "password": "blablabla",
        "token": login_token,
        "port": 443,
        "scheme": 'https',
        "timeout": 30
    })
    d.login()
    # Get all usernames as we want to use those instead of the user ids
    user_id_to_name = {}
    page = 0
    print("Downloading all user data")
    while True:
        users_resp = d.users.get_users(params={"per_page": 200, "page": page})
        if len(users_resp) == 0:
            break
        for user in users_resp:
            user_id_to_name[user["id"]] = user["username"]
        page += 1
    my_user_id = d.users.get_user_by_username(username)["id"]
    print("Id of logged in user:", my_user_id)

    return d, user_id_to_name, my_user_id


def select_team(d, my_user_id):
    teams = d.teams.get_user_teams(my_user_id)
    print("Found teams:")
    for i_team, team in enumerate(teams):
        print("{}\t{}\t{}".format(i_team, team["name"], team["id"]))
    team_idx = int(input("Select team by idx: "))
    team = teams[team_idx]
    print("Selected team", team["name"])
    return team


def select_channel(d, team, my_user_id, user_id_to_name):
    channels = d.channels.get_channels_for_user(my_user_id, team["id"])
    # Add display name to direct messages
    for channel in channels:
        if channel["type"] != "D":
            continue

        # The channel name consists of two user ids connected by a double underscore
        user_ids = channel["name"].split("__")
        other_user_id = user_ids[1] if user_ids[0] == my_user_id else user_ids[0]
        channel["display_name"] = user_id_to_name[other_user_id]
    # Sort channels by name for easier search
    channels = sorted(channels, key=lambda x: x["display_name"].lower())
    print("Found Channels:")
    for i_channel, channel in enumerate(channels):
        print("{}\t{}\t{}".format(i_channel, channel["display_name"], channel["id"]))
    channel_input = input("Select channels by idx separated by comma: ")
    channel_idxs = channel_input.replace(" ", "").split(",")
    selected_channels = [channels[int(idx)] for idx in channel_idxs]
    print("Selected channel(s):", ", ".join([channel["display_name"] for channel in selected_channels]))
    return selected_channels


def export_channel(d, channel, user_id_to_name, output_base, before=None, after=None):
    # Sanitize channel name
    channel_name = channel["display_name"].replace("\\", "").replace("/", "")

    print("Exporting channel", channel_name)
    if after:
        after = datetime.strptime(after, '%Y-%m-%d').timestamp()
    if before:
        before = datetime.strptime(before, '%Y-%m-%d').timestamp()

    # Get all posts for selected channel
    page = 0
    all_posts = []
    while True:
        print("Requesting channel page {}".format(page))
        posts = d.posts.get_posts_for_channel(channel["id"], params={"per_page": 200, "page": page})

        if len(posts["posts"]) == 0:
            # If no posts are returned, we have reached the end
            break

        all_posts.extend([posts["posts"][post] for post in posts["order"]])
        page += 1
    print("Found {} posts".format(len(all_posts)))
    # Create output directory
    output_base = pathlib.Path(output_base) / channel_name
    if not output_base.exists():
        output_base.mkdir()
    # Simplify all posts to contain only username, date, message and files in chronological order
    simple_posts = []
    for i_post, post in enumerate(reversed(all_posts)):

        # Filter posts by date range
        created = post["create_at"] / 1000
        if (before and created > before) or (after and created < after):
            continue

        user_id = post["user_id"]
        if user_id not in user_id_to_name:
            user_id_to_name[user_id] = d.users.get_user(user_id)["username"]
        username = user_id_to_name[user_id]
        created = datetime.utcfromtimestamp(post["create_at"] / 1000).strftime('%Y-%m-%dT%H:%M:%SZ')
        message = post["message"]
        simple_post = dict(id=i_post, created=created, username=username, message=message)

        # If a code block is given in the message, dump it to file
        if message.count("```") > 1:
            start_pos = message.find("```") + 3
            end_pos = message.rfind("```")

            cut = message[start_pos:end_pos]
            if not len(cut):
                print("Code cut has no length")
            else:
                filename = "%03d" % i_post + "_code.txt"
                with open(output_base / filename, "w") as f:
                    f.write(cut)

        # If any files are attached to the message, download each
        if "files" in post["metadata"]:
            filenames = []
            for file in post["metadata"]["files"]:
                if download_files:
                    filename = "%03d" % i_post + "_" + file["name"]
                    print("Downloading", file["name"])
                    resp = d.files.get_file(file["id"])
                    # Mattermost Driver unfortunately parses json files to dicts
                    if isinstance(resp, dict):
                        with open(output_base / filename, "w") as f:
                            json.dump(resp, f)
                    else:
                        with open(output_base / filename, "wb") as f:
                            f.write(resp.content)

                filenames.append(file["name"])
            simple_post["files"] = filenames
        simple_posts.append(simple_post)

    # Export posts to json file
    output_filename = channel_name + ".json"
    with open(output_base / output_filename, "w", encoding='utf8') as f:
        json.dump(simple_posts, f, indent=2, ensure_ascii=False)
    print("Dumped channel texts to", output_filename)


if __name__ == '__main__':
    host = "mattermost.web.cern.ch"
    username = ""  # Your gitlab username
    login_token = ""  # Access Token. Can be extracted from Browser Inspector (MMAUTHTOKEN)
    output_base = "results/"
    download_files = True

    # Range of posts to be exported as string in format "YYYY-MM-DD". Use None if no filter should be applied
    after = None
    before = None

    d, user_id_to_name, my_user_id = connect(host, username, login_token)
    team = select_team(d, my_user_id)
    channels = select_channel(d, team, my_user_id, user_id_to_name)
    for channel in channels:
        export_channel(d, channel, user_id_to_name, output_base, before, after)
    print("Finished export")

@egede
Copy link
Member Author

egede commented Mar 13, 2024

Sorry for the slow reply. No, this is not part of the challenge, but simply posted there as information for whoever will be the GSoC student. You can take it as information though if you decide to write up a project proposal that you submit to Google.

@dg1223
Copy link
Contributor

dg1223 commented Mar 14, 2024

No problem, thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants